Skip to content

Commit

Permalink
Provide a config option to do node local aggregation (#4306)
Browse files Browse the repository at this point in the history
* Provide a config option to do node local aggregation

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified config option to be local_mode

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Mar 21, 2024
1 parent 8faf7d1 commit 5596c57
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 0 deletions.
3 changes: 3 additions & 0 deletions data-prepper-plugins/aggregate-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
### <a name="when"></a>
* `when` (Optional): A `String` that represents a condition that must be evaluated to true for the aggregation to be applied on the event. Events that do not evaluate to true on the condition are skipped. Default is no condition which means all events are included in the aggregation.

### <a name="local_only"></a>
* `local_only` (Optional): A `Boolean` indicating if the aggregation should be done local to node instead of forwarding to remote peers.

## Available Aggregate Actions

### <a name="remove_duplicates"></a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class AggregateProcessor extends AbstractProcessor<Record<Event>, Record<
private final AggregateAction aggregateAction;

private boolean forceConclude = false;
private boolean localMode = false;
private final String whenCondition;
private final ExpressionEvaluator expressionEvaluator;

Expand All @@ -69,6 +70,7 @@ public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfi
this.actionHandleEventsOutCounter = pluginMetrics.counter(ACTION_HANDLE_EVENTS_OUT);
this.actionHandleEventsDroppedCounter = pluginMetrics.counter(ACTION_HANDLE_EVENTS_DROPPED);
this.whenCondition = aggregateProcessorConfig.getWhenCondition();
this.localMode = aggregateProcessorConfig.getLocalMode();

pluginMetrics.gauge(CURRENT_AGGREGATE_GROUPS, aggregateGroupManager, AggregateGroupManager::getAllGroupsSize);
}
Expand Down Expand Up @@ -153,6 +155,9 @@ public boolean isApplicableEventForPeerForwarding(Event event) {
if (whenCondition == null) {
return true;
}
if (localMode) {
return false;
}
return expressionEvaluator.evaluateConditional(whenCondition, event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class AggregateProcessorConfig {
@NotNull
private PluginModel aggregateAction;

@JsonProperty("local_mode")
@NotNull
private Boolean localMode = false;

@JsonProperty("aggregate_when")
private String whenCondition;

Expand All @@ -43,6 +47,10 @@ public String getWhenCondition() {
return whenCondition;
}

public Boolean getLocalMode() {
return localMode;
}

public PluginModel getAggregateAction() { return aggregateAction; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ public void testDefault() {
final AggregateProcessorConfig aggregateConfig = new AggregateProcessorConfig();

assertThat(aggregateConfig.getGroupDuration(), equalTo(Duration.ofSeconds(AggregateProcessorConfig.DEFAULT_GROUP_DURATION_SECONDS)));
assertThat(aggregateConfig.getLocalMode(), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ private AggregateProcessor createObjectUnderTest() {
@BeforeEach
void setUp() {
when(aggregateProcessorConfig.getAggregateAction()).thenReturn(actionConfiguration);
when(aggregateProcessorConfig.getLocalMode()).thenReturn(false);
when(actionConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString());
when(actionConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap());
when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
Expand Down Expand Up @@ -247,6 +248,69 @@ void handleEvent_returning_with_condition_eliminates_one_record() {
verify(aggregateGroupManager).getGroupsToConclude(eq(false));
}

@Test
void handleEvent_returning_with_condition_eliminates_one_record_local_only() {
final String eventKey = UUID.randomUUID().toString();
final String key1 = UUID.randomUUID().toString();
final String key2 = UUID.randomUUID().toString();
final String condition = "/" + eventKey + " == "+key1;
Event firstEvent;
Event secondEvent;
final Map<String, Object> eventMap1 = new HashMap<>();
eventMap1.put(eventKey, key1);

firstEvent = JacksonEvent.builder()
.withData(eventMap1)
.withEventType("event")
.build();

final Map<String, Object> eventMap2 = new HashMap<>();
eventMap2.put(eventKey, key2);

secondEvent = JacksonEvent.builder()
.withData(eventMap2)
.withEventType("event")
.build();


when(identificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent))
.thenReturn(identificationKeysMap);
when(aggregateActionSynchronizer.handleEventForGroup(firstEvent, identificationKeysMap, aggregateGroup)).thenReturn(firstAggregateActionResponse);
when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, firstEvent)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, secondEvent)).thenReturn(false);
when(aggregateProcessorConfig.getWhenCondition()).thenReturn(condition);
when(aggregateProcessorConfig.getLocalMode()).thenReturn(true);
final AggregateProcessor objectUnderTest = createObjectUnderTest();
when(aggregateGroupManager.getGroupsToConclude(eq(false))).thenReturn(Collections.emptyList());
when(aggregateActionResponse.getEvent()).thenReturn(event);
when(firstAggregateActionResponse.getEvent()).thenReturn(firstEvent);

event.toMap().put(eventKey, key1);
List<Record<Event>> recordsIn = new ArrayList<>();
recordsIn.add(new Record<Event>(firstEvent));
recordsIn.add(new Record<Event>(secondEvent));
recordsIn.add(new Record<Event>(event));
Collection<Record<Event>> c = recordsIn;
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(event), equalTo(false));
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(firstEvent), equalTo(false));
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(secondEvent), equalTo(false));
final List<Record<Event>> recordsOut = (List<Record<Event>>) objectUnderTest.doExecute(c);

assertThat(recordsOut.size(), equalTo(2));
assertThat(recordsOut.get(0), notNullValue());
assertThat(recordsOut.get(0).getData(), equalTo(firstEvent));
assertThat(recordsOut.get(1), notNullValue());
assertThat(recordsOut.get(1).getData(), equalTo(event));

verify(actionHandleEventsDroppedCounter).increment(1);
verify(actionHandleEventsOutCounter).increment(2);
verifyNoInteractions(actionConcludeGroupEventsDroppedCounter);
verifyNoInteractions(actionConcludeGroupEventsOutCounter);

verify(aggregateGroupManager).getGroupsToConclude(eq(false));
}

@Test
void handleEvent_returning_with_event_adds_event_to_records_out() {
final AggregateProcessor objectUnderTest = createObjectUnderTest();
Expand Down

0 comments on commit 5596c57

Please sign in to comment.