Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion linera-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ linera-kywasmtime = { workspace = true, optional = true }
linera-witty = { workspace = true, features = ["macros"] }
lru.workspace = true
prometheus = { workspace = true, optional = true }
quick_cache.workspace = true
proptest = { workspace = true, optional = true, features = ["alloc"] }
quick_cache.workspace = true
rand.workspace = true
reqwest = { workspace = true, optional = true }
serde = { workspace = true, features = ["rc"] }
Expand Down
2 changes: 1 addition & 1 deletion linera-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ linera-storage.workspace = true
linera-version.workspace = true
linera-views.workspace = true
lru.workspace = true
quick_cache.workspace = true
papaya = { workspace = true, features = ["serde"] }
prometheus = { workspace = true, optional = true }
proptest = { workspace = true, optional = true }
quick_cache.workspace = true
rand = { workspace = true, features = ["std_rng"] }
serde.workspace = true
serde_json.workspace = true
Expand Down
3 changes: 1 addition & 2 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use linera_base::{
hashed::Hashed,
identifiers::{ApplicationId, BlobId, ChainId, StreamId},
time::Instant,
value_cache::{ParkingCache, ValueCache},
};
use linera_chain::{
data_types::{BlockProposal, MessageBundle, ProposedBlock},
Expand All @@ -32,8 +33,6 @@ use linera_views::context::InactiveContext;
use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
use tracing::{debug, instrument, trace, Instrument as _};

use linera_base::value_cache::{ParkingCache, ValueCache};

use super::{config::ChainWorkerConfig, state::ChainWorkerState, DeliveryNotifier};
use crate::{
chain_worker::BlockOutcome,
Expand Down
9 changes: 4 additions & 5 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use linera_base::{
ensure,
hashed::Hashed,
identifiers::{AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, StreamId},
value_cache::{ParkingCache, ValueCache},
};
use linera_chain::{
data_types::{
Expand All @@ -43,8 +44,6 @@ use linera_views::{
use tokio::sync::{oneshot, OwnedRwLockReadGuard, RwLock, RwLockWriteGuard};
use tracing::{debug, instrument, trace, warn};

use linera_base::value_cache::{ParkingCache, ValueCache};

use super::{ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier, EventSubscriptionsResult};
use crate::{
data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
Expand Down Expand Up @@ -590,7 +589,8 @@ where
for block in blocks {
let hashed_block = block.into_inner();
let height = hashed_block.inner().header.height;
self.block_values.insert_hashed(Cow::Owned(hashed_block.clone()));
self.block_values
.insert_hashed(Cow::Owned(hashed_block.clone()));
height_to_blocks.insert(height, hashed_block);
}
}
Expand Down Expand Up @@ -1458,8 +1458,7 @@ where
.query_application(context, query, self.service_runtime_endpoint.as_mut())
.await
.with_execution_context(ChainExecutionContext::Query)?;
self.execution_state_cache
.insert(&requested_block, state);
self.execution_state_cache.insert(&requested_block, state);
Ok(outcome)
} else {
tracing::debug!(requested_block = %requested_block, "requested block hash not found in cache, querying committed state");
Expand Down
71 changes: 24 additions & 47 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1207,63 +1207,40 @@ impl<Env: Environment> Client<Env> {
.get(&sender_chain_id)
.copied()
.unwrap_or(BlockHeight::ZERO);
let (max_epoch, committees) = self.admin_committees().await?;

// Recursively collect all certificates we need, following
// the chain of previous_message_blocks back to next_outbox_height.
let mut certificates = BTreeMap::new();
let mut current_height = height;

// Stop if we've reached the height we've already processed.
while current_height >= next_outbox_height {
// Download the certificate for this height.
let downloaded = self
.requests_scheduler
.download_certificates_by_heights(
remote_node,
sender_chain_id,
vec![current_height],
)
.await?;
let Some(certificate) = downloaded.into_iter().next() else {
return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
chain_id: sender_chain_id,
height: current_height,
});
};
// Skip if we already have all certificates up to this height.
if height < next_outbox_height {
return Ok(());
}

// Validate the certificate.
Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
.into_result()?;
let (max_epoch, committees) = self.admin_committees().await?;

// Check if there's a previous message block to our chain.
let block = certificate.block();
let next_height = block
.body
.previous_message_blocks
.get(&receiver_chain_id)
.map(|(_prev_hash, prev_height)| *prev_height);

// Store this certificate.
certificates.insert(current_height, certificate);

if let Some(prev_height) = next_height {
// Continue with the previous block.
current_height = prev_height;
} else {
// No more dependencies.
break;
}
}
// Request the proxy to do the traversal and return the certificates
// in ascending height order, ready for processing.
let certificates = self
.requests_scheduler
.download_sender_certificates_for_receiver(
remote_node,
sender_chain_id,
receiver_chain_id,
height,
next_outbox_height,
)
.await?;

if certificates.is_empty() {
self.local_node
.retry_pending_cross_chain_requests(sender_chain_id)
.await?;
return Ok(());
}

// Process certificates in ascending block height order (BTreeMap keeps them sorted).
for certificate in certificates.into_values() {
// Process certificates in ascending block height order (already sorted by proxy).
for certificate in certificates {
// Validate the certificate.
Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
.into_result()?;

self.receive_sender_certificate(
certificate,
ReceiveCertificateMode::AlreadyChecked,
Expand Down
10 changes: 10 additions & 0 deletions linera-core/src/client/requests_scheduler/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ pub enum RequestKey {
chain_id: ChainId,
heights: Vec<BlockHeight>,
},
/// Download sender certificates for a receiver chain
SenderCertificates {
sender_chain_id: ChainId,
receiver_chain_id: ChainId,
target_height: BlockHeight,
start_height: BlockHeight,
},
/// Download a blob by ID
Blob(BlobId),
/// Download a pending blob
Expand All @@ -32,6 +39,9 @@ impl RequestKey {
pub(super) fn chain_id(&self) -> Option<ChainId> {
match self {
RequestKey::Certificates { chain_id, .. } => Some(*chain_id),
RequestKey::SenderCertificates {
sender_chain_id, ..
} => Some(*sender_chain_id),
RequestKey::PendingBlob { chain_id, .. } => Some(*chain_id),
_ => None,
}
Expand Down
29 changes: 29 additions & 0 deletions linera-core/src/client/requests_scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,35 @@ impl<Env: Environment> RequestsScheduler<Env> {
.await
}

pub async fn download_sender_certificates_for_receiver(
&self,
peer: &RemoteNode<Env::ValidatorNode>,
sender_chain_id: ChainId,
receiver_chain_id: ChainId,
target_height: BlockHeight,
start_height: BlockHeight,
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
self.with_peer(
RequestKey::SenderCertificates {
sender_chain_id,
receiver_chain_id,
target_height,
start_height,
},
peer.clone(),
move |peer| async move {
peer.download_sender_certificates_for_receiver(
sender_chain_id,
receiver_chain_id,
target_height,
start_height,
)
.await
},
)
.await
}

pub async fn download_certificate_for_blob(
&self,
peer: &RemoteNode<Env::ValidatorNode>,
Expand Down
16 changes: 16 additions & 0 deletions linera-core/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,22 @@ pub struct CertificatesByHeightRequest {
pub heights: Vec<BlockHeight>,
}

/// Request for downloading sender certificates with their sending ancestors.
/// The proxy will traverse `previous_message_blocks` to find all certificates
/// that sent messages to the receiver chain.
#[derive(Debug, Clone)]
pub struct SenderCertificatesRequest {
/// The chain that sent the messages.
pub sender_chain_id: ChainId,
/// The chain that receives the messages.
pub receiver_chain_id: ChainId,
/// The target block height to start from.
pub target_height: BlockHeight,
/// The height from which to stop traversing (exclusive lower bound).
/// This is typically the `next_outbox_height` the client already has.
pub start_height: BlockHeight,
}

/// The outcome of trying to commit a list of operations to the chain.
#[derive(Debug)]
pub enum ClientOutcome<T> {
Expand Down
13 changes: 13 additions & 0 deletions linera-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ pub trait ValidatorNode {
heights: Vec<BlockHeight>,
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

/// Downloads sender certificates with their sending ancestors.
/// The proxy traverses `previous_message_blocks` to find all certificates
/// from `sender_chain_id` that sent messages to `receiver_chain_id`,
/// starting from `target_height` and stopping at `start_height`.
/// Returns certificates in ascending height order, ready for processing.
async fn download_sender_certificates_for_receiver(
&self,
sender_chain_id: ChainId,
receiver_chain_id: ChainId,
target_height: BlockHeight,
start_height: BlockHeight,
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

/// Returns the hash of the `Certificate` that last used a blob.
async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;

Expand Down
37 changes: 37 additions & 0 deletions linera-core/src/remote_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,43 @@ impl<N: ValidatorNode> RemoteNode<N> {
Ok(certificates)
}

/// Downloads sender certificates with their sending ancestors.
#[instrument(level = "trace")]
pub async fn download_sender_certificates_for_receiver(
&self,
sender_chain_id: ChainId,
receiver_chain_id: ChainId,
target_height: BlockHeight,
start_height: BlockHeight,
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
let certificates = self
.node
.download_sender_certificates_for_receiver(
sender_chain_id,
receiver_chain_id,
target_height,
start_height,
)
.await?;

// Validate that all returned certificates are for the sender chain
// and in ascending height order.
let mut prev_height = None;
for certificate in &certificates {
ensure!(
certificate.inner().chain_id() == sender_chain_id,
NodeError::UnexpectedCertificateValue
);
let height = certificate.inner().height();
if let Some(prev) = prev_height {
ensure!(height > prev, NodeError::UnexpectedCertificateValue);
}
prev_height = Some(height);
}

Ok(certificates)
}

/// Checks that requesting these blobs when trying to handle this certificate is legitimate,
/// i.e. that there are no duplicates and the blobs are actually required.
pub fn check_blobs_not_found<T: CertificateValue>(
Expand Down
Loading
Loading