Commit e8429ab

Reconfigure bulk processor after review
1 parent c9da4af commit e8429ab

2 files changed: +18, -32 lines

x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/Deprecation.java

Lines changed: 1 addition & 1 deletion

@@ -75,7 +75,7 @@ public Collection<Object> createComponents(
         IndexNameExpressionResolver indexNameExpressionResolver,
         Supplier<RepositoriesService> repositoriesServiceSupplier
     ) {
-        DeprecationIndexingComponent component = new DeprecationIndexingComponent(threadPool, client);
+        DeprecationIndexingComponent component = new DeprecationIndexingComponent(client, environment.settings());
 
         clusterService.addListener(component);
x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java

Lines changed: 17 additions & 31 deletions

@@ -11,6 +11,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
+import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -25,8 +26,11 @@
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.logging.RateLimitingFilter;
 import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.xpack.core.ClientHelper;
 
 import java.util.function.Consumer;
@@ -49,9 +53,9 @@ public class DeprecationIndexingComponent extends AbstractLifecycleComponent imp
     private final BulkProcessor processor;
     private final RateLimitingFilter filter;
 
-    public DeprecationIndexingComponent(ThreadPool threadPool, Client client) {
-        this.processor = getBulkProcessor(new OriginSettingClient(client, ClientHelper.DEPRECATION_ORIGIN));
-        final Consumer<IndexRequest> consumer = buildIndexRequestConsumer(threadPool);
+    public DeprecationIndexingComponent(Client client, Settings settings) {
+        this.processor = getBulkProcessor(new OriginSettingClient(client, ClientHelper.DEPRECATION_ORIGIN), settings);
+        final Consumer<IndexRequest> consumer = this.processor::add;
 
         final LoggerContext context = (LoggerContext) LogManager.getContext(false);
         final Configuration configuration = context.getConfiguration();
@@ -100,42 +104,24 @@ public void clusterChanged(ClusterChangedEvent event) {
         }
     }
 
-    /**
-     * Constructs a {@link Consumer} that knows what to do with the {@link IndexRequest} instances that the
-     * {@link DeprecationIndexingAppender} creates. This logic is separated from the service in order to make
-     * testing significantly easier, and to separate concerns.
-     * <p>
-     * Writes are done via {@link BulkProcessor}, which handles batching up writes and retries.
-     *
-     * @param threadPool due to <a href="https://github.com/elastic/elasticsearch/issues/50440">#50440</a>,
-     *                   extra care must be taken to avoid blocking the thread that writes a deprecation message.
-     * @return a consumer that accepts an index request and handles all the details of writing it
-     *         into the cluster
-     */
-    private Consumer<IndexRequest> buildIndexRequestConsumer(ThreadPool threadPool) {
-        return indexRequest -> {
-            try {
-                // TODO: remove the threadpool wrapping when the .add call is non-blocking
-                // (it can currently execute the bulk request occasionally)
-                // see: https://github.com/elastic/elasticsearch/issues/50440
-                threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> this.processor.add(indexRequest));
-            } catch (Exception e) {
-                logger.error("Failed to queue deprecation message index request: " + e.getMessage(), e);
-            }
-        };
-    }
-
     /**
      * Constructs a bulk processor for writing documents
      * @param client the client to use
+     * @param settings the settings to use
      * @return an initialised bulk processor
      */
-    private BulkProcessor getBulkProcessor(Client client) {
+    private BulkProcessor getBulkProcessor(Client client, Settings settings) {
         final OriginSettingClient originSettingClient = new OriginSettingClient(client, ClientHelper.DEPRECATION_ORIGIN);
         final BulkProcessor.Listener listener = new DeprecationBulkListener();
 
+        // This configuration disables the size count and size thresholds,
+        // and instead uses a scheduled flush only. This means that calling
+        // processor.add() will not block the calling thread.
         return BulkProcessor.builder(originSettingClient::bulk, listener)
-            .setBulkActions(100)
+            .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
+            .setConcurrentRequests(Math.max(2, EsExecutors.allocatedProcessors(settings)))
+            .setBulkActions(-1)
+            .setBulkSize(new ByteSizeValue(-1, ByteSizeUnit.BYTES))
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            .build();
    }
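For reference, here is a minimal, self-contained sketch of the same "scheduled-flush-only" BulkProcessor configuration introduced in this commit. It is not the Elasticsearch source: the class name ScheduledFlushBulkProcessorExample and the bulkConsumer parameter are illustrative stand-ins (the commit itself passes originSettingClient::bulk and a DeprecationBulkListener), while the builder calls and values mirror the diff above.

```java
// Illustrative sketch only: mirrors the builder configuration from this commit.
// The class and parameter names are hypothetical, not from the Elasticsearch source.
import java.util.function.BiConsumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;

class ScheduledFlushBulkProcessorExample {

    static BulkProcessor build(
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer, // e.g. client::bulk
        BulkProcessor.Listener listener,
        Settings settings
    ) {
        return BulkProcessor.builder(bulkConsumer, listener)
            // Retry rejected bulk requests up to 3 times, starting with a 1-second delay.
            .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
            // Allow several bulk requests in flight, scaled to the node's allocated processors.
            .setConcurrentRequests(Math.max(2, EsExecutors.allocatedProcessors(settings)))
            // -1 disables the action-count threshold ...
            .setBulkActions(-1)
            // ... and the byte-size threshold, so add() never triggers a flush itself.
            .setBulkSize(new ByteSizeValue(-1, ByteSizeUnit.BYTES))
            // Queued documents are flushed only on this schedule.
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            .build();
    }
}
```

With both thresholds disabled, a flush can only happen on the scheduled interval, which is why the commit's new comment states that processor.add() will not block the calling thread, and why the old GENERIC thread-pool wrapper (the workaround for issue #50440) could be removed along with the ThreadPool dependency.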
