diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala index 40d221da1..1ae2f69f9 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala @@ -35,6 +35,7 @@ import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest import software.amazon.awssdk.services.dynamodb.model.WriteRequest +import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse /** * INTERNAL API @@ -113,6 +114,24 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest }(ExecutionContext.parasitic) } + def writeWholeBatch(batchReq: BatchWriteItemRequest): Future[Seq[BatchWriteItemResponse]] = { + val result = client.batchWriteItem(batchReq).asScala + + result.flatMap { response => + val respList = List(response) + if (response.hasUnprocessedItems) { + val unprocessed = response.unprocessedItems + val newReq = batchReq.toBuilder.requestItems(unprocessed).build() + client + .batchWriteItem(newReq) + .asScala + .map { newResponse => + newResponse :: respList + }(ExecutionContext.parasitic) + } else Future.successful(respList) + } + } + def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = { import OffsetStoreDao.OffsetStoreAttributes._ @@ -163,14 +182,14 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest .returnConsumedCapacity(ReturnConsumedCapacity.TOTAL) .build() - val result = client.batchWriteItem(req).asScala + val result = writeWholeBatch(req) if (log.isDebugEnabled()) { - result.foreach { response => + result.foreach { responses => log.debug( "Wrote latest timestamps for [{}] slices, consumed [{}] WCU", offsetsBatch.size, - response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum) + responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum) } } result @@ -221,14 +240,14 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest .returnConsumedCapacity(ReturnConsumedCapacity.TOTAL) .build() - val result = client.batchWriteItem(req).asScala + val result = writeWholeBatch(req) if (log.isDebugEnabled()) { - result.foreach { response => + result.foreach { responses => log.debug( "Wrote [{}] sequence numbers, consumed [{}] WCU", recordsBatch.size, - response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum) + responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum) } } @@ -409,14 +428,14 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest .returnConsumedCapacity(ReturnConsumedCapacity.TOTAL) .build() - val result = client.batchWriteItem(req).asScala + val result = writeWholeBatch(req) if (log.isDebugEnabled()) { - result.foreach { response => + result.foreach { responses => log.debug( "Wrote management state for [{}] slices, consumed [{}] WCU", slices.size, - response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum) + responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum) } } result