Skip to content

Commit

Permalink
Properly handle incomplete batch writes
Browse files Browse the repository at this point in the history
  • Loading branch information
leviramsey committed Nov 22, 2024
1 parent d23ec25 commit be9bb85
Showing 1 changed file with 58 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest
import software.amazon.awssdk.services.dynamodb.model.WriteRequest
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
import akka.projection.dynamodb.internal.DynamoDBOffsetStore.FutureDone

/**
* INTERNAL API
Expand Down Expand Up @@ -114,21 +115,23 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
}(ExecutionContext.parasitic)
}

def writeWholeBatch(batchReq: BatchWriteItemRequest): Future[Seq[BatchWriteItemResponse]] = {
def writeWholeBatch(
batchReq: BatchWriteItemRequest,
maxRetriesOfUnprocessed: Int = 3): Future[List[BatchWriteItemResponse]] = {
val result = client.batchWriteItem(batchReq).asScala

result.flatMap { response =>
val respList = List(response)
if (response.hasUnprocessedItems) {
if (response.hasUnprocessedItems && maxRetriesOfUnprocessed > 0) {
val unprocessed = response.unprocessedItems
val newReq = batchReq.toBuilder.requestItems(unprocessed).build()
client
.batchWriteItem(newReq)
.asScala
.map { newResponse =>
newResponse :: respList
}(ExecutionContext.parasitic)
} else Future.successful(respList)
writeWholeBatch(newReq, maxRetriesOfUnprocessed - 1).map { responses =>
response :: responses
}(ExecutionContext.parasitic)
} else {
// TODO: return a partial failure? for now, to check that everything was written, would have to inspect the
// last response for unprocessed items
Future.successful(List(response))
}
}
}

Expand Down Expand Up @@ -193,7 +196,20 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
}
}
result
.map(_ => Done)(ExecutionContext.parasitic)
.flatMap { responses =>
val lastResponse = responses.last
if (lastResponse.hasUnprocessedItems) {
val unprocessedSliceItems =
lastResponse.unprocessedItems.get(settings.timestampOffsetTable).asScala.toVector
val unprocessedSlices = unprocessedSliceItems.map(_.putRequest.item.get(NameSlice).s)
log.warn(
"Failed to write latest timestamps for [{}] slices: [{}]",
unprocessedSlices.size,
unprocessedSlices)

Future.failed(new RuntimeException("Failed to save timestamps for all slices"))
} else FutureDone
}
.recoverWith {
case c: CompletionException =>
Future.failed(c.getCause)
Expand Down Expand Up @@ -251,7 +267,22 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
}
}

result.map(_ => Done)(ExecutionContext.parasitic)
result.flatMap { responses =>
val lastResponse = responses.last
if (lastResponse.hasUnprocessedItems) {
val unprocessedSeqNrItems =
lastResponse.unprocessedItems.get(settings.timestampOffsetTable).asScala.toVector.map(_.putRequest.item)
val unprocessedSeqNrs = unprocessedSeqNrItems.map { item =>
import OffsetStoreDao.OffsetStoreAttributes._
s"${item.get(NameSlice).s}: ${item.get(Pid).s}"
}
log.warn(
"Failed to write sequence numbers for [{}] persistence IDs: [{}]",
unprocessedSeqNrs.size,
unprocessedSeqNrs.mkString(", "))
Future.failed(new RuntimeException("Failed to save sequence numbers for all persistence IDs"))
} else FutureDone
}
}

if (records.size <= MaxBatchSize) {
Expand Down Expand Up @@ -439,7 +470,21 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
}
}
result
.map(_ => Done)(ExecutionContext.parasitic)
.flatMap { responses =>
val lastResponse = responses.last
if (lastResponse.hasUnprocessedItems) {
val unprocessedStateItems =
lastResponse.unprocessedItems.get(settings.timestampOffsetTable).asScala.toVector.map(_.putRequest.item)
val unprocessedStates = unprocessedStateItems.map { item =>
s"${item.get(NameSlice).s}-${item.get(Paused).bool}"
}
log.warn(
"Failed to write management state for [{}] slices: [{}]",
unprocessedStates.size,
unprocessedStates.mkString(", "))
Future.failed(new RuntimeException("Failed to save management state for all slices"))
} else FutureDone
}
.recoverWith {
case c: CompletionException =>
Future.failed(c.getCause)
Expand Down

0 comments on commit be9bb85

Please sign in to comment.