2828import org .elasticsearch .action .get .MultiGetRequest ;
2929import org .elasticsearch .action .get .MultiGetResponse ;
3030import org .elasticsearch .action .index .IndexRequest ;
31+ import org .elasticsearch .action .search .SearchRequest ;
3132import org .elasticsearch .common .Strings ;
3233import org .elasticsearch .common .bytes .BytesArray ;
3334import org .elasticsearch .common .unit .ByteSizeUnit ;
3435import org .elasticsearch .common .unit .ByteSizeValue ;
3536import org .elasticsearch .common .unit .TimeValue ;
3637import org .elasticsearch .common .xcontent .XContentType ;
37- import org .elasticsearch .common .xcontent .json .JsonXContent ;
38+ import org .elasticsearch .search .SearchHit ;
39+ import org .hamcrest .Matcher ;
40+ import org .hamcrest .Matchers ;
3841
42+ import java .io .IOException ;
3943import java .util .Arrays ;
4044import java .util .HashSet ;
4145import java .util .List ;
4448import java .util .concurrent .CountDownLatch ;
4549import java .util .concurrent .TimeUnit ;
4650import java .util .concurrent .atomic .AtomicInteger ;
47-
51+ import java .util .stream .IntStream ;
52+
53+ import static org .elasticsearch .common .xcontent .XContentFactory .jsonBuilder ;
54+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .fieldFromSource ;
55+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .hasId ;
56+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .hasIndex ;
57+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .hasProperty ;
58+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .hasType ;
4859import static org .hamcrest .Matchers .both ;
60+ import static org .hamcrest .Matchers .containsInAnyOrder ;
4961import static org .hamcrest .Matchers .either ;
5062import static org .hamcrest .Matchers .equalTo ;
63+ import static org .hamcrest .Matchers .everyItem ;
5164import static org .hamcrest .Matchers .greaterThan ;
5265import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
5366import static org .hamcrest .Matchers .is ;
@@ -57,7 +70,7 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
5770
5871 private static BulkProcessor .Builder initBulkProcessorBuilder (BulkProcessor .Listener listener ) {
5972 return BulkProcessor .builder (
60- (request , bulkListener ) -> highLevelClient ().bulkAsync (request , RequestOptions .DEFAULT , bulkListener ), listener );
73+ (request , bulkListener ) -> highLevelClient ().bulkAsync (request , RequestOptions .DEFAULT , bulkListener ), listener );
6174 }
6275
6376 public void testThatBulkProcessorCountIsCorrect () throws Exception {
@@ -66,10 +79,10 @@ public void testThatBulkProcessorCountIsCorrect() throws Exception {
6679
6780 int numDocs = randomIntBetween (10 , 100 );
6881 try (BulkProcessor processor = initBulkProcessorBuilder (listener )
69- //let's make sure that the bulk action limit trips, one single execution will index all the documents
70- .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
71- .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB ))
72- .build ()) {
82+ //let's make sure that the bulk action limit trips, one single execution will index all the documents
83+ .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
84+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB ))
85+ .build ()) {
7386
7487 MultiGetRequest multiGetRequest = indexDocs (processor , numDocs );
7588
@@ -90,9 +103,9 @@ public void testBulkProcessorFlush() throws Exception {
90103 int numDocs = randomIntBetween (10 , 100 );
91104
92105 try (BulkProcessor processor = initBulkProcessorBuilder (listener )
93- //let's make sure that this bulk won't be automatically flushed
94- .setConcurrentRequests (randomIntBetween (0 , 10 )).setBulkActions (numDocs + randomIntBetween (1 , 100 ))
95- .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
106+ //let's make sure that this bulk won't be automatically flushed
107+ .setConcurrentRequests (randomIntBetween (0 , 10 )).setBulkActions (numDocs + randomIntBetween (1 , 100 ))
108+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
96109
97110 MultiGetRequest multiGetRequest = indexDocs (processor , numDocs );
98111
@@ -125,9 +138,9 @@ public void testBulkProcessorConcurrentRequests() throws Exception {
125138 MultiGetRequest multiGetRequest ;
126139
127140 try (BulkProcessor processor = initBulkProcessorBuilder (listener )
128- .setConcurrentRequests (concurrentRequests ).setBulkActions (bulkActions )
129- //set interval and size to high values
130- .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
141+ .setConcurrentRequests (concurrentRequests ).setBulkActions (bulkActions )
142+ //set interval and size to high values
143+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
131144
132145 multiGetRequest = indexDocs (processor , numDocs );
133146
@@ -165,11 +178,11 @@ public void testBulkProcessorWaitOnClose() throws Exception {
165178
166179 int numDocs = randomIntBetween (10 , 100 );
167180 BulkProcessor processor = initBulkProcessorBuilder (listener )
168- //let's make sure that the bulk action limit trips, one single execution will index all the documents
169- .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
170- .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (randomIntBetween (1 , 10 ),
171- RandomPicks .randomFrom (random (), ByteSizeUnit .values ())))
172- .build ();
181+ //let's make sure that the bulk action limit trips, one single execution will index all the documents
182+ .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
183+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (randomIntBetween (1 , 10 ),
184+ RandomPicks .randomFrom (random (), ByteSizeUnit .values ())))
185+ .build ();
173186
174187 MultiGetRequest multiGetRequest = indexDocs (processor , numDocs );
175188 assertThat (processor .awaitClose (1 , TimeUnit .MINUTES ), is (true ));
@@ -220,20 +233,20 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
220233 BulkProcessorTestListener listener = new BulkProcessorTestListener (latch , closeLatch );
221234
222235 try (BulkProcessor processor = initBulkProcessorBuilder (listener )
223- .setConcurrentRequests (concurrentRequests ).setBulkActions (bulkActions )
224- //set interval and size to high values
225- .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
236+ .setConcurrentRequests (concurrentRequests ).setBulkActions (bulkActions )
237+ //set interval and size to high values
238+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB )).build ()) {
226239
227240 for (int i = 1 ; i <= numDocs ; i ++) {
228241 if (randomBoolean ()) {
229242 testDocs ++;
230243 processor .add (new IndexRequest ("test" , "test" , Integer .toString (testDocs ))
231- .source (XContentType .JSON , "field" , "value" ));
244+ .source (XContentType .JSON , "field" , "value" ));
232245 multiGetRequest .add ("test" , "test" , Integer .toString (testDocs ));
233246 } else {
234247 testReadOnlyDocs ++;
235248 processor .add (new IndexRequest ("test-ro" , "test" , Integer .toString (testReadOnlyDocs ))
236- .source (XContentType .JSON , "field" , "value" ));
249+ .source (XContentType .JSON , "field" , "value" ));
237250 }
238251 }
239252 }
@@ -268,23 +281,86 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
268281 assertMultiGetResponse (highLevelClient ().mget (multiGetRequest , RequestOptions .DEFAULT ), testDocs );
269282 }
270283
271- private static MultiGetRequest indexDocs (BulkProcessor processor , int numDocs ) throws Exception {
284+ public void testGlobalParametersAndBulkProcessor () throws Exception {
285+ final CountDownLatch latch = new CountDownLatch (1 );
286+ BulkProcessorTestListener listener = new BulkProcessorTestListener (latch );
287+ createFieldAddingPipleine ("pipeline_id" , "fieldNameXYZ" , "valueXYZ" );
288+
289+ int numDocs = randomIntBetween (10 , 10 );
290+ try (BulkProcessor processor = initBulkProcessorBuilder (listener )
291+ //let's make sure that the bulk action limit trips, one single execution will index all the documents
292+ .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
293+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB ))
294+ .setDefaultIndex ("test" )
295+ .setDefaultType ("test" )
296+ .setDefaultRouting ("routing" )
297+ .setDefaultPipeline ("pipeline_id" )
298+ .build ()) {
299+
300+ indexDocs (processor , numDocs , null , null , "test" , "test" , "pipeline_id" );
301+ latch .await ();
302+
303+ assertThat (listener .beforeCounts .get (), equalTo (1 ));
304+ assertThat (listener .afterCounts .get (), equalTo (1 ));
305+ assertThat (listener .bulkFailures .size (), equalTo (0 ));
306+ assertResponseItems (listener .bulkItems , numDocs );
307+
308+ Iterable <SearchHit > hits = searchAll (new SearchRequest ("test" ).routing ("routing" ));
309+
310+ assertThat (hits , everyItem (hasProperty (fieldFromSource ("fieldNameXYZ" ), equalTo ("valueXYZ" ))));
311+ assertThat (hits , everyItem (Matchers .allOf (hasIndex ("test" ), hasType ("test" ))));
312+ assertThat (hits , containsInAnyOrder (expectedIds (numDocs )));
313+ }
314+ }
315+
316+ @ SuppressWarnings ("unchecked" )
317+ private Matcher <SearchHit >[] expectedIds (int numDocs ) {
318+ return IntStream .rangeClosed (1 , numDocs )
319+ .boxed ()
320+ .map (n -> hasId (n .toString ()))
321+ .<Matcher <SearchHit >>toArray (Matcher []::new );
322+ }
323+
324+ private static MultiGetRequest indexDocs (BulkProcessor processor , int numDocs , String localIndex , String localType ,
325+ String globalIndex , String globalType , String globalPipeline ) throws Exception {
272326 MultiGetRequest multiGetRequest = new MultiGetRequest ();
273327 for (int i = 1 ; i <= numDocs ; i ++) {
274328 if (randomBoolean ()) {
275- processor .add (new IndexRequest ("test" , "test" , Integer .toString (i ))
276- .source (XContentType .JSON , "field" , randomRealisticUnicodeOfLengthBetween (1 , 30 )));
329+ processor .add (new IndexRequest (localIndex , localType , Integer .toString (i ))
330+ .source (XContentType .JSON , "field" , randomRealisticUnicodeOfLengthBetween (1 , 30 )));
277331 } else {
278- final String source = "{ \" index\" :{\" _index\" :\" test\" ,\" _type\" :\" test\" ,\" _id\" :\" " + Integer .toString (i ) + "\" } }\n "
279- + Strings .toString (JsonXContent .contentBuilder ()
280- .startObject ().field ("field" , randomRealisticUnicodeOfLengthBetween (1 , 30 )).endObject ()) + "\n " ;
281- processor .add (new BytesArray (source ), null , null , XContentType .JSON );
332+ BytesArray data = bytesBulkRequest (localIndex , localType , i );
333+ processor .add (data , globalIndex , globalType , globalPipeline , null , XContentType .JSON );
282334 }
283- multiGetRequest .add ("test" , "test" , Integer .toString (i ));
335+ multiGetRequest .add (localIndex , localType , Integer .toString (i ));
284336 }
285337 return multiGetRequest ;
286338 }
287339
340+ private static BytesArray bytesBulkRequest (String localIndex , String localType , int id ) throws IOException {
341+ String action = Strings .toString (jsonBuilder ()
342+ .startObject ()
343+ .startObject ("index" )
344+ .field ("_index" , localIndex )
345+ .field ("_type" , localType )
346+ .field ("_id" , Integer .toString (id ))
347+ .endObject ()
348+ .endObject ()
349+ );
350+ String source = Strings .toString (jsonBuilder ()
351+ .startObject ()
352+ .field ("field" , randomRealisticUnicodeOfLengthBetween (1 , 30 ))
353+ .endObject ()
354+ );
355+
356+ String request = action + "\n " + source + "\n " ;
357+ return new BytesArray (request );
358+ }
359+
360+ private static MultiGetRequest indexDocs (BulkProcessor processor , int numDocs ) throws Exception {
361+ return indexDocs (processor , numDocs , "test" , "test" , null , null , null );
362+ }
363+
288364 private static void assertResponseItems (List <BulkItemResponse > bulkItemResponses , int numDocs ) {
289365 assertThat (bulkItemResponses .size (), is (numDocs ));
290366 int i = 1 ;
@@ -293,7 +369,7 @@ private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses
293369 assertThat (bulkItemResponse .getType (), equalTo ("test" ));
294370 assertThat (bulkItemResponse .getId (), equalTo (Integer .toString (i ++)));
295371 assertThat ("item " + i + " failed with cause: " + bulkItemResponse .getFailureMessage (),
296- bulkItemResponse .isFailed (), equalTo (false ));
372+ bulkItemResponse .isFailed (), equalTo (false ));
297373 }
298374 }
299375
0 commit comments