@@ -1676,13 +1676,15 @@ where
1676
1676
/// Append a message to a peer's pending outbound/write buffer
1677
1677
fn enqueue_message < M : wire:: Type > ( & self , peer : & mut Peer , message : & M ) {
1678
1678
let their_node_id = peer. their_node_id . map ( |p| p. 0 ) ;
1679
- let logger = WithContext :: from ( & self . logger , their_node_id, None , None ) ;
1680
- // `unwrap` SAFETY: `their_node_id` is guaranteed to be `Some` after the handshake
1681
- let node_id = peer. their_node_id . unwrap ( ) . 0 ;
1682
- if is_gossip_msg ( message. type_id ( ) ) {
1683
- log_gossip ! ( logger, "Enqueueing message {:?} to {}" , message, log_pubkey!( node_id) ) ;
1679
+ if let Some ( node_id) = their_node_id {
1680
+ let logger = WithContext :: from ( & self . logger , their_node_id, None , None ) ;
1681
+ if is_gossip_msg ( message. type_id ( ) ) {
1682
+ log_gossip ! ( logger, "Enqueueing message {:?} to {}" , message, log_pubkey!( node_id) ) ;
1683
+ } else {
1684
+ log_trace ! ( logger, "Enqueueing message {:?} to {}" , message, log_pubkey!( node_id) ) ;
1685
+ }
1684
1686
} else {
1685
- log_trace ! ( logger , "Enqueueing message {:?} to {}" , message , log_pubkey! ( node_id ) ) ;
1687
+ debug_assert ! ( false , "node_id should be set by the time we send a message" ) ;
1686
1688
}
1687
1689
peer. msgs_sent_since_pong += 1 ;
1688
1690
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( message) ) ;
@@ -1785,21 +1787,26 @@ where
1785
1787
1786
1788
macro_rules! insert_node_id {
1787
1789
( ) => {
1788
- let their_node_id = peer. their_node_id. map( |p| p. 0 ) ;
1789
- let logger = WithContext :: from( & self . logger, their_node_id, None , None ) ;
1790
- match self . node_id_to_descriptor. lock( ) . unwrap( ) . entry( peer. their_node_id. unwrap( ) . 0 ) {
1790
+ let their_node_id = if let Some ( ( node_id, _) ) = peer. their_node_id {
1791
+ node_id
1792
+ } else {
1793
+ debug_assert!( false , "Should have a node_id to insert" ) ;
1794
+ return Err ( PeerHandleError { } ) ;
1795
+ } ;
1796
+ let logger = WithContext :: from( & self . logger, Some ( their_node_id) , None , None ) ;
1797
+ match self . node_id_to_descriptor. lock( ) . unwrap( ) . entry( their_node_id) {
1791
1798
hash_map:: Entry :: Occupied ( e) => {
1792
- log_trace!( logger, "Got second connection with {}, closing" , log_pubkey!( peer . their_node_id. unwrap ( ) . 0 ) ) ;
1799
+ log_trace!( logger, "Got second connection with {}, closing" , log_pubkey!( their_node_id) ) ;
1793
1800
// Unset `their_node_id` so that we don't generate a peer_disconnected event
1801
+ peer. their_node_id = None ;
1794
1802
// Check that the peers map is consistent with the
1795
1803
// node_id_to_descriptor map, as this has been broken
1796
1804
// before.
1797
- peer. their_node_id = None ;
1798
1805
debug_assert!( peers. get( e. get( ) ) . is_some( ) ) ;
1799
1806
return Err ( PeerHandleError { } )
1800
1807
} ,
1801
1808
hash_map:: Entry :: Vacant ( entry) => {
1802
- log_debug!( logger, "Finished noise handshake for connection with {}" , log_pubkey!( peer . their_node_id. unwrap ( ) . 0 ) ) ;
1809
+ log_debug!( logger, "Finished noise handshake for connection with {}" , log_pubkey!( their_node_id) ) ;
1803
1810
entry. insert( peer_descriptor. clone( ) )
1804
1811
} ,
1805
1812
} ;
@@ -2125,9 +2132,9 @@ where
2125
2132
peer_lock. sync_status = InitSyncTracker :: ChannelsSyncing ( 0 ) ;
2126
2133
}
2127
2134
2128
- let connection = peer_lock. inbound_connection ;
2135
+ let inbound = peer_lock. inbound_connection ;
2129
2136
let route_handler = & self . message_handler . route_handler ;
2130
- if let Err ( ( ) ) = route_handler. peer_connected ( their_node_id, & msg, connection ) {
2137
+ if let Err ( ( ) ) = route_handler. peer_connected ( their_node_id, & msg, inbound ) {
2131
2138
log_debug ! (
2132
2139
logger,
2133
2140
"Route Handler decided we couldn't communicate with peer {}" ,
@@ -2136,7 +2143,7 @@ where
2136
2143
return Err ( PeerHandleError { } . into ( ) ) ;
2137
2144
}
2138
2145
let chan_handler = & self . message_handler . chan_handler ;
2139
- if let Err ( ( ) ) = chan_handler. peer_connected ( their_node_id, & msg, connection ) {
2146
+ if let Err ( ( ) ) = chan_handler. peer_connected ( their_node_id, & msg, inbound ) {
2140
2147
log_debug ! (
2141
2148
logger,
2142
2149
"Channel Handler decided we couldn't communicate with peer {}" ,
@@ -2146,7 +2153,7 @@ where
2146
2153
return Err ( PeerHandleError { } . into ( ) ) ;
2147
2154
}
2148
2155
let onion_message_handler = & self . message_handler . onion_message_handler ;
2149
- if let Err ( ( ) ) = onion_message_handler. peer_connected ( their_node_id, & msg, connection ) {
2156
+ if let Err ( ( ) ) = onion_message_handler. peer_connected ( their_node_id, & msg, inbound ) {
2150
2157
log_debug ! (
2151
2158
logger,
2152
2159
"Onion Message Handler decided we couldn't communicate with peer {}" ,
@@ -2157,7 +2164,7 @@ where
2157
2164
return Err ( PeerHandleError { } . into ( ) ) ;
2158
2165
}
2159
2166
let custom_handler = & self . message_handler . custom_message_handler ;
2160
- if let Err ( ( ) ) = custom_handler. peer_connected ( their_node_id, & msg, connection ) {
2167
+ if let Err ( ( ) ) = custom_handler. peer_connected ( their_node_id, & msg, inbound ) {
2161
2168
log_debug ! (
2162
2169
logger,
2163
2170
"Custom Message Handler decided we couldn't communicate with peer {}" ,
@@ -3510,8 +3517,8 @@ where
3510
3517
debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
3511
3518
debug_assert ! ( peer. their_node_id. is_some( ) ) ;
3512
3519
3520
+ // We use a loop as a `goto` to skip writing the Ping message:
3513
3521
loop {
3514
- // Used as a `goto` to skip writing a Ping message.
3515
3522
if peer. awaiting_pong_timer_tick_intervals == -1 {
3516
3523
// Magic value set in `maybe_send_extra_ping`.
3517
3524
peer. awaiting_pong_timer_tick_intervals = 1 ;
@@ -3844,20 +3851,7 @@ mod tests {
3844
3851
cfgs
3845
3852
}
3846
3853
3847
- fn create_network < ' a > (
3848
- peer_count : usize , cfgs : & ' a Vec < PeerManagerCfg > ,
3849
- ) -> Vec <
3850
- PeerManager <
3851
- FileDescriptor ,
3852
- & ' a test_utils:: TestChannelMessageHandler ,
3853
- & ' a test_utils:: TestRoutingMessageHandler ,
3854
- IgnoringMessageHandler ,
3855
- & ' a test_utils:: TestLogger ,
3856
- & ' a TestCustomMessageHandler ,
3857
- & ' a test_utils:: TestNodeSigner ,
3858
- IgnoringMessageHandler ,
3859
- > ,
3860
- > {
3854
+ fn create_network < ' a > ( peer_count : usize , cfgs : & ' a Vec < PeerManagerCfg > ) -> Vec < TestPeer < ' a > > {
3861
3855
let mut peers = Vec :: new ( ) ;
3862
3856
for i in 0 ..peer_count {
3863
3857
let ephemeral_bytes = [ i as u8 ; 32 ] ;
0 commit comments