Skip to content

Commit

Permalink
differentiate RIB-Adj- In/Out
Browse files Browse the repository at this point in the history
Closes #16
  • Loading branch information
matthieugouel committed Jan 5, 2025
1 parent f7eb045 commit 917df78
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 27 deletions.
15 changes: 8 additions & 7 deletions src/bmp.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::state::{self, AsyncState};
use crate::update::{decode_updates, format_update, synthesize_withdraw_update, UpdateHeader};
use crate::update::{decode_updates, format_update, UpdateHeader};
use bgpkit_parser::bmp::messages::PerPeerFlags;
use bgpkit_parser::models::Peer;
use bgpkit_parser::parse_bmp_msg;
use bgpkit_parser::parser::bmp::messages::{BmpMessage, BmpMessageBody};
use bytes::Bytes;
use chrono::Utc;
use core::net::IpAddr;
use std::io::{Error, ErrorKind, Result};
use std::sync::mpsc::Sender;
Expand Down Expand Up @@ -71,6 +70,11 @@ async fn process_bmp_packet(
PerPeerFlags::LocalRibPeerFlags(_) => false,
};

let is_adj_rib_out = match pph.peer_flags {
PerPeerFlags::PeerFlags(flags) => flags.is_adj_rib_out(),
PerPeerFlags::LocalRibPeerFlags(_) => false,
};

