Skip to content

Commit

Permalink
Update plugin names to use processor (opensearch-project#1838)
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Sep 30, 2022
1 parent d172bdf commit 967919b
Show file tree
Hide file tree
Showing 31 changed files with 349 additions and 349 deletions.
2 changes: 1 addition & 1 deletion data-prepper-plugins/aggregate-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ It is a great way to reduce unnecessary log volume and create aggregated logs ov
## Usage

The following pipeline configuration will aggregate Events based on the entries with keys `sourceIp`, `destinationIp`, and `port`. It uses the [remove_duplicates](#remove_duplicates) action.
While not necessary, a great way to set up the Aggregate Processor [identification_keys](#identification_keys) is with the [Grok Processor](../grok-prepper/README.md) as shown below.
While not necessary, a great way to set up the Aggregate Processor [identification_keys](#identification_keys) is with the [Grok Processor](../grok-processor/README.md) as shown below.
```yaml
source:
...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.prepper;
package org.opensearch.dataprepper.plugins.processor;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.event.Event;
Expand All @@ -13,9 +13,9 @@
import java.util.Collection;

@DataPrepperPlugin(name = "no-op", pluginType = Processor.class)
public class NoOpPrepper implements Processor<Record<Event>, Record<Event>> {
public class NoOpProcessor implements Processor<Record<Event>, Record<Event>> {

public NoOpPrepper() {
public NoOpProcessor() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.prepper;
package org.opensearch.dataprepper.plugins.processor;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
Expand All @@ -27,9 +27,9 @@
* A simple String implementation of {@link Processor} which generates new Records with uppercase or lowercase content. The current
* simpler implementation does not handle errors (if any).
*/
@DataPrepperPlugin(name = "string_converter", pluginType = Processor.class, pluginConfigurationType = StringPrepper.Configuration.class)
public class StringPrepper implements Processor<Record<Event>, Record<Event>> {
private static Logger LOG = LoggerFactory.getLogger(StringPrepper.class);
@DataPrepperPlugin(name = "string_converter", pluginType = Processor.class, pluginConfigurationType = StringProcessor.Configuration.class)
public class StringProcessor implements Processor<Record<Event>, Record<Event>> {
private static Logger LOG = LoggerFactory.getLogger(StringProcessor.class);
private final ObjectMapper objectMapper = new ObjectMapper();
private final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {};

Expand All @@ -51,14 +51,14 @@ public void setUpperCase(final boolean upperCase) {

/**
* Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper
* runtime engine to construct an instance of {@link StringPrepper} using an instance of {@link PluginSetting} which
* runtime engine to construct an instance of {@link StringProcessor} using an instance of {@link PluginSetting} which
* has access to pluginSetting metadata from pipeline
* pluginSetting file.
*
* @param configuration instance with metadata information from pipeline pluginSetting file.
*/
@DataPrepperPluginConstructor
public StringPrepper(final Configuration configuration) {
public StringProcessor(final Configuration configuration) {
this.upperCase = configuration.getUpperCase();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.prepper;
package org.opensearch.dataprepper.plugins.processor;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
Expand All @@ -20,7 +20,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class StringPrepperTests {
public class StringProcessorTests {

private final String TEST_EVENT_TYPE = "test_event_type";
private final String TEST_KEY = "test_key";
Expand All @@ -39,23 +39,23 @@ public class StringPrepperTests {
.build()
);
private final List<Record<Event>> TEST_RECORDS = Arrays.asList(TEST_RECORD_1, TEST_RECORD_2);
private StringPrepper.Configuration configuration;
private StringProcessor.Configuration configuration;

@BeforeEach
void setUp() {
configuration = new StringPrepper.Configuration();
configuration = new StringProcessor.Configuration();
}

private StringPrepper createObjectUnderTest() {
return new StringPrepper(configuration);
private StringProcessor createObjectUnderTest() {
return new StringProcessor(configuration);
}

@Test
public void testStringPrepperDefault() {

final StringPrepper stringPrepper = createObjectUnderTest();
final List<Record<Event>> modifiedRecords = (List<Record<Event>>) stringPrepper.execute(TEST_RECORDS);
stringPrepper.shutdown();
final StringProcessor stringProcessor = createObjectUnderTest();
final List<Record<Event>> modifiedRecords = (List<Record<Event>>) stringProcessor.execute(TEST_RECORDS);
stringProcessor.shutdown();

final List<Event> modifiedRecordEvents = modifiedRecords.stream().map(Record::getData).collect(Collectors.toList());

Expand All @@ -73,9 +73,9 @@ public void testStringPrepperDefault() {
@Test
public void testStringPrepperLowerCase() {
configuration.setUpperCase(false);
final StringPrepper stringPrepper = createObjectUnderTest();
final List<Record<Event>> modifiedRecords = (List<Record<Event>>) stringPrepper.execute(TEST_RECORDS);
stringPrepper.shutdown();
final StringProcessor stringProcessor = createObjectUnderTest();
final List<Record<Event>> modifiedRecords = (List<Record<Event>>) stringProcessor.execute(TEST_RECORDS);
stringProcessor.shutdown();

final List<Event> modifiedRecordEvents = modifiedRecords.stream().map(Record::getData).collect(Collectors.toList());

Expand All @@ -93,9 +93,9 @@ public void testStringPrepperLowerCase() {
@Test
public void testStringPrepperUpperCase() {
configuration.setUpperCase(true);
final StringPrepper stringPrepper = createObjectUnderTest();
final List<Record<Event>> modifiedRecords = (List<Record<Event>>) stringPrepper.execute(TEST_RECORDS);
stringPrepper.shutdown();
final StringProcessor stringProcessor = createObjectUnderTest();
final List<Record<Event>> modifiedRecords = (List<Record<Event>>) stringProcessor.execute(TEST_RECORDS);
stringProcessor.shutdown();

final List<Event> modifiedRecordEvents = modifiedRecords.stream().map(Record::getData).collect(Collectors.toList());

Expand All @@ -112,11 +112,11 @@ public void testStringPrepperUpperCase() {

@Test
public void testPrepareForShutdown() {
final StringPrepper stringPrepper = createObjectUnderTest();
final StringProcessor stringProcessor = createObjectUnderTest();

stringPrepper.prepareForShutdown();
stringProcessor.prepareForShutdown();

assertThat(stringPrepper.isReadyForShutdown(), equalTo(true));
assertThat(stringProcessor.isReadyForShutdown(), equalTo(true));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.prepper.state;
package org.opensearch.dataprepper.plugins.processor.state;

import org.opensearch.dataprepper.processor.state.ProcessorState;
import org.junit.After;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Grok Prepper
# Grok Processor

This is a prepper that takes unstructured data and utilizes pattern matching
This is a processor that takes unstructured data and utilizes pattern matching
to structure and extract important keys and make data more structured and queryable.

The Grok Prepper uses the [java-grok Library](https://github.com/thekrakken/java-grok) internally and supports all java-grok library compatible patterns. The java-grok library is built using the `java.util.regex` regular expression library.
The Grok Processor uses the [java-grok Library](https://github.com/thekrakken/java-grok) internally and supports all java-grok library compatible patterns. The java-grok library is built using the `java.util.regex` regular expression library.

The full set of default patterns can be found [here](https://github.com/thekrakken/java-grok/blob/master/src/main/resources/patterns/patterns). Custom patterns can be added through either the
`patterns_definitions` or `patterns_directories` configuration settings. When debugging custom patterns, the [Grok Debugger](https://grokdebug.herokuapp.com/)
Expand Down Expand Up @@ -231,7 +231,7 @@ The resulting grokked log will now look like this.
All of the grok captures were wrapped in an outer key named `grokked`.<br></br>

* `timeout_millis` (Optional): An `int` that specifies the maximum amount of time, in milliseconds, that matching will be performed on an individual Record before it times out and moves on to the next Record.
Setting a `timeout_millis = 0` will make it so that matching a Record never times out. If a Record does time out, it will remain the same as it was when input to the grok prepper. Default value is `30,000`
Setting a `timeout_millis = 0` will make it so that matching a Record never times out. If a Record does time out, it will remain the same as it was when input to the grok processor. Default value is `30,000`

## Metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.prepper.grok;
package org.opensearch.dataprepper.plugins.processor.grok;


import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand Down Expand Up @@ -53,10 +53,10 @@

@SingleThread
@DataPrepperPlugin(name = "grok", pluginType = Processor.class)
public class GrokPrepper extends AbstractProcessor<Record<Event>, Record<Event>> {
public class GrokProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
static final long EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = 300L;

private static final Logger LOG = LoggerFactory.getLogger(GrokPrepper.class);
private static final Logger LOG = LoggerFactory.getLogger(GrokProcessor.class);

static final String GROK_PROCESSING_MATCH_SUCCESS = "grokProcessingMatchSuccess";
static final String GROK_PROCESSING_MATCH_FAILURE = "grokProcessingMatchFailure";
Expand All @@ -72,18 +72,18 @@ public class GrokPrepper extends AbstractProcessor<Record<Event>, Record<Event>>

private final GrokCompiler grokCompiler;
private final Map<String, List<Grok>> fieldToGrok;
private final GrokPrepperConfig grokPrepperConfig;
private final GrokProcessorConfig grokProcessorConfig;
private final Set<String> keysToOverwrite;
private final ExecutorService executorService;

public GrokPrepper(final PluginSetting pluginSetting) {
public GrokProcessor(final PluginSetting pluginSetting) {
this(pluginSetting, GrokCompiler.newInstance(), Executors.newSingleThreadExecutor());
}

GrokPrepper(final PluginSetting pluginSetting, final GrokCompiler grokCompiler, final ExecutorService executorService) {
GrokProcessor(final PluginSetting pluginSetting, final GrokCompiler grokCompiler, final ExecutorService executorService) {
super(pluginSetting);
this.grokPrepperConfig = GrokPrepperConfig.buildConfig(pluginSetting);
this.keysToOverwrite = new HashSet<>(grokPrepperConfig.getkeysToOverwrite());
this.grokProcessorConfig = GrokProcessorConfig.buildConfig(pluginSetting);
this.keysToOverwrite = new HashSet<>(grokProcessorConfig.getkeysToOverwrite());
this.grokCompiler = grokCompiler;
this.fieldToGrok = new LinkedHashMap<>();
this.executorService = executorService;
Expand Down Expand Up @@ -113,7 +113,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
try {
final Event event = record.getData();

if (grokPrepperConfig.getTimeoutMillis() == 0) {
if (grokProcessorConfig.getTimeoutMillis() == 0) {
grokProcessingTime.record(() -> matchAndMerge(event));
} else {
runWithTimeout(() -> grokProcessingTime.record(() -> matchAndMerge(event)));
Expand All @@ -122,7 +122,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
final Record<Event> grokkedRecord = new Record<>(event, record.getMetadata());
recordsOut.add(grokkedRecord);
} catch (TimeoutException e) {
LOG.error("Matching on record [{}] took longer than [{}] and timed out", record.getData(), grokPrepperConfig.getTimeoutMillis());
LOG.error("Matching on record [{}] took longer than [{}] and timed out", record.getData(), grokProcessorConfig.getTimeoutMillis());
recordsOut.add(record);
grokProcessingTimeoutsCounter.increment();
} catch (ExecutionException e) {
Expand Down Expand Up @@ -170,20 +170,20 @@ public void shutdown() {

private void registerPatterns() {
grokCompiler.registerDefaultPatterns();
grokCompiler.register(grokPrepperConfig.getPatternDefinitions());
grokCompiler.register(grokProcessorConfig.getPatternDefinitions());
registerPatternsDirectories();
}

private void registerPatternsDirectories() {
for (final String directory : grokPrepperConfig.getPatternsDirectories()) {
for (final String directory : grokProcessorConfig.getPatternsDirectories()) {
final Path path = FileSystems.getDefault().getPath(directory);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, grokPrepperConfig.getPatternsFilesGlob())) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, grokProcessorConfig.getPatternsFilesGlob())) {
for (final Path patternFile : stream) {
registerPatternsForFile(patternFile.toFile());
}
}
catch (PatternSyntaxException e) {
LOG.error("Glob pattern {} is invalid", grokPrepperConfig.getPatternsFilesGlob());
LOG.error("Glob pattern {} is invalid", grokProcessorConfig.getPatternsFilesGlob());
} catch (NotDirectoryException e) {
LOG.error("{} is not a directory", directory, e);
} catch (IOException e) {
Expand All @@ -203,10 +203,10 @@ private void registerPatternsForFile(final File file) {
}

private void compileMatchPatterns() {
for (final Map.Entry<String, List<String>> entry : grokPrepperConfig.getMatch().entrySet()) {
for (final Map.Entry<String, List<String>> entry : grokProcessorConfig.getMatch().entrySet()) {
fieldToGrok.put(entry.getKey(), entry.getValue()
.stream()
.map(item -> grokCompiler.compile(item, grokPrepperConfig.isNamedCapturesOnly()))
.map(item -> grokCompiler.compile(item, grokProcessorConfig.isNamedCapturesOnly()))
.collect(Collectors.toList()));
}
}
Expand All @@ -219,7 +219,7 @@ private void matchAndMerge(final Event event) {
final String value = event.get(entry.getKey(), String.class);
if (value != null && !value.isEmpty()) {
final Match match = grok.match(value);
match.setKeepEmptyCaptures(grokPrepperConfig.isKeepEmptyCaptures());
match.setKeepEmptyCaptures(grokProcessorConfig.isKeepEmptyCaptures());

final Map<String, Object> captures = match.capture();
mergeCaptures(grokkedCaptures, captures);
Expand All @@ -234,8 +234,8 @@ private void matchAndMerge(final Event event) {
}
}

if (grokPrepperConfig.getTargetKey() != null) {
event.put(grokPrepperConfig.getTargetKey(), grokkedCaptures);
if (grokProcessorConfig.getTargetKey() != null) {
event.put(grokProcessorConfig.getTargetKey(), grokkedCaptures);
} else {
mergeCaptures(event, grokkedCaptures);
}
Expand Down Expand Up @@ -294,11 +294,11 @@ private void mergeValueWithValues(final Object value, final List<Object> values)
}

private boolean shouldBreakOnMatch(final Map<String, Object> captures) {
return captures.size() > 0 && grokPrepperConfig.isBreakOnMatch();
return captures.size() > 0 && grokProcessorConfig.isBreakOnMatch();
}

private void runWithTimeout(final Runnable runnable) throws TimeoutException, ExecutionException, InterruptedException {
Future<?> task = executorService.submit(runnable);
task.get(grokPrepperConfig.getTimeoutMillis(), TimeUnit.MILLISECONDS);
task.get(grokProcessorConfig.getTimeoutMillis(), TimeUnit.MILLISECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.prepper.grok;
package org.opensearch.dataprepper.plugins.processor.grok;

import org.opensearch.dataprepper.model.configuration.PluginSetting;

import java.util.List;
import java.util.Map;

public class GrokPrepperConfig {
public class GrokProcessorConfig {
static final String BREAK_ON_MATCH = "break_on_match";
static final String KEEP_EMPTY_CAPTURES = "keep_empty_captures";
static final String MATCH = "match";
Expand Down Expand Up @@ -40,16 +40,16 @@ public class GrokPrepperConfig {
private final int timeoutMillis;
private final String targetKey;

private GrokPrepperConfig(final boolean breakOnMatch,
final boolean keepEmptyCaptures,
final Map<String, List<String>> match,
final boolean namedCapturesOnly,
final List<String> keysToOverwrite,
final List<String> patternsDirectories,
final String patternsFilesGlob,
final Map<String, String> patternDefinitions,
final int timeoutMillis,
final String targetKey) {
private GrokProcessorConfig(final boolean breakOnMatch,
final boolean keepEmptyCaptures,
final Map<String, List<String>> match,
final boolean namedCapturesOnly,
final List<String> keysToOverwrite,
final List<String> patternsDirectories,
final String patternsFilesGlob,
final Map<String, String> patternDefinitions,
final int timeoutMillis,
final String targetKey) {

this.breakOnMatch = breakOnMatch;
this.keepEmptyCaptures = keepEmptyCaptures;
Expand All @@ -63,8 +63,8 @@ private GrokPrepperConfig(final boolean breakOnMatch,
this.targetKey = targetKey;
}

public static GrokPrepperConfig buildConfig(final PluginSetting pluginSetting) {
return new GrokPrepperConfig(pluginSetting.getBooleanOrDefault(BREAK_ON_MATCH, DEFAULT_BREAK_ON_MATCH),
public static GrokProcessorConfig buildConfig(final PluginSetting pluginSetting) {
return new GrokProcessorConfig(pluginSetting.getBooleanOrDefault(BREAK_ON_MATCH, DEFAULT_BREAK_ON_MATCH),
pluginSetting.getBooleanOrDefault(KEEP_EMPTY_CAPTURES, DEFAULT_KEEP_EMPTY_CAPTURES),
pluginSetting.getTypedListMap(MATCH, String.class, String.class),
pluginSetting.getBooleanOrDefault(NAMED_CAPTURES_ONLY, DEFAULT_NAMED_CAPTURES_ONLY),
Expand Down Expand Up @@ -115,4 +115,4 @@ public int getTimeoutMillis() {
public String getTargetKey() {
return targetKey;
}
}
}
Loading

0 comments on commit 967919b

Please sign in to comment.