Skip to content

Commit

Permalink
Release events that are not routed to any sinks (#3959)
Browse files Browse the repository at this point in the history
* Release events that are not routed to any sinks

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Fixed a bug in the code that's causing the test failures

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Modified to determine unrouted events after all routing is done

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Add test yaml files

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
kkondaka and Krishna Kondaka authored Jan 16, 2024
1 parent 224518f commit f21937a
Showing 7 changed files with 255 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
@@ -35,6 +36,8 @@ class PipelinesWithAcksIT {
private static final String TWO_PARALLEL_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/two-parallel-pipelines-test.yaml";
private static final String THREE_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipelines-test.yaml";
private static final String THREE_PIPELINES_WITH_ROUTE_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipeline-route-test.yaml";
private static final String THREE_PIPELINES_WITH_UNROUTED_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipeline-unrouted-test.yaml";
private static final String THREE_PIPELINES_WITH_DEFAULT_ROUTE_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipeline-route-default-test.yaml";
private static final String THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipelines-test-multi-sink.yaml";
private static final String ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-three-sinks.yaml";
private static final String ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-ack-expiry-test.yaml";
@@ -120,6 +123,22 @@ void three_pipelines_with_multiple_records() {
assertTrue(inMemorySourceAccessor.getAckReceived());
}

@Test
void three_pipelines_with_all_unrouted_records() {
setUp(THREE_PIPELINES_WITH_UNROUTED_CONFIGURATION_UNDER_TEST);
final int numRecords = 2;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
assertTrue(inMemorySourceAccessor != null);
assertTrue(inMemorySourceAccessor.getAckReceived() != null);
});
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords.size(), equalTo(0));
assertTrue(inMemorySourceAccessor.getAckReceived());
}

@Test
void three_pipelines_with_route_and_multiple_records() {
setUp(THREE_PIPELINES_WITH_ROUTE_CONFIGURATION_UNDER_TEST);
@@ -130,7 +149,23 @@ void three_pipelines_with_route_and_multiple_records() {
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(numRecords));
assertThat(outputRecords.size(), lessThanOrEqualTo(numRecords));
});
assertTrue(inMemorySourceAccessor.getAckReceived());
}

@Test
void three_pipelines_with_default_route_and_multiple_records() {
setUp(THREE_PIPELINES_WITH_DEFAULT_ROUTE_CONFIGURATION_UNDER_TEST);
final int numRecords = 10;

inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(2*numRecords));
});
assertTrue(inMemorySourceAccessor.getAckReceived());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
three-pipelines-route-test-1:
delay: 2
source:
in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true
route:
- 2xx_route: '/status >= 200 and /status < 300'
- other_route: '/status >= 300 or /status < 200'
sink:
- pipeline:
name: "three-pipelines-route-test-2"
routes:
- 2xx_route
- pipeline:
name: "three-pipelines-route-test-3"
routes:
- other_route
- in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true

three-pipelines-route-test-2:
source:
pipeline:
name: "three-pipelines-route-test-1"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true

three-pipelines-route-test-3:
source:
pipeline:
name: "three-pipelines-route-test-1"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
three-pipelines-unrouted-test-1:
delay: 2
source:
in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true
route:
- 1xx_route: '/status >= 700 and /status < 800'
- other_route: '/status >= 800 and /status < 900'
sink:
- pipeline:
name: "three-pipelines-unrouted-test-2"
routes:
- 1xx_route
- pipeline:
name: "three-pipelines-unrouted-test-3"
routes:
- other_route

three-pipelines-unrouted-test-2:
source:
pipeline:
name: "three-pipelines-unrouted-test-1"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true

three-pipelines-unrouted-test-3:
source:
pipeline:
name: "three-pipelines-unrouted-test-1"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true

Original file line number Diff line number Diff line change
@@ -6,24 +6,29 @@
package org.opensearch.dataprepper.pipeline.router;

