1818 */
1919package org .neo4j .driver .internal ;
2020
21+ import java .util .ArrayList ;
22+ import java .util .List ;
2123import java .util .Map ;
2224import java .util .concurrent .CompletableFuture ;
25+ import java .util .concurrent .CompletionException ;
2326import java .util .concurrent .CompletionStage ;
2427import java .util .function .BiConsumer ;
28+ import java .util .function .BiFunction ;
2529
30+ import org .neo4j .driver .internal .async .InternalStatementResultCursor ;
2631import org .neo4j .driver .internal .async .QueryRunner ;
2732import org .neo4j .driver .internal .handlers .BeginTxResponseHandler ;
2833import org .neo4j .driver .internal .handlers .CommitTxResponseHandler ;
2934import org .neo4j .driver .internal .handlers .NoOpResponseHandler ;
3035import org .neo4j .driver .internal .handlers .RollbackTxResponseHandler ;
3136import org .neo4j .driver .internal .spi .Connection ;
37+ import org .neo4j .driver .internal .spi .ResponseHandler ;
3238import org .neo4j .driver .internal .types .InternalTypeSystem ;
39+ import org .neo4j .driver .internal .util .Futures ;
3340import org .neo4j .driver .v1 .Record ;
3441import org .neo4j .driver .v1 .Session ;
3542import org .neo4j .driver .v1 .Statement ;
4350
4451import static java .util .Collections .emptyMap ;
4552import static java .util .concurrent .CompletableFuture .completedFuture ;
53+ import static org .neo4j .driver .internal .util .Futures .completionErrorCause ;
4654import static org .neo4j .driver .internal .util .Futures .failedFuture ;
4755import static org .neo4j .driver .internal .util .Futures .getBlocking ;
4856import static org .neo4j .driver .v1 .Values .value ;
@@ -86,6 +94,7 @@ private enum State
8694 private final Connection connection ;
8795 private final NetworkSession session ;
8896
97+ private final List <CompletionStage <InternalStatementResultCursor >> resultCursors = new ArrayList <>();
8998 private volatile Bookmark bookmark = Bookmark .empty ();
9099 private volatile State state = State .ACTIVE ;
91100
@@ -169,7 +178,9 @@ else if ( state == State.TERMINATED )
169178 }
170179 else
171180 {
172- return doCommitAsync ().whenComplete ( transactionClosed ( State .COMMITTED ) );
181+ return receiveFailures ()
182+ .thenCompose ( failure -> doCommitAsync ().handle ( handleCommitOrRollback ( failure ) ) )
183+ .whenComplete ( transactionClosed ( State .COMMITTED ) );
173184 }
174185 }
175186
@@ -192,38 +203,12 @@ else if ( state == State.TERMINATED )
192203 }
193204 else
194205 {
195- return doRollbackAsync ().whenComplete ( transactionClosed ( State .ROLLED_BACK ) );
206+ return receiveFailures ()
207+ .thenCompose ( failure -> doRollbackAsync ().handle ( handleCommitOrRollback ( failure ) ) )
208+ .whenComplete ( transactionClosed ( State .ROLLED_BACK ) );
196209 }
197210 }
198211
199- private BiConsumer <Void ,Throwable > transactionClosed ( State newState )
200- {
201- return ( ignore , error ) ->
202- {
203- state = newState ;
204- connection .releaseInBackground ();
205- session .setBookmark ( bookmark );
206- };
207- }
208-
209- private CompletionStage <Void > doCommitAsync ()
210- {
211- CompletableFuture <Void > commitFuture = new CompletableFuture <>();
212- connection .runAndFlush ( COMMIT_QUERY , emptyMap (), NoOpResponseHandler .INSTANCE ,
213- new CommitTxResponseHandler ( commitFuture , this ) );
214-
215- return commitFuture .thenRun ( () -> state = State .COMMITTED );
216- }
217-
218- private CompletionStage <Void > doRollbackAsync ()
219- {
220- CompletableFuture <Void > rollbackFuture = new CompletableFuture <>();
221- connection .runAndFlush ( ROLLBACK_QUERY , emptyMap (), NoOpResponseHandler .INSTANCE ,
222- new RollbackTxResponseHandler ( rollbackFuture ) );
223-
224- return rollbackFuture .thenRun ( () -> state = State .ROLLED_BACK );
225- }
226-
227212 @ Override
228213 public StatementResult run ( String statementText , Value statementParameters )
229214 {
@@ -280,23 +265,31 @@ public CompletionStage<StatementResultCursor> runAsync( String statementTemplate
280265 @ Override
281266 public StatementResult run ( Statement statement )
282267 {
283- ensureCanRunQueries ();
284- StatementResultCursor cursor = getBlocking ( QueryRunner .runAsBlocking ( connection , statement , this ) );
268+ StatementResultCursor cursor = getBlocking ( run ( statement , false ) );
285269 return new InternalStatementResult ( cursor );
286270 }
287271
288272 @ Override
289273 public CompletionStage <StatementResultCursor > runAsync ( Statement statement )
290274 {
291- ensureCanRunQueries ();
292275 //noinspection unchecked
293- return (CompletionStage ) QueryRunner . runAsAsync ( connection , statement , this );
276+ return (CompletionStage ) run ( statement , true );
294277 }
295278
296- @ Override
297- public boolean isOpen ()
279+ private CompletionStage <InternalStatementResultCursor > run ( Statement statement , boolean asAsync )
298280 {
299- return state .txOpen ;
281+ ensureCanRunQueries ();
282+ CompletionStage <InternalStatementResultCursor > result ;
283+ if ( asAsync )
284+ {
285+ result = QueryRunner .runAsAsync ( connection , statement , this );
286+ }
287+ else
288+ {
289+ result = QueryRunner .runAsBlocking ( connection , statement , this );
290+ }
291+ resultCursors .add ( result );
292+ return result ;
300293 }
301294
302295 private void ensureCanRunQueries ()
@@ -324,6 +317,12 @@ else if ( state == State.TERMINATED )
324317 }
325318 }
326319
320+ @ Override
321+ public boolean isOpen ()
322+ {
323+ return state .txOpen ;
324+ }
325+
327326 @ Override
328327 public TypeSystem typeSystem ()
329328 {
@@ -347,4 +346,56 @@ public void setBookmark( Bookmark bookmark )
347346 this .bookmark = bookmark ;
348347 }
349348 }
349+
350+ private CompletionStage <Void > doCommitAsync ()
351+ {
352+ CompletableFuture <Void > commitFuture = new CompletableFuture <>();
353+ ResponseHandler pullAllHandler = new CommitTxResponseHandler ( commitFuture , this );
354+ connection .runAndFlush ( COMMIT_QUERY , emptyMap (), NoOpResponseHandler .INSTANCE , pullAllHandler );
355+ return commitFuture ;
356+ }
357+
358+ private CompletionStage <Void > doRollbackAsync ()
359+ {
360+ CompletableFuture <Void > rollbackFuture = new CompletableFuture <>();
361+ ResponseHandler pullAllHandler = new RollbackTxResponseHandler ( rollbackFuture );
362+ connection .runAndFlush ( ROLLBACK_QUERY , emptyMap (), NoOpResponseHandler .INSTANCE , pullAllHandler );
363+ return rollbackFuture ;
364+ }
365+
366+ private BiFunction <Void ,Throwable ,Void > handleCommitOrRollback ( Throwable cursorFailure )
367+ {
368+ return ( ignore , commitOrRollbackError ) ->
369+ {
370+ if ( cursorFailure != null )
371+ {
372+ throw new CompletionException ( completionErrorCause ( cursorFailure ) );
373+ }
374+ else if ( commitOrRollbackError != null )
375+ {
376+ throw new CompletionException ( completionErrorCause ( commitOrRollbackError ) );
377+ }
378+ else
379+ {
380+ return null ;
381+ }
382+ };
383+ }
384+
385+ private BiConsumer <Object ,Throwable > transactionClosed ( State newState )
386+ {
387+ return ( ignore , error ) ->
388+ {
389+ state = newState ;
390+ connection .releaseInBackground ();
391+ session .setBookmark ( bookmark );
392+ };
393+ }
394+
395+ private CompletionStage <Throwable > receiveFailures ()
396+ {
397+ return resultCursors .stream ()
398+ .map ( stage -> stage .thenCompose ( InternalStatementResultCursor ::failureAsync ) )
399+ .reduce ( completedFuture ( null ), Futures ::firstNotNull );
400+ }
350401}
0 commit comments