Skip to content

Commit

Permalink
Moved ZeroBuffer Implementation into data-prepper-core and addressed …
Browse files Browse the repository at this point in the history
…comments

Signed-off-by: Mohammed Aghil Puthiyottil <[email protected]>
  • Loading branch information
MohammedAghil committed Feb 7, 2025
1 parent c90320f commit 4488fb8
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 306 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.dataprepper.core.pipeline;

/**
 * Contract for components that hold a {@link PipelineRunner} reference which
 * can be supplied after construction and retrieved later.
 */
public interface SupportsPipelineRunner {

    /**
     * Returns the runner held by this component.
     *
     * @return the current {@link PipelineRunner}; whether this may be {@code null}
     *         before {@link #setPipelineRunner(PipelineRunner)} is called is up to
     *         the implementation
     */
    PipelineRunner getPipelineRunner();

    /**
     * Supplies the runner this component should hold.
     *
     * @param pipelineRunner the {@link PipelineRunner} to hold
     */
    void setPipelineRunner(PipelineRunner pipelineRunner);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.opensearch.dataprepper.plugins.buffer.zerobuffer;
package org.opensearch.dataprepper.core.pipeline.buffer;

import org.opensearch.dataprepper.core.pipeline.buffer.AbstractZeroBuffer;
import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.core.pipeline.PipelineRunner;
import org.opensearch.dataprepper.core.pipeline.SupportsPipelineRunner;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -12,25 +14,21 @@
import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.Getter;
import lombok.AccessLevel;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@DataPrepperPlugin(name = "zero_buffer", pluginType = Buffer.class)
public class ZeroBuffer<T extends Record<?>> extends AbstractZeroBuffer<T> {
@DataPrepperPlugin(name = "zero", pluginType = Buffer.class)
public class ZeroBuffer<T extends Record<?>> implements Buffer<T>, SupportsPipelineRunner {
private static final Logger LOG = LoggerFactory.getLogger(ZeroBuffer.class);
private static final String PLUGIN_COMPONENT_ID = "ZeroBuffer";
private final PluginMetrics pluginMetrics;
private final ThreadLocal<Collection<T>> threadLocalStore;
@Getter(value = AccessLevel.PACKAGE)
private final String pipelineName;
@Getter(value = AccessLevel.PACKAGE)
private PipelineRunner pipelineRunner;
@VisibleForTesting
final String pipelineName;
private final Counter writeRecordsCounter;
@Getter(value = AccessLevel.PACKAGE)
private final Counter readRecordsCounter;

@DataPrepperPluginConstructor
Expand All @@ -55,7 +53,7 @@ public void write(T record, int timeoutInMillis) throws TimeoutException {
threadLocalStore.get().add(record);
writeRecordsCounter.increment();

runAllProcessorsAndPublishToSinks();
getPipelineRunner().runAllProcessorsAndPublishToSinks();
}

@Override
Expand All @@ -65,14 +63,14 @@ public void writeAll(Collection<T> records, int timeoutInMillis) throws Exceptio
}

if (threadLocalStore.get() == null) {
threadLocalStore.set(records);
threadLocalStore.set(new ArrayList<>(records));
} else {
// Add the new records to the existing records
threadLocalStore.get().addAll(records);
}

writeRecordsCounter.increment(records.size() * 1.0);
runAllProcessorsAndPublishToSinks();
writeRecordsCounter.increment((double) records.size());
getPipelineRunner().runAllProcessorsAndPublishToSinks();
}

@Override
Expand All @@ -86,10 +84,10 @@ public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
if (storedRecords!= null && !storedRecords.isEmpty()) {
checkpointState = new CheckpointState(storedRecords.size());
threadLocalStore.remove();
readRecordsCounter.increment(storedRecords.size() * 1.0);
readRecordsCounter.increment((double) storedRecords.size());
}

return new AbstractMap.SimpleEntry<>(storedRecords, checkpointState);
return Map.entry(storedRecords, checkpointState);
}

@Override
Expand All @@ -99,4 +97,14 @@ public void checkpoint(CheckpointState checkpointState) {}
/**
 * Reports whether the calling thread currently has no records stored in this buffer.
 *
 * @return {@code true} when the calling thread's store is absent or contains no
 *         records, {@code false} otherwise
 */
