Saga - Choreography


In the choreography-based saga methodology, every transaction produces domain events. These events subsequently initiate transactions in other services that are subscribed to those domain events.

IMPORTANT

Configuring the saga store is a prerequisite prior to implementing sagas.

Within the Crystal Sharp framework, sagas based on choreography utilize domain events. Key components for choreography-based saga implementation include a saga locator, the implementation of the saga, and a concluding domain event finalizing the transaction. The saga implementation class should derive from the SagaChoreography class. Multiple domain events can also be incorporated, with the final event marking the completion of the transaction.

IMPORTANT

It is expected that you have already reviewed the documentation on “Aggregate”, “Domain Events” and “Event Handlers”. If you haven’t, it is strongly recommended that you do so in order to grasp the concepts discussed in these topics.

In order to implement choreography-based sagas effectively, it is necessary to adhere to the following mandatory steps:

  • Register the implementation of the saga store.
  • Each saga transaction should have its own separate transaction class and separate transaction handler.
  • Each saga transaction class should implement the ISagaTransaction interface.
  • Each saga transaction handler must be inherited from the SagaTransactionHandler base class and override its Handle method.
  • A Saga Locator class that will be inherited from the SagaLocator base class.
  • The starting point of the saga is established by the initial transaction, and the saga implementation class should be derived from the SagaChoreography base class.
  • Implementation of the IAmStartedBy interface is required for the saga implementation class.
  • The saga implementation class must override the Handle method to handle the initial transaction.
  • The implementation of the ISagaChoreographyEvent interface is necessary for the saga implementation class to handle each domain event that participates in a transaction.
  • In order to conclude the transaction, invoke the MarkAsComplete method within the last domain event. This method is provided by the base class SagaChoreography.
  • Execute the transaction using the ISagaTransactionExecutor interface.

IMPORTANT

When comparing saga transactions, saga transaction handlers, and saga transaction executors to commands, command handlers, and command executors, it is evident that they serve different purposes. The saga transaction handler is responsible for managing classes that implement the “ISagaTransaction” interface, with its “Handle” method consistently returning “SagaTransactionResult”. Meanwhile, the saga transaction executor is solely dedicated to executing transactions that adhere to the “ISagaTransaction” interface.

Choreography Implementation

Following are the code snippets (aggregate, domain events, saga transactions, saga transaction handlers, saga locator, saga implementation class, and Web API Controller) that illustrate how to implement choreography-based saga:

“Order.cs” class:

public class Order : AggregateRoot<int>
{
    public string Product { get; private set; }
    public int Quantity { get; private set; }
    public decimal UnitPrice { get; private set; }
    public decimal Amount { get; private set; }
    public decimal AmountPaid { get; private set; }
    public bool PaymentTransferred { get; private set; } = false;
    public bool Delivered { get; private set; } = false;

    public static Order PlaceOrder(string product,
        int quantity,
        decimal unitPrice,
        decimal amountPaid)
    {
        Order order = new Order
        {
            Product = product,
            Quantity = quantity,
            UnitPrice = unitPrice,
            Amount = quantity * unitPrice,
            AmountPaid = amountPaid
        };

        order.Raise(new OrderPlacedDomainEvent(order.GlobalUId,
            order.Product,
            order.Quantity,
            order.UnitPrice,
            order.Amount,
            order.AmountPaid,
            order.PaymentTransferred,
            order.Delivered));

        return order;
    }

    public void TransferPayment()
    {
        ValidatePayment(Amount, AmountPaid);

        PaymentTransferred = true;

        Raise(new PaymentTransferredDomainEvent(GlobalUId, PaymentTransferred));
    }

    public void Deliver()
    {
        ValidatePaymentTransfer(PaymentTransferred);

        Delivered = true;

        Raise(new OrderDeliveredDomainEvent(GlobalUId, Delivered));
    }

    public override void Delete()
    {
        base.Delete();

        Raise(new OrderDeletedDomainEvent(GlobalUId));
    }

    private void ValidatePayment(decimal totalAmount, decimal amountPaid)
    {
        if (amountPaid < totalAmount)
        {
            ThrowDomainException("The paid amount is less than the total amount.");
        }
    }

