Azure Service Bus


To use Azure Service Bus integration, the NuGet package CrystalSharp.Messaging.AzureServiceBus must be installed.

In order to use Azure Service Bus integration effectively, it is necessary to adhere to the following mandatory steps:

  • Register the Azure Service Bus implementation.
  • Use the IMessageBroker interface for sending and receiving messages.
  • Use the GeneralQueueMessage class for sending a message to a single subscriber or to multiple subscribers.
  • Use one of the methods SendObject, SendJson, or SendString to send a message to a single subscriber.
  • Use one of the methods PublishObject, PublishJson, or PublishString to send a message to multiple subscribers.
  • Use the GeneralConsumer class to receive messages.
  • Use the StartConsuming method to start receiving messages.
  • Use the StopConsuming method to stop receiving messages.
  • Use the Disconnect method to close the connection.

Azure Service Bus Registration

The Azure Service Bus implementation must be registered before it can be used. The following code illustrates how to register the Azure Service Bus implementation in the Program.cs file:

“Program.cs” file:

// Settings created from the Azure Service Bus connection string (placeholder value shown).
var settings = new AzureServiceBusSettings("AZURE_SERVICE_BUS_CONNECTION_STRING");

// Register the CQRS handlers and the Azure Service Bus implementation, then create the resolver.
CrystalSharpAdapter
    .New(builder.Services)
    .AddCqrs(typeof(PlaceOrderCommandHandler))
    .AddAzureServiceBus(settings)
    .CreateResolver();

In the above code snippet, the extension method AddAzureServiceBus(settings) is called while initializing the Crystal Sharp framework in order to register the Azure Service Bus implementation.

NOTE

The queues or topics must already exist on the Azure Service Bus; otherwise, exceptions may be thrown.

Send an “object” to a Message Queue

The following code snippet shows how to send an object to a message queue:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by sending the order, as an object,
/// to the "customer-order-queue" queue through the message broker.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Creates the handler.</summary>
    /// <param name="messageBroker">The broker used to send the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command and sends it to the queue.
    /// </summary>
    /// <param name="request">The command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token passed on to the message broker call.</param>
    /// <returns>The result of <c>Ok</c> for a response whose Result is "Order has been submitted.".</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        CustomerOrder order = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        // The queue is addressed by name; the order itself is the message body.
        GeneralQueueMessage<CustomerOrder> queueMessage = new()
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            Body = order
        };

        await _messageBroker.SendObject<CustomerOrder>(queueMessage, cancellationToken).ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}

Send “JSON” to a Message Queue

The following code snippet shows how to send JSON to a message queue:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by sending the order, serialized with
/// <c>Serializer.Serialize</c>, to the "customer-order-queue" queue through the message broker.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Creates the handler.</summary>
    /// <param name="messageBroker">The broker used to send the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command, serializes it and sends it to the queue.
    /// </summary>
    /// <param name="request">The command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token passed on to the message broker call.</param>
    /// <returns>The result of <c>Ok</c> for a response whose Result is "Order has been submitted.".</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        CustomerOrder order = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        // The queue is addressed by name; the serialized order is the message body.
        GeneralQueueMessage queueMessage = new()
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            Body = Serializer.Serialize(order)
        };

        await _messageBroker.SendJson(queueMessage, cancellationToken).ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}

Send “string” to a Message Queue

The following code snippet shows how to send a string to a message queue:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by sending a plain-text description of the order
/// to the "customer-order-queue" queue through the message broker.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Creates the handler.</summary>
    /// <param name="messageBroker">The broker used to send the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Formats the command's fields into a single string and sends it to the queue.
    /// </summary>
    /// <param name="request">The command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token passed on to the message broker call.</param>
    /// <returns>The result of <c>Ok</c> for a response whose Result is "Order has been submitted.".</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        string summary = $"Customer: {request.CustomerName}, Item: {request.Item}, Unit Price: {request.UnitPrice}, Quantity: {request.Quantity}";

        // The queue is addressed by name; the formatted text is the message body.
        GeneralQueueMessage queueMessage = new()
        {
            Queue = new GeneralQueue { Name = "customer-order-queue" },
            Body = summary
        };

        await _messageBroker.SendString(queueMessage, cancellationToken).ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}

Publish an “object” to a Topic

The following code snippet shows how to publish an object to a topic:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by publishing the order, as an object,
/// to the "customer-order-topic" topic through the message broker.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Creates the handler.</summary>
    /// <param name="messageBroker">The broker used to publish the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command and publishes it to the topic.
    /// </summary>
    /// <param name="request">The command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token passed on to the message broker call.</param>
    /// <returns>The result of <c>Ok</c> for a response whose Result is "Order has been submitted.".</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        CustomerOrder order = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        // The "Exchange" property refers to the "Topic" in Azure Service Bus.
        GeneralQueue topic = new() { Exchange = "customer-order-topic" };
        GeneralQueueMessage<CustomerOrder> topicMessage = new()
        {
            Queue = topic,
            Body = order
        };

        await _messageBroker.PublishObject<CustomerOrder>(topicMessage, cancellationToken).ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}

Publish “JSON” to a Topic

