Skip to content

Commit 61a9318

Browse files
committed
global parameters on bulk processor
bulk api documentation changes
1 parent 30ff79e commit 61a9318

File tree

12 files changed

+202
-31
lines changed

12 files changed

+202
-31
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.client;
2121

2222
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
23-
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
2423
import org.elasticsearch.action.bulk.BulkItemResponse;
2524
import org.elasticsearch.action.bulk.BulkProcessor;
2625
import org.elasticsearch.action.bulk.BulkRequest;
@@ -32,7 +31,6 @@
3231
import org.elasticsearch.action.search.SearchRequest;
3332
import org.elasticsearch.common.Strings;
3433
import org.elasticsearch.common.bytes.BytesArray;
35-
import org.elasticsearch.common.settings.Settings;
3634
import org.elasticsearch.common.unit.ByteSizeUnit;
3735
import org.elasticsearch.common.unit.ByteSizeValue;
3836
import org.elasticsearch.common.unit.TimeValue;
@@ -283,6 +281,41 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
283281
assertMultiGetResponse(highLevelClient().mget(multiGetRequest, RequestOptions.DEFAULT), testDocs);
284282
}
285283

284+
@SuppressWarnings("unchecked")
285+
public void testGlobalParametersAndSingleRequest() throws Exception {
286+
createIndexWithMultipleShards("test");
287+
288+
final CountDownLatch latch = new CountDownLatch(1);
289+
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
290+
createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");
291+
292+
// tag::bulk-processor-mix-parameters
293+
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
294+
.setGlobalIndex("tweets")
295+
.setGlobalType("_doc")
296+
.setGlobalRouting("routing")
297+
.setGlobalPipeline("pipeline_id")
298+
.build()) {
299+
300+
301+
processor.add(new IndexRequest() // <1>
302+
.source(XContentType.JSON, "user", "some user"));
303+
processor.add(new IndexRequest("blogs", "post_type", "1") // <2>
304+
.source(XContentType.JSON, "title", "some title"));
305+
}
306+
// end::bulk-request-mix-pipeline
307+
latch.await();
308+
309+
Iterable<SearchHit> hits = searchAll(new SearchRequest("tweets").routing("routing"));
310+
assertThat(hits, everyItem(hasProperty(fieldFromSource("user"), equalTo("some user"))));
311+
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
312+
313+
314+
Iterable<SearchHit> blogs = searchAll(new SearchRequest("blogs").routing("routing"));
315+
assertThat(blogs, everyItem(hasProperty(fieldFromSource("title"), equalTo("some title"))));
316+
assertThat(blogs, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
317+
}
318+
286319
@SuppressWarnings("unchecked")
287320
public void testGlobalParametersAndBulkProcessor() throws Exception {
288321
createIndexWithMultipleShards("test");
@@ -424,13 +457,5 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
424457
}
425458
}
426459

427-
private void createIndexWithMultipleShards(String index) throws IOException {
428-
CreateIndexRequest indexRequest = new CreateIndexRequest(index);
429-
int shards = randomIntBetween(2,10);
430-
indexRequest.settings(Settings.builder()
431-
.put("index.number_of_shards", shards)
432-
.put("index.number_of_replicas", 0)
433-
);
434-
highLevelClient().indices().create(indexRequest, RequestOptions.DEFAULT);
435-
}
460+
436461
}

client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919

2020
package org.elasticsearch.client;
2121

22-
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
2322
import org.elasticsearch.action.bulk.BulkRequest;
2423
import org.elasticsearch.action.bulk.BulkResponse;
2524
import org.elasticsearch.action.index.IndexRequest;
2625
import org.elasticsearch.action.search.SearchRequest;
27-
import org.elasticsearch.common.settings.Settings;
2826
import org.elasticsearch.common.xcontent.XContentType;
2927
import org.elasticsearch.search.SearchHit;
3028

@@ -89,16 +87,17 @@ public void testMixPipelineOnRequestAndGlobal() throws IOException {
8987
createFieldAddingPipleine("globalId", "fieldXYZ", "valueXYZ");
9088
createFieldAddingPipleine("perIndexId", "someNewField", "someValue");
9189

90+
// tag::bulk-request-mix-pipeline
9291
BulkRequest request = new BulkRequest();
9392
request.pipeline("globalId");
9493

9594
request.add(new IndexRequest("test", "doc", "1")
9695
.source(XContentType.JSON, "field", "bulk1")
97-
.setPipeline("perIndexId"));
96+
.setPipeline("perIndexId")); // <1>
9897

