Skip to content

Commit 2f855b2

Browse files
Merge pull request #171 from influxdata/crepererum/redpanda_22_2_1
fix: update redpanda, fix issues
2 parents aa82995 + 3379959 commit 2f855b2

15 files changed

+299
-81
lines changed

.circleci/config.yml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ jobs:
148148
# setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker)
149149
docker:
150150
- image: quay.io/influxdb/rust:ci
151-
- image: vectorized/redpanda:v22.1.4
151+
- image: vectorized/redpanda:v22.2.1
152152
name: redpanda-0
153153
command:
154154
- redpanda
@@ -161,7 +161,8 @@ jobs:
161161
- --check=false
162162
- --kafka-addr redpanda-0:9092
163163
- --rpc-addr redpanda-0:33145
164-
- image: vectorized/redpanda:v22.1.4
164+
- --set redpanda.auto_create_topics_enabled=false
165+
- image: vectorized/redpanda:v22.2.1
165166
name: redpanda-1
166167
command:
167168
- redpanda
@@ -175,7 +176,8 @@ jobs:
175176
- --kafka-addr redpanda-1:9092
176177
- --rpc-addr redpanda-1:33145
177178
- --seeds redpanda-0:33145
178-
- image: vectorized/redpanda:v22.1.4
179+
- --set redpanda.auto_create_topics_enabled=false
180+
- image: vectorized/redpanda:v22.2.1
179181
name: redpanda-2
180182
command:
181183
- redpanda
@@ -189,6 +191,7 @@ jobs:
189191
- --kafka-addr redpanda-2:9092
190192
- --rpc-addr redpanda-2:33145
191193
- --seeds redpanda-0:33145
194+
- --set redpanda.auto_create_topics_enabled=false
192195
- image: serjs/go-socks5-proxy
193196
name: proxy
194197
resource_class: xlarge # use of a smaller executor tends crashes on link
@@ -243,6 +246,7 @@ jobs:
243246
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
244247
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9092,EXTERNAL://kafka-0:9093
245248
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
249+
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
246250
- image: docker.io/bitnami/kafka:3
247251
name: kafka-1
248252
environment:
@@ -253,6 +257,7 @@ jobs:
253257
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
254258
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9092,EXTERNAL://kafka-1:9093
255259
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
260+
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
256261
- image: docker.io/bitnami/kafka:3
257262
name: kafka-2
258263
environment:
@@ -263,6 +268,7 @@ jobs:
263268
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
264269
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9092,EXTERNAL://kafka-2:9093
265270
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
271+
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
266272
- image: serjs/go-socks5-proxy
267273
name: proxy
268274
resource_class: xlarge # use of a smaller executor tends crashes on link

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ It will be a good fit for workloads that:
3232
use rskafka::{
3333
client::{
3434
ClientBuilder,
35-
partition::Compression,
35+
partition::{Compression, UnknownTopicHandling},
3636
},
3737
record::Record,
3838
};
@@ -58,6 +58,7 @@ let partition_client = client
5858
.partition_client(
5959
topic.to_owned(),
6060
0, // partition
61+
UnknownTopicHandling::Retry,
6162
)
6263
.await
6364
.unwrap();

