Skip to content

Commit

Permalink
Split HTTP source data to multiple chunks before writing to byte buffer (#4266)
Browse files Browse the repository at this point in the history

* Split HTTP source data to multiple chunks before writing to byte buffer

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified JsonDecoder to parse objects in addition to array of objects

Signed-off-by: Krishna Kondaka <[email protected]>

* Added a test case

Signed-off-by: Krishna Kondaka <[email protected]>

* Removed json to array conversion

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified to add new JsonObjDecoder for decoding single json objects

Signed-off-by: Krishna Kondaka <[email protected]>

* Removed changes to JsonDecoder

Signed-off-by: Krishna Kondaka <[email protected]>

* Renamed JsonObjDecoder to JsonObjectDecoder

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Mar 14, 2024
1 parent 6611631 commit c705c09
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.codec;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

/**
 * A {@link ByteDecoder} that scans an input stream for JSON objects and hands each
 * one to a consumer as a {@link Record} wrapping a log event.
 */
public class JsonObjectDecoder implements ByteDecoder {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final JsonFactory jsonFactory = new JsonFactory();

    /**
     * Reads tokens from {@code inputStream} and, for every {@code START_OBJECT} token
     * encountered, deserializes that object into a map and passes the resulting record
     * to {@code eventConsumer}.
     *
     * <p>Scanning ends when the parser reports itself closed or when {@code nextToken()}
     * yields {@code END_OBJECT}. Any other token is skipped.
     *
     * @param inputStream   source of the JSON bytes; must not be null
     * @param timeReceived  time to set on each created event; may be null, in which case
     *                      no received time is set on the builder
     * @param eventConsumer receives one record per decoded object; must not be null
     * @throws IOException if creating the parser or reading from it fails
     */
    public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException {
        Objects.requireNonNull(inputStream);
        Objects.requireNonNull(eventConsumer);

        final JsonParser parser = jsonFactory.createParser(inputStream);

        while (!parser.isClosed()) {
            final JsonToken token = parser.nextToken();
            if (token == JsonToken.END_OBJECT) {
                break;
            }
            if (token != JsonToken.START_OBJECT) {
                // Not the start of an object: nothing to decode for this token.
                continue;
            }

            final Map<String, Object> fields = objectMapper.readValue(parser, Map.class);
            eventConsumer.accept(createRecord(fields, timeReceived));
        }
    }

    /**
     * Wraps the decoded fields in a log event, setting the received time only when one is given.
     */
    private Record<Event> createRecord(final Map<String, Object> json, final Instant timeReceived) {
        final JacksonLog.Builder builder = JacksonLog.builder().withData(json).getThis();
        if (timeReceived != null) {
            builder.withTimeReceived(timeReceived);
        }

        return new Record<>((JacksonEvent) builder.build());
    }

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.opensearch.dataprepper.model.codec;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.record.Record;

import java.io.ByteArrayInputStream;
import java.time.Instant;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import org.junit.jupiter.api.BeforeEach;

/**
 * Tests for {@link JsonObjectDecoder}: a single JSON object is decoded into one record,
 * with and without a received time.
 */
public class JsonObjectDecoderTest {
    private JsonObjectDecoder jsonObjectDecoder;
    // Captured by the consumer lambda passed to parse(); reset before every test.
    private Record<Event> receivedRecord;
    private Instant receivedTime;

    private JsonObjectDecoder createObjectUnderTest() {
        return new JsonObjectDecoder();
    }

    @BeforeEach
    void setup() {
        jsonObjectDecoder = createObjectUnderTest();
        receivedRecord = null;
        receivedTime = null;
    }

    /**
     * Decodes one object with a string and an int field and checks both values,
     * passing {@code null} as the received time.
     */
    @Test
    void test_basicJsonObjectDecoder() throws Exception {
        String stringValue = UUID.randomUUID().toString();
        Random r = new Random();
        int intValue = r.nextInt();
        String inputString = "{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}";

        // Let any exception from parse() fail the test directly instead of swallowing it.
        jsonObjectDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), null, (record) -> {
            receivedRecord = record;
        });

        assertNotEquals(null, receivedRecord);
        Map<String, Object> map = receivedRecord.getData().toMap();
        assertThat(map.get("key1"), equalTo(stringValue));
        assertThat(map.get("key2"), equalTo(intValue));
    }

    /**
     * Same as the basic test, but supplies a received time and checks that the event
     * handle's internal origination time equals it.
     */
    @Test
    void test_basicJsonObjectDecoder_withTimeReceived() throws Exception {
        String stringValue = UUID.randomUUID().toString();
        Random r = new Random();
        int intValue = r.nextInt();

        String inputString = "{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}";
        final Instant now = Instant.now();

        // Let any exception from parse() fail the test directly instead of swallowing it.
        jsonObjectDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), now, (record) -> {
            receivedRecord = record;
            receivedTime = ((DefaultEventHandle) record.getData().getEventHandle()).getInternalOriginationTime();
        });

        assertNotEquals(null, receivedRecord);
        Map<String, Object> map = receivedRecord.getData().toMap();
        assertThat(map.get("key1"), equalTo(stringValue));
        assertThat(map.get("key2"), equalTo(intValue));
        assertThat(receivedTime, equalTo(now));
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ List<Future<Void>> publishToSinks(final Collection<Record> records) {

final RouterGetRecordStrategy getRecordStrategy =
new RouterCopyRecordStrategy(eventFactory,
(source.areAcknowledgementsEnabled()) ?
(source.areAcknowledgementsEnabled() || buffer.isByteBuffer()) ?
acknowledgementSetManager :
InactiveAcknowledgementSetManager.getInstance(),
sinks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.opensearch.dataprepper.model.codec.JsonObjectDecoder;
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
Expand Down Expand Up @@ -64,7 +64,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
this.sourceConfig = sourceConfig;
this.pluginMetrics = pluginMetrics;
this.pipelineName = pipelineDescription.getPipelineName();
this.byteDecoder = new JsonDecoder();
this.byteDecoder = new JsonObjectDecoder();
this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
final PluginModel authenticationConfiguration = sourceConfig.getAuthentication();
final PluginSetting authenticationPluginSetting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRe
}
try {
if (buffer.isByteBuffer()) {
// jsonList is ignored in this path but parse() was done to make
// sure that the data is in the expected json format
buffer.writeBytes(content.array(), null, bufferWriteTimeoutInMillis);
for (final String json: jsonList) {
buffer.writeBytes(json.getBytes(), null, bufferWriteTimeoutInMillis);
}
} else {
final List<Record<Log>> records = jsonList.stream()
.map(this::buildRecordLog)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -126,6 +127,10 @@ void setUp() {
byteDecoder = null;
}

/**
 * Builds the buffer under test with a fresh {@link JsonDecoder} as the fourth constructor
 * argument; the last two arguments are left null.
 */
private KafkaBuffer createObjectUnderTestWithJsonDecoder() {
    final JsonDecoder jsonDecoder = new JsonDecoder();
    return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, jsonDecoder, null, null);
}

/**
 * Builds the buffer under test from the shared fixtures, passing null for the last
 * three constructor arguments.
 */
private KafkaBuffer createObjectUnderTest() {
    final KafkaBuffer kafkaBuffer =
            new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, null, null);
    return kafkaBuffer;
}
Expand Down Expand Up @@ -193,6 +198,70 @@ void write_and_read_max_request_test() throws TimeoutException, NoSuchFieldExcep
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
}

/**
 * Writes one JSON array holding ten objects in a single writeBytes call, then reads once
 * and expects ten records whose fields match the objects in the order they were written.
 */
@Test
void writeBigJson_and_read() throws Exception {
    final int numRecords = 10;
    final KafkaBuffer objectUnderTest = createObjectUnderTestWithJsonDecoder();

    // Assemble "[{...},{...},...]" with a string, an int and a boolean field per object.
    final StringBuilder jsonArray = new StringBuilder("[");
    for (int i = 0; i < numRecords; i++) {
        if (i > 0) {
            jsonArray.append(",");
        }
        jsonArray.append("{\"key").append(i).append("\": \"value").append(i)
                .append("\", \"key").append(10 + i).append("\": ").append(50 + i)
                .append(", \"key").append(20 + i).append("\": ").append(i % 2 == 0 ? "true" : "false")
                .append("}");
    }
    jsonArray.append("]");
    objectUnderTest.writeBytes(jsonArray.toString().getBytes(), null, 1_000);

    final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);

    assertThat(readResult, notNullValue());
    final Collection<Record<Event>> records = readResult.getKey();
    assertThat(records, notNullValue());
    assertThat(records.size(), equalTo(numRecords));

    // Records are checked in iteration order against the index used when writing.
    int index = 0;
    for (final Record<Event> receivedRecord : records) {
        assertThat(receivedRecord, notNullValue());
        assertThat(receivedRecord.getData(), notNullValue());
        final Map<String, Object> receivedMap = receivedRecord.getData().toMap();
        assertThat(receivedMap.get("key" + index), equalTo("value" + index));
        assertThat(receivedMap.get("key" + (10 + index)), equalTo(50 + index));
        assertThat(receivedMap.get("key" + (20 + index)), equalTo(index % 2 == 0));
        index++;
    }
}

/**
 * Writes ten single-element JSON arrays, one writeBytes call each, then reads once and
 * expects ten records whose fields match the objects in the order they were written.
 */
@Test
void writeMultipleSmallJson_and_read() throws Exception {
    final int numRecords = 10;
    final KafkaBuffer objectUnderTest = createObjectUnderTestWithJsonDecoder();

    for (int i = 0; i < numRecords; i++) {
        // One object per payload, with a string, an int and a boolean field.
        final StringBuilder payload = new StringBuilder("[{\"key");
        payload.append(i).append("\": \"value").append(i)
                .append("\", \"key").append(10 + i).append("\": ").append(50 + i)
                .append(", \"key").append(20 + i).append("\": ").append(i % 2 == 0 ? "true" : "false")
                .append("}]");
        objectUnderTest.writeBytes(payload.toString().getBytes(), null, 1_000);
    }

    final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);

    assertThat(readResult, notNullValue());
    final Collection<Record<Event>> records = readResult.getKey();
    assertThat(records, notNullValue());
    assertThat(records.size(), equalTo(numRecords));

    // Records are checked in iteration order against the index used when writing.
    int index = 0;
    for (final Record<Event> receivedRecord : records) {
        assertThat(receivedRecord, notNullValue());
        assertThat(receivedRecord.getData(), notNullValue());
        final Map<String, Object> receivedMap = receivedRecord.getData().toMap();
        assertThat(receivedMap.get("key" + index), equalTo("value" + index));
        assertThat(receivedMap.get("key" + (10 + index)), equalTo(50 + index));
        assertThat(receivedMap.get("key" + (20 + index)), equalTo(index % 2 == 0));
        index++;
    }
}

@Test
void writeBytes_and_read() throws Exception {
byteDecoder = new JsonDecoder();
Expand Down

0 comments on commit c705c09

Please sign in to comment.