From 7e9866ad8779109c8742cd1cd94704709cce6585 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Tue, 10 Sep 2024 11:14:00 -0500 Subject: [PATCH] Add progress check callback to update partition ownership in S3 scan source (#4918) Signed-off-by: Taylor Gray --- .../source/coordinator/SourceCoordinator.java | 7 ++ .../LeaseBasedSourceCoordinator.java | 30 ++++++-- .../LeaseBasedSourceCoordinatorTest.java | 23 ++++++ .../plugins/source/s3/S3ObjectWorker.java | 2 +- .../plugins/source/s3/ScanObjectWorker.java | 23 +++++- .../source/s3/S3ScanObjectWorkerTest.java | 73 +++++++++++++++++++ 6 files changed, 149 insertions(+), 9 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java index 71c0232669..ef00db1cb8 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java @@ -139,5 +139,12 @@ public interface SourceCoordinator { */ void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout); + /** + * Should be called by the source to keep ownership of the partition + * before another instance of Data Prepper can pick it up for processing. + * @param partitionKey - the partition to renew ownership for + */ + void renewPartitionOwnership(final String partitionKey); + void deletePartition(final String partitionKey); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java index af0d8578f9..b66edb496a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -59,7 +59,6 @@ public class LeaseBasedSourceCoordinator implements SourceCoordinator { static final String PARTITION_NOT_FOUND_ERROR_COUNT = "partitionNotFoundErrors"; static final String PARTITION_NOT_OWNED_ERROR_COUNT = "partitionNotOwnedErrors"; static final String PARTITION_UPDATE_ERROR_COUNT = "PartitionUpdateErrors"; - static final Duration DEFAULT_LEASE_TIMEOUT = Duration.ofMinutes(10); private static final String hostName; @@ -91,7 +90,6 @@ public class LeaseBasedSourceCoordinator implements SourceCoordinator { private final Counter saveStatePartitionUpdateErrorCounter; private final Counter closePartitionUpdateErrorCounter; private final Counter completePartitionUpdateErrorCounter; - private final Counter partitionsDeleted; private final ReentrantLock lock; @@ -302,7 +300,8 @@ public void saveProgressStateForPartition(final String partitionKe try { sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate); - } catch (final PartitionUpdateException e) { + } catch (final Exception e) { + LOG.error("Exception while saving state for the partition {}: {}", partitionKey, e.getMessage()); saveStatePartitionUpdateErrorCounter.increment(); throw e; } @@ -315,12 +314,29 @@ public void saveProgressStateForPartition(final String partitionKe @Override public void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout) { - validateIsInitialized(); + try { + updatePartitionOwnership(partitionKey, ackowledgmentTimeout); + } catch (final Exception e) { + LOG.error("Exception while updating acknowledgment wait for the partition {}: {}", partitionKey, e.getMessage()); + throw e; + } + } + + @Override + public void renewPartitionOwnership(final String partitionKey) { + try { + updatePartitionOwnership(partitionKey, DEFAULT_LEASE_TIMEOUT); + } catch (final Exception e) { + LOG.error("Exception while renewing partition ownership for the partition {}: {}", partitionKey, e.getMessage()); + throw e; + } + } - final SourcePartitionStoreItem itemToUpdate = getSourcePartitionStoreItem(partitionKey, "update for ack wait"); + private void updatePartitionOwnership(final String partitionKey, final Duration ownershipRenewalTime) { + final SourcePartitionStoreItem itemToUpdate = getSourcePartitionStoreItem(partitionKey, "update partition ownership"); validatePartitionOwnership(itemToUpdate); - itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(ackowledgmentTimeout)); + itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(ownershipRenewalTime)); sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate); } @@ -371,7 +387,7 @@ public void deletePartition(final String partitionKey) { try { sourceCoordinationStore.tryDeletePartitionItem(deleteItem); } catch (final PartitionUpdateException e) { - LOG.info("Unable to delete partition {}: {}.", deleteItem.getSourcePartitionKey(), e.getMessage()); + LOG.error("Unable to delete partition {}: {}.", deleteItem.getSourcePartitionKey(), e.getMessage()); return; } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java index 2e95671f78..965a0c8786 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java @@ -796,6 +796,29 @@ void updatePartitionForAckWait_updates_partition_ownership_and_removes_active_pa assertThat(newPartitionOwnershipTimeout.isAfter(beforeSave.plus(ackTimeout)), equalTo(true)); } + @Test + void renewPartitionOwnership_updates_partition_ownership() throws UnknownHostException { + final SourcePartition sourcePartition = SourcePartition.builder(String.class) + .withPartitionKey(UUID.randomUUID().toString()) + .withPartitionState(null) + .build(); + + final Instant beforeSave = Instant.now(); + + given(sourcePartitionStoreItem.getPartitionOwner()).willReturn(sourceIdentifierWithPartitionPrefix + ":" + InetAddress.getLocalHost().getHostName()); + given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, sourcePartition.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem)); + + doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(sourcePartitionStoreItem); + + final Duration ackTimeout = Duration.ofSeconds(10); + createObjectUnderTest().renewPartitionOwnership(sourcePartition.getPartitionKey()); + + final ArgumentCaptor argumentCaptorForPartitionOwnershipTimeout = ArgumentCaptor.forClass(Instant.class); + verify(sourcePartitionStoreItem).setPartitionOwnershipTimeout(argumentCaptorForPartitionOwnershipTimeout.capture()); + final Instant newPartitionOwnershipTimeout = argumentCaptorForPartitionOwnershipTimeout.getValue(); + assertThat(newPartitionOwnershipTimeout.isAfter(beforeSave.plus(ackTimeout)), equalTo(true)); + } + @Test void giveUpPartitions_with_active_partitionKey_that_does_not_exist_in_the_store_removes_the_active_partition() { final SourcePartition sourcePartition = SourcePartition.builder(String.class) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java index 0397183877..6750ada0f3 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java @@ -108,7 +108,7 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, } bufferAccumulator.add(record); - if (sourceCoordinator != null && partitionKey != null && + if (acknowledgementSet != null && sourceCoordinator != null && partitionKey != null && (System.currentTimeMillis() - lastCheckpointTime.get() > DEFAULT_CHECKPOINT_INTERVAL_MILLS)) { LOG.debug("Renew partition ownership for the object {}", partitionKey); sourceCoordinator.saveProgressStateForPartition(partitionKey, null); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index 471a0efa3d..00172701cc 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -52,6 +52,8 @@ public class ScanObjectWorker implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ScanObjectWorker.class); private static final Integer MAX_OBJECTS_PER_ACKNOWLEDGMENT_SET = 1; + static final Duration CHECKPOINT_OWNERSHIP_INTERVAL = Duration.ofMinutes(2); + static final Duration NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION = Duration.ofHours(1); private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000; @@ -60,6 +62,7 @@ public class ScanObjectWorker implements Runnable { static final String NO_OBJECTS_FOUND_FOR_FOLDER_PARTITION = "folderPartitionNoObjectsFound"; + static final String PARTITION_OWNERSHIP_UPDATE_ERRORS = "partitionOwnershipUpdateErrors"; private final S3Client s3Client; private final List scanOptionsBuilderList; @@ -85,6 +88,8 @@ public class ScanObjectWorker implements Runnable { private final Counter acknowledgementSetCallbackCounter; private final Counter folderPartitionNoObjectsFound; + + private final Counter partitionOwnershipUpdateFailures; private final long backOffMs; private final List partitionKeys; @@ -118,6 +123,7 @@ public ScanObjectWorker(final S3Client s3Client, this.pluginMetrics = pluginMetrics; acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME); this.folderPartitionNoObjectsFound = pluginMetrics.counter(NO_OBJECTS_FOUND_FOR_FOLDER_PARTITION); + this.partitionOwnershipUpdateFailures = pluginMetrics.counter(PARTITION_OWNERSHIP_UPDATE_ERRORS); this.sourceCoordinator.initialize(); this.partitionKeys = new ArrayList<>(); this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(); @@ -209,6 +215,17 @@ private void startProcessingObject(final long waitTimeMillis) { } partitionKeys.remove(objectToProcess.get().getPartitionKey()); }, ACKNOWLEDGEMENT_SET_TIMEOUT); + + acknowledgementSet.addProgressCheck( + (ratio) -> { + try { + sourceCoordinator.renewPartitionOwnership(objectToProcess.get().getPartitionKey()); + } catch (final PartitionUpdateException | PartitionNotOwnedException | PartitionNotFoundException e) { + LOG.debug("Failed to update partition ownership for {} in the acknowledgment progress check", objectToProcess.get().getPartitionKey()); + partitionOwnershipUpdateFailures.increment(); + } + }, + CHECKPOINT_OWNERSHIP_INTERVAL); } @@ -217,7 +234,11 @@ private void startProcessingObject(final long waitTimeMillis) { if (endToEndAcknowledgementsEnabled) { deleteObjectRequest.ifPresent(deleteRequest -> objectsToDeleteForAcknowledgmentSets.put(objectToProcess.get().getPartitionKey(), Set.of(deleteRequest))); - sourceCoordinator.updatePartitionForAcknowledgmentWait(objectToProcess.get().getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT); + try { + sourceCoordinator.updatePartitionForAcknowledgmentWait(objectToProcess.get().getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT); + } catch (final PartitionUpdateException e) { + LOG.debug("Failed to update the partition for the acknowledgment wait."); + } acknowledgementSet.complete(); } else { sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey(), false); diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java index fa1645db8d..3e71510bf4 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException; @@ -69,8 +70,10 @@ import static org.opensearch.dataprepper.model.source.s3.S3ScanEnvironmentVariables.STOP_S3_SCAN_PROCESSING_PROPERTY; import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME; import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.ACKNOWLEDGEMENT_SET_TIMEOUT; +import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.CHECKPOINT_OWNERSHIP_INTERVAL; import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION; import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.NO_OBJECTS_FOUND_FOR_FOLDER_PARTITION; +import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.PARTITION_OWNERSHIP_UPDATE_ERRORS; @ExtendWith(MockitoExtension.class) class S3ScanObjectWorkerTest { @@ -114,6 +117,9 @@ class S3ScanObjectWorkerTest { @Mock private Counter counter; + @Mock + private Counter partitionOwnershipUpdateErrorCounter; + @Mock private Counter noObjectsFoundForFolderPartitionCounter; @@ -130,6 +136,7 @@ private ScanObjectWorker createObjectUnderTest() { when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions); when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME)).thenReturn(counter); when(pluginMetrics.counter(NO_OBJECTS_FOUND_FOR_FOLDER_PARTITION)).thenReturn(noObjectsFoundForFolderPartitionCounter); + when(pluginMetrics.counter(PARTITION_OWNERSHIP_UPDATE_ERRORS)).thenReturn(partitionOwnershipUpdateErrorCounter); final ScanObjectWorker objectUnderTest = new ScanObjectWorker(s3Client, scanOptionsList, s3ObjectHandler, bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, 30000, pluginMetrics); verify(sourceCoordinator).initialize(); @@ -207,12 +214,76 @@ void buildDeleteObjectRequest_should_be_invoked_after_processing_when_deleteS3Ob final ScanObjectWorker scanObjectWorker = createObjectUnderTest(); when(acknowledgementSetManager.create(any(Consumer.class), any(Duration.class))).thenReturn(acknowledgementSet); + doNothing().when(acknowledgementSet).addProgressCheck(any(Consumer.class), any(Duration.class)); scanObjectWorker.runWithoutInfiniteLoop(); final ArgumentCaptor consumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); verify(acknowledgementSetManager).create(consumerArgumentCaptor.capture(), any(Duration.class)); + final ArgumentCaptor progressCheckArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); + verify(acknowledgementSet).addProgressCheck(progressCheckArgumentCaptor.capture(), eq(CHECKPOINT_OWNERSHIP_INTERVAL)); + + final Consumer progressCheckConsumer = progressCheckArgumentCaptor.getValue(); + progressCheckConsumer.accept(mock(ProgressCheck.class)); + + final Consumer ackCallback = consumerArgumentCaptor.getValue(); + ackCallback.accept(true); + + final InOrder inOrder = inOrder(sourceCoordinator, acknowledgementSet, s3ObjectDeleteWorker); + inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey); + inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT); + inOrder.verify(acknowledgementSet).complete(); + inOrder.verify(sourceCoordinator).renewPartitionOwnership(partitionKey); + inOrder.verify(sourceCoordinator).completePartition(partitionKey, true); + + verify(counter).increment(); + + final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue(); + assertThat(processedObject.getBucketName(), equalTo(bucket)); + assertThat(processedObject.getKey(), equalTo(objectKey)); + } + + @ParameterizedTest + @MethodSource("exceptionProvider") + void acknowledgment_progress_check_increments_ownership_error_metric_when_partition_fails_to_update(final Class exception) throws IOException { + final String bucket = UUID.randomUUID().toString(); + final String objectKey = UUID.randomUUID().toString(); + final String partitionKey = bucket + "|" + objectKey; + + + when(s3SourceConfig.getAcknowledgements()).thenReturn(true); + when(s3SourceConfig.isDeleteS3ObjectsOnRead()).thenReturn(true); + when(s3ObjectDeleteWorker.buildDeleteObjectRequest(bucket, objectKey)).thenReturn(deleteObjectRequest); + + final SourcePartition partitionToProcess = SourcePartition.builder(S3SourceProgressState.class) + .withPartitionKey(partitionKey) + .withPartitionClosedCount(0L) + .build(); + + given(sourceCoordinator.getNextPartition(any(Function.class), eq(false))).willReturn(Optional.of(partitionToProcess)); + + final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); + doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(acknowledgementSet), eq(sourceCoordinator), eq(partitionKey)); + doNothing().when(sourceCoordinator).completePartition(anyString(), eq(true)); + + final ScanObjectWorker scanObjectWorker = createObjectUnderTest(); + + when(acknowledgementSetManager.create(any(Consumer.class), any(Duration.class))).thenReturn(acknowledgementSet); + doNothing().when(acknowledgementSet).addProgressCheck(any(Consumer.class), any(Duration.class)); + + scanObjectWorker.runWithoutInfiniteLoop(); + + final ArgumentCaptor consumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); + verify(acknowledgementSetManager).create(consumerArgumentCaptor.capture(), any(Duration.class)); + + final ArgumentCaptor progressCheckArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); + verify(acknowledgementSet).addProgressCheck(progressCheckArgumentCaptor.capture(), eq(CHECKPOINT_OWNERSHIP_INTERVAL)); + + final Consumer progressCheckConsumer = progressCheckArgumentCaptor.getValue(); + doThrow(exception).when(sourceCoordinator).renewPartitionOwnership(partitionKey); + progressCheckConsumer.accept(mock(ProgressCheck.class)); + final Consumer ackCallback = consumerArgumentCaptor.getValue(); ackCallback.accept(true); @@ -220,9 +291,11 @@ void buildDeleteObjectRequest_should_be_invoked_after_processing_when_deleteS3Ob inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey); inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT); inOrder.verify(acknowledgementSet).complete(); + inOrder.verify(sourceCoordinator).renewPartitionOwnership(partitionKey); inOrder.verify(sourceCoordinator).completePartition(partitionKey, true); verify(counter).increment(); + verify(partitionOwnershipUpdateErrorCounter).increment(); final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue(); assertThat(processedObject.getBucketName(), equalTo(bucket));