Skip to content

Commit

Permalink
Merge pull request #17 from nxthdr/feat-post-policy
Browse files Browse the repository at this point in the history
Differentiate pre/post policy
  • Loading branch information
matthieugouel authored Jan 4, 2025
2 parents 876a43f + 3e58ac7 commit 3212615
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 33 deletions.
32 changes: 17 additions & 15 deletions src/bmp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::state::{self, AsyncState};
use crate::update::{decode_updates, format_update, Update};
use bgpkit_parser::models::{Origin, Peer};
use crate::update::{create_withdraw_update, decode_updates, format_update, UpdateHeader};
use bgpkit_parser::bmp::messages::PerPeerFlags;
use bgpkit_parser::models::Peer;
use bgpkit_parser::parse_bmp_msg;
use bgpkit_parser::parser::bmp::messages::{BmpMessage, BmpMessageBody};
use bytes::Bytes;
Expand Down Expand Up @@ -63,7 +64,12 @@ async fn process(
return;
};
let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
let ts = (pph.timestamp * 1000.0) as i64;
let timestamp = (pph.timestamp * 1000.0) as i64;

let is_post_policy = match pph.peer_flags {
PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
PerPeerFlags::LocalRibPeerFlags(_) => false,
};

match message.message_body {
BmpMessageBody::PeerUpNotification(body) => {
Expand All @@ -81,7 +87,12 @@ async fn process(
}
BmpMessageBody::RouteMonitoring(body) => {
log::trace!("{:?}", body);
let potential_updates = decode_updates(body, ts).unwrap_or(Vec::new());
let header = UpdateHeader {
timestamp,
is_post_policy,
};

let potential_updates = decode_updates(body, header).unwrap_or(Vec::new());

let mut legitimate_updates = Vec::new();
for update in potential_updates {
Expand Down Expand Up @@ -114,18 +125,9 @@ async fn process(
// To do so, we start by emiting synthetic withdraw updates
let mut synthetic_updates = Vec::new();
let updates = state_lock.get_updates_by_peer(&router_addr, &peer).unwrap();
let now = Utc::now();
for prefix in updates {
let update_to_withdrawn = Update {
prefix: prefix.prefix.clone(),
announced: false,
origin: Origin::INCOMPLETE,
path: None,
communities: vec![],
timestamp: Utc::now(),
synthetic: true,
};

synthetic_updates.push(update_to_withdrawn);
synthetic_updates.push(create_withdraw_update(prefix.prefix.clone(), now.clone()));
}

// And we then update the state
Expand Down
15 changes: 3 additions & 12 deletions src/state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bgpkit_parser::models::{NetworkPrefix, Origin, Peer as BGPkitPeer};
use bgpkit_parser::models::{NetworkPrefix, Peer as BGPkitPeer};
use chrono::Utc;
use core::net::IpAddr;
use rand::Rng;
Expand All @@ -11,8 +11,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::settings::StateConfig;
use crate::update::format_update;
use crate::update::Update;
use crate::update::{create_withdraw_update, format_update, Update};

pub type AsyncState = Arc<Mutex<State>>;

Expand Down Expand Up @@ -285,15 +284,7 @@ pub async fn peer_up_withdraws_handler(
synthetic_updates.push((
router_addr.clone(),
peer.details.clone(),
Update {
prefix: update.prefix.clone(),
announced: false,
origin: Origin::INCOMPLETE,
path: None,
communities: vec![],
timestamp: now.clone(),
synthetic: true,
},
create_withdraw_update(update.prefix.clone(), now.clone()),
));
}
}
Expand Down
33 changes: 27 additions & 6 deletions src/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,24 @@ use bgpkit_parser::models::*;
use core::net::IpAddr;
use log::error;

pub struct UpdateHeader {
pub timestamp: i64,
pub is_post_policy: bool,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Update {
pub prefix: NetworkPrefix,
pub announced: bool,
pub origin: Origin,
pub path: Option<AsPath>,
pub communities: Vec<MetaCommunity>,
pub is_post_policy: bool,
pub timestamp: DateTime<Utc>,
pub synthetic: bool,
}

pub fn decode_updates(message: RouteMonitoring, timestamp: i64) -> Option<Vec<Update>> {
pub fn decode_updates(message: RouteMonitoring, header: UpdateHeader) -> Option<Vec<Update>> {
let mut updates = Vec::new();

match message.bgp_message {
Expand Down Expand Up @@ -57,12 +63,12 @@ pub fn decode_updates(message: RouteMonitoring, timestamp: i64) -> Option<Vec<Up
};
let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();

let timestamp = match Utc.timestamp_millis_opt(timestamp) {
let timestamp = match Utc.timestamp_millis_opt(header.timestamp) {
MappedLocalTime::Single(dt) => dt,
_ => {
error!(
"bmp - failed to parse timestamp: {}, using Utc::now()",
timestamp
header.timestamp
);
Utc::now()
}
Expand All @@ -75,6 +81,7 @@ pub fn decode_updates(message: RouteMonitoring, timestamp: i64) -> Option<Vec<Up
origin,
path: path.clone(),
communities: communities.clone(),
is_post_policy: header.is_post_policy,
timestamp,
synthetic: false,
});
Expand All @@ -86,6 +93,19 @@ pub fn decode_updates(message: RouteMonitoring, timestamp: i64) -> Option<Vec<Up
}
}

pub fn create_withdraw_update(prefix: NetworkPrefix, timestamp: DateTime<Utc>) -> Update {
Update {
prefix: prefix,
announced: false,
origin: Origin::INCOMPLETE,
path: None,
communities: vec![],
is_post_policy: false,
timestamp,
synthetic: true,
}
}

pub fn construct_as_path(path: Option<AsPath>) -> Vec<u32> {
match path {
Some(mut path) => {
Expand Down Expand Up @@ -132,7 +152,7 @@ fn map_to_ipv6(ip: IpAddr) -> IpAddr {
}

// Returns a CSV line corresponding to this schema
// timestamp,router_addr,router_port,peer_addr,peer_bgp_id,peer_asn,prefix_addr,prefix_len,origin,announced,synthetic,path,communities
// timestamp,router_addr,router_port,peer_addr,peer_bgp_id,peer_asn,prefix_addr,prefix_len,announced,is_post_policy,origin,path,communities,synthetic
pub fn format_update(
router_addr: IpAddr,
router_port: u16,
Expand Down Expand Up @@ -162,11 +182,12 @@ pub fn format_update(
row.push(format!("{}", peer.peer_asn));
row.push(format!("{}", map_to_ipv6(update.prefix.prefix.addr())));
row.push(format!("{}", update.prefix.prefix.prefix_len()));
row.push(format!("{}", update.origin));
row.push(format!("{}", update.is_post_policy));
row.push(format!("{}", update.announced));
row.push(format!("{}", update.synthetic));
row.push(format!("{}", update.origin));
row.push(format!("{}", as_path_str));
row.push(format!("{}", communities_str));
row.push(format!("{}", update.synthetic));

return row.join(",");
}

0 comments on commit 3212615

Please sign in to comment.