Skip to content

Commit

Permalink
Add progress check callback to update partition ownership in S3 scan …
Browse files Browse the repository at this point in the history
…source (#4918)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Sep 10, 2024
1 parent af9cab8 commit 7e9866a
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,12 @@ public interface SourceCoordinator<T> {
*/
void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout);

/**
* Should be called by the source to keep ownership of the partition
* before another instance of Data Prepper can pick it up for processing.
* @param partitionKey - the partition to renew ownership for
*/
void renewPartitionOwnership(final String partitionKey);

void deletePartition(final String partitionKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
static final String PARTITION_NOT_FOUND_ERROR_COUNT = "partitionNotFoundErrors";
static final String PARTITION_NOT_OWNED_ERROR_COUNT = "partitionNotOwnedErrors";
static final String PARTITION_UPDATE_ERROR_COUNT = "PartitionUpdateErrors";

static final Duration DEFAULT_LEASE_TIMEOUT = Duration.ofMinutes(10);

private static final String hostName;
Expand Down Expand Up @@ -91,7 +90,6 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
private final Counter saveStatePartitionUpdateErrorCounter;
private final Counter closePartitionUpdateErrorCounter;
private final Counter completePartitionUpdateErrorCounter;

private final Counter partitionsDeleted;
private final ReentrantLock lock;

Expand Down Expand Up @@ -302,7 +300,8 @@ public <S extends T> void saveProgressStateForPartition(final String partitionKe

try {
sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate);
} catch (final PartitionUpdateException e) {
} catch (final Exception e) {
LOG.error("Exception while saving state for the partition {}: {}", partitionKey, e.getMessage());
saveStatePartitionUpdateErrorCounter.increment();
throw e;
}
Expand All @@ -315,12 +314,29 @@ public <S extends T> void saveProgressStateForPartition(final String partitionKe

@Override
public void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout) {
validateIsInitialized();
try {
updatePartitionOwnership(partitionKey, ackowledgmentTimeout);
} catch (final Exception e) {
LOG.error("Exception while updating acknowledgment wait for the partition {}: {}", partitionKey, e.getMessage());
throw e;
}
}

@Override
public void renewPartitionOwnership(final String partitionKey) {
try {
updatePartitionOwnership(partitionKey, DEFAULT_LEASE_TIMEOUT);
} catch (final Exception e) {
LOG.error("Exception while renewing partition ownership for the partition {}: {}", partitionKey, e.getMessage());
throw e;
}
}

final SourcePartitionStoreItem itemToUpdate = getSourcePartitionStoreItem(partitionKey, "update for ack wait");
private void updatePartitionOwnership(final String partitionKey, final Duration ownershipRenewalTime) {
final SourcePartitionStoreItem itemToUpdate = getSourcePartitionStoreItem(partitionKey, "update partition ownership");
validatePartitionOwnership(itemToUpdate);

itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(ackowledgmentTimeout));
itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(ownershipRenewalTime));

sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate);
}
Expand Down Expand Up @@ -371,7 +387,7 @@ public void deletePartition(final String partitionKey) {
try {
sourceCoordinationStore.tryDeletePartitionItem(deleteItem);
} catch (final PartitionUpdateException e) {
LOG.info("Unable to delete partition {}: {}.", deleteItem.getSourcePartitionKey(), e.getMessage());
LOG.error("Unable to delete partition {}: {}.", deleteItem.getSourcePartitionKey(), e.getMessage());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,29 @@ void updatePartitionForAckWait_updates_partition_ownership_and_removes_active_pa
assertThat(newPartitionOwnershipTimeout.isAfter(beforeSave.plus(ackTimeout)), equalTo(true));
}

@Test
void renewPartitionOwnership_updates_partition_ownership() throws UnknownHostException {
final SourcePartition<String> sourcePartition = SourcePartition.builder(String.class)
.withPartitionKey(UUID.randomUUID().toString())
.withPartitionState(null)
.build();

final Instant beforeSave = Instant.now();

given(sourcePartitionStoreItem.getPartitionOwner()).willReturn(sourceIdentifierWithPartitionPrefix + ":" + InetAddress.getLocalHost().getHostName());
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, sourcePartition.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem));

doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(sourcePartitionStoreItem);

final Duration ackTimeout = Duration.ofSeconds(10);
createObjectUnderTest().renewPartitionOwnership(sourcePartition.getPartitionKey());

final ArgumentCaptor<Instant> argumentCaptorForPartitionOwnershipTimeout = ArgumentCaptor.forClass(Instant.class);
verify(sourcePartitionStoreItem).setPartitionOwnershipTimeout(argumentCaptorForPartitionOwnershipTimeout.capture());
final Instant newPartitionOwnershipTimeout = argumentCaptorForPartitionOwnershipTimeout.getValue();
assertThat(newPartitionOwnershipTimeout.isAfter(beforeSave.plus(ackTimeout)), equalTo(true));
}

