From 666013b1eb0f7564029dcdb2093b0827a10a9ad6 Mon Sep 17 00:00:00 2001 From: matthieugouel Date: Sat, 15 Feb 2025 18:48:28 +0100 Subject: [PATCH] alleviate producer CPU utilization --- README.md | 7 ++++--- src/config.rs | 11 ++++++++--- src/producer.rs | 6 +++--- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 72710cf..0119041 100644 --- a/README.md +++ b/README.md @@ -38,13 +38,14 @@ bmp: port: 4000 kafka: - host: kafka.example.com - port: 9092 + enable: true + brokers: "kafka.example.com:9092" + auth_protocol: PLAINTEXT topic: bgp-updates state: enable: true - path: /app/dump.txt + path: /app/dump.json save_interval: 10 ``` diff --git a/src/config.rs b/src/config.rs index 5411b38..3203c62 100644 --- a/src/config.rs +++ b/src/config.rs @@ -98,9 +98,13 @@ pub struct KafkaConfig { /// Default: 1048576 pub message_max_bytes: usize, - /// Kafka producer max wait time + /// Kafka producer batch wait time /// Default: 1000 - pub max_wait_time: u64, + pub batch_wait_time: u64, + + /// Kafka producer batch wait interval + /// Default: 100 + pub batch_wait_interval: u64, } pub fn get_kafka_config(config: &Config) -> KafkaConfig { @@ -125,7 +129,8 @@ pub fn get_kafka_config(config: &Config) -> KafkaConfig { .get_string("kafka.topic") .unwrap_or("risotto-updates".to_string()), message_max_bytes: config.get_int("kafka.message_max_bytes").unwrap_or(1048576) as usize, - max_wait_time: config.get_int("kafka.max_wait_time").unwrap_or(1000) as u64, + batch_wait_time: config.get_int("kafka.batch_wait_time").unwrap_or(1000) as u64, + batch_wait_interval: config.get_int("kafka.batch_wait_interval").unwrap_or(100) as u64, } } diff --git a/src/producer.rs b/src/producer.rs index a53d045..3757e9b 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -77,15 +77,15 @@ pub async fn handle(config: &KafkaConfig, rx: Receiver) -> Result<()> { } loop { - let now = std::time::Instant::now(); - if now.duration_since(start_time) - > std::time::Duration::from_millis(config.max_wait_time) + if std::time::Instant::now().duration_since(start_time) + > std::time::Duration::from_millis(config.batch_wait_time) { break; } let message = rx.try_recv(); if message.is_err() { + tokio::time::sleep(Duration::from_millis(config.batch_wait_interval)).await; continue; }