Snapshot Service
CQRS and Event Sourcing allow events to be replayed to recreate the current state of an aggregate. However, as the number of events to replay grows, this operation becomes increasingly resource intensive. Snapshots capture the state of an aggregate at regular intervals, so that only the small number of events written after the latest snapshot need to be replayed.
The Snapshot Service has two primary tasks: reading snapshots and writing snapshots. Reading occurs each time a command comes into the system. Writing does not occur on every command; a snapshot is written only if the rules defined by the snapshot strategy require it.
A Snapshot is simply your Aggregate object in Java-serialized form. It is read from the database, deserialized, and returned by the snapshot repository, to be used by the aggregate service.
Once a CommandHandler receives a command, an aggregate is rebuilt from its previously stored events. If the snapshot service is enabled, the latest snapshot is read and deserialized to create your base Aggregate. Any later events not present in the snapshot are then retrieved from the event store and applied to the aggregate. It is here that the efficiency gains from using snapshots are seen.
Once the aggregate has been rebuilt the incoming command is converted to events, and these are then appended to the stream. If the snapshot strategy determines that a new snapshot should be created, then a new snapshot is written.
The Snapshot Strategy is a class which defines the rule for when a new snapshot should be written. By default it requires a new snapshot to be written every twenty-five events.
In this case the AggregateService will simply instantiate a new Aggregate from its Java Class object using reflection.
- A command comes in to the Command Handler.
- The Aggregate Service is queried for the latest aggregate.
- The Aggregate Service gets the latest snapshot from the Snapshot tables.
- The latest Snapshot is returned as a versioned aggregate.
- The Aggregate is built from the latest returned Snapshot.
- The Event Log tables are read and any new events not present in the snapshot are applied to the aggregate.
- The command is converted into events, which are appended to the event stream. This is the usual aggregate behaviour.
- The Snapshot Strategy is read to determine if a new Snapshot should be written.
- If a new Snapshot should be written, then a new Snapshot is written to the snapshot database tables.
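The read side of the steps above can be sketched in a few lines. This is a minimal, framework-free illustration, not the framework's actual code: `CounterAggregate`, `Snapshot` and `rebuild` are hypothetical names, and events are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class SnapshotFlowSketch {

    /** Hypothetical aggregate whose only state is a running total of applied events. */
    static class CounterAggregate {
        long total;

        void apply(final String event) {
            total++;
        }
    }

    /** Hypothetical snapshot: the captured state plus the stream version it was taken at. */
    static class Snapshot {
        final long versionId;
        final long total;

        Snapshot(final long versionId, final long total) {
            this.versionId = versionId;
            this.total = total;
        }
    }

    /** Starts from the snapshot when one exists, then replays only the later events. */
    static CounterAggregate rebuild(final Optional<Snapshot> latest, final List<String> eventLog) {
        final CounterAggregate aggregate = new CounterAggregate();
        int alreadyApplied = 0;
        if (latest.isPresent()) {
            aggregate.total = latest.get().total;
            alreadyApplied = (int) latest.get().versionId;
        }
        // Assumption: the event at list index i has stream version i + 1.
        for (final String event : eventLog.subList(alreadyApplied, eventLog.size())) {
            aggregate.apply(event);
        }
        return aggregate;
    }

    public static void main(final String[] args) {
        final List<String> eventLog = new ArrayList<>();
        for (int i = 1; i <= 30; i++) {
            eventLog.add("event-" + i);
        }
        // Full replay applies 30 events; with a snapshot at version 25 only 5 are replayed.
        final long fromScratch = rebuild(Optional.empty(), eventLog).total;
        final long fromSnapshot = rebuild(Optional.of(new Snapshot(25, 25)), eventLog).total;
        System.out.println(fromScratch == fromSnapshot); // prints true
    }
}
```

Both paths end in the same state; the snapshot path simply reaches it by replaying fewer events.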
The entire aggregate is stored in the database as a serialized Java object and, on reading, is deserialized back into the Aggregate object. This introduces two issues. Firstly, all Aggregates must be Java Serializable. Secondly, Aggregates change over time: fields can be added or renamed. The second issue is solved by introducing the concept of Versioning.
The aggregate is stored in one table in the viewstore database, `snapshot`, which has the following structure:
Column Name | Type | Description |
---|---|---|
stream_id | uuid | the id of the associated stream |
version_id | long | the current version of the associated stream |
type | String | the java classname of the aggregate |
aggregate | byte array | the serialized bytes of the aggregate |
`version_id` is the version of the event stream at the time the snapshot was written. It is used by the default `SnapshotStrategy` to determine if a new snapshot should be created.
Should the insert into the Snapshot table fail, the application handles the exception silently, without blocking or failing. An error message is written to the logs.
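The "fail silently, but log" behaviour can be illustrated with a small sketch. The names here (`SafeSnapshotWriter`, `SnapshotStore`, `tryStore`) are hypothetical and the framework's own implementation may differ; the point is only that a failed snapshot write is logged and never propagated to the command being handled.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class SafeSnapshotWriter {

    private static final Logger LOGGER = Logger.getLogger(SafeSnapshotWriter.class.getName());

    /** Hypothetical stand-in for the repository call that inserts into the snapshot table. */
    interface SnapshotStore {
        void insert(byte[] serializedAggregate) throws Exception;
    }

    /** Returns true if the snapshot was stored; on failure logs the error and returns false. */
    static boolean tryStore(final SnapshotStore store, final byte[] serializedAggregate) {
        try {
            store.insert(serializedAggregate);
            return true;
        } catch (final Exception e) {
            // The events are already in the event log, so losing a snapshot only costs replay time.
            LOGGER.log(Level.SEVERE, "Failed to store snapshot; continuing without it", e);
            return false;
        }
    }
}
```

Swallowing the exception is safe here because a snapshot is only an optimisation: the event log remains the source of truth.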
The Snapshot table is created from Liquibase scripts found in aggregate-snapshot-repository-liquibase.
To run Liquibase and add the snapshot table to the viewstore database, first cd into the root of the microservices project, then run the following command:

    mvn -f aggregate-snapshot/aggregate-snapshot-repository-liquibase/pom.xml -Dliquibase.url=jdbc:postgresql://localhost:5432/<database name> -Dliquibase.username=<database username> -Dliquibase.password=<database password> -Dliquibase.logLevel=info resources:resources liquibase:update

All snapshots are the Java-serialized form of the Aggregate object. However, Aggregates can change over time: fields can be added, removed or renamed. This means that the Aggregate object may no longer be deserializable from the bytes stored in the database. To resolve this, all Aggregates must be versioned by declaring a Java `serialVersionUID` in the Aggregate class. If, on deserialization, the `serialVersionUID` has changed, the AggregateService throws an `AggregateChangeDetectedException`. The Snapshot Service then assumes that all previous snapshots of that Aggregate are obsolete and deletes them from the database. The Aggregate is then rebuilt from the entire event log in the usual non-snapshot way. When the process completes, the Snapshot Strategy will ensure that this version of the Aggregate is stored as a new Snapshot.
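As an illustration of the mechanism, the sketch below round-trips a versioned aggregate through standard Java serialization. `ExampleAggregate` and the helper methods are hypothetical. In plain Java a `serialVersionUID` mismatch surfaces as `java.io.InvalidClassException`; the sketch treats that as "snapshot unusable", which is an assumption about how a caller might fall back to a full replay, not a description of the framework's internals.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Optional;

public class SnapshotSerializationSketch {

    /** Hypothetical aggregate; bump serialVersionUID whenever its fields change incompatibly. */
    static class ExampleAggregate implements Serializable {
        private static final long serialVersionUID = 1L;
        int count;
    }

    /** Serializes the aggregate to the byte array that would be stored in the snapshot table. */
    static byte[] serialize(final Serializable aggregate) {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(aggregate);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns the aggregate, or empty if the stored bytes no longer match the current class. */
    static Optional<Object> deserialize(final byte[] snapshot) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            return Optional.of(in.readObject());
        } catch (final InvalidClassException e) {
            // Class version changed: the caller should discard snapshots and replay all events.
            return Optional.empty();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        } catch (final ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Declaring `serialVersionUID` explicitly matters: without it the JVM derives one from the class structure, so even harmless changes would invalidate every stored snapshot.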
By default a Snapshot Service is always used. The default version, `DefaultAggregateService`, is a 'do nothing' version of the Snapshot Service. To enable snapshots, microservices needs to load `SnapshotAwareAggregateService` in your command handler rather than the default. Both classes implement the `AggregateService` interface. The `SnapshotAwareAggregateService` is selected through CDI alternative priorities. It is contained in an optional microservices jar, aggregate-snapshot.jar. If this jar is on the classpath then, because `SnapshotAwareAggregateService` has a higher priority than the default, it will be injected into your command handler instead.
The `SnapshotStrategy` class determines how frequently new snapshots are created. The default, `DefaultSnapshotStrategy`, uses the `version_id` from the database and creates a new snapshot after every 25 events. To provide a custom `SnapshotStrategy`, implement the `SnapshotStrategy` interface and mark your new class with a higher priority than `DefaultSnapshotStrategy`. CDI should then inject your custom `SnapshotStrategy` into the DefaultSnapshotService.
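
    import javax.annotation.Priority;
    import javax.enterprise.context.ApplicationScoped;
    import javax.enterprise.inject.Alternative;
    // SnapshotStrategy itself is provided by the framework's snapshot module; import it from there.

    @ApplicationScoped
    @Alternative
    @Priority(100)
    public class MySnapshotStrategy implements SnapshotStrategy {

        // Example only: requests a new snapshot after every command.
        @Override
        public boolean shouldCreateSnapshot(final long aggregateVersionId, final long snapshotVersionId) {
            return true;
        }
    }
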
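A more realistic custom rule usually compares the two version numbers. The following sketch assumes that `aggregateVersionId` is the current stream version and `snapshotVersionId` is the version at which the last snapshot was written; it uses a local stand-in interface mirroring the method signature shown above so that it compiles without the framework on the classpath. `EveryNEventsStrategy` is a hypothetical name, and this is not necessarily how `DefaultSnapshotStrategy` is implemented.

```java
public class ThresholdStrategySketch {

    /** Local stand-in for the framework interface; a real strategy would implement the framework's own. */
    interface SnapshotStrategy {
        boolean shouldCreateSnapshot(long aggregateVersionId, long snapshotVersionId);
    }

    /** Requests a snapshot once at least `threshold` events exist beyond the last snapshot. */
    static class EveryNEventsStrategy implements SnapshotStrategy {

        private final long threshold;

        EveryNEventsStrategy(final long threshold) {
            this.threshold = threshold;
        }

        @Override
        public boolean shouldCreateSnapshot(final long aggregateVersionId, final long snapshotVersionId) {
            return aggregateVersionId - snapshotVersionId >= threshold;
        }
    }

    public static void main(final String[] args) {
        final SnapshotStrategy strategy = new EveryNEventsStrategy(25);
        System.out.println(strategy.shouldCreateSnapshot(24, 0));  // prints false
        System.out.println(strategy.shouldCreateSnapshot(25, 0));  // prints true
        System.out.println(strategy.shouldCreateSnapshot(60, 50)); // prints false
    }
}
```

In a real module the class would additionally carry the `@ApplicationScoped`, `@Alternative` and `@Priority` annotations, as in the example above, so that CDI selects it over the default.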
The Snapshot Service is enabled simply by adding this dependency to the pom of the module containing the Aggregate you intend to snapshot.

    <dependency>
        <groupId>uk.gov.justice.services</groupId>
        <artifactId>aggregate-snapshot-service</artifactId>
        <version>${project.version}</version>
    </dependency>