diff --git a/linera-base/Cargo.toml b/linera-base/Cargo.toml index e8f6f1d04c1f..66cc47bc85d5 100644 --- a/linera-base/Cargo.toml +++ b/linera-base/Cargo.toml @@ -51,8 +51,8 @@ linera-kywasmtime = { workspace = true, optional = true } linera-witty = { workspace = true, features = ["macros"] } lru.workspace = true prometheus = { workspace = true, optional = true } -quick_cache.workspace = true proptest = { workspace = true, optional = true, features = ["alloc"] } +quick_cache.workspace = true rand.workspace = true reqwest = { workspace = true, optional = true } serde = { workspace = true, features = ["rc"] } diff --git a/linera-core/Cargo.toml b/linera-core/Cargo.toml index e9fb9d1f4537..3047faa7481e 100644 --- a/linera-core/Cargo.toml +++ b/linera-core/Cargo.toml @@ -67,10 +67,10 @@ linera-storage.workspace = true linera-version.workspace = true linera-views.workspace = true lru.workspace = true -quick_cache.workspace = true papaya = { workspace = true, features = ["serde"] } prometheus = { workspace = true, optional = true } proptest = { workspace = true, optional = true } +quick_cache.workspace = true rand = { workspace = true, features = ["std_rng"] } serde.workspace = true serde_json.workspace = true diff --git a/linera-core/src/chain_worker/actor.rs b/linera-core/src/chain_worker/actor.rs index febf84c1da20..411237b21d33 100644 --- a/linera-core/src/chain_worker/actor.rs +++ b/linera-core/src/chain_worker/actor.rs @@ -17,6 +17,7 @@ use linera_base::{ hashed::Hashed, identifiers::{ApplicationId, BlobId, ChainId, StreamId}, time::Instant, + value_cache::{ParkingCache, ValueCache}, }; use linera_chain::{ data_types::{BlockProposal, MessageBundle, ProposedBlock}, @@ -32,8 +33,6 @@ use linera_views::context::InactiveContext; use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard}; use tracing::{debug, instrument, trace, Instrument as _}; -use linera_base::value_cache::{ParkingCache, ValueCache}; - use super::{config::ChainWorkerConfig, state::ChainWorkerState, DeliveryNotifier}; use crate::{ chain_worker::BlockOutcome, diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index 11e255fc2496..273f954296fb 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -20,6 +20,7 @@ use linera_base::{ ensure, hashed::Hashed, identifiers::{AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, StreamId}, + value_cache::{ParkingCache, ValueCache}, }; use linera_chain::{ data_types::{ @@ -43,8 +44,6 @@ use linera_views::{ use tokio::sync::{oneshot, OwnedRwLockReadGuard, RwLock, RwLockWriteGuard}; use tracing::{debug, instrument, trace, warn}; -use linera_base::value_cache::{ParkingCache, ValueCache}; - use super::{ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier, EventSubscriptionsResult}; use crate::{ data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, CrossChainRequest}, @@ -590,7 +589,8 @@ where for block in blocks { let hashed_block = block.into_inner(); let height = hashed_block.inner().header.height; - self.block_values.insert_hashed(Cow::Owned(hashed_block.clone())); + self.block_values + .insert_hashed(Cow::Owned(hashed_block.clone())); height_to_blocks.insert(height, hashed_block); } } @@ -1458,8 +1458,7 @@ where .query_application(context, query, self.service_runtime_endpoint.as_mut()) .await .with_execution_context(ChainExecutionContext::Query)?; - self.execution_state_cache - .insert(&requested_block, state); + self.execution_state_cache.insert(&requested_block, state); Ok(outcome) } else { tracing::debug!(requested_block = %requested_block, "requested block hash not found in cache, querying committed state"); diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index 7c99421ad658..2d3a7b5f5ab4 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -1207,63 +1207,40 @@ impl Client { .get(&sender_chain_id) .copied() .unwrap_or(BlockHeight::ZERO); - let (max_epoch, committees) = self.admin_committees().await?; - - // Recursively collect all certificates we need, following - // the chain of previous_message_blocks back to next_outbox_height. - let mut certificates = BTreeMap::new(); - let mut current_height = height; - // Stop if we've reached the height we've already processed. - while current_height >= next_outbox_height { - // Download the certificate for this height. - let downloaded = self - .requests_scheduler - .download_certificates_by_heights( - remote_node, - sender_chain_id, - vec![current_height], - ) - .await?; - let Some(certificate) = downloaded.into_iter().next() else { - return Err(chain_client::Error::CannotDownloadMissingSenderBlock { - chain_id: sender_chain_id, - height: current_height, - }); - }; + // Skip if we already have all certificates up to this height. + if height < next_outbox_height { + return Ok(()); + } - // Validate the certificate. - Client::::check_certificate(max_epoch, &committees, &certificate)? - .into_result()?; + let (max_epoch, committees) = self.admin_committees().await?; - // Check if there's a previous message block to our chain. - let block = certificate.block(); - let next_height = block - .body - .previous_message_blocks - .get(&receiver_chain_id) - .map(|(_prev_hash, prev_height)| *prev_height); - - // Store this certificate. - certificates.insert(current_height, certificate); - - if let Some(prev_height) = next_height { - // Continue with the previous block. - current_height = prev_height; - } else { - // No more dependencies. - break; - } - } + // Request the proxy to do the traversal and return the certificates + // in ascending height order, ready for processing. + let certificates = self + .requests_scheduler + .download_sender_certificates_for_receiver( + remote_node, + sender_chain_id, + receiver_chain_id, + height, + next_outbox_height, + ) + .await?; if certificates.is_empty() { self.local_node .retry_pending_cross_chain_requests(sender_chain_id) .await?; + return Ok(()); } - // Process certificates in ascending block height order (BTreeMap keeps them sorted). - for certificate in certificates.into_values() { + // Process certificates in ascending block height order (already sorted by proxy). + for certificate in certificates { + // Validate the certificate. + Client::::check_certificate(max_epoch, &committees, &certificate)? + .into_result()?; + self.receive_sender_certificate( certificate, ReceiveCertificateMode::AlreadyChecked, diff --git a/linera-core/src/client/requests_scheduler/request.rs b/linera-core/src/client/requests_scheduler/request.rs index 308e8232e57b..7164f4dc1e14 100644 --- a/linera-core/src/client/requests_scheduler/request.rs +++ b/linera-core/src/client/requests_scheduler/request.rs @@ -19,6 +19,13 @@ pub enum RequestKey { chain_id: ChainId, heights: Vec, }, + /// Download sender certificates for a receiver chain + SenderCertificates { + sender_chain_id: ChainId, + receiver_chain_id: ChainId, + target_height: BlockHeight, + start_height: BlockHeight, + }, /// Download a blob by ID Blob(BlobId), /// Download a pending blob @@ -32,6 +39,9 @@ impl RequestKey { pub(super) fn chain_id(&self) -> Option { match self { RequestKey::Certificates { chain_id, .. } => Some(*chain_id), + RequestKey::SenderCertificates { + sender_chain_id, .. + } => Some(*sender_chain_id), RequestKey::PendingBlob { chain_id, .. } => Some(*chain_id), _ => None, } diff --git a/linera-core/src/client/requests_scheduler/scheduler.rs b/linera-core/src/client/requests_scheduler/scheduler.rs index c482c8206fa0..396728d97af0 100644 --- a/linera-core/src/client/requests_scheduler/scheduler.rs +++ b/linera-core/src/client/requests_scheduler/scheduler.rs @@ -396,6 +396,35 @@ impl RequestsScheduler { .await } + pub async fn download_sender_certificates_for_receiver( + &self, + peer: &RemoteNode, + sender_chain_id: ChainId, + receiver_chain_id: ChainId, + target_height: BlockHeight, + start_height: BlockHeight, + ) -> Result, NodeError> { + self.with_peer( + RequestKey::SenderCertificates { + sender_chain_id, + receiver_chain_id, + target_height, + start_height, + }, + peer.clone(), + move |peer| async move { + peer.download_sender_certificates_for_receiver( + sender_chain_id, + receiver_chain_id, + target_height, + start_height, + ) + .await + }, + ) + .await + } + pub async fn download_certificate_for_blob( &self, peer: &RemoteNode, diff --git a/linera-core/src/data_types.rs b/linera-core/src/data_types.rs index fba8b3a41298..7c816dbf0fa3 100644 --- a/linera-core/src/data_types.rs +++ b/linera-core/src/data_types.rs @@ -347,6 +347,22 @@ pub struct CertificatesByHeightRequest { pub heights: Vec, } +/// Request for downloading sender certificates with their sending ancestors. +/// The proxy will traverse `previous_message_blocks` to find all certificates +/// that sent messages to the receiver chain. +#[derive(Debug, Clone)] +pub struct SenderCertificatesRequest { + /// The chain that sent the messages. + pub sender_chain_id: ChainId, + /// The chain that receives the messages. + pub receiver_chain_id: ChainId, + /// The target block height to start from. + pub target_height: BlockHeight, + /// The height from which to stop traversing (exclusive lower bound). + /// This is typically the `next_outbox_height` the client already has. + pub start_height: BlockHeight, +} + /// The outcome of trying to commit a list of operations to the chain. #[derive(Debug)] pub enum ClientOutcome { diff --git a/linera-core/src/node.rs b/linera-core/src/node.rs index 17372fc8c128..3c80efb8e100 100644 --- a/linera-core/src/node.rs +++ b/linera-core/src/node.rs @@ -156,6 +156,19 @@ pub trait ValidatorNode { heights: Vec, ) -> Result, NodeError>; + /// Downloads sender certificates with their sending ancestors. + /// The proxy traverses `previous_message_blocks` to find all certificates + /// from `sender_chain_id` that sent messages to `receiver_chain_id`, + /// starting from `target_height` and stopping at `start_height`. + /// Returns certificates in ascending height order, ready for processing. + async fn download_sender_certificates_for_receiver( + &self, + sender_chain_id: ChainId, + receiver_chain_id: ChainId, + target_height: BlockHeight, + start_height: BlockHeight, + ) -> Result, NodeError>; + /// Returns the hash of the `Certificate` that last used a blob. async fn blob_last_used_by(&self, blob_id: BlobId) -> Result; diff --git a/linera-core/src/remote_node.rs b/linera-core/src/remote_node.rs index 73ced78390a7..f836ecfd6182 100644 --- a/linera-core/src/remote_node.rs +++ b/linera-core/src/remote_node.rs @@ -266,6 +266,43 @@ impl RemoteNode { Ok(certificates) } + /// Downloads sender certificates with their sending ancestors. + #[instrument(level = "trace")] + pub async fn download_sender_certificates_for_receiver( + &self, + sender_chain_id: ChainId, + receiver_chain_id: ChainId, + target_height: BlockHeight, + start_height: BlockHeight, + ) -> Result, NodeError> { + let certificates = self + .node + .download_sender_certificates_for_receiver( + sender_chain_id, + receiver_chain_id, + target_height, + start_height, + ) + .await?; + + // Validate that all returned certificates are for the sender chain + // and in ascending height order. + let mut prev_height = None; + for certificate in &certificates { + ensure!( + certificate.inner().chain_id() == sender_chain_id, + NodeError::UnexpectedCertificateValue + ); + let height = certificate.inner().height(); + if let Some(prev) = prev_height { + ensure!(height > prev, NodeError::UnexpectedCertificateValue); + } + prev_height = Some(height); + } + + Ok(certificates) + } + /// Checks that requesting these blobs when trying to handle this certificate is legitimate, /// i.e. that there are no duplicates and the blobs are actually required. pub fn check_blobs_not_found( diff --git a/linera-core/src/unit_tests/test_utils.rs b/linera-core/src/unit_tests/test_utils.rs index bd9994ec54e9..e3e5424e8284 100644 --- a/linera-core/src/unit_tests/test_utils.rs +++ b/linera-core/src/unit_tests/test_utils.rs @@ -290,6 +290,25 @@ where total_shards: 1, }) } + + async fn download_sender_certificates_for_receiver( + &self, + sender_chain_id: ChainId, + receiver_chain_id: ChainId, + target_height: BlockHeight, + start_height: BlockHeight, + ) -> Result, NodeError> { + self.spawn_and_receive(move |validator, sender| { + validator.do_download_sender_certificates_for_receiver( + sender_chain_id, + receiver_chain_id, + target_height, + start_height, + sender, + ) + }) + .await + } } impl LocalValidatorClient @@ -705,6 +724,95 @@ where .map_err(Into::into); sender.send(missing_blob_ids) } + + async fn do_download_sender_certificates_for_receiver( + self, + sender_chain_id: ChainId, + receiver_chain_id: ChainId, + target_height: BlockHeight, + start_height: BlockHeight, + sender: oneshot::Sender, NodeError>>, + ) -> Result<(), Result, NodeError>> { + // Skip if we already have all certificates up to this height. + if target_height < start_height { + return sender.send(Ok(Vec::new())); + } + + // Get certificate hash for target_height only. + let (query_sender, query_receiver) = oneshot::channel(); + let query = ChainInfoQuery::new(sender_chain_id) + .with_sent_certificate_hashes_by_heights(vec![target_height]); + + self.clone() + .do_handle_chain_info_query(query, query_sender) + .await + .ok(); + + let mut current_hash = match query_receiver.await { + Ok(Ok(response)) => response + .info + .requested_sent_certificate_hashes + .into_iter() + .next(), + Ok(Err(e)) => return sender.send(Err(e)), + Err(_) => { + return sender.send(Err(NodeError::ClientIoError { + error: "Failed to receive chain info".to_string(), + })) + } + }; + + let Some(initial_hash) = current_hash else { + return sender.send(Ok(Vec::new())); + }; + current_hash = Some(initial_hash); + + // Traverse previous_message_blocks following the hash chain. + let mut certificates_to_return = Vec::new(); + + while let Some(hash) = current_hash { + // Download the certificate by hash. + let (cert_sender, cert_receiver) = oneshot::channel(); + self.clone() + .do_download_certificate(hash, cert_sender) + .await + .ok(); + + let certificate = match cert_receiver.await { + Ok(Ok(cert)) => cert, + Ok(Err(e)) => return sender.send(Err(e)), + Err(_) => { + return sender.send(Err(NodeError::ClientIoError { + error: "Failed to receive certificate".to_string(), + })) + } + }; + + let height = certificate.inner().height(); + if height < start_height { + break; + } + + // Get the previous message block entry which contains (hash, height). + let prev_entry = certificate + .block() + .body + .previous_message_blocks + .get(&receiver_chain_id) + .cloned(); + + certificates_to_return.push(certificate); + + // Follow the hash chain - previous_message_blocks already has the hash! + current_hash = prev_entry + .filter(|(_, h)| *h >= start_height) + .map(|(h, _)| h); + } + + // Return certificates in ascending height order. + certificates_to_return.reverse(); + sender.send(Ok(certificates_to_return)) + } } #[derive(Clone)] diff --git a/linera-core/src/unit_tests/value_cache_tests.rs b/linera-core/src/unit_tests/value_cache_tests.rs index 2731a34b0387..cceafa2e934b 100644 --- a/linera-core/src/unit_tests/value_cache_tests.rs +++ b/linera-core/src/unit_tests/value_cache_tests.rs @@ -145,7 +145,8 @@ fn test_one_over_capacity() { // Exactly one value should have been evicted let present_count = values.iter().filter(|v| cache.contains(&v.hash())).count(); assert_eq!( - present_count, TEST_CACHE_SIZE, + present_count, + TEST_CACHE_SIZE, "Expected {} items in cache after inserting {} items with capacity {}", TEST_CACHE_SIZE, values.len(), diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index b2c6b1530fb7..b8ceadf32082 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -19,6 +19,7 @@ use linera_base::{ identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId}, time::Instant, util::traits::DynError, + value_cache::{ParkingCache, ValueCache}, }; #[cfg(with_testing)] use linera_chain::ChainExecutionContext; @@ -49,7 +50,6 @@ use crate::{ notifier::Notifier, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, }; -use linera_base::value_cache::{ParkingCache, ValueCache}; #[cfg(test)] #[path = "unit_tests/worker_tests.rs"] diff --git a/linera-rpc/proto/rpc.proto b/linera-rpc/proto/rpc.proto index dc07c42f0859..df091595e27f 100644 --- a/linera-rpc/proto/rpc.proto +++ b/linera-rpc/proto/rpc.proto @@ -94,6 +94,10 @@ service ValidatorNode { // Download a batch of certificates, in serialized form, by their heights. rpc DownloadRawCertificatesByHeights(DownloadCertificatesByHeightsRequest) returns (RawCertificatesBatch); + // Download sender certificates with their sending ancestors. + // Traverses previous_message_blocks to find all certificates that sent messages to the receiver. + rpc DownloadSenderCertificatesForReceiver(SenderCertificatesRequest) returns (CertificatesBatchResponse); + // Return the hash of the `Certificate` that last used a blob. rpc BlobLastUsedBy(BlobId) returns (CryptoHash); @@ -124,6 +128,14 @@ message DownloadCertificatesByHeightsRequest { repeated BlockHeight heights = 2; } +// A request for downloading sender certificates with their sending ancestors. +message SenderCertificatesRequest { + ChainId sender_chain_id = 1; + ChainId receiver_chain_id = 2; + BlockHeight target_height = 3; + BlockHeight start_height = 4; +} + // A request for a batch of certificates. message CertificatesBatchRequest { repeated CryptoHash hashes = 1; diff --git a/linera-rpc/src/client.rs b/linera-rpc/src/client.rs index 55bbad0527d4..cde3d1e61714 100644 --- a/linera-rpc/src/client.rs +++ b/linera-rpc/src/client.rs @@ -318,4 +318,37 @@ impl ValidatorNode for Client { Client::Simple(simple_client) => simple_client.get_shard_info(chain_id).await?, }) } + + async fn download_sender_certificates_for_receiver( + &self, + sender_chain_id: ChainId, + receiver_chain_id: ChainId, + target_height: BlockHeight, + start_height: BlockHeight, + ) -> Result, NodeError> { + Ok(match self { + Client::Grpc(grpc_client) => { + grpc_client + .download_sender_certificates_for_receiver( + sender_chain_id, + receiver_chain_id, + target_height, + start_height, + ) + .await? + } + + #[cfg(with_simple_network)] + Client::Simple(simple_client) => { + simple_client + .download_sender_certificates_for_receiver( + sender_chain_id, + receiver_chain_id, + target_height, + start_height, + ) + .await? + } + }) + } } diff --git a/linera-rpc/src/grpc/client.rs b/linera-rpc/src/grpc/client.rs index 7d665e31c4a6..bd164a75c3a3 100644 --- a/linera-rpc/src/grpc/client.rs +++ b/linera-rpc/src/grpc/client.rs @@ -19,7 +19,7 @@ use linera_chain::{ }, }; use linera_core::{ - data_types::{CertificatesByHeightRequest, ChainInfoResponse}, + data_types::{CertificatesByHeightRequest, ChainInfoResponse, SenderCertificatesRequest}, node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode}, worker::Notification, }; @@ -484,6 +484,29 @@ impl ValidatorNode for GrpcClient { Ok(certs_collected) } + #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))] + async fn download_sender_certificates_for_receiver( + &self, + sender_chain_id: ChainId, + receiver_chain_id: ChainId, + target_height: BlockHeight, + start_height: BlockHeight, + ) -> Result, NodeError> { + let request = SenderCertificatesRequest { + sender_chain_id, + receiver_chain_id, + target_height, + start_height, + }; + let response = client_delegate!(self, download_sender_certificates_for_receiver, request)?; + response + .certificates + .into_iter() + .map(|cert| cert.try_into()) + .collect::>() + .map_err(|_| NodeError::UnexpectedCertificateValue) + } + #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))] async fn blob_last_used_by(&self, blob_id: BlobId) -> Result { Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?) diff --git a/linera-rpc/src/grpc/conversions.rs b/linera-rpc/src/grpc/conversions.rs index 24f8e2131b37..c45eb1dc015d 100644 --- a/linera-rpc/src/grpc/conversions.rs +++ b/linera-rpc/src/grpc/conversions.rs @@ -20,6 +20,7 @@ use linera_chain::{ use linera_core::{ data_types::{ CertificatesByHeightRequest, ChainInfoQuery, ChainInfoResponse, CrossChainRequest, + SenderCertificatesRequest, }, node::NodeError, worker::Notification, @@ -1035,6 +1036,36 @@ impl TryFrom for CertificatesByHeight } } +impl From for api::SenderCertificatesRequest { + fn from(request: SenderCertificatesRequest) -> Self { + Self { + sender_chain_id: Some(request.sender_chain_id.into()), + receiver_chain_id: Some(request.receiver_chain_id.into()), + target_height: Some(request.target_height.into()), + start_height: Some(request.start_height.into()), + } + } +} + +impl TryFrom for SenderCertificatesRequest { + type Error = GrpcProtoConversionError; + + fn try_from(request: api::SenderCertificatesRequest) -> Result { + Ok(Self { + sender_chain_id: try_proto_convert(request.sender_chain_id)?, + receiver_chain_id: try_proto_convert(request.receiver_chain_id)?, + target_height: request + .target_height + .map(Into::into) + .ok_or(GrpcProtoConversionError::MissingField)?, + start_height: request + .start_height + .map(Into::into) + .ok_or(GrpcProtoConversionError::MissingField)?, + }) + } +} + #[cfg(test)] pub mod tests { use std::{borrow::Cow, fmt::Debug}; diff --git a/linera-rpc/src/message.rs b/linera-rpc/src/message.rs index 7cf312090106..c5138aad25c4 100644 --- a/linera-rpc/src/message.rs +++ b/linera-rpc/src/message.rs @@ -50,6 +50,12 @@ pub enum RpcMessage { DownloadConfirmedBlock(Box), DownloadCertificates(Vec), DownloadCertificatesByHeights(ChainId, Vec), + DownloadSenderCertificatesForReceiver { + sender_chain_id: ChainId, + receiver_chain_id: ChainId, + target_height: BlockHeight, + start_height: BlockHeight, + }, BlobLastUsedBy(Box), MissingBlobIds(Vec), VersionInfoQuery, @@ -67,6 +73,7 @@ pub enum RpcMessage { DownloadConfirmedBlockResponse(Box), DownloadCertificatesResponse(Vec), DownloadCertificatesByHeightsResponse(Vec), + DownloadSenderCertificatesForReceiverResponse(Vec), BlobLastUsedByResponse(Box), MissingBlobIdsResponse(Vec), @@ -114,6 +121,8 @@ impl RpcMessage { | DownloadConfirmedBlockResponse(_) | DownloadCertificatesByHeightsResponse(_) | DownloadCertificates(_) + | DownloadSenderCertificatesForReceiver { .. } + | DownloadSenderCertificatesForReceiverResponse(_) | BlobLastUsedBy(_) | BlobLastUsedByResponse(_) | BlobLastUsedByCertificate(_) @@ -145,7 +154,8 @@ impl RpcMessage { | BlobLastUsedByCertificate(_) | MissingBlobIds(_) | DownloadCertificates(_) - | DownloadCertificatesByHeights(_, _) => true, + | DownloadCertificatesByHeights(_, _) + | DownloadSenderCertificatesForReceiver { .. } => true, BlockProposal(_) | LiteCertificate(_) | TimeoutCertificate(_) @@ -169,7 +179,8 @@ impl RpcMessage { | BlobLastUsedByCertificateResponse(_) | MissingBlobIdsResponse(_) | DownloadCertificatesResponse(_) - | DownloadCertificatesByHeightsResponse(_) => false, + | DownloadCertificatesByHeightsResponse(_) + | DownloadSenderCertificatesForReceiverResponse(_) => false, } } } @@ -234,8 +245,11 @@ impl TryFrom for Vec { type Error = NodeError; fn try_from(message: RpcMessage) -> Result { match message { - RpcMessage::DownloadCertificatesResponse(certificates) => Ok(certificates), - RpcMessage::DownloadCertificatesByHeightsResponse(certificates) => Ok(certificates), + RpcMessage::DownloadCertificatesResponse(certificates) + | RpcMessage::DownloadCertificatesByHeightsResponse(certificates) + | RpcMessage::DownloadSenderCertificatesForReceiverResponse(certificates) => { + Ok(certificates) + } RpcMessage::Error(error) => Err(*error), _ => Err(NodeError::UnexpectedMessage), } diff --git a/linera-rpc/src/simple/client.rs b/linera-rpc/src/simple/client.rs index 6a08b30c46fd..cd0e8771d26e 100644 --- a/linera-rpc/src/simple/client.rs +++ b/linera-rpc/src/simple/client.rs @@ -248,6 +248,22 @@ impl ValidatorNode for SimpleClient { Ok(certificates) } + async fn download_sender_certificates_for_receiver( + &self, + sender_chain_id: ChainId, + receiver_chain_id: ChainId, + target_height: BlockHeight, + start_height: BlockHeight, + ) -> Result, NodeError> { + self.query(RpcMessage::DownloadSenderCertificatesForReceiver { + sender_chain_id, + receiver_chain_id, + target_height, + start_height, + }) + .await + } + async fn blob_last_used_by(&self, blob_id: BlobId) -> Result { self.query(RpcMessage::BlobLastUsedBy(Box::new(blob_id))) .await diff --git a/linera-rpc/src/simple/server.rs b/linera-rpc/src/simple/server.rs index 970b9756338d..a016d2e509aa 100644 --- a/linera-rpc/src/simple/server.rs +++ b/linera-rpc/src/simple/server.rs @@ -380,7 +380,9 @@ where | RpcMessage::UploadBlob(_) | RpcMessage::UploadBlobResponse(_) | RpcMessage::DownloadCertificatesByHeights(_, _) - | RpcMessage::DownloadCertificatesByHeightsResponse(_) => { + | RpcMessage::DownloadCertificatesByHeightsResponse(_) + | RpcMessage::DownloadSenderCertificatesForReceiver { .. } + | RpcMessage::DownloadSenderCertificatesForReceiverResponse(_) => { Err(NodeError::UnexpectedMessage) } }; diff --git a/linera-service/src/exporter/test_utils.rs b/linera-service/src/exporter/test_utils.rs index 3f737fae64e4..f08fadac8bec 100644 --- a/linera-service/src/exporter/test_utils.rs +++ b/linera-service/src/exporter/test_utils.rs @@ -384,6 +384,13 @@ impl ValidatorNode for DummyValidator { ) -> Result, Status> { unimplemented!() } + + async fn download_sender_certificates_for_receiver( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } } #[async_trait] diff --git a/linera-service/src/proxy/grpc.rs b/linera-service/src/proxy/grpc.rs index a0df4011fe9f..3868eb754473 100644 --- a/linera-service/src/proxy/grpc.rs +++ b/linera-service/src/proxy/grpc.rs @@ -15,10 +15,17 @@ use std::{ use anyhow::Result; use async_trait::async_trait; -use futures::{future::BoxFuture, stream, stream::BoxStream, stream::StreamExt as _, FutureExt as _}; +use futures::{ + future::BoxFuture, + stream, + stream::{BoxStream, StreamExt as _}, + FutureExt as _, +}; use linera_base::identifiers::ChainId; use linera_core::{ - data_types::{CertificatesByHeightRequest, ChainInfo, ChainInfoQuery}, + data_types::{ + CertificatesByHeightRequest, ChainInfo, ChainInfoQuery, SenderCertificatesRequest, + }, node::NodeError, notifier::ChannelNotifier, JoinSetExt as _, @@ -795,6 +802,108 @@ where })) } + #[instrument( + skip_all, + err(Display), + fields(method = "download_sender_certificates_for_receiver") + )] + async fn download_sender_certificates_for_receiver( + &self, + request: Request, + ) -> Result, Status> { + let request: SenderCertificatesRequest = request.into_inner().try_into()?; + + // Skip if we already have all certificates up to this height. + if request.target_height < request.start_height { + return Ok(Response::new(CertificatesBatchResponse::try_from(Vec::< + linera_chain::types::Certificate, + >::new( + ))?)); + } + + // Get certificate hash for target_height only. + let chain_info_request = ChainInfoQuery::new(request.sender_chain_id) + .with_sent_certificate_hashes_by_heights(vec![request.target_height]); + + let chain_info_response = self + .handle_chain_info_query(Request::new(chain_info_request.try_into()?)) + .await?; + + let chain_info_result = chain_info_response.into_inner(); + let mut current_hash = match chain_info_result.inner { + Some(api::chain_info_result::Inner::ChainInfoResponse(response)) => { + let chain_info: ChainInfo = + bincode::deserialize(&response.chain_info).map_err(|e| { + Status::internal(format!("Failed to deserialize ChainInfo: {}", e)) + })?; + chain_info + .requested_sent_certificate_hashes + .into_iter() + .next() + } + Some(api::chain_info_result::Inner::Error(error)) => { + let error = + bincode::deserialize(&error).unwrap_or_else(|err| NodeError::GrpcError { + error: format!("failed to unmarshal error message: {}", err), + }); + return Err(Status::internal(format!( + "Chain info query failed: {error}" + ))); + } + None => { + return Err(Status::internal("Empty chain info result")); + } + }; + + let Some(initial_hash) = current_hash else { + return Ok(Response::new(CertificatesBatchResponse::try_from(Vec::< + linera_chain::types::Certificate, + >::new( + ))?)); + }; + current_hash = Some(initial_hash); + + // Traverse previous_message_blocks following the hash chain. + let mut certificates_to_return = Vec::new(); + + while let Some(hash) = current_hash { + // Read the certificate from storage (benefits from the certificate cache). + let certificate = self + .0 + .storage + .read_certificate(hash) + .await + .map_err(Self::view_error_to_status)? + .ok_or_else(|| Status::internal(format!("Missing certificate {}", hash)))?; + + let height = certificate.inner().height(); + if height < request.start_height { + break; + } + + // Get the previous message block entry which contains (hash, height). + let prev_entry = certificate + .block() + .body + .previous_message_blocks + .get(&request.receiver_chain_id) + .cloned(); + + certificates_to_return.push(linera_chain::types::Certificate::from(certificate)); + + // Follow the hash chain - previous_message_blocks already has the hash! + current_hash = prev_entry + .filter(|(_, h)| *h >= request.start_height) + .map(|(h, _)| h); + } + + // Return certificates in ascending height order. + certificates_to_return.reverse(); + Ok(Response::new(CertificatesBatchResponse::try_from( + certificates_to_return, + )?)) + } + #[instrument(skip_all, err(level = Level::WARN), fields( method = "blob_last_used_by" ))] @@ -858,10 +967,8 @@ where request: Request, ) -> Result, Status> { // Group notifications by chain_id to send them in batches - let mut by_chain: std::collections::HashMap< - ChainId, - Vec>, - > = std::collections::HashMap::new(); + let mut by_chain: std::collections::HashMap>> = + std::collections::HashMap::new(); for notification in request.into_inner().notifications { let chain_id = notification diff --git a/linera-service/src/proxy/main.rs b/linera-service/src/proxy/main.rs index 6cbfa45b7a2f..42a059109371 100644 --- a/linera-service/src/proxy/main.rs +++ b/linera-service/src/proxy/main.rs @@ -26,7 +26,8 @@ use std::{net::SocketAddr, path::PathBuf, time::Duration}; use anyhow::{anyhow, bail, ensure, Result}; use async_trait::async_trait; use futures::{FutureExt as _, SinkExt, StreamExt}; -use linera_base::listen_for_shutdown_signals; +use linera_base::{data_types::BlockHeight, identifiers::ChainId, listen_for_shutdown_signals}; +use linera_chain::types::ConfirmedBlockCertificate; use linera_client::config::ValidatorServerConfig; use linera_core::{node::NodeError, JoinSetExt as _}; #[cfg(with_metrics)] @@ -317,6 +318,90 @@ where Ok(message) } + /// Downloads sender certificates with their sending ancestors. + /// Traverses `previous_message_blocks` to find all certificates from `sender_chain_id` + /// that sent messages to `receiver_chain_id`, starting from `target_height` and + /// stopping at `start_height`. Returns certificates in ascending height order. + async fn download_sender_certificates_for_receiver( + &self, + sender_chain_id: ChainId, + receiver_chain_id: ChainId, + target_height: BlockHeight, + start_height: BlockHeight, + ) -> Result> { + // Skip if we already have all certificates up to this height. + if target_height < start_height { + return Ok(Vec::new()); + } + + // Get the certificate hash for target_height only. + let shard = self.internal_config.get_shard_for(sender_chain_id).clone(); + let protocol = self.internal_config.protocol; + + let chain_info_query = RpcMessage::ChainInfoQuery(Box::new( + linera_core::data_types::ChainInfoQuery::new(sender_chain_id) + .with_sent_certificate_hashes_by_heights(vec![target_height]), + )); + + let mut current_hash = match Self::try_proxy_message( + chain_info_query, + shard, + protocol, + self.send_timeout, + self.recv_timeout, + ) + .await + { + Ok(Some(RpcMessage::ChainInfoResponse(response))) => response + .info + .requested_sent_certificate_hashes + .into_iter() + .next(), + _ => bail!("Failed to retrieve certificate hash for target height"), + }; + + let Some(initial_hash) = current_hash else { + return Ok(Vec::new()); + }; + current_hash = Some(initial_hash); + + // Traverse previous_message_blocks following the hash chain. + let mut certificates_to_return = Vec::new(); + + while let Some(hash) = current_hash { + // Read the certificate from storage (benefits from the certificate cache). + let certificate = self + .storage + .read_certificate(hash) + .await? + .ok_or_else(|| anyhow!("Missing certificate {}", hash))?; + + let height = certificate.inner().height(); + if height < start_height { + break; + } + + // Get the previous message block entry which contains (hash, height). + let prev_entry = certificate + .block() + .body + .previous_message_blocks + .get(&receiver_chain_id) + .cloned(); + + certificates_to_return.push(certificate); + + // Follow the hash chain - previous_message_blocks already has the hash! + current_hash = prev_entry + .filter(|(_, h)| *h >= start_height) + .map(|(h, _)| h); + } + + // Return certificates in ascending height order. + certificates_to_return.reverse(); + Ok(certificates_to_return) + } + async fn try_local_message(&self, message: RpcMessage) -> Result> { use RpcMessage::*; @@ -413,6 +498,24 @@ where certificates, ))) } + DownloadSenderCertificatesForReceiver { + sender_chain_id, + receiver_chain_id, + target_height, + start_height, + } => { + let certificates = self + .download_sender_certificates_for_receiver( + sender_chain_id, + receiver_chain_id, + target_height, + start_height, + ) + .await?; + Ok(Some( + RpcMessage::DownloadSenderCertificatesForReceiverResponse(certificates), + )) + } BlobLastUsedBy(blob_id) => { let blob_state = self.storage.read_blob_state(*blob_id).await?; let blob_state = blob_state.ok_or_else(|| anyhow!("Blob not found {}", blob_id))?; @@ -464,7 +567,8 @@ where | DownloadConfirmedBlockResponse(_) | DownloadCertificatesResponse(_) | UploadBlobResponse(_) - | DownloadCertificatesByHeightsResponse(_) => { + | DownloadCertificatesByHeightsResponse(_) + | DownloadSenderCertificatesForReceiverResponse(_) => { Err(anyhow::Error::from(NodeError::UnexpectedMessage)) } } diff --git a/linera-service/src/schema_export.rs b/linera-service/src/schema_export.rs index c615aa2e85c0..60d2a5f90f40 100644 --- a/linera-service/src/schema_export.rs +++ b/linera-service/src/schema_export.rs @@ -164,6 +164,16 @@ impl ValidatorNode for DummyValidatorNode { ) -> Result { Err(NodeError::UnexpectedMessage) } + + async fn download_sender_certificates_for_receiver( + &self, + _: ChainId, + _: ChainId, + _: BlockHeight, + _: BlockHeight, + ) -> Result, NodeError> { + Err(NodeError::UnexpectedMessage) + } } struct DummyValidatorNodeProvider; diff --git a/linera-storage/src/lib.rs b/linera-storage/src/lib.rs index 9a0a2302b8f4..88b5a9e1766c 100644 --- a/linera-storage/src/lib.rs +++ b/linera-storage/src/lib.rs @@ -39,9 +39,7 @@ use linera_views::{context::Context, views::RootView, ViewError}; pub use crate::db_storage::metrics; #[cfg(with_testing)] pub use crate::db_storage::TestClock; -pub use crate::db_storage::{ - ChainStatesFirstAssignment, DbStorage, StorageCacheConfig, WallClock, -}; +pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, StorageCacheConfig, WallClock}; /// The default namespace to be used when none is specified pub const DEFAULT_NAMESPACE: &str = "default";