Merge branch 'opensearch-project:main' into secrets-variable-interface
san81 authored Jan 15, 2025
2 parents a384d85 + b13a645 commit 2f26378
Showing 31 changed files with 770 additions and 50 deletions.
7 changes: 4 additions & 3 deletions build.gradle
@@ -350,7 +350,8 @@ coreProjects.each { coreProject ->
def assembleTasks = collectTasksRecursively(coreProject, 'assemble')
def publishTasks = collectTasksRecursively(coreProject, 'publish')

// Add these tasks as dependencies of the release task
release.dependsOn assembleTasks
release.dependsOn publishTasks
// Explicitly declare release task for better gradle compatibility
def releaseTask = tasks.named('release').get()
releaseTask.dependsOn assembleTasks
releaseTask.dependsOn publishTasks
}
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.model.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Marks a Data Prepper plugin as experimental.
* <p>
* Experimental plugins do not have the same compatibility guarantees as other plugins and may be unstable.
* They may have breaking changes between minor versions and may even be removed.
* <p>
* Data Prepper administrators must enable experimental plugins in order to use them.
* Otherwise, they are not available for use in pipelines.
*
* @since 2.11
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Experimental {
}
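
For illustration, a minimal sketch of a plugin class marked experimental. The plugin name and type here are hypothetical; the test plugin added later in this commit follows the same pattern:

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.Experimental;
import org.opensearch.dataprepper.model.processor.Processor;

// Hypothetical plugin, shown only to illustrate the annotation.
@DataPrepperPlugin(name = "my_experimental_processor", pluginType = Processor.class)
@Experimental
public class MyExperimentalProcessor {
}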
@@ -110,6 +110,18 @@ public interface Event extends Serializable {
*/
void clear();

/**
* Merges another Event into the current Event.
* All keys from the other Event are copied into the current Event, overwriting the values of any keys the two Events share.
* Keys that exist only in the current Event are left unmodified.
*
* @param other the other Event to merge into this Event
* @throws IllegalArgumentException if the input Event is not compatible for merging.
* @throws UnsupportedOperationException if the current Event does not support merging.
* @since 2.11
*/
void merge(Event other);
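
For illustration, a minimal sketch of these merge semantics, assuming a JacksonEvent built the same way as in the tests later in this commit:

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;

class MergeSketch {
    void demo() {
        final Event current = JacksonEvent.builder()
                .withEventType(EventType.DOCUMENT.toString())
                .withData("{\"a\": \"original\", \"b\": \"original\"}")
                .build();
        final Event other = JacksonEvent.builder()
                .withEventType(EventType.DOCUMENT.toString())
                .withData("{\"a\": \"alpha\"}")
                .build();

        current.merge(other);

        final String a = current.get("a", String.class); // "alpha" -- shared key overwritten by `other`
        final String b = current.get("b", String.class); // "original" -- key only in `current` is unmodified
    }
}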

/**
* Generates a serialized Json string of the entire Event
*
@@ -123,7 +123,6 @@ public static Event fromMessage(String message) {
}

private JsonNode getInitialJsonNode(final Object data) {

if (data == null) {
return mapper.valueToTree(new HashMap<>());
} else if (data instanceof String) {
@@ -348,14 +347,30 @@ public void clear() {
}
}

@Override
public void merge(final Event other) {
if (!(other instanceof JacksonEvent)) {
throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent.");
}
final JacksonEvent otherJacksonEvent = (JacksonEvent) other;
if (!(otherJacksonEvent.jsonNode instanceof ObjectNode)) {
throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent with object data.");
}
final ObjectNode otherObjectNode = (ObjectNode) otherJacksonEvent.jsonNode;

if (!(jsonNode instanceof ObjectNode)) {
throw new UnsupportedOperationException("Unable to merge the Event. The current Event must have object data.");
}

((ObjectNode) jsonNode).setAll(otherObjectNode);
}

@Override
public String toJsonString() {
return jsonNode.toString();
}

@Override
public String getAsJsonString(EventKey key) {

JacksonEventKey jacksonEventKey = asJacksonEventKey(key);
final JsonNode node = getNode(jacksonEventKey);
if (node.isMissingNode()) {
@@ -446,6 +446,52 @@ public void testClear() {
assertThat(event.toMap().size(), equalTo(0));
}

@Test
void merge_with_non_JacksonEvent_throws() {
final Event otherEvent = mock(Event.class);
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent));
}

@Test
void merge_with_array_JsonNode_throws() {
final JacksonEvent otherEvent = (JacksonEvent) event;
event = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(List.of(UUID.randomUUID().toString())).build();
assertThrows(UnsupportedOperationException.class, () -> event.merge(otherEvent));
}

@Test
void merge_with_array_JsonNode_in_other_throws() {
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(List.of(UUID.randomUUID().toString())).build();
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent));
}

@Test
void merge_sets_all_values() {
final String jsonString = "{\"a\": \"alpha\", \"info\": {\"ids\": {\"id\":\"idx\"}}}";
event.put("b", "original");
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
event.merge(otherEvent);

assertThat(event.get("b", Object.class), equalTo("original"));
assertThat(event.get("a", Object.class), equalTo("alpha"));
assertThat(event.containsKey("info"), equalTo(true));
assertThat(event.get("info/ids/id", String.class), equalTo("idx"));
}

@Test
void merge_overrides_existing_values() {
final String jsonString = "{\"a\": \"alpha\", \"info\": {\"ids\": {\"id\":\"idx\"}}}";
event.put("a", "original");
event.put("b", "original");
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
event.merge(otherEvent);

assertThat(event.get("b", Object.class), equalTo("original"));
assertThat(event.get("a", Object.class), equalTo("alpha"));
assertThat(event.containsKey("info"), equalTo(true));
assertThat(event.get("info/ids/id", String.class), equalTo("idx"));
}

@ParameterizedTest
@ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"})
public void testDelete_withNonexistentKey(final String key) {
@@ -14,6 +14,7 @@
import org.opensearch.dataprepper.core.event.EventFactoryApplicationContextMarker;
import org.opensearch.dataprepper.core.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.core.validation.PluginErrorCollector;
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;
import org.opensearch.dataprepper.plugins.configtest.TestComponentWithConfigInject;
import org.opensearch.dataprepper.plugins.configtest.TestDISourceWithConfig;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
@@ -27,6 +28,7 @@
import org.opensearch.dataprepper.plugins.test.TestPlugin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -38,6 +40,7 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.when;

/**
* Integration test of the plugin framework. These tests should not mock any portion
@@ -49,6 +52,8 @@ class DefaultPluginFactoryIT {
private PipelinesDataFlowModel pipelinesDataFlowModel;
@Mock
private ExtensionsConfiguration extensionsConfiguration;
@Mock
private ExperimentalConfigurationContainer experimentalConfigurationContainer;
private String pluginName;
private String objectPluginName;
private String pipelineName;
@@ -67,6 +72,8 @@ private DefaultPluginFactory createObjectUnderTest() {
final AnnotationConfigApplicationContext coreContext = new AnnotationConfigApplicationContext();
coreContext.setParent(publicContext);

when(experimentalConfigurationContainer.getExperimental()).thenReturn(ExperimentalConfiguration.defaultConfiguration());

coreContext.scan(EventFactoryApplicationContextMarker.class.getPackage().getName());
coreContext.scan(DefaultAcknowledgementSetManager.class.getPackage().getName());
coreContext.scan(DefaultPluginFactory.class.getPackage().getName());
@@ -75,6 +82,7 @@ private DefaultPluginFactory createObjectUnderTest() {
coreContext.registerBean(PluginErrorsHandler.class, LoggingPluginErrorsHandler::new);
coreContext.registerBean(ExtensionsConfiguration.class, () -> extensionsConfiguration);
coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel);
coreContext.registerBean(ExperimentalConfigurationContainer.class, () -> experimentalConfigurationContainer);
coreContext.refresh();

return coreContext.getBean(DefaultPluginFactory.class);
@@ -188,6 +196,20 @@ void loadPlugin_should_throw_when_a_plugin_configuration_is_invalid() {
assertThat(actualException.getMessage(), equalTo("Plugin test_plugin in pipeline " + pipelineName + " is configured incorrectly: requiredString must not be null"));
}

@Test
void loadPlugin_should_throw_when_a_plugin_is_experimental_by_default() {
pluginName = "test_experimental_plugin";
final PluginSetting pluginSetting = createPluginSettings(Collections.emptyMap());

final DefaultPluginFactory objectUnderTest = createObjectUnderTest();

final NoPluginFoundException actualException = assertThrows(NoPluginFoundException.class,
() -> objectUnderTest.loadPlugin(TestPluggableInterface.class, pluginSetting));

assertThat(actualException.getMessage(), notNullValue());
assertThat(actualException.getMessage(), equalTo("Unable to create experimental plugin test_experimental_plugin. You must enable experimental plugins in data-prepper-config.yaml in order to use them."));
}

private PluginSetting createPluginSettings(final Map<String, Object> pluginSettingMap) {
final PluginSetting pluginSetting = new PluginSetting(pluginName, pluginSettingMap);
pluginSetting.setPipelineName(pipelineName);
@@ -176,10 +176,6 @@ private void buildPipelineFromConfiguration(
.map(this::buildRoutedSinkOrConnector)
.collect(Collectors.toList());

final List<PluginError> subPipelinePluginErrors = pluginErrorCollector.getPluginErrors()
.stream().filter(pluginError -> pipelineName.equals(pluginError.getPipelineName()))
.collect(Collectors.toList());

final List<PluginError> invalidRouteExpressions = pipelineConfiguration.getRoutes()
.stream().filter(route -> !expressionEvaluator.isValidExpressionStatement(route.getCondition()))
.map(route -> PluginError.builder()
@@ -190,8 +186,12 @@ private void buildPipelineFromConfiguration(
.build())
.collect(Collectors.toList());

if (!subPipelinePluginErrors.isEmpty() || !invalidRouteExpressions.isEmpty()) {
subPipelinePluginErrors.addAll(invalidRouteExpressions);
invalidRouteExpressions.forEach(pluginErrorCollector::collectPluginError);
final List<PluginError> subPipelinePluginErrors = pluginErrorCollector.getPluginErrors()
.stream().filter(pluginError -> pipelineName.equals(pluginError.getPipelineName()))
.collect(Collectors.toList());

if (!subPipelinePluginErrors.isEmpty()) {
pluginErrorsHandler.handleErrors(subPipelinePluginErrors);
throw new InvalidPluginConfigurationException(
String.format("One or more plugins are not configured correctly in the pipeline: %s.\n",
@@ -18,6 +18,8 @@
import org.opensearch.dataprepper.core.pipeline.PipelineShutdownOption;
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugin.ExperimentalConfiguration;
import org.opensearch.dataprepper.plugin.ExperimentalConfigurationContainer;
import org.opensearch.dataprepper.plugin.ExtensionsConfiguration;

import java.time.Duration;
@@ -31,7 +33,7 @@
/**
* Class to hold configuration for DataPrepper, including server port and Log4j settings
*/
public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer {
public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer, ExperimentalConfigurationContainer {
static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L);

private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory";
@@ -55,6 +57,7 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventC
private PeerForwarderConfiguration peerForwarderConfiguration;
private Duration processorShutdownTimeout;
private Duration sinkShutdownTimeout;
private ExperimentalConfiguration experimental;
private PipelineExtensions pipelineExtensions;

public static final DataPrepperConfiguration DEFAULT_CONFIG = new DataPrepperConfiguration();
@@ -96,6 +99,7 @@ public DataPrepperConfiguration(
@JsonProperty("source_coordination") final SourceCoordinationConfig sourceCoordinationConfig,
@JsonProperty("pipeline_shutdown") final PipelineShutdownOption pipelineShutdown,
@JsonProperty("event") final EventConfiguration eventConfiguration,
@JsonProperty("experimental") final ExperimentalConfiguration experimental,
@JsonProperty("extensions")
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonSetter(nulls = Nulls.SKIP)
@@ -126,6 +130,8 @@ public DataPrepperConfiguration(
if (this.sinkShutdownTimeout.isNegative()) {
throw new IllegalArgumentException("sinkShutdownTimeout must be non-negative.");
}
this.experimental = experimental != null ? experimental : ExperimentalConfiguration.defaultConfiguration();

this.pipelineExtensions = pipelineExtensions;
}

@@ -239,4 +245,9 @@ public EventConfiguration getEventConfiguration() {
public PipelineExtensions getPipelineExtensions() {
return pipelineExtensions;
}

@Override
public ExperimentalConfiguration getExperimental() {
return experimental;
}
}
@@ -140,6 +140,7 @@ void setUp() {
@AfterEach
void tearDown() {
verify(dataPrepperConfiguration).getEventConfiguration();
verify(dataPrepperConfiguration).getExperimental();
verifyNoMoreInteractions(dataPrepperConfiguration);
}

@@ -396,6 +397,7 @@ void parseConfiguration_with_invalid_route_expressions_handles_errors_and_return
final Collection<PluginError> pluginErrorCollection = pluginErrorArgumentCaptor.getValue();
assertThat(pluginErrorCollection, notNullValue());
assertThat(pluginErrorCollection.size(), equalTo(1));
assertThat(pluginErrorCollector.getPluginErrors(), equalTo(pluginErrorCollection));

final PluginError pluginError = pluginErrorCollection.stream().findAny().orElseThrow();
final String expectedErrorMessage = String.format(CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT, "service", "/value == service");
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.Experimental;
import org.opensearch.dataprepper.plugin.TestPluggableInterface;

@DataPrepperPlugin(name = "test_experimental_plugin", pluginType = TestPluggableInterface.class)
@Experimental
public class TestExperimentalPlugin {
}
1 change: 1 addition & 0 deletions data-prepper-plugin-framework/build.gradle
@@ -25,4 +25,5 @@ dependencies {
implementation libs.reflections.core
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-text:1.10.0'
testImplementation 'ch.qos.logback:logback-classic:1.5.16'
}
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

/**
@@ -41,6 +42,7 @@ public class DefaultPluginFactory implements PluginFactory {
private final PluginBeanFactoryProvider pluginBeanFactoryProvider;
private final PluginConfigurationObservableFactory pluginConfigurationObservableFactory;
private final ApplicationContextToTypedSuppliers applicationContextToTypedSuppliers;
private final List<Consumer<DefinedPlugin<?>>> definedPluginConsumers;

@Inject
DefaultPluginFactory(
@@ -49,8 +51,10 @@ public class DefaultPluginFactory implements PluginFactory {
final PluginConfigurationConverter pluginConfigurationConverter,
final PluginBeanFactoryProvider pluginBeanFactoryProvider,
final PluginConfigurationObservableFactory pluginConfigurationObservableFactory,
final ApplicationContextToTypedSuppliers applicationContextToTypedSuppliers) {
final ApplicationContextToTypedSuppliers applicationContextToTypedSuppliers,
final List<Consumer<DefinedPlugin<?>>> definedPluginConsumers) {
this.applicationContextToTypedSuppliers = applicationContextToTypedSuppliers;
this.definedPluginConsumers = definedPluginConsumers;
Objects.requireNonNull(pluginProviderLoader);
Objects.requireNonNull(pluginConfigurationObservableFactory);
this.pluginCreator = Objects.requireNonNull(pluginCreator);
@@ -140,15 +144,13 @@ private <T> Class<? extends T> getPluginClass(final Class<T> baseClass, final St
.orElseThrow(() -> new NoPluginFoundException(
"Unable to find a plugin named '" + pluginName + "'. Please ensure that plugin is annotated with appropriate values."));

logDeprecatedPluginsNames(pluginClass, pluginName);
handleDefinedPlugins(pluginClass, pluginName);
return pluginClass;
}

private <T> void logDeprecatedPluginsNames(final Class<? extends T> pluginClass, final String pluginName) {
final String deprecatedName = pluginClass.getAnnotation(DataPrepperPlugin.class).deprecatedName();
final String name = pluginClass.getAnnotation(DataPrepperPlugin.class).name();
if (deprecatedName.equals(pluginName)) {
LOG.warn("Plugin name '{}' is deprecated and will be removed in the next major release. Consider using the updated plugin name '{}'.", deprecatedName, name);
}
private <T> void handleDefinedPlugins(final Class<? extends T> pluginClass, final String pluginName) {
final DefinedPlugin<? extends T> definedPlugin = new DefinedPlugin<>(pluginClass, pluginName);

definedPluginConsumers.forEach(definedPluginConsumer -> definedPluginConsumer.accept(definedPlugin));
}
}
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugin;

import java.util.Objects;

class DefinedPlugin<T> {
private final Class<? extends T> pluginClass;
private final String pluginName;

public DefinedPlugin(final Class<? extends T> pluginClass, final String pluginName) {
this.pluginClass = Objects.requireNonNull(pluginClass);
this.pluginName = Objects.requireNonNull(pluginName);
}

public Class<? extends T> getPluginClass() {
return pluginClass;
}

public String getPluginName() {
return pluginName;
}
}
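
To illustrate the extension point, a hypothetical sketch of an additional consumer; the assumption, matching DeprecatedPluginDetector and ExperimentalPluginValidator below, is that any @Named Consumer<DefinedPlugin<?>> bean is collected into the list injected into DefaultPluginFactory:

package org.opensearch.dataprepper.plugin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.util.function.Consumer;

// Hypothetical consumer, not part of this commit; shown only to illustrate the hook.
@Named
class PluginUsageLogger implements Consumer<DefinedPlugin<?>> {
    private static final Logger LOG = LoggerFactory.getLogger(PluginUsageLogger.class);

    @Override
    public void accept(final DefinedPlugin<?> definedPlugin) {
        LOG.info("Loading plugin '{}' ({}).",
                definedPlugin.getPluginName(), definedPlugin.getPluginClass().getName());
    }
}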
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.util.function.Consumer;

@Named
class DeprecatedPluginDetector implements Consumer<DefinedPlugin<?>> {
private static final Logger LOG = LoggerFactory.getLogger(DeprecatedPluginDetector.class);

@Override
public void accept(final DefinedPlugin<?> definedPlugin) {
logDeprecatedPluginsNames(definedPlugin.getPluginClass(), definedPlugin.getPluginName());
}

private void logDeprecatedPluginsNames(final Class<?> pluginClass, final String pluginName) {
final String deprecatedName = pluginClass.getAnnotation(DataPrepperPlugin.class).deprecatedName();
final String name = pluginClass.getAnnotation(DataPrepperPlugin.class).name();
if (deprecatedName.equals(pluginName)) {
LOG.warn("Plugin name '{}' is deprecated and will be removed in the next major release. Consider using the updated plugin name '{}'.", deprecatedName, name);
}
}
}
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugin;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Data Prepper configurations for experimental features.
*
* @since 2.11
*/
public class ExperimentalConfiguration {
@JsonProperty("enable_all")
private boolean enableAll = false;

public static ExperimentalConfiguration defaultConfiguration() {
return new ExperimentalConfiguration();
}

/**
* Gets whether all experimental features are enabled.
* @return true if all experimental features are enabled, false otherwise
* @since 2.11
*/
public boolean isEnableAll() {
return enableAll;
}
}
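
For reference, a minimal data-prepper-config.yaml sketch that enables all experimental plugins; the key names follow the @JsonProperty annotations in DataPrepperConfiguration and ExperimentalConfiguration, and the rest of the configuration file is elided:

experimental:
  enable_all: true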
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugin;

/**
* Interface to decouple how an experimental configuration is defined from
* usage of those configurations.
*
* @since 2.11
*/
public interface ExperimentalConfigurationContainer {
/**
* Gets the experimental configuration.
* @return the experimental configuration
* @since 2.11
*/
ExperimentalConfiguration getExperimental();
}
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.model.annotations.Experimental;
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;

import javax.inject.Named;
import java.util.function.Consumer;

@Named
class ExperimentalPluginValidator implements Consumer<DefinedPlugin<?>> {
private final ExperimentalConfiguration experimentalConfiguration;

ExperimentalPluginValidator(final ExperimentalConfigurationContainer experimentalConfigurationContainer) {
this.experimentalConfiguration = experimentalConfigurationContainer.getExperimental();
}

@Override
public void accept(final DefinedPlugin<?> definedPlugin) {
if (isPluginDisallowedAsExperimental(definedPlugin.getPluginClass())) {
throw new NoPluginFoundException("Unable to create experimental plugin " + definedPlugin.getPluginName() +
". You must enable experimental plugins in data-prepper-config.yaml in order to use them.");
}
}

private boolean isPluginDisallowedAsExperimental(final Class<?> pluginClass) {
return pluginClass.isAnnotationPresent(Experimental.class) && !experimentalConfiguration.isEnableAll();
}
}
@@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.hamcrest.CoreMatchers.equalTo;
@@ -38,6 +39,7 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -64,6 +66,7 @@ class DefaultPluginFactoryTest {
private PluginConfigurationObservableFactory pluginConfigurationObservableFactory;
private PluginConfigObservable pluginConfigObservable;
private ApplicationContextToTypedSuppliers applicationContextToTypedSuppliers;
private List<Consumer<DefinedPlugin<?>>> definedPluginConsumers;

@BeforeEach
void setUp() {
@@ -92,14 +95,17 @@ void setUp() {
)).willReturn(pluginConfigObservable);

applicationContextToTypedSuppliers = mock(ApplicationContextToTypedSuppliers.class);

definedPluginConsumers = List.of(mock(Consumer.class), mock(Consumer.class));
}

private DefaultPluginFactory createObjectUnderTest() {
return new DefaultPluginFactory(
pluginProviderLoader, pluginCreator, pluginConfigurationConverter,
beanFactoryProvider,
pluginConfigurationObservableFactory,
applicationContextToTypedSuppliers);
applicationContextToTypedSuppliers,
definedPluginConsumers);
}

@Test
@@ -230,6 +236,22 @@ void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found() {
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration);
}

@Test
void loadPlugin_should_call_all_definedPluginConsumers() {
createObjectUnderTest().loadPlugin(baseClass, pluginSetting);

assertThat("This test is not valid if there are no defined plugin consumers.",
definedPluginConsumers.size(), greaterThanOrEqualTo(2));
for (final Consumer<DefinedPlugin<?>> definedPluginConsumer : definedPluginConsumers) {
final ArgumentCaptor<DefinedPlugin<?>> definedPluginArgumentCaptor = ArgumentCaptor.forClass(DefinedPlugin.class);
verify(definedPluginConsumer).accept(definedPluginArgumentCaptor.capture());

final DefinedPlugin<?> actualDefinedPlugin = definedPluginArgumentCaptor.getValue();
assertThat(actualDefinedPlugin.getPluginClass(), equalTo(expectedPluginClass));
assertThat(actualDefinedPlugin.getPluginName(), equalTo(pluginName));
}
}

@Test
void loadPlugins_should_throw_for_null_number_of_instances() {

@@ -322,6 +344,23 @@ void loadPlugin_with_varargs_should_return_a_single_instance_when_the_the_number
assertThat(plugin, equalTo(expectedInstance));
}

@Test
void loadPlugin_with_varargs_should_call_all_definedPluginConsumers() {
final Object vararg1 = new Object();
createObjectUnderTest().loadPlugin(baseClass, pluginSetting, vararg1);

assertThat("This test is not valid if there are no defined plugin consumers.",
definedPluginConsumers.size(), greaterThanOrEqualTo(2));
for (final Consumer<DefinedPlugin<?>> definedPluginConsumer : definedPluginConsumers) {
final ArgumentCaptor<DefinedPlugin<?>> definedPluginArgumentCaptor = ArgumentCaptor.forClass(DefinedPlugin.class);
verify(definedPluginConsumer).accept(definedPluginArgumentCaptor.capture());

final DefinedPlugin<?> actualDefinedPlugin = definedPluginArgumentCaptor.getValue();
assertThat(actualDefinedPlugin.getPluginClass(), equalTo(expectedPluginClass));
assertThat(actualDefinedPlugin.getPluginName(), equalTo(pluginName));
}
}

@Test
void loadPlugins_should_return_an_instance_for_the_total_count() {
final TestSink expectedInstance1 = mock(TestSink.class);
@@ -0,0 +1,100 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugin;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.processor.Processor;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class DeprecatedPluginDetectorTest {
@Mock
private DefinedPlugin definedPlugin;
private TestLogAppender testAppender;

@BeforeEach
void setUp() {
final Logger logger = (Logger) LoggerFactory.getLogger(DeprecatedPluginDetector.class);

testAppender = new TestLogAppender();
testAppender.start();
logger.addAppender(testAppender);
}

private DeprecatedPluginDetector createObjectUnderTest() {
return new DeprecatedPluginDetector();
}

@Test
void accept_on_plugin_without_deprecated_name_does_not_log() {
when(definedPlugin.getPluginClass()).thenReturn(PluginWithoutDeprecatedName.class);
createObjectUnderTest().accept(definedPlugin);

assertThat(testAppender.getLoggedEvents(), empty());
}

@Test
void accept_on_plugin_with_deprecated_name_does_not_log_if_new_name_is_used() {
when(definedPlugin.getPluginClass()).thenReturn(PluginWithDeprecatedName.class);
when(definedPlugin.getPluginName()).thenReturn("test_for_deprecated_detection");
createObjectUnderTest().accept(definedPlugin);

assertThat(testAppender.getLoggedEvents(), empty());
}

@Test
void accept_on_plugin_with_deprecated_name_logs_if_deprecated_name_is_used() {
when(definedPlugin.getPluginClass()).thenReturn(PluginWithDeprecatedName.class);
when(definedPlugin.getPluginName()).thenReturn("test_for_deprecated_detection_deprecated_name");
createObjectUnderTest().accept(definedPlugin);

assertThat(testAppender.getLoggedEvents().stream()
.anyMatch(event -> event.getFormattedMessage().contains("Plugin name 'test_for_deprecated_detection_deprecated_name' is deprecated and will be removed in the next major release. Consider using the updated plugin name 'test_for_deprecated_detection'.")),
equalTo(true));
}

@DataPrepperPlugin(name = "test_for_deprecated_detection", pluginType = Processor.class)
private static class PluginWithoutDeprecatedName {
}

@DataPrepperPlugin(name = "test_for_deprecated_detection", pluginType = Processor.class, deprecatedName = "test_for_deprecated_detection_deprecated_name")
private static class PluginWithDeprecatedName {
}

public static class TestLogAppender extends AppenderBase<ILoggingEvent> {
private final List<ILoggingEvent> events = new ArrayList<>();

@Override
protected void append(final ILoggingEvent eventObject) {
events.add(eventObject);
}

public List<ILoggingEvent> getLoggedEvents() {
return Collections.unmodifiableList(events);
}
}
}
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugin;

import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;

class ExperimentalConfigurationTest {
@Test
void defaultConfiguration_should_return_config_with_isEnableAll_false() {
final ExperimentalConfiguration objectUnderTest = ExperimentalConfiguration.defaultConfiguration();
assertThat(objectUnderTest, notNullValue());
assertThat(objectUnderTest.isEnableAll(), equalTo(false));
}
}
@@ -0,0 +1,92 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugin;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.annotations.Experimental;
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;

import java.util.UUID;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class ExperimentalPluginValidatorTest {

@Mock
private ExperimentalConfigurationContainer experimentalConfigurationContainer;

@Mock
private ExperimentalConfiguration experimentalConfiguration;

@Mock
private DefinedPlugin definedPlugin;

@BeforeEach
void setUp() {
when(experimentalConfigurationContainer.getExperimental()).thenReturn(experimentalConfiguration);
}

private ExperimentalPluginValidator createObjectUnderTest() {
return new ExperimentalPluginValidator(experimentalConfigurationContainer);
}

@Test
void accept_with_non_Experimental_plugin_returns() {
when(definedPlugin.getPluginClass()).thenReturn(NonExperimentalPlugin.class);

createObjectUnderTest().accept(definedPlugin);
}

@Nested
class WithExperimentalPlugin {
@BeforeEach
void setUp() {
when(definedPlugin.getPluginClass()).thenReturn(ExperimentalPlugin.class);
}

@Test
void accept_with_Experimental_plugin_throws_if_experimental_is_not_enabled() {
final String pluginName = UUID.randomUUID().toString();
when(definedPlugin.getPluginName()).thenReturn(pluginName);

final ExperimentalPluginValidator objectUnderTest = createObjectUnderTest();

final NoPluginFoundException actualException = assertThrows(NoPluginFoundException.class, () -> objectUnderTest.accept(definedPlugin));

assertThat(actualException.getMessage(), notNullValue());
assertThat(actualException.getMessage(), containsString(pluginName));
assertThat(actualException.getMessage(), containsString("experimental plugin"));
}

@Test
void accept_with_Experimental_plugin_does_not_throw_if_experimental_is_enabled() {
when(experimentalConfiguration.isEnableAll()).thenReturn(true);

createObjectUnderTest().accept(definedPlugin);
}
}

private static class NonExperimentalPlugin {
}

@Experimental
private static class ExperimentalPlugin {
}
}
@@ -14,7 +14,6 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.TableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -234,8 +233,8 @@ private void compareAndCreateChildrenPartitions(List<EnhancedSourcePartition> so
* Construct metadata info for the table and also perform validation on the configuration.
* Once created, the info should not be changed.
*/
private TableInfo getTableInfo(TableConfig tableConfig) {
String tableName = TableUtil.getTableNameFromArn(tableConfig.getTableArn());
private TableInfo getTableInfo(final TableConfig tableConfig) {
final String tableName = tableConfig.getTableArn();
DescribeTableResponse describeTableResult;
try {
// Need to call describe table to get the Key schema for table
@@ -1,15 +1,7 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.utils;

import software.amazon.awssdk.arns.Arn;

public class TableUtil {

public static String getTableNameFromArn(String tableArn) {
Arn arn = Arn.fromString(tableArn);
// resourceAsString is table/xxx
return arn.resourceAsString().substring("table/".length());
}

public static String getTableArnFromStreamArn(String streamArn) {
// e.g. Given a stream arn: arn:aws:dynamodb:us-west-2:xxx:table/test-table/stream/2023-07-31T04:59:58.190
// Returns arn:aws:dynamodb:us-west-2:xxx:table/test-table
@@ -21,6 +13,4 @@ public static String getTableArnFromExportArn(String exportArn) {
// returns: arn:aws:dynamodb:us-west-2:123456789012:table/Thread
return exportArn.substring(0, exportArn.lastIndexOf("/export/"));
}


}
@@ -3,6 +3,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
@@ -223,7 +224,10 @@ void test_should_init() throws InterruptedException {


// Should call describe table to get basic table info
verify(dynamoDbClient).describeTable(any(DescribeTableRequest.class));
ArgumentCaptor<DescribeTableRequest> describeTableRequestArgumentCaptor = ArgumentCaptor.forClass(DescribeTableRequest.class);
verify(dynamoDbClient).describeTable(describeTableRequestArgumentCaptor.capture());
DescribeTableRequest actualDescribeTableRequest = describeTableRequestArgumentCaptor.getValue();
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableArn));
// Should check PITR enabled or not
verify(dynamoDbClient).describeContinuousBackups(any(DescribeContinuousBackupsRequest.class));
// Acquire the init partition
@@ -252,7 +256,11 @@ void test_PITR_not_enabled_init_should_failed() throws InterruptedException {
executorService.shutdownNow();

// Should call describe table to get basic table info
verify(dynamoDbClient).describeTable(any(DescribeTableRequest.class));
ArgumentCaptor<DescribeTableRequest> describeTableRequestArgumentCaptor = ArgumentCaptor.forClass(DescribeTableRequest.class);
verify(dynamoDbClient).describeTable(describeTableRequestArgumentCaptor.capture());
DescribeTableRequest actualDescribeTableRequest = describeTableRequestArgumentCaptor.getValue();
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableArn));

// Should check PITR enabled or not
verify(dynamoDbClient).describeContinuousBackups(any(DescribeContinuousBackupsRequest.class));

@@ -277,7 +285,11 @@ void test_streaming_not_enabled_init_should_failed() throws InterruptedException
executorService.shutdownNow();

// Should call describe table to get basic table info
verify(dynamoDbClient).describeTable(any(DescribeTableRequest.class));
ArgumentCaptor<DescribeTableRequest> describeTableRequestArgumentCaptor = ArgumentCaptor.forClass(DescribeTableRequest.class);
verify(dynamoDbClient).describeTable(describeTableRequestArgumentCaptor.capture());
DescribeTableRequest actualDescribeTableRequest = describeTableRequestArgumentCaptor.getValue();
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableArn));

// Should check PITR enabled or not
verify(dynamoDbClient).describeContinuousBackups(any(DescribeContinuousBackupsRequest.class));

@@ -15,12 +15,6 @@ class TableUtilTest {
private final String exportArn = tableArn + "/export/01693291918297-bfeccbea";
private final String streamArn = tableArn + "/stream/2023-09-14T05:46:45.367";

@Test
void test_getTableNameFromArn_should_return_tableName() {
String result = TableUtil.getTableNameFromArn(tableArn);
assertThat(result, equalTo(tableName));
}

@Test
void test_getTableArnFromStreamArn_should_return_tableArn() {
String result = TableUtil.getTableArnFromStreamArn(streamArn);
@@ -187,12 +187,13 @@ void test_data_file_loader_throws_exception_then_give_up_partition() {
}

@Test
void test_shutdown() {
void test_shutdown() throws InterruptedException {
DataFileScheduler objectUnderTest = createObjectUnderTest();
final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(objectUnderTest);

objectUnderTest.shutdown();
Thread.sleep(100);

verifyNoMoreInteractions(sourceCoordinator);
executorService.shutdownNow();
@@ -262,12 +262,13 @@ void test_given_export_partition_and_null_export_task_id_then_close_partition_wi
}

@Test
void test_shutDown() {
void test_shutDown() throws InterruptedException {
lenient().when(sourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).thenReturn(Optional.empty());

final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(exportScheduler);
exportScheduler.shutdown();
Thread.sleep(100);
verifyNoMoreInteractions(sourceCoordinator, snapshotManager, exportTaskManager, s3Client,
exportJobSuccessCounter, exportJobFailureCounter, exportS3ObjectsTotalCounter);
executorService.shutdownNow();
@@ -50,17 +50,21 @@ public class S3ScanPartitionCreationSupplier implements Function<Map<String, Obj

private final FolderPartitioningOptions folderPartitioningOptions;

private final boolean deleteS3ObjectsOnRead;

public S3ScanPartitionCreationSupplier(final S3Client s3Client,
final BucketOwnerProvider bucketOwnerProvider,
final List<ScanOptions> scanOptionsList,
final S3ScanSchedulingOptions schedulingOptions,
final FolderPartitioningOptions folderPartitioningOptions) {
final FolderPartitioningOptions folderPartitioningOptions,
final boolean deleteS3ObjectsOnRead) {

this.s3Client = s3Client;
this.bucketOwnerProvider = bucketOwnerProvider;
this.scanOptionsList = scanOptionsList;
this.schedulingOptions = schedulingOptions;
this.folderPartitioningOptions = folderPartitioningOptions;
this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead;
}

@Override
@@ -120,7 +124,7 @@ private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<Stri
do {
listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null).build());
allPartitionIdentifiers.addAll(listObjectsV2Response.contents().stream()
.filter(s3Object -> isLastModifiedTimeAfterMostRecentScanForBucket(previousScanTime, s3Object))
.filter(s3Object -> deleteS3ObjectsOnRead || isLastModifiedTimeAfterMostRecentScanForBucket(previousScanTime, s3Object))
.map(s3Object -> Pair.of(s3Object.key(), instantToLocalDateTime(s3Object.lastModified())))
.filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/"))
.filter(keyTimestampPair -> excludeKeyPaths.stream()
@@ -129,7 +129,7 @@ public ScanObjectWorker(final S3Client s3Client,
this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions();
this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout();

this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions());
this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead());
this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>();
this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>();
}
@@ -69,15 +69,18 @@ public class S3ScanPartitionCreationSupplierTest {

private FolderPartitioningOptions folderPartitioningOptions;

private boolean isDeleteS3ObjectsOnRead;

@BeforeEach
void setup() {
scanOptionsList = new ArrayList<>();
folderPartitioningOptions = null;
isDeleteS3ObjectsOnRead = false;
}


private Function<Map<String, Object>, List<PartitionIdentifier>> createObjectUnderTest() {
return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions);
return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead);
}

@Test
@@ -452,4 +455,132 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part
assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList())));
}

@Test
void object_is_not_filtered_out_based_on_last_modified_timestamp_when_delete_objects_on_read_is_enabled() {
schedulingOptions = mock(S3ScanSchedulingOptions.class);
given(schedulingOptions.getInterval()).willReturn(Duration.ofMillis(0));
given(schedulingOptions.getCount()).willReturn(2);
isDeleteS3ObjectsOnRead = true;

final String firstBucket = "bucket-one";
final String secondBucket = "bucket-two";

final ScanOptions firstBucketScanOptions = mock(ScanOptions.class);
final S3ScanBucketOption firstBucketScanBucketOption = mock(S3ScanBucketOption.class);
given(firstBucketScanOptions.getBucketOption()).willReturn(firstBucketScanBucketOption);
given(firstBucketScanBucketOption.getName()).willReturn(firstBucket);
given(firstBucketScanOptions.getUseStartDateTime()).willReturn(null);
given(firstBucketScanOptions.getUseEndDateTime()).willReturn(null);
final S3ScanKeyPathOption firstBucketScanKeyPath = mock(S3ScanKeyPathOption.class);
given(firstBucketScanBucketOption.getS3ScanFilter()).willReturn(firstBucketScanKeyPath);
given(firstBucketScanKeyPath.getS3scanIncludePrefixOptions()).willReturn(List.of(UUID.randomUUID().toString()));
given(firstBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(List.of(".invalid"));
scanOptionsList.add(firstBucketScanOptions);

final ScanOptions secondBucketScanOptions = mock(ScanOptions.class);
final S3ScanBucketOption secondBucketScanBucketOption = mock(S3ScanBucketOption.class);
given(secondBucketScanOptions.getBucketOption()).willReturn(secondBucketScanBucketOption);
given(secondBucketScanBucketOption.getName()).willReturn(secondBucket);
given(secondBucketScanOptions.getUseStartDateTime()).willReturn(null);
given(secondBucketScanOptions.getUseEndDateTime()).willReturn(null);
final S3ScanKeyPathOption secondBucketScanKeyPath = mock(S3ScanKeyPathOption.class);
given(secondBucketScanBucketOption.getS3ScanFilter()).willReturn(secondBucketScanKeyPath);
given(secondBucketScanKeyPath.getS3scanIncludePrefixOptions()).willReturn(null);
given(secondBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(null);
scanOptionsList.add(secondBucketScanOptions);

final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier = createObjectUnderTest();

final List<PartitionIdentifier> expectedPartitionIdentifiers = new ArrayList<>();

final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class);
final List<S3Object> s3ObjectsList = new ArrayList<>();

final S3Object invalidFolderObject = mock(S3Object.class);
given(invalidFolderObject.key()).willReturn("folder-key/");
given(invalidFolderObject.lastModified()).willReturn(Instant.now());
s3ObjectsList.add(invalidFolderObject);

final S3Object invalidForFirstBucketSuffixObject = mock(S3Object.class);
given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid");
given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now().minusSeconds(2));
s3ObjectsList.add(invalidForFirstBucketSuffixObject);
expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build());

final Instant mostRecentFirstScan = Instant.now().plusSeconds(2);
final S3Object validObject = mock(S3Object.class);
given(validObject.key()).willReturn("valid");
given(validObject.lastModified()).willReturn(mostRecentFirstScan);
s3ObjectsList.add(validObject);
expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build());
expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build());

final S3Object secondScanObject = mock(S3Object.class);
final Instant mostRecentSecondScan = Instant.now().plusSeconds(10);
given(secondScanObject.key()).willReturn("second-scan");
given(secondScanObject.lastModified()).willReturn(mostRecentSecondScan);

final List<PartitionIdentifier> expectedPartitionIdentifiersSecondScan = new ArrayList<>();
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build());
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build());
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build());
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build());

// Since delete objects on read is enabled, the second scan will pick up the same object from the first scan
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build());

final List<S3Object> secondScanObjects = new ArrayList<>(s3ObjectsList);
secondScanObjects.add(secondScanObject);
given(listObjectsResponse.contents())
.willReturn(s3ObjectsList)
.willReturn(s3ObjectsList)
.willReturn(secondScanObjects)
.willReturn(secondScanObjects);

final ArgumentCaptor<ListObjectsV2Request> listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse);

final Map<String, Object> globalStateMap = new HashMap<>();

final Instant beforeFirstScan = Instant.now();
final List<PartitionIdentifier> resultingPartitions = partitionCreationSupplier.apply(globalStateMap);

assertThat(resultingPartitions, notNullValue());
assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size()));
assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey)
.map(Matchers::equalTo).collect(Collectors.toList())));

assertThat(globalStateMap, notNullValue());
assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true));
assertThat(globalStateMap.get(SCAN_COUNT), equalTo(1));
assertThat(globalStateMap.containsKey(firstBucket), equalTo(true));
assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), lessThanOrEqualTo(mostRecentFirstScan));
assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), greaterThanOrEqualTo(beforeFirstScan));
assertThat(globalStateMap.containsKey(secondBucket), equalTo(true));
assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), lessThanOrEqualTo(mostRecentFirstScan));
assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), greaterThanOrEqualTo(beforeFirstScan));

final Instant beforeSecondScan = Instant.now();
final List<PartitionIdentifier> secondScanPartitions = partitionCreationSupplier.apply(globalStateMap);
assertThat(secondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size()));
assertThat(secondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList())));

assertThat(globalStateMap, notNullValue());
assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true));
assertThat(globalStateMap.get(SCAN_COUNT), equalTo(2));
assertThat(globalStateMap.containsKey(firstBucket), equalTo(true));
assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), lessThanOrEqualTo(mostRecentSecondScan));
assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), greaterThanOrEqualTo(beforeSecondScan));
assertThat(globalStateMap.containsKey(secondBucket), equalTo(true));
assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), lessThanOrEqualTo(mostRecentSecondScan));
assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), greaterThan(beforeSecondScan));
assertThat(Instant.ofEpochMilli((Long) globalStateMap.get(LAST_SCAN_TIME)).isBefore(Instant.now()), equalTo(true));

assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList()));

verify(listObjectsResponse, times(4)).contents();
}
}
8 changes: 7 additions & 1 deletion release/docker/Dockerfile
@@ -11,9 +11,12 @@ ENV ENV_PIPELINE_FILEPATH=$PIPELINE_FILEPATH

# Update all packages
RUN dnf -y update
RUN dnf -y install bash bc
RUN dnf -y install bash bc shadow-utils
RUN dnf -y upgrade

# Create a dedicated user and group with specific UID/GID
RUN useradd -u 1000 -M -U -d / -s /sbin/nologin -c "Data Prepper" data_prepper

# Setup the Adoptium package repo and install Temurin Java
ADD adoptium.repo /etc/yum.repos.d/adoptium.repo
RUN dnf -y install temurin-17-jdk
@@ -25,5 +28,8 @@ RUN mv /usr/share/$ARCHIVE_FILE_UNPACKED /usr/share/data-prepper
COPY default-data-prepper-config.yaml $ENV_CONFIG_FILEPATH
COPY default-keystore.p12 /usr/share/data-prepper/keystore.p12

RUN chown -R 1000:1000 $DATA_PREPPER_PATH /var/log/data-prepper
USER data_prepper

WORKDIR $DATA_PREPPER_PATH
CMD ["bin/data-prepper"]
