Skip to content

Commit

Permalink
Merge pull request #19 from nxthdr/feat-post-policy-hash
Browse files Browse the repository at this point in the history
add `is_post_policy` to the `TimePrefix` state hash
  • Loading branch information
matthieugouel authored Jan 4, 2025
2 parents 3212615 + 34c6769 commit ccae3b3
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
15 changes: 9 additions & 6 deletions src/bmp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::state::{self, AsyncState};
use crate::update::{create_withdraw_update, decode_updates, format_update, UpdateHeader};
use crate::update::{decode_updates, format_update, synthesize_withdraw_update, UpdateHeader};
use bgpkit_parser::bmp::messages::PerPeerFlags;
use bgpkit_parser::models::Peer;
use bgpkit_parser::parse_bmp_msg;
Expand Down Expand Up @@ -121,16 +121,19 @@ async fn process(
peer.peer_address
);

// Remove the peer and the associated prefixes
// To do so, we start by emiting synthetic withdraw updates
// Remove the peer and the associated updates from the state
// We start by emiting synthetic withdraw updates
let mut synthetic_updates = Vec::new();
let updates = state_lock.get_updates_by_peer(&router_addr, &peer).unwrap();
let now = Utc::now();
for prefix in updates {
synthetic_updates.push(create_withdraw_update(prefix.prefix.clone(), now.clone()));
synthetic_updates.push(synthesize_withdraw_update(
prefix.prefix.clone(),
now.clone(),
));
}

// And we then update the state
// Then update the state
state_lock.remove_updates(&router_addr, &peer).unwrap();

let mut buffer = vec![];
Expand All @@ -141,7 +144,7 @@ async fn process(
buffer.extend(b"\n");
}

// Sent to the event pipeline
// Finally send the synthetic updates to the event pipeline
tx.send(buffer).unwrap();
}
_ => (),
Expand Down
11 changes: 7 additions & 4 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::settings::StateConfig;
use crate::update::{create_withdraw_update, format_update, Update};
use crate::update::{format_update, synthesize_withdraw_update, Update};

pub type AsyncState = Arc<Mutex<State>>;

Expand Down Expand Up @@ -99,18 +99,20 @@ impl State {
#[derive(Serialize, Deserialize, Eq, Clone)]
pub struct TimedPrefix {
pub prefix: NetworkPrefix,
pub is_post_policy: bool,
pub timestamp: i64,
}

impl PartialEq for TimedPrefix {
fn eq(&self, other: &Self) -> bool {
self.prefix == other.prefix
self.prefix == other.prefix && self.is_post_policy == other.is_post_policy
}
}

impl Hash for TimedPrefix {
fn hash<H: Hasher>(&self, state: &mut H) {
self.prefix.hash(state);
self.is_post_policy.hash(state);
}
}

Expand Down Expand Up @@ -222,7 +224,8 @@ impl Router {

let now: i64 = chrono::Utc::now().timestamp_millis();
let timed_prefix = TimedPrefix {
prefix: update.prefix.clone(),
prefix: update.prefix,
is_post_policy: update.is_post_policy,
timestamp: now,
};

Expand Down Expand Up @@ -284,7 +287,7 @@ pub async fn peer_up_withdraws_handler(
synthetic_updates.push((
router_addr.clone(),
peer.details.clone(),
create_withdraw_update(update.prefix.clone(), now.clone()),
synthesize_withdraw_update(update.prefix.clone(), now.clone()),
));
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ pub fn decode_updates(message: RouteMonitoring, header: UpdateHeader) -> Option<
}
}

pub fn create_withdraw_update(prefix: NetworkPrefix, timestamp: DateTime<Utc>) -> Update {
pub fn synthesize_withdraw_update(prefix: NetworkPrefix, timestamp: DateTime<Utc>) -> Update {
Update {
prefix: prefix,
prefix,
announced: false,
origin: Origin::INCOMPLETE,
path: None,
Expand Down

0 comments on commit ccae3b3

Please sign in to comment.