Skip to content

Commit

Permalink
gRPC as Replicated ES transport (#757)
Browse files Browse the repository at this point in the history
* Replicated Event Sourcing over gRPC, with Scala and Java DSLs
* Support pluggable projections and composable gRPC endpoints
* Replication settings from config or created programmatically
* Add minimal parallel updates flow (replace with MapAsyncPartitioned)
* Integration tests for Scala and Java DSLs
* Initial documentation and examples
  • Loading branch information
johanandren authored Jan 25, 2023
1 parent ad213c6 commit ee1d5e4
Show file tree
Hide file tree
Showing 36 changed files with 2,636 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@ package akka.projection.internal
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier

import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.Future

import akka.NotUsed
import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
import akka.dispatch.ExecutionContexts
import akka.projection.BySlicesSourceProvider
import akka.projection.javadsl
import akka.projection.scaladsl
import akka.stream.scaladsl.Source
import akka.stream.javadsl.{ Source => JSource }

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
*/
@InternalApi private[projection] class SourceProviderAdapter[Offset, Envelope](
@InternalStableApi private[projection] class SourceProviderAdapter[Offset, Envelope](
delegate: javadsl.SourceProvider[Offset, Envelope])
extends scaladsl.SourceProvider[Offset, Envelope] {

Expand All @@ -39,3 +41,26 @@ import akka.stream.scaladsl.Source

def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope)
}

/**
 * INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider
 */
@InternalApi private[projection] class ScalaBySlicesSourceProviderAdapter[Offset, Envelope](
    delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider)
    extends javadsl.SourceProvider[Offset, Envelope]
    with BySlicesSourceProvider {

  override def source(
      offset: Supplier[CompletionStage[Optional[Offset]]]): CompletionStage[JSource[Envelope, NotUsed]] = {
    // Bridge the Java offset supplier (CompletionStage[Optional]) to the Scala
    // shape (Future[Option]) that the scaladsl delegate expects.
    val scalaOffsetFn = () => offset.get().toScala.map(_.asScala)(ExecutionContexts.parasitic)
    val eventualSource = delegate.source(scalaOffsetFn)
    // Convert the produced Scala Source and Future back to their Java counterparts.
    eventualSource.map(_.asJava)(ExecutionContexts.parasitic).toJava
  }

  // Offset and creation-time extraction are pure pass-throughs to the delegate.
  override def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope)

  override def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope)

  // Slice range is forwarded unchanged from the wrapped provider.
  def minSlice: Int = delegate.minSlice

  def maxSlice: Int = delegate.maxSlice
}
43 changes: 43 additions & 0 deletions akka-projection-grpc/src/it/resources/db/default-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,46 @@ CREATE TABLE IF NOT EXISTS akka_projection_management (
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);

-- For Replicated Event Sourcing over gRPC tests
-- Three structurally identical offset-store tables, one per replica
-- (DCA, DCB, DCC) used by the integration tests; each replica tracks its
-- own consumed offsets independently.
CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store_DCA (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  slice INT NOT NULL,
  persistence_id VARCHAR(255) NOT NULL,
  seq_nr BIGINT NOT NULL,
  -- timestamp_offset is the db_timestamp of the original event
  timestamp_offset timestamp with time zone NOT NULL,
  -- timestamp_consumed is when the offset was stored
  -- the consumer lag is timestamp_consumed - timestamp_offset
  timestamp_consumed timestamp with time zone NOT NULL,
  PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
);

-- Same schema as the DCA table above, for replica DCB.
CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store_DCB (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  slice INT NOT NULL,
  persistence_id VARCHAR(255) NOT NULL,
  seq_nr BIGINT NOT NULL,
  -- timestamp_offset is the db_timestamp of the original event
  timestamp_offset timestamp with time zone NOT NULL,
  -- timestamp_consumed is when the offset was stored
  -- the consumer lag is timestamp_consumed - timestamp_offset
  timestamp_consumed timestamp with time zone NOT NULL,
  PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
);

-- Same schema as the DCA table above, for replica DCC.
CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store_DCC (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  slice INT NOT NULL,
  persistence_id VARCHAR(255) NOT NULL,
  seq_nr BIGINT NOT NULL,
  -- timestamp_offset is the db_timestamp of the original event
  timestamp_offset timestamp with time zone NOT NULL,
  -- timestamp_consumed is when the offset was stored
  -- the consumer lag is timestamp_consumed - timestamp_offset
  timestamp_consumed timestamp with time zone NOT NULL,
  PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
);
12 changes: 10 additions & 2 deletions akka-projection-grpc/src/it/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,22 @@
<appender-ref ref="STDOUT"/>
</logger>

<!-- One of the tests depend on these, don't change -->
<logger name="akka.projection.grpc" level="TRACE" />
<logger name="akka.projection.r2dbc" level="DEBUG" />
<logger name="akka.persistence.r2dbc" level="DEBUG" />
<logger name="akka.persistence.typed" level="TRACE" />

<!-- Silence some other stuff -->
<logger name="akka.actor.typed.pubsub" level="INFO" />
<logger name="akka.cluster.typed.internal.receptionist" level="INFO" />
<logger name="io.grpc.netty.shaded.io.grpc.netty" level="INFO" />
<logger name="io.r2dbc.postgresql" level="INFO" />
<logger name="reactor.netty.resources" level="INFO" />
<logger name="io.r2dbc.pool" level="INFO" />

<root level="INFO">
<root level="TRACE">
<appender-ref ref="CapturingAppender"/>
<!-- <appender-ref ref="STDOUT"/>-->
</root>

</configuration>
Loading

0 comments on commit ee1d5e4

Please sign in to comment.