Skip to content

Commit 2ac4df0

Browse files
authored
Merge pull request #5197 from stacks-network/fix/5193-stackerdb-decoherence
Fix/5193 stackerdb decoherence
2 parents f3f2e57 + f2f16c0 commit 2ac4df0

File tree

10 files changed

+152
-17
lines changed

10 files changed

+152
-17
lines changed

stackslib/src/net/api/getneighbors.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ pub struct RPCNeighbor {
5454
#[serde(skip_serializing_if = "Option::is_none")]
5555
#[serde(with = "serde_opt_vec_qci")]
5656
pub stackerdbs: Option<Vec<QualifiedContractIdentifier>>,
57+
#[serde(skip_serializing_if = "Option::is_none")]
58+
pub age: Option<u64>,
5759
}
5860

5961
/// Serialize and deserialize `Option<Vec<QualifiedContractIdentifier>>`
@@ -95,6 +97,7 @@ impl RPCNeighbor {
9597
pkh: Hash160,
9698
auth: bool,
9799
stackerdbs: Vec<QualifiedContractIdentifier>,
100+
age: Option<u64>,
98101
) -> RPCNeighbor {
99102
RPCNeighbor {
100103
network_id: nk.network_id,
@@ -104,6 +107,7 @@ impl RPCNeighbor {
104107
public_key_hash: pkh,
105108
authenticated: auth,
106109
stackerdbs: Some(stackerdbs),
110+
age,
107111
}
108112
}
109113
}
@@ -138,6 +142,7 @@ impl RPCNeighborsInfo {
138142
Hash160::from_node_public_key(&n.public_key),
139143
true,
140144
stackerdb_contract_ids,
145+
None,
141146
)
142147
})
143148
.collect();
@@ -164,6 +169,7 @@ impl RPCNeighborsInfo {
164169
Hash160::from_node_public_key(&n.public_key),
165170
true,
166171
stackerdb_contract_ids,
172+
None,
167173
)
168174
})
169175
.collect();
@@ -185,13 +191,15 @@ impl RPCNeighborsInfo {
185191
naddr.public_key_hash,
186192
convo.is_authenticated(),
187193
convo.get_stackerdb_contract_ids().to_vec(),
194+
Some(convo.age()),
188195
));
189196
} else {
190197
inbound.push(RPCNeighbor::from_neighbor_key_and_pubkh(
191198
nk,
192199
naddr.public_key_hash,
193200
convo.is_authenticated(),
194201
convo.get_stackerdb_contract_ids().to_vec(),
202+
Some(convo.age()),
195203
));
196204
}
197205
}

stackslib/src/net/chat.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,10 @@ impl ConversationP2P {
609609
}
610610
}
611611

612+
pub fn age(&self) -> u64 {
613+
get_epoch_time_secs().saturating_sub(self.instantiated)
614+
}
615+
612616
pub fn set_public_key(&mut self, pubkey_opt: Option<Secp256k1PublicKey>) -> () {
613617
self.connection.set_public_key(pubkey_opt);
614618
}

stackslib/src/net/inv/epoch2x.rs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -973,6 +973,9 @@ pub struct InvState {
973973

974974
/// What's the last reward cycle we _started_ the inv scan at?
975975
pub block_sortition_start: u64,
976+
977+
/// event IDs of connections we established, so they don't get pruned
978+
pinned: HashSet<usize>,
976979
}
977980

978981
impl InvState {
@@ -994,11 +997,13 @@ impl InvState {
994997
num_inv_syncs: 0,
995998

996999
block_sortition_start: 0,
1000+
pinned: HashSet::new(),
9971001
}
9981002
}
9991003

