Skip to content

Commit a978076

Browse files
authored
Merge pull request #3276 from arik-so/arik/2024/08/apply_monitor_updates_on_archive
Process updates before archiving monitors.
2 parents b706480 + 47c8aa5 commit a978076

File tree

1 file changed

+47
-46
lines changed

1 file changed

+47
-46
lines changed

lightning/src/util/persist.rs

Lines changed: 47 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -400,28 +400,34 @@ where
400400
/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
401401
/// would like to get rid of them, consider using the
402402
/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
403-
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref>
403+
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
404404
where
405405
K::Target: KVStore,
406406
L::Target: Logger,
407407
ES::Target: EntropySource + Sized,
408408
SP::Target: SignerProvider + Sized,
409+
BI::Target: BroadcasterInterface,
410+
FE::Target: FeeEstimator
409411
{
410412
kv_store: K,
411413
logger: L,
412414
maximum_pending_updates: u64,
413415
entropy_source: ES,
414416
signer_provider: SP,
417+
broadcaster: BI,
418+
fee_estimator: FE
415419
}
416420

417421
#[allow(dead_code)]
418-
impl<K: Deref, L: Deref, ES: Deref, SP: Deref>
419-
MonitorUpdatingPersister<K, L, ES, SP>
422+
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
423+
MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
420424
where
421425
K::Target: KVStore,
422426
L::Target: Logger,
423427
ES::Target: EntropySource + Sized,
424428
SP::Target: SignerProvider + Sized,
429+
BI::Target: BroadcasterInterface,
430+
FE::Target: FeeEstimator
425431
{
426432
/// Constructs a new [`MonitorUpdatingPersister`].
427433
///
@@ -441,14 +447,16 @@ where
441447
/// [`MonitorUpdatingPersister::cleanup_stale_updates`].
442448
pub fn new(
443449
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
444-
signer_provider: SP,
450+
signer_provider: SP, broadcaster: BI, fee_estimator: FE
445451
) -> Self {
446452
MonitorUpdatingPersister {
447453
kv_store,
448454
logger,
449455
maximum_pending_updates,
450456
entropy_source,
451457
signer_provider,
458+
broadcaster,
459+
fee_estimator
452460
}
453461
}
454462

@@ -457,24 +465,14 @@ where
457465
/// It is extremely important that your [`KVStore::read`] implementation uses the
458466
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
459467
/// documentation for [`MonitorUpdatingPersister`].
460-
pub fn read_all_channel_monitors_with_updates<B: Deref, F: Deref>(
461-
&self, broadcaster: &B, fee_estimator: &F,
462-
) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
463-
where
464-
B::Target: BroadcasterInterface,
465-
F::Target: FeeEstimator,
466-
{
468+
pub fn read_all_channel_monitors_with_updates(&self) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error> {
467469
let monitor_list = self.kv_store.list(
468470
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
469471
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
470472
)?;
471473
let mut res = Vec::with_capacity(monitor_list.len());
472474
for monitor_key in monitor_list {
473-
res.push(self.read_channel_monitor_with_updates(
474-
broadcaster,
475-
fee_estimator,
476-
monitor_key,
477-
)?)
475+
res.push(self.read_channel_monitor_with_updates(monitor_key)?)
478476
}
479477
Ok(res)
480478
}
@@ -496,13 +494,9 @@ where
496494
///
497495
/// Loading a large number of monitors will be faster if done in parallel. You can use this
498496
/// function to accomplish this. Take care to limit the number of parallel readers.
499-
pub fn read_channel_monitor_with_updates<B: Deref, F: Deref>(
500-
&self, broadcaster: &B, fee_estimator: &F, monitor_key: String,
501-
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
502-
where
503-
B::Target: BroadcasterInterface,
504-
F::Target: FeeEstimator,
505-
{
497+
pub fn read_channel_monitor_with_updates(
498+
&self, monitor_key: String,
499+
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> {
506500
let monitor_name = MonitorName::new(monitor_key)?;
507501
let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
508502
let mut current_update_id = monitor.get_latest_update_id();
@@ -521,7 +515,7 @@ where
521515
Err(err) => return Err(err),
522516
};
523517

524-
monitor.update_monitor(&update, broadcaster, fee_estimator, &self.logger)
518+
monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
525519
.map_err(|e| {
526520
log_error!(
527521
self.logger,
@@ -639,13 +633,15 @@ where
639633
}
640634
}
641635

642-
impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref>
643-
Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP>
636+
impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
637+
Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
644638
where
645639
K::Target: KVStore,
646640
L::Target: Logger,
647641
ES::Target: EntropySource + Sized,
648642
SP::Target: SignerProvider + Sized,
643+
BI::Target: BroadcasterInterface,
644+
FE::Target: FeeEstimator
649645
{
650646
/// Persists a new channel. This means writing the entire monitor to the
651647
/// parametrized [`KVStore`].
@@ -766,17 +762,18 @@ where
766762

767763
fn archive_persisted_channel(&self, funding_txo: OutPoint) {
768764
let monitor_name = MonitorName::from(funding_txo);
769-
let monitor = match self.read_monitor(&monitor_name) {
765+
let monitor_key = monitor_name.as_str().to_string();
766+
let monitor = match self.read_channel_monitor_with_updates(monitor_key) {
770767
Ok((_block_hash, monitor)) => monitor,
771768
Err(_) => return
772769
};
773770
match self.kv_store.write(
774771
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
775772
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
776773
monitor_name.as_str(),
777-
&monitor.encode()
774+
&monitor.encode(),
778775
) {
779-
Ok(()) => {},
776+
Ok(()) => {}
780777
Err(_e) => return,
781778
};
782779
let _ = self.kv_store.remove(
@@ -788,12 +785,14 @@ where
788785
}
789786
}
790787

791-
impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
788+
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
792789
where
793790
ES::Target: EntropySource + Sized,
794791
K::Target: KVStore,
795792
L::Target: Logger,
796-
SP::Target: SignerProvider + Sized
793+
SP::Target: SignerProvider + Sized,
794+
BI::Target: BroadcasterInterface,
795+
FE::Target: FeeEstimator
797796
{
798797
// Cleans up monitor updates for given monitor in range `start..=end`.
799798
fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
@@ -962,13 +961,17 @@ mod tests {
962961
maximum_pending_updates: persister_0_max_pending_updates,
963962
entropy_source: &chanmon_cfgs[0].keys_manager,
964963
signer_provider: &chanmon_cfgs[0].keys_manager,
964+
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
965+
fee_estimator: &chanmon_cfgs[0].fee_estimator,
965966
};
966967
let persister_1 = MonitorUpdatingPersister {
967968
kv_store: &TestStore::new(false),
968969
logger: &TestLogger::new(),
969970
maximum_pending_updates: persister_1_max_pending_updates,
970971
entropy_source: &chanmon_cfgs[1].keys_manager,
971972
signer_provider: &chanmon_cfgs[1].keys_manager,
973+
broadcaster: &chanmon_cfgs[1].tx_broadcaster,
974+
fee_estimator: &chanmon_cfgs[1].fee_estimator,
972975
};
973976
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
974977
let chain_mon_0 = test_utils::TestChainMonitor::new(
@@ -991,23 +994,18 @@ mod tests {
991994
node_cfgs[1].chain_monitor = chain_mon_1;
992995
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
993996
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
994-
let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
995-
let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;
996997

997998
// Check that the persisted channel data is empty before any channels are
998999
// open.
999-
let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
1000-
&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
1000+
let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
10011001
assert_eq!(persisted_chan_data_0.len(), 0);
1002-
let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
1003-
&broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap();
1002+
let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
10041003
assert_eq!(persisted_chan_data_1.len(), 0);
10051004

10061005
// Helper to make sure the channel is on the expected update ID.
10071006
macro_rules! check_persisted_data {
10081007
($expected_update_id: expr) => {
1009-
persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
1010-
&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
1008+
persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
10111009
// check that we stored only one monitor
10121010
assert_eq!(persisted_chan_data_0.len(), 1);
10131011
for (_, mon) in persisted_chan_data_0.iter() {
@@ -1026,8 +1024,7 @@ mod tests {
10261024
);
10271025
}
10281026
}
1029-
persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
1030-
&broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap();
1027+
persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
10311028
assert_eq!(persisted_chan_data_1.len(), 1);
10321029
for (_, mon) in persisted_chan_data_1.iter() {
10331030
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
@@ -1095,7 +1092,7 @@ mod tests {
10951092
check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID);
10961093

10971094
// Make sure the expected number of stale updates is present.
1098-
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
1095+
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
10991096
let (_, monitor) = &persisted_chan_data[0];
11001097
let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
11011098
// The channel should have 0 updates, as it wrote a full monitor and consolidated.
@@ -1129,6 +1126,8 @@ mod tests {
11291126
maximum_pending_updates: 11,
11301127
entropy_source: node_cfgs[0].keys_manager,
11311128
signer_provider: node_cfgs[0].keys_manager,
1129+
broadcaster: node_cfgs[0].tx_broadcaster,
1130+
fee_estimator: node_cfgs[0].fee_estimator,
11321131
};
11331132
match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) {
11341133
ChannelMonitorUpdateStatus::UnrecoverableError => {
@@ -1168,13 +1167,17 @@ mod tests {
11681167
maximum_pending_updates: test_max_pending_updates,
11691168
entropy_source: &chanmon_cfgs[0].keys_manager,
11701169
signer_provider: &chanmon_cfgs[0].keys_manager,
1170+
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
1171+
fee_estimator: &chanmon_cfgs[0].fee_estimator,
11711172
};
11721173
let persister_1 = MonitorUpdatingPersister {
11731174
kv_store: &TestStore::new(false),
11741175
logger: &TestLogger::new(),
11751176
maximum_pending_updates: test_max_pending_updates,
11761177
entropy_source: &chanmon_cfgs[1].keys_manager,
11771178
signer_provider: &chanmon_cfgs[1].keys_manager,
1179+
broadcaster: &chanmon_cfgs[1].tx_broadcaster,
1180+
fee_estimator: &chanmon_cfgs[1].fee_estimator,
11781181
};
11791182
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
11801183
let chain_mon_0 = test_utils::TestChainMonitor::new(
@@ -1198,11 +1201,9 @@ mod tests {
11981201
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
11991202
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
12001203

1201-
let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
1202-
12031204
// Check that the persisted channel data is empty before any channels are
12041205
// open.
1205-
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
1206+
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
12061207
assert_eq!(persisted_chan_data.len(), 0);
12071208

12081209
// Create some initial channel
@@ -1213,7 +1214,7 @@ mod tests {
12131214
send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
12141215

12151216
// Get the monitor and make a fake stale update at update_id=1 (lowest height of an update possible)
1216-
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
1217+
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
12171218
let (_, monitor) = &persisted_chan_data[0];
12181219
let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
12191220
persister_0

0 commit comments

Comments
 (0)