9998
request.add(new IndexRequest("test", "doc", "2")
100-
.source(XContentType.JSON, "field", "bulk2"));
101-
99+
.source(XContentType.JSON, "field", "bulk2")); // <2>
100+
// end::bulk-request-mix-pipeline
102101
bulk(request);
103102

104103
Iterable<SearchHit> hits = searchAll("test");
@@ -215,15 +214,4 @@ private BulkResponse bulk(BulkRequest request) throws IOException {
215214
private static <T> Function<SearchHit, T> fieldFromSource(String fieldName) {
216215
return (response) -> (T) response.getSourceAsMap().get(fieldName);
217216
}
218-
219-
private void createIndexWithMultipleShards(String index) throws IOException {
220-
CreateIndexRequest indexRequest = new CreateIndexRequest(index);
221-
// keeping shard # high to make sure different routings will lead to different shards - reduces the chances of random failure
222-
int shards = randomIntBetween(8,10);
223-
indexRequest.settings(Settings.builder()
224-
.put("index.number_of_shards", shards)
225-
.put("index.number_of_replicas", 0)
226-
);
227-
highLevelClient().indices().create(indexRequest, RequestOptions.DEFAULT);
228-
}
229217
}

client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.action.ActionListener;
2323
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
24+
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
2425
import org.elasticsearch.action.ingest.PutPipelineRequest;
2526
import org.elasticsearch.action.search.SearchRequest;
2627
import org.elasticsearch.action.search.SearchResponse;
@@ -195,4 +196,14 @@ protected void refreshIndexes(String... indices) throws IOException {
195196
Response refreshResponse = client().performRequest(new Request("POST", "/" + joinedIndices + "/_refresh"));
196197
assertEquals(200, refreshResponse.getStatusLine().getStatusCode());
197198
}
199+
200+
protected void createIndexWithMultipleShards(String index) throws IOException {
201+
CreateIndexRequest indexRequest = new CreateIndexRequest(index);
202+
int shards = randomIntBetween(8,10);
203+
indexRequest.settings(Settings.builder()
204+
.put("index.number_of_shards", shards)
205+
.put("index.number_of_replicas", 0)
206+
);
207+
highLevelClient().indices().create(indexRequest, RequestOptions.DEFAULT);
208+
}
198209
}

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,16 @@ public void testBulk() throws Exception {
748748
request.waitForActiveShards(2); // <1>
749749
request.waitForActiveShards(ActiveShardCount.ALL); // <2>
750750
// end::bulk-request-active-shards
751+
// tag::bulk-request-pipeline
752+
request.pipeline("pipelineId"); // <1>
753+
// end::bulk-request-pipeline
754+
// tag::bulk-request-routing
755+
request.routing("routingId"); // <1>
756+
// end::bulk-request-routing
757+
758+
// tag::bulk-request-index-type
759+
BulkRequest requestWithDefaultedIndexAndType = new BulkRequest("posts","_doc"); // <1>
760+
// end::bulk-request-index-type
751761

752762
// tag::bulk-execute-listener
753763
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {

docs/java-api/docs/bulk.asciidoc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,3 +165,27 @@ client.admin().indices().prepareRefresh().get();
165165
client.prepareSearch().get();
166166
--------------------------------------------------
167167

168+
169+
[[java-docs-bulk-global-parameters]]
170+
==== Global Parameters
171+
172+
Similarly to REST API where you can provide URL parameters, you can specify global parameters on BulkRequest as well
173+
as BulkProcessor. These global parameters serve as defaults and can be overriden by local parameters specified on
174+
each sub request. Some parameters have to be set before any sub request is added - index, type - and you have to specify them
175+
during BulkRequest or BulkProcessor creation. Some are optional - pipeline, routing - and can be specified at any point before the bulk is
176+
sent.
177+
178+
["source","java",subs="attributes,callouts,macros"]
179+
--------------------------------------------------
180+
include-tagged::{doc-tests}/BulkProcessorIT.java[bulk-processor-mix-parameters]
181+
--------------------------------------------------
182+
<1> global parameters from the BulkRequest will be applied on a sub request
183+
<2> local pipeline parameter on a sub request will override global parameters from BulkRequest
184+
185+
186+
["source","java",subs="attributes,callouts,macros"]
187+
--------------------------------------------------
188+
include-tagged::{doc-tests}/BulkRequestWithGlobalParametersIT.java[bulk-request-mix-pipeline]
189+
--------------------------------------------------
190+
<1> local pipeline parameter on a sub request will override global pipeline from the BulkRequest
191+
<2> global parameter from the BulkRequest will be applied on a sub request

docs/java-rest/high-level/document/bulk.asciidoc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,24 @@ the index/update/delete operations.
6262
<2> Number of shard copies provided as a `ActiveShardCount`: can be `ActiveShardCount.ALL`,
6363
`ActiveShardCount.ONE` or `ActiveShardCount.DEFAULT` (default)
6464

65+
["source","java",subs="attributes,callouts,macros"]
66+
--------------------------------------------------
67+
include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-pipeline]
68+
--------------------------------------------------
69+
<1> Global pipelineId used on all sub requests, unless overridden on a sub request
70+
71+
["source","java",subs="attributes,callouts,macros"]
72+
--------------------------------------------------
73+
include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-routing]
74+
--------------------------------------------------
75+
<1> Global routingId used on all sub requests, unless overridden on a sub request
76+
77+
["source","java",subs="attributes,callouts,macros"]
78+
--------------------------------------------------
79+
include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-index-type]
80+
--------------------------------------------------
81+
<1> A bulk request with global index and type used on all sub requests, unless overridden on a sub request.
82+
Both parameters are @Nullable and can only be set during BulkRequest creation.
6583

