diff --git a/Cargo.lock b/Cargo.lock index 6517695..cca5641 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "anyhow" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" + [[package]] name = "arraydeque" version = "0.5.1" @@ -572,21 +578,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "crc32fast" version = "1.4.2" @@ -762,21 +753,6 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1360,25 +1336,6 @@ dependencies = [ "serde", ] -[[package]] -name = "kafka" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2054ba4edcb4dcda4209e138c7e88caf26d4a325b3db76fbdb6ca5eecc23e426" -dependencies = [ - "byteorder", - "crc", - "flate2", - "fnv", - "openssl", - "openssl-sys", - "ref_slice", - "snap", - "thiserror 1.0.69", - "tracing", - "twox-hash", -] - [[package]] name = "lazy-regex" version = "3.4.1" @@ -1430,6 +1387,18 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "libz-sys" +version = "1.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9b68e50e6e0b26f672573834882eb57759f6db9b3be2ea3c35c91188bb4eaa" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -1677,50 +1646,12 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "openssl" -version = "0.10.70" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61cfb4e166a8bb8c9b55c500bc2308550148ece889be90f609377e58140f42c6" -dependencies = [ - "bitflags", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "openssl-probe" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" -[[package]] -name = "openssl-sys" -version = "0.9.105" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b22d5b84be05a8d6947c7cb71f7c849aa0f112acd4bf51c2a7c1c988ac0a9dc" -dependencies = [ - 
"cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "ordered-multimap" version = "0.6.0" @@ -2071,6 +2002,36 @@ dependencies = [ "bitflags", ] +[[package]] +name = "rdkafka" +version = "0.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b52c81ac3cac39c9639b95c20452076e74b8d9a71bc6fc4d83407af2ea6fff" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.8.0+2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced38182dc436b3d9df0c77976f37a67134df26b050df1f0006688e46fc4c8be" +dependencies = [ + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "redox_syscall" version = "0.5.8" @@ -2080,12 +2041,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "ref_slice" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4ed1d73fb92eba9b841ba2aef69533a060ccc0d3ec71c90aeda5996d4afb7a9" - [[package]] name = "regex" version = "1.11.1" @@ -2195,6 +2150,7 @@ dependencies = [ name = "risotto" version = "0.1.0" dependencies = [ + "anyhow", "axum", "bgpkit-parser", "bytes", @@ -2203,11 +2159,11 @@ dependencies = [ "clap-verbosity-flag", "config", "env_logger", - "kafka", "log", "metrics", "metrics-exporter-prometheus", "rand 0.9.0", + "rdkafka", "serde", "serde_json", "tokio", @@ -2573,12 +2529,6 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" -[[package]] -name = "snap" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" - [[package]] name = "socket2" version = "0.5.8" @@ -2601,12 +2551,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "strsim" version = "0.11.1" @@ -2980,17 +2924,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "twox-hash" -version = "1.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" -dependencies = [ - "cfg-if", - "rand 0.8.5", - "static_assertions", -] - [[package]] name = "typenum" version = "1.17.0" diff --git a/Cargo.toml b/Cargo.toml index caf3181..0808f32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1.0.95" axum = "0.8.1" bgpkit-parser = { version = "0.10.11", features = ["serde"] } bytes = "1.9.0" @@ -14,11 +15,11 @@ clap = { version = "4.5.23", features = ["derive"] } clap-verbosity-flag = "3.0.2" config = "0.15.4" env_logger = "0.11.6" -kafka = "0.10.0" log = "0.4.22" metrics = "0.24.1" metrics-exporter-prometheus = "0.16.0" rand = "0.9.0" +rdkafka = "0.37.0" serde = { version = "1.0.217", features = ["derive"] } 
serde_json = "1.0.134"
 tokio = { version = "1.42.0", features = ["full"] }
diff --git a/src/bmp.rs b/src/bmp.rs
index 9fd2ae2..fc33d92 100644
--- a/src/bmp.rs
+++ b/src/bmp.rs
@@ -6,6 +6,7 @@ use bgpkit_parser::parse_bmp_msg;
 use bgpkit_parser::parser::bmp::messages::{BmpMessage, BmpMessageBody};
 use bytes::Bytes;
 use core::net::IpAddr;
+use log::{error, info, trace};
 use std::io::{Error, ErrorKind, Result};
 use std::sync::mpsc::Sender;
 use tokio::io::AsyncReadExt;
@@ -49,7 +50,7 @@ pub async fn unmarshal_bmp_packet(socket: &mut TcpStream) -> Result<BmpMessage>
 async fn process_bmp_packet(
     state: AsyncState,
-    tx: Sender<Vec<u8>>,
+    tx: Sender<String>,
     router_addr: IpAddr,
     router_port: u16,
     message: BmpMessage,
@@ -75,11 +76,10 @@ async fn process_bmp_packet(
     match message.message_body {
         BmpMessageBody::PeerUpNotification(body) => {
-            log::trace!("{:?}", body);
-            log::info!(
+            trace!("{:?}", body);
+            info!(
                 "bmp - PeerUpNotification: {} - {}",
-                router_addr,
-                peer.peer_address
+                router_addr, peer.peer_address
             );
 
             let spawn_state = state.clone();
@@ -88,7 +88,7 @@ async fn process_bmp_packet(
             });
         }
         BmpMessageBody::RouteMonitoring(body) => {
-            log::trace!("{:?}", body);
+            trace!("{:?}", body);
             let header = UpdateHeader {
                 timestamp,
                 is_post_policy,
@@ -105,23 +105,19 @@ async fn process_bmp_packet(
                 }
             }
 
-            let mut buffer = vec![];
             for mut update in legitimate_updates {
                 let update = format_update(router_addr, router_port, &peer, &mut update);
                 log::trace!("{:?}", update);
-                buffer.extend(update.as_bytes());
-                buffer.extend(b"\n");
-            }
 
-            // Send to the event pipeline
-            tx.send(buffer).unwrap();
+                // Send to the event pipeline
+                tx.send(update).unwrap();
+            }
         }
         BmpMessageBody::PeerDownNotification(body) => {
-            log::trace!("{:?}", body);
-            log::info!(
+            trace!("{:?}", body);
+            info!(
                 "bmp - PeerDownNotification: {} - {}",
-                router_addr,
-                peer.peer_address
+                router_addr, peer.peer_address
             );
 
             // Remove the peer and the associated updates from the state
@@ -135,22 +131,19 @@ async fn process_bmp_packet(
             // Then update the state
             state_lock.remove_updates(&router_addr, &peer).unwrap();
 
-            let mut buffer = vec![];
             for mut update in synthetic_updates {
                 let update = format_update(router_addr, router_port, &peer, &mut update);
-                log::trace!("{:?}", update);
-                buffer.extend(update.as_bytes());
-                buffer.extend(b"\n");
-            }
+                trace!("{:?}", update);
 
-            // Finally send the synthetic updates to the event pipeline
-            tx.send(buffer).unwrap();
+                // Send the synthetic updates to the event pipeline
+                tx.send(update).unwrap();
+            }
         }
         _ => (),
     }
 }
 
-pub async fn handle(socket: &mut TcpStream, state: AsyncState, tx: Sender<Vec<u8>>) {
+pub async fn handle(socket: &mut TcpStream, state: AsyncState, tx: Sender<String>) {
     // Get router IP information
     let socket_info = socket.peer_addr().unwrap();
     let router_ip = socket_info.ip();
@@ -168,22 +161,20 @@ pub async fn handle(socket: &mut TcpStream, state: AsyncState, tx: Sender<Vec<u8>>) {
             Err(e) => {
                 // Other errors are unexpected
                 // Close the connection
-                log::error!("bmp - failed to unmarshal BMP message: {}", e);
-                log::error!(
+                error!("bmp - failed to unmarshal BMP message: {}", e);
+                error!(
                     "bmp - closing connection with {}:{}",
-                    router_ip,
-                    router_port
+                    router_ip, router_port
                 );
                 break;
             }
diff --git a/src/config.rs b/src/config.rs
new file mode 100644
index 0000000..5411b38
--- /dev/null
+++ b/src/config.rs
@@ -0,0 +1,167 @@
+use config::Config;
+use core::net::IpAddr;
+
+#[derive(Debug, Clone)]
+pub struct AppConfig {
+    pub api: APIConfig,
+    pub bmp: BMPConfig,
+    pub kafka: KafkaConfig,
+    pub state: StateConfig,
+}
+
+fn load_config(config_path: &str) -> Config {
+    let cfg = Config::builder()
+        .add_source(config::File::with_name(config_path))
+        .add_source(config::Environment::with_prefix("RISOTTO"))
+        .build()
+        .unwrap();
+    cfg
+}
+
+pub fn app_config(config_path: &str) -> AppConfig {
+    let config = load_config(config_path);
+    let api = get_api_config(&config);
+    let bmp = get_bmp_config(&config);
+    let kafka = get_kafka_config(&config);
+    let state = get_state_config(&config);
+    AppConfig {
+        api,
+        bmp,
+        kafka,
+        state,
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct APIConfig {
+    pub host: String,
+}
+
+pub fn get_api_config(config: &Config) -> APIConfig {
+    let api_addr = config
+        .get_string("api.address")
+        .unwrap_or("localhost".to_string());
+    let api_port = config.get_int("api.port").unwrap_or(3000);
+
+    APIConfig {
+        host: host(api_addr, api_port, false),
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct BMPConfig {
+    pub host: String,
+}
+
+pub fn get_bmp_config(config: &Config) -> BMPConfig {
+    let bmp_addr = config
+        .get_string("bmp.address")
+        .unwrap_or("localhost".to_string());
+    let bmp_port = config.get_int("bmp.port").unwrap_or(4000);
+
+    BMPConfig {
+        host: host(bmp_addr, bmp_port, true),
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct KafkaConfig {
+    /// Kafka brokers
+    /// Default: localhost:9092
+    pub brokers: String,
+
+    /// Kafka Authentication Protocol
+    /// Default: PLAINTEXT
+    pub auth_protocol: String,
+
+    /// Kafka Authentication SASL Username
+    /// Default: risotto
+    pub auth_sasl_username: String,
+
+    /// Kafka Authentication SASL Password
+    /// Default: risotto
+    pub auth_sasl_password: String,
+
+    /// Kafka Authentication SASL Mechanism
+    /// Default: SCRAM-SHA-512
+    pub auth_sasl_mechanism: String,
+
+    /// Enable Kafka producer
+    /// Default: true
+    pub enable: bool,
+
+    /// Kafka producer topic
+    /// Default: risotto-updates
+    pub topic: String,

+    /// Kafka message max bytes
+    /// Default: 1048576
+    pub message_max_bytes: usize,
+
+    /// Kafka producer max wait time (in milliseconds)
+    /// Default: 1000
+    pub max_wait_time: u64,
+}
+
+pub fn get_kafka_config(config: &Config) -> KafkaConfig {
+    KafkaConfig {
+        brokers: config
+            .get_string("kafka.brokers")
+            .unwrap_or("localhost:9092".to_string()),
+        auth_protocol: config
+            .get_string("kafka.auth_protocol")
+            .unwrap_or("PLAINTEXT".to_string()),
+        auth_sasl_username: config
+            .get_string("kafka.auth_sasl_username")
+            .unwrap_or("risotto".to_string()),
+        auth_sasl_password: config
+            .get_string("kafka.auth_sasl_password")
+            .unwrap_or("risotto".to_string()),
+        auth_sasl_mechanism: config
+            .get_string("kafka.auth_sasl_mechanism")
+            .unwrap_or("SCRAM-SHA-512".to_string()),
+        enable: config.get_bool("kafka.enable").unwrap_or(true),
+        topic: config
+            .get_string("kafka.topic")
+            .unwrap_or("risotto-updates".to_string()),
+        message_max_bytes: config.get_int("kafka.message_max_bytes").unwrap_or(1048576) as usize,
+        max_wait_time: config.get_int("kafka.max_wait_time").unwrap_or(1000) as u64,
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct StateConfig {
+    pub enable: bool,
+    pub path: String,
+    pub interval: u64,
+}
+
+pub fn get_state_config(config: &Config) -> StateConfig {
+    StateConfig {
+        enable: config.get_bool("state.enable").unwrap_or(true),
+        path: config
+            .get_string("state.path")
+            .unwrap_or("state.json".to_string()),
+        interval: config.get_int("state.save_interval").unwrap_or(10) as u64,
+    }
+}
+
+pub fn host(address: String, port: i64, accept_fqdn: bool) -> String {
+    let host = match address.parse::<IpAddr>() {
+        Ok(ip) => {
+            if ip.is_ipv4() {
+                address
+            } else {
+                format!("[{}]", ip)
+            }
+        }
+        Err(_) => {
+            if accept_fqdn {
+                address
+            } else {
+                panic!("FQDN not supported")
+            }
+        }
+    };
+    format!("{}:{}", host, port)
+}
diff --git a/src/main.rs b/src/main.rs
index 9547a9a..35c05e5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,16 +1,16 @@
 mod api;
 mod bmp;
+mod config;
 mod producer;
-mod settings;
 mod state;
 mod update;
 
 use chrono::Local;
 use clap::Parser;
 use clap_verbosity_flag::{InfoLevel, Verbosity};
-use config::Config;
+use config::AppConfig;
 use env_logger::Builder;
-use log::{debug, info};
+use log::{debug, error, info};
 use std::error::Error;
 use std::io::Write;
 use std::sync::mpsc::{channel, Receiver, Sender};
@@ -19,6 +19,7 @@ use std::time::Duration;
 use tokio::net::TcpListener;
 use tokio_graceful::Shutdown;
 
+use crate::config::app_config;
 use crate::state::AsyncState;
 
 #[derive(Parser, Debug)]
@@ -46,18 +47,8 @@ fn set_logging(cli: &Cli) {
         .init();
 }
 
-fn load_settings(config_path: &str) -> Arc<Config> {
-    let cfg = Config::builder()
-        .add_source(config::File::with_name(config_path))
-        .add_source(config::Environment::with_prefix("RISOTTO"))
-        .build()
-        .unwrap();
-    Arc::new(cfg)
-}
-
-async fn api_handler(state: AsyncState, cfg: Arc<Config>) {
-    let api_config = settings::get_api_config(&cfg).unwrap();
-
+async fn api_handler(state: AsyncState, cfg: Arc<AppConfig>) {
+    let api_config = cfg.api.clone();
     debug!("api - binding listener to {}", api_config.host);
     let api_listener = TcpListener::bind(api_config.host).await.unwrap();
@@ -65,8 +56,8 @@ async fn api_handler(state: AsyncState, cfg: Arc<AppConfig>) {
     axum::serve(api_listener, app).await.unwrap();
 }
 
-async fn bmp_handler(state: AsyncState, cfg: Arc<Config>, tx: Sender<Vec<u8>>) {
-    let bmp_config = settings::get_bmp_config(&cfg).unwrap();
+async fn bmp_handler(state: AsyncState, cfg: Arc<AppConfig>, tx: Sender<String>) {
+    let bmp_config = cfg.bmp.clone();
 
     debug!("bmp - binding listener to {}", bmp_config.host);
     let bmp_listener = TcpListener::bind(bmp_config.host).await.unwrap();
@@ -83,14 +74,17 @@ async fn bmp_handler(state: AsyncState, cfg: Arc<AppConfig>, tx: Sender<String>) {
     }
 }
 
-async fn producer_handler(cfg: Arc<Config>, rx: Receiver<Vec<u8>>) {
-    let cfg = settings::get_kafka_config(&cfg).unwrap();
+async fn producer_handler(cfg: Arc<AppConfig>, rx: Receiver<String>) {
+    let kafka_config = cfg.kafka.clone();
 
-    producer::handle(&cfg, rx).await;
+    match producer::handle(&kafka_config, rx).await {
+        Ok(_) => {}
+        Err(e) => error!("producer - {}", e),
+    }
 }
 
-async fn state_handler(state: AsyncState, cfg: Arc<Config>) {
-    let cfg = settings::get_state_config(&cfg).unwrap();
+async fn state_handler(state: AsyncState, cfg: Arc<AppConfig>) {
+    let cfg = cfg.state.clone();
     state::dump_handler(state.clone(), cfg.clone()).await;
 }
 
@@ -99,8 +93,8 @@ async fn state_handler(state: AsyncState, cfg: Arc<AppConfig>) {
 async fn main() -> Result<(), Box<dyn Error>> {
     let cli = Cli::parse();
 
-    let cfg = load_settings(&cli.config);
-    let state_config = settings::get_state_config(&cfg).unwrap();
+    let cfg = Arc::new(app_config(&cli.config));
+    let state_config = cfg.state.clone();
 
     let state = state::new_state(&state_config);
 
     let shutdown: Shutdown = Shutdown::default();
diff --git a/src/producer.rs b/src/producer.rs
index f1537ff..a53d045 100644
--- a/src/producer.rs
+++ b/src/producer.rs
@@ -1,155 +1,127 @@
-use kafka::client::{Compression, DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS};
-use kafka::producer::{AsBytes, Producer, Record, RequiredAcks, DEFAULT_ACK_TIMEOUT_MILLIS};
-use std::error::Error;
-use std::io::BufRead;
-use std::io::Cursor;
-use std::ops::{Deref, DerefMut};
-use std::sync::mpsc::{Receiver, TryRecvError};
+use anyhow::Result;
+use log::{debug, info, trace, warn};
+use rdkafka::config::ClientConfig;
+use rdkafka::message::OwnedHeaders;
+use rdkafka::producer::{FutureProducer, FutureRecord};
+use std::sync::mpsc::Receiver;
 use std::time::Duration;
 
-use crate::settings::KafkaConfig;
+use crate::config::KafkaConfig;
 
-struct Trimmed(String);
-
-impl AsBytes for Trimmed {
-    fn as_bytes(&self) -> &[u8] {
-        self.0.trim().as_bytes()
-    }
+#[derive(Clone)]
+pub struct SaslAuth {
+    pub username: String,
+    pub password: String,
+    pub mechanism: String,
 }
 
-impl Deref for Trimmed {
-    type Target = String;
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
+#[derive(Clone)]
+pub enum KafkaAuth {
+    SaslPlainText(SaslAuth),
+    PlainText,
 }
 
-impl DerefMut for Trimmed {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.0
-    }
-}
-
-fn produce_impl(
-    producer: &mut Producer,
-    cfg: &KafkaConfig,
-    data: &mut dyn BufRead,
-) -> Result<u64, Box<dyn Error>> {
-    // ~ a buffer of prepared records to be send in a batch to Kafka
-    // ~ in the loop following, we'll only modify the 'value' of the
-    // cached records
-    let mut rec_stash: Vec<Record<'_, (), Trimmed>> = (0..cfg.batch_max_size)
-        .map(|_| Record::from_value(&cfg.topic, Trimmed(String::new())))
-        .collect();
-
-    // ~ points to the next free slot in `rec_stash`. if it reaches
-    // `rec_stash.len()` we'll send `rec_stash` to kafka
-    let mut next_rec = 0;
-    let mut n_rec = 0;
-    loop {
-        // ~ send out a batch if it's ready
-        if next_rec == rec_stash.len() {
-            send_batch(producer, &rec_stash)?;
-            next_rec = 0;
-        }
-        let rec = &mut rec_stash[next_rec];
-        rec.value.clear();
-        if data.read_line(&mut rec.value)? == 0 {
-            break; // ~ EOF reached
-        }
-        if rec.value.trim().is_empty() {
-            continue; // ~ skip empty lines
+pub async fn handle(config: &KafkaConfig, rx: Receiver<String>) -> Result<()> {
+    // Configure Kafka authentication
+    let kafka_auth = match config.auth_protocol.as_str() {
+        "PLAINTEXT" => KafkaAuth::PlainText,
+        "SASL_PLAINTEXT" => KafkaAuth::SaslPlainText(SaslAuth {
+            username: config.auth_sasl_username.clone(),
+            password: config.auth_sasl_password.clone(),
+            mechanism: config.auth_sasl_mechanism.clone(),
+        }),
+        _ => {
+            return Err(anyhow::anyhow!(
+                "Invalid Kafka producer authentication protocol"
+            ))
+        }
-        // ~ ok, we got a line. read the next one in a new buffer
-        next_rec += 1;
-        n_rec += 1;
-    }
-    // ~ flush pending messages - if any
-    if next_rec > 0 {
-        send_batch(producer, &rec_stash[..next_rec])?;
-    }
-    Ok(n_rec)
-}
-
-fn send_batch(
-    producer: &mut Producer,
-    batch: &[Record<'_, (), Trimmed>],
-) -> Result<(), Box<dyn Error>> {
-    let rs = producer.send_all(batch)?;
+    };
 
-    for r in rs {
-        for tpc in r.partition_confirms {
-            if let Err(code) = tpc.offset {
-                return Err(Box::new(kafka::error::Error::Kafka(code)));
-            }
+    if !config.enable {
+        warn!("producer - disabled");
+        loop {
+            rx.recv().unwrap();
+        }
     }
-    }
-    Ok(())
-}
-
-pub async fn handle(cfg: &KafkaConfig, rx: Receiver<Vec<u8>>) {
-    // TODO: Allow multiple brokers via the config file
-    let mut client = kafka::client::KafkaClient::new(vec![cfg.host.to_owned()]);
-
-    // Wait until the metadata we succeed to reach the Kafka brokers
+    let producer: &FutureProducer = match kafka_auth {
+        KafkaAuth::PlainText => &ClientConfig::new()
+            .set("bootstrap.servers", config.brokers.clone())
+            .set("message.timeout.ms", "5000")
+            .create()
+            .expect("Producer creation error"),
+        KafkaAuth::SaslPlainText(scram_auth) => &ClientConfig::new()
+            .set("bootstrap.servers", config.brokers.clone())
+            .set("message.timeout.ms", "5000")
+            .set("sasl.username", scram_auth.username)
+            .set("sasl.password", scram_auth.password)
+            .set("sasl.mechanisms", scram_auth.mechanism)
+            .set("security.protocol", "SASL_PLAINTEXT")
+            .create()
+            .expect("Producer creation error"),
+    };
+
+    // Send to Kafka
+    let mut additional_message: Option<String> = None;
     loop {
-        match client.load_metadata(&[cfg.topic.to_owned()]) {
-            Ok(_) => {
-                log::debug!("producer - metadata loaded");
+        let start_time = std::time::Instant::now();
+        let mut final_message = String::new();
+        let mut n_messages = 0;
+
+        // Send the additional message first
+        if let Some(message) = additional_message {
+            final_message.push_str(&message);
+            final_message.push_str("\n");
+            n_messages += 1;
+            additional_message = None;
+        }
+
+        loop {
+            let now = std::time::Instant::now();
+            if now.duration_since(start_time)
+                > std::time::Duration::from_millis(config.max_wait_time)
+            {
                 break;
             }
-            Err(_) => {
-                log::error!("producer - failed to load metadata: retrying in 5 seconds");
-                tokio::time::sleep(Duration::from_secs(5)).await;
+
+            let message = rx.try_recv();
+            if message.is_err() {
+                continue;
             }
-        }
-    }
 
-    // TODO: Allow compression setting via the config file
-    // TODO: Allow timeouts setting via the config file
-    let mut producer = Producer::from_client(client)
-        .with_ack_timeout(Duration::from_millis(DEFAULT_ACK_TIMEOUT_MILLIS))
-        .with_connection_idle_timeout(Duration::from_millis(
-            DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS,
-        ))
-        .with_required_acks(RequiredAcks::One)
-        .with_compression(Compression::NONE)
-        .create()
-        .unwrap();
+            let message = message.unwrap();
+            trace!("producer - received - {}", message);
 
-    loop {
-        // Wait the batch wait time to collect messages
-        tokio::time::sleep(Duration::from_secs(cfg.batch_interval)).await;
-        let mut data = Vec::new();
-        loop {
-            // Collect all of the messages from BMP handler
-            match rx.try_recv() {
-                Ok(d) => data.extend(d),
-                Err(TryRecvError::Empty) => break,
-                Err(TryRecvError::Disconnected) => {
-                    log::error!("producer - BMP handler disconnected");
-                    return;
-                }
+            if final_message.len() + message.len() + 1 > config.message_max_bytes {
+                additional_message = Some(message);
+                break;
             }
+
+            final_message.push_str(&message);
+            final_message.push_str("\n");
+            n_messages += 1;
         }
-        // If no data was collected within the batch waiting time,
-        // continue to the next iteration
-        if data.is_empty() {
-            log::debug!("producer - produced 0 messages");
+        if final_message.is_empty() {
             continue;
         }
 
-        // Send the collected messages to Kafka in batches
-        let mut data = Cursor::new(data);
-        match produce_impl(&mut producer, cfg, &mut data) {
-            Ok(n) => {
-                log::info!("producer - produced {} messages", n)
-            }
-            Err(e) => {
-                log::error!("producer - failed producing messages: {}", e);
-            }
-        };
+        // Remove the last newline character
+        final_message.pop();
+
+        debug!("producer - {}", final_message);
+        info!("producer - sending {} updates to Kafka", n_messages);
+
+        let delivery_status = producer
+            .send(
+                FutureRecord::to(config.topic.as_str())
+                    .payload(&final_message)
+                    .key("")
+                    .headers(OwnedHeaders::new()),
+                Duration::from_secs(0),
+            )
+            .await;
+
+        info!("producer - {:?}", delivery_status);
     }
 }
diff --git a/src/settings.rs b/src/settings.rs
deleted file mode 100644
index 96d4c5a..0000000
--- a/src/settings.rs
+++ /dev/null
@@ -1,93 +0,0 @@
-use config::Config;
-use core::net::IpAddr;
-use std::error::Error;
-
-#[derive(Clone)]
-pub struct APIConfig {
-    pub host: String,
-}
-
-pub fn get_api_config(settings: &Config) -> Result<APIConfig, Box<dyn Error>> {
-    let api_addr = settings.get_string("api.address")?;
-    let api_port = settings.get_int("api.port")?;
-    let host = host(api_addr, api_port, false);
-    Ok(APIConfig { host })
-}
-
-#[derive(Clone)]
-pub struct BMPConfig {
-    pub host: String,
-}
-
-pub fn get_bmp_config(settings: &Config) -> Result<BMPConfig, Box<dyn Error>> {
-    let bmp_addr = settings.get_string("bmp.address")?;
-    let bmp_port = settings.get_int("bmp.port")?;
-    let host = host(bmp_addr, bmp_port, true);
-    Ok(BMPConfig { host })
-}
-
-#[derive(Clone)]
-pub struct KafkaConfig {
-    pub host: String,
-    pub topic: String,
-    pub batch_max_size: u64,
-    pub batch_interval: u64,
-}
-
-pub fn get_kafka_config(settings: &Config) -> Result<KafkaConfig, Box<dyn Error>> {
-    // TODO: better error handling
-    // Right now the thread will panic if the settings are not found,
-    // but not the entire program
-    let kafka_addr = settings.get_string("kafka.address")?;
-    let kafka_port = settings.get_int("kafka.port")?;
-    let host = host(kafka_addr, kafka_port, true);
-
-    let topic = settings.get_string("kafka.topic")?;
-    let batch_max_size = settings.get_int("kafka.batch_max_size").unwrap_or(100) as u64;
-    let batch_interval = settings.get_int("kafka.batch_interval").unwrap_or(1) as u64;
-
-    Ok(KafkaConfig {
-        host,
-        topic,
-        batch_max_size,
-        batch_interval,
-    })
-}
-
-#[derive(Clone)]
-pub struct StateConfig {
-    pub enable: bool,
-    pub path: String,
-    pub interval: u64,
-}
-
-pub fn get_state_config(settings: &Config) -> Result<StateConfig, Box<dyn Error>> {
-    let enable = settings.get_bool("state.enable")?;
-    let path = settings.get_string("state.path")?;
-    let interval = settings.get_int("state.save_interval")? as u64;
-    Ok(StateConfig {
-        enable,
-        path,
-        interval,
-    })
-}
-
-pub fn host(address: String, port: i64, accept_fqdn: bool) -> String {
-    let host = match address.parse::<IpAddr>() {
-        Ok(ip) => {
-            if ip.is_ipv4() {
-                address
-            } else {
-                format!("[{}]", ip)
-            }
-        }
-        Err(_) => {
-            if accept_fqdn {
-                address
-            } else {
-                panic!("FQDN non supported")
-            }
-        }
-    };
-    format!("{}:{}", host, port)
-}
diff --git a/src/state.rs b/src/state.rs
index c993cc8..c9cefe1 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -10,7 +10,7 @@ use std::sync::mpsc::Sender;
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
-use crate::settings::StateConfig;
+use crate::config::StateConfig;
 use crate::update::{format_update, Update};
 
 pub type AsyncState = Arc<Mutex<State>>;
@@ -258,7 +258,7 @@ pub async fn peer_up_withdraws_handler(
     state: AsyncState,
     router_addr: IpAddr,
     bgp_peer: BGPkitPeer,
-    tx: Sender<Vec<u8>>,
+    tx: Sender<String>,
 ) {
     let startup = chrono::Utc::now();
     let random = {
@@ -300,27 +300,24 @@ pub async fn peer_up_withdraws_handler(
         }
     }
 
+    log::info!(
+        "state - startup withdraws handler - {} - {} emitting {} synthetic withdraw updates",
+        router_addr,
+        bgp_peer.peer_address,
+        synthetic_updates.len()
+    );
+
     let mut state_lock: std::sync::MutexGuard<'_, State> = state.lock().unwrap();
-    let mut buffer: Vec<u8> = vec![];
     for (router_addr, peer, update) in &mut synthetic_updates {
         let update_str = format_update(*router_addr, 0, peer, update);
         log::trace!("{:?}", update_str);
-        buffer.extend(update_str.as_bytes());
-        buffer.extend(b"\n");
+
+        // Send to the event pipeline
+        tx.send(update_str).unwrap();
 
         // Remove the update from the state
         state_lock.store.update(router_addr, peer, update);
     }
-
-    log::info!(
-        "state - startup withdraws handler - {} - {} emitting {} synthetic withdraw updates",
-        router_addr,
-        bgp_peer.peer_address,
-        synthetic_updates.len()
-    );
-
-    // Sent to the event pipeline
-    tx.send(buffer).unwrap();
 }
 
 pub async fn dump_handler(state: AsyncState, cfg: StateConfig) {
diff --git a/testbed/compose.yml b/testbed/compose.yml
index 9b3d740..c376360 100644
--- a/testbed/compose.yml
+++ b/testbed/compose.yml
@@ -52,7 +52,7 @@ services:
 
   risotto:
     build: ..
-    command: -v --config /config/risotto
+    command: -vv --config /config/risotto.yml
    volumes:
       - ./config/risotto/risotto.yml:/config/risotto.yml
     ports:
diff --git a/testbed/config/clickhouse/docker-entrypoint-initdb.d/init.sql b/testbed/config/clickhouse/docker-entrypoint-initdb.d/init.sql
index dbebeb6..08f55fb 100644
--- a/testbed/config/clickhouse/docker-entrypoint-initdb.d/init.sql
+++ b/testbed/config/clickhouse/docker-entrypoint-initdb.d/init.sql
@@ -22,7 +22,8 @@ SETTINGS
     kafka_broker_list = '10.0.0.100:9092',
     kafka_topic_list = 'risotto-updates',
     kafka_group_name = 'clickhouse-risotto-group',
-    kafka_format = 'CSV';
+    kafka_format = 'CSV',
+    kafka_max_rows_per_message = 1048576;
 
 CREATE TABLE risotto.updates
 (
diff --git a/testbed/config/risotto/risotto.yml b/testbed/config/risotto/risotto.yml
index 79a32ec..7d5c81b 100644
--- a/testbed/config/risotto/risotto.yml
+++ b/testbed/config/risotto/risotto.yml
@@ -7,12 +7,12 @@ bmp:
   port: 4000
 
 kafka:
-  address: 10.0.0.100
-  port: 9092
+  enable: true
+  brokers: "10.0.0.100:9092"
+  auth_protocol: PLAINTEXT
   topic: risotto-updates
-  batch_max_size: 10
 
 state:
-  enable: true
-  path: /app/dump.txt
+  enable: false
+  path: /app/state.json
   save_interval: 10
\ No newline at end of file
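
The `host()` helper in the new src/config.rs normalizes listen and broker endpoints: IPv4 addresses pass through, IPv6 addresses get bracketed, and FQDNs are only allowed where `accept_fqdn` is true (BMP and Kafka, not the API). A minimal test sketch for it — hypothetical, not part of this patch — assuming it sits in a `tests` module at the bottom of src/config.rs:

    #[cfg(test)]
    mod tests {
        use super::host;

        #[test]
        fn host_formats_endpoints() {
            // IPv4 addresses pass through unchanged
            assert_eq!(host("10.0.0.100".to_string(), 9092, true), "10.0.0.100:9092");
            // IPv6 addresses are bracketed so the port suffix stays unambiguous
            assert_eq!(host("::1".to_string(), 4000, false), "[::1]:4000");
            // FQDNs are accepted only when accept_fqdn is true; otherwise host() panics
            assert_eq!(
                host("broker.example.org".to_string(), 9092, true),
                "broker.example.org:9092"
            );
        }
    }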
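The producer now packs as many newline-separated CSV updates as fit into `message_max_bytes` per Kafka message, which is why the testbed's ClickHouse table sets `kafka_max_rows_per_message`. To eyeball what lands on the topic during a testbed run, a throwaway consumer can split each payload on newlines. This is a hypothetical debugging sketch — broker, group id, and topic are taken from the testbed config; nothing here is part of the patch:

    use rdkafka::config::ClientConfig;
    use rdkafka::consumer::{Consumer, StreamConsumer};
    use rdkafka::Message;

    #[tokio::main]
    async fn main() {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("bootstrap.servers", "10.0.0.100:9092")
            .set("group.id", "risotto-debug")
            .set("auto.offset.reset", "earliest")
            .create()
            .expect("consumer creation failed");

        consumer
            .subscribe(&["risotto-updates"])
            .expect("subscription failed");

        loop {
            match consumer.recv().await {
                // Each message body holds one batch of newline-separated updates
                Ok(m) => {
                    if let Some(Ok(payload)) = m.payload_view::<str>() {
                        for line in payload.lines() {
                            println!("{}", line);
                        }
                    }
                }
                Err(e) => eprintln!("kafka error: {}", e),
            }
        }
    }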
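Since every getter in src/config.rs falls back to a default, a configuration file only needs the keys it overrides. A hypothetical sanity check against the testbed file, assuming it runs from the repository root (`app_config` panics if the file cannot be read):

    #[test]
    fn testbed_config_resolves_defaults() {
        let cfg = crate::config::app_config("testbed/config/risotto/risotto.yml");
        assert_eq!(cfg.kafka.brokers, "10.0.0.100:9092");
        assert_eq!(cfg.kafka.topic, "risotto-updates");
        assert_eq!(cfg.kafka.message_max_bytes, 1_048_576); // default; not set in the file
        assert!(!cfg.state.enable); // the testbed disables state dumps
    }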