Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup resources properly when Opensearch sink fails to initialize #4758

Merged
merged 3 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public OpenSearchClientRefresher(final PluginMetrics pluginMetrics,
final Function<ConnectionConfiguration, OpenSearchClient> clientFunction) {
this.clientFunction = clientFunction;
this.currentConfig = connectionConfiguration;
this.currentClient = clientFunction.apply(connectionConfiguration);
this.currentClient = null;
credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS);
}
Expand All @@ -44,6 +44,9 @@ public Class<OpenSearchClient> getComponentClass() {
public OpenSearchClient get() {
readWriteLock.readLock().lock();
try {
if (currentClient == null) {
currentClient = clientFunction.apply(currentConfig);
}
return currentClient;
} finally {
readWriteLock.readLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.bulkRequestMap = new ConcurrentHashMap<>();
this.lastFlushTimeMap = new ConcurrentHashMap<>();
this.pluginConfigObservable = pluginConfigObservable;
this.objectMapper = new ObjectMapper();

final Optional<PluginModel> dlqConfig = openSearchSinkConfig.getRetryConfiguration().getDlq();
if (dlqConfig.isPresent()) {
Expand All @@ -201,7 +202,7 @@ public void doInitialize() {
doInitializeInternal();
} catch (IOException e) {
LOG.warn("Failed to initialize OpenSearch sink, retrying: {} ", e.getMessage());
closeFiles();
this.shutdown();
} catch (InvalidPluginConfigurationException e) {
LOG.error("Failed to initialize OpenSearch sink due to a configuration error.", e);
this.shutdown();
Expand All @@ -212,7 +213,7 @@ public void doInitialize() {
throw e;
} catch (Exception e) {
LOG.warn("Failed to initialize OpenSearch sink with a retryable exception. ", e);
closeFiles();
this.shutdown();
}
}

Expand Down Expand Up @@ -279,7 +280,6 @@ private void doInitializeInternal() throws IOException {
bulkRequestSupplier,
pluginSetting);

objectMapper = new ObjectMapper();
this.initialized = true;
LOG.info("Initialized OpenSearch sink");
}
Expand Down Expand Up @@ -615,6 +615,7 @@ private void closeFiles() {
public void shutdown() {
super.shutdown();
closeFiles();
openSearchClient.shutdown();
Copy link
Member

@dinujoh dinujoh Jul 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this shutdown api do ? Does it close the client or shutdown the node ? We don't want the later behavior.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just shuts down the client to the opensearch. We create it again when we do doInitialize. So, shutting down is the correct thing to do.

}

private void maybeUpdateServerlessNetworkPolicy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ void testGet() {
@Test
void testGetAfterUpdateWithDeprecatedBasicAuthUnchanged() {
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME);
when(connectionConfiguration.getPassword()).thenReturn(TEST_PASSWORD);
Expand All @@ -91,6 +92,7 @@ void testGetAfterUpdateWithDeprecatedBasicAuthUnchanged() {
@Test
void testGetAfterUpdateWithBasicAuthUnchanged() {
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
when(authConfig.getUsername()).thenReturn(TEST_USERNAME);
Expand All @@ -115,6 +117,7 @@ void testGetAfterUpdateWithBasicAuthUnchanged() {
void testGetAfterUpdateWithDeprecatedUsernameChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME);
Expand All @@ -138,6 +141,7 @@ void testGetAfterUpdateWithDeprecatedUsernameChanged() {
void testGetAfterUpdateWithUsernameChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
Expand Down Expand Up @@ -165,6 +169,7 @@ void testGetAfterUpdateWithUsernameChanged() {
void testGetAfterUpdateWithDeprecatedPasswordChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME);
Expand All @@ -190,6 +195,7 @@ void testGetAfterUpdateWithDeprecatedPasswordChanged() {
void testGetAfterUpdateWithPasswordChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
Expand Down Expand Up @@ -219,6 +225,7 @@ void testGetAfterUpdateClientFailure() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
when(pluginMetrics.counter(CLIENT_REFRESH_ERRORS)).thenReturn(clientRefreshErrorsCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME);
Expand All @@ -240,4 +247,4 @@ void testGetAfterUpdateClientFailure() {
verify(clientRefreshErrorsCounter).increment();
verify(clientFunction, times(2)).apply(any());
}
}
}
Loading