Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Send + Sync from core traits #8

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ jobs:
include:
- build: stable
toolchain: stable
# Service containers to run with `container-job`
services:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres
# Provide the password for postgres
env:
POSTGRES_PASSWORD: password
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3

Expand All @@ -52,6 +67,9 @@ jobs:
with:
command: test
args: --workspace --all-features
env:
# The hostname used to communicate with the PostgreSQL service container
POSTGRES_HOST: postgres

formatting:
name: Rustfmt Check
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ sqlx = { version = "0.8", features = [
"json",
] }

uuid = { version = "1", features = ["v4", "serde"] }
uuid = { version = "1", features = ["v7", "serde"] }
chrono = "0.4"
serde_json = "1"
tokio = { version = "1", features = ["full"] }
Expand Down
2 changes: 1 addition & 1 deletion eventastic/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "eventastic"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
license = "MIT"
readme = "../README.md"
Expand Down
16 changes: 8 additions & 8 deletions eventastic/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,24 @@ pub use root::*;
/// using the [`Aggregate::apply`] method.
///
/// More on Aggregates can be found here: `<https://www.dddcommunity.org/library/vernon_2011/>`
pub trait Aggregate: Sized + Send + Sync + Clone {
pub trait Aggregate: Sized + Clone {
/// The current version of the snapshot to store.
/// This number should be increased when a breaking change is made to the apply functions.
const SNAPSHOT_VERSION: u64;

/// The type used to uniquely identify the Aggregate.
type AggregateId: Send + Sync + Clone + Debug + Eq + PartialEq;
type AggregateId: Clone + Debug + Eq + PartialEq;

/// The type of Domain Events that interest this Aggregate.
/// Usually, this type should be an `enum`.
type DomainEvent: Send + Sync + Clone + Debug + Eq + PartialEq + Event<Self::DomainEventId>;
type DomainEvent: Clone + Debug + Eq + PartialEq + Event<Self::DomainEventId>;

/// The type used to uniquely identify the a given domain event.
type DomainEventId: Send + Sync + Clone + Debug + Eq + PartialEq;
type DomainEventId: Clone + Debug + Eq + PartialEq;

/// The error type that can be returned by [`Aggregate::apply`] when
/// mutating the Aggregate state.
type ApplyError: Send + Sync + Debug;
type ApplyError: Debug;

/// The type of side effect that this aggregate can produce.
/// Usually, this type should be an `enum`.
Expand Down Expand Up @@ -92,11 +92,11 @@ pub trait Aggregate: Sized + Send + Sync + Clone {
fn side_effects(&self, event: &Self::DomainEvent) -> Option<Vec<Self::SideEffect>>;
}

pub trait SideEffect: Send + Sync + Debug {
pub trait SideEffect: Debug {
/// The type used to uniquely identify this side effect.
type Id: Send + Sync + Debug + Clone;
type Id: Debug + Clone;
/// The error type that can be returned when calling a [`SideEffectHandler::handle`]
type Error: Send + Sync + Debug;
type Error: Debug;

/// Returns read access to the [`SideEffect::Id`]
fn id(&self) -> &Self::Id;
Expand Down
199 changes: 135 additions & 64 deletions eventastic/src/aggregate/root.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::repository::{RepositoryTransaction, Snapshot};
use futures::TryStreamExt;
use serde::de::DeserializeOwned;
use serde::Serialize;

use crate::repository::{RepositoryError, RepositoryTransaction, Snapshot};
use crate::{
aggregate::Aggregate,
event::{Event, EventStoreEvent},
Expand Down Expand Up @@ -38,7 +42,7 @@ where
T::SNAPSHOT_VERSION
}

/// Returns the list of uncommitted, recorded Domain [Event]s from the [Context]
/// Returns the list of uncommitted, recorded Domain [Events] from the [Context]
/// and resets the internal list to its default value.
#[doc(hidden)]
pub fn take_uncommitted_events(
Expand All @@ -61,7 +65,8 @@ where
///
/// The method can return an error if the event to apply is unexpected
/// given the current state of the Aggregate.
pub(crate) fn rehydrate_from(
#[doc(hidden)]
pub fn rehydrate_from(
event: &EventStoreEvent<T::DomainEventId, T::DomainEvent>,
) -> Result<Context<T>, T::ApplyError> {
Ok(Context {
Expand All @@ -79,7 +84,8 @@ where
///
/// The method can return an error if the event to apply is unexpected
/// given the current state of the Aggregate.
pub(crate) fn apply_rehydrated_event(
#[doc(hidden)]
pub fn apply_rehydrated_event(
mut self,
event: &EventStoreEvent<T::DomainEventId, T::DomainEvent>,
) -> Result<Context<T>, T::ApplyError> {
Expand All @@ -89,43 +95,6 @@ where
Ok(self)
}

/// Checks if the event exists in the repository and that they are equal
pub(crate) async fn check_idempotency<R>(
&self,
repository: &mut R,
aggregate_id: &T::AggregateId,
event: &T::DomainEvent,
) -> Result<bool, RecordError<T, <R as RepositoryTransaction<T>>::DbError>>
where
R: RepositoryTransaction<T>,
{
if let Some(saved_event) = repository
.get_event(aggregate_id, event.id())
.await
.map_err(RecordError::Repository)?
{
if saved_event.event != *event {
return Err(RecordError::IdempotencyError(
saved_event.event,
event.clone(),
));
}
return Ok(true);
}

if let Some(existing_event) = self.uncommitted_events.iter().find(|e| e.id == *event.id()) {
if existing_event.event != *event {
return Err(RecordError::IdempotencyError(
existing_event.event.clone(),
event.clone(),
));
}
return Ok(true);
};

Ok(false)
}

pub(crate) fn record_new(event: T::DomainEvent) -> Result<Context<T>, T::ApplyError> {
let aggregate = T::apply_new(&event)?;
let mut uncommitted_side_effects = vec![];
Expand Down Expand Up @@ -158,23 +127,8 @@ where
///
/// The method can return an error if the event to apply is unexpected
/// given the current state of the Aggregate.
pub async fn record_that<R>(
&mut self,
repository: &mut R,
event: T::DomainEvent,
) -> Result<(), RecordError<T, <R as RepositoryTransaction<T>>::DbError>>
where
R: RepositoryTransaction<T>,
{
// Check if the event is has already been applied, if so let's ignore it.
if self
.check_idempotency(repository, self.aggregate_id(), &event)
.await?
{
return Ok(());
}

self.aggregate.apply(&event).map_err(RecordError::Apply)?;
pub fn record_that(&mut self, event: T::DomainEvent) -> Result<(), T::ApplyError> {
self.aggregate.apply(&event)?;
self.version += 1;

if let Some(mut side_effects) = self.aggregate.side_effects(&event) {
Expand All @@ -189,19 +143,131 @@ where

Ok(())
}

pub async fn save<R>(
&mut self,
transaction: &mut R,
) -> Result<(), SaveError<T, <R as RepositoryTransaction<T>>::DbError>>
where
T: Serialize,
T::SideEffect: Serialize,
R: RepositoryTransaction<T>,
{
let events_to_commit = self.take_uncommitted_events();

let side_effects_to_commit = self.take_uncommitted_side_effects();

if events_to_commit.is_empty() {
return Ok(());
}

let aggregate_id = self.aggregate_id();

let snapshot_version = self.snapshot_version();
let snapshot_to_store = self.state();

let snapshot = Snapshot {
snapshot_version,
aggregate: snapshot_to_store.clone(),
version: self.version(),
};

// When we insert the events, it's possible that the events have already been inserted
// If that's the case, we need to check if the previously inserted events are the same as the ones we have
let inserted_event_ids = transaction
.append(aggregate_id, events_to_commit.clone())
.await
.map_err(SaveError::Repository)?;

if inserted_event_ids.len() != events_to_commit.len() {
// We failed to insert one or more of the events, it's possible that the events have already been inserted
// If that's the case, we need to check if the previously inserted events are the same as the ones we have
for event in events_to_commit {
if !inserted_event_ids.contains(&event.id) {
if let Some(saved_event) = transaction
.get_event(aggregate_id, event.id())
.await
.map_err(SaveError::Repository)?
{
if saved_event.event != event.event {
return Err(SaveError::IdempotencyError(
saved_event.event,
event.event,
));
}
} else {
// The not inserted event was not found in the event store, this happens if a different event was inserted with the same version and aggregate id
// This is a fatal error, so return early
return Err(SaveError::OptimisticConcurrency(
aggregate_id.clone(),
event.version,
));
}
}
}
}

transaction
.store_snapshot(snapshot)
.await
.map_err(SaveError::Repository)?;

transaction
.insert_side_effects(side_effects_to_commit)
.await?;

Ok(())
}

pub async fn load<R>(
transaction: &mut R,
aggregate_id: &T::AggregateId,
) -> Result<Context<T>, RepositoryError<T::ApplyError, T::DomainEventId, R::DbError>>
where
T: DeserializeOwned,
R: RepositoryTransaction<T>,
{
let snapshot = transaction.get_snapshot(aggregate_id).await;

let (context, version) = if let Some(snapshot) = snapshot {
if snapshot.snapshot_version == T::SNAPSHOT_VERSION {
// Snapshot is valid so return it
let context: Context<T> = snapshot.into();
// We want to get the next event in the stream
let version = context.version() + 1;
(Some(context), version)
} else {
(None, 0)
}
} else {
(None, 0)
};

let ctx = transaction
.stream_from(aggregate_id, version)
.map_err(RepositoryError::Repository)
.try_fold(context, |ctx: Option<Context<T>>, event| async move {
let new_ctx_result = match ctx {
None => Context::rehydrate_from(&event),
Some(ctx) => ctx.apply_rehydrated_event(&event),
};

let new_ctx = new_ctx_result.map_err(|e| RepositoryError::Apply(event.id, e))?;

Ok(Some(new_ctx))
})
.await?;

ctx.ok_or(RepositoryError::AggregateNotFound)
}
}

/// List of possible errors that can be returned by when recording events using [`Context::record_that`]
#[derive(Debug, thiserror::Error)]
pub enum RecordError<T, DE>
pub enum SaveError<T, DE>
where
T: Aggregate,
{
/// The [Event] failed to be applied to the [Aggregate].
/// This usually implies that the event is invalid for given state of the aggregate.
#[error("Failed to rehydrate aggregate from event stream. {0:?}")]
Apply(T::ApplyError),

/// This error is returned when the event in the repository with the same ID
/// doesn't have the same content.
#[error("Idempotency Error. Saved event {0:?} does not equal {1:?}")]
Expand All @@ -211,6 +277,11 @@ where
/// an unexpected error while streaming back the Aggregate's Event Stream.
#[error("Event store failed while streaming events: {0}")]
Repository(#[from] DE),

/// This error is returned when the Repository returns
/// when it fails to insert the event because the version already exists
#[error("Optimistic Concurrency Error Version {1} of aggregate {0:?} already exists")]
OptimisticConcurrency(T::AggregateId, u64),
}

impl<T> From<Snapshot<T>> for Context<T>
Expand Down
19 changes: 6 additions & 13 deletions eventastic/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
//! Module `event` contains types and abstractions helpful for working
//! with Domain Events.

use std::fmt::Debug;

use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

/// An [`Event`] that will be / has been persisted to the Event Store.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct EventStoreEvent<Id, Evt>
where
Id: Send + Debug,
Self: Send + Sync,
Evt: Send + Sync + Clone + Eq + PartialEq,
Id: Debug,
Evt: Clone + Eq + PartialEq,
{
/// The id of the event
pub id: Id,
Expand All @@ -27,21 +24,17 @@ where
/// A domain event.
pub trait Event<Id>
where
Id: Send + Debug,
Id: Debug,
{
fn id(&self) -> &Id;
}

impl<Id, Evt> Event<Id> for EventStoreEvent<Id, Evt>
where
Id: Send + Debug,
Self: Send + Sync,
Evt: Send + Sync + Clone + Eq + PartialEq,
Id: Debug,
Evt: Clone + Eq + PartialEq,
{
fn id(&self) -> &Id {
&self.id
}
}

/// Stream is a stream of [`EventStoreEvent`] Domain Events.
pub type Stream<'a, Id, Evt, Err> = BoxStream<'a, Result<EventStoreEvent<Id, Evt>, Err>>;
Loading
Loading