RabbitMQ¶
To use RabbitMQ
integration, the NuGet package CrystalSharp.Messaging.RabbitMq
must be installed.
To use the RabbitMQ integration effectively, the following steps are mandatory:
- Register the RabbitMQ implementation.
- Use the `IMessageBroker` interface for sending and receiving messages.
- Use the `GeneralQueueMessage` class when sending a message to a single subscriber.
- Use one of the methods `SendObject`, `SendJson`, or `SendString` when sending a message to a single subscriber.
- Use the `GeneralExchangeMessage` class when sending a message to multiple subscribers.
- Use one of the methods `PublishObject`, `PublishJson`, or `PublishString` when sending a message to multiple subscribers.
- Use the `GeneralConsumer` class to receive messages.
- Use the `StartConsuming` method to start receiving messages.
- Use the `StopConsuming` method to stop receiving messages.
- Use the `Disconnect` method to close the connection.
RabbitMQ Registration¶
The RabbitMQ implementation must be registered before it can be used. The following code shows how to register it in the `Program.cs` file:
// RabbitMQ connection values used by this example.
// Modify each of them to match your configuration.
string host = "localhost";
int port = 5672;             // Default port for RabbitMQ.
string username = "guest";   // Default username for RabbitMQ.
string password = "guest";   // Default password for RabbitMQ.
string clientProvidedName = "";
string virtualHost = "/";

var settings = new RabbitMqSettings(
    host,
    port,
    username,
    password,
    clientProvidedName,
    virtualHost);

// Initialize Crystal Sharp: CQRS is registered with the PlaceOrderCommandHandler type,
// then the RabbitMQ integration is added using the settings built above.
CrystalSharpAdapter
    .New(builder.Services)
    .AddCqrs(typeof(PlaceOrderCommandHandler))
    .AddRabbitMq(settings)
    .CreateResolver();
In the above code snippet, the `AddRabbitMq(settings)` extension method is called while initializing the Crystal Sharp framework; this call registers the RabbitMQ implementation.
NOTE
The queues, exchanges, and routing keys used by your code must already exist on the RabbitMQ server; otherwise, exceptions may be thrown.
Send an “object” to a Message Queue¶
The following code snippet shows how to send an object
to a message queue:
/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by sending the order, as an object,
/// to the "customer-order-queue" queue through <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <param name="messageBroker">Broker used to send the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker)
    {
        _messageBroker = messageBroker;
    }

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command, sends it with
    /// <c>SendObject</c>, and returns a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    /// <returns>The result produced by <c>Ok</c> for the confirmation response.</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        var order = new CustomerOrder(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        // Target queue first, payload second.
        var queueMessage = new GeneralQueueMessage<CustomerOrder>
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            Body = order
        };

        await _messageBroker
            .SendObject<CustomerOrder>(queueMessage, cancellationToken)
            .ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}
Send “JSON” to a Message Queue¶
The following code snippet shows how to send JSON
to a message queue:
/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by sending the order, serialized with
/// <c>Serializer.Serialize</c>, to the "customer-order-queue" queue through
/// <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <param name="messageBroker">Broker used to send the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker)
    {
        _messageBroker = messageBroker;
    }

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command, serializes it, sends it
    /// with <c>SendJson</c>, and returns a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    /// <returns>The result produced by <c>Ok</c> for the confirmation response.</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        var order = new CustomerOrder(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        // Target queue first, serialized payload second.
        var queueMessage = new GeneralQueueMessage
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            Body = Serializer.Serialize(order)
        };

        await _messageBroker
            .SendJson(queueMessage, cancellationToken)
            .ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}
Send “string” to a Message Queue¶
The following code snippet shows how to send a string
to a message queue:
/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by sending a plain-text description of the
/// order to the "customer-order-queue" queue through <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <param name="messageBroker">Broker used to send the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker)
    {
        _messageBroker = messageBroker;
    }

    /// <summary>
    /// Formats the command's fields into a single string, sends it with
    /// <c>SendString</c>, and returns a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    /// <returns>The result produced by <c>Ok</c> for the confirmation response.</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        string text = $"Customer: {request.CustomerName}, Item: {request.Item}, Unit Price: {request.UnitPrice}, Quantity: {request.Quantity}";

        // Target queue first, text payload second.
        var queueMessage = new GeneralQueueMessage
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            Body = text
        };

        await _messageBroker
            .SendString(queueMessage, cancellationToken)
            .ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}
