diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala index 864bf6526..ed88f31a8 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala @@ -12,6 +12,7 @@ import scala.jdk.DurationConverters._ import akka.actor.typed.ActorSystem import com.typesafe.config.Config +import io.r2dbc.spi.ConnectionFactory object R2dbcProjectionSettings { @@ -62,7 +63,8 @@ object R2dbcProjectionSettings { adoptInterval, logDbCallsExceeding, warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"), - offsetBatchSize = config.getInt("offset-store.offset-batch-size")) + offsetBatchSize = config.getInt("offset-store.offset-batch-size"), + customConnectionFactory = None) } /** @@ -86,7 +88,8 @@ final class R2dbcProjectionSettings private ( val adoptInterval: JDuration, val logDbCallsExceeding: FiniteDuration, val warnAboutFilteredEventsInFlow: Boolean, - val offsetBatchSize: Int) { + val offsetBatchSize: Int, + val customConnectionFactory: Option[ConnectionFactory]) { val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + offsetTable val timestampOffsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + timestampOffsetTable @@ -146,6 +149,9 @@ final class R2dbcProjectionSettings private ( def withOffsetBatchSize(offsetBatchSize: Int): R2dbcProjectionSettings = copy(offsetBatchSize = offsetBatchSize) + def withCustomConnectionFactory(customConnectionFactory: ConnectionFactory): R2dbcProjectionSettings = + copy(customConnectionFactory = Some(customConnectionFactory)) + private def copy( schema: Option[String] = schema, offsetTable: String = offsetTable, @@ -159,7 +165,8 @@ final class R2dbcProjectionSettings private ( adoptInterval: JDuration = adoptInterval, logDbCallsExceeding: FiniteDuration = logDbCallsExceeding, warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow, - offsetBatchSize: Int = offsetBatchSize) = + offsetBatchSize: Int = offsetBatchSize, + customConnectionFactory: Option[ConnectionFactory] = customConnectionFactory) = new R2dbcProjectionSettings( schema, offsetTable, @@ -173,8 +180,9 @@ final class R2dbcProjectionSettings private ( adoptInterval, logDbCallsExceeding, warnAboutFilteredEventsInFlow, - offsetBatchSize) + offsetBatchSize, + customConnectionFactory) override def toString = - s"R2dbcProjectionSettings($schema, $offsetTable, $timestampOffsetTable, $managementTable, $useConnectionFactory, $timeWindow, $keepNumberOfEntries, $evictInterval, $deleteInterval, $logDbCallsExceeding, $warnAboutFilteredEventsInFlow, $offsetBatchSize)" + s"R2dbcProjectionSettings($schema, $offsetTable, $timestampOffsetTable, $managementTable, $useConnectionFactory, $timeWindow, $keepNumberOfEntries, $evictInterval, $deleteInterval, $logDbCallsExceeding, $warnAboutFilteredEventsInFlow, $offsetBatchSize, $customConnectionFactory)" } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/scaladsl/R2dbcProjection.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/scaladsl/R2dbcProjection.scala index 3cc64ea46..9b33e3450 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/scaladsl/R2dbcProjection.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/scaladsl/R2dbcProjection.scala @@ -344,7 +344,8 @@ object R2dbcProjection { } private def connectionFactory(system: ActorSystem[_], r2dbcSettings: R2dbcProjectionSettings): ConnectionFactory = { - ConnectionFactoryProvider(system).connectionFactoryFor(r2dbcSettings.useConnectionFactory) + r2dbcSettings.customConnectionFactory.getOrElse( + ConnectionFactoryProvider(system).connectionFactoryFor(r2dbcSettings.useConnectionFactory)) } private def closeCallsExceeding( diff --git a/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java b/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java index caa23be1f..9844fa4ce 100644 --- a/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java +++ b/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java @@ -56,6 +56,12 @@ // #atLeastOnce // #exactlyOnce +// #customConnectionFactory +import io.r2dbc.pool.ConnectionPool; +import io.r2dbc.pool.ConnectionPoolConfiguration; +import io.r2dbc.spi.ConnectionFactory; +// #customConnectionFactory + @SuppressWarnings({"unused", "InnerClassMayBeStatic"}) class R2dbcProjectionDocExample { @@ -267,4 +273,22 @@ Projection> createProjection( projectionId, settings, sourceProvider, ShoppingCartHandler::new, system); //#projectionSettings } + + { + ConnectionPoolConfiguration config = null; + + //#customConnectionFactory + ProjectionId projectionId = + ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice); + + ConnectionFactory connectionFactory = new ConnectionPool(config); + + Optional settings = Optional.of( + R2dbcProjectionSettings.create(system).withCustomConnectionFactory(connectionFactory)); + + Projection> projection = + R2dbcProjection.atLeastOnce( + projectionId, settings, sourceProvider, ShoppingCartHandler::new, system); + //#customConnectionFactory + } } diff --git a/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala b/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala index 4774aa251..82e3e5f1e 100644 --- a/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala +++ b/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala @@ -19,6 +19,8 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ +import io.r2dbc.spi.ConnectionFactory + import akka.cluster.sharding.typed.ShardedDaemonProcessSettings //#handler @@ -41,8 +43,11 @@ object R2dbcProjectionDocExample { } final case class ItemAdded(cartId: String, itemId: String, quantity: Int) extends Event + final case class ItemRemoved(cartId: String, itemId: String) extends Event + final case class ItemQuantityAdjusted(cartId: String, itemId: String, newQuantity: Int) extends Event + final case class CheckedOut(cartId: String, eventTime: Instant) extends Event } @@ -71,6 +76,7 @@ object R2dbcProjectionDocExample { //#handler //#grouped-handler + import scala.collection.immutable class GroupedShoppingCartHandler()(implicit ec: ExecutionContext) @@ -106,6 +112,7 @@ object R2dbcProjectionDocExample { object IllustrateInit { // #initProjections + import akka.persistence.query.typed.EventEnvelope import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import akka.projection.Projection @@ -154,6 +161,7 @@ object R2dbcProjectionDocExample { } //#sourceProvider + import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import akka.projection.eventsourced.scaladsl.EventSourcedProvider import akka.projection.scaladsl.SourceProvider @@ -179,6 +187,7 @@ object R2dbcProjectionDocExample { object IllustrateExactlyOnce { //#exactlyOnce + import akka.projection.ProjectionId import akka.projection.r2dbc.scaladsl.R2dbcProjection @@ -192,6 +201,7 @@ object R2dbcProjectionDocExample { object IllustrateAtLeastOnce { //#atLeastOnce + import akka.projection.ProjectionId import akka.projection.r2dbc.scaladsl.R2dbcProjection @@ -206,6 +216,7 @@ object R2dbcProjectionDocExample { object IllustrateGrouped { //#grouped + import akka.projection.ProjectionId import akka.projection.r2dbc.scaladsl.R2dbcProjection @@ -262,4 +273,24 @@ object R2dbcProjectionDocExample { //#projectionSettings } + object CustomConnectionFactory { + + //#customConnectionFactory + + import akka.projection.ProjectionId + import akka.projection.r2dbc.scaladsl.R2dbcProjection + + val connectionFactory: ConnectionFactory = ??? + + val projectionId = ProjectionId("ShoppingCarts", s"carts-$minSlice-$maxSlice") + + val settings = Some(R2dbcProjectionSettings(system).withCustomConnectionFactory(connectionFactory)) + + val projection = + R2dbcProjection + .atLeastOnce(projectionId, settings = None, sourceProvider, handler = () => new ShoppingCartHandler) + + //#customConnectionFactory + } + } diff --git a/docs/src/main/paradox/r2dbc.md b/docs/src/main/paradox/r2dbc.md index d4bce0c18..b6566e7aa 100644 --- a/docs/src/main/paradox/r2dbc.md +++ b/docs/src/main/paradox/r2dbc.md @@ -304,3 +304,19 @@ Scala Java : @@snip [Example.java](/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java){#projectionSettings} + +## Custom Connection Factory + +You can use a custom connection factory by passing it into the projection as part of the `ProjectionSettings`. + +@@@ note + +When providing a custom connection factory, the existing connection configurations are ignored. + +@@@ + +Scala +: @@snip [Example.scala](/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala){#customConnectionFactory} + +Java +: @@snip [Example.java](/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java){#customConnectionFactory} \ No newline at end of file