@Test
void giveUpPartitions_with_active_partitionKey_that_does_not_exist_in_the_store_removes_the_active_partition() {
final SourcePartition<String> sourcePartition = SourcePartition.builder(String.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet,
}
bufferAccumulator.add(record);

if (sourceCoordinator != null && partitionKey != null &&
if (acknowledgementSet != null && sourceCoordinator != null && partitionKey != null &&
(System.currentTimeMillis() - lastCheckpointTime.get() > DEFAULT_CHECKPOINT_INTERVAL_MILLS)) {
LOG.debug("Renew partition ownership for the object {}", partitionKey);
sourceCoordinator.saveProgressStateForPartition(partitionKey, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class ScanObjectWorker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ScanObjectWorker.class);
private static final Integer MAX_OBJECTS_PER_ACKNOWLEDGMENT_SET = 1;

static final Duration CHECKPOINT_OWNERSHIP_INTERVAL = Duration.ofMinutes(2);

static final Duration NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION = Duration.ofHours(1);
private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000;

Expand All @@ -60,6 +62,7 @@ public class ScanObjectWorker implements Runnable {

static final String NO_OBJECTS_FOUND_FOR_FOLDER_PARTITION = "folderPartitionNoObjectsFound";

static final String PARTITION_OWNERSHIP_UPDATE_ERRORS = "partitionOwnershipUpdateErrors";
private final S3Client s3Client;

private final List<ScanOptions> scanOptionsBuilderList;
Expand All @@ -85,6 +88,8 @@ public class ScanObjectWorker implements Runnable {
private final Counter acknowledgementSetCallbackCounter;

private final Counter folderPartitionNoObjectsFound;

private final Counter partitionOwnershipUpdateFailures;
private final long backOffMs;
private final List<String> partitionKeys;

Expand Down Expand Up @@ -118,6 +123,7 @@ public ScanObjectWorker(final S3Client s3Client,
this.pluginMetrics = pluginMetrics;
acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME);
this.folderPartitionNoObjectsFound = pluginMetrics.counter(NO_OBJECTS_FOUND_FOR_FOLDER_PARTITION);
this.partitionOwnershipUpdateFailures = pluginMetrics.counter(PARTITION_OWNERSHIP_UPDATE_ERRORS);
this.sourceCoordinator.initialize();
this.partitionKeys = new ArrayList<>();
this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions();
Expand Down Expand Up @@ -209,6 +215,17 @@ private void startProcessingObject(final long waitTimeMillis) {
}
partitionKeys.remove(objectToProcess.get().getPartitionKey());
}, ACKNOWLEDGEMENT_SET_TIMEOUT);

acknowledgementSet.addProgressCheck(
(ratio) -> {
try {
sourceCoordinator.renewPartitionOwnership(objectToProcess.get().getPartitionKey());
} catch (final PartitionUpdateException | PartitionNotOwnedException | PartitionNotFoundException e) {
LOG.debug("Failed to update partition ownership for {} in the acknowledgment progress check", objectToProcess.get().getPartitionKey());
partitionOwnershipUpdateFailures.increment();
}
},
CHECKPOINT_OWNERSHIP_INTERVAL);
}


Expand All @@ -217,7 +234,11 @@ private void startProcessingObject(final long waitTimeMillis) {

if (endToEndAcknowledgementsEnabled) {
deleteObjectRequest.ifPresent(deleteRequest -> objectsToDeleteForAcknowledgmentSets.put(objectToProcess.get().getPartitionKey(), Set.of(deleteRequest)));
sourceCoordinator.updatePartitionForAcknowledgmentWait(objectToProcess.get().getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT);
try {
sourceCoordinator.updatePartitionForAcknowledgmentWait(objectToProcess.get().getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT);
} catch (final PartitionUpdateException e) {
LOG.debug("Failed to update the partition for the acknowledgment wait.");
}
acknowledgementSet.complete();
} else {
sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException;
Expand Down Expand Up @@ -69,8 +70,10 @@
import static org.opensearch.dataprepper.model.source.s3.S3ScanEnvironmentVariables.STOP_S3_SCAN_PROCESSING_PROPERTY;
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.ACKNOWLEDGEMENT_SET_TIMEOUT;
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.CHECKPOINT_OWNERSHIP_INTERVAL;
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION;
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.NO_OBJECTS_FOUND_FOR_FOLDER_PARTITION;
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.PARTITION_OWNERSHIP_UPDATE_ERRORS;

@ExtendWith(MockitoExtension.class)
class S3ScanObjectWorkerTest {
Expand Down Expand Up @@ -114,6 +117,9 @@ class S3ScanObjectWorkerTest {
@Mock
private Counter counter;

@Mock
private Counter partitionOwnershipUpdateErrorCounter;

@Mock
private Counter noObjectsFoundForFolderPartitionCounter;

Expand All @@ -130,6 +136,7 @@ private ScanObjectWorker createObjectUnderTest() {
when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions);
when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME)).thenReturn(counter);
when(pluginMetrics.counter(NO_OBJECTS_FOUND_FOR_FOLDER_PARTITION)).thenReturn(noObjectsFoundForFolderPartitionCounter);
when(pluginMetrics.counter(PARTITION_OWNERSHIP_UPDATE_ERRORS)).thenReturn(partitionOwnershipUpdateErrorCounter);
final ScanObjectWorker objectUnderTest = new ScanObjectWorker(s3Client, scanOptionsList, s3ObjectHandler, bucketOwnerProvider,
sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, 30000, pluginMetrics);
verify(sourceCoordinator).initialize();
Expand Down Expand Up @@ -207,22 +214,88 @@ void buildDeleteObjectRequest_should_be_invoked_after_processing_when_deleteS3Ob
final ScanObjectWorker scanObjectWorker = createObjectUnderTest();

when(acknowledgementSetManager.create(any(Consumer.class), any(Duration.class))).thenReturn(acknowledgementSet);
doNothing().when(acknowledgementSet).addProgressCheck(any(Consumer.class), any(Duration.class));

scanObjectWorker.runWithoutInfiniteLoop();

final ArgumentCaptor<Consumer> consumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class);
verify(acknowledgementSetManager).create(consumerArgumentCaptor.capture(), any(Duration.class));

final ArgumentCaptor<Consumer> progressCheckArgumentCaptor = ArgumentCaptor.forClass(Consumer.class);
verify(acknowledgementSet).addProgressCheck(progressCheckArgumentCaptor.capture(), eq(CHECKPOINT_OWNERSHIP_INTERVAL));

final Consumer<ProgressCheck> progressCheckConsumer = progressCheckArgumentCaptor.getValue();
progressCheckConsumer.accept(mock(ProgressCheck.class));

final Consumer<Boolean> ackCallback = consumerArgumentCaptor.getValue();
ackCallback.accept(true);

final InOrder inOrder = inOrder(sourceCoordinator, acknowledgementSet, s3ObjectDeleteWorker);
inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey);
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT);
inOrder.verify(acknowledgementSet).complete();
inOrder.verify(sourceCoordinator).renewPartitionOwnership(partitionKey);
inOrder.verify(sourceCoordinator).completePartition(partitionKey, true);

verify(counter).increment();

final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue();
assertThat(processedObject.getBucketName(), equalTo(bucket));
assertThat(processedObject.getKey(), equalTo(objectKey));
}