Publish an “object” to a Message Queue¶
The following code snippet shows how to publish an object through an exchange, using a routing key, so that it reaches the queue(s) listed for that exchange:
/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by publishing the order, as an object,
/// through the "customer-order-exchange" exchange via <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    // Used both as the exchange's routing key and as the queue's routing key.
    private const string OrderRoutingKey = "customer-order-routing-key";

    private readonly IMessageBroker _messageBroker;

    /// <param name="messageBroker">Broker used to publish the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker)
    {
        _messageBroker = messageBroker;
    }

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command, publishes it with
    /// <c>PublishObject</c>, and returns a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    /// <returns>The result produced by <c>Ok</c> for the confirmation response.</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        var order = new CustomerOrder(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        var exchangeMessage = new GeneralExchangeMessage<CustomerOrder>
        {
            Exchange = CreateOrderExchange(),
            Body = order
        };

        await _messageBroker
            .PublishObject<CustomerOrder>(exchangeMessage, cancellationToken)
            .ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }

    // Describes the exchange, its routing key, and the single queue listed for it.
    private static GeneralExchange CreateOrderExchange()
    {
        var orderQueue = new GeneralQueue
        {
            Name = "customer-order-queue",
            RoutingKeys = new HashSet<string> { OrderRoutingKey }
        };

        return new GeneralExchange
        {
            Name = "customer-order-exchange",
            RoutingKey = OrderRoutingKey,
            Queues = new List<GeneralQueue> { orderQueue }
        };
    }
}
Publish “JSON” to a Message Queue¶
The following code snippet shows how to publish JSON through an exchange, using a routing key, so that it reaches the queue(s) listed for that exchange:
/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by publishing the order, serialized with
/// <c>Serializer.Serialize</c>, through the "customer-order-exchange" exchange via
/// <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    // Used both as the exchange's routing key and as the queue's routing key.
    private const string OrderRoutingKey = "customer-order-routing-key";

    private readonly IMessageBroker _messageBroker;

    /// <param name="messageBroker">Broker used to publish the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker)
    {
        _messageBroker = messageBroker;
    }

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command, serializes it, publishes it
    /// with <c>PublishJson</c>, and returns a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    /// <returns>The result produced by <c>Ok</c> for the confirmation response.</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        var order = new CustomerOrder(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        var exchangeMessage = new GeneralExchangeMessage
        {
            Exchange = CreateOrderExchange(),
            Body = Serializer.Serialize(order)
        };

        await _messageBroker
            .PublishJson(exchangeMessage, cancellationToken)
            .ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }

    // Describes the exchange, its routing key, and the single queue listed for it.
    private static GeneralExchange CreateOrderExchange()
    {
        var orderQueue = new GeneralQueue
        {
            Name = "customer-order-queue",
            RoutingKeys = new HashSet<string> { OrderRoutingKey }
        };

        return new GeneralExchange
        {
            Name = "customer-order-exchange",
            RoutingKey = OrderRoutingKey,
            Queues = new List<GeneralQueue> { orderQueue }
        };
    }
}
Publish “string” to a Message Queue¶
The following code snippet shows how to publish a string through an exchange, using a routing key, so that it reaches the queue(s) listed for that exchange:
/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by publishing a plain-text description of the
/// order through the "customer-order-exchange" exchange via <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    // Used both as the exchange's routing key and as the queue's routing key.
    private const string OrderRoutingKey = "customer-order-routing-key";

    private readonly IMessageBroker _messageBroker;

    /// <param name="messageBroker">Broker used to publish the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker)
    {
        _messageBroker = messageBroker;
    }

    /// <summary>
    /// Formats the command's fields into a single string, publishes it with
    /// <c>PublishString</c>, and returns a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    /// <returns>The result produced by <c>Ok</c> for the confirmation response.</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        string text = $"Customer: {request.CustomerName}, Item: {request.Item}, Unit Price: {request.UnitPrice}, Quantity: {request.Quantity}";

        var exchangeMessage = new GeneralExchangeMessage
        {
            Exchange = CreateOrderExchange(),
            Body = text
        };

        await _messageBroker
            .PublishString(exchangeMessage, cancellationToken)
            .ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }

    // Describes the exchange, its routing key, and the single queue listed for it.
    private static GeneralExchange CreateOrderExchange()
    {
        var orderQueue = new GeneralQueue
        {
            Name = "customer-order-queue",
            RoutingKeys = new HashSet<string> { OrderRoutingKey }
        };

        return new GeneralExchange
        {
            Name = "customer-order-exchange",
            RoutingKey = OrderRoutingKey,
            Queues = new List<GeneralQueue> { orderQueue }
        };
    }
}
Receive and Consume a message from a Queue¶
The following console program code snippet shows how to receive and consume a message from a queue:
// Build the resolver and obtain the message broker from it.
IResolver resolver = ConfigureApp();
IMessageBroker messageBroker = resolver.Resolve<IMessageBroker>();