    private void ValidatePaymentTransfer(bool paymentTransferred)
    {
        if (!paymentTransferred)
        {
            ThrowDomainException("The payment was not transferred. Please transfer the payment before the order is delivered.");
        }
    }

    private void Apply(OrderPlacedDomainEvent @event)
    {
        Product = @event.Product;
        Quantity = @event.Quantity;
        UnitPrice = @event.UnitPrice;
        Amount = @event.Amount;
        AmountPaid = @event.AmountPaid;
        PaymentTransferred = @event.PaymentTransferred;
        Delivered = @event.Delivered;
    }

    private void Apply(PaymentTransferredDomainEvent @event)
    {
        PaymentTransferred = @event.PaymentTransferred;
    }

    private void Apply(OrderDeliveredDomainEvent @event)
    {
        Delivered = @event.Delivered;
    }

    private void Apply(OrderDeletedDomainEvent @event)
    {
        //
    }
}

“OrderPlacedDomainEvent.cs” class:

public class OrderPlacedDomainEvent : DomainEvent
{
    public string Product { get; set; }
    public int Quantity { get; set; }
    public decimal UnitPrice { get; set; }
    public decimal Amount { get; set; }
    public decimal AmountPaid { get; set; }
    public bool PaymentTransferred { get; set; }
    public bool Delivered { get; set; }

    public OrderPlacedDomainEvent(Guid streamId,
        string product,
        int quantity,
        decimal unitPrice,
        decimal amount,
        decimal amountPaid,
        bool paymentTransferred,
        bool delivered)
    {
        StreamId = streamId;
        Product = product;
        Quantity = quantity;
        UnitPrice = unitPrice;
        Amount = amount;
        AmountPaid = amountPaid;
        PaymentTransferred = paymentTransferred;
        Delivered = delivered;
    }

    [JsonConstructor]
    public OrderPlacedDomainEvent(Guid streamId,
        string product,
        int quantity,
        decimal unitPrice,
        decimal amount,
        decimal amountPaid,
        bool paymentTransferred,
        bool delivered,
        int entityStatus,
        DateTime createdOn,
        DateTime? modifiedOn,
        long version)
    {
        StreamId = streamId;
        Product = product;
        Quantity = quantity;
        UnitPrice = unitPrice;
        Amount = amount;
        AmountPaid = amountPaid;
        PaymentTransferred = paymentTransferred;
        Delivered = delivered;
        EntityStatus = entityStatus;
        CreatedOn = createdOn;
        ModifiedOn = modifiedOn;
        Version = version;
    }
}

“PaymentTransferredDomainEvent.cs” class:

public class PaymentTransferredDomainEvent : DomainEvent
{
    public bool PaymentTransferred { get; set; }

    public PaymentTransferredDomainEvent(Guid streamId,
 bool paymentTransferred)
    {
        StreamId = streamId;
        PaymentTransferred = paymentTransferred;
    }

    [JsonConstructor]
    public PaymentTransferredDomainEvent(Guid streamId,
        bool paymentTransferred,
        int entityStatus,
        DateTime createdOn,
        DateTime? modifiedOn,
        long version)
    {
        StreamId = streamId;
        PaymentTransferred = paymentTransferred;
        EntityStatus = entityStatus;
        CreatedOn = createdOn;
        ModifiedOn = modifiedOn;
        Version = version;
    }
}

“OrderDeliveredDomainEvent.cs” class:

public class OrderDeliveredDomainEvent : DomainEvent
{
    public bool Delivered { get; set; }

    public OrderDeliveredDomainEvent(Guid streamId,
 bool delivered)
    {
        StreamId = streamId;
        Delivered = delivered;
    }

    [JsonConstructor]
    public OrderDeliveredDomainEvent(Guid streamId,
        bool delivered,
        int entityStatus,
        DateTime createdOn,
        DateTime? modifiedOn,
        long version)
    {
        StreamId = streamId;
        Delivered = delivered;
        EntityStatus = entityStatus;
        CreatedOn = createdOn;
        ModifiedOn = modifiedOn;
        Version = version;
    }
}

