Fix native FFI memory leak and improve memory management #57
Conversation
Pull request overview
Fixes native/librdkafka memory growth in the exporter by addressing an FFI request-object leak and introducing periodic client/pool recycling, plus allocator/config tuning aimed at large clusters.
Changes:
- Fix `ListConsumerGroupOffsets` FFI cleanup so the native request object is properly destroyed.
- Add Kafka client recycling (`client_recycle_interval`) and timestamp consumer pool recycling to bound librdkafka metadata growth.
- Enable jemalloc by default (feature-flagged) and apply memory-oriented librdkafka/default concurrency tuning; update docs and test-stack config.
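Based on the settings described above, the new performance section might look like the following sketch. The key layout is illustrative — `config.example.toml` in the PR is authoritative — but the values shown are the defaults documented in this PR:

```toml
# Illustrative sketch of the new performance section; see config.example.toml
# in the PR for the authoritative layout.
[exporter.performance]
# Recreate Kafka clients / pooled consumers every N collection cycles to
# release accumulated librdkafka metadata handles. 0 disables recycling.
client_recycle_interval = 50
# Each pooled consumer is a full librdkafka client (~5-15 MB), so the
# default was lowered from 10 to 5.
max_concurrent_fetches = 5
```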
Reviewed changes
Copilot reviewed 11 out of 12 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| test-stack/klag-config.toml | Adjust test-stack defaults (lower fetch concurrency; add performance section; disable OTEL). |
| src/main.rs | Configure jemalloc as global allocator and provide default malloc configuration. |
| src/kafka/consumer.rs | Add librdkafka memory-tuning properties; implement timestamp consumer pool recycling. |
| src/kafka/client.rs | Fix Admin FFI request lifetime/cleanup; add recyclable admin/consumer handles; add RSS logging helper. |
| src/config.rs | Add client_recycle_interval to performance config and adjust defaults. |
| src/collector/timestamp_sampler.rs | Expose recycle_pool() to recycle underlying consumer pool. |
| src/cluster/manager.rs | Trigger periodic client + pool recycling between successful collection cycles. |
| config.example.toml | Document new [exporter.performance] configuration section. |
| README.md | Document memory-management behavior and new tuning/recycling settings. |
| Dockerfile.dev | Bump Rust image version for dev builds. |
| Cargo.toml | Add tikv-jemallocator and enable jemalloc as a default feature. |
| Cargo.lock | Lockfile updates for jemalloc dependencies. |
Pull request overview
This PR addresses high native memory growth in long-running klag-exporter processes by fixing an FFI leak in the Admin API offset fetch path and adding mechanisms to periodically recycle librdkafka clients/consumer pools to release accumulated internal metadata.
Changes:
- Fix the `list_consumer_group_offsets()` native allocation leak by ensuring the request object is properly destroyed (the caller retains ownership).
- Add configurable Kafka client + timestamp consumer pool recycling via `client_recycle_interval` to mitigate librdkafka topic-handle cache growth on large clusters.
- Enable jemalloc by default and apply multiple memory/performance tuning updates (configs, defaults, docs, dev Docker image).
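The client-recycling idea can be sketched in plain Rust. The types below are hypothetical stand-ins (the real code wraps rdkafka's `AdminClient`/`BaseConsumer`); the point is the `Mutex<Arc<...>>` shape — in-flight operations keep the old client alive through their `Arc` clone, while `recycle()` swaps in a fresh one:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for rdkafka's AdminClient; `generation` just lets
// us observe that recycle() really produced a new instance.
struct AdminClient {
    generation: u64,
}

impl AdminClient {
    fn new(generation: u64) -> Self {
        AdminClient { generation }
    }
}

// Sketch of the recyclable wrapper shape described in the PR: callers take
// an Arc clone per operation, so in-flight work keeps the old client alive;
// recycle() swaps in a fresh client, and the old one (with its accumulated
// rd_kafka_topic_t handles, in the real code) is freed once the last
// outstanding handle is dropped.
struct RecyclableClient {
    inner: Mutex<Arc<AdminClient>>,
}

impl RecyclableClient {
    fn new() -> Self {
        RecyclableClient {
            inner: Mutex::new(Arc::new(AdminClient::new(0))),
        }
    }

    // Borrow the current client for one operation.
    fn handle(&self) -> Arc<AdminClient> {
        Arc::clone(&self.inner.lock().unwrap())
    }

    // Destroy-and-recreate the inner client.
    fn recycle(&self) {
        let mut slot = self.inner.lock().unwrap();
        let next = slot.generation + 1;
        // Replacing the Arc drops our reference to the old client here.
        *slot = Arc::new(AdminClient::new(next));
    }
}
```

Because callers hold an `Arc` per operation rather than a long-lived reference, recycling never invalidates an in-flight request — the old client is simply torn down after its last user finishes.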
Reviewed changes
Copilot reviewed 11 out of 12 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| src/kafka/client.rs | Fixes Admin API request lifetime handling; adds recyclable AdminClient/BaseConsumer wrapper and RSS logging helper. |
| src/kafka/consumer.rs | Adds librdkafka buffer/metadata-refresh tuning and implements timestamp consumer pool recycling. |
| src/cluster/manager.rs | Wires periodic recycling into the collection loop based on the new config setting. |
| src/config.rs | Introduces client_recycle_interval with defaults and serde support; lowers default max_concurrent_fetches. |
| src/main.rs | Adds optional jemalloc global allocator + jemalloc runtime tuning export. |
| Cargo.toml / Cargo.lock | Adds tikv-jemallocator dependency and enables it by default via features. |
| README.md | Documents memory-management rationale and the new recycling configuration. |
| config.example.toml | Adds [exporter.performance] documentation for recycling. |
| test-stack/klag-config.toml | Adjusts test-stack defaults (lower pool size, disable OTEL, disable recycling for small cluster). |
| src/collector/timestamp_sampler.rs | Exposes a recycling entry point for the timestamp consumer pool. |
| Dockerfile.dev | Bumps Rust toolchain image version used for development builds. |
- Fix memory leak in `list_consumer_group_offsets()` where the request object was incorrectly nulled after `rd_kafka_ListConsumerGroupOffsets()`, preventing the RAII cleanup guard from calling `rd_kafka_ListConsumerGroupOffsets_destroy()`. The caller retains ownership per the librdkafka API contract.
- Make KafkaClient recyclable by wrapping AdminClient and BaseConsumer in `Mutex<Arc<...>>`. Add a `recycle()` method that destroys and recreates internal clients to release accumulated librdkafka metadata handles (`rd_kafka_topic_t` objects that are never freed until client destruction).
- Add `TimestampConsumer::recycle_pool()` to periodically recreate pooled BaseConsumer instances for the same reason.
- Add configurable `client_recycle_interval` to PerformanceConfig (default: 50 cycles; set 0 to disable).
- Add jemalloc as the default allocator with tuned configuration (`narenas:2`, `dirty/muzzy_decay_ms:5000`, `background_thread:true`) for better memory-return behavior vs. glibc malloc.
- Tune librdkafka consumer properties for monitoring use: `queued.min.messages=100`, `queued.max.messages.kbytes=1024`, `topic.metadata.refresh.interval.ms=600000` (or disabled for pool consumers).
- Reduce default `max_concurrent_fetches` from 10 to 5, since each pooled consumer is a full librdkafka client (~5-15 MB).
- Add RSS instrumentation to collection cycles for memory diagnostics.
- Bump the Dockerfile.dev Rust image from 1.83 to 1.85 (needed for clap_lex edition2024 support).
- Update README and config.example.toml with memory-management docs.

Closes #56
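The FFI bug described above follows a common RAII-guard pattern; a minimal stand-alone sketch of the buggy and fixed shapes (an atomic counter stands in for `rd_kafka_ListConsumerGroupOffsets_destroy()`, and the function names are illustrative):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Counts "destroy" calls; stands in for rd_kafka_ListConsumerGroupOffsets_destroy().
static DESTROYED: AtomicUsize = AtomicUsize::new(0);

// RAII guard owning a native request pointer: Drop frees it unless the
// pointer has been nulled (the escape hatch for APIs that DO take ownership).
struct RequestGuard {
    ptr: *mut u8, // stands in for *mut rd_kafka_ListConsumerGroupOffsets_t
}

impl Drop for RequestGuard {
    fn drop(&mut self) {
        if !self.ptr.is_null() {
            // In the real code: rd_kafka_ListConsumerGroupOffsets_destroy(self.ptr)
            DESTROYED.fetch_add(1, Ordering::SeqCst);
        }
    }
}

// BUGGY shape: nulling the pointer after the admin call disarms the guard,
// so the request is never destroyed -- one native leak per fetch.
fn fetch_offsets_buggy(mut guard: RequestGuard) {
    // ... rd_kafka_ListConsumerGroupOffsets(...) would run here ...
    guard.ptr = std::ptr::null_mut();
}

// FIXED shape: the admin call does NOT take ownership of the request, so
// the guard keeps its pointer and Drop destroys the request on return.
fn fetch_offsets_fixed(_guard: RequestGuard) {
    // ... rd_kafka_ListConsumerGroupOffsets(...) would run here ...
}
```

Nulling the guard's pointer is only correct when the callee takes ownership; since librdkafka's admin call does not, the fix is simply to leave the guard armed.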
Summary
Fixes #56 — native memory leak in `list_consumer_group_offsets()` — and addresses broader memory management for large clusters.

- `rd_kafka_ListConsumerGroupOffsets()` does not take ownership of the request object — the caller must free it. The code was incorrectly nulling the cleanup guard's pointer, preventing `rd_kafka_ListConsumerGroupOffsets_destroy()` from being called. One request object leaked per group offset fetch per cycle (182 MB/cycle on the reporter's cluster).
- `KafkaClient` now wraps its `AdminClient` and `BaseConsumer` in `Mutex<Arc<...>>` with a `recycle()` method that destroys and recreates them, releasing accumulated librdkafka `rd_kafka_topic_t` handles that are never freed until client destruction. Configurable via `client_recycle_interval` (default: 50 cycles, `0` to disable).
- `TimestampConsumer::recycle_pool()` periodically recreates pooled `BaseConsumer` instances for the same reason.
- Tuned consumer properties for monitoring use: `queued.min.messages=100`, `queued.max.messages.kbytes=1024`, reduced `topic.metadata.refresh.interval.ms` for monitoring-oriented consumers.
- `max_concurrent_fetches` default lowered from 10 to 5, since each is a full librdkafka client (~5-15 MB).
- Dev Docker image bumped to Rust 1.85 (needed for `clap_lex` edition2024).

Test plan
- `cargo test` — all 48 tests pass
- `cargo clippy` — no warnings
- Memory logging at `info` level with RSS before/after
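The librdkafka tuning values listed in the summary could be grouped in one place; a sketch using plain key/value pairs (the helper name is illustrative — the real code presumably applies these through rdkafka's `ClientConfig`):

```rust
// librdkafka properties for monitoring-oriented consumers, using the values
// quoted in the PR description; the helper name is illustrative.
fn monitoring_consumer_props() -> Vec<(&'static str, &'static str)> {
    vec![
        // Small prefetch queues: the exporter samples offsets/timestamps,
        // it does not stream message payloads.
        ("queued.min.messages", "100"),
        ("queued.max.messages.kbytes", "1024"),
        // Refresh topic metadata rarely (10 minutes); the PR disables it
        // entirely for pool consumers.
        ("topic.metadata.refresh.interval.ms", "600000"),
    ]
}
```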