1
+ use crate :: utils:: setup_tracing;
2
+ use crate :: utils:: unique_keyspace_name;
3
+ use crate :: utils:: PerformDDL ;
4
+ use crate :: utils:: { create_new_session_builder, scylla_supports_tablets} ;
1
5
use scylla:: batch:: Batch ;
2
6
use scylla:: batch:: BatchType ;
3
7
use scylla:: client:: session:: Session ;
@@ -9,13 +13,10 @@ use scylla::query::Query;
9
13
use scylla:: value:: { Counter , CqlValue , MaybeUnset } ;
10
14
use std:: collections:: HashMap ;
11
15
use std:: string:: String ;
12
-
13
- use crate :: utils:: setup_tracing;
14
- use crate :: utils:: unique_keyspace_name;
15
- use crate :: utils:: PerformDDL ;
16
- use crate :: utils:: { create_new_session_builder, scylla_supports_tablets} ;
16
+ use std:: sync:: Arc ;
17
17
18
18
use assert_matches:: assert_matches;
19
+ use scylla:: response:: query_result:: { QueryResult , QueryRowsResult } ;
19
20
20
21
const BATCH_COUNT : usize = 100 ;
21
22
@@ -25,6 +26,8 @@ async fn create_test_session(table_name: &str, supports_tablets: bool) -> Sessio
25
26
26
27
let mut create_ks = format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ;
27
28
29
+ // Need to disable tablets in this test because they don't support counters yet.
30
+ // (https://github.com/scylladb/scylladb/commit/c70f321c6f581357afdf3fd8b4fe8e5c5bb9736e).
28
31
if !supports_tablets && scylla_supports_tablets ( & session) . await {
29
32
create_ks += " AND TABLETS = {'enabled': false}"
30
33
}
@@ -142,6 +145,145 @@ async fn batch_statements_and_values_mismatch_detected() {
142
145
}
143
146
}
144
147
148
+ #[ tokio:: test]
149
+ async fn test_batch ( ) {
150
+ setup_tracing ( ) ;
151
+ let session = Arc :: new ( create_new_session_builder ( ) . build ( ) . await . unwrap ( ) ) ;
152
+ let ks = unique_keyspace_name ( ) ;
153
+
154
+ session. ddl ( format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ) . await . unwrap ( ) ;
155
+ session
156
+ . ddl ( format ! (
157
+ "CREATE TABLE IF NOT EXISTS {}.t_batch (a int, b int, c text, primary key (a, b))" ,
158
+ ks
159
+ ) )
160
+ . await
161
+ . unwrap ( ) ;
162
+
163
+ let prepared_statement = session
164
+ . prepare ( format ! (
165
+ "INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)" ,
166
+ ks
167
+ ) )
168
+ . await
169
+ . unwrap ( ) ;
170
+
171
+ // TODO: Add API that supports binding values to statements in batch creation process,
172
+ // to avoid problem of statements/values count mismatch
173
+ use scylla:: batch:: Batch ;
174
+ let mut batch: Batch = Default :: default ( ) ;
175
+ batch. append_statement ( & format ! ( "INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)" , ks) [ ..] ) ;
176
+ batch. append_statement ( & format ! ( "INSERT INTO {}.t_batch (a, b, c) VALUES (7, 11, '')" , ks) [ ..] ) ;
177
+ batch. append_statement ( prepared_statement. clone ( ) ) ;
178
+
179
+ let four_value: i32 = 4 ;
180
+ let hello_value: String = String :: from ( "hello" ) ;
181
+ let session_clone = session. clone ( ) ;
182
+ // We're spawning to a separate task here to test that it works even in that case, because in some scenarios
183
+ // (specifically if the `BatchValuesIter` associated type is not dropped before await boundaries)
184
+ // the implicit auto trait propagation on batch will be such that the returned future is not Send (depending on
185
+ // some lifetime for some unknown reason), so can't be spawned on tokio.
186
+ // See https://github.com/scylladb/scylla-rust-driver/issues/599 for more details
187
+ tokio:: spawn ( async move {
188
+ let values = (
189
+ ( 1_i32 , 2_i32 , "abc" ) ,
190
+ ( ) ,
191
+ ( 1_i32 , & four_value, hello_value. as_str ( ) ) ,
192
+ ) ;
193
+ session_clone. batch ( & batch, values) . await . unwrap ( ) ;
194
+ } )
195
+ . await
196
+ . unwrap ( ) ;
197
+
198
+ let mut results: Vec < ( i32 , i32 , String ) > = session
199
+ . query_unpaged ( format ! ( "SELECT a, b, c FROM {}.t_batch" , ks) , & [ ] )
200
+ . await
201
+ . unwrap ( )
202
+ . into_rows_result ( )
203
+ . unwrap ( )
204
+ . rows :: < ( i32 , i32 , String ) > ( )
205
+ . unwrap ( )
206
+ . collect :: < Result < _ , _ > > ( )
207
+ . unwrap ( ) ;
208
+
209
+ results. sort ( ) ;
210
+ assert_eq ! (
211
+ results,
212
+ vec![
213
+ ( 1 , 2 , String :: from( "abc" ) ) ,
214
+ ( 1 , 4 , String :: from( "hello" ) ) ,
215
+ ( 7 , 11 , String :: from( "" ) )
216
+ ]
217
+ ) ;
218
+
219
+ // Test repreparing statement inside a batch
220
+ let mut batch: Batch = Default :: default ( ) ;
221
+ batch. append_statement ( prepared_statement) ;
222
+ let values = ( ( 4_i32 , 20_i32 , "foobar" ) , ) ;
223
+
224
+ // This statement flushes the prepared statement cache
225
+ session
226
+ . ddl ( format ! (
227
+ "ALTER TABLE {}.t_batch WITH gc_grace_seconds = 42" ,
228
+ ks
229
+ ) )
230
+ . await
231
+ . unwrap ( ) ;
232
+ session. batch ( & batch, values) . await . unwrap ( ) ;
233
+
234
+ let results: Vec < ( i32 , i32 , String ) > = session
235
+ . query_unpaged (
236
+ format ! ( "SELECT a, b, c FROM {}.t_batch WHERE a = 4" , ks) ,
237
+ & [ ] ,
238
+ )
239
+ . await
240
+ . unwrap ( )
241
+ . into_rows_result ( )
242
+ . unwrap ( )
243
+ . rows :: < ( i32 , i32 , String ) > ( )
244
+ . unwrap ( )
245
+ . collect :: < Result < _ , _ > > ( )
246
+ . unwrap ( ) ;
247
+
248
+ assert_eq ! ( results, vec![ ( 4 , 20 , String :: from( "foobar" ) ) ] ) ;
249
+ }
250
+
251
+ // This is a regression test for #1134.
252
+ #[ tokio:: test]
253
+ async fn test_batch_to_multiple_tables ( ) {
254
+ setup_tracing ( ) ;
255
+ let session = create_new_session_builder ( ) . build ( ) . await . unwrap ( ) ;
256
+ let ks = unique_keyspace_name ( ) ;
257
+
258
+ session. ddl ( format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ) . await . unwrap ( ) ;
259
+ session. use_keyspace ( & ks, true ) . await . unwrap ( ) ;
260
+ session
261
+ . ddl ( "CREATE TABLE IF NOT EXISTS t_batch1 (a int, b int, c text, primary key (a, b))" )
262
+ . await
263
+ . unwrap ( ) ;
264
+ session
265
+ . ddl ( "CREATE TABLE IF NOT EXISTS t_batch2 (a int, b int, c text, primary key (a, b))" )
266
+ . await
267
+ . unwrap ( ) ;
268
+
269
+ let prepared_statement = session
270
+ . prepare (
271
+ "
272
+ BEGIN BATCH
273
+ INSERT INTO t_batch1 (a, b, c) VALUES (?, ?, ?);
274
+ INSERT INTO t_batch2 (a, b, c) VALUES (?, ?, ?);
275
+ APPLY BATCH;
276
+ " ,
277
+ )
278
+ . await
279
+ . unwrap ( ) ;
280
+
281
+ session
282
+ . execute_unpaged ( & prepared_statement, ( 1 , 2 , "ala" , 4 , 5 , "ma" ) )
283
+ . await
284
+ . unwrap ( ) ;
285
+ }
286
+
145
287
#[ tokio:: test]
146
288
async fn test_batch_of_simple_statements ( ) {
147
289
setup_tracing ( ) ;
@@ -324,34 +466,144 @@ async fn test_batch_of_mixed_bound_and_simple_statements() {
324
466
verify_batch_insert ( & session, & test_name, BATCH_COUNT ) . await ;
325
467
}
326
468
469
+ // Batches containing LWT queries (IF col = som) return rows with information whether the queries were applied.
327
470
#[ tokio:: test]
328
- async fn test_cas_batch ( ) {
329
- setup_tracing ( ) ;
330
- let test_name = String :: from ( "test_cas_batch" ) ;
471
+ async fn test_batch_lwts ( ) {
472
+ let test_name = String :: from ( "test_counter_batch" ) ;
331
473
let session = create_test_session ( & test_name, false ) . await ;
332
474
333
- let query_str = format ! (
334
- "INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?) IF NOT EXISTS" ,
335
- & test_name
336
- ) ;
337
- let prepared = session. prepare ( Query :: new ( query_str) ) . await . unwrap ( ) ;
475
+ session
476
+ . query_unpaged (
477
+ format ! ( "INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)" , & test_name) ,
478
+ ( & test_name, 0 , 0 ) ,
479
+ )
480
+ . await
481
+ . unwrap ( ) ;
482
+
338
483
let mut batch = Batch :: new ( BatchType :: Unlogged ) ;
339
- let mut batch_values = Vec :: with_capacity ( BATCH_COUNT ) ;
340
- for i in 0 ..BATCH_COUNT as i32 {
341
- batch. append_statement ( prepared. clone ( ) ) ;
342
- batch_values. push ( ( & test_name, i, i + 1 ) ) ;
484
+ batch. append_statement (
485
+ format ! (
486
+ "UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0" ,
487
+ & test_name, & test_name
488
+ )
489
+ . as_str ( ) ,
490
+ ) ;
491
+ batch. append_statement (
492
+ format ! (
493
+ "INSERT INTO {} (k0, k1, v) VALUES ('{}', 123, 321)" ,
494
+ & test_name, & test_name
495
+ )
496
+ . as_str ( ) ,
497
+ ) ;
498
+ batch. append_statement (
499
+ format ! (
500
+ "UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0" ,
501
+ & test_name, & test_name
502
+ )
503
+ . as_str ( ) ,
504
+ ) ;
505
+
506
+ let batch_res: QueryResult = session. batch ( & batch, ( ( ) , ( ) , ( ) ) ) . await . unwrap ( ) ;
507
+ let batch_deserializer = batch_res. into_rows_result ( ) . unwrap ( ) ;
508
+
509
+ // Scylla returns 4 columns, but Cassandra returns only 1
510
+ let is_scylla: bool = batch_deserializer. column_specs ( ) . len ( ) == 4 ;
511
+
512
+ if is_scylla {
513
+ test_batch_lwts_for_scylla ( & session, & batch, & batch_deserializer, & test_name) . await ;
514
+ } else {
515
+ test_batch_lwts_for_cassandra ( & session, & batch, & batch_deserializer, & test_name) . await ;
343
516
}
344
- let result = session. batch ( & batch, batch_values. clone ( ) ) . await . unwrap ( ) ;
345
- let row: ( bool , Option < String > , Option < i32 > , Option < i32 > ) =
346
- result. into_rows_result ( ) . unwrap ( ) . first_row ( ) . unwrap ( ) ;
347
- assert ! ( row. 0 , "First CAS batch should be applied" ) ;
517
+ }
348
518
349
- verify_batch_insert ( & session, & test_name, BATCH_COUNT ) . await ;
519
+ async fn test_batch_lwts_for_scylla (
520
+ session : & Session ,
521
+ batch : & Batch ,
522
+ query_rows_result : & QueryRowsResult ,
523
+ k0_value : & str ,
524
+ ) {
525
+ // Alias required by clippy
526
+ type IntOrNull = Option < i32 > ;
527
+ type StrOrNull = Option < String > ;
528
+
529
+ // Returned columns are:
530
+ // [applied], k0, k1, v
531
+ let batch_res_rows: Vec < ( bool , StrOrNull , IntOrNull , IntOrNull ) > = query_rows_result
532
+ . rows ( )
533
+ . unwrap ( )
534
+ . collect :: < Result < _ , _ > > ( )
535
+ . unwrap ( ) ;
536
+
537
+ let k0_value = k0_value. to_string ( ) ;
538
+ let expected_batch_res_rows = vec ! [
539
+ ( true , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 0 ) ) ,
540
+ ( true , None , None , None ) ,
541
+ ( true , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 0 ) ) ,
542
+ ] ;
543
+
544
+ assert_eq ! ( batch_res_rows, expected_batch_res_rows) ;
545
+
546
+ let prepared_batch: Batch = session. prepare_batch ( batch) . await . unwrap ( ) ;
547
+ let prepared_batch_res: QueryResult =
548
+ session. batch ( & prepared_batch, ( ( ) , ( ) , ( ) ) ) . await . unwrap ( ) ;
549
+
550
+ let prepared_batch_res_rows: Vec < ( bool , StrOrNull , IntOrNull , IntOrNull ) > = prepared_batch_res
551
+ . into_rows_result ( )
552
+ . unwrap ( )
553
+ . rows ( )
554
+ . unwrap ( )
555
+ . map ( |r| r. unwrap ( ) )
556
+ . collect ( ) ;
557
+
558
+ let expected_prepared_batch_res_rows = vec ! [
559
+ ( false , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 1 ) ) ,
560
+ ( false , None , None , None ) ,
561
+ ( false , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 1 ) ) ,
562
+ ] ;
563
+
564
+ assert_eq ! ( prepared_batch_res_rows, expected_prepared_batch_res_rows) ;
565
+ }
566
+
567
+ async fn test_batch_lwts_for_cassandra (
568
+ session : & Session ,
569
+ batch : & Batch ,
570
+ query_rows_result : & QueryRowsResult ,
571
+ k0_value : & str ,
572
+ ) {
573
+ // Alias required by clippy
574
+ type IntOrNull = Option < i32 > ;
575
+ type StrOrNull = Option < String > ;
576
+
577
+ // Returned columns are:
578
+ // [applied]
579
+ let batch_res_rows: Vec < ( bool , ) > = query_rows_result
580
+ . rows ( )
581
+ . unwrap ( )
582
+ . map ( |r| r. unwrap ( ) )
583
+ . collect ( ) ;
584
+
585
+ let expected_batch_res_rows = vec ! [ ( true , ) ] ;
586
+
587
+ assert_eq ! ( batch_res_rows, expected_batch_res_rows) ;
588
+
589
+ let prepared_batch: Batch = session. prepare_batch ( batch) . await . unwrap ( ) ;
590
+ let prepared_batch_res: QueryResult =
591
+ session. batch ( & prepared_batch, ( ( ) , ( ) , ( ) ) ) . await . unwrap ( ) ;
592
+
593
+ // Returned columns are:
594
+ // [applied], p1, c1, r1, r2
595
+ let prepared_batch_res_rows: Vec < ( bool , StrOrNull , IntOrNull , IntOrNull ) > = prepared_batch_res
596
+ . into_rows_result ( )
597
+ . unwrap ( )
598
+ . rows ( )
599
+ . unwrap ( )
600
+ . map ( |r| r. unwrap ( ) )
601
+ . collect ( ) ;
602
+
603
+ let expected_prepared_batch_res_rows =
604
+ vec ! [ ( false , Some ( k0_value. to_string( ) ) , Some ( 0 ) , Some ( 1 ) ) ] ;
350
605
351
- let result2 = session. batch ( & batch, batch_values) . await . unwrap ( ) ;
352
- let row: ( bool , Option < String > , Option < i32 > , Option < i32 > ) =
353
- result2. into_rows_result ( ) . unwrap ( ) . first_row ( ) . unwrap ( ) ;
354
- assert ! ( !row. 0 , "Second CAS batch should not be applied" ) ;
606
+ assert_eq ! ( prepared_batch_res_rows, expected_prepared_batch_res_rows) ;
355
607
}
356
608
357
609
#[ tokio:: test]
0 commit comments