3333import org .elasticsearch .cluster .action .index .MappingUpdatedAction ;
3434import org .elasticsearch .cluster .action .shard .ShardStateAction ;
3535import org .elasticsearch .cluster .service .ClusterService ;
36+ import org .elasticsearch .common .UUIDs ;
3637import org .elasticsearch .common .bytes .BytesReference ;
3738import org .elasticsearch .common .compress .CompressedXContent ;
3839import org .elasticsearch .common .inject .Inject ;
@@ -181,20 +182,33 @@ public static void performOnPrimary(
181182
182183 @ Override
183184 protected void doRun () throws Exception {
184- while (context .hasMoreOperationsToExecute ()) {
185- if (executeBulkItemRequest (
186- context ,
187- updateHelper ,
188- nowInMillisSupplier ,
189- mappingUpdater ,
190- waitForMappingUpdate ,
191- ActionListener .wrap (v -> executor .execute (this ), this ::onRejection )
192- ) == false ) {
193- // We are waiting for a mapping update on another thread, that will invoke this action again once its done
194- // so we just break out here.
195- return ;
185+ String uid = UUIDs .base64UUID ();
186+ long transactionId = -1L ;
187+ try {
188+ transactionId = primary .startTransaction (uid );
189+ while (context .hasMoreOperationsToExecute ()) {
190+ if (executeBulkItemRequest (
191+ context ,
192+ updateHelper ,
193+ nowInMillisSupplier ,
194+ mappingUpdater ,
195+ waitForMappingUpdate ,
196+ ActionListener .wrap (v -> executor .execute (this ), this ::onRejection ),
197+ transactionId
198+ ) == false ) {
199+ // We are waiting for a mapping update on another thread, that will invoke this action again once its done
200+ // so we just break out here.
201+ return ;
202+ }
203+ assert context .isInitial (); // either completed and moved to next or reset
196204 }
197- assert context .isInitial (); // either completed and moved to next or reset
205+
206+ primary .commitTransaction (uid , transactionId );
207+ } catch (Exception x ) {
208+ logger .warn ("Encountered an error while executing bulk transaction" , x );
209+ primary .rollbackTransaction (uid , transactionId );
210+ } finally {
211+ primary .closeTransaction (uid , transactionId );
198212 }
199213 primary .getBulkOperationListener ().afterBulk (request .totalSizeInBytes (), System .nanoTime () - startBulkTime );
200214 // We're done, there's no more operations to execute so we resolve the wrapped listener
@@ -206,7 +220,6 @@ public void onRejection(Exception e) {
206220 // We must finish the outstanding request. Finishing the outstanding request can include
207221 // refreshing and fsyncing. Therefore, we must force execution on the WRITE thread.
208222 executor .execute (new ActionRunnable <>(listener ) {
209-
210223 @ Override
211224 protected void doRun () {
212225 // Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
@@ -250,6 +263,25 @@ private void finishRequest() {
250263 }.run ();
251264 }
252265
266+ static boolean executeBulkItemRequest (
267+ BulkPrimaryExecutionContext context ,
268+ UpdateHelper updateHelper ,
269+ LongSupplier nowInMillisSupplier ,
270+ MappingUpdatePerformer mappingUpdater ,
271+ Consumer <ActionListener <Void >> waitForMappingUpdate ,
272+ ActionListener <Void > itemDoneListener
273+ ) throws Exception {
274+ return executeBulkItemRequest (
275+ context ,
276+ updateHelper ,
277+ nowInMillisSupplier ,
278+ mappingUpdater ,
279+ waitForMappingUpdate ,
280+ itemDoneListener ,
281+ IndexShard .NO_TRANSACTION_ID
282+ );
283+ }
284+
253285 /**
254286 * Executes bulk item requests and handles request execution exceptions.
255287 * @return {@code true} if request completed on this thread and the listener was invoked, {@code false} if the request triggered
@@ -261,7 +293,8 @@ static boolean executeBulkItemRequest(
261293 LongSupplier nowInMillisSupplier ,
262294 MappingUpdatePerformer mappingUpdater ,
263295 Consumer <ActionListener <Void >> waitForMappingUpdate ,
264- ActionListener <Void > itemDoneListener
296+ ActionListener <Void > itemDoneListener ,
297+ long transactionId
265298 ) throws Exception {
266299 final DocWriteRequest .OpType opType = context .getCurrent ().opType ();
267300
@@ -306,7 +339,8 @@ static boolean executeBulkItemRequest(
306339 request .id (),
307340 request .versionType (),
308341 request .ifSeqNo (),
309- request .ifPrimaryTerm ()
342+ request .ifPrimaryTerm (),
343+ transactionId
310344 );
311345 } else {
312346 final IndexRequest request = context .getRequestToExecute ();
@@ -324,7 +358,8 @@ static boolean executeBulkItemRequest(
324358 request .ifSeqNo (),
325359 request .ifPrimaryTerm (),
326360 request .getAutoGeneratedTimestamp (),
327- request .isRetry ()
361+ request .isRetry (),
362+ transactionId
328363 );
329364 }
330365 if (result .getResultType () == Engine .Result .Type .MAPPING_UPDATE_REQUIRED ) {
@@ -509,7 +544,7 @@ private static BulkItemResponse processUpdateResponse(
509544 protected void dispatchedShardOperationOnReplica (BulkShardRequest request , IndexShard replica , ActionListener <ReplicaResult > listener ) {
510545 ActionListener .completeWith (listener , () -> {
511546 final long startBulkTime = System .nanoTime ();
512- final Translog .Location location = performOnReplica (request , replica );
547+ final Translog .Location location = performOnReplica (request , replica , IndexShard . NO_TRANSACTION_ID );
513548 replica .getBulkOperationListener ().afterBulk (request .totalSizeInBytes (), System .nanoTime () - startBulkTime );
514549 return new WriteReplicaResult <>(request , location , null , replica , logger );
515550 });
@@ -525,7 +560,7 @@ protected int replicaOperationCount(BulkShardRequest request) {
525560 return request .items ().length ;
526561 }
527562
528- public static Translog .Location performOnReplica (BulkShardRequest request , IndexShard replica ) throws Exception {
563+ public static Translog .Location performOnReplica (BulkShardRequest request , IndexShard replica , long transactionId ) throws Exception {
529564 Translog .Location location = null ;
530565 for (int i = 0 ; i < request .items ().length ; i ++) {
531566 final BulkItemRequest item = request .items ()[i ];
@@ -553,7 +588,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
553588 continue ; // ignore replication as it's a noop
554589 }
555590 assert response .getResponse ().getSeqNo () != SequenceNumbers .UNASSIGNED_SEQ_NO ;
556- operationResult = performOpOnReplica (response .getResponse (), item .request (), replica );
591+ operationResult = performOpOnReplica (response .getResponse (), item .request (), replica , transactionId );
557592 }
558593 assert operationResult != null : "operation result must never be null when primary response has no failure" ;
559594 location = syncOperationResultOrThrow (operationResult , location );
@@ -564,7 +599,8 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
564599 private static Engine .Result performOpOnReplica (
565600 DocWriteResponse primaryResponse ,
566601 DocWriteRequest <?> docWriteRequest ,
567- IndexShard replica
602+ IndexShard replica ,
603+ long transactionId
568604 ) throws Exception {
569605 final Engine .Result result ;
570606 switch (docWriteRequest .opType ()) {
@@ -584,7 +620,8 @@ private static Engine.Result performOpOnReplica(
584620 primaryResponse .getVersion (),
585621 indexRequest .getAutoGeneratedTimestamp (),
586622 indexRequest .isRetry (),
587- sourceToParse
623+ sourceToParse ,
624+ transactionId
588625 );
589626 break ;
590627 case DELETE :
@@ -593,7 +630,8 @@ private static Engine.Result performOpOnReplica(
593630 primaryResponse .getSeqNo (),
594631 primaryResponse .getPrimaryTerm (),
595632 primaryResponse .getVersion (),
596- deleteRequest .id ()
633+ deleteRequest .id (),
634+ transactionId
597635 );
598636 break ;
599637 default :
0 commit comments