“OrderDeletedDomainEvent” class:

public class OrderDeletedDomainEvent : DomainEvent
{
    public OrderDeletedDomainEvent(Guid streamId)
    {
        StreamId = streamId;
    }

    [JsonConstructor]
    public OrderDeletedDomainEvent(Guid streamId,
        int entityStatus,
        DateTime createdOn,
        DateTime? modifiedOn,
        long version)
    {
        StreamId = streamId;
        EntityStatus = entityStatus;
        CreatedOn = createdOn;
        ModifiedOn = modifiedOn;
        Version = version;
    }
}

“PlaceOrderTransaction.cs” class:

public class PlaceOrderTransaction : ISagaTransaction
{
    public string Product { get; set; }
    public int Quantity { get; set; }
    public decimal UnitPrice { get; set; }
    public decimal AmountPaid { get; set; }
}

“TransferPaymentTransaction.cs” class:

public class TransferPaymentTransaction : ISagaTransaction
{
    public Guid GlobalUId { get; set; }
}

“DeliverOrderTransaction.cs” class:

public class DeliverOrderTransaction : ISagaTransaction
{
    public Guid GlobalUId { get; set; }
}

“TransferPaymentTransactionHandler.cs” class:

public class TransferPaymentTransactionHandler : SagaTransactionHandler<TransferPaymentTransaction>
{
    private readonly AppDbContext _dbContext;

    public TransferPaymentTransactionHandler(AppDbContext dbContext)
    {
        _dbContext = dbContext;
    }

    public override async Task<SagaTransactionResult> Handle(TransferPaymentTransaction request, CancellationToken cancellationToken = default)
    {
        Order order = _dbContext.Order.SingleOrDefault(x => x.EntityStatus == EntityStatus.Active 
            && x.GlobalUId == request.GlobalUId);

        if (order == null)
        {
            return await Fail(request.GlobalUId, "Order not found.");
        }

        order.TransferPayment();
        await _dbContext.SaveChanges(cancellationToken).ConfigureAwait(false);

        return await Ok(order.GlobalUId);
    }
}

“DeliverOrderTransactionHandler.cs” class:

public class DeliverOrderTransactionHandler : SagaTransactionHandler<DeliverOrderTransaction>
{
    private readonly AppDbContext _dbContext;

    public DeliverOrderTransactionHandler(AppDbContext dbContext)
    {
        _dbContext = dbContext;
    }

    public override async Task<SagaTransactionResult> Handle(DeliverOrderTransaction request, CancellationToken cancellationToken = default)
    {
        Order order = _dbContext.Order.SingleOrDefault(x => x.EntityStatus == EntityStatus.Active 
            && x.GlobalUId == request.GlobalUId);

        if (order == null)
        {
            return await Fail(request.GlobalUId, "Order not found.");
        }

        order.Deliver();
        await _dbContext.SaveChanges(cancellationToken).ConfigureAwait(false);

        return await Ok(order.GlobalUId);
    }
}

“OrderSagaLocator.cs” class:

public class OrderSagaLocator : SagaLocator
{
    // Just inherit this class from the "SagaLocator" base class.
}

In the above code snippet, the class OrderSagaLocator is derived from the base class SagaLocator. The saga locator class plays a crucial role in locating sagas during the transaction.

“OrderSaga.cs” class:

