Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple entries of 'with_keys' with 'delete_when' co… #5356

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test-event')
testImplementation testLibs.slf4j.simple
testImplementation testLibs.spring.test
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,42 @@
public class DeleteEntryProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(DeleteEntryProcessor.class);
private final List<EventKey> entries;
private final List<EventKey> withKeys;
private final String deleteWhen;
private final List<DeleteEntryProcessorConfig.Entry> entries;

private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntryProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.entries = config.getWithKeys();
this.withKeys = config.getWithKeys();
this.deleteWhen = config.getDeleteWhen();
this.expressionEvaluator = expressionEvaluator;

if (deleteWhen != null
&& !expressionEvaluator.isValidExpressionStatement(deleteWhen)) {
throw new InvalidPluginConfigurationException(
String.format("delete_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", deleteWhen));
&& !expressionEvaluator.isValidExpressionStatement(deleteWhen)) {
throw new InvalidPluginConfigurationException(
String.format("delete_when %s is not a valid expression statement. See https://opensearch" +
".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", deleteWhen));
}

if (this.withKeys != null && !this.withKeys.isEmpty()) {
DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(this.withKeys, this.deleteWhen);
this.entries = List.of(entry);
} else {
this.entries = config.getEntries();
}

this.entries.forEach(entry -> {
if (entry.getDeleteWhen() != null
&& !expressionEvaluator.isValidExpressionStatement(entry.getDeleteWhen())) {
throw new InvalidPluginConfigurationException(
String.format("delete_when %s is not a valid expression statement. See https://opensearch" +
".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
entry.getDeleteWhen()));
}
});
}

@Override
Expand All @@ -53,13 +72,15 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
final Event recordEvent = record.getData();

try {
if (Objects.nonNull(deleteWhen) && !expressionEvaluator.evaluateConditional(deleteWhen, recordEvent)) {
continue;
}


for (final EventKey entry : entries) {
recordEvent.delete(entry);
for (final DeleteEntryProcessorConfig.Entry entry : entries) {
if (Objects.nonNull(entry.getDeleteWhen()) && !expressionEvaluator.evaluateConditional(entry.getDeleteWhen(),
recordEvent)) {
continue;
}

for (final EventKey key : entry.getWithKeys()) {
recordEvent.delete(key);
}
}
} catch (final Exception e) {
LOG.atError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertFalse;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.IfThenElse;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.SchemaProperty;
import org.opensearch.dataprepper.model.annotations.ExampleValues;
import org.opensearch.dataprepper.model.annotations.ExampleValues.Example;
import org.opensearch.dataprepper.model.event.EventKey;
Expand All @@ -19,31 +25,90 @@

import java.util.List;

@ConditionalRequired(value = {
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "entries", value = "null")},
thenExpect = {@SchemaProperty(field = "with_keys")}
),
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "with_keys", value = "null")},
thenExpect = {@SchemaProperty(field = "entries")}
)
})
@JsonPropertyOrder
@JsonClassDescription("The <code>delete_entries</code> processor deletes fields from events. " +
"You can define the keys you want to delete in the <code>with_keys</code> configuration. " +
"Those keys and their values are deleted from events.")
public class DeleteEntryProcessorConfig {
@NotEmpty
@NotNull

@JsonPropertyOrder
public static class Entry {
@NotEmpty
@NotNull
@JsonProperty("with_keys")
@EventKeyConfiguration(EventKeyFactory.EventAction.DELETE)
@JsonPropertyDescription("A list of keys to be deleted.")
private List<@NotNull @NotEmpty EventKey> withKeys;

@JsonProperty("delete_when")
@JsonPropertyDescription("Specifies under what condition the deletion should be performed. " +
"By default, keys are always deleted. Example: <code>/mykey == \"---\"</code>")
@ExampleValues({
@Example(value = "/some_key == null", description = "Only runs the deletion if the key some_key is null or does not exist.")
})
private String deleteWhen;

public List<EventKey> getWithKeys() {
return withKeys;
}

public String getDeleteWhen() {
return deleteWhen;
}

public Entry(final List<EventKey> withKeys, final String deleteWhen) {
this.withKeys = withKeys;
this.deleteWhen = deleteWhen;
}

public Entry() {

}
}

@JsonProperty("with_keys")
@EventKeyConfiguration(EventKeyFactory.EventAction.DELETE)
@JsonPropertyDescription("A list of keys to be deleted.")
@JsonPropertyDescription("A list of keys to be deleted (legacy format).")
private List<@NotNull @NotEmpty EventKey> withKeys;

@JsonProperty("delete_when")
@JsonPropertyDescription("Specifies under what condition the <code>delete_entries</code> processor should perform deletion. " +
"By default, keys are always deleted. Example: <code>/mykey == \"---\"</code>")
@ExampleValues({
@Example(value = "/some_key == null", description = "Only runs the delete_entries processor on the Event if the key some_key is null or does not exist.")
})
@JsonPropertyDescription("Specifies under what condition the deletion should be performed (legacy format).")
private String deleteWhen;

@Valid
@JsonProperty("entries")
@JsonPropertyDescription("A list of entries to delete from the event.")
private List<Entry> entries;

@AssertTrue(message = "Either 'entries' or 'with_keys' must be specified, but neither was found")
boolean isConfigurationPresent() {
return entries != null || withKeys != null;
}

@AssertFalse(message = "Either use 'entries' OR 'with_keys' with 'delete_when' configuration, but not both")
boolean hasBothConfigurations() {
return entries != null && withKeys != null;
}

public List<EventKey> getWithKeys() {
return withKeys;
}

public String getDeleteWhen() {
return deleteWhen;
}

public List<Entry> getEntries() {
return entries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.record.Record;
import org.springframework.test.util.ReflectionTestUtils;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -134,6 +135,81 @@ public void testNestedDeleteProcessorTest() {
assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
}

@Test
public void test_multiple_entries_with_different_delete_when_conditions() {
final DeleteEntryProcessorConfig.Entry entry1 = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key1"
, EventKeyFactory.EventAction.DELETE)), "condition1");
final DeleteEntryProcessorConfig.Entry entry2 = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key2"
, EventKeyFactory.EventAction.DELETE)), "condition2");

