RabbitMQ


To use RabbitMQ integration, the NuGet package CrystalSharp.Messaging.RabbitMq must be installed.

To use the RabbitMQ integration, follow these steps:

  • Register RabbitMQ implementation.
  • Use the IMessageBroker interface for sending and receiving messages.
  • Use GeneralQueueMessage class if sending a message to a single subscriber.
  • Use one of the methods SendObject, SendJson, or SendString when sending a message to a single subscriber.
  • Use GeneralExchangeMessage class if sending a message to multiple subscribers.
  • Use one of the methods PublishObject, PublishJson, or PublishString when sending a message to multiple subscribers.
  • Use GeneralConsumer class to receive the message.
  • Use StartConsuming method to start receiving the messages.
  • Use StopConsuming method to stop receiving the messages.
  • Use Disconnect method for closing the connection.

RabbitMQ Registration

The RabbitMQ implementation must be registered before it can be used. The following code shows how to register it in the Program.cs file:

“Program.cs” file:

// RabbitMQ connection values. Adjust each one to match your configuration.
var host = "localhost";
var port = 5672;                        // RabbitMQ default port.
var username = "guest";                 // RabbitMQ default username.
var password = "guest";                 // RabbitMQ default password.
var clientProvidedName = string.Empty;
var virtualHost = "/";
var settings = new RabbitMqSettings(host, port, username, password, clientProvidedName, virtualHost);

// Register CQRS handlers and the RabbitMQ implementation with the framework.
CrystalSharpAdapter
    .New(builder.Services)
    .AddCqrs(typeof(PlaceOrderCommandHandler))
    .AddRabbitMq(settings)
    .CreateResolver();

In the above code snippet, the AddRabbitMq(settings) extension method is called while initializing the Crystal Sharp framework to register the RabbitMQ implementation.

NOTE

The queues, exchanges, and routing keys used by the application must already exist on the RabbitMQ server; otherwise, exceptions may be thrown.

Send an “object” to a Message Queue

The following code snippet shows how to send an object to a message queue:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by sending the order as an object
/// to the "customer-order-queue" queue.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the message broker used to send the order.</summary>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command, sends it with SendObject,
    /// and returns a confirmation response.
    /// </summary>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        var order = new CustomerOrder(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);
        var queueMessage = new GeneralQueueMessage<CustomerOrder>
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            Body = order
        };

        await _messageBroker.SendObject<CustomerOrder>(queueMessage, cancellationToken).ConfigureAwait(false);

        var confirmation = new PlaceOrderResponse { Result = "Order has been submitted." };

        return await Ok(confirmation);
    }
}

Send “JSON” to a Message Queue

The following code snippet shows how to send JSON to a message queue:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by sending the order, serialized with
/// Serializer.Serialize, to the "customer-order-queue" queue.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the message broker used to send the order.</summary>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command, sends its serialized form
    /// with SendJson, and returns a confirmation response.
    /// </summary>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        var order = new CustomerOrder(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);
        var jsonMessage = new GeneralQueueMessage
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            Body = Serializer.Serialize(order)
        };

        await _messageBroker.SendJson(jsonMessage, cancellationToken).ConfigureAwait(false);

        var confirmation = new PlaceOrderResponse { Result = "Order has been submitted." };

        return await Ok(confirmation);
    }
}

Send “string” to a Message Queue

The following code snippet shows how to send a string to a message queue:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by sending a plain-text description of the
/// order to the "customer-order-queue" queue.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the message broker used to send the order.</summary>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Formats the command's fields into a single string, sends it with SendString,
    /// and returns a confirmation response.
    /// </summary>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        string summary = $"Customer: {request.CustomerName}, Item: {request.Item}, Unit Price: {request.UnitPrice}, Quantity: {request.Quantity}";
        var textMessage = new GeneralQueueMessage
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            Body = summary
        };

        await _messageBroker.SendString(textMessage, cancellationToken).ConfigureAwait(false);

        var confirmation = new PlaceOrderResponse { Result = "Order has been submitted." };

        return await Ok(confirmation);
    }
}

