Add option to process only metadata of objects in S3 scan mode
Signed-off-by: Kondaka <[email protected]>
kkondaka committed Feb 27, 2025
1 parent b3b6c65 commit f45865d
Showing 5 changed files with 167 additions and 5 deletions.
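
For context, enabling the new scan option in a pipeline configuration might look like the sketch below. The pipeline name, bucket name, and sink are illustrative and not part of this commit; only the metadata_only key under scan is introduced here:

s3-metadata-pipeline:
  source:
    s3:
      scan:
        # New in this commit: emit only object metadata (bucket, key,
        # time, length) instead of downloading and parsing object contents.
        metadata_only: true
        buckets:
          - bucket:
              name: example-bucket
  sink:
    - stdout: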
S3ObjectHandler.java
@@ -28,5 +28,24 @@ void parseS3Object(final S3ObjectReference s3ObjectReference,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final String partitionKey) throws IOException;

/**
 * Deletes the S3 object identified by the given S3 object reference.
 * @param s3ObjectReference Contains bucket and s3 object details
 */
void deleteS3Object(final S3ObjectReference s3ObjectReference);

/**
 * Processes S3 object metadata using the S3 object reference and pushes a metadata event to the buffer.
 * The default implementation is a no-op.
 * @param s3ObjectReference Contains bucket and s3 object details
 * @param acknowledgementSet acknowledgement set for the object
 * @param sourceCoordinator source coordinator
 * @param partitionKey partition key
 *
 * @throws IOException if processing the object metadata fails
 */
default void processS3ObjectMetadata(final S3ObjectReference s3ObjectReference,
final AcknowledgementSet acknowledgementSet,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final String partitionKey) throws IOException {
}
}
S3ObjectWorker.java
@@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
@@ -22,6 +23,8 @@
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
@@ -64,6 +67,81 @@ public S3ObjectWorker(final S3ObjectRequest s3ObjectRequest) {
this.s3ObjectPluginMetrics = s3ObjectRequest.getS3ObjectPluginMetrics();
}

private void processObjectMetadata(final AcknowledgementSet acknowledgementSet,
final S3ObjectReference s3ObjectReference,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final String partitionKey) throws IOException {
final S3InputFile inputFile = new S3InputFile(s3Client, s3ObjectReference, bucketOwnerProvider, s3ObjectPluginMetrics);
final String BUCKET = "bucket";
final String KEY = "key";
final String TIME = "time";
final String LENGTH = "length";
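// Build a metadata-only event carrying the object's bucket, key, last-modified time, and size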
Map<String, Object> data = new HashMap<>();
data.put(BUCKET, s3ObjectReference.getBucketName());
data.put(KEY, s3ObjectReference.getKey());
data.put(TIME, inputFile.getLastModified());
data.put(LENGTH, inputFile.getLength());
Event event = JacksonEvent.builder()
.withEventType("event")
.withData(data)
.build();
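// Record the size of the serialized metadata event; the object body is never read in metadata-only mode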
final long s3ObjectSize = event.toJsonString().length();
if (acknowledgementSet != null) {
acknowledgementSet.add(event);
}
AtomicLong lastCheckpointTime = new AtomicLong(System.currentTimeMillis());
final AtomicInteger saveStateCounter = new AtomicInteger();
final Instant lastModifiedTime = inputFile.getLastModified();
final Instant now = Instant.now();
final Instant originationTime = (lastModifiedTime == null || lastModifiedTime.isAfter(now)) ? now : lastModifiedTime;
event.getMetadata().setExternalOriginationTime(originationTime);
event.getEventHandle().setExternalOriginationTime(originationTime);
try {
bufferAccumulator.add(new Record<>(event));
} catch (final Exception e) {
LOG.error("Failed writing S3 objects to buffer.", e);
}
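// Renew partition ownership if the checkpoint interval has elapsed, so the partition is not reclaimed mid-processing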
if (acknowledgementSet != null && sourceCoordinator != null && partitionKey != null &&
(System.currentTimeMillis() - lastCheckpointTime.get() > DEFAULT_CHECKPOINT_INTERVAL_MILLS)) {
LOG.debug("Renew partition ownership for the object {}", partitionKey);
sourceCoordinator.saveProgressStateForPartition(partitionKey, null);
lastCheckpointTime.set(System.currentTimeMillis());
saveStateCounter.getAndIncrement();
}

try {
bufferAccumulator.flush();
} catch (final Exception e) {
LOG.error("Failed writing S3 objects to buffer.", e);
}

final int recordsWritten = bufferAccumulator.getTotalWritten();

if (recordsWritten == 0) {
LOG.warn("Failed to get metadata for S3 object: s3ObjectReference={}.", s3ObjectReference);
s3ObjectPluginMetrics.getS3ObjectNoRecordsFound().increment();
}
s3ObjectPluginMetrics.getS3ObjectSizeSummary().record(s3ObjectSize);
s3ObjectPluginMetrics.getS3ObjectEventsSummary().record(recordsWritten);
}
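
For reference, the metadata event built above serializes via event.toJsonString() to a small flat document along these lines (values are illustrative, and the exact rendering of "time" depends on how Jackson serializes the Instant):

{
  "bucket": "example-bucket",
  "key": "logs/2025/02/27/app.log",
  "time": "2025-02-27T12:00:00Z",
  "length": 1024
}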

@Override
public void processS3ObjectMetadata(final S3ObjectReference s3ObjectReference,
final AcknowledgementSet acknowledgementSet,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final String partitionKey) throws IOException {
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, numberOfRecordsToAccumulate, bufferTimeout);
try {
s3ObjectPluginMetrics.getS3ObjectReadTimer().recordCallable((Callable<Void>) () -> {
processObjectMetadata(acknowledgementSet, s3ObjectReference, bufferAccumulator, sourceCoordinator, partitionKey);
return null;
});
} catch (final Exception e) {
throw new RuntimeException(e);
}
s3ObjectPluginMetrics.getS3ObjectsSucceededCounter().increment();
}

public void parseS3Object(final S3ObjectReference s3ObjectReference,
final AcknowledgementSet acknowledgementSet,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
ScanObjectWorker.java
@@ -99,6 +99,8 @@ public class ScanObjectWorker implements Runnable {

private final Duration acknowledgmentSetTimeout;

private final boolean metadataOnly;

public ScanObjectWorker(final S3Client s3Client,
final List<ScanOptions> scanOptionsBuilderList,
final S3ObjectHandler s3ObjectHandler,
@@ -115,6 +117,7 @@ public ScanObjectWorker(final S3Client s3Client,
this.s3ObjectHandler = s3ObjectHandler;
this.bucketOwnerProvider = bucketOwnerProvider;
this.sourceCoordinator = sourceCoordinator;
this.metadataOnly = s3SourceConfig.getS3ScanScanOptions().getMetadataOnly();
this.s3ScanSchedulingOptions = s3SourceConfig.getS3ScanScanOptions().getSchedulingOptions();
this.endToEndAcknowledgementsEnabled = s3SourceConfig.getAcknowledgements();
this.acknowledgementSetManager = acknowledgementSetManager;
@@ -246,6 +249,17 @@ private void startProcessingObject(final long waitTimeMillis) {
}
}

private void processS3ObjectMetadata(final S3ObjectReference s3ObjectReference,
final AcknowledgementSet acknowledgementSet,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final SourcePartition<S3SourceProgressState> sourcePartition) {
try {
s3ObjectHandler.processS3ObjectMetadata(s3ObjectReference, acknowledgementSet, sourceCoordinator, sourcePartition.getPartitionKey());
} catch (final IOException ex) {
LOG.error("Error while process the processS3ObjectMetadata. ",ex);
}
}

private Optional<DeleteObjectRequest> processS3Object(final S3ObjectReference s3ObjectReference,
final AcknowledgementSet acknowledgementSet,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
@@ -334,6 +348,7 @@ private boolean shouldDeleteFolderPartition(final SourcePartition<S3SourceProgre
return false;
}


private void processObjectsForFolderPartition(final List<S3ObjectReference> objectsToProcess,
final SourcePartition<S3SourceProgressState> folderPartition) {
int objectsProcessed = 0;
@@ -365,11 +380,15 @@ private void processObjectsForFolderPartition(final List<S3ObjectReference> obje
acknowledgmentsRemainingForPartitions.put(folderPartition.getPartitionKey(), acknowledgmentsRemainingForPartition);
}

if (metadataOnly) {
    processS3ObjectMetadata(s3ObjectReference, acknowledgementSet, sourceCoordinator, folderPartition);
} else {
    final Optional<DeleteObjectRequest> deleteObjectRequest = processS3Object(s3ObjectReference,
            acknowledgementSet, sourceCoordinator, folderPartition);

    if (deleteObjectRequest.isPresent()) {
        objectsToDeleteForAcknowledgmentSets.get(activeAcknowledgmentSetId).add(deleteObjectRequest.get());
    }
}

objectsProcessed++;
S3ScanScanOptions.java
@@ -40,6 +40,9 @@ public class S3ScanScanOptions {
@JsonProperty("end_time")
private LocalDateTime endTime;

@JsonProperty("metadata_only")
private boolean metadata_only = false;

@JsonProperty("buckets")
@Valid
private List<S3ScanBucketOptions> buckets;
@@ -89,4 +92,6 @@ public S3ScanSchedulingOptions getSchedulingOptions() {
public FolderPartitioningOptions getPartitioningOptions() { return folderPartitioningOptions; }

public Duration getAcknowledgmentTimeout() { return acknowledgmentTimeout; }

public boolean getMetadataOnly() { return metadataOnly; }
}
S3ObjectWorkerTest.java
@@ -43,12 +43,14 @@
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -74,7 +76,7 @@ class S3ObjectWorkerTest {

@Mock
private S3Client s3Client;

@Mock
private Buffer<Record<Event>> buffer;

@@ -122,12 +124,17 @@ class S3ObjectWorkerTest {
@Mock
private GetObjectResponse getObjectResponse;

private AtomicInteger recordsWritten;

private Record<Event> receivedRecord;

@Mock
private HeadObjectResponse headObjectResponse;

private Exception exceptionThrownByCallable;
private Random random;
private long objectSize;
private Instant testTime;
@Mock
private S3ObjectPluginMetrics s3ObjectPluginMetrics;
@Mock
@@ -168,6 +175,8 @@ void setUp() throws Exception {
});
lenient().when(getObjectResponse.contentLength()).thenReturn(objectSize);
lenient().when(headObjectResponse.contentLength()).thenReturn(objectSize);
testTime = Instant.now();
lenient().when(headObjectResponse.lastModified()).thenReturn(testTime);
}

private S3ObjectWorker createObjectUnderTest(final S3ObjectPluginMetrics s3ObjectPluginMetrics) {
@@ -244,6 +253,38 @@ void parseS3Object_calls_Codec_parse_on_S3InputStream() throws Exception {
assertThat(actualInputFile, instanceOf(S3InputFile.class));
}

@Test
void processS3ObjectMetadata_writes_metadata_event_to_buffer() throws Exception {
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
recordsWritten = new AtomicInteger(0);
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class);
doAnswer(a -> {
receivedRecord = a.getArgument(0);
recordsWritten.incrementAndGet();
return null;
}).when(bufferAccumulator).add(any(Record.class));

doAnswer(a -> recordsWritten.get()).when(bufferAccumulator).getTotalWritten();

try (final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout))
.thenReturn(bufferAccumulator);
createObjectUnderTest(s3ObjectPluginMetrics).processS3ObjectMetadata(s3ObjectReference, acknowledgementSet, null, null);
assertThat(recordsWritten.get(), equalTo(1));
Event event = receivedRecord.getData();
assertThat(event.get("bucket", String.class), equalTo(bucketName));
assertThat(event.get("key", String.class), equalTo(key));
assertThat(event.get("length", Long.class), equalTo(objectSize));
assertThat(event.get("time", Instant.class), equalTo(testTime));
}
}

@Test
void parseS3Object_codec_parse_exception() throws Exception {
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
