Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into frequent-check-pointing
Browse files Browse the repository at this point in the history
  • Loading branch information
san81 authored Jan 27, 2025
2 parents 15d0fdf + e9792dc commit 3d7e2f0
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ void parseS3Object(final S3ObjectReference s3ObjectReference,
final AcknowledgementSet acknowledgementSet,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final String partitionKey) throws IOException;

void deleteS3Object(final S3ObjectReference s3ObjectReference);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
public class S3ObjectPluginMetrics {
static final String S3_OBJECTS_SIZE_PROCESSED = "s3ObjectProcessedBytes";
static final String S3_OBJECTS_FAILED_METRIC_NAME = "s3ObjectsFailed";
static final String S3_OBJECTS_DELETE_FAILED_METRIC_NAME = "s3ObjectsDeleteFailed";
static final String S3_OBJECTS_SUCCEEDED_METRIC_NAME = "s3ObjectsSucceeded";
static final String S3_OBJECTS_EVENTS = "s3ObjectsEvents";
static final String S3_OBJECTS_FAILED_NOT_FOUND_METRIC_NAME = "s3ObjectsNotFound";
Expand All @@ -29,6 +30,8 @@ public class S3ObjectPluginMetrics {
private final DistributionSummary s3ObjectEventsSummary;
private final Counter s3ObjectNoRecordsFound;

private final Counter s3ObjectsDeleteFailed;

public S3ObjectPluginMetrics(final PluginMetrics pluginMetrics){
s3ObjectsFailedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME);
s3ObjectsFailedNotFoundCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_NOT_FOUND_METRIC_NAME);
Expand All @@ -39,6 +42,7 @@ public S3ObjectPluginMetrics(final PluginMetrics pluginMetrics){
s3ObjectSizeProcessedSummary = pluginMetrics.summary(S3_OBJECTS_SIZE_PROCESSED);
s3ObjectEventsSummary = pluginMetrics.summary(S3_OBJECTS_EVENTS);
s3ObjectNoRecordsFound = pluginMetrics.counter(S3_OBJECTS_NO_RECORDS_FOUND);
s3ObjectsDeleteFailed = pluginMetrics.counter(S3_OBJECTS_DELETE_FAILED_METRIC_NAME);
}

public Counter getS3ObjectsFailedCounter() {
Expand Down Expand Up @@ -75,4 +79,6 @@ public DistributionSummary getS3ObjectEventsSummary() {
public Counter getS3ObjectNoRecordsFound() {
return s3ObjectNoRecordsFound;
}

public Counter getS3ObjectsDeleteFailed() { return s3ObjectsDeleteFailed; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -34,6 +36,9 @@ class S3ObjectWorker implements S3ObjectHandler {
private static final Logger LOG = LoggerFactory.getLogger(S3ObjectWorker.class);
private static final long DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000;

private static final int MAX_RETRIES_DELETE_OBJECT = 3;
private static final long DELETE_OBJECT_RETRY_DELAY_MS = 1000;

private final S3Client s3Client;
private final Buffer<Record<Event>> buffer;

Expand Down Expand Up @@ -79,6 +84,46 @@ public void parseS3Object(final S3ObjectReference s3ObjectReference,
s3ObjectPluginMetrics.getS3ObjectsSucceededCounter().increment();
}

@Override
public void deleteS3Object(final S3ObjectReference s3ObjectReference) {
final DeleteObjectRequest.Builder deleteRequestBuilder = DeleteObjectRequest.builder()
.bucket(s3ObjectReference.getBucketName())
.key(s3ObjectReference.getKey());

final Optional<String> bucketOwner = bucketOwnerProvider.getBucketOwner(s3ObjectReference.getBucketName());
bucketOwner.ifPresent(deleteRequestBuilder::expectedBucketOwner);

final DeleteObjectRequest deleteObjectRequest = deleteRequestBuilder.build();

boolean deleteSuccessFul = false;
int retryCount = 0;

while (!deleteSuccessFul && retryCount < MAX_RETRIES_DELETE_OBJECT) {
try {
s3Client.deleteObject(deleteObjectRequest);
deleteSuccessFul = true;
LOG.debug("Successfully deleted object {} from bucket {} on attempt {}",
s3ObjectReference.getKey(), s3ObjectReference.getBucketName(), retryCount);
} catch (final Exception e) {
retryCount++;
if (retryCount == MAX_RETRIES_DELETE_OBJECT) {
LOG.error("Failed to delete object {} from bucket {} after {} attempts: {}",
s3ObjectReference.getKey(), s3ObjectReference.getBucketName(), MAX_RETRIES_DELETE_OBJECT, e.getMessage());
s3ObjectPluginMetrics.getS3ObjectsDeleteFailed().increment();
} else {
LOG.warn("Failed to delete object {} from bucket {} on attempt {}, will retry: {}",
s3ObjectReference.getKey(), s3ObjectReference.getBucketName(), retryCount, e.getMessage());
try {
Thread.sleep(DELETE_OBJECT_RETRY_DELAY_MS);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}

private void doParseObject(final AcknowledgementSet acknowledgementSet,
final S3ObjectReference s3ObjectReference,
final BufferAccumulator<Record<Event>> bufferAccumulator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ public void parseS3Object(final S3ObjectReference s3ObjectReference,
}
}

@Override
public void deleteS3Object(final S3ObjectReference s3ObjectReference) {
throw new UnsupportedOperationException("Deleting S3 objects is not supported with S3 select");
}

private void selectObject(final S3ObjectReference s3ObjectReference, final AcknowledgementSet acknowledgementSet) throws IOException {
final InputSerialization inputSerialization = getInputSerializationFormat(serializationFormatOption);
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, numberOfRecordsToAccumulate, bufferTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ void addS3Object(final S3ObjectReference s3ObjectReference, AcknowledgementSet a
s3ObjectHandler.parseS3Object(s3ObjectReference, acknowledgementSet, null, null);
}

void deleteS3Object(final S3ObjectReference s3ObjectReference) throws IOException {
s3ObjectHandler.deleteS3Object(s3ObjectReference);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,24 @@ boolean isPrefixPartitionModeValid() {
return true;
}

@AssertTrue(message = "acknowledgments must be enabled when using delete_s3_objects_on_read")
boolean isAcknowledgmentsEnabledWithDeleteS3ObjectsOnRead() {
if (deleteS3ObjectsOnRead && !acknowledgments) {
return false;
}

return true;
}

@AssertTrue(message = "s3_select is not supported with delete_s3_objects_on_read")
boolean isS3SelectNotUsingDeleteS3ObjectsOnRead() {
if (s3SelectOptions != null && deleteS3ObjectsOnRead) {
return false;
}

return true;
}

public NotificationTypeOption getNotificationType() {
return notificationType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ private List<DeleteMessageBatchRequestEntry> processS3EventNotificationRecords(f
final List<ParsedMessage> parsedMessagesToRead = new ArrayList<>();
final Map<ParsedMessage, AcknowledgementSet> messageAcknowledgementSetMap = new HashMap<>();
final Map<ParsedMessage, List<DeleteMessageBatchRequestEntry>> messageWaitingForAcknowledgementsMap = new HashMap<>();
final Map<ParsedMessage, List<S3ObjectReference>> messagesWaitingForS3ObjectDeletion = new HashMap<>();

for (ParsedMessage parsedMessage : s3EventNotificationRecords) {
if (parsedMessage.isFailedParsing()) {
Expand Down Expand Up @@ -236,6 +237,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
Instant.now()
));
List<DeleteMessageBatchRequestEntry> waitingForAcknowledgements = new ArrayList<>();
List<S3ObjectReference> s3ObjectDeletionWaitingForAcknowledgments = new ArrayList<>();
AcknowledgementSet acknowledgementSet = null;
final int visibilityTimeout = (int)sqsOptions.getVisibilityTimeout().getSeconds();
final int maxVisibilityTimeout = (int)sqsOptions.getVisibilityDuplicateProtectionTimeout().getSeconds();
Expand All @@ -254,7 +256,10 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
parsedMessageVisibilityTimesMap.remove(parsedMessage);
}
if (result == true) {
deleteSqsMessages(waitingForAcknowledgements);
final boolean successfullyDeletedAllMessages = deleteSqsMessages(waitingForAcknowledgements);
if (successfullyDeletedAllMessages && s3SourceConfig.isDeleteS3ObjectsOnRead()) {
deleteS3Objects(s3ObjectDeletionWaitingForAcknowledgments);
}
}
},
Duration.ofSeconds(expiryTimeout));
Expand All @@ -273,6 +278,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
}
messageAcknowledgementSetMap.put(parsedMessage, acknowledgementSet);
messageWaitingForAcknowledgementsMap.put(parsedMessage, waitingForAcknowledgements);
messagesWaitingForS3ObjectDeletion.put(parsedMessage, s3ObjectDeletionWaitingForAcknowledgments);
}
}

Expand All @@ -284,10 +290,14 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
for (ParsedMessage parsedMessage : parsedMessagesToRead) {
final AcknowledgementSet acknowledgementSet = messageAcknowledgementSetMap.get(parsedMessage);
final List<DeleteMessageBatchRequestEntry> waitingForAcknowledgements = messageWaitingForAcknowledgementsMap.get(parsedMessage);
final List<S3ObjectReference> s3ObjectDeletionsWaitingForAcknowledgments = messagesWaitingForS3ObjectDeletion.get(parsedMessage);
final S3ObjectReference s3ObjectReference = populateS3Reference(parsedMessage.getBucketName(), parsedMessage.getObjectKey());
final Optional<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference, acknowledgementSet);
if (endToEndAcknowledgementsEnabled) {
deleteMessageBatchRequestEntry.ifPresent(waitingForAcknowledgements::add);
if (s3SourceConfig.isDeleteS3ObjectsOnRead()) {
s3ObjectDeletionsWaitingForAcknowledgments.add(s3ObjectReference);
}
acknowledgementSet.complete();
} else {
deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add);
Expand Down Expand Up @@ -333,11 +343,11 @@ private Optional<DeleteMessageBatchRequestEntry> processS3Object(
}
}

private void deleteSqsMessages(final List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntryCollection) {
private boolean deleteSqsMessages(final List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntryCollection) {
if(isStopped)
return;
return false;
if (deleteMessageBatchRequestEntryCollection.size() == 0) {
return;
return false;
}
final DeleteMessageBatchRequest deleteMessageBatchRequest = buildDeleteMessageBatchRequest(deleteMessageBatchRequestEntryCollection);
try {
Expand All @@ -361,6 +371,7 @@ private void deleteSqsMessages(final List<DeleteMessageBatchRequestEntry> delete
.map(failed -> failed.toString())
.collect(Collectors.joining(", "));
LOG.error("Failed to delete {} messages from SQS with errors: [{}].", failedDeleteCount, failedMessages);
return false;
}
}

Expand All @@ -371,6 +382,21 @@ private void deleteSqsMessages(final List<DeleteMessageBatchRequestEntry> delete
if(e instanceof StsException) {
applyBackoff();
}

return false;
}

return true;
}

private void deleteS3Objects(final List<S3ObjectReference> objectsToDelete) {
for (final S3ObjectReference s3ObjectReference : objectsToDelete) {
try {
s3Service.deleteS3Object(s3ObjectReference);
} catch (final Exception e) {
LOG.error("Received an exception while attempting to delete object {} from bucket {}: {}",
s3ObjectReference.getKey(), s3ObjectReference.getBucketName(), e.getMessage());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.opensearch.dataprepper.plugins.source.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
Expand All @@ -50,6 +52,7 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -149,11 +152,11 @@ void setUp() throws Exception {
when(s3ObjectReference.getKey()).thenReturn(key);

s3ObjectPluginMetrics = mock(S3ObjectPluginMetrics.class);
when(s3ObjectPluginMetrics.getS3ObjectReadTimer()).thenReturn(s3ObjectReadTimer);
lenient().when(s3ObjectPluginMetrics.getS3ObjectReadTimer()).thenReturn(s3ObjectReadTimer);
objectSize = random.nextInt(100_000) + 10_000;

exceptionThrownByCallable = null;
when(s3ObjectReadTimer.recordCallable(any(Callable.class)))
lenient().when(s3ObjectReadTimer.recordCallable(any(Callable.class)))
.thenAnswer(a -> {
try {
a.getArgument(0, Callable.class).call();
Expand Down Expand Up @@ -494,4 +497,45 @@ void parseS3Object_records_input_file_bytes_read() throws IOException {
verify(s3ObjectSizeProcessedSummary).record(inputStringLength);
}

@Test
void deleteS3Object_calls_delete_object_with_expected_request_success() {
final ArgumentCaptor<DeleteObjectRequest> deleteObjectRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteObjectRequest.class);
final String accountOwner = UUID.randomUUID().toString();

when(bucketOwnerProvider.getBucketOwner(bucketName)).thenReturn(Optional.of(accountOwner));

when(s3Client.deleteObject(deleteObjectRequestArgumentCaptor.capture())).thenReturn(mock(DeleteObjectResponse.class));


final S3ObjectWorker objectUnderTest = createObjectUnderTest(s3ObjectPluginMetrics);

objectUnderTest.deleteS3Object(s3ObjectReference);

final DeleteObjectRequest deleteObjectRequest = deleteObjectRequestArgumentCaptor.getValue();
assertThat(deleteObjectRequest, notNullValue());
assertThat(deleteObjectRequest.bucket(), equalTo(bucketName));
assertThat(deleteObjectRequest.key(), equalTo(key));
assertThat(deleteObjectRequest.expectedBucketOwner(), equalTo(accountOwner));
}

@Test
void deleteS3Object_increments_failed_deletion_metric_after_max_retries() {
final ArgumentCaptor<DeleteObjectRequest> deleteObjectRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteObjectRequest.class);
final String accountOwner = UUID.randomUUID().toString();

when(bucketOwnerProvider.getBucketOwner(bucketName)).thenReturn(Optional.of(accountOwner));

when(s3Client.deleteObject(deleteObjectRequestArgumentCaptor.capture())).thenThrow(RuntimeException.class);

final Counter s3ObjectDeteleFailedCounter = mock(Counter.class);
when(s3ObjectPluginMetrics.getS3ObjectsDeleteFailed()).thenReturn(s3ObjectDeteleFailedCounter);


final S3ObjectWorker objectUnderTest = createObjectUnderTest(s3ObjectPluginMetrics);

objectUnderTest.deleteS3Object(s3ObjectReference);

verify(s3ObjectDeteleFailedCounter).increment();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.opensearch.dataprepper.plugins.source.s3;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

import java.io.IOException;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class S3ServiceTest {

@Mock
private S3ObjectHandler s3ObjectHandler;

private S3Service createObjectUnderTest() {
return new S3Service(s3ObjectHandler);
}

@Test
void addS3Object_calls_parseS3Object_on_S3ObjectHandler() throws IOException {
final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class);
final S3ObjectReference s3ObjectReference = mock(S3ObjectReference.class);

doNothing().when(s3ObjectHandler).parseS3Object(eq(s3ObjectReference), eq(acknowledgementSet), eq(null), eq(null));

final S3Service objectUnderTest = createObjectUnderTest();

objectUnderTest.addS3Object(s3ObjectReference, acknowledgementSet);

verify(s3ObjectHandler).parseS3Object(s3ObjectReference, acknowledgementSet, null, null);
}

@Test
void deleteS3Object_calls_deleteS3Object_on_s3ObjectHandler() throws IOException {
final S3ObjectReference s3ObjectReference = mock(S3ObjectReference.class);
doNothing().when(s3ObjectHandler).deleteS3Object(s3ObjectReference);

final S3Service s3Service = createObjectUnderTest();

s3Service.deleteS3Object(s3ObjectReference);
verify(s3ObjectHandler).deleteS3Object(s3ObjectReference);

}
}
Loading

0 comments on commit 3d7e2f0

Please sign in to comment.