@@ -16,16 +16,22 @@ use test_helpers::{maybe_start_logging, now, random_topic_name, record};
16
16
async fn test_plain ( ) {
17
17
maybe_start_logging ( ) ;
18
18
19
- let connection = maybe_skip_kafka_integration ! ( ) ;
20
- ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
19
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
20
+ ClientBuilder :: new ( test_cfg. bootstrap_brokers )
21
+ . build ( )
22
+ . await
23
+ . unwrap ( ) ;
21
24
}
22
25
23
26
#[ tokio:: test]
24
27
async fn test_topic_crud ( ) {
25
28
maybe_start_logging ( ) ;
26
29
27
- let connection = maybe_skip_kafka_integration ! ( ) ;
28
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
30
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
31
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
32
+ . build ( )
33
+ . await
34
+ . unwrap ( ) ;
29
35
let controller_client = client. controller_client ( ) . unwrap ( ) ;
30
36
let topics = client. list_topics ( ) . await . unwrap ( ) ;
31
37
@@ -77,10 +83,13 @@ async fn test_topic_crud() {
77
83
async fn test_partition_client ( ) {
78
84
maybe_start_logging ( ) ;
79
85
80
- let connection = maybe_skip_kafka_integration ! ( ) ;
86
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
81
87
let topic_name = random_topic_name ( ) ;
82
88
83
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
89
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
90
+ . build ( )
91
+ . await
92
+ . unwrap ( ) ;
84
93
85
94
let controller_client = client. controller_client ( ) . unwrap ( ) ;
86
95
controller_client
@@ -100,10 +109,13 @@ async fn test_partition_client() {
100
109
async fn test_non_existing_partition ( ) {
101
110
maybe_start_logging ( ) ;
102
111
103
- let connection = maybe_skip_kafka_integration ! ( ) ;
112
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
104
113
let topic_name = random_topic_name ( ) ;
105
114
106
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
115
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
116
+ . build ( )
117
+ . await
118
+ . unwrap ( ) ;
107
119
108
120
// do NOT create the topic
109
121
@@ -167,8 +179,8 @@ async fn test_tls() {
167
179
. with_single_cert ( vec ! [ producer_root] , private_key)
168
180
. unwrap ( ) ;
169
181
170
- let connection = maybe_skip_kafka_integration ! ( ) ;
171
- ClientBuilder :: new ( connection )
182
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
183
+ ClientBuilder :: new ( test_cfg . bootstrap_brokers )
172
184
. tls_config ( Arc :: new ( config) )
173
185
. build ( )
174
186
. await
@@ -180,14 +192,11 @@ async fn test_tls() {
180
192
async fn test_socks5 ( ) {
181
193
maybe_start_logging ( ) ;
182
194
183
- // e.g. "my-connection-kafka-bootstrap:9092"
184
- let connection = maybe_skip_kafka_integration ! ( ) ;
185
- // e.g. "localhost:1080"
186
- let proxy = maybe_skip_SOCKS_PROXY ! ( ) ;
195
+ let test_cfg = maybe_skip_kafka_integration ! ( socks5) ;
187
196
let topic_name = random_topic_name ( ) ;
188
197
189
- let client = ClientBuilder :: new ( connection )
190
- . socks5_proxy ( proxy )
198
+ let client = ClientBuilder :: new ( test_cfg . bootstrap_brokers )
199
+ . socks5_proxy ( test_cfg . socks5_proxy . unwrap ( ) )
191
200
. build ( )
192
201
. await
193
202
. unwrap ( ) ;
@@ -222,11 +231,14 @@ async fn test_socks5() {
222
231
async fn test_produce_empty ( ) {
223
232
maybe_start_logging ( ) ;
224
233
225
- let connection = maybe_skip_kafka_integration ! ( ) ;
234
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
226
235
let topic_name = random_topic_name ( ) ;
227
236
let n_partitions = 2 ;
228
237
229
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
238
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
239
+ . build ( )
240
+ . await
241
+ . unwrap ( ) ;
230
242
let controller_client = client. controller_client ( ) . unwrap ( ) ;
231
243
controller_client
232
244
. create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -247,11 +259,14 @@ async fn test_produce_empty() {
247
259
async fn test_consume_empty ( ) {
248
260
maybe_start_logging ( ) ;
249
261
250
- let connection = maybe_skip_kafka_integration ! ( ) ;
262
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
251
263
let topic_name = random_topic_name ( ) ;
252
264
let n_partitions = 2 ;
253
265
254
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
266
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
267
+ . build ( )
268
+ . await
269
+ . unwrap ( ) ;
255
270
let controller_client = client. controller_client ( ) . unwrap ( ) ;
256
271
controller_client
257
272
. create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -274,11 +289,14 @@ async fn test_consume_empty() {
274
289
async fn test_consume_offset_out_of_range ( ) {
275
290
maybe_start_logging ( ) ;
276
291
277
- let connection = maybe_skip_kafka_integration ! ( ) ;
292
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
278
293
let topic_name = random_topic_name ( ) ;
279
294
let n_partitions = 2 ;
280
295
281
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
296
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
297
+ . build ( )
298
+ . await
299
+ . unwrap ( ) ;
282
300
let controller_client = client. controller_client ( ) . unwrap ( ) ;
283
301
controller_client
284
302
. create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -314,11 +332,11 @@ async fn test_consume_offset_out_of_range() {
314
332
async fn test_get_offset ( ) {
315
333
maybe_start_logging ( ) ;
316
334
317
- let connection = maybe_skip_kafka_integration ! ( ) ;
335
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
318
336
let topic_name = random_topic_name ( ) ;
319
337
let n_partitions = 1 ;
320
338
321
- let client = ClientBuilder :: new ( connection . clone ( ) )
339
+ let client = ClientBuilder :: new ( test_cfg . bootstrap_brokers . clone ( ) )
322
340
. build ( )
323
341
. await
324
342
. unwrap ( ) ;
@@ -382,10 +400,13 @@ async fn test_get_offset() {
382
400
async fn test_produce_consume_size_cutoff ( ) {
383
401
maybe_start_logging ( ) ;
384
402
385
- let connection = maybe_skip_kafka_integration ! ( ) ;
403
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
386
404
let topic_name = random_topic_name ( ) ;
387
405
388
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
406
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
407
+ . build ( )
408
+ . await
409
+ . unwrap ( ) ;
389
410
let controller_client = client. controller_client ( ) . unwrap ( ) ;
390
411
controller_client
391
412
. create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -460,10 +481,13 @@ async fn test_produce_consume_size_cutoff() {
460
481
async fn test_consume_midbatch ( ) {
461
482
maybe_start_logging ( ) ;
462
483
463
- let connection = maybe_skip_kafka_integration ! ( ) ;
484
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
464
485
let topic_name = random_topic_name ( ) ;
465
486
466
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
487
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
488
+ . build ( )
489
+ . await
490
+ . unwrap ( ) ;
467
491
let controller_client = client. controller_client ( ) . unwrap ( ) ;
468
492
controller_client
469
493
. create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -508,10 +532,13 @@ async fn test_consume_midbatch() {
508
532
async fn test_delete_records ( ) {
509
533
maybe_start_logging ( ) ;
510
534
511
- let connection = maybe_skip_kafka_integration ! ( ) ;
535
+ let test_cfg = maybe_skip_kafka_integration ! ( delete ) ;
512
536
let topic_name = random_topic_name ( ) ;
513
537
514
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
538
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
539
+ . build ( )
540
+ . await
541
+ . unwrap ( ) ;
515
542
let controller_client = client. controller_client ( ) . unwrap ( ) ;
516
543
controller_client
517
544
. create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -555,7 +582,10 @@ async fn test_delete_records() {
555
582
let offset_4 = offsets[ 0 ] ;
556
583
557
584
// delete from the middle of the 2nd batch
558
- maybe_skip_delete ! ( partition_client, offset_3) ;
585
+ partition_client
586
+ . delete_records ( offset_3, 1_000 )
587
+ . await
588
+ . unwrap ( ) ;
559
589
560
590
// fetching data before the record fails
561
591
let err = partition_client
0 commit comments