Publish an “object” to a Message Queue

The following code snippet shows how to publish an object to an exchange, from which it is routed to a message queue by routing key:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by publishing the order as an object to the
/// "customer-order-exchange" exchange.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the message broker used to publish the order.</summary>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command, publishes it with PublishObject,
    /// and returns a confirmation response.
    /// </summary>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        // The same routing key is used on the exchange and on the queue listed under it.
        const string routingKey = "customer-order-routing-key";

        var order = new CustomerOrder(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);
        var exchangeMessage = new GeneralExchangeMessage<CustomerOrder>
        {
            Exchange = new GeneralExchange
            {
                Name = "customer-order-exchange",
                RoutingKey = routingKey,
                Queues = new List<GeneralQueue>
                {
                    new GeneralQueue
                    {
                        Name = "customer-order-queue",
                        RoutingKeys = new HashSet<string> { routingKey }
                    }
                }
            },
            Body = order
        };

        await _messageBroker.PublishObject<CustomerOrder>(exchangeMessage, cancellationToken).ConfigureAwait(false);

        var confirmation = new PlaceOrderResponse { Result = "Order has been submitted." };

        return await Ok(confirmation);
    }
}

Publish “JSON” to a Message Queue

The following code snippet shows how to publish JSON to an exchange, from which it is routed to a message queue by routing key:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by publishing the order, serialized with
/// Serializer.Serialize, to the "customer-order-exchange" exchange.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the message broker used to publish the order.</summary>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command, publishes its serialized form
    /// with PublishJson, and returns a confirmation response.
    /// </summary>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        // The same routing key is used on the exchange and on the queue listed under it.
        const string routingKey = "customer-order-routing-key";

        var order = new CustomerOrder(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);
        var jsonMessage = new GeneralExchangeMessage
        {
            Exchange = new GeneralExchange
            {
                Name = "customer-order-exchange",
                RoutingKey = routingKey,
                Queues = new List<GeneralQueue>
                {
                    new GeneralQueue
                    {
                        Name = "customer-order-queue",
                        RoutingKeys = new HashSet<string> { routingKey }
                    }
                }
            },
            Body = Serializer.Serialize(order)
        };

        await _messageBroker.PublishJson(jsonMessage, cancellationToken).ConfigureAwait(false);

        var confirmation = new PlaceOrderResponse { Result = "Order has been submitted." };

        return await Ok(confirmation);
    }
}

Publish “string” to a Message Queue

The following code snippet shows how to publish a string to an exchange, from which it is routed to a message queue by routing key:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by publishing a plain-text description of the
/// order to the "customer-order-exchange" exchange.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the message broker used to publish the order.</summary>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Formats the command's fields into a single string, publishes it with PublishString,
    /// and returns a confirmation response.
    /// </summary>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        // The same routing key is used on the exchange and on the queue listed under it.
        const string routingKey = "customer-order-routing-key";

        string summary = $"Customer: {request.CustomerName}, Item: {request.Item}, Unit Price: {request.UnitPrice}, Quantity: {request.Quantity}";
        var textMessage = new GeneralExchangeMessage
        {
            Exchange = new GeneralExchange
            {
                Name = "customer-order-exchange",
                RoutingKey = routingKey,
                Queues = new List<GeneralQueue>
                {
                    new GeneralQueue
                    {
                        Name = "customer-order-queue",
                        RoutingKeys = new HashSet<string> { routingKey }
                    }
                }
            },
            Body = summary
        };

        await _messageBroker.PublishString(textMessage, cancellationToken).ConfigureAwait(false);

        var confirmation = new PlaceOrderResponse { Result = "Order has been submitted." };

        return await Ok(confirmation);
    }
}

Receive and Consume a message from a Queue

The following console program code snippet shows how to receive and consume a message from a queue:

