implement basic kafka pipeline
matthieugouel committed Nov 2, 2024
1 parent 548b501 commit 835bb5c
Showing 8 changed files with 255 additions and 42 deletions.
135 changes: 135 additions & 0 deletions Cargo.lock

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -12,6 +12,7 @@
 bytes = "1.6.0"
 chrono = "0.4.38"
 hex = "0.4.3"
 ipnet = "2.9.0"
+kafka = "0.10.0"
 prefix-trie = "0.3.0"
 serde = { version = "1.0.213", features = ["derive"] }
 tokio = { version = "1.37.0", features = ["full"] }
39 changes: 1 addition & 38 deletions src/api.rs
@@ -1,9 +1,9 @@
 use axum::{extract::State, routing::get, Json, Router};
-use bgpkit_parser::models::{AsPath, AsPathSegment, MetaCommunity};
 use core::net::{IpAddr, Ipv4Addr};
 use serde::{Deserialize, Serialize};
 
 use crate::db::DB;
+use crate::update::{construct_as_path, construct_communities};
 
 #[derive(Debug, Serialize, Deserialize)]
 struct APIRouter {
@@ -33,43 +33,6 @@ pub fn app(db: DB) -> Router {
     Router::new().route("/", get(root).with_state(db))
 }
 
-fn construct_as_path(path: Option<AsPath>) -> Vec<u32> {
-    match path {
-        Some(mut path) => {
-            let mut contructed_path: Vec<u32> = Vec::new();
-            path.dedup_coalesce();
-            for segment in path.into_segments_iter() {
-                match segment {
-                    AsPathSegment::AsSequence(dedup_asns) => {
-                        for asn in dedup_asns {
-                            contructed_path.push(asn.to_u32());
-                        }
-                    }
-                    _ => (),
-                }
-            }
-            contructed_path
-        }
-        None => Vec::new(),
-    }
-}
-
-fn construct_communities(communities: Vec<MetaCommunity>) -> Vec<(u32, u16)> {
-    let mut constructed_communities = Vec::new();
-    for community in communities {
-        match community {
-            MetaCommunity::Plain(community) => match community {
-                bgpkit_parser::models::Community::Custom(asn, value) => {
-                    constructed_communities.push((asn.to_u32(), value));
-                }
-                _ => (), // TODO
-            },
-            _ => (), // TODO
-        }
-    }
-    constructed_communities
-}
-
 fn format(db: DB) -> Vec<APIRouter> {
     let routers = db.routers.lock().unwrap();
     let mut api_routers = Vec::new();
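
(Note: the two helpers deleted here are re-imported from crate::update in the first hunk above, so this commit relocates them into the update module rather than dropping them; src/update.rs is among the changed files not rendered on this page.)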
11 changes: 8 additions & 3 deletions src/bmp.rs
@@ -8,8 +8,9 @@
 use tokio::io::AsyncReadExt;
 use tokio::net::TcpStream;
 
 use crate::db::DB;
+use crate::pipeline::send_to_kafka;
 use crate::router::new_router;
-use crate::update::decode_updates;
+use crate::update::{decode_updates, format_update};
 
 pub async fn unmarshal_bmp_packet(socket: &mut TcpStream) -> io::Result<Option<BmpMessage>> {
     // Get minimal packet length to get how many bytes to remove from the socket
@@ -81,9 +82,11 @@ pub async fn handle(socket: &mut TcpStream, db: DB) {
                 }
             }
 
-            // TODO: Handle streaming pipeline configuration (stdout, CSV file, Kafka, ...)
+            // TODO: Handle multiple event pipelines (stdout, CSV file, Kafka, ...)
             for update in legitimate_updates {
+                let update = format_update(&router, &peer, &update);
                 println!("{:?}", update);
+                send_to_kafka("broker.nxthdr.dev:9092", "bgp-updates", update.as_bytes());
             }
         }
         BmpMessageBody::PeerDownNotification(_) => {
@@ -102,9 +105,11 @@ pub async fn handle(socket: &mut TcpStream, db: DB) {
             // And we then update the internal state
             router.remove_peer(&peer);
 
-            // TODO: Handle streaming pipeline configuration (stdout, CSV file, Kafka, ...)
+            // TODO: Handle multiple event pipelines (stdout, CSV file, Kafka, ...)
             for update in synthetic_updates {
+                let update = format_update(&router, &peer, &update);
                 println!("{:?}", update);
+                send_to_kafka("broker.nxthdr.dev:9092", "bgp-updates", update.as_bytes());
             }
         }
         _ => (),
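
The TODO repeated in both hunks points at a pluggable event-pipeline abstraction. As a rough sketch only (the trait and types below are hypothetical and not part of this commit; send_to_kafka is the real function added in src/pipeline.rs), a trait-based design could replace the hard-coded stdout + Kafka pair:

    // Hypothetical sketch: one trait per event sink, so `handle` can loop over
    // whatever pipelines are configured instead of hard-coding each one.
    use crate::pipeline::send_to_kafka;

    pub trait EventPipeline {
        fn publish(&self, update: &str);
    }

    pub struct StdoutPipeline;

    impl EventPipeline for StdoutPipeline {
        fn publish(&self, update: &str) {
            println!("{}", update);
        }
    }

    pub struct KafkaPipeline {
        pub broker: String,
        pub topic: String,
    }

    impl EventPipeline for KafkaPipeline {
        fn publish(&self, update: &str) {
            send_to_kafka(&self.broker, &self.topic, update.as_bytes());
        }
    }

The two duplicated loops in handle would then shrink to something like `for p in &pipelines { p.publish(&update) }`.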
1 change: 1 addition & 0 deletions src/main.rs
@@ -1,6 +1,7 @@
 mod api;
 mod bmp;
 mod db;
+mod pipeline;
 mod router;
 mod update;
 
24 changes: 24 additions & 0 deletions src/pipeline.rs
@@ -0,0 +1,24 @@
+use std::time::Duration;
+
+use kafka::error::Error as KafkaError;
+use kafka::producer::{Producer, Record, RequiredAcks};
+
+pub fn send_to_kafka(broker: &str, topic: &str, data: &[u8]) {
+    if let Err(e) = produce_message(data, topic, vec![broker.to_owned()]) {
+        println!("Failed producing messages: {}", e);
+    }
+}
+
+fn produce_message<'a, 'b>(
+    data: &'a [u8],
+    topic: &'b str,
+    brokers: Vec<String>,
+) -> Result<(), KafkaError> {
+    let mut producer = Producer::from_hosts(brokers)
+        .with_ack_timeout(Duration::from_secs(1))
+        .with_required_acks(RequiredAcks::One)
+        .create()?;
+
+    producer.send(&Record::from_value(topic, data))?;
+    Ok(())
+}
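
Two observations on this new file: produce_message builds a fresh Producer, and therefore a new broker connection, for every single update, and send_to_kafka is called from async code in src/bmp.rs even though the kafka crate's producer is blocking. A sketch of a long-lived, reusable producer using the same kafka crate APIs (the KafkaSink type is hypothetical, not part of this commit):

    use std::time::Duration;

    use kafka::error::Error as KafkaError;
    use kafka::producer::{Producer, Record, RequiredAcks};

    // Hypothetical reusable sink: connect once, then send many records.
    pub struct KafkaSink {
        producer: Producer,
        topic: String,
    }

    impl KafkaSink {
        pub fn new(brokers: Vec<String>, topic: &str) -> Result<Self, KafkaError> {
            let producer = Producer::from_hosts(brokers)
                .with_ack_timeout(Duration::from_secs(1))
                .with_required_acks(RequiredAcks::One)
                .create()?;
            Ok(Self {
                producer,
                topic: topic.to_owned(),
            })
        }

        pub fn send(&mut self, data: &[u8]) -> Result<(), KafkaError> {
            // Blocks until the broker acknowledges; from async code this would
            // likely belong inside tokio::task::spawn_blocking.
            self.producer.send(&Record::from_value(&self.topic, data))
        }
    }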