diff --git a/Cargo.lock b/Cargo.lock index 7b9043b..eccf163 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -826,7 +826,7 @@ dependencies = [ [[package]] name = "hotdog" -version = "0.3.2" +version = "0.3.3" dependencies = [ "async-std", "async-tls", diff --git a/Cargo.toml b/Cargo.toml index 67c331a..d53baf8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hotdog" -version = "0.3.2" +version = "0.3.3" authors = ["R. Tyler Croy "] edition = "2018" diff --git a/src/kafka.rs b/src/kafka.rs index f0f447c..508adb1 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -3,6 +3,7 @@ use crate::status::{Statistic, Stats}; * The Kafka module contains all the tooling/code necessary for connecting hotdog to Kafka for * sending log lines along as Kafka messages */ +use async_std::task; use async_std::sync::{channel, Receiver, Sender}; use log::*; use rdkafka::client::DefaultClientContext; @@ -140,64 +141,69 @@ impl Kafka { let start_time = Instant::now(); let producer = producer.clone(); - // TODO: What if this is a task::spawn for each message, would that be too much - // overhead? - - let record = FutureRecord::::to(&kmsg.topic).payload(&kmsg.msg); /* - * Intentionally setting the timeout_ms to -1 here so this blocks forever if the - * outbound librdkafka queue is full. This will block up the crossbeam channel - * properly and cause messages to begin to be dropped, rather than buffering - * "forever" inside of hotdog + * Needed in order to prevent concurrent writers from totally + * killing parallel performance */ - if let Ok(delivery_result) = producer.send(record, -1 as i64).await { - match delivery_result { - Ok(_) => { - stats - .send((Stats::KafkaMsgSubmitted { topic: kmsg.topic }, 1)) - .await; - /* - * dipstick only supports u64 timers anyways, but as_micros() can - * give a u128 (!). - */ - if let Ok(elapsed) = start_time.elapsed().as_micros().try_into() { - stats.send((Stats::KafkaMsgSent, elapsed)).await; - } else { - error!("Could not collect message time because the duration couldn't fit in an i64, yikes"); - } - } - Err((err, _)) => { - match err { + task::yield_now().await; + + task::spawn(async move { + let record = FutureRecord::::to(&kmsg.topic).payload(&kmsg.msg); + /* + * Intentionally setting the timeout_ms to -1 here so this blocks forever if the + * outbound librdkafka queue is full. This will block up the crossbeam channel + * properly and cause messages to begin to be dropped, rather than buffering + * "forever" inside of hotdog + */ + if let Ok(delivery_result) = producer.send(record, -1 as i64).await { + match delivery_result { + Ok(_) => { + stats + .send((Stats::KafkaMsgSubmitted { topic: kmsg.topic }, 1)) + .await; /* - * err_type will be one of RdKafkaError types defined: - * https://docs.rs/rdkafka/0.23.1/rdkafka/error/enum.RDKafkaError.html - */ - KafkaError::MessageProduction(err_type) => { - error!("Failed to send message to Kafka due to: {}", err_type); - stats - .send(( - Stats::KafkaMsgErrored { - errcode: metric_name_for(err_type), - }, - 1, - )) - .await; + * dipstick only supports u64 timers anyways, but as_micros() can + * give a u128 (!). + */ + if let Ok(elapsed) = start_time.elapsed().as_micros().try_into() { + stats.send((Stats::KafkaMsgSent, elapsed)).await; + } else { + error!("Could not collect message time because the duration couldn't fit in an i64, yikes"); } - _ => { - error!("Failed to send message to Kafka!"); - stats - .send(( - Stats::KafkaMsgErrored { - errcode: String::from("generic"), - }, - 1, - )) - .await; + } + Err((err, _)) => { + match err { + /* + * err_type will be one of RdKafkaError types defined: + * https://docs.rs/rdkafka/0.23.1/rdkafka/error/enum.RDKafkaError.html + */ + KafkaError::MessageProduction(err_type) => { + error!("Failed to send message to Kafka due to: {}", err_type); + stats + .send(( + Stats::KafkaMsgErrored { + errcode: metric_name_for(err_type), + }, + 1, + )) + .await; + } + _ => { + error!("Failed to send message to Kafka!"); + stats + .send(( + Stats::KafkaMsgErrored { + errcode: String::from("generic"), + }, + 1, + )) + .await; + } } } } } - } + }); } } }