import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.parser.DataFlowComponent;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* Provides routing of event records over a collection of {@link DataFlowComponent} objects.
*/
public class Router {
private final RouteEventEvaluator routeEventEvaluator;
private final DataFlowComponentRouter dataFlowComponentRouter;
private final Consumer<Event> noRouteHandler;

Router(final RouteEventEvaluator routeEventEvaluator, final DataFlowComponentRouter dataFlowComponentRouter) {
Router(final RouteEventEvaluator routeEventEvaluator, final DataFlowComponentRouter dataFlowComponentRouter, final Consumer<Event> noRouteHandler) {
this.routeEventEvaluator = Objects.requireNonNull(routeEventEvaluator);
this.dataFlowComponentRouter = dataFlowComponentRouter;
this.noRouteHandler = noRouteHandler;
}

public <C> void route(
@@ -38,8 +43,19 @@ public <C> void route(

final Map<Record, Set<String>> recordsToRoutes = routeEventEvaluator.evaluateEventRoutes(allRecords);

Set<Record> recordsUnRouted = new HashSet<>(allRecords);

for (DataFlowComponent<C> dataFlowComponent : dataFlowComponents) {
dataFlowComponentRouter.route(allRecords, dataFlowComponent, recordsToRoutes, getRecordStrategy, componentRecordsConsumer);
dataFlowComponentRouter.route(allRecords, dataFlowComponent, recordsToRoutes, getRecordStrategy, (component, records) -> {
recordsUnRouted.removeAll(records);
componentRecordsConsumer.accept(component, records);
});
}

for (Record record: recordsUnRouted) {
if (record.getData() instanceof Event) {
noRouteHandler.accept((Event)record.getData());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ public class RouterFactory {

public Router createRouter(final Set<ConditionalRoute> routes) {
final RouteEventEvaluator routeEventEvaluator = new RouteEventEvaluator(expressionEvaluator, routes);
return new Router(routeEventEvaluator, dataFlowComponentRouter);
return new Router(routeEventEvaluator, dataFlowComponentRouter,
event -> event.getEventHandle().release(true));
}
}
Original file line number Diff line number Diff line change
@@ -9,24 +9,32 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;

import java.util.Collections;
import java.util.Set;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.Event;

import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;

@ExtendWith(MockitoExtension.class)
class RouterFactoryTest {

@Mock
private ExpressionEvaluator expressionEvaluator;
private Set<ConditionalRoute> routes;
private Consumer<Event> consumer;

@BeforeEach
void setUp() {
@@ -58,4 +66,20 @@ void createRouter_returns_new_Router_with_empty_routes() {

assertThat(router, notNullValue());
}

@Test
void test_createRouterWithUnroutedHandler() {
try (final MockedConstruction<Router> ignored =
mockConstruction(Router.class, (mock, context) -> {
consumer = (Consumer<Event>) context.arguments().get(2);
})) {
Event event = mock(Event.class);
EventHandle eventHandle = mock(EventHandle.class);
when(event.getEventHandle()).thenReturn(eventHandle);
final Router router = createObjectUnderTest().createRouter(routes);
consumer.accept(event);
verify(event.getEventHandle()).release(true);
verify(eventHandle).release(true);
}
}
}
Original file line number Diff line number Diff line change
@@ -13,13 +13,18 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.parser.DataFlowComponent;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -30,6 +35,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.any;

@ExtendWith(MockitoExtension.class)
class RouterTest {
@@ -45,6 +52,8 @@ class RouterTest {
@Mock
private RouterGetRecordStrategy getRecordStrategy;

private Consumer<Event> noRouteHandler;

private Collection<Record> recordsIn;

private static class TestComponent {
@@ -58,7 +67,13 @@ void setUp() {
}

private Router createObjectUnderTest() {
return new Router(routeEventEvaluator, dataFlowComponentRouter);
noRouteHandler = mock(Consumer.class);
return new Router(routeEventEvaluator, dataFlowComponentRouter, noRouteHandler);
}

private Router createObjectUnderTestWithDataFlowComponentRouter() {
noRouteHandler = mock(Consumer.class);
return new Router(routeEventEvaluator, new DataFlowComponentRouter(), noRouteHandler);
}

@Test
@@ -123,7 +138,7 @@ void route_with_single_DataFlowComponent() {

createObjectUnderTest().route(recordsIn, dataFlowComponents, getRecordStrategy, componentRecordsConsumer);

verify(dataFlowComponentRouter).route(recordsIn, dataFlowComponent, recordsToRoutes, getRecordStrategy, componentRecordsConsumer);
verify(dataFlowComponentRouter).route(eq(recordsIn), eq(dataFlowComponent), eq(recordsToRoutes), eq(getRecordStrategy), any(BiConsumer.class));
}

@Test
@@ -138,11 +153,82 @@ void route_with_multiple_DataFlowComponents() {
createObjectUnderTest().route(recordsIn, dataFlowComponents, getRecordStrategy, componentRecordsConsumer);

for (DataFlowComponent<TestComponent> dataFlowComponent : dataFlowComponents) {
verify(dataFlowComponentRouter).route(recordsIn, dataFlowComponent, recordsToRoutes, getRecordStrategy, componentRecordsConsumer);
verify(dataFlowComponentRouter).route(eq(recordsIn), eq(dataFlowComponent), eq(recordsToRoutes), eq(getRecordStrategy), any(BiConsumer.class));
}
}
}

@Nested
class WithUnroutedRecords {
@Test
void route_with_multiple_DataFlowComponents_with_unrouted_events() {
Event event1 = mock(Event.class);
Event event2 = mock(Event.class);
Event event3 = mock(Event.class);
EventHandle eventHandle3 = mock(EventHandle.class);
Record record1 = mock(Record.class);
Record record2 = mock(Record.class);
Record record3 = mock(Record.class);
Record record4 = mock(Record.class);
when(record3.getData()).thenReturn(event3);
Object notAnEvent = mock(Object.class);
when(record4.getData()).thenReturn(notAnEvent);
List<Record> recordsIn = List.of(record1, record2, record3, record4);
Map<Record, Set<String>> recordsToRoutes = new HashMap<>();
recordsToRoutes.put(record1, Set.of(UUID.randomUUID().toString()));
recordsToRoutes.put(record2, Set.of(UUID.randomUUID().toString()));
recordsToRoutes.put(record3, Set.of());
recordsToRoutes.put(record4, Set.of());
when(routeEventEvaluator.evaluateEventRoutes(recordsIn)).thenReturn(recordsToRoutes);
dataFlowComponents = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final DataFlowComponent dataFlowComponent = mock(DataFlowComponent.class);
dataFlowComponents.add(dataFlowComponent);
}

createObjectUnderTest().route(recordsIn, dataFlowComponents, getRecordStrategy, componentRecordsConsumer);

for (DataFlowComponent<TestComponent> dataFlowComponent : dataFlowComponents) {
verify(dataFlowComponentRouter).route(eq(recordsIn), eq(dataFlowComponent), eq(recordsToRoutes), eq(getRecordStrategy), any(BiConsumer.class));
}
// Verify noRouteHandler gets invoked only for record3 and not
// for record4, because record4 has non-Event type data
verify(noRouteHandler, times(1)).accept(any());
}

@Test
void route_with_multiple_DataFlowComponents_with_unrouted_events_and_sink_with_noroutes() {
Event event1 = mock(Event.class);
Event event2 = mock(Event.class);
Event event3 = mock(Event.class);
EventHandle eventHandle3 = mock(EventHandle.class);
Record record1 = mock(Record.class);
Record record2 = mock(Record.class);
Record record3 = mock(Record.class);
Record record4 = mock(Record.class);
Object notAnEvent = mock(Object.class);
List<Record> recordsIn = List.of(record1, record2, record3, record4);
Map<Record, Set<String>> recordsToRoutes = new HashMap<>();
recordsToRoutes.put(record1, Set.of(UUID.randomUUID().toString()));
recordsToRoutes.put(record2, Set.of(UUID.randomUUID().toString()));
recordsToRoutes.put(record3, Set.of());
recordsToRoutes.put(record4, Set.of());
when(routeEventEvaluator.evaluateEventRoutes(recordsIn)).thenReturn(recordsToRoutes);
dataFlowComponents = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final DataFlowComponent dataFlowComponent = mock(DataFlowComponent.class);
dataFlowComponents.add(dataFlowComponent);
final Set<String> routes = i ==0 ? Collections.emptySet() : Set.of(UUID.randomUUID().toString());
when(dataFlowComponent.getRoutes()).thenReturn(routes);
}
when(getRecordStrategy.getAllRecords(any())).thenReturn(recordsIn);

createObjectUnderTestWithDataFlowComponentRouter().route(recordsIn, dataFlowComponents, getRecordStrategy, componentRecordsConsumer);

verify(noRouteHandler, times(0)).accept(any());
}
}

@Nested
class WithRecords {

@@ -169,7 +255,7 @@ void route_with_single_DataFlowComponent() {

createObjectUnderTest().route(recordsIn, dataFlowComponents, getRecordStrategy, componentRecordsConsumer);

verify(dataFlowComponentRouter).route(recordsIn, dataFlowComponent, recordsToRoutes, getRecordStrategy, componentRecordsConsumer);
verify(dataFlowComponentRouter).route(eq(recordsIn), eq(dataFlowComponent), eq(recordsToRoutes), eq(getRecordStrategy), any(BiConsumer.class));
}

@Test
@@ -184,10 +270,11 @@ void route_with_multiple_DataFlowComponents() {
createObjectUnderTest().route(recordsIn, dataFlowComponents, getRecordStrategy, componentRecordsConsumer);

for (DataFlowComponent<TestComponent> dataFlowComponent : dataFlowComponents) {
verify(dataFlowComponentRouter).route(recordsIn, dataFlowComponent, recordsToRoutes, getRecordStrategy, componentRecordsConsumer);
verify(dataFlowComponentRouter).route(eq(recordsIn), eq(dataFlowComponent), eq(recordsToRoutes), eq(getRecordStrategy), any(BiConsumer.class));
}
}


@Test
void route_with_multiple_DataFlowComponents_And_Strategy() {
final DataFlowComponent dataFlowComponent = mock(DataFlowComponent.class);
@@ -196,7 +283,7 @@ void route_with_multiple_DataFlowComponents_And_Strategy() {
dataFlowComponents.add(dataFlowComponent);
}
createObjectUnderTest().route(recordsIn, dataFlowComponents, getRecordStrategy, componentRecordsConsumer);
verify(dataFlowComponentRouter, times(5)).route(recordsIn, dataFlowComponent, recordsToRoutes, getRecordStrategy, componentRecordsConsumer);
verify(dataFlowComponentRouter, times(5)).route(eq(recordsIn), eq(dataFlowComponent), eq(recordsToRoutes), eq(getRecordStrategy), any(BiConsumer.class));
}
}
}

0 comments on commit f21937a

Please sign in to comment.