@@ -1404,6 +1404,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
    /// [`ChannelMessageHandler::peer_connected`] and no corresponding
    /// [`ChannelMessageHandler::peer_disconnected`].
    pub is_connected: bool,
+   /// Holds the peer storage data for the channel partner on a per-peer basis.
+   peer_storage: Vec<u8>,
}

impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -2872,6 +2874,13 @@ const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4;
/// this many peers we reject new (inbound) channels from peers with which we don't have a channel.
const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50;

+/// The maximum allowed size for peer storage, in bytes.
+///
+/// This constant defines the upper limit for the size of data
+/// that can be stored for a peer. It is set to 1024 bytes (1 KiB)
+/// to prevent excessive resource consumption.
+const MAX_PEER_STORAGE_SIZE: usize = 1024;
+
/// The maximum number of peers which we do not have a (funded) channel with. Once we reach this
/// many peers we reject new (inbound) connections.
const MAX_NO_CHANNEL_PEERS: usize = 250;
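In effect, the cap amounts to a simple length check before a blob is accepted. A minimal sketch of that check (not LDK's API; the function name `check_peer_storage_len` is hypothetical):

const MAX_PEER_STORAGE_SIZE: usize = 1024;

// Reject peer-storage blobs over the 1 KiB cap; `internal_peer_storage` below
// performs the same check and reports the rejection to the peer as a warning.
fn check_peer_storage_len(data: &[u8]) -> Result<(), String> {
    if data.len() > MAX_PEER_STORAGE_SIZE {
        return Err(format!("Supports only data up to {} bytes in peer storage.", MAX_PEER_STORAGE_SIZE));
    }
    Ok(())
}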
@@ -8269,6 +8278,53 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
        }
    }

+   fn internal_peer_storage_retrieval(&self, counterparty_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval) -> Result<(), MsgHandleErrInternal> {
+       // TODO: Decrypt and check whether we have any stale or missing ChannelMonitors.
+       let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
+
+       log_debug!(logger, "Received unexpected peer_storage_retrieval from {}. This is unusual since we do not yet distribute peer storage. Sending a warning.", log_pubkey!(counterparty_node_id));
+
+       Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+           "Invalid peer_storage_retrieval message received.".into(),
+       ), ChannelId([0; 32])))
+   }
+
+   fn internal_peer_storage(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorage) -> Result<(), MsgHandleErrInternal> {
+       let per_peer_state = self.per_peer_state.read().unwrap();
+       let peer_state_mutex = per_peer_state.get(&counterparty_node_id)
+           .ok_or_else(|| {
+               debug_assert!(false);
+               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), ChannelId([0; 32]))
+           })?;
+
+       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+       let peer_state = &mut *peer_state_lock;
+       let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
+
+       // Check that we have at least one funded channel with the peer (we currently only
+       // provide the service to peers we have a channel with).
+       if !peer_state.channel_by_id.values().any(|phase| phase.is_funded()) {
+           log_debug!(logger, "Ignoring peer storage request from {} as we don't have any funded channels with them.", log_pubkey!(counterparty_node_id));
+           return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+               "Ignoring peer_storage message, as peer storage is currently supported only for \
+               peers with an active funded channel.".into(),
+           ), ChannelId([0; 32])));
+       }
+
+       #[cfg(not(test))]
+       if msg.data.len() > MAX_PEER_STORAGE_SIZE {
+           log_debug!(logger, "Sending warning to peer and ignoring peer storage request from {} as it's over 1 KiB", log_pubkey!(counterparty_node_id));
+
+           return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+               format!("Supports only data up to {} bytes in peer storage.", MAX_PEER_STORAGE_SIZE)
+           ), ChannelId([0; 32])));
+       }
+
+       log_trace!(logger, "Received peer_storage from {}", log_pubkey!(counterparty_node_id));
+       peer_state.peer_storage = msg.data;
+
+       Ok(())
+   }
+
    fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
        let best_block = *self.best_block.read().unwrap();
        let per_peer_state = self.per_peer_state.read().unwrap();
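In effect, `internal_peer_storage` above validates and then overwrites a single per-peer blob. A minimal standalone sketch of that logic, assuming a bare `HashMap` keyed by a 33-byte pubkey and omitting LDK's locking, logging, and error plumbing (the type and method names here are made up for illustration):

use std::collections::HashMap;

const MAX_PEER_STORAGE_SIZE: usize = 1024;

struct PeerStorageStore {
    // One blob per peer; a real node would key this by `PublicKey`.
    by_peer: HashMap<[u8; 33], Vec<u8>>,
}

impl PeerStorageStore {
    fn handle_peer_storage(&mut self, peer: [u8; 33], has_funded_channel: bool, data: Vec<u8>) -> Result<(), &'static str> {
        // Only serve peers we have a funded channel with, as above.
        if !has_funded_channel {
            return Err("peer storage is only offered to peers with a funded channel");
        }
        // Enforce the 1 KiB cap.
        if data.len() > MAX_PEER_STORAGE_SIZE {
            return Err("peer storage blob too large");
        }
        // Last write wins, matching `peer_state.peer_storage = msg.data`.
        self.by_peer.insert(peer, data);
        Ok(())
    }
}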
@@ -11465,6 +11521,16 @@ where
        let _ = handle_error!(self, self.internal_funding_signed(&counterparty_node_id, msg), counterparty_node_id);
    }

+   fn handle_peer_storage(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorage) {
+       let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || NotifyOption::SkipPersistNoEvents);
+       let _ = handle_error!(self, self.internal_peer_storage(counterparty_node_id, msg), counterparty_node_id);
+   }
+
+   fn handle_peer_storage_retrieval(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorageRetrieval) {
+       let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || NotifyOption::SkipPersistNoEvents);
+       let _ = handle_error!(self, self.internal_peer_storage_retrieval(counterparty_node_id, msg), counterparty_node_id);
+   }
+
    fn handle_channel_ready(&self, counterparty_node_id: PublicKey, msg: &msgs::ChannelReady) {
        // Note that we never need to persist the updated ChannelManager for an inbound
        // channel_ready message - while the channel's state will change, any channel_ready message
@@ -11706,6 +11772,10 @@ where
                &events::MessageSendEvent::SendShortIdsQuery { .. } => false,
                &events::MessageSendEvent::SendReplyChannelRange { .. } => false,
                &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,
+
+               // Peer Storage
+               &events::MessageSendEvent::SendPeerStorage { .. } => false,
+               &events::MessageSendEvent::SendPeerStorageRetrieval { .. } => false,
            }
        });
        debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect");
@@ -11758,6 +11828,7 @@ where
                actions_blocking_raa_monitor_updates: BTreeMap::new(),
                closed_channel_monitor_update_ids: BTreeMap::new(),
                is_connected: true,
+               peer_storage: Vec::new(),
            }));
        },
        hash_map::Entry::Occupied(e) => {
@@ -11787,6 +11858,15 @@ where
        let peer_state = &mut *peer_state_lock;
        let pending_msg_events = &mut peer_state.pending_msg_events;

+       if !peer_state.peer_storage.is_empty() {
+           pending_msg_events.push(events::MessageSendEvent::SendPeerStorageRetrieval {
+               node_id: counterparty_node_id.clone(),
+               msg: msgs::PeerStorageRetrieval {
+                   data: peer_state.peer_storage.clone()
+               },
+           });
+       }
+
        for (_, chan) in peer_state.channel_by_id.iter_mut() {
            let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
            match chan.peer_connected_get_handshake(self.chain_hash, &&logger) {
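The reconnect path here simply echoes back whatever blob we hold for the peer. A standalone sketch of that decision, with a plain map standing in for `per_peer_state` and a hypothetical function name:

use std::collections::HashMap;

// Returns the blob to send in `peer_storage_retrieval` on reconnect, if any.
// Mirrors the check above: nothing is sent when no blob is stored.
fn retrieval_on_reconnect(store: &HashMap<[u8; 33], Vec<u8>>, peer: &[u8; 33]) -> Option<Vec<u8>> {
    store.get(peer).filter(|blob| !blob.is_empty()).cloned()
}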
@@ -12473,6 +12553,7 @@ pub fn provided_init_features(config: &UserConfig) -> InitFeatures {
    features.set_scid_privacy_optional();
    features.set_zero_conf_optional();
    features.set_route_blinding_optional();
+   features.set_provide_storage_optional();
    if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx {
        features.set_anchors_zero_fee_htlc_tx_optional();
    }
@@ -12994,6 +13075,8 @@ where
            peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
        }

+       let mut peer_storage_dir: Vec<(&PublicKey, &Vec<u8>)> = Vec::new();
+
        (serializable_peer_count).write(writer)?;
        for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
            // Peers which we have no channels to should be dropped once disconnected. As we
@@ -13003,6 +13086,8 @@ where
            if !peer_state.ok_to_remove(false) {
                peer_pubkey.write(writer)?;
                peer_state.latest_features.write(writer)?;
+               peer_storage_dir.push((peer_pubkey, &peer_state.peer_storage));
+
                if !peer_state.monitor_update_blocked_actions.is_empty() {
                    monitor_update_blocked_actions_per_peer
                        .get_or_insert_with(Vec::new)
@@ -13124,6 +13209,7 @@ where
            (14, decode_update_add_htlcs_opt, option),
            (15, self.inbound_payment_id_secret, required),
            (17, in_flight_monitor_updates, required),
+           (19, peer_storage_dir, optional_vec),
        });

        Ok(())
@@ -13356,6 +13442,7 @@ where
            monitor_update_blocked_actions: BTreeMap::new(),
            actions_blocking_raa_monitor_updates: BTreeMap::new(),
            closed_channel_monitor_update_ids: BTreeMap::new(),
+           peer_storage: Vec::new(),
            is_connected: false,
        }
    };
@@ -13651,6 +13738,7 @@ where
        let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
        let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
        let mut inbound_payment_id_secret = None;
+       let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
        read_tlv_fields!(reader, {
            (1, pending_outbound_payments_no_retry, option),
            (2, pending_intercepted_htlcs, option),
@@ -13667,8 +13755,10 @@ where
            (14, decode_update_add_htlcs, option),
            (15, inbound_payment_id_secret, option),
            (17, in_flight_monitor_updates, required),
+           (19, peer_storage_dir, optional_vec),
        });
        let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
+       let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
        if fake_scid_rand_bytes.is_none() {
            fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
        }
@@ -13700,6 +13790,12 @@ where
        }
        let pending_outbounds = OutboundPayments::new(pending_outbound_payments.unwrap());

+       for (peer_pubkey, peer_storage) in peer_storage_dir {
+           if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
+               peer_state.get_mut().unwrap().peer_storage = peer_storage;
+           }
+       }
+
        // Handle transitioning from the legacy TLV to the new one on upgrades.
        if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates {
            // We should never serialize an empty map.
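On the read side, the `(pubkey, blob)` pairs recovered from TLV type 19 are merged back into the rebuilt per-peer state, silently dropping blobs for peers that no longer exist. A standalone sketch of that merge, again with a plain map in place of `per_peer_state` and a hypothetical function name:

use std::collections::HashMap;

fn restore_peer_storage(
    per_peer: &mut HashMap<[u8; 33], Vec<u8>>,
    saved: Vec<([u8; 33], Vec<u8>)>,
) {
    for (peer, blob) in saved {
        // Only restore blobs for peers we still track, as in the loop above.
        if let Some(slot) = per_peer.get_mut(&peer) {
            *slot = blob;
        }
    }
}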
@@ -14774,6 +14870,70 @@ mod tests {
        }
    }

+   #[test]
+   fn test_peer_storage() {
+       let chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+       let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+       create_announced_chan_between_nodes(&nodes, 0, 1);
+
+       // Since we do not yet send peer storage, we manually simulate receiving a dummy
+       // `PeerStorage` from the channel partner.
+       nodes[0].node.handle_peer_storage(nodes[1].node.get_our_node_id(), msgs::PeerStorage { data: vec![0; 100] });
+
+       nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id());
+       nodes[1].node.peer_disconnected(nodes[0].node.get_our_node_id());
+
+       nodes[0].node.peer_connected(nodes[1].node.get_our_node_id(), &msgs::Init {
+           features: nodes[1].node.init_features(), networks: None, remote_network_address: None
+       }, true).unwrap();
+       nodes[1].node.peer_connected(nodes[0].node.get_our_node_id(), &msgs::Init {
+           features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+       }, false).unwrap();
+
+       let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+       assert_eq!(node_0_events.len(), 2);
+
+       for msg in node_0_events {
+           if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
+               nodes[1].node.handle_channel_reestablish(nodes[0].node.get_our_node_id(), msg);
+               assert_eq!(*node_id, nodes[1].node.get_our_node_id());
+           } else if let MessageSendEvent::SendPeerStorageRetrieval { ref node_id, ref msg } = msg {
+               nodes[1].node.handle_peer_storage_retrieval(nodes[0].node.get_our_node_id(), msg.clone());
+               assert_eq!(*node_id, nodes[1].node.get_our_node_id());
+           } else {
+               panic!("Unexpected event");
+           }
+       }
+
+       let msg_events_after_peer_storage_retrieval = nodes[1].node.get_and_clear_pending_msg_events();
+
+       // Check that we receive a warning message.
+       let peer_storage_warning: Vec<&MessageSendEvent> = msg_events_after_peer_storage_retrieval
+           .iter()
+           .filter(|event| match event {
+               MessageSendEvent::HandleError { .. } => true,
+               _ => false,
+           })
+           .collect();
+
+       assert_eq!(peer_storage_warning.len(), 1);
+
+       match peer_storage_warning[0] {
+           MessageSendEvent::HandleError { node_id, action } => {
+               assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+               match action {
+                   ErrorAction::SendWarningMessage { msg, .. } =>
+                       assert_eq!(msg.data, "Invalid peer_storage_retrieval message received.".to_owned()),
+                   _ => panic!("Unexpected error action"),
+               }
+           },
+           _ => panic!("Unexpected event"),
+       }
+   }
+
    #[test]
    fn test_keysend_dup_payment_hash() {
        // (1): Test that a keysend payment with a duplicate payment hash to an existing pending