Skip to content

Commit be8aabb

Browse files
committed
feat: expose low-level metadata to user
Helpful for debugging and certain admin operations like topic re-assignment and leader election.
1 parent 13527e7 commit be8aabb

File tree

4 files changed

+146
-0
lines changed

4 files changed

+146
-0
lines changed

src/client/mod.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use thiserror::Error;
55
use crate::{
66
client::partition::PartitionClient,
77
connection::{BrokerConnector, TlsConfig},
8+
metadata::{Metadata, MetadataBroker, MetadataPartition, MetadataTopic},
89
protocol::primitives::Boolean,
910
topic::Topic,
1011
};
@@ -145,4 +146,53 @@ impl Client {
145146
})
146147
.collect())
147148
}
149+
150+
/// Return cluster-wide metadata.
151+
pub async fn metadata(&self) -> Result<Metadata> {
152+
let response = self.brokers.request_metadata(None, None).await?;
153+
154+
Ok(Metadata {
155+
brokers: response
156+
.brokers
157+
.into_iter()
158+
.map(|response| MetadataBroker {
159+
node_id: response.node_id.0,
160+
host: response.host.0,
161+
port: response.port.0,
162+
rack: response.rack.and_then(|s| s.0),
163+
})
164+
.collect(),
165+
controller_id: response.controller_id.map(|id| id.0),
166+
topics: response
167+
.topics
168+
.into_iter()
169+
.map(|response| MetadataTopic {
170+
name: response.name.0,
171+
is_internal: response.is_internal.map(|b| b.0),
172+
partitions: response
173+
.partitions
174+
.into_iter()
175+
.map(|response| MetadataPartition {
176+
partition_index: response.partition_index.0,
177+
leader_id: response.leader_id.0,
178+
replica_nodes: response
179+
.replica_nodes
180+
.0
181+
.unwrap_or_default()
182+
.into_iter()
183+
.map(|i| i.0)
184+
.collect(),
185+
isr_nodes: response
186+
.isr_nodes
187+
.0
188+
.unwrap_or_default()
189+
.into_iter()
190+
.map(|i| i.0)
191+
.collect(),
192+
})
193+
.collect(),
194+
})
195+
.collect(),
196+
})
197+
}
148198
}

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@ mod backoff;
2222
pub mod client;
2323

2424
mod connection;
25+
2526
#[cfg(feature = "unstable-fuzzing")]
2627
pub mod messenger;
2728
#[cfg(not(feature = "unstable-fuzzing"))]
2829
mod messenger;
2930

31+
pub mod metadata;
32+
3033
#[cfg(feature = "unstable-fuzzing")]
3134
pub mod protocol;
3235
#[cfg(not(feature = "unstable-fuzzing"))]

src/metadata.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//! Cluster-wide Kafka metadata.
2+
3+
/// Metadata container for the entire cluster.
4+
#[derive(Debug, PartialEq)]
5+
pub struct Metadata {
6+
/// Brokers.
7+
pub brokers: Vec<MetadataBroker>,
8+
9+
/// The ID of the controller broker.
10+
pub controller_id: Option<i32>,
11+
12+
/// Topics.
13+
pub topics: Vec<MetadataTopic>,
14+
}
15+
16+
/// Metadata for a certain broker.
17+
#[derive(Debug, PartialEq)]
18+
pub struct MetadataBroker {
19+
/// The broker ID
20+
pub node_id: i32,
21+
22+
/// The broker hostname
23+
pub host: String,
24+
25+
/// The broker port
26+
pub port: i32,
27+
28+
/// Rack.
29+
pub rack: Option<String>,
30+
}
31+
32+
/// Metadata for a certain topic.
33+
#[derive(Debug, PartialEq)]
34+
pub struct MetadataTopic {
35+
/// The topic name
36+
pub name: String,
37+
38+
/// True if the topic is internal
39+
pub is_internal: Option<bool>,
40+
41+
/// Each partition in the topic
42+
pub partitions: Vec<MetadataPartition>,
43+
}
44+
45+
/// Metadata for a certain partition.
46+
#[derive(Debug, PartialEq)]
47+
pub struct MetadataPartition {
48+
/// The partition index
49+
pub partition_index: i32,
50+
51+
/// The ID of the leader broker
52+
pub leader_id: i32,
53+
54+
/// The set of all nodes that host this partition
55+
pub replica_nodes: Vec<i32>,
56+
57+
/// The set of all nodes that are in sync with the leader for this partition
58+
pub isr_nodes: Vec<i32>,
59+
}

tests/client.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,40 @@ async fn test_delete_records() {
541541
);
542542
}
543543

544+
#[tokio::test]
545+
async fn test_metadata() {
546+
maybe_start_logging();
547+
548+
let connection = maybe_skip_kafka_integration!();
549+
let topic_name = random_topic_name();
550+
551+
let client = ClientBuilder::new(connection).build().await.unwrap();
552+
553+
let controller_client = client.controller_client().unwrap();
554+
controller_client
555+
.create_topic(&topic_name, 1, 1, 5_000)
556+
.await
557+
.unwrap();
558+
559+
let md = client.metadata().await.unwrap();
560+
assert!(!md.brokers.is_empty());
561+
562+
// topic metadata might take a while to converge
563+
tokio::time::timeout(Duration::from_millis(1_000), async {
564+
loop {
565+
let md = client.metadata().await.unwrap();
566+
let topic = md.topics.into_iter().find(|topic| topic.name == topic_name);
567+
if topic.is_some() {
568+
return;
569+
}
570+
571+
tokio::time::sleep(Duration::from_millis(10)).await;
572+
}
573+
})
574+
.await
575+
.unwrap();
576+
}
577+
544578
#[tokio::test]
545579
async fn test_reassign_partitions() {
546580
maybe_start_logging();

0 commit comments

Comments
 (0)