@@ -400,28 +400,34 @@ where
400
400
/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
401
401
/// would like to get rid of them, consider using the
402
402
/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
403
- pub struct MonitorUpdatingPersister < K : Deref , L : Deref , ES : Deref , SP : Deref >
403
+ pub struct MonitorUpdatingPersister < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
404
404
where
405
405
K :: Target : KVStore ,
406
406
L :: Target : Logger ,
407
407
ES :: Target : EntropySource + Sized ,
408
408
SP :: Target : SignerProvider + Sized ,
409
+ BI :: Target : BroadcasterInterface ,
410
+ FE :: Target : FeeEstimator
409
411
{
410
412
kv_store : K ,
411
413
logger : L ,
412
414
maximum_pending_updates : u64 ,
413
415
entropy_source : ES ,
414
416
signer_provider : SP ,
417
+ broadcaster : BI ,
418
+ fee_estimator : FE
415
419
}
416
420
417
421
#[ allow( dead_code) ]
418
- impl < K : Deref , L : Deref , ES : Deref , SP : Deref >
419
- MonitorUpdatingPersister < K , L , ES , SP >
422
+ impl < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
423
+ MonitorUpdatingPersister < K , L , ES , SP , BI , FE >
420
424
where
421
425
K :: Target : KVStore ,
422
426
L :: Target : Logger ,
423
427
ES :: Target : EntropySource + Sized ,
424
428
SP :: Target : SignerProvider + Sized ,
429
+ BI :: Target : BroadcasterInterface ,
430
+ FE :: Target : FeeEstimator
425
431
{
426
432
/// Constructs a new [`MonitorUpdatingPersister`].
427
433
///
@@ -441,14 +447,16 @@ where
441
447
/// [`MonitorUpdatingPersister::cleanup_stale_updates`].
442
448
pub fn new (
443
449
kv_store : K , logger : L , maximum_pending_updates : u64 , entropy_source : ES ,
444
- signer_provider : SP ,
450
+ signer_provider : SP , broadcaster : BI , fee_estimator : FE
445
451
) -> Self {
446
452
MonitorUpdatingPersister {
447
453
kv_store,
448
454
logger,
449
455
maximum_pending_updates,
450
456
entropy_source,
451
457
signer_provider,
458
+ broadcaster,
459
+ fee_estimator
452
460
}
453
461
}
454
462
@@ -457,24 +465,14 @@ where
457
465
/// It is extremely important that your [`KVStore::read`] implementation uses the
458
466
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
459
467
/// documentation for [`MonitorUpdatingPersister`].
460
- pub fn read_all_channel_monitors_with_updates < B : Deref , F : Deref > (
461
- & self , broadcaster : & B , fee_estimator : & F ,
462
- ) -> Result < Vec < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) > , io:: Error >
463
- where
464
- B :: Target : BroadcasterInterface ,
465
- F :: Target : FeeEstimator ,
466
- {
468
+ pub fn read_all_channel_monitors_with_updates ( & self ) -> Result < Vec < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) > , io:: Error > {
467
469
let monitor_list = self . kv_store . list (
468
470
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
469
471
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
470
472
) ?;
471
473
let mut res = Vec :: with_capacity ( monitor_list. len ( ) ) ;
472
474
for monitor_key in monitor_list {
473
- res. push ( self . read_channel_monitor_with_updates (
474
- broadcaster,
475
- fee_estimator,
476
- monitor_key,
477
- ) ?)
475
+ res. push ( self . read_channel_monitor_with_updates ( monitor_key) ?)
478
476
}
479
477
Ok ( res)
480
478
}
@@ -496,13 +494,9 @@ where
496
494
///
497
495
/// Loading a large number of monitors will be faster if done in parallel. You can use this
498
496
/// function to accomplish this. Take care to limit the number of parallel readers.
499
- pub fn read_channel_monitor_with_updates < B : Deref , F : Deref > (
500
- & self , broadcaster : & B , fee_estimator : & F , monitor_key : String ,
501
- ) -> Result < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) , io:: Error >
502
- where
503
- B :: Target : BroadcasterInterface ,
504
- F :: Target : FeeEstimator ,
505
- {
497
+ pub fn read_channel_monitor_with_updates (
498
+ & self , monitor_key : String ,
499
+ ) -> Result < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) , io:: Error > {
506
500
let monitor_name = MonitorName :: new ( monitor_key) ?;
507
501
let ( block_hash, monitor) = self . read_monitor ( & monitor_name) ?;
508
502
let mut current_update_id = monitor. get_latest_update_id ( ) ;
@@ -521,7 +515,7 @@ where
521
515
Err ( err) => return Err ( err) ,
522
516
} ;
523
517
524
- monitor. update_monitor ( & update, broadcaster, fee_estimator, & self . logger )
518
+ monitor. update_monitor ( & update, & self . broadcaster , & self . fee_estimator , & self . logger )
525
519
. map_err ( |e| {
526
520
log_error ! (
527
521
self . logger,
@@ -639,13 +633,15 @@ where
639
633
}
640
634
}
641
635
642
- impl < ChannelSigner : EcdsaChannelSigner , K : Deref , L : Deref , ES : Deref , SP : Deref >
643
- Persist < ChannelSigner > for MonitorUpdatingPersister < K , L , ES , SP >
636
+ impl < ChannelSigner : EcdsaChannelSigner , K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
637
+ Persist < ChannelSigner > for MonitorUpdatingPersister < K , L , ES , SP , BI , FE >
644
638
where
645
639
K :: Target : KVStore ,
646
640
L :: Target : Logger ,
647
641
ES :: Target : EntropySource + Sized ,
648
642
SP :: Target : SignerProvider + Sized ,
643
+ BI :: Target : BroadcasterInterface ,
644
+ FE :: Target : FeeEstimator
649
645
{
650
646
/// Persists a new channel. This means writing the entire monitor to the
651
647
/// parametrized [`KVStore`].
@@ -788,12 +784,14 @@ where
788
784
}
789
785
}
790
786
791
- impl < K : Deref , L : Deref , ES : Deref , SP : Deref > MonitorUpdatingPersister < K , L , ES , SP >
787
+ impl < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref > MonitorUpdatingPersister < K , L , ES , SP , BI , FE >
792
788
where
793
789
ES :: Target : EntropySource + Sized ,
794
790
K :: Target : KVStore ,
795
791
L :: Target : Logger ,
796
- SP :: Target : SignerProvider + Sized
792
+ SP :: Target : SignerProvider + Sized ,
793
+ BI :: Target : BroadcasterInterface ,
794
+ FE :: Target : FeeEstimator
797
795
{
798
796
// Cleans up monitor updates for given monitor in range `start..=end`.
799
797
fn cleanup_in_range ( & self , monitor_name : MonitorName , start : u64 , end : u64 ) {
@@ -962,13 +960,17 @@ mod tests {
962
960
maximum_pending_updates : persister_0_max_pending_updates,
963
961
entropy_source : & chanmon_cfgs[ 0 ] . keys_manager ,
964
962
signer_provider : & chanmon_cfgs[ 0 ] . keys_manager ,
963
+ broadcaster : & chanmon_cfgs[ 0 ] . tx_broadcaster ,
964
+ fee_estimator : & chanmon_cfgs[ 0 ] . fee_estimator ,
965
965
} ;
966
966
let persister_1 = MonitorUpdatingPersister {
967
967
kv_store : & TestStore :: new ( false ) ,
968
968
logger : & TestLogger :: new ( ) ,
969
969
maximum_pending_updates : persister_1_max_pending_updates,
970
970
entropy_source : & chanmon_cfgs[ 1 ] . keys_manager ,
971
971
signer_provider : & chanmon_cfgs[ 1 ] . keys_manager ,
972
+ broadcaster : & chanmon_cfgs[ 1 ] . tx_broadcaster ,
973
+ fee_estimator : & chanmon_cfgs[ 1 ] . fee_estimator ,
972
974
} ;
973
975
let mut node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
974
976
let chain_mon_0 = test_utils:: TestChainMonitor :: new (
@@ -991,23 +993,18 @@ mod tests {
991
993
node_cfgs[ 1 ] . chain_monitor = chain_mon_1;
992
994
let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
993
995
let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
994
- let broadcaster_0 = & chanmon_cfgs[ 2 ] . tx_broadcaster ;
995
- let broadcaster_1 = & chanmon_cfgs[ 3 ] . tx_broadcaster ;
996
996
997
997
// Check that the persisted channel data is empty before any channels are
998
998
// open.
999
- let mut persisted_chan_data_0 = persister_0. read_all_channel_monitors_with_updates (
1000
- & broadcaster_0, & & chanmon_cfgs[ 0 ] . fee_estimator ) . unwrap ( ) ;
999
+ let mut persisted_chan_data_0 = persister_0. read_all_channel_monitors_with_updates ( ) . unwrap ( ) ;
1001
1000
assert_eq ! ( persisted_chan_data_0. len( ) , 0 ) ;
1002
- let mut persisted_chan_data_1 = persister_1. read_all_channel_monitors_with_updates (
1003
- & broadcaster_1, & & chanmon_cfgs[ 1 ] . fee_estimator ) . unwrap ( ) ;
1001
+ let mut persisted_chan_data_1 = persister_1. read_all_channel_monitors_with_updates ( ) . unwrap ( ) ;
1004
1002
assert_eq ! ( persisted_chan_data_1. len( ) , 0 ) ;
1005
1003
1006
1004
// Helper to make sure the channel is on the expected update ID.
1007
1005
macro_rules! check_persisted_data {
1008
1006
( $expected_update_id: expr) => {
1009
- persisted_chan_data_0 = persister_0. read_all_channel_monitors_with_updates(
1010
- & broadcaster_0, &&chanmon_cfgs[ 0 ] . fee_estimator) . unwrap( ) ;
1007
+ persisted_chan_data_0 = persister_0. read_all_channel_monitors_with_updates( ) . unwrap( ) ;
1011
1008
// check that we stored only one monitor
1012
1009
assert_eq!( persisted_chan_data_0. len( ) , 1 ) ;
1013
1010
for ( _, mon) in persisted_chan_data_0. iter( ) {
@@ -1026,8 +1023,7 @@ mod tests {
1026
1023
) ;
1027
1024
}
1028
1025
}
1029
- persisted_chan_data_1 = persister_1. read_all_channel_monitors_with_updates(
1030
- & broadcaster_1, &&chanmon_cfgs[ 1 ] . fee_estimator) . unwrap( ) ;
1026
+ persisted_chan_data_1 = persister_1. read_all_channel_monitors_with_updates( ) . unwrap( ) ;
1031
1027
assert_eq!( persisted_chan_data_1. len( ) , 1 ) ;
1032
1028
for ( _, mon) in persisted_chan_data_1. iter( ) {
1033
1029
assert_eq!( mon. get_latest_update_id( ) , $expected_update_id) ;
@@ -1095,7 +1091,7 @@ mod tests {
1095
1091
check_persisted_data ! ( CLOSED_CHANNEL_UPDATE_ID ) ;
1096
1092
1097
1093
// Make sure the expected number of stale updates is present.
1098
- let persisted_chan_data = persister_0. read_all_channel_monitors_with_updates ( & broadcaster_0 , & & chanmon_cfgs [ 0 ] . fee_estimator ) . unwrap ( ) ;
1094
+ let persisted_chan_data = persister_0. read_all_channel_monitors_with_updates ( ) . unwrap ( ) ;
1099
1095
let ( _, monitor) = & persisted_chan_data[ 0 ] ;
1100
1096
let monitor_name = MonitorName :: from ( monitor. get_funding_txo ( ) . 0 ) ;
1101
1097
// The channel should have 0 updates, as it wrote a full monitor and consolidated.
@@ -1129,6 +1125,8 @@ mod tests {
1129
1125
maximum_pending_updates : 11 ,
1130
1126
entropy_source : node_cfgs[ 0 ] . keys_manager ,
1131
1127
signer_provider : node_cfgs[ 0 ] . keys_manager ,
1128
+ broadcaster : node_cfgs[ 0 ] . tx_broadcaster ,
1129
+ fee_estimator : node_cfgs[ 0 ] . fee_estimator ,
1132
1130
} ;
1133
1131
match ro_persister. persist_new_channel ( test_txo, & added_monitors[ 0 ] . 1 ) {
1134
1132
ChannelMonitorUpdateStatus :: UnrecoverableError => {
@@ -1168,13 +1166,17 @@ mod tests {
1168
1166
maximum_pending_updates : test_max_pending_updates,
1169
1167
entropy_source : & chanmon_cfgs[ 0 ] . keys_manager ,
1170
1168
signer_provider : & chanmon_cfgs[ 0 ] . keys_manager ,
1169
+ broadcaster : & chanmon_cfgs[ 0 ] . tx_broadcaster ,
1170
+ fee_estimator : & chanmon_cfgs[ 0 ] . fee_estimator ,
1171
1171
} ;
1172
1172
let persister_1 = MonitorUpdatingPersister {
1173
1173
kv_store : & TestStore :: new ( false ) ,
1174
1174
logger : & TestLogger :: new ( ) ,
1175
1175
maximum_pending_updates : test_max_pending_updates,
1176
1176
entropy_source : & chanmon_cfgs[ 1 ] . keys_manager ,
1177
1177
signer_provider : & chanmon_cfgs[ 1 ] . keys_manager ,
1178
+ broadcaster : & chanmon_cfgs[ 1 ] . tx_broadcaster ,
1179
+ fee_estimator : & chanmon_cfgs[ 1 ] . fee_estimator ,
1178
1180
} ;
1179
1181
let mut node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
1180
1182
let chain_mon_0 = test_utils:: TestChainMonitor :: new (
@@ -1198,11 +1200,9 @@ mod tests {
1198
1200
let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
1199
1201
let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
1200
1202
1201
- let broadcaster_0 = & chanmon_cfgs[ 2 ] . tx_broadcaster ;
1202
-
1203
1203
// Check that the persisted channel data is empty before any channels are
1204
1204
// open.
1205
- let persisted_chan_data = persister_0. read_all_channel_monitors_with_updates ( & broadcaster_0 , & & chanmon_cfgs [ 0 ] . fee_estimator ) . unwrap ( ) ;
1205
+ let persisted_chan_data = persister_0. read_all_channel_monitors_with_updates ( ) . unwrap ( ) ;
1206
1206
assert_eq ! ( persisted_chan_data. len( ) , 0 ) ;
1207
1207
1208
1208
// Create some initial channel
@@ -1213,7 +1213,7 @@ mod tests {
1213
1213
send_payment ( & nodes[ 1 ] , & vec ! [ & nodes[ 0 ] ] [ ..] , 4_000_000 ) ;
1214
1214
1215
1215
// Get the monitor and make a fake stale update at update_id=1 (lowest height of an update possible)
1216
- let persisted_chan_data = persister_0. read_all_channel_monitors_with_updates ( & broadcaster_0 , & & chanmon_cfgs [ 0 ] . fee_estimator ) . unwrap ( ) ;
1216
+ let persisted_chan_data = persister_0. read_all_channel_monitors_with_updates ( ) . unwrap ( ) ;
1217
1217
let ( _, monitor) = & persisted_chan_data[ 0 ] ;
1218
1218
let monitor_name = MonitorName :: from ( monitor. get_funding_txo ( ) . 0 ) ;
1219
1219
persister_0
0 commit comments