88import java .util .concurrent .CompletionStage ;
99import java .util .concurrent .CountDownLatch ;
1010import java .util .concurrent .TimeUnit ;
11+ import java .util .function .BiFunction ;
1112
1213import org .hibernate .SessionFactory ;
1314import org .hibernate .boot .registry .StandardServiceRegistry ;
1415import org .hibernate .boot .registry .StandardServiceRegistryBuilder ;
1516import org .hibernate .cfg .Configuration ;
17+ import org .hibernate .reactive .annotations .DisabledFor ;
18+ import org .hibernate .reactive .containers .DatabaseConfiguration ;
1619import org .hibernate .reactive .provider .ReactiveServiceRegistryBuilder ;
1720import org .hibernate .reactive .stage .Stage ;
1821import org .hibernate .reactive .util .impl .CompletionStages ;
1922import org .hibernate .reactive .vertx .VertxInstance ;
2023
21- import org .junit .jupiter .api .AfterAll ;
22- import org .junit .jupiter .api .BeforeAll ;
24+ import org .junit .jupiter .api .AfterEach ;
25+ import org .junit .jupiter .api .BeforeEach ;
26+ import org .junit .jupiter .api .Disabled ;
2327import org .junit .jupiter .api .Test ;
2428import org .junit .jupiter .api .TestInstance ;
2529import org .junit .jupiter .api .extension .ExtendWith ;
4145import static org .assertj .core .api .Assertions .fail ;
4246import static org .hibernate .cfg .AvailableSettings .SHOW_SQL ;
4347import static org .hibernate .reactive .BaseReactiveTest .setDefaultProperties ;
48+ import static org .hibernate .reactive .containers .DatabaseConfiguration .DBType .DB2 ;
4449import static org .hibernate .reactive .provider .Settings .POOL_CONNECT_TIMEOUT ;
4550import static org .hibernate .reactive .util .impl .CompletionStages .failedFuture ;
4651import static org .hibernate .reactive .util .impl .CompletionStages .loop ;
@@ -101,8 +106,8 @@ public class MultithreadedInsertionWithLazyConnectionTest {
101106 private static Vertx vertx ;
102107 private static SessionFactory sessionFactory ;
103108
104- @ BeforeAll
105- public static void setupSessionFactory () {
109+ @ BeforeEach
110+ public void setupSessionFactory () {
106111 vertx = Vertx .vertx ( getVertxOptions () );
107112 Configuration configuration = new Configuration ();
108113 setDefaultProperties ( configuration );
@@ -130,8 +135,8 @@ private static VertxOptions getVertxOptions() {
130135 return vertxOptions ;
131136 }
132137
133- @ AfterAll
134- public static void closeSessionFactory () {
138+ @ AfterEach
139+ public void closeSessionFactory () {
135140 stageSessionFactory .close ();
136141 }
137142
@@ -140,8 +145,33 @@ public void testIdentityGenerator(VertxTestContext context) {
140145 final DeploymentOptions deploymentOptions = new DeploymentOptions ();
141146 deploymentOptions .setInstances ( N_THREADS );
142147
148+ // We are not using transactions on purpose here, because this approach will cause a context switch
149+ // and an assertion error if things aren't handled correctly.
150+ // See Hibernate Reactive issue #2768: https://github.com/hibernate/hibernate-reactive/issues/2768
143151 vertx
144- .deployVerticle ( InsertEntitiesVerticle ::new , deploymentOptions )
152+ .deployVerticle ( () -> new InsertEntitiesVerticle ( (s , entity ) -> s
153+ .persist ( entity )
154+ .thenCompose ( v -> s .flush () )
155+ .thenAccept ( v -> s .clear () ) ), deploymentOptions
156+ )
157+ .onSuccess ( res -> {
158+ endLatch .waitForEveryone ();
159+ context .completeNow ();
160+ } )
161+ .onFailure ( context ::failNow )
162+ .eventually ( () -> vertx .close () );
163+ }
164+
165+ @ Test
166+ @ DisabledFor (value = DB2 , reason = "Exception: IllegalStateException: Needed to have 6 in buffer but only had 0" )
167+ public void testIdentityGeneratorWithTransaction (VertxTestContext context ) {
168+ final DeploymentOptions deploymentOptions = new DeploymentOptions ();
169+ deploymentOptions .setInstances ( N_THREADS );
170+ vertx
171+ .deployVerticle (
172+ () -> new InsertEntitiesVerticle ( (s , entity ) -> s
173+ .withTransaction ( t -> s .persist ( entity ) ) ), deploymentOptions
174+ )
145175 .onSuccess ( res -> {
146176 endLatch .waitForEveryone ();
147177 context .completeNow ();
@@ -152,9 +182,12 @@ public void testIdentityGenerator(VertxTestContext context) {
152182
153183 private static class InsertEntitiesVerticle extends AbstractVerticle {
154184
185+ final BiFunction <Stage .Session , EntityWithGeneratedId , CompletionStage <Void >> insertFun ;
186+
155187 int sequentialOperation = 0 ;
156188
157- public InsertEntitiesVerticle () {
189+ public InsertEntitiesVerticle (BiFunction <Stage .Session , EntityWithGeneratedId , CompletionStage <Void >> insertFun ) {
190+ this .insertFun = insertFun ;
158191 }
159192
160193 @ Override
@@ -196,9 +229,8 @@ private CompletionStage<Void> storeEntity(Stage.Session s) {
196229 final int localVerticleOperationSequence = sequentialOperation ++;
197230 final EntityWithGeneratedId entity = new EntityWithGeneratedId ();
198231 entity .name = beforeOperationThread + "__" + localVerticleOperationSequence ;
199-
200- return s
201- .withTransaction ( t -> s .persist ( entity ) )
232+ return insertFun
233+ .apply ( s , entity )
202234 .thenCompose ( v -> beforeOperationThread != Thread .currentThread ()
203235 ? failedFuture ( new IllegalStateException ( "Detected an unexpected switch of carrier threads!" ) )
204236 : voidFuture () );
0 commit comments