Skip to content

Commit

Permalink
Add append option
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh committed Feb 16, 2024
1 parent faddf01 commit a84b462
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -67,12 +68,15 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
if (!Objects.isNull(key)) {
if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) {
recordEvent.put(key, value);
} else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) {
mergeValueToEvent(recordEvent, key, value);
}
} else {
Map<String, Object> attributes = recordEvent.getMetadata().getAttributes();
if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) {
recordEvent.getMetadata().setAttribute(metadataKey, value);

} else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) {
mergeValueToEventMetadata(recordEvent, metadataKey, value);
}
}
} catch (Exception e) {
Expand All @@ -97,4 +101,30 @@ public boolean isReadyForShutdown() {
@Override
public void shutdown() {
}

private void mergeValueToEvent(final Event recordEvent, final String key, final Object value) {
final Object currentValue = recordEvent.get(key, Object.class);
final List<Object> mergedValue = new ArrayList<>();
if (currentValue instanceof List) {
mergedValue.addAll((List<Object>) currentValue);
} else {
mergedValue.add(currentValue);
}

mergedValue.add(value);
recordEvent.put(key, mergedValue);
}

private void mergeValueToEventMetadata(final Event recordEvent, final String key, final Object value) {
final Object currentValue = recordEvent.getMetadata().getAttribute(key);
final List<Object> mergedValue = new ArrayList<>();
if (currentValue instanceof List) {
mergedValue.addAll((List<Object>) currentValue);
} else {
mergedValue.add(currentValue);
}

mergedValue.add(value);
recordEvent.getMetadata().setAttribute(key, mergedValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public static class Entry {
@JsonProperty("overwrite_if_key_exists")
private boolean overwriteIfKeyExists = false;

@JsonProperty("append_if_key_exists")
private boolean appendIfKeyExists = false;

public String getKey() {
return key;
}
Expand All @@ -58,19 +61,29 @@ public boolean getOverwriteIfKeyExists() {
return overwriteIfKeyExists;
}

public boolean getAppendIfKeyExists() {
return appendIfKeyExists;
}

public String getAddWhen() { return addWhen; }

@AssertTrue(message = "Either value or format or expression must be specified, and only one of them can be specified")
public boolean hasValueOrFormatOrExpression() {
return Stream.of(value, format, valueExpression).filter(n -> n!=null).count() == 1;
}

@AssertTrue(message = "overwrite_if_key_exists and append_if_key_exists can not be set at the same time.")
boolean overwriteAndAppendNotBothSet() {
return !(overwriteIfKeyExists && appendIfKeyExists);
}

public Entry(final String key,
final String metadataKey,
final Object value,
final String format,
final String valueExpression,
final boolean overwriteIfKeyExists,
final boolean appendIfKeyExists,
final String addWhen)
{
if (key != null && metadataKey != null) {
Expand All @@ -85,6 +98,7 @@ public Entry(final String key,
this.format = format;
this.valueExpression = valueExpression;
this.overwriteIfKeyExists = overwriteIfKeyExists;
this.appendIfKeyExists = appendIfKeyExists;
this.addWhen = addWhen;
}

Expand Down
Loading

0 comments on commit a84b462

Please sign in to comment.