From 34c6769aebd597a93335e7251ab52cd14b34fc16 Mon Sep 17 00:00:00 2001 From: matthieugouel Date: Sat, 4 Jan 2025 22:48:07 +0100 Subject: [PATCH] add `is_post_policy` to the `TimePrefix` state hash --- src/bmp.rs | 15 +++++++++------ src/state.rs | 11 +++++++---- src/update.rs | 4 ++-- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/bmp.rs b/src/bmp.rs index 2492ecf..8e78635 100644 --- a/src/bmp.rs +++ b/src/bmp.rs @@ -1,5 +1,5 @@ use crate::state::{self, AsyncState}; -use crate::update::{create_withdraw_update, decode_updates, format_update, UpdateHeader}; +use crate::update::{decode_updates, format_update, synthesize_withdraw_update, UpdateHeader}; use bgpkit_parser::bmp::messages::PerPeerFlags; use bgpkit_parser::models::Peer; use bgpkit_parser::parse_bmp_msg; @@ -121,16 +121,19 @@ async fn process( peer.peer_address ); - // Remove the peer and the associated prefixes - // To do so, we start by emiting synthetic withdraw updates + // Remove the peer and the associated updates from the state + // We start by emiting synthetic withdraw updates let mut synthetic_updates = Vec::new(); let updates = state_lock.get_updates_by_peer(&router_addr, &peer).unwrap(); let now = Utc::now(); for prefix in updates { - synthetic_updates.push(create_withdraw_update(prefix.prefix.clone(), now.clone())); + synthetic_updates.push(synthesize_withdraw_update( + prefix.prefix.clone(), + now.clone(), + )); } - // And we then update the state + // Then update the state state_lock.remove_updates(&router_addr, &peer).unwrap(); let mut buffer = vec![]; @@ -141,7 +144,7 @@ async fn process( buffer.extend(b"\n"); } - // Sent to the event pipeline + // Finally send the synthetic updates to the event pipeline tx.send(buffer).unwrap(); } _ => (), diff --git a/src/state.rs b/src/state.rs index ed8fbac..6e3e5a2 100644 --- a/src/state.rs +++ b/src/state.rs @@ -11,7 +11,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use crate::settings::StateConfig; -use crate::update::{create_withdraw_update, format_update, Update}; +use crate::update::{format_update, synthesize_withdraw_update, Update}; pub type AsyncState = Arc>; @@ -99,18 +99,20 @@ impl State { #[derive(Serialize, Deserialize, Eq, Clone)] pub struct TimedPrefix { pub prefix: NetworkPrefix, + pub is_post_policy: bool, pub timestamp: i64, } impl PartialEq for TimedPrefix { fn eq(&self, other: &Self) -> bool { - self.prefix == other.prefix + self.prefix == other.prefix && self.is_post_policy == other.is_post_policy } } impl Hash for TimedPrefix { fn hash(&self, state: &mut H) { self.prefix.hash(state); + self.is_post_policy.hash(state); } } @@ -222,7 +224,8 @@ impl Router { let now: i64 = chrono::Utc::now().timestamp_millis(); let timed_prefix = TimedPrefix { - prefix: update.prefix.clone(), + prefix: update.prefix, + is_post_policy: update.is_post_policy, timestamp: now, }; @@ -284,7 +287,7 @@ pub async fn peer_up_withdraws_handler( synthetic_updates.push(( router_addr.clone(), peer.details.clone(), - create_withdraw_update(update.prefix.clone(), now.clone()), + synthesize_withdraw_update(update.prefix.clone(), now.clone()), )); } } diff --git a/src/update.rs b/src/update.rs index 33bc1e6..f11d6c5 100644 --- a/src/update.rs +++ b/src/update.rs @@ -93,9 +93,9 @@ pub fn decode_updates(message: RouteMonitoring, header: UpdateHeader) -> Option< } } -pub fn create_withdraw_update(prefix: NetworkPrefix, timestamp: DateTime) -> Update { +pub fn synthesize_withdraw_update(prefix: NetworkPrefix, timestamp: DateTime) -> Update { Update { - prefix: prefix, + prefix, announced: false, origin: Origin::INCOMPLETE, path: None,