diff --git a/akka-projection-grpc/src/it/resources/logback-test.xml b/akka-projection-grpc/src/it/resources/logback-test.xml
index c4f5da0d0..be95b5fe0 100644
--- a/akka-projection-grpc/src/it/resources/logback-test.xml
+++ b/akka-projection-grpc/src/it/resources/logback-test.xml
@@ -12,7 +12,7 @@
-
+
diff --git a/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala b/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala
index 0033d20d1..4c7d9d646 100644
--- a/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala
+++ b/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala
@@ -12,6 +12,9 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.grpc.GrpcClientSettings
+import akka.grpc.GrpcServiceException
+import akka.grpc.scaladsl.Metadata
+import akka.grpc.scaladsl.MetadataBuilder
import akka.grpc.scaladsl.ServiceHandler
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
@@ -21,11 +24,13 @@ import akka.persistence.typed.PersistenceId
import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
+import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
+import akka.projection.grpc.producer.scaladsl.EventProducerInterceptor
import akka.projection.r2dbc.scaladsl.R2dbcHandler
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import akka.projection.r2dbc.scaladsl.R2dbcSession
@@ -33,6 +38,7 @@ import akka.projection.scaladsl.Handler
import akka.testkit.SocketUtil
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
+import io.grpc.Status
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory
@@ -96,6 +102,8 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
with LogCapturing {
import IntegrationSpec._
+ def this() = this(new TestContainerConf)
+
override def typedSystem: ActorSystem[_] = system
private implicit val ec: ExecutionContext = system.executionContext
private val numberOfTests = 4
@@ -126,8 +134,10 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
EventSourcedProvider.eventsBySlices[String](
system,
GrpcReadJournal(
- system,
- streamId,
+ GrpcQuerySettings(
+ streamId = streamId,
+ protoClassMapping = Map.empty,
+ additionalRequestMetadata = Some(new MetadataBuilder().addText("x-secret", "top_secret").build())),
GrpcClientSettings
.connectToServiceAt("127.0.0.1", grpcPort)
.withTls(false)),
@@ -173,8 +183,16 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
EventProducerSource(source.entityType, source.streamId, transformation, EventProducerSettings(system)))
.toSet
+ val authInterceptor = new EventProducerInterceptor {
+ def intercept(streamId: String, requestMetadata: Metadata): Future[Done] = {
+ if (!requestMetadata.getText("x-secret").contains("top_secret"))
+ throw new GrpcServiceException(Status.PERMISSION_DENIED)
+ else Future.successful(Done)
+ }
+ }
+
val eventProducerService =
- EventProducer.grpcServiceHandler(eventProducerSources)
+ EventProducer.grpcServiceHandler(eventProducerSources, Some(authInterceptor))
val service: HttpRequest => Future[HttpResponse] =
ServiceHandler.concatOrNotFound(eventProducerService)
@@ -292,7 +310,6 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
def expectedLogMessage(seqNr: Long): String =
s"Received backtracking event from [127.0.0.1] persistenceId [${pid.id}] with seqNr [$seqNr]"
-
val projection =
LoggingTestKit.trace(expectedLogMessage(1)).expect {
LoggingTestKit.trace(expectedLogMessage(2)).expect {
diff --git a/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala b/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala
index bb0f70bc3..a41fb34ac 100644
--- a/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala
+++ b/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala
@@ -17,6 +17,7 @@ import akka.projection.grpc.TestContainerConf
import akka.projection.grpc.TestData
import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.TestEntity
+import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
@@ -25,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
import java.time.Instant
-import java.time.{Duration => JDuration}
+import java.time.{ Duration => JDuration }
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
@@ -40,7 +41,6 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf)
def this() = this(new TestContainerConf)
-
protected override def afterAll(): Unit = {
super.afterAll()
testContainerConf.stop()
@@ -59,8 +59,7 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf)
lazy val entity = spawn(TestEntity(pid))
lazy val grpcReadJournal = GrpcReadJournal(
- system,
- streamId,
+ GrpcQuerySettings(streamId, Map.empty, None),
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")))
}
diff --git a/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala b/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala
index c3ca5db0c..7208f6449 100644
--- a/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala
+++ b/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala
@@ -18,6 +18,7 @@ import akka.projection.grpc.TestContainerConf
import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.TestEntity
import akka.projection.grpc.TestData
+import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
@@ -60,8 +61,7 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
lazy val entity = spawn(TestEntity(pid))
lazy val grpcReadJournal = GrpcReadJournal(
- system,
- streamId,
+ GrpcQuerySettings(streamId, Map.empty, None),
GrpcClientSettings
.connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
.withTls(false))
diff --git a/akka-projection-grpc/src/main/resources/reference.conf b/akka-projection-grpc/src/main/resources/reference.conf
index 913e93b0b..f9c4d274c 100644
--- a/akka-projection-grpc/src/main/resources/reference.conf
+++ b/akka-projection-grpc/src/main/resources/reference.conf
@@ -2,6 +2,9 @@ akka.projection.grpc {
consumer {
class = "akka.projection.grpc.consumer.GrpcReadJournalProvider"
+    # Note: these settings are only applied when the consumer is constructed from config.
+    # If the GrpcQuerySettings are created programmatically, these settings are ignored.
+
# Configuration of gRPC client.
# See https://doc.akka.io/docs/akka-grpc/current/client/configuration.html#by-configuration
client = ${akka.grpc.client."*"}
@@ -16,6 +19,11 @@ akka.projection.grpc {
# Protobuf events.
proto-class-mapping {
}
+
+    # Additional request headers to pass as string values in each request to the producer.
+    # Can be used, for example, for authorization in combination with an interceptor in the producer.
+    # Example: "x-auth-header": "secret"
+ additional-request-headers {}
}
producer {
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcQuerySettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcQuerySettings.scala
index 9bd2f80c5..1943b74af 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcQuerySettings.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcQuerySettings.scala
@@ -4,25 +4,90 @@
package akka.projection.grpc.consumer
+import akka.annotation.ApiMayChange
+import akka.grpc.scaladsl.Metadata
+import akka.grpc.scaladsl.MetadataBuilder
import com.typesafe.config.Config
+import akka.util.ccompat.JavaConverters._
+import java.util.Optional
+import scala.compat.java8.OptionConverters._
+
+@ApiMayChange
object GrpcQuerySettings {
- def apply(config: Config): GrpcQuerySettings =
- new GrpcQuerySettings(config)
+ def apply(config: Config): GrpcQuerySettings = {
+ val streamId = config.getString("stream-id")
+ require(
+ streamId != "",
+ "Configuration property [stream-id] must be an id exposed by the producing side but was undefined on the consuming side.")
+
+ val protoClassMapping: Map[String, String] = {
+ import scala.jdk.CollectionConverters._
+ config.getConfig("proto-class-mapping").root.unwrapped.asScala.toMap.map {
+ case (k, v) => k -> v.toString
+ }
+
+ }
+
+ val additionalHeaders: Option[Metadata] = {
+ import scala.jdk.CollectionConverters._
+ val map = config.getConfig("additional-request-headers").root.unwrapped.asScala.toMap.map {
+ case (k, v) => k -> v.toString
+ }
+ if (map.isEmpty) None
+ else
+ Some(
+ map
+ .foldLeft(new MetadataBuilder()) {
+ case (builder, (key, value)) =>
+ builder.addText(key, value)
+ }
+ .build())
+ }
+
+ new GrpcQuerySettings(streamId, protoClassMapping, additionalHeaders)
+ }
+
+ /**
+ * Scala API: Programmatic construction of GrpcQuerySettings
+ *
+ * @param streamId The stream id to consume
+ * @param protoClassMapping Mapping between full Protobuf message names and Java class names that are used
+ * when deserializing Protobuf events.
+ * @param additionalRequestMetadata Additional request metadata, for authentication/authorization of the request
+ * on the remote side.
+ */
+ def apply(
+ streamId: String,
+ protoClassMapping: Map[String, String],
+ additionalRequestMetadata: Option[Metadata]): GrpcQuerySettings = {
+ new GrpcQuerySettings(streamId, protoClassMapping, additionalRequestMetadata)
+ }
+
+ /**
+ * Java API: Programmatic construction of GrpcQuerySettings
+ *
+ * @param streamId The stream id to consume
+ * @param protoClassMapping Mapping between full Protobuf message names and Java class names that are used
+ * when deserializing Protobuf events.
+ * @param additionalRequestMetadata Additional request metadata, for authentication/authorization of the request
+ * on the remote side.
+ */
+ def create(
+ streamId: String,
+ protoClassMapping: java.util.Map[String, String],
+ additionalRequestMetadata: Optional[akka.grpc.javadsl.Metadata]): GrpcQuerySettings = {
+ new GrpcQuerySettings(streamId, protoClassMapping.asScala.toMap, additionalRequestMetadata.asScala.map(_.asScala))
+ }
}
-class GrpcQuerySettings(config: Config) {
- val grpcClientConfig: Config = config.getConfig("client")
- val streamId = config.getString("stream-id")
+@ApiMayChange
+final class GrpcQuerySettings(
+ val streamId: String,
+ val protoClassMapping: Map[String, String],
+ val additionalRequestMetadata: Option[Metadata]) {
require(
streamId != "",
"Configuration property [stream-id] must be an id exposed by the streaming side (but was empty).")
- val protoClassMapping: Map[String, String] = {
- import scala.jdk.CollectionConverters._
- config.getConfig("proto-class-mapping").root.unwrapped.asScala.toMap.map {
- case (k, v) => k -> v.toString
- }
-
- }
}
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala
index adaf4b119..28c9f8e11 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala
@@ -8,11 +8,10 @@ import java.time.Instant
import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage
-
import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._
-
import akka.NotUsed
+import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.japi.Pair
import akka.persistence.query.Offset
@@ -24,10 +23,12 @@ import akka.persistence.query.typed.javadsl.LoadEventQuery
import akka.projection.grpc.consumer.scaladsl
import akka.stream.javadsl.Source
+@ApiMayChange
object GrpcReadJournal {
val Identifier: String = scaladsl.GrpcReadJournal.Identifier
}
+@ApiMayChange
class GrpcReadJournal(delegate: scaladsl.GrpcReadJournal)
extends ReadJournal
with EventsBySliceQuery
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala
index a2410f993..9c05da444 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala
@@ -13,7 +13,12 @@ import akka.actor.ClassicActorSystemProvider
import akka.actor.ExtendedActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter._
+import akka.annotation.ApiMayChange
import akka.grpc.GrpcClientSettings
+import akka.grpc.scaladsl.SingleResponseRequestBuilder
+import akka.grpc.scaladsl.BytesEntry
+import akka.grpc.scaladsl.StreamResponseRequestBuilder
+import akka.grpc.scaladsl.StringEntry
import akka.persistence.Persistence
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
@@ -41,11 +46,11 @@ import akka.projection.grpc.internal.proto.StreamOut
import akka.stream.scaladsl.Source
import com.google.protobuf.timestamp.Timestamp
import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import org.slf4j.Logger
import org.slf4j.LoggerFactory
+@ApiMayChange
object GrpcReadJournal {
val Identifier = "akka.projection.grpc.consumer"
@@ -53,29 +58,21 @@ object GrpcReadJournal {
LoggerFactory.getLogger(classOf[GrpcReadJournal])
/**
- * Construct a gRPC read journal for the given stream-id and explicit `GrpcClientSettings` to control
+ * Construct a gRPC read journal for the given settings and explicit `GrpcClientSettings` to control
* how to reach the Akka Projection gRPC producer service (host, port etc).
*/
- def apply(
- system: ClassicActorSystemProvider,
- streamId: String,
- clientSettings: GrpcClientSettings): GrpcReadJournal = {
- val extended = system.classicSystem.asInstanceOf[ExtendedActorSystem]
-
- val configPath = GrpcReadJournal.Identifier
- val config = ConfigFactory
- .parseString(s"""akka.projection.grpc.consumer.stream-id=$streamId""")
- .withFallback(extended.settings.config)
- .getConfig(configPath)
-
- val settings = GrpcQuerySettings(config)
+  def apply(settings: GrpcQuerySettings, clientSettings: GrpcClientSettings)(
+      implicit system: ClassicActorSystemProvider): GrpcReadJournal = {
val clientSettingsWithOverrides =
// compose with potential user overrides to allow overriding our defaults
clientSettings.withChannelBuilderOverrides(
channelBuilderOverrides.andThen(clientSettings.channelBuilderOverrides))
- new scaladsl.GrpcReadJournal(extended, settings, clientSettingsWithOverrides)
+ new scaladsl.GrpcReadJournal(
+ system.classicSystem.asInstanceOf[ExtendedActorSystem],
+ settings,
+ clientSettingsWithOverrides)
}
private def channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder =
@@ -85,6 +82,7 @@ object GrpcReadJournal {
}
+@ApiMayChange
final class GrpcReadJournal private (
system: ExtendedActorSystem,
settings: GrpcQuerySettings,
@@ -95,17 +93,9 @@ final class GrpcReadJournal private (
with LoadEventQuery {
import GrpcReadJournal.log
- private def this(system: ExtendedActorSystem, settings: GrpcQuerySettings) =
- this(
- system,
- settings,
- GrpcClientSettings
- .fromConfig(settings.grpcClientConfig)(system)
- .withChannelBuilderOverrides(GrpcReadJournal.channelBuilderOverrides))
-
// entry point when created through Akka Persistence
def this(system: ExtendedActorSystem, config: Config, cfgPath: String) =
- this(system, GrpcQuerySettings(config))
+ this(system, GrpcQuerySettings(config), GrpcClientSettings.fromConfig(config.getConfig("client"))(system))
private implicit val typedSystem = system.toTyped
private val persistenceExt = Persistence(system)
@@ -113,6 +103,24 @@ final class GrpcReadJournal private (
new ProtoAnySerialization(system.toTyped, settings.protoClassMapping)
private val client = EventProducerServiceClient(clientSettings)
+ private val additionalRequestHeaders = settings.additionalRequestMetadata match {
+ case Some(meta) => meta.asList
+ case None => Seq.empty
+ }
+
+ private def addRequestHeaders[Req, Res](
+ builder: StreamResponseRequestBuilder[Req, Res]): StreamResponseRequestBuilder[Req, Res] =
+ additionalRequestHeaders.foldLeft(builder) {
+ case (acc, (key, StringEntry(str))) => acc.addHeader(key, str)
+ case (acc, (key, BytesEntry(bytes))) => acc.addHeader(key, bytes)
+ }
+
+ private def addRequestHeaders[Req, Res](
+ builder: SingleResponseRequestBuilder[Req, Res]): SingleResponseRequestBuilder[Req, Res] =
+ additionalRequestHeaders.foldLeft(builder) {
+ case (acc, (key, StringEntry(str))) => acc.addHeader(key, str)
+ case (acc, (key, BytesEntry(bytes))) => acc.addHeader(key, bytes)
+ }
override def sliceForPersistenceId(persistenceId: String): Int =
persistenceExt.sliceForPersistenceId(persistenceId)
@@ -185,11 +193,13 @@ final class GrpcReadJournal private (
val streamIn = Source
.single(StreamIn(StreamIn.Message.Init(initReq)))
.concat(Source.maybe)
- val streamOut =
- client.eventsBySlices(streamIn).recover {
- case th: Throwable =>
- throw new RuntimeException(s"Failure to consume gRPC event stream for [${streamId}]", th)
- }
+ val streamOut: Source[StreamOut, NotUsed] =
+ addRequestHeaders(client.eventsBySlices())
+ .invoke(streamIn)
+ .recover {
+ case th: Throwable =>
+ throw new RuntimeException(s"Failure to consume gRPC event stream for [${streamId}]", th)
+ }
streamOut.map {
case StreamOut(StreamOut.Message.Event(event), _) =>
@@ -269,8 +279,8 @@ final class GrpcReadJournal private (
// EventTimestampQuery
override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = {
import system.dispatcher
- client
- .eventTimestamp(EventTimestampRequest(settings.streamId, persistenceId, sequenceNr))
+ addRequestHeaders(client.eventTimestamp())
+ .invoke(EventTimestampRequest(settings.streamId, persistenceId, sequenceNr))
.map(_.timestamp.map(_.asJavaInstant))
}
@@ -282,8 +292,8 @@ final class GrpcReadJournal private (
persistenceId,
sequenceNr)
import system.dispatcher
- client
- .loadEvent(LoadEventRequest(settings.streamId, persistenceId, sequenceNr))
+ addRequestHeaders(client.loadEvent())
+ .invoke(LoadEventRequest(settings.streamId, persistenceId, sequenceNr))
.map {
case LoadEventResponse(LoadEventResponse.Message.Event(event), _) =>
eventToEnvelope(event, settings.streamId)
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala
index e7e9ad6f2..0cd095881 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala
@@ -4,6 +4,8 @@
package akka.projection.grpc.internal
+import akka.Done
+
import java.time.Instant
import scala.concurrent.Future
import akka.NotUsed
@@ -11,6 +13,7 @@ import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.InternalApi
import akka.grpc.GrpcServiceException
+import akka.grpc.scaladsl.Metadata
import akka.persistence.query.NoOffset
import akka.persistence.query.TimestampOffset
import akka.persistence.query.typed.EventEnvelope
@@ -19,7 +22,7 @@ import akka.persistence.query.typed.scaladsl.EventsBySliceQuery
import akka.persistence.query.typed.scaladsl.LoadEventQuery
import akka.persistence.typed.PersistenceId
import akka.projection.grpc.internal.proto.Event
-import akka.projection.grpc.internal.proto.EventProducerService
+import akka.projection.grpc.internal.proto.EventProducerServicePowerApi
import akka.projection.grpc.internal.proto.EventTimestampRequest
import akka.projection.grpc.internal.proto.EventTimestampResponse
import akka.projection.grpc.internal.proto.FilteredEvent
@@ -32,6 +35,7 @@ import akka.projection.grpc.internal.proto.StreamIn
import akka.projection.grpc.internal.proto.StreamOut
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
+import akka.projection.grpc.producer.scaladsl.EventProducerInterceptor
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
@@ -48,7 +52,7 @@ import scala.annotation.nowarn
@InternalApi private[akka] object EventProducerServiceImpl {
val log: Logger =
LoggerFactory.getLogger(classOf[EventProducerServiceImpl])
-
+ private val futureDone = Future.successful(Done)
}
/**
@@ -57,9 +61,12 @@ import scala.annotation.nowarn
@InternalApi private[akka] class EventProducerServiceImpl(
system: ActorSystem[_],
eventsBySlicesQueriesPerStreamId: Map[String, EventsBySliceQuery],
- sources: Set[EventProducer.EventProducerSource])
- extends EventProducerService {
- import EventProducerServiceImpl.log
+ sources: Set[EventProducer.EventProducerSource],
+ interceptor: Option[EventProducerInterceptor])
+ extends EventProducerServicePowerApi {
+ import EventProducerServiceImpl._
+ import system.executionContext
+
require(
sources.nonEmpty,
"Empty set of EventProducerSource passed to EventProducerService, must contain at least one")
@@ -82,16 +89,22 @@ import scala.annotation.nowarn
.map(s => s"(stream id: [${s.streamId}], entity type: [${s.entityType}])")
.mkString(", "))
+ private def intercept(streamId: String, metadata: Metadata): Future[Done] =
+ interceptor match {
+ case Some(interceptor) => interceptor.intercept(streamId, metadata)
+ case None => futureDone
+ }
+
private def eventProducerSourceFor(streamId: String): EventProducer.EventProducerSource =
streamIdToSourceMap.getOrElse(
streamId,
throw new GrpcServiceException(
Status.NOT_FOUND.withDescription(s"Stream id [${streamId}] is not available for consumption")))
- override def eventsBySlices(in: Source[StreamIn, NotUsed]): Source[StreamOut, NotUsed] = {
+ override def eventsBySlices(in: Source[StreamIn, NotUsed], metadata: Metadata): Source[StreamOut, NotUsed] = {
in.prefixAndTail(1).flatMapConcat {
case (Seq(StreamIn(StreamIn.Message.Init(init), _)), tail) =>
- tail.via(runEventsBySlices(init, tail))
+ tail.via(runEventsBySlices(init, tail, metadata))
case (Seq(), _) =>
// if error during recovery in proxy the stream will be completed before init
log.warn("Event stream closed before init.")
@@ -109,62 +122,66 @@ import scala.annotation.nowarn
@nowarn("msg=never used")
private def runEventsBySlices(
init: InitReq,
- nextReq: Source[StreamIn, NotUsed]): Flow[StreamIn, StreamOut, NotUsed] = {
- val producerSource = eventProducerSourceFor(init.streamId)
+ nextReq: Source[StreamIn, NotUsed],
+ metadata: Metadata): Flow[StreamIn, StreamOut, NotUsed] = {
+ val futureFlow = intercept(init.streamId, metadata).map { _ =>
+ val producerSource = eventProducerSourceFor(init.streamId)
- val offset = init.offset match {
- case None => NoOffset
- case Some(o) =>
- val timestamp =
- o.timestamp.map(_.asJavaInstant).getOrElse(Instant.EPOCH)
- val seen = o.seen.map {
- case PersistenceIdSeqNr(pid, seqNr, _) =>
- pid -> seqNr
- }.toMap
- TimestampOffset(timestamp, seen)
- }
+ val offset = init.offset match {
+ case None => NoOffset
+ case Some(o) =>
+ val timestamp =
+ o.timestamp.map(_.asJavaInstant).getOrElse(Instant.EPOCH)
+ val seen = o.seen.map {
+ case PersistenceIdSeqNr(pid, seqNr, _) =>
+ pid -> seqNr
+ }.toMap
+ TimestampOffset(timestamp, seen)
+ }
- log.debugN(
- "Starting eventsBySlices stream [{}], [{}], slices [{} - {}], offset [{}]",
- producerSource.streamId,
- producerSource.entityType,
- init.sliceMin,
- init.sliceMax,
- offset match {
- case t: TimestampOffset => t.timestamp
- case _ => offset
- })
+ log.debugN(
+ "Starting eventsBySlices stream [{}], [{}], slices [{} - {}], offset [{}]",
+ producerSource.streamId,
+ producerSource.entityType,
+ init.sliceMin,
+ init.sliceMax,
+ offset match {
+ case t: TimestampOffset => t.timestamp
+ case _ => offset
+ })
- val events: Source[EventEnvelope[Any], NotUsed] =
- eventsBySlicesQueriesPerStreamId(init.streamId)
- .eventsBySlices[Any](producerSource.entityType, init.sliceMin, init.sliceMax, offset)
+ val events: Source[EventEnvelope[Any], NotUsed] =
+ eventsBySlicesQueriesPerStreamId(init.streamId)
+ .eventsBySlices[Any](producerSource.entityType, init.sliceMin, init.sliceMax, offset)
- val eventsStreamOut: Source[StreamOut, NotUsed] =
- events.mapAsync(producerSource.settings.transformationParallelism) { env =>
- import system.executionContext
- transformAndEncodeEvent(producerSource.transformation, env).map {
- case Some(event) =>
- log.traceN(
- "Emitting {}event from persistenceId [{}] with seqNr [{}], offset [{}]",
- if (event.payload.isEmpty) "backtracking " else "",
- env.persistenceId,
- env.sequenceNr,
- env.offset)
- StreamOut(StreamOut.Message.Event(event))
- case None =>
- log.traceN(
- "Filtered event from persistenceId [{}] with seqNr [{}], offset [{}]",
- env.persistenceId,
- env.sequenceNr,
- env.offset)
- StreamOut(
- StreamOut.Message.FilteredEvent(
- FilteredEvent(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)))))
+ val eventsStreamOut: Source[StreamOut, NotUsed] =
+ events.mapAsync(producerSource.settings.transformationParallelism) { env =>
+ import system.executionContext
+ transformAndEncodeEvent(producerSource.transformation, env).map {
+ case Some(event) =>
+ log.traceN(
+ "Emitting {}event from persistenceId [{}] with seqNr [{}], offset [{}]",
+ if (event.payload.isEmpty) "backtracking " else "",
+ env.persistenceId,
+ env.sequenceNr,
+ env.offset)
+ StreamOut(StreamOut.Message.Event(event))
+ case None =>
+ log.traceN(
+ "Filtered event from persistenceId [{}] with seqNr [{}], offset [{}]",
+ env.persistenceId,
+ env.sequenceNr,
+ env.offset)
+ StreamOut(
+ StreamOut.Message.FilteredEvent(
+ FilteredEvent(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)))))
+ }
}
- }
- // FIXME nextReq not handled yet
- Flow.fromSinkAndSource(Sink.ignore, eventsStreamOut)
+ // FIXME nextReq not handled yet
+ Flow.fromSinkAndSource(Sink.ignore, eventsStreamOut)
+ }
+ Flow.futureFlow(futureFlow).mapMaterializedValue(_ => NotUsed)
}
private def protoOffset(env: EventEnvelope[_]): Offset = {
@@ -203,64 +220,68 @@ import scala.annotation.nowarn
}
}
- override def eventTimestamp(req: EventTimestampRequest): Future[EventTimestampResponse] = {
- val producerSource = streamIdToSourceMap(req.streamId)
- val entityTypeFromPid = PersistenceId.extractEntityType(req.persistenceId)
- if (entityTypeFromPid != producerSource.entityType) {
- throw new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription(
- s"Persistence id is for a type of entity that is not available for consumption (expected type " +
- s" in persistence id for stream id [${req.streamId}] is [${producerSource.entityType}] but was [$entityTypeFromPid])"))
- }
- eventsBySlicesQueriesPerStreamId(req.streamId) match {
- case q: EventTimestampQuery =>
- import system.executionContext
- q.timestampOf(req.persistenceId, req.seqNr).map {
- case Some(instant) => EventTimestampResponse(Some(Timestamp(instant)))
- case None => EventTimestampResponse.defaultInstance
- }
- case other =>
- Future.failed(new UnsupportedOperationException(s"eventTimestamp not supported by [${other.getClass.getName}]"))
+ override def eventTimestamp(req: EventTimestampRequest, metadata: Metadata): Future[EventTimestampResponse] = {
+ intercept(req.streamId, metadata).flatMap { _ =>
+ val producerSource = streamIdToSourceMap(req.streamId)
+ val entityTypeFromPid = PersistenceId.extractEntityType(req.persistenceId)
+ if (entityTypeFromPid != producerSource.entityType) {
+ throw new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription(
+ s"Persistence id is for a type of entity that is not available for consumption (expected type " +
+ s" in persistence id for stream id [${req.streamId}] is [${producerSource.entityType}] but was [$entityTypeFromPid])"))
+ }
+ eventsBySlicesQueriesPerStreamId(req.streamId) match {
+ case q: EventTimestampQuery =>
+ import system.executionContext
+ q.timestampOf(req.persistenceId, req.seqNr).map {
+ case Some(instant) => EventTimestampResponse(Some(Timestamp(instant)))
+ case None => EventTimestampResponse.defaultInstance
+ }
+ case other =>
+ Future.failed(
+ new UnsupportedOperationException(s"eventTimestamp not supported by [${other.getClass.getName}]"))
+ }
}
}
- override def loadEvent(req: LoadEventRequest): Future[LoadEventResponse] = {
- val producerSource = eventProducerSourceFor(req.streamId)
- val entityTypeFromPid = PersistenceId.extractEntityType(req.persistenceId)
- if (entityTypeFromPid != producerSource.entityType)
- throw new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription(
- s"Persistence id is for a type of entity that is not available for consumption (expected type " +
- s" in persistence id for stream id [${req.streamId}] is [${producerSource.entityType}] but was [$entityTypeFromPid])"))
- eventsBySlicesQueriesPerStreamId(req.streamId) match {
- case q: LoadEventQuery =>
- import system.executionContext
- q.loadEnvelope[Any](req.persistenceId, req.seqNr)
- .flatMap { env =>
- transformAndEncodeEvent(producerSource.transformation, env).map {
- case Some(event) =>
- log.traceN(
- "Loaded event from persistenceId [{}] with seqNr [{}], offset [{}]",
- env.persistenceId,
- env.sequenceNr,
- env.offset)
- LoadEventResponse(LoadEventResponse.Message.Event(event))
- case None =>
- log.traceN(
- "Filtered loaded event from persistenceId [{}] with seqNr [{}], offset [{}]",
- env.persistenceId,
- env.sequenceNr,
- env.offset)
- LoadEventResponse(
- LoadEventResponse.Message.FilteredEvent(
+ override def loadEvent(req: LoadEventRequest, metadata: Metadata): Future[LoadEventResponse] = {
+ intercept(req.streamId, metadata).flatMap { _ =>
+ val producerSource = eventProducerSourceFor(req.streamId)
+ val entityTypeFromPid = PersistenceId.extractEntityType(req.persistenceId)
+ if (entityTypeFromPid != producerSource.entityType)
+ throw new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription(
+ s"Persistence id is for a type of entity that is not available for consumption (expected type " +
+ s" in persistence id for stream id [${req.streamId}] is [${producerSource.entityType}] but was [$entityTypeFromPid])"))
+ eventsBySlicesQueriesPerStreamId(req.streamId) match {
+ case q: LoadEventQuery =>
+ import system.executionContext
+ q.loadEnvelope[Any](req.persistenceId, req.seqNr)
+ .flatMap { env =>
+ transformAndEncodeEvent(producerSource.transformation, env).map {
+ case Some(event) =>
+ log.traceN(
+ "Loaded event from persistenceId [{}] with seqNr [{}], offset [{}]",
+ env.persistenceId,
+ env.sequenceNr,
+ env.offset)
+ LoadEventResponse(LoadEventResponse.Message.Event(event))
+ case None =>
+ log.traceN(
+ "Filtered loaded event from persistenceId [{}] with seqNr [{}], offset [{}]",
+ env.persistenceId,
+ env.sequenceNr,
+ env.offset)
+ LoadEventResponse(LoadEventResponse.Message.FilteredEvent(
FilteredEvent(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)))))
+ }
}
- }
- .recoverWith {
- case e: NoSuchElementException =>
- log.warn(e.getMessage)
- Future.failed(new GrpcServiceException(Status.NOT_FOUND.withDescription(e.getMessage)))
- }
- case other =>
- Future.failed(new UnsupportedOperationException(s"loadEvent not supported by [${other.getClass.getName}]"))
+ .recoverWith {
+ case e: NoSuchElementException =>
+ log.warn(e.getMessage)
+ Future.failed(new GrpcServiceException(Status.NOT_FOUND.withDescription(e.getMessage)))
+ }
+ case other =>
+ Future.failed(new UnsupportedOperationException(s"loadEvent not supported by [${other.getClass.getName}]"))
+ }
}
}
}
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/EventProducerSettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/EventProducerSettings.scala
index 2e5245ba2..af5f7f60f 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/EventProducerSettings.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/EventProducerSettings.scala
@@ -5,8 +5,10 @@
package akka.projection.grpc.producer
import akka.actor.typed.ActorSystem
+import akka.annotation.ApiMayChange
import com.typesafe.config.Config
+@ApiMayChange
object EventProducerSettings {
def apply(system: ActorSystem[_]): EventProducerSettings =
apply(system.settings.config.getConfig("akka.projection.grpc.producer"))
@@ -20,8 +22,8 @@ object EventProducerSettings {
}
}
-case class EventProducerSettings(queryPluginId: String, transformationParallelism: Int) {
-
+@ApiMayChange
+final case class EventProducerSettings(queryPluginId: String, transformationParallelism: Int) {
require(transformationParallelism >= 1, "Configuration property [transformation-parallelism] must be >= 1.")
}
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducer.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducer.scala
index b4a65bb87..4385f5aaa 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducer.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducer.scala
@@ -4,22 +4,31 @@
package akka.projection.grpc.producer.javadsl
+import akka.Done
+
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import akka.actor.typed.ActorSystem
+import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
+import akka.grpc.internal.JavaMetadataImpl
+import akka.grpc.scaladsl.{ Metadata => ScalaMetadata }
import akka.projection.grpc.internal.EventProducerServiceImpl
import akka.japi.function.{ Function => JapiFunction }
import akka.http.javadsl.model.HttpRequest
import akka.http.javadsl.model.HttpResponse
-import akka.projection.grpc.internal.proto.EventProducerServiceHandler
+import akka.projection.grpc.internal.proto.EventProducerServicePowerApiHandler
+import akka.util.ccompat.JavaConverters._
import java.util.Collections
-import scala.jdk.CollectionConverters._
+import java.util.Optional
+import scala.compat.java8.OptionConverters.RichOptionalGeneric
+import scala.concurrent.Future
/**
* The event producer implementation that can be included a gRPC route in an Akka HTTP server.
*/
+@ApiMayChange
object EventProducer {
/**
@@ -39,16 +48,31 @@ object EventProducer {
*/
def grpcServiceHandler(
system: ActorSystem[_],
- sources: java.util.Set[EventProducerSource]): JapiFunction[HttpRequest, CompletionStage[HttpResponse]] = {
+ sources: java.util.Set[EventProducerSource]): JapiFunction[HttpRequest, CompletionStage[HttpResponse]] =
+ grpcServiceHandler(system, sources, Optional.empty())
+
+ /**
+ * The gRPC route that can be included in an Akka HTTP server.
+ *
+ * @param sources All sources that should be available from this event producer
+ * @param interceptor An optional request interceptor applied to each request to the service
+ */
+ def grpcServiceHandler(
+ system: ActorSystem[_],
+ sources: java.util.Set[EventProducerSource],
+ interceptor: Optional[EventProducerInterceptor]): JapiFunction[HttpRequest, CompletionStage[HttpResponse]] = {
val scalaProducerSources = sources.asScala.map(_.asScala).toSet
val eventsBySlicesQueriesPerStreamId =
akka.projection.grpc.producer.scaladsl.EventProducer
.eventsBySlicesQueriesForStreamIds(scalaProducerSources, system)
- val eventProducerService =
- new EventProducerServiceImpl(system, eventsBySlicesQueriesPerStreamId, scalaProducerSources)
+ val eventProducerService = new EventProducerServiceImpl(
+ system,
+ eventsBySlicesQueriesPerStreamId,
+ scalaProducerSources,
+ interceptor.asScala.map(new InterceptorAdapter(_)))
- val handler = EventProducerServiceHandler(eventProducerService)(system)
+ val handler = EventProducerServicePowerApiHandler(eventProducerService)(system)
new JapiFunction[HttpRequest, CompletionStage[HttpResponse]] {
override def apply(request: HttpRequest): CompletionStage[HttpResponse] =
handler(request.asInstanceOf[akka.http.scaladsl.model.HttpRequest])
@@ -56,4 +80,16 @@ object EventProducer {
.toJava
}
}
+
+ private final class InterceptorAdapter(interceptor: EventProducerInterceptor)
+ extends akka.projection.grpc.producer.scaladsl.EventProducerInterceptor {
+ def intercept(streamId: String, requestMetadata: ScalaMetadata): Future[Done] =
+ interceptor
+ .intercept(
+ streamId,
+ // FIXME: Akka gRPC internal class, add public API for Scala to Java metadata there
+ new JavaMetadataImpl(requestMetadata))
+ .toScala
+ }
+
}
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerInterceptor.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerInterceptor.scala
new file mode 100644
index 000000000..8006dac51
--- /dev/null
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerInterceptor.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc.
+ */
+
+package akka.projection.grpc.producer.javadsl
+
+import akka.Done
+import akka.annotation.ApiMayChange
+import akka.grpc.javadsl.Metadata
+
+import java.util.concurrent.CompletionStage
+
+/**
+ * Interceptor allowing, for example, authentication/authorization of incoming requests to consume a specific stream.
+ */
+@ApiMayChange
+@FunctionalInterface
+trait EventProducerInterceptor {
+
+ /**
+   * Lets requests through if the method returns, can fail the request by throwing a [[akka.grpc.GrpcServiceException]]
+ */
+ def intercept(streamId: String, requestMetadata: Metadata): CompletionStage[Done]
+
+}
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala
index ae1afdac1..326ba1d56 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala
@@ -4,6 +4,7 @@
package akka.projection.grpc.producer.javadsl
+import akka.annotation.ApiMayChange
import akka.projection.grpc.producer.EventProducerSettings
/**
@@ -12,6 +13,7 @@ import akka.projection.grpc.producer.EventProducerSettings
* @param transformation Transformations for turning the internal events to public message types
* @param settings The event producer settings used (can be shared for multiple sources)
*/
+@ApiMayChange
final class EventProducerSource(
entityType: String,
streamId: String,
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/Transformation.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/Transformation.scala
index 0ea5b52e4..bb50542b8 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/Transformation.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/Transformation.scala
@@ -4,17 +4,18 @@
package akka.projection.grpc.producer.javadsl
+import akka.annotation.ApiMayChange
+
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.{ Function => JFunction }
-
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.reflect.ClassTag
-
import akka.dispatch.ExecutionContexts
import akka.projection.grpc.producer.scaladsl
+@ApiMayChange
object Transformation {
val empty: Transformation = new Transformation(scaladsl.EventProducer.Transformation.empty)
@@ -28,6 +29,7 @@ object Transformation {
* Transformation of events to the external (public) representation.
* Events can be excluded by mapping them to `Optional.empty`.
*/
+@ApiMayChange
final class Transformation private (private[akka] val delegate: scaladsl.EventProducer.Transformation) {
def registerAsyncMapper[A, B](
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala
index 97a2b9065..e2631bb4c 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala
@@ -4,21 +4,25 @@
package akka.projection.grpc.producer.scaladsl
+import akka.Done
+
import scala.concurrent.Future
import scala.reflect.ClassTag
-
import akka.actor.typed.ActorSystem
+import akka.annotation.ApiMayChange
+import akka.grpc.scaladsl.Metadata
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.typed.scaladsl.EventsBySliceQuery
import akka.projection.grpc.internal.EventProducerServiceImpl
-import akka.projection.grpc.internal.proto.EventProducerServiceHandler
+import akka.projection.grpc.internal.proto.EventProducerServicePowerApiHandler
import akka.projection.grpc.producer.EventProducerSettings
/**
* The event producer implementation that can be included a gRPC route in an Akka HTTP server.
*/
+@ApiMayChange
object EventProducer {
/**
@@ -27,6 +31,7 @@ object EventProducer {
* @param transformation Transformations for turning the internal events to public message types
* @param settings The event producer settings used (can be shared for multiple sources)
*/
+ @ApiMayChange
final case class EventProducerSource(
entityType: String,
streamId: String,
@@ -36,6 +41,7 @@ object EventProducer {
require(streamId.nonEmpty, "Stream id must not be empty")
}
+ @ApiMayChange
object Transformation {
val empty: Transformation = new Transformation(
mappers = Map.empty,
@@ -53,6 +59,7 @@ object EventProducer {
* Transformation of events to the external (public) representation.
* Events can be excluded by mapping them to `None`.
*/
+ @ApiMayChange
final class Transformation private (
val mappers: Map[Class[_], Any => Future[Option[Any]]],
val orElse: Any => Future[Option[Any]]) {
@@ -84,17 +91,29 @@ object EventProducer {
/**
* The gRPC route that can be included in an Akka HTTP server.
+ *
+ * @param sources All sources that should be available from this event producer
*/
def grpcServiceHandler(sources: Set[EventProducerSource])(
implicit system: ActorSystem[_]): PartialFunction[HttpRequest, scala.concurrent.Future[HttpResponse]] = {
+ grpcServiceHandler(sources, None)
+ }
+
+ /**
+ * The gRPC route that can be included in an Akka HTTP server.
+ *
+ * @param sources All sources that should be available from this event producer
+ * @param interceptor An optional request interceptor applied to each request to the service
+ */
+ def grpcServiceHandler(sources: Set[EventProducerSource], interceptor: Option[EventProducerInterceptor])(
+ implicit system: ActorSystem[_]): PartialFunction[HttpRequest, scala.concurrent.Future[HttpResponse]] = {
+
val eventsBySlicesQueriesPerStreamId =
eventsBySlicesQueriesForStreamIds(sources, system)
- val eventProducerService =
- new EventProducerServiceImpl(system, eventsBySlicesQueriesPerStreamId, sources)
-
- EventProducerServiceHandler.partial(eventProducerService)
+ EventProducerServicePowerApiHandler.partial(
+ new EventProducerServiceImpl(system, eventsBySlicesQueriesPerStreamId, sources, interceptor))
}
/**
@@ -127,3 +146,18 @@ object EventProducer {
}
}
+
+/**
+ * Interceptor allowing, for example, authentication/authorization of incoming requests to consume a specific stream.
+ */
+@ApiMayChange
+trait EventProducerInterceptor {
+
+ /**
+   * Lets requests through if the method returns, can fail the request by throwing or by asynchronously failing
+   * the returned future with a [[akka.grpc.GrpcServiceException]].
+ *
+ */
+ def intercept(streamId: String, requestMetadata: Metadata): Future[Done]
+
+}
diff --git a/akka-projection-grpc/src/test/scala/akka/projection/grpc/consumer/GrpcQuerySettingsSpec.scala b/akka-projection-grpc/src/test/scala/akka/projection/grpc/consumer/GrpcQuerySettingsSpec.scala
new file mode 100644
index 000000000..a853b4b12
--- /dev/null
+++ b/akka-projection-grpc/src/test/scala/akka/projection/grpc/consumer/GrpcQuerySettingsSpec.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc.
+ */
+
+package akka.projection.grpc.consumer
+
+import akka.grpc.scaladsl.StringEntry
+import com.typesafe.config.ConfigFactory
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+class GrpcQuerySettingsSpec extends AnyWordSpecLike with Matchers {
+ "The GrpcQuerySettings" should {
+ "parse from config" in {
+ val config = ConfigFactory.parseString("""
+ stream-id = "my-stream-id"
+ proto-class-mapping {
+ "proto.MyMessage" = "java.MyClass"
+ }
+ additional-request-headers {
+ "x-auth-header" = "secret"
+ }
+ """)
+
+ val settings = GrpcQuerySettings(config)
+ settings.streamId shouldBe "my-stream-id"
+ settings.protoClassMapping shouldBe (Map("proto.MyMessage" -> "java.MyClass"))
+ settings.additionalRequestMetadata.map(_.asList) shouldBe Some(List("x-auth-header" -> StringEntry("secret")))
+ }
+ }
+
+}
diff --git a/akka-projection-grpc/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala b/akka-projection-grpc/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala
index a4d9dc038..fd23ec543 100644
--- a/akka-projection-grpc/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala
+++ b/akka-projection-grpc/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala
@@ -4,6 +4,8 @@
package akka.projection.grpc.internal
+import akka.Done
+
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import scala.collection.immutable
@@ -13,6 +15,9 @@ import akka.NotUsed
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
+import akka.grpc.GrpcServiceException
+import akka.grpc.scaladsl.Metadata
+import akka.grpc.scaladsl.MetadataBuilder
import akka.persistence.Persistence
import akka.persistence.query.Offset
import akka.persistence.query.TimestampOffset
@@ -20,13 +25,17 @@ import akka.persistence.query.scaladsl.ReadJournal
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventsBySliceQuery
import akka.persistence.typed.PersistenceId
+import akka.projection.grpc.internal.proto.EventTimestampRequest
import akka.projection.grpc.internal.proto.InitReq
+import akka.projection.grpc.internal.proto.LoadEventRequest
import akka.projection.grpc.internal.proto.StreamIn
import akka.projection.grpc.internal.proto.StreamOut
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
+import akka.projection.grpc.producer.scaladsl.EventProducerInterceptor
import akka.stream.scaladsl.Keep
+import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.TestSubscriber
@@ -34,6 +43,8 @@ import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.SocketUtil
import com.typesafe.config.ConfigFactory
+import io.grpc.Status
+import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
object EventProducerServiceSpec {
@@ -78,6 +89,7 @@ object EventProducerServiceSpec {
class EventProducerServiceSpec
extends ScalaTestWithActorTestKit(ConfigFactory.load())
with AnyWordSpecLike
+ with Matchers
with TestData
with LogCapturing {
import EventProducerServiceSpec._
@@ -104,12 +116,12 @@ class EventProducerServiceSpec
val queries =
Map(streamId1 -> eventsBySlicesQuery1, streamId2 -> eventsBySlicesQuery2)
private val eventProducerService =
- new EventProducerServiceImpl(system, queries, eventProducerSources)
+ new EventProducerServiceImpl(system, queries, eventProducerSources, None)
private def runEventsBySlices(streamIn: Source[StreamIn, NotUsed]) = {
val probePromise = Promise[TestSubscriber.Probe[StreamOut]]()
eventProducerService
- .eventsBySlices(streamIn)
+ .eventsBySlices(streamIn, MetadataBuilder.empty)
.toMat(TestSink[StreamOut]())(Keep.right)
.mapMaterializedValue { probe =>
probePromise.trySuccess(probe)
@@ -196,6 +208,81 @@ class EventProducerServiceSpec
out3.getEvent.seqNr shouldBe env3.sequenceNr
}
+ "intercept and fail requests" in {
+ val interceptedProducerService =
+ new EventProducerServiceImpl(system, queries, eventProducerSources, Some(new EventProducerInterceptor {
+ def intercept(streamId: String, requestMetadata: Metadata): Future[Done] = {
+ if (streamId == "nono-direct")
+ throw new GrpcServiceException(Status.PERMISSION_DENIED.withDescription("nono-direct"))
+ else if (requestMetadata.getText("nono-meta-direct").isDefined)
+ throw new GrpcServiceException(Status.PERMISSION_DENIED.withDescription("nono-meta-direct"))
+ else if (streamId == "nono-async")
+ Future.failed(new GrpcServiceException(Status.PERMISSION_DENIED.withDescription("nono-async")))
+ else Future.successful(Done)
+ }
+ }))
+
+ def assertGrpcStatusDenied(
+ fail: Throwable,
+ expectedDescription: String,
+ status: Status = Status.PERMISSION_DENIED) = {
+ fail shouldBe a[GrpcServiceException]
+ fail.asInstanceOf[GrpcServiceException].status.getCode shouldBe (status.getCode)
+ fail.asInstanceOf[GrpcServiceException].status.getDescription shouldBe (expectedDescription)
+ }
+
+ val directStreamIdFail = interceptedProducerService
+ .eventsBySlices(
+ Source
+ .single(StreamIn(StreamIn.Message.Init(InitReq("nono-direct", 0, 1023, offset = None)))),
+ MetadataBuilder.empty)
+ .runWith(Sink.head)
+ .failed
+ .futureValue
+ assertGrpcStatusDenied(directStreamIdFail, "nono-direct")
+
+ val directMetaFail = interceptedProducerService
+ .eventsBySlices(
+ Source
+ .single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = None)))),
+ new MetadataBuilder().addText("nono-meta-direct", "value").build())
+ .runWith(Sink.head)
+ .failed
+ .futureValue
+ assertGrpcStatusDenied(directMetaFail, "nono-meta-direct")
+
+ val asyncStreamFail = interceptedProducerService
+ .eventsBySlices(
+ Source
+ .single(StreamIn(StreamIn.Message.Init(InitReq("nono-async", 0, 1023, offset = None)))),
+ MetadataBuilder.empty)
+ .runWith(Sink.head)
+ .failed
+ .futureValue
+ assertGrpcStatusDenied(asyncStreamFail, "nono-async")
+
+ val passThrough = interceptedProducerService
+ .eventsBySlices(
+ Source
+ .single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = None)))),
+ MetadataBuilder.empty)
+ .runWith(Sink.head)
+ .failed
+ .futureValue
+ // no such stream id so will still fail, but not from interceptor
+ assertGrpcStatusDenied(passThrough, "Stream id [ok] is not available for consumption", Status.NOT_FOUND)
+
+ // check the other methods as well for good measure
+ val loadFailure =
+ interceptedProducerService.loadEvent(LoadEventRequest("nono-async"), MetadataBuilder.empty).failed.futureValue
+ assertGrpcStatusDenied(loadFailure, "nono-async")
+
+ val timestampFailure = interceptedProducerService
+ .eventTimestamp(EventTimestampRequest("nono-async"), MetadataBuilder.empty)
+ .failed
+ .futureValue
+ assertGrpcStatusDenied(timestampFailure, "nono-async")
+ }
}
}
diff --git a/build.sbt b/build.sbt
index edfea3f8e..536ce1828 100644
--- a/build.sbt
+++ b/build.sbt
@@ -112,7 +112,8 @@ lazy val grpc =
.enablePlugins(AkkaGrpcPlugin)
.settings(
// no previous artifact so must disable MiMa until this is released at least once.
- mimaPreviousArtifacts := Set.empty)
+ mimaPreviousArtifacts := Set.empty,
+ akkaGrpcCodeGeneratorSettings += "server_power_apis")
lazy val examples = project
.configs(IntegrationTest.extend(Test))
diff --git a/docs/src/main/paradox/grpc.md b/docs/src/main/paradox/grpc.md
index 85fa8382d..7265b7363 100644
--- a/docs/src/main/paradox/grpc.md
+++ b/docs/src/main/paradox/grpc.md
@@ -91,6 +91,18 @@ Java
This example includes an application specific `ShoppingCartService`, which is unrelated to Akka Projections gRPC,
but it illustrates how to combine the `EventProducer` service with other gRPC services.
+## Access control
+
+### From the consumer
+
+The consumer can include metadata, such as auth headers, in each request to the producer service by passing @apidoc[akka.grpc.*.Metadata] to the @apidoc[GrpcQuerySettings] when constructing the read journal.
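+
+A minimal sketch of constructing the read journal with such metadata, assuming a typed `ActorSystem` is in scope (the stream id, header name and value are illustrative):
+
+```scala
+import akka.actor.typed.ActorSystem
+import akka.grpc.GrpcClientSettings
+import akka.grpc.scaladsl.MetadataBuilder
+import akka.projection.grpc.consumer.GrpcQuerySettings
+import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal
+
+object ConsumerAccessExample {
+  def createReadJournal(implicit system: ActorSystem[_]): GrpcReadJournal = {
+    // send the auth header with every request to the producer
+    val settings = GrpcQuerySettings(
+      streamId = "cart",
+      protoClassMapping = Map.empty,
+      additionalRequestMetadata = Some(new MetadataBuilder().addText("x-auth-header", "secret").build()))
+    GrpcReadJournal(
+      settings,
+      GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")))
+  }
+}
+```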
+
+### In the producer
+
+Authentication and authorization for the producer can be done by implementing an @apidoc[EventProducerInterceptor] and passing
+it to the `grpcServiceHandler` method during producer bootstrap. The interceptor is invoked with the stream id and
+gRPC request metadata for each incoming request and can fail the request with a suitable error by throwing a @apidoc[GrpcServiceException].
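+
+A minimal sketch of such an interceptor checking a shared secret header, with the header name/value and the surrounding helper being illustrative:
+
+```scala
+import akka.Done
+import akka.actor.typed.ActorSystem
+import akka.grpc.GrpcServiceException
+import akka.grpc.scaladsl.Metadata
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.HttpResponse
+import akka.projection.grpc.producer.scaladsl.EventProducer
+import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
+import akka.projection.grpc.producer.scaladsl.EventProducerInterceptor
+import io.grpc.Status
+
+import scala.concurrent.Future
+
+object ProducerAccessExample {
+  def authenticatedHandler(sources: Set[EventProducerSource])(
+      implicit system: ActorSystem[_]): PartialFunction[HttpRequest, Future[HttpResponse]] = {
+    // deny requests that do not carry the expected secret header
+    val authInterceptor = new EventProducerInterceptor {
+      def intercept(streamId: String, requestMetadata: Metadata): Future[Done] =
+        if (requestMetadata.getText("x-auth-header").contains("secret")) Future.successful(Done)
+        else Future.failed(new GrpcServiceException(Status.PERMISSION_DENIED))
+    }
+    EventProducer.grpcServiceHandler(sources, Some(authInterceptor))
+  }
+}
+```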
+
## Performance considerations
### Lower latency