The following code snippet shows how to publish JSON to a topic:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by publishing the order, serialized with
/// <c>Serializer.Serialize</c>, to the "customer-order-topic" topic through the message broker.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Creates the handler.</summary>
    /// <param name="messageBroker">The broker used to publish the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Builds a <see cref="CustomerOrder"/> from the command, serializes it and publishes it to the topic.
    /// </summary>
    /// <param name="request">The command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token passed on to the message broker call.</param>
    /// <returns>The result of <c>Ok</c> for a response whose Result is "Order has been submitted.".</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        CustomerOrder order = new(request.CustomerName, request.Item, request.UnitPrice, request.Quantity);

        // The "Exchange" property refers to the "Topic" in Azure Service Bus.
        GeneralQueue topic = new() { Exchange = "customer-order-topic" };
        GeneralQueueMessage topicMessage = new()
        {
            Queue = topic,
            Body = Serializer.Serialize(order)
        };

        await _messageBroker.PublishJson(topicMessage, cancellationToken).ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}

Publish “string” to a Topic

The following code snippet shows how to publish a string to a topic:

/// <summary>
/// Handles <see cref="PlaceOrderCommand"/> by publishing a plain-text description of the order
/// to the "customer-order-topic" topic through the message broker.
/// </summary>
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IMessageBroker _messageBroker;

    /// <summary>Creates the handler.</summary>
    /// <param name="messageBroker">The broker used to publish the order message.</param>
    public PlaceOrderCommandHandler(IMessageBroker messageBroker) => _messageBroker = messageBroker;

    /// <summary>
    /// Formats the command's fields into a single string and publishes it to the topic.
    /// </summary>
    /// <param name="request">The command carrying the customer name, item, unit price and quantity.</param>
    /// <param name="cancellationToken">Token passed on to the message broker call.</param>
    /// <returns>The result of <c>Ok</c> for a response whose Result is "Order has been submitted.".</returns>
    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        string summary = $"Customer: {request.CustomerName}, Item: {request.Item}, Unit Price: {request.UnitPrice}, Quantity: {request.Quantity}";

        // The "Exchange" property refers to the "Topic" in Azure Service Bus.
        GeneralQueue topic = new() { Exchange = "customer-order-topic" };
        GeneralQueueMessage topicMessage = new()
        {
            Queue = topic,
            Body = summary
        };

        await _messageBroker.PublishString(topicMessage, cancellationToken).ConfigureAwait(false);

        return await Ok(new PlaceOrderResponse { Result = "Order has been submitted." });
    }
}

Receive and Consume a message from a Queue

The following console program code snippet shows how to receive and consume a message from a queue:

// Console listener: resolves the message broker, consumes "customer-order-queue"
// until Ctrl+C is pressed (or start-up fails), then stops consuming and disconnects.
IResolver resolver = ConfigureApp();
IMessageBroker messageBroker = resolver.Resolve<IMessageBroker>();
IList<string> queues = new List<string>() { "customer-order-queue" };
GeneralConsumer consumer = new()
{
    Queues = queues,
    Action = ProcessMessage
};

// Completed by the Ctrl+C handler. Awaiting this task keeps the program alive without
// spinning in an empty loop, which would occupy a CPU core and rely on an unsynchronized
// flag being re-read on every iteration.
TaskCompletionSource<bool> shutdownRequested = new(TaskCreationOptions.RunContinuationsAsynchronously);
Console.CancelKeyPress += CancelKeyPressHandler;

try
{
    Console.WriteLine("Listener is started:");

    await messageBroker.StartConsuming(consumer).ConfigureAwait(false);

    // Wait here until Ctrl+C is pressed.
    await shutdownRequested.Task.ConfigureAwait(false);
}
catch (Exception ex)
{
    // Report the failure instead of swallowing it silently, then fall through to the
    // same shutdown path as a normal Ctrl+C exit.
    Console.Error.WriteLine($"Listener failed: {ex}");
}

await messageBroker.StopConsuming().ConfigureAwait(false);
messageBroker.Disconnect();

Console.WriteLine("Listener stopped.");

// Ctrl+C handler: keeps the process alive (e.Cancel = true) so the shutdown code above
// can run, and signals the main flow to stop waiting.
void CancelKeyPressHandler(object sender, ConsoleCancelEventArgs e)
{
    e.Cancel = true;
    shutdownRequested.TrySetResult(true);
}

// Prints a received message to the console between a header line and a footer line.
void ProcessMessage(string message)
{
    const string header = "============>>> NEW MESSAGE";
    const string footer = "===========================";

    Console.WriteLine(header);
    Console.WriteLine(message);
    Console.WriteLine(footer);
}

// Creates the Crystal Sharp resolver for this console program with Azure Service Bus registered.
IResolver ConfigureApp()
{
    // appsettings.json is required (optional: false). The configuration is built here,
    // but nothing else in this snippet reads it.
    IConfigurationRoot configuration = new ConfigurationBuilder()
        .SetBasePath(Directory.GetCurrentDirectory())
        .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
        .Build();

    IServiceCollection serviceCollection = new ServiceCollection();
    AzureServiceBusSettings serviceBusSettings = new("AZURE_SERVICE_BUS_CONNECTION_STRING");

    return CrystalSharpAdapter.New(serviceCollection)
        .AddAzureServiceBus(serviceBusSettings)
        .CreateResolver();
}