Azure Service Bus¶
To use Azure Service Bus
integration, the NuGet package CrystalSharp.Messaging.AzureServiceBus
must be installed.
In order to use Azure Service Bus integration effectively, it is necessary to adhere to the following mandatory steps:
- Register Azure Service Bus implementation.
- Use
IMessageBroker
interface for sending and receiving message. - Use
GeneralQueueMessage
class for sending a message to a single or multiple subscribers. - Use one of these methods
SendObject
,SendJson
, orSendString
if sending a message to a single subscriber. - Use one of these methods
PublishObject
,PublishJson
orPublishString
if sending a message to multiple subscribers. - Use
GeneralConsumer
class to receive the message. - Use
StartConsuming
method to start receiving the messages. - Use
StopConsuming
method to stop receiving the messages. - Use
Disconnect
method for closing the connection.
Azure Service Bus Registration¶
Registration for the Azure Service Bus implementation is required. Following is the code that illustrates how to register the implementation of the Azure Service Bus in the Program.cs
file:
“Program.cs” file:
AzureServiceBusSettings settings = new("AZURE_SERVICE_BUS_CONNECTION_STRING");
CrystalSharpAdapter.New(builder.Services)
.AddCqrs(typeof(PlaceOrderCommandHandler))
.AddAzureServiceBus(settings)
.CreateResolver();
In the above code snippet, when initializing the Crystal Sharp framework, a call to an extension method is made AddAzureServiceBus(settings)
for the Azure Service Bus registration.
NOTE
Queues, or topics, must exist on the Azure Service Bus to avoid potential exceptions.
Send an “object” to a Message Queue¶
The following code snippet shows how to send an object
to a message queue:
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
private readonly IMessageBroker _messageBroker;
public PlaceOrderCommandHandler(IMessageBroker messageBroker)
{
_messageBroker = messageBroker;
}
public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
{
CustomerOrder customerOrder = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);
GeneralQueueMessage<CustomerOrder> message = new()
{
Queue = new GeneralQueue { Name = "customer-order-queue" }
};
message.Body = customerOrder;
await _messageBroker.SendObject<CustomerOrder>(message, cancellationToken).ConfigureAwait(false);
PlaceOrderResponse response = new() { Result = "Order has been submitted." };
return await Ok(response);
}
}
Send “JSON” to a Message Queue¶
The following code snippet shows how to send JSON
to a message queue:
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
private readonly IMessageBroker _messageBroker;
public PlaceOrderCommandHandler(IMessageBroker messageBroker)
{
_messageBroker = messageBroker;
}
public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
{
CustomerOrder customerOrder = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);
GeneralQueueMessage message = new()
{
Queue = new GeneralQueue { Name = "customer-order-queue" }
};
message.Body = Serializer.Serialize(customerOrder);
await _messageBroker.SendJson(message, cancellationToken).ConfigureAwait(false);
PlaceOrderResponse response = new() { Result = "Order has been submitted." };
return await Ok(response);
}
}
Send “string” to a Message Queue¶
The following code snippet shows how to send a string
to a message queue:
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
private readonly IMessageBroker _messageBroker;
public PlaceOrderCommandHandler(IMessageBroker messageBroker)
{
_messageBroker = messageBroker;
}
public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
{
string orderDetails = $"Customer: {request.CustomerName}, Item: {request.Item}, Unit Price: {request.UnitPrice}, Quantity: {request.Quantity}";
GeneralQueueMessage message = new()
{
Queue = new GeneralQueue { Name = "customer-order-queue" }
};
message.Body = orderDetails;
await _messageBroker.SendString(message, cancellationToken).ConfigureAwait(false);
PlaceOrderResponse response = new() { Result = "Order has been submitted." };
return await Ok(response);
}
}
Publish an “object” to a Topic¶
The following code snippet shows how to publish an object
to a topic:
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
private readonly IMessageBroker _messageBroker;
public PlaceOrderCommandHandler(IMessageBroker messageBroker)
{
_messageBroker = messageBroker;
}
public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
{
CustomerOrder customerOrder = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);
GeneralQueueMessage<CustomerOrder> message = new()
{
Queue = new GeneralQueue
{
Exchange = "customer-order-topic" // Here "Exchange" property refers to the "Topic" in Azure Service Bus.
}
};
message.Body = customerOrder;
await _messageBroker.PublishObject<CustomerOrder>(message, cancellationToken).ConfigureAwait(false);
PlaceOrderResponse response = new() { Result = "Order has been submitted." };
return await Ok(response);
}
}
Publish “JSON” to a Topic¶
The following code snippet shows how to publish JSON
to a topic:
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
private readonly IMessageBroker _messageBroker;
public PlaceOrderCommandHandler(IMessageBroker messageBroker)
{
_messageBroker = messageBroker;
}
public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
{
CustomerOrder customerOrder = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);
GeneralQueueMessage message = new()
{
Queue = new GeneralQueue
{
Exchange = "customer-order-topic" // Here "Exchange" property refers to the "Topic" in Azure Service Bus.
}
};
message.Body = Serializer.Serialize(customerOrder);
await _messageBroker.PublishJson(message, cancellationToken).ConfigureAwait(false);
PlaceOrderResponse response = new() { Result = "Order has been submitted." };
return await Ok(response);
}
}
Publish “string” to a Topic¶
The following code snippet shows how to publish a string
to a topic:
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
private readonly IMessageBroker _messageBroker;
public PlaceOrderCommandHandler(IMessageBroker messageBroker)
{
_messageBroker = messageBroker;
}
public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
{
string orderDetails = $"Customer: {request.CustomerName}, Item: {request.Item}, Unit Price: {request.UnitPrice}, Quantity: {request.Quantity}";
GeneralQueueMessage message = new()
{
Queue = new GeneralQueue
{
Exchange = "customer-order-topic" // Here "Exchange" property refers to the "Topic" in Azure Service Bus.
}
};
message.Body = orderDetails;
await _messageBroker.PublishString(message, cancellationToken).ConfigureAwait(false);
PlaceOrderResponse response = new() { Result = "Order has been submitted." };
return await Ok(response);
}
}
Receive and Consume a message from a Queue¶
The following console program code snippet shows how to receive and consume a message from a queue:
IResolver resolver = ConfigureApp();
IMessageBroker messageBroker = resolver.Resolve<IMessageBroker>();
IList<string> queues = new List<string>() { "customer-order-queue" };
GeneralConsumer consumer = new()
{
Queues = queues,
Action = ProcessMessage
};
bool keepRunning = true;
Console.CancelKeyPress += CancelKeyPressHandler;
try
{
Console.WriteLine("Listener is started:");
await messageBroker.StartConsuming(consumer).ConfigureAwait(false);
while (keepRunning)
{
//
}
}
catch
{
keepRunning = false;
}
if (!keepRunning)
{
await messageBroker.StopConsuming().ConfigureAwait(false);
messageBroker.Disconnect();
Console.WriteLine("Listener stopped.");
}
void CancelKeyPressHandler(object sender, ConsoleCancelEventArgs e)
{
e.Cancel = true;
keepRunning = false;
}
void ProcessMessage(string message)
{
Console.WriteLine("============>>> NEW MESSAGE");
Console.WriteLine(message);
Console.WriteLine("===========================");
}
IResolver ConfigureApp()
{
IConfigurationBuilder builder = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
IConfigurationRoot _configuration = builder.Build();
IServiceCollection services = new ServiceCollection();
AzureServiceBusSettings azureServiceBusSettings = new("AZURE_SERVICE_BUS_CONNECTION_STRING");
IResolver resolver = CrystalSharpAdapter.New(services)
.AddAzureServiceBus(azureServiceBusSettings)
.CreateResolver();
return resolver;
}