when(mockConfig.getEntries()).thenReturn(List.of(entry1, entry2));
when(expressionEvaluator.isValidExpressionStatement("condition1")).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("condition2")).thenReturn(true);

final DeleteEntryProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("test");
record.getData().put("key1", "value1");
record.getData().put("key2", "value2");

when(expressionEvaluator.evaluateConditional("condition1", record.getData())).thenReturn(true);
when(expressionEvaluator.evaluateConditional("condition2", record.getData())).thenReturn(false);

final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertThat(editedRecords.get(0).getData().containsKey("key1"), is(false));
assertThat(editedRecords.get(0).getData().containsKey("key2"), is(true));
}

@Test
public void test_legacy_format_conversion_to_entries_format() {
when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.DELETE)));
when(mockConfig.getDeleteWhen()).thenReturn("condition");
when(expressionEvaluator.isValidExpressionStatement("condition")).thenReturn(true);

final DeleteEntryProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("test");

when(expressionEvaluator.evaluateConditional("condition", record.getData())).thenReturn(true);

final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertThat(editedRecords.get(0).getData().containsKey("message"), is(false));
}

@Test
public void invalid_delete_when_with_entries_format_throws_InvalidPluginConfigurationException() {
DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key1",
EventKeyFactory.EventAction.DELETE)), "invalid_condition");

when(mockConfig.getEntries()).thenReturn(List.of(entry));
when(expressionEvaluator.isValidExpressionStatement("invalid_condition")).thenReturn(false);

assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
}

@Test
public void test_both_configurations_used_together() {
final DeleteEntryProcessorConfig configObjectUnderTest = new DeleteEntryProcessorConfig();
final DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key1"
, EventKeyFactory.EventAction.DELETE)), "condition");

ReflectionTestUtils.setField(configObjectUnderTest, "withKeys", List.of(eventKeyFactory.createEventKey("message",
EventKeyFactory.EventAction.DELETE)));
ReflectionTestUtils.setField(configObjectUnderTest, "entries", List.of(entry));

assertThat(configObjectUnderTest.hasBothConfigurations(), is(true));
}

@Test
public void test_no_configuration_used() {
final DeleteEntryProcessorConfig configObjectUnderTest = new DeleteEntryProcessorConfig();

ReflectionTestUtils.setField(configObjectUnderTest, "withKeys", null);
ReflectionTestUtils.setField(configObjectUnderTest, "entries", null);

assertThat(configObjectUnderTest.isConfigurationPresent(), is(false));
}

private DeleteEntryProcessor createObjectUnderTest() {
return new DeleteEntryProcessor(pluginMetrics, mockConfig, expressionEvaluator);
}
Expand Down
Loading