benches/throughput.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use rdkafka::{
2222
use rskafka::{
2323
client::{
2424
consumer::{StartOffset, StreamConsumerBuilder as RsStreamConsumerBuilder},
25-
partition::{Compression, PartitionClient},
25+
partition::{Compression, PartitionClient, UnknownTopicHandling},
2626
producer::{aggregator::RecordAggregator, BatchProducerBuilder},
2727
ClientBuilder,
2828
},
@@ -448,7 +448,10 @@ async fn setup_rskafka(connection: Vec<String>) -> PartitionClient {
448448
.await
449449
.unwrap();
450450

451-
client.partition_client(topic_name, 0).await.unwrap()
451+
client
452+
.partition_client(topic_name, 0, UnknownTopicHandling::Retry)
453+
.await
454+
.unwrap()
452455
}
453456

454457
static LOG_SETUP: Once = Once::new();

docker-compose-kafka.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ services:
2222
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9010,FOR_PROXY://:9020
2323
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9000,EXTERNAL://localhost:9010,FOR_PROXY://kafka-0:9020
2424
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
25+
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
2526
volumes:
2627
- kafka_0_data:/bitnami/kafka
2728
depends_on:
@@ -38,6 +39,7 @@ services:
3839
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9011,FOR_PROXY://:9021
3940
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9000,EXTERNAL://localhost:9011,FOR_PROXY://kafka-1:9021
4041
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
42+
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
4143
volumes:
4244
- kafka_1_data:/bitnami/kafka
4345
depends_on:
@@ -54,6 +56,7 @@ services:
5456
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9012,FOR_PROXY://:9022
5557
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9000,EXTERNAL://localhost:9012,FOR_PROXY://kafka-2:9022
5658
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
59+
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
5760
volumes:
5861
- kafka_2_data:/bitnami/kafka
5962
depends_on:

docker-compose-redpanda.yml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
version: '3.7'
33
services:
44
redpanda-0:
5-
image: vectorized/redpanda:v22.1.4
5+
image: vectorized/redpanda:v22.2.1
66
container_name: redpanda-0
77
ports:
88
- '9010:9010'
@@ -19,8 +19,9 @@ services:
1919
- --advertise-kafka-addr EXTERNAL://127.0.0.1:9010,FOR_PROXY://redpanda-0:9020
2020
- --rpc-addr 0.0.0.0:33145
2121
- --advertise-rpc-addr redpanda-0:33145
22+
- --set redpanda.auto_create_topics_enabled=false
2223
redpanda-1:
23-
image: vectorized/redpanda:v22.1.4
24+
image: vectorized/redpanda:v22.2.1
2425
container_name: redpanda-1
2526
ports:
2627
- '9011:9011'
@@ -38,8 +39,9 @@ services:
3839
- --advertise-kafka-addr EXTERNAL://127.0.0.1:9011,FOR_PROXY://redpanda-1:9021
3940
- --rpc-addr 0.0.0.0:33146
4041
- --advertise-rpc-addr redpanda-1:33146
42+
- --set redpanda.auto_create_topics_enabled=false
4143
redpanda-2:
42-
image: vectorized/redpanda:v22.1.4
44+
image: vectorized/redpanda:v22.2.1
4345
container_name: redpanda-2
4446
ports:
4547
- '9012:9012'
@@ -57,6 +59,7 @@ services:
5759
- --advertise-kafka-addr EXTERNAL://127.0.0.1:9012,FOR_PROXY://redpanda-2:9022
5860
- --rpc-addr 0.0.0.0:33147
5961
- --advertise-rpc-addr redpanda-2:33147
62+
- --set redpanda.auto_create_topics_enabled=false
6063
proxy:
6164
image: serjs/go-socks5-proxy
6265
ports:

src/client/consumer.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,19 @@
1010
//! StartOffset,
1111
//! StreamConsumerBuilder,
1212
//! },
13+
//! partition::UnknownTopicHandling,
1314
//! };
1415
//! use std::sync::Arc;
1516
//!
1617
//! // get partition client
1718
//! let connection = "localhost:9093".to_owned();
1819
//! let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
1920
//! let partition_client = Arc::new(
20-
//! client.partition_client("my_topic", 0).await.unwrap()
21+
//! client.partition_client(
22+
//! "my_topic",
23+
//! 0,
24+
//! UnknownTopicHandling::Retry,
25+
//! ).await.unwrap()
2126
//! );
2227
//!
2328
//! // construct stream consumer

src/client/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ pub enum Error {
9393

9494
#[error("All retries failed: {0}")]
9595
RetryFailed(#[from] crate::backoff::BackoffError),
96+
97+
#[error("Timeout")]
98+
Timeout,
9699
}
97100

98101
impl Error {

src/client/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub mod producer;
1818

1919
use error::{Error, Result};
2020

21-
use self::controller::ControllerClient;
21+
use self::{controller::ControllerClient, partition::UnknownTopicHandling};
2222

2323
#[derive(Debug, Error)]
2424
pub enum ProduceError {
@@ -120,8 +120,15 @@ impl Client {
120120
&self,
121121
topic: impl Into<String> + Send,
122122
partition: i32,
123+
unknown_topic_handling: UnknownTopicHandling,
123124
) -> Result<PartitionClient> {
124-
PartitionClient::new(topic.into(), partition, Arc::clone(&self.brokers)).await
125+
PartitionClient::new(
126+
topic.into(),
127+
partition,
128+
Arc::clone(&self.brokers),
129+
unknown_topic_handling,
130+
)
131+
.await
125132
}
126133

127134
/// Returns a list of topics in the cluster

0 commit comments

Comments
 (0)