Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Differentiate pre/post policy #17

Merged
merged 1 commit into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions src/bmp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::state::{self, AsyncState};
use crate::update::{decode_updates, format_update, Update};
use bgpkit_parser::models::{Origin, Peer};
use crate::update::{create_withdraw_update, decode_updates, format_update, UpdateHeader};
use bgpkit_parser::bmp::messages::PerPeerFlags;
use bgpkit_parser::models::Peer;
use bgpkit_parser::parse_bmp_msg;
use bgpkit_parser::parser::bmp::messages::{BmpMessage, BmpMessageBody};
use bytes::Bytes;
Expand Down Expand Up @@ -63,7 +64,12 @@ async fn process(
return;
};
let peer = Peer::new(pph.peer_bgp_id, pph.peer_ip, pph.peer_asn);
let ts = (pph.timestamp * 1000.0) as i64;
let timestamp = (pph.timestamp * 1000.0) as i64;

let is_post_policy = match pph.peer_flags {
PerPeerFlags::PeerFlags(flags) => flags.is_post_policy(),
PerPeerFlags::LocalRibPeerFlags(_) => false,
};

match message.message_body {
BmpMessageBody::PeerUpNotification(body) => {
Expand All @@ -81,7 +87,12 @@ async fn process(
}
BmpMessageBody::RouteMonitoring(body) => {
log::trace!("{:?}", body);
let potential_updates = decode_updates(body, ts).unwrap_or(Vec::new());
let header = UpdateHeader {
timestamp,
is_post_policy,
};

let potential_updates = decode_updates(body, header).unwrap_or(Vec::new());

let mut legitimate_updates = Vec::new();
for update in potential_updates {
Expand Down Expand Up @@ -114,18 +125,9 @@ async fn process(
// To do so, we start by emiting synthetic withdraw updates
let mut synthetic_updates = Vec::new();
let updates = state_lock.get_updates_by_peer(&router_addr, &peer).unwrap();
let now = Utc::now();
for prefix in updates {
let update_to_withdrawn = Update {
prefix: prefix.prefix.clone(),
announced: false,
origin: Origin::INCOMPLETE,
path: None,
communities: vec![],
timestamp: Utc::now(),
synthetic: true,
};

synthetic_updates.push(update_to_withdrawn);
synthetic_updates.push(create_withdraw_update(prefix.prefix.clone(), now.clone()));
}

// And we then update the state
Expand Down
15 changes: 3 additions & 12 deletions src/state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bgpkit_parser::models::{NetworkPrefix, Origin, Peer as BGPkitPeer};
use bgpkit_parser::models::{NetworkPrefix, Peer as BGPkitPeer};
use chrono::Utc;
use core::net::IpAddr;
use rand::Rng;
Expand All @@ -11,8 +11,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::settings::StateConfig;
use crate::update::format_update;
use crate::update::Update;
use crate::update::{create_withdraw_update, format_update, Update};

pub type AsyncState = Arc<Mutex<State>>;

Expand Down Expand Up @@ -285,15 +284,7 @@ pub async fn peer_up_withdraws_handler(
synthetic_updates.push((
router_addr.clone(),
peer.details.clone(),
Update {
prefix: update.prefix.clone(),
announced: false,
origin: Origin::INCOMPLETE,
path: None,
communities: vec![],
timestamp: now.clone(),
synthetic: true,
},
create_withdraw_update(update.prefix.clone(), now.clone()),
));
}
}
Expand Down
33 changes: 27 additions & 6 deletions src/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,24 @@ use bgpkit_parser::models::*;
use core::net::IpAddr;
use log::error;

pub struct UpdateHeader {
pub timestamp: i64,
pub is_post_policy: bool,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Update {
pub prefix: NetworkPrefix,
pub announced: bool,
pub origin: Origin,
pub path: Option<AsPath>,
pub communities: Vec<MetaCommunity>,
pub is_post_policy: bool,
pub timestamp: DateTime<Utc>,
pub synthetic: bool,
}

pub fn decode_updates(message: RouteMonitoring, timestamp: i64) -> Option<Vec<Update>> {
pub fn decode_updates(message: RouteMonitoring, header: UpdateHeader) -> Option<Vec<Update>> {
let mut updates = Vec::new();

match message.bgp_message {
Expand Down Expand Up @@ -57,12 +63,12 @@ pub fn decode_updates(message: RouteMonitoring, timestamp: i64) -> Option<Vec<Up
};
let communities: Vec<MetaCommunity> = attributes.iter_communities().collect();

let timestamp = match Utc.timestamp_millis_opt(timestamp) {
let timestamp = match Utc.timestamp_millis_opt(header.timestamp) {
MappedLocalTime::Single(dt) => dt,
_ => {
error!(
"bmp - failed to parse timestamp: {}, using Utc::now()",
timestamp
header.timestamp
);
Utc::now()
}
Expand All @@ -75,6 +81,7 @@ pub fn decode_updates(message: RouteMonitoring, timestamp: i64) -> Option<Vec<Up
origin,
path: path.clone(),
communities: communities.clone(),
is_post_policy: header.is_post_policy,
timestamp,
synthetic: false,
});
Expand All @@ -86,6 +93,19 @@ pub fn decode_updates(message: RouteMonitoring, timestamp: i64) -> Option<Vec<Up
}
}

pub fn create_withdraw_update(prefix: NetworkPrefix, timestamp: DateTime<Utc>) -> Update {
Update {
prefix: prefix,
announced: false,
origin: Origin::INCOMPLETE,
path: None,
communities: vec![],
is_post_policy: false,
timestamp,
synthetic: true,
}
}

pub fn construct_as_path(path: Option<AsPath>) -> Vec<u32> {
match path {
Some(mut path) => {
Expand Down Expand Up @@ -132,7 +152,7 @@ fn map_to_ipv6(ip: IpAddr) -> IpAddr {
}

// Returns a CSV line corresponding to this schema
// timestamp,router_addr,router_port,peer_addr,peer_bgp_id,peer_asn,prefix_addr,prefix_len,origin,announced,synthetic,path,communities
// timestamp,router_addr,router_port,peer_addr,peer_bgp_id,peer_asn,prefix_addr,prefix_len,announced,is_post_policy,origin,path,communities,synthetic
pub fn format_update(
router_addr: IpAddr,
router_port: u16,
Expand Down Expand Up @@ -162,11 +182,12 @@ pub fn format_update(
row.push(format!("{}", peer.peer_asn));
row.push(format!("{}", map_to_ipv6(update.prefix.prefix.addr())));
row.push(format!("{}", update.prefix.prefix.prefix_len()));
row.push(format!("{}", update.origin));
row.push(format!("{}", update.is_post_policy));
row.push(format!("{}", update.announced));
row.push(format!("{}", update.synthetic));
row.push(format!("{}", update.origin));
row.push(format!("{}", as_path_str));
row.push(format!("{}", communities_str));
row.push(format!("{}", update.synthetic));

return row.join(",");
}
Loading