2020package org .elasticsearch .client ;
2121
2222import com .carrotsearch .randomizedtesting .generators .RandomPicks ;
23+ import org .elasticsearch .action .admin .indices .create .CreateIndexRequest ;
2324import org .elasticsearch .action .bulk .BulkItemResponse ;
2425import org .elasticsearch .action .bulk .BulkProcessor ;
2526import org .elasticsearch .action .bulk .BulkRequest ;
2829import org .elasticsearch .action .get .MultiGetRequest ;
2930import org .elasticsearch .action .get .MultiGetResponse ;
3031import org .elasticsearch .action .index .IndexRequest ;
32+ import org .elasticsearch .action .search .SearchRequest ;
3133import org .elasticsearch .common .Strings ;
3234import org .elasticsearch .common .bytes .BytesArray ;
35+ import org .elasticsearch .common .settings .Settings ;
3336import org .elasticsearch .common .unit .ByteSizeUnit ;
3437import org .elasticsearch .common .unit .ByteSizeValue ;
3538import org .elasticsearch .common .unit .TimeValue ;
3639import org .elasticsearch .common .xcontent .XContentType ;
37- import org .elasticsearch .common .xcontent .json .JsonXContent ;
40+ import org .elasticsearch .search .SearchHit ;
41+ import org .hamcrest .Matcher ;
42+ import org .hamcrest .Matchers ;
3843
44+ import java .io .IOException ;
3945import java .util .Arrays ;
4046import java .util .HashSet ;
4147import java .util .List ;
4450import java .util .concurrent .CountDownLatch ;
4551import java .util .concurrent .TimeUnit ;
4652import java .util .concurrent .atomic .AtomicInteger ;
47-
53+ import java .util .stream .IntStream ;
54+
55+ import static org .elasticsearch .common .xcontent .XContentFactory .jsonBuilder ;
56+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .fieldFromSource ;
57+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .hasId ;
58+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .hasIndex ;
59+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .hasProperty ;
60+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .hasType ;
4861import static org .hamcrest .Matchers .both ;
62+ import static org .hamcrest .Matchers .containsInAnyOrder ;
4963import static org .hamcrest .Matchers .either ;
5064import static org .hamcrest .Matchers .equalTo ;
65+ import static org .hamcrest .Matchers .everyItem ;
5166import static org .hamcrest .Matchers .greaterThan ;
5267import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
5368import static org .hamcrest .Matchers .is ;
@@ -57,7 +72,7 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
5772
5873 private static BulkProcessor .Builder initBulkProcessorBuilder (BulkProcessor .Listener listener ) {
5974 return BulkProcessor .builder (
60- (request , bulkListener ) -> highLevelClient ().bulkAsync (request , RequestOptions .DEFAULT , bulkListener ), listener );
75+ (request , bulkListener ) -> highLevelClient ().bulkAsync (request , RequestOptions .DEFAULT , bulkListener ), listener );
6176 }
6277
6378 public void testThatBulkProcessorCountIsCorrect () throws Exception {
@@ -66,10 +81,10 @@ public void testThatBulkProcessorCountIsCorrect() throws Exception {
6681
6782 int numDocs = randomIntBetween (10 , 100 );
6883 try (BulkProcessor processor = initBulkProcessorBuilder (listener )
69- //let's make sure that the bulk action limit trips, one single execution will index all the documents
70- .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
71- .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB ))
72- .build ()) {
84+ //let's make sure that the bulk action limit trips, one single execution will index all the documents
85+ .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
86+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB ))
87+ .build ()) {
7388
7489 MultiGetRequest multiGetRequest = indexDocs (processor , numDocs );
7590
@@ -90,9 +105,9 @@ public void testBulkProcessorFlush() throws Exception {
90105 int numDocs = randomIntBetween (10 , 100 );
91106
92107 try (BulkProcessor processor = initBulkProcessorBuilder (listener )
93- //let's make sure that this bulk won't be automatically flushed
94- .setConcurrentRequests (randomIntBetween (0 , 10 )).setBulkActions (numDocs + randomIntBetween (1 , 100 ))
95- .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
108+ //let's make sure that this bulk won't be automatically flushed
109+ .setConcurrentRequests (randomIntBetween (0 , 10 )).setBulkActions (numDocs + randomIntBetween (1 , 100 ))
110+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
96111
97112 MultiGetRequest multiGetRequest = indexDocs (processor , numDocs );
98113
@@ -125,9 +140,9 @@ public void testBulkProcessorConcurrentRequests() throws Exception {
125140 MultiGetRequest multiGetRequest ;
126141
127142 try (BulkProcessor processor = initBulkProcessorBuilder (listener )
128- .setConcurrentRequests (concurrentRequests ).setBulkActions (bulkActions )
129- //set interval and size to high values
130- .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
143+ .setConcurrentRequests (concurrentRequests ).setBulkActions (bulkActions )
144+ //set interval and size to high values
145+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
131146
132147 multiGetRequest = indexDocs (processor , numDocs );
133148
@@ -165,11 +180,11 @@ public void testBulkProcessorWaitOnClose() throws Exception {
165180
166181 int numDocs = randomIntBetween (10 , 100 );
167182 BulkProcessor processor = initBulkProcessorBuilder (listener )
168- //let's make sure that the bulk action limit trips, one single execution will index all the documents
169- .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
170- .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (randomIntBetween (1 , 10 ),
171- RandomPicks .randomFrom (random (), ByteSizeUnit .values ())))
172- .build ();
183+ //let's make sure that the bulk action limit trips, one single execution will index all the documents
184+ .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
185+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (randomIntBetween (1 , 10 ),
186+ RandomPicks .randomFrom (random (), ByteSizeUnit .values ())))
187+ .build ();
173188
174189 MultiGetRequest multiGetRequest = indexDocs (processor , numDocs );
175190 assertThat (processor .awaitClose (1 , TimeUnit .MINUTES ), is (true ));
@@ -220,20 +235,20 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
220235 BulkProcessorTestListener listener = new BulkProcessorTestListener (latch , closeLatch );
221236
222237 try (BulkProcessor processor = initBulkProcessorBuilder (listener )
223- .setConcurrentRequests (concurrentRequests ).setBulkActions (bulkActions )
224- //set interval and size to high values
225- .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
238+ .setConcurrentRequests (concurrentRequests ).setBulkActions (bulkActions )
239+ //set interval and size to high values
240+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
226241
227242 for (int i = 1 ; i <= numDocs ; i ++) {
228243 if (randomBoolean ()) {
229244 testDocs ++;
230245 processor .add (new IndexRequest ("test" , "test" , Integer .toString (testDocs ))
231- .source (XContentType .JSON , "field" , "value" ));
246+ .source (XContentType .JSON , "field" , "value" ));
232247 multiGetRequest .add ("test" , "test" , Integer .toString (testDocs ));
233248 } else {
234249 testReadOnlyDocs ++;
235250 processor .add (new IndexRequest ("test-ro" , "test" , Integer .toString (testReadOnlyDocs ))
236- .source (XContentType .JSON , "field" , "value" ));
251+ .source (XContentType .JSON , "field" , "value" ));
237252 }
238253 }
239254 }
@@ -268,23 +283,88 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
268283 assertMultiGetResponse (highLevelClient ().mget (multiGetRequest , RequestOptions .DEFAULT ), testDocs );
269284 }
270285
271- private static MultiGetRequest indexDocs (BulkProcessor processor , int numDocs ) throws Exception {
286+ public void testGlobalParametersAndBulkProcessor () throws Exception {
287+ createIndexWithTwoShards ();
288+
289+ final CountDownLatch latch = new CountDownLatch (1 );
290+ BulkProcessorTestListener listener = new BulkProcessorTestListener (latch );
291+ createFieldAddingPipleine ("pipeline_id" , "fieldNameXYZ" , "valueXYZ" );
292+
293+ int numDocs = randomIntBetween (10 , 10 );
294+ try (BulkProcessor processor = initBulkProcessorBuilder (listener )
295+ //let's make sure that the bulk action limit trips, one single execution will index all the documents
296+ .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
297+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB ))
298+ .setDefaultIndex ("test" )
299+ .setDefaultType ("test" )
300+ .setDefaultRouting ("routing" )
301+ .setDefaultPipeline ("pipeline_id" )
302+ .build ()) {
303+
304+ indexDocs (processor , numDocs , null , null , "test" , "test" , "pipeline_id" );
305+ latch .await ();
306+
307+ assertThat (listener .beforeCounts .get (), equalTo (1 ));
308+ assertThat (listener .afterCounts .get (), equalTo (1 ));
309+ assertThat (listener .bulkFailures .size (), equalTo (0 ));
310+ assertResponseItems (listener .bulkItems , numDocs );
311+
312+ Iterable <SearchHit > hits = searchAll (new SearchRequest ("test" ).routing ("routing" ));
313+
314+ assertThat (hits , everyItem (hasProperty (fieldFromSource ("fieldNameXYZ" ), equalTo ("valueXYZ" ))));
315+ assertThat (hits , everyItem (Matchers .allOf (hasIndex ("test" ), hasType ("test" ))));
316+ assertThat (hits , containsInAnyOrder (expectedIds (numDocs )));
317+ }
318+ }
319+
320+ @ SuppressWarnings ("unchecked" )
321+ private Matcher <SearchHit >[] expectedIds (int numDocs ) {
322+ return IntStream .rangeClosed (1 , numDocs )
323+ .boxed ()
324+ .map (n -> hasId (n .toString ()))
325+ .<Matcher <SearchHit >>toArray (Matcher []::new );
326+ }
327+
328+ private static MultiGetRequest indexDocs (BulkProcessor processor , int numDocs , String localIndex , String localType ,
329+ String globalIndex , String globalType , String globalPipeline ) throws Exception {
272330 MultiGetRequest multiGetRequest = new MultiGetRequest ();
273331 for (int i = 1 ; i <= numDocs ; i ++) {
274332 if (randomBoolean ()) {
275- processor .add (new IndexRequest ("test" , "test" , Integer .toString (i ))
276- .source (XContentType .JSON , "field" , randomRealisticUnicodeOfLengthBetween (1 , 30 )));
333+ processor .add (new IndexRequest (localIndex , localType , Integer .toString (i ))
334+ .source (XContentType .JSON , "field" , randomRealisticUnicodeOfLengthBetween (1 , 30 )));
277335 } else {
278- final String source = "{ \" index\" :{\" _index\" :\" test\" ,\" _type\" :\" test\" ,\" _id\" :\" " + Integer .toString (i ) + "\" } }\n "
279- + Strings .toString (JsonXContent .contentBuilder ()
280- .startObject ().field ("field" , randomRealisticUnicodeOfLengthBetween (1 , 30 )).endObject ()) + "\n " ;
281- processor .add (new BytesArray (source ), null , null , XContentType .JSON );
336+ BytesArray data = bytesBulkRequest (localIndex , localType , i );
337+ processor .add (data , globalIndex , globalType , globalPipeline , null , XContentType .JSON );
282338 }
283- multiGetRequest .add ("test" , "test" , Integer .toString (i ));
339+ multiGetRequest .add (localIndex , localType , Integer .toString (i ));
284340 }
285341 return multiGetRequest ;
286342 }
287343
344+ private static BytesArray bytesBulkRequest (String localIndex , String localType , int id ) throws IOException {
345+ String action = Strings .toString (jsonBuilder ()
346+ .startObject ()
347+ .startObject ("index" )
348+ .field ("_index" , localIndex )
349+ .field ("_type" , localType )
350+ .field ("_id" , Integer .toString (id ))
351+ .endObject ()
352+ .endObject ()
353+ );
354+ String source = Strings .toString (jsonBuilder ()
355+ .startObject ()
356+ .field ("field" , randomRealisticUnicodeOfLengthBetween (1 , 30 ))
357+ .endObject ()
358+ );
359+
360+ String request = action + "\n " + source + "\n " ;
361+ return new BytesArray (request );
362+ }
363+
364+ private static MultiGetRequest indexDocs (BulkProcessor processor , int numDocs ) throws Exception {
365+ return indexDocs (processor , numDocs , "test" , "test" , null , null , null );
366+ }
367+
288368 private static void assertResponseItems (List <BulkItemResponse > bulkItemResponses , int numDocs ) {
289369 assertThat (bulkItemResponses .size (), is (numDocs ));
290370 int i = 1 ;
@@ -293,7 +373,7 @@ private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses
293373 assertThat (bulkItemResponse .getType (), equalTo ("test" ));
294374 assertThat (bulkItemResponse .getId (), equalTo (Integer .toString (i ++)));
295375 assertThat ("item " + i + " failed with cause: " + bulkItemResponse .getFailureMessage (),
296- bulkItemResponse .isFailed (), equalTo (false ));
376+ bulkItemResponse .isFailed (), equalTo (false ));
297377 }
298378 }
299379
@@ -343,4 +423,13 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
343423 }
344424 }
345425
426+
427+ private void createIndexWithTwoShards () throws IOException {
428+ CreateIndexRequest indexRequest = new CreateIndexRequest ("test" );
429+ indexRequest .settings (Settings .builder ()
430+ .put ("index.number_of_shards" , 2 )
431+ .put ("index.number_of_replicas" , 1 )
432+ );
433+ highLevelClient ().indices ().create (indexRequest , RequestOptions .DEFAULT );
434+ }
346435}
0 commit comments