Skip to content

Commit cdf8eff

Browse files
amackilloptnull
andauthored
Parallelize read_payments (#7)
* Use async `KVStore` for `read_X` util methods Rather than using `KVStoreSync` we now use the async `KVStore` implementation for most `read_X` util methods used during node building. This is a first step towards making node building/startup entirely async eventually. * Parallelize `read_payments` Previously, we would read entries of our payment store sequentially. This is more or less fine when we read from a local store, but when we read from a remote (e.g., VSS) store, all the latency could result in considerable slowdown during startup. Here, we opt to read store entries in batches. * Add test for payment persistence after node restart Add integration test that verifies 200 payments are correctly persisted and retrievable via `list_payments` after restarting a node. Co-Authored-By: Claude AI --------- Co-authored-by: Elias Rohrer <[email protected]>
1 parent b66a67c commit cdf8eff

File tree

7 files changed

+300
-93
lines changed

7 files changed

+300
-93
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ log = { version = "0.4.22", default-features = false, features = ["std"]}
9292

9393
vss-client = { package = "vss-client-ng", version = "0.4" }
9494
prost = { version = "0.11.6", default-features = false}
95+
#bitcoin-payment-instructions = { version = "0.6" }
96+
bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", branch = "2025-12-ldk-node-base" }
9597

9698
[target.'cfg(windows)'.dependencies]
9799
winapi = { version = "0.3", features = ["winbase"] }

src/builder.rs

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use bip39::Mnemonic;
1919
use bitcoin::bip32::{ChildNumber, Xpriv};
2020
use bitcoin::secp256k1::PublicKey;
2121
use bitcoin::{BlockHash, Network};
22-
use bdk_chain::{BlockId, TxUpdate};
22+
use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver;
2323
use lightning::chain::{chainmonitor, BestBlock, Watch};
2424
use lightning::io::Cursor;
2525
use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs};
@@ -55,7 +55,9 @@ use crate::fee_estimator::OnchainFeeEstimator;
5555
use crate::gossip::GossipSource;
5656
use crate::io::sqlite_store::SqliteStore;
5757
use crate::io::utils::{
58-
read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics,
58+
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
59+
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer,
60+
write_node_metrics,
5961
};
6062
use crate::io::vss_store::VssStore;
6163
use crate::io::{
@@ -1233,7 +1235,9 @@ fn build_with_store_internal(
12331235
}
12341236

12351237
// Initialize the status fields.
1236-
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
1238+
let node_metrics = match runtime
1239+
.block_on(async { read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1240+
{
12371241
Ok(metrics) => Arc::new(RwLock::new(metrics)),
12381242
Err(e) => {
12391243
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1247,7 +1251,9 @@ fn build_with_store_internal(
12471251
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
12481252
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
12491253

1250-
let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) {
1254+
let payment_store = match runtime
1255+
.block_on(async { read_payments(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1256+
{
12511257
Ok(payments) => Arc::new(PaymentStore::new(
12521258
payments,
12531259
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
@@ -1474,24 +1480,23 @@ fn build_with_store_internal(
14741480
));
14751481

14761482
// Initialize the network graph, scorer, and router
1477-
let network_graph =
1478-
match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) {
1479-
Ok(graph) => Arc::new(graph),
1480-
Err(e) => {
1481-
if e.kind() == std::io::ErrorKind::NotFound {
1482-
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
1483-
} else {
1484-
log_error!(logger, "Failed to read network graph from store: {}", e);
1485-
return Err(BuildError::ReadFailed);
1486-
}
1487-
},
1488-
};
1483+
let network_graph = match runtime
1484+
.block_on(async { read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1485+
{
1486+
Ok(graph) => Arc::new(graph),
1487+
Err(e) => {
1488+
if e.kind() == std::io::ErrorKind::NotFound {
1489+
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
1490+
} else {
1491+
log_error!(logger, "Failed to read network graph from store: {}", e);
1492+
return Err(BuildError::ReadFailed);
1493+
}
1494+
},
1495+
};
14891496

1490-
let local_scorer = match io::utils::read_scorer(
1491-
Arc::clone(&kv_store),
1492-
Arc::clone(&network_graph),
1493-
Arc::clone(&logger),
1494-
) {
1497+
let local_scorer = match runtime.block_on(async {
1498+
read_scorer(Arc::clone(&kv_store), Arc::clone(&network_graph), Arc::clone(&logger)).await
1499+
}) {
14951500
Ok(scorer) => scorer,
14961501
Err(e) => {
14971502
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1507,7 +1512,10 @@ fn build_with_store_internal(
15071512
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
15081513

15091514
// Restore external pathfinding scores from cache if possible.
1510-
match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) {
1515+
match runtime.block_on(async {
1516+
read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger))
1517+
.await
1518+
}) {
15111519
Ok(external_scores) => {
15121520
scorer.lock().unwrap().merge(external_scores, cur_time);
15131521
log_trace!(logger, "External scores from cache merged successfully");
@@ -1709,7 +1717,8 @@ fn build_with_store_internal(
17091717
},
17101718
};
17111719

1712-
let event_queue = match io::utils::read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger))
1720+
let event_queue = match runtime
1721+
.block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await })
17131722
{
17141723
Ok(event_queue) => Arc::new(event_queue),
17151724
Err(e) => {
@@ -1831,14 +1840,17 @@ fn build_with_store_internal(
18311840
let connection_manager =
18321841
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));
18331842

1834-
let output_sweeper = match io::utils::read_output_sweeper(
1835-
Arc::clone(&tx_broadcaster),
1836-
Arc::clone(&fee_estimator),
1837-
Arc::clone(&chain_source),
1838-
Arc::clone(&keys_manager),
1839-
Arc::clone(&kv_store),
1840-
Arc::clone(&logger),
1841-
) {
1843+
let output_sweeper = match runtime.block_on(async {
1844+
read_output_sweeper(
1845+
Arc::clone(&tx_broadcaster),
1846+
Arc::clone(&fee_estimator),
1847+
Arc::clone(&chain_source),
1848+
Arc::clone(&keys_manager),
1849+
Arc::clone(&kv_store),
1850+
Arc::clone(&logger),
1851+
)
1852+
.await
1853+
}) {
18421854
Ok(output_sweeper) => Arc::new(output_sweeper),
18431855
Err(e) => {
18441856
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1859,7 +1871,9 @@ fn build_with_store_internal(
18591871
},
18601872
};
18611873

1862-
let peer_store = match io::utils::read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)) {
1874+
let peer_store = match runtime
1875+
.block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1876+
{
18631877
Ok(peer_store) => Arc::new(peer_store),
18641878
Err(e) => {
18651879
if e.kind() == std::io::ErrorKind::NotFound {

src/ffi/types.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use lightning::offers::invoice::Bolt12Invoice as LdkBolt12Invoice;
2929
pub use lightning::offers::offer::OfferId;
3030
use lightning::offers::offer::{Amount as LdkAmount, Offer as LdkOffer};
3131
use lightning::offers::refund::Refund as LdkRefund;
32+
use lightning::onion_message::dns_resolution::HumanReadableName as LdkHumanReadableName;
3233
pub use lightning::routing::gossip::{NodeAlias, NodeId, RoutingFees};
3334
pub use lightning::routing::router::RouteParametersConfig;
3435
use lightning::util::ser::Writeable;
@@ -54,7 +55,7 @@ pub use crate::logger::{LogLevel, LogRecord, LogWriter};
5455
pub use crate::payment::store::{
5556
ConfirmationStatus, LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus,
5657
};
57-
pub use crate::payment::QrPaymentResult;
58+
pub use crate::payment::UnifiedPaymentResult;
5859
use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId};
5960

6061
impl UniffiCustomTypeConverter for PublicKey {

0 commit comments

Comments
 (0)