Additional unit tests #52

Draft: wants to merge 15 commits into main
11 changes: 7 additions & 4 deletions Cargo.toml
@@ -5,13 +5,16 @@ edition = "2021"

[dependencies]
bitcoin = "0.29"
lightning = { version = "0.0.116" }
lightning-block-sync = { version = "0.0.116", features=["rest-client"] }
lightning-net-tokio = { version = "0.0.116" }
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48" }
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48", features = ["rest-client"] }
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48" }
tokio = { version = "1.25", features = ["full"] }
tokio-postgres = { version="=0.7.5" }
tokio-postgres = { version = "=0.7.5" }
futures = "0.3"

[dev-dependencies]
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "e13ff10c63b01d3a9f1618320ce5f17dad2e5b48" }
Contributor
Do we actually need anything in upstream or can we stick to 116?

Contributor Author
we need the publicly exposed excess_data field in announcements
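For illustration, here is a minimal sketch of the kind of test helper this unlocks, assuming the pinned revision exposes `excess_data` as a public field on `UnsignedChannelAnnouncement`; the helper name and byte values are made up and not part of this PR:

```rust
use lightning::ln::msgs::UnsignedChannelAnnouncement;

// Hypothetical helper: clone a baseline announcement and attach non-empty
// excess_data so the serializer's handling of that field can be asserted on.
fn announcement_with_excess_data(template: UnsignedChannelAnnouncement) -> UnsignedChannelAnnouncement {
    UnsignedChannelAnnouncement {
        excess_data: vec![99, 99, 99], // arbitrary payload
        ..template
    }
}
```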


[profile.dev]
panic = "abort"

14 changes: 10 additions & 4 deletions src/config.rs
@@ -74,13 +74,19 @@ pub(crate) fn cache_path() -> String {

pub(crate) fn db_connection_config() -> Config {
let mut config = Config::new();
let host = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_HOST").unwrap_or("localhost".to_string());
let user = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_USER").unwrap_or("alice".to_string());
let db = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_NAME").unwrap_or("ln_graph_sync".to_string());
let env_name_prefix = if cfg!(test) {
"RAPID_GOSSIP_TEST_DB"
} else {
"RAPID_GOSSIP_SYNC_SERVER_DB"
};

let host = env::var(format!("{}{}", env_name_prefix, "_HOST")).unwrap_or("localhost".to_string());
let user = env::var(format!("{}{}", env_name_prefix, "_USER")).unwrap_or("alice".to_string());
let db = env::var(format!("{}{}", env_name_prefix, "_NAME")).unwrap_or("ln_graph_sync".to_string());
config.host(&host);
config.user(&user);
config.dbname(&db);
if let Ok(password) = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD") {
if let Ok(password) = env::var(format!("{}{}", env_name_prefix, "_PASSWORD")) {
config.password(&password);
}
config
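With the prefix switch above, test builds read their connection settings from `RAPID_GOSSIP_TEST_DB_*` variables instead of the production ones. A hedged sketch of how a test harness might point itself at a scratch database — the values are placeholders and the function name is made up:

```rust
use std::env;

// Sketch: configure the test-only environment variables before the first call
// to db_connection_config(). The production RAPID_GOSSIP_SYNC_SERVER_DB_*
// variables are left untouched.
fn point_tests_at_scratch_db() {
    env::set_var("RAPID_GOSSIP_TEST_DB_HOST", "localhost");
    env::set_var("RAPID_GOSSIP_TEST_DB_USER", "alice");
    env::set_var("RAPID_GOSSIP_TEST_DB_NAME", "ln_graph_sync_test");
    // RAPID_GOSSIP_TEST_DB_PASSWORD is optional, mirroring the production path.
}
```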
2 changes: 1 addition & 1 deletion src/downloader.rs
@@ -62,7 +62,7 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
counter.channel_announcements += 1;
}

let gossip_message = GossipMessage::ChannelAnnouncement(msg);
let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
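The variant now carries an optional seen timestamp alongside the announcement: the live download path above passes `None`, while tests can inject an explicit value. A rough sketch of the assumed shape (`GossipMessage` is the crate's own type from `src/types.rs`; the wrapper function is illustrative only):

```rust
use lightning::ln::msgs::ChannelAnnouncement;

use crate::types::GossipMessage;

// Sketch: wrap an announcement for the persistence queue. `seen_override` is
// None for live gossip and Some(unix_seconds) when a test wants to backdate
// the announcement's `seen` column.
fn wrap_announcement(msg: ChannelAnnouncement, seen_override: Option<u32>) -> GossipMessage {
    GossipMessage::ChannelAnnouncement(msg, seen_override)
}
```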
33 changes: 30 additions & 3 deletions src/lib.rs
@@ -10,6 +10,8 @@
extern crate core;

use std::collections::{HashMap, HashSet};
#[cfg(test)]
use std::fmt::{Debug, Formatter};
use std::fs::File;
use std::io::BufReader;
use std::ops::Deref;
@@ -41,6 +43,9 @@ mod verifier;

pub mod types;

#[cfg(test)]
mod tests;

/// The purpose of this prefix is to identify the serialization format, should other rapid gossip
/// sync formats arise in the future.
///
@@ -49,7 +54,7 @@ const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];

pub struct RapidSyncProcessor<L: Deref> where L::Target: Logger {
network_graph: Arc<NetworkGraph<L>>,
logger: L
logger: L,
}

pub struct SerializedResponse {
@@ -81,7 +86,7 @@ impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Ta
let arc_network_graph = Arc::new(network_graph);
Self {
network_graph: arc_network_graph,
logger
logger,
}
}

@@ -115,6 +120,21 @@ impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Ta
}
}

#[cfg(test)]
impl Debug for SerializedResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"\nmessages: {}\n\tannouncements: {}\n\tupdates: {}\n\t\tfull: {}\n\t\tincremental: {}",
self.message_count,
self.announcement_count,
self.update_count,
self.update_count_full,
self.update_count_incremental
)
}
}

pub(crate) async fn connect_to_db() -> Client {
let connection_config = config::db_connection_config();
let (client, connection) = connection_config.connect(NoTls).await.unwrap();
@@ -125,6 +145,13 @@ pub(crate) async fn connect_to_db() -> Client {
}
});

if cfg!(test) {
let schema_name = tests::db_test_schema();
let schema_creation_command = format!("CREATE SCHEMA IF NOT EXISTS {}", schema_name);
client.execute(&schema_creation_command, &[]).await.unwrap();
client.execute(&format!("SET search_path TO {}", schema_name), &[]).await.unwrap();
}

client.execute("set time zone UTC", &[]).await.unwrap();
client
}
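`tests::db_test_schema()` comes from the new test module; its implementation is not shown in this hunk. A minimal sketch, under the assumption that it simply derives a unique schema name per test run so parallel tests stay isolated:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Sketch only: a unique schema name per test process. The actual helper in
// src/tests may pick names differently (e.g. per test case).
pub(crate) fn db_test_schema() -> String {
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    format!("test_{}", nanos)
}
```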
@@ -195,7 +222,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
log_info!(logger, "update-fetched channel count: {}", delta_set.len());
lookup::filter_delta_set(&mut delta_set, logger.clone());
log_info!(logger, "update-filtered channel count: {}", delta_set.len());
let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp);
let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp, logger.clone());

// process announcements
// write the number of channel announcements to the output
30 changes: 27 additions & 3 deletions src/lookup.rs
@@ -40,6 +40,8 @@ pub(super) struct DirectedUpdateDelta {
pub(super) struct ChannelDelta {
pub(super) announcement: Option<AnnouncementDelta>,
pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
/// This value is only set if the first update to achieve bidirectionality was seen after
/// the last sync.
pub(super) first_bidirectional_updates_seen: Option<u32>,
/// The seen timestamp of the older of the two latest directional updates
pub(super) requires_reminder: bool,
@@ -112,6 +114,22 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
}
log_info!(logger, "Fetched {} announcement rows", announcement_count);

if cfg!(test) {
// THIS STEP IS USED TO DEBUG THE NUMBER OF ROWS WHEN TESTING
let update_rows = client.query("SELECT * FROM channel_updates", &[]).await.unwrap();
log_info!(logger, "Fetched {} update rows", update_rows.len());

let update_rows = client.query("SELECT * FROM channel_updates WHERE short_channel_id = any($1)", &[&channel_ids]).await.unwrap();
log_info!(logger, "Fetched {} update rows with filtered channels", update_rows.len());

// let timestamp_string = format!("{}", last_sync_timestamp);
let update_rows = client.query("SELECT * FROM channel_updates WHERE seen >= TO_TIMESTAMP($1)", &[&last_sync_timestamp_float]).await.unwrap();
log_info!(logger, "Fetched {} update rows with filtered seen", update_rows.len());

let update_rows = client.query("SELECT * FROM channel_updates WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)", &[&channel_ids, &last_sync_timestamp_float]).await.unwrap();
log_info!(logger, "Fetched {} update rows with filtered channels and seen", update_rows.len());
}

{
// THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA

@@ -147,14 +165,16 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
let scid: i64 = current_row.get("short_channel_id");
let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;

log_gossip!(logger, "Channel {} with first update to complete bidirectional data since last sync seen at: {}", scid, current_seen_timestamp);

// the newer of the two oldest seen directional updates came after last sync timestamp
let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
// first time a channel was seen in both directions
(*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);

newer_oldest_directional_update_count += 1;
}
log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
log_info!(logger, "Fetched {} update rows of the first update in a new direction having occurred since the last sync", newer_oldest_directional_update_count);
}

{
@@ -188,6 +208,8 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
let current_row = row_res.unwrap();
let scid: i64 = current_row.get("short_channel_id");

log_gossip!(logger, "Channel {} with newest update in less recently updated direction being at least 6 days ago", scid);

// annotate this channel as requiring that reminders be sent to the client
let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());

@@ -217,7 +239,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
}
older_latest_directional_update_count += 1;
}
log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction being more than six days ago", older_latest_directional_update_count);
}
}

@@ -334,12 +356,14 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
seen: current_seen_timestamp,
update: unsigned_channel_update.clone(),
});
log_gossip!(logger, "Channel {} latest update in direction 0: {} (seen: {})", scid, unsigned_channel_update.timestamp, current_seen_timestamp)
} else if direction && !previously_seen_directions.1 {
previously_seen_directions.1 = true;
update_delta.latest_update_after_seen = Some(UpdateDelta {
seen: current_seen_timestamp,
update: unsigned_channel_update.clone(),
});
log_gossip!(logger, "Channel {} latest update in direction 1: {} (seen: {})", scid, unsigned_channel_update.timestamp, current_seen_timestamp)
}
}

@@ -365,7 +389,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
}
}
}
log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
log_info!(logger, "Processed {} intermediate rows (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
}

pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
62 changes: 52 additions & 10 deletions src/persistence.rs
@@ -99,7 +99,11 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
i += 1; // count the persisted gossip messages

if latest_persistence_log.elapsed().as_secs() >= 60 {
let _should_log_message = latest_persistence_log.elapsed().as_secs() >= 60;
#[cfg(test)]
let _should_log_message = true; // we log every persistence message in test

if _should_log_message {
log_info!(self.logger, "Persisting gossip message #{}", i);
latest_persistence_log = Instant::now();
}
@@ -111,21 +115,34 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
}

match &gossip_message {
GossipMessage::ChannelAnnouncement(announcement) => {
GossipMessage::ChannelAnnouncement(announcement, timestamp) => {
let scid = announcement.contents.short_channel_id as i64;

// start with the type prefix, which is already known a priori
let mut announcement_signed = Vec::new();
announcement.write(&mut announcement_signed).unwrap();

tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_announcements (\
if let Some(timestamp) = timestamp {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_announcements (\
short_channel_id, \
announcement_signed, \
seen \
) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
&scid,
&announcement_signed,
&(*timestamp as f64)
])).await.unwrap().unwrap();
} else {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_announcements (\
short_channel_id, \
announcement_signed \
) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
&scid,
&announcement_signed
])).await.unwrap().unwrap();
&scid,
&announcement_signed
])).await.unwrap().unwrap();
}
}
GossipMessage::ChannelUpdate(update) => {
let scid = update.contents.short_channel_id as i64;
@@ -146,8 +163,23 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
let mut update_signed = Vec::new();
update.write(&mut update_signed).unwrap();

tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_updates (\
let insertion_statement = if cfg!(test) {
"INSERT INTO channel_updates (\
short_channel_id, \
timestamp, \
seen, \
channel_flags, \
direction, \
disable, \
cltv_expiry_delta, \
htlc_minimum_msat, \
fee_base_msat, \
fee_proportional_millionths, \
htlc_maximum_msat, \
blob_signed \
) VALUES ($1, $2, TO_TIMESTAMP($3), $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT DO NOTHING"
} else {
"INSERT INTO channel_updates (\
short_channel_id, \
timestamp, \
channel_flags, \
@@ -159,9 +191,19 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
fee_proportional_millionths, \
htlc_maximum_msat, \
blob_signed \
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", &[
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING"
};

// this may not be used outside test cfg
#[cfg(test)]
let system_time = timestamp as f64;

tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute(insertion_statement, &[
&scid,
&timestamp,
#[cfg(test)]
&system_time,
&(update.contents.flags as i16),
&direction,
&disable,
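Taken together with the downloader change, a test can backdate an announcement by attaching an explicit seen timestamp, which the test-only insert statements above store via `TO_TIMESTAMP`. A hedged usage sketch — the mpsc sender type and the timestamp value are assumptions, not taken from this PR:

```rust
use lightning::ln::msgs::ChannelAnnouncement;
use tokio::sync::mpsc;

use crate::types::GossipMessage;

// Sketch: enqueue an announcement with an explicit historical `seen` value so
// delta calculations against a recent last-sync timestamp can be exercised.
async fn persist_backdated(sender: mpsc::Sender<GossipMessage>, announcement: ChannelAnnouncement) {
    let seen: u32 = 1_680_000_000; // arbitrary past unix timestamp
    sender.send(GossipMessage::ChannelAnnouncement(announcement, Some(seen))).await.unwrap();
}
```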
18 changes: 8 additions & 10 deletions src/serialization.rs
@@ -1,10 +1,12 @@
use std::cmp::max;
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use std::ops::Deref;

use bitcoin::BlockHash;
use bitcoin::hashes::Hash;
use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate};
use lightning::log_gossip;
use lightning::util::logger::Logger;
use lightning::util::ser::{BigSize, Writeable};
use crate::config;

@@ -104,17 +106,16 @@ struct FullUpdateValueHistograms {
htlc_maximum_msat: HashMap<u64, usize>,
}

pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32) -> SerializationSet {
pub(super) fn serialize_delta_set<L: Deref>(delta_set: DeltaSet, last_sync_timestamp: u32, logger: L) -> SerializationSet where L::Target: Logger {
let chain_hash = bitcoin::blockdata::constants::genesis_block(config::network()).block_hash();
let mut serialization_set = SerializationSet {
announcements: vec![],
updates: vec![],
full_update_defaults: Default::default(),
chain_hash: BlockHash::all_zeros(),
chain_hash,
latest_seen: 0,
};

let mut chain_hash_set = false;

let mut full_update_histograms = FullUpdateValueHistograms {
cltv_expiry_delta: Default::default(),
htlc_minimum_msat: Default::default(),
Expand All @@ -138,18 +139,15 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)

// any announcement chain hash is gonna be the same value. Just set it from the first one.
let channel_announcement_delta = channel_delta.announcement.as_ref().unwrap();
if !chain_hash_set {
chain_hash_set = true;
serialization_set.chain_hash = channel_announcement_delta.announcement.chain_hash.clone();
}

let current_announcement_seen = channel_announcement_delta.seen;
let is_new_announcement = current_announcement_seen >= last_sync_timestamp;
let is_newly_included_announcement = if let Some(first_update_seen) = channel_delta.first_bidirectional_updates_seen {
log_gossip!(logger, "Channel {} first bidirectional update seen: {}", scid, first_update_seen);
first_update_seen >= last_sync_timestamp
} else {
false
};
log_gossip!(logger, "Channel {} announcement seen at {} (new: {}, newly included: {})", scid, current_announcement_seen, is_new_announcement, is_newly_included_announcement);
let send_announcement = is_new_announcement || is_newly_included_announcement;
if send_announcement {
serialization_set.latest_seen = max(serialization_set.latest_seen, current_announcement_seen);
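With this change the chain hash comes from the configured network's genesis block at construction time instead of being copied out of the first announcement, so a delta with no announcements still carries a valid hash. Roughly — the network shown here is illustrative, the server takes it from `config::network()`:

```rust
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::network::constants::Network;
use bitcoin::BlockHash;

// Sketch: derive the chain hash directly from the genesis block of the target
// network, independent of the delta set's contents.
fn chain_hash_for(network: Network) -> BlockHash {
    genesis_block(network).block_hash()
}
```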