Saga - Orchestration


The orchestrator acts as a centralized controller: it oversees all local transactions and tracks the overall transaction status. Through messages, it tells each service which action to take and in what order. The orchestrator is responsible for managing the transaction flow and ensuring that each step in the saga executes successfully.

IMPORTANT

Configure the saga store before you implement sagas.

Within the Crystal Sharp framework, orchestration-based sagas are built on activities. Implementing one requires several components, including activities, a saga locator, and an orchestrator implementation.

The orchestrator implementation class should be derived from the SagaOrchestrator class.

All activities involved in the saga must implement the ISagaActivity interface, with the last activity ensuring transaction completion. An activity acts as a unit for executing transactions and may have optional rollback compensation.
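The relationship between activities and their optional compensations can be illustrated with a small, framework-independent sketch. The Step record and the MiniOrchestrator class below are hypothetical names invented for this illustration and are not part of Crystal Sharp. The sketch shows the general saga idea only: run the steps in order and, on the first failure, undo the steps that already completed in reverse order. The framework's own policy (for example, whether the failing activity's own compensation also runs) may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for an activity: a name, a forward action,
// and an optional compensation that undoes it.
public sealed record Step(string Name, Func<Task<bool>> Execute, Func<Task>? Compensate = null);

public static class MiniOrchestrator
{
    // Runs the steps in order. On the first failure, runs the compensations
    // of the already-completed steps in reverse order and reports failure.
    public static async Task<(bool Success, List<string> Trail)> Run(IEnumerable<Step> steps)
    {
        var trail = new List<string>();
        var completed = new Stack<Step>();

        foreach (Step step in steps)
        {
            bool ok = await step.Execute();
            trail.Add($"{step.Name}: {(ok ? "ok" : "failed")}");

            if (!ok)
            {
                // Undo completed steps, most recent first.
                while (completed.Count > 0)
                {
                    Step done = completed.Pop();
                    if (done.Compensate is not null)
                    {
                        await done.Compensate();
                        trail.Add($"{done.Name}: compensated");
                    }
                }

                return (false, trail);
            }

            completed.Push(step);
        }

        return (true, trail);
    }
}

public static class Program
{
    public static async Task Main()
    {
        var steps = new[]
        {
            new Step("Create Trip", () => Task.FromResult(true)),
            new Step("Book Hotel", () => Task.FromResult(true), () => Task.CompletedTask),
            // Simulated failure: the flight cannot be booked.
            new Step("Book Flight", () => Task.FromResult(false), () => Task.CompletedTask),
        };

        var (success, trail) = await MiniOrchestrator.Run(steps);

        Console.WriteLine($"Success: {success}");
        trail.ForEach(Console.WriteLine);
    }
}
```

In this run the flight booking fails, so the hotel booking is compensated, while "Create Trip" is left alone because it declares no compensation.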

IMPORTANT

It is expected that you have already reviewed the documentation on “Aggregate”. If you haven’t, it is strongly recommended that you do so in order to grasp the concepts discussed in these topics.

To implement an orchestration-based saga, complete the following mandatory steps:

  • Register the implementation of the saga store.
  • Define an initial transaction that implements the ISagaTransaction interface.
  • Create one or more activities, each implementing the ISagaActivity interface.
  • Add a compensation activity for every activity that requires a rollback.
  • Make sure the last activity completes the transaction.
  • Create a saga locator class that inherits from the SagaLocator class.
  • Create the saga orchestrator implementation class, inheriting from the SagaOrchestrator class.
  • Override the Handle method of the SagaOrchestrator base class to receive the initial transaction, then set up and execute the saga orchestrator.
  • Execute the transaction using the ISagaTransactionExecutor interface.

Orchestration Implementation

The following code snippets (aggregate, initial transaction, activities, saga locator, orchestrator implementation class, and Web API controller) illustrate how to implement an orchestration-based saga:

“Trip.cs” class:

public class Trip : AggregateRoot<int>
{
    public Guid CorrelationId { get; private set; }
    public string Name { get; private set; }
    public string Hotel { get; private set; }
    public decimal HotelReservation { get; private set; }
    public decimal HotelReservationPaidByCustomer { get; private set; }
    public bool HotelReservationConfirmed { get; private set; } = false;
    public string Flight { get; private set; }
    public decimal FlightFare { get; private set; }
    public decimal FlightFarePaidByCustomer { get; private set; }
    public bool FlightConfirmed { get; private set; } = false;
    public decimal TotalAmount { get; private set; }
    public bool Confirm { get; private set; } = false;

    public static Trip Create(Guid correlationId, string name)
    {
        Trip trip = new() { CorrelationId = correlationId, Name = name };

        return trip;
    }

    public void BookHotel(string hotel, decimal reservationAmount, decimal amountPaidByCustomer)
    {
        ValidateServiceAmount(reservationAmount, amountPaidByCustomer);

        Hotel = hotel;
        HotelReservation = reservationAmount;
        HotelReservationPaidByCustomer = amountPaidByCustomer;
        HotelReservationConfirmed = true;
        SetTotalAmount(HotelReservation);
    }

    public void CancelHotelReservation()
    {
        HotelReservationPaidByCustomer = 0;
        HotelReservationConfirmed = false;
    }

    public void BookFlight(string flight, decimal fare, decimal amountPaidByCustomer)
    {
        ValidateServiceAmount(fare, amountPaidByCustomer);

        Flight = flight;
        FlightFare = fare;
        FlightFarePaidByCustomer = amountPaidByCustomer;
        FlightConfirmed = true;

        SetTotalAmount(FlightFare);
    }

    public void CancelFlight()
    {
        FlightFarePaidByCustomer = 0;
        FlightConfirmed = false;
    }

    public void ConfirmTrip()
    {
        decimal amountPaid = HotelReservationPaidByCustomer + FlightFarePaidByCustomer;

        ValidateTotalAmount(TotalAmount, amountPaid);

        Confirm = true;
    }

    public void CancelTrip()
    {
        CancelHotelReservation();
        CancelFlight();

        Confirm = false;
    }

    public override void Delete()
    {
        base.Delete();
    }

    private void SetTotalAmount(decimal amount)
    {
        TotalAmount += amount;
    }

    private void ValidateServiceAmount(decimal serviceAmount, decimal amountPaidByCustomer)
    {
        if (amountPaidByCustomer < serviceAmount)
        {
            ThrowDomainException("The paid amount is less than the amount required for this service.");
        }
    }

    private void ValidateTotalAmount(decimal totalAmount, decimal amountPaid)
    {
        if (amountPaid < totalAmount)
        {
            ThrowDomainException("The paid amount is less than the amount required for this trip.");
        }
    }
}

“PlanTripTransaction.cs” class:

public class PlanTripTransaction : ISagaTransaction
{
    public string Name { get; set; }
    public string Hotel { get; set; }
    public decimal HotelReservationAmount { get; set; }
    public decimal HotelReservationPaidByCustomer { get; set; }
    public string Flight { get; set; }
    public decimal Fare { get; set; }
    public decimal FlightFarePaidByCustomer { get; set; }
}

“CreateTripActivity.cs” class:

public class CreateTripActivity : ISagaActivity
{
    private readonly AppDbContext _dbContext;

    public CreateTripActivity(AppDbContext dbContext)
    {
        _dbContext = dbContext;
    }

    public async Task<SagaTransactionResult> Execute(SagaOrchestratorContext sagaContext, CancellationToken cancellationToken = default)
    {
        PlanTripTransaction transaction = (PlanTripTransaction)sagaContext.Data;
        Trip trip = Trip.Create(sagaContext.CorrelationId, transaction.Name);

        await _dbContext.Trip.AddAsync(trip, cancellationToken).ConfigureAwait(false);
        await _dbContext.SaveChanges(cancellationToken).ConfigureAwait(false);

        return new SagaTransactionResult(sagaContext.CorrelationId, true);
    }
}

The code snippet above shows CreateTripActivity implementing the ISagaActivity interface. Every activity and every compensation must implement this interface, and specifically its Execute method. Execute receives a SagaOrchestratorContext parameter, which exposes the properties CorrelationId and Data. CorrelationId is a Guid used to link transactions during orchestrator execution. Data is of type object: it holds the value originally passed to the PrepareOrchestrator method when the orchestrator is set up, so it can be cast to the appropriate type (PlanTripTransaction in this example).
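Because Data is typed as object, a direct cast throws an InvalidCastException if the context carries an unexpected type. The sketch below shows one defensive alternative using the as operator. DemoContext, PlanTripData, and DataReader are hypothetical stand-ins introduced only so the example compiles on its own; in a real project the context would be the framework's SagaOrchestratorContext and the payload would be PlanTripTransaction.

```csharp
using System;

// Hypothetical stand-in for the saga context: a correlation id plus untyped data.
public sealed class DemoContext
{
    public Guid CorrelationId { get; init; } = Guid.NewGuid();
    public object? Data { get; init; }
}

// Hypothetical stand-in for the initial transaction payload.
public sealed class PlanTripData
{
    public string Name { get; init; } = "";
}

public static class DataReader
{
    // Returns false instead of throwing when Data is null
    // or holds a type other than PlanTripData.
    public static bool TryGetTransaction(DemoContext context, out PlanTripData? transaction)
    {
        transaction = context.Data as PlanTripData;
        return transaction is not null;
    }
}

public static class Program
{
    public static void Main()
    {
        var good = new DemoContext { Data = new PlanTripData { Name = "Summer trip" } };
        var bad = new DemoContext { Data = "not a transaction" };

        Console.WriteLine(DataReader.TryGetTransaction(good, out _));
        Console.WriteLine(DataReader.TryGetTransaction(bad, out _));
    }
}
```

An activity written this way could return an error result when the check fails, rather than letting the exception escape from Execute.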

“BookHotelActivity.cs” class:

public class BookHotelActivity : ISagaActivity
{
    private readonly AppDbContext _dbContext;

    public BookHotelActivity(AppDbContext dbContext)
    {
        _dbContext = dbContext;
    }

    public async Task<SagaTransactionResult> Execute(SagaOrchestratorContext sagaContext, CancellationToken cancellationToken = default)
    {
        PlanTripTransaction transaction = (PlanTripTransaction)sagaContext.Data;
        Trip trip = _dbContext.Trip.SingleOrDefault(x => x.EntityStatus == EntityStatus.Active && x.CorrelationId == sagaContext.CorrelationId);

        if (trip == null)
        {
            return SagaTransactionResult.WithError(new List<Error> { new Error(ReservedErrorCode.SystemError, "Trip not found.") });
        }

        trip.BookHotel(transaction.Hotel, transaction.HotelReservationAmount, transaction.HotelReservationPaidByCustomer);
        await _dbContext.SaveChanges(cancellationToken).ConfigureAwait(false);

        return new SagaTransactionResult(sagaContext.CorrelationId, true);
    }
}

“CancelHotelReservationActivity.cs” class:

public class CancelHotelReservationActivity : ISagaActivity
{
    private readonly AppDbContext _dbContext;

    public CancelHotelReservationActivity(AppDbContext dbContext)
    {
        _dbContext = dbContext;
    }

    public async Task<SagaTransactionResult> Execute(SagaOrchestratorContext sagaContext, CancellationToken cancellationToken = default)
    {
        PlanTripTransaction transaction = (PlanTripTransaction)sagaContext.Data;
        Trip trip = _dbContext.Trip.SingleOrDefault(x => x.EntityStatus == EntityStatus.Active && x.CorrelationId == sagaContext.CorrelationId);

        if (trip == null)
        {
            return SagaTransactionResult.WithError(new List<Error> { new Error(ReservedErrorCode.SystemError, "Trip not found.") });
        }

        trip.CancelHotelReservation();
        await _dbContext.SaveChanges(cancellationToken).ConfigureAwait(false);

        return new SagaTransactionResult(sagaContext.CorrelationId, true);
    }
}

“BookFlightActivity.cs” class:

public class BookFlightActivity : ISagaActivity
{
    private readonly AppDbContext _dbContext;

    public BookFlightActivity(AppDbContext dbContext)
    {
        _dbContext = dbContext;
    }

    public async Task<SagaTransactionResult> Execute(SagaOrchestratorContext sagaContext, CancellationToken cancellationToken = default)
    {
        PlanTripTransaction transaction = (PlanTripTransaction)sagaContext.Data;
        Trip trip = _dbContext.Trip.SingleOrDefault(x => x.EntityStatus == EntityStatus.Active && x.CorrelationId == sagaContext.CorrelationId);

        if (trip == null)
        {
            return SagaTransactionResult.WithError(new List<Error> { new Error(ReservedErrorCode.SystemError, "Trip not found.") });
        }

        trip.BookFlight(transaction.Flight, transaction.Fare, transaction.FlightFarePaidByCustomer);
        await _dbContext.SaveChanges(cancellationToken).ConfigureAwait(false);

        return new SagaTransactionResult(sagaContext.CorrelationId, true);
    }
}

“CancelFlightActivity.cs” class:

public class CancelFlightActivity : ISagaActivity
{
    private readonly AppDbContext _dbContext;

    public CancelFlightActivity(AppDbContext dbContext)
    {
        _dbContext = dbContext;
    }

    public async Task<SagaTransactionResult> Execute(SagaOrchestratorContext sagaContext, CancellationToken cancellationToken = default)
    {
        PlanTripTransaction transaction = (PlanTripTransaction)sagaContext.Data;
        Trip trip = _dbContext.Trip.SingleOrDefault(x => x.EntityStatus == EntityStatus.Active && x.CorrelationId == sagaContext.CorrelationId);

        if (trip == null)
        {
            return SagaTransactionResult.WithError(new List<Error> { new Error(ReservedErrorCode.SystemError, "Trip not found.") });
        }

        trip.CancelFlight();
        await _dbContext.SaveChanges(cancellationToken).ConfigureAwait(false);

        return new SagaTransactionResult(sagaContext.CorrelationId, true);
    }
}

“ConfirmTripActivity.cs” class:

public class ConfirmTripActivity : ISagaActivity
{
    private readonly AppDbContext _dbContext;

    public ConfirmTripActivity(AppDbContext dbContext)
    {
        _dbContext = dbContext;
    }

    public async Task<SagaTransactionResult> Execute(SagaOrchestratorContext sagaContext, CancellationToken cancellationToken = default)
    {
        PlanTripTransaction transaction = (PlanTripTransaction)sagaContext.Data;
        Trip trip = _dbContext.Trip.SingleOrDefault(x => x.EntityStatus == EntityStatus.Active && x.CorrelationId == sagaContext.CorrelationId);

        if (trip == null)
        {
            return SagaTransactionResult.WithError(new List<Error> { new Error(ReservedErrorCode.SystemError, "Trip not found.") });
        }

        trip.ConfirmTrip();
        await _dbContext.SaveChanges(cancellationToken).ConfigureAwait(false);

        return new SagaTransactionResult(sagaContext.CorrelationId, true);
    }
}

“CancelTripActivity.cs” class:

public class CancelTripActivity : ISagaActivity
{
    private readonly AppDbContext _dbContext;

    public CancelTripActivity(AppDbContext dbContext)
    {
        _dbContext = dbContext;
    }

    public async Task<SagaTransactionResult> Execute(SagaOrchestratorContext sagaContext, CancellationToken cancellationToken = default)
    {
        PlanTripTransaction transaction = (PlanTripTransaction)sagaContext.Data;
        Trip trip = _dbContext.Trip.SingleOrDefault(x => x.EntityStatus == EntityStatus.Active && x.CorrelationId == sagaContext.CorrelationId);

        if (trip == null)
        {
            return SagaTransactionResult.WithError(new List<Error> { new Error(ReservedErrorCode.SystemError, "Trip not found.") });
        }

        trip.CancelTrip();
        await _dbContext.SaveChanges(cancellationToken).ConfigureAwait(false);

        return new SagaTransactionResult(sagaContext.CorrelationId, true);
    }
}

“TripSagaLocator.cs” class:

public class TripSagaLocator : SagaLocator
{
    // Just inherit this class from the "SagaLocator" base class.
}

In the above code snippet, the class TripSagaLocator is derived from the base class SagaLocator. The saga locator class plays a crucial role in locating sagas during the transaction.

“TripSaga.cs” class:

public class TripSaga : SagaOrchestrator<TripSagaLocator, PlanTripTransaction>
{
    public TripSaga(IResolver resolver,
        ISagaStore sagaStore,
        TripSagaLocator sagaLocator)
        : base(resolver, sagaStore, sagaLocator)
    {
        //
    }

    public override async Task<SagaTransactionResult> Handle(PlanTripTransaction request, CancellationToken cancellationToken = default)
    {
        SagaResult sagaResult = await PrepareOrchestrator(request)
            .Activity<CreateTripActivity>("Create Trip")
            .Activity<BookHotelActivity>("Book Hotel")
            .WithCompensation<CancelHotelReservationActivity>("Cancel Hotel Reservation")
            .Activity<BookFlightActivity>("Book Flight")
            .WithCompensation<CancelFlightActivity>("Cancel Flight")
            .Activity<ConfirmTripActivity>("Confirm Trip")
            .WithCompensation<CancelTripActivity>("Cancel Trip")
            .Run(cancellationToken)
            .ConfigureAwait(false);

        IEnumerable<Error> errors = sagaResult.Trail.Where(t => t.Errors.HasAny()).SelectMany(e => e.Errors);

        return new SagaTransactionResult(sagaResult.CorrelationId, sagaResult.Success, errors);
    }
}

The TripSaga class inherits from SagaOrchestrator<TSagaLocator, TRequest>, where TSagaLocator is the saga locator type and TRequest is the type of the initiating transaction. The base class requires three dependencies, which are passed through the constructor: the resolver, the saga store, and the saga locator. The overridden Handle method handles the initial TRequest transaction.

Within the Handle method, note the PrepareOrchestrator(TRequest initialTransaction) method provided by the SagaOrchestrator base class. It takes the initial transaction and returns the fluent ISagaOrchestrator interface, which is used to define activities and compensations. The data passed as initialTransaction is then available to every activity and compensation through the Data property of SagaOrchestratorContext.

The orchestrator is started by calling the Run(CancellationToken cancellationToken = default) method. Run returns a SagaResult instance whose properties CorrelationId, Success, and Trail describe the outcome of the saga. CorrelationId is a Guid that links the transactions during the orchestrator’s execution. Success is a boolean that is true only if all transactions executed successfully; if any transaction fails, it is set to false. Trail is an IEnumerable<SagaTrail> that contains the trail of each activity.
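The trail can also be turned into a per-activity error report, which is sometimes easier to read than a flat error list. The sketch below is framework-independent. TrailEntry and TrailReport are hypothetical names, and the Activity field is an assumption made for illustration: the snippet above only shows that each trail item exposes Errors.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one trail item: which activity ran and what went wrong.
public sealed record TrailEntry(string Activity, IReadOnlyList<string> Errors);

public static class TrailReport
{
    // Keeps only the entries that recorded errors and formats one line per activity.
    public static List<string> FailedActivities(IEnumerable<TrailEntry> trail) =>
        trail.Where(t => t.Errors.Count > 0)
             .Select(t => $"{t.Activity}: {string.Join("; ", t.Errors)}")
             .ToList();
}

public static class Program
{
    public static void Main()
    {
        var trail = new[]
        {
            new TrailEntry("Create Trip", Array.Empty<string>()),
            new TrailEntry("Book Hotel", new[] { "The paid amount is less than the amount required for this service." }),
        };

        foreach (string line in TrailReport.FailedActivities(trail))
        {
            Console.WriteLine(line);
        }
    }
}
```

Only "Book Hotel" appears in the report here, because "Create Trip" recorded no errors.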

“TripController.cs” class:

[Route("api/[controller]")]
[ApiController]
public class TripController : ControllerBase
{
    private readonly ISagaTransactionExecutor _sagaTransactionExecutor;

    public TripController(ISagaTransactionExecutor sagaTransactionExecutor)
    {
        _sagaTransactionExecutor = sagaTransactionExecutor;
    }

    [HttpPost]
    [Route("plan-trip")]
    public async Task<ActionResult<SagaTransactionResult>> PostPlanTrip([FromBody] PlanTripRequest request)
    {
        PlanTripTransaction transaction = new()
        {
            Name = request.Name,
            Hotel = request.Hotel,
            HotelReservationAmount = request.HotelReservationAmount,
            HotelReservationPaidByCustomer = request.HotelReservationPaidByCustomer,
            Flight = request.Flight,
            Fare = request.Fare,
            FlightFarePaidByCustomer = request.FlightFarePaidByCustomer
        };

        return await _sagaTransactionExecutor.Execute(transaction, CancellationToken.None);
    }
}

In the code snippet above, the controller builds the PlanTripTransaction from the request and executes it through the ISagaTransactionExecutor interface.