1010import com .datastax .oss .driver .api .core .CqlSessionBuilder ;
1111import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
1212import com .datastax .oss .driver .api .core .config .DriverConfigLoader ;
13- import com .datastax .oss .driver .api .core .session . Session ;
13+ import com .datastax .oss .driver .api .core .metadata . Node ;
1414import com .datastax .oss .driver .api .testinfra .ScyllaOnly ;
1515import com .datastax .oss .driver .api .testinfra .ccm .CustomCcmRule ;
1616import com .datastax .oss .driver .api .testinfra .session .SessionUtils ;
1717import com .datastax .oss .driver .internal .core .pool .ChannelPool ;
18+ import com .datastax .oss .driver .internal .core .session .DefaultSession ;
1819import com .datastax .oss .driver .internal .core .util .concurrent .CompletableFutures ;
1920import com .datastax .oss .driver .internal .core .util .concurrent .Reconnection ;
2021import com .datastax .oss .driver .shaded .guava .common .collect .ImmutableList ;
2122import com .google .common .collect .ImmutableMap ;
2223import com .google .common .collect .ImmutableSet ;
23- import com .google .common .util .concurrent .Uninterruptibles ;
2424import com .tngtech .java .junit .dataprovider .DataProvider ;
2525import com .tngtech .java .junit .dataprovider .DataProviderRunner ;
2626import com .tngtech .java .junit .dataprovider .UseDataProvider ;
2727import java .net .InetSocketAddress ;
2828import java .time .Duration ;
29+ import java .util .Arrays ;
30+ import java .util .Collections ;
2931import java .util .List ;
3032import java .util .Map ;
3133import java .util .Set ;
3234import java .util .concurrent .CompletionStage ;
3335import java .util .concurrent .TimeUnit ;
3436import java .util .regex .Pattern ;
37+ import org .awaitility .Awaitility ;
3538import org .junit .After ;
3639import org .junit .Before ;
3740import org .junit .ClassRule ;
@@ -92,6 +95,7 @@ public void stopCapturingLogs() {
9295 @ Test
9396 @ UseDataProvider ("reuseAddressOption" )
9497 public void should_initialize_all_channels (boolean reuseAddress ) {
98+ int expectedChannelsPerNode = 6 ; // Divisible by smp
9599 String node1 = CCM_RULE .getCcmBridge ().getNodeIpAddress (1 );
96100 String node2 = CCM_RULE .getCcmBridge ().getNodeIpAddress (2 );
97101 Pattern reconnectionPattern1 =
@@ -120,15 +124,19 @@ public void should_initialize_all_channels(boolean reuseAddress) {
120124 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_HIGH , 60000 )
121125 // Due to rounding up the connections per shard this will result in 6 connections per
122126 // node
123- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 4 )
127+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
124128 .build ();
125- try (Session session =
129+ try (CqlSession session =
126130 CqlSession .builder ()
127131 .addContactPoint (
128132 new InetSocketAddress (CCM_RULE .getCcmBridge ().getNodeIpAddress (1 ), 19042 ))
129133 .withConfigLoader (loader )
130134 .build ()) {
131- Uninterruptibles .sleepUninterruptibly (1 , TimeUnit .SECONDS );
135+ List <CqlSession > allSessions = Collections .singletonList (session );
136+ Awaitility .await ()
137+ .atMost (5 , TimeUnit .SECONDS )
138+ .pollInterval (500 , TimeUnit .MILLISECONDS )
139+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
132140 List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
133141 expectedOccurences .forEach (
134142 (pattern , times ) -> assertMatchesExactly (pattern , times , logsCopy ));
@@ -138,20 +146,25 @@ public void should_initialize_all_channels(boolean reuseAddress) {
138146
139147 @ Test
140148 public void should_see_mismatched_shard () {
149+ int expectedChannelsPerNode = 66 ; // Divisible by smp
141150 DriverConfigLoader loader =
142151 SessionUtils .configLoaderBuilder ()
143152 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
144153 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_LOW , 10000 )
145154 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_HIGH , 60000 )
146- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 64 )
155+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 66 )
147156 .build ();
148- try (Session session =
157+ try (CqlSession session =
149158 CqlSession .builder ()
150159 .addContactPoint (
151160 new InetSocketAddress (CCM_RULE .getCcmBridge ().getNodeIpAddress (1 ), 9042 ))
152161 .withConfigLoader (loader )
153162 .build ()) {
154- Uninterruptibles .sleepUninterruptibly (1 , TimeUnit .SECONDS );
163+ List <CqlSession > allSessions = Collections .singletonList (session );
164+ Awaitility .await ()
165+ .atMost (20 , TimeUnit .SECONDS )
166+ .pollInterval (500 , TimeUnit .MILLISECONDS )
167+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
155168 List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
156169 assertMatchesAtLeast (shardMismatchPattern , 5 , logsCopy );
157170 }
@@ -160,10 +173,11 @@ public void should_see_mismatched_shard() {
160173 // There is no need to run this as a test, but it serves as a comparison
161174 @ SuppressWarnings ("unused" )
162175 public void should_struggle_to_fill_pools () {
176+ int expectedChannelsPerNode = 66 ; // Divisible by smp
163177 DriverConfigLoader loader =
164178 SessionUtils .configLoaderBuilder ()
165179 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , false )
166- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 64 )
180+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 66 )
167181 .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofMillis (200 ))
168182 .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofMillis (4000 ))
169183 .build ();
@@ -180,7 +194,11 @@ public void should_struggle_to_fill_pools() {
180194 CqlSession session2 = CompletableFutures .getUninterruptibly (stage2 );
181195 CqlSession session3 = CompletableFutures .getUninterruptibly (stage3 );
182196 CqlSession session4 = CompletableFutures .getUninterruptibly (stage4 ); ) {
183- Uninterruptibles .sleepUninterruptibly (20 , TimeUnit .SECONDS );
197+ List <CqlSession > allSessions = Arrays .asList (session1 , session2 , session3 , session4 );
198+ Awaitility .await ()
199+ .atMost (20 , TimeUnit .SECONDS )
200+ .pollInterval (500 , TimeUnit .MILLISECONDS )
201+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
184202 List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
185203 assertNoLogMatches (shardMismatchPattern , logsCopy );
186204 assertMatchesAtLeast (generalReconnectionPattern , 8 , logsCopy );
@@ -189,10 +207,11 @@ public void should_struggle_to_fill_pools() {
189207
190208 @ Test
191209 public void should_not_struggle_to_fill_pools () {
210+ int expectedChannelsPerNode = 66 ;
192211 DriverConfigLoader loader =
193212 SessionUtils .configLoaderBuilder ()
194213 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
195- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 66 )
214+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
196215 .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofMillis (10 ))
197216 .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofMillis (20 ))
198217 .build ();
@@ -210,7 +229,11 @@ public void should_not_struggle_to_fill_pools() {
210229 CqlSession session2 = CompletableFutures .getUninterruptibly (stage2 );
211230 CqlSession session3 = CompletableFutures .getUninterruptibly (stage3 );
212231 CqlSession session4 = CompletableFutures .getUninterruptibly (stage4 ); ) {
213- Uninterruptibles .sleepUninterruptibly (8 , TimeUnit .SECONDS );
232+ List <CqlSession > allSessions = Arrays .asList (session1 , session2 , session3 , session4 );
233+ Awaitility .await ()
234+ .atMost (20 , TimeUnit .SECONDS )
235+ .pollInterval (500 , TimeUnit .MILLISECONDS )
236+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
214237 int tolerance = 2 ; // Sometimes socket ends up already in use
215238 String node1 = CCM_RULE .getCcmBridge ().getNodeIpAddress (1 );
216239 String node2 = CCM_RULE .getCcmBridge ().getNodeIpAddress (2 );
@@ -239,6 +262,27 @@ public void should_not_struggle_to_fill_pools() {
239262 }
240263 }
241264
265+ private boolean areAllPoolsFullyInitialized (
266+ List <CqlSession > sessions , int expectedChannelsPerNode ) {
267+ for (CqlSession session : sessions ) {
268+ DefaultSession defaultSession = (DefaultSession ) session ;
269+ Map <Node , ChannelPool > pools = defaultSession .getPools ();
270+ if (pools == null || pools .isEmpty ()) {
271+ return false ;
272+ }
273+
274+ for (ChannelPool pool : pools .values ()) {
275+ if (pool == null ) {
276+ return false ;
277+ }
278+ if (pool .size () < expectedChannelsPerNode ) {
279+ return false ;
280+ }
281+ }
282+ }
283+ return true ;
284+ }
285+
242286 private void assertNoLogMatches (Pattern pattern , List <ILoggingEvent > logs ) {
243287 for (ILoggingEvent log : logs ) {
244288 if (pattern .matcher (log .getFormattedMessage ()).matches ()) {
0 commit comments