From 8187e66963bd197dbc0443dae902578209ab512d Mon Sep 17 00:00:00 2001
From: Arik Sosman
Date: Tue, 15 Aug 2023 14:37:25 -0700
Subject: [PATCH 01/15] Create test-aware db env vars.

---
 src/config.rs | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index 1de24822..0655aa7d 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -74,13 +74,19 @@ pub(crate) fn cache_path() -> String {
 
 pub(crate) fn db_connection_config() -> Config {
 	let mut config = Config::new();
-	let host = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_HOST").unwrap_or("localhost".to_string());
-	let user = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_USER").unwrap_or("alice".to_string());
-	let db = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_NAME").unwrap_or("ln_graph_sync".to_string());
+	let env_name_prefix = if cfg!(test) {
+		"RAPID_GOSSIP_TEST_DB"
+	} else {
+		"RAPID_GOSSIP_SYNC_SERVER_DB"
+	};
+
+	let host = env::var(format!("{}{}", env_name_prefix, "_HOST")).unwrap_or("localhost".to_string());
+	let user = env::var(format!("{}{}", env_name_prefix, "_USER")).unwrap_or("alice".to_string());
+	let db = env::var(format!("{}{}", env_name_prefix, "_NAME")).unwrap_or("ln_graph_sync".to_string());
 	config.host(&host);
 	config.user(&user);
 	config.dbname(&db);
-	if let Ok(password) = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD") {
+	if let Ok(password) = env::var(format!("{}{}", env_name_prefix, "_PASSWORD")) {
 		config.password(&password);
 	}
 	config

From d36993aaefc84b498cbc2348052eec6c8a444836 Mon Sep 17 00:00:00 2001
From: Arik Sosman
Date: Tue, 15 Aug 2023 14:38:17 -0700
Subject: [PATCH 02/15] Create TestLogger.

---
 src/types.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/src/types.rs b/src/types.rs
index 0b03081f..1a4d677d 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -36,3 +36,58 @@ impl Logger for RGSSLogger {
 		println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
 	}
 }
+
+#[cfg(test)]
+pub mod tests {
+	use std::collections::HashMap;
+	use std::sync::{Mutex};
+	use lightning::util::logger::{Level, Logger, Record};
+
+	pub struct TestLogger {
+		level: Level,
+		pub(crate) id: String,
+		pub lines: Mutex<HashMap<(String, String), usize>>,
+	}
+
+	impl TestLogger {
+		pub fn new() -> TestLogger {
+			Self::with_id("".to_owned())
+		}
+		pub fn with_id(id: String) -> TestLogger {
+			TestLogger {
+				level: Level::Trace,
+				id,
+				lines: Mutex::new(HashMap::new()),
+			}
+		}
+		pub fn enable(&mut self, level: Level) {
+			self.level = level;
+		}
+		pub fn assert_log(&self, module: String, line: String, count: usize) {
+			let log_entries = self.lines.lock().unwrap();
+			assert_eq!(log_entries.get(&(module, line)), Some(&count));
+		}
+
+		/// Search for the number of occurrences of logged lines that
+		/// 1. belong to the specified module and
+		/// 2. contain `line` in them,
+		/// and assert that the number of occurrences equals the given `count`.
+		pub fn assert_log_contains(&self, module: &str, line: &str, count: usize) {
+			let log_entries = self.lines.lock().unwrap();
+			let l: usize = log_entries.iter().filter(|&(&(ref m, ref l), _c)| {
+				m == module && l.contains(line)
+			}).map(|(_, c)| { c }).sum();
+			assert_eq!(l, count)
+		}
+	}
+
+	impl Logger for TestLogger {
+		fn log(&self, record: &Record) {
+			*self.lines.lock().unwrap().entry((record.module_path.to_string(), format!("{}", record.args))).or_insert(0) += 1;
+			if record.level >= self.level {
+				// #[cfg(feature = "std")]
+				println!("{:<5} {} [{} : {}, {}] {}", record.level.to_string(), self.id, record.module_path, record.file, record.line, record.args);
+			}
+		}
+	}
+}

From afbc1851b55433b4ca80deb27f1d62d271ca140e Mon Sep 17 00:00:00 2001
From: Arik Sosman
Date: Tue, 15 Aug 2023 14:38:46 -0700
Subject: [PATCH 03/15] Use LDK version with public excess_data.

---
 Cargo.toml | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 99aeed72..2c511927 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,13 +5,16 @@ edition = "2021"
 
 [dependencies]
 bitcoin = "0.29"
-lightning = { version = "0.0.116" }
-lightning-block-sync = { version = "0.0.116", features=["rest-client"] }
-lightning-net-tokio = { version = "0.0.116" }
+lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48" }
+lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48", features = ["rest-client"] }
+lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48" }
 tokio = { version = "1.25", features = ["full"] }
-tokio-postgres = { version="=0.7.5" }
+tokio-postgres = { version = "=0.7.5" }
 futures = "0.3"
 
+[dev-dependencies]
+lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48" }
+
 [profile.dev]
 panic = "abort"

From d796d4cbede0ed99cd4d6c17077d22e8d3eed1f5 Mon Sep 17 00:00:00 2001
From: Arik Sosman
Date: Tue, 15 Aug 2023 14:39:12 -0700
Subject: [PATCH 04/15] Create first unit test.

---
 src/lib.rs       |   3 +
 src/tests/mod.rs | 172 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 175 insertions(+)
 create mode 100644 src/tests/mod.rs

diff --git a/src/lib.rs b/src/lib.rs
index f56aca26..8504ca8c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -41,6 +41,9 @@ mod verifier;
 
 pub mod types;
 
+#[cfg(test)]
+mod tests;
+
 /// The purpose of this prefix is to identify the serialization format, should other rapid gossip
 /// sync formats arise in the future.
 ///
diff --git a/src/tests/mod.rs b/src/tests/mod.rs
new file mode 100644
index 00000000..f761aa87
--- /dev/null
+++ b/src/tests/mod.rs
@@ -0,0 +1,172 @@
+//! 
Multi-module tests that use database fixtures + +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use bitcoin::{BlockHash, Network}; +use bitcoin::secp256k1::ecdsa::Signature; +use bitcoin::secp256k1::{Secp256k1, SecretKey}; +use bitcoin::hashes::Hash; +use bitcoin::hashes::sha256d::Hash as Sha256dHash; +use lightning::ln::features::ChannelFeatures; +use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate}; +use lightning::routing::gossip::{NetworkGraph, NodeId}; +use lightning::util::ser::Writeable; +use lightning_rapid_gossip_sync::RapidGossipSync; +use tokio_postgres::NoTls; +use crate::{config, serialize_delta}; +use crate::persistence::GossipPersister; +use crate::types::{GossipMessage, tests::TestLogger}; + +const CLIENT_BACKDATE_INTERVAL: u32 = 3600 * 24 * 7; // client backdates RGS by a week + +fn blank_signature() -> Signature { + Signature::from_compact(&[0u8; 64]).unwrap() +} + +fn genesis_hash() -> BlockHash { + bitcoin::blockdata::constants::genesis_block(Network::Bitcoin).block_hash() +} + +fn current_time() -> u32 { + SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as u32 +} + +fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement { + let secp_context = Secp256k1::new(); + + let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap(); + let random_public_key_1 = random_private_key_1.public_key(&secp_context); + let node_id_1 = NodeId::from_pubkey(&random_public_key_1); + + let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap(); + let random_public_key_2 = random_private_key_2.public_key(&secp_context); + let node_id_2 = NodeId::from_pubkey(&random_public_key_2); + + let announcement = UnsignedChannelAnnouncement { + features: ChannelFeatures::empty(), + chain_hash: genesis_hash(), + short_channel_id, + node_id_1, + node_id_2, + bitcoin_key_1: node_id_1, + bitcoin_key_2: node_id_2, + excess_data: vec![], + }; + + let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap(); + let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1); + let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2); + + ChannelAnnouncement { + node_signature_1, + node_signature_2, + bitcoin_signature_1: node_signature_1, + bitcoin_signature_2: node_signature_2, + contents: announcement, + } +} + +fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate { + let flag_mask = if direction { 1 } else { 0 }; + ChannelUpdate { + signature: blank_signature(), + contents: UnsignedChannelUpdate { + chain_hash: genesis_hash(), + short_channel_id: scid, + timestamp, + flags: 0 | flag_mask, + cltv_expiry_delta: expiry_delta, + htlc_minimum_msat: min_msat, + htlc_maximum_msat: max_msat, + fee_base_msat: base_msat, + fee_proportional_millionths: fee_rate, + excess_data: vec![], + }, + } +} + +async fn clean_test_db() { + let connection_config = config::db_connection_config(); + let (client, connection) = connection_config.connect(NoTls).await.unwrap(); + + tokio::spawn(async move { + if let Err(e) = connection.await { + panic!("connection error: {}", e); + } + }); + + client.query("TRUNCATE TABLE channel_announcements RESTART IDENTITY CASCADE", &[]).await.unwrap(); + client.query("TRUNCATE TABLE channel_updates RESTART IDENTITY CASCADE", &[]).await.unwrap(); + 
client.query("TRUNCATE TABLE config RESTART IDENTITY CASCADE", &[]).await.unwrap(); +} + +#[tokio::test] +async fn test_trivial_setup() { + // start off with a clean slate + clean_test_db().await; + + let logger = Arc::new(TestLogger::new()); + let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let network_graph_arc = Arc::new(network_graph); + let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); + + let short_channel_id = 1; + let timestamp = current_time() - 10; + println!("timestamp: {}", timestamp); + + { // seed the db + let announcement = generate_announcement(short_channel_id); + let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0); + let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0); + + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); + network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); + + receiver.send(GossipMessage::ChannelAnnouncement(announcement)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_1)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_2)).await.unwrap(); + drop(receiver); + persister.persist_gossip().await; + } + + let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1); + clean_test_db().await; + + let channel_count = network_graph_arc.read_only().channels().len(); + + assert_eq!(channel_count, 1); + assert_eq!(serialization.message_count, 3); + assert_eq!(serialization.announcement_count, 1); + assert_eq!(serialization.update_count, 2); + + let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let client_graph_arc = Arc::new(client_graph); + let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone()); + let update_result = rgs.update_network_graph(&serialization.data).unwrap(); + println!("update result: {}", update_result); + // the update result must be a multiple of our snapshot granularity + assert_eq!(update_result % config::SNAPSHOT_CALCULATION_INTERVAL, 0); + assert!(update_result < timestamp); + + let timestamp_delta = timestamp - update_result; + println!("timestamp delta: {}", timestamp_delta); + assert!(timestamp_delta < config::SNAPSHOT_CALCULATION_INTERVAL); + + let readonly_graph = client_graph_arc.read_only(); + let channels = readonly_graph.channels(); + let client_channel_count = channels.len(); + assert_eq!(client_channel_count, 1); + + let first_channel = channels.get(&short_channel_id).unwrap(); + assert!(&first_channel.announcement_message.is_none()); + assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5); + assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10); + let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update; + let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update; + println!("last update a: {}", last_update_seen_a); + println!("last update b: {}", last_update_seen_b); + assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL); + assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL); +} From 0ffee72cf1b135cafe4edfd60e63f75c720b54ed Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 16 Aug 2023 11:07:15 -0700 Subject: [PATCH 05/15] Use timestamp value for 
 seen in tests.

---
 src/persistence.rs | 31 ++++++++++++++++++++++++++---
 1 file changed, 28 insertions(+), 3 deletions(-)

diff --git a/src/persistence.rs b/src/persistence.rs
index 8bb7b188..a0fa7773 100644
--- a/src/persistence.rs
+++ b/src/persistence.rs
@@ -146,10 +146,11 @@ impl GossipPersister where L::Target: Logger {
 					let mut update_signed = Vec::new();
 					update.write(&mut update_signed).unwrap();
 
-					tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
-						.execute("INSERT INTO channel_updates (\
+					let insertion_statement = if cfg!(test) {
+						"INSERT INTO channel_updates (\
 							short_channel_id, \
 							timestamp, \
+							seen, \
 							channel_flags, \
 							direction, \
 							disable, \
 							cltv_expiry_delta, \
 							htlc_minimum_msat, \
 							fee_base_msat, \
 							fee_proportional_millionths, \
 							htlc_maximum_msat, \
 							blob_signed \
-						) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", &[
+						) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT DO NOTHING"
+					} else {
+						"INSERT INTO channel_updates (\
+							short_channel_id, \
+							timestamp, \
+							channel_flags, \
+							direction, \
+							disable, \
+							cltv_expiry_delta, \
+							htlc_minimum_msat, \
+							fee_base_msat, \
+							fee_proportional_millionths, \
+							htlc_maximum_msat, \
+							blob_signed \
+						) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING"
+					};
+
+					// this may not be used outside test cfg
+					#[cfg(test)]
+					let system_time: std::time::SystemTime = std::time::UNIX_EPOCH + Duration::from_secs(timestamp as u64);
+
+					tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+						.execute(insertion_statement, &[
 							&scid,
 							&timestamp,
+							#[cfg(test)]
+							&system_time,
 							&(update.contents.flags as i16),
 							&direction,
 							&disable,

From d926de52cb0d464a725dab6d2d55b30bf9d7b318 Mon Sep 17 00:00:00 2001
From: Arik Sosman
Date: Wed, 16 Aug 2023 14:46:40 -0700
Subject: [PATCH 06/15] Always use chain hash based on config.

---
 src/serialization.rs | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/src/serialization.rs b/src/serialization.rs
index 93bd381d..b1f23d5e 100644
--- a/src/serialization.rs
+++ b/src/serialization.rs
@@ -3,11 +3,11 @@ use std::collections::HashMap;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use bitcoin::BlockHash;
-use bitcoin::hashes::Hash;
 use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate};
 use lightning::util::ser::{BigSize, Writeable};
 
 use crate::config;
+use crate::config;
 use crate::lookup::{DeltaSet, DirectedUpdateDelta};
 
 pub(super) struct SerializationSet {
@@ -105,16 +105,15 @@ struct FullUpdateValueHistograms {
 	htlc_maximum_msat: HashMap<u64, usize>,
 }
 
 pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32) -> SerializationSet {
+	let chain_hash = bitcoin::blockdata::constants::genesis_block(config::network()).block_hash();
 	let mut serialization_set = SerializationSet {
 		announcements: vec![],
 		updates: vec![],
 		full_update_defaults: Default::default(),
-		chain_hash: BlockHash::all_zeros(),
+		chain_hash,
 		latest_seen: 0,
 	};
 
-	let mut chain_hash_set = false;
-
 	let mut full_update_histograms = FullUpdateValueHistograms {
 		cltv_expiry_delta: Default::default(),
 		htlc_minimum_msat: Default::default(),
@@ -138,11 +137,6 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
 		// any announcement chain hash is gonna be the same value. Just set it from the first one. 
let channel_announcement_delta = channel_delta.announcement.as_ref().unwrap(); - if !chain_hash_set { - chain_hash_set = true; - serialization_set.chain_hash = channel_announcement_delta.announcement.chain_hash.clone(); - } - let current_announcement_seen = channel_announcement_delta.seen; let is_new_announcement = current_announcement_seen >= last_sync_timestamp; let is_newly_included_announcement = if let Some(first_update_seen) = channel_delta.first_bidirectional_updates_seen { From 430845bd9d202b34fef6f5ac137d2789130b7330 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 16 Aug 2023 14:47:03 -0700 Subject: [PATCH 07/15] Log all persistence messages in test. --- src/persistence.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/persistence.rs b/src/persistence.rs index a0fa7773..e354054f 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -99,7 +99,11 @@ impl GossipPersister where L::Target: Logger { while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await { i += 1; // count the persisted gossip messages - if latest_persistence_log.elapsed().as_secs() >= 60 { + let _should_log_message = latest_persistence_log.elapsed().as_secs() >= 60; + #[cfg(test)] + let _should_log_message = true; // we log every persistence message in test + + if _should_log_message { log_info!(self.logger, "Persisting gossip message #{}", i); latest_persistence_log = Instant::now(); } From cca1cc891c0f601ec6f7c9d387893b139c34690f Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 16 Aug 2023 20:25:39 -0700 Subject: [PATCH 08/15] Change test-aware persistence modification to use f64 timestamps. --- src/persistence.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/persistence.rs b/src/persistence.rs index e354054f..5f713eb2 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -164,7 +164,7 @@ impl GossipPersister where L::Target: Logger { fee_proportional_millionths, \ htlc_maximum_msat, \ blob_signed \ - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT DO NOTHING" + ) VALUES ($1, $2, TO_TIMESTAMP($3), $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT DO NOTHING" } else { "INSERT INTO channel_updates (\ short_channel_id, \ @@ -183,7 +183,7 @@ impl GossipPersister where L::Target: Logger { // this may not be used outside test cfg #[cfg(test)] - let system_time: std::time::SystemTime = std::time::UNIX_EPOCH + Duration::from_secs(timestamp as u64); + let system_time = timestamp as f64; tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client .execute(insertion_statement, &[ From b5f26bc170b3d3554e8039a660fa5a1e6aaede25 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 16 Aug 2023 21:52:05 -0700 Subject: [PATCH 09/15] Log per-channel lookup information. 
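The new per-channel lines are trace-level, so they stay cheap in production but make
test failures much easier to localize. As a rough sketch (the SCID and timestamp here
are invented values), a test run now gains lines of this shape:

    Channel 42 first update to complete bidirectional data seen at: 1692200000
    Channel 42 with newest update in less recently updated direction being at least 6 days ago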
---
 src/lookup.rs | 29 ++++++++++++++++++++++++++---
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/src/lookup.rs b/src/lookup.rs
index 8def6f13..0872c39a 100644
--- a/src/lookup.rs
+++ b/src/lookup.rs
@@ -82,8 +82,9 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS
 		.map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
 		.collect::<Vec<_>>()
 	};
-	#[cfg(test)]
-	log_info!(logger, "Channel IDs: {:?}", channel_ids);
+	if cfg!(test) {
+		log_info!(logger, "Channel IDs: {:?}", channel_ids);
+	}
 	log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
 
 	let last_sync_timestamp_float = last_sync_timestamp as f64;
@@ -112,6 +113,22 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS
 	}
 	log_info!(logger, "Fetched {} announcement rows", announcement_count);
 
+	if cfg!(test) {
+		// THIS STEP IS USED TO DEBUG THE NUMBER OF ROWS WHEN TESTING
+		let update_rows = client.query("SELECT * FROM channel_updates", &[]).await.unwrap();
+		log_info!(logger, "Fetched {} update rows", update_rows.len());
+
+		let update_rows = client.query("SELECT * FROM channel_updates WHERE short_channel_id = any($1)", &[&channel_ids]).await.unwrap();
+		log_info!(logger, "Fetched {} update rows with filtered channels", update_rows.len());
+
+		// let timestamp_string = format!("{}", last_sync_timestamp);
+		let update_rows = client.query("SELECT * FROM channel_updates WHERE seen >= TO_TIMESTAMP($1)", &[&last_sync_timestamp_float]).await.unwrap();
+		log_info!(logger, "Fetched {} update rows with filtered seen", update_rows.len());
+
+		let update_rows = client.query("SELECT * FROM channel_updates WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)", &[&channel_ids, &last_sync_timestamp_float]).await.unwrap();
+		log_info!(logger, "Fetched {} update rows with filtered channels and seen", update_rows.len());
+	}
+
 	{
 		// THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
@@ -147,6 +164,8 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS
 			let scid: i64 = current_row.get("short_channel_id");
 			let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
 
+			log_trace!(logger, "Channel {} first update to complete bidirectional data seen at: {}", scid, current_seen_timestamp);
+
 			// the newer of the two oldest seen directional updates came after last sync timestamp
 			let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
 			// first time a channel was seen in both directions
@@ -188,6 +207,8 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS
 			let current_row = row_res.unwrap();
 
 			let scid: i64 = current_row.get("short_channel_id");
+			log_trace!(logger, "Channel {} with newest update in less recently updated direction being at least 6 days ago", scid);
+
 			// annotate this channel as requiring that reminders be sent to the client
 			let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
@@ -217,7 +238,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS
 			}
 			older_latest_directional_update_count += 1;
 		}
-		log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
+		log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction being more than six days ago", older_latest_directional_update_count);
 	}
 }
@@ -334,12 +355,14 @@ pub(super) async fn 
fetch_channel_updates(delta_set: &mut DeltaSet, cl seen: current_seen_timestamp, update: unsigned_channel_update.clone(), }); + log_trace!(logger, "Channel {} latest update in direction 0: {} (seen: {})", scid, unsigned_channel_update.timestamp, current_seen_timestamp) } else if direction && !previously_seen_directions.1 { previously_seen_directions.1 = true; update_delta.latest_update_after_seen = Some(UpdateDelta { seen: current_seen_timestamp, update: unsigned_channel_update.clone(), }); + log_trace!(logger, "Channel {} latest update in direction 1: {} (seen: {})", scid, unsigned_channel_update.timestamp, current_seen_timestamp) } } From 16bdfa56f70fe6afae9d267ff39193469bfcd144 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Thu, 17 Aug 2023 14:41:44 -0700 Subject: [PATCH 10/15] Create test ensuring that a previously unannounced channel would not send incremental updates. --- src/tests/mod.rs | 84 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/src/tests/mod.rs b/src/tests/mod.rs index f761aa87..b9d415d5 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -1,3 +1,5 @@ +#![allow(unused_variables)] +#![allow(unused_imports)] //! Multi-module tests that use database fixtures use std::sync::Arc; @@ -170,3 +172,85 @@ async fn test_trivial_setup() { assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL); assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL); } + +/// If a channel has only seen updates in one direction, it should not be announced +#[tokio::test] +async fn test_unidirectional_intermediate_update_consideration() { + // start off with a clean slate + clean_test_db().await; + + let logger = Arc::new(TestLogger::new()); + let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let network_graph_arc = Arc::new(network_graph); + let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); + + let short_channel_id = 1; + let current_timestamp = current_time() - 10; + + { // seed the db + let announcement = generate_announcement(short_channel_id); + let update_1 = generate_update(short_channel_id, false, current_timestamp - 4, 0, 0, 0, 5, 0); + let update_2 = generate_update(short_channel_id, false, current_timestamp - 3, 0, 0, 0, 4, 0); + let update_3 = generate_update(short_channel_id, false, current_timestamp - 2, 0, 0, 0, 3, 0); + let update_4 = generate_update(short_channel_id, true, current_timestamp, 0, 0, 0, 5, 0); + + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); + network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); + network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap(); + network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap(); + + receiver.send(GossipMessage::ChannelAnnouncement(announcement)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_1)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_2)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_3)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_4)).await.unwrap(); + drop(receiver); + persister.persist_gossip().await; + } + + let channel_count = network_graph_arc.read_only().channels().len(); + assert_eq!(channel_count, 1); + + let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let client_graph_arc = 
Arc::new(client_graph); + let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone()); + + let last_sync_timestamp = current_timestamp - 3; + let serialization = serialize_delta(network_graph_arc.clone(), last_sync_timestamp, logger.clone()).await; + println!( + "serialization data: \nmessages: {}\n\tannouncements: {}\n\tupdates: {}\n\t\tfull: {}\n\t\tincremental: {}", + serialization.message_count, + serialization.announcement_count, + serialization.update_count, + serialization.update_count_full, + serialization.update_count_incremental + ); + + logger.assert_log_contains("rapid_gossip_sync_server::lookup", &format!("Channel IDs: [{}]", short_channel_id), 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 announcement rows", 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", &format!("Channel {} last update before seen: 1/false/{}", short_channel_id, current_timestamp - 4), 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (3)", 1); + println!("current_timestamp: {}", current_timestamp); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", &format!("Channel {} latest update in direction 0: {}", short_channel_id, current_timestamp - 2), 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", &format!("Channel {} latest update in direction 1: {}", short_channel_id, current_timestamp), 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", &format!("Channel {} first update to complete bidirectional data seen at: {}", short_channel_id, current_timestamp), 1); + + assert_eq!(serialization.message_count, 3); + assert_eq!(serialization.announcement_count, 1); + assert_eq!(serialization.update_count, 2); + assert_eq!(serialization.update_count_full, 2); + assert_eq!(serialization.update_count_incremental, 0); + + let next_timestamp = rgs.update_network_graph(&serialization.data).unwrap(); + println!("last sync timestamp: {}", last_sync_timestamp); + println!("next timestamp: {}", next_timestamp); + // the update result must be a multiple of our snapshot granularity + assert_eq!(next_timestamp % config::SNAPSHOT_CALCULATION_INTERVAL, 0); + + let readonly_graph = client_graph_arc.read_only(); + let channels = readonly_graph.channels(); + let client_channel_count = channels.len(); + assert_eq!(client_channel_count, 1); +} From 91ab27c0a156528b566cc20ba106921d06b28d0d Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Thu, 17 Aug 2023 16:24:58 -0700 Subject: [PATCH 11/15] Add second value to announcement tuple to allow overriding the Postgres-default seen timestamp. 
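The second tuple element is optional, so the live path is unchanged: the router keeps
passing None and lets Postgres default the seen column, while tests can pin a
deterministic timestamp. The two call sites below illustrate this (both lines are taken
from the changes in this patch):

    // live path: the seen column defaults on the Postgres side
    let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);

    // test path: backdate the announcement to a known timestamp
    receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();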
---
 src/downloader.rs  |  2 +-
 src/persistence.rs | 25 +++++++++++++++++++------
 src/tests/mod.rs   |  2 +-
 src/types.rs       |  5 ++++-
 4 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/src/downloader.rs b/src/downloader.rs
index 8d5cdb00..f5dd17f5 100644
--- a/src/downloader.rs
+++ b/src/downloader.rs
@@ -62,7 +62,7 @@ impl GossipRouter where L::Target: Logger {
 			counter.channel_announcements += 1;
 		}
 
-		let gossip_message = GossipMessage::ChannelAnnouncement(msg);
+		let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
 		if let Err(err) = self.sender.try_send(gossip_message) {
 			let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
 			tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
diff --git a/src/persistence.rs b/src/persistence.rs
index 5f713eb2..ec1b922a 100644
--- a/src/persistence.rs
+++ b/src/persistence.rs
@@ -115,21 +115,34 @@ impl GossipPersister where L::Target: Logger {
 			}
 
 			match &gossip_message {
-				GossipMessage::ChannelAnnouncement(announcement) => {
+				GossipMessage::ChannelAnnouncement(announcement, timestamp) => {
 					let scid = announcement.contents.short_channel_id as i64;
 
 					// start with the type prefix, which is already known a priori
 					let mut announcement_signed = Vec::new();
 					announcement.write(&mut announcement_signed).unwrap();
 
-					tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
-						.execute("INSERT INTO channel_announcements (\
+					if let Some(timestamp) = timestamp {
+						tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+							.execute("INSERT INTO channel_announcements (\
+								short_channel_id, \
+								announcement_signed, \
+								seen \
+							) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
+								&scid,
+								&announcement_signed,
+								&(*timestamp as f64)
+							])).await.unwrap().unwrap();
+					} else {
+						tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+							.execute("INSERT INTO channel_announcements (\
 							short_channel_id, \
 							announcement_signed \
 						) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
-							&scid,
-							&announcement_signed
-						])).await.unwrap().unwrap();
+							&scid,
+							&announcement_signed
+						])).await.unwrap().unwrap();
+					}
 				}
 				GossipMessage::ChannelUpdate(update) => {
 					let scid = update.contents.short_channel_id as i64;
diff --git a/src/tests/mod.rs b/src/tests/mod.rs
index b9d415d5..51fdf9c9 100644
--- a/src/tests/mod.rs
+++ b/src/tests/mod.rs
@@ -125,7 +125,7 @@ async fn test_trivial_setup() {
 	network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
 	network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
 
-	receiver.send(GossipMessage::ChannelAnnouncement(announcement)).await.unwrap();
+	receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
 	receiver.send(GossipMessage::ChannelUpdate(update_1)).await.unwrap();
 	receiver.send(GossipMessage::ChannelUpdate(update_2)).await.unwrap();
 	drop(receiver);
diff --git a/src/types.rs b/src/types.rs
index 1a4d677d..5e37bfcb 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -14,7 +14,10 @@ pub(crate) type GossipPeerManager<L> = Arc
 
 pub(crate) enum GossipMessage {
-	ChannelAnnouncement(ChannelAnnouncement),
+	ChannelAnnouncement(
+		ChannelAnnouncement,
+		Option<u32>, // optionally include a persistence timestamp
+	),
 	ChannelUpdate(ChannelUpdate),
 }

From 8091162a54eabbf06e185abf2c0c822565d63bd7 Mon Sep 17 00:00:00 2001
From: Arik Sosman
Date: Thu, 17 Aug 2023 16:27:11 -0700
Subject: [PATCH 12/15] Introduce additional logging during serialization. 
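Passing the logger into serialize_delta_set makes the inclusion decision for each
channel visible in test builds. Illustratively (the values below are hypothetical), a
test log now explains why an announcement was or was not serialized:

    Channel 1 first bidirectional update seen: 1692200000
    Channel 1 announcement seen at 1692199990 (new: false, newly included: true)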
---
 src/lib.rs           | 2 +-
 src/serialization.rs | 9 ++++++++-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 8504ca8c..f240dde4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -198,7 +198,7 @@ async fn serialize_delta(network_graph: Arc>,
 	log_info!(logger, "update-fetched channel count: {}", delta_set.len());
 	lookup::filter_delta_set(&mut delta_set, logger.clone());
 	log_info!(logger, "update-filtered channel count: {}", delta_set.len());
-	let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp);
+	let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp, logger.clone());
 
 	// process announcements
 	// write the number of channel announcements to the output
diff --git a/src/serialization.rs b/src/serialization.rs
index b1f23d5e..3d672fef 100644
--- a/src/serialization.rs
+++ b/src/serialization.rs
@@ -1,9 +1,12 @@
 use std::cmp::max;
 use std::collections::HashMap;
 use std::time::{SystemTime, UNIX_EPOCH};
+use std::ops::Deref;
 
 use bitcoin::BlockHash;
 use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate};
+use lightning::log_trace;
+use lightning::util::logger::Logger;
 use lightning::util::ser::{BigSize, Writeable};
 
 use crate::config;
@@ -104,7 +107,7 @@ struct FullUpdateValueHistograms {
 	htlc_maximum_msat: HashMap<u64, usize>,
 }
 
-pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32) -> SerializationSet {
+pub(super) fn serialize_delta_set<L: Deref>(delta_set: DeltaSet, last_sync_timestamp: u32, logger: L) -> SerializationSet where L::Target: Logger {
 	let chain_hash = bitcoin::blockdata::constants::genesis_block(config::network()).block_hash();
 	let mut serialization_set = SerializationSet {
 		announcements: vec![],
@@ -140,10 +143,14 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
 		let current_announcement_seen = channel_announcement_delta.seen;
 		let is_new_announcement = current_announcement_seen >= last_sync_timestamp;
 		let is_newly_included_announcement = if let Some(first_update_seen) = channel_delta.first_bidirectional_updates_seen {
+			#[cfg(test)]
+			log_trace!(logger, "Channel {} first bidirectional update seen: {}", scid, first_update_seen);
 			first_update_seen >= last_sync_timestamp
 		} else {
 			false
 		};
+		#[cfg(test)]
+		log_trace!(logger, "Channel {} announcement seen at {} (new: {}, newly included: {})", scid, current_announcement_seen, is_new_announcement, is_newly_included_announcement);
 		let send_announcement = is_new_announcement || is_newly_included_announcement;
 		if send_announcement {
 			serialization_set.latest_seen = max(serialization_set.latest_seen, current_announcement_seen);

From 337355995386e21080cc7092c8902fb446c99dc2 Mon Sep 17 00:00:00 2001
From: Arik Sosman
Date: Thu, 17 Aug 2023 16:27:38 -0700
Subject: [PATCH 13/15] Create debug formatter for `SerializedResponse`. 
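With this, tests can dump all five serialization counters through the usual formatting
machinery instead of assembling the string by hand. A hypothetical call site:

    let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
    println!("serialization data: {:?}", serialization);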
---
 src/lib.rs | 17 +++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/src/lib.rs b/src/lib.rs
index f240dde4..7c3cbef3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,6 +10,8 @@ extern crate core;
 
 use std::collections::{HashMap, HashSet};
+#[cfg(test)]
+use std::fmt::{Debug, Formatter};
 use std::fs::File;
 use std::io::BufReader;
 use std::ops::Deref;
@@ -118,6 +120,21 @@ impl RapidSyncProcessor where L::Ta
 	}
 }
 
+#[cfg(test)]
+impl Debug for SerializedResponse {
+	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+		write!(
+			f,
+			"\nmessages: {}\n\tannouncements: {}\n\tupdates: {}\n\t\tfull: {}\n\t\tincremental: {}",
+			self.message_count,
+			self.announcement_count,
+			self.update_count,
+			self.update_count_full,
+			self.update_count_incremental
+		)
+	}
+}
+
 pub(crate) async fn connect_to_db() -> Client {
 	let connection_config = config::db_connection_config();
 	let (client, connection) = connection_config.connect(NoTls).await.unwrap();

From 9c48bce1afbecbaae3212fec2b63bab7e0f4e5e1 Mon Sep 17 00:00:00 2001
From: Arik Sosman
Date: Thu, 17 Aug 2023 16:28:27 -0700
Subject: [PATCH 14/15] Disambiguate phrasing pertaining to first bidirectional channel updates.

---
 src/lookup.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/lookup.rs b/src/lookup.rs
index 0872c39a..f03495f1 100644
--- a/src/lookup.rs
+++ b/src/lookup.rs
@@ -40,6 +40,8 @@ pub(super) struct DirectedUpdateDelta {
 pub(super) struct ChannelDelta {
 	pub(super) announcement: Option<AnnouncementDelta>,
 	pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
+	/// This value is only set if the first update to achieve bidirectionality was seen after
+	/// the last sync.
 	pub(super) first_bidirectional_updates_seen: Option<u32>,
 	/// The seen timestamp of the older of the two latest directional updates
 	pub(super) requires_reminder: bool,
@@ -164,7 +166,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS
 			let scid: i64 = current_row.get("short_channel_id");
 			let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
 
-			log_trace!(logger, "Channel {} first update to complete bidirectional data seen at: {}", scid, current_seen_timestamp);
+			log_trace!(logger, "Channel {} with first update to complete bidirectional data since last sync seen at: {}", scid, current_seen_timestamp);
 
 			// the newer of the two oldest seen directional updates came after last sync timestamp
 			let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
 			// first time a channel was seen in both directions
@@ -173,7 +175,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS
 				newer_oldest_directional_update_count += 1;
 			}
-			log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
+			log_info!(logger, "Fetched {} update rows of the first update in a new direction having occurred since the last sync", newer_oldest_directional_update_count);
 		}
 
 		{
@@ -388,7 +390,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl
 			}
 		}
 	}
-	log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
+	log_info!(logger, "Processed {} intermediate rows (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
 }
 
 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {

From 67b6d4f7c5483854fdb7c3b7fde2074f9cc85a06 Mon Sep 17 00:00:00 2001
From: Arik Sosman
Date: Wed, 6 Sep 2023 22:37:46 -0700
Subject: [PATCH 15/15] Use separate schemas for 
 each unit test.

---
 src/lib.rs | 11 +++++++--
 src/lookup.rs | 13 +++++------
 src/serialization.rs | 9 +++-----
 src/tests/mod.rs | 55 +++++++++++++++++++++++++++++---------------
 4 files changed, 54 insertions(+), 34 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 7c3cbef3..44250ad7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -54,7 +54,7 @@ const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];
 
 pub struct RapidSyncProcessor<L: Deref> where L::Target: Logger {
 	network_graph: Arc<NetworkGraph<L>>,
-	logger: L
+	logger: L,
 }
 
 pub struct SerializedResponse {
@@ -86,7 +86,7 @@ impl RapidSyncProcessor where L::Ta
 		let arc_network_graph = Arc::new(network_graph);
 		Self {
 			network_graph: arc_network_graph,
-			logger
+			logger,
 		}
 	}
 
@@ -145,6 +145,13 @@ pub(crate) async fn connect_to_db() -> Client {
 		}
 	});
 
+	if cfg!(test) {
+		let schema_name = tests::db_test_schema();
+		let schema_creation_command = format!("CREATE SCHEMA IF NOT EXISTS {}", schema_name);
+		client.execute(&schema_creation_command, &[]).await.unwrap();
+		client.execute(&format!("SET search_path TO {}", schema_name), &[]).await.unwrap();
+	}
+
 	client.execute("set time zone UTC", &[]).await.unwrap();
 	client
 }
diff --git a/src/lookup.rs b/src/lookup.rs
index f03495f1..f0f6de8e 100644
--- a/src/lookup.rs
+++ b/src/lookup.rs
@@ -84,9 +84,8 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS
 		.map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
 		.collect::<Vec<_>>()
 	};
-	if cfg!(test) {
-		log_info!(logger, "Channel IDs: {:?}", channel_ids);
-	}
+	#[cfg(test)]
+	log_info!(logger, "Channel IDs: {:?}", channel_ids);
 	log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
 
 	let last_sync_timestamp_float = last_sync_timestamp as f64;
@@ -166,7 +165,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS
 			let scid: i64 = current_row.get("short_channel_id");
 			let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
 
-			log_trace!(logger, "Channel {} with first update to complete bidirectional data since last sync seen at: {}", scid, current_seen_timestamp);
+			log_gossip!(logger, "Channel {} with first update to complete bidirectional data since last sync seen at: {}", scid, current_seen_timestamp);
 
 			// the newer of the two oldest seen directional updates came after last sync timestamp
 			let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
@@ -209,7 +208,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS
 			let current_row = row_res.unwrap();
 
 			let scid: i64 = current_row.get("short_channel_id");
-			log_trace!(logger, "Channel {} with newest update in less recently updated direction being at least 6 days ago", scid);
+			log_gossip!(logger, "Channel {} with newest update in less recently updated direction being at least 6 days ago", scid);
 
 			// annotate this channel as requiring that reminders be sent to the client
 			let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
@@ -357,14 +356,14 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl
 					seen: current_seen_timestamp,
 					update: unsigned_channel_update.clone(),
 				});
-				log_trace!(logger, "Channel {} latest update in direction 0: {} (seen: {})", scid, unsigned_channel_update.timestamp, current_seen_timestamp)
+				log_gossip!(logger, "Channel {} latest update in direction 0: {} (seen: {})", scid, unsigned_channel_update.timestamp, current_seen_timestamp)
 			} else if direction && !previously_seen_directions.1 {
 				previously_seen_directions.1 = true; 
update_delta.latest_update_after_seen = Some(UpdateDelta { seen: current_seen_timestamp, update: unsigned_channel_update.clone(), }); - log_trace!(logger, "Channel {} latest update in direction 1: {} (seen: {})", scid, unsigned_channel_update.timestamp, current_seen_timestamp) + log_gossip!(logger, "Channel {} latest update in direction 1: {} (seen: {})", scid, unsigned_channel_update.timestamp, current_seen_timestamp) } } diff --git a/src/serialization.rs b/src/serialization.rs index 3d672fef..000ee4f4 100644 --- a/src/serialization.rs +++ b/src/serialization.rs @@ -5,12 +5,11 @@ use std::ops::Deref; use bitcoin::BlockHash; use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate}; -use lightning::log_trace; +use lightning::log_gossip; use lightning::util::logger::Logger; use lightning::util::ser::{BigSize, Writeable}; use crate::config; -use crate::config; use crate::lookup::{DeltaSet, DirectedUpdateDelta}; pub(super) struct SerializationSet { @@ -143,14 +142,12 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_times let current_announcement_seen = channel_announcement_delta.seen; let is_new_announcement = current_announcement_seen >= last_sync_timestamp; let is_newly_included_announcement = if let Some(first_update_seen) = channel_delta.first_bidirectional_updates_seen { - #[cfg(test)] - log_trace!(logger, "Channel {} first bidirectional update seen: {}", scid, first_update_seen); + log_gossip!(logger, "Channel {} first bidirectional update seen: {}", scid, first_update_seen); first_update_seen >= last_sync_timestamp } else { false }; - #[cfg(test)] - log_trace!(logger, "Channel {} announcement seen at {} (new: {}, newly included: {})", scid, current_announcement_seen, is_new_announcement, is_newly_included_announcement); + log_gossip!(logger, "Channel {} announcement seen at {} (new: {}, newly included: {})", scid, current_announcement_seen, is_new_announcement, is_newly_included_announcement); let send_announcement = is_new_announcement || is_newly_included_announcement; if send_announcement { serialization_set.latest_seen = max(serialization_set.latest_seen, current_announcement_seen); diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 51fdf9c9..edf82e1a 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -2,12 +2,14 @@ #![allow(unused_imports)] //! Multi-module tests that use database fixtures +use std::cell::{RefCell, RefMut}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use bitcoin::{BlockHash, Network}; use bitcoin::secp256k1::ecdsa::Signature; use bitcoin::secp256k1::{Secp256k1, SecretKey}; use bitcoin::hashes::Hash; +use bitcoin::hashes::hex::ToHex; use bitcoin::hashes::sha256d::Hash as Sha256dHash; use lightning::ln::features::ChannelFeatures; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate}; @@ -21,6 +23,10 @@ use crate::types::{GossipMessage, tests::TestLogger}; const CLIENT_BACKDATE_INTERVAL: u32 = 3600 * 24 * 7; // client backdates RGS by a week +thread_local! 
{
+	static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::new(None);
+}
+
 fn blank_signature() -> Signature {
 	Signature::from_compact(&[0u8; 64]).unwrap()
 }
@@ -33,6 +39,29 @@ fn current_time() -> u32 {
 	SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as u32
 }
 
+pub(crate) fn db_test_schema() -> String {
+	DB_TEST_SCHEMA.with(|suffix_reference| {
+		let mut suffix_option = suffix_reference.borrow_mut();
+		match suffix_option.as_ref() {
+			None => {
+				let current_time = SystemTime::now();
+				let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
+				let timestamp_seconds = unix_time.as_secs();
+				let timestamp_nanos = unix_time.as_nanos();
+				let preimage = format!("{}", timestamp_nanos);
+				let suffix = Sha256dHash::hash(preimage.as_bytes()).into_inner().to_hex();
+				// the schema must start with a letter
+				let schema = format!("test_{}_{}", timestamp_seconds, suffix);
+				suffix_option.replace(schema.clone());
+				schema
+			}
+			Some(suffix) => {
+				suffix.clone()
+			}
+		}
+	})
+}
+
 fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement {
 	let secp_context = Secp256k1::new();
 
@@ -88,25 +117,13 @@ fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16
 }
 
 async fn clean_test_db() {
-	let connection_config = config::db_connection_config();
-	let (client, connection) = connection_config.connect(NoTls).await.unwrap();
-
-	tokio::spawn(async move {
-		if let Err(e) = connection.await {
-			panic!("connection error: {}", e);
-		}
-	});
-
-	client.query("TRUNCATE TABLE channel_announcements RESTART IDENTITY CASCADE", &[]).await.unwrap();
-	client.query("TRUNCATE TABLE channel_updates RESTART IDENTITY CASCADE", &[]).await.unwrap();
-	client.query("TRUNCATE TABLE config RESTART IDENTITY CASCADE", &[]).await.unwrap();
+	let client = crate::connect_to_db().await;
+	let schema = db_test_schema();
+	client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
 }
 
 #[tokio::test]
 async fn test_trivial_setup() {
-	// start off with a clean slate
-	clean_test_db().await;
-
 	let logger = Arc::new(TestLogger::new());
 	let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
 	let network_graph_arc = Arc::new(network_graph);
@@ -149,12 +166,12 @@ async fn test_trivial_setup() {
 	let update_result = rgs.update_network_graph(&serialization.data).unwrap();
 	println!("update result: {}", update_result);
 	// the update result must be a multiple of our snapshot granularity
-	assert_eq!(update_result % config::SNAPSHOT_CALCULATION_INTERVAL, 0);
+	assert_eq!(update_result % config::snapshot_generation_interval(), 0);
 	assert!(update_result < timestamp);
 
 	let timestamp_delta = timestamp - update_result;
 	println!("timestamp delta: {}", timestamp_delta);
-	assert!(timestamp_delta < config::SNAPSHOT_CALCULATION_INTERVAL);
+	assert!(timestamp_delta < config::snapshot_generation_interval());
 
 	let readonly_graph = client_graph_arc.read_only();
 	let channels = readonly_graph.channels();
@@ -200,7 +217,7 @@ async fn test_unidirectional_intermediate_update_consideration() {
 	network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
 	network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
 
-	receiver.send(GossipMessage::ChannelAnnouncement(announcement)).await.unwrap();
+	receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(current_timestamp))).await.unwrap();
 	receiver.send(GossipMessage::ChannelUpdate(update_1)).await.unwrap(); 
receiver.send(GossipMessage::ChannelUpdate(update_2)).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_3)).await.unwrap(); @@ -247,7 +264,7 @@ async fn test_unidirectional_intermediate_update_consideration() { println!("last sync timestamp: {}", last_sync_timestamp); println!("next timestamp: {}", next_timestamp); // the update result must be a multiple of our snapshot granularity - assert_eq!(next_timestamp % config::SNAPSHOT_CALCULATION_INTERVAL, 0); + assert_eq!(next_timestamp % config::snapshot_generation_interval(), 0); let readonly_graph = client_graph_arc.read_only(); let channels = readonly_graph.channels();