Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 168 additions & 126 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use linera_base::{
OracleResponse, Timestamp,
},
ensure,
identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
identifiers::{AccountOwner, ApplicationId, ChainId, StreamId},
ownership::ChainOwnership,
};
use linera_execution::{
Expand Down Expand Up @@ -46,7 +46,7 @@ use crate::{
manager::ChainManager,
outbox::OutboxStateView,
pending_blobs::PendingBlobsView,
ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
ChainError, ChainExecutionContext, ExecutionResultExt,
};

#[cfg(test)]
Expand Down Expand Up @@ -417,62 +417,88 @@ where
.with_execution_context(ChainExecutionContext::DescribeApplication)
}

#[instrument(skip_all, fields(
chain_id = %self.chain_id(),
target = %target,
height = %height
))]
/// Marks messages as received for multiple recipients.
/// Takes a map from recipient to the maximum confirmed height.
///
/// Returns `true` if any messages were marked as received.
pub async fn mark_messages_as_received(
&mut self,
target: &ChainId,
height: BlockHeight,
max_height_by_recipient: BTreeMap<ChainId, BlockHeight>,
) -> Result<bool, ChainError> {
let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
let updates = outbox.mark_messages_as_received(height).await?;
if updates.is_empty() {
return Ok(false);
}
for update in updates {
let counter = self
.outbox_counters
.get_mut()
.get_mut(&update)
.ok_or_else(|| {
ChainError::InternalError("message counter should be present".into())
})?;
*counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
if *counter == 0 {
// Important for the test in `all_messages_delivered_up_to`.
self.outbox_counters.get_mut().remove(&update);
// Load all outboxes at once.
let recipients = max_height_by_recipient
.keys()
.copied()
.collect::<Vec<ChainId>>();
let loaded_outboxes = self.outboxes.try_load_entries_pairs_mut(recipients).await?;

// Process each recipient concurrently.
let processing_futures =
loaded_outboxes
.into_iter()
.filter_map(|(recipient, mut outbox)| {
let max_height = *max_height_by_recipient.get(&recipient)?;

Some(async move {
let updates = outbox.mark_messages_as_received(max_height).await?;
Ok::<_, ChainError>((recipient, updates, outbox))
})
});

// Execute all processing concurrently.
let processing_results = futures::future::try_join_all(processing_futures).await?;

// Apply counter updates to shared state sequentially.
let mut any_updates = false;
for (recipient, updates, outbox) in &processing_results {
if updates.is_empty() {
continue;
}
}
if outbox.queue.count() == 0 {
self.nonempty_outboxes.get_mut().remove(target);
// If the outbox is empty and not ahead of the executed blocks, remove it.
if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
self.outboxes.remove_entry(target)?;
any_updates = true;
for update in updates {
let counter = self
.outbox_counters
.get_mut()
.get_mut(update)
.ok_or_else(|| {
ChainError::InternalError("message counter should be present".into())
})?;
*counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
if *counter == 0 {
// Important for the test in `max_height_with_all_messages_delivered`.
self.outbox_counters.get_mut().remove(update);
}
}
if outbox.queue.count() == 0 {
self.nonempty_outboxes.get_mut().remove(recipient);
// If the outbox is empty and not ahead of the executed blocks, remove it.
if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
self.outboxes.remove_entry(recipient)?;
}
}
}

#[cfg(with_metrics)]
metrics::NUM_OUTBOXES
.with_label_values(&[])
.observe(self.outboxes.count().await? as f64);
Ok(true)

Ok(any_updates)
}

/// Returns true if there are no more outgoing messages in flight up to the given
/// block height.
pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
/// Returns the lowest block height for which we don't know that all outgoing messages have
/// been delivered.
pub fn min_height_with_undelivered_messages(&self) -> BlockHeight {
let next_height = self.tip_state.get().next_block_height;
let counters = self.outbox_counters.get();
tracing::debug!(
"Messages left in {:.8}'s outbox: {:?}",
self.chain_id(),
self.outbox_counters.get()
counters
);
if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
key > &height
} else {
true
}
counters
.first_key_value()
.map_or(next_height, |(height, _)| (*height).min(next_height))
}

/// Invariant for the states of active chains.
Expand Down Expand Up @@ -508,17 +534,6 @@ where
Ok(())
}

pub async fn next_block_height_to_receive(
&self,
origin: &ChainId,
) -> Result<BlockHeight, ChainError> {
let inbox = self.inboxes.try_load_entry(origin).await?;
match inbox {
Some(inbox) => inbox.next_block_height_to_receive(),
None => Ok(BlockHeight::ZERO),
}
}

/// Returns the height of the highest block we have, plus one. Includes preprocessed blocks.
///
/// The "+ 1" is so that it can be used in the same places as `next_block_height`.
Expand All @@ -529,90 +544,117 @@ where
Ok(self.tip_state.get().next_block_height)
}