public boolean isEmpty() {
    // Look the thread-local value up once and reuse it for both checks.
    final Collection<T> currentThreadRecords = this.threadLocalStore.get();
    if (currentThreadRecords == null) {
        return true;
    }
    return currentThreadRecords.isEmpty();
}

/**
 * Returns the runner most recently assigned to this buffer.
 *
 * @return the value of the {@code pipelineRunner} field
 */
@Override
public PipelineRunner getPipelineRunner() {
    return this.pipelineRunner;
}

/**
 * Stores the given runner in the {@code pipelineRunner} field of this buffer.
 *
 * @param pipelineRunner the runner to store
 */
@Override
public void setPipelineRunner(final PipelineRunner pipelineRunner) {
    this.pipelineRunner = pipelineRunner;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package org.opensearch.dataprepper.core.pipeline.buffer;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.core.pipeline.PipelineRunner;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ZeroBufferTests {
private static final Logger LOG = LoggerFactory.getLogger(ZeroBufferTests.class);
private static final String MOCK_PIPELINE_NAME = "mock-pipeline";
private static final int WRITE_TIMEOUT = 100;
private static final int READ_TIMEOUT = 500;
private static final String SINGLE_RECORD_DATA_FORMAT = "{\"message\":\"test\"}";
private static final String BATCH_RECORDS_DATA_FORMAT = "{\"message\":\"test-%d\"}";

@BeforeEach
public void setup() {
new ArrayList<>(Metrics.globalRegistry.getRegistries())
.forEach(Metrics.globalRegistry::remove);
new ArrayList<>(Metrics.globalRegistry.getMeters())
.forEach(Metrics.globalRegistry::remove);
Metrics.addRegistry(new SimpleMeterRegistry());
}

@Nested
class WriteTests {
@Test
public void testSingleWriteAndReadReturnsCorrectRecord() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();

zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);

Collection<Record<String>> readRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
assertEquals(1, readRecords.size());

assertEquals(SINGLE_RECORD_DATA_FORMAT, readRecords.iterator().next().getData());

zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
readRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
assertEquals(1, readRecords.size());
}

@Test
public void testMultipleWriteAndReadReturnsCorrectRecord() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();

zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);

Collection<Record<String>> readRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
assertEquals(2, readRecords.size());
assertEquals(SINGLE_RECORD_DATA_FORMAT, readRecords.iterator().next().getData());
assertEquals(SINGLE_RECORD_DATA_FORMAT, readRecords.iterator().next().getData());
}

@Test
public void testWriteAllAndReadReturnsAllRecords() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();

Collection<Record<String>> writeRecords = generateRecords(IntStream.range(0, 10)
.mapToObj(i -> String.format(BATCH_RECORDS_DATA_FORMAT, i))
.collect(Collectors.toList()));
zeroBuffer.writeAll(writeRecords, WRITE_TIMEOUT);

Map.Entry<Collection<Record<String>>, CheckpointState> readRecordsMap = zeroBuffer.read(READ_TIMEOUT);
Collection<Record<String>> readRecords = readRecordsMap.getKey();
for (Record<String> record : readRecords) {
LOG.debug(record.getData());
}

// Ensure that the write records are the same as the read records
assertEquals(writeRecords.size(), readRecords.size());
}

@Test
public void testWriteNullRecordThrowsException() {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();

Exception writeException = assertThrows(NullPointerException.class, () -> {
zeroBuffer.write(null, WRITE_TIMEOUT);
});

Exception writeAllException = assertThrows(NullPointerException.class, () -> {
zeroBuffer.writeAll(null, WRITE_TIMEOUT);
});

assertEquals("The write record cannot be null", writeException.getMessage());
assertEquals("The write records cannot be null", writeAllException.getMessage());
}

@Test
public void testWriteEmptyRecordDoesNotThrowException() {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();

Record<String> emptyRecord = generateRecord(null);
Collection<Record<String>> emptyRecordCollection = generateRecords(new ArrayList<>());

assertDoesNotThrow(() -> zeroBuffer.write(emptyRecord, WRITE_TIMEOUT));
assertDoesNotThrow(() -> zeroBuffer.writeAll(emptyRecordCollection, WRITE_TIMEOUT));
}

