Skip to content

Commit

Permalink
feat: connection factory via R2dbcProjectionSettings.withCustomConnec…
Browse files Browse the repository at this point in the history
…tionFactory (#1254)
  • Loading branch information
sebastian-alfers authored Nov 25, 2024
1 parent d5433ff commit a35d083
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import scala.jdk.DurationConverters._

import akka.actor.typed.ActorSystem
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory

object R2dbcProjectionSettings {

Expand Down Expand Up @@ -62,7 +63,8 @@ object R2dbcProjectionSettings {
adoptInterval,
logDbCallsExceeding,
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
offsetBatchSize = config.getInt("offset-store.offset-batch-size"))
offsetBatchSize = config.getInt("offset-store.offset-batch-size"),
customConnectionFactory = None)
}

/**
Expand All @@ -86,7 +88,8 @@ final class R2dbcProjectionSettings private (
val adoptInterval: JDuration,
val logDbCallsExceeding: FiniteDuration,
val warnAboutFilteredEventsInFlow: Boolean,
val offsetBatchSize: Int) {
val offsetBatchSize: Int,
val customConnectionFactory: Option[ConnectionFactory]) {

val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + offsetTable
val timestampOffsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + timestampOffsetTable
Expand Down Expand Up @@ -146,6 +149,9 @@ final class R2dbcProjectionSettings private (
def withOffsetBatchSize(offsetBatchSize: Int): R2dbcProjectionSettings =
copy(offsetBatchSize = offsetBatchSize)

def withCustomConnectionFactory(customConnectionFactory: ConnectionFactory): R2dbcProjectionSettings =
copy(customConnectionFactory = Some(customConnectionFactory))

private def copy(
schema: Option[String] = schema,
offsetTable: String = offsetTable,
Expand All @@ -159,7 +165,8 @@ final class R2dbcProjectionSettings private (
adoptInterval: JDuration = adoptInterval,
logDbCallsExceeding: FiniteDuration = logDbCallsExceeding,
warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
offsetBatchSize: Int = offsetBatchSize) =
offsetBatchSize: Int = offsetBatchSize,
customConnectionFactory: Option[ConnectionFactory] = customConnectionFactory) =
new R2dbcProjectionSettings(
schema,
offsetTable,
Expand All @@ -173,8 +180,9 @@ final class R2dbcProjectionSettings private (
adoptInterval,
logDbCallsExceeding,
warnAboutFilteredEventsInFlow,
offsetBatchSize)
offsetBatchSize,
customConnectionFactory)

override def toString =
s"R2dbcProjectionSettings($schema, $offsetTable, $timestampOffsetTable, $managementTable, $useConnectionFactory, $timeWindow, $keepNumberOfEntries, $evictInterval, $deleteInterval, $logDbCallsExceeding, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
s"R2dbcProjectionSettings($schema, $offsetTable, $timestampOffsetTable, $managementTable, $useConnectionFactory, $timeWindow, $keepNumberOfEntries, $evictInterval, $deleteInterval, $logDbCallsExceeding, $warnAboutFilteredEventsInFlow, $offsetBatchSize, $customConnectionFactory)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ object R2dbcProjection {
}

private def connectionFactory(system: ActorSystem[_], r2dbcSettings: R2dbcProjectionSettings): ConnectionFactory = {
ConnectionFactoryProvider(system).connectionFactoryFor(r2dbcSettings.useConnectionFactory)
r2dbcSettings.customConnectionFactory.getOrElse(
ConnectionFactoryProvider(system).connectionFactoryFor(r2dbcSettings.useConnectionFactory))
}

private def closeCallsExceeding(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
// #atLeastOnce
// #exactlyOnce

// #customConnectionFactory
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.spi.ConnectionFactory;
// #customConnectionFactory

@SuppressWarnings({"unused", "InnerClassMayBeStatic"})
class R2dbcProjectionDocExample {

Expand Down Expand Up @@ -267,4 +273,22 @@ Projection<EventEnvelope<ShoppingCart.Event>> createProjection(
projectionId, settings, sourceProvider, ShoppingCartHandler::new, system);
//#projectionSettings
}

{
ConnectionPoolConfiguration config = null;

//#customConnectionFactory
ProjectionId projectionId =
ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);

ConnectionFactory connectionFactory = new ConnectionPool(config);

Optional<R2dbcProjectionSettings> settings = Optional.of(
R2dbcProjectionSettings.create(system).withCustomConnectionFactory(connectionFactory));

Projection<EventEnvelope<ShoppingCart.Event>> projection =
R2dbcProjection.atLeastOnce(
projectionId, settings, sourceProvider, ShoppingCartHandler::new, system);
//#customConnectionFactory
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._

import io.r2dbc.spi.ConnectionFactory

import akka.cluster.sharding.typed.ShardedDaemonProcessSettings

//#handler
Expand All @@ -41,8 +43,11 @@ object R2dbcProjectionDocExample {
}

final case class ItemAdded(cartId: String, itemId: String, quantity: Int) extends Event

final case class ItemRemoved(cartId: String, itemId: String) extends Event

final case class ItemQuantityAdjusted(cartId: String, itemId: String, newQuantity: Int) extends Event

final case class CheckedOut(cartId: String, eventTime: Instant) extends Event
}

Expand Down Expand Up @@ -71,6 +76,7 @@ object R2dbcProjectionDocExample {
//#handler

//#grouped-handler

import scala.collection.immutable

class GroupedShoppingCartHandler()(implicit ec: ExecutionContext)
Expand Down Expand Up @@ -106,6 +112,7 @@ object R2dbcProjectionDocExample {

object IllustrateInit {
// #initProjections

import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.projection.Projection
Expand Down Expand Up @@ -154,6 +161,7 @@ object R2dbcProjectionDocExample {
}

//#sourceProvider

import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.SourceProvider
Expand All @@ -179,6 +187,7 @@ object R2dbcProjectionDocExample {

object IllustrateExactlyOnce {
//#exactlyOnce

import akka.projection.ProjectionId
import akka.projection.r2dbc.scaladsl.R2dbcProjection

Expand All @@ -192,6 +201,7 @@ object R2dbcProjectionDocExample {

object IllustrateAtLeastOnce {
//#atLeastOnce

import akka.projection.ProjectionId
import akka.projection.r2dbc.scaladsl.R2dbcProjection

Expand All @@ -206,6 +216,7 @@ object R2dbcProjectionDocExample {

object IllustrateGrouped {
//#grouped

import akka.projection.ProjectionId
import akka.projection.r2dbc.scaladsl.R2dbcProjection

Expand Down Expand Up @@ -262,4 +273,24 @@ object R2dbcProjectionDocExample {
//#projectionSettings
}

object CustomConnectionFactory {

//#customConnectionFactory

import akka.projection.ProjectionId
import akka.projection.r2dbc.scaladsl.R2dbcProjection

val connectionFactory: ConnectionFactory = ???

val projectionId = ProjectionId("ShoppingCarts", s"carts-$minSlice-$maxSlice")

val settings = Some(R2dbcProjectionSettings(system).withCustomConnectionFactory(connectionFactory))

val projection =
R2dbcProjection
.atLeastOnce(projectionId, settings = None, sourceProvider, handler = () => new ShoppingCartHandler)

//#customConnectionFactory
}

}
16 changes: 16 additions & 0 deletions docs/src/main/paradox/r2dbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,19 @@ Scala

Java
: @@snip [Example.java](/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java){#projectionSettings}

## Custom Connection Factory

You can use a custom connection factory by passing it into the projection as part of the `ProjectionSettings`.

@@@ note

When providing a custom connection factory, the existing connection configurations are ignored.

@@@

Scala
: @@snip [Example.scala](/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala){#customConnectionFactory}

Java
: @@snip [Example.java](/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java){#customConnectionFactory}

0 comments on commit a35d083

Please sign in to comment.