Azure Service Bus
To use Azure Service Bus integration, the NuGet package CrystalSharp.Messaging.AzureServiceBus must be installed.
In order to use Azure Service Bus integration effectively, it is necessary to adhere to the following mandatory steps:
- Register Azure Service Bus implementation.
- Use the `IMessageBroker` interface for sending and receiving messages.
- Use the `GeneralQueueMessage` class for sending a message to a single subscriber or to multiple subscribers.
- Use one of the methods `SendObject`, `SendJson`, or `SendString` to send a message to a single subscriber.
- Use one of the methods `PublishObject`, `PublishJson`, or `PublishString` to send a message to multiple subscribers.
- Use the `GeneralConsumer` class to receive messages.
- Use the `StartConsuming` method to start receiving messages.
- Use the `StopConsuming` method to stop receiving messages.
- Use the `Disconnect` method to close the connection.
Azure Service Bus Registration
Registration for the Azure Service Bus implementation is required. Following is the code that illustrates how to register the implementation of the Azure Service Bus in the Program.cs file:
“Program.cs” file:
// Settings object built from the connection-string placeholder used throughout this page.
var settings = new AzureServiceBusSettings("AZURE_SERVICE_BUS_CONNECTION_STRING");

// Fluent registration: CQRS handlers first, then Azure Service Bus, then the resolver is created.
CrystalSharpAdapter
    .New(builder.Services)
    .AddCqrs(typeof(PlaceOrderCommandHandler))
    .AddAzureServiceBus(settings)
    .CreateResolver();
In the above code snippet, the `AddAzureServiceBus(settings)` extension method is called while initializing the Crystal Sharp framework; this call registers the Azure Service Bus implementation.
NOTE
The queues and topics used by the application must already exist on the Azure Service Bus; otherwise, exceptions may be thrown.
Send an “object” to a Message Queue
The following code snippet shows how to send an object to a message queue:
/// <summary>
/// Command handler that sends the placed order, as a typed object, to the
/// "customer-order-queue" queue through <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the injected message broker for use in <c>Handle</c>.</summary>
    /// <param name="messageBroker">Broker used to send the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <c>CustomerOrder</c> from the command, sends it with <c>SendObject</c>,
    /// and returns the result of <c>Ok</c> for a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        CustomerOrder order = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        // Queue and body are both supplied up front; the queue name identifies the destination.
        GeneralQueueMessage<CustomerOrder> envelope = new()
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            Body = order
        };

        await _messageBroker.SendObject<CustomerOrder>(envelope, cancellationToken).ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}
Send “JSON” to a Message Queue
The following code snippet shows how to send JSON to a message queue:
/// <summary>
/// Command handler that serializes the placed order and sends the resulting JSON
/// to the "customer-order-queue" queue through <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the injected message broker for use in <c>Handle</c>.</summary>
    /// <param name="messageBroker">Broker used to send the JSON message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <c>CustomerOrder</c> from the command, serializes it with
    /// <c>Serializer.Serialize</c>, sends it with <c>SendJson</c>, and returns the
    /// result of <c>Ok</c> for a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        CustomerOrder order = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        GeneralQueueMessage jsonMessage = new()
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            // The body carries the serialized order rather than the object itself.
            Body = Serializer.Serialize(order)
        };

        await _messageBroker.SendJson(jsonMessage, cancellationToken).ConfigureAwait(false);

        PlaceOrderResponse confirmation = new() { Result = "Order has been submitted." };
        return await Ok(confirmation);
    }
}
Send “string” to a Message Queue
The following code snippet shows how to send a string to a message queue:
/// <summary>
/// Command handler that sends a plain-text description of the placed order
/// to the "customer-order-queue" queue through <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the injected message broker for use in <c>Handle</c>.</summary>
    /// <param name="messageBroker">Broker used to send the string message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Formats the command's fields into a single string, sends it with
    /// <c>SendString</c>, and returns the result of <c>Ok</c> for a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        string summary = $"Customer: {request.CustomerName}, Item: {request.Item}, Unit Price: {request.UnitPrice}, Quantity: {request.Quantity}";

        GeneralQueueMessage textMessage = new()
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            Body = summary
        };

        await _messageBroker.SendString(textMessage, cancellationToken).ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}
Publish an “object” to a Topic
The following code snippet shows how to publish an object to a topic:
/// <summary>
/// Command handler that publishes the placed order, as a typed object, to the
/// "customer-order-topic" topic through <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the injected message broker for use in <c>Handle</c>.</summary>
    /// <param name="messageBroker">Broker used to publish the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <c>CustomerOrder</c> from the command, publishes it with
    /// <c>PublishObject</c>, and returns the result of <c>Ok</c> for a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        CustomerOrder order = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        // The "Exchange" property is how a "Topic" in Azure Service Bus is addressed here.
        GeneralQueue topic = new() { Exchange = "customer-order-topic" };

        GeneralQueueMessage<CustomerOrder> envelope = new()
        {
            Queue = topic,
            Body = order
        };

        await _messageBroker.PublishObject<CustomerOrder>(envelope, cancellationToken).ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}
