@@ -192,49 +192,22 @@ where
192192 }
193193}
194194
195- /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
196- struct DecoratingEventHandler <
197- ' a ,
198- E : EventHandler ,
199- PGS : Deref < Target = P2PGossipSync < G , A , L > > ,
200- RGS : Deref < Target = RapidGossipSync < G , L > > ,
201- G : Deref < Target = NetworkGraph < L > > ,
202- A : Deref ,
203- L : Deref ,
204- >
205- where A :: Target : chain:: Access , L :: Target : Logger {
206- event_handler : E ,
207- gossip_sync : & ' a GossipSync < PGS , RGS , G , A , L > ,
208- }
209-
210- impl <
211- ' a ,
212- E : EventHandler ,
213- PGS : Deref < Target = P2PGossipSync < G , A , L > > ,
214- RGS : Deref < Target = RapidGossipSync < G , L > > ,
215- G : Deref < Target = NetworkGraph < L > > ,
216- A : Deref ,
217- L : Deref ,
218- > EventHandler for DecoratingEventHandler < ' a , E , PGS , RGS , G , A , L >
219- where A :: Target : chain:: Access , L :: Target : Logger {
220- fn handle_event ( & self , event : & Event ) {
221- if let Some ( network_graph) = self . gossip_sync . network_graph ( ) {
222- network_graph. handle_event ( event) ;
195+ fn handle_network_graph_update < L : Deref > (
196+ network_graph : & NetworkGraph < L > , event : & Event
197+ ) where L :: Target : Logger {
198+ if let Event :: PaymentPathFailed { ref network_update, .. } = event {
199+ if let Some ( network_update) = network_update {
200+ network_graph. handle_network_update ( & network_update) ;
223201 }
224- self . event_handler . handle_event ( event) ;
225202 }
226203}
227204
228205macro_rules! define_run_body {
229- ( $persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
206+ ( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
207+ $channel_manager: ident, $process_channel_manager_events: expr,
230208 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
231209 $loop_exit_check: expr, $await: expr)
232210 => { {
233- let event_handler = DecoratingEventHandler {
234- event_handler: $event_handler,
235- gossip_sync: & $gossip_sync,
236- } ;
237-
238211 log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
239212 $channel_manager. timer_tick_occurred( ) ;
240213
@@ -245,8 +218,8 @@ macro_rules! define_run_body {
245218 let mut have_pruned = false ;
246219
247220 loop {
248- $channel_manager . process_pending_events ( & event_handler ) ;
249- $chain_monitor . process_pending_events ( & event_handler ) ;
221+ $process_channel_manager_events ;
222+ $process_chain_monitor_events ;
250223
251224 // Note that the PeerManager::process_events may block on ChannelManager's locks,
252225 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -379,7 +352,8 @@ pub async fn process_events_async<
379352 CMH : ' static + Deref + Send + Sync ,
380353 RMH : ' static + Deref + Send + Sync ,
381354 OMH : ' static + Deref + Send + Sync ,
382- EH : ' static + EventHandler + Send ,
355+ EventHandlerFuture : core:: future:: Future < Output = ( ) > ,
356+ EventHandler : Fn ( Event ) -> EventHandlerFuture ,
383357 PS : ' static + Deref + Send ,
384358 M : ' static + Deref < Target = ChainMonitor < Signer , CF , T , F , L , P > > + Send + Sync ,
385359 CM : ' static + Deref < Target = ChannelManager < CW , T , K , F , L > > + Send + Sync ,
@@ -392,7 +366,7 @@ pub async fn process_events_async<
392366 SleepFuture : core:: future:: Future < Output = bool > ,
393367 Sleeper : Fn ( Duration ) -> SleepFuture
394368> (
395- persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
369+ persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
396370 gossip_sync : GossipSync < PGS , RGS , G , CA , L > , peer_manager : PM , logger : L , scorer : Option < S > ,
397371 sleeper : Sleeper ,
398372) -> Result < ( ) , std:: io:: Error >
@@ -412,7 +386,19 @@ where
412386 PS :: Target : ' static + Persister < ' a , Signer , CW , T , K , F , L , SC > ,
413387{
414388 let mut should_break = true ;
415- define_run_body ! ( persister, event_handler, chain_monitor, channel_manager,
389+ let async_event_handler = |event| {
390+ let network_graph = gossip_sync. network_graph ( ) ;
391+ let event_handler = & event_handler;
392+ async move {
393+ if let Some ( network_graph) = network_graph {
394+ handle_network_graph_update ( network_graph, & event)
395+ }
396+ event_handler ( event) . await ;
397+ }
398+ } ;
399+ define_run_body ! ( persister,
400+ chain_monitor, chain_monitor. process_pending_events_async( async_event_handler) . await ,
401+ channel_manager, channel_manager. process_pending_events_async( async_event_handler) . await ,
416402 gossip_sync, peer_manager, logger, scorer, should_break, {
417403 select_biased! {
418404 _ = channel_manager. get_persistable_update_future( ) . fuse( ) => true ,
@@ -517,7 +503,15 @@ impl BackgroundProcessor {
517503 let stop_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
518504 let stop_thread_clone = stop_thread. clone ( ) ;
519505 let handle = thread:: spawn ( move || -> Result < ( ) , std:: io:: Error > {
520- define_run_body ! ( persister, event_handler, chain_monitor, channel_manager,
506+ let event_handler = |event| {
507+ let network_graph = gossip_sync. network_graph ( ) ;
508+ if let Some ( network_graph) = network_graph {
509+ handle_network_graph_update ( network_graph, & event)
510+ }
511+ event_handler. handle_event ( event) ;
512+ } ;
513+ define_run_body ! ( persister, chain_monitor, chain_monitor. process_pending_events( & event_handler) ,
514+ channel_manager, channel_manager. process_pending_events( & event_handler) ,
521515 gossip_sync, peer_manager, logger, scorer, stop_thread. load( Ordering :: Acquire ) ,
522516 channel_manager. await_persistable_update_timeout( Duration :: from_millis( 100 ) ) )
523517 } ) ;
@@ -769,7 +763,7 @@ mod tests {
769763 begin_open_channel!( $node_a, $node_b, $channel_value) ;
770764 let events = $node_a. node. get_and_clear_pending_events( ) ;
771765 assert_eq!( events. len( ) , 1 ) ;
772- let ( temporary_channel_id, tx) = handle_funding_generation_ready!( & events[ 0 ] , $channel_value) ;
766+ let ( temporary_channel_id, tx) = handle_funding_generation_ready!( events[ 0 ] , $channel_value) ;
773767 end_open_channel!( $node_a, $node_b, temporary_channel_id, tx) ;
774768 tx
775769 } }
@@ -786,7 +780,7 @@ mod tests {
786780 macro_rules! handle_funding_generation_ready {
787781 ( $event: expr, $channel_value: expr) => { {
788782 match $event {
789- & Event :: FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
783+ Event :: FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
790784 assert_eq!( channel_value_satoshis, $channel_value) ;
791785 assert_eq!( user_channel_id, 42 ) ;
792786
@@ -847,7 +841,7 @@ mod tests {
847841 // Initiate the background processors to watch each node.
848842 let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
849843 let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
850- let event_handler = |_: & _ | { } ;
844+ let event_handler = |_: _ | { } ;
851845 let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . p2p_gossip_sync ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) ) ) ;
852846
853847 macro_rules! check_persisted_data {
@@ -909,7 +903,7 @@ mod tests {
909903 let nodes = create_nodes ( 1 , "test_timer_tick_called" . to_string ( ) ) ;
910904 let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
911905 let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
912- let event_handler = |_: & _ | { } ;
906+ let event_handler = |_: _ | { } ;
913907 let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . no_gossip_sync ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) ) ) ;
914908 loop {
915909 let log_entries = nodes[ 0 ] . logger . lines . lock ( ) . unwrap ( ) ;
@@ -932,7 +926,7 @@ mod tests {
932926
933927 let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
934928 let persister = Arc :: new ( Persister :: new ( data_dir) . with_manager_error ( std:: io:: ErrorKind :: Other , "test" ) ) ;
935- let event_handler = |_: & _ | { } ;
929+ let event_handler = |_: _ | { } ;
936930 let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . no_gossip_sync ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) ) ) ;
937931 match bg_processor. join ( ) {
938932 Ok ( _) => panic ! ( "Expected error persisting manager" ) ,
@@ -949,7 +943,7 @@ mod tests {
949943 let nodes = create_nodes ( 2 , "test_persist_network_graph_error" . to_string ( ) ) ;
950944 let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
951945 let persister = Arc :: new ( Persister :: new ( data_dir) . with_graph_error ( std:: io:: ErrorKind :: Other , "test" ) ) ;
952- let event_handler = |_: & _ | { } ;
946+ let event_handler = |_: _ | { } ;
953947 let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . p2p_gossip_sync ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) ) ) ;
954948
955949 match bg_processor. stop ( ) {
@@ -967,7 +961,7 @@ mod tests {
967961 let nodes = create_nodes ( 2 , "test_persist_scorer_error" . to_string ( ) ) ;
968962 let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
969963 let persister = Arc :: new ( Persister :: new ( data_dir) . with_scorer_error ( std:: io:: ErrorKind :: Other , "test" ) ) ;
970- let event_handler = |_: & _ | { } ;
964+ let event_handler = |_: _ | { } ;
971965 let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . no_gossip_sync ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) ) ) ;
972966
973967 match bg_processor. stop ( ) {
@@ -988,7 +982,7 @@ mod tests {
988982
989983 // Set up a background event handler for FundingGenerationReady events.
990984 let ( sender, receiver) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
991- let event_handler = move |event : & Event | match event {
985+ let event_handler = move |event : Event | match event {
992986 Event :: FundingGenerationReady { .. } => sender. send ( handle_funding_generation_ready ! ( event, channel_value) ) . unwrap ( ) ,
993987 Event :: ChannelReady { .. } => { } ,
994988 _ => panic ! ( "Unexpected event: {:?}" , event) ,
@@ -1017,7 +1011,7 @@ mod tests {
10171011
10181012 // Set up a background event handler for SpendableOutputs events.
10191013 let ( sender, receiver) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
1020- let event_handler = move |event : & Event | match event {
1014+ let event_handler = move |event : Event | match event {
10211015 Event :: SpendableOutputs { .. } => sender. send ( event. clone ( ) ) . unwrap ( ) ,
10221016 Event :: ChannelReady { .. } => { } ,
10231017 Event :: ChannelClosed { .. } => { } ,
@@ -1047,7 +1041,7 @@ mod tests {
10471041 let nodes = create_nodes ( 2 , "test_scorer_persistence" . to_string ( ) ) ;
10481042 let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
10491043 let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
1050- let event_handler = |_: & _ | { } ;
1044+ let event_handler = |_: _ | { } ;
10511045 let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . no_gossip_sync ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) ) ) ;
10521046
10531047 loop {
@@ -1075,7 +1069,7 @@ mod tests {
10751069 assert ! ( original_graph_description. contains( "42: features: 0000, node_one:" ) ) ;
10761070 assert_eq ! ( network_graph. read_only( ) . channels( ) . len( ) , 1 ) ;
10771071
1078- let event_handler = |_: & _ | { } ;
1072+ let event_handler = |_: _ | { } ;
10791073 let background_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . rapid_gossip_sync ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) ) ) ;
10801074
10811075 loop {
@@ -1128,7 +1122,7 @@ mod tests {
11281122 let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
11291123 let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
11301124 let router = DefaultRouter :: new ( Arc :: clone ( & nodes[ 0 ] . network_graph ) , Arc :: clone ( & nodes[ 0 ] . logger ) , random_seed_bytes, Arc :: clone ( & nodes[ 0 ] . scorer ) ) ;
1131- let invoice_payer = Arc :: new ( InvoicePayer :: new ( Arc :: clone ( & nodes[ 0 ] . node ) , router, Arc :: clone ( & nodes[ 0 ] . logger ) , |_: & _ | { } , Retry :: Attempts ( 2 ) ) ) ;
1125+ let invoice_payer = Arc :: new ( InvoicePayer :: new ( Arc :: clone ( & nodes[ 0 ] . node ) , router, Arc :: clone ( & nodes[ 0 ] . logger ) , |_: _ | { } , Retry :: Attempts ( 2 ) ) ) ;
11321126 let event_handler = Arc :: clone ( & invoice_payer) ;
11331127 let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . no_gossip_sync ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) ) ) ;
11341128 assert ! ( bg_processor. stop( ) . is_ok( ) ) ;
0 commit comments