chore: Minor cleanup of DynamoDB OffsetStoreDao (#1262)
patriknw authored Nov 22, 2024
1 parent 197d260 commit 31d6268
Showing 3 changed files with 36 additions and 31 deletions.
3 changes: 2 additions & 1 deletion akka-projection-dynamodb/src/main/resources/reference.conf
@@ -19,7 +19,8 @@ akka.projection.dynamodb {
backtracking-window = ${akka.persistence.dynamodb.query.backtracking.window}

# Trying to batch insert offsets in batches of this size.
- offset-batch-size = 20
+ # Must be less than or equal to 25 (hard limit in DynamoDB)
+ offset-batch-size = 25

# Number of slices (within a given projection's slice range) which will be queried for
# offsets simultaneously. The underlying Dynamo client must be able to handle
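DynamoDB's BatchWriteItem operation accepts at most 25 put or delete requests per call, so the default is raised to that maximum and the comment now spells out the hard limit. A minimal sketch of overriding the value from application config, assuming the key sits directly under the `akka.projection.dynamodb` block shown above (the exact nesting in the full reference.conf may differ):

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical override of the batch size; values above 25 are rejected by
// the validation added to DynamoDBProjectionSettings in this commit.
val config = ConfigFactory
  .parseString("akka.projection.dynamodb.offset-batch-size = 10")
  .withFallback(ConfigFactory.load())
```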
DynamoDBProjectionSettings.scala
@@ -74,6 +74,9 @@ final class DynamoDBProjectionSettings private (
val timeToLiveSettings: TimeToLiveSettings,
val retrySettings: RetrySettings) {

+ // 25 is a hard limit of batch writes in DynamoDB
+ require(offsetBatchSize <= 25, s"offset-batch-size must be <= 25, was [$offsetBatchSize]")

def withTimestampOffsetTable(timestampOffsetTable: String): DynamoDBProjectionSettings =
copy(timestampOffsetTable = timestampOffsetTable)

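The new `require` turns a misconfigured batch size into an immediate `IllegalArgumentException` when the settings are constructed, rather than an opaque DynamoDB error on the first batch write. A standalone sketch of the same fail-fast pattern (not the library class itself):

```scala
// Stand-in class mirroring the validation added above.
final class BatchSettings(val offsetBatchSize: Int) {
  // 25 is the hard limit for a single DynamoDB BatchWriteItem call
  require(offsetBatchSize <= 25, s"offset-batch-size must be <= 25, was [$offsetBatchSize]")
}

// new BatchSettings(30) fails with:
// java.lang.IllegalArgumentException: requirement failed: offset-batch-size must be <= 25, was [30]
```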
OffsetStoreDao.scala
@@ -46,7 +46,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
private val log: Logger = LoggerFactory.getLogger(classOf[OffsetStoreDao])

// Hard limits in DynamoDB
- private val MaxBatchSize = 25
private val MaxTransactItems = 100

object OffsetStoreAttributes {
@@ -63,7 +62,7 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
val managementStateBySlicePid = AttributeValue.fromS("_mgmt")
}

- final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse) extends Exception
+ final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse) extends RuntimeException
}

/**
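Extending `RuntimeException` instead of `Exception` keeps `BatchWriteFailed` unchecked from a Java point of view while it still carries the last `BatchWriteItemResponse`, so recovery code can report which writes were left unprocessed. A simplified, standalone sketch of handling such a failure (the class is re-declared here only for the example):

```scala
import scala.concurrent.{ ExecutionContext, Future }
import scala.jdk.CollectionConverters._
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse

// Stand-in for the exception defined above.
final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse) extends RuntimeException

// Sketch: count the writes that were still unprocessed when retries ran out.
def unprocessedCount(write: Future[Unit])(implicit ec: ExecutionContext): Future[Int] =
  write.map(_ => 0).recover {
    case failed: BatchWriteFailed =>
      failed.lastResponse.unprocessedItems.asScala.valuesIterator.map(_.size).sum
  }
```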
@@ -76,8 +75,8 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
client: DynamoDbAsyncClient) {
import OffsetStoreDao.log
import OffsetStoreDao.BatchWriteFailed
- import OffsetStoreDao.MaxBatchSize
import OffsetStoreDao.MaxTransactItems
+ import settings.offsetBatchSize
import system.executionContext

private val timeToLiveSettings = settings.timeToLiveSettings.projections.get(projectionId.name)
@@ -119,8 +118,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
}(ExecutionContext.parasitic)
}

- implicit def sys: ActorSystem[_] = system
-
private def writeBatchWithRetries(
batchReq: BatchWriteItemRequest,
retries: Int = 0): Future[List[BatchWriteItemResponse]] = {
@@ -143,13 +140,14 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
if (log.isDebugEnabled) {
val count = unprocessed.asScala.valuesIterator.map(_.size).sum
log.debug(
"Not all writes in batch were applied, retrying in [{}]: [{}] unapplied writes, [{}/{}] retries",
delay.toCoarsest,
"Not all writes in batch were applied, retrying in [{} ms]: [{}] unapplied writes, [{}/{}] retries",
delay.toMillis,
count,
nextRetry,
settings.retrySettings.maxRetries)
}

+ implicit val sys: ActorSystem[_] = system
after(delay) {
writeBatchWithRetries(newReq, nextRetry)
}.map { responses => response :: responses }(ExecutionContext.parasitic)
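`writeBatchWithRetries` re-submits only the items DynamoDB reported as unprocessed, pausing between attempts with `akka.pattern.after`, which needs an `ActorSystem` in implicit scope; that is why the `implicit val sys` moved next to the call inside the method. A generic sketch of that retry shape, with hypothetical names (`attempt`, `isComplete`):

```scala
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.typed.ActorSystem
import akka.pattern.after

// Retry an asynchronous attempt until `isComplete` accepts the result or the
// retry budget is exhausted, waiting `delay` between attempts.
def retryIncomplete[T](attempt: () => Future[T], isComplete: T => Boolean, retriesLeft: Int, delay: FiniteDuration)(
    implicit system: ActorSystem[_]): Future[T] = {
  import system.executionContext
  attempt().flatMap { result =>
    if (isComplete(result)) Future.successful(result)
    else if (retriesLeft == 0) Future.failed(new RuntimeException("still incomplete after retries"))
    else after(delay)(retryIncomplete(attempt, isComplete, retriesLeft - 1, delay))
  }
}
```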
@@ -224,13 +222,16 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
.map(_ => Done)(ExecutionContext.parasitic)
.recoverWith {
case failed: BatchWriteFailed =>
- val unprocessedSliceItems = failed.lastResponse.unprocessedItems
+ val unprocessedSlices = failed.lastResponse.unprocessedItems
.get(settings.timestampOffsetTable)
.iterator
.asScala
- .map(_.putRequest.item)
- val unprocessedSlices = unprocessedSliceItems.map(_.get(NameSlice).s)
+ .map { req =>
+ val item = req.putRequest.item
+ item.get(NameSlice).s
+ }
+ .toVector

log.warn(
"Failed to write latest timestamps for [{}] slices: [{}]",
unprocessedSlices.size,
@@ -243,10 +244,10 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
}
}

- if (offsetsBySlice.size <= MaxBatchSize) {
+ if (offsetsBySlice.size <= offsetBatchSize) {
writeBatch(offsetsBySlice.toVector)
} else {
- val batches = offsetsBySlice.toVector.sliding(MaxBatchSize, MaxBatchSize)
+ val batches = offsetsBySlice.toVector.sliding(offsetBatchSize, offsetBatchSize)
Future
.sequence(batches.map(writeBatch))
.map(_ => Done)(ExecutionContext.parasitic)
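When more offsets are flushed than fit in one request, the records are split with `sliding(offsetBatchSize, offsetBatchSize)` and the per-batch futures are combined with `Future.sequence`. A compact sketch of that pattern, where `writeBatch` stands in for the DAO's BatchWriteItem call:

```scala
import scala.concurrent.{ ExecutionContext, Future }
import akka.Done

// Split the records into batches no larger than batchSize, write each batch,
// and complete once every batch has completed.
def writeAll[A](records: Vector[A], batchSize: Int)(writeBatch: Vector[A] => Future[Done])(
    implicit ec: ExecutionContext): Future[Done] =
  if (records.size <= batchSize) writeBatch(records)
  else
    Future
      .sequence(records.sliding(batchSize, batchSize).map(writeBatch))
      .map(_ => Done)
```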
@@ -298,17 +299,17 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
.map(_ => Done)(ExecutionContext.parasitic)
.recoverWith {
case failed: BatchWriteFailed =>
- val unprocessedSeqNrItems =
+ val unprocessedSeqNrs =
failed.lastResponse.unprocessedItems
.get(settings.timestampOffsetTable)
.iterator
.asScala
- .map(_.putRequest.item)
- val unprocessedSeqNrs = unprocessedSeqNrItems.map { item =>
- import OffsetStoreDao.OffsetStoreAttributes._
- s"${item.get(NameSlice).s}: ${item.get(Pid).s}"
- }
+ .map { req =>
+ val item = req.putRequest.item
+ import OffsetStoreDao.OffsetStoreAttributes._
+ s"${item.get(NameSlice).s}: ${item.get(Pid).s}"
+ }
+ .toVector

log.warn(
"Failed to write sequence numbers for [{}] persistence IDs: [{}]",
@@ -322,10 +323,10 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
}
}

- if (records.size <= MaxBatchSize) {
+ if (records.size <= offsetBatchSize) {
writeBatch(records)
} else {
- val batches = records.sliding(MaxBatchSize, MaxBatchSize)
+ val batches = records.sliding(offsetBatchSize, offsetBatchSize)
Future
.sequence(batches.map(writeBatch))
.map(_ => Done)(ExecutionContext.parasitic)
@@ -369,7 +370,7 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
def transactStoreSequenceNumbers(writeItems: Iterable[TransactWriteItem])(records: Seq[Record]): Future[Done] = {
if ((writeItems.size + records.size) > MaxTransactItems)
throw new IllegalArgumentException(
s"Too many transactional write items. Total limit is [${MaxTransactItems}], attempting to store " +
s"Too many transactional write items. Total limit is [$MaxTransactItems], attempting to store " +
s"[${writeItems.size}] write items and [${records.size}] sequence numbers.")

val expiry = timeToLiveSettings.offsetTimeToLive.map { timeToLive =>
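The remaining constant, `MaxTransactItems = 100`, reflects DynamoDB's cap on actions in a single TransactWriteItems call; the projection's sequence-number records share that transaction with the handler's own write items, so the combined count is checked up front. A small standalone sketch of the same precondition:

```scala
// DynamoDB allows at most 100 actions in one TransactWriteItems request.
val MaxTransactItems = 100

def requireTransactFits(handlerItems: Int, seqNrRecords: Int): Unit =
  require(
    handlerItems + seqNrRecords <= MaxTransactItems,
    s"Too many transactional write items: [$handlerItems] handler items plus [$seqNrRecords] " +
      s"sequence numbers exceeds [$MaxTransactItems]")
```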
@@ -510,16 +511,16 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
.map(_ => Done)(ExecutionContext.parasitic)
.recoverWith {
case failed: BatchWriteFailed =>
- val unprocessedStateItems =
+ val unprocessedStates =
failed.lastResponse.unprocessedItems
.get(settings.timestampOffsetTable)
.iterator
.asScala
- .map(_.putRequest.item)
- val unprocessedStates = unprocessedStateItems.map { item =>
- s"${item.get(NameSlice).s}-${item.get(Paused).bool}"
- }
+ .map { req =>
+ val item = req.putRequest.item
+ s"${item.get(NameSlice).s}-${item.get(Paused).bool}"
+ }
+ .toVector

log.warn(
"Failed to write management state for [{}] slices: [{}]",
@@ -534,10 +535,10 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
}

val sliceRange = (minSlice to maxSlice).toVector
- if (sliceRange.size <= MaxBatchSize) {
+ if (sliceRange.size <= offsetBatchSize) {
writeBatch(sliceRange)
} else {
- val batches = sliceRange.sliding(MaxBatchSize, MaxBatchSize)
+ val batches = sliceRange.sliding(offsetBatchSize, offsetBatchSize)
Future
.sequence(batches.map(writeBatch))
.map(_ => Done)(ExecutionContext.parasitic)
