From 3e58ac77ae93eee8417d9ac1796dff1452197888 Mon Sep 17 00:00:00 2001 From: matthieugouel Date: Sat, 4 Jan 2025 11:25:01 +0100 Subject: [PATCH] Differentiate pre/post policy New `is_post_policy` added to the output line. Closes #15 --- src/bmp.rs | 32 +++++++++++++++++--------------- src/state.rs | 15 +++------------ src/update.rs | 33 +++++++++++++++++++++++++++------ 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/src/bmp.rs b/src/bmp.rs index eb24df3..2492ecf 100644 --- a/src/bmp.rs +++ b/src/bmp.rs @@ -1,6 +1,7 @@ use crate::state::{self, AsyncState}; -use crate::update::{decode_updates, format_update, Update}; -use bgpkit_parser::models::{Origin, Peer}; +use crate::update::{create_withdraw_update, decode_updates, format_update, UpdateHeader}; +use bgpkit_parser::bmp::messages::PerPeerFlags; +use bgpkit_parser::models::Peer; use bgpkit_parser::parse_bmp_msg; use bgpkit_parser::parser::bmp::messages::{BmpMessage, BmpMessageBody}; use bytes::Bytes; @@ -63,7 +64,12 @@ async fn process( return; }; let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn); - let ts = (pph.timestamp * 1000.0) as i64; + let timestamp = (pph.timestamp * 1000.0) as i64; + + let is_post_policy = match pph.peer_flags { + PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(), + PerPeerFlags::LocalRibPeerFlags(_) => false, + }; match message.message_body { BmpMessageBody::PeerUpNotification(body) => { @@ -81,7 +87,12 @@ async fn process( } BmpMessageBody::RouteMonitoring(body) => { log::trace!("{:?}", body); - let potential_updates = decode_updates(body, ts).unwrap_or(Vec::new()); + let header = UpdateHeader { + timestamp, + is_post_policy, + }; + + let potential_updates = decode_updates(body, header).unwrap_or(Vec::new()); let mut legitimate_updates = Vec::new(); for update in potential_updates { @@ -114,18 +125,9 @@ async fn process( // To do so, we start by emiting synthetic withdraw updates let mut synthetic_updates = Vec::new(); let updates = state_lock.get_updates_by_peer(&router_addr, &peer).unwrap(); + let now = Utc::now(); for prefix in updates { - let update_to_withdrawn = Update { - prefix: prefix.prefix.clone(), - announced: false, - origin: Origin::INCOMPLETE, - path: None, - communities: vec![], - timestamp: Utc::now(), - synthetic: true, - }; - - synthetic_updates.push(update_to_withdrawn); + synthetic_updates.push(create_withdraw_update(prefix.prefix.clone(), now.clone())); } // And we then update the state diff --git a/src/state.rs b/src/state.rs index aec9afb..ed8fbac 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,4 +1,4 @@ -use bgpkit_parser::models::{NetworkPrefix, Origin, Peer as BGPkitPeer}; +use bgpkit_parser::models::{NetworkPrefix, Peer as BGPkitPeer}; use chrono::Utc; use core::net::IpAddr; use rand::Rng; @@ -11,8 +11,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use crate::settings::StateConfig; -use crate::update::format_update; -use crate::update::Update; +use crate::update::{create_withdraw_update, format_update, Update}; pub type AsyncState = Arc>; @@ -285,15 +284,7 @@ pub async fn peer_up_withdraws_handler( synthetic_updates.push(( router_addr.clone(), peer.details.clone(), - Update { - prefix: update.prefix.clone(), - announced: false, - origin: Origin::INCOMPLETE, - path: None, - communities: vec![], - timestamp: now.clone(), - synthetic: true, - }, + create_withdraw_update(update.prefix.clone(), now.clone()), )); } } diff --git a/src/update.rs b/src/update.rs index 02c3327..33bc1e6 100644 --- a/src/update.rs +++ b/src/update.rs @@ -5,6 +5,11 @@ use bgpkit_parser::models::*; use core::net::IpAddr; use log::error; +pub struct UpdateHeader { + pub timestamp: i64, + pub is_post_policy: bool, +} + #[derive(Debug, Clone, PartialEq)] pub struct Update { pub prefix: NetworkPrefix, @@ -12,11 +17,12 @@ pub struct Update { pub origin: Origin, pub path: Option, pub communities: Vec, + pub is_post_policy: bool, pub timestamp: DateTime, pub synthetic: bool, } -pub fn decode_updates(message: RouteMonitoring, timestamp: i64) -> Option> { +pub fn decode_updates(message: RouteMonitoring, header: UpdateHeader) -> Option> { let mut updates = Vec::new(); match message.bgp_message { @@ -57,12 +63,12 @@ pub fn decode_updates(message: RouteMonitoring, timestamp: i64) -> Option = attributes.iter_communities().collect(); - let timestamp = match Utc.timestamp_millis_opt(timestamp) { + let timestamp = match Utc.timestamp_millis_opt(header.timestamp) { MappedLocalTime::Single(dt) => dt, _ => { error!( "bmp - failed to parse timestamp: {}, using Utc::now()", - timestamp + header.timestamp ); Utc::now() } @@ -75,6 +81,7 @@ pub fn decode_updates(message: RouteMonitoring, timestamp: i64) -> Option Option) -> Update { + Update { + prefix: prefix, + announced: false, + origin: Origin::INCOMPLETE, + path: None, + communities: vec![], + is_post_policy: false, + timestamp, + synthetic: true, + } +} + pub fn construct_as_path(path: Option) -> Vec { match path { Some(mut path) => { @@ -132,7 +152,7 @@ fn map_to_ipv6(ip: IpAddr) -> IpAddr { } // Returns a CSV line corresponding to this schema -// timestamp,router_addr,router_port,peer_addr,peer_bgp_id,peer_asn,prefix_addr,prefix_len,origin,announced,synthetic,path,communities +// timestamp,router_addr,router_port,peer_addr,peer_bgp_id,peer_asn,prefix_addr,prefix_len,announced,is_post_policy,origin,path,communities,synthetic pub fn format_update( router_addr: IpAddr, router_port: u16, @@ -162,11 +182,12 @@ pub fn format_update( row.push(format!("{}", peer.peer_asn)); row.push(format!("{}", map_to_ipv6(update.prefix.prefix.addr()))); row.push(format!("{}", update.prefix.prefix.prefix_len())); - row.push(format!("{}", update.origin)); + row.push(format!("{}", update.is_post_policy)); row.push(format!("{}", update.announced)); - row.push(format!("{}", update.synthetic)); + row.push(format!("{}", update.origin)); row.push(format!("{}", as_path_str)); row.push(format!("{}", communities_str)); + row.push(format!("{}", update.synthetic)); return row.join(","); }