Commit 4c342bd

Merge pull request #2369 from TheBlueMatt/2023-06-mon-event-less-race
Don't drop ChannelMonitor Events until they're processed
2 parents dba3e8f + 4206e71 commit 4c342bd

2 files changed: +89, -30 lines changed

lightning/src/chain/chainmonitor.rs (+8, -11)
@@ -520,12 +520,13 @@ where C::Target: chain::Filter,
     pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
         &self, handler: H
     ) {
-        let mut pending_events = Vec::new();
-        for monitor_state in self.monitors.read().unwrap().values() {
-            pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
-        }
-        for event in pending_events {
-            handler(event).await;
+        // Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
+        // crazy dance to process a monitor's events then only remove them once we've done so.
+        let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
+        for funding_txo in mons_to_process {
+            let mut ev;
+            super::channelmonitor::process_events_body!(
+                self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
         }
     }
 
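The comment in the new async path is the key constraint: a `std::sync::RwLock` read guard must not live across an `.await`, both because the guard is not `Send` and because a task suspended while holding it can block every other reader and writer. A minimal standalone sketch of the same workaround, using illustrative types rather than LDK's:

use std::collections::HashMap;
use std::sync::RwLock;

struct Monitor; // stand-in for a ChannelMonitor

struct Monitors {
    map: RwLock<HashMap<u64, Monitor>>, // keyed like the funding outpoint
}

impl Monitors {
    async fn process_all<F, Fut>(&self, handler: F)
    where F: Fn(u64) -> Fut, Fut: std::future::Future<Output = ()> {
        // Copy the keys out so the read guard is dropped before any await.
        let keys: Vec<u64> = self.map.read().unwrap().keys().copied().collect();
        for key in keys {
            // Re-acquire the lock briefly per key; the entry may have been
            // removed in the meantime, so look it up again rather than caching
            // a reference into the map.
            if self.map.read().unwrap().contains_key(&key) {
                handler(key).await; // no lock is held across this await
            }
        }
    }
}

Looking the monitor up again on each iteration is also why `process_events_body!` takes an `Option` of the monitor and bails out on `None`: the monitor may have disappeared between acquisitions of the lock.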
@@ -796,12 +797,8 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L
     /// [`SpendableOutputs`]: events::Event::SpendableOutputs
     /// [`BumpTransaction`]: events::Event::BumpTransaction
     fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
-        let mut pending_events = Vec::new();
         for monitor_state in self.monitors.read().unwrap().values() {
-            pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
-        }
-        for event in pending_events {
-            handler.handle_event(event);
+            monitor_state.monitor.process_pending_events(&handler);
         }
     }
 }
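On the sync side, the `ChainMonitor` now simply delegates to each monitor's new `process_pending_events`, so an event is removed from a monitor only after the handler has run for it. A hedged usage sketch, assuming the `lightning` crate of this era provides its blanket `EventHandler` impl for `Fn(Event)` closures (worth checking against the exact crate version):

use lightning::events::{Event, EventsProvider};

// `provider` can be LDK's ChainMonitor or anything else implementing
// EventsProvider.
fn drive_events<P: EventsProvider>(provider: &P) {
    provider.process_pending_events(&|event: Event| {
        // Handle SpendableOutputs, BumpTransaction, etc. After this commit,
        // the event is only dropped from its ChannelMonitor once this returns.
        let _ = event;
    });
}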

lightning/src/chain/channelmonitor.rs (+81, -19)
@@ -49,7 +49,7 @@ use crate::chain::Filter;
 use crate::util::logger::Logger;
 use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48};
 use crate::util::byte_utils;
-use crate::events::Event;
+use crate::events::{Event, EventHandler};
 use crate::events::bump_transaction::{AnchorDescriptor, HTLCDescriptor, BumpTransactionEvent};
 
 use crate::prelude::*;
@@ -738,11 +738,6 @@ impl Readable for IrrevocablyResolvedHTLC {
 /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date
 /// information and are actively monitoring the chain.
 ///
-/// Pending Events or updated HTLCs which have not yet been read out by
-/// get_and_clear_pending_monitor_events or get_and_clear_pending_events are serialized to disk and
-/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events
-/// gotten are fully handled before re-serializing the new state.
-///
 /// Note that the deserializer is only implemented for (BlockHash, ChannelMonitor), which
 /// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along
 /// the "reorg path" (ie disconnecting blocks until you find a common ancestor from both the
@@ -752,7 +747,7 @@ pub struct ChannelMonitor<Signer: WriteableEcdsaChannelSigner> {
     #[cfg(test)]
     pub(crate) inner: Mutex<ChannelMonitorImpl<Signer>>,
     #[cfg(not(test))]
-    inner: Mutex<ChannelMonitorImpl<Signer>>,
+    pub(super) inner: Mutex<ChannelMonitorImpl<Signer>>,
 }
 
 #[derive(PartialEq)]
@@ -829,7 +824,8 @@ pub(crate) struct ChannelMonitorImpl<Signer: WriteableEcdsaChannelSigner> {
     // we further MUST NOT generate events during block/transaction-disconnection.
     pending_monitor_events: Vec<MonitorEvent>,
 
-    pending_events: Vec<Event>,
+    pub(super) pending_events: Vec<Event>,
+    pub(super) is_processing_pending_events: bool,
 
     // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on
     // which to take actions once they reach enough confirmations. Each entry includes the
@@ -1088,6 +1084,42 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signe
     }
 }
 
+macro_rules! _process_events_body {
+    ($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
+        loop {
+            let (pending_events, repeated_events);
+            if let Some(us) = $self_opt {
+                let mut inner = us.inner.lock().unwrap();
+                if inner.is_processing_pending_events {
+                    break;
+                }
+                inner.is_processing_pending_events = true;
+
+                pending_events = inner.pending_events.clone();
+                repeated_events = inner.get_repeated_events();
+            } else { break; }
+            let num_events = pending_events.len();
+
+            for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
+                $event_to_handle = event;
+                $handle_event;
+            }
+
+            if let Some(us) = $self_opt {
+                let mut inner = us.inner.lock().unwrap();
+                inner.pending_events.drain(..num_events);
+                inner.is_processing_pending_events = false;
+                if !inner.pending_events.is_empty() {
+                    // If there's more events to process, go ahead and do so.
+                    continue;
+                }
+            }
+            break;
+        }
+    }
+}
+pub(super) use _process_events_body as process_events_body;
+
 impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
     /// For lockorder enforcement purposes, we need to have a single site which constructs the
     /// `inner` mutex, otherwise cases where we lock two monitors at the same time (eg in our
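This macro is the heart of the change: pending events are cloned out under the lock, handled with the lock released, and drained only after handling completes, with `is_processing_pending_events` acting as a reentrancy guard so a concurrent or reentrant caller cannot double-handle the same events. The same pattern, reduced to a self-contained sketch with illustrative names:

use std::sync::Mutex;

// Illustrative stand-ins for the monitor's inner state; not LDK types.
struct QueueInner<E> {
    pending: Vec<E>,
    processing: bool,
}

struct EventQueue<E: Clone> {
    inner: Mutex<QueueInner<E>>,
}

impl<E: Clone> EventQueue<E> {
    fn process(&self, mut handle: impl FnMut(E)) {
        loop {
            let events = {
                let mut q = self.inner.lock().unwrap();
                if q.processing { return; } // someone else is already draining
                q.processing = true;
                q.pending.clone() // copy out; originals stay queued until handled
            };
            let num_handled = events.len();
            for ev in events { handle(ev); } // lock is not held across the handler
            let mut q = self.inner.lock().unwrap();
            q.pending.drain(..num_handled); // drop only what was actually handled
            q.processing = false;
            if q.pending.is_empty() { return; } // otherwise loop for new arrivals
        }
    }
}

Because the queue is drained only after the handler returns, events still sitting in `pending_events` survive a crash mid-handling (they are serialized with the monitor); the repeated events are the deliberate exception, since losing them is harmless: they are regenerated via `rebroadcast_pending_claims`.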
@@ -1179,6 +1211,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
             payment_preimages: HashMap::new(),
             pending_monitor_events: Vec::new(),
             pending_events: Vec::new(),
+            is_processing_pending_events: false,
 
             onchain_events_awaiting_threshold_conf: Vec::new(),
             outputs_to_watch,
@@ -1306,16 +1339,41 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
         self.inner.lock().unwrap().get_and_clear_pending_monitor_events()
     }
 
-    /// Gets the list of pending events which were generated by previous actions, clearing the list
-    /// in the process.
+    /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
+    ///
+    /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
+    /// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain
+    /// within each channel. As the confirmation of a commitment transaction may be critical to the
+    /// safety of funds, we recommend invoking this every 30 seconds, or lower if running in an
+    /// environment with spotty connections, like on mobile.
     ///
-    /// This is called by the [`EventsProvider::process_pending_events`] implementation for
-    /// [`ChainMonitor`].
+    /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
+    /// order to handle these events.
+    ///
+    /// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
+    /// [`BumpTransaction`]: crate::events::Event::BumpTransaction
+    pub fn process_pending_events<H: Deref>(&self, handler: &H) where H::Target: EventHandler {
+        let mut ev;
+        process_events_body!(Some(self), ev, handler.handle_event(ev));
+    }
+
+    /// Processes any events asynchronously.
     ///
-    /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
-    /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
+    /// See [`Self::process_pending_events`] for more information.
+    pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+        &self, handler: &H
+    ) {
+        let mut ev;
+        process_events_body!(Some(self), ev, { handler(ev).await });
+    }
+
+    #[cfg(test)]
     pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
-        self.inner.lock().unwrap().get_and_clear_pending_events()
+        let mut ret = Vec::new();
+        let mut lck = self.inner.lock().unwrap();
+        mem::swap(&mut ret, &mut lck.pending_events);
+        ret.append(&mut lck.get_repeated_events());
+        ret
     }
 
     pub(crate) fn get_min_seen_secret(&self) -> u64 {
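The new documentation recommends driving this roughly every 30 seconds. A hedged sketch of such a driver loop (module paths assume the `lightning` crate around the time of this commit):

use std::time::Duration;

use lightning::chain::channelmonitor::ChannelMonitor;
use lightning::events::EventHandler;
use lightning::sign::WriteableEcdsaChannelSigner;

// Illustrative driver: poll one monitor's events on a 30-second cadence, per
// the doc comment above. Events stay queued in the monitor until `handler`
// returns for them.
fn drive_monitor_events<S: WriteableEcdsaChannelSigner, H: EventHandler>(
    monitor: &ChannelMonitor<S>, handler: &H,
) -> ! {
    loop {
        monitor.process_pending_events(&handler);
        std::thread::sleep(Duration::from_secs(30));
    }
}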
@@ -2531,10 +2589,13 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
         ret
     }
 
-    pub fn get_and_clear_pending_events(&mut self) -> Vec<Event> {
-        let mut ret = Vec::new();
-        mem::swap(&mut ret, &mut self.pending_events);
-        for (claim_id, claim_event) in self.onchain_tx_handler.get_and_clear_pending_claim_events().drain(..) {
+    /// Gets the set of events that are repeated regularly (e.g. those which RBF bump
+    /// transactions). We're okay if we lose these on restart as they'll be regenerated for us at
+    /// some regular interval via [`ChannelMonitor::rebroadcast_pending_claims`].
+    pub(super) fn get_repeated_events(&mut self) -> Vec<Event> {
+        let pending_claim_events = self.onchain_tx_handler.get_and_clear_pending_claim_events();
+        let mut ret = Vec::with_capacity(pending_claim_events.len());
+        for (claim_id, claim_event) in pending_claim_events {
             match claim_event {
                 ClaimEvent::BumpCommitment {
                     package_target_feerate_sat_per_1000_weight, commitment_tx, anchor_output_idx,
@@ -4096,6 +4157,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
             payment_preimages,
             pending_monitor_events: pending_monitor_events.unwrap(),
             pending_events,
+            is_processing_pending_events: false,
 
             onchain_events_awaiting_threshold_conf,
             outputs_to_watch,