Publish “JSON” to a Topic
The following code snippet shows how to publish JSON to a topic:
/// <summary>
/// Command handler that serializes the placed order and publishes the resulting JSON
/// to the "customer-order-topic" topic through <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the injected message broker for use in <c>Handle</c>.</summary>
    /// <param name="messageBroker">Broker used to publish the JSON message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <c>CustomerOrder</c> from the command, serializes it with
    /// <c>Serializer.Serialize</c>, publishes it with <c>PublishJson</c>, and returns
    /// the result of <c>Ok</c> for a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        CustomerOrder order = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        // The "Exchange" property is how a "Topic" in Azure Service Bus is addressed here.
        GeneralQueue topic = new() { Exchange = "customer-order-topic" };

        GeneralQueueMessage jsonMessage = new()
        {
            Queue = topic,
            Body = Serializer.Serialize(order)
        };

        await _messageBroker.PublishJson(jsonMessage, cancellationToken).ConfigureAwait(false);

        PlaceOrderResponse confirmation = new() { Result = "Order has been submitted." };
        return await Ok(confirmation);
    }
}
Publish “string” to a Topic
The following code snippet shows how to publish a string to a topic:
/// <summary>
/// Command handler that publishes a plain-text description of the placed order
/// to the "customer-order-topic" topic through <see cref="IMessageBroker"/>.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Stores the injected message broker for use in <c>Handle</c>.</summary>
    /// <param name="messageBroker">Broker used to publish the string message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Formats the command's fields into a single string, publishes it with
    /// <c>PublishString</c>, and returns the result of <c>Ok</c> for a confirmation response.
    /// </summary>
    /// <param name="request">Command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token forwarded to the broker call.</param>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        string summary = $"Customer: {request.CustomerName}, Item: {request.Item}, Unit Price: {request.UnitPrice}, Quantity: {request.Quantity}";

        // The "Exchange" property is how a "Topic" in Azure Service Bus is addressed here.
        GeneralQueue topic = new() { Exchange = "customer-order-topic" };

        GeneralQueueMessage textMessage = new()
        {
            Queue = topic,
            Body = summary
        };

        await _messageBroker.PublishString(textMessage, cancellationToken).ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}
Receive and Consume a message from a Queue
The following console program code snippet shows how to receive and consume a message from a queue:
// Console listener: resolves the message broker, starts consuming "customer-order-queue",
// waits until Ctrl+C is pressed (or start-up fails), then stops consuming and disconnects.
IResolver resolver = ConfigureApp();
IMessageBroker messageBroker = resolver.Resolve<IMessageBroker>();

IList<string> queues = new List<string>() { "customer-order-queue" };
GeneralConsumer consumer = new()
{
    Queues = queues,
    Action = ProcessMessage
};

// Completed by the Ctrl+C handler. Awaiting this task replaces an empty spin loop on a
// shared bool, which would pin a CPU core and read the flag without any synchronization.
// RunContinuationsAsynchronously keeps the shutdown code off the handler's thread.
TaskCompletionSource shutdownRequested = new(TaskCreationOptions.RunContinuationsAsynchronously);
Console.CancelKeyPress += CancelKeyPressHandler;

try
{
    Console.WriteLine("Listener is started:");
    await messageBroker.StartConsuming(consumer).ConfigureAwait(false);

    // NOTE(review): assumes StartConsuming returns once consumption has begun and that
    // messages are then delivered to consumer.Action in the background — confirm.
    await shutdownRequested.Task.ConfigureAwait(false);
}
catch (Exception ex)
{
    // Report the failure instead of swallowing it, then fall through to the shutdown path.
    Console.Error.WriteLine($"Listener failed: {ex}");
}

await messageBroker.StopConsuming().ConfigureAwait(false);
messageBroker.Disconnect();
Console.WriteLine("Listener stopped.");

// Ctrl+C handler: keeps the process alive (e.Cancel = true) so the shutdown path above
// can run, and signals the main flow to proceed.
void CancelKeyPressHandler(object sender, ConsoleCancelEventArgs e)
{
    e.Cancel = true;
    shutdownRequested.TrySetResult();
}
// Callback assigned to GeneralConsumer.Action: prints the received message to the
// console, framed by a header line and a footer line.
void ProcessMessage(string message)
{
    string[] outputLines =
    {
        "============>>> NEW MESSAGE",
        message,
        "==========================="
    };

    foreach (string outputLine in outputLines)
    {
        Console.WriteLine(outputLine);
    }
}
// Builds the application's resolver: loads appsettings.json from the current directory,
// creates a service collection, and registers the Azure Service Bus implementation.
IResolver ConfigureApp()
{
    IConfigurationBuilder configurationBuilder = new ConfigurationBuilder()
        .SetBasePath(Directory.GetCurrentDirectory())
        .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);

    // The configuration is built here but its values are not read anywhere in this snippet;
    // the connection string below is a hard-coded placeholder.
    IConfigurationRoot configuration = configurationBuilder.Build();

    IServiceCollection serviceCollection = new ServiceCollection();
    AzureServiceBusSettings serviceBusSettings = new("AZURE_SERVICE_BUS_CONNECTION_STRING");

    return CrystalSharpAdapter
        .New(serviceCollection)
        .AddAzureServiceBus(serviceBusSettings)
        .CreateResolver();
}