Skip to content

Commit

Permalink
A way to pass headers for auth in projection-grpc (#700)
Browse files Browse the repository at this point in the history
* feat: API for intercepting requests to gRPC producer service
* feat: A way to pass auth headers from the query / consumer
* ApiMayChange all the projection-grpc APIs
* Publish local 2.13 artifacts for verifying samples
* Update GrpcQuerySettings.scala
  • Loading branch information
johanandren authored Oct 10, 2022
1 parent c7838fe commit 473b184
Show file tree
Hide file tree
Showing 19 changed files with 542 additions and 188 deletions.
2 changes: 1 addition & 1 deletion akka-projection-grpc/src/it/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="akka.projection.grpc" level="DEBUG" />
<logger name="akka.projection.grpc" level="TRACE" />
<logger name="akka.projection.r2dbc" level="DEBUG" />
<logger name="akka.persistence.r2dbc" level="DEBUG" />

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.grpc.GrpcClientSettings
import akka.grpc.GrpcServiceException
import akka.grpc.scaladsl.Metadata
import akka.grpc.scaladsl.MetadataBuilder
import akka.grpc.scaladsl.ServiceHandler
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
Expand All @@ -21,18 +24,21 @@ import akka.persistence.typed.PersistenceId
import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
import akka.projection.grpc.producer.scaladsl.EventProducerInterceptor
import akka.projection.r2dbc.scaladsl.R2dbcHandler
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import akka.projection.r2dbc.scaladsl.R2dbcSession
import akka.projection.scaladsl.Handler
import akka.testkit.SocketUtil
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import io.grpc.Status
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -96,6 +102,8 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
with LogCapturing {
import IntegrationSpec._

def this() = this(new TestContainerConf)

override def typedSystem: ActorSystem[_] = system
private implicit val ec: ExecutionContext = system.executionContext
private val numberOfTests = 4
Expand Down Expand Up @@ -126,8 +134,10 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
EventSourcedProvider.eventsBySlices[String](
system,
GrpcReadJournal(
system,
streamId,
GrpcQuerySettings(
streamId = streamId,
protoClassMapping = Map.empty,
additionalRequestMetadata = Some(new MetadataBuilder().addText("x-secret", "top_secret").build())),
GrpcClientSettings
.connectToServiceAt("127.0.0.1", grpcPort)
.withTls(false)),
Expand Down Expand Up @@ -173,8 +183,16 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
EventProducerSource(source.entityType, source.streamId, transformation, EventProducerSettings(system)))
.toSet

val authInterceptor = new EventProducerInterceptor {
def intercept(streamId: String, requestMetadata: Metadata): Future[Done] = {
if (!requestMetadata.getText("x-secret").contains("top_secret"))
throw new GrpcServiceException(Status.PERMISSION_DENIED)
else Future.successful(Done)
}
}

val eventProducerService =
EventProducer.grpcServiceHandler(eventProducerSources)
EventProducer.grpcServiceHandler(eventProducerSources, Some(authInterceptor))

val service: HttpRequest => Future[HttpResponse] =
ServiceHandler.concatOrNotFound(eventProducerService)
Expand Down Expand Up @@ -292,7 +310,6 @@ class IntegrationSpec(testContainerConf: TestContainerConf)

def expectedLogMessage(seqNr: Long): String =
s"Received backtracking event from [127.0.0.1] persistenceId [${pid.id}] with seqNr [$seqNr]"

val projection =
LoggingTestKit.trace(expectedLogMessage(1)).expect {
LoggingTestKit.trace(expectedLogMessage(2)).expect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import akka.projection.grpc.TestContainerConf
import akka.projection.grpc.TestData
import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.TestEntity
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
Expand All @@ -25,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike

import java.time.Instant
import java.time.{Duration => JDuration}
import java.time.{ Duration => JDuration }
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
Expand All @@ -40,7 +41,6 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf)

def this() = this(new TestContainerConf)


protected override def afterAll(): Unit = {
super.afterAll()
testContainerConf.stop()
Expand All @@ -59,8 +59,7 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf)
lazy val entity = spawn(TestEntity(pid))

lazy val grpcReadJournal = GrpcReadJournal(
system,
streamId,
GrpcQuerySettings(streamId, Map.empty, None),
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import akka.projection.grpc.TestContainerConf
import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.TestEntity
import akka.projection.grpc.TestData
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
Expand Down Expand Up @@ -60,8 +61,7 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
lazy val entity = spawn(TestEntity(pid))

lazy val grpcReadJournal = GrpcReadJournal(
system,
streamId,
GrpcQuerySettings(streamId, Map.empty, None),
GrpcClientSettings
.connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
.withTls(false))
Expand Down
8 changes: 8 additions & 0 deletions akka-projection-grpc/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ akka.projection.grpc {
consumer {
class = "akka.projection.grpc.consumer.GrpcReadJournalProvider"

# Note: these settings are only applied when constructing the consumer from config
# if creating the GrpcQuerySettings programmatically these settings are ignored

# Configuration of gRPC client.
# See https://doc.akka.io/docs/akka-grpc/current/client/configuration.html#by-configuration
client = ${akka.grpc.client."*"}
Expand All @@ -16,6 +19,11 @@ akka.projection.grpc {
# Protobuf events.
proto-class-mapping {
}

# Pass these additional request headers as string values in each request to the producer
# can be used for example for authorization in combination with an interceptor in the producer.
# Example "x-auth-header": "secret"
additional-request-headers {}
}

producer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,90 @@

package akka.projection.grpc.consumer

import akka.annotation.ApiMayChange
import akka.grpc.scaladsl.Metadata
import akka.grpc.scaladsl.MetadataBuilder
import com.typesafe.config.Config
import akka.util.ccompat.JavaConverters._

import java.util.Optional
import scala.compat.java8.OptionConverters._

@ApiMayChange
object GrpcQuerySettings {
def apply(config: Config): GrpcQuerySettings =
new GrpcQuerySettings(config)
def apply(config: Config): GrpcQuerySettings = {
val streamId = config.getString("stream-id")
require(
streamId != "",
"Configuration property [stream-id] must be an id exposed by the producing side but was undefined on the consuming side.")

val protoClassMapping: Map[String, String] = {
import scala.jdk.CollectionConverters._
config.getConfig("proto-class-mapping").root.unwrapped.asScala.toMap.map {
case (k, v) => k -> v.toString
}

}

val additionalHeaders: Option[Metadata] = {
import scala.jdk.CollectionConverters._
val map = config.getConfig("additional-request-headers").root.unwrapped.asScala.toMap.map {
case (k, v) => k -> v.toString
}
if (map.isEmpty) None
else
Some(
map
.foldLeft(new MetadataBuilder()) {
case (builder, (key, value)) =>
builder.addText(key, value)
}
.build())
}

new GrpcQuerySettings(streamId, protoClassMapping, additionalHeaders)
}

/**
* Scala API: Programmatic construction of GrpcQuerySettings
*
* @param streamId The stream id to consume
* @param protoClassMapping Mapping between full Protobuf message names and Java class names that are used
* when deserializing Protobuf events.
* @param additionalRequestMetadata Additional request metadata, for authentication/authorization of the request
* on the remote side.
*/
def apply(
streamId: String,
protoClassMapping: Map[String, String],
additionalRequestMetadata: Option[Metadata]): GrpcQuerySettings = {
new GrpcQuerySettings(streamId, protoClassMapping, additionalRequestMetadata)
}

/**
* Java API: Programmatic construction of GrpcQuerySettings
*
* @param streamId The stream id to consume
* @param protoClassMapping Mapping between full Protobuf message names and Java class names that are used
* when deserializing Protobuf events.
* @param additionalRequestMetadata Additional request metadata, for authentication/authorization of the request
* on the remote side.
*/
def create(
streamId: String,
protoClassMapping: java.util.Map[String, String],
additionalRequestMetadata: Optional[akka.grpc.javadsl.Metadata]): GrpcQuerySettings = {
new GrpcQuerySettings(streamId, protoClassMapping.asScala.toMap, additionalRequestMetadata.asScala.map(_.asScala))
}
}

class GrpcQuerySettings(config: Config) {
val grpcClientConfig: Config = config.getConfig("client")
val streamId = config.getString("stream-id")
@ApiMayChange
final class GrpcQuerySettings(
val streamId: String,
val protoClassMapping: Map[String, String],
val additionalRequestMetadata: Option[Metadata]) {
require(
streamId != "",
"Configuration property [stream-id] must be an id exposed by the streaming side (but was empty).")

val protoClassMapping: Map[String, String] = {
import scala.jdk.CollectionConverters._
config.getConfig("proto-class-mapping").root.unwrapped.asScala.toMap.map {
case (k, v) => k -> v.toString
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import java.time.Instant
import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._

import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.japi.Pair
import akka.persistence.query.Offset
Expand All @@ -24,10 +23,12 @@ import akka.persistence.query.typed.javadsl.LoadEventQuery
import akka.projection.grpc.consumer.scaladsl
import akka.stream.javadsl.Source

@ApiMayChange
object GrpcReadJournal {
val Identifier: String = scaladsl.GrpcReadJournal.Identifier
}

@ApiMayChange
class GrpcReadJournal(delegate: scaladsl.GrpcReadJournal)
extends ReadJournal
with EventsBySliceQuery
Expand Down
Loading

0 comments on commit 473b184

Please sign in to comment.