// Console listener: resolves the message broker, starts consuming "customer-order-queue"
// with ProcessMessage as the consumer action, and keeps running until Ctrl+C is pressed
// or an error occurs. It then stops consuming and disconnects.
IResolver resolver = ConfigureApp();
IMessageBroker messageBroker = resolver.Resolve<IMessageBroker>();
IList<string> queues = new List<string>() { "customer-order-queue" };
GeneralConsumer consumer = new()
{
    Queues = queues,
    Action = ProcessMessage
};
// Set to false by CancelKeyPressHandler (Ctrl+C) or by the catch block below.
bool keepRunning = true;
Console.CancelKeyPress += CancelKeyPressHandler;

try
{
    Console.WriteLine("Listener is started:");

    await messageBroker.StartConsuming(consumer).ConfigureAwait(false);

    // Wait asynchronously between checks; an empty loop body would spin and keep
    // a CPU core fully busy for the lifetime of the listener.
    while (keepRunning)
    {
        await System.Threading.Tasks.Task.Delay(250).ConfigureAwait(false);
    }
}
catch (Exception ex)
{
    // Report the failure instead of swallowing it silently, then fall through to shutdown.
    Console.Error.WriteLine($"Listener failed: {ex}");
    keepRunning = false;
}

// Execution only reaches this point once keepRunning is false (loop exit or catch),
// so the shutdown sequence always runs.
await messageBroker.StopConsuming().ConfigureAwait(false);
messageBroker.Disconnect();

Console.WriteLine("Listener stopped.");

// Ctrl+C handler: asks the wait loop to finish and cancels the default termination
// so that the listener can shut down in an orderly way.
void CancelKeyPressHandler(object sender, ConsoleCancelEventArgs e)
{
    keepRunning = false;

    // Setting Cancel to true keeps the process alive after the key press.
    e.Cancel = true;
}

// Consumer action: prints the received message to the console between two marker lines.
void ProcessMessage(string message)
{
    string[] outputLines =
    {
        "============>>> NEW MESSAGE",
        message,
        "==========================="
    };

    foreach (string line in outputLines)
    {
        Console.WriteLine(line);
    }
}

// Builds the service container for the console listener and returns its resolver.
//
// appsettings.json must exist in the current directory (it is loaded as non-optional).
// Connection values are read from the following keys when present; any key that is
// missing (or a port that is not a valid integer) falls back to the default shown:
//   RabbitMq:Host               -> "localhost"
//   RabbitMq:Port               -> 5672    (RabbitMQ default port)
//   RabbitMq:Username           -> "guest" (RabbitMQ default username)
//   RabbitMq:Password           -> "guest" (RabbitMQ default password)
//   RabbitMq:ClientProvidedName -> ""
//   RabbitMq:VirtualHost        -> "/"
IResolver ConfigureApp()
{
    IConfigurationRoot configuration = new ConfigurationBuilder()
        .SetBasePath(Directory.GetCurrentDirectory())
        .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
        .Build();
    IServiceCollection services = new ServiceCollection();

    string host = configuration["RabbitMq:Host"] ?? "localhost";
    int port = int.TryParse(
        configuration["RabbitMq:Port"],
        System.Globalization.NumberStyles.Integer,
        System.Globalization.CultureInfo.InvariantCulture,
        out int configuredPort)
        ? configuredPort
        : 5672;
    string username = configuration["RabbitMq:Username"] ?? "guest";
    string password = configuration["RabbitMq:Password"] ?? "guest";
    string clientProvidedName = configuration["RabbitMq:ClientProvidedName"] ?? "";
    string virtualHost = configuration["RabbitMq:VirtualHost"] ?? "/";
    RabbitMqSettings rabbitMqSettings = new(host, port, username, password, clientProvidedName, virtualHost);

    IResolver resolver = CrystalSharpAdapter.New(services)
        .AddRabbitMq(rabbitMqSettings)
        .CreateResolver();

    return resolver;
}