6684
[[java-rest-high-document-bulk-sync]]
6785
==== Synchronous Execution

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -342,13 +342,13 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
342342
String index = defaultIndex;
343343
String type = defaultType;
344344
String id = null;
345-
String routing = defaultRouting;
345+
String routing = valueOrDefault(defaultRouting, globalRouting);
346346
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
347347
String opType = null;
348348
long version = Versions.MATCH_ANY;
349349
VersionType versionType = VersionType.INTERNAL;
350350
int retryOnConflict = 0;
351-
String pipeline = defaultPipeline;
351+
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
352352

353353
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
354354
// or START_OBJECT which will have another set of parameters
@@ -520,8 +520,8 @@ public final BulkRequest pipeline(String globalPipeline) {
520520
return this;
521521
}
522522

523-
public final BulkRequest routing(String defaultRouting){
524-
this.globalRouting = defaultRouting;
523+
public final BulkRequest routing(String globalRouting){
524+
this.globalRouting = globalRouting;
525525
return this;
526526
}
527527
/**

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@
4141
public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkResponse>
4242
implements WriteRequestBuilder<BulkRequestBuilder> {
4343

44+
public BulkRequestBuilder(ElasticsearchClient client, BulkAction action, @Nullable String globalIndex, @Nullable String globalType) {
45+
super(client, action, new BulkRequest(globalIndex, globalType));
46+
}
47+
4448
public BulkRequestBuilder(ElasticsearchClient client, BulkAction action) {
4549
super(client, action, new BulkRequest());
4650
}
@@ -153,4 +157,14 @@ public final BulkRequestBuilder setTimeout(String timeout) {
153157
public int numberOfActions() {
154158
return request.numberOfActions();
155159
}
160+
161+
public BulkRequestBuilder pipeline(String globalPipeline) {
162+
request.pipeline(globalPipeline);
163+
return this;
164+
}
165+
166+
public BulkRequestBuilder routing(String globalRouting) {
167+
request.routing(globalRouting);
168+
return this;
169+
}
156170
}

server/src/main/java/org/elasticsearch/client/Client.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,11 @@ public interface Client extends ElasticsearchClient, Releasable {
232232
*/
233233
BulkRequestBuilder prepareBulk();
234234

235+
/**
236+
* Executes a bulk of index / delete operations with default index and/or type
237+
*/
238+
BulkRequestBuilder prepareBulk(@Nullable String globalIndex, @Nullable String globalType);
239+
235240
/**
236241
* Gets the document that was indexed from an index with a type and id.
237242
*

server/src/main/java/org/elasticsearch/client/support/AbstractClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,11 @@ public BulkRequestBuilder prepareBulk() {
471471
return new BulkRequestBuilder(this, BulkAction.INSTANCE);
472472
}
473473

474+
@Override
475+
public BulkRequestBuilder prepareBulk(@Nullable String globalIndex, @Nullable String globalType) {
476+
return new BulkRequestBuilder(this, BulkAction.INSTANCE, globalIndex, globalType);
477+
}
478+
474479
@Override
475480
public ActionFuture<GetResponse> get(final GetRequest request) {
476481
return execute(GetAction.INSTANCE, request);

0 commit comments

Comments
 (0)