10001004
fn reset_sync_peers(
10011005
&mut self,
1006+
network: &PeerNetwork,
10021007
peers: HashSet<NeighborKey>,
10031008
bootstrap_peers: &HashSet<NeighborKey>,
10041009
max_neighbors: usize,
@@ -1042,6 +1047,24 @@ impl InvState {
10421047
added,
10431048
&peers
10441049
);
1050+
1051+
// if we're still connected to these peers, then keep them pinned
1052+
self.pinned.clear();
1053+
for peer in peers.iter() {
1054+
if let Some(event_id) = network.get_event_id(&peer) {
1055+
self.pinned.insert(event_id);
1056+
}
1057+
}
1058+
}
1059+
1060+
/// Pin a connection
1061+
pub fn pin_connection(&mut self, event_id: usize) {
1062+
self.pinned.insert(event_id);
1063+
}
1064+
1065+
/// Get the set of connections this state machine is using
1066+
pub fn get_pinned_connections(&self) -> &HashSet<usize> {
1067+
&self.pinned
10451068
}
10461069

10471070
pub fn get_peer_status(&self, nk: &NeighborKey) -> NodeStatus {
@@ -1801,6 +1824,7 @@ impl PeerNetwork {
18011824
/// Start requesting the next batch of PoX inventories
18021825
fn inv_getpoxinv_begin(
18031826
&mut self,
1827+
pins: &mut HashSet<usize>,
18041828
sortdb: &SortitionDB,
18051829
nk: &NeighborKey,
18061830
stats: &mut NeighborBlockStats,
@@ -1821,6 +1845,8 @@ impl PeerNetwork {
18211845
};
18221846

18231847
let payload = StacksMessageType::GetPoxInv(getpoxinv);
1848+
let event_id_opt = self.get_event_id(&nk);
1849+
18241850
let message = self.sign_for_neighbor(nk, payload)?;
18251851
let request = self
18261852
.send_neighbor_message(nk, message, request_timeout)
@@ -1830,6 +1856,10 @@ impl PeerNetwork {
18301856
})?;
18311857

18321858
stats.getpoxinv_begin(request, target_pox_reward_cycle);
1859+
if let Some(event_id) = event_id_opt {
1860+
pins.insert(event_id);
1861+
}
1862+
18331863
Ok(())
18341864
}
18351865

@@ -1988,6 +2018,7 @@ impl PeerNetwork {
19882018
/// Start requesting the next batch of block inventories
19892019
fn inv_getblocksinv_begin(
19902020
&mut self,
2021+
pins: &mut HashSet<usize>,
19912022
sortdb: &SortitionDB,
19922023
nk: &NeighborKey,
19932024
stats: &mut NeighborBlockStats,
@@ -2008,6 +2039,7 @@ impl PeerNetwork {
20082039

20092040
let num_blocks_expected = getblocksinv.num_blocks;
20102041
let payload = StacksMessageType::GetBlocksInv(getblocksinv);
2042+
let event_id_opt = self.get_event_id(nk);
20112043
let message = self.sign_for_neighbor(nk, payload)?;
20122044
let request = self
20132045
.send_neighbor_message(nk, message, request_timeout)
@@ -2017,6 +2049,9 @@ impl PeerNetwork {
20172049
})?;
20182050

20192051
stats.getblocksinv_begin(request, target_block_reward_cycle, num_blocks_expected);
2052+
if let Some(event_id) = event_id_opt {
2053+
pins.insert(event_id);
2054+
}
20202055
Ok(())
20212056
}
20222057

@@ -2114,6 +2149,7 @@ impl PeerNetwork {
21142149
/// Run a single state-machine to completion
21152150
fn inv_sync_run(
21162151
&mut self,
2152+
pins: &mut HashSet<usize>,
21172153
sortdb: &SortitionDB,
21182154
nk: &NeighborKey,
21192155
stats: &mut NeighborBlockStats,
@@ -2130,13 +2166,13 @@ impl PeerNetwork {
21302166
debug!("Inv sync state is {:?}", &stats.state);
21312167
let again = match stats.state {
21322168
InvWorkState::GetPoxInvBegin => self
2133-
.inv_getpoxinv_begin(sortdb, nk, stats, request_timeout)
2169+
.inv_getpoxinv_begin(pins, sortdb, nk, stats, request_timeout)
21342170
.and_then(|_| Ok(true))?,
21352171
InvWorkState::GetPoxInvFinish => {
21362172
self.inv_getpoxinv_try_finish(sortdb, nk, stats, ibd)?
21372173
}
21382174
InvWorkState::GetBlocksInvBegin => self
2139-
.inv_getblocksinv_begin(sortdb, nk, stats, request_timeout)
2175+
.inv_getblocksinv_begin(pins, sortdb, nk, stats, request_timeout)
21402176
.and_then(|_| Ok(true))?,
21412177
InvWorkState::GetBlocksInvFinish => {
21422178
self.inv_getblocksinv_try_finish(nk, stats, ibd)?
@@ -2231,9 +2267,10 @@ impl PeerNetwork {
22312267
) -> (bool, bool, Vec<NeighborKey>, Vec<NeighborKey>) {
22322268
PeerNetwork::with_inv_state(self, |network, inv_state| {
22332269
debug!(
2234-
"{:?}: Inventory state has {} block stats tracked",
2270+
"{:?}: Inventory state has {} block stats tracked on connections {:?}",
22352271
&network.local_peer,
2236-
inv_state.block_stats.len()
2272+
inv_state.block_stats.len(),
2273+
inv_state.pinned,
22372274
);
22382275

22392276
let mut all_done = true;
@@ -2261,6 +2298,7 @@ impl PeerNetwork {
22612298
return (true, true, vec![], vec![]);
22622299
}
22632300

2301+
let mut new_pins = HashSet::new();
22642302
for (nk, stats) in inv_state.block_stats.iter_mut() {
22652303
debug!(
22662304
"{:?}: inv state-machine for {:?} is in state {:?}, at PoX {},target={}; blocks {},target={}; status {:?}, done={}",
@@ -2275,7 +2313,7 @@ impl PeerNetwork {
22752313
stats.done
22762314
);
22772315
if !stats.done {
2278-
match network.inv_sync_run(sortdb, nk, stats, inv_state.request_timeout, ibd) {
2316+
match network.inv_sync_run(&mut new_pins, sortdb, nk, stats, inv_state.request_timeout, ibd) {
22792317
Ok(d) => d,
22802318
Err(net_error::StaleView) => {
22812319
// stop work on this state machine -- it needs to be restarted.
@@ -2341,6 +2379,9 @@ impl PeerNetwork {
23412379
}
23422380
}
23432381
}
2382+
let _ = new_pins
2383+
.into_iter()
2384+
.map(|event_id| inv_state.pin_connection(event_id));
23442385

23452386
if all_done {
23462387
let mut new_sync_peers = network.get_outbound_sync_peers();
@@ -2450,6 +2491,7 @@ impl PeerNetwork {
24502491
}
24512492

24522493
inv_state.reset_sync_peers(
2494+
network,
24532495
good_sync_peers_set,
24542496
&bootstrap_peers,
24552497
network.connection_opts.num_neighbors as usize,

stackslib/src/net/inv/nakamoto.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
// You should have received a copy of the GNU General Public License
1515
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

17-
use std::collections::{BTreeMap, HashMap};
17+
use std::collections::{BTreeMap, HashMap, HashSet};
1818

1919
use stacks_common::bitvec::BitVec;
2020
use stacks_common::types::chainstate::StacksBlockId;
@@ -557,6 +557,10 @@ impl<NC: NeighborComms> NakamotoInvStateMachine<NC> {
557557
self.comms.reset();
558558
}
559559

560+
pub fn get_pinned_connections(&self) -> &HashSet<usize> {
561+
self.comms.get_pinned_connections()
562+
}
563+
560564
/// Remove state for a particular neighbor
561565
pub fn del_peer(&mut self, peer: &NeighborAddress) {
562566
self.inventories.remove(peer);

stackslib/src/net/neighbors/comms.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ pub trait NeighborComms {
5050
fn get_connecting<NK: ToNeighborKey>(&self, network: &PeerNetwork, nk: &NK) -> Option<usize>;
5151
/// Remove a neighbor from connecting state
5252
fn remove_connecting<NK: ToNeighborKey>(&mut self, network: &PeerNetwork, nk: &NK);
53+
/// Remove a neighbor from connecting state due to an error
54+
fn remove_connecting_error<NK: ToNeighborKey>(&mut self, network: &PeerNetwork, nk: &NK);
5355
/// Mark a neighbor as dead (inactive, unreachable, etc.)
5456
fn add_dead<NK: ToNeighborKey>(&mut self, network: &PeerNetwork, nk: &NK);
5557
/// Mark a neighbor as broken (in protocol violation)
@@ -150,7 +152,7 @@ pub trait NeighborComms {
150152
// is the peer network still working?
151153
if !network.is_connecting(event_id) {
152154
debug!("{:?}: Failed to connect to {:?} (event {} no longer connecting; assumed timed out)", network.get_local_peer(), event_id, &nk);
153-
self.remove_connecting(network, &nk);
155+
self.remove_connecting_error(network, &nk);
154156
return Err(net_error::PeerNotConnected);
155157
}
156158

@@ -518,7 +520,13 @@ impl NeighborComms for PeerNetworkComms {
518520
.map(|event_ref| *event_ref)
519521
}
520522

523+
/// Remove a connecting neighbor because it connected
521524
fn remove_connecting<NK: ToNeighborKey>(&mut self, network: &PeerNetwork, nk: &NK) {
525+
self.connecting.remove(&nk.to_neighbor_key(network));
526+
}
527+
528+
/// Remove a connecting neighbor due to an error. The connection will be unpinned.
529+
fn remove_connecting_error<NK: ToNeighborKey>(&mut self, network: &PeerNetwork, nk: &NK) {
522530
let event_id_opt = self.connecting.remove(&nk.to_neighbor_key(network));
523531
if let Some(event_id) = event_id_opt {
524532
self.unpin_connection(event_id);

stackslib/src/net/p2p.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2399,7 +2399,7 @@ impl PeerNetwork {
23992399
}
24002400

24012401
/// Prune inbound and outbound connections if we can
2402-
fn prune_connections(&mut self) -> () {
2402+
pub(crate) fn prune_connections(&mut self) -> () {
24032403
if cfg!(test) && self.connection_opts.disable_network_prune {
24042404
return;
24052405
}
@@ -2443,6 +2443,22 @@ impl PeerNetwork {
24432443
}
24442444
}
24452445

2446+
// if we're in the middle of epoch2 inv sync, then don't prune any connections it
2447+
// established
2448+
if let Some(inv_state) = self.inv_state.as_ref() {
2449+
if inv_state.get_pinned_connections().contains(event_id) {
2450+
safe.insert(*event_id);
2451+
}
2452+
}
2453+
2454+
// if we're in the middle of nakamoto inv sync, then don't prune any connections it
2455+
// established
2456+
if let Some(nakamoto_inv) = self.inv_state_nakamoto.as_ref() {
2457+
if nakamoto_inv.get_pinned_connections().contains(event_id) {
2458+
safe.insert(*event_id);
2459+
}
2460+
}
2461+
24462462
// if we're running stacker DBs, then don't prune any outbound connections it
24472463
// established
24482464
if let Some(stacker_db_syncs) = self.stacker_db_syncs.as_ref() {
@@ -2454,6 +2470,7 @@ impl PeerNetwork {
24542470
}
24552471
}
24562472

2473+
debug!("Pinned connections: {:?}", &safe);
24572474
self.prune_frontier(&safe);
24582475
}
24592476

0 commit comments

Comments
 (0)