// Consumer for the "customer-order-queue" queue; ProcessMessage is registered as its Action.
IList<string> queues = new List<string>() { "customer-order-queue" };
GeneralConsumer consumer = new()
{
    Queues = queues,
    Action = ProcessMessage
};

// Completed by the Ctrl+C handler below. Awaiting its task parks the main flow
// without spinning a CPU core in an empty loop.
TaskCompletionSource shutdownRequested = new(TaskCreationOptions.RunContinuationsAsynchronously);
Console.CancelKeyPress += CancelKeyPressHandler;

try
{
    Console.WriteLine("Listener is started:");
    await messageBroker.StartConsuming(consumer).ConfigureAwait(false);

    // Wait here until Ctrl+C is pressed.
    await shutdownRequested.Task.ConfigureAwait(false);
}
catch (Exception ex)
{
    // Report the failure instead of silently swallowing it, then fall through to the cleanup.
    Console.Error.WriteLine($"Listener failed: {ex}");
}

// Cleanup runs both after a requested shutdown and after a failure.
await messageBroker.StopConsuming().ConfigureAwait(false);
messageBroker.Disconnect();
Console.WriteLine("Listener stopped.");

// Ctrl+C handler: cancels the default process termination so the cleanup above
// can run, and signals the main flow to stop waiting.
void CancelKeyPressHandler(object sender, ConsoleCancelEventArgs e)
{
    e.Cancel = true;
    shutdownRequested.TrySetResult();
}
// Prints a received message to the console, framed by a header line and a footer line.
void ProcessMessage(string message)
{
    string[] outputLines =
    {
        "============>>> NEW MESSAGE",
        message,
        "==========================="
    };

    foreach (string outputLine in outputLines)
    {
        Console.WriteLine(outputLine);
    }
}
// Creates the service collection, registers the RabbitMQ integration with
// Crystal Sharp, and returns the resulting resolver.
IResolver ConfigureApp()
{
    string host = "localhost"; // Modify it to match your configuration.
    int port = 5672; // Default port for RabbitMQ. Modify it to match your configuration.
    string username = "guest"; // Default username for RabbitMQ. Modify it to match your configuration.
    string password = "guest"; // Default password for RabbitMQ. Modify it to match your configuration.
    string clientProvidedName = ""; // Modify it to match your configuration.
    string virtualHost = "/"; // Modify it to match your configuration.

    RabbitMqSettings rabbitMqSettings = new(host, port, username, password, clientProvidedName, virtualHost);

    // The connection values are defined inline above, so no configuration file
    // is loaded (and none is required to exist) to start the listener.
    IServiceCollection services = new ServiceCollection();

    return CrystalSharpAdapter.New(services)
        .AddRabbitMq(rabbitMqSettings)
        .CreateResolver();
}