Skip to content

Commit 6462131

Browse files
committed
Decouple spending from chain notifications
To prepare for asynchronous processing of the sweep, we need to decouple the spending from the chain notifications. These notifications run in a sync context and wouldn't allow calls into an async trait. Instead we now periodically call into the sweeper, to open up the possibility to do so from an async context if desired.
1 parent 46cb5ff commit 6462131

File tree

2 files changed

+139
-87
lines changed

2 files changed

+139
-87
lines changed

lightning-background-processor/src/lib.rs

+93-14
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ use lightning::onion_message::messenger::AOnionMessenger;
3636
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3737
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
3838
use lightning::routing::utxo::UtxoLookup;
39+
use lightning::sign::{ChangeDestinationSource, OutputSpender};
3940
use lightning::util::logger::Logger;
40-
use lightning::util::persist::Persister;
41+
use lightning::util::persist::{KVStore, Persister};
42+
use lightning::util::sweep::OutputSweeper;
4143
#[cfg(feature = "std")]
4244
use lightning::util::wakers::Sleeper;
4345
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -130,6 +132,11 @@ const REBROADCAST_TIMER: u64 = 30;
130132
#[cfg(test)]
131133
const REBROADCAST_TIMER: u64 = 1;
132134

135+
#[cfg(not(test))]
136+
const SWEEPER_TIMER: u64 = 30;
137+
#[cfg(test)]
138+
const SWEEPER_TIMER: u64 = 1;
139+
133140
#[cfg(feature = "futures")]
134141
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
135142
const fn min_u64(a: u64, b: u64) -> u64 {
@@ -306,6 +313,7 @@ macro_rules! define_run_body {
306313
$channel_manager: ident, $process_channel_manager_events: expr,
307314
$onion_messenger: ident, $process_onion_message_handler_events: expr,
308315
$peer_manager: ident, $gossip_sync: ident,
316+
$process_sweeper: expr,
309317
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
310318
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
311319
) => { {
@@ -320,6 +328,7 @@ macro_rules! define_run_body {
320328
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
321329
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
322330
let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
331+
let mut last_sweeper_call = $get_timer(SWEEPER_TIMER);
323332
let mut have_pruned = false;
324333
let mut have_decayed_scorer = false;
325334

@@ -461,6 +470,12 @@ macro_rules! define_run_body {
461470
$chain_monitor.rebroadcast_pending_claims();
462471
last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
463472
}
473+
474+
if $timer_elapsed(&mut last_sweeper_call, SWEEPER_TIMER) {
475+
log_trace!($logger, "Regenerating sweeper spends if necessary");
476+
let _ = $process_sweeper;
477+
last_sweeper_call = $get_timer(SWEEPER_TIMER);
478+
}
464479
}
465480

466481
// After we exit, ensure we persist the ChannelManager one final time - this avoids
@@ -618,6 +633,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
618633
/// ```
619634
/// # use lightning::io;
620635
/// # use lightning::events::ReplayEvent;
636+
/// # use lightning::util::sweep::OutputSweeper;
621637
/// # use std::sync::{Arc, RwLock};
622638
/// # use std::sync::atomic::{AtomicBool, Ordering};
623639
/// # use std::time::SystemTime;
@@ -656,6 +672,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
656672
/// # F: lightning::chain::Filter + Send + Sync + 'static,
657673
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
658674
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
675+
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
676+
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
677+
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
659678
/// # > {
660679
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
661680
/// # event_handler: Arc<EventHandler>,
@@ -666,14 +685,18 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
666685
/// # persister: Arc<Store>,
667686
/// # logger: Arc<Logger>,
668687
/// # scorer: Arc<Scorer>,
688+
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
669689
/// # }
670690
/// #
671691
/// # async fn setup_background_processing<
672692
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
673693
/// # F: lightning::chain::Filter + Send + Sync + 'static,
674694
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
675695
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
676-
/// # >(node: Node<B, F, FE, UL>) {
696+
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
697+
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
698+
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
699+
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
677700
/// let background_persister = Arc::clone(&node.persister);
678701
/// let background_event_handler = Arc::clone(&node.event_handler);
679702
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
@@ -683,7 +706,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
683706
/// let background_onion_messenger = Arc::clone(&node.onion_messenger);
684707
/// let background_logger = Arc::clone(&node.logger);
685708
/// let background_scorer = Arc::clone(&node.scorer);
686-
///
709+
/// let background_sweeper = Arc::clone(&node.sweeper);
710+
687711
/// // Setup the sleeper.
688712
/// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
689713
///
@@ -708,6 +732,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
708732
/// Some(background_onion_messenger),
709733
/// background_gossip_sync,
710734
/// background_peer_man,
735+
/// Some(background_sweeper),
711736
/// background_logger,
712737
/// Some(background_scorer),
713738
/// sleeper,
@@ -742,6 +767,10 @@ pub async fn process_events_async<
742767
+ Sync,
743768
CM: 'static + Deref + Send + Sync,
744769
OM: 'static + Deref + Send + Sync,
770+
D: 'static + Deref,
771+
O: 'static + Deref,
772+
K: 'static + Deref,
773+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
745774
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
746775
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
747776
PM: 'static + Deref + Send + Sync,
@@ -753,12 +782,12 @@ pub async fn process_events_async<
753782
>(
754783
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
755784
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
756-
logger: L, scorer: Option<S>, sleeper: Sleeper, mobile_interruptable_platform: bool,
757-
fetch_time: FetchTime,
785+
sweeper: Option<OS>, logger: L, scorer: Option<S>, sleeper: Sleeper,
786+
mobile_interruptable_platform: bool, fetch_time: FetchTime,
758787
) -> Result<(), lightning::io::Error>
759788
where
760789
UL::Target: 'static + UtxoLookup,
761-
CF::Target: 'static + chain::Filter,
790+
CF::Target: 'static + chain::Filter + Sync + Send,
762791
T::Target: 'static + BroadcasterInterface,
763792
F::Target: 'static + FeeEstimator,
764793
L::Target: 'static + Logger,
@@ -767,6 +796,9 @@ where
767796
CM::Target: AChannelManager + Send + Sync,
768797
OM::Target: AOnionMessenger + Send + Sync,
769798
PM::Target: APeerManager + Send + Sync,
799+
O::Target: 'static + OutputSpender,
800+
D::Target: 'static + ChangeDestinationSource,
801+
K::Target: 'static + KVStore,
770802
{
771803
let mut should_break = false;
772804
let async_event_handler = |event| {
@@ -810,6 +842,13 @@ where
810842
},
811843
peer_manager,
812844
gossip_sync,
845+
{
846+
if let Some(ref sweeper) = sweeper {
847+
sweeper.regenerate_and_broadcast_spend_if_necessary()
848+
} else {
849+
Ok(())
850+
}
851+
},
813852
logger,
814853
scorer,
815854
should_break,
@@ -922,14 +961,18 @@ impl BackgroundProcessor {
922961
PM: 'static + Deref + Send + Sync,
923962
S: 'static + Deref<Target = SC> + Send + Sync,
924963
SC: for<'b> WriteableScore<'b>,
964+
D: 'static + Deref,
965+
O: 'static + Deref,
966+
K: 'static + Deref,
967+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send + Sync,
925968
>(
926969
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
927970
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
928-
logger: L, scorer: Option<S>,
971+
sweeper: Option<OS>, logger: L, scorer: Option<S>,
929972
) -> Self
930973
where
931974
UL::Target: 'static + UtxoLookup,
932-
CF::Target: 'static + chain::Filter,
975+
CF::Target: 'static + chain::Filter + Sync + Send,
933976
T::Target: 'static + BroadcasterInterface,
934977
F::Target: 'static + FeeEstimator,
935978
L::Target: 'static + Logger,
@@ -938,6 +981,9 @@ impl BackgroundProcessor {
938981
CM::Target: AChannelManager + Send + Sync,
939982
OM::Target: AOnionMessenger + Send + Sync,
940983
PM::Target: APeerManager + Send + Sync,
984+
O::Target: 'static + OutputSpender,
985+
D::Target: 'static + ChangeDestinationSource,
986+
K::Target: 'static + KVStore,
941987
{
942988
let stop_thread = Arc::new(AtomicBool::new(false));
943989
let stop_thread_clone = stop_thread.clone();
@@ -973,6 +1019,13 @@ impl BackgroundProcessor {
9731019
},
9741020
peer_manager,
9751021
gossip_sync,
1022+
{
1023+
if let Some(ref sweeper) = sweeper {
1024+
sweeper.regenerate_and_broadcast_spend_if_necessary()
1025+
} else {
1026+
Ok(())
1027+
}
1028+
},
9761029
logger,
9771030
scorer,
9781031
stop_thread.load(Ordering::Acquire),
@@ -1069,7 +1122,7 @@ mod tests {
10691122
use core::sync::atomic::{AtomicBool, Ordering};
10701123
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
10711124
use lightning::chain::transaction::OutPoint;
1072-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1125+
use lightning::chain::{chainmonitor, BestBlock, Confirm};
10731126
use lightning::events::{Event, PathFailure, ReplayEvent};
10741127
use lightning::ln::channelmanager;
10751128
use lightning::ln::channelmanager::{
@@ -1222,7 +1275,7 @@ mod tests {
12221275
Arc<test_utils::TestBroadcaster>,
12231276
Arc<TestWallet>,
12241277
Arc<test_utils::TestFeeEstimator>,
1225-
Arc<dyn Filter + Sync + Send>,
1278+
Arc<test_utils::TestChainSource>,
12261279
Arc<FilesystemStore>,
12271280
Arc<test_utils::TestLogger>,
12281281
Arc<KeysManager>,
@@ -1601,7 +1654,7 @@ mod tests {
16011654
best_block,
16021655
Arc::clone(&tx_broadcaster),
16031656
Arc::clone(&fee_estimator),
1604-
None::<Arc<dyn Filter + Sync + Send>>,
1657+
None::<Arc<test_utils::TestChainSource>>,
16051658
Arc::clone(&keys_manager),
16061659
wallet,
16071660
Arc::clone(&kv_store),
@@ -1831,6 +1884,7 @@ mod tests {
18311884
Some(nodes[0].messenger.clone()),
18321885
nodes[0].p2p_gossip_sync(),
18331886
nodes[0].peer_manager.clone(),
1887+
Some(nodes[0].sweeper.clone()),
18341888
nodes[0].logger.clone(),
18351889
Some(nodes[0].scorer.clone()),
18361890
);
@@ -1924,6 +1978,7 @@ mod tests {
19241978
Some(nodes[0].messenger.clone()),
19251979
nodes[0].no_gossip_sync(),
19261980
nodes[0].peer_manager.clone(),
1981+
Some(nodes[0].sweeper.clone()),
19271982
nodes[0].logger.clone(),
19281983
Some(nodes[0].scorer.clone()),
19291984
);
@@ -1966,6 +2021,7 @@ mod tests {
19662021
Some(nodes[0].messenger.clone()),
19672022
nodes[0].no_gossip_sync(),
19682023
nodes[0].peer_manager.clone(),
2024+
Some(nodes[0].sweeper.clone()),
19692025
nodes[0].logger.clone(),
19702026
Some(nodes[0].scorer.clone()),
19712027
);
@@ -1998,6 +2054,7 @@ mod tests {
19982054
Some(nodes[0].messenger.clone()),
19992055
nodes[0].rapid_gossip_sync(),
20002056
nodes[0].peer_manager.clone(),
2057+
Some(nodes[0].sweeper.clone()),
20012058
nodes[0].logger.clone(),
20022059
Some(nodes[0].scorer.clone()),
20032060
move |dur: Duration| {
@@ -2034,6 +2091,7 @@ mod tests {
20342091
Some(nodes[0].messenger.clone()),
20352092
nodes[0].p2p_gossip_sync(),
20362093
nodes[0].peer_manager.clone(),
2094+
Some(nodes[0].sweeper.clone()),
20372095
nodes[0].logger.clone(),
20382096
Some(nodes[0].scorer.clone()),
20392097
);
@@ -2063,6 +2121,7 @@ mod tests {
20632121
Some(nodes[0].messenger.clone()),
20642122
nodes[0].no_gossip_sync(),
20652123
nodes[0].peer_manager.clone(),
2124+
Some(nodes[0].sweeper.clone()),
20662125
nodes[0].logger.clone(),
20672126
Some(nodes[0].scorer.clone()),
20682127
);
@@ -2109,6 +2168,7 @@ mod tests {
21092168
Some(nodes[0].messenger.clone()),
21102169
nodes[0].no_gossip_sync(),
21112170
nodes[0].peer_manager.clone(),
2171+
Some(nodes[0].sweeper.clone()),
21122172
nodes[0].logger.clone(),
21132173
Some(nodes[0].scorer.clone()),
21142174
);
@@ -2171,6 +2231,7 @@ mod tests {
21712231
Some(nodes[0].messenger.clone()),
21722232
nodes[0].no_gossip_sync(),
21732233
nodes[0].peer_manager.clone(),
2234+
Some(nodes[0].sweeper.clone()),
21742235
nodes[0].logger.clone(),
21752236
Some(nodes[0].scorer.clone()),
21762237
);
@@ -2216,10 +2277,22 @@ mod tests {
22162277

22172278
advance_chain(&mut nodes[0], 3);
22182279

2280+
let tx_broadcaster = nodes[0].tx_broadcaster.clone();
2281+
let wait_for_sweep_tx = || -> Transaction {
2282+
loop {
2283+
let sweep_tx = tx_broadcaster.txn_broadcasted.lock().unwrap().pop();
2284+
if let Some(sweep_tx) = sweep_tx {
2285+
return sweep_tx;
2286+
}
2287+
2288+
std::thread::sleep(Duration::from_millis(100));
2289+
}
2290+
};
2291+
22192292
// Check we generate an initial sweeping tx.
22202293
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2294+
let sweep_tx_0 = wait_for_sweep_tx();
22212295
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2222-
let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22232296
match tracked_output.status {
22242297
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
22252298
assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
@@ -2230,8 +2303,8 @@ mod tests {
22302303
// Check we regenerate and rebroadcast the sweeping tx each block.
22312304
advance_chain(&mut nodes[0], 1);
22322305
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2306+
let sweep_tx_1 = wait_for_sweep_tx();
22332307
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2234-
let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22352308
match tracked_output.status {
22362309
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
22372310
assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
@@ -2242,8 +2315,8 @@ mod tests {
22422315

22432316
advance_chain(&mut nodes[0], 1);
22442317
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2318+
let sweep_tx_2 = wait_for_sweep_tx();
22452319
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2246-
let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22472320
match tracked_output.status {
22482321
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
22492322
assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
@@ -2322,6 +2395,7 @@ mod tests {
23222395
Some(nodes[0].messenger.clone()),
23232396
nodes[0].no_gossip_sync(),
23242397
nodes[0].peer_manager.clone(),
2398+
Some(nodes[0].sweeper.clone()),
23252399
nodes[0].logger.clone(),
23262400
Some(nodes[0].scorer.clone()),
23272401
);
@@ -2351,6 +2425,7 @@ mod tests {
23512425
Some(nodes[0].messenger.clone()),
23522426
nodes[0].no_gossip_sync(),
23532427
nodes[0].peer_manager.clone(),
2428+
Some(nodes[0].sweeper.clone()),
23542429
nodes[0].logger.clone(),
23552430
Some(nodes[0].scorer.clone()),
23562431
);
@@ -2446,6 +2521,7 @@ mod tests {
24462521
Some(nodes[0].messenger.clone()),
24472522
nodes[0].rapid_gossip_sync(),
24482523
nodes[0].peer_manager.clone(),
2524+
Some(nodes[0].sweeper.clone()),
24492525
nodes[0].logger.clone(),
24502526
Some(nodes[0].scorer.clone()),
24512527
);
@@ -2478,6 +2554,7 @@ mod tests {
24782554
Some(nodes[0].messenger.clone()),
24792555
nodes[0].rapid_gossip_sync(),
24802556
nodes[0].peer_manager.clone(),
2557+
Some(nodes[0].sweeper.clone()),
24812558
nodes[0].logger.clone(),
24822559
Some(nodes[0].scorer.clone()),
24832560
move |dur: Duration| {
@@ -2640,6 +2717,7 @@ mod tests {
26402717
Some(nodes[0].messenger.clone()),
26412718
nodes[0].no_gossip_sync(),
26422719
nodes[0].peer_manager.clone(),
2720+
Some(nodes[0].sweeper.clone()),
26432721
nodes[0].logger.clone(),
26442722
Some(nodes[0].scorer.clone()),
26452723
);
@@ -2690,6 +2768,7 @@ mod tests {
26902768
Some(nodes[0].messenger.clone()),
26912769
nodes[0].no_gossip_sync(),
26922770
nodes[0].peer_manager.clone(),
2771+
Some(nodes[0].sweeper.clone()),
26932772
nodes[0].logger.clone(),
26942773
Some(nodes[0].scorer.clone()),
26952774
move |dur: Duration| {

0 commit comments

Comments
 (0)