Skip to content

Commit

Permalink
feat (dynamodb): Configurable parallelism in initial offset store que…
Browse files Browse the repository at this point in the history
…ry (#1239)
  • Loading branch information
leviramsey authored Nov 4, 2024
1 parent 625061c commit 5440cb8
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 3 deletions.
13 changes: 13 additions & 0 deletions akka-projection-dynamodb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ akka.projection.dynamodb {

# Trying to batch insert offsets in batches of this size.
offset-batch-size = 20

# Number of slices (within a given projection's slice range) which will be queried for
# offsets simultaneously. The underlying Dynamo client must be able to handle at least
# this number of concurrent requests (`http.max-concurrency` plus
# `http.max-pending-connection-acquires`).
#
# Set to 1024 to always query for all slices simultaneously. The minimum allowed value
# is 1. If there are more than 64 slices in a range (e.g. fewer than 16 projections
# consuming events), then increasing this may result in slightly faster projection starts;
# conversely, if there are many slices being projected using a given Dynamo client,
# reducing this may result in fewer restarts of the projection due to failure to query
# starting offsets.
offset-slice-read-parallelism = 64
}

# By default it shares DynamoDB client with akka-persistence-dynamodb (write side).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object DynamoDBProjectionSettings {
evictInterval = config.getDuration("offset-store.evict-interval"),
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
offsetBatchSize = config.getInt("offset-store.offset-batch-size"),
offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"),
timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")))
}

Expand All @@ -63,6 +64,7 @@ final class DynamoDBProjectionSettings private (
val evictInterval: JDuration,
val warnAboutFilteredEventsInFlow: Boolean,
val offsetBatchSize: Int,
val offsetSliceReadParallelism: Int,
val timeToLiveSettings: TimeToLiveSettings) {

def withTimestampOffsetTable(timestampOffsetTable: String): DynamoDBProjectionSettings =
Expand Down Expand Up @@ -92,6 +94,9 @@ final class DynamoDBProjectionSettings private (
/**
 * Returns a copy of these settings in which only `offsetBatchSize` is replaced.
 *
 * @param offsetBatchSize the new batch size to store in the returned settings
 */
def withOffsetBatchSize(offsetBatchSize: Int): DynamoDBProjectionSettings = {
  val updatedBatchSize = offsetBatchSize
  copy(offsetBatchSize = updatedBatchSize)
}

/**
 * Returns a copy of these settings in which only `offsetSliceReadParallelism` is replaced.
 *
 * The value is used as the parallelism when slices are queried for their stored offsets.
 *
 * @param offsetSliceReadParallelism number of slices to query for offsets simultaneously; must be at least 1
 * @throws java.lang.IllegalArgumentException if `offsetSliceReadParallelism` is less than 1
 */
def withOffsetSliceReadParallelism(offsetSliceReadParallelism: Int): DynamoDBProjectionSettings = {
  // Fail fast on a non-positive value so the mistake is reported where the setting is
  // supplied instead of surfacing later when the offset query is executed.
  require(
    offsetSliceReadParallelism >= 1,
    s"offsetSliceReadParallelism must be at least 1, was [$offsetSliceReadParallelism]")
  copy(offsetSliceReadParallelism = offsetSliceReadParallelism)
}

/**
 * Returns a copy of these settings in which only `timeToLiveSettings` is replaced.
 *
 * @param timeToLiveSettings the new time-to-live settings to store in the returned settings
 */
def withTimeToLiveSettings(timeToLiveSettings: TimeToLiveSettings): DynamoDBProjectionSettings = {
  val updatedTimeToLive = timeToLiveSettings
  copy(timeToLiveSettings = updatedTimeToLive)
}

Expand All @@ -103,6 +108,7 @@ final class DynamoDBProjectionSettings private (
evictInterval: JDuration = evictInterval,
warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
offsetBatchSize: Int = offsetBatchSize,
offsetSliceReadParallelism: Int = offsetSliceReadParallelism,
timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings) =
new DynamoDBProjectionSettings(
timestampOffsetTable,
Expand All @@ -112,6 +118,7 @@ final class DynamoDBProjectionSettings private (
evictInterval,
warnAboutFilteredEventsInFlow,
offsetBatchSize,
offsetSliceReadParallelism,
timeToLiveSettings)

override def toString =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import akka.projection.BySlicesSourceProvider
import akka.projection.ProjectionId
import akka.projection.dynamodb.DynamoDBProjectionSettings
import akka.projection.internal.ManagementState
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.Materializer.matFromSystem
import org.slf4j.LoggerFactory
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem
Expand Down Expand Up @@ -246,11 +249,23 @@ private[projection] class DynamoDBOffsetStore(
}

private def readTimestampOffset(): Future[TimestampOffsetBySlice] = {
implicit val sys = system // for implicit stream materializer
val oldState = state.get()
// retrieve latest timestamp for each slice, and use the earliest
val futTimestamps =
(minSlice to maxSlice).map(slice => dao.loadTimestampOffset(slice).map(optTimestamp => slice -> optTimestamp))
val offsetBySliceFut = Future.sequence(futTimestamps).map(_.collect { case (slice, Some(ts)) => slice -> ts }.toMap)
val offsetBySliceFut =
Source(minSlice to maxSlice)
.mapAsyncUnordered(settings.offsetSliceReadParallelism) { slice =>
dao
.loadTimestampOffset(slice)
.map { optTimestampOffset =>
optTimestampOffset.map { timestampOffset => slice -> timestampOffset }
}(ExecutionContext.parasitic)
}
.mapConcat(identity)
.runWith(Sink.fold(Map.empty[Int, TimestampOffset]) { (offsetMap, sliceAndOffset: (Int, TimestampOffset)) =>
offsetMap + sliceAndOffset
})

offsetBySliceFut.map { offsetBySlice =>
val newState = State(offsetBySlice)
logger.debug(
Expand Down

0 comments on commit 5440cb8

Please sign in to comment.