Skip to content

sync/fix: Clear gap sync on known imported blocks #8445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions prdoc/pr_8445.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
title: Fix the clearing of gap sync on known imported blocks

doc:
- audience: [Node Dev, Node Operator]
description: |
This PR ensures that warp sync gaps are properly cleared when known blocks are imported. Previously, gaps were only removed in response to ImportedUnknown events.
This limitation caused issues for asset-hub and bridge-hub collators, which remained stuck in the "Block history" state without progressing.
The root cause is that client.info() can report a gap during node startup or restart (i.e. when block verification fails). In some cases, a peer may respond with the missing blocks after we’ve already imported them locally, leaving the gap open.

crates:
- name: sc-network
bump: patch
28 changes: 28 additions & 0 deletions substrate/client/network/sync/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,34 @@ pub enum SyncingAction<B: BlockT> {
Finished,
}

// Note: Ideally we can deduce this information with #[derive(derive_more::Debug)].
// However, we'd need a bump to the latest version 2 of the crate.
impl<B> std::fmt::Debug for SyncingAction<B>
where
    B: BlockT,
{
    /// Writes a compact, single-line description of the action.
    ///
    /// Only the fields bound in each match arm are printed; anything skipped
    /// with `..` is deliberately left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::StartRequest { peer_id, key, remove_obsolete, .. } => write!(
                f,
                "StartRequest {{ peer_id: {:?}, key: {:?}, remove_obsolete: {:?} }}",
                peer_id, key, remove_obsolete
            ),
            Self::CancelRequest { peer_id, key } =>
                write!(f, "CancelRequest {{ peer_id: {:?}, key: {:?} }}", peer_id, key),
            Self::DropPeer(peer) => write!(f, "DropPeer({:?})", peer),
            Self::ImportBlocks { blocks, .. } => write!(f, "ImportBlocks({:?})", blocks),
            Self::ImportJustifications { hash, number, .. } =>
                write!(f, "ImportJustifications({:?}, {:?})", hash, number),
            // No fields to interpolate, so the literal is written directly.
            Self::Finished => f.write_str("Finished"),
        }
    }
}

