From be9bb85561226f49cdcdb27299c3a72773f23147 Mon Sep 17 00:00:00 2001
From: Levi Ramsey
Date: Thu, 21 Nov 2024 19:41:50 -0500
Subject: [PATCH] Properly handle incomplete batch writes

---
 .../dynamodb/internal/OffsetStoreDao.scala    | 71 +++++++++++++++----
 1 file changed, 58 insertions(+), 13 deletions(-)

diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
index 1ae2f69f9..5cc15a4a0 100644
--- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
+++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
@@ -36,6 +36,7 @@ import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem
 import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest
 import software.amazon.awssdk.services.dynamodb.model.WriteRequest
 import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
+import akka.projection.dynamodb.internal.DynamoDBOffsetStore.FutureDone
 /**
  * INTERNAL API
  */
@@ -114,21 +115,23 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
     }(ExecutionContext.parasitic)
   }
 
-  def writeWholeBatch(batchReq: BatchWriteItemRequest): Future[Seq[BatchWriteItemResponse]] = {
+  def writeWholeBatch(
+      batchReq: BatchWriteItemRequest,
+      maxRetriesOfUnprocessed: Int = 3): Future[List[BatchWriteItemResponse]] = {
     val result = client.batchWriteItem(batchReq).asScala
 
     result.flatMap { response =>
-      val respList = List(response)
-      if (response.hasUnprocessedItems) {
+      if (response.hasUnprocessedItems && maxRetriesOfUnprocessed > 0) {
         val unprocessed = response.unprocessedItems
         val newReq = batchReq.toBuilder.requestItems(unprocessed).build()
-        client
-          .batchWriteItem(newReq)
-          .asScala
-          .map { newResponse =>
-            newResponse :: respList
-          }(ExecutionContext.parasitic)
-      } else Future.successful(respList)
+        writeWholeBatch(newReq, maxRetriesOfUnprocessed - 1).map { responses =>
+          response :: responses
+        }(ExecutionContext.parasitic)
+      } else {
+        // TODO: return a partial failure? For now, to check that everything was written,
+        // callers have to inspect the last response for unprocessed items.
+        Future.successful(List(response))
+      }
     }
   }
 
@@ -193,7 +196,20 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
         }
       }
       result
-        .map(_ => Done)(ExecutionContext.parasitic)
+        .flatMap { responses =>
+          val lastResponse = responses.last
+          if (lastResponse.hasUnprocessedItems) {
+            val unprocessedSliceItems =
+              lastResponse.unprocessedItems.get(settings.timestampOffsetTable).asScala.toVector
+            val unprocessedSlices = unprocessedSliceItems.map(_.putRequest.item.get(NameSlice).s)
+            log.warn(
+              "Failed to write latest timestamps for [{}] slices: [{}]",
+              unprocessedSlices.size,
+              unprocessedSlices)
+
+            Future.failed(new RuntimeException("Failed to save timestamps for all slices"))
+          } else FutureDone
+        }
         .recoverWith {
           case c: CompletionException =>
             Future.failed(c.getCause)
@@ -251,7 +267,22 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
         }
       }
 
-      result.map(_ => Done)(ExecutionContext.parasitic)
+      result.flatMap { responses =>
+        val lastResponse = responses.last
+        if (lastResponse.hasUnprocessedItems) {
+          val unprocessedSeqNrItems =
+            lastResponse.unprocessedItems.get(settings.timestampOffsetTable).asScala.toVector.map(_.putRequest.item)
+          val unprocessedSeqNrs = unprocessedSeqNrItems.map { item =>
+            import OffsetStoreDao.OffsetStoreAttributes._
+            s"${item.get(NameSlice).s}: ${item.get(Pid).s}"
+          }
+          log.warn(
+            "Failed to write sequence numbers for [{}] persistence IDs: [{}]",
+            unprocessedSeqNrs.size,
+            unprocessedSeqNrs.mkString(", "))
+          Future.failed(new RuntimeException("Failed to save sequence numbers for all persistence IDs"))
+        } else FutureDone
+      }
     }
 
     if (records.size <= MaxBatchSize) {
@@ -439,7 +470,21 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
         }
       }
       result
-        .map(_ => Done)(ExecutionContext.parasitic)
+        .flatMap { responses =>
+          val lastResponse = responses.last
+          if (lastResponse.hasUnprocessedItems) {
+            val unprocessedStateItems =
+              lastResponse.unprocessedItems.get(settings.timestampOffsetTable).asScala.toVector.map(_.putRequest.item)
+            val unprocessedStates = unprocessedStateItems.map { item =>
+              s"${item.get(NameSlice).s}-${item.get(Paused).bool}"
+            }
+            log.warn(
+              "Failed to write management state for [{}] slices: [{}]",
+              unprocessedStates.size,
+              unprocessedStates.mkString(", "))
+            Future.failed(new RuntimeException("Failed to save management state for all slices"))
+          } else FutureDone
+        }
         .recoverWith {
           case c: CompletionException =>
             Future.failed(c.getCause)
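
Note (not part of the patch): the retry loop in writeWholeBatch resubmits only
the unprocessed items that DynamoDB reports back, up to a bounded number of
attempts, and returns every response so the caller can inspect the final one.
A minimal standalone sketch of that pattern, assuming a plain AWS SDK v2 async
client; the names BatchRetrySketch and writeWithRetries are illustrative, not
from the patch:

    import scala.concurrent.{ ExecutionContext, Future }
    import scala.jdk.FutureConverters._

    import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
    import software.amazon.awssdk.services.dynamodb.model.{ BatchWriteItemRequest, BatchWriteItemResponse }

    object BatchRetrySketch {
      // Resubmit unprocessed items up to `retriesLeft` more times, collecting
      // every response in order so the last one reflects the final attempt.
      def writeWithRetries(
          client: DynamoDbAsyncClient,
          request: BatchWriteItemRequest,
          retriesLeft: Int = 3): Future[List[BatchWriteItemResponse]] =
        client.batchWriteItem(request).asScala.flatMap { response =>
          if (response.hasUnprocessedItems && retriesLeft > 0) {
            // BatchWriteItem may persist only part of a batch; retry the rest.
            val retry = request.toBuilder.requestItems(response.unprocessedItems).build()
            writeWithRetries(client, retry, retriesLeft - 1)
              .map(response :: _)(ExecutionContext.parasitic)
          } else {
            // Done, or out of retries; the caller checks the last response.
            Future.successful(List(response))
          }
        }(ExecutionContext.parasitic)
    }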
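
Note (not part of the patch): each call site now fails the returned Future
when the final response still carries unprocessed items. A hedged sketch of
that caller-side check; requireComplete is an illustrative name, and the patch
inlines this logic with per-call-site logging instead:

    import scala.concurrent.Future
    import akka.Done
    import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse

    // Succeed only if the last attempt left nothing unprocessed.
    def requireComplete(responses: List[BatchWriteItemResponse]): Future[Done] =
      if (responses.last.hasUnprocessedItems)
        Future.failed(new RuntimeException("Batch write left unprocessed items after retries"))
      else
        Future.successful(Done)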