Skip to content

Commit

Permalink
Introducing core package in the data-prepper-core module (opensearch-…
Browse files Browse the repository at this point in the history
…project#5056)

core package introduced in the data-prepper-core module

Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 authored Oct 15, 2024
1 parent 472b246 commit 3e220d4
Show file tree
Hide file tree
Showing 244 changed files with 852 additions and 829 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.Assert.assertFalse;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.time.Instant;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class PipelinesWithAcksIT {
private static final Logger LOG = LoggerFactory.getLogger(PipelinesWithAcksIT.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.core.event.EventFactoryApplicationContextMarker;
import org.opensearch.dataprepper.core.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.core.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
Expand All @@ -20,9 +23,6 @@
import org.opensearch.dataprepper.plugins.test.TestComponent;
import org.opensearch.dataprepper.plugins.test.TestDISource;
import org.opensearch.dataprepper.plugins.test.TestPlugin;
import org.opensearch.dataprepper.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,27 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.core.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.core.event.EventFactoryApplicationContextMarker;
import org.opensearch.dataprepper.core.parser.config.DataPrepperAppConfiguration;
import org.opensearch.dataprepper.core.parser.config.FileStructurePathProvider;
import org.opensearch.dataprepper.core.parser.config.PipelineParserConfiguration;
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.core.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.core.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.core.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.core.validation.PluginErrorCollector;
import org.opensearch.dataprepper.expression.StartsWithExpressionFunction;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.config.DataPrepperAppConfiguration;
import org.opensearch.dataprepper.parser.config.FileStructurePathProvider;
import org.opensearch.dataprepper.parser.config.PipelineParserConfiguration;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.plugin.DefaultPluginFactory;
import org.opensearch.dataprepper.plugin.ObjectMapperConfiguration;
import org.opensearch.dataprepper.plugin.TestPluggableInterface;
import org.opensearch.dataprepper.plugins.test.TestExtension;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.ArrayList;
Expand Down Expand Up @@ -92,6 +93,7 @@ void setUp() {
coreContext.scan(DefaultAcknowledgementSetManager.class.getPackage().getName());

coreContext.scan(DefaultPluginFactory.class.getPackage().getName());
coreContext.scan(StartsWithExpressionFunction.class.getPackage().getName());

when(fileStructurePathProvider.getPipelineConfigFileLocation()).thenReturn(
"src/test/resources/valid_pipeline.yml"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.Collection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@

package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.time.Duration;

/**
* A Data Prepper source which can receive records via the {@link InMemorySourceAccessor}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package org.opensearch.dataprepper.test.framework;

import org.opensearch.dataprepper.AbstractContextManager;
import org.opensearch.dataprepper.DataPrepper;
import org.opensearch.dataprepper.parser.config.FileStructurePathProvider;
import org.opensearch.dataprepper.core.DataPrepper;
import org.opensearch.dataprepper.core.parser.config.FileStructurePathProvider;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper;

import org.opensearch.dataprepper.core.DataPrepper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
Expand All @@ -24,7 +25,11 @@
public abstract class AbstractContextManager {
private static final Logger LOG = LoggerFactory.getLogger(AbstractContextManager.class);
private static final String BASE_DATA_PREPPER_PACKAGE = "org.opensearch.dataprepper";
private static final String EXPRESSION_PACKAGE = BASE_DATA_PREPPER_PACKAGE + ".expression";
private static final String[] BASE_DATA_PREPPER_PACKAGES = {
BASE_DATA_PREPPER_PACKAGE + ".core",
BASE_DATA_PREPPER_PACKAGE + ".plugin"
};
private static final String EXPRESSION_PACKAGE = BASE_DATA_PREPPER_PACKAGE + ".expression";

private final AnnotationConfigApplicationContext publicApplicationContext;
private final AnnotationConfigApplicationContext coreApplicationContext;
Expand Down Expand Up @@ -53,7 +58,7 @@ private void start() {

publicApplicationContext.refresh();
coreApplicationContext.setParent(publicApplicationContext);
coreApplicationContext.scan(BASE_DATA_PREPPER_PACKAGE);
coreApplicationContext.scan(BASE_DATA_PREPPER_PACKAGES);
preRefreshCoreApplicationContext(coreApplicationContext);

coreApplicationContext.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper;
package org.opensearch.dataprepper.core;

import io.micrometer.core.instrument.util.StringUtils;
import org.opensearch.dataprepper.DataPrepperShutdownListener;
import org.opensearch.dataprepper.DataPrepperShutdownOptions;
import org.opensearch.dataprepper.core.parser.PipelineTransformer;
import org.opensearch.dataprepper.core.peerforwarder.server.PeerForwarderServer;
import org.opensearch.dataprepper.core.pipeline.Pipeline;
import org.opensearch.dataprepper.core.pipeline.PipelineObserver;
import org.opensearch.dataprepper.core.pipeline.PipelinesProvider;
import org.opensearch.dataprepper.core.pipeline.server.DataPrepperServer;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.PipelineTransformer;
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer;
import org.opensearch.dataprepper.pipeline.Pipeline;
import org.opensearch.dataprepper.pipeline.PipelineObserver;
import org.opensearch.dataprepper.pipeline.PipelinesProvider;
import org.opensearch.dataprepper.pipeline.server.DataPrepperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
Expand Down Expand Up @@ -106,6 +108,8 @@ private void shutdownPipelines() {

/**
* Triggers the shutdown of all configured valid pipelines.
* @param shutdownOptions {@link DataPrepperShutdownOptions} to control the behavior of the shutdown process
* e.g. timeout, graceful shutdown, etc.
*/
public void shutdownPipelines(final DataPrepperShutdownOptions shutdownOptions) {
transformationPipelines.forEach((name, pipeline) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

@Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Set;
import java.util.HashSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
* AcknowledgementSetMonitor - monitors the acknowledgement sets for completion/expiration
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import java.time.Duration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import java.util.Objects;
import java.util.concurrent.ThreadFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -19,12 +19,12 @@
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DefaultAcknowledgementSet implements AcknowledgementSet {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.metrics.PluginMetrics;

import javax.inject.Inject;
import javax.inject.Named;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;

public class DefaultAcknowledgementSetMetrics {
static final String CREATED_METRIC_NAME = "numberOfAcknowledgementSetsCreated";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.util.function.Consumer;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;

import java.time.Duration;
import java.util.function.Consumer;

public class InactiveAcknowledgementSetManager implements AcknowledgementSetManager {
private static InactiveAcknowledgementSetManager theInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.breaker;
package org.opensearch.dataprepper.core.breaker;

import org.opensearch.dataprepper.core.parser.model.CircuitBreakerConfig;
import org.opensearch.dataprepper.core.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.model.breaker.CircuitBreaker;
import org.opensearch.dataprepper.parser.model.CircuitBreakerConfig;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand Down
Loading

0 comments on commit 3e220d4

Please sign in to comment.