impl<B: BlockT> SyncingAction<B> {
/// Returns `true` if the syncing action has completed.
pub fn is_finished(&self) -> bool {
Expand Down
34 changes: 22 additions & 12 deletions substrate/client/network/sync/src/strategy/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,10 +674,12 @@ where
has_error |= result.is_err();

match result {
Ok(BlockImportStatus::ImportedKnown(number, peer_id)) =>
Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => {
if let Some(peer) = peer_id {
self.update_peer_common_number(&peer, number);
},
}
self.complete_gap_if_target(number);
},
Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
if aux.clear_justification_requests {
trace!(
Expand Down Expand Up @@ -720,15 +722,8 @@ where
self.mode = ChainSyncMode::Full;
self.restart();
}
let gap_sync_complete =
self.gap_sync.as_ref().map_or(false, |s| s.target == number);
if gap_sync_complete {
info!(
target: LOG_TARGET,
"Block history download is complete."
);
self.gap_sync = None;
}

self.complete_gap_if_target(number);
},
Err(BlockImportError::IncompleteHeader(peer_id)) =>
if let Some(peer) = peer_id {
Expand Down Expand Up @@ -992,6 +987,18 @@ where
Ok(sync)
}

/// Clear the gap sync state when `number` equals the gap sync target.
///
/// Does nothing if no gap sync is in progress or if `number` is not the target.
fn complete_gap_if_target(&mut self, number: NumberFor<B>) {
    let target_reached = matches!(&self.gap_sync, Some(gap) if gap.target == number);
    if !target_reached {
        return;
    }

    info!(
        target: LOG_TARGET,
        "Block history download is complete."
    );
    self.gap_sync = None;
}

#[must_use]
fn add_peer_inner(
&mut self,
Expand Down Expand Up @@ -1676,6 +1683,8 @@ where
/// state for.
fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
let info = self.client.info();
debug!(target: LOG_TARGET, "Restarting sync with client info {info:?}");

if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1705,7 +1714,8 @@ where
}

if let Some(BlockGap { start, end, .. }) = info.block_gap {
debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end}");
let old_gap = self.gap_sync.take().map(|g| (g.best_queued_number, g.target));
debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end} (old gap best and target: {old_gap:?})");
self.gap_sync = Some(GapSync {
best_queued_number: start - One::one(),
target: end,
Expand Down
214 changes: 214 additions & 0 deletions substrate/client/network/sync/src/strategy/chain_sync/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,3 +1127,217 @@ fn request_across_forks() {
assert!(sync.is_known(&block.header.parent_hash()));
}
}

/// Simulates the aftermath of a `VerificationFailed` error that occurs while
/// `client.info()` reports a gap, with the gap then being filled after the sync
/// process has restarted.
///
/// The error itself is not triggered here: its effect is modelled by assigning
/// `sync.gap_sync` by hand at the end of the first loop iteration. The test ensures
/// that the gap is properly closed on importing unknown blocks (i.e. blocks we don't
/// have in our chain yet).
#[test]
fn sync_verification_failed_with_gap_filled() {
sp_tracing::try_init_simple();

// Build three batches of blocks (max blocks per request is 64). Only the first two
// batches are consumed by the two loop iterations below.
const TEST_TARGET: u32 = 64 * 3;

// The blocks are produced with a throwaway client so that the client handed to
// `ChainSync` below does not already contain them.
let blocks = {
let client = TestClientBuilder::new().build();
(0..TEST_TARGET).map(|_| build_block(&client, None, false)).collect::<Vec<_>>()
};

let client = Arc::new(TestClientBuilder::new().build());
let info = client.info();

let mut sync = ChainSync::new(
ChainSyncMode::Full,
client.clone(),
5,
64,
ProtocolName::Static(""),
Arc::new(MockBlockDownloader::new()),
None,
std::iter::empty(),
)
.unwrap();

let peer_id1 = PeerId::random();
let peer_id2 = PeerId::random();

let best_block = blocks.last().unwrap().clone();
let max_blocks_to_request = sync.max_blocks_per_request;

let status = sync.status();
assert!(status.warp_sync.is_none());
log::info!(target: LOG_TARGET, "Before adding peers: {status:?}");

// Connect the node we will sync from (peer 1, announcing the last built block).
sync.add_peer(peer_id1, best_block.hash(), *best_block.header().number());
// Peer 2 is added at our own best hash with block number 0, so it has nothing new for us.
sync.add_peer(peer_id2, info.best_hash, 0);

// Number of blocks handed to `on_blocks_processed` so far; advances by one batch per iteration.
let mut best_block_num = 0;
assert_eq!(sync.best_queued_number, 0);

// Two iterations to simulate the gap filling: the gap is injected at the end of
// iteration 0 and must be gone at the end of iteration 1.
for loop_index in 0..2 {
log::info!(target: LOG_TARGET, "Loop index: {loop_index}");

// Build the request for the next batch, addressed to peer 1.
// NOTE(review): `get_block_request` presumably also asserts that `sync` issues
// exactly this request — confirm against the helper.
let request = get_block_request(
&mut sync,
FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
max_blocks_to_request as u32,
&peer_id1,
);
let from = unwrap_from_block_number(request.from.clone());
// Take this iteration's batch and reverse it so the response lists the highest block first.
let mut resp_blocks = blocks[best_block_num as usize..from as usize].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());

// Clear old actions to not deal with them
let _ = sync.take_actions();

let status = sync.status();
log::info!(target: LOG_TARGET, "Status before on_block_data: {status:?}");

sync.on_block_data(&peer_id1, Some(request.clone()), response.clone()).unwrap();

// The response must result in exactly one action: importing the full batch.
let actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
assert!(matches!(
&actions[0],
SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() ==
max_blocks_to_request as usize, ));

let status = sync.status();
log::info!(target: LOG_TARGET, "Status before processing blocks: {status:?}");

best_block_num += max_blocks_to_request as u32;

// Report every block of the batch as `ImportedUnknown` from peer 1, in ascending
// order (the second `rev` undoes the reversal applied for the response).
let responses: Vec<_> = resp_blocks
.iter()
.rev()
.map(|b| {
(
Ok(BlockImportStatus::ImportedUnknown(
*b.header().number(),
Default::default(),
Some(peer_id1),
)),
b.hash(),
)
})
.collect();

sync.on_blocks_processed(
max_blocks_to_request as usize,
max_blocks_to_request as usize,
responses,
);

let status = sync.status();
log::info!(target: LOG_TARGET, "Status after processing blocks: {status:?}");

// Import the blocks as final to the client, again in ascending order.
resp_blocks
.into_iter()
.rev()
.for_each(|b| block_on(client.import_as_final(BlockOrigin::Own, b)).unwrap());

if loop_index == 0 {
log::info!(target: LOG_TARGET, "Peer state {:#?}", sync.peers);

// Both peers must be known and in the available state.
match sync.peers.get(&peer_id1) {
Some(peer) => assert_eq!(peer.state, PeerSyncState::Available),
None => panic!("Peer not found"),
}
match sync.peers.get(&peer_id2) {
Some(peer) => assert_eq!(peer.state, PeerSyncState::Available),
None => panic!("Peer not found"),
}

// Simulate that we encounter a `VerificationFailed` error while processing the blocks.
// During this error, the sync will enter the `AncestorSearch` state for the peer 1
// because of the sync restart operation. Then, the peer will be in the `Available`
// state after the ancestor search is done. However, we still have the gap present.
//
// The gap is injected directly rather than produced by a real restart. Its target (84)
// presumably falls inside the batch processed by the next iteration, so importing that
// batch is expected to close the gap — verify against the block numbering of `blocks`.
sync.gap_sync = Some(GapSync {
best_queued_number: 64 as u64,
target: 84 as u64,
blocks: BlockCollection::new(),
});
} else if loop_index == 1 {
// After the second batch has been processed the injected gap must have been cleared.
if sync.gap_sync.is_none() {
log::info!(target: LOG_TARGET, "Gap successfully closed");
} else {
panic!("Gap not closed after the second loop");
}
}
}
}

/// Checks that `on_blocks_processed` clears a gap whose target is the processed
/// block, both when the block is reported as `ImportedUnknown` and when it is
/// reported as `ImportedKnown`.
#[test]
fn sync_gap_filled_regardless_of_blocks_origin() {
    sp_tracing::try_init_simple();

    // Blocks are built with a separate, throwaway client.
    let blocks: Vec<_> = {
        let builder_client = TestClientBuilder::new().build();
        (0..2).map(|_| build_block(&builder_client, None, false)).collect()
    };
    let gap_block = &blocks[0];
    let gap_number = *gap_block.header().number();

    let client = Arc::new(TestClientBuilder::new().build());
    let mut sync = ChainSync::new(
        ChainSyncMode::Full,
        client.clone(),
        5,
        64,
        ProtocolName::Static(""),
        Arc::new(MockBlockDownloader::new()),
        None,
        std::iter::empty(),
    )
    .unwrap();

    let peer = PeerId::random();

    // Case 1: `BlockImportStatus::ImportedUnknown` clears the gap.
    //
    // The gap is set by hand to simulate a `VerificationFailed` error during block
    // processing followed by `client.info()` reporting a gap.
    sync.gap_sync = Some(GapSync {
        best_queued_number: gap_number,
        target: gap_number,
        blocks: BlockCollection::new(),
    });
    let unknown_import =
        Ok(BlockImportStatus::ImportedUnknown(gap_number, Default::default(), Some(peer)));
    sync.on_blocks_processed(1, 1, vec![(unknown_import, gap_block.hash())]);
    // Ensure the gap is cleared out.
    assert!(sync.gap_sync.is_none());

    // Case 2: `BlockImportStatus::ImportedKnown` also clears the gap.
    sync.gap_sync = Some(GapSync {
        best_queued_number: gap_number,
        target: gap_number,
        blocks: BlockCollection::new(),
    });
    let known_import = Ok(BlockImportStatus::ImportedKnown(gap_number, Some(peer)));
    sync.on_blocks_processed(1, 1, vec![(known_import, gap_block.hash())]);
    // Ensure the gap is cleared out.
    assert!(sync.gap_sync.is_none());
}
Loading