@@ -16,16 +16,22 @@ use test_helpers::{maybe_start_logging, now, random_topic_name, record};
16
16
async fn test_plain ( ) {
17
17
maybe_start_logging ( ) ;
18
18
19
- let connection = maybe_skip_kafka_integration ! ( ) ;
20
- ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
19
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
20
+ ClientBuilder :: new ( test_cfg. bootstrap_brokers )
21
+ . build ( )
22
+ . await
23
+ . unwrap ( ) ;
21
24
}
22
25
23
26
#[ tokio:: test]
24
27
async fn test_topic_crud ( ) {
25
28
maybe_start_logging ( ) ;
26
29
27
- let connection = maybe_skip_kafka_integration ! ( ) ;
28
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
30
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
31
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
32
+ . build ( )
33
+ . await
34
+ . unwrap ( ) ;
29
35
let controller_client = client. controller_client ( ) . unwrap ( ) ;
30
36
let topics = client. list_topics ( ) . await . unwrap ( ) ;
31
37
@@ -77,10 +83,13 @@ async fn test_topic_crud() {
77
83
async fn test_partition_client ( ) {
78
84
maybe_start_logging ( ) ;
79
85
80
- let connection = maybe_skip_kafka_integration ! ( ) ;
86
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
81
87
let topic_name = random_topic_name ( ) ;
82
88
83
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
89
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
90
+ . build ( )
91
+ . await
92
+ . unwrap ( ) ;
84
93
85
94
let controller_client = client. controller_client ( ) . unwrap ( ) ;
86
95
controller_client
@@ -100,10 +109,10 @@ async fn test_partition_client() {
100
109
async fn test_non_existing_partition ( ) {
101
110
maybe_start_logging ( ) ;
102
111
103
- let connection = maybe_skip_kafka_integration ! ( ) ;
112
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
104
113
let topic_name = random_topic_name ( ) ;
105
114
106
- let client = ClientBuilder :: new ( connection ) . build ( ) . await . unwrap ( ) ;
115
+ let client = ClientBuilder :: new ( test_cfg . bootstrap_brokers ) . build ( ) . await . unwrap ( ) ;
107
116
108
117
// do NOT create the topic
109
118
@@ -167,8 +176,8 @@ async fn test_tls() {
167
176
. with_single_cert ( vec ! [ producer_root] , private_key)
168
177
. unwrap ( ) ;
169
178
170
- let connection = maybe_skip_kafka_integration ! ( ) ;
171
- ClientBuilder :: new ( connection )
179
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
180
+ ClientBuilder :: new ( test_cfg . bootstrap_brokers )
172
181
. tls_config ( Arc :: new ( config) )
173
182
. build ( )
174
183
. await
@@ -180,14 +189,11 @@ async fn test_tls() {
180
189
async fn test_socks5 ( ) {
181
190
maybe_start_logging ( ) ;
182
191
183
- // e.g. "my-connection-kafka-bootstrap:9092"
184
- let connection = maybe_skip_kafka_integration ! ( ) ;
185
- // e.g. "localhost:1080"
186
- let proxy = maybe_skip_SOCKS_PROXY ! ( ) ;
192
+ let test_cfg = maybe_skip_kafka_integration ! ( socks5) ;
187
193
let topic_name = random_topic_name ( ) ;
188
194
189
- let client = ClientBuilder :: new ( connection )
190
- . socks5_proxy ( proxy )
195
+ let client = ClientBuilder :: new ( test_cfg . bootstrap_brokers )
196
+ . socks5_proxy ( test_cfg . socks5_proxy . unwrap ( ) )
191
197
. build ( )
192
198
. await
193
199
. unwrap ( ) ;
@@ -222,11 +228,14 @@ async fn test_socks5() {
222
228
async fn test_produce_empty ( ) {
223
229
maybe_start_logging ( ) ;
224
230
225
- let connection = maybe_skip_kafka_integration ! ( ) ;
231
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
226
232
let topic_name = random_topic_name ( ) ;
227
233
let n_partitions = 2 ;
228
234
229
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
235
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
236
+ . build ( )
237
+ . await
238
+ . unwrap ( ) ;
230
239
let controller_client = client. controller_client ( ) . unwrap ( ) ;
231
240
controller_client
232
241
. create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -247,11 +256,14 @@ async fn test_produce_empty() {
247
256
async fn test_consume_empty ( ) {
248
257
maybe_start_logging ( ) ;
249
258
250
- let connection = maybe_skip_kafka_integration ! ( ) ;
259
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
251
260
let topic_name = random_topic_name ( ) ;
252
261
let n_partitions = 2 ;
253
262
254
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
263
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
264
+ . build ( )
265
+ . await
266
+ . unwrap ( ) ;
255
267
let controller_client = client. controller_client ( ) . unwrap ( ) ;
256
268
controller_client
257
269
. create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -274,11 +286,14 @@ async fn test_consume_empty() {
274
286
async fn test_consume_offset_out_of_range ( ) {
275
287
maybe_start_logging ( ) ;
276
288
277
- let connection = maybe_skip_kafka_integration ! ( ) ;
289
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
278
290
let topic_name = random_topic_name ( ) ;
279
291
let n_partitions = 2 ;
280
292
281
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
293
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
294
+ . build ( )
295
+ . await
296
+ . unwrap ( ) ;
282
297
let controller_client = client. controller_client ( ) . unwrap ( ) ;
283
298
controller_client
284
299
. create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -314,11 +329,11 @@ async fn test_consume_offset_out_of_range() {
314
329
async fn test_get_offset ( ) {
315
330
maybe_start_logging ( ) ;
316
331
317
- let connection = maybe_skip_kafka_integration ! ( ) ;
332
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
318
333
let topic_name = random_topic_name ( ) ;
319
334
let n_partitions = 1 ;
320
335
321
- let client = ClientBuilder :: new ( connection . clone ( ) )
336
+ let client = ClientBuilder :: new ( test_cfg . bootstrap_brokers . clone ( ) )
322
337
. build ( )
323
338
. await
324
339
. unwrap ( ) ;
@@ -382,10 +397,13 @@ async fn test_get_offset() {
382
397
async fn test_produce_consume_size_cutoff ( ) {
383
398
maybe_start_logging ( ) ;
384
399
385
- let connection = maybe_skip_kafka_integration ! ( ) ;
400
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
386
401
let topic_name = random_topic_name ( ) ;
387
402
388
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
403
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
404
+ . build ( )
405
+ . await
406
+ . unwrap ( ) ;
389
407
let controller_client = client. controller_client ( ) . unwrap ( ) ;
390
408
controller_client
391
409
. create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -460,10 +478,13 @@ async fn test_produce_consume_size_cutoff() {
460
478
async fn test_consume_midbatch ( ) {
461
479
maybe_start_logging ( ) ;
462
480
463
- let connection = maybe_skip_kafka_integration ! ( ) ;
481
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
464
482
let topic_name = random_topic_name ( ) ;
465
483
466
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
484
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
485
+ . build ( )
486
+ . await
487
+ . unwrap ( ) ;
467
488
let controller_client = client. controller_client ( ) . unwrap ( ) ;
468
489
controller_client
469
490
. create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -508,10 +529,13 @@ async fn test_consume_midbatch() {
508
529
async fn test_delete_records ( ) {
509
530
maybe_start_logging ( ) ;
510
531
511
- let connection = maybe_skip_kafka_integration ! ( ) ;
532
+ let test_cfg = maybe_skip_kafka_integration ! ( delete ) ;
512
533
let topic_name = random_topic_name ( ) ;
513
534
514
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
535
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
536
+ . build ( )
537
+ . await
538
+ . unwrap ( ) ;
515
539
let controller_client = client. controller_client ( ) . unwrap ( ) ;
516
540
controller_client
517
541
. create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -555,7 +579,10 @@ async fn test_delete_records() {
555
579
let offset_4 = offsets[ 0 ] ;
556
580
557
581
// delete from the middle of the 2nd batch
558
- maybe_skip_delete ! ( partition_client, offset_3) ;
582
+ partition_client
583
+ . delete_records ( offset_3, 1_000 )
584
+ . await
585
+ . unwrap ( ) ;
559
586
560
587
// fetching data before the record fails
561
588
let err = partition_client
0 commit comments