From 70d766e4340395022733bb78513b4c8f0e941b03 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 10 Jan 2025 19:47:35 +0000 Subject: [PATCH 1/6] Update to LDK 0.1 --- Cargo.toml | 14 +++++------ src/bitcoind_client.rs | 4 ++-- src/cli.rs | 9 +++---- src/disk.rs | 10 ++++---- src/main.rs | 54 ++++++++++++++++-------------------------- 5 files changed, 38 insertions(+), 53 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 33111311..dff35518 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,13 +8,13 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -lightning = { version = "0.0.125", features = ["max_level_trace"] } -lightning-block-sync = { version = "0.0.125", features = [ "rpc-client", "tokio" ] } -lightning-invoice = { version = "0.32.0" } -lightning-net-tokio = { version = "0.0.125" } -lightning-persister = { version = "0.0.125" } -lightning-background-processor = { version = "0.0.125", features = [ "futures" ] } -lightning-rapid-gossip-sync = { version = "0.0.125" } +lightning = { version = "0.1.0", features = ["dnssec"] } +lightning-block-sync = { version = "0.1.0", features = [ "rpc-client", "tokio" ] } +lightning-invoice = { version = "0.33.0" } +lightning-net-tokio = { version = "0.1.0" } +lightning-persister = { version = "0.1.0" } +lightning-background-processor = { version = "0.1.0", features = [ "futures" ] } +lightning-rapid-gossip-sync = { version = "0.1.0" } base64 = "0.13.0" bitcoin = "0.32" diff --git a/src/bitcoind_client.rs b/src/bitcoind_client.rs index 31b328cc..bfa4e583 100644 --- a/src/bitcoind_client.rs +++ b/src/bitcoind_client.rs @@ -71,7 +71,7 @@ impl BitcoindClient { let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port); let rpc_credentials = base64::encode(format!("{}:{}", rpc_user.clone(), rpc_password.clone())); - let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?; + let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint); let _dummy = bitcoind_rpc_client .call_method::("getblockchaininfo", &vec![]) .await @@ -226,7 +226,7 @@ impl BitcoindClient { }); } - pub fn get_new_rpc_client(&self) -> std::io::Result { + pub fn get_new_rpc_client(&self) -> RpcClient { let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port); let rpc_credentials = base64::encode(format!("{}:{}", self.rpc_user.clone(), self.rpc_password.clone())); diff --git a/src/cli.rs b/src/cli.rs index c55623af..ff705b12 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -10,7 +10,7 @@ use bitcoin::network::Network; use bitcoin::secp256k1::PublicKey; use lightning::chain::channelmonitor::Balance; use lightning::ln::bolt11_payment::payment_parameters_from_invoice; -use lightning::ln::bolt11_payment::payment_parameters_from_zero_amount_invoice; +use lightning::ln::bolt11_payment::payment_parameters_from_variable_amount_invoice; use lightning::ln::channelmanager::{PaymentId, RecipientOnionFields, Retry}; use lightning::ln::invoice_utils as utils; use lightning::ln::msgs::SocketAddress; @@ -746,7 +746,7 @@ fn send_payment( invoice.amount_milli_satoshis().is_none() || invoice.amount_milli_satoshis() == Some(0); let pay_params_opt = if zero_amt_invoice { if let Some(amt_msat) = required_amount_msat { - payment_parameters_from_zero_amount_invoice(invoice, amt_msat) + payment_parameters_from_variable_amount_invoice(invoice, amt_msat) } else { println!("Need an amount for the given 0-value invoice"); print!("> "); @@ -826,7 +826,7 @@ fn keysend( }, ); fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.encode()).unwrap(); - match channel_manager.send_spontaneous_payment_with_retry( + match channel_manager.send_spontaneous_payment( Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), payment_id, @@ -859,9 +859,6 @@ fn get_invoice( }; let invoice = match utils::create_invoice_from_channelmanager( channel_manager, - keys_manager, - logger, - currency, Some(amt_msat), "ldk-tutorial-node".to_string(), expiry_secs, diff --git a/src/disk.rs b/src/disk.rs index 6db81d4c..62d77c66 100644 --- a/src/disk.rs +++ b/src/disk.rs @@ -3,9 +3,9 @@ use bitcoin::secp256k1::PublicKey; use bitcoin::Network; use chrono::Utc; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters}; +use lightning::util::hash_tables::{new_hash_map, HashMap}; use lightning::util::logger::{Logger, Record}; use lightning::util::ser::{Readable, ReadableArgs}; -use std::collections::HashMap; use std::fs; use std::fs::File; use std::io::{BufRead, BufReader, Write}; @@ -58,9 +58,9 @@ pub(crate) fn persist_channel_peer(path: &Path, peer_info: &str) -> std::io::Res pub(crate) fn read_channel_peer_data( path: &Path, ) -> Result, std::io::Error> { - let mut peer_data = HashMap::new(); + let mut peer_data = new_hash_map(); if !Path::new(&path).exists() { - return Ok(HashMap::new()); + return Ok(new_hash_map()); } let file = File::open(path)?; let reader = BufReader::new(file); @@ -92,7 +92,7 @@ pub(crate) fn read_inbound_payment_info(path: &Path) -> InboundPaymentInfoStorag return info; } } - InboundPaymentInfoStorage { payments: HashMap::new() } + InboundPaymentInfoStorage { payments: new_hash_map() } } pub(crate) fn read_outbound_payment_info(path: &Path) -> OutboundPaymentInfoStorage { @@ -101,7 +101,7 @@ pub(crate) fn read_outbound_payment_info(path: &Path) -> OutboundPaymentInfoStor return info; } } - OutboundPaymentInfoStorage { payments: HashMap::new() } + OutboundPaymentInfoStorage { payments: new_hash_map() } } pub(crate) fn read_scorer( diff --git a/src/main.rs b/src/main.rs index cfe5705c..3e20de1c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -34,6 +34,8 @@ use lightning::routing::scoring::ProbabilisticScoringFeeParameters; use lightning::sign::{EntropySource, InMemorySigner, KeysManager}; use lightning::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; use lightning::util::config::UserConfig; +use lightning::util::hash_tables::hash_map::Entry; +use lightning::util::hash_tables::HashMap; use lightning::util::persist::{ self, KVStore, MonitorUpdatingPersister, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, @@ -49,8 +51,7 @@ use lightning_block_sync::UnboundedCache; use lightning_net_tokio::SocketDescriptor; use lightning_persister::fs_store::FilesystemStore; use rand::{thread_rng, Rng}; -use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::HashMap as StdHashMap; use std::convert::TryInto; use std::fmt; use std::fs; @@ -158,7 +159,7 @@ pub(crate) type PeerManager = SimpleArcPeerManager< ChainMonitor, BitcoindClient, BitcoindClient, - GossipVerifier, + Arc, FilesystemLogger, >; @@ -219,7 +220,7 @@ async fn handle_ldk_events( ) .expect("Lightning funding tx should always be to a SegWit output") .to_address(); - let mut outputs = vec![HashMap::with_capacity(1)]; + let mut outputs = vec![StdHashMap::new()]; outputs[0].insert(addr, channel_value_satoshis as f64 / 100_000_000.0); let raw_tx = bitcoind_client.create_raw_transaction(outputs).await; @@ -246,17 +247,7 @@ async fn handle_ldk_events( Event::FundingTxBroadcastSafe { .. } => { // We don't use the manual broadcasting feature, so this event should never be seen. }, - Event::PaymentClaimable { - payment_hash, - purpose, - amount_msat, - receiver_node_id: _, - via_channel_id: _, - via_user_channel_id: _, - claim_deadline: _, - onion_fields: _, - counterparty_skimmed_fee_msat: _, - } => { + Event::PaymentClaimable { payment_hash, purpose, amount_msat, .. } => { println!( "\nEVENT: received payment from payment hash {} of {} millisatoshis", payment_hash, amount_msat, @@ -402,9 +393,7 @@ async fn handle_ldk_events( total_fee_earned_msat, claim_from_onchain_tx, outbound_amount_forwarded_msat, - skimmed_fee_msat: _, - prev_user_channel_id: _, - next_user_channel_id: _, + .. } => { let read_only_network_graph = network_graph.read_only(); let nodes = read_only_network_graph.nodes(); @@ -497,14 +486,7 @@ async fn handle_ldk_events( print!("> "); std::io::stdout().flush().unwrap(); }, - Event::ChannelClosed { - channel_id, - reason, - user_channel_id: _, - counterparty_node_id, - channel_capacity_sats: _, - channel_funding_txo: _, - } => { + Event::ChannelClosed { channel_id, reason, counterparty_node_id, .. } => { println!( "\nEVENT: Channel {} with counterparty {} closed due to: {:?}", channel_id, @@ -688,7 +670,7 @@ async fn start_ldk() { Arc::clone(&logger), ))); - // Step 10: Create Router + // Step 10: Create Routers let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); let router = Arc::new(DefaultRouter::new( network_graph.clone(), @@ -698,6 +680,9 @@ async fn start_ldk() { scoring_fee_params, )); + let message_router = + Arc::new(DefaultMessageRouter::new(Arc::clone(&network_graph), Arc::clone(&keys_manager))); + // Step 11: Initialize the ChannelManager let mut user_config = UserConfig::default(); user_config.channel_handshake_limits.force_announced_channel_preference = false; @@ -706,9 +691,9 @@ async fn start_ldk() { let mut restarting_node = true; let (channel_manager_blockhash, channel_manager) = { if let Ok(f) = fs::File::open(format!("{}/manager", ldk_data_dir.clone())) { - let mut channel_monitor_mut_references = Vec::new(); - for (_, channel_monitor) in channelmonitors.iter_mut() { - channel_monitor_mut_references.push(channel_monitor); + let mut channel_monitor_references = Vec::new(); + for (_, channel_monitor) in channelmonitors.iter() { + channel_monitor_references.push(channel_monitor); } let read_args = ChannelManagerReadArgs::new( keys_manager.clone(), @@ -718,9 +703,10 @@ async fn start_ldk() { chain_monitor.clone(), broadcaster.clone(), router, + Arc::clone(&message_router), logger.clone(), user_config, - channel_monitor_mut_references, + channel_monitor_references, ); <(BlockHash, ChannelManager)>::read(&mut BufReader::new(f), read_args).unwrap() } else { @@ -736,6 +722,7 @@ async fn start_ldk() { chain_monitor.clone(), broadcaster.clone(), router, + Arc::clone(&message_router), logger.clone(), keys_manager.clone(), keys_manager.clone(), @@ -842,7 +829,8 @@ async fn start_ldk() { Arc::clone(&keys_manager), Arc::clone(&logger), Arc::clone(&channel_manager), - Arc::new(DefaultMessageRouter::new(Arc::clone(&network_graph), Arc::clone(&keys_manager))), + Arc::clone(&message_router), + Arc::clone(&channel_manager), Arc::clone(&channel_manager), Arc::clone(&channel_manager), IgnoringMessageHandler {}, @@ -871,7 +859,7 @@ async fn start_ldk() { Arc::clone(&gossip_sync), Arc::clone(&peer_manager), ); - gossip_sync.add_utxo_lookup(Some(utxo_lookup)); + gossip_sync.add_utxo_lookup(Some(Arc::new(utxo_lookup))); // ## Running LDK // Step 17: Initialize networking From db5c60dbdaf83a39ec79eafda94346565a5bd5eb Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 10 Jan 2025 19:55:53 +0000 Subject: [PATCH 2/6] Drop use of now-deprecated `create_invoice_from_channelmanager` --- src/cli.rs | 32 ++++++++++---------------------- src/main.rs | 4 ---- 2 files changed, 10 insertions(+), 26 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index ff705b12..4fa72d1e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -11,8 +11,9 @@ use bitcoin::secp256k1::PublicKey; use lightning::chain::channelmonitor::Balance; use lightning::ln::bolt11_payment::payment_parameters_from_invoice; use lightning::ln::bolt11_payment::payment_parameters_from_variable_amount_invoice; -use lightning::ln::channelmanager::{PaymentId, RecipientOnionFields, Retry}; -use lightning::ln::invoice_utils as utils; +use lightning::ln::channelmanager::{ + Bolt11InvoiceParameters, PaymentId, RecipientOnionFields, Retry, +}; use lightning::ln::msgs::SocketAddress; use lightning::ln::types::ChannelId; use lightning::offers::offer::{self, Offer}; @@ -23,7 +24,7 @@ use lightning::types::payment::{PaymentHash, PaymentPreimage}; use lightning::util::config::{ChannelHandshakeConfig, ChannelHandshakeLimits, UserConfig}; use lightning::util::persist::KVStore; use lightning::util::ser::Writeable; -use lightning_invoice::{Bolt11Invoice, Currency}; +use lightning_invoice::Bolt11Invoice; use lightning_persister::fs_store::FilesystemStore; use std::env; use std::io::Write; @@ -50,7 +51,7 @@ pub(crate) fn poll_for_user_input( chain_monitor: Arc, keys_manager: Arc, network_graph: Arc, inbound_payments: Arc>, outbound_payments: Arc>, ldk_data_dir: String, - network: Network, logger: Arc, fs_store: Arc, + fs_store: Arc, ) { println!( "LDK startup successful. Enter \"help\" to view available commands. Press Ctrl-D to quit." @@ -325,10 +326,7 @@ pub(crate) fn poll_for_user_input( amt_msat.unwrap(), &mut inbound_payments, &channel_manager, - Arc::clone(&keys_manager), - network, expiry_secs.unwrap(), - Arc::clone(&logger), ); fs_store .write("", "", INBOUND_PAYMENTS_FNAME, &inbound_payments.encode()) @@ -848,22 +846,12 @@ fn keysend( fn get_invoice( amt_msat: u64, inbound_payments: &mut InboundPaymentInfoStorage, - channel_manager: &ChannelManager, keys_manager: Arc, network: Network, - expiry_secs: u32, logger: Arc, + channel_manager: &ChannelManager, expiry_secs: u32, ) { - let currency = match network { - Network::Bitcoin => Currency::Bitcoin, - Network::Regtest => Currency::Regtest, - Network::Signet => Currency::Signet, - Network::Testnet | _ => Currency::BitcoinTestnet, - }; - let invoice = match utils::create_invoice_from_channelmanager( - channel_manager, - Some(amt_msat), - "ldk-tutorial-node".to_string(), - expiry_secs, - None, - ) { + let mut invoice_params: Bolt11InvoiceParameters = Default::default(); + invoice_params.amount_msats = Some(amt_msat); + invoice_params.invoice_expiry_delta_secs = Some(expiry_secs); + let invoice = match channel_manager.create_bolt11_invoice(invoice_params) { Ok(inv) => { println!("SUCCESS: generated invoice: {}", inv); inv diff --git a/src/main.rs b/src/main.rs index 3e20de1c..02b8193a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1049,7 +1049,6 @@ async fn start_ldk() { // some public channels. let peer_man = Arc::clone(&peer_manager); let chan_man = Arc::clone(&channel_manager); - let network = args.network; tokio::spawn(async move { // First wait a minute until we have some peers and maybe have opened a channel. tokio::time::sleep(Duration::from_secs(60)).await; @@ -1083,7 +1082,6 @@ async fn start_ldk() { let cli_channel_manager = Arc::clone(&channel_manager); let cli_chain_monitor = Arc::clone(&chain_monitor); let cli_persister = Arc::clone(&persister); - let cli_logger = Arc::clone(&logger); let cli_peer_manager = Arc::clone(&peer_manager); let cli_poll = tokio::task::spawn_blocking(move || { cli::poll_for_user_input( @@ -1095,8 +1093,6 @@ async fn start_ldk() { inbound_payments, outbound_payments, ldk_data_dir, - network, - cli_logger, cli_persister, ) }); From c1652fb7d00e4c996d07e65ccd2dcf4e9bc18789 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 10 Jan 2025 23:47:12 +0000 Subject: [PATCH 3/6] Disable `Gossip`-level logging at runtime This used to be disabled at compile-time, but the feature was removed as it didn't net substantial performance gain, so now we disable them at runtime. --- src/disk.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/disk.rs b/src/disk.rs index 62d77c66..00f0ea4d 100644 --- a/src/disk.rs +++ b/src/disk.rs @@ -4,7 +4,7 @@ use bitcoin::Network; use chrono::Utc; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters}; use lightning::util::hash_tables::{new_hash_map, HashMap}; -use lightning::util::logger::{Logger, Record}; +use lightning::util::logger::{Level, Logger, Record}; use lightning::util::ser::{Readable, ReadableArgs}; use std::fs; use std::fs::File; @@ -28,6 +28,10 @@ impl FilesystemLogger { } impl Logger for FilesystemLogger { fn log(&self, record: Record) { + if record.level == Level::Gossip { + // Gossip-level logs are incredibly verbose, and thus we skip them by default. + return; + } let raw_log = record.args.to_string(); let log = format!( "{} {:<5} [{}:{}] {}\n", From dcdc608feb4ff60531f00c3123a5596afbaaf81f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 13 Jan 2025 15:32:48 +0000 Subject: [PATCH 4/6] Use an `OMDomainResolver` by default to resolve HRNs for others --- Cargo.toml | 1 + src/main.rs | 62 ++++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dff35518..20ebddb9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ edition = "2018" [dependencies] lightning = { version = "0.1.0", features = ["dnssec"] } lightning-block-sync = { version = "0.1.0", features = [ "rpc-client", "tokio" ] } +lightning-dns-resolver = { version = "0.2.0" } lightning-invoice = { version = "0.33.0" } lightning-net-tokio = { version = "0.1.0" } lightning-persister = { version = "0.1.0" } diff --git a/src/main.rs b/src/main.rs index 02b8193a..370838da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,9 +24,13 @@ use lightning::ln::channelmanager::{ ChainParameters, ChannelManagerReadArgs, PaymentId, SimpleArcChannelManager, }; use lightning::ln::msgs::DecodeError; -use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, SimpleArcPeerManager}; +use lightning::ln::peer_handler::{ + IgnoringMessageHandler, MessageHandler, PeerManager as LdkPeerManager, +}; use lightning::ln::types::ChannelId; -use lightning::onion_message::messenger::{DefaultMessageRouter, SimpleArcOnionMessenger}; +use lightning::onion_message::messenger::{ + DefaultMessageRouter, OnionMessenger as LdkOnionMessenger, +}; use lightning::routing::gossip; use lightning::routing::gossip::{NodeId, P2PGossipSync}; use lightning::routing::router::DefaultRouter; @@ -48,6 +52,7 @@ use lightning_block_sync::init; use lightning_block_sync::poll; use lightning_block_sync::SpvClient; use lightning_block_sync::UnboundedCache; +use lightning_dns_resolver::OMDomainResolver; use lightning_net_tokio::SocketDescriptor; use lightning_persister::fs_store::FilesystemStore; use rand::{thread_rng, Rng}; @@ -154,13 +159,16 @@ pub(crate) type GossipVerifier = lightning_block_sync::gossip::GossipVerifier< Arc, >; -pub(crate) type PeerManager = SimpleArcPeerManager< +// Note that if you do not use an `OMDomainResolver` here you should use SimpleArcPeerManager +// instead. +pub(crate) type PeerManager = LdkPeerManager< SocketDescriptor, - ChainMonitor, - BitcoindClient, - BitcoindClient, - Arc, - FilesystemLogger, + Arc, + Arc, Arc, Arc>>, + Arc, + Arc, + IgnoringMessageHandler, + Arc, >; pub(crate) type ChannelManager = @@ -168,8 +176,19 @@ pub(crate) type ChannelManager = pub(crate) type NetworkGraph = gossip::NetworkGraph>; -type OnionMessenger = - SimpleArcOnionMessenger; +// Note that if you do not use an `OMDomainResolver` here you should use SimpleArcOnionMessenger +// instead. +type OnionMessenger = LdkOnionMessenger< + Arc, + Arc, + Arc, + Arc, + Arc, Arc, Arc>>, + Arc, + Arc, + Arc>>, + IgnoringMessageHandler, +>; pub(crate) type BumpTxEventHandler = BumpTransactionEventHandler< Arc, @@ -822,8 +841,17 @@ async fn start_ldk() { let gossip_sync = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, Arc::clone(&logger))); - // Step 16: Initialize the PeerManager + // Step 16 an OMDomainResolver as a service to other nodes + // As a service to other LDK users, using an `OMDomainResolver` allows others to resolve BIP + // 353 Human Readable Names for others, providing them DNSSEC proofs over lightning onion + // messages. Doing this only makes sense for a always-online public routing node, and doesn't + // provide you any direct value, but its nice to offer the service for others. let channel_manager: Arc = Arc::new(channel_manager); + let resolver = "8.8.8.8:53".to_socket_addrs().unwrap().next().unwrap(); + let domain_resolver = + Arc::new(OMDomainResolver::new(resolver, Some(Arc::clone(&channel_manager)))); + + // Step 17: Initialize the PeerManager let onion_messenger: Arc = Arc::new(OnionMessenger::new( Arc::clone(&keys_manager), Arc::clone(&keys_manager), @@ -832,7 +860,7 @@ async fn start_ldk() { Arc::clone(&message_router), Arc::clone(&channel_manager), Arc::clone(&channel_manager), - Arc::clone(&channel_manager), + domain_resolver, IgnoringMessageHandler {}, )); let mut ephemeral_bytes = [0; 32]; @@ -862,7 +890,7 @@ async fn start_ldk() { gossip_sync.add_utxo_lookup(Some(Arc::new(utxo_lookup))); // ## Running LDK - // Step 17: Initialize networking + // Step 18: Initialize networking let peer_manager_connection_handler = peer_manager.clone(); let listening_port = args.ldk_peer_listening_port; @@ -888,7 +916,7 @@ async fn start_ldk() { } }); - // Step 18: Connect and Disconnect Blocks + // Step 19: Connect and Disconnect Blocks let output_sweeper: Arc = Arc::new(output_sweeper); let channel_manager_listener = channel_manager.clone(); let chain_monitor_listener = chain_monitor.clone(); @@ -937,7 +965,7 @@ async fn start_ldk() { .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.lock().unwrap().encode()) .unwrap(); - // Step 19: Handle LDK Events + // Step 20: Handle LDK Events let channel_manager_event_listener = Arc::clone(&channel_manager); let bitcoind_client_event_listener = Arc::clone(&bitcoind_client); let network_graph_event_listener = Arc::clone(&network_graph); @@ -979,10 +1007,10 @@ async fn start_ldk() { } }; - // Step 20: Persist ChannelManager and NetworkGraph + // Step 21: Persist ChannelManager and NetworkGraph let persister = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into())); - // Step 21: Background Processing + // Step 22: Background Processing let (bp_exit, bp_exit_check) = tokio::sync::watch::channel(()); let mut background_processor = tokio::spawn(process_events_async( Arc::clone(&persister), From d6e65fab18bc454a334cb78cc96e6ecb17e9633e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 13 Jan 2025 19:52:00 +0000 Subject: [PATCH 5/6] Switch to a separate executor for RPC calls to avoid tokio hangs See the comment in the commit for more info on why we have to do this. --- src/bitcoind_client.rs | 118 +++++++++++++++++++++++++++++------------ 1 file changed, 83 insertions(+), 35 deletions(-) diff --git a/src/bitcoind_client.rs b/src/bitcoind_client.rs index bfa4e583..f16c5f41 100644 --- a/src/bitcoind_client.rs +++ b/src/bitcoind_client.rs @@ -25,11 +25,14 @@ use lightning_block_sync::rpc::RpcClient; use lightning_block_sync::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource}; use serde_json; use std::collections::HashMap; +use std::future::Future; use std::str::FromStr; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::Duration; +use tokio::runtime::{self, Runtime}; + pub struct BitcoindClient { pub(crate) bitcoind_rpc_client: Arc, network: Network, @@ -38,7 +41,8 @@ pub struct BitcoindClient { rpc_user: String, rpc_password: String, fees: Arc>, - handle: tokio::runtime::Handle, + main_runtime_handle: runtime::Handle, + inner_runtime: Arc, logger: Arc, } @@ -66,7 +70,7 @@ const MIN_FEERATE: u32 = 253; impl BitcoindClient { pub(crate) async fn new( host: String, port: u16, rpc_user: String, rpc_password: String, network: Network, - handle: tokio::runtime::Handle, logger: Arc, + handle: runtime::Handle, logger: Arc, ) -> std::io::Result { let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port); let rpc_credentials = @@ -95,6 +99,15 @@ impl BitcoindClient { fees.insert(ConfirmationTarget::ChannelCloseMinimum, AtomicU32::new(MIN_FEERATE)); fees.insert(ConfirmationTarget::OutputSpendingFee, AtomicU32::new(MIN_FEERATE)); + let mut builder = runtime::Builder::new_multi_thread(); + let runtime = + builder.enable_all().worker_threads(1).thread_name("rpc-worker").build().unwrap(); + let inner_runtime = Arc::new(runtime); + // Tokio will panic if we drop a runtime while in another runtime. Because the entire + // application runs inside a tokio runtime, we have to ensure this runtime is never + // `drop`'d, which we do by leaking an Arc reference. + std::mem::forget(Arc::clone(&inner_runtime)); + let client = Self { bitcoind_rpc_client: Arc::new(bitcoind_rpc_client), host, @@ -103,7 +116,8 @@ impl BitcoindClient { rpc_password, network, fees: Arc::new(fees), - handle: handle.clone(), + main_runtime_handle: handle.clone(), + inner_runtime, logger, }; BitcoindClient::poll_for_fee_estimates( @@ -226,10 +240,42 @@ impl BitcoindClient { }); } + fn run_future_in_blocking_context(&self, future: F) -> F::Output + where + F::Output: Send + 'static, + { + // Tokio deliberately makes it nigh impossible to block on a future in a sync context that + // is running in an async task (which makes it really hard to interact with sync code that + // has callbacks in an async project). + // + // Reading the docs, it *seems* like + // `tokio::task::block_in_place(tokio::runtime::Handle::spawn(future))` should do the + // trick, and 99.999% of the time it does! But tokio has a "non-stealable I/O driver" - if + // the task we're running happens to, by sheer luck, be holding the "I/O driver" when we go + // into a `block_in_place` call, and the inner future requires I/O (which of course it + // does, its a future!), the whole thing will come to a grinding halt as no other thread is + // allowed to poll I/O until the blocked one finishes. + // + // This is, of course, nuts, and an almost trivial performance penalty of occasional + // additional wakeups would solve this, but tokio refuses to do so because any performance + // penalty at all would be too much (tokio issue #4730). + // + // Instead, we have to do a rather insane dance - we have to spawn the `future` we want to + // run on a *different* (threaded) tokio runtime (doing the `block_in_place` dance to avoid + // blocking too many threads on the main runtime). We want to block on that `future` being + // run on the other runtime's threads, but tokio only provides `block_on` to do so, which + // runs the `future` itself on the current thread, panicing if this thread is already a + // part of a tokio runtime (which in this case it is - the main tokio runtime). Thus, we + // have to `spawn` the `future` on the secondary runtime and then `block_on` the resulting + // `JoinHandle` on the main runtime. + tokio::task::block_in_place(move || { + self.main_runtime_handle.block_on(self.inner_runtime.spawn(future)).unwrap() + }) + } + pub fn get_new_rpc_client(&self) -> RpcClient { let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port); - let rpc_credentials = - base64::encode(format!("{}:{}", self.rpc_user.clone(), self.rpc_password.clone())); + let rpc_credentials = base64::encode(format!("{}:{}", self.rpc_user, self.rpc_password)); RpcClient::new(&rpc_credentials, http_endpoint) } @@ -273,22 +319,28 @@ impl BitcoindClient { .unwrap(); } - pub async fn sign_raw_transaction_with_wallet(&self, tx_hex: String) -> SignedTx { + pub fn sign_raw_transaction_with_wallet( + &self, tx_hex: String, + ) -> impl Future { let tx_hex_json = serde_json::json!(tx_hex); - self.bitcoind_rpc_client - .call_method("signrawtransactionwithwallet", &vec![tx_hex_json]) - .await - .unwrap() + let rpc_client = self.get_new_rpc_client(); + async move { + rpc_client + .call_method("signrawtransactionwithwallet", &vec![tx_hex_json]) + .await + .unwrap() + } } - pub async fn get_new_address(&self) -> Address { + pub fn get_new_address(&self) -> impl Future { let addr_args = vec![serde_json::json!("LDK output address")]; - let addr = self - .bitcoind_rpc_client - .call_method::("getnewaddress", &addr_args) - .await - .unwrap(); - Address::from_str(addr.0.as_str()).unwrap().require_network(self.network).unwrap() + let network = self.network; + let rpc_client = self.get_new_rpc_client(); + async move { + let addr = + rpc_client.call_method::("getnewaddress", &addr_args).await.unwrap(); + Address::from_str(addr.0.as_str()).unwrap().require_network(network).unwrap() + } } pub async fn get_blockchain_info(&self) -> BlockchainInfo { @@ -298,11 +350,11 @@ impl BitcoindClient { .unwrap() } - pub async fn list_unspent(&self) -> ListUnspentResponse { - self.bitcoind_rpc_client - .call_method::("listunspent", &vec![]) - .await - .unwrap() + pub fn list_unspent(&self) -> impl Future { + let rpc_client = self.get_new_rpc_client(); + async move { + rpc_client.call_method::("listunspent", &vec![]).await.unwrap() + } } } @@ -324,7 +376,7 @@ impl BroadcasterInterface for BitcoindClient { let txn = txs.iter().map(|tx| encode::serialize_hex(tx)).collect::>(); let bitcoind_rpc_client = Arc::clone(&self.bitcoind_rpc_client); let logger = Arc::clone(&self.logger); - self.handle.spawn(async move { + self.main_runtime_handle.spawn(async move { let res = if txn.len() == 1 { let tx_json = serde_json::json!(txn[0]); bitcoind_rpc_client @@ -355,17 +407,15 @@ impl BroadcasterInterface for BitcoindClient { impl ChangeDestinationSource for BitcoindClient { fn get_change_destination_script(&self) -> Result { - tokio::task::block_in_place(move || { - Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() })) - }) + let future = self.get_new_address(); + Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() })) } } impl WalletSource for BitcoindClient { fn list_confirmed_utxos(&self) -> Result, ()> { - let utxos = tokio::task::block_in_place(move || { - self.handle.block_on(async move { self.list_unspent().await }).0 - }); + let future = self.list_unspent(); + let utxos = self.run_future_in_blocking_context(async move { future.await.0 }); Ok(utxos .into_iter() .filter_map(|utxo| { @@ -398,18 +448,16 @@ impl WalletSource for BitcoindClient { } fn get_change_script(&self) -> Result { - tokio::task::block_in_place(move || { - Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() })) - }) + let future = self.get_new_address(); + Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() })) } fn sign_psbt(&self, tx: Psbt) -> Result { let mut tx_bytes = Vec::new(); let _ = tx.unsigned_tx.consensus_encode(&mut tx_bytes).map_err(|_| ()); let tx_hex = hex_utils::hex_str(&tx_bytes); - let signed_tx = tokio::task::block_in_place(move || { - self.handle.block_on(async move { self.sign_raw_transaction_with_wallet(tx_hex).await }) - }); + let future = self.sign_raw_transaction_with_wallet(tx_hex); + let signed_tx = self.run_future_in_blocking_context(async move { future.await }); let signed_tx_bytes = hex_utils::to_vec(&signed_tx.hex).ok_or(())?; Transaction::consensus_decode(&mut signed_tx_bytes.as_slice()).map_err(|_| ()) } From e09d9e0132ac65230c65c46461db32330bd19b1a Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 18 Jan 2025 17:50:17 +0000 Subject: [PATCH 6/6] Add HRN-based payments to `sendpayment` --- src/cli.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 5 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 4fa72d1e..44007378 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -17,6 +17,8 @@ use lightning::ln::channelmanager::{ use lightning::ln::msgs::SocketAddress; use lightning::ln::types::ChannelId; use lightning::offers::offer::{self, Offer}; +use lightning::onion_message::dns_resolution::HumanReadableName; +use lightning::onion_message::messenger::Destination; use lightning::routing::gossip::NodeId; use lightning::routing::router::{PaymentParameters, RouteParameters}; use lightning::sign::{EntropySource, KeysManager}; @@ -142,9 +144,10 @@ pub(crate) fn poll_for_user_input( "sendpayment" => { let invoice_str = words.next(); if invoice_str.is_none() { - println!("ERROR: sendpayment requires an invoice: `sendpayment `"); + println!("ERROR: sendpayment requires an invoice: `sendpayment [amount_msat]`"); continue; } + let invoice_str = invoice_str.unwrap(); let mut user_provided_amt: Option = None; if let Some(amt_msat_str) = words.next() { @@ -157,7 +160,7 @@ pub(crate) fn poll_for_user_input( }; } - if let Ok(offer) = Offer::from_str(invoice_str.unwrap()) { + if let Ok(offer) = Offer::from_str(invoice_str) { let random_bytes = keys_manager.get_secure_random_bytes(); let payment_id = PaymentId(random_bytes); @@ -213,11 +216,77 @@ pub(crate) fn poll_for_user_input( let amt = Some(amt_msat); let pay = channel_manager .pay_for_offer(&offer, None, amt, None, payment_id, retry, None); - if pay.is_err() { + if pay.is_ok() { + println!("Payment in flight"); + } else { println!("ERROR: Failed to pay: {:?}", pay); } + } else if let Ok(hrn) = HumanReadableName::from_encoded(invoice_str) { + let random_bytes = keys_manager.get_secure_random_bytes(); + let payment_id = PaymentId(random_bytes); + + if user_provided_amt.is_none() { + println!("Can't pay to a human-readable-name without an amount"); + continue; + } + + // We need some nodes that will resolve DNS for us in order to pay a Human + // Readable Name. They don't need to be trusted, but until onion message + // forwarding is widespread we'll directly connect to them, revealing who + // we intend to pay. + let mut dns_resolvers = Vec::new(); + for (node_id, node) in network_graph.read_only().nodes().unordered_iter() { + if let Some(info) = &node.announcement_info { + // Sadly, 31 nodes currently squat on the DNS Resolver feature bit + // without speaking it. + // Its unclear why they're doing so, but none of them currently + // also have the onion messaging feature bit set, so here we check + // for both. + let supports_dns = info.features().supports_dns_resolution(); + let supports_om = info.features().supports_onion_messages(); + if supports_dns && supports_om { + if let Ok(pubkey) = node_id.as_pubkey() { + dns_resolvers.push(Destination::Node(pubkey)); + } + } + } + if dns_resolvers.len() > 5 { + break; + } + } + if dns_resolvers.is_empty() { + println!( + "Failed to find any DNS resolving nodes, check your network graph is synced" + ); + continue; + } + + let amt_msat = user_provided_amt.unwrap(); + outbound_payments.lock().unwrap().payments.insert( + payment_id, + PaymentInfo { + preimage: None, + secret: None, + status: HTLCStatus::Pending, + amt_msat: MillisatAmount(Some(amt_msat)), + }, + ); + fs_store + .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.encode()) + .unwrap(); + + let retry = Retry::Timeout(Duration::from_secs(10)); + let pay = |a, b, c, d, e, f| { + channel_manager.pay_for_offer_from_human_readable_name(a, b, c, d, e, f) + }; + let pay = pay(hrn, amt_msat, payment_id, retry, None, dns_resolvers); + if pay.is_ok() { + println!("Payment in flight"); + } else { + println!("ERROR: Failed to pay"); + } } else { - match Bolt11Invoice::from_str(invoice_str.unwrap()) { + match Bolt11Invoice::from_str(invoice_str) { Ok(invoice) => send_payment( &channel_manager, &invoice, @@ -505,7 +574,7 @@ fn help() { println!(" disconnectpeer "); println!(" listpeers"); println!("\n Payments:"); - println!(" sendpayment []"); + println!(" sendpayment []"); println!(" keysend "); println!(" listpayments"); println!("\n Invoices:");