Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable the circuit breaker for buffers that write data off-heap only… #3619

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ default Duration getDrainTimeout() {
return Duration.ZERO;
}

/**
* Indicates if writes to this buffer are also in some way written
* onto the JVM heap. If writes do go on heap, this should <b>false</b>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should be false..

* which is the default.
*
* @return True if this buffer does not write to the JVM heap.
*/
default boolean isWrittenOffHeapOnly() {
return false;
}

/**
* shuts down the buffer
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public Duration getDrainTimeout() {
return delegateBuffer.getDrainTimeout();
}

@Override
public boolean isWrittenOffHeapOnly() {
return delegateBuffer.isWrittenOffHeapOnly();
}

@Override
public void shutdown() {
delegateBuffer.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,55 @@

package org.opensearch.dataprepper.model.buffer;

import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.spy;

public class BufferTest {
class BufferTest {

private Buffer createObjectUnderTest() {
return spy(Buffer.class);
}

@Test
public void testGetDrainTimeout() {
final Buffer<Record<Event>> buffer = spy(Buffer.class);
void testGetDrainTimeout() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();

Assert.assertEquals(Duration.ZERO, buffer.getDrainTimeout());
assertEquals(Duration.ZERO, buffer.getDrainTimeout());
}

@Test
public void testShutdown() {
final Buffer<Record<Event>> buffer = spy(Buffer.class);
void testShutdown() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();
buffer.shutdown();
}

@Test
public void testIsByteBuffer() {
final Buffer<Record<Event>> buffer = spy(Buffer.class);
void testIsByteBuffer() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();

Assert.assertEquals(false, buffer.isByteBuffer());
assertEquals(false, buffer.isByteBuffer());
}

@Test
void isWrittenOffHeapOnly_returns_false_by_default() {
assertThat(createObjectUnderTest().isWrittenOffHeapOnly(), equalTo(false));
}

@Test
public void testWriteBytes() {
final Buffer<Record<Event>> buffer = spy(Buffer.class);
void testWriteBytes() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();

byte[] bytes = new byte[2];
Assert.assertThrows(UnsupportedOperationException.class, () -> buffer.writeBytes(bytes, "", 10));
assertThrows(UnsupportedOperationException.class, () -> buffer.writeBytes(bytes, "", 10));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ void getDrainTimeout_returns_inner_getDrainTimeout() {
equalTo(drainTimeout));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void isWrittenOffHeapOnly_returns_inner_isWrittenOffHeapOnly(final boolean isWrittenOffHeapOnly) {
when(innerBuffer.isWrittenOffHeapOnly()).thenReturn(isWrittenOffHeapOnly);

assertThat(createObjectUnderTest().isWrittenOffHeapOnly(),
equalTo(isWrittenOffHeapOnly));
}

@Test
void shutdown_calls_inner_shutdown() {
createObjectUnderTest().shutdown();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.parser;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.buffer.DelegatingBuffer;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
* Buffer decorator created for pipelines that make use of multiple buffers, such as PeerForwarder-enabled pipelines. The decorator
* acts as a pass-through to the primary buffer for most methods except those that rely on a combination of the primary
* and second. For example, isEmpty depends on all buffers being empty.
*
* @since 2.0
*/
class MultiBufferDecorator<T extends Record<?>> extends DelegatingBuffer<T> implements Buffer<T> {
private final List<Buffer> allBuffers;

MultiBufferDecorator(final Buffer primaryBuffer, final List<Buffer> secondaryBuffers) {
super(primaryBuffer);
allBuffers = new ArrayList<>(1 + secondaryBuffers.size());
allBuffers.add(primaryBuffer);
allBuffers.addAll(secondaryBuffers);
}

@Override
public boolean isEmpty() {
return allBuffers.stream().allMatch(Buffer::isEmpty);
}

@Override
public Duration getDrainTimeout() {
return allBuffers.stream()
.map(Buffer::getDrainTimeout)
.reduce(Duration.ZERO, Duration::plus);
}

@Override
public void shutdown() {
allBuffers.forEach(Buffer::shutdown);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.opensearch.dataprepper.pipeline.PipelineConnector;
import org.opensearch.dataprepper.pipeline.router.Router;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.plugins.MultiBufferDecorator;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -147,15 +146,7 @@ private void buildPipelineFromConfiguration(
final MultiBufferDecorator multiBufferDecorator = new MultiBufferDecorator(pipelineDefinedBuffer, secondaryBuffers);


final Buffer buffer;
if(source instanceof PipelineConnector) {
buffer = multiBufferDecorator;
} else {
buffer = circuitBreakerManager.getGlobalCircuitBreaker()
.map(circuitBreaker -> new CircuitBreakingBuffer<>(multiBufferDecorator, circuitBreaker))
.map(b -> (Buffer)b)
.orElseGet(() -> multiBufferDecorator);
}
final Buffer buffer = applyCircuitBreakerToBuffer(source, multiBufferDecorator);

final Router router = routerFactory.createRouter(pipelineConfiguration.getRoutes());

Expand Down Expand Up @@ -313,4 +304,17 @@ List<Buffer> getSecondaryBuffers() {
.map(innerEntry -> innerEntry.getValue())
.collect(Collectors.toList());
}

private Buffer applyCircuitBreakerToBuffer(final Source source, final Buffer buffer) {
if (source instanceof PipelineConnector)
return buffer;

if(buffer.isWrittenOffHeapOnly())
return buffer;

return circuitBreakerManager.getGlobalCircuitBreaker()
.map(circuitBreaker -> new CircuitBreakingBuffer<>(buffer, circuitBreaker))
.map(b -> (Buffer) b)
.orElseGet(() -> buffer);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

package org.opensearch.dataprepper;

import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.configuration.SinkModel;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.configuration.SinkModel;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;

import java.io.File;
import java.io.IOException;
Expand All @@ -38,6 +38,7 @@ public class TestDataProvider {
public static final String VALID_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/valid_multiple_pipeline_configuration.yml";
public static final String VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSIONS = "src/test/resources/valid_pipeline_configuration_with_extensions.yml";
public static final String VALID_SINGLE_PIPELINE_EMPTY_SOURCE_PLUGIN_FILE = "src/test/resources/single_pipeline_valid_empty_source_plugin_settings.yml";
public static final String VALID_OFF_HEAP_FILE = "src/test/resources/single_pipeline_valid_off_heap_buffer.yml";
public static final String CONNECTED_PIPELINE_ROOT_SOURCE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_root_source.yml";
public static final String CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_child_pipeline.yml";
public static final String CYCLE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/cyclic_multiple_pipeline_configuration.yml";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins;
package org.opensearch.dataprepper.parser;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -303,6 +304,27 @@ void parseConfiguration_uses_CircuitBreaking_buffer_when_circuit_breakers_applie
verify(dataPrepperConfiguration).getPipelineExtensions();
}

@Test
void parseConfiguration_uses_unwrapped_buffer_when_circuit_breakers_applied_but_Buffer_is_off_heap() {
final PipelineTransformer objectUnderTest =
createObjectUnderTest(TestDataProvider.VALID_OFF_HEAP_FILE);

final Map<String, Pipeline> pipelineMap = objectUnderTest.transformConfiguration();

assertThat(pipelineMap.size(), equalTo(1));
assertThat(pipelineMap, hasKey("test-pipeline-1"));
final Pipeline pipeline = pipelineMap.get("test-pipeline-1");
assertThat(pipeline, notNullValue());
assertThat(pipeline.getBuffer(), notNullValue());
assertThat(pipeline.getBuffer(), CoreMatchers.not(instanceOf(CircuitBreakingBuffer.class)));

verifyNoInteractions(circuitBreakerManager);
verify(dataPrepperConfiguration).getProcessorShutdownTimeout();
verify(dataPrepperConfiguration).getSinkShutdownTimeout();
verify(dataPrepperConfiguration).getPeerForwarderConfiguration();
verify(dataPrepperConfiguration).getPipelineExtensions();
}

@Test
void parseConfiguration_uses_unwrapped_buffer_when_no_circuit_breakers_are_applied() {
when(circuitBreakerManager.getGlobalCircuitBreaker())
Expand Down
Loading
Loading