
Commit c684b3b

Fix native FFI memory leak and improve memory management
- Fix memory leak in list_consumer_group_offsets() where the request object was incorrectly nulled after rd_kafka_ListConsumerGroupOffsets(), preventing the RAII cleanup guard from calling rd_kafka_ListConsumerGroupOffsets_destroy(). The caller retains ownership per the librdkafka API contract.
- Make KafkaClient recyclable by wrapping AdminClient and BaseConsumer in Mutex<Arc<...>>. Add a recycle() method that destroys and recreates the internal clients to release accumulated librdkafka metadata handles (rd_kafka_topic_t objects that are never freed until client destruction).
- Add TimestampConsumer::recycle_pool() to periodically recreate pooled BaseConsumer instances for the same reason.
- Add configurable client_recycle_interval to PerformanceConfig (default: 50 cycles, set 0 to disable).
- Add jemalloc as the default allocator with tuned configuration (narenas:2, dirty/muzzy_decay_ms:5000, background_thread:true) for better memory return behavior vs glibc malloc.
- Tune librdkafka consumer properties for monitoring use: queued.min.messages=100, queued.max.messages.kbytes=1024, topic.metadata.refresh.interval.ms=600000 (or disabled for pool consumers).
- Reduce the default max_concurrent_fetches from 10 to 5, since each pooled consumer is a full librdkafka client (~5-15 MB).
- Add RSS instrumentation to collection cycles for memory diagnostics.
- Bump the Dockerfile.dev rust image from 1.83 to 1.85 (needed for clap_lex edition2024 support).
- Update README and config.example.toml with memory management docs.

Closes #56
1 parent 4fe677d commit c684b3b
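The FFI fix in the first bullet hinges on who owns the request object after the call. A minimal std-only sketch of the bug and the RAII guard pattern (all types here are mocks; `mock_destroy` stands in for `rd_kafka_ListConsumerGroupOffsets_destroy`, which the real code reaches through rdkafka-sys):

```rust
use std::cell::Cell;

thread_local! {
    // Counts how many times the destroy function ran.
    static DESTROY_CALLS: Cell<u32> = Cell::new(0);
}

/// Stand-in for rd_kafka_ListConsumerGroupOffsets_destroy().
fn mock_destroy(_req: usize) {
    DESTROY_CALLS.with(|c| c.set(c.get() + 1));
}

/// RAII guard: owns the request handle and destroys it on drop,
/// mirroring the caller-retains-ownership contract of librdkafka.
struct RequestGuard {
    handle: Option<usize>,
}

impl Drop for RequestGuard {
    fn drop(&mut self) {
        if let Some(h) = self.handle.take() {
            mock_destroy(h);
        }
    }
}

fn list_offsets(leak: bool) {
    let mut guard = RequestGuard { handle: Some(0xdead) };
    // ... the async admin call borrows the handle here ...
    if leak {
        // The bug: nulling the handle after the call, as if librdkafka
        // took ownership. The guard then has nothing to destroy => leak.
        guard.handle = None;
    }
    // guard is dropped here; destroy runs only if the handle survived
}

fn main() {
    list_offsets(true);
    assert_eq!(DESTROY_CALLS.with(|c| c.get()), 0); // handle leaked
    list_offsets(false);
    assert_eq!(DESTROY_CALLS.with(|c| c.get()), 1); // handle freed
    println!("destroy calls: {}", DESTROY_CALLS.with(|c| c.get()));
}
```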

File tree

12 files changed: +276 −42 lines

Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 3 additions & 1 deletion
```diff
@@ -30,6 +30,7 @@ config = "0.14"
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 
+tikv-jemallocator = { version = "0.6", optional = true }
 regex = "1"
 dashmap = "6"
 thiserror = "1"
@@ -46,7 +47,8 @@ k8s-openapi = { version = "0.24", features = ["v1_32"], optional = true }
 kube-lease-manager = { version = "0.8", optional = true }
 
 [features]
-default = []
+default = ["jemalloc"]
+jemalloc = ["tikv-jemallocator"]
 kubernetes = ["kube", "k8s-openapi", "kube-lease-manager"]
 
 [dev-dependencies]
```

Dockerfile.dev

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,5 +1,5 @@
 # Build stage
-FROM rust:1.83-bookworm AS builder
+FROM rust:1.85-bookworm AS builder
 
 # Install build dependencies for rdkafka
 RUN apt-get update && apt-get install -y \
```

README.md

Lines changed: 52 additions & 1 deletion
````diff
@@ -513,6 +513,9 @@ max_concurrent_groups = 20 # Default: 10
 
 # Maximum partitions to fetch watermarks for in parallel
 max_concurrent_watermarks = 100 # Default: 50
+
+# Client recycling interval — see Memory Management section below
+client_recycle_interval = 50 # Default: 50 (set 0 to disable)
 ```
 
 ### Recommended Settings by Cluster Size
@@ -546,6 +549,52 @@ max_concurrent_watermarks = 100 # Default: 50
 
 4. **Consider running multiple instances** — Split monitoring across clusters or consumer group subsets using different whitelist patterns.
 
+### Memory Management
+
+#### The librdkafka metadata cache problem
+
+librdkafka (the C library underlying the Rust Kafka client) maintains an internal hash table of topic handles. Every time the exporter touches a topic — via watermark fetches, offset lookups, or config queries — librdkafka creates or reuses a handle for that topic. These handles are **never freed** until the client is destroyed. There is no API to evict individual entries.
+
+On large clusters with thousands of topics, the internal cache grows with every collection cycle. If topics are created and deleted over time (topic churn), the handle count only increases — deleted topics remain as stale entries.
+
+#### Client recycling
+
+To prevent unbounded memory growth, klag-exporter periodically destroys and recreates its internal Kafka clients, releasing all accumulated metadata. This is controlled by the `client_recycle_interval` setting:
+
+```toml
+[exporter.performance]
+# Number of collection cycles between client recycling.
+# Set to 0 to disable (recommended for small/stable clusters).
+client_recycle_interval = 50 # Default: every 50 cycles (~25 min at 30s poll)
+```
+
+| Setting | When to use |
+|---------|-------------|
+| `0` (disabled) | Small clusters with few topics, or stable clusters with no topic churn |
+| `50` (default) | Large clusters with many topics or moderate topic churn |
+| `100+` | Large clusters where you want less frequent recycling overhead |
+
+Recycling is safe — it only runs between collection cycles after all in-flight operations have completed. The trade-off is a brief memory spike (~2-10 MB) while new clients are created before old ones are fully torn down.
+
+#### jemalloc
+
+klag-exporter uses [jemalloc](https://jemalloc.net/) as the default memory allocator (enabled via the `jemalloc` feature flag). jemalloc provides significantly better memory return behavior than glibc malloc, which tends to hold onto freed pages indefinitely in long-running processes.
+
+To disable jemalloc:
+
+```bash
+cargo build --release --no-default-features
+```
+
+#### Timestamp consumer pool sizing
+
+Each entry in the timestamp consumer pool (`max_concurrent_fetches`) is a full librdkafka client with its own background threads and connection state, consuming ~5-15 MB of memory. Size the pool to match your actual concurrency needs, not your topic or partition count:
+
+```toml
+[exporter.timestamp_sampling]
+max_concurrent_fetches = 5 # Default: 5. Each is a full Kafka client.
+```
+
 ## Troubleshooting
 
 ### Time Lag Shows Gaps in Grafana
@@ -564,9 +613,11 @@ This is expected when:
 
 ### High Memory Usage
 
-- Reduce `max_concurrent_fetches`
+- Reduce `max_concurrent_fetches` — each concurrent fetch is a full librdkafka client (~5-15 MB)
 - Use `granularity = "topic"` instead of `"partition"`
 - Add more restrictive `group_blacklist` / `topic_blacklist` patterns
+- On large clusters with topic churn, ensure `client_recycle_interval` is enabled (see the Memory Management section above)
+- jemalloc is the default allocator and provides much better memory behavior than glibc malloc; disable with `--no-default-features` only if needed
 
 ### Connection Errors
 
````

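The jemalloc wiring itself is not shown in this diff. Under the assumption that the `jemalloc` feature gates a standard `tikv-jemallocator` setup, it would look roughly like the sketch below; the `malloc_conf` export syntax is an assumption based on common usage of the crate, so verify against the tikv-jemallocator and jemalloc tuning documentation before relying on it:

```rust
// Sketch only (not part of the diff): requires the tikv-jemallocator crate
// behind the `jemalloc` cargo feature.
#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

// Tuning values from the commit message, expressed as jemalloc's
// compile-time malloc_conf override (string must be NUL-terminated).
#[cfg(feature = "jemalloc")]
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] =
    b"narenas:2,dirty_decay_ms:5000,muzzy_decay_ms:5000,background_thread:true\0";
```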
config.example.toml

Lines changed: 31 additions & 0 deletions
```diff
@@ -23,6 +23,37 @@ cache_ttl = "60s"
 # Maximum concurrent timestamp fetch operations
 max_concurrent_fetches = 10
 
+[exporter.performance]
+# Timeout for Kafka API operations (metadata, watermarks, etc.)
+# kafka_timeout = "30s"
+
+# Timeout for fetching committed offsets per consumer group
+# offset_fetch_timeout = "10s"
+
+# Maximum consumer groups to fetch offsets for in parallel
+# max_concurrent_groups = 10
+
+# Maximum partitions to fetch watermarks for in parallel
+# max_concurrent_watermarks = 50
+
+# Client recycling interval (number of collection cycles).
+#
+# librdkafka caches internal topic handles that are never freed until the
+# client is destroyed. On large clusters with many topics (especially with
+# topic churn — topics being created and deleted), this cache grows
+# unboundedly and can consume gigabytes of memory.
+#
+# Recycling periodically destroys and recreates the internal Kafka clients,
+# releasing all accumulated metadata. The trade-off is a brief allocation
+# spike during the swap (~2-10 MB depending on cluster size).
+#
+# Guidelines:
+#   0 = disabled (recommended for small/stable clusters)
+#  50 = default (~25 min at 30s poll; good for large clusters)
+# 100 = less frequent (lower overhead, more metadata accumulation)
+#
+# client_recycle_interval = 50
+
 [exporter.otel]
 # Enable OpenTelemetry export (default: false)
 enabled = false
```

src/cluster/manager.rs

Lines changed: 28 additions & 0 deletions
```diff
@@ -30,6 +30,7 @@ pub struct ClusterManager {
     max_concurrent_fetches: usize,
     cache_cleanup_interval: Duration,
     collection_timeout: Duration,
+    client_recycle_interval: u64,
 }
 
 impl ClusterManager {
@@ -90,6 +91,7 @@
             max_concurrent_fetches: exporter_config.timestamp_sampling.max_concurrent_fetches,
             cache_cleanup_interval: exporter_config.timestamp_sampling.cache_ttl * 2,
             collection_timeout,
+            client_recycle_interval: exporter_config.performance.client_recycle_interval,
         })
     }
 
@@ -102,6 +104,7 @@
         let mut consecutive_errors = 0u32;
         let mut current_backoff = Duration::from_secs(1);
         let mut was_leader = leadership.is_leader();
+        let mut cycle_count: u64 = 0;
 
         if !was_leader {
             info!(
@@ -145,6 +148,31 @@
                     consecutive_errors = 0;
                     current_backoff = Duration::from_secs(1);
                     self.registry.set_healthy(true);
+
+                    // Periodically recycle Kafka clients to release
+                    // accumulated librdkafka internal metadata
+                    if self.client_recycle_interval > 0 {
+                        cycle_count += 1;
+                        if cycle_count >= self.client_recycle_interval {
+                            cycle_count = 0;
+                            if let Err(e) = self.client.recycle() {
+                                warn!(
+                                    cluster = %self.cluster_name,
+                                    error = %e,
+                                    "Failed to recycle Kafka clients"
+                                );
+                            }
+                            if let Some(ref sampler) = self.timestamp_sampler {
+                                if let Err(e) = sampler.recycle_pool() {
+                                    warn!(
+                                        cluster = %self.cluster_name,
+                                        error = %e,
+                                        "Failed to recycle timestamp consumer pool"
+                                    );
+                                }
+                            }
+                        }
+                    }
                 }
                 Ok(Err(e)) => {
                     consecutive_errors += 1;
```

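The `recycle()` call in the loop above swaps the internal client behind a lock, per the commit message's description of wrapping AdminClient and BaseConsumer in `Mutex<Arc<...>>`. A minimal std-only sketch of that pattern, with a hypothetical `MockClient` standing in for the librdkafka-backed clients:

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for a librdkafka-backed client (AdminClient / BaseConsumer).
struct MockClient {
    generation: u64,
}

/// Mirrors the Mutex<Arc<...>> wrapping described in the commit message.
struct KafkaClient {
    inner: Mutex<Arc<MockClient>>,
}

impl KafkaClient {
    fn new() -> Self {
        KafkaClient {
            inner: Mutex::new(Arc::new(MockClient { generation: 0 })),
        }
    }

    /// Borrow the current client for one operation; in-flight callers
    /// keep their Arc even if a recycle happens underneath them.
    fn client(&self) -> Arc<MockClient> {
        Arc::clone(&self.inner.lock().unwrap())
    }

    /// Destroy-and-recreate: the old client (and its accumulated
    /// librdkafka metadata) is freed once the last Arc to it drops.
    fn recycle(&self) -> Result<(), String> {
        let mut guard = self.inner.lock().unwrap();
        let next = guard.generation + 1;
        *guard = Arc::new(MockClient { generation: next });
        Ok(())
    }
}

fn main() {
    let kc = KafkaClient::new();
    let held = kc.client(); // simulates an in-flight operation
    kc.recycle().unwrap();
    assert_eq!(held.generation, 0); // old client still usable
    assert_eq!(kc.client().generation, 1); // new operations get the fresh one
    println!("recycled to generation {}", kc.client().generation);
}
```

The `Arc` indirection is what makes recycling safe to run between cycles: any operation that grabbed the old client finishes against it, and teardown happens when the last reference drops.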
src/collector/timestamp_sampler.rs

Lines changed: 5 additions & 0 deletions
```diff
@@ -104,6 +104,11 @@ impl TimestampSampler {
             .collect()
     }
 
+    /// Recycle the underlying consumer pool to release accumulated metadata.
+    pub fn recycle_pool(&self) -> Result<()> {
+        self.inner.consumer.recycle_pool()
+    }
+
     pub fn clear_stale_entries(&self) {
         let now = Instant::now();
         self.inner
```

src/config.rs

Lines changed: 12 additions & 1 deletion
```diff
@@ -64,6 +64,12 @@ pub struct PerformanceConfig {
     /// Maximum number of partitions to fetch watermarks for in parallel
     #[serde(default = "default_max_concurrent_watermarks")]
     pub max_concurrent_watermarks: usize,
+    /// Number of collection cycles between Kafka client recycling.
+    /// Recycling destroys and recreates internal librdkafka clients to release
+    /// accumulated metadata that librdkafka never frees on its own.
+    /// Set to 0 to disable. Default: 50 (~25 min at 30s poll interval).
+    #[serde(default = "default_client_recycle_interval")]
+    pub client_recycle_interval: u64,
 }
 
 #[derive(Debug, Deserialize, Clone)]
@@ -154,7 +160,7 @@ fn default_cache_ttl() -> Duration {
 }
 
 fn default_max_concurrent_fetches() -> usize {
-    10
+    5
 }
 
 fn default_kafka_timeout() -> Duration {
@@ -173,6 +179,10 @@ fn default_max_concurrent_watermarks() -> usize {
     50
 }
 
+fn default_client_recycle_interval() -> u64 {
+    50
+}
+
 fn default_otel_endpoint() -> String {
     "http://localhost:4317".to_string()
 }
@@ -250,6 +260,7 @@ impl Default for PerformanceConfig {
             offset_fetch_timeout: default_offset_fetch_timeout(),
             max_concurrent_groups: default_max_concurrent_groups(),
             max_concurrent_watermarks: default_max_concurrent_watermarks(),
+            client_recycle_interval: default_client_recycle_interval(),
         }
     }
 }
```