@ParameterizedTest
@MethodSource("exceptionProvider")
void acknowledgment_progress_check_increments_ownership_error_metric_when_partition_fails_to_update(final Class<Throwable> exception) throws IOException {
final String bucket = UUID.randomUUID().toString();
final String objectKey = UUID.randomUUID().toString();
final String partitionKey = bucket + "|" + objectKey;


when(s3SourceConfig.getAcknowledgements()).thenReturn(true);
when(s3SourceConfig.isDeleteS3ObjectsOnRead()).thenReturn(true);
when(s3ObjectDeleteWorker.buildDeleteObjectRequest(bucket, objectKey)).thenReturn(deleteObjectRequest);

final SourcePartition<S3SourceProgressState> partitionToProcess = SourcePartition.builder(S3SourceProgressState.class)
.withPartitionKey(partitionKey)
.withPartitionClosedCount(0L)
.build();

given(sourceCoordinator.getNextPartition(any(Function.class), eq(false))).willReturn(Optional.of(partitionToProcess));

final ArgumentCaptor<S3ObjectReference> objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class);
doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(acknowledgementSet), eq(sourceCoordinator), eq(partitionKey));
doNothing().when(sourceCoordinator).completePartition(anyString(), eq(true));

final ScanObjectWorker scanObjectWorker = createObjectUnderTest();

when(acknowledgementSetManager.create(any(Consumer.class), any(Duration.class))).thenReturn(acknowledgementSet);
doNothing().when(acknowledgementSet).addProgressCheck(any(Consumer.class), any(Duration.class));

scanObjectWorker.runWithoutInfiniteLoop();

final ArgumentCaptor<Consumer> consumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class);
verify(acknowledgementSetManager).create(consumerArgumentCaptor.capture(), any(Duration.class));

final ArgumentCaptor<Consumer> progressCheckArgumentCaptor = ArgumentCaptor.forClass(Consumer.class);
verify(acknowledgementSet).addProgressCheck(progressCheckArgumentCaptor.capture(), eq(CHECKPOINT_OWNERSHIP_INTERVAL));

final Consumer<ProgressCheck> progressCheckConsumer = progressCheckArgumentCaptor.getValue();
doThrow(exception).when(sourceCoordinator).renewPartitionOwnership(partitionKey);
progressCheckConsumer.accept(mock(ProgressCheck.class));

final Consumer<Boolean> ackCallback = consumerArgumentCaptor.getValue();
ackCallback.accept(true);

final InOrder inOrder = inOrder(sourceCoordinator, acknowledgementSet, s3ObjectDeleteWorker);
inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey);
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT);
inOrder.verify(acknowledgementSet).complete();
inOrder.verify(sourceCoordinator).renewPartitionOwnership(partitionKey);
inOrder.verify(sourceCoordinator).completePartition(partitionKey, true);

verify(counter).increment();
verify(partitionOwnershipUpdateErrorCounter).increment();

final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue();
assertThat(processedObject.getBucketName(), equalTo(bucket));
Expand Down

0 comments on commit 7e9866a

Please sign in to comment.