Skip to content

Commit 45a57ff

Browse files
committed
Move download_sender_certificates_for_receiver to proxy
1 parent b070858 commit 45a57ff

26 files changed

Lines changed: 629 additions & 77 deletions

File tree

linera-base/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ linera-kywasmtime = { workspace = true, optional = true }
5151
linera-witty = { workspace = true, features = ["macros"] }
5252
lru.workspace = true
5353
prometheus = { workspace = true, optional = true }
54-
quick_cache.workspace = true
5554
proptest = { workspace = true, optional = true, features = ["alloc"] }
55+
quick_cache.workspace = true
5656
rand.workspace = true
5757
reqwest = { workspace = true, optional = true }
5858
serde = { workspace = true, features = ["rc"] }

linera-core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ linera-storage.workspace = true
6868
linera-version.workspace = true
6969
linera-views.workspace = true
7070
lru.workspace = true
71-
quick_cache.workspace = true
7271
papaya = { workspace = true, features = ["serde"] }
7372
prometheus = { workspace = true, optional = true }
7473
proptest = { workspace = true, optional = true }
74+
quick_cache.workspace = true
7575
rand = { workspace = true, features = ["std_rng"] }
7676
serde.workspace = true
7777
serde_json.workspace = true

linera-core/src/chain_worker/actor.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use linera_base::{
1717
hashed::Hashed,
1818
identifiers::{ApplicationId, BlobId, ChainId, StreamId},
1919
time::Instant,
20+
value_cache::{ParkingCache, ValueCache},
2021
};
2122
use linera_chain::{
2223
data_types::{BlockProposal, MessageBundle, ProposedBlock},
@@ -32,8 +33,6 @@ use linera_views::context::InactiveContext;
3233
use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
3334
use tracing::{debug, instrument, trace, Instrument as _};
3435

35-
use linera_base::value_cache::{ParkingCache, ValueCache};
36-
3736
use super::{config::ChainWorkerConfig, state::ChainWorkerState, DeliveryNotifier};
3837
use crate::{
3938
chain_worker::BlockOutcome,

linera-core/src/chain_worker/state.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use linera_base::{
2020
ensure,
2121
hashed::Hashed,
2222
identifiers::{AccountOwner, ApplicationId, BlobId, BlobType, ChainId, StreamId},
23+
value_cache::{ParkingCache, ValueCache},
2324
};
2425
use linera_chain::{
2526
data_types::{
@@ -42,8 +43,6 @@ use linera_views::{
4243
use tokio::sync::{oneshot, OwnedRwLockReadGuard, RwLock, RwLockWriteGuard};
4344
use tracing::{debug, instrument, trace, warn};
4445

45-
use linera_base::value_cache::{ParkingCache, ValueCache};
46-
4746
use super::{ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier, EventSubscriptionsResult};
4847
use crate::{
4948
data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
@@ -589,7 +588,8 @@ where
589588
for cert in certificates {
590589
let hashed_block = cert.into_value().into_inner();
591590
let height = hashed_block.inner().header.height;
592-
self.block_values.insert_hashed(Cow::Owned(hashed_block.clone()));
591+
self.block_values
592+
.insert_hashed(Cow::Owned(hashed_block.clone()));
593593
height_to_blocks.insert(height, hashed_block);
594594
}
595595
}
@@ -1435,8 +1435,7 @@ where
14351435
.query_application(context, query, self.service_runtime_endpoint.as_mut())
14361436
.await
14371437
.with_execution_context(ChainExecutionContext::Query)?;
1438-
self.execution_state_cache
1439-
.insert(&requested_block, state);
1438+
self.execution_state_cache.insert(&requested_block, state);
14401439
Ok(outcome)
14411440
} else {
14421441
tracing::debug!(requested_block = %requested_block, "requested block hash not found in cache, querying committed state");

linera-core/src/client/mod.rs

Lines changed: 24 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,63 +1156,40 @@ impl<Env: Environment> Client<Env> {
11561156
.get(&sender_chain_id)
11571157
.copied()
11581158
.unwrap_or(BlockHeight::ZERO);
1159-
let (max_epoch, committees) = self.admin_committees().await?;
1160-
1161-
// Recursively collect all certificates we need, following
1162-
// the chain of previous_message_blocks back to next_outbox_height.
1163-
let mut certificates = BTreeMap::new();
1164-
let mut current_height = height;
11651159

1166-
// Stop if we've reached the height we've already processed.
1167-
while current_height >= next_outbox_height {
1168-
// Download the certificate for this height.
1169-
let downloaded = self
1170-
.requests_scheduler
1171-
.download_certificates_by_heights(
1172-
remote_node,
1173-
sender_chain_id,
1174-
vec![current_height],
1175-
)
1176-
.await?;
1177-
let Some(certificate) = downloaded.into_iter().next() else {
1178-
return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
1179-
chain_id: sender_chain_id,
1180-
height: current_height,
1181-
});
1182-
};
1160+
// Skip if we already have all certificates up to this height.
1161+
if height < next_outbox_height {
1162+
return Ok(());
1163+
}
11831164

1184-
// Validate the certificate.
1185-
Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1186-
.into_result()?;
1165+
let (max_epoch, committees) = self.admin_committees().await?;
11871166

1188-
// Check if there's a previous message block to our chain.
1189-
let block = certificate.block();
1190-
let next_height = block
1191-
.body
1192-
.previous_message_blocks
1193-
.get(&receiver_chain_id)
1194-
.map(|(_prev_hash, prev_height)| *prev_height);
1195-
1196-
// Store this certificate.
1197-
certificates.insert(current_height, certificate);
1198-
1199-
if let Some(prev_height) = next_height {
1200-
// Continue with the previous block.
1201-
current_height = prev_height;
1202-
} else {
1203-
// No more dependencies.
1204-
break;
1205-
}
1206-
}
1167+
// Request the proxy to do the traversal and return the certificates
1168+
// in ascending height order, ready for processing.
1169+
let certificates = self
1170+
.requests_scheduler
1171+
.download_sender_certificates_for_receiver(
1172+
remote_node,
1173+
sender_chain_id,
1174+
receiver_chain_id,
1175+
height,
1176+
next_outbox_height,
1177+
)
1178+
.await?;
12071179

12081180
if certificates.is_empty() {
12091181
self.local_node
12101182
.retry_pending_cross_chain_requests(sender_chain_id)
12111183
.await?;
1184+
return Ok(());
12121185
}
12131186

1214-
// Process certificates in ascending block height order (BTreeMap keeps them sorted).
1215-
for certificate in certificates.into_values() {
1187+
// Process certificates in ascending block height order (already sorted by proxy).
1188+
for certificate in certificates {
1189+
// Validate the certificate.
1190+
Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1191+
.into_result()?;
1192+
12161193
self.receive_sender_certificate(
12171194
certificate,
12181195
ReceiveCertificateMode::AlreadyChecked,

linera-core/src/client/requests_scheduler/request.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@ pub enum RequestKey {
1919
chain_id: ChainId,
2020
heights: Vec<BlockHeight>,
2121
},
22+
/// Download sender certificates for a receiver chain
23+
SenderCertificates {
24+
sender_chain_id: ChainId,
25+
receiver_chain_id: ChainId,
26+
target_height: BlockHeight,
27+
start_height: BlockHeight,
28+
},
2229
/// Download a blob by ID
2330
Blob(BlobId),
2431
/// Download a pending blob
@@ -32,6 +39,9 @@ impl RequestKey {
3239
pub(super) fn chain_id(&self) -> Option<ChainId> {
3340
match self {
3441
RequestKey::Certificates { chain_id, .. } => Some(*chain_id),
42+
RequestKey::SenderCertificates {
43+
sender_chain_id, ..
44+
} => Some(*sender_chain_id),
3545
RequestKey::PendingBlob { chain_id, .. } => Some(*chain_id),
3646
_ => None,
3747
}

linera-core/src/client/requests_scheduler/scheduler.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,35 @@ impl<Env: Environment> RequestsScheduler<Env> {
396396
.await
397397
}
398398

399+
pub async fn download_sender_certificates_for_receiver(
400+
&self,
401+
peer: &RemoteNode<Env::ValidatorNode>,
402+
sender_chain_id: ChainId,
403+
receiver_chain_id: ChainId,
404+
target_height: BlockHeight,
405+
start_height: BlockHeight,
406+
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
407+
self.with_peer(
408+
RequestKey::SenderCertificates {
409+
sender_chain_id,
410+
receiver_chain_id,
411+
target_height,
412+
start_height,
413+
},
414+
peer.clone(),
415+
move |peer| async move {
416+
peer.download_sender_certificates_for_receiver(
417+
sender_chain_id,
418+
receiver_chain_id,
419+
target_height,
420+
start_height,
421+
)
422+
.await
423+
},
424+
)
425+
.await
426+
}
427+
399428
pub async fn download_certificate_for_blob(
400429
&self,
401430
peer: &RemoteNode<Env::ValidatorNode>,

linera-core/src/data_types.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,22 @@ pub struct CertificatesByHeightRequest {
347347
pub heights: Vec<BlockHeight>,
348348
}
349349

350+
/// Request for downloading sender certificates with their sending ancestors.
351+
/// The proxy will traverse `previous_message_blocks` to find all certificates
352+
/// that sent messages to the receiver chain.
353+
#[derive(Debug, Clone)]
354+
pub struct SenderCertificatesRequest {
355+
/// The chain that sent the messages.
356+
pub sender_chain_id: ChainId,
357+
/// The chain that receives the messages.
358+
pub receiver_chain_id: ChainId,
359+
/// The target block height to start from.
360+
pub target_height: BlockHeight,
361+
/// The height from which to stop traversing (exclusive lower bound).
362+
/// This is typically the `next_outbox_height` the client already has.
363+
pub start_height: BlockHeight,
364+
}
365+
350366
/// The outcome of trying to commit a list of operations to the chain.
351367
#[derive(Debug)]
352368
pub enum ClientOutcome<T> {

linera-core/src/node.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,19 @@ pub trait ValidatorNode {
156156
heights: Vec<BlockHeight>,
157157
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
158158

159+
/// Downloads sender certificates with their sending ancestors.
160+
/// The proxy traverses `previous_message_blocks` to find all certificates
161+
/// from `sender_chain_id` that sent messages to `receiver_chain_id`,
162+
/// starting from `target_height` and stopping at `start_height`.
163+
/// Returns certificates in ascending height order, ready for processing.
164+
async fn download_sender_certificates_for_receiver(
165+
&self,
166+
sender_chain_id: ChainId,
167+
receiver_chain_id: ChainId,
168+
target_height: BlockHeight,
169+
start_height: BlockHeight,
170+
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
171+
159172
/// Returns the hash of the `Certificate` that last used a blob.
160173
async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
161174

linera-core/src/remote_node.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,43 @@ impl<N: ValidatorNode> RemoteNode<N> {
266266
Ok(certificates)
267267
}
268268

269+
/// Downloads sender certificates with their sending ancestors.
270+
#[instrument(level = "trace")]
271+
pub async fn download_sender_certificates_for_receiver(
272+
&self,
273+
sender_chain_id: ChainId,
274+
receiver_chain_id: ChainId,
275+
target_height: BlockHeight,
276+
start_height: BlockHeight,
277+
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
278+
let certificates = self
279+
.node
280+
.download_sender_certificates_for_receiver(
281+
sender_chain_id,
282+
receiver_chain_id,
283+
target_height,
284+
start_height,
285+
)
286+
.await?;
287+
288+
// Validate that all returned certificates are for the sender chain
289+
// and in ascending height order.
290+
let mut prev_height = None;
291+
for certificate in &certificates {
292+
ensure!(
293+
certificate.inner().chain_id() == sender_chain_id,
294+
NodeError::UnexpectedCertificateValue
295+
);
296+
let height = certificate.inner().height();
297+
if let Some(prev) = prev_height {
298+
ensure!(height > prev, NodeError::UnexpectedCertificateValue);
299+
}
300+
prev_height = Some(height);
301+
}
302+
303+
Ok(certificates)
304+
}
305+
269306
/// Checks that requesting these blobs when trying to handle this certificate is legitimate,
270307
/// i.e. that there are no duplicates and the blobs are actually required.
271308
pub fn check_blobs_not_found<T: CertificateValue>(

0 commit comments

Comments
 (0)