diff --git a/build.gradle b/build.gradle index 3d720a6f34..39f6886e44 100644 --- a/build.gradle +++ b/build.gradle @@ -236,7 +236,7 @@ subprojects { configure(subprojects.findAll {it.name != 'data-prepper-api'}) { dependencies { - implementation platform('software.amazon.awssdk:bom:2.20.67') + implementation platform('software.amazon.awssdk:bom:2.21.23') implementation 'jakarta.validation:jakarta.validation-api:3.0.2' } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java index f497c7e205..067eaf8519 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java @@ -70,10 +70,18 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List data = convertData(record.dynamodb().newImage()); - // Always get keys from dynamodb().keys() - Map keys = convertKeys(record.dynamodb().keys()); + Map data; + Map keys; + try { + // NewImage may be empty + data = convertData(record.dynamodb().newImage()); + // Always get keys from dynamodb().keys() + keys = convertKeys(record.dynamodb().keys()); + } catch (final Exception e) { + LOG.error("Failed to parse and convert data from stream due to {}", e.getMessage()); + changeEventErrorCounter.increment(); + continue; + } try { bytesReceivedSummary.record(bytes); @@ -101,13 +109,9 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List convertData(Map data) { - try { - String jsonData = EnhancedDocument.fromAttributeValueMap(data).toJson(); - return MAPPER.readValue(jsonData, MAP_TYPE_REFERENCE); - } catch (JsonProcessingException e) { - return null; - } + private Map convertData(Map data) throws JsonProcessingException { + String jsonData = EnhancedDocument.fromAttributeValueMap(data).toJson(); + return MAPPER.readValue(jsonData, MAP_TYPE_REFERENCE); } /** diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java index 549bcce8f0..19b4e27b49 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java @@ -10,6 +10,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -27,6 +29,8 @@ import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -39,6 +43,7 @@ import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -155,11 +160,92 @@ void test_writeSingleRecordToBuffer() throws Exception { assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT")); assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli())); + assertThat(event.get(partitionKeyAttrName, String.class), notNullValue()); + assertThat(event.get(sortKeyAttrName, String.class), notNullValue()); + verifyNoInteractions(changeEventErrorCounter); verify(bytesReceivedSummary).record(record.dynamodb().sizeBytes()); verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes()); } + @ParameterizedTest + @ValueSource(strings = { + "Hello world.", + "I'm sorry.", + "I'm sorry, but I don't have access to that.", + "Re: colons", + "and/or", + "c:\\Home", + "I take\nup multiple\nlines", + "String with some \"backquotes\"." + }) + void test_writeSingleRecordToBuffer_with_other_data(final String additionalString) throws Exception { + + final Map additionalData = Map.of("otherData", AttributeValue.builder().s(additionalString).build()); + List records = buildRecords(1, Instant.now(), additionalData); + final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0); + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); + + objectUnderTest.writeToBuffer(null, records); + + verify(bufferAccumulator).add(any(Record.class)); + verify(bufferAccumulator).flush(); + verify(changeEventSuccessCounter).increment(anyDouble()); + assertThat(recordArgumentCaptor.getValue().getData(), notNullValue()); + JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData(); + + assertThat(event.getMetadata(), notNullValue()); + String partitionKey = record.dynamodb().keys().get(partitionKeyAttrName).s(); + String sortKey = record.dynamodb().keys().get(sortKeyAttrName).s(); + assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(partitionKey)); + assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sortKey)); + assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(partitionKey + "|" + sortKey)); + assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString())); + assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT")); + assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli())); + + assertThat(event.get(partitionKeyAttrName, String.class), notNullValue()); + assertThat(event.get(sortKeyAttrName, String.class), notNullValue()); + assertThat(event.get("otherData", String.class), equalTo(additionalString)); + + verifyNoInteractions(changeEventErrorCounter); + verify(bytesReceivedSummary).record(record.dynamodb().sizeBytes()); + verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes()); + } + + @Test + void test_writeSingleRecordToBuffer_with_bad_input_does_not_write() throws Exception { + + final Map badData = Map.of("otherData", AttributeValue.builder().build()); + List badRecords = buildRecords(2, Instant.now(), badData); + + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + + objectUnderTest.writeToBuffer(null, badRecords); + + verify(bufferAccumulator, never()).add(any(Record.class)); + } + + @Test + void test_writeSingleRecordToBuffer_with_mixed_input_writes_good_records() throws Exception { + + final Map badData = Map.of("otherData", AttributeValue.builder().build()); + List badRecords = buildRecords(2, Instant.now(), badData); + List goodRecords = buildRecords(5, Instant.now()); + + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + + List mixedRecords = new ArrayList<>(); + mixedRecords.addAll(badRecords); + mixedRecords.addAll(goodRecords); + + objectUnderTest.writeToBuffer(null, mixedRecords); + + verify(bufferAccumulator, times(goodRecords.size())).add(any(Record.class)); + } + @Test void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timestamp() throws Exception { final long currentSecond = 1699336310; @@ -252,22 +338,39 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta } private List buildRecords(int count, final Instant creationTime) { + return buildRecords(count, creationTime, Collections.emptyMap()); + } + + private List buildRecords( + int count, + final Instant creationTime, + final Map additionalData) { List records = new ArrayList<>(); for (int i = 0; i < count; i++) { - records.add(buildRecord(creationTime)); + records.add(buildRecord(creationTime, additionalData)); } return records; } private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final Instant creationTime) { - Map data = Map.of( + return buildRecord(creationTime, Collections.emptyMap()); + } + + private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final Instant creationTime, + Map additionalData) { + Map keysData = Map.of( partitionKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build(), sortKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build()); + + final Map data = new HashMap<>(); + data.putAll(keysData); + data.putAll(additionalData); + StreamRecord streamRecord = StreamRecord.builder() .sizeBytes(RANDOM.nextLong()) .newImage(data) - .keys(data) + .keys(keysData) .sequenceNumber(UUID.randomUUID().toString()) .approximateCreationDateTime(creationTime) .build();