Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 61 additions & 7 deletions linera-core/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ where
/// # Returns
/// - `Ok(())` if synchronization completed successfully or the validator is already up to date
/// - `Err` if there was a communication or storage error
#[instrument(level = "trace", skip_all)]
pub async fn send_chain_information(
&mut self,
chain_id: ChainId,
Expand Down Expand Up @@ -896,8 +897,9 @@ where

/// Sends chain information for all chains referenced by the given blobs.
///
/// Reads blob states from storage, determines the chain heights needed,
/// and sends chain information to bring the validator up to date.
/// Reads blob states from storage, determines the specific chain heights needed,
/// and sends chain information for those heights. With sparse chains, this only
/// sends the specific blocks containing the blobs, not all blocks up to those heights.
async fn send_chain_info_for_blobs(
&mut self,
blob_ids: &[BlobId],
Expand All @@ -909,20 +911,72 @@ where
.read_blob_states_from_storage(blob_ids)
.await?;

let mut chain_heights = BTreeMap::new();
let mut chain_heights: BTreeMap<ChainId, BTreeSet<BlockHeight>> = BTreeMap::new();
for blob_state in blob_states {
let block_chain_id = blob_state.chain_id;
let block_height = blob_state.block_height.try_add_one()?;
let block_height = blob_state.block_height;
chain_heights
.entry(block_chain_id)
.and_modify(|h| *h = block_height.max(*h))
.or_insert(block_height);
.or_default()
.insert(block_height);
}

self.send_chain_info_up_to_heights(chain_heights, delivery)
self.send_chain_info_at_heights(chain_heights, delivery)
.await
}

/// Sends chain information for specific heights on multiple chains.
///
/// Unlike `send_chain_info_up_to_heights`, this method only sends the blocks at the
/// specified heights, not all blocks up to those heights. This is more efficient for
/// sparse chains where only specific blocks are needed.
async fn send_chain_info_at_heights(
&mut self,
chain_heights: impl IntoIterator<Item = (ChainId, BTreeSet<BlockHeight>)>,
delivery: CrossChainMessageDelivery,
) -> Result<(), ChainClientError> {
FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, heights)| {
let mut updater = self.clone();
async move {
// Get all block hashes for this chain at the specified heights in one call
let heights_vec: Vec<_> = heights.into_iter().collect();
let hashes = updater
.client
.local_node
.get_block_hashes(chain_id, heights_vec.clone())
.await?;

if hashes.len() != heights_vec.len() {
return Err(ChainClientError::InternalError(
"send_chain_info_at_heights called with invalid heights",
));
}

// Read all certificates in one call
let certificates = updater
.client
.local_node
.storage_client()
.read_certificates(hashes.clone())
.await?;

// Send each certificate
for (hash, certificate) in hashes.into_iter().zip(certificates) {
let certificate =
certificate.ok_or_else(|| ChainClientError::MissingConfirmedBlock(hash))?;
updater
.send_confirmed_certificate(certificate, delivery)
.await?;
}

Ok::<_, ChainClientError>(())
}
}))
.try_collect::<Vec<_>>()
.await?;
Ok(())
}

async fn send_chain_info_up_to_heights(
&mut self,
chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
Expand Down
Loading