@@ -763,7 +763,7 @@ pub async fn process_events_async<
763
763
G ,
764
764
L ,
765
765
P ,
766
- EH ,
766
+ EventHandler ,
767
767
PS ,
768
768
M ,
769
769
CM ,
@@ -818,31 +818,41 @@ where
818
818
event_handler ( event) . await
819
819
} )
820
820
} ;
821
+ // We should extract these out of config because the macro expects individual arguments
822
+ let persister = config. persister ;
823
+ let chain_monitor = config. chain_monitor ;
824
+ let channel_manager = config. channel_manager ;
825
+ let onion_messenger = config. onion_messenger ;
826
+ let peer_manager = config. peer_manager ;
827
+ let gossip_sync = config. gossip_sync ;
828
+ let logger = config. logger ;
829
+ let scorer = config. scorer ;
830
+
821
831
define_run_body ! (
822
- config . persister,
823
- config . chain_monitor,
824
- config . chain_monitor. process_pending_events_async( async_event_handler) . await ,
825
- config . channel_manager,
826
- config . channel_manager. get_cm( ) . process_pending_events_async( async_event_handler) . await ,
827
- config . onion_messenger,
828
- if let Some ( om) = & config . onion_messenger {
832
+ persister,
833
+ chain_monitor,
834
+ chain_monitor. process_pending_events_async( async_event_handler) . await ,
835
+ channel_manager,
836
+ channel_manager. get_cm( ) . process_pending_events_async( async_event_handler) . await ,
837
+ onion_messenger,
838
+ if let Some ( om) = & onion_messenger {
829
839
om. get_om( ) . process_pending_events_async( async_event_handler) . await
830
840
} ,
831
- config . peer_manager,
832
- config . gossip_sync,
833
- config . logger,
834
- config . scorer,
841
+ peer_manager,
842
+ gossip_sync,
843
+ logger,
844
+ scorer,
835
845
should_break,
836
846
{
837
- let om_fut = if let Some ( om) = config . onion_messenger. as_ref( ) {
847
+ let om_fut = if let Some ( om) = onion_messenger. as_ref( ) {
838
848
let fut = om. get_om( ) . get_update_future( ) ;
839
849
OptionalSelector { optional_future: Some ( fut) }
840
850
} else {
841
851
OptionalSelector { optional_future: None }
842
852
} ;
843
853
let fut = Selector {
844
- a: config . channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
845
- b: config . chain_monitor. get_update_future( ) ,
854
+ a: channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
855
+ b: chain_monitor. get_update_future( ) ,
846
856
c: om_fut,
847
857
d: sleeper( if mobile_interruptable_platform {
848
858
Duration :: from_millis( 100 )
@@ -1036,18 +1046,18 @@ impl BackgroundProcessor {
1036
1046
/// # Example
1037
1047
/// ```
1038
1048
/// # use lightning_background_processor::*;
1039
- /// let config = BackgroundProcessorConfigBuilder::new(
1049
+ /// let mut builder = BackgroundProcessorConfigBuilder::new(
1040
1050
/// persister,
1041
1051
/// event_handler,
1042
1052
/// chain_monitor,
1043
1053
/// channel_manager,
1044
1054
/// gossip_sync,
1045
1055
/// peer_manager,
1046
1056
/// logger
1047
- /// )
1048
- /// .with_onion_messenger(messenger)
1049
- /// .with_scorer(scorer)
1050
- /// .build();
1057
+ /// );
1058
+ /// builder .with_onion_messenger(messenger);
1059
+ /// .with_scorer(scorer);
1060
+ /// let config = builder .build();
1051
1061
/// let bg_processor = BackgroundProcessor::from_config(config);
1052
1062
/// ```
1053
1063
pub fn from_config <
@@ -1186,18 +1196,18 @@ impl BackgroundProcessor {
1186
1196
/// # Example
1187
1197
/// ```
1188
1198
/// # use lightning_background_processor::*;
1189
- /// let config = BackgroundProcessorConfigBuilder::new(
1199
+ /// let mut builder = BackgroundProcessorConfigBuilder::new(
1190
1200
/// persister,
1191
1201
/// event_handler,
1192
1202
/// chain_monitor,
1193
1203
/// channel_manager,
1194
1204
/// gossip_sync,
1195
1205
/// peer_manager,
1196
1206
/// logger
1197
- /// )
1198
- /// .with_onion_messenger(messenger) // Optional
1199
- /// .with_scorer(scorer) // Optional
1200
- /// .build();
1207
+ /// );
1208
+ /// builder .with_onion_messenger(messenger); // Optional
1209
+ /// .with_scorer(scorer); // Optional
1210
+ /// let config = builder .build();
1201
1211
///
1202
1212
/// // Use with BackgroundProcessor
1203
1213
/// let processor = BackgroundProcessor::from_config(config);
@@ -1209,7 +1219,7 @@ impl BackgroundProcessor {
1209
1219
/// process_events_async(config, sleeper, mobile_interruptable_platform, fetch_time).await?;"
1210
1220
) ]
1211
1221
/// ```
1212
- #[ cfg( feature = "std" ) ]
1222
+ #[ cfg( any ( feature = "std" , feature = "futures" ) ) ]
1213
1223
pub struct BackgroundProcessorConfig <
1214
1224
' a ,
1215
1225
UL : ' static + Deref + Send + Sync ,
@@ -1219,7 +1229,8 @@ pub struct BackgroundProcessorConfig<
1219
1229
G : ' static + Deref < Target = NetworkGraph < L > > + Send + Sync ,
1220
1230
L : ' static + Deref + Send + Sync ,
1221
1231
P : ' static + Deref + Send + Sync ,
1222
- EH : ' static + EventHandler + Send ,
1232
+ #[ cfg( feature = "std" ) ] EH : ' static + EventHandler + Send ,
1233
+ #[ cfg( feature = "futures" ) ] EH : ' static + Fn ( Event ) -> core:: future:: Future < Output = Result < ( ) , ReplayEvent > > ,
1223
1234
PS : ' static + Deref + Send ,
1224
1235
M : ' static
1225
1236
+ Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P > >
@@ -1346,7 +1357,7 @@ where
1346
1357
PM :: Target : APeerManager + Send + Sync ,
1347
1358
{
1348
1359
/// Creates a new builder instance.
1349
- pub ( crate ) fn new (
1360
+ pub fn new (
1350
1361
persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
1351
1362
gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM , logger : L ,
1352
1363
) -> Self {
@@ -1405,7 +1416,9 @@ impl Drop for BackgroundProcessor {
1405
1416
1406
1417
#[ cfg( all( feature = "std" , test) ) ]
1407
1418
mod tests {
1408
- use super :: { BackgroundProcessor , GossipSync , FRESHNESS_TIMER } ;
1419
+ use super :: {
1420
+ BackgroundProcessor , BackgroundProcessorConfigBuilder , GossipSync , FRESHNESS_TIMER ,
1421
+ } ;
1409
1422
use bitcoin:: constants:: { genesis_block, ChainHash } ;
1410
1423
use bitcoin:: hashes:: Hash ;
1411
1424
use bitcoin:: locktime:: absolute:: LockTime ;
@@ -2338,18 +2351,19 @@ mod tests {
2338
2351
Persister :: new ( data_dir) . with_manager_error ( std:: io:: ErrorKind :: Other , "test" ) ,
2339
2352
) ;
2340
2353
2341
- let config = BackgroundProcessorConfigBuilder :: new (
2354
+ let mut builder = BackgroundProcessorConfigBuilder :: new (
2342
2355
persister,
2343
2356
|_: _ | async { Ok ( ( ) ) } ,
2344
2357
nodes[ 0 ] . chain_monitor . clone ( ) ,
2345
2358
nodes[ 0 ] . node . clone ( ) ,
2346
2359
nodes[ 0 ] . rapid_gossip_sync ( ) ,
2347
2360
nodes[ 0 ] . peer_manager . clone ( ) ,
2348
2361
nodes[ 0 ] . logger . clone ( ) ,
2349
- )
2350
- . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
2351
- . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
2352
- . build ( ) ;
2362
+ ) ;
2363
+ builder
2364
+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
2365
+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) ) ;
2366
+ let config = builder. build ( ) ;
2353
2367
2354
2368
let bp_future = super :: process_events_async (
2355
2369
config,
@@ -2822,18 +2836,19 @@ mod tests {
2822
2836
let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2823
2837
let persister = Arc :: new ( Persister :: new ( data_dir) . with_graph_persistence_notifier ( sender) ) ;
2824
2838
2825
- let config = BackgroundProcessorConfigBuilder :: new (
2839
+ let mut builder = BackgroundProcessorConfigBuilder :: new (
2826
2840
persister,
2827
2841
|_: _ | async { Ok ( ( ) ) } ,
2828
2842
nodes[ 0 ] . chain_monitor . clone ( ) ,
2829
2843
nodes[ 0 ] . node . clone ( ) ,
2830
2844
nodes[ 0 ] . rapid_gossip_sync ( ) ,
2831
2845
nodes[ 0 ] . peer_manager . clone ( ) ,
2832
2846
nodes[ 0 ] . logger . clone ( ) ,
2833
- )
2834
- . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
2835
- . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
2836
- . build ( ) ;
2847
+ ) ;
2848
+ builder
2849
+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
2850
+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) ) ;
2851
+ let config = builder. build ( ) ;
2837
2852
2838
2853
let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
2839
2854
let bp_future = super :: process_events_async (
@@ -3040,18 +3055,19 @@ mod tests {
3040
3055
3041
3056
let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
3042
3057
3043
- let config = BackgroundProcessorConfigBuilder :: new (
3058
+ let mut builder = BackgroundProcessorConfigBuilder :: new (
3044
3059
persister,
3045
3060
event_handler,
3046
3061
nodes[ 0 ] . chain_monitor . clone ( ) ,
3047
3062
nodes[ 0 ] . node . clone ( ) ,
3048
3063
nodes[ 0 ] . no_gossip_sync ( ) ,
3049
3064
nodes[ 0 ] . peer_manager . clone ( ) ,
3050
3065
nodes[ 0 ] . logger . clone ( ) ,
3051
- )
3052
- . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
3053
- . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
3054
- . build ( ) ;
3066
+ ) ;
3067
+ builder
3068
+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
3069
+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) ) ;
3070
+ let config = builder. build ( ) ;
3055
3071
3056
3072
let bp_future = super :: process_events_async (
3057
3073
config,
@@ -3086,11 +3102,11 @@ mod tests {
3086
3102
}
3087
3103
3088
3104
#[ test]
3089
- fn test_background_processor_builder ( ) {
3105
+ fn test_background_processor_config_builder ( ) {
3090
3106
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
3091
3107
// updates. Also test that when new updates are available, the manager signals that it needs
3092
3108
// re-persistence and is successfully re-persisted.
3093
- let ( persist_dir, nodes) = create_nodes ( 2 , "test_background_processor_builder " ) ;
3109
+ let ( persist_dir, nodes) = create_nodes ( 2 , "test_background_processor_config_builder " ) ;
3094
3110
3095
3111
// Go through the channel creation process so that each node has something to persist. Since
3096
3112
// open_channel consumes events, it must complete before starting BackgroundProcessor to
@@ -3101,18 +3117,19 @@ mod tests {
3101
3117
let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
3102
3118
let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
3103
3119
let event_handler = |_: _ | Ok ( ( ) ) ;
3104
- let config = BackgroundProcessorConfigBuilder :: new (
3120
+ let mut builder = BackgroundProcessorConfigBuilder :: new (
3105
3121
persister,
3106
3122
event_handler,
3107
3123
nodes[ 0 ] . chain_monitor . clone ( ) ,
3108
3124
nodes[ 0 ] . node . clone ( ) ,
3109
3125
nodes[ 0 ] . p2p_gossip_sync ( ) ,
3110
3126
nodes[ 0 ] . peer_manager . clone ( ) ,
3111
3127
nodes[ 0 ] . logger . clone ( ) ,
3112
- )
3113
- . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
3114
- . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
3115
- . build ( ) ;
3128
+ ) ;
3129
+ builder
3130
+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
3131
+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) ) ;
3132
+ let config = builder. build ( ) ;
3116
3133
3117
3134
let bg_processor = BackgroundProcessor :: from_config ( config) ;
3118
3135
@@ -3164,7 +3181,7 @@ mod tests {
3164
3181
. unwrap ( ) ;
3165
3182
3166
3183
// Check that the force-close updates are persisted
3167
- check_persisted_data ! ( nodes[ 0 ] . node, manager_path . clone( ) ) ;
3184
+ check_persisted_data ! ( nodes[ 0 ] . node, filepath . clone( ) ) ;
3168
3185
loop {
3169
3186
if !nodes[ 0 ] . node . get_event_or_persist_condvar_value ( ) {
3170
3187
break ;
@@ -3174,7 +3191,7 @@ mod tests {
3174
3191
// Check network graph is persisted
3175
3192
let filepath =
3176
3193
get_full_filepath ( format ! ( "{}_persister_0" , & persist_dir) , "network_graph" . to_string ( ) ) ;
3177
- check_persisted_data ! ( nodes[ 0 ] . network_graph, filepath) ;
3194
+ check_persisted_data ! ( nodes[ 0 ] . network_graph, filepath. clone ( ) ) ;
3178
3195
3179
3196
// Check scorer is persisted
3180
3197
let filepath =
0 commit comments