public class OrderSaga : SagaChoreography<OrderSagaLocator, PlaceOrderTransaction>,
    IAmStartedBy<PlaceOrderTransaction>,
    ISagaChoreographyEvent<Order, int, OrderPlacedDomainEvent>,
    ISagaChoreographyEvent<Order, int, PaymentTransferredDomainEvent>,
    ISagaChoreographyEvent<Order, int, OrderDeliveredDomainEvent>
{
    private readonly AppDbContext _dbContext;

    public OrderSaga(AppDbContext dbContext,
        ISagaStore sagaStore,
        OrderSagaLocator sagaLocator,
        ISagaTransactionExecutor sagaTransactionExecutor)
        : base(sagaStore, sagaLocator, sagaTransactionExecutor)
    {
        _dbContext = dbContext;
    }

    public override async Task<SagaTransactionResult> Handle(PlaceOrderTransaction request, CancellationToken cancellationToken = default)
    {
        Order order = Order.PlaceOrder(request.Product,
            request.Quantity,
            request.UnitPrice,
            request.AmountPaid);

        await _dbContext.Order.AddAsync(order, cancellationToken).ConfigureAwait(false);
        await _dbContext.SaveChanges(cancellationToken).ConfigureAwait(false);

        return await Ok(order.GlobalUId);
    }

    public async Task Handle(OrderPlacedDomainEvent @event, CancellationToken cancellationToken = default)
    {
        TransferPaymentTransaction transaction = new() { GlobalUId = @event.StreamId };

        async Task compensation() { await RejectOrder(@event.StreamId, cancellationToken).ConfigureAwait(false); }

        await Execute(@event.StreamId, transaction, compensation, cancellationToken).ConfigureAwait(false);
    }

    public async Task Handle(PaymentTransferredDomainEvent @event, CancellationToken cancellationToken = default)
    {
        if (@event.PaymentTransferred)
        {
            DeliverOrderTransaction transaction = new() { GlobalUId = @event.StreamId };

            await Execute(@event.StreamId, transaction, null, cancellationToken).ConfigureAwait(false);
        }
    }

    public async Task Handle(OrderDeliveredDomainEvent @event, CancellationToken cancellationToken = default)
    {
        await MarkAsComplete(@event.StreamId, cancellationToken).ConfigureAwait(false);
    }

    private async Task RejectOrder(Guid uId, CancellationToken cancellationToken)
    {
        Order order = await _dbContext.Order.SingleOrDefaultAsync(x => 
            x.EntityStatus == EntityStatus.Active 
            && x.GlobalUId == uId,
            cancellationToken)
            .ConfigureAwait(false);

        if (order != null)
        {
            order.Delete();

            await _dbContext.SaveChanges(cancellationToken);
        }
    }
}

In the code snippet above, the class OrderSaga inherits from SagaChoreography<TSagaLocator, TRequest>. The TSagaLocator represents the saga locator, while TRequest represents the initiating transaction. In order for the base class SagaChoreography to work effectively, it is essential to provide the necessary dependencies (saga store, saga locator, and saga transaction executor). It is essential to implement the IAmStartedBy<TRequest> interface, as it serves as the starting point. The base class provides the Handle method to handle the initial TRequest transaction.

To handle domain events, the ISagaChoreography<TAggregate, TKey, TDomainEvent> interface must be implemented. In this context, TAggregate refers to the aggregate, TKey denotes the type of the primary key Id, and TDomainEvent signifies the domain event being processed.

Within the Handle method for domain events, special attention should be given to the Execute method, which is provided by the base class. Execute(Guid correlationId, ISagaTransaction transaction, Func<Task> compensation, CancellationToken cancellationToken = default) method, which forwards the transaction to the next transaction handler. The parameter correlationId corresponds to the StreamId present in each domain event; the parameter transaction represents the subsequent transaction to be executed; and the parameter compensation refers to the method responsible for reverting changes. In cases where no compensation is needed, a null value can be passed.

It is crucial to understand that the correlationId plays a significant role in sagas by linking multiple transactions.

“OrderController.cs” class:

[Route("api/[controller]")]
[ApiController]
public class OrderController : ControllerBase
{
    private readonly ISagaTransactionExecutor _sagaTransactionExecutor;

    public OrderController(ISagaTransactionExecutor sagaTransactionExecutor)
    {
        _sagaTransactionExecutor = sagaTransactionExecutor;
    }

    [HttpPost]
    [Route("place-order")]
    public async Task<ActionResult<SagaTransactionResult>> PostPlaceOrder([FromBody] PlaceOrderRequest request)
    {
        PlaceOrderTransaction transaction = new()
        {
            Product = request.Product,
            Quantity = request.Quantity,
            UnitPrice = request.UnitPrice,
            AmountPaid = request.AmountPaid
        };

        return await _sagaTransactionExecutor.Execute(transaction, CancellationToken.None);
    }
}

Within the given code snippet, the transaction has been executed by making use of the ISagaTransactionExecutor interface.