pub async fn last_anticipated_block_height(
/// Returns inbox metadata for multiple origins, loading all inboxes concurrently.
///
/// For each origin, returns a tuple of:
/// - `next_block_height_to_receive`: The next block height expected from this origin.
/// - `last_anticipated_block_height`: The height of the last removed bundle, if any.
pub async fn inbox_info_for_origins(
&self,
origin: &ChainId,
) -> Result<Option<BlockHeight>, ChainError> {
let inbox = self.inboxes.try_load_entry(origin).await?;
match inbox {
Some(inbox) => match inbox.removed_bundles.back().await? {
Some(bundle) => Ok(Some(bundle.height)),
None => Ok(None),
},
None => Ok(None),
}
origins: impl IntoIterator<Item = ChainId>,
) -> Result<BTreeMap<ChainId, (BlockHeight, Option<BlockHeight>)>, ChainError> {
let origins: Vec<_> = origins.into_iter().collect();
let loaded_inboxes = self.inboxes.try_load_entries_pairs(origins).await?;

let futures: Vec<_> = loaded_inboxes
.into_iter()
.map(|(origin, inbox)| async move {
let (next_height, last_anticipated) = match inbox {
Some(inbox) => {
let next = inbox.next_block_height_to_receive()?;
let last = match inbox.removed_bundles.back().await? {
Some(bundle) => Some(bundle.height),
None => None,
};
(next, last)
}
None => (BlockHeight::ZERO, None),
};
Ok::<_, ChainError>((origin, (next_height, last_anticipated)))
})
.collect();

futures::future::try_join_all(futures)
.await
.map(|results| results.into_iter().collect())
}

/// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
/// internal error if the bundle doesn't appear to be new, based on the sender's
/// height. The value `local_time` is specific to each validator and only used for
/// round timeouts.
/// Processes multiple message bundles from different origins concurrently.
/// Each inbox is loaded once and all its bundles are processed together.
/// Side effects to shared state (`unskippable_bundles`, `received_log`) are collected
/// during concurrent processing and applied sequentially at the end.
///
/// Returns `true` if incoming `Subscribe` messages created new outbox entries.
#[instrument(skip_all, fields(
chain_id = %self.chain_id(),
origin = %origin,
bundle_height = %bundle.height
))]
pub async fn receive_message_bundle(
/// The bundles are provided as a map from origin chain to a map of bundles keyed by
/// `(height, transaction_index)`. This ensures proper ordering and deduplication.
pub async fn receive_message_bundles(
&mut self,
origin: &ChainId,
bundle: MessageBundle,
mut bundles_by_origin: BTreeMap<ChainId, BTreeMap<(BlockHeight, u32), MessageBundle>>,
local_time: Timestamp,
add_to_received_log: bool,
) -> Result<(), ChainError> {
assert!(!bundle.messages.is_empty());
let chain_id = self.chain_id();
tracing::trace!(
"Processing new messages to {chain_id:.8} from {origin} at height {}",
bundle.height,
);
let chain_and_height = ChainAndHeight {
chain_id: *origin,
height: bundle.height,
};

match self.initialize_if_needed(local_time).await {
Ok(_) => (),
// if the only issue was that we couldn't initialize the chain because of a
// missing chain description blob, we might still want to update the inbox
Err(ChainError::ExecutionError(exec_err, _))
if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
if blobs.iter().all(|blob_id| {
blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
})) => {}
err => {
return err;
}
if bundles_by_origin.is_empty() {
return Ok(());
}

// Process the inbox bundle and update the inbox state.
let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
#[cfg(with_metrics)]
metrics::NUM_INBOXES
.with_label_values(&[])
.observe(self.inboxes.count().await? as f64);
let entry = BundleInInbox::new(*origin, &bundle);
let skippable = bundle.is_skippable();
let newly_added = inbox
.add_bundle(bundle)
.await
.map_err(|error| match error {
InboxError::ViewError(error) => ChainError::ViewError(error),
error => ChainError::InternalError(format!(
"while processing messages in certified block: {error}"
)),
})?;
if newly_added && !skippable {
let seen = local_time;
self.unskippable_bundles
.push_back(TimestampedBundleInInbox { entry, seen });
}
// Load the inboxes for the origins.
let origins: Vec<ChainId> = bundles_by_origin.keys().copied().collect();
let loaded_inboxes = self.inboxes.try_load_entries_pairs_mut(origins).await?;

// Process each origin concurrently.
let processing_futures: Vec<_> = loaded_inboxes
.into_iter()
.filter_map(|(origin, mut inbox)| {
let bundles = bundles_by_origin.remove(&origin)?;

Some(async move {
let mut unskippable_bundles = Vec::new();
let mut received_log_heights = BTreeSet::new();

for bundle in bundles.into_values() {
let height = bundle.height;
let entry = BundleInInbox::new(origin, &bundle);
let skippable = bundle.is_skippable();

// Add bundle to the inbox.
let newly_added =
inbox
.add_bundle(bundle)
.await
.map_err(|error| match error {
InboxError::ViewError(error) => ChainError::ViewError(error),
error => ChainError::InternalError(format!(
"while processing messages in certified block: {error}"
)),
})?;

// Collect side effects for later application.
if newly_added && !skippable {
unskippable_bundles.push((entry, local_time));
}
received_log_heights.insert(height);
}

// Remember the certificate for future validator/client synchronizations.
if add_to_received_log {
self.received_log.push(chain_and_height);
Ok::<_, ChainError>((origin, unskippable_bundles, received_log_heights))
})
})
.collect();

// Execute all processing concurrently.
let processing_results = futures::future::try_join_all(processing_futures).await?;

// Apply side effects to shared state sequentially.
for (origin, unskippable_bundles, received_log_heights) in processing_results {
for (entry, seen) in unskippable_bundles {
self.unskippable_bundles
.push_back(TimestampedBundleInInbox { entry, seen });
}
for height in received_log_heights {
self.received_log.push(ChainAndHeight {
chain_id: origin,
height,
});
}
}

Ok(())
}

Expand Down
Loading
Loading