From c705c0933eb89e3fcca1255656752e4889a928f9 Mon Sep 17 00:00:00 2001 From: kkondaka <41027584+kkondaka@users.noreply.github.com> Date: Wed, 13 Mar 2024 17:18:57 -0700 Subject: [PATCH] Split HTTP source data to multiple chunks before writing to byte buffer (#4266) * Split HTTP source data to multiple chunks before writing to byte buffer Signed-off-by: Krishna Kondaka * Modified JsonDecoder to parse objects in addition to array of objects Signed-off-by: Krishna Kondaka * Added a test case Signed-off-by: Krishna Kondaka * Removed json to array conversion Signed-off-by: Krishna Kondaka * Modified to add new JsonObjDecoder for decoding single json objects Signed-off-by: Krishna Kondaka * Removed changes to JsonDecoder Signed-off-by: Krishna Kondaka * Renamed JsonObjDecoder to JsonObjectDecoder Signed-off-by: Krishna Kondaka --------- Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../model/codec/JsonObjectDecoder.java | 57 ++++++++++++++ .../model/codec/JsonObjectDecoderTest.java | 75 +++++++++++++++++++ .../dataprepper/pipeline/Pipeline.java | 2 +- .../plugins/source/loghttp/HTTPSource.java | 4 +- .../source/loghttp/LogHTTPService.java | 6 +- .../plugins/kafka/buffer/KafkaBufferIT.java | 69 +++++++++++++++++ 6 files changed, 207 insertions(+), 6 deletions(-) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonObjectDecoder.java create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonObjectDecoderTest.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonObjectDecoder.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonObjectDecoder.java new file mode 100644 index 0000000000..6485de0e87 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonObjectDecoder.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + 
+package org.opensearch.dataprepper.model.codec; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; +import java.io.IOException; +import java.io.InputStream; +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; + +public class JsonObjectDecoder implements ByteDecoder { + private final ObjectMapper objectMapper = new ObjectMapper(); + private final JsonFactory jsonFactory = new JsonFactory(); + + public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException { + Objects.requireNonNull(inputStream); + Objects.requireNonNull(eventConsumer); + + final JsonParser jsonParser = jsonFactory.createParser(inputStream); + + while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { + if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) { + final Map<Object, Object> innerJson = objectMapper.readValue(jsonParser, Map.class); + + final Record<Event> record = createRecord(innerJson, timeReceived); + eventConsumer.accept(record); + } + } + } + + private Record<Event> createRecord(final Map<Object, Object> json, final Instant timeReceived) { + final JacksonLog.Builder logBuilder = JacksonLog.builder() + .withData(json) + .getThis(); + if (timeReceived != null) { + logBuilder.withTimeReceived(timeReceived); + } + final JacksonEvent event = (JacksonEvent)logBuilder.build(); + + return new Record<>(event); + } + +} + diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonObjectDecoderTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonObjectDecoderTest.java new 
file mode 100644 index 0000000000..3f9aff511d --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonObjectDecoderTest.java @@ -0,0 +1,75 @@ +package org.opensearch.dataprepper.model.codec; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.ByteArrayInputStream; +import java.time.Instant; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import org.junit.jupiter.api.BeforeEach; + +public class JsonObjectDecoderTest { + private JsonObjectDecoder jsonObjectDecoder; + private Record<Event> receivedRecord; + private Instant receivedTime; + + private JsonObjectDecoder createObjectUnderTest() { + return new JsonObjectDecoder(); + } + + @BeforeEach + void setup() { + jsonObjectDecoder = createObjectUnderTest(); + receivedRecord = null; + } + + @Test + void test_basicJsonObjectDecoder() { + String stringValue = UUID.randomUUID().toString(); + Random r = new Random(); + int intValue = r.nextInt(); + String inputString = "{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}"; + try { + jsonObjectDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), null, (record) -> { + receivedRecord = record; + }); + } catch (Exception e){} + + assertNotEquals(receivedRecord, null); + Map<String, Object> map = receivedRecord.getData().toMap(); + assertThat(map.get("key1"), equalTo(stringValue)); + assertThat(map.get("key2"), equalTo(intValue)); + } + + @Test + void test_basicJsonObjectDecoder_withTimeReceived() { + String stringValue = UUID.randomUUID().toString(); + Random r = new Random(); + int intValue = r.nextInt(); + + String inputString = "{\"key1\":\""+stringValue+"\", 
\"key2\":"+intValue+"}"; + final Instant now = Instant.now(); + try { + jsonObjectDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), now, (record) -> { + receivedRecord = record; + receivedTime = ((DefaultEventHandle)(((Event)record.getData()).getEventHandle())).getInternalOriginationTime(); + }); + } catch (Exception e){} + + assertNotEquals(receivedRecord, null); + Map<String, Object> map = receivedRecord.getData().toMap(); + assertThat(map.get("key1"), equalTo(stringValue)); + assertThat(map.get("key2"), equalTo(intValue)); + assertThat(receivedTime, equalTo(now)); + } + +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java index a71f8d14b1..2b103df91c 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java @@ -339,7 +339,7 @@ List> publishToSinks(final Collection records) { final RouterGetRecordStrategy getRecordStrategy = new RouterCopyRecordStrategy(eventFactory, - (source.areAcknowledgementsEnabled()) ? + (source.areAcknowledgementsEnabled() || buffer.isByteBuffer()) ? 
acknowledgementSetManager : InactiveAcknowledgementSetManager.getInstance(), sinks); diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java index 9022eaca60..5dd7292417 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java @@ -20,7 +20,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.codec.ByteDecoder; -import org.opensearch.dataprepper.model.codec.JsonDecoder; +import org.opensearch.dataprepper.model.codec.JsonObjectDecoder; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; @@ -64,7 +64,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi this.sourceConfig = sourceConfig; this.pluginMetrics = pluginMetrics; this.pipelineName = pipelineDescription.getPipelineName(); - this.byteDecoder = new JsonDecoder(); + this.byteDecoder = new JsonObjectDecoder(); this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig); final PluginModel authenticationConfiguration = sourceConfig.getAuthentication(); final PluginSetting authenticationPluginSetting; diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java index b43891108b..af423e9d10 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java +++ 
b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java @@ -87,9 +87,9 @@ private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRe } try { if (buffer.isByteBuffer()) { - // jsonList is ignored in this path but parse() was done to make - // sure that the data is in the expected json format - buffer.writeBytes(content.array(), null, bufferWriteTimeoutInMillis); + for (final String json: jsonList) { + buffer.writeBytes(json.getBytes(), null, bufferWriteTimeoutInMillis); + } } else { final List<Record<Log>> records = jsonList.stream() .map(this::buildRecordLog) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index f959149bfa..261008e050 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -30,6 +30,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties; import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer; import org.opensearch.dataprepper.plugins.kafka.util.TestProducer; +import org.opensearch.dataprepper.model.codec.JsonDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +127,10 @@ void setUp() { byteDecoder = null; } + + private KafkaBuffer createObjectUnderTestWithJsonDecoder() { + return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new JsonDecoder(), null, null); + } + private KafkaBuffer createObjectUnderTest() { return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, null, null); } @@ -193,6 +198,70 @@ void 
write_and_read_max_request_test() throws TimeoutException, NoSuchFieldExcep assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap())); } + @Test + void writeBigJson_and_read() throws Exception { + final KafkaBuffer objectUnderTest = createObjectUnderTestWithJsonDecoder(); + + String inputJson = "["; + final int numRecords = 10; + for (int i = 0; i < numRecords; i++) { + String boolString = (i % 2 == 0) ? "true" : "false"; + if (i != 0) + inputJson += ","; + inputJson += "{\"key"+i+"\": \"value"+i+"\", \"key"+(10+i)+"\": "+(50+i)+", \"key"+(20+i)+"\": "+boolString+"}"; + } + inputJson += "]"; + objectUnderTest.writeBytes(inputJson.getBytes(), null, 1_000); + + Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000); + + assertThat(readResult, notNullValue()); + assertThat(readResult.getKey(), notNullValue()); + assertThat(readResult.getKey().size(), equalTo(numRecords)); + + Object[] outputRecords = readResult.getKey().toArray(); + for (int i = 0; i < numRecords; i++) { + Record<Event> receivedRecord = (Record<Event>)outputRecords[i]; + assertThat(receivedRecord, notNullValue()); + assertThat(receivedRecord.getData(), notNullValue()); + Map<String, Object> receivedMap = receivedRecord.getData().toMap(); + assertThat(receivedMap.get("key"+i), equalTo("value"+i)); + assertThat(receivedMap.get("key"+(10+i)), equalTo(50+i)); + boolean expectedBoolString = (i % 2 == 0) ? true : false; + assertThat(receivedMap.get("key"+(20+i)), equalTo(expectedBoolString)); + } + } + + @Test + void writeMultipleSmallJson_and_read() throws Exception { + final KafkaBuffer objectUnderTest = createObjectUnderTestWithJsonDecoder(); + + final int numRecords = 10; + for (int i = 0; i < numRecords; i++) { + String boolString = (i % 2 == 0) ? 
"true" : "false"; + String inputJson = "[{\"key"+i+"\": \"value"+i+"\", \"key"+(10+i)+"\": "+(50+i)+", \"key"+(20+i)+"\": "+boolString+"}]"; + objectUnderTest.writeBytes(inputJson.getBytes(), null, 1_000); + } + + Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000); + + assertThat(readResult, notNullValue()); + assertThat(readResult.getKey(), notNullValue()); + assertThat(readResult.getKey().size(), equalTo(numRecords)); + + Object[] outputRecords = readResult.getKey().toArray(); + for (int i = 0; i < numRecords; i++) { + Record<Event> receivedRecord = (Record<Event>)outputRecords[i]; + assertThat(receivedRecord, notNullValue()); + assertThat(receivedRecord.getData(), notNullValue()); + Map<String, Object> receivedMap = receivedRecord.getData().toMap(); + assertThat(receivedMap.get("key"+i), equalTo("value"+i)); + assertThat(receivedMap.get("key"+(10+i)), equalTo(50+i)); + boolean expectedBoolString = (i % 2 == 0) ? true : false; + assertThat(receivedMap.get("key"+(20+i)), equalTo(expectedBoolString)); + } + } + @Test void writeBytes_and_read() throws Exception { byteDecoder = new JsonDecoder();