From c28d48f5fa2d656e60b24443c5660b93f94a89a8 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Mon, 23 Oct 2023 16:44:27 -0500 Subject: [PATCH 1/3] Retry on dynamic index creation when an OpenSearchException is thrown Signed-off-by: Chase Engelbrecht --- .../sink/opensearch/OpenSearchSink.java | 2 +- .../opensearch/index/DynamicIndexManager.java | 27 +++++++- .../index/DynamicIndexManagerTests.java | 62 +++++++++++++++++++ 3 files changed, 89 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index ea7a46b44c..9521893a69 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -331,7 +331,7 @@ public void doOutput(final Collection> records) { String indexName = configuredIndexAlias; try { indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator)); - } catch (IOException | EventKeyNotFoundException e) { + } catch (final Exception e) { LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage()); dynamicIndexDroppedEvents.increment(); logFailureForDlqObjects(List.of(DlqObject.builder() diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java index a2e613a13b..83eec171fb 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java @@ -5,18 +5,25 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; +import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkNotNull; public class DynamicIndexManager extends AbstractIndexManager { + private static final Logger LOG = LoggerFactory.getLogger(DynamicIndexManager.class); + private static final int INDEX_SETUP_RETRY_WAIT_TIME_MS = 1000; + private Cache indexManagerCache; final int CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES = 30; final int APPROXIMATE_INDEX_MANAGER_SIZE = 32; @@ -72,9 +79,27 @@ public String getIndexName(final String dynamicIndexAlias) throws IOException { indexManager = indexManagerFactory.getIndexManager( indexType, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, fullIndexAlias); indexManagerCache.put(fullIndexAlias, indexManager); - indexManager.setupIndex(); + setupIndexWithRetries(indexManager); } return indexManager.getIndexName(fullIndexAlias); } + + private void setupIndexWithRetries(final IndexManager indexManager) throws IOException { + boolean isIndexSetup = false; + + while (!isIndexSetup) { + try { + indexManager.setupIndex(); + isIndexSetup = true; + } catch (final OpenSearchException e) { + LOG.warn("Failed to setup dynamic index with an exception. ", e); + try { + Thread.sleep(INDEX_SETUP_RETRY_WAIT_TIME_MS); + } catch (final InterruptedException ex) { + LOG.warn("Interrupted while sleeping between index setup retries"); + } + } + } + } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java index 8513e70fe7..de8eb37f4e 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java @@ -9,6 +9,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; +import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.IndicesClient; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; @@ -28,6 +29,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -165,4 +167,64 @@ public void missingDynamicIndexTest() throws IOException { JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(RandomStringUtils.randomAlphabetic(10), DYNAMIC)).build(); assertThrows(EventKeyNotFoundException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias))); } + + @Test + public void getIndexName_DoesNotRetryOnNonOpenSearchExceptions() throws IOException { + when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS); + String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias(); + String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC); + innerIndexManager = mock(IndexManager.class); + when(mockIndexManagerFactory.getIndexManager( + IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager); + doThrow(new RuntimeException()) + .when(innerIndexManager).setupIndex(); + + JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build(); + assertThrows(RuntimeException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias))); + + verify(innerIndexManager, times(1)).setupIndex(); + } + + @Test + public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilSuccess() throws IOException { + when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS); + when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), eq(IndexConstants.ISM_ENABLED_SETTING))) + .thenReturn("true"); + String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias(); + String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC); + innerIndexManager = mock(IndexManager.class); + when(mockIndexManagerFactory.getIndexManager( + IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager); + doThrow(new OpenSearchException(new RuntimeException())) + .doThrow(new OpenSearchException(new RuntimeException())) + .doNothing() + .when(innerIndexManager).setupIndex(); + when(innerIndexManager.getIndexName(expectedIndexAlias)).thenReturn(expectedIndexAlias); + + JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build(); + final String indexName = dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias)); + assertThat(expectedIndexAlias, equalTo(indexName)); + + verify(innerIndexManager, times(3)).setupIndex(); + } + + @Test + public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilFailure() throws IOException { + when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS); + String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias(); + String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC); + innerIndexManager = mock(IndexManager.class); + when(mockIndexManagerFactory.getIndexManager( + IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager); + doThrow(new OpenSearchException(new RuntimeException())) + .doThrow(new OpenSearchException(new RuntimeException())) + .doThrow(new RuntimeException()) + .when(innerIndexManager).setupIndex(); + when(innerIndexManager.getIndexName(expectedIndexAlias)).thenReturn(expectedIndexAlias); + + JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build(); + assertThrows(RuntimeException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias))); + + verify(innerIndexManager, times(3)).setupIndex(); + } } From 6db417744fd508ef3540711b451a741a93e36da8 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Mon, 23 Oct 2023 17:35:48 -0500 Subject: [PATCH 2/3] Use correct exception type in unit tests Signed-off-by: Chase Engelbrecht --- .../opensearch/index/DynamicIndexManagerTests.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java index de8eb37f4e..d752370d4e 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java @@ -74,6 +74,9 @@ public class DynamicIndexManagerTests { @Mock private TemplateStrategy templateStrategy; + @Mock + private OpenSearchException openSearchException; + static final String EVENT_TYPE = "event"; @BeforeEach @@ -195,8 +198,8 @@ public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilSuccess() throws I innerIndexManager = mock(IndexManager.class); when(mockIndexManagerFactory.getIndexManager( IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager); - doThrow(new OpenSearchException(new RuntimeException())) - .doThrow(new OpenSearchException(new RuntimeException())) + doThrow(openSearchException) + .doThrow(openSearchException) .doNothing() .when(innerIndexManager).setupIndex(); when(innerIndexManager.getIndexName(expectedIndexAlias)).thenReturn(expectedIndexAlias); @@ -216,8 +219,8 @@ public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilFailure() throws I innerIndexManager = mock(IndexManager.class); when(mockIndexManagerFactory.getIndexManager( IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager); - doThrow(new OpenSearchException(new RuntimeException())) - .doThrow(new OpenSearchException(new RuntimeException())) + doThrow(openSearchException) + .doThrow(openSearchException) .doThrow(new RuntimeException()) .when(innerIndexManager).setupIndex(); when(innerIndexManager.getIndexName(expectedIndexAlias)).thenReturn(expectedIndexAlias); From cdd01c3ff23f6d608df9987bf4abeb46c6e3bb1b Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Fri, 10 Nov 2023 14:05:06 -0600 Subject: [PATCH 3/3] Remove older cache imports Signed-off-by: Chase Engelbrecht --- .../plugins/sink/opensearch/index/DynamicIndexManager.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java index 5401b34464..0b8151c6da 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java @@ -12,8 +12,6 @@ import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory;