Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,7 @@
origin: ChainId,
bundles: Vec<(Epoch, MessageBundle)>,
) -> Result<Option<BlockHeight>, WorkerError> {
let incoming_count = bundles.len();

Check failure on line 1072 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / indexer-check

unused variable: `incoming_count`

Check failure on line 1072 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / default-features-and-witty-integration-test

unused variable: `incoming_count`

Check failure on line 1072 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / check-outdated-cli-md

unused variable: `incoming_count`

Check failure on line 1072 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / lint-cargo-clippy

unused variable: `incoming_count`

Check failure on line 1072 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / ethereum-tests

unused variable: `incoming_count`

Check failure on line 1072 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / lint-check-linera-service-graphql-schema

unused variable: `incoming_count`

Check failure on line 1072 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / lint-cargo-doc

unused variable: `incoming_count`

Check failure on line 1072 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / web

unused variable: `incoming_count`
// Only process certificates with relevant heights and epochs.
let next_height_to_receive = self.chain.next_block_height_to_receive(&origin).await?;
let last_anticipated_block_height =
Expand All @@ -1082,18 +1083,19 @@
last_anticipated_block_height,
bundles,
)?;
let selected_count = bundles.len();

Check failure on line 1086 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / indexer-check

unused variable: `selected_count`

Check failure on line 1086 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / default-features-and-witty-integration-test

unused variable: `selected_count`

Check failure on line 1086 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / check-outdated-cli-md

unused variable: `selected_count`

Check failure on line 1086 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / lint-cargo-clippy

unused variable: `selected_count`

Check failure on line 1086 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / ethereum-tests

unused variable: `selected_count`

Check failure on line 1086 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / lint-check-linera-service-graphql-schema

unused variable: `selected_count`

Check failure on line 1086 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / lint-cargo-doc

unused variable: `selected_count`

Check failure on line 1086 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / web

unused variable: `selected_count`
let Some(last_updated_height) = bundles.last().map(|bundle| bundle.height) else {
return Ok(None);
};
// Process the received messages in certificates.
let local_time = self.storage.clock().current_time();
let mut previous_height = None;
for bundle in bundles {
for bundle in &bundles {
let add_to_received_log = previous_height != Some(bundle.height);
previous_height = Some(bundle.height);
// Update the staged chain state with the received block.
self.chain
.receive_message_bundle(&origin, bundle, local_time, add_to_received_log)
.receive_message_bundle(&origin, bundle.clone(), local_time, add_to_received_log)
.await?;
}
if !self.config.allow_inactive_chains && !self.chain.is_active() {
Expand Down Expand Up @@ -1719,15 +1721,26 @@
);
}
if query.request_pending_message_bundles {
let load_start = linera_base::time::Instant::now();
let mut bundles = Vec::new();
let pairs = chain.inboxes.try_load_all_entries().await?;
let num_inboxes = pairs.len();
let action = if *chain.execution_state.system.closed.get() {
MessageAction::Reject
} else {
MessageAction::Accept
};
for (origin, inbox) in pairs {
for bundle in inbox.added_bundles.elements().await? {
let inbox_bundles: Vec<_> = inbox.added_bundles.elements().await?;

Check warning on line 1734 in linera-core/src/chain_worker/state.rs

View workflow job for this annotation

GitHub Actions / lint-cargo-fmt

Diff in /home/runner/work/linera-protocol/linera-protocol/linera-core/src/chain_worker/state.rs
let inbox_count = inbox_bundles.len();
if inbox_count > 0 {
tracing::debug!(
"inbox from {:?}: {} bundles",
origin,
inbox_count
);
}
for bundle in inbox_bundles {
bundles.push(IncomingBundle {
origin,
bundle,
Expand All @@ -1736,6 +1749,13 @@
}
}
bundles.sort_by_key(|b| b.bundle.timestamp);
let load_elapsed = load_start.elapsed();
tracing::info!(
"loaded {} bundles from {} inboxes in {:?}",
bundles.len(),
num_inboxes,
load_elapsed
);
info.requested_pending_message_bundles = bundles;
}
let hashes = chain
Expand Down
35 changes: 34 additions & 1 deletion linera-core/src/client/chain_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,13 +502,15 @@ impl<Env: Environment> ChainClient<Env> {
return Ok(Vec::new());
}

let query_start = linera_base::time::Instant::now();
let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
let info = self
.client
.local_node
.handle_chain_info_query(query)
.await?
.info;
let query_elapsed = query_start.elapsed();
if self.preferred_owner.is_some_and(|owner| {
info.manager
.ownership
Expand All @@ -521,6 +523,12 @@ impl<Env: Environment> ChainClient<Env> {
);
}

info!(
"pending_bundles={} max={} query_time={:?}",
info.requested_pending_message_bundles.len(),
self.options.max_pending_message_bundles,
query_elapsed,
);
Ok(info
.requested_pending_message_bundles
.into_iter()
Expand Down Expand Up @@ -2262,20 +2270,45 @@ impl<Env: Environment> ChainClient<Env> {
let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();

let mut certificates = Vec::new();
let mut block_count = 0u64;
let process_start = linera_base::time::Instant::now();
loop {
let block_start = linera_base::time::Instant::now();
// We provide no operations - this means that the only operations executed
// will be epoch changes, receiving messages and processing event stream
// updates, if any are pending.
match self.execute_block(vec![], vec![]).await {
Ok(ExecuteBlockOutcome::Executed(certificate))
| Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
| Ok(ExecuteBlockOutcome::Conflict(certificate)) => {
block_count += 1;
let block_elapsed = block_start.elapsed();
let bundles_in_block = certificate.block().body.incoming_bundles().count();
info!(
"block #{} completed: bundles={} time={:?} height={}",
block_count,
bundles_in_block,
block_elapsed,
certificate.block().header.height,
);
certificates.push(certificate);
}
Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
info!(
"process_inbox: timeout after {} blocks in {:?}",
block_count,
process_start.elapsed()
);
return Ok((certificates, Some(timeout)));
}
// Nothing in the inbox and no stream updates to be processed.
Err(Error::LocalNodeError(LocalNodeError::WorkerError(
WorkerError::ChainError(chain_error),
))) if matches!(*chain_error, ChainError::EmptyBlock) => {
info!(
"process_inbox: completed {} blocks in {:?}",
block_count,
process_start.elapsed()
);
return Ok((certificates, None));
}
Err(error) => return Err(error),
Expand Down
Loading