retry unprocessed items in batchwriterequest
leviramsey committed Nov 21, 2024
1 parent d8dd7bf commit d23ec25
Showing 1 changed file with 28 additions and 9 deletions.
@@ -35,6 +35,7 @@ import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity
 import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem
 import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest
 import software.amazon.awssdk.services.dynamodb.model.WriteRequest
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
 
 /**
  * INTERNAL API
@@ -113,6 +114,24 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
       }(ExecutionContext.parasitic)
   }
 
+  def writeWholeBatch(batchReq: BatchWriteItemRequest): Future[Seq[BatchWriteItemResponse]] = {
+    val result = client.batchWriteItem(batchReq).asScala
+
+    result.flatMap { response =>
+      val respList = List(response)
+      if (response.hasUnprocessedItems) {
+        val unprocessed = response.unprocessedItems
+        val newReq = batchReq.toBuilder.requestItems(unprocessed).build()
+        client
+          .batchWriteItem(newReq)
+          .asScala
+          .map { newResponse =>
+            newResponse :: respList
+          }(ExecutionContext.parasitic)
+      } else Future.successful(respList)
+    }
+  }
+
   def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = {
     import OffsetStoreDao.OffsetStoreAttributes._
 
@@ -163,14 +182,14 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
       .returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
       .build()
 
-    val result = client.batchWriteItem(req).asScala
+    val result = writeWholeBatch(req)
 
     if (log.isDebugEnabled()) {
-      result.foreach { response =>
+      result.foreach { responses =>
        log.debug(
          "Wrote latest timestamps for [{}] slices, consumed [{}] WCU",
          offsetsBatch.size,
-          response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
+          responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
      }
    }
    result
@@ -221,14 +240,14 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
       .returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
       .build()
 
-    val result = client.batchWriteItem(req).asScala
+    val result = writeWholeBatch(req)
 
     if (log.isDebugEnabled()) {
-      result.foreach { response =>
+      result.foreach { responses =>
        log.debug(
          "Wrote [{}] sequence numbers, consumed [{}] WCU",
          recordsBatch.size,
-          response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
+          responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
      }
    }
 
@@ -409,14 +428,14 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
       .returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
       .build()
 
-    val result = client.batchWriteItem(req).asScala
+    val result = writeWholeBatch(req)
 
     if (log.isDebugEnabled()) {
-      result.foreach { response =>
+      result.foreach { responses =>
        log.debug(
          "Wrote management state for [{}] slices, consumed [{}] WCU",
          slices.size,
-          response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
+          responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
      }
    }
    result
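Note on the pattern applied by this commit: DynamoDB's BatchWriteItem can return part of a request as UnprocessedItems (usually because of throttling), and the caller is expected to re-submit those items, per AWS guidance ideally with exponential backoff. The following is a minimal, standalone sketch of that retry pattern against the AWS SDK v2 async client; it is not the project's code. The explicit client parameter, the loop-until-nothing-is-unprocessed behaviour, and the absence of backoff are assumptions made here for illustration, whereas writeWholeBatch in the diff re-submits unprocessed items once using the DAO's own client.

// Sketch only: a generic "retry unprocessed items" helper, not the code from this commit.
import scala.concurrent.{ ExecutionContext, Future }
import scala.jdk.FutureConverters._

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.{ BatchWriteItemRequest, BatchWriteItemResponse }

object BatchWriteRetrySketch {

  // Submits the batch and, while DynamoDB reports unprocessed items, re-submits
  // just those items, collecting every response (e.g. to sum consumed capacity).
  def writeWithRetries(client: DynamoDbAsyncClient, batchReq: BatchWriteItemRequest)(
      implicit ec: ExecutionContext): Future[List[BatchWriteItemResponse]] = {

    def loop(req: BatchWriteItemRequest, acc: List[BatchWriteItemResponse]): Future[List[BatchWriteItemResponse]] =
      client.batchWriteItem(req).asScala.flatMap { response =>
        val responses = response :: acc
        val unprocessed = response.unprocessedItems
        if (response.hasUnprocessedItems && !unprocessed.isEmpty) {
          // Re-submit only the items DynamoDB did not process; a production
          // version would add a delay and a retry limit here.
          loop(req.toBuilder.requestItems(unprocessed).build(), responses)
        } else
          Future.successful(responses.reverse)
      }

    loop(batchReq, Nil)
  }
}

Before relying on a loop like this in production, a retry limit and a backoff delay between attempts would normally be added, since repeated unprocessed items usually indicate throttling on the table.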