match message.message_body {
BmpMessageBody::PeerUpNotification(body) => {
log::trace!("{:?}", body);
Expand All @@ -90,6 +94,7 @@ async fn process_bmp_packet(
let header = UpdateHeader {
timestamp,
is_post_policy,
is_adj_rib_out,
};

let potential_updates = decode_updates(body, header).unwrap_or(Vec::new());
Expand Down Expand Up @@ -125,12 +130,8 @@ async fn process_bmp_packet(
// We start by emiting synthetic withdraw updates
let mut synthetic_updates = Vec::new();
let updates = state_lock.get_updates_by_peer(&router_addr, &peer).unwrap();
let now = Utc::now();
for prefix in updates {
synthetic_updates.push(synthesize_withdraw_update(
prefix.prefix.clone(),
now.clone(),
));
synthetic_updates.push(state::synthesize_withdraw_update(prefix.clone()));
}

// Then update the state
Expand Down
29 changes: 23 additions & 6 deletions src/state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bgpkit_parser::models::{NetworkPrefix, Peer as BGPkitPeer};
use bgpkit_parser::models::{NetworkPrefix, Origin, Peer as BGPkitPeer};
use chrono::Utc;
use core::net::IpAddr;
use rand::Rng;
Expand All @@ -11,7 +11,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::settings::StateConfig;
use crate::update::{format_update, synthesize_withdraw_update, Update};
use crate::update::{format_update, Update};

pub type AsyncState = Arc<Mutex<State>>;

Expand Down Expand Up @@ -100,19 +100,23 @@ impl State {
pub struct TimedPrefix {
pub prefix: NetworkPrefix,
pub is_post_policy: bool,
pub is_adj_rib_out: bool,
pub timestamp: i64,
}

impl PartialEq for TimedPrefix {
fn eq(&self, other: &Self) -> bool {
self.prefix == other.prefix && self.is_post_policy == other.is_post_policy
self.prefix == other.prefix
&& self.is_post_policy == other.is_post_policy
&& self.is_adj_rib_out == other.is_adj_rib_out
}
}

impl Hash for TimedPrefix {
fn hash<H: Hasher>(&self, state: &mut H) {
self.prefix.hash(state);
self.is_post_policy.hash(state);
self.is_adj_rib_out.hash(state);
}
}

Expand Down Expand Up @@ -226,6 +230,7 @@ impl Router {
let timed_prefix = TimedPrefix {
prefix: update.prefix,
is_post_policy: update.is_post_policy,
is_adj_rib_out: update.is_adj_rib_out,
timestamp: now,
};

Expand All @@ -244,6 +249,20 @@ impl Router {
}
}

pub fn synthesize_withdraw_update(prefix: TimedPrefix) -> Update {
Update {
prefix: prefix.prefix,
announced: false,
origin: Origin::INCOMPLETE,
path: None,
communities: vec![],
is_post_policy: prefix.is_post_policy,
is_adj_rib_out: prefix.is_adj_rib_out,
timestamp: Utc::now(),
synthetic: true,
}
}

pub async fn peer_up_withdraws_handler(
state: AsyncState,
router_addr: IpAddr,
Expand Down Expand Up @@ -277,17 +296,15 @@ pub async fn peer_up_withdraws_handler(

drop(state_lock);

let now = Utc::now();
let mut synthetic_updates = Vec::new();

for update in peer.updates {
if update.timestamp < startup.timestamp_millis() {
// This update has been re-announced after startup
// Emit a synthetic withdraw update
synthetic_updates.push((
router_addr.clone(),
peer.details.clone(),
synthesize_withdraw_update(update.prefix.clone(), now.clone()),
synthesize_withdraw_update(update.clone()),
));
}
}
Expand Down
19 changes: 5 additions & 14 deletions src/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use log::error;
pub struct UpdateHeader {
pub timestamp: i64,
pub is_post_policy: bool,
pub is_adj_rib_out: bool,
}

#[derive(Debug, Clone, PartialEq)]
Expand All @@ -18,6 +19,7 @@ pub struct Update {
pub path: Option<AsPath>,
pub communities: Vec<MetaCommunity>,
pub is_post_policy: bool,
pub is_adj_rib_out: bool,
pub timestamp: DateTime<Utc>,
pub synthetic: bool,
}
Expand Down Expand Up @@ -82,6 +84,7 @@ pub fn decode_updates(message: RouteMonitoring, header: UpdateHeader) -> Option<
path: path.clone(),
communities: communities.clone(),
is_post_policy: header.is_post_policy,
is_adj_rib_out: header.is_adj_rib_out,
timestamp,
synthetic: false,
});
Expand All @@ -93,19 +96,6 @@ pub fn decode_updates(message: RouteMonitoring, header: UpdateHeader) -> Option<
}
}

pub fn synthesize_withdraw_update(prefix: NetworkPrefix, timestamp: DateTime<Utc>) -> Update {
Update {
prefix,
announced: false,
origin: Origin::INCOMPLETE,
path: None,
communities: vec![],
is_post_policy: false,
timestamp,
synthetic: true,
}
}

pub fn construct_as_path(path: Option<AsPath>) -> Vec<u32> {
match path {
Some(mut path) => {
Expand Down Expand Up @@ -152,7 +142,7 @@ fn map_to_ipv6(ip: IpAddr) -> IpAddr {
}

// Returns a CSV line corresponding to this schema
// timestamp,router_addr,router_port,peer_addr,peer_bgp_id,peer_asn,prefix_addr,prefix_len,announced,is_post_policy,origin,path,communities,synthetic
// timestamp,router_addr,router_port,peer_addr,peer_bgp_id,peer_asn,prefix_addr,prefix_len,announced,is_post_policy,is_adj_rib_out,origin,path,communities,synthetic
pub fn format_update(
router_addr: IpAddr,
router_port: u16,
Expand Down Expand Up @@ -183,6 +173,7 @@ pub fn format_update(
row.push(format!("{}", map_to_ipv6(update.prefix.prefix.addr())));
row.push(format!("{}", update.prefix.prefix.prefix_len()));
row.push(format!("{}", update.is_post_policy));
row.push(format!("{}", update.is_adj_rib_out));
row.push(format!("{}", update.announced));
row.push(format!("{}", update.origin));
row.push(format!("{}", as_path_str));
Expand Down

0 comments on commit 917df78

Please sign in to comment.