Skip to content

Commit 14ae812

Browse files
crepererumdomodwyer
andcommitted
refactor: clarify binding mechanism
Co-authored-by: Dom <[email protected]>
1 parent b59d9ad commit 14ae812

File tree

10 files changed

+60
-57
lines changed

10 files changed

+60
-57
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ It will be a good fit for workloads that:
3232
use rskafka::{
3333
client::{
3434
ClientBuilder,
35-
partition::{Compression, PartitionClientBindMode},
35+
partition::{Compression, UnknownTopicHandling},
3636
},
3737
record::Record,
3838
};
@@ -58,7 +58,7 @@ let partition_client = client
5858
.partition_client(
5959
topic.to_owned(),
6060
0, // partition
61-
PartitionClientBindMode::Strong,
61+
UnknownTopicHandling::Retry,
6262
)
6363
.await
6464
.unwrap();

benches/throughput.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use rdkafka::{
2222
use rskafka::{
2323
client::{
2424
consumer::{StartOffset, StreamConsumerBuilder as RsStreamConsumerBuilder},
25-
partition::{Compression, PartitionClient, PartitionClientBindMode},
25+
partition::{Compression, PartitionClient, UnknownTopicHandling},
2626
producer::{aggregator::RecordAggregator, BatchProducerBuilder},
2727
ClientBuilder,
2828
},
@@ -449,7 +449,7 @@ async fn setup_rskafka(connection: Vec<String>) -> PartitionClient {
449449
.unwrap();
450450

451451
client
452-
.partition_client(topic_name, 0, PartitionClientBindMode::Strong)
452+
.partition_client(topic_name, 0, UnknownTopicHandling::Retry)
453453
.await
454454
.unwrap()
455455
}

src/client/consumer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
//! StartOffset,
1111
//! StreamConsumerBuilder,
1212
//! },
13-
//! partition::PartitionClientBindMode,
13+
//! partition::UnknownTopicHandling,
1414
//! };
1515
//! use std::sync::Arc;
1616
//!
@@ -21,7 +21,7 @@
2121
//! client.partition_client(
2222
//! "my_topic",
2323
//! 0,
24-
//! PartitionClientBindMode::Strong,
24+
//! UnknownTopicHandling::Retry,
2525
//! ).await.unwrap()
2626
//! );
2727
//!

src/client/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub mod producer;
1818

1919
use error::{Error, Result};
2020

21-
use self::{controller::ControllerClient, partition::PartitionClientBindMode};
21+
use self::{controller::ControllerClient, partition::UnknownTopicHandling};
2222

2323
#[derive(Debug, Error)]
2424
pub enum ProduceError {
@@ -120,13 +120,13 @@ impl Client {
120120
&self,
121121
topic: impl Into<String> + Send,
122122
partition: i32,
123-
bind_mode: PartitionClientBindMode,
123+
unknown_topic_handling: UnknownTopicHandling,
124124
) -> Result<PartitionClient> {
125125
PartitionClient::new(
126126
topic.into(),
127127
partition,
128128
Arc::clone(&self.brokers),
129-
bind_mode,
129+
unknown_topic_handling,
130130
)
131131
.await
132132
}

src/client/partition.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,23 @@ use super::error::ServerErrorResponse;
4141
///
4242
/// We offer the following ways to deal with this:
4343
///
44-
/// - Use a [strong binding](Self::Strong) which assumes a partition exists and retries [`ProtocolError::UnknownTopicOrPartition`]
45-
/// - Use a [weak binding](Self::Weak). All other methods (including the creation of a [`PartitionClient`]) may produce
44+
/// - Use a [`Error`](Self::Error). All other methods (including the creation of a [`PartitionClient`]) may produce
4645
/// sporadic [`ProtocolError::UnknownTopicOrPartition`] errors.
46+
/// - Use a [`Retry`](Self::Error) which assumes a partition exists and retries [`ProtocolError::UnknownTopicOrPartition`]
4747
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48-
pub enum PartitionClientBindMode {
49-
/// Assume that the partition (or topic that contains it) can be deleted at any time.
48+
pub enum UnknownTopicHandling {
49+
/// When a [`ProtocolError::UnknownTopicOrPartition`] is returned by Kafka,
50+
/// it is passed up to the user to handle.
5051
///
51-
/// This bubbles [`ProtocolError::UnknownTopicOrPartition`] to the user.
52-
Weak,
52+
/// These errors may occur spuriously.
53+
Error,
5354

54-
/// Assume that the partition (and topic that contais it) exist.
55+
/// Always retry operations that return a
56+
/// [`ProtocolError::UnknownTopicOrPartition`], until the operation succeeds
57+
/// or a different error is returned.
5558
///
56-
/// This retries [`ProtocolError::UnknownTopicOrPartition`].
57-
Strong,
59+
/// This may unpredictably increase operation latency.
60+
Retry,
5861
}
5962

6063
/// Compression of records.
@@ -115,7 +118,7 @@ pub struct PartitionClient {
115118
/// Current broker connection if any
116119
current_broker: Mutex<Option<BrokerConnection>>,
117120

118-
bind_mode: PartitionClientBindMode,
121+
unknown_topic_handling: UnknownTopicHandling,
119122
}
120123

121124
impl std::fmt::Debug for PartitionClient {
@@ -129,22 +132,22 @@ impl PartitionClient {
129132
topic: String,
130133
partition: i32,
131134
brokers: Arc<BrokerConnector>,
132-
bind_mode: PartitionClientBindMode,
135+
unknown_topic_handling: UnknownTopicHandling,
133136
) -> Result<Self> {
134137
let p = Self {
135138
topic,
136139
partition,
137140
brokers: Arc::clone(&brokers),
138141
backoff_config: Default::default(),
139142
current_broker: Mutex::new(None),
140-
bind_mode,
143+
unknown_topic_handling,
141144
};
142145

143146
// Force discover and establish a cached connection to the leader
144147
let scope = &p;
145148
maybe_retry(
146149
&p.backoff_config,
147-
p.bind_mode,
150+
p.unknown_topic_handling,
148151
&*brokers,
149152
"leader_detection",
150153
|| async move {
@@ -183,7 +186,7 @@ impl PartitionClient {
183186

184187
maybe_retry(
185188
&self.backoff_config,
186-
self.bind_mode,
189+
self.unknown_topic_handling,
187190
self,
188191
"produce",
189192
|| async move {
@@ -214,7 +217,7 @@ impl PartitionClient {
214217

215218
let partition = maybe_retry(
216219
&self.backoff_config,
217-
self.bind_mode,
220+
self.unknown_topic_handling,
218221
self,
219222
"fetch_records",
220223
|| async move {
@@ -241,7 +244,7 @@ impl PartitionClient {
241244

242245
let partition = maybe_retry(
243246
&self.backoff_config,
244-
self.bind_mode,
247+
self.unknown_topic_handling,
245248
self,
246249
"get_offset",
247250
|| async move {
@@ -265,7 +268,7 @@ impl PartitionClient {
265268

266269
maybe_retry(
267270
&self.backoff_config,
268-
self.bind_mode,
271+
self.unknown_topic_handling,
269272
self,
270273
"delete_records",
271274
|| async move {
@@ -451,7 +454,7 @@ impl BrokerCache for &PartitionClient {
451454
/// and handles certain classes of error
452455
async fn maybe_retry<B, R, F, T>(
453456
backoff_config: &BackoffConfig,
454-
bind_mode: PartitionClientBindMode,
457+
unknown_topic_handling: UnknownTopicHandling,
455458
broker_cache: B,
456459
request_name: &str,
457460
f: R,
@@ -489,7 +492,7 @@ where
489492
Error::ServerError {
490493
protocol_error: ProtocolError::UnknownTopicOrPartition,
491494
..
492-
} if bind_mode == PartitionClientBindMode::Strong => {
495+
} if unknown_topic_handling == UnknownTopicHandling::Retry => {
493496
broker_cache.invalidate().await;
494497
}
495498
_ => {

src/client/producer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
//! use rskafka::{
4343
//! client::{
4444
//! ClientBuilder,
45-
//! partition::PartitionClientBindMode,
45+
//! partition::UnknownTopicHandling,
4646
//! producer::{
4747
//! aggregator::RecordAggregator,
4848
//! BatchProducerBuilder,
@@ -64,7 +64,7 @@
6464
//! client.partition_client(
6565
//! "my_topic",
6666
//! 0,
67-
//! PartitionClientBindMode::Strong,
67+
//! UnknownTopicHandling::Retry,
6868
//! ).await.unwrap()
6969
//! );
7070
//!
@@ -96,7 +96,7 @@
9696
//! use rskafka::{
9797
//! client::{
9898
//! ClientBuilder,
99-
//! partition::PartitionClientBindMode,
99+
//! partition::UnknownTopicHandling,
100100
//! producer::{
101101
//! aggregator::{
102102
//! Aggregator,
@@ -186,7 +186,7 @@
186186
//! client.partition_client(
187187
//! "my_topic",
188188
//! 0,
189-
//! PartitionClientBindMode::Strong,
189+
//! UnknownTopicHandling::Retry,
190190
//! ).await.unwrap()
191191
//! );
192192
//!

tests/client.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use assert_matches::assert_matches;
22
use rskafka::{
33
client::{
44
error::{Error as ClientError, ProtocolError, ServerErrorResponse},
5-
partition::{Compression, OffsetAt, PartitionClientBindMode},
5+
partition::{Compression, OffsetAt, UnknownTopicHandling},
66
ClientBuilder,
77
},
88
record::{Record, RecordAndOffset},
@@ -89,7 +89,7 @@ async fn test_partition_client() {
8989
.unwrap();
9090

9191
let partition_client = client
92-
.partition_client(topic_name.clone(), 0, PartitionClientBindMode::Strong)
92+
.partition_client(topic_name.clone(), 0, UnknownTopicHandling::Retry)
9393
.await
9494
.unwrap();
9595
assert_eq!(partition_client.topic(), &topic_name);
@@ -109,15 +109,15 @@ async fn test_non_existing_partition() {
109109

110110
tokio::time::timeout(Duration::from_millis(100), async {
111111
client
112-
.partition_client(topic_name.clone(), 0, PartitionClientBindMode::Strong)
112+
.partition_client(topic_name.clone(), 0, UnknownTopicHandling::Retry)
113113
.await
114114
.unwrap();
115115
})
116116
.await
117117
.unwrap_err();
118118

119119
let err = client
120-
.partition_client(topic_name.clone(), 0, PartitionClientBindMode::Weak)
120+
.partition_client(topic_name.clone(), 0, UnknownTopicHandling::Error)
121121
.await
122122
.unwrap_err();
123123
assert_matches!(
@@ -199,7 +199,7 @@ async fn test_socks5() {
199199
.unwrap();
200200

201201
let partition_client = client
202-
.partition_client(topic_name, 0, PartitionClientBindMode::Strong)
202+
.partition_client(topic_name, 0, UnknownTopicHandling::Retry)
203203
.await
204204
.unwrap();
205205

@@ -234,7 +234,7 @@ async fn test_produce_empty() {
234234
.unwrap();
235235

236236
let partition_client = client
237-
.partition_client(&topic_name, 1, PartitionClientBindMode::Strong)
237+
.partition_client(&topic_name, 1, UnknownTopicHandling::Retry)
238238
.await
239239
.unwrap();
240240
partition_client
@@ -259,7 +259,7 @@ async fn test_consume_empty() {
259259
.unwrap();
260260

261261
let partition_client = client
262-
.partition_client(&topic_name, 1, PartitionClientBindMode::Strong)
262+
.partition_client(&topic_name, 1, UnknownTopicHandling::Retry)
263263
.await
264264
.unwrap();
265265
let (records, watermark) = partition_client
@@ -286,7 +286,7 @@ async fn test_consume_offset_out_of_range() {
286286
.unwrap();
287287

288288
let partition_client = client
289-
.partition_client(&topic_name, 1, PartitionClientBindMode::Strong)
289+
.partition_client(&topic_name, 1, UnknownTopicHandling::Retry)
290290
.await
291291
.unwrap();
292292
let record = record(b"");
@@ -329,7 +329,7 @@ async fn test_get_offset() {
329329
.unwrap();
330330

331331
let partition_client = client
332-
.partition_client(topic_name.clone(), 0, PartitionClientBindMode::Strong)
332+
.partition_client(topic_name.clone(), 0, UnknownTopicHandling::Retry)
333333
.await
334334
.unwrap();
335335

@@ -394,7 +394,7 @@ async fn test_produce_consume_size_cutoff() {
394394

395395
let partition_client = Arc::new(
396396
client
397-
.partition_client(&topic_name, 0, PartitionClientBindMode::Strong)
397+
.partition_client(&topic_name, 0, UnknownTopicHandling::Retry)
398398
.await
399399
.unwrap(),
400400
);
@@ -471,7 +471,7 @@ async fn test_consume_midbatch() {
471471
.unwrap();
472472

473473
let partition_client = client
474-
.partition_client(&topic_name, 0, PartitionClientBindMode::Strong)
474+
.partition_client(&topic_name, 0, UnknownTopicHandling::Retry)
475475
.await
476476
.unwrap();
477477

@@ -519,7 +519,7 @@ async fn test_delete_records() {
519519
.unwrap();
520520

521521
let partition_client = client
522-
.partition_client(&topic_name, 0, PartitionClientBindMode::Strong)
522+
.partition_client(&topic_name, 0, UnknownTopicHandling::Retry)
523523
.await
524524
.unwrap();
525525

0 commit comments

Comments
 (0)