Commit 1d4db37

Merge pull request #2 from fede1024/master

Update to latest master from upstream

2 parents 76148d2 + a02310b, commit 1d4db37

4 files changed: 143 additions & 5 deletions


README.md

Lines changed: 2 additions & 0 deletions

```diff
@@ -139,6 +139,7 @@ Here are some of the projects using rust-rdkafka:
 - [kafka-benchmark]: a high performance benchmarking tool for Kafka.
 - [callysto]: Stream processing framework in Rust.
 - [bytewax]: Python stream processing framework using Timely Dataflow.
+- [kafka-mock-gen]: an easy-to-use mock data producer for stress-testing a broker.

 *If you are using rust-rdkafka, please let us know!*

@@ -272,6 +273,7 @@ logging framework.
 [rdkafka-sys-known-issues]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#known-issues
 [smol]: https://docs.rs/smol
 [Tokio]: https://tokio.rs/
+[kafka-mock-gen]: https://github.com/tomaszkubacki/kafka-mock-gen

 ## rdkafka-sys
```

src/admin.rs

Lines changed: 10 additions & 0 deletions

```diff
@@ -6,6 +6,7 @@

 use std::collections::HashMap;
 use std::ffi::{c_void, CStr, CString};
+use std::fmt;
 use std::future::Future;
 use std::pin::Pin;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -43,6 +44,15 @@ pub struct AdminClient<C: ClientContext> {
     handle: Option<JoinHandle<()>>,
 }

+impl<C: ClientContext> fmt::Debug for AdminClient<C> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut debug = f.debug_struct("AdminClient");
+        debug.field("has_handle", &self.handle.is_some());
+        debug.field("stop_requested", &self.should_stop.load(Ordering::Relaxed));
+        debug.finish()
+    }
+}
+
 impl<C: ClientContext> AdminClient<C> {
     /// Creates new topics according to the provided `NewTopic` specifications.
     ///
```
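
The hand-written `Debug` impl reports just two fields: whether the background task's join handle is still present (`has_handle`) and whether shutdown has been requested (`stop_requested`), so nothing about the underlying client or its context is exposed. A minimal usage sketch, assuming this patched crate and a broker reachable at `localhost:9092` (the address is an assumption, not part of the change):

```rust
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

fn main() {
    // Build an admin client the usual way; the broker address is an assumption.
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("admin client creation failed");

    // With this change the client can be logged directly, printing something like:
    //   AdminClient { has_handle: true, stop_requested: false }
    println!("{:?}", admin);
}
```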

src/config.rs

Lines changed: 49 additions & 5 deletions

```diff
@@ -22,8 +22,9 @@
 //!
 //! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::ffi::CString;
+use std::fmt::Debug;
 use std::iter::FromIterator;
 use std::os::raw::c_char;
 use std::ptr;
@@ -36,6 +37,16 @@ use crate::error::{IsError, KafkaError, KafkaResult};
 use crate::log::{log_enabled, DEBUG, INFO, WARN};
 use crate::util::{ErrBuf, KafkaDrop, NativePtr};

+const SENSITIVE_CONFIG_KEYS: &[&str] = &[
+    "sasl.password",
+    "ssl.key.password",
+    "ssl.keystore.password",
+    "ssl.truststore.password",
+    "sasl.oauthbearer.client.secret",
+];
+
+const SANITIZED_VALUE_PLACEHOLDER: &str = "[sanitized for safety]";
+
 /// The log levels supported by librdkafka.
 #[derive(Copy, Clone, Debug)]
 pub enum RDKafkaLogLevel {
@@ -181,14 +192,35 @@ impl NativeClientConfig {
 }

 /// Client configuration.
-#[derive(Clone, Debug)]
+#[derive(Clone)]
 pub struct ClientConfig {
     conf_map: HashMap<String, String>,
     /// The librdkafka logging level. Refer to [`RDKafkaLogLevel`] for the list
     /// of available levels.
     pub log_level: RDKafkaLogLevel,
 }

+impl Debug for ClientConfig {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let sanitized: BTreeMap<&str, &str> = self
+            .conf_map
+            .iter()
+            .filter_map(|(key, value)| {
+                if SENSITIVE_CONFIG_KEYS.contains(&key.as_str()) {
+                    None
+                } else {
+                    Some((key.as_str(), value.as_str()))
+                }
+            })
+            .collect();
+
+        let mut debug_struct = f.debug_struct("ClientConfig");
+        debug_struct.field("log_level", &self.log_level);
+        debug_struct.field("conf_map", &sanitized);
+        debug_struct.finish()
+    }
+}
+
 impl Default for ClientConfig {
     fn default() -> Self {
         Self::new()
@@ -204,9 +236,21 @@ impl ClientConfig {
         }
     }

-    /// Gets a reference to the underlying config map
-    pub fn config_map(&self) -> &HashMap<String, String> {
-        &self.conf_map
+    /// Returns a sanitized view of the underlying config map.
+    ///
+    /// Sensitive keys have their values replaced with a placeholder string so they never appear in
+    /// clear text when inspected.
+    pub fn config_map(&self) -> BTreeMap<&str, &str> {
+        self.conf_map
+            .iter()
+            .map(|(key, value)| {
+                if SENSITIVE_CONFIG_KEYS.contains(&key.as_str()) {
+                    (key.as_str(), SANITIZED_VALUE_PLACEHOLDER)
+                } else {
+                    (key.as_str(), value.as_str())
+                }
+            })
+            .collect()
     }

     /// Gets the value of a parameter in the configuration.
```
src/topic_partition_list.rs

Lines changed: 82 additions & 0 deletions

```diff
@@ -5,6 +5,7 @@
 use std::collections::HashMap;
 use std::ffi::{CStr, CString};
 use std::fmt;
+use std::iter::FromIterator;
 use std::slice;
 use std::str;

@@ -398,6 +399,40 @@ impl Default for TopicPartitionList {
     }
 }

+impl FromIterator<(String, i32, Offset)> for TopicPartitionList {
+    fn from_iter<I>(iter: I) -> Self
+    where
+        I: IntoIterator<Item = (String, i32, Offset)>,
+    {
+        let iter = iter.into_iter();
+        let (lower_bound, _) = iter.size_hint();
+        let mut tpl = TopicPartitionList::with_capacity(lower_bound);
+
+        for (topic, partition, offset) in iter {
+            let mut elem = tpl.add_partition(topic.as_str(), partition);
+            elem.set_offset(offset).unwrap_or_else(|err| {
+                panic!(
+                    "failed to set offset via collect() for {}:{} (offset: {:?}): {:?}",
+                    topic, partition, offset, err
+                );
+            });
+        }
+
+        tpl
+    }
+}
+
+impl FromIterator<((String, i32), Offset)> for TopicPartitionList {
+    fn from_iter<I>(iter: I) -> Self
+    where
+        I: IntoIterator<Item = ((String, i32), Offset)>,
+    {
+        iter.into_iter()
+            .map(|((topic, partition), offset)| (topic, partition, offset))
+            .collect()
+    }
+}
+
 impl fmt::Debug for TopicPartitionList {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_list().entries(self.elements()).finish()
@@ -541,4 +576,51 @@ mod tests {
         assert_eq!(topic_map, topic_map2);
         assert_eq!(tpl, tpl2);
     }
+
+    #[test]
+    fn collect_topic_partition_list() {
+        let tpl: TopicPartitionList = vec![
+            ("t1".to_string(), 0, Offset::Beginning),
+            ("t1".to_string(), 1, Offset::Offset(42)),
+            ("t2".to_string(), 0, Offset::End),
+        ]
+        .into_iter()
+        .collect();
+
+        assert_eq!(tpl.count(), 3);
+
+        let t1_0 = tpl.find_partition("t1", 0).unwrap();
+        assert_eq!(t1_0.offset(), Offset::Beginning);
+
+        let t1_1 = tpl.find_partition("t1", 1).unwrap();
+        assert_eq!(t1_1.offset(), Offset::Offset(42));
+
+        let t2_0 = tpl.find_partition("t2", 0).unwrap();
+        assert_eq!(t2_0.offset(), Offset::End);
+    }
+
+    #[test]
+    fn collect_topic_partition_list_pairs() {
+        let tpl: TopicPartitionList = vec![
+            (("t1".to_string(), 0), Offset::Beginning),
+            (("t1".to_string(), 1), Offset::Offset(7)),
+            (("t2".to_string(), 0), Offset::Stored),
+        ]
+        .into_iter()
+        .collect();
+
+        assert_eq!(tpl.count(), 3);
+        assert_eq!(
+            tpl.find_partition("t1", 0).unwrap().offset(),
+            Offset::Beginning
+        );
+        assert_eq!(
+            tpl.find_partition("t1", 1).unwrap().offset(),
+            Offset::Offset(7)
+        );
+        assert_eq!(
+            tpl.find_partition("t2", 0).unwrap().offset(),
+            Offset::Stored
+        );
+    }
 }
```
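
The two `FromIterator` impls let callers build a `TopicPartitionList` directly with `collect()`. One plausible use, sketched below assuming this patched crate, is rewinding an assignment: convert an existing list to a topic map with the crate's `to_topic_map()` helper (the same map the existing test above compares), adjust the offsets, and collect straight back into a list through the `((String, i32), Offset)` impl.

```rust
use rdkafka::{Offset, TopicPartitionList};

fn main() {
    // Build the starting assignment with the three-tuple FromIterator impl.
    let assignment: TopicPartitionList = vec![
        ("events".to_string(), 0, Offset::Offset(10)),
        ("events".to_string(), 1, Offset::Offset(20)),
    ]
    .into_iter()
    .collect();

    // Rewind every concrete offset by 5, then collect back through the
    // ((String, i32), Offset) impl.
    let rewound: TopicPartitionList = assignment
        .to_topic_map()
        .into_iter()
        .map(|(key, offset)| match offset {
            Offset::Offset(o) => (key, Offset::Offset(o.saturating_sub(5))),
            other => (key, other),
        })
        .collect();

    assert_eq!(
        rewound.find_partition("events", 0).unwrap().offset(),
        Offset::Offset(5)
    );
    assert_eq!(
        rewound.find_partition("events", 1).unwrap().offset(),
        Offset::Offset(15)
    );
}
```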
