Skip to content

Commit

Permalink
Zero Buffer Implementation and Tests
Browse files Browse the repository at this point in the history
Signed-off-by: Mohammed Aghil Puthiyottil <[email protected]>
  • Loading branch information
MohammedAghil committed Feb 4, 2025
1 parent 7a3cf87 commit 017498d
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opensearch.dataprepper.core.pipeline;

/**
 * Contract for executing a pipeline's processing stage in the caller's thread.
 * Implementations run every processor in the pipeline and then publish the
 * processed records to all of the pipeline's sinks.
 */
public interface PipelineRunner {
    /**
     * Runs all processors in the pipeline and publishes the resulting records
     * to the pipeline's sinks, within the calling thread's context.
     */
    void runAllProcessorsAndPublishToSinks();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.opensearch.dataprepper.core.pipeline.buffer;

import org.opensearch.dataprepper.core.pipeline.PipelineRunner;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;

/**
 * Represents the base class for zero buffer implementation and implements {@link Buffer} interface.
 * It provides a common functionality to run all processors and sinks within the same thread context.
 *
 * <p>NOTE(review): {@code pipelineRunner} is a mutable field with no synchronization; it is
 * presumably set once during pipeline wiring before any writes occur — confirm with the caller.
 */
public abstract class AbstractZeroBuffer<T extends Record<?>> implements Buffer<T> {
    // Runner invoked synchronously after each write; null until injected via setPipelineRunner.
    private PipelineRunner pipelineRunner;

    /**
     * Delegates to the configured {@link PipelineRunner} so that processors and sinks
     * execute in the same thread that wrote to the buffer.
     * Throws {@link NullPointerException} if no runner has been set yet.
     */
    protected void runAllProcessorsAndPublishToSinks() {
        // TODO : Implement functionality to call the processors and sinks within the same context
        getPipelineRunner().runAllProcessorsAndPublishToSinks();
    }

    /** Returns the runner used to drive processors and sinks; may be null if not yet set. */
    public PipelineRunner getPipelineRunner() {
        return pipelineRunner;
    }

    /** Injects the runner that {@link #runAllProcessorsAndPublishToSinks()} delegates to. */
    public void setPipelineRunner(PipelineRunner pipelineRunner) {
        this.pipelineRunner = pipelineRunner;
    }
}
27 changes: 27 additions & 0 deletions data-prepper-plugins/zero-buffer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
    id 'java'
}

dependencies {
    // Core Data Prepper modules providing Buffer/Record APIs and the pipeline runtime.
    implementation project(':data-prepper-api')
    implementation project(':data-prepper-core')
    // Metrics counters (records written/read) exposed by the ZeroBuffer plugin.
    implementation 'io.micrometer:micrometer-core'
    // Lombok is used for generated accessors (e.g. @Getter on ZeroBuffer fields);
    // the annotationProcessor entry is required for compile-time code generation.
    implementation 'org.projectlombok:lombok:1.18.30'
    annotationProcessor 'org.projectlombok:lombok:1.18.30'
}

