Skip to content

Commit

Permalink
alleviate CPU producer utilization
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieugouel committed Feb 15, 2025
1 parent 873e6d6 commit d3fb7ed
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ bmp:
port: 4000

kafka:
host: kafka.example.com
port: 9092
enable: true
brokers: "kafka.example.com:9092"
auth_protocol: PLAINTEXT
topic: bgp-updates

state:
enable: true
path: /app/dump.txt
path: /app/dump.json
save_interval: 10
```
Expand Down
11 changes: 8 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,13 @@ pub struct KafkaConfig {
/// Default: 1048576
pub message_max_bytes: usize,

/// Kafka producer max wait time
/// Kafka producer batch wait time
/// Default: 1000
pub max_wait_time: u64,
pub batch_wait_time: u64,

/// Kafka producer batch wait interval
/// Default: 100
pub batch_wait_interval: u64,
}

pub fn get_kafka_config(config: &Config) -> KafkaConfig {
Expand All @@ -125,7 +129,8 @@ pub fn get_kafka_config(config: &Config) -> KafkaConfig {
.get_string("kafka.topic")
.unwrap_or("risotto-updates".to_string()),
message_max_bytes: config.get_int("kafka.message_max_bytes").unwrap_or(1048576) as usize,
max_wait_time: config.get_int("kafka.max_wait_time").unwrap_or(1000) as u64,
batch_wait_time: config.get_int("kafka.batch_wait_time").unwrap_or(1000) as u64,
batch_wait_interval: config.get_int("kafka.batch_wait_interval").unwrap_or(100) as u64,
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ pub async fn handle(config: &KafkaConfig, rx: Receiver<String>) -> Result<()> {
}

loop {
let now = std::time::Instant::now();
if now.duration_since(start_time)
> std::time::Duration::from_millis(config.max_wait_time)
if std::time::Instant::now().duration_since(start_time)
> std::time::Duration::from_millis(config.batch_wait_time)
{
break;
}

let message = rx.try_recv();
if message.is_err() {
tokio::time::sleep(Duration::from_millis(config.batch_wait_interval)).await;
continue;
}

Expand Down

0 comments on commit d3fb7ed

Please sign in to comment.