@Test
public void testThreadReadAndWriteIsolation() throws Exception {
final ZeroBuffer<Record<String>> zeroBuffer = initializeZeroBufferWithPipelineName();

Thread workerThread = new Thread(() -> {
try {
PipelineRunner pipelineRunnerMock = mock(PipelineRunner.class);
zeroBuffer.setPipelineRunner(pipelineRunnerMock);
doNothing().when(pipelineRunnerMock).runAllProcessorsAndPublishToSinks();
zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
} catch (TimeoutException e) {
fail("Timeout exception occurred");
}
});
workerThread.start();
workerThread.join();

// Ensure that main thread does not share the same records store as the worker thread
assertEquals(0, zeroBuffer.read(READ_TIMEOUT).getKey().size());
assertTrue(zeroBuffer.isEmpty());
}

@Test
public void testWriteAndWriteAllReturnsCorrectRecords() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();

zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
zeroBuffer.writeAll(generateRecords(IntStream.range(0, 10)
.mapToObj(i -> String.format(BATCH_RECORDS_DATA_FORMAT, i))
.collect(Collectors.toList())), WRITE_TIMEOUT);

Collection<Record<String>> readRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
for (Record<String> record : readRecords) {
LOG.debug(record.getData());
}
assertEquals(11, readRecords.size());
}
}

@Nested
class ReadTests {
@Test
public void testReadFromNonEmptyBufferReturnsCorrectRecords() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();

zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);

Collection<Record<String>> initialReadRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
Collection<Record<String>> secondAttemptToReadRecords = zeroBuffer.read(READ_TIMEOUT).getKey();

assertEquals(1, initialReadRecords.size());
assertEquals(SINGLE_RECORD_DATA_FORMAT, initialReadRecords.iterator().next().getData());

assertEquals(0, secondAttemptToReadRecords.size());
}

@Test
public void testReadFromEmptyBufferReturnsNoRecords() {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();

Map.Entry<Collection<Record<String>>, CheckpointState> readRecordsMap = zeroBuffer.read(READ_TIMEOUT);
assertTrue(readRecordsMap.getKey().isEmpty());
}
}

@Nested
class EmptyBufferTests {
@Test
public void testIsEmptyReturnsTrueWhenBufferIsEmpty() {
ZeroBuffer<Record<String>> zeroBuffer = initializeZeroBufferWithPipelineName();
assertTrue(zeroBuffer.isEmpty());
}

@Test
public void testIsEmptyReturnsFalseWhenBufferIsNotEmpty() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();

zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);

assertFalse(zeroBuffer.isEmpty());
}
}

@Nested
class CommonTests {
@Test
public void testCreateZeroBufferWithPipelineName() {
ZeroBuffer<Record<String>> zeroBuffer = initializeZeroBufferWithPipelineName();
assertEquals(MOCK_PIPELINE_NAME, zeroBuffer.pipelineName);
}

@Test
public void testCheckpointDoesNotThrowException() {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();
assertDoesNotThrow(() -> zeroBuffer.checkpoint(null));
assertDoesNotThrow(() -> zeroBuffer.checkpoint(new CheckpointState(0)));
}
}

/*-------------------------Private Helper Methods---------------------------*/
private <T> Record<T> generateRecord(final T data) {
return new Record<>(data);
}

private <T> Collection<Record<T>> generateRecords(Collection<T> data) {
Collection<Record<T>> records = new ArrayList<>();
for (T recordData : data) {
Record<T> record = new Record<>(recordData);
records.add(record);
}
return records;
}

private <T> ZeroBuffer<Record<T>> setupAndInitializeZeroBuffer() {
ZeroBuffer<Record<T>> zeroBuffer = initializeZeroBufferWithPipelineName();
PipelineRunner pipelineRunnerMock = mock(PipelineRunner.class);
zeroBuffer.setPipelineRunner(pipelineRunnerMock);
doNothing().when(pipelineRunnerMock).runAllProcessorsAndPublishToSinks();
return zeroBuffer;
}

private <T> ZeroBuffer<Record<T>> initializeZeroBufferWithPipelineName() {
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
when(pipelineDescription.getPipelineName()).thenReturn(MOCK_PIPELINE_NAME);
return new ZeroBuffer<>(pipelineDescription);
}
}
Loading

0 comments on commit 4488fb8

Please sign in to comment.