jacocoTestCoverageVerification {
    dependsOn jacocoTestReport
    violationRules {
        rule { //in addition to core projects rule
            // Enforce at least 90% coverage for this module.
            limit {
                minimum = 0.90
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.opensearch.dataprepper.plugins.buffer.zerobuffer;

import io.micrometer.core.instrument.Counter;
import lombok.AccessLevel;
import lombok.Getter;
import org.opensearch.dataprepper.core.pipeline.buffer.AbstractZeroBuffer;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

/**
 * A {@link Buffer} that holds records in a thread-local store and, after every write,
 * synchronously runs the pipeline's processors and sinks in the writing thread
 * (zero buffering between source and processing).
 *
 * <p>Thread-safety: each thread sees only its own records via {@link ThreadLocal},
 * so no locking is required between writers.
 */
@DataPrepperPlugin(name = "zero_buffer", pluginType = Buffer.class)
public class ZeroBuffer<T extends Record<?>> extends AbstractZeroBuffer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(ZeroBuffer.class);
    private static final String PLUGIN_COMPONENT_ID = "ZeroBuffer";
    private final PluginMetrics pluginMetrics;
    // Per-thread record store; null until the owning thread first writes or reads.
    private final ThreadLocal<Collection<T>> threadLocalStore;
    @Getter(value = AccessLevel.PACKAGE)
    private final String pipelineName;
    @Getter(value = AccessLevel.PACKAGE)
    private final Counter writeRecordsCounter;
    @Getter(value = AccessLevel.PACKAGE)
    private final Counter readRecordsCounter;

    /**
     * Constructs the buffer for the pipeline described by {@code pipelineDescription},
     * registering records-written/records-read counters scoped to that pipeline.
     */
    @DataPrepperPluginConstructor
    public ZeroBuffer(PipelineDescription pipelineDescription) {
        this.pluginMetrics = PluginMetrics.fromNames(PLUGIN_COMPONENT_ID, pipelineDescription.getPipelineName());
        this.pipelineName = pipelineDescription.getPipelineName();
        this.threadLocalStore = new ThreadLocal<>();
        this.writeRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_WRITTEN);
        this.readRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_READ);
    }

    /**
     * Stores a single record in the current thread's store, then synchronously
     * runs processors and sinks.
     *
     * @param record the record to buffer; must not be null
     * @param timeoutInMillis ignored — writes never block in a zero buffer
     * @throws NullPointerException if {@code record} is null
     * @throws TimeoutException declared by the {@link Buffer} contract; never thrown here
     */
    @Override
    public void write(T record, int timeoutInMillis) throws TimeoutException {
        Objects.requireNonNull(record, "The write record cannot be null");

        if (threadLocalStore.get() == null) {
            threadLocalStore.set(new ArrayList<>());
        }

        threadLocalStore.get().add(record);
        writeRecordsCounter.increment();

        runAllProcessorsAndPublishToSinks();
    }

    /**
     * Stores a collection of records in the current thread's store, then synchronously
     * runs processors and sinks.
     *
     * @param records the records to buffer; must not be null
     * @param timeoutInMillis ignored — writes never block in a zero buffer
     * @throws NullPointerException if {@code records} is null
     */
    @Override
    public void writeAll(Collection<T> records, int timeoutInMillis) throws Exception {
        Objects.requireNonNull(records, "The write records cannot be null");

        if (threadLocalStore.get() == null) {
            // Defensive copy: storing the caller's collection directly would alias it —
            // a later write() would mutate the caller's instance, or throw
            // UnsupportedOperationException if the caller passed an immutable collection.
            threadLocalStore.set(new ArrayList<>(records));
        } else {
            // Add the new records to the existing records
            threadLocalStore.get().addAll(records);
        }

        writeRecordsCounter.increment(records.size() * 1.0);
        runAllProcessorsAndPublishToSinks();
    }

    /**
     * Drains and returns all records buffered by the current thread, paired with a
     * {@link CheckpointState} recording how many were read. Returns an empty
     * collection (and a zero-count checkpoint) if nothing was buffered.
     *
     * @param timeoutInMillis ignored — reads never block in a zero buffer
     */
    @Override
    public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
        if (threadLocalStore.get() == null) {
            threadLocalStore.set(new ArrayList<>());
        }

        // Non-null here: the ThreadLocal was populated just above if it was empty.
        Collection<T> storedRecords = threadLocalStore.get();
        CheckpointState checkpointState = new CheckpointState(0);
        if (!storedRecords.isEmpty()) {
            checkpointState = new CheckpointState(storedRecords.size());
            threadLocalStore.remove();
            readRecordsCounter.increment(storedRecords.size() * 1.0);
        }

        return new AbstractMap.SimpleEntry<>(storedRecords, checkpointState);
    }

    /** No-op: records are consumed synchronously, so there is nothing to checkpoint. */
    @Override
    public void checkpoint(CheckpointState checkpointState) {}

    /** Returns true if the current thread has no buffered records. */
    @Override
    public boolean isEmpty() {
        final Collection<T> storedRecords = this.threadLocalStore.get();
        return storedRecords == null || storedRecords.isEmpty();
    }
}
Loading

0 comments on commit 017498d

Please sign in to comment.