-
Notifications
You must be signed in to change notification settings - Fork 215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhancements to map_to_list processor #4033
Changes from 3 commits
8ad32e2
042546f
341e72a
1b240a4
5f42713
5ea0661
61a7120
6715526
934ce99
7e7b3a3
ce3aecb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,8 @@ | |
|
||
package org.opensearch.dataprepper.plugins.processor.mutateevent; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import org.opensearch.dataprepper.expression.ExpressionEvaluator; | ||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | ||
|
@@ -27,6 +29,7 @@ | |
@DataPrepperPlugin(name = "map_to_list", pluginType = Processor.class, pluginConfigurationType = MapToListProcessorConfig.class) | ||
public class MapToListProcessor extends AbstractProcessor<Record<Event>, Record<Event>> { | ||
private static final Logger LOG = LoggerFactory.getLogger(MapToListProcessor.class); | ||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); | ||
private final MapToListProcessorConfig config; | ||
private final ExpressionEvaluator expressionEvaluator; | ||
private final Set<String> excludeKeySet = new HashSet<>(); | ||
|
@@ -49,36 +52,75 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor | |
} | ||
|
||
try { | ||
final Map<String, Object> sourceMap = recordEvent.get(config.getSource(), Map.class); | ||
final List<Map<String, Object>> targetList = new ArrayList<>(); | ||
|
||
Map<String, Object> modifiedSourceMap = new HashMap<>(); | ||
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) { | ||
if (excludeKeySet.contains(entry.getKey())) { | ||
if (config.getRemoveProcessedFields()) { | ||
modifiedSourceMap.put(entry.getKey(), entry.getValue()); | ||
final Map<String, Object> sourceMap = getSourceMap(recordEvent); | ||
|
||
if (config.getConvertFieldToList()) { | ||
final List<List<Object>> targetNestedList = new ArrayList<>(); | ||
|
||
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) { | ||
if (!excludeKeySet.contains(entry.getKey())) { | ||
targetNestedList.add(List.of(entry.getKey(), entry.getValue())); | ||
} | ||
continue; | ||
} | ||
targetList.add(Map.of( | ||
config.getKeyName(), entry.getKey(), | ||
config.getValueName(), entry.getValue() | ||
)); | ||
} | ||
|
||
if (config.getRemoveProcessedFields()) { | ||
recordEvent.put(config.getSource(), modifiedSourceMap); | ||
} | ||
removeProcessedFields(sourceMap, recordEvent); | ||
recordEvent.put(config.getTarget(), targetNestedList); | ||
} else { | ||
final List<Map<String, Object>> targetList = new ArrayList<>(); | ||
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) { | ||
if (!excludeKeySet.contains(entry.getKey())) { | ||
targetList.add(Map.of( | ||
config.getKeyName(), entry.getKey(), | ||
config.getValueName(), entry.getValue() | ||
)); | ||
} | ||
} | ||
removeProcessedFields(sourceMap, recordEvent); | ||
recordEvent.put(config.getTarget(), targetList); | ||
} | ||
|
||
recordEvent.put(config.getTarget(), targetList); | ||
} catch (Exception e) { | ||
LOG.error("Fail to perform Map to List operation", e); | ||
//TODO: add tagging on failure | ||
recordEvent.getMetadata().addTags(config.getTagsOnFailure()); | ||
} | ||
} | ||
return records; | ||
} | ||
|
||
private Map<String, Object> getSourceMap(Event recordEvent) throws JsonProcessingException { | ||
final Map<String, Object> sourceMap; | ||
if (config.getSource() == null) { | ||
// Source is root | ||
sourceMap = OBJECT_MAPPER.treeToValue(recordEvent.getJsonNode(), Map.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a high level question here, if you get the entire JSON node if source is null, I think it's not going to work. Correct me if I am wrong.
The result will be
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I think adding test cases and documentation for euch cases would help. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @asifsmohammed Good question, though it isn't necessarily related to root being the source. I think it's an issue with nested map in the source map. Currently we assume the source map structure is flat, so what you show is the expected result. We can have a recursive option to go deeper into each field in the future if needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added readme and more test cases. |
||
} else { | ||
sourceMap = recordEvent.get(config.getSource(), Map.class); | ||
} | ||
return sourceMap; | ||
} | ||
|
||
private void removeProcessedFields(Map<String, Object> sourceMap, Event recordEvent) { | ||
if (!config.getRemoveProcessedFields()) { | ||
return; | ||
} | ||
|
||
if (config.getSource() == null) { | ||
// Source is root | ||
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) { | ||
if (excludeKeySet.contains(entry.getKey())) { | ||
continue; | ||
} | ||
recordEvent.delete(entry.getKey()); | ||
} | ||
} else { | ||
Map<String, Object> modifiedSourceMap = new HashMap<>(); | ||
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) { | ||
if (excludeKeySet.contains(entry.getKey())) { | ||
modifiedSourceMap.put(entry.getKey(), entry.getValue()); | ||
} | ||
} | ||
recordEvent.put(config.getSource(), modifiedSourceMap); | ||
} | ||
} | ||
|
||
@Override | ||
public void prepareForShutdown() { | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a desirable feature?
If so, can we make it take in
/
instead ofnull
? I think this could lead to confusion when users don't set thesource
field they are probably not getting what they want.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we will need it for #3965.
In JsonPointer specification, it is actually the empty string
""
that refers to the whole document and"/"
refers to the field with""
as key. If we remove the restrictions in JacksonEvent that key cannot be empty string, we should already support referring to the root document with Json Pointer""
. What do you think of doing it this way?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did some testing if we use "/" as source, it will try to get the value with empty string as key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@oeyh , That is a very useful finding.
I think we will need to make the syntax be explicit then:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented this in recent commits.