diff --git a/Cargo.toml b/Cargo.toml
index 99aeed7..2c51192 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,13 +5,16 @@ edition = "2021"
 
 [dependencies]
 bitcoin = "0.29"
-lightning = { version = "0.0.116" }
-lightning-block-sync = { version = "0.0.116", features=["rest-client"] }
-lightning-net-tokio = { version = "0.0.116" }
+lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48" }
+lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48", features = ["rest-client"] }
+lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48" }
 tokio = { version = "1.25", features = ["full"] }
-tokio-postgres = { version="=0.7.5" }
+tokio-postgres = { version = "=0.7.5" }
 futures = "0.3"
 
+[dev-dependencies]
+lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48" }
+
 [profile.dev]
 panic = "abort"
diff --git a/src/config.rs b/src/config.rs
index 1de2482..0655aa7 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -74,13 +74,19 @@ pub(crate) fn cache_path() -> String {
 
 pub(crate) fn db_connection_config() -> Config {
 	let mut config = Config::new();
-	let host = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_HOST").unwrap_or("localhost".to_string());
-	let user = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_USER").unwrap_or("alice".to_string());
-	let db = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_NAME").unwrap_or("ln_graph_sync".to_string());
+	let env_name_prefix = if cfg!(test) {
+		"RAPID_GOSSIP_TEST_DB"
+	} else {
+		"RAPID_GOSSIP_SYNC_SERVER_DB"
+	};
+
+	let host = env::var(format!("{}{}", env_name_prefix, "_HOST")).unwrap_or("localhost".to_string());
+	let user = env::var(format!("{}{}", env_name_prefix, "_USER")).unwrap_or("alice".to_string());
+	let db = env::var(format!("{}{}", env_name_prefix, "_NAME")).unwrap_or("ln_graph_sync".to_string());
 	config.host(&host);
 	config.user(&user);
 	config.dbname(&db);
-	if let Ok(password) = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD") {
+	if let Ok(password) = env::var(format!("{}{}", env_name_prefix, "_PASSWORD")) {
 		config.password(&password);
 	}
 	config
diff --git a/src/downloader.rs b/src/downloader.rs
index 8d5cdb0..f5dd17f 100644
--- a/src/downloader.rs
+++ b/src/downloader.rs
@@ -62,7 +62,7 @@ impl<L: Deref + Clone + Send + Sync + 'static> GossipRouter<L> where L::Target:
 			counter.channel_announcements += 1;
 		}
 
-		let gossip_message = GossipMessage::ChannelAnnouncement(msg);
+		let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
 		if let Err(err) = self.sender.try_send(gossip_message) {
 			let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
 			tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
diff --git a/src/lib.rs b/src/lib.rs
index f56aca2..44250ad 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,6 +10,8 @@ extern crate core;
 
 use std::collections::{HashMap, HashSet};
+#[cfg(test)]
+use std::fmt::{Debug, Formatter};
 use std::fs::File;
 use std::io::BufReader;
 use std::ops::Deref;
@@ -41,6 +43,9 @@ mod verifier;
 
 pub mod types;
 
+#[cfg(test)]
+mod tests;
+
 /// The purpose of this prefix is to identify the serialization format, should other rapid gossip
 /// sync formats arise in the future.
 ///
@@ -49,7 +54,7 @@ const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];
 
 pub struct RapidSyncProcessor<L: Deref> where L::Target: Logger {
 	network_graph: Arc<NetworkGraph<L>>,
-	logger: L
+	logger: L,
 }
 
 pub struct SerializedResponse {
@@ -81,7 +86,7 @@ impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Ta
 		let arc_network_graph = Arc::new(network_graph);
 		Self {
 			network_graph: arc_network_graph,
-			logger
+			logger,
 		}
 	}
 
@@ -115,6 +120,21 @@ impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Ta
 	}
 }
 
+#[cfg(test)]
+impl Debug for SerializedResponse {
+	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+		write!(
+			f,
+			"\nmessages: {}\n\tannouncements: {}\n\tupdates: {}\n\t\tfull: {}\n\t\tincremental: {}",
+			self.message_count,
+			self.announcement_count,
+			self.update_count,
+			self.update_count_full,
+			self.update_count_incremental
+		)
+	}
+}
+
 pub(crate) async fn connect_to_db() -> Client {
 	let connection_config = config::db_connection_config();
 	let (client, connection) = connection_config.connect(NoTls).await.unwrap();
@@ -125,6 +145,13 @@ pub(crate) async fn connect_to_db() -> Client {
 		}
 	});
 
+	if cfg!(test) {
+		let schema_name = tests::db_test_schema();
+		let schema_creation_command = format!("CREATE SCHEMA IF NOT EXISTS {}", schema_name);
+		client.execute(&schema_creation_command, &[]).await.unwrap();
+		client.execute(&format!("SET search_path TO {}", schema_name), &[]).await.unwrap();
+	}
+
 	client.execute("set time zone UTC", &[]).await.unwrap();
 	client
 }
@@ -195,7 +222,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
 	log_info!(logger, "update-fetched channel count: {}", delta_set.len());
 	lookup::filter_delta_set(&mut delta_set, logger.clone());
 	log_info!(logger, "update-filtered channel count: {}", delta_set.len());
-	let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp);
+	let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp, logger.clone());
 
 	// process announcements
 	// write the number of channel announcements to the output
diff --git a/src/lookup.rs b/src/lookup.rs
index 8def6f1..f0f6de8 100644
--- a/src/lookup.rs
+++ b/src/lookup.rs
@@ -40,6 +40,8 @@ pub(super) struct DirectedUpdateDelta {
 pub(super) struct ChannelDelta {
 	pub(super) announcement: Option<AnnouncementDelta>,
 	pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
+	/// This value is only set if the first update to achieve bidirectionality was seen after
+	/// the last sync.
 	pub(super) first_bidirectional_updates_seen: Option<u32>,
 	/// The seen timestamp of the older of the two latest directional updates
 	pub(super) requires_reminder: bool,
@@ -112,6 +114,22 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
 	}
 	log_info!(logger, "Fetched {} announcement rows", announcement_count);
 
+	if cfg!(test) {
+		// THIS STEP IS USED TO DEBUG THE NUMBER OF ROWS WHEN TESTING
+		let update_rows = client.query("SELECT * FROM channel_updates", &[]).await.unwrap();
+		log_info!(logger, "Fetched {} update rows", update_rows.len());
+
+		let update_rows = client.query("SELECT * FROM channel_updates WHERE short_channel_id = any($1)", &[&channel_ids]).await.unwrap();
+		log_info!(logger, "Fetched {} update rows with filtered channels", update_rows.len());
+
+		// let timestamp_string = format!("{}", last_sync_timestamp);
+		let update_rows = client.query("SELECT * FROM channel_updates WHERE seen >= TO_TIMESTAMP($1)", &[&last_sync_timestamp_float]).await.unwrap();
+		log_info!(logger, "Fetched {} update rows with filtered seen", update_rows.len());
+
+		let update_rows = client.query("SELECT * FROM channel_updates WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)", &[&channel_ids, &last_sync_timestamp_float]).await.unwrap();
+		log_info!(logger, "Fetched {} update rows with filtered channels and seen", update_rows.len());
+	}
+
 	{
 		// THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
@@ -147,6 +165,8 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
 		let scid: i64 = current_row.get("short_channel_id");
 		let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
 
+		log_gossip!(logger, "Channel {} with first update to complete bidirectional data since last sync seen at: {}", scid, current_seen_timestamp);
+
 		// the newer of the two oldest seen directional updates came after last sync timestamp
 		let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
 		// first time a channel was seen in both directions
@@ -154,7 +174,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
 			newer_oldest_directional_update_count += 1;
 		}
-		log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
+		log_info!(logger, "Fetched {} update rows of the first update in a new direction having occurred since the last sync", newer_oldest_directional_update_count);
 	}
 
 	{
@@ -188,6 +208,8 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
 		let current_row = row_res.unwrap();
 
 		let scid: i64 = current_row.get("short_channel_id");
+		log_gossip!(logger, "Channel {} with newest update in less recently updated direction being at least 6 days ago", scid);
+
 		// annotate this channel as requiring that reminders be sent to the client
 		let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
@@ -217,7 +239,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
 			}
 			older_latest_directional_update_count += 1;
 		}
-		log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
+		log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction being more than six days ago", older_latest_directional_update_count);
 	}
 }
@@ -334,12 +356,14 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
 				seen: current_seen_timestamp,
 				update: unsigned_channel_update.clone(),
 			});
+			log_gossip!(logger, "Channel {} latest update in direction 0: {} (seen: {})", scid, unsigned_channel_update.timestamp, current_seen_timestamp)
 		} else if direction && !previously_seen_directions.1 {
 			previously_seen_directions.1 = true;
 			update_delta.latest_update_after_seen = Some(UpdateDelta {
 				seen: current_seen_timestamp,
 				update: unsigned_channel_update.clone(),
 			});
+			log_gossip!(logger, "Channel {} latest update in direction 1: {} (seen: {})", scid, unsigned_channel_update.timestamp, current_seen_timestamp)
 		}
 	}
@@ -365,7 +389,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
 			}
 		}
 	}
-	log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
+	log_info!(logger, "Processed {} intermediate rows (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
 }
 
 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
diff --git a/src/persistence.rs b/src/persistence.rs
index 8bb7b18..ec1b922 100644
--- a/src/persistence.rs
+++ b/src/persistence.rs
@@ -99,7 +99,11 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
 		while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
 			i += 1; // count the persisted gossip messages
 
-			if latest_persistence_log.elapsed().as_secs() >= 60 {
+			let _should_log_message = latest_persistence_log.elapsed().as_secs() >= 60;
+			#[cfg(test)]
+			let _should_log_message = true; // we log every persistence message in test
+
+			if _should_log_message {
 				log_info!(self.logger, "Persisting gossip message #{}", i);
 				latest_persistence_log = Instant::now();
 			}
@@ -111,21 +115,34 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
 			}
 
 			match &gossip_message {
-				GossipMessage::ChannelAnnouncement(announcement) => {
+				GossipMessage::ChannelAnnouncement(announcement, timestamp) => {
 					let scid = announcement.contents.short_channel_id as i64;
 
 					// start with the type prefix, which is already known a priori
 					let mut announcement_signed = Vec::new();
 					announcement.write(&mut announcement_signed).unwrap();
 
-					tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
-						.execute("INSERT INTO channel_announcements (\
+					if let Some(timestamp) = timestamp {
+						tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+							.execute("INSERT INTO channel_announcements (\
+								short_channel_id, \
+								announcement_signed, \
+								seen \
+							) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
+								&scid,
+								&announcement_signed,
+								&(*timestamp as f64)
+							])).await.unwrap().unwrap();
+					} else {
+						tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+							.execute("INSERT INTO channel_announcements (\
 							short_channel_id, \
 							announcement_signed \
 						) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
-						&scid,
-						&announcement_signed
-					])).await.unwrap().unwrap();
+							&scid,
+							&announcement_signed
+						])).await.unwrap().unwrap();
+					}
 				}
 				GossipMessage::ChannelUpdate(update) => {
 					let scid = update.contents.short_channel_id as i64;
@@ -146,8 +163,23 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
 					let mut update_signed = Vec::new();
 					update.write(&mut update_signed).unwrap();
 
-					tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
-						.execute("INSERT INTO channel_updates (\
+					let insertion_statement = if cfg!(test) {
+						"INSERT INTO channel_updates (\
+						short_channel_id, \
+						timestamp, \
+						seen, \
+						channel_flags, \
+						direction, \
+						disable, \
+						cltv_expiry_delta, \
+						htlc_minimum_msat, \
+						fee_base_msat, \
+						fee_proportional_millionths, \
+						htlc_maximum_msat, \
+						blob_signed \
+					) VALUES ($1, $2, TO_TIMESTAMP($3), $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT DO NOTHING"
+					} else {
+						"INSERT INTO channel_updates (\
 						short_channel_id, \
 						timestamp, \
 						channel_flags, \
@@ -159,9 +191,19 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
 						fee_proportional_millionths, \
 						htlc_maximum_msat, \
 						blob_signed \
-					) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", &[
+					) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING"
+					};
+
+					// this may not be used outside test cfg
+					#[cfg(test)]
+					let system_time = timestamp as f64;
+
+					tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+						.execute(insertion_statement, &[
 						&scid,
 						&timestamp,
+						#[cfg(test)]
+						&system_time,
 						&(update.contents.flags as i16),
 						&direction,
 						&disable,
diff --git a/src/serialization.rs b/src/serialization.rs
index 93bd381..000ee4f 100644
--- a/src/serialization.rs
+++ b/src/serialization.rs
@@ -1,10 +1,12 @@
 use std::cmp::max;
 use std::collections::HashMap;
 use std::time::{SystemTime, UNIX_EPOCH};
+use std::ops::Deref;
 
 use bitcoin::BlockHash;
-use bitcoin::hashes::Hash;
 
 use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate};
+use lightning::log_gossip;
+use lightning::util::logger::Logger;
 use lightning::util::ser::{BigSize, Writeable};
 use crate::config;
@@ -104,17 +106,16 @@ struct FullUpdateValueHistograms {
 	htlc_maximum_msat: HashMap<u64, usize>,
 }
 
-pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32) -> SerializationSet {
+pub(super) fn serialize_delta_set<L: Deref>(delta_set: DeltaSet, last_sync_timestamp: u32, logger: L) -> SerializationSet where L::Target: Logger {
+	let chain_hash = bitcoin::blockdata::constants::genesis_block(config::network()).block_hash();
 	let mut serialization_set = SerializationSet {
 		announcements: vec![],
 		updates: vec![],
 		full_update_defaults: Default::default(),
-		chain_hash: BlockHash::all_zeros(),
+		chain_hash,
 		latest_seen: 0,
 	};
 
-	let mut chain_hash_set = false;
-
 	let mut full_update_histograms = FullUpdateValueHistograms {
 		cltv_expiry_delta: Default::default(),
 		htlc_minimum_msat: Default::default(),
@@ -138,18 +139,15 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
 		// any announcement chain hash is gonna be the same value. Just set it from the first one.
 		let channel_announcement_delta = channel_delta.announcement.as_ref().unwrap();
 
-		if !chain_hash_set {
-			chain_hash_set = true;
-			serialization_set.chain_hash = channel_announcement_delta.announcement.chain_hash.clone();
-		}
-
 		let current_announcement_seen = channel_announcement_delta.seen;
 		let is_new_announcement = current_announcement_seen >= last_sync_timestamp;
 		let is_newly_included_announcement = if let Some(first_update_seen) = channel_delta.first_bidirectional_updates_seen {
+			log_gossip!(logger, "Channel {} first bidirectional update seen: {}", scid, first_update_seen);
 			first_update_seen >= last_sync_timestamp
 		} else {
 			false
 		};
+		log_gossip!(logger, "Channel {} announcement seen at {} (new: {}, newly included: {})", scid, current_announcement_seen, is_new_announcement, is_newly_included_announcement);
 		let send_announcement = is_new_announcement || is_newly_included_announcement;
 		if send_announcement {
 			serialization_set.latest_seen = max(serialization_set.latest_seen, current_announcement_seen);
diff --git a/src/tests/mod.rs b/src/tests/mod.rs
new file mode 100644
index 0000000..edf82e1
--- /dev/null
+++ b/src/tests/mod.rs
@@ -0,0 +1,273 @@
+#![allow(unused_variables)]
+#![allow(unused_imports)]
+//! Multi-module tests that use database fixtures
+
+use std::cell::{RefCell, RefMut};
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use bitcoin::{BlockHash, Network};
+use bitcoin::secp256k1::ecdsa::Signature;
+use bitcoin::secp256k1::{Secp256k1, SecretKey};
+use bitcoin::hashes::Hash;
+use bitcoin::hashes::hex::ToHex;
+use bitcoin::hashes::sha256d::Hash as Sha256dHash;
+use lightning::ln::features::ChannelFeatures;
+use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
+use lightning::routing::gossip::{NetworkGraph, NodeId};
+use lightning::util::ser::Writeable;
+use lightning_rapid_gossip_sync::RapidGossipSync;
+use tokio_postgres::NoTls;
+
+use crate::{config, serialize_delta};
+use crate::persistence::GossipPersister;
+use crate::types::{GossipMessage, tests::TestLogger};
+
+const CLIENT_BACKDATE_INTERVAL: u32 = 3600 * 24 * 7; // client backdates RGS by a week
+
+thread_local! {
+	static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::new(None);
+}
+
+fn blank_signature() -> Signature {
+	Signature::from_compact(&[0u8; 64]).unwrap()
+}
+
+fn genesis_hash() -> BlockHash {
+	bitcoin::blockdata::constants::genesis_block(Network::Bitcoin).block_hash()
+}
+
+fn current_time() -> u32 {
+	SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as u32
+}
+
+pub(crate) fn db_test_schema() -> String {
+	DB_TEST_SCHEMA.with(|suffix_reference| {
+		let mut suffix_option = suffix_reference.borrow_mut();
+		match suffix_option.as_ref() {
+			None => {
+				let current_time = SystemTime::now();
+				let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
+				let timestamp_seconds = unix_time.as_secs();
+				let timestamp_nanos = unix_time.as_nanos();
+				let preimage = format!("{}", timestamp_nanos);
+				let suffix = Sha256dHash::hash(preimage.as_bytes()).into_inner().to_hex();
+				// the schema must start with a letter
+				let schema = format!("test_{}_{}", timestamp_seconds, suffix);
+				suffix_option.replace(schema.clone());
+				schema
+			}
+			Some(suffix) => {
+				suffix.clone()
+			}
+		}
+	})
+}
+
+fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement {
+	let secp_context = Secp256k1::new();
+
+	let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
+	let random_public_key_1 = random_private_key_1.public_key(&secp_context);
+	let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
+
+	let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
+	let random_public_key_2 = random_private_key_2.public_key(&secp_context);
+	let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
+
+	let announcement = UnsignedChannelAnnouncement {
+		features: ChannelFeatures::empty(),
+		chain_hash: genesis_hash(),
+		short_channel_id,
+		node_id_1,
+		node_id_2,
+		bitcoin_key_1: node_id_1,
+		bitcoin_key_2: node_id_2,
+		excess_data: vec![],
+	};
+
+	let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
+	let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
+	let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
+
+	ChannelAnnouncement {
+		node_signature_1,
+		node_signature_2,
+		bitcoin_signature_1: node_signature_1,
+		bitcoin_signature_2: node_signature_2,
+		contents: announcement,
+	}
+}
+
+fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
+	let flag_mask = if direction { 1 } else { 0 };
+	ChannelUpdate {
+		signature: blank_signature(),
+		contents: UnsignedChannelUpdate {
+			chain_hash: genesis_hash(),
+			short_channel_id: scid,
+			timestamp,
+			flags: 0 | flag_mask,
+			cltv_expiry_delta: expiry_delta,
+			htlc_minimum_msat: min_msat,
+			htlc_maximum_msat: max_msat,
+			fee_base_msat: base_msat,
+			fee_proportional_millionths: fee_rate,
+			excess_data: vec![],
+		},
+	}
+}
+
+async fn clean_test_db() {
+	let client = crate::connect_to_db().await;
+	let schema = db_test_schema();
+	client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
+}
+
+#[tokio::test]
+async fn test_trivial_setup() {
+	let logger = Arc::new(TestLogger::new());
+	let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+	let network_graph_arc = Arc::new(network_graph);
+	let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
+
+	let short_channel_id = 1;
+	let timestamp = current_time() - 10;
+	println!("timestamp: {}", timestamp);
+
+	{ // seed the db
+		let announcement = generate_announcement(short_channel_id);
+		let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
+		let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
+
+		network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
+		network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
+		network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
+
+		receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
+		receiver.send(GossipMessage::ChannelUpdate(update_1)).await.unwrap();
+		receiver.send(GossipMessage::ChannelUpdate(update_2)).await.unwrap();
+		drop(receiver);
+		persister.persist_gossip().await;
+	}
+
+	let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
+	logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
+	clean_test_db().await;
+
+	let channel_count = network_graph_arc.read_only().channels().len();
+
+	assert_eq!(channel_count, 1);
+	assert_eq!(serialization.message_count, 3);
+	assert_eq!(serialization.announcement_count, 1);
+	assert_eq!(serialization.update_count, 2);
+
+	let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+	let client_graph_arc = Arc::new(client_graph);
+	let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
+	let update_result = rgs.update_network_graph(&serialization.data).unwrap();
+	println!("update result: {}", update_result);
+	// the update result must be a multiple of our snapshot granularity
+	assert_eq!(update_result % config::snapshot_generation_interval(), 0);
+	assert!(update_result < timestamp);
+
+	let timestamp_delta = timestamp - update_result;
+	println!("timestamp delta: {}", timestamp_delta);
+	assert!(timestamp_delta < config::snapshot_generation_interval());
+
+	let readonly_graph = client_graph_arc.read_only();
+	let channels = readonly_graph.channels();
+	let client_channel_count = channels.len();
+	assert_eq!(client_channel_count, 1);
+
+	let first_channel = channels.get(&short_channel_id).unwrap();
+	assert!(&first_channel.announcement_message.is_none());
+	assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
+	assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
+	let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
+	let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
+	println!("last update a: {}", last_update_seen_a);
+	println!("last update b: {}", last_update_seen_b);
+	assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
+	assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
+}
+
+/// If a channel has only seen updates in one direction, it should not be announced
+#[tokio::test]
+async fn test_unidirectional_intermediate_update_consideration() {
+	// start off with a clean slate
+	clean_test_db().await;
+
+	let logger = Arc::new(TestLogger::new());
+	let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+	let network_graph_arc = Arc::new(network_graph);
+	let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
+
+	let short_channel_id = 1;
+	let current_timestamp = current_time() - 10;
+
+	{ // seed the db
+		let announcement = generate_announcement(short_channel_id);
+		let update_1 = generate_update(short_channel_id, false, current_timestamp - 4, 0, 0, 0, 5, 0);
+		let update_2 = generate_update(short_channel_id, false, current_timestamp - 3, 0, 0, 0, 4, 0);
+		let update_3 = generate_update(short_channel_id, false, current_timestamp - 2, 0, 0, 0, 3, 0);
+		let update_4 = generate_update(short_channel_id, true, current_timestamp, 0, 0, 0, 5, 0);
+
+		network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
+		network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
+		network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
+		network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
+		network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
+
+		receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(current_timestamp))).await.unwrap();
+		receiver.send(GossipMessage::ChannelUpdate(update_1)).await.unwrap();
+		receiver.send(GossipMessage::ChannelUpdate(update_2)).await.unwrap();
+		receiver.send(GossipMessage::ChannelUpdate(update_3)).await.unwrap();
+		receiver.send(GossipMessage::ChannelUpdate(update_4)).await.unwrap();
+		drop(receiver);
+		persister.persist_gossip().await;
+	}
+
+	let channel_count = network_graph_arc.read_only().channels().len();
+	assert_eq!(channel_count, 1);
+
+	let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+	let client_graph_arc = Arc::new(client_graph);
+	let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
+
+	let last_sync_timestamp = current_timestamp - 3;
+	let serialization = serialize_delta(network_graph_arc.clone(), last_sync_timestamp, logger.clone()).await;
+	println!(
+		"serialization data: \nmessages: {}\n\tannouncements: {}\n\tupdates: {}\n\t\tfull: {}\n\t\tincremental: {}",
+		serialization.message_count,
+		serialization.announcement_count,
+		serialization.update_count,
+		serialization.update_count_full,
+		serialization.update_count_incremental
+	);
+
+	logger.assert_log_contains("rapid_gossip_sync_server::lookup", &format!("Channel IDs: [{}]", short_channel_id), 1);
+	logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 announcement rows", 1);
+	logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
+	logger.assert_log_contains("rapid_gossip_sync_server::lookup", &format!("Channel {} last update before seen: 1/false/{}", short_channel_id, current_timestamp - 4), 1);
+	logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 3 intermediate rows", 1);
+	println!("current_timestamp: {}", current_timestamp);
+	logger.assert_log_contains("rapid_gossip_sync_server::lookup", &format!("Channel {} latest update in direction 0: {}", short_channel_id, current_timestamp - 2), 1);
+	logger.assert_log_contains("rapid_gossip_sync_server::lookup", &format!("Channel {} latest update in direction 1: {}", short_channel_id, current_timestamp), 1);
+	logger.assert_log_contains("rapid_gossip_sync_server::lookup", &format!("Channel {} with first update to complete bidirectional data since last sync seen at: {}", short_channel_id, current_timestamp), 1);
+
+	assert_eq!(serialization.message_count, 3);
+	assert_eq!(serialization.announcement_count, 1);
+	assert_eq!(serialization.update_count, 2);
+	assert_eq!(serialization.update_count_full, 2);
+	assert_eq!(serialization.update_count_incremental, 0);
+
+	let next_timestamp = rgs.update_network_graph(&serialization.data).unwrap();
+	println!("last sync timestamp: {}", last_sync_timestamp);
+	println!("next timestamp: {}", next_timestamp);
+	// the update result must be a multiple of our snapshot granularity
+	assert_eq!(next_timestamp % config::snapshot_generation_interval(), 0);
+
+	let readonly_graph = client_graph_arc.read_only();
+	let channels = readonly_graph.channels();
+	let client_channel_count = channels.len();
+	assert_eq!(client_channel_count, 1);
+}
diff --git a/src/types.rs b/src/types.rs
index 0b03081..5e37bfc 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -14,7 +14,10 @@ pub(crate) type GossipPeerManager<L> = Arc
 
 pub(crate) enum GossipMessage {
-	ChannelAnnouncement(ChannelAnnouncement),
+	ChannelAnnouncement(
+		ChannelAnnouncement,
+		Option<u32>, // optionally include a persistence timestamp
+	),
 	ChannelUpdate(ChannelUpdate),
 }
@@ -36,3 +39,58 @@ impl Logger for RGSSLogger {
 		println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
 	}
 }
+
+#[cfg(test)]
+pub mod tests {
+	use std::collections::HashMap;
+	use std::sync::{Mutex};
+	use lightning::util::logger::{Level, Logger, Record};
+
+	pub struct TestLogger {
+		level: Level,
+		pub(crate) id: String,
+		pub lines: Mutex<HashMap<(String, String), usize>>,
+	}
+
+	impl TestLogger {
+		pub fn new() -> TestLogger {
+			Self::with_id("".to_owned())
+		}
+		pub fn with_id(id: String) -> TestLogger {
+			TestLogger {
+				level: Level::Trace,
+				id,
+				lines: Mutex::new(HashMap::new()),
+			}
+		}
+		pub fn enable(&mut self, level: Level) {
+			self.level = level;
+		}
+		pub fn assert_log(&self, module: String, line: String, count: usize) {
+			let log_entries = self.lines.lock().unwrap();
+			assert_eq!(log_entries.get(&(module, line)), Some(&count));
+		}
+
+		/// Search for the number of occurrences of logged lines which
+		/// 1. belong to the specified module and
+		/// 2. contain `line`,
+		/// and assert that this count equals the given `count`.
+		pub fn assert_log_contains(&self, module: &str, line: &str, count: usize) {
+			let log_entries = self.lines.lock().unwrap();
+			let l: usize = log_entries.iter().filter(|&(&(ref m, ref l), _c)| {
+				m == module && l.contains(line)
+			}).map(|(_, c)| { c }).sum();
+			assert_eq!(l, count)
+		}
+	}
+
+	impl Logger for TestLogger {
+		fn log(&self, record: &Record) {
+			*self.lines.lock().unwrap().entry((record.module_path.to_string(), format!("{}", record.args))).or_insert(0) += 1;
+			if record.level >= self.level {
+				// #[cfg(feature = "std")]
+				println!("{:<5} {} [{} : {}, {}] {}", record.level.to_string(), self.id, record.module_path, record.file, record.line, record.args);
+			}
+		}
+	}
+}