PostgreSQL¶
To use the PostgreSQL event store integration, install the NuGet package CrystalSharp.PostgreSql.
IMPORTANT
This configuration covers only storing events in the event store and reading them back from it. Database integration, the Read Model Store, and Sagas are configured differently.
Integrating the event store requires the following steps:
- Register the implementation of the event store.
- Run the database migrator using the PostgreSqlEventStoreSetup class.
- Raise the events that need to be recorded from an aggregate.
- Use the IAggregateEventStore interface for event store operations.
Event Store Registration¶
The event store implementation must be registered. The following code shows how to register it in the Program.cs file:
PostgreSqlSettings postgreSqlEventStoreSettings = new("CONNECTION-STRING");

// Register the CQRS handlers and the PostgreSQL event store; int is the aggregates' key type (TKey).
IResolver resolver = CrystalSharpAdapter.New(builder.Services)
    .AddCqrs(typeof(PlaceOrderCommandHandler))
    .AddPostgreSqlEventStoreDb<int>(postgreSqlEventStoreSettings)
    .CreateResolver();

// Create the event store tables; Wait() blocks until the setup has finished.
IPostgreSqlDatabaseMigrator databaseMigrator = resolver.Resolve<IPostgreSqlDatabaseMigrator>();
PostgreSqlEventStoreSetup.Run(databaseMigrator, postgreSqlEventStoreSettings.ConnectionString).Wait();
In the above code snippet, the extension method AddPostgreSqlEventStoreDb<int>(postgreSqlEventStoreSettings) is called while initializing the Crystal Sharp framework to register the event store. The type argument <TKey> (int in this example) is the data type of the Id property, which is the primary key of every aggregate. All aggregates must use the same data type for Id. Because <TKey> is generic, a different type can be chosen to suit system requirements.
After the event store is registered, database scripts must be run to create its tables. The PostgreSqlEventStoreSetup class provides a Run method that takes an IPostgreSqlDatabaseMigrator and a connection string and executes those scripts.
IMPORTANT
It is mandatory to run “PostgreSqlEventStoreSetup.Run(databaseMigrator, postgreSqlEventStoreSettings.ConnectionString).Wait()” to create the tables for the event store.
IMPORTANT
It is expected that you have already reviewed the documentation on “Domain Events” and “Event Handlers”. If you haven’t, it is strongly recommended that you do so in order to grasp the concepts discussed in these topics.
Domain Event Class¶
Following is the code for the domain event class:
public class OrderPlacedDomainEvent : DomainEvent
{
    public string OrderCode { get; set; }
    public decimal TotalPrice { get; set; }

    public OrderPlacedDomainEvent(Guid streamId, string orderCode, decimal totalPrice)
    {
        StreamId = streamId;
        OrderCode = orderCode;
        TotalPrice = totalPrice;
    }

    [JsonConstructor]
    public OrderPlacedDomainEvent(Guid streamId,
        string orderCode,
        decimal totalPrice,
        int entityStatus,
        DateTime createdOn,
        DateTime? modifiedOn,
        long version)
    {
        StreamId = streamId;
        OrderCode = orderCode;
        TotalPrice = totalPrice;
        EntityStatus = entityStatus;
        CreatedOn = createdOn;
        ModifiedOn = modifiedOn;
        Version = version;
    }
}
In the above code snippet, the OrderPlacedDomainEvent class inherits from the DomainEvent base class. In the Crystal Sharp framework, every domain event must inherit from the DomainEvent base class.
The OrderPlacedDomainEvent class has two constructors. The first constructor is called when the domain event is triggered. Its streamId parameter must be the GlobalUId of the aggregate.
The second constructor, marked with the JsonConstructor attribute, is called when a domain event is deserialized. It takes four additional parameters, entityStatus, createdOn, modifiedOn, and version, which are used internally during deserialization when events are replayed.
IMPORTANT
Every domain event class must have both constructors, one for triggering a domain event and a second constructor with the “JsonConstructor” attribute with four additional parameters (“entityStatus”, “createdOn”, “modifiedOn”, and “version”) for deserialization.
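The two-constructor pattern can be tried in isolation. The following is a minimal sketch that assumes System.Text.Json as the serializer; the serializer Crystal Sharp actually uses is not stated here and may differ. OrderPlacedSketch and JsonCtorSketch are hypothetical names, the class does not derive from DomainEvent, and only version is kept from the four metadata parameters to keep the example short.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical stand-in for a domain event (not a Crystal Sharp type).
public class OrderPlacedSketch
{
    public Guid StreamId { get; }
    public string OrderCode { get; }
    public long Version { get; }

    // Used when the event is first raised; Version keeps its default of 0.
    public OrderPlacedSketch(Guid streamId, string orderCode)
    {
        StreamId = streamId;
        OrderCode = orderCode;
    }

    // Chosen by System.Text.Json during deserialization; restores the stored metadata too.
    [JsonConstructor]
    public OrderPlacedSketch(Guid streamId, string orderCode, long version)
    {
        StreamId = streamId;
        OrderCode = orderCode;
        Version = version;
    }
}

public static class JsonCtorSketch
{
    // Rebuilds an event from its stored JSON via the annotated constructor.
    public static OrderPlacedSketch FromJson(string json) =>
        JsonSerializer.Deserialize<OrderPlacedSketch>(json)
        ?? throw new InvalidOperationException("The JSON payload was null.");
}
```

Without the annotated constructor, the serializer would have no way to choose between the two constructors and restore values such as the version, which is why both constructors are required on every domain event class.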
Trigger Domain Events¶
To trigger a domain event from an aggregate, use the Raise method provided by the AggregateRoot<TKey> base class. The following code snippet shows how to raise a domain event from an aggregate:
public class Order : AggregateRoot<int>
{
    public string OrderCode { get; private set; }
    public decimal TotalPrice { get; private set; }

    public static Order PlaceOrder(string orderCode, decimal totalPrice)
    {
        Order order = new() { OrderCode = orderCode, TotalPrice = totalPrice };
        // The aggregate's GlobalUId becomes the StreamId of the event.
        order.Raise(new OrderPlacedDomainEvent(order.GlobalUId, order.OrderCode, order.TotalPrice));
        return order;
    }

    public void Apply(OrderPlacedDomainEvent @event)
    {
        OrderCode = @event.OrderCode;
        TotalPrice = @event.TotalPrice;
    }
}
In the above code snippet, the Order aggregate exposes one static method, PlaceOrder, which calls the Raise method from the base class to trigger a domain event. The Apply method handles a domain event. Each Apply method must take a parameter whose type matches the domain event it handles, as shown in the code.
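The link between an Apply method's parameter type and the event it handles can be pictured as overload dispatch. The following is a minimal, framework-independent sketch of that general event-sourcing pattern, not Crystal Sharp's internal implementation. OrderSketch, OrderPlaced, OrderPriceChanged, and Replay are all hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event types (not Crystal Sharp types).
public record OrderPlaced(string OrderCode, decimal TotalPrice);
public record OrderPriceChanged(decimal TotalPrice);

public class OrderSketch
{
    public string OrderCode { get; private set; } = "";
    public decimal TotalPrice { get; private set; }

    // One Apply overload per event type; each copies event data into the aggregate state.
    public void Apply(OrderPlaced e)
    {
        OrderCode = e.OrderCode;
        TotalPrice = e.TotalPrice;
    }

    public void Apply(OrderPriceChanged e)
    {
        TotalPrice = e.TotalPrice;
    }

    // Rebuilds state by routing each stored event to the overload that matches its type.
    public static OrderSketch Replay(IEnumerable<object> history)
    {
        OrderSketch order = new();
        foreach (object e in history)
        {
            switch (e)
            {
                case OrderPlaced placed:
                    order.Apply(placed);
                    break;
                case OrderPriceChanged changed:
                    order.Apply(changed);
                    break;
                default:
                    throw new InvalidOperationException($"No Apply overload for {e.GetType().Name}.");
            }
        }
        return order;
    }
}
```

In this sketch, an event type without a matching Apply overload cannot be replayed, which illustrates why the parameter type has to match the domain event type.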
Store Domain Events¶
To store events in the event store, the IAggregateEventStore interface is used. The following code illustrates how to save an event:
public class PlaceOrderCommandHandler : CommandHandler<PlaceOrderCommand, PlaceOrderResponse>
{
    private readonly IAggregateEventStore<int> _eventStore;

    public PlaceOrderCommandHandler(IAggregateEventStore<int> eventStore)
    {
        _eventStore = eventStore;
    }

    public override async Task<CommandExecutionResult<PlaceOrderResponse>> Handle(PlaceOrderCommand request, CancellationToken cancellationToken = default)
    {
        Order order = Order.PlaceOrder(request.OrderCode, request.TotalPrice);
        await _eventStore.Store(order, cancellationToken).ConfigureAwait(false);
        PlaceOrderResponse response = new() { OrderId = order.GlobalUId };
        return await Ok(response);
    }
}
In the above code snippet, the IAggregateEventStore<int> interface is injected into the PlaceOrderCommandHandler constructor. In the Handle method, a new order is placed and stored in the event store. The call to _eventStore.Store(order, cancellationToken) collects the events that were raised in the aggregate and stores them in the event store.
NOTE
Any domain event handlers are triggered only after the data has been successfully stored in the event store.
Stream, StreamId and StreamName¶
The event store uses the terms Stream, StreamId, and StreamName. The StreamId is the GlobalUId of an aggregate. The Crystal Sharp framework generates the StreamName internally. The stream name consists of the aggregate's class name in camel case, followed by a hyphen and the GlobalUId value with its own hyphens removed. For instance, if the aggregate class is Order and the GlobalUId value is F737EDA2-6796-470B-AFB5-283D7CA75740, then the stream name will be order-F737EDA26796470BAFB5283D7CA75740, with the same GlobalUId serving as the StreamId for the stream.
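The naming rule can be reproduced by hand, for example to look up a stream while debugging. The following is a minimal sketch based only on the format and example described above. StreamNameSketch and its For method are hypothetical helpers, not part of Crystal Sharp, and the upper-case GUID output is an assumption taken from the example.

```csharp
using System;

// Hypothetical helper (not part of Crystal Sharp).
public static class StreamNameSketch
{
    // Builds "<camelCaseClassName>-<GlobalUId without hyphens>".
    public static string For(string aggregateClassName, Guid globalUId)
    {
        if (string.IsNullOrEmpty(aggregateClassName))
        {
            throw new ArgumentException("The class name must not be empty.", nameof(aggregateClassName));
        }

        // Lower-case the first character to get the camel-case form of the class name.
        string camelCaseName = char.ToLowerInvariant(aggregateClassName[0]) + aggregateClassName.Substring(1);

        // "N" formats the GUID as 32 hexadecimal digits without hyphens.
        // Upper-casing mirrors the documented example and is an assumption.
        string id = globalUId.ToString("N").ToUpperInvariant();

        return $"{camelCaseName}-{id}";
    }
}
```

Calling StreamNameSketch.For("Order", Guid.Parse("F737EDA2-6796-470B-AFB5-283D7CA75740")) returns order-F737EDA26796470BAFB5283D7CA75740, matching the example above.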