diff --git a/linera-chain/src/chain.rs b/linera-chain/src/chain.rs index 372ce3e4c4e5..5488482f3187 100644 --- a/linera-chain/src/chain.rs +++ b/linera-chain/src/chain.rs @@ -14,7 +14,7 @@ use linera_base::{ OracleResponse, Timestamp, }, ensure, - identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId}, + identifiers::{AccountOwner, ApplicationId, ChainId, StreamId}, ownership::ChainOwnership, }; use linera_execution::{ @@ -46,7 +46,7 @@ use crate::{ manager::ChainManager, outbox::OutboxStateView, pending_blobs::PendingBlobsView, - ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt, + ChainError, ChainExecutionContext, ExecutionResultExt, }; #[cfg(test)] @@ -417,62 +417,88 @@ where .with_execution_context(ChainExecutionContext::DescribeApplication) } - #[instrument(skip_all, fields( - chain_id = %self.chain_id(), - target = %target, - height = %height - ))] + /// Marks messages as received for multiple recipients. + /// Takes a map from recipient to the maximum confirmed height. + /// + /// Returns `true` if any messages were marked as received. pub async fn mark_messages_as_received( &mut self, - target: &ChainId, - height: BlockHeight, + max_height_by_recipient: BTreeMap, ) -> Result { - let mut outbox = self.outboxes.try_load_entry_mut(target).await?; - let updates = outbox.mark_messages_as_received(height).await?; - if updates.is_empty() { - return Ok(false); - } - for update in updates { - let counter = self - .outbox_counters - .get_mut() - .get_mut(&update) - .ok_or_else(|| { - ChainError::InternalError("message counter should be present".into()) - })?; - *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?; - if *counter == 0 { - // Important for the test in `all_messages_delivered_up_to`. - self.outbox_counters.get_mut().remove(&update); + // Load all outboxes at once. + let recipients = max_height_by_recipient + .keys() + .copied() + .collect::>(); + let loaded_outboxes = self.outboxes.try_load_entries_pairs_mut(recipients).await?; + + // Process each recipient concurrently. + let processing_futures = + loaded_outboxes + .into_iter() + .filter_map(|(recipient, mut outbox)| { + let max_height = *max_height_by_recipient.get(&recipient)?; + + Some(async move { + let updates = outbox.mark_messages_as_received(max_height).await?; + Ok::<_, ChainError>((recipient, updates, outbox)) + }) + }); + + // Execute all processing concurrently. + let processing_results = futures::future::try_join_all(processing_futures).await?; + + // Apply counter updates to shared state sequentially. + let mut any_updates = false; + for (recipient, updates, outbox) in &processing_results { + if updates.is_empty() { + continue; } - } - if outbox.queue.count() == 0 { - self.nonempty_outboxes.get_mut().remove(target); - // If the outbox is empty and not ahead of the executed blocks, remove it. - if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height { - self.outboxes.remove_entry(target)?; + any_updates = true; + for update in updates { + let counter = self + .outbox_counters + .get_mut() + .get_mut(update) + .ok_or_else(|| { + ChainError::InternalError("message counter should be present".into()) + })?; + *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?; + if *counter == 0 { + // Important for the test in `max_height_with_all_messages_delivered`. + self.outbox_counters.get_mut().remove(update); + } + } + if outbox.queue.count() == 0 { + self.nonempty_outboxes.get_mut().remove(recipient); + // If the outbox is empty and not ahead of the executed blocks, remove it. + if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height { + self.outboxes.remove_entry(recipient)?; + } } } + #[cfg(with_metrics)] metrics::NUM_OUTBOXES .with_label_values(&[]) .observe(self.outboxes.count().await? as f64); - Ok(true) + + Ok(any_updates) } - /// Returns true if there are no more outgoing messages in flight up to the given - /// block height. - pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool { + /// Returns the lowest block height for which we don't know that all outgoing messages have + /// been delivered. + pub fn min_height_with_undelivered_messages(&self) -> BlockHeight { + let next_height = self.tip_state.get().next_block_height; + let counters = self.outbox_counters.get(); tracing::debug!( "Messages left in {:.8}'s outbox: {:?}", self.chain_id(), - self.outbox_counters.get() + counters ); - if let Some((key, _)) = self.outbox_counters.get().first_key_value() { - key > &height - } else { - true - } + counters + .first_key_value() + .map_or(next_height, |(height, _)| (*height).min(next_height)) } /// Invariant for the states of active chains. @@ -508,17 +534,6 @@ where Ok(()) } - pub async fn next_block_height_to_receive( - &self, - origin: &ChainId, - ) -> Result { - let inbox = self.inboxes.try_load_entry(origin).await?; - match inbox { - Some(inbox) => inbox.next_block_height_to_receive(), - None => Ok(BlockHeight::ZERO), - } - } - /// Returns the height of the highest block we have, plus one. Includes preprocessed blocks. /// /// The "+ 1" is so that it can be used in the same places as `next_block_height`. @@ -529,90 +544,117 @@ where Ok(self.tip_state.get().next_block_height) } - pub async fn last_anticipated_block_height( + /// Returns inbox metadata for multiple origins, loading all inboxes concurrently. + /// + /// For each origin, returns a tuple of: + /// - `next_block_height_to_receive`: The next block height expected from this origin. + /// - `last_anticipated_block_height`: The height of the last removed bundle, if any. + pub async fn inbox_info_for_origins( &self, - origin: &ChainId, - ) -> Result, ChainError> { - let inbox = self.inboxes.try_load_entry(origin).await?; - match inbox { - Some(inbox) => match inbox.removed_bundles.back().await? { - Some(bundle) => Ok(Some(bundle.height)), - None => Ok(None), - }, - None => Ok(None), - } + origins: impl IntoIterator, + ) -> Result)>, ChainError> { + let origins: Vec<_> = origins.into_iter().collect(); + let loaded_inboxes = self.inboxes.try_load_entries_pairs(origins).await?; + + let futures: Vec<_> = loaded_inboxes + .into_iter() + .map(|(origin, inbox)| async move { + let (next_height, last_anticipated) = match inbox { + Some(inbox) => { + let next = inbox.next_block_height_to_receive()?; + let last = match inbox.removed_bundles.back().await? { + Some(bundle) => Some(bundle.height), + None => None, + }; + (next, last) + } + None => (BlockHeight::ZERO, None), + }; + Ok::<_, ChainError>((origin, (next_height, last_anticipated))) + }) + .collect(); + + futures::future::try_join_all(futures) + .await + .map(|results| results.into_iter().collect()) } - /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an - /// internal error if the bundle doesn't appear to be new, based on the sender's - /// height. The value `local_time` is specific to each validator and only used for - /// round timeouts. + /// Processes multiple message bundles from different origins concurrently. + /// Each inbox is loaded once and all its bundles are processed together. + /// Side effects to shared state (`unskippable_bundles`, `received_log`) are collected + /// during concurrent processing and applied sequentially at the end. /// - /// Returns `true` if incoming `Subscribe` messages created new outbox entries. - #[instrument(skip_all, fields( - chain_id = %self.chain_id(), - origin = %origin, - bundle_height = %bundle.height - ))] - pub async fn receive_message_bundle( + /// The bundles are provided as a map from origin chain to a map of bundles keyed by + /// `(height, transaction_index)`. This ensures proper ordering and deduplication. + pub async fn receive_message_bundles( &mut self, - origin: &ChainId, - bundle: MessageBundle, + mut bundles_by_origin: BTreeMap>, local_time: Timestamp, - add_to_received_log: bool, ) -> Result<(), ChainError> { - assert!(!bundle.messages.is_empty()); - let chain_id = self.chain_id(); - tracing::trace!( - "Processing new messages to {chain_id:.8} from {origin} at height {}", - bundle.height, - ); - let chain_and_height = ChainAndHeight { - chain_id: *origin, - height: bundle.height, - }; - - match self.initialize_if_needed(local_time).await { - Ok(_) => (), - // if the only issue was that we couldn't initialize the chain because of a - // missing chain description blob, we might still want to update the inbox - Err(ChainError::ExecutionError(exec_err, _)) - if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs) - if blobs.iter().all(|blob_id| { - blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0 - })) => {} - err => { - return err; - } + if bundles_by_origin.is_empty() { + return Ok(()); } - // Process the inbox bundle and update the inbox state. - let mut inbox = self.inboxes.try_load_entry_mut(origin).await?; - #[cfg(with_metrics)] - metrics::NUM_INBOXES - .with_label_values(&[]) - .observe(self.inboxes.count().await? as f64); - let entry = BundleInInbox::new(*origin, &bundle); - let skippable = bundle.is_skippable(); - let newly_added = inbox - .add_bundle(bundle) - .await - .map_err(|error| match error { - InboxError::ViewError(error) => ChainError::ViewError(error), - error => ChainError::InternalError(format!( - "while processing messages in certified block: {error}" - )), - })?; - if newly_added && !skippable { - let seen = local_time; - self.unskippable_bundles - .push_back(TimestampedBundleInInbox { entry, seen }); - } + // Load the inboxes for the origins. + let origins: Vec = bundles_by_origin.keys().copied().collect(); + let loaded_inboxes = self.inboxes.try_load_entries_pairs_mut(origins).await?; + + // Process each origin concurrently. + let processing_futures: Vec<_> = loaded_inboxes + .into_iter() + .filter_map(|(origin, mut inbox)| { + let bundles = bundles_by_origin.remove(&origin)?; + + Some(async move { + let mut unskippable_bundles = Vec::new(); + let mut received_log_heights = BTreeSet::new(); + + for bundle in bundles.into_values() { + let height = bundle.height; + let entry = BundleInInbox::new(origin, &bundle); + let skippable = bundle.is_skippable(); + + // Add bundle to the inbox. + let newly_added = + inbox + .add_bundle(bundle) + .await + .map_err(|error| match error { + InboxError::ViewError(error) => ChainError::ViewError(error), + error => ChainError::InternalError(format!( + "while processing messages in certified block: {error}" + )), + })?; + + // Collect side effects for later application. + if newly_added && !skippable { + unskippable_bundles.push((entry, local_time)); + } + received_log_heights.insert(height); + } - // Remember the certificate for future validator/client synchronizations. - if add_to_received_log { - self.received_log.push(chain_and_height); + Ok::<_, ChainError>((origin, unskippable_bundles, received_log_heights)) + }) + }) + .collect(); + + // Execute all processing concurrently. + let processing_results = futures::future::try_join_all(processing_futures).await?; + + // Apply side effects to shared state sequentially. + for (origin, unskippable_bundles, received_log_heights) in processing_results { + for (entry, seen) in unskippable_bundles { + self.unskippable_bundles + .push_back(TimestampedBundleInInbox { entry, seen }); + } + for height in received_log_heights { + self.received_log.push(ChainAndHeight { + chain_id: origin, + height, + }); + } } + Ok(()) } diff --git a/linera-core/src/chain_worker/actor.rs b/linera-core/src/chain_worker/actor.rs index 191bbc17004f..93a583495e8e 100644 --- a/linera-core/src/chain_worker/actor.rs +++ b/linera-core/src/chain_worker/actor.rs @@ -6,11 +6,12 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, fmt, + pin::Pin, sync::{self, Arc, RwLock}, }; use custom_debug_derive::Debug; -use futures::FutureExt; +use futures::{stream::Peekable, FutureExt as _, StreamExt as _}; use linera_base::{ crypto::{CryptoHash, ValidatorPublicKey}, data_types::{ApplicationDescription, Blob, BlockHeight, Epoch, TimeDelta, Timestamp}, @@ -22,7 +23,7 @@ use linera_base::{ use linera_chain::{ data_types::{BlockProposal, MessageBundle, ProposedBlock}, types::{Block, ConfirmedBlockCertificate, TimeoutCertificate, ValidatedBlockCertificate}, - ChainStateView, + ChainError, ChainStateView, }; use linera_execution::{ system::EventSubscriptions, ExecutionStateView, Query, QueryContext, QueryOutcome, @@ -31,6 +32,7 @@ use linera_execution::{ use linera_storage::{Clock as _, Storage}; use linera_views::context::InactiveContext; use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, instrument, trace, Instrument as _}; use super::{config::ChainWorkerConfig, state::ChainWorkerState, DeliveryNotifier}; @@ -48,15 +50,17 @@ mod metrics { use std::sync::LazyLock; use linera_base::prometheus_util::{ - exponential_bucket_interval, register_histogram, register_int_gauge, + exponential_bucket_latencies, register_histogram_vec, register_int_gauge, }; - use prometheus::{Histogram, IntGauge}; - - pub static CHAIN_WORKER_REQUEST_QUEUE_WAIT_TIME: LazyLock = LazyLock::new(|| { - register_histogram( - "chain_worker_request_queue_wait_time", - "Time (ms) a chain worker request waits in queue before being processed", - exponential_bucket_interval(0.1_f64, 10_000.0), + use prometheus::{HistogramVec, IntGauge}; + + /// Time (ms) a request waits in queue before being processed, by queue type. + pub static CHAIN_WORKER_QUEUE_WAIT_TIME: LazyLock = LazyLock::new(|| { + register_histogram_vec( + "chain_worker_queue_wait_time", + "Time (ms) a request waits in queue before being processed", + &["queue_type"], + exponential_bucket_latencies(10_000.0), ) }); @@ -154,22 +158,6 @@ where oneshot::Sender>, }, - /// Process a cross-chain update. - ProcessCrossChainUpdate { - origin: ChainId, - bundles: Vec<(Epoch, MessageBundle)>, - #[debug(skip)] - callback: oneshot::Sender, WorkerError>>, - }, - - /// Handle cross-chain request to confirm that the recipient was updated. - ConfirmUpdatedRecipient { - recipient: ChainId, - latest_height: BlockHeight, - #[debug(skip)] - callback: oneshot::Sender>, - }, - /// Handle a [`ChainInfoQuery`]. HandleChainInfoQuery { query: ChainInfoQuery, @@ -266,6 +254,54 @@ where }, } +/// A request to process a cross-chain update. +#[derive(Debug)] +pub(crate) struct CrossChainUpdateRequest { + pub origin: ChainId, + pub bundles: Vec<(Epoch, MessageBundle)>, + #[debug(skip)] + pub callback: oneshot::Sender, WorkerError>>, +} + +/// A request to confirm that a recipient was updated. +#[derive(Debug)] +pub(crate) struct ConfirmUpdatedRecipientRequest { + pub recipient: ChainId, + pub latest_height: BlockHeight, + #[debug(skip)] + pub callback: oneshot::Sender>, +} + +/// A callback sender for cross-chain update results, mapped by origin chain ID. +type CrossChainUpdateCallbacks = + BTreeMap, WorkerError>>>>; + +/// The type of request to process next in the actor loop. +#[derive(Clone, Copy, PartialEq, Eq)] +enum RequestType { + /// A cross-chain update request. + CrossChainUpdate, + /// A confirmation that a recipient was updated. + Confirmation, + /// A regular chain worker request. + Regular, +} + +/// The receiver endpoints for a [`ChainWorkerActor`]. +pub(crate) struct ChainActorReceivers +where + Context: linera_views::context::Context + Clone + Send + Sync + 'static, +{ + /// Receiver for regular chain worker requests. + pub requests: mpsc::UnboundedReceiver<(ChainWorkerRequest, tracing::Span, Instant)>, + /// Receiver for cross-chain update requests. + pub cross_chain_updates: + mpsc::UnboundedReceiver<(CrossChainUpdateRequest, tracing::Span, Instant)>, + /// Receiver for confirmation requests. + pub confirmations: + mpsc::UnboundedReceiver<(ConfirmUpdatedRecipientRequest, tracing::Span, Instant)>, +} + /// The actor worker type. pub(crate) struct ChainWorkerActor where @@ -334,11 +370,7 @@ where tracked_chains: Option>>>, delivery_notifier: DeliveryNotifier, chain_id: ChainId, - incoming_requests: mpsc::UnboundedReceiver<( - ChainWorkerRequest, - tracing::Span, - Instant, - )>, + receivers: ChainActorReceivers, is_tracked: bool, ) { #[cfg(with_metrics)] @@ -353,7 +385,7 @@ where chain_id, is_tracked, }; - if let Err(err) = actor.handle_requests(incoming_requests).await { + if let Err(err) = actor.handle_requests(receivers).await { tracing::error!("Chain actor error: {err}"); } #[cfg(with_metrics)] @@ -380,78 +412,268 @@ where )] async fn handle_requests( self, - mut incoming_requests: mpsc::UnboundedReceiver<( - ChainWorkerRequest, - tracing::Span, - Instant, - )>, + receivers: ChainActorReceivers, ) -> Result<(), WorkerError> { trace!("Starting `ChainWorkerActor`"); - while let Some((request, span, _queued_at)) = incoming_requests.recv().await { - // Record how long the request waited in queue (in milliseconds) - #[cfg(with_metrics)] - { - let queue_wait_time_ms = _queued_at.elapsed().as_secs_f64() * 1000.0; - metrics::CHAIN_WORKER_REQUEST_QUEUE_WAIT_TIME.observe(queue_wait_time_ms); - } + let regular_batch_size = self.config.regular_request_batch_size; + let cross_chain_batch_size = self.config.cross_chain_update_batch_size; - let (_service_runtime_thread, service_runtime_task, service_runtime_endpoint) = - if self.config.long_lived_services { - let actor = ServiceRuntimeActor::spawn(self.chain_id).await; - (Some(actor.thread), Some(actor.task), Some(actor.endpoint)) + // The first iteration waits indefinitely; subsequent iterations have a timeout. + let mut first_iteration = true; + + // The chain worker state, loaded lazily. + let mut worker: Option> = None; + let mut service_runtime_task = None; + #[allow(unused)] + let mut service_runtime_thread = None; + + // Convert receivers to peekable streams so we can wait without consuming. + let mut requests = UnboundedReceiverStream::new(receivers.requests).peekable(); + let mut cross_chain_updates = + UnboundedReceiverStream::new(receivers.cross_chain_updates).peekable(); + let mut confirmations = UnboundedReceiverStream::new(receivers.confirmations).peekable(); + + // Rotate through request types: Regular -> CrossChainUpdate -> Confirmation -> ... + let mut next_type_idx = 0; + + fn is_ready(stream: &mut Peekable>) -> bool { + Pin::new(stream).peek().now_or_never().is_some() + } + + loop { + // Check which streams have data ready. + let types = [ + (RequestType::Confirmation, is_ready(&mut confirmations)), + ( + RequestType::CrossChainUpdate, + is_ready(&mut cross_chain_updates), + ), + (RequestType::Regular, is_ready(&mut requests)), + ]; + + // Find the next ready queue in rotation order. + let Some((type_idx, request_type)) = types + .iter() + .enumerate() + .cycle() + .skip(next_type_idx) + .take(types.len()) + .find_map(|(idx, (request_type, is_ready))| { + is_ready.then_some((idx, request_type)) + }) + else { + // No requests available, wait on all queues. + // On the first iteration, wait indefinitely. Otherwise, use a timeout. + let timeout_future = if first_iteration { + futures::future::pending().left_future() } else { - (None, None, None) + self.sleep_until_timeout().right_future() }; - trace!("Loading chain state of {}", self.chain_id); - let mut worker = ChainWorkerState::load( - self.config.clone(), - self.storage.clone(), - self.block_values.clone(), - self.execution_state_cache.clone(), - self.tracked_chains.clone(), - self.delivery_notifier.clone(), - self.chain_id, - service_runtime_endpoint, - ) - .instrument(span.clone()) - .await?; - #[cfg(with_metrics)] - metrics::CHAIN_WORKER_STATES_LOADED.inc(); - - Box::pin(worker.handle_request(request)) - .instrument(span) - .await; - - loop { - futures::select! { - () = self.sleep_until_timeout().fuse() => break, - maybe_request = incoming_requests.recv().fuse() => { - let Some((request, span, _queued_at)) = maybe_request else { - break; // Request sender was dropped. - }; + // Wait for any stream to have data or timeout. + let dropped = futures::select! { + () = timeout_future.fuse() => { + // Timeout: unload chain state. + if let Some(mut w) = worker.take() { + trace!("Unloading chain state of {} ...", self.chain_id); + w.clear_shared_chain_view().await; + drop(w); + #[cfg(with_metrics)] + metrics::CHAIN_WORKER_STATES_LOADED.dec(); + if let Some(task) = service_runtime_task.take() { + task.await?; + } + service_runtime_thread = None; + trace!("Done unloading chain state of {}", self.chain_id); + } + first_iteration = true; + continue; + }, + result = Pin::new(&mut cross_chain_updates).peek().fuse() => result.is_none(), + result = Pin::new(&mut confirmations).peek().fuse() => result.is_none(), + result = Pin::new(&mut requests).peek().fuse() => result.is_none(), + }; - // Record how long the request waited in queue (in milliseconds) - #[cfg(with_metrics)] - { - let queue_wait_time_ms = _queued_at.elapsed().as_secs_f64() * 1000.0; - metrics::CHAIN_WORKER_REQUEST_QUEUE_WAIT_TIME.observe(queue_wait_time_ms); + if dropped { + break; + } + // After waking, re-evaluate at top of loop. + continue; + }; + + first_iteration = false; + + // Advance rotation to next type. + next_type_idx = (type_idx + 1) % types.len(); + + // Load chain state if not already loaded. + let worker = match &mut worker { + Some(w) => w, + None => { + let service_runtime_endpoint = if self.config.long_lived_services { + let actor = ServiceRuntimeActor::spawn(self.chain_id).await; + service_runtime_thread = Some(actor.thread); + service_runtime_task = Some(actor.task); + Some(actor.endpoint) + } else { + None + }; + + trace!("Loading chain state of {}", self.chain_id); + worker = Some( + ChainWorkerState::load( + self.config.clone(), + self.storage.clone(), + self.block_values.clone(), + self.execution_state_cache.clone(), + self.tracked_chains.clone(), + self.delivery_notifier.clone(), + self.chain_id, + service_runtime_endpoint, + ) + .await?, + ); + #[cfg(with_metrics)] + metrics::CHAIN_WORKER_STATES_LOADED.inc(); + worker.as_mut().unwrap() + } + }; + + // Process the request based on type. + match request_type { + RequestType::CrossChainUpdate => { + // Drain requests and group by origin, merging bundles. + let mut updates: BTreeMap> = + BTreeMap::new(); + let mut callbacks_by_origin: CrossChainUpdateCallbacks = BTreeMap::new(); + let mut count = 0; + + while count < cross_chain_batch_size { + match Pin::new(&mut cross_chain_updates).next().now_or_never() { + Some(Some((req, _span, enqueued_at))) => { + #[cfg(with_metrics)] + metrics::CHAIN_WORKER_QUEUE_WAIT_TIME + .with_label_values(&["cross_chain_updates"]) + .observe(enqueued_at.elapsed().as_millis() as f64); + updates.entry(req.origin).or_default().extend(req.bundles); + callbacks_by_origin + .entry(req.origin) + .or_default() + .push(req.callback); + count += 1; + } + None | Some(None) => break, + } + } + + if updates.is_empty() { + tracing::error!("cross-chain update queue empty; this is a bug!"); + continue; // Queue was empty after all. + } + trace!("batching {count} cross-chain updates"); + + // Sort and deduplicate bundles for each origin. + for bundles in updates.values_mut() { + // Sort by height, then transaction_index, then epoch (descending). + bundles.sort_by(|(epoch_a, a), (epoch_b, b)| { + a.height + .cmp(&b.height) + .then_with(|| a.transaction_index.cmp(&b.transaction_index)) + .then_with(|| epoch_b.cmp(epoch_a)) + }); + // Deduplicate by (height, transaction_index), keeping latest epoch. + bundles.dedup_by(|(_, a), (_, b)| { + a.height == b.height && a.transaction_index == b.transaction_index + }); + } + + match Box::pin(worker.process_cross_chain_update(updates)).await { + Ok(heights_by_origin) => { + for (origin, height) in heights_by_origin { + if let Some(callbacks) = callbacks_by_origin.remove(&origin) { + for callback in callbacks { + let _ = callback.send(Ok(height)); + } + } + } + } + Err(err) => { + let all_callbacks: Vec<_> = + callbacks_by_origin.into_values().flatten().collect(); + send_batch_error(all_callbacks, err); + } + } + } + RequestType::Confirmation => { + // Drain requests and group by recipient, keeping max height. + let mut confirmations_map: BTreeMap = BTreeMap::new(); + let mut callbacks = Vec::new(); + let mut count = 0; + + while count < cross_chain_batch_size { + match Pin::new(&mut confirmations).next().now_or_never() { + Some(Some((req, _span, enqueued_at))) => { + #[cfg(with_metrics)] + metrics::CHAIN_WORKER_QUEUE_WAIT_TIME + .with_label_values(&["confirmations"]) + .observe(enqueued_at.elapsed().as_millis() as f64); + confirmations_map + .entry(req.recipient) + .and_modify(|h| *h = (*h).max(req.latest_height)) + .or_insert(req.latest_height); + callbacks.push(req.callback); + count += 1; + } + None | Some(None) => break, } + } + + if confirmations_map.is_empty() { + tracing::error!("cross-chain confirmation queue empty; this is a bug!"); + continue; // Queue was empty after all. + } + trace!("batching {count} confirmations"); - Box::pin(worker.handle_request(request)).instrument(span).await; + match Box::pin(worker.confirm_updated_recipient(confirmations_map)).await { + Ok(()) => { + for callback in callbacks { + let _ = callback.send(Ok(())); + } + } + Err(err) => send_batch_error(callbacks, err), + } + } + RequestType::Regular => { + for _ in 0..regular_batch_size { + let Some(Some((request, span, enqueued_at))) = + Pin::new(&mut requests).next().now_or_never() + else { + break; + }; + #[cfg(with_metrics)] + metrics::CHAIN_WORKER_QUEUE_WAIT_TIME + .with_label_values(&["regular"]) + .observe(enqueued_at.elapsed().as_millis() as f64); + Box::pin(worker.handle_request(request)) + .instrument(span) + .await; } } } + } + // Clean up on exit. + if let Some(mut w) = worker.take() { trace!("Unloading chain state of {} ...", self.chain_id); - worker.clear_shared_chain_view().await; - drop(worker); + w.clear_shared_chain_view().await; + drop(w); #[cfg(with_metrics)] metrics::CHAIN_WORKER_STATES_LOADED.dec(); - if let Some(task) = service_runtime_task { + if let Some(task) = service_runtime_task.take() { task.await?; } + drop(service_runtime_thread); trace!("Done unloading chain state of {}", self.chain_id); } @@ -467,3 +689,16 @@ fn elide_option(option: &Option, f: &mut fmt::Formatter) -> fmt::Result { None => write!(f, "None"), } } + +/// Sends an error to the first callback and a generic batch failure to the rest. +fn send_batch_error(callbacks: Vec>>, err: WorkerError) { + let mut iter = callbacks.into_iter(); + if let Some(first) = iter.next() { + let _ = first.send(Err(err)); + } + for callback in iter { + let _ = callback.send(Err(WorkerError::ChainError(Box::new( + ChainError::InternalError("Batch processing failed".to_string()), + )))); + } +} diff --git a/linera-core/src/chain_worker/config.rs b/linera-core/src/chain_worker/config.rs index 4208af351308..44518be62be6 100644 --- a/linera-core/src/chain_worker/config.rs +++ b/linera-core/src/chain_worker/config.rs @@ -31,6 +31,13 @@ pub struct ChainWorkerConfig { pub sender_chain_ttl: Duration, /// The size to truncate receive log entries in chain info responses. pub chain_info_max_received_log_entries: usize, + /// Maximum number of regular requests to handle per round in the rotation. + /// The worker rotates between regular requests, cross-chain updates, and confirmations, + /// processing up to this many regular requests per turn. + pub regular_request_batch_size: usize, + /// Maximum number of cross-chain updates to batch together in a single processing round. + /// Higher values improve throughput but increase latency for individual updates. + pub cross_chain_update_batch_size: usize, } impl ChainWorkerConfig { @@ -64,6 +71,8 @@ impl Default for ChainWorkerConfig { ttl: Default::default(), sender_chain_ttl: Duration::from_secs(1), chain_info_max_received_log_entries: CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, + regular_request_batch_size: 1, + cross_chain_update_batch_size: 1000, } } } diff --git a/linera-core/src/chain_worker/mod.rs b/linera-core/src/chain_worker/mod.rs index f74513b0c2a2..f758068068b0 100644 --- a/linera-core/src/chain_worker/mod.rs +++ b/linera-core/src/chain_worker/mod.rs @@ -12,7 +12,10 @@ pub(super) use self::delivery_notifier::DeliveryNotifier; #[cfg(test)] pub(crate) use self::state::CrossChainUpdateHelper; pub(crate) use self::{ - actor::{ChainWorkerActor, ChainWorkerRequest, EventSubscriptionsResult}, + actor::{ + ChainActorReceivers, ChainWorkerActor, ChainWorkerRequest, ConfirmUpdatedRecipientRequest, + CrossChainUpdateRequest, EventSubscriptionsResult, + }, config::ChainWorkerConfig, state::BlockOutcome, }; diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index 1d862b7b2076..612af455f947 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -9,7 +9,7 @@ use std::{ sync::{self, Arc}, }; -use futures::future::Either; +use futures::future::{try_join_all, Either}; #[cfg(with_metrics)] use linera_base::prometheus_util::MeasureLatency as _; use linera_base::{ @@ -194,23 +194,6 @@ where .await, ) .is_ok(), - ChainWorkerRequest::ProcessCrossChainUpdate { - origin, - bundles, - callback, - } => callback - .send(self.process_cross_chain_update(origin, bundles).await) - .is_ok(), - ChainWorkerRequest::ConfirmUpdatedRecipient { - recipient, - latest_height, - callback, - } => callback - .send( - self.confirm_updated_recipient(recipient, latest_height) - .await, - ) - .is_ok(), ChainWorkerRequest::HandleChainInfoQuery { query, callback } => callback .send(self.handle_chain_info_query(query).await) .is_ok(), @@ -617,35 +600,49 @@ where Ok(cross_chain_requests) } - /// Returns true if there are no more outgoing messages in flight up to the given - /// block height. - #[instrument(skip_all, fields( - chain_id = %self.chain_id(), - height = %height - ))] - async fn all_messages_to_tracked_chains_delivered_up_to( + /// Returns the lowest block height for which we don't know that all outgoing messages to + /// tracked chains have been delivered. + /// + /// If `tracked_chains` is `None` (validator mode), this returns the lowest height + /// for which delivery of all outgoing messages is now known to be complete. + #[instrument(skip_all, fields(chain_id = %self.chain_id()))] + async fn min_height_with_undelivered_messages_to_tracked_chains( &self, - height: BlockHeight, - ) -> Result { - if self.chain.all_messages_delivered_up_to(height) { - return Ok(true); - } + ) -> Result { + // Get the global max delivered height from outbox_counters. + let global_height = self.chain.min_height_with_undelivered_messages(); + let Some(tracked_chains) = self.tracked_chains.as_ref() else { - return Ok(false); + return Ok(global_height); // Validator mode: All chains are tracked. }; + + let next_height = self.chain.tip_state.get().next_block_height; + + if global_height == next_height { + return Ok(global_height); + } + + // Client mode: check only outboxes to tracked chains. let mut targets = self.chain.nonempty_outbox_chain_ids(); { let tracked_chains = tracked_chains.read().unwrap(); targets.retain(|target| tracked_chains.contains(target)); } - let outboxes = self.chain.load_outboxes(&targets).await?; - for outbox in outboxes { - let front = outbox.queue.front().await?; - if front.is_some_and(|key| key <= height) { - return Ok(false); - } + + if targets.is_empty() { + return Ok(next_height); // No pending messages to tracked chains. } - Ok(true) + + let outboxes = self.chain.load_outboxes(&targets).await?; + + // Find the minimum pending height across all tracked outboxes. + let fronts = try_join_all(outboxes.iter().map(|outbox| outbox.queue.front())).await?; + let min_height = fronts + .into_iter() + .flatten() + .min() + .map_or(next_height, |height| height.min(next_height)); + Ok(min_height) } /// Processes a leader timeout issued for this multi-owner chain. @@ -1061,80 +1058,104 @@ where } } - /// Updates the chain's inboxes, receiving messages from a cross-chain update. - #[instrument(level = "trace", skip(self, bundles))] - async fn process_cross_chain_update( + /// Processes a batch of cross-chain updates efficiently with a single save operation. + /// Takes a map from origin chain to bundles (already deduplicated by caller). + /// Returns a map from origin chain to the last updated height (if any). + #[instrument(level = "trace", skip(self, updates))] + pub(super) async fn process_cross_chain_update( &mut self, - origin: ChainId, - bundles: Vec<(Epoch, MessageBundle)>, - ) -> Result, WorkerError> { - // Only process certificates with relevant heights and epochs. - let next_height_to_receive = self.chain.next_block_height_to_receive(&origin).await?; - let last_anticipated_block_height = - self.chain.last_anticipated_block_height(&origin).await?; - let helper = CrossChainUpdateHelper::new(&self.config, &self.chain); + updates: BTreeMap>, + ) -> Result>, WorkerError> { let recipient = self.chain_id(); - let bundles = helper.select_message_bundles( - &origin, - recipient, - next_height_to_receive, - last_anticipated_block_height, - bundles, - )?; - let Some(last_updated_height) = bundles.last().map(|bundle| bundle.height) else { - return Ok(None); - }; - // Process the received messages in certificates. - let local_time = self.storage.clock().current_time(); - let mut previous_height = None; - for bundle in bundles { - let add_to_received_log = previous_height != Some(bundle.height); - previous_height = Some(bundle.height); - // Update the staged chain state with the received block. - self.chain - .receive_message_bundle(&origin, bundle, local_time, add_to_received_log) - .await?; + let helper = CrossChainUpdateHelper::new(&self.config, &self.chain); + + // Load all inbox metadata concurrently. + let inbox_info = self + .chain + .inbox_info_for_origins(updates.keys().copied()) + .await?; + + // Filter bundles and prepare for batch processing, grouped by origin. + let mut bundles_by_origin = BTreeMap::>::new(); + let mut last_height_by_origin = BTreeMap::new(); + + for (origin, bundles) in updates { + let (next_height_to_receive, last_anticipated_block_height) = inbox_info + .get(&origin) + .copied() + .unwrap_or((BlockHeight::ZERO, None)); + + let filtered_bundles = helper.select_message_bundles( + &origin, + recipient, + next_height_to_receive, + last_anticipated_block_height, + bundles, + )?; + + let last_updated_height = filtered_bundles.last().map(|bundle| bundle.height); + last_height_by_origin.insert(origin, last_updated_height); + + if !filtered_bundles.is_empty() { + let origin_bundles = bundles_by_origin.entry(origin).or_default(); + for bundle in filtered_bundles { + origin_bundles.insert((bundle.height, bundle.transaction_index), bundle); + } + } } + + if bundles_by_origin.is_empty() { + return Ok(last_height_by_origin); + } + + // Try to initialize the chain (may read chain description blob). + let local_time = self.storage.clock().current_time(); + self.chain.initialize_if_needed(local_time).await.ok(); + + // Check if the chain is still inactive. if !self.config.allow_inactive_chains && !self.chain.is_active() { - // Refuse to create a chain state if the chain is still inactive by - // now. Accordingly, do not send a confirmation, so that the - // cross-chain update is retried later. - warn!( - "Refusing to deliver messages to {recipient:?} from {origin:?} \ - at height {last_updated_height} because the recipient is still inactive", - ); - return Ok(None); + for origin in last_height_by_origin.keys() { + warn!( + "Refusing to deliver messages to {recipient:?} from {origin:?} \ + because the recipient is still inactive", + ); + } + return Ok(last_height_by_origin + .keys() + .map(|origin| (*origin, None)) + .collect()); } - // Save the chain. + + // Process all bundles via ChainStateView's batch method. + self.chain + .receive_message_bundles(bundles_by_origin, local_time) + .await?; + + // Save the chain once for all updates. self.save().await?; - Ok(Some(last_updated_height)) + Ok(last_height_by_origin) } - /// Handles the cross-chain request confirming that the recipient was updated. - #[instrument(skip_all, fields( - chain_id = %self.chain_id(), - recipient = %recipient, - latest_height = %latest_height - ))] - async fn confirm_updated_recipient( + /// Handles a batch of cross-chain confirmation requests efficiently with a single save operation. + /// Takes a map from recipient to the maximum confirmed height. + #[instrument(level = "trace", skip(self, confirmations))] + pub(super) async fn confirm_updated_recipient( &mut self, - recipient: ChainId, - latest_height: BlockHeight, + confirmations: BTreeMap, ) -> Result<(), WorkerError> { - let fully_delivered = self - .chain - .mark_messages_as_received(&recipient, latest_height) - .await? - && self - .all_messages_to_tracked_chains_delivered_up_to(latest_height) - .await?; - - self.save().await?; + let has_updates = self.chain.mark_messages_as_received(confirmations).await?; - if fully_delivered { - self.delivery_notifier.notify(latest_height); + if has_updates { + if let Ok(delivered_height) = self + .min_height_with_undelivered_messages_to_tracked_chains() + .await? + .try_sub_one() + { + self.delivery_notifier.notify(delivered_height); + } } + self.save().await?; Ok(()) } diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index 33276a74c62b..f8a7f21017e1 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -23,7 +23,7 @@ use linera_base::{ #[cfg(with_testing)] use linera_chain::ChainExecutionContext; use linera_chain::{ - data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock}, + data_types::{BlockExecutionOutcome, BlockProposal, ProposedBlock}, types::{ Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate, @@ -42,7 +42,8 @@ use tracing::{error, instrument, trace, warn}; pub(crate) use crate::chain_worker::EventSubscriptionsResult; use crate::{ chain_worker::{ - BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier, + BlockOutcome, ChainActorReceivers, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, + ConfirmUpdatedRecipientRequest, CrossChainUpdateRequest, DeliveryNotifier, }, data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest}, join_set_ext::{JoinSet, JoinSetExt}, @@ -408,12 +409,62 @@ where } } -/// The sender endpoint for [`ChainWorkerRequest`]s. -type ChainActorEndpoint = mpsc::UnboundedSender<( - ChainWorkerRequest<::Context>, - tracing::Span, - Instant, -)>; +/// The sender endpoints for communicating with a [`ChainWorkerActor`]. +struct ChainActorEndpoint +where + StorageClient: Storage, +{ + /// Endpoint for regular chain worker requests. + requests: mpsc::UnboundedSender<( + ChainWorkerRequest<::Context>, + tracing::Span, + Instant, + )>, + /// Endpoint for cross-chain update requests. + cross_chain_updates: mpsc::UnboundedSender<(CrossChainUpdateRequest, tracing::Span, Instant)>, + /// Endpoint for confirmation requests. + confirmations: + mpsc::UnboundedSender<(ConfirmUpdatedRecipientRequest, tracing::Span, Instant)>, +} + +impl Clone for ChainActorEndpoint +where + StorageClient: Storage, +{ + fn clone(&self) -> Self { + Self { + requests: self.requests.clone(), + cross_chain_updates: self.cross_chain_updates.clone(), + confirmations: self.confirmations.clone(), + } + } +} + +impl ChainActorEndpoint +where + StorageClient: Storage, +{ + /// Creates a new pair of endpoint (senders) and receivers for a chain worker actor. + fn new() -> (Self, ChainActorReceivers) { + let (requests_sender, requests_receiver) = mpsc::unbounded_channel(); + let (cross_chain_sender, cross_chain_receiver) = mpsc::unbounded_channel(); + let (confirmations_sender, confirmations_receiver) = mpsc::unbounded_channel(); + + let endpoint = Self { + requests: requests_sender, + cross_chain_updates: cross_chain_sender, + confirmations: confirmations_sender, + }; + + let receivers = ChainActorReceivers { + requests: requests_receiver, + cross_chain_updates: cross_chain_receiver, + confirmations: confirmations_receiver, + }; + + (endpoint, receivers) + } +} pub(crate) type DeliveryNotifiers = HashMap; @@ -530,6 +581,30 @@ where self } + /// Returns an instance with the specified regular request batch size. + /// + /// Maximum number of regular requests to handle per round in the rotation. + /// The worker rotates between regular requests, cross-chain updates, and confirmations, + /// processing up to this many regular requests per turn. + #[instrument(level = "trace", skip(self))] + pub fn with_regular_request_batch_size(mut self, regular_request_batch_size: usize) -> Self { + self.chain_worker_config.regular_request_batch_size = regular_request_batch_size; + self + } + + /// Returns an instance with the specified cross-chain update batch size. + /// + /// Maximum number of cross-chain updates to batch together in a single processing round. + /// Higher values improve throughput but increase latency for individual updates. + #[instrument(level = "trace", skip(self))] + pub fn with_cross_chain_update_batch_size( + mut self, + cross_chain_update_batch_size: usize, + ) -> Self { + self.chain_worker_config.cross_chain_update_batch_size = cross_chain_update_batch_size; + self + } + #[instrument(level = "trace", skip(self))] pub fn nickname(&self) -> &str { &self.nickname @@ -774,28 +849,6 @@ where .await } - #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields( - nickname = %self.nickname, - origin = %origin, - recipient = %recipient, - num_bundles = %bundles.len() - ))] - async fn process_cross_chain_update( - &self, - origin: ChainId, - recipient: ChainId, - bundles: Vec<(Epoch, MessageBundle)>, - ) -> Result, WorkerError> { - self.query_chain_worker(recipient, move |callback| { - ChainWorkerRequest::ProcessCrossChainUpdate { - origin, - bundles, - callback, - } - }) - .await - } - /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block. #[instrument(level = "trace", skip(self, chain_id, height), fields( nickname = %self.nickname, @@ -845,102 +898,164 @@ where oneshot::Sender>, ) -> ChainWorkerRequest, ) -> Result { - // Build the request. let (callback, response) = oneshot::channel(); let request = request_builder(callback); - // Call the endpoint, possibly a new one. - let new_receiver = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?; - - // We just created an endpoint: spawn the actor. - if let Some(receiver) = new_receiver { - let delivery_notifier = self - .delivery_notifiers - .lock() - .unwrap() - .entry(chain_id) - .or_default() - .clone(); - - let is_tracked = self - .tracked_chains - .as_ref() - .is_some_and(|tracked_chains| tracked_chains.read().unwrap().contains(&chain_id)); - - let actor_task = ChainWorkerActor::run( - self.chain_worker_config.clone(), - self.storage.clone(), - self.block_cache.clone(), - self.execution_state_cache.clone(), - self.tracked_chains.clone(), - delivery_notifier, - chain_id, - receiver, - is_tracked, - ); - - self.chain_worker_tasks - .lock() - .unwrap() - .spawn_task(actor_task); - } + self.send_chain_worker_request(chain_id, |endpoint| { + endpoint + .requests + .send((request, tracing::Span::current(), Instant::now())) + })?; - // Finally, wait a response. - match response.await { - Err(e) => { - // The actor endpoint was dropped. Better luck next time! - Err(WorkerError::ChainActorRecvError { - chain_id, - error: Box::new(e), - }) - } - Ok(response) => response, - } + response + .await + .map_err(|e| WorkerError::ChainActorRecvError { + chain_id, + error: Box::new(e), + })? } - /// Find an endpoint and call it. Create the endpoint if necessary. - #[instrument(level = "trace", skip(self), fields( + /// Sends a request to a chain worker, creating the worker if necessary. + /// + /// This method holds the lock on the chain workers map while sending to ensure that + /// if the send fails (because the actor was dropped), no other thread can get a stale + /// endpoint. The endpoint is removed before sending and only re-inserted on success. + #[instrument(level = "trace", skip(self, send_fn), fields( nickname = %self.nickname, chain_id = %chain_id ))] - #[expect(clippy::type_complexity)] - fn call_and_maybe_create_chain_worker_endpoint( + fn send_chain_worker_request( &self, chain_id: ChainId, - request: ChainWorkerRequest, - ) -> Result< - Option< - mpsc::UnboundedReceiver<( - ChainWorkerRequest, - tracing::Span, - Instant, - )>, - >, - WorkerError, - > { + send_fn: impl FnOnce( + &ChainActorEndpoint, + ) -> Result<(), mpsc::error::SendError>, + ) -> Result<(), WorkerError> { let mut chain_workers = self.chain_workers.lock().unwrap(); - let (sender, new_receiver) = if let Some(endpoint) = chain_workers.remove(&chain_id) { + // Remove the endpoint from the map. If the send fails, we don't want other threads + // to get a stale endpoint. + let (endpoint, receivers) = if let Some(endpoint) = chain_workers.remove(&chain_id) { (endpoint, None) } else { - let (sender, receiver) = mpsc::unbounded_channel(); - (sender, Some(receiver)) + let (endpoint, receivers) = ChainActorEndpoint::new(); + (endpoint, Some(receivers)) }; - if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) { - // The actor was dropped. Give up without (re-)inserting the endpoint in the cache. + // Try to send the request. + if let Err(e) = send_fn(&endpoint) { + // The actor was dropped. Don't re-insert the endpoint. return Err(WorkerError::ChainActorSendError { chain_id, error: Box::new(e), }); } - // Put back the sender in the cache for next time. - chain_workers.insert(chain_id, sender); + // Send succeeded. Re-insert the endpoint. + chain_workers.insert(chain_id, endpoint); #[cfg(with_metrics)] metrics::CHAIN_WORKER_ENDPOINTS_CACHED.set(chain_workers.len() as i64); - Ok(new_receiver) + // If this is a new worker, spawn the actor (after releasing the lock). + if let Some(receivers) = receivers { + drop(chain_workers); + self.spawn_chain_worker_actor(chain_id, receivers); + } + + Ok(()) + } + + /// Spawns a new chain worker actor. + fn spawn_chain_worker_actor( + &self, + chain_id: ChainId, + receivers: ChainActorReceivers, + ) { + let delivery_notifier = self + .delivery_notifiers + .lock() + .unwrap() + .entry(chain_id) + .or_default() + .clone(); + + let is_tracked = self + .tracked_chains + .as_ref() + .is_some_and(|tracked_chains| tracked_chains.read().unwrap().contains(&chain_id)); + + let actor_task = ChainWorkerActor::run( + self.chain_worker_config.clone(), + self.storage.clone(), + self.block_cache.clone(), + self.execution_state_cache.clone(), + self.tracked_chains.clone(), + delivery_notifier, + chain_id, + receivers, + is_tracked, + ); + + self.chain_worker_tasks + .lock() + .unwrap() + .spawn_task(actor_task); + } + + /// Processes a cross-chain update. + async fn process_cross_chain_update( + &self, + recipient: ChainId, + origin: ChainId, + bundles: Vec<(Epoch, linera_chain::data_types::MessageBundle)>, + ) -> Result, WorkerError> { + let (callback, response) = oneshot::channel(); + let request = CrossChainUpdateRequest { + origin, + bundles, + callback, + }; + + self.send_chain_worker_request(recipient, |endpoint| { + endpoint + .cross_chain_updates + .send((request, tracing::Span::current(), Instant::now())) + })?; + + response + .await + .map_err(|e| WorkerError::ChainActorRecvError { + chain_id: recipient, + error: Box::new(e), + })? + } + + /// Processes a cross-chain update confirmation. + async fn process_cross_chain_confirmation( + &self, + sender: ChainId, + recipient: ChainId, + latest_height: BlockHeight, + ) -> Result<(), WorkerError> { + let (callback, response) = oneshot::channel(); + let request = ConfirmUpdatedRecipientRequest { + recipient, + latest_height, + callback, + }; + + self.send_chain_worker_request(sender, |endpoint| { + endpoint + .confirmations + .send((request, tracing::Span::current(), Instant::now())) + })?; + + response + .await + .map_err(|e| WorkerError::ChainActorRecvError { + chain_id: sender, + error: Box::new(e), + })? } #[instrument(skip_all, fields( @@ -1210,7 +1325,7 @@ where let mut actions = NetworkActions::default(); let origin = sender; let Some(height) = self - .process_cross_chain_update(origin, recipient, bundles) + .process_cross_chain_update(recipient, origin, bundles) .await? else { return Ok(actions); @@ -1233,14 +1348,8 @@ where recipient, latest_height, } => { - self.query_chain_worker(sender, move |callback| { - ChainWorkerRequest::ConfirmUpdatedRecipient { - recipient, - latest_height, - callback, - } - }) - .await?; + self.process_cross_chain_confirmation(sender, recipient, latest_height) + .await?; Ok(NetworkActions::default()) } } diff --git a/linera-service/src/server.rs b/linera-service/src/server.rs index 2ed9913adfbc..90aa7867b996 100644 --- a/linera-service/src/server.rs +++ b/linera-service/src/server.rs @@ -73,6 +73,8 @@ struct ServerContext { block_cache_size: usize, execution_state_cache_size: usize, chain_info_max_received_log_entries: usize, + regular_request_batch_size: usize, + cross_chain_update_batch_size: usize, } impl ServerContext { @@ -102,7 +104,9 @@ impl ServerContext { .with_allow_messages_from_deprecated_epochs(false) .with_block_time_grace_period(self.block_time_grace_period) .with_chain_worker_ttl(self.chain_worker_ttl) - .with_chain_info_max_received_log_entries(self.chain_info_max_received_log_entries); + .with_chain_info_max_received_log_entries(self.chain_info_max_received_log_entries) + .with_regular_request_batch_size(self.regular_request_batch_size) + .with_cross_chain_update_batch_size(self.cross_chain_update_batch_size); (state, shard_id, shard.clone()) } @@ -410,6 +414,25 @@ enum ServerCommand { )] chain_info_max_received_log_entries: usize, + /// Maximum number of regular requests to handle per round in the rotation. + /// The worker rotates between regular requests, cross-chain updates, and confirmations, + /// processing up to this many regular requests per turn. + #[arg( + long, + default_value = "1", + env = "LINERA_SERVER_REGULAR_REQUEST_BATCH_SIZE" + )] + regular_request_batch_size: usize, + + /// Maximum number of cross-chain updates to batch together in a single processing round. + /// Higher values improve throughput but increase latency for individual updates. + #[arg( + long, + default_value = "1000", + env = "LINERA_SERVER_CROSS_CHAIN_UPDATE_BATCH_SIZE" + )] + cross_chain_update_batch_size: usize, + /// OpenTelemetry OTLP exporter endpoint (requires opentelemetry feature). #[arg(long, env = "LINERA_OTLP_EXPORTER_ENDPOINT")] otlp_exporter_endpoint: Option, @@ -539,6 +562,8 @@ async fn run(options: ServerOptions) { wasm_runtime, chain_worker_ttl, chain_info_max_received_log_entries, + regular_request_batch_size, + cross_chain_update_batch_size, otlp_exporter_endpoint: _, } => { linera_version::VERSION_INFO.log(); @@ -556,6 +581,8 @@ async fn run(options: ServerOptions) { block_cache_size: options.block_cache_size, execution_state_cache_size: options.execution_state_cache_size, chain_info_max_received_log_entries, + regular_request_batch_size, + cross_chain_update_batch_size, }; let wasm_runtime = wasm_runtime.with_wasm_default(); let store_config = storage_config