2020
2121import java .util .Map ;
2222import java .util .concurrent .CompletableFuture ;
23+ import java .util .concurrent .CompletionException ;
2324import java .util .concurrent .CompletionStage ;
2425import java .util .function .BiConsumer ;
26+ import java .util .function .BiFunction ;
2527
28+ import org .neo4j .driver .internal .async .InternalStatementResultCursor ;
2629import org .neo4j .driver .internal .async .QueryRunner ;
30+ import org .neo4j .driver .internal .async .ResultCursorsHolder ;
2731import org .neo4j .driver .internal .handlers .BeginTxResponseHandler ;
2832import org .neo4j .driver .internal .handlers .CommitTxResponseHandler ;
2933import org .neo4j .driver .internal .handlers .NoOpResponseHandler ;
3034import org .neo4j .driver .internal .handlers .RollbackTxResponseHandler ;
3135import org .neo4j .driver .internal .spi .Connection ;
36+ import org .neo4j .driver .internal .spi .ResponseHandler ;
3237import org .neo4j .driver .internal .types .InternalTypeSystem ;
3338import org .neo4j .driver .v1 .Record ;
3439import org .neo4j .driver .v1 .Session ;
4348
4449import static java .util .Collections .emptyMap ;
4550import static java .util .concurrent .CompletableFuture .completedFuture ;
51+ import static org .neo4j .driver .internal .util .Futures .completionErrorCause ;
4652import static org .neo4j .driver .internal .util .Futures .failedFuture ;
4753import static org .neo4j .driver .internal .util .Futures .getBlocking ;
4854import static org .neo4j .driver .v1 .Values .value ;
@@ -56,28 +62,36 @@ public class ExplicitTransaction implements Transaction
5662 private enum State
5763 {
5864 /** The transaction is running with no explicit success or failure marked */
59- ACTIVE ,
65+ ACTIVE ( true ) ,
6066
6167 /** Running, user marked for success, meaning it'll value committed */
62- MARKED_SUCCESS ,
68+ MARKED_SUCCESS ( true ) ,
6369
6470 /** User marked as failed, meaning it'll be rolled back. */
65- MARKED_FAILED ,
71+ MARKED_FAILED ( true ) ,
6672
6773 /**
6874 * This transaction has been explicitly terminated by calling {@link Session#reset()}.
6975 */
70- TERMINATED ,
76+ TERMINATED ( false ) ,
7177
7278 /** This transaction has successfully committed */
73- COMMITTED ,
79+ COMMITTED ( false ) ,
7480
7581 /** This transaction has been rolled back */
76- ROLLED_BACK
82+ ROLLED_BACK ( false );
83+
84+ final boolean txOpen ;
85+
86+ State ( boolean txOpen )
87+ {
88+ this .txOpen = txOpen ;
89+ }
7790 }
7891
7992 private final Connection connection ;
8093 private final NetworkSession session ;
94+ private final ResultCursorsHolder resultCursors ;
8195
8296 private volatile Bookmark bookmark = Bookmark .empty ();
8397 private volatile State state = State .ACTIVE ;
@@ -86,6 +100,7 @@ public ExplicitTransaction( Connection connection, NetworkSession session )
86100 {
87101 this .connection = connection ;
88102 this .session = session ;
103+ this .resultCursors = new ResultCursorsHolder ();
89104 }
90105
91106 public CompletionStage <ExplicitTransaction > beginAsync ( Bookmark initialBookmark )
@@ -162,7 +177,9 @@ else if ( state == State.TERMINATED )
162177 }
163178 else
164179 {
165- return doCommitAsync ().whenComplete ( transactionClosed ( State .COMMITTED ) );
180+ return resultCursors .retrieveNotConsumedError ()
181+ .thenCompose ( error -> doCommitAsync ().handle ( handleCommitOrRollback ( error ) ) )
182+ .whenComplete ( transactionClosed ( State .COMMITTED ) );
166183 }
167184 }
168185
@@ -185,38 +202,12 @@ else if ( state == State.TERMINATED )
185202 }
186203 else
187204 {
188- return doRollbackAsync ().whenComplete ( transactionClosed ( State .ROLLED_BACK ) );
205+ return resultCursors .retrieveNotConsumedError ()
206+ .thenCompose ( error -> doRollbackAsync ().handle ( handleCommitOrRollback ( error ) ) )
207+ .whenComplete ( transactionClosed ( State .ROLLED_BACK ) );
189208 }
190209 }
191210
192- private BiConsumer <Void ,Throwable > transactionClosed ( State newState )
193- {
194- return ( ignore , error ) ->
195- {
196- state = newState ;
197- connection .releaseInBackground ();
198- session .setBookmark ( bookmark );
199- };
200- }
201-
202- private CompletionStage <Void > doCommitAsync ()
203- {
204- CompletableFuture <Void > commitFuture = new CompletableFuture <>();
205- connection .runAndFlush ( COMMIT_QUERY , emptyMap (), NoOpResponseHandler .INSTANCE ,
206- new CommitTxResponseHandler ( commitFuture , this ) );
207-
208- return commitFuture .thenRun ( () -> state = State .COMMITTED );
209- }
210-
211- private CompletionStage <Void > doRollbackAsync ()
212- {
213- CompletableFuture <Void > rollbackFuture = new CompletableFuture <>();
214- connection .runAndFlush ( ROLLBACK_QUERY , emptyMap (), NoOpResponseHandler .INSTANCE ,
215- new RollbackTxResponseHandler ( rollbackFuture ) );
216-
217- return rollbackFuture .thenRun ( () -> state = State .ROLLED_BACK );
218- }
219-
220211 @ Override
221212 public StatementResult run ( String statementText , Value statementParameters )
222213 {
@@ -273,23 +264,31 @@ public CompletionStage<StatementResultCursor> runAsync( String statementTemplate
273264 @ Override
274265 public StatementResult run ( Statement statement )
275266 {
276- ensureCanRunQueries ();
277- StatementResultCursor cursor = getBlocking ( QueryRunner .runAsBlocking ( connection , statement , this ) );
267+ StatementResultCursor cursor = getBlocking ( run ( statement , false ) );
278268 return new InternalStatementResult ( cursor );
279269 }
280270
281271 @ Override
282272 public CompletionStage <StatementResultCursor > runAsync ( Statement statement )
283273 {
284- ensureCanRunQueries ();
285274 //noinspection unchecked
286- return (CompletionStage ) QueryRunner . runAsAsync ( connection , statement , this );
275+ return (CompletionStage ) run ( statement , true );
287276 }
288277
289- @ Override
290- public boolean isOpen ()
278+ private CompletionStage <InternalStatementResultCursor > run ( Statement statement , boolean asAsync )
291279 {
292- return state != State .COMMITTED && state != State .ROLLED_BACK && state != State .TERMINATED ;
280+ ensureCanRunQueries ();
281+ CompletionStage <InternalStatementResultCursor > cursorStage ;
282+ if ( asAsync )
283+ {
284+ cursorStage = QueryRunner .runAsAsync ( connection , statement , this );
285+ }
286+ else
287+ {
288+ cursorStage = QueryRunner .runAsBlocking ( connection , statement , this );
289+ }
290+ resultCursors .add ( cursorStage );
291+ return cursorStage ;
293292 }
294293
295294 private void ensureCanRunQueries ()
@@ -317,6 +316,12 @@ else if ( state == State.TERMINATED )
317316 }
318317 }
319318
319+ @ Override
320+ public boolean isOpen ()
321+ {
322+ return state .txOpen ;
323+ }
324+
320325 @ Override
321326 public TypeSystem typeSystem ()
322327 {
@@ -340,4 +345,49 @@ public void setBookmark( Bookmark bookmark )
340345 this .bookmark = bookmark ;
341346 }
342347 }
348+
349+ private CompletionStage <Void > doCommitAsync ()
350+ {
351+ CompletableFuture <Void > commitFuture = new CompletableFuture <>();
352+ ResponseHandler pullAllHandler = new CommitTxResponseHandler ( commitFuture , this );
353+ connection .runAndFlush ( COMMIT_QUERY , emptyMap (), NoOpResponseHandler .INSTANCE , pullAllHandler );
354+ return commitFuture ;
355+ }
356+
357+ private CompletionStage <Void > doRollbackAsync ()
358+ {
359+ CompletableFuture <Void > rollbackFuture = new CompletableFuture <>();
360+ ResponseHandler pullAllHandler = new RollbackTxResponseHandler ( rollbackFuture );
361+ connection .runAndFlush ( ROLLBACK_QUERY , emptyMap (), NoOpResponseHandler .INSTANCE , pullAllHandler );
362+ return rollbackFuture ;
363+ }
364+
365+ private BiFunction <Void ,Throwable ,Void > handleCommitOrRollback ( Throwable cursorFailure )
366+ {
367+ return ( ignore , commitOrRollbackError ) ->
368+ {
369+ if ( cursorFailure != null )
370+ {
371+ throw new CompletionException ( completionErrorCause ( cursorFailure ) );
372+ }
373+ else if ( commitOrRollbackError != null )
374+ {
375+ throw new CompletionException ( completionErrorCause ( commitOrRollbackError ) );
376+ }
377+ else
378+ {
379+ return null ;
380+ }
381+ };
382+ }
383+
384+ private BiConsumer <Object ,Throwable > transactionClosed ( State newState )
385+ {
386+ return ( ignore , error ) ->
387+ {
388+ state = newState ;
389+ connection .releaseInBackground ();
390+ session .setBookmark ( bookmark );
391+ };
392+ }
343393}
0 commit comments