From 6471eb050e5c2129953b09e58179d6c79b8d7455 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Wed, 1 Nov 2023 01:16:12 +0000
Subject: [PATCH 01/17] Depend on `libm` in `no-std` for `powf`(64)

In the next commits we'll need `f64`'s `powf`, which is only available
in `std`. For `no-std`, here we depend on `libm` (a `rust-lang` org
project), which we can use for `powf`.
---
 lightning/Cargo.toml             |  3 ++-
 lightning/src/lib.rs             |  1 +
 lightning/src/routing/scoring.rs | 10 ++++++++++
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/lightning/Cargo.toml b/lightning/Cargo.toml
index 3c3912e2b32..bb4a3ae1ffd 100644
--- a/lightning/Cargo.toml
+++ b/lightning/Cargo.toml
@@ -31,7 +31,7 @@ unsafe_revoked_tx_signing = []
 # Override signing to not include randomness when generating signatures for test vectors.
 _test_vectors = []
 
-no-std = ["hashbrown", "bitcoin/no-std", "core2/alloc"]
+no-std = ["hashbrown", "bitcoin/no-std", "core2/alloc", "libm"]
 std = ["bitcoin/std"]
 
 # Generates low-r bitcoin signatures, which saves 1 byte in 50% of the cases
@@ -48,6 +48,7 @@ regex = { version = "1.5.6", optional = true }
 backtrace = { version = "0.3", optional = true }
 
 core2 = { version = "0.3.0", optional = true, default-features = false }
+libm = { version = "0.2", optional = true, default-features = false }
 
 [dev-dependencies]
 regex = "1.5.6"
diff --git a/lightning/src/lib.rs b/lightning/src/lib.rs
index c42f66da576..6eefb3983cc 100644
--- a/lightning/src/lib.rs
+++ b/lightning/src/lib.rs
@@ -69,6 +69,7 @@ extern crate hex;
 #[cfg(any(test, feature = "_test_utils"))] extern crate regex;
 
 #[cfg(not(feature = "std"))] extern crate core2;
+#[cfg(not(feature = "std"))] extern crate libm;
 
 #[cfg(ldk_bench)] extern crate criterion;
diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 337fd8eec7a..3124b21b2b4 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -1469,6 +1469,16 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
 impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> Score for ProbabilisticScorerUsingTime<G, L, T> where L::Target: Logger {}
 
+#[cfg(feature = "std")]
+#[inline]
+fn powf64(n: f64, exp: f64) -> f64 {
+	n.powf(exp)
+}
+#[cfg(not(feature = "std"))]
+fn powf64(n: f64, exp: f64) -> f64 {
+	libm::powf(n as f32, exp as f32) as f64
+}
+
 mod approx {
 	const BITS: u32 = 64;
 	const HIGHEST_BIT: u32 = BITS - 1;

From 6c366cf35f659126134cc5ec17d3c23838d9d83f Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 2 Oct 2023 18:33:08 +0000
Subject: [PATCH 02/17] Pass the current time through `ScoreUpdate` methods

In the coming commits, we'll stop relying on fetching the time during
routefinding, preferring to decay score data in the background instead.

The first step towards this - passing the current time through into the
scorer when updating.
---
 lightning-background-processor/src/lib.rs |  70 ++++++++------
 lightning/src/routing/router.rs           |   5 +-
 lightning/src/routing/scoring.rs          | 110 +++++++++++-----------
 lightning/src/util/test_utils.rs          |   8 +-
 4 files changed, 105 insertions(+), 88 deletions(-)

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 62c03b81dfe..416f8d7b6d0 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -244,30 +244,30 @@ fn handle_network_graph_update(
 
 /// Updates scorer based on event and returns whether an update occurred so we can decide whether
 /// to persist.
fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + WriteableScore<'a>>( - scorer: &'a S, event: &Event + scorer: &'a S, event: &Event, duration_since_epoch: Duration, ) -> bool { match event { Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => { let mut score = scorer.write_lock(); - score.payment_path_failed(path, *scid); + score.payment_path_failed(path, *scid, duration_since_epoch); }, Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => { // Reached if the destination explicitly failed it back. We treat this as a successful probe // because the payment made it all the way to the destination with sufficient liquidity. let mut score = scorer.write_lock(); - score.probe_successful(path); + score.probe_successful(path, duration_since_epoch); }, Event::PaymentPathSuccessful { path, .. } => { let mut score = scorer.write_lock(); - score.payment_path_successful(path); + score.payment_path_successful(path, duration_since_epoch); }, Event::ProbeSuccessful { path, .. } => { let mut score = scorer.write_lock(); - score.probe_successful(path); + score.probe_successful(path, duration_since_epoch); }, Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => { let mut score = scorer.write_lock(); - score.probe_failed(path, *scid); + score.probe_failed(path, *scid, duration_since_epoch); }, _ => return false, } @@ -280,7 +280,7 @@ macro_rules! define_run_body { $channel_manager: ident, $process_channel_manager_events: expr, $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident, $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, - $timer_elapsed: expr, $check_slow_await: expr + $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); @@ -383,11 +383,10 @@ macro_rules! define_run_body { if should_prune { // The network graph must not be pruned while rapid sync completion is pending if let Some(network_graph) = $gossip_sync.prunable_network_graph() { - #[cfg(feature = "std")] { + if let Some(duration_since_epoch) = $time_fetch() { log_trace!($logger, "Pruning and persisting network graph."); - network_graph.remove_stale_channels_and_tracking(); - } - #[cfg(not(feature = "std"))] { + network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs()); + } else { log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time."); log_trace!($logger, "Persisting network graph."); } @@ -510,12 +509,16 @@ use core::task; /// are unsure, you should set the flag, as the performance impact of it is minimal unless there /// are hundreds or thousands of simultaneous process calls running. /// +/// The `fetch_time` parameter should return the current wall clock time, if one is available. If +/// no time is available, some features may be disabled, however the node will still operate fine. 
+/// /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you /// could setup `process_events_async` like this: /// ``` /// # use lightning::io; /// # use std::sync::{Arc, RwLock}; /// # use std::sync::atomic::{AtomicBool, Ordering}; +/// # use std::time::SystemTime; /// # use lightning_background_processor::{process_events_async, GossipSync}; /// # struct MyStore {} /// # impl lightning::util::persist::KVStore for MyStore { @@ -584,6 +587,7 @@ use core::task; /// Some(background_scorer), /// sleeper, /// mobile_interruptable_platform, +/// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap()) /// ) /// .await /// .expect("Failed to process events"); @@ -620,11 +624,12 @@ pub async fn process_events_async< S: 'static + Deref + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, - Sleeper: Fn(Duration) -> SleepFuture + Sleeper: Fn(Duration) -> SleepFuture, + FetchTime: Fn() -> Option, >( persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, - sleeper: Sleeper, mobile_interruptable_platform: bool, + sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, ) -> Result<(), lightning::io::Error> where UL::Target: 'static + UtxoLookup, @@ -648,15 +653,18 @@ where let scorer = &scorer; let logger = &logger; let persister = &persister; + let fetch_time = &fetch_time; async move { if let Some(network_graph) = network_graph { handle_network_graph_update(network_graph, &event) } if let Some(ref scorer) = scorer { - if update_scorer(scorer, &event) { - log_trace!(logger, "Persisting scorer after update"); - if let Err(e) = persister.persist_scorer(&scorer) { - log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + if let Some(duration_since_epoch) = fetch_time() { + if update_scorer(scorer, &event, duration_since_epoch) { + log_trace!(logger, "Persisting scorer after update"); + if let Err(e) = persister.persist_scorer(&scorer) { + log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + } } } } @@ -688,7 +696,7 @@ where task::Poll::Ready(exit) => { should_break = exit; true }, task::Poll::Pending => false, } - }, mobile_interruptable_platform + }, mobile_interruptable_platform, fetch_time, ) } @@ -810,7 +818,10 @@ impl BackgroundProcessor { handle_network_graph_update(network_graph, &event) } if let Some(ref scorer) = scorer { - if update_scorer(scorer, &event) { + use std::time::SystemTime; + let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"); + if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); if let Err(e) = persister.persist_scorer(&scorer) { log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) @@ -829,7 +840,12 @@ impl BackgroundProcessor { channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() ).wait_timeout(Duration::from_millis(100)); }, - |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false + |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false, + || { + use std::time::SystemTime; + Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970")) + }, ) }); Self { 
stop_thread: stop_thread_clone, thread_handle: Some(handle) } @@ -1117,7 +1133,7 @@ mod tests { } impl ScoreUpdate for TestScorer { - fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) { + fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) { if let Some(expectations) = &mut self.event_expectations { match expectations.pop_front().unwrap() { TestResult::PaymentFailure { path, short_channel_id } => { @@ -1137,7 +1153,7 @@ mod tests { } } - fn payment_path_successful(&mut self, actual_path: &Path) { + fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) { if let Some(expectations) = &mut self.event_expectations { match expectations.pop_front().unwrap() { TestResult::PaymentFailure { path, .. } => { @@ -1156,7 +1172,7 @@ mod tests { } } - fn probe_failed(&mut self, actual_path: &Path, _: u64) { + fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) { if let Some(expectations) = &mut self.event_expectations { match expectations.pop_front().unwrap() { TestResult::PaymentFailure { path, .. } => { @@ -1174,7 +1190,7 @@ mod tests { } } } - fn probe_successful(&mut self, actual_path: &Path) { + fn probe_successful(&mut self, actual_path: &Path, _: Duration) { if let Some(expectations) = &mut self.event_expectations { match expectations.pop_front().unwrap() { TestResult::PaymentFailure { path, .. } => { @@ -1469,7 +1485,7 @@ mod tests { tokio::time::sleep(dur).await; false // Never exit }) - }, false, + }, false, || Some(Duration::ZERO), ); match bp_future.await { Ok(_) => panic!("Expected error persisting manager"), @@ -1699,7 +1715,7 @@ mod tests { _ = exit_receiver.changed() => true, } }) - }, false, + }, false, || Some(Duration::from_secs(1696300000)), ); let t1 = tokio::spawn(bp_future); @@ -1874,7 +1890,7 @@ mod tests { _ = exit_receiver.changed() => true, } }) - }, false, + }, false, || Some(Duration::ZERO), ); let t1 = tokio::spawn(bp_future); let t2 = tokio::spawn(async move { diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index 9423078749d..7bd1cebc532 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -8160,6 +8160,7 @@ mod tests { pub(crate) mod bench_utils { use super::*; use std::fs::File; + use std::time::Duration; use bitcoin::hashes::Hash; use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; @@ -8308,10 +8309,10 @@ pub(crate) mod bench_utils { if let Ok(route) = route_res { for path in route.paths { if seed & 0x80 == 0 { - scorer.payment_path_successful(&path); + scorer.payment_path_successful(&path, Duration::ZERO); } else { let short_channel_id = path.hops[path.hops.len() / 2].short_channel_id; - scorer.payment_path_failed(&path, short_channel_id); + scorer.payment_path_failed(&path, short_channel_id, Duration::ZERO); } seed = seed.overflowing_mul(6364136223846793005).0.overflowing_add(1).0; } diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 3124b21b2b4..92ebb979cf7 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -110,16 +110,16 @@ pub trait ScoreLookUp { /// `ScoreUpdate` is used to update the scorer's internal state after a payment attempt. pub trait ScoreUpdate { /// Handles updating channel penalties after failing to route through a channel. 
- fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64); + fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration); /// Handles updating channel penalties after successfully routing along a path. - fn payment_path_successful(&mut self, path: &Path); + fn payment_path_successful(&mut self, path: &Path, duration_since_epoch: Duration); /// Handles updating channel penalties after a probe over the given path failed. - fn probe_failed(&mut self, path: &Path, short_channel_id: u64); + fn probe_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration); /// Handles updating channel penalties after a probe over the given path succeeded. - fn probe_successful(&mut self, path: &Path); + fn probe_successful(&mut self, path: &Path, duration_since_epoch: Duration); } /// A trait which can both lookup and update routing channel penalty scores. @@ -145,20 +145,20 @@ impl> ScoreLookUp for T { #[cfg(not(c_bindings))] impl> ScoreUpdate for T { - fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64) { - self.deref_mut().payment_path_failed(path, short_channel_id) + fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) { + self.deref_mut().payment_path_failed(path, short_channel_id, duration_since_epoch) } - fn payment_path_successful(&mut self, path: &Path) { - self.deref_mut().payment_path_successful(path) + fn payment_path_successful(&mut self, path: &Path, duration_since_epoch: Duration) { + self.deref_mut().payment_path_successful(path, duration_since_epoch) } - fn probe_failed(&mut self, path: &Path, short_channel_id: u64) { - self.deref_mut().probe_failed(path, short_channel_id) + fn probe_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) { + self.deref_mut().probe_failed(path, short_channel_id, duration_since_epoch) } - fn probe_successful(&mut self, path: &Path) { - self.deref_mut().probe_successful(path) + fn probe_successful(&mut self, path: &Path, duration_since_epoch: Duration) { + self.deref_mut().probe_successful(path, duration_since_epoch) } } } } @@ -346,20 +346,20 @@ impl<'a, T: 'a + Score> DerefMut for MultiThreadedScoreLockWrite<'a, T> { #[cfg(c_bindings)] impl<'a, T: Score> ScoreUpdate for MultiThreadedScoreLockWrite<'a, T> { - fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64) { - self.0.payment_path_failed(path, short_channel_id) + fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) { + self.0.payment_path_failed(path, short_channel_id, duration_since_epoch) } - fn payment_path_successful(&mut self, path: &Path) { - self.0.payment_path_successful(path) + fn payment_path_successful(&mut self, path: &Path, duration_since_epoch: Duration) { + self.0.payment_path_successful(path, duration_since_epoch) } - fn probe_failed(&mut self, path: &Path, short_channel_id: u64) { - self.0.probe_failed(path, short_channel_id) + fn probe_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) { + self.0.probe_failed(path, short_channel_id, duration_since_epoch) } - fn probe_successful(&mut self, path: &Path) { - self.0.probe_successful(path) + fn probe_successful(&mut self, path: &Path, duration_since_epoch: Duration) { + self.0.probe_successful(path, duration_since_epoch) } } @@ -399,13 +399,13 @@ impl ScoreLookUp for FixedPenaltyScorer { } impl ScoreUpdate for FixedPenaltyScorer { - fn payment_path_failed(&mut self, 
_path: &Path, _short_channel_id: u64) {} + fn payment_path_failed(&mut self, _path: &Path, _short_channel_id: u64, _duration_since_epoch: Duration) {} - fn payment_path_successful(&mut self, _path: &Path) {} + fn payment_path_successful(&mut self, _path: &Path, _duration_since_epoch: Duration) {} - fn probe_failed(&mut self, _path: &Path, _short_channel_id: u64) {} + fn probe_failed(&mut self, _path: &Path, _short_channel_id: u64, _duration_since_epoch: Duration) {} - fn probe_successful(&mut self, _path: &Path) {} + fn probe_successful(&mut self, _path: &Path, _duration_since_epoch: Duration) {} } impl Writeable for FixedPenaltyScorer { @@ -1391,7 +1391,7 @@ impl>, L: Deref, T: Time> ScoreLookUp for Prob } impl>, L: Deref, T: Time> ScoreUpdate for ProbabilisticScorerUsingTime where L::Target: Logger { - fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64) { + fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, _duration_since_epoch: Duration) { let amount_msat = path.final_value_msat(); log_trace!(self.logger, "Scoring path through to SCID {} as having failed at {} msat", short_channel_id, amount_msat); let network_graph = self.network_graph.read_only(); @@ -1430,7 +1430,7 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob } } - fn payment_path_successful(&mut self, path: &Path) { + fn payment_path_successful(&mut self, path: &Path, _duration_since_epoch: Duration) { let amount_msat = path.final_value_msat(); log_trace!(self.logger, "Scoring path through SCID {} as having succeeded at {} msat.", path.hops.split_last().map(|(hop, _)| hop.short_channel_id).unwrap_or(0), amount_msat); @@ -1456,12 +1456,12 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob } } - fn probe_failed(&mut self, path: &Path, short_channel_id: u64) { - self.payment_path_failed(path, short_channel_id) + fn probe_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) { + self.payment_path_failed(path, short_channel_id, duration_since_epoch) } - fn probe_successful(&mut self, path: &Path) { - self.payment_path_failed(path, u64::max_value()) + fn probe_successful(&mut self, path: &Path, duration_since_epoch: Duration) { + self.payment_path_failed(path, u64::max_value(), duration_since_epoch) } } @@ -2661,10 +2661,10 @@ mod tests { assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 301); - scorer.payment_path_failed(&failed_path, 41); + scorer.payment_path_failed(&failed_path, 41, Duration::ZERO); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 301); - scorer.payment_path_successful(&successful_path); + scorer.payment_path_successful(&successful_path, Duration::ZERO); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 301); } @@ -2697,7 +2697,7 @@ mod tests { let usage = ChannelUsage { amount_msat: 750, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 602); - scorer.payment_path_failed(&path, 43); + scorer.payment_path_failed(&path, 43, Duration::ZERO); let usage = ChannelUsage { amount_msat: 250, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 0); @@ -2737,7 +2737,7 @@ mod tests { let usage = ChannelUsage { amount_msat: 750, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 602); - scorer.payment_path_failed(&path, 42); + scorer.payment_path_failed(&path, 42, Duration::ZERO); let usage = ChannelUsage { amount_msat: 250, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 300); @@ -2814,7 +2814,7 @@ mod tests { }; 
assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 128); - scorer.payment_path_failed(&Path { hops: path, blinded_tail: None }, 43); + scorer.payment_path_failed(&Path { hops: path, blinded_tail: None }, 43, Duration::ZERO); let channel = network_graph.read_only().channel(42).unwrap().to_owned(); let (info, _) = channel.as_directed_from(&node_a).unwrap(); @@ -2877,7 +2877,7 @@ mod tests { assert_eq!(scorer.channel_penalty_msat(&candidate_42, usage, ¶ms), 128); assert_eq!(scorer.channel_penalty_msat(&candidate_43, usage, ¶ms), 128); - scorer.payment_path_successful(&payment_path_for_amount(500)); + scorer.payment_path_successful(&payment_path_for_amount(500), Duration::ZERO); assert_eq!(scorer.channel_penalty_msat(&candidate_41, usage, ¶ms), 128); assert_eq!(scorer.channel_penalty_msat(&candidate_42, usage, ¶ms), 300); @@ -2915,8 +2915,8 @@ mod tests { let usage = ChannelUsage { amount_msat: 1_023, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 2_000); - scorer.payment_path_failed(&payment_path_for_amount(768), 42); - scorer.payment_path_failed(&payment_path_for_amount(128), 43); + scorer.payment_path_failed(&payment_path_for_amount(768), 42, Duration::ZERO); + scorer.payment_path_failed(&payment_path_for_amount(128), 43, Duration::ZERO); // Initial penalties let usage = ChannelUsage { amount_msat: 128, ..usage }; @@ -3013,7 +3013,7 @@ mod tests { }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 125); - scorer.payment_path_failed(&payment_path_for_amount(512), 42); + scorer.payment_path_failed(&payment_path_for_amount(512), 42, Duration::ZERO); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 281); // An unchecked right shift 64 bits or more in DirectedChannelLiquidity::decayed_offset_msat @@ -3054,8 +3054,8 @@ mod tests { assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 300); // More knowledge gives higher confidence (256, 768), meaning a lower penalty. - scorer.payment_path_failed(&payment_path_for_amount(768), 42); - scorer.payment_path_failed(&payment_path_for_amount(256), 43); + scorer.payment_path_failed(&payment_path_for_amount(768), 42, Duration::ZERO); + scorer.payment_path_failed(&payment_path_for_amount(256), 43, Duration::ZERO); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 281); // Decaying knowledge gives less confidence (128, 896), meaning a higher penalty. @@ -3064,12 +3064,12 @@ mod tests { // Reducing the upper bound gives more confidence (128, 832) that the payment amount (512) // is closer to the upper bound, meaning a higher penalty. - scorer.payment_path_successful(&payment_path_for_amount(64)); + scorer.payment_path_successful(&payment_path_for_amount(64), Duration::from_secs(10)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 331); // Increasing the lower bound gives more confidence (256, 832) that the payment amount (512) // is closer to the lower bound, meaning a lower penalty. - scorer.payment_path_failed(&payment_path_for_amount(256), 43); + scorer.payment_path_failed(&payment_path_for_amount(256), 43, Duration::from_secs(10)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 245); // Further decaying affects the lower bound more than the upper bound (128, 928). 
@@ -3098,7 +3098,7 @@ mod tests { effective_capacity: EffectiveCapacity::Total { capacity_msat: 1_000, htlc_maximum_msat: 1_000 }, }; - scorer.payment_path_failed(&payment_path_for_amount(500), 42); + scorer.payment_path_failed(&payment_path_for_amount(500), 42, Duration::ZERO); let channel = network_graph.read_only().channel(42).unwrap().to_owned(); let (info, _) = channel.as_directed_from(&source).unwrap(); let candidate = CandidateRouteHop::PublicHop { @@ -3110,7 +3110,7 @@ mod tests { SinceEpoch::advance(Duration::from_secs(10)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 473); - scorer.payment_path_failed(&payment_path_for_amount(250), 43); + scorer.payment_path_failed(&payment_path_for_amount(250), 43, Duration::from_secs(10)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 300); let mut serialized_scorer = Vec::new(); @@ -3143,7 +3143,7 @@ mod tests { effective_capacity: EffectiveCapacity::Total { capacity_msat: 1_000, htlc_maximum_msat: 1_000 }, }; - scorer.payment_path_failed(&payment_path_for_amount(500), 42); + scorer.payment_path_failed(&payment_path_for_amount(500), 42, Duration::ZERO); let channel = network_graph.read_only().channel(42).unwrap().to_owned(); let (info, _) = channel.as_directed_from(&source).unwrap(); let candidate = CandidateRouteHop::PublicHop { @@ -3162,7 +3162,7 @@ mod tests { ::read(&mut serialized_scorer, (decay_params, &network_graph, &logger)).unwrap(); assert_eq!(deserialized_scorer.channel_penalty_msat(&candidate, usage, ¶ms), 473); - scorer.payment_path_failed(&payment_path_for_amount(250), 43); + scorer.payment_path_failed(&payment_path_for_amount(250), 43, Duration::from_secs(10)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 300); SinceEpoch::advance(Duration::from_secs(10)); @@ -3437,7 +3437,7 @@ mod tests { assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, 42, ¶ms), None); - scorer.payment_path_failed(&payment_path_for_amount(1), 42); + scorer.payment_path_failed(&payment_path_for_amount(1), 42, Duration::ZERO); { let network_graph = network_graph.read_only(); let channel = network_graph.channel(42).unwrap(); @@ -3462,7 +3462,7 @@ mod tests { // Even after we tell the scorer we definitely have enough available liquidity, it will // still remember that there was some failure in the past, and assign a non-0 penalty. - scorer.payment_path_failed(&payment_path_for_amount(1000), 43); + scorer.payment_path_failed(&payment_path_for_amount(1000), 43, Duration::ZERO); { let network_graph = network_graph.read_only(); let channel = network_graph.channel(42).unwrap(); @@ -3515,7 +3515,7 @@ mod tests { inflight_htlc_msat: 1024, effective_capacity: EffectiveCapacity::Total { capacity_msat: 1_024, htlc_maximum_msat: 1_024 }, }; - scorer.payment_path_failed(&payment_path_for_amount(1), 42); + scorer.payment_path_failed(&payment_path_for_amount(1), 42, Duration::from_secs(10 * 16)); { let network_graph = network_graph.read_only(); let channel = network_graph.channel(42).unwrap(); @@ -3547,7 +3547,7 @@ mod tests { path_hop(source_pubkey(), 42, 1), path_hop(sender_pubkey(), 41, 0), ]; - scorer.payment_path_failed(&Path { hops: path, blinded_tail: None }, 42); + scorer.payment_path_failed(&Path { hops: path, blinded_tail: None }, 42, Duration::from_secs(10 * (16 + 60 * 60))); } #[test] @@ -3646,9 +3646,9 @@ mod tests { // final value is taken into account. 
assert!(scorer.channel_liquidities.get(&42).is_none()); - scorer.payment_path_failed(&path, 42); + scorer.payment_path_failed(&path, 42, Duration::ZERO); path.blinded_tail.as_mut().unwrap().final_value_msat = 256; - scorer.payment_path_failed(&path, 43); + scorer.payment_path_failed(&path, 43, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&42).unwrap() .as_directed(&source, &target, 1_000, decay_params); @@ -3702,7 +3702,7 @@ mod tests { None); // Fail to pay once, and then check the buckets and penalty. - scorer.payment_path_failed(&payment_path_for_amount(amount_msat), 42); + scorer.payment_path_failed(&payment_path_for_amount(amount_msat), 42, Duration::ZERO); // The penalty should be the maximum penalty, as the payment we're scoring is now in the // same bucket which is the only maximum datapoint. assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), @@ -3726,7 +3726,7 @@ mod tests { // ...but once we see a failure, we consider the payment to be substantially less likely, // even though not a probability of zero as we still look at the second max bucket which // now shows 31. - scorer.payment_path_failed(&payment_path_for_amount(amount_msat), 42); + scorer.payment_path_failed(&payment_path_for_amount(amount_msat), 42, Duration::ZERO); assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target), Some(([63, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [32, 31, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]))); diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 557be65d5fa..c6561863e09 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -1350,13 +1350,13 @@ impl ScoreLookUp for TestScorer { } impl ScoreUpdate for TestScorer { - fn payment_path_failed(&mut self, _actual_path: &Path, _actual_short_channel_id: u64) {} + fn payment_path_failed(&mut self, _actual_path: &Path, _actual_short_channel_id: u64, _duration_since_epoch: Duration) {} - fn payment_path_successful(&mut self, _actual_path: &Path) {} + fn payment_path_successful(&mut self, _actual_path: &Path, _duration_since_epoch: Duration) {} - fn probe_failed(&mut self, _actual_path: &Path, _: u64) {} + fn probe_failed(&mut self, _actual_path: &Path, _: u64, _duration_since_epoch: Duration) {} - fn probe_successful(&mut self, _actual_path: &Path) {} + fn probe_successful(&mut self, _actual_path: &Path, _duration_since_epoch: Duration) {} } impl Drop for TestScorer { From b84842a9048dcfc1d8d21e1db73e1961b4b175b0 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 2 Oct 2023 19:14:26 +0000 Subject: [PATCH 03/17] Add a scoring decay method to the `ScoreUpdate` trait Rather than relying on fetching the current time during routefinding, here we introduce a new trait method to `ScoreUpdate` to do so. This largely mirrors what we do with the `NetworkGraph`, and allows us to take on much more expensive operations (floating point exponentiation) in our decaying. 
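
To illustrate the intended calling convention (a sketch, not part of this
patch - the helper and its use of the wall clock are assumptions; users of
`lightning-background-processor` get this behavior automatically):

	use std::time::{SystemTime, UNIX_EPOCH};
	use lightning::routing::scoring::ScoreUpdate;

	// Hypothetical timer callback: tell the scorer how much wall-clock
	// time has passed so it can decay its liquidity estimates in the
	// background rather than during routefinding.
	fn on_decay_timer<S: ScoreUpdate>(scorer: &mut S) {
		let now = SystemTime::now().duration_since(UNIX_EPOCH)
			.expect("Time should be sometime after 1970");
		scorer.time_passed(now);
	}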
--- lightning-background-processor/src/lib.rs | 21 +++++++++++++++++++-- lightning/src/routing/scoring.rs | 18 ++++++++++++++++++ lightning/src/util/test_utils.rs | 2 ++ 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 416f8d7b6d0..1d5899682b3 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -294,6 +294,7 @@ macro_rules! define_run_body { let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER); let mut have_pruned = false; + let mut have_decayed_scorer = false; loop { $process_channel_manager_events; @@ -401,9 +402,24 @@ macro_rules! define_run_body { last_prune_call = $get_timer(prune_timer); } + if !have_decayed_scorer { + if let Some(ref scorer) = $scorer { + if let Some(duration_since_epoch) = $time_fetch() { + log_trace!($logger, "Calling time_passed on scorer at startup"); + scorer.write_lock().time_passed(duration_since_epoch); + } + } + have_decayed_scorer = true; + } + if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) { if let Some(ref scorer) = $scorer { - log_trace!($logger, "Persisting scorer"); + if let Some(duration_since_epoch) = $time_fetch() { + log_trace!($logger, "Calling time_passed and persisting scorer"); + scorer.write_lock().time_passed(duration_since_epoch); + } else { + log_trace!($logger, "Persisting scorer"); + } if let Err(e) = $persister.persist_scorer(&scorer) { log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) } @@ -1208,6 +1224,7 @@ mod tests { } } } + fn time_passed(&mut self, _: Duration) {} } #[cfg(c_bindings)] @@ -1616,7 +1633,7 @@ mod tests { loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); - let expected_log = "Persisting scorer".to_string(); + let expected_log = "Calling time_passed and persisting scorer".to_string(); if log_entries.get(&("lightning_background_processor", expected_log)).is_some() { break } diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 92ebb979cf7..ee6d515bc0e 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -120,6 +120,12 @@ pub trait ScoreUpdate { /// Handles updating channel penalties after a probe over the given path succeeded. fn probe_successful(&mut self, path: &Path, duration_since_epoch: Duration); + + /// Scorers may wish to reduce their certainty of channel liquidity information over time. + /// Thus, this method is provided to allow scorers to observe the passage of time - the holder + /// of this object should call this method regularly (generally via the + /// `lightning-background-processor` crate). + fn time_passed(&mut self, duration_since_epoch: Duration); } /// A trait which can both lookup and update routing channel penalty scores. 
@@ -160,6 +166,10 @@ impl<S: ScoreUpdate, T: DerefMut<Target=S>> ScoreUpdate for T {
 	fn probe_successful(&mut self, path: &Path, duration_since_epoch: Duration) {
 		self.deref_mut().probe_successful(path, duration_since_epoch)
 	}
+
+	fn time_passed(&mut self, duration_since_epoch: Duration) {
+		self.deref_mut().time_passed(duration_since_epoch)
+	}
 } }
 }
 
@@ -361,6 +371,10 @@ impl<'a, T: Score> ScoreUpdate for MultiThreadedScoreLockWrite<'a, T> {
 	fn probe_successful(&mut self, path: &Path, duration_since_epoch: Duration) {
 		self.0.probe_successful(path, duration_since_epoch)
 	}
+
+	fn time_passed(&mut self, duration_since_epoch: Duration) {
+		self.0.time_passed(duration_since_epoch)
+	}
 }
 
@@ -406,6 +420,8 @@ impl ScoreUpdate for FixedPenaltyScorer {
 	fn probe_failed(&mut self, _path: &Path, _short_channel_id: u64, _duration_since_epoch: Duration) {}
 
 	fn probe_successful(&mut self, _path: &Path, _duration_since_epoch: Duration) {}
+
+	fn time_passed(&mut self, _duration_since_epoch: Duration) {}
 }
 
 impl Writeable for FixedPenaltyScorer {
@@ -1463,6 +1479,8 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
 	fn probe_successful(&mut self, path: &Path, duration_since_epoch: Duration) {
 		self.payment_path_failed(path, u64::max_value(), duration_since_epoch)
 	}
+
+	fn time_passed(&mut self, _duration_since_epoch: Duration) {}
 }
 
 #[cfg(c_bindings)]
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index c6561863e09..805806dc346 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -1357,6 +1357,8 @@ impl ScoreUpdate for TestScorer {
 	fn probe_failed(&mut self, _actual_path: &Path, _: u64, _duration_since_epoch: Duration) {}
 
 	fn probe_successful(&mut self, _actual_path: &Path, _duration_since_epoch: Duration) {}
+
+	fn time_passed(&mut self, _duration_since_epoch: Duration) {}
 }
 
 impl Drop for TestScorer {

From f0f8194719158759b9d745df7f136312fb09ff13 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 2 Oct 2023 19:44:36 +0000
Subject: [PATCH 04/17] Track historical liquidity update time separately from
 the bounds

In the next commit, we'll start to use the new `ScoreUpdate::time_passed`
to decay our bounds in the background. This will result in the
`last_updated` field getting updated regularly on decay, rather than only
on update.

While this isn't an issue for the regular liquidity bounds, it poses a
problem for the historical liquidity buckets, which are decayed on a
separate (and by default much longer) timer. If we didn't move to
tracking their decays separately, we'd never let the `last_updated` field
get old enough for the historical buckets to decay at all.

Instead, here we introduce a new field in the `ChannelLiquidity` which
tracks the last time the historical liquidity buckets were updated. We
initialize it to a copy of `last_updated` on deserialization if it is
missing.
---
 lightning/src/routing/scoring.rs | 68 +++++++++++++++++++++++---------
 1 file changed, 50 insertions(+), 18 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index ee6d515bc0e..9c03ff40de0 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -805,11 +805,14 @@ struct ChannelLiquidity<T: Time> {
 	/// Upper channel liquidity bound in terms of an offset from the effective capacity.
 	max_liquidity_offset_msat: u64,
 
+	min_liquidity_offset_history: HistoricalBucketRangeTracker,
+	max_liquidity_offset_history: HistoricalBucketRangeTracker,
+
 	/// Time when the liquidity bounds were last modified.
last_updated: T, - min_liquidity_offset_history: HistoricalBucketRangeTracker, - max_liquidity_offset_history: HistoricalBucketRangeTracker, + /// Time when the historical liquidity bounds were last modified. + offset_history_last_updated: T, } /// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity and @@ -820,6 +823,7 @@ struct DirectedChannelLiquidity, BRT: Deref, capacity_msat: u64, last_updated: U, + offset_history_last_updated: U, now: T, decay_params: ProbabilisticScoringDecayParameters, } @@ -858,7 +862,7 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU let dir_liq = liq.as_directed(source, target, amt, self.decay_params); let (min_buckets, max_buckets) = dir_liq.liquidity_history - .get_decayed_buckets(now, *dir_liq.last_updated, + .get_decayed_buckets(now, *dir_liq.offset_history_last_updated, self.decay_params.historical_no_updates_half_life) .unwrap_or(([0; 32], [0; 32])); @@ -955,7 +959,7 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU let (min_buckets, mut max_buckets) = dir_liq.liquidity_history.get_decayed_buckets( - dir_liq.now, *dir_liq.last_updated, + dir_liq.now, *dir_liq.offset_history_last_updated, self.decay_params.historical_no_updates_half_life )?; @@ -988,7 +992,7 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU let dir_liq = liq.as_directed(source, target, capacity_msat, self.decay_params); return dir_liq.liquidity_history.calculate_success_probability_times_billion( - dir_liq.now, *dir_liq.last_updated, + dir_liq.now, *dir_liq.offset_history_last_updated, self.decay_params.historical_no_updates_half_life, ¶ms, amount_msat, capacity_msat ).map(|p| p as f64 / (1024 * 1024 * 1024) as f64); @@ -1008,6 +1012,7 @@ impl ChannelLiquidity { min_liquidity_offset_history: HistoricalBucketRangeTracker::new(), max_liquidity_offset_history: HistoricalBucketRangeTracker::new(), last_updated: T::now(), + offset_history_last_updated: T::now(), } } @@ -1034,6 +1039,7 @@ impl ChannelLiquidity { }, capacity_msat, last_updated: &self.last_updated, + offset_history_last_updated: &self.offset_history_last_updated, now: T::now(), decay_params: decay_params, } @@ -1062,6 +1068,7 @@ impl ChannelLiquidity { }, capacity_msat, last_updated: &mut self.last_updated, + offset_history_last_updated: &mut self.offset_history_last_updated, now: T::now(), decay_params: decay_params, } @@ -1197,7 +1204,8 @@ impl, BRT: Deref, if score_params.historical_liquidity_penalty_multiplier_msat != 0 || score_params.historical_liquidity_penalty_amount_multiplier_msat != 0 { if let Some(cumulative_success_prob_times_billion) = self.liquidity_history - .calculate_success_probability_times_billion(self.now, *self.last_updated, + .calculate_success_probability_times_billion( + self.now, *self.offset_history_last_updated, self.decay_params.historical_no_updates_half_life, score_params, amount_msat, self.capacity_msat) { @@ -1316,7 +1324,7 @@ impl, BRT: DerefMut, BRT: DerefMut, BRT: DerefMut> HistoricalMinMaxBuckets { - pub(super) fn get_decayed_buckets(&self, now: T, last_updated: T, half_life: Duration) + pub(super) fn get_decayed_buckets(&self, now: T, offset_history_last_updated: T, half_life: Duration) -> Option<([u16; 32], [u16; 32])> { - let (_, required_decays) = self.get_total_valid_points(now, last_updated, half_life)?; + let (_, required_decays) = self.get_total_valid_points(now, offset_history_last_updated, half_life)?; let mut min_buckets = *self.min_liquidity_offset_history; min_buckets.time_decay_data(required_decays); @@ -1994,9 +2004,9 @@ mod bucketed_history 
{ Some((min_buckets.buckets, max_buckets.buckets)) } #[inline] - pub(super) fn get_total_valid_points(&self, now: T, last_updated: T, half_life: Duration) + pub(super) fn get_total_valid_points(&self, now: T, offset_history_last_updated: T, half_life: Duration) -> Option<(u64, u32)> { - let required_decays = now.duration_since(last_updated).as_secs() + let required_decays = now.duration_since(offset_history_last_updated).as_secs() .checked_div(half_life.as_secs()) .map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32); @@ -2019,7 +2029,7 @@ mod bucketed_history { #[inline] pub(super) fn calculate_success_probability_times_billion( - &self, now: T, last_updated: T, half_life: Duration, + &self, now: T, offset_history_last_updated: T, half_life: Duration, params: &ProbabilisticScoringFeeParameters, amount_msat: u64, capacity_msat: u64 ) -> Option { // If historical penalties are enabled, we try to calculate a probability of success @@ -2035,7 +2045,7 @@ mod bucketed_history { // Check if all our buckets are zero, once decayed and treat it as if we had no data. We // don't actually use the decayed buckets, though, as that would lose precision. let (total_valid_points_tracked, _) - = self.get_total_valid_points(now, last_updated, half_life)?; + = self.get_total_valid_points(now, offset_history_last_updated, half_life)?; let mut cumulative_success_prob_times_billion = 0; // Special-case the 0th min bucket - it generally means we failed a payment, so only @@ -2128,6 +2138,8 @@ ReadableArgs<(ProbabilisticScoringDecayParameters, G, L)> for ProbabilisticScore impl Writeable for ChannelLiquidity { #[inline] fn write(&self, w: &mut W) -> Result<(), io::Error> { + let offset_history_duration_since_epoch = + T::duration_since_epoch() - self.offset_history_last_updated.elapsed(); let duration_since_epoch = T::duration_since_epoch() - self.last_updated.elapsed(); write_tlv_fields!(w, { (0, self.min_liquidity_offset_msat, required), @@ -2137,6 +2149,7 @@ impl Writeable for ChannelLiquidity { (4, duration_since_epoch, required), (5, Some(self.min_liquidity_offset_history), option), (7, Some(self.max_liquidity_offset_history), option), + (9, offset_history_duration_since_epoch, required), }); Ok(()) } @@ -2152,6 +2165,7 @@ impl Readable for ChannelLiquidity { let mut min_liquidity_offset_history: Option = None; let mut max_liquidity_offset_history: Option = None; let mut duration_since_epoch = Duration::from_secs(0); + let mut offset_history_duration_since_epoch = None; read_tlv_fields!(r, { (0, min_liquidity_offset_msat, required), (1, legacy_min_liq_offset_history, option), @@ -2160,6 +2174,7 @@ impl Readable for ChannelLiquidity { (4, duration_since_epoch, required), (5, min_liquidity_offset_history, option), (7, max_liquidity_offset_history, option), + (9, offset_history_duration_since_epoch, option), }); // On rust prior to 1.60 `Instant::duration_since` will panic if time goes backwards. 
// We write `last_updated` as wallclock time even though its ultimately an `Instant` (which @@ -2173,6 +2188,13 @@ impl Readable for ChannelLiquidity { let last_updated = if wall_clock_now > duration_since_epoch { now - (wall_clock_now - duration_since_epoch) } else { now }; + + let offset_history_duration_since_epoch = + offset_history_duration_since_epoch.unwrap_or(duration_since_epoch); + let offset_history_last_updated = if wall_clock_now > offset_history_duration_since_epoch { + now - (wall_clock_now - offset_history_duration_since_epoch) + } else { now }; + if min_liquidity_offset_history.is_none() { if let Some(legacy_buckets) = legacy_min_liq_offset_history { min_liquidity_offset_history = Some(legacy_buckets.into_current()); @@ -2193,6 +2215,7 @@ impl Readable for ChannelLiquidity { min_liquidity_offset_history: min_liquidity_offset_history.unwrap(), max_liquidity_offset_history: max_liquidity_offset_history.unwrap(), last_updated, + offset_history_last_updated, }) } } @@ -2368,18 +2391,21 @@ mod tests { fn liquidity_bounds_directed_from_lowest_node_id() { let logger = TestLogger::new(); let last_updated = SinceEpoch::now(); + let offset_history_last_updated = SinceEpoch::now(); let network_graph = network_graph(&logger); let decay_params = ProbabilisticScoringDecayParameters::default(); let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger) .with_channel(42, ChannelLiquidity { - min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated, + min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, + last_updated, offset_history_last_updated, min_liquidity_offset_history: HistoricalBucketRangeTracker::new(), max_liquidity_offset_history: HistoricalBucketRangeTracker::new(), }) .with_channel(43, ChannelLiquidity { - min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated, + min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, + last_updated, offset_history_last_updated, min_liquidity_offset_history: HistoricalBucketRangeTracker::new(), max_liquidity_offset_history: HistoricalBucketRangeTracker::new(), }); @@ -2446,12 +2472,14 @@ mod tests { fn resets_liquidity_upper_bound_when_crossed_by_lower_bound() { let logger = TestLogger::new(); let last_updated = SinceEpoch::now(); + let offset_history_last_updated = SinceEpoch::now(); let network_graph = network_graph(&logger); let decay_params = ProbabilisticScoringDecayParameters::default(); let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger) .with_channel(42, ChannelLiquidity { - min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated, + min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, + last_updated, offset_history_last_updated, min_liquidity_offset_history: HistoricalBucketRangeTracker::new(), max_liquidity_offset_history: HistoricalBucketRangeTracker::new(), }); @@ -2505,12 +2533,14 @@ mod tests { fn resets_liquidity_lower_bound_when_crossed_by_upper_bound() { let logger = TestLogger::new(); let last_updated = SinceEpoch::now(); + let offset_history_last_updated = SinceEpoch::now(); let network_graph = network_graph(&logger); let decay_params = ProbabilisticScoringDecayParameters::default(); let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger) .with_channel(42, ChannelLiquidity { - min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated, + min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, + last_updated, 
offset_history_last_updated,
 					min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
 					max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
 				});
 
@@ -2616,6 +2646,7 @@ mod tests {
 	fn constant_penalty_outside_liquidity_bounds() {
 		let logger = TestLogger::new();
 		let last_updated = SinceEpoch::now();
+		let offset_history_last_updated = SinceEpoch::now();
 		let network_graph = network_graph(&logger);
 		let params = ProbabilisticScoringFeeParameters {
 			liquidity_penalty_multiplier_msat: 1_000,
@@ -2628,7 +2659,8 @@
 		let scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger)
 			.with_channel(42,
 				ChannelLiquidity {
-					min_liquidity_offset_msat: 40, max_liquidity_offset_msat: 40, last_updated,
+					min_liquidity_offset_msat: 40, max_liquidity_offset_msat: 40,
+					last_updated, offset_history_last_updated,
 					min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
 					max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
 				});

From 9659c0695557b5ae1e60da509cf7b2273a8ac987 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 2 Oct 2023 20:07:21 +0000
Subject: [PATCH 05/17] Impl decaying in `ProbabilisticScorer::time_passed`

This implements decaying in the `ProbabilisticScorer`'s
`ScoreUpdate::time_passed` implementation, using floats for accuracy
since we're no longer particularly time-sensitive.

Further, it (finally) removes score entries which have decayed to zero.
---
 lightning/src/routing/scoring.rs | 40 +++++++++++++++++++++++++++++---
 1 file changed, 37 insertions(+), 3 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 9c03ff40de0..21dee09be9e 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -493,7 +493,6 @@ where L::Target: Logger {
 	decay_params: ProbabilisticScoringDecayParameters,
 	network_graph: G,
 	logger: L,
-	// TODO: Remove entries of closed channels.
channel_liquidities: HashMap>, } @@ -1073,6 +1072,16 @@ impl ChannelLiquidity { decay_params: decay_params, } } + + fn decayed_offset(&self, offset: u64, decay_params: ProbabilisticScoringDecayParameters) -> u64 { + let half_life = decay_params.liquidity_offset_half_life.as_secs_f64(); + if half_life != 0.0 { + let elapsed_time = T::now().duration_since(self.last_updated).as_secs_f64(); + ((offset as f64) * powf64(0.5, elapsed_time / half_life)) as u64 + } else { + 0 + } + } } /// Bounds `-log10` to avoid excessive liquidity penalties for payments with low success @@ -1490,7 +1499,32 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob self.payment_path_failed(path, u64::max_value(), duration_since_epoch) } - fn time_passed(&mut self, _duration_since_epoch: Duration) {} + fn time_passed(&mut self, _duration_since_epoch: Duration) { + let decay_params = self.decay_params; + self.channel_liquidities.retain(|_scid, liquidity| { + liquidity.min_liquidity_offset_msat = liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, decay_params); + liquidity.max_liquidity_offset_msat = liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, decay_params); + liquidity.last_updated = T::now(); + let elapsed_time = + T::now().duration_since(liquidity.offset_history_last_updated); + if elapsed_time > decay_params.historical_no_updates_half_life { + let half_life = decay_params.historical_no_updates_half_life.as_secs_f64(); + if half_life != 0.0 { + let divisor = powf64(2048.0, elapsed_time.as_secs_f64() / half_life) as u64; + for bucket in liquidity.min_liquidity_offset_history.buckets.iter_mut() { + *bucket = ((*bucket as u64) * 1024 / divisor) as u16; + } + for bucket in liquidity.max_liquidity_offset_history.buckets.iter_mut() { + *bucket = ((*bucket as u64) * 1024 / divisor) as u16; + } + liquidity.offset_history_last_updated = T::now(); + } + } + liquidity.min_liquidity_offset_msat != 0 || liquidity.max_liquidity_offset_msat != 0 || + liquidity.min_liquidity_offset_history.buckets != [0; 32] || + liquidity.max_liquidity_offset_history.buckets != [0; 32] + }); + } } #[cfg(c_bindings)] @@ -1928,7 +1962,7 @@ mod bucketed_history { /// in each of 32 buckets. #[derive(Clone, Copy)] pub(super) struct HistoricalBucketRangeTracker { - buckets: [u16; 32], + pub(super) buckets: [u16; 32], } /// Buckets are stored in fixed point numbers with a 5 bit fractional part. Thus, the value From 35b49645c4eb0d61c162a502723cd4e44ef0ca1e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 9 Oct 2023 02:14:21 +0000 Subject: [PATCH 06/17] Stop decaying historical liquidity information during scoring Because scoring is an incredibly performance-sensitive operation, doing liquidity information decay (and especially fetching the current time!) during scoring isn't really a great idea. Now that we decay liquidity information in the background, we don't have any reason to decay during scoring, and we remove the historical bucket liquidity decaying here. --- lightning/src/routing/scoring.rs | 136 ++++++++++++++----------------- 1 file changed, 59 insertions(+), 77 deletions(-) diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 21dee09be9e..4391da8e93e 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -850,8 +850,6 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU /// Note that this writes roughly one line per channel for which we have a liquidity estimate, /// which may be a substantial amount of log output. 
pub fn debug_log_liquidity_stats(&self) { - let now = T::now(); - let graph = self.network_graph.read_only(); for (scid, liq) in self.channel_liquidities.iter() { if let Some(chan_debug) = graph.channels().get(scid) { @@ -860,10 +858,8 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU let amt = directed_info.effective_capacity().as_msat(); let dir_liq = liq.as_directed(source, target, amt, self.decay_params); - let (min_buckets, max_buckets) = dir_liq.liquidity_history - .get_decayed_buckets(now, *dir_liq.offset_history_last_updated, - self.decay_params.historical_no_updates_half_life) - .unwrap_or(([0; 32], [0; 32])); + let min_buckets = &dir_liq.liquidity_history.min_liquidity_offset_history.buckets; + let max_buckets = &dir_liq.liquidity_history.max_liquidity_offset_history.buckets; log_debug!(self.logger, core::concat!( "Liquidity from {} to {} via {} is in the range ({}, {}).\n", @@ -942,7 +938,7 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU /// in the top and bottom bucket, and roughly with similar (recent) frequency. /// /// Because the datapoints are decayed slowly over time, values will eventually return to - /// `Some(([1; 32], [1; 32]))` and then to `None` once no datapoints remain. + /// `Some(([0; 32], [0; 32]))` or `None` if no data remains for a channel. /// /// In order to fetch a single success probability from the buckets provided here, as used in /// the scoring model, see [`Self::historical_estimated_payment_success_probability`]. @@ -956,11 +952,8 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU let amt = directed_info.effective_capacity().as_msat(); let dir_liq = liq.as_directed(source, target, amt, self.decay_params); - let (min_buckets, mut max_buckets) = - dir_liq.liquidity_history.get_decayed_buckets( - dir_liq.now, *dir_liq.offset_history_last_updated, - self.decay_params.historical_no_updates_half_life - )?; + let min_buckets = dir_liq.liquidity_history.min_liquidity_offset_history.buckets; + let mut max_buckets = dir_liq.liquidity_history.max_liquidity_offset_history.buckets; // Note that the liquidity buckets are an offset from the edge, so we inverse // the max order to get the probabilities from zero. 
@@ -991,9 +984,7 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU let dir_liq = liq.as_directed(source, target, capacity_msat, self.decay_params); return dir_liq.liquidity_history.calculate_success_probability_times_billion( - dir_liq.now, *dir_liq.offset_history_last_updated, - self.decay_params.historical_no_updates_half_life, ¶ms, amount_msat, - capacity_msat + ¶ms, amount_msat, capacity_msat ).map(|p| p as f64 / (1024 * 1024 * 1024) as f64); } } @@ -1214,9 +1205,7 @@ impl, BRT: Deref, score_params.historical_liquidity_penalty_amount_multiplier_msat != 0 { if let Some(cumulative_success_prob_times_billion) = self.liquidity_history .calculate_success_probability_times_billion( - self.now, *self.offset_history_last_updated, - self.decay_params.historical_no_updates_half_life, score_params, amount_msat, - self.capacity_msat) + score_params, amount_msat, self.capacity_msat) { let historical_negative_log10_times_2048 = approx::negative_log10_times_2048(cumulative_success_prob_times_billion + 1, 1024 * 1024 * 1024); res = res.saturating_add(Self::combined_penalty_msat(amount_msat, @@ -2027,22 +2016,20 @@ mod bucketed_history { } impl> HistoricalMinMaxBuckets { - pub(super) fn get_decayed_buckets(&self, now: T, offset_history_last_updated: T, half_life: Duration) - -> Option<([u16; 32], [u16; 32])> { - let (_, required_decays) = self.get_total_valid_points(now, offset_history_last_updated, half_life)?; - - let mut min_buckets = *self.min_liquidity_offset_history; - min_buckets.time_decay_data(required_decays); - let mut max_buckets = *self.max_liquidity_offset_history; - max_buckets.time_decay_data(required_decays); - Some((min_buckets.buckets, max_buckets.buckets)) - } #[inline] - pub(super) fn get_total_valid_points(&self, now: T, offset_history_last_updated: T, half_life: Duration) - -> Option<(u64, u32)> { - let required_decays = now.duration_since(offset_history_last_updated).as_secs() - .checked_div(half_life.as_secs()) - .map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32); + pub(super) fn calculate_success_probability_times_billion( + &self, params: &ProbabilisticScoringFeeParameters, amount_msat: u64, + capacity_msat: u64 + ) -> Option { + // If historical penalties are enabled, we try to calculate a probability of success + // given our historical distribution of min- and max-liquidity bounds in a channel. + // To do so, we walk the set of historical liquidity bucket (min, max) combinations + // (where min_idx < max_idx, as having a minimum above our maximum is an invalid + // state). For each pair, we calculate the probability as if the bucket's corresponding + // min- and max- liquidity bounds were our current liquidity bounds and then multiply + // that probability by the weight of the selected buckets. + let payment_pos = amount_to_pos(amount_msat, capacity_msat); + if payment_pos >= POSITION_TICKS { return None; } let mut total_valid_points_tracked = 0; for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() { @@ -2054,33 +2041,10 @@ mod bucketed_history { // If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), // treat it as if we were fully decayed. 
const FULLY_DECAYED: u16 = BUCKET_FIXED_POINT_ONE * BUCKET_FIXED_POINT_ONE; - if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < FULLY_DECAYED.into() { + if total_valid_points_tracked < FULLY_DECAYED.into() { return None; } - Some((total_valid_points_tracked, required_decays)) - } - - #[inline] - pub(super) fn calculate_success_probability_times_billion( - &self, now: T, offset_history_last_updated: T, half_life: Duration, - params: &ProbabilisticScoringFeeParameters, amount_msat: u64, capacity_msat: u64 - ) -> Option { - // If historical penalties are enabled, we try to calculate a probability of success - // given our historical distribution of min- and max-liquidity bounds in a channel. - // To do so, we walk the set of historical liquidity bucket (min, max) combinations - // (where min_idx < max_idx, as having a minimum above our maximum is an invalid - // state). For each pair, we calculate the probability as if the bucket's corresponding - // min- and max- liquidity bounds were our current liquidity bounds and then multiply - // that probability by the weight of the selected buckets. - let payment_pos = amount_to_pos(amount_msat, capacity_msat); - if payment_pos >= POSITION_TICKS { return None; } - - // Check if all our buckets are zero, once decayed and treat it as if we had no data. We - // don't actually use the decayed buckets, though, as that would lose precision. - let (total_valid_points_tracked, _) - = self.get_total_valid_points(now, offset_history_last_updated, half_life)?; - let mut cumulative_success_prob_times_billion = 0; // Special-case the 0th min bucket - it generally means we failed a payment, so only // consider the highest (i.e. largest-offset-from-max-capacity) max bucket for all @@ -3012,19 +2976,9 @@ mod tests { let usage = ChannelUsage { amount_msat: 896, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), u64::max_value()); - // No decay - SinceEpoch::advance(Duration::from_secs(4)); - let usage = ChannelUsage { amount_msat: 128, ..usage }; - assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 0); - let usage = ChannelUsage { amount_msat: 256, ..usage }; - assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 93); - let usage = ChannelUsage { amount_msat: 768, ..usage }; - assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 1_479); - let usage = ChannelUsage { amount_msat: 896, ..usage }; - assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), u64::max_value()); - // Half decay (i.e., three-quarter life) - SinceEpoch::advance(Duration::from_secs(1)); + SinceEpoch::advance(Duration::from_secs(5)); + scorer.time_passed(Duration::from_secs(5)); let usage = ChannelUsage { amount_msat: 128, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 22); let usage = ChannelUsage { amount_msat: 256, ..usage }; @@ -3036,6 +2990,7 @@ mod tests { // One decay (i.e., half life) SinceEpoch::advance(Duration::from_secs(5)); + scorer.time_passed(Duration::from_secs(10)); let usage = ChannelUsage { amount_msat: 64, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 0); let usage = ChannelUsage { amount_msat: 128, ..usage }; @@ -3047,6 +3002,7 @@ mod tests { // Fully decay liquidity lower bound. 
SinceEpoch::advance(Duration::from_secs(10 * 7)); + scorer.time_passed(Duration::from_secs(10 * 8)); let usage = ChannelUsage { amount_msat: 0, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 0); let usage = ChannelUsage { amount_msat: 1, ..usage }; @@ -3058,12 +3014,14 @@ mod tests { // Fully decay liquidity upper bound. SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10 * 9)); let usage = ChannelUsage { amount_msat: 0, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 0); let usage = ChannelUsage { amount_msat: 1_024, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), u64::max_value()); SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10 * 10)); let usage = ChannelUsage { amount_msat: 0, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 0); let usage = ChannelUsage { amount_msat: 1_024, ..usage }; @@ -3103,9 +3061,11 @@ mod tests { // An unchecked right shift 64 bits or more in DirectedChannelLiquidity::decayed_offset_msat // would cause an overflow. SinceEpoch::advance(Duration::from_secs(10 * 64)); + scorer.time_passed(Duration::from_secs(10 * 64)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 125); SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10 * 65)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 125); } @@ -3144,6 +3104,7 @@ mod tests { // Decaying knowledge gives less confidence (128, 896), meaning a higher penalty. SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 291); // Reducing the upper bound gives more confidence (128, 832) that the payment amount (512) @@ -3158,6 +3119,7 @@ mod tests { // Further decaying affects the lower bound more than the upper bound (128, 928). 
SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(20)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 280); } @@ -3192,6 +3154,7 @@ mod tests { assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), u64::max_value()); SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 473); scorer.payment_path_failed(&payment_path_for_amount(250), 43, Duration::from_secs(10)); @@ -3206,8 +3169,7 @@ mod tests { assert_eq!(deserialized_scorer.channel_penalty_msat(&candidate, usage, ¶ms), 300); } - #[test] - fn decays_persisted_liquidity_bounds() { + fn do_decays_persisted_liquidity_bounds(decay_before_reload: bool) { let logger = TestLogger::new(); let network_graph = network_graph(&logger); let params = ProbabilisticScoringFeeParameters { @@ -3236,23 +3198,38 @@ mod tests { }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), u64::max_value()); + if decay_before_reload { + SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10)); + } + let mut serialized_scorer = Vec::new(); scorer.write(&mut serialized_scorer).unwrap(); - SinceEpoch::advance(Duration::from_secs(10)); - let mut serialized_scorer = io::Cursor::new(&serialized_scorer); - let deserialized_scorer = + let mut deserialized_scorer = ::read(&mut serialized_scorer, (decay_params, &network_graph, &logger)).unwrap(); + if !decay_before_reload { + SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10)); + deserialized_scorer.time_passed(Duration::from_secs(10)); + } assert_eq!(deserialized_scorer.channel_penalty_msat(&candidate, usage, ¶ms), 473); scorer.payment_path_failed(&payment_path_for_amount(250), 43, Duration::from_secs(10)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 300); SinceEpoch::advance(Duration::from_secs(10)); + deserialized_scorer.time_passed(Duration::from_secs(20)); assert_eq!(deserialized_scorer.channel_penalty_msat(&candidate, usage, ¶ms), 370); } + #[test] + fn decays_persisted_liquidity_bounds() { + do_decays_persisted_liquidity_bounds(false); + do_decays_persisted_liquidity_bounds(true); + } + #[test] fn scores_realistic_payments() { // Shows the scores of "realistic" sends of 100k sats over channels of 1-10m sats (with a @@ -3577,6 +3554,7 @@ mod tests { // Advance the time forward 16 half-lives (which the docs claim will ensure all data is // gone), and check that we're back to where we started. SinceEpoch::advance(Duration::from_secs(10 * 16)); + scorer.time_passed(Duration::from_secs(10 * 16)); { let network_graph = network_graph.read_only(); let channel = network_graph.channel(42).unwrap(); @@ -3591,7 +3569,7 @@ mod tests { // Once fully decayed we still have data, but its all-0s. In the future we may remove the // data entirely instead. assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target), - None); + Some(([0; 32], [0; 32]))); assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, 1, ¶ms), None); let mut usage = ChannelUsage { @@ -3610,8 +3588,6 @@ mod tests { }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 2050); - usage.inflight_htlc_msat = 0; - assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 866); let usage = ChannelUsage { amount_msat: 1, @@ -3623,6 +3599,12 @@ mod tests { // Advance to decay all liquidity offsets to zero. 
 		SinceEpoch::advance(Duration::from_secs(60 * 60 * 10));
+		scorer.time_passed(Duration::from_secs(10 * (16 + 60 * 60)));
+
+		// Once even the bounds have decayed, information about the channel should be removed
+		// entirely.
+		assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
+			None);
 
 		// Use a path in the opposite direction, which has zero for htlc_maximum_msat. This will
 		// ensure that the effective capacity is zero to test division-by-zero edge cases.

From 6f8838fe7033b8ab6733baface32bdb183abefc7 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 9 Oct 2023 01:11:10 +0000
Subject: [PATCH 07/17] Stop decaying liquidity information during bounds-based scoring

Because scoring is an incredibly performance-sensitive operation, doing
liquidity information decay (and especially fetching the current time!)
during scoring isn't really a great idea. Now that we decay liquidity
information in the background, we don't have any reason to decay during
scoring, and we ultimately remove it entirely here.
---
 lightning/src/routing/scoring.rs | 20 +-------------------
 1 file changed, 1 insertion(+), 19 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 4391da8e93e..d0766b6a78a 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -1260,25 +1260,7 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
 	}
 
 	fn decayed_offset_msat(&self, offset_msat: u64) -> u64 {
-		let half_life = self.decay_params.liquidity_offset_half_life.as_secs();
-		if half_life != 0 {
-			// Decay the offset by the appropriate number of half lives. If half of the next half
-			// life has passed, approximate an additional three-quarter life to help smooth out the
-			// decay.
-			let elapsed_time = self.now.duration_since(*self.last_updated).as_secs();
-			let half_decays = elapsed_time / (half_life / 2);
-			let decays = half_decays / 2;
-			let decayed_offset_msat = offset_msat.checked_shr(decays as u32).unwrap_or(0);
-			if half_decays % 2 == 0 {
-				decayed_offset_msat
-			} else {
-				// 11_585 / 16_384 ~= core::f64::consts::FRAC_1_SQRT_2
-				// 16_384 == 2^14
-				(decayed_offset_msat as u128 * 11_585 / 16_384) as u64
-			}
-		} else {
-			0
-		}
+		offset_msat
 	}
 }

From 5ac68c1af3368e8b54e3742527d5753303e65ef7 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Wed, 29 Nov 2023 03:07:54 +0000
Subject: [PATCH 08/17] Update history bucket last_update time immediately on update

Now that we aren't decaying during scoring, the exact moment at which we
set the `last_updated` time in the history bucket logic no longer
matters, so we should simply update it as soon as we have updated the
history buckets.
---
 lightning/src/routing/scoring.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index d0766b6a78a..72720245f5f 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -1318,6 +1318,7 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
 		self.liquidity_history.max_liquidity_offset_history.track_datapoint(
 			max_liquidity_offset_msat.saturating_sub(bucket_offset_msat), self.capacity_msat
 		);
+		*self.offset_history_last_updated = self.now;
 	}
@@ -1331,7 +1332,6 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
 		};
 		*self.last_updated = self.now;
-		*self.offset_history_last_updated = self.now;
 	}
@@ -1343,7 +1343,6 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
 		};
 		*self.last_updated = self.now;
-		*self.offset_history_last_updated = self.now;
 	}

From: Matt Corallo
Date: Mon, 9 Oct 2023 01:15:18 +0000
Subject: [PATCH 09/17] Pipe `Duration`-based time information through scoring pipeline

In the coming commits, the `T: Time` bound on `ProbabilisticScorer` will
be removed. In order to enable that, we need to pass the current time (as
a `Duration` since the unix epoch) through the score updating pipeline,
allowing us to keep the `*last_updated` time fields up-to-date as we go.
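To make the new flow concrete, here is a rough standalone sketch of the
decay math this enables (simplified: `decayed_offset` below is a free
function standing in for the scorer's method, not the exact code in this
series). With both timestamps expressed as plain `Duration` offsets from
the unix epoch, computing the elapsed time is a saturating subtraction
and no monotonic clock is required:

    use std::time::Duration;

    // Simplified stand-in for the scorer's `decayed_offset`: each elapsed
    // half-life halves the stored liquidity offset, i.e.
    // offset * 0.5^(elapsed / half_life).
    fn decayed_offset(offset_msat: u64, last_updated: Duration, now: Duration, half_life: Duration) -> u64 {
        let half_life_secs = half_life.as_secs_f64();
        if half_life_secs == 0.0 {
            return 0; // a zero half-life decays everything immediately
        }
        // Saturating subtraction tolerates a wall clock that jumped backwards.
        let elapsed = now.saturating_sub(last_updated).as_secs_f64();
        ((offset_msat as f64) * 0.5f64.powf(elapsed / half_life_secs)) as u64
    }

The saturating subtraction also sidesteps the wallclock-went-backwards
problem that the `Instant`-based deserialization code removed later in
this series had to work around explicitly.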
---
 lightning/src/routing/scoring.rs | 69 +++++++++++++++++++-------------
 1 file changed, 42 insertions(+), 27 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 72720245f5f..a74331df7e1 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -1064,7 +1064,10 @@ impl<T: Time> ChannelLiquidity<T> {
 		}
 	}
 
-	fn decayed_offset(&self, offset: u64, decay_params: ProbabilisticScoringDecayParameters) -> u64 {
+	fn decayed_offset(
+		&self, offset: u64, duration_since_epoch: Duration,
+		decay_params: ProbabilisticScoringDecayParameters,
+	) -> u64 {
 		let half_life = decay_params.liquidity_offset_half_life.as_secs_f64();
 		if half_life != 0.0 {
 			let elapsed_time = T::now().duration_since(self.last_updated).as_secs_f64();
@@ -1266,44 +1269,50 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
 
 impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
 	/// Adjusts the channel liquidity balance bounds when failing to route `amount_msat`.
-	fn failed_at_channel<Log: Deref>(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger {
+	fn failed_at_channel<Log: Deref>(
+		&mut self, amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log
+	) where Log::Target: Logger {
 		let existing_max_msat = self.max_liquidity_msat();
 		if amount_msat < existing_max_msat {
 			log_debug!(logger, "Setting max liquidity of {} from {} to {}", chan_descr, existing_max_msat, amount_msat);
-			self.set_max_liquidity_msat(amount_msat);
+			self.set_max_liquidity_msat(amount_msat, duration_since_epoch);
 		} else {
 			log_trace!(logger, "Max liquidity of {} is {} (already less than or equal to {})",
 				chan_descr, existing_max_msat, amount_msat);
 		}
-		self.update_history_buckets(0);
+		self.update_history_buckets(0, duration_since_epoch);
 	}
 
 	/// Adjusts the channel liquidity balance bounds when failing to route `amount_msat` downstream.
-	fn failed_downstream<Log: Deref>(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger {
+	fn failed_downstream<Log: Deref>(
+		&mut self, amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log
+	) where Log::Target: Logger {
 		let existing_min_msat = self.min_liquidity_msat();
 		if amount_msat > existing_min_msat {
 			log_debug!(logger, "Setting min liquidity of {} from {} to {}", chan_descr, existing_min_msat, amount_msat);
-			self.set_min_liquidity_msat(amount_msat);
+			self.set_min_liquidity_msat(amount_msat, duration_since_epoch);
 		} else {
 			log_trace!(logger, "Min liquidity of {} is {} (already greater than or equal to {})",
 				chan_descr, existing_min_msat, amount_msat);
 		}
-		self.update_history_buckets(0);
+		self.update_history_buckets(0, duration_since_epoch);
 	}
 
 	/// Adjusts the channel liquidity balance bounds when successfully routing `amount_msat`.
- fn successful(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger { + fn successful(&mut self, + amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log + ) where Log::Target: Logger { let max_liquidity_msat = self.max_liquidity_msat().checked_sub(amount_msat).unwrap_or(0); log_debug!(logger, "Subtracting {} from max liquidity of {} (setting it to {})", amount_msat, chan_descr, max_liquidity_msat); - self.set_max_liquidity_msat(max_liquidity_msat); - self.update_history_buckets(amount_msat); + self.set_max_liquidity_msat(max_liquidity_msat, duration_since_epoch); + self.update_history_buckets(amount_msat, duration_since_epoch); } /// Updates the history buckets for this channel. Because the history buckets track what we now /// know about the channel's state *prior to our payment* (i.e. what we assume is "steady /// state"), we allow the caller to set an offset applied to our liquidity bounds which /// represents the amount of the successful payment we just made. - fn update_history_buckets(&mut self, bucket_offset_msat: u64) { + fn update_history_buckets(&mut self, bucket_offset_msat: u64, duration_since_epoch: Duration) { let half_lives = self.now.duration_since(*self.offset_history_last_updated).as_secs() .checked_div(self.decay_params.historical_no_updates_half_life.as_secs()) .map(|v| v.try_into().unwrap_or(u32::max_value())).unwrap_or(u32::max_value()); @@ -1322,7 +1331,7 @@ impl, BRT: DerefMut self.max_liquidity_msat() { 0 @@ -1333,7 +1342,7 @@ impl, BRT: DerefMut>, L: Deref, T: Time> ScoreLookUp for Prob } impl>, L: Deref, T: Time> ScoreUpdate for ProbabilisticScorerUsingTime where L::Target: Logger { - fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, _duration_since_epoch: Duration) { + fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) { let amount_msat = path.final_value_msat(); log_trace!(self.logger, "Scoring path through to SCID {} as having failed at {} msat", short_channel_id, amount_msat); let network_graph = self.network_graph.read_only(); @@ -1419,13 +1428,15 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob .entry(hop.short_channel_id) .or_insert_with(ChannelLiquidity::new) .as_directed_mut(source, &target, capacity_msat, self.decay_params) - .failed_at_channel(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); + .failed_at_channel(amount_msat, duration_since_epoch, + format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); } else { self.channel_liquidities .entry(hop.short_channel_id) .or_insert_with(ChannelLiquidity::new) .as_directed_mut(source, &target, capacity_msat, self.decay_params) - .failed_downstream(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); + .failed_downstream(amount_msat, duration_since_epoch, + format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); } } else { log_debug!(self.logger, "Not able to penalize channel with SCID {} as we do not have graph info for it (likely a route-hint last-hop).", @@ -1435,7 +1446,7 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob } } - fn payment_path_successful(&mut self, path: &Path, _duration_since_epoch: Duration) { + fn payment_path_successful(&mut self, path: &Path, duration_since_epoch: Duration) { let amount_msat = path.final_value_msat(); log_trace!(self.logger, "Scoring path through SCID {} as having 
succeeded at {} msat.", path.hops.split_last().map(|(hop, _)| hop.short_channel_id).unwrap_or(0), amount_msat); @@ -1453,7 +1464,8 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob .entry(hop.short_channel_id) .or_insert_with(ChannelLiquidity::new) .as_directed_mut(source, &target, capacity_msat, self.decay_params) - .successful(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); + .successful(amount_msat, duration_since_epoch, + format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); } else { log_debug!(self.logger, "Not able to learn for channel with SCID {} as we do not have graph info for it (likely a route-hint last-hop).", hop.short_channel_id); @@ -1469,12 +1481,15 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob self.payment_path_failed(path, u64::max_value(), duration_since_epoch) } - fn time_passed(&mut self, _duration_since_epoch: Duration) { + fn time_passed(&mut self, duration_since_epoch: Duration) { let decay_params = self.decay_params; self.channel_liquidities.retain(|_scid, liquidity| { - liquidity.min_liquidity_offset_msat = liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, decay_params); - liquidity.max_liquidity_offset_msat = liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, decay_params); + liquidity.min_liquidity_offset_msat = + liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params); + liquidity.max_liquidity_offset_msat = + liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params); liquidity.last_updated = T::now(); + let elapsed_time = T::now().duration_since(liquidity.offset_history_last_updated); if elapsed_time > decay_params.historical_no_updates_half_life { @@ -2408,7 +2423,7 @@ mod tests { scorer.channel_liquidities.get_mut(&42).unwrap() .as_directed_mut(&source, &target, 1_000, decay_params) - .set_min_liquidity_msat(200); + .set_min_liquidity_msat(200, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&42).unwrap() .as_directed(&source, &target, 1_000, decay_params); @@ -2434,7 +2449,7 @@ mod tests { scorer.channel_liquidities.get_mut(&43).unwrap() .as_directed_mut(&target, &recipient, 1_000, decay_params) - .set_max_liquidity_msat(200); + .set_max_liquidity_msat(200, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&43).unwrap() .as_directed(&target, &recipient, 1_000, decay_params); @@ -2480,7 +2495,7 @@ mod tests { // Reset from source to target. scorer.channel_liquidities.get_mut(&42).unwrap() .as_directed_mut(&source, &target, 1_000, decay_params) - .set_min_liquidity_msat(900); + .set_min_liquidity_msat(900, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&42).unwrap() .as_directed(&source, &target, 1_000, decay_params); @@ -2495,7 +2510,7 @@ mod tests { // Reset from target to source. scorer.channel_liquidities.get_mut(&42).unwrap() .as_directed_mut(&target, &source, 1_000, decay_params) - .set_min_liquidity_msat(400); + .set_min_liquidity_msat(400, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&42).unwrap() .as_directed(&source, &target, 1_000, decay_params); @@ -2541,7 +2556,7 @@ mod tests { // Reset from source to target. 
scorer.channel_liquidities.get_mut(&42).unwrap() .as_directed_mut(&source, &target, 1_000, decay_params) - .set_max_liquidity_msat(300); + .set_max_liquidity_msat(300, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&42).unwrap() .as_directed(&source, &target, 1_000, decay_params); @@ -2556,7 +2571,7 @@ mod tests { // Reset from target to source. scorer.channel_liquidities.get_mut(&42).unwrap() .as_directed_mut(&target, &source, 1_000, decay_params) - .set_max_liquidity_msat(600); + .set_max_liquidity_msat(600, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&42).unwrap() .as_directed(&source, &target, 1_000, decay_params); From 22888425f7bfb5a9ac9ffd32906f1a01a035021b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 9 Oct 2023 01:44:33 +0000 Subject: [PATCH 10/17] Use `Duration` based time info in scoring rather than `Time` In the coming commits, the `T: Time` bound on `ProbabilisticScorer` will be removed. In order to enable that, we need to switch over to using the `ScoreUpdate`-provided current time (as a `Duration` since the unix epoch), making the `T` bound entirely unused. --- lightning/src/routing/scoring.rs | 147 +++++++++++++------------------ 1 file changed, 62 insertions(+), 85 deletions(-) diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index a74331df7e1..2c29fd3f1c0 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -493,7 +493,8 @@ where L::Target: Logger { decay_params: ProbabilisticScoringDecayParameters, network_graph: G, logger: L, - channel_liquidities: HashMap>, + channel_liquidities: HashMap, + _unused_time: core::marker::PhantomData, } /// Parameters for configuring [`ProbabilisticScorer`]. @@ -797,7 +798,7 @@ impl ProbabilisticScoringDecayParameters { /// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the /// first node in the ordering of the channel's counterparties. Thus, swapping the two liquidity /// offset fields gives the opposite direction. -struct ChannelLiquidity { +struct ChannelLiquidity { /// Lower channel liquidity bound in terms of an offset from zero. min_liquidity_offset_msat: u64, @@ -807,23 +808,22 @@ struct ChannelLiquidity { min_liquidity_offset_history: HistoricalBucketRangeTracker, max_liquidity_offset_history: HistoricalBucketRangeTracker, - /// Time when the liquidity bounds were last modified. - last_updated: T, + /// Time when the liquidity bounds were last modified as an offset since the unix epoch. + last_updated: Duration, - /// Time when the historical liquidity bounds were last modified. - offset_history_last_updated: T, + /// Time when the historical liquidity bounds were last modified as an offset against the unix + /// epoch. + offset_history_last_updated: Duration, } -/// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity and -/// decayed with a given half life. -struct DirectedChannelLiquidity, BRT: Deref, T: Time, U: Deref> { +/// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity. 
+struct DirectedChannelLiquidity, BRT: Deref, T: Deref> { min_liquidity_offset_msat: L, max_liquidity_offset_msat: L, liquidity_history: HistoricalMinMaxBuckets, capacity_msat: u64, - last_updated: U, - offset_history_last_updated: U, - now: T, + last_updated: T, + offset_history_last_updated: T, decay_params: ProbabilisticScoringDecayParameters, } @@ -836,11 +836,12 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU network_graph, logger, channel_liquidities: HashMap::new(), + _unused_time: core::marker::PhantomData, } } #[cfg(test)] - fn with_channel(mut self, short_channel_id: u64, liquidity: ChannelLiquidity) -> Self { + fn with_channel(mut self, short_channel_id: u64, liquidity: ChannelLiquidity) -> Self { assert!(self.channel_liquidities.insert(short_channel_id, liquidity).is_none()); self } @@ -993,16 +994,15 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU } } -impl ChannelLiquidity { - #[inline] - fn new() -> Self { +impl ChannelLiquidity { + fn new(last_updated: Duration) -> Self { Self { min_liquidity_offset_msat: 0, max_liquidity_offset_msat: 0, min_liquidity_offset_history: HistoricalBucketRangeTracker::new(), max_liquidity_offset_history: HistoricalBucketRangeTracker::new(), - last_updated: T::now(), - offset_history_last_updated: T::now(), + last_updated, + offset_history_last_updated: last_updated, } } @@ -1010,7 +1010,7 @@ impl ChannelLiquidity { /// `capacity_msat`. fn as_directed( &self, source: &NodeId, target: &NodeId, capacity_msat: u64, decay_params: ProbabilisticScoringDecayParameters - ) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, T, &T> { + ) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, &Duration> { let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) = if source < target { (&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat, @@ -1030,7 +1030,6 @@ impl ChannelLiquidity { capacity_msat, last_updated: &self.last_updated, offset_history_last_updated: &self.offset_history_last_updated, - now: T::now(), decay_params: decay_params, } } @@ -1039,7 +1038,7 @@ impl ChannelLiquidity { /// `capacity_msat`. 
fn as_directed_mut( &mut self, source: &NodeId, target: &NodeId, capacity_msat: u64, decay_params: ProbabilisticScoringDecayParameters - ) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, T, &mut T> { + ) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, &mut Duration> { let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) = if source < target { (&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat, @@ -1059,7 +1058,6 @@ impl ChannelLiquidity { capacity_msat, last_updated: &mut self.last_updated, offset_history_last_updated: &mut self.offset_history_last_updated, - now: T::now(), decay_params: decay_params, } } @@ -1070,7 +1068,7 @@ impl ChannelLiquidity { ) -> u64 { let half_life = decay_params.liquidity_offset_half_life.as_secs_f64(); if half_life != 0.0 { - let elapsed_time = T::now().duration_since(self.last_updated).as_secs_f64(); + let elapsed_time = duration_since_epoch.saturating_sub(self.last_updated).as_secs_f64(); ((offset as f64) * powf64(0.5, elapsed_time / half_life)) as u64 } else { 0 @@ -1159,7 +1157,8 @@ fn success_probability( (numerator, denominator) } -impl, BRT: Deref, T: Time, U: Deref> DirectedChannelLiquidity< L, BRT, T, U> { +impl, BRT: Deref, T: Deref> +DirectedChannelLiquidity< L, BRT, T> { /// Returns a liquidity penalty for routing the given HTLC `amount_msat` through the channel in /// this direction. fn penalty_msat(&self, amount_msat: u64, score_params: &ProbabilisticScoringFeeParameters) -> u64 { @@ -1267,7 +1266,8 @@ impl, BRT: Deref, } } -impl, BRT: DerefMut, T: Time, U: DerefMut> DirectedChannelLiquidity { +impl, BRT: DerefMut, T: DerefMut> +DirectedChannelLiquidity { /// Adjusts the channel liquidity balance bounds when failing to route `amount_msat`. fn failed_at_channel( &mut self, amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log @@ -1313,7 +1313,9 @@ impl, BRT: DerefMut, BRT: DerefMut self.max_liquidity_msat() { - 0 - } else { - self.decayed_offset_msat(*self.max_liquidity_offset_msat) - }; - *self.last_updated = self.now; + if amount_msat > self.max_liquidity_msat() { + *self.max_liquidity_offset_msat = 0; + } + *self.last_updated = duration_since_epoch; } /// Adjusts the upper bound of the channel liquidity balance in this direction. 
fn set_max_liquidity_msat(&mut self, amount_msat: u64, duration_since_epoch: Duration) { *self.max_liquidity_offset_msat = self.capacity_msat.checked_sub(amount_msat).unwrap_or(0); - *self.min_liquidity_offset_msat = if amount_msat < self.min_liquidity_msat() { - 0 - } else { - self.decayed_offset_msat(*self.min_liquidity_offset_msat) - }; - *self.last_updated = self.now; + if amount_msat < *self.min_liquidity_offset_msat { + *self.min_liquidity_offset_msat = 0; + } + *self.last_updated = duration_since_epoch; } } @@ -1396,7 +1394,7 @@ impl>, L: Deref, T: Time> ScoreLookUp for Prob let capacity_msat = usage.effective_capacity.as_msat(); self.channel_liquidities .get(&scid) - .unwrap_or(&ChannelLiquidity::new()) + .unwrap_or(&ChannelLiquidity::new(Duration::ZERO)) .as_directed(&source, &target, capacity_msat, self.decay_params) .penalty_msat(amount_msat, score_params) .saturating_add(anti_probing_penalty_msat) @@ -1426,14 +1424,14 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob if at_failed_channel { self.channel_liquidities .entry(hop.short_channel_id) - .or_insert_with(ChannelLiquidity::new) + .or_insert_with(|| ChannelLiquidity::new(duration_since_epoch)) .as_directed_mut(source, &target, capacity_msat, self.decay_params) .failed_at_channel(amount_msat, duration_since_epoch, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); } else { self.channel_liquidities .entry(hop.short_channel_id) - .or_insert_with(ChannelLiquidity::new) + .or_insert_with(|| ChannelLiquidity::new(duration_since_epoch)) .as_directed_mut(source, &target, capacity_msat, self.decay_params) .failed_downstream(amount_msat, duration_since_epoch, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); @@ -1462,7 +1460,7 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob let capacity_msat = channel.effective_capacity().as_msat(); self.channel_liquidities .entry(hop.short_channel_id) - .or_insert_with(ChannelLiquidity::new) + .or_insert_with(|| ChannelLiquidity::new(duration_since_epoch)) .as_directed_mut(source, &target, capacity_msat, self.decay_params) .successful(amount_msat, duration_since_epoch, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); @@ -1488,10 +1486,10 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params); liquidity.max_liquidity_offset_msat = liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params); - liquidity.last_updated = T::now(); + liquidity.last_updated = duration_since_epoch; let elapsed_time = - T::now().duration_since(liquidity.offset_history_last_updated); + duration_since_epoch.saturating_sub(liquidity.offset_history_last_updated); if elapsed_time > decay_params.historical_no_updates_half_life { let half_life = decay_params.historical_no_updates_half_life.as_secs_f64(); if half_life != 0.0 { @@ -1502,7 +1500,7 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob for bucket in liquidity.max_liquidity_offset_history.buckets.iter_mut() { *bucket = ((*bucket as u64) * 1024 / divisor) as u16; } - liquidity.offset_history_last_updated = T::now(); + liquidity.offset_history_last_updated = duration_since_epoch; } } liquidity.min_liquidity_offset_msat != 0 || liquidity.max_liquidity_offset_msat != 0 || @@ -2125,31 +2123,29 @@ ReadableArgs<(ProbabilisticScoringDecayParameters, G, L)> for ProbabilisticScore network_graph, logger, channel_liquidities, + _unused_time: 
core::marker::PhantomData, }) } } -impl Writeable for ChannelLiquidity { +impl Writeable for ChannelLiquidity { #[inline] fn write(&self, w: &mut W) -> Result<(), io::Error> { - let offset_history_duration_since_epoch = - T::duration_since_epoch() - self.offset_history_last_updated.elapsed(); - let duration_since_epoch = T::duration_since_epoch() - self.last_updated.elapsed(); write_tlv_fields!(w, { (0, self.min_liquidity_offset_msat, required), // 1 was the min_liquidity_offset_history in octile form (2, self.max_liquidity_offset_msat, required), // 3 was the max_liquidity_offset_history in octile form - (4, duration_since_epoch, required), + (4, self.last_updated, required), (5, Some(self.min_liquidity_offset_history), option), (7, Some(self.max_liquidity_offset_history), option), - (9, offset_history_duration_since_epoch, required), + (9, self.offset_history_last_updated, required), }); Ok(()) } } -impl Readable for ChannelLiquidity { +impl Readable for ChannelLiquidity { #[inline] fn read(r: &mut R) -> Result { let mut min_liquidity_offset_msat = 0; @@ -2158,36 +2154,18 @@ impl Readable for ChannelLiquidity { let mut legacy_max_liq_offset_history: Option = None; let mut min_liquidity_offset_history: Option = None; let mut max_liquidity_offset_history: Option = None; - let mut duration_since_epoch = Duration::from_secs(0); - let mut offset_history_duration_since_epoch = None; + let mut last_updated = Duration::from_secs(0); + let mut offset_history_last_updated = None; read_tlv_fields!(r, { (0, min_liquidity_offset_msat, required), (1, legacy_min_liq_offset_history, option), (2, max_liquidity_offset_msat, required), (3, legacy_max_liq_offset_history, option), - (4, duration_since_epoch, required), + (4, last_updated, required), (5, min_liquidity_offset_history, option), (7, max_liquidity_offset_history, option), - (9, offset_history_duration_since_epoch, option), + (9, offset_history_last_updated, option), }); - // On rust prior to 1.60 `Instant::duration_since` will panic if time goes backwards. - // We write `last_updated` as wallclock time even though its ultimately an `Instant` (which - // is a time from a monotonic clock usually represented as an offset against boot time). - // Thus, we have to construct an `Instant` by subtracting the difference in wallclock time - // from the one that was written. However, because `Instant` can panic if we construct one - // in the future, we must handle wallclock time jumping backwards, which we do by simply - // using `Instant::now()` in that case. 
- let wall_clock_now = T::duration_since_epoch(); - let now = T::now(); - let last_updated = if wall_clock_now > duration_since_epoch { - now - (wall_clock_now - duration_since_epoch) - } else { now }; - - let offset_history_duration_since_epoch = - offset_history_duration_since_epoch.unwrap_or(duration_since_epoch); - let offset_history_last_updated = if wall_clock_now > offset_history_duration_since_epoch { - now - (wall_clock_now - offset_history_duration_since_epoch) - } else { now }; if min_liquidity_offset_history.is_none() { if let Some(legacy_buckets) = legacy_min_liq_offset_history { @@ -2209,7 +2187,7 @@ impl Readable for ChannelLiquidity { min_liquidity_offset_history: min_liquidity_offset_history.unwrap(), max_liquidity_offset_history: max_liquidity_offset_history.unwrap(), last_updated, - offset_history_last_updated, + offset_history_last_updated: offset_history_last_updated.unwrap_or(last_updated), }) } } @@ -2219,7 +2197,6 @@ mod tests { use super::{ChannelLiquidity, HistoricalBucketRangeTracker, ProbabilisticScoringFeeParameters, ProbabilisticScoringDecayParameters, ProbabilisticScorerUsingTime}; use crate::blinded_path::{BlindedHop, BlindedPath}; use crate::util::config::UserConfig; - use crate::util::time::Time; use crate::util::time::tests::SinceEpoch; use crate::ln::channelmanager; @@ -2384,8 +2361,8 @@ mod tests { #[test] fn liquidity_bounds_directed_from_lowest_node_id() { let logger = TestLogger::new(); - let last_updated = SinceEpoch::now(); - let offset_history_last_updated = SinceEpoch::now(); + let last_updated = Duration::ZERO; + let offset_history_last_updated = Duration::ZERO; let network_graph = network_graph(&logger); let decay_params = ProbabilisticScoringDecayParameters::default(); let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger) @@ -2465,8 +2442,8 @@ mod tests { #[test] fn resets_liquidity_upper_bound_when_crossed_by_lower_bound() { let logger = TestLogger::new(); - let last_updated = SinceEpoch::now(); - let offset_history_last_updated = SinceEpoch::now(); + let last_updated = Duration::ZERO; + let offset_history_last_updated = Duration::ZERO; let network_graph = network_graph(&logger); let decay_params = ProbabilisticScoringDecayParameters::default(); let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger) @@ -2526,8 +2503,8 @@ mod tests { #[test] fn resets_liquidity_lower_bound_when_crossed_by_upper_bound() { let logger = TestLogger::new(); - let last_updated = SinceEpoch::now(); - let offset_history_last_updated = SinceEpoch::now(); + let last_updated = Duration::ZERO; + let offset_history_last_updated = Duration::ZERO; let network_graph = network_graph(&logger); let decay_params = ProbabilisticScoringDecayParameters::default(); let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger) @@ -2639,8 +2616,8 @@ mod tests { #[test] fn constant_penalty_outside_liquidity_bounds() { let logger = TestLogger::new(); - let last_updated = SinceEpoch::now(); - let offset_history_last_updated = SinceEpoch::now(); + let last_updated = Duration::ZERO; + let offset_history_last_updated = Duration::ZERO; let network_graph = network_graph(&logger); let params = ProbabilisticScoringFeeParameters { liquidity_penalty_multiplier_msat: 1_000, From d15a354f775b9e1bb6bdc0795bc948b03e6cb7ac Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 9 Oct 2023 01:52:20 +0000 Subject: [PATCH 11/17] Drop now-unused `T: Time` bound on `ProbabilisticScorer` Now that we don't access time via the `Time` trait 
in `ProbabilisticScorer`, we can finally drop the `Time` bound entirely, removing the `ProbabilisticScorerUsingTime` and type alias indirection and replacing it with a simple struct. --- lightning/src/routing/scoring.rs | 43 +++++++++----------------------- 1 file changed, 12 insertions(+), 31 deletions(-) diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 2c29fd3f1c0..15193ad2952 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -61,7 +61,6 @@ use crate::routing::gossip::{EffectiveCapacity, NetworkGraph, NodeId}; use crate::routing::router::{Path, CandidateRouteHop}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::util::logger::Logger; -use crate::util::time::Time; use crate::prelude::*; use core::{cmp, fmt}; @@ -440,13 +439,6 @@ impl ReadableArgs for FixedPenaltyScorer { } } -#[cfg(not(feature = "no-std"))] -type ConfiguredTime = crate::util::time::MonotonicTime; -#[cfg(feature = "no-std")] -use crate::util::time::Eternity; -#[cfg(feature = "no-std")] -type ConfiguredTime = Eternity; - /// [`ScoreLookUp`] implementation using channel success probability distributions. /// /// Channels are tracked with upper and lower liquidity bounds - when an HTLC fails at a channel, @@ -483,18 +475,12 @@ type ConfiguredTime = Eternity; /// [`liquidity_offset_half_life`]: ProbabilisticScoringDecayParameters::liquidity_offset_half_life /// [`historical_liquidity_penalty_multiplier_msat`]: ProbabilisticScoringFeeParameters::historical_liquidity_penalty_multiplier_msat /// [`historical_liquidity_penalty_amount_multiplier_msat`]: ProbabilisticScoringFeeParameters::historical_liquidity_penalty_amount_multiplier_msat -pub type ProbabilisticScorer = ProbabilisticScorerUsingTime::; - -/// Probabilistic [`ScoreLookUp`] implementation. -/// -/// This is not exported to bindings users generally all users should use the [`ProbabilisticScorer`] type alias. -pub struct ProbabilisticScorerUsingTime>, L: Deref, T: Time> +pub struct ProbabilisticScorer>, L: Deref> where L::Target: Logger { decay_params: ProbabilisticScoringDecayParameters, network_graph: G, logger: L, channel_liquidities: HashMap, - _unused_time: core::marker::PhantomData, } /// Parameters for configuring [`ProbabilisticScorer`]. @@ -749,7 +735,7 @@ pub struct ProbabilisticScoringDecayParameters { /// /// Default value: 14 days /// - /// [`historical_estimated_channel_liquidity_probabilities`]: ProbabilisticScorerUsingTime::historical_estimated_channel_liquidity_probabilities + /// [`historical_estimated_channel_liquidity_probabilities`]: ProbabilisticScorer::historical_estimated_channel_liquidity_probabilities pub historical_no_updates_half_life: Duration, /// Whenever this amount of time elapses since the last update to a channel's liquidity bounds, @@ -827,7 +813,7 @@ struct DirectedChannelLiquidity, BRT: Deref>, L: Deref, T: Time> ProbabilisticScorerUsingTime where L::Target: Logger { +impl>, L: Deref> ProbabilisticScorer where L::Target: Logger { /// Creates a new scorer using the given scoring parameters for sending payments from a node /// through a network graph. 
pub fn new(decay_params: ProbabilisticScoringDecayParameters, network_graph: G, logger: L) -> Self { @@ -836,7 +822,6 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU network_graph, logger, channel_liquidities: HashMap::new(), - _unused_time: core::marker::PhantomData, } } @@ -1351,7 +1336,7 @@ DirectedChannelLiquidity { } } -impl>, L: Deref, T: Time> ScoreLookUp for ProbabilisticScorerUsingTime where L::Target: Logger { +impl>, L: Deref> ScoreLookUp for ProbabilisticScorer where L::Target: Logger { type ScoreParams = ProbabilisticScoringFeeParameters; fn channel_penalty_msat( &self, candidate: &CandidateRouteHop, usage: ChannelUsage, score_params: &ProbabilisticScoringFeeParameters @@ -1402,7 +1387,7 @@ impl>, L: Deref, T: Time> ScoreLookUp for Prob } } -impl>, L: Deref, T: Time> ScoreUpdate for ProbabilisticScorerUsingTime where L::Target: Logger { +impl>, L: Deref> ScoreUpdate for ProbabilisticScorer where L::Target: Logger { fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) { let amount_msat = path.final_value_msat(); log_trace!(self.logger, "Scoring path through to SCID {} as having failed at {} msat", short_channel_id, amount_msat); @@ -1511,7 +1496,7 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob } #[cfg(c_bindings)] -impl>, L: Deref, T: Time> Score for ProbabilisticScorerUsingTime +impl>, L: Deref> Score for ProbabilisticScorer where L::Target: Logger {} #[cfg(feature = "std")] @@ -2097,7 +2082,7 @@ mod bucketed_history { } use bucketed_history::{LegacyHistoricalBucketRangeTracker, HistoricalBucketRangeTracker, HistoricalMinMaxBuckets}; -impl>, L: Deref, T: Time> Writeable for ProbabilisticScorerUsingTime where L::Target: Logger { +impl>, L: Deref> Writeable for ProbabilisticScorer where L::Target: Logger { #[inline] fn write(&self, w: &mut W) -> Result<(), io::Error> { write_tlv_fields!(w, { @@ -2107,8 +2092,8 @@ impl>, L: Deref, T: Time> Writeable for Probab } } -impl>, L: Deref, T: Time> -ReadableArgs<(ProbabilisticScoringDecayParameters, G, L)> for ProbabilisticScorerUsingTime where L::Target: Logger { +impl>, L: Deref> +ReadableArgs<(ProbabilisticScoringDecayParameters, G, L)> for ProbabilisticScorer where L::Target: Logger { #[inline] fn read( r: &mut R, args: (ProbabilisticScoringDecayParameters, G, L) @@ -2123,7 +2108,6 @@ ReadableArgs<(ProbabilisticScoringDecayParameters, G, L)> for ProbabilisticScore network_graph, logger, channel_liquidities, - _unused_time: core::marker::PhantomData, }) } } @@ -2194,7 +2178,7 @@ impl Readable for ChannelLiquidity { #[cfg(test)] mod tests { - use super::{ChannelLiquidity, HistoricalBucketRangeTracker, ProbabilisticScoringFeeParameters, ProbabilisticScoringDecayParameters, ProbabilisticScorerUsingTime}; + use super::{ChannelLiquidity, HistoricalBucketRangeTracker, ProbabilisticScoringFeeParameters, ProbabilisticScoringDecayParameters, ProbabilisticScorer}; use crate::blinded_path::{BlindedHop, BlindedPath}; use crate::util::config::UserConfig; use crate::util::time::tests::SinceEpoch; @@ -2243,9 +2227,6 @@ mod tests { // `ProbabilisticScorer` tests - /// A probabilistic scorer for testing with time that can be manually advanced. 
- type ProbabilisticScorer<'a> = ProbabilisticScorerUsingTime::<&'a NetworkGraph<&'a TestLogger>, &'a TestLogger, SinceEpoch>; - fn sender_privkey() -> SecretKey { SecretKey::from_slice(&[41; 32]).unwrap() } @@ -3138,7 +3119,7 @@ mod tests { let mut serialized_scorer = io::Cursor::new(&serialized_scorer); let deserialized_scorer = - ::read(&mut serialized_scorer, (decay_params, &network_graph, &logger)).unwrap(); + >::read(&mut serialized_scorer, (decay_params, &network_graph, &logger)).unwrap(); assert_eq!(deserialized_scorer.channel_penalty_msat(&candidate, usage, ¶ms), 300); } @@ -3181,7 +3162,7 @@ mod tests { let mut serialized_scorer = io::Cursor::new(&serialized_scorer); let mut deserialized_scorer = - ::read(&mut serialized_scorer, (decay_params, &network_graph, &logger)).unwrap(); + >::read(&mut serialized_scorer, (decay_params, &network_graph, &logger)).unwrap(); if !decay_before_reload { SinceEpoch::advance(Duration::from_secs(10)); scorer.time_passed(Duration::from_secs(10)); From 512f44cf15b70412d0023fad8cfa39260536c0fe Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 9 Oct 2023 02:21:09 +0000 Subject: [PATCH 12/17] Drop now-trivial `decayed_offset_msat` helper utility As we now no longer decay bounds information when fetching them, there is no need to have a decaying-fetching helper utility. --- lightning/src/routing/scoring.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 15193ad2952..d6b1f44587f 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -1236,18 +1236,14 @@ DirectedChannelLiquidity< L, BRT, T> { /// Returns the lower bound of the channel liquidity balance in this direction. #[inline(always)] fn min_liquidity_msat(&self) -> u64 { - self.decayed_offset_msat(*self.min_liquidity_offset_msat) + *self.min_liquidity_offset_msat } /// Returns the upper bound of the channel liquidity balance in this direction. #[inline(always)] fn max_liquidity_msat(&self) -> u64 { self.capacity_msat - .saturating_sub(self.decayed_offset_msat(*self.max_liquidity_offset_msat)) - } - - fn decayed_offset_msat(&self, offset_msat: u64) -> u64 { - offset_msat + .saturating_sub(*self.max_liquidity_offset_msat) } } @@ -1306,13 +1302,11 @@ DirectedChannelLiquidity { self.liquidity_history.min_liquidity_offset_history.time_decay_data(half_lives); self.liquidity_history.max_liquidity_offset_history.time_decay_data(half_lives); - let min_liquidity_offset_msat = self.decayed_offset_msat(*self.min_liquidity_offset_msat); self.liquidity_history.min_liquidity_offset_history.track_datapoint( - min_liquidity_offset_msat + bucket_offset_msat, self.capacity_msat + *self.min_liquidity_offset_msat + bucket_offset_msat, self.capacity_msat ); - let max_liquidity_offset_msat = self.decayed_offset_msat(*self.max_liquidity_offset_msat); self.liquidity_history.max_liquidity_offset_history.track_datapoint( - max_liquidity_offset_msat.saturating_sub(bucket_offset_msat), self.capacity_msat + self.max_liquidity_offset_msat.saturating_sub(bucket_offset_msat), self.capacity_msat ); *self.offset_history_last_updated = duration_since_epoch; } From 40b4094e878413aa16e656462554c3ffc30918a2 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 9 Oct 2023 03:23:55 +0000 Subject: [PATCH 13/17] Add a benchmark for decaying a 100k channel scorer's liquidity info This is a good gut-check to ensure we don't end up taking a ton of time decaying channel liquidity info. 
It currently clocks in around 1.25ms on an i7-1360P.
---
 bench/benches/bench.rs | 3 +-
 lightning/src/routing/scoring.rs | 58 ++++++++++++++++++++++++++++++++
 2 files changed, 60 insertions(+), 1 deletion(-)

diff --git a/bench/benches/bench.rs b/bench/benches/bench.rs
index eaa3fcec50c..b854ffb93ce 100644
--- a/bench/benches/bench.rs
+++ b/bench/benches/bench.rs
@@ -21,5 +21,6 @@ criterion_group!(benches,
 	lightning_persister::fs_store::bench::bench_sends,
 	lightning_rapid_gossip_sync::bench::bench_reading_full_graph_from_file,
 	lightning::routing::gossip::benches::read_network_graph,
-	lightning::routing::gossip::benches::write_network_graph);
+	lightning::routing::gossip::benches::write_network_graph,
+	lightning::routing::scoring::benches::decay_100k_channel_bounds);
 criterion_main!(benches);

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index d6b1f44587f..cf139387396 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -3748,3 +3748,61 @@ mod tests {
 			Some(0.0));
 	}
 }
+
+#[cfg(ldk_bench)]
+pub mod benches {
+	use super::*;
+	use criterion::Criterion;
+	use crate::routing::router::{bench_utils, RouteHop};
+	use crate::util::test_utils::TestLogger;
+	use crate::ln::features::{ChannelFeatures, NodeFeatures};
+
+	pub fn decay_100k_channel_bounds(bench: &mut Criterion) {
+		let logger = TestLogger::new();
+		let network_graph = bench_utils::read_network_graph(&logger).unwrap();
+		let mut scorer = ProbabilisticScorer::new(Default::default(), &network_graph, &logger);
+		// Score a number of random channels
+		let mut seed: u64 = 0xdeadbeef;
+		for _ in 0..100_000 {
+			seed = seed.overflowing_mul(6364136223846793005).0.overflowing_add(1).0;
+			let (victim, victim_dst, amt) = {
+				let rong = network_graph.read_only();
+				let channels = rong.channels();
+				let chan = channels.unordered_iter()
+					.skip((seed as usize) % channels.len())
+					.next().unwrap();
+				seed = seed.overflowing_mul(6364136223846793005).0.overflowing_add(1).0;
+				let amt = seed % chan.1.capacity_sats.map(|c| c * 1000)
+					.or(chan.1.one_to_two.as_ref().map(|info| info.htlc_maximum_msat))
+					.or(chan.1.two_to_one.as_ref().map(|info| info.htlc_maximum_msat))
+					.unwrap_or(1_000_000_000).saturating_add(1);
+				(*chan.0, chan.1.node_two, amt)
+			};
+			let path = Path {
+				hops: vec![RouteHop {
+					pubkey: victim_dst.as_pubkey().unwrap(),
+					node_features: NodeFeatures::empty(),
+					short_channel_id: victim,
+					channel_features: ChannelFeatures::empty(),
+					fee_msat: amt,
+					cltv_expiry_delta: 42,
+					maybe_announced_channel: true,
+				}],
+				blinded_tail: None
+			};
+			seed = seed.overflowing_mul(6364136223846793005).0.overflowing_add(1).0;
+			// Alternate pseudorandomly between failed and successful probes based
+			// on the seed's parity.
+			if seed % 2 == 0 {
+				scorer.probe_failed(&path, victim, Duration::ZERO);
+			} else {
+				scorer.probe_successful(&path, Duration::ZERO);
+			}
+		}
+		let mut cur_time = Duration::ZERO;
+		cur_time += Duration::from_millis(1);
+		scorer.decay_liquidity_certainty(cur_time);
+		bench.bench_function("decay_100k_channel_bounds", |b| b.iter(|| {
+			cur_time += Duration::from_millis(1);
+			scorer.decay_liquidity_certainty(cur_time);
+		}));
+	}
+}

From 81389dee306d960a8030bec5ffa304004148ce85 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Thu, 12 Oct 2023 18:23:51 +0000
Subject: [PATCH 14/17] Drop warning about mixing `no-std` and `std` `ProbabilisticScorer`s

Now that the serialization formats of `no-std` and `std`
`ProbabilisticScorer`s both just use `Duration` since the UNIX epoch and
don't care about time except when decaying, we don't need to warn users
against mixing the scorers across `no-std` and `std` flags.

Fixes #2539
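To spell out what the compatibility rests on (illustrative only;
`now_since_epoch` is a hypothetical helper, not an API added by this
series): a `std` user can take the timestamp straight from the wall
clock, a `no-std` user supplies an equivalent `Duration` from their own
clock source, and both end up serializing the same plain `Duration`
fields:

    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    // Hypothetical `std`-side helper: the scorer's `last_updated` and
    // `offset_history_last_updated` fields are plain `Duration`s since the
    // unix epoch, so a value like this can be fed to
    // `ScoreUpdate::time_passed` and round-trips through serialization the
    // same way under either feature set.
    fn now_since_epoch() -> Duration {
        SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO)
    }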
---
 lightning/src/routing/scoring.rs | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index cf139387396..0dd0c2ea388 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -464,11 +464,6 @@ impl ReadableArgs<u64> for FixedPenaltyScorer {
 /// formula, but using the history of a channel rather than our latest estimates for the liquidity
 /// bounds.
 ///
-/// # Note
-///
-/// Mixing the `no-std` feature between serialization and deserialization results in undefined
-/// behavior.
-///
 /// [1]: https://arxiv.org/abs/2107.05322

From 21facd0d175a450e986a83dfbef53c112aa48f84 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Wed, 29 Nov 2023 00:31:00 +0000
Subject: [PATCH 15/17] Make scorer decay + persistence more frequent

There are some edge cases in our scoring where information really should
have been decayed but hasn't yet been prior to an update. Rather than
trying to fix them exactly, we instead decay the scorer somewhat more
often, which largely solves them and also gives us somewhat more accurate
bounds on our channels, allowing us to immediately reuse a channel at an
amount similar to the one that just failed, albeit at a substantial
penalty.
---
 lightning-background-processor/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 1d5899682b3..a12ec9c0f3b 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -113,7 +113,7 @@ const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;
 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
 
 #[cfg(not(test))]
-const SCORER_PERSIST_TIMER: u64 = 60 * 60;
+const SCORER_PERSIST_TIMER: u64 = 60 * 5;
 #[cfg(test)]
 const SCORER_PERSIST_TIMER: u64 = 1;

From 18b42319bc5e5a3940e8ba17c72b7df8b285c563 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Wed, 29 Nov 2023 00:33:16 +0000
Subject: [PATCH 16/17] Drop half-life-based bucket decay in update_history_buckets

Because we decay the history buckets in the background, there's not much
reason to try to decay them immediately prior to updating, and in
removing that we can also clean up a good bit of dead code, which we do
here.
---
 lightning/src/routing/scoring.rs | 136 +++++++++----------------------
 1 file changed, 38 insertions(+), 98 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 0dd0c2ea388..0bea2223cab 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -789,7 +789,7 @@ struct ChannelLiquidity {
 	min_liquidity_offset_history: HistoricalBucketRangeTracker,
 	max_liquidity_offset_history: HistoricalBucketRangeTracker,
 
-	/// Time when the liquidity bounds were last modified as an offset since the unix epoch.
+	/// Time when either liquidity bound was last modified as an offset since the unix epoch.
last_updated: Duration, /// Time when the historical liquidity bounds were last modified as an offset against the unix @@ -805,7 +805,6 @@ struct DirectedChannelLiquidity, BRT: Deref>, L: Deref> ProbabilisticScorer where L::Target: Logger { @@ -837,7 +836,7 @@ impl>, L: Deref> ProbabilisticScorer whe let log_direction = |source, target| { if let Some((directed_info, _)) = chan_debug.as_directed_to(target) { let amt = directed_info.effective_capacity().as_msat(); - let dir_liq = liq.as_directed(source, target, amt, self.decay_params); + let dir_liq = liq.as_directed(source, target, amt); let min_buckets = &dir_liq.liquidity_history.min_liquidity_offset_history.buckets; let max_buckets = &dir_liq.liquidity_history.max_liquidity_offset_history.buckets; @@ -889,7 +888,7 @@ impl>, L: Deref> ProbabilisticScorer whe if let Some(liq) = self.channel_liquidities.get(&scid) { if let Some((directed_info, source)) = chan.as_directed_to(target) { let amt = directed_info.effective_capacity().as_msat(); - let dir_liq = liq.as_directed(source, target, amt, self.decay_params); + let dir_liq = liq.as_directed(source, target, amt); return Some((dir_liq.min_liquidity_msat(), dir_liq.max_liquidity_msat())); } } @@ -931,7 +930,7 @@ impl>, L: Deref> ProbabilisticScorer whe if let Some(liq) = self.channel_liquidities.get(&scid) { if let Some((directed_info, source)) = chan.as_directed_to(target) { let amt = directed_info.effective_capacity().as_msat(); - let dir_liq = liq.as_directed(source, target, amt, self.decay_params); + let dir_liq = liq.as_directed(source, target, amt); let min_buckets = dir_liq.liquidity_history.min_liquidity_offset_history.buckets; let mut max_buckets = dir_liq.liquidity_history.max_liquidity_offset_history.buckets; @@ -962,7 +961,7 @@ impl>, L: Deref> ProbabilisticScorer whe if let Some(liq) = self.channel_liquidities.get(&scid) { if let Some((directed_info, source)) = chan.as_directed_to(target) { let capacity_msat = directed_info.effective_capacity().as_msat(); - let dir_liq = liq.as_directed(source, target, capacity_msat, self.decay_params); + let dir_liq = liq.as_directed(source, target, capacity_msat); return dir_liq.liquidity_history.calculate_success_probability_times_billion( ¶ms, amount_msat, capacity_msat @@ -989,7 +988,7 @@ impl ChannelLiquidity { /// Returns a view of the channel liquidity directed from `source` to `target` assuming /// `capacity_msat`. fn as_directed( - &self, source: &NodeId, target: &NodeId, capacity_msat: u64, decay_params: ProbabilisticScoringDecayParameters + &self, source: &NodeId, target: &NodeId, capacity_msat: u64, ) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, &Duration> { let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) = if source < target { @@ -1010,14 +1009,13 @@ impl ChannelLiquidity { capacity_msat, last_updated: &self.last_updated, offset_history_last_updated: &self.offset_history_last_updated, - decay_params: decay_params, } } /// Returns a mutable view of the channel liquidity directed from `source` to `target` assuming /// `capacity_msat`. 
 	fn as_directed_mut(
-		&mut self, source: &NodeId, target: &NodeId, capacity_msat: u64, decay_params: ProbabilisticScoringDecayParameters
+		&mut self, source: &NodeId, target: &NodeId, capacity_msat: u64,
 	) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, &mut Duration> {
 		let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
 			if source < target {
@@ -1038,7 +1036,6 @@ impl ChannelLiquidity {
 			capacity_msat,
 			last_updated: &mut self.last_updated,
 			offset_history_last_updated: &mut self.offset_history_last_updated,
-			decay_params: decay_params,
 		}
 	}
 
@@ -1289,14 +1286,6 @@ DirectedChannelLiquidity {
 	/// state"), we allow the caller to set an offset applied to our liquidity bounds which
 	/// represents the amount of the successful payment we just made.
 	fn update_history_buckets(&mut self, bucket_offset_msat: u64, duration_since_epoch: Duration) {
-		let half_lives =
-			duration_since_epoch.checked_sub(*self.offset_history_last_updated)
-				.unwrap_or(Duration::ZERO).as_secs()
-				.checked_div(self.decay_params.historical_no_updates_half_life.as_secs())
-				.map(|v| v.try_into().unwrap_or(u32::max_value())).unwrap_or(u32::max_value());
-		self.liquidity_history.min_liquidity_offset_history.time_decay_data(half_lives);
-		self.liquidity_history.max_liquidity_offset_history.time_decay_data(half_lives);
-
 		self.liquidity_history.min_liquidity_offset_history.track_datapoint(
 			*self.min_liquidity_offset_msat + bucket_offset_msat, self.capacity_msat
 		);
@@ -1369,7 +1358,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreLookUp for Probabilistic
 		self.channel_liquidities
 			.get(&scid)
 			.unwrap_or(&ChannelLiquidity::new(Duration::ZERO))
-			.as_directed(&source, &target, capacity_msat, self.decay_params)
+			.as_directed(&source, &target, capacity_msat)
 			.penalty_msat(amount_msat, score_params)
 			.saturating_add(anti_probing_penalty_msat)
 			.saturating_add(base_penalty_msat)
@@ -1399,14 +1388,14 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
 				self.channel_liquidities
 					.entry(hop.short_channel_id)
 					.or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
-					.as_directed_mut(source, &target, capacity_msat, self.decay_params)
+					.as_directed_mut(source, &target, capacity_msat)
 					.failed_at_channel(amount_msat, duration_since_epoch,
 						format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
 			} else {
 				self.channel_liquidities
 					.entry(hop.short_channel_id)
 					.or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
-					.as_directed_mut(source, &target, capacity_msat, self.decay_params)
+					.as_directed_mut(source, &target, capacity_msat)
 					.failed_downstream(amount_msat, duration_since_epoch,
 						format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
 			}
@@ -1435,7 +1424,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
 			self.channel_liquidities
 				.entry(hop.short_channel_id)
 				.or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
-				.as_directed_mut(source, &target, capacity_msat, self.decay_params)
+				.as_directed_mut(source, &target, capacity_msat)
 				.successful(amount_msat, duration_since_epoch,
 					format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
 		} else {
@@ -1959,14 +1948,6 @@ mod bucketed_history {
 				self.buckets[bucket] = self.buckets[bucket].saturating_add(BUCKET_FIXED_POINT_ONE);
 			}
 		}
-		/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
-		/// datapoints as we receive newer information.
-		#[inline]
-		pub(super) fn time_decay_data(&mut self, half_lives: u32) {
-			for e in self.buckets.iter_mut() {
-				*e = e.checked_shr(half_lives).unwrap_or(0);
-			}
-		}
 	}
 
 	impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
@@ -2359,52 +2340,52 @@ mod tests {
 
 		// Update minimum liquidity.
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&source, &target, 1_000, decay_params);
+			.as_directed(&source, &target, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 100);
 		assert_eq!(liquidity.max_liquidity_msat(), 300);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&target, &source, 1_000, decay_params);
+			.as_directed(&target, &source, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 700);
 		assert_eq!(liquidity.max_liquidity_msat(), 900);
 
 		scorer.channel_liquidities.get_mut(&42).unwrap()
-			.as_directed_mut(&source, &target, 1_000, decay_params)
+			.as_directed_mut(&source, &target, 1_000)
 			.set_min_liquidity_msat(200, Duration::ZERO);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&source, &target, 1_000, decay_params);
+			.as_directed(&source, &target, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 200);
 		assert_eq!(liquidity.max_liquidity_msat(), 300);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&target, &source, 1_000, decay_params);
+			.as_directed(&target, &source, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 700);
 		assert_eq!(liquidity.max_liquidity_msat(), 800);
 
 		// Update maximum liquidity.
 
 		let liquidity = scorer.channel_liquidities.get(&43).unwrap()
-			.as_directed(&target, &recipient, 1_000, decay_params);
+			.as_directed(&target, &recipient, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 700);
 		assert_eq!(liquidity.max_liquidity_msat(), 900);
 
 		let liquidity = scorer.channel_liquidities.get(&43).unwrap()
-			.as_directed(&recipient, &target, 1_000, decay_params);
+			.as_directed(&recipient, &target, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 100);
 		assert_eq!(liquidity.max_liquidity_msat(), 300);
 
 		scorer.channel_liquidities.get_mut(&43).unwrap()
-			.as_directed_mut(&target, &recipient, 1_000, decay_params)
+			.as_directed_mut(&target, &recipient, 1_000)
 			.set_max_liquidity_msat(200, Duration::ZERO);
 
 		let liquidity = scorer.channel_liquidities.get(&43).unwrap()
-			.as_directed(&target, &recipient, 1_000, decay_params);
+			.as_directed(&target, &recipient, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 0);
 		assert_eq!(liquidity.max_liquidity_msat(), 200);
 
 		let liquidity = scorer.channel_liquidities.get(&43).unwrap()
-			.as_directed(&recipient, &target, 1_000, decay_params);
+			.as_directed(&recipient, &target, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 800);
 		assert_eq!(liquidity.max_liquidity_msat(), 1000);
 	}
@@ -2430,42 +2411,42 @@ mod tests {
 
 		// Check initial bounds.
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&source, &target, 1_000, decay_params);
+			.as_directed(&source, &target, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 400);
 		assert_eq!(liquidity.max_liquidity_msat(), 800);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&target, &source, 1_000, decay_params);
+			.as_directed(&target, &source, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 200);
 		assert_eq!(liquidity.max_liquidity_msat(), 600);
 
 		// Reset from source to target.
 		scorer.channel_liquidities.get_mut(&42).unwrap()
-			.as_directed_mut(&source, &target, 1_000, decay_params)
+			.as_directed_mut(&source, &target, 1_000)
 			.set_min_liquidity_msat(900, Duration::ZERO);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&source, &target, 1_000, decay_params);
+			.as_directed(&source, &target, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 900);
 		assert_eq!(liquidity.max_liquidity_msat(), 1_000);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&target, &source, 1_000, decay_params);
+			.as_directed(&target, &source, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 0);
 		assert_eq!(liquidity.max_liquidity_msat(), 100);
 
 		// Reset from target to source.
 		scorer.channel_liquidities.get_mut(&42).unwrap()
-			.as_directed_mut(&target, &source, 1_000, decay_params)
+			.as_directed_mut(&target, &source, 1_000)
 			.set_min_liquidity_msat(400, Duration::ZERO);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&source, &target, 1_000, decay_params);
+			.as_directed(&source, &target, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 0);
 		assert_eq!(liquidity.max_liquidity_msat(), 600);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&target, &source, 1_000, decay_params);
+			.as_directed(&target, &source, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 400);
 		assert_eq!(liquidity.max_liquidity_msat(), 1_000);
 	}
@@ -2491,42 +2472,42 @@ mod tests {
 
 		// Check initial bounds.
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&source, &target, 1_000, decay_params);
+			.as_directed(&source, &target, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 400);
 		assert_eq!(liquidity.max_liquidity_msat(), 800);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&target, &source, 1_000, decay_params);
+			.as_directed(&target, &source, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 200);
 		assert_eq!(liquidity.max_liquidity_msat(), 600);
 
 		// Reset from source to target.
 		scorer.channel_liquidities.get_mut(&42).unwrap()
-			.as_directed_mut(&source, &target, 1_000, decay_params)
+			.as_directed_mut(&source, &target, 1_000)
 			.set_max_liquidity_msat(300, Duration::ZERO);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&source, &target, 1_000, decay_params);
+			.as_directed(&source, &target, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 0);
 		assert_eq!(liquidity.max_liquidity_msat(), 300);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&target, &source, 1_000, decay_params);
+			.as_directed(&target, &source, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 700);
 		assert_eq!(liquidity.max_liquidity_msat(), 1_000);
 
 		// Reset from target to source.
 		scorer.channel_liquidities.get_mut(&42).unwrap()
-			.as_directed_mut(&target, &source, 1_000, decay_params)
+			.as_directed_mut(&target, &source, 1_000)
 			.set_max_liquidity_msat(600, Duration::ZERO);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&source, &target, 1_000, decay_params);
+			.as_directed(&source, &target, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 400);
 		assert_eq!(liquidity.max_liquidity_msat(), 1_000);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&target, &source, 1_000, decay_params);
+			.as_directed(&target, &source, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 0);
 		assert_eq!(liquidity.max_liquidity_msat(), 600);
 	}
@@ -2971,47 +2952,6 @@ mod tests {
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), u64::max_value());
 	}
 
-	#[test]
-	fn decays_liquidity_bounds_without_shift_overflow() {
-		let logger = TestLogger::new();
-		let network_graph = network_graph(&logger);
-		let params = ProbabilisticScoringFeeParameters {
-			liquidity_penalty_multiplier_msat: 1_000,
-			..ProbabilisticScoringFeeParameters::zero_penalty()
-		};
-		let decay_params = ProbabilisticScoringDecayParameters {
-			liquidity_offset_half_life: Duration::from_secs(10),
-			..ProbabilisticScoringDecayParameters::default()
-		};
-		let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger);
-		let source = source_node_id();
-		let usage = ChannelUsage {
-			amount_msat: 256,
-			inflight_htlc_msat: 0,
-			effective_capacity: EffectiveCapacity::Total { capacity_msat: 1_024, htlc_maximum_msat: 1_000 },
-		};
-		let channel = network_graph.read_only().channel(42).unwrap().to_owned();
-		let (info, _) = channel.as_directed_from(&source).unwrap();
-		let candidate = CandidateRouteHop::PublicHop {
-			info,
-			short_channel_id: 42,
-		};
-		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 125);
-
-		scorer.payment_path_failed(&payment_path_for_amount(512), 42, Duration::ZERO);
-		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 281);
-
-		// An unchecked right shift 64 bits or more in DirectedChannelLiquidity::decayed_offset_msat
-		// would cause an overflow.
-		SinceEpoch::advance(Duration::from_secs(10 * 64));
-		scorer.time_passed(Duration::from_secs(10 * 64));
-		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 125);
-
-		SinceEpoch::advance(Duration::from_secs(10));
-		scorer.time_passed(Duration::from_secs(10 * 65));
-		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 125);
-	}
-
 	#[test]
 	fn restricts_liquidity_bounds_after_decay() {
 		let logger = TestLogger::new();
@@ -3660,7 +3600,7 @@ mod tests {
 		scorer.payment_path_failed(&path, 43, Duration::ZERO);
 
 		let liquidity = scorer.channel_liquidities.get(&42).unwrap()
-			.as_directed(&source, &target, 1_000, decay_params);
+			.as_directed(&source, &target, 1_000);
 		assert_eq!(liquidity.min_liquidity_msat(), 256);
 		assert_eq!(liquidity.max_liquidity_msat(), 768);
 	}

From f8fb70a7655d670254ed31801cf95578d3ad4889 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Tue, 5 Dec 2023 18:15:55 +0000
Subject: [PATCH 17/17] Drop fake time advancing in scoring tests

Now that we use explicit times passed to decay methods, there's no
reason to make calls to `SinceEpoch::advance` in scoring tests.
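For reference, the pattern in the tests changes from

	SinceEpoch::advance(Duration::from_secs(10));
	scorer.time_passed(Duration::from_secs(10));

to simply

	scorer.time_passed(Duration::from_secs(10));

since the explicit `Duration` since the epoch handed to `time_passed`
is now the only clock the decay logic consults.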
---
 lightning/src/routing/scoring.rs | 18 ++----------------
 1 file changed, 2 insertions(+), 16 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 0bea2223cab..12ffdecec42 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -2151,7 +2151,6 @@ mod tests {
 	use super::{ChannelLiquidity, HistoricalBucketRangeTracker, ProbabilisticScoringFeeParameters, ProbabilisticScoringDecayParameters, ProbabilisticScorer};
 	use crate::blinded_path::{BlindedHop, BlindedPath};
 	use crate::util::config::UserConfig;
-	use crate::util::time::tests::SinceEpoch;
 	use crate::ln::channelmanager;
 	use crate::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
 
@@ -2901,7 +2900,6 @@ mod tests {
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), u64::max_value());
 
 		// Half decay (i.e., three-quarter life)
-		SinceEpoch::advance(Duration::from_secs(5));
 		scorer.time_passed(Duration::from_secs(5));
 		let usage = ChannelUsage { amount_msat: 128, ..usage };
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 22);
@@ -2913,7 +2911,6 @@ mod tests {
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), u64::max_value());
 
 		// One decay (i.e., half life)
-		SinceEpoch::advance(Duration::from_secs(5));
 		scorer.time_passed(Duration::from_secs(10));
 		let usage = ChannelUsage { amount_msat: 64, ..usage };
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 0);
@@ -2925,7 +2922,6 @@ mod tests {
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), u64::max_value());
 
 		// Fully decay liquidity lower bound.
-		SinceEpoch::advance(Duration::from_secs(10 * 7));
 		scorer.time_passed(Duration::from_secs(10 * 8));
 		let usage = ChannelUsage { amount_msat: 0, ..usage };
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 0);
@@ -2937,14 +2933,12 @@ mod tests {
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), u64::max_value());
 
 		// Fully decay liquidity upper bound.
-		SinceEpoch::advance(Duration::from_secs(10));
 		scorer.time_passed(Duration::from_secs(10 * 9));
 		let usage = ChannelUsage { amount_msat: 0, ..usage };
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 0);
 		let usage = ChannelUsage { amount_msat: 1_024, ..usage };
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), u64::max_value());
 
-		SinceEpoch::advance(Duration::from_secs(10));
 		scorer.time_passed(Duration::from_secs(10 * 10));
 		let usage = ChannelUsage { amount_msat: 0, ..usage };
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 0);
@@ -2986,7 +2980,6 @@ mod tests {
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 281);
 
 		// Decaying knowledge gives less confidence (128, 896), meaning a higher penalty.
-		SinceEpoch::advance(Duration::from_secs(10));
 		scorer.time_passed(Duration::from_secs(10));
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 291);
 
@@ -3001,7 +2994,6 @@ mod tests {
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 245);
 
 		// Further decaying affects the lower bound more than the upper bound (128, 928).
-		SinceEpoch::advance(Duration::from_secs(10));
 		scorer.time_passed(Duration::from_secs(20));
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 280);
 	}
@@ -3036,7 +3028,6 @@ mod tests {
 		};
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), u64::max_value());
 
-		SinceEpoch::advance(Duration::from_secs(10));
 		scorer.time_passed(Duration::from_secs(10));
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 473);
 
@@ -3082,7 +3073,6 @@ mod tests {
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), u64::max_value());
 
 		if decay_before_reload {
-			SinceEpoch::advance(Duration::from_secs(10));
 			scorer.time_passed(Duration::from_secs(10));
 		}
 
@@ -3093,7 +3083,6 @@ mod tests {
 		let mut deserialized_scorer =
 			<ProbabilisticScorer<&NetworkGraph<_>, &TestLogger>>::read(&mut serialized_scorer, (decay_params, &network_graph, &logger)).unwrap();
 		if !decay_before_reload {
-			SinceEpoch::advance(Duration::from_secs(10));
 			scorer.time_passed(Duration::from_secs(10));
 			deserialized_scorer.time_passed(Duration::from_secs(10));
 		}
@@ -3102,7 +3091,6 @@ mod tests {
 		scorer.payment_path_failed(&payment_path_for_amount(250), 43, Duration::from_secs(10));
 		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 300);
 
-		SinceEpoch::advance(Duration::from_secs(10));
 		deserialized_scorer.time_passed(Duration::from_secs(20));
 		assert_eq!(deserialized_scorer.channel_penalty_msat(&candidate, usage, &params), 370);
 	}
@@ -3436,7 +3424,6 @@ mod tests {
 
 		// Advance the time forward 16 half-lives (which the docs claim will ensure all data is
 		// gone), and check that we're back to where we started.
-		SinceEpoch::advance(Duration::from_secs(10 * 16));
 		scorer.time_passed(Duration::from_secs(10 * 16));
 		{
 			let network_graph = network_graph.read_only();
@@ -3481,7 +3468,6 @@ mod tests {
 		}
 
 		// Advance to decay all liquidity offsets to zero.
-		SinceEpoch::advance(Duration::from_secs(60 * 60 * 10));
 		scorer.time_passed(Duration::from_secs(10 * (16 + 60 * 60)));
 
 		// Once even the bounds have decayed information about the channel should be removed
@@ -3734,10 +3720,10 @@ pub mod benches {
 		}
 		let mut cur_time = Duration::ZERO;
 		cur_time += Duration::from_millis(1);
-		scorer.decay_liquidity_certainty(cur_time);
+		scorer.time_passed(cur_time);
 		bench.bench_function("decay_100k_channel_bounds", |b| b.iter(|| {
 			cur_time += Duration::from_millis(1);
-			scorer.decay_liquidity_certainty(cur_time);
+			scorer.time_passed(cur_time);
 		}));
 	}
}
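
--

Note (illustrative sketch, not part of the series): after these patches the
scorer no longer reads a clock itself. Callers drive decay by passing the
current time into the scorer's `time_passed` method on `ScoreUpdate`, which
the background processor does on the (now five-minute) `SCORER_PERSIST_TIMER`
cadence before persisting. A minimal sketch of that flow, with a hypothetical
`scorer_tick` helper and `persist` callback standing in for the real
persister plumbing:

	use core::time::Duration;
	use lightning::routing::scoring::ScoreUpdate;

	// Hypothetical helper, not code from this series: decay, then persist.
	fn scorer_tick<S: ScoreUpdate>(
		scorer: &mut S, duration_since_epoch: Duration, persist: impl FnOnce(&S),
	) {
		// Decay liquidity bounds and historical buckets up to the given
		// wall-clock time (seconds since the unix epoch)...
		scorer.time_passed(duration_since_epoch);
		// ...then write out the freshly-decayed scorer so restarts see
		// decayed, rather than stale, liquidity estimates.
		persist(scorer);
	}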