From 1ffb57240f80fc171ef30c64436d8e4c13a0fbfe Mon Sep 17 00:00:00 2001 From: David Venable Date: Wed, 15 Nov 2023 08:02:31 -0800 Subject: [PATCH] Correct single quote escape character in DynamoDB [#3664] (#3667) Resolves a bug with escaped single quotes in the DynamoDB source by updating the AWS SDK to 2.21.23. Also, skip data that cannot be parsed entirely rather than silently send empty data. Resolves #3664. Signed-off-by: David Venable --- build.gradle | 2 +- .../converter/StreamRecordConverter.java | 26 +++-- .../converter/StreamRecordConverterTest.java | 109 +++++++++++++++++- 3 files changed, 122 insertions(+), 15 deletions(-) diff --git a/build.gradle b/build.gradle index 3d720a6f34..39f6886e44 100644 --- a/build.gradle +++ b/build.gradle @@ -236,7 +236,7 @@ subprojects { configure(subprojects.findAll {it.name != 'data-prepper-api'}) { dependencies { - implementation platform('software.amazon.awssdk:bom:2.20.67') + implementation platform('software.amazon.awssdk:bom:2.21.23') implementation 'jakarta.validation:jakarta.validation-api:3.0.2' } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java index f497c7e205..067eaf8519 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java @@ -70,10 +70,18 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List data = convertData(record.dynamodb().newImage()); - // Always get keys from dynamodb().keys() - Map keys = convertKeys(record.dynamodb().keys()); + Map data; + Map keys; + try { + // NewImage may be empty + data = convertData(record.dynamodb().newImage()); + // Always get keys from dynamodb().keys() + keys = convertKeys(record.dynamodb().keys()); + } catch (final Exception e) { + LOG.error("Failed to parse and convert data from stream due to {}", e.getMessage()); + changeEventErrorCounter.increment(); + continue; + } try { bytesReceivedSummary.record(bytes); @@ -101,13 +109,9 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List convertData(Map data) { - try { - String jsonData = EnhancedDocument.fromAttributeValueMap(data).toJson(); - return MAPPER.readValue(jsonData, MAP_TYPE_REFERENCE); - } catch (JsonProcessingException e) { - return null; - } + private Map convertData(Map data) throws JsonProcessingException { + String jsonData = EnhancedDocument.fromAttributeValueMap(data).toJson(); + return MAPPER.readValue(jsonData, MAP_TYPE_REFERENCE); } /** diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java index 549bcce8f0..19b4e27b49 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java @@ -10,6 +10,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -27,6 +29,8 @@ import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -39,6 +43,7 @@ import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -155,11 +160,92 @@ void test_writeSingleRecordToBuffer() throws Exception { assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT")); assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli())); + assertThat(event.get(partitionKeyAttrName, String.class), notNullValue()); + assertThat(event.get(sortKeyAttrName, String.class), notNullValue()); + verifyNoInteractions(changeEventErrorCounter); verify(bytesReceivedSummary).record(record.dynamodb().sizeBytes()); verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes()); } + @ParameterizedTest + @ValueSource(strings = { + "Hello world.", + "I'm sorry.", + "I'm sorry, but I don't have access to that.", + "Re: colons", + "and/or", + "c:\\Home", + "I take\nup multiple\nlines", + "String with some \"backquotes\"." + }) + void test_writeSingleRecordToBuffer_with_other_data(final String additionalString) throws Exception { + + final Map additionalData = Map.of("otherData", AttributeValue.builder().s(additionalString).build()); + List records = buildRecords(1, Instant.now(), additionalData); + final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0); + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); + + objectUnderTest.writeToBuffer(null, records); + + verify(bufferAccumulator).add(any(Record.class)); + verify(bufferAccumulator).flush(); + verify(changeEventSuccessCounter).increment(anyDouble()); + assertThat(recordArgumentCaptor.getValue().getData(), notNullValue()); + JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData(); + + assertThat(event.getMetadata(), notNullValue()); + String partitionKey = record.dynamodb().keys().get(partitionKeyAttrName).s(); + String sortKey = record.dynamodb().keys().get(sortKeyAttrName).s(); + assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(partitionKey)); + assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sortKey)); + assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(partitionKey + "|" + sortKey)); + assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString())); + assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT")); + assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli())); + + assertThat(event.get(partitionKeyAttrName, String.class), notNullValue()); + assertThat(event.get(sortKeyAttrName, String.class), notNullValue()); + assertThat(event.get("otherData", String.class), equalTo(additionalString)); + + verifyNoInteractions(changeEventErrorCounter); + verify(bytesReceivedSummary).record(record.dynamodb().sizeBytes()); + verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes()); + } + + @Test + void test_writeSingleRecordToBuffer_with_bad_input_does_not_write() throws Exception { + + final Map badData = Map.of("otherData", AttributeValue.builder().build()); + List badRecords = buildRecords(2, Instant.now(), badData); + + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + + objectUnderTest.writeToBuffer(null, badRecords); + + verify(bufferAccumulator, never()).add(any(Record.class)); + } + + @Test + void test_writeSingleRecordToBuffer_with_mixed_input_writes_good_records() throws Exception { + + final Map badData = Map.of("otherData", AttributeValue.builder().build()); + List badRecords = buildRecords(2, Instant.now(), badData); + List goodRecords = buildRecords(5, Instant.now()); + + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + + List mixedRecords = new ArrayList<>(); + mixedRecords.addAll(badRecords); + mixedRecords.addAll(goodRecords); + + objectUnderTest.writeToBuffer(null, mixedRecords); + + verify(bufferAccumulator, times(goodRecords.size())).add(any(Record.class)); + } + @Test void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timestamp() throws Exception { final long currentSecond = 1699336310; @@ -252,22 +338,39 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta } private List buildRecords(int count, final Instant creationTime) { + return buildRecords(count, creationTime, Collections.emptyMap()); + } + + private List buildRecords( + int count, + final Instant creationTime, + final Map additionalData) { List records = new ArrayList<>(); for (int i = 0; i < count; i++) { - records.add(buildRecord(creationTime)); + records.add(buildRecord(creationTime, additionalData)); } return records; } private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final Instant creationTime) { - Map data = Map.of( + return buildRecord(creationTime, Collections.emptyMap()); + } + + private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final Instant creationTime, + Map additionalData) { + Map keysData = Map.of( partitionKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build(), sortKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build()); + + final Map data = new HashMap<>(); + data.putAll(keysData); + data.putAll(additionalData); + StreamRecord streamRecord = StreamRecord.builder() .sizeBytes(RANDOM.nextLong()) .newImage(data) - .keys(data) + .keys(keysData) .sequenceNumber(UUID.randomUUID().toString()) .approximateCreationDateTime(creationTime) .build();