Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ where
trace!("Starting `ChainWorkerActor`");

let regular_batch_size = self.config.regular_request_batch_size;
let cross_chain_batch_size = self.config.cross_chain_update_batch_size;
let cross_chain_update_batch_size = self.config.cross_chain_update_batch_size;
let confirmation_batch_size = self.config.cross_chain_confirmation_batch_size;

// The first iteration waits indefinitely; subsequent iterations have a timeout.
let mut first_iteration = true;
Expand All @@ -444,12 +445,12 @@ where
loop {
// Check which streams have data ready.
let types = [
(RequestType::Confirmation, is_ready(&mut confirmations)),
(
RequestType::CrossChainUpdate,
is_ready(&mut cross_chain_updates),
),
(RequestType::Regular, is_ready(&mut requests)),
(RequestType::Confirmation, is_ready(&mut confirmations)),
];

// Find the next ready queue in rotation order.
Expand Down Expand Up @@ -549,7 +550,7 @@ where
let mut callbacks_by_origin: CrossChainUpdateCallbacks = BTreeMap::new();
let mut count = 0;

while count < cross_chain_batch_size {
while count < cross_chain_update_batch_size {
match Pin::new(&mut cross_chain_updates).next().now_or_never() {
Some(Some((req, _span, _enqueued_at))) => {
#[cfg(with_metrics)]
Expand Down Expand Up @@ -611,7 +612,7 @@ where
let mut callbacks = Vec::new();
let mut count = 0;

while count < cross_chain_batch_size {
while count < confirmation_batch_size {
match Pin::new(&mut confirmations).next().now_or_never() {
Some(Some((req, _span, _enqueued_at))) => {
#[cfg(with_metrics)]
Expand Down
8 changes: 4 additions & 4 deletions linera-core/src/chain_worker/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ pub struct ChainWorkerConfig {
/// The size to truncate receive log entries in chain info responses.
pub chain_info_max_received_log_entries: usize,
/// Maximum number of regular requests to handle per round in the rotation.
/// The worker rotates between regular requests, cross-chain updates, and confirmations,
/// processing up to this many regular requests per turn.
pub regular_request_batch_size: usize,
/// Maximum number of cross-chain updates to batch together in a single processing round.
/// Higher values improve throughput but increase latency for individual updates.
pub cross_chain_update_batch_size: usize,
/// Maximum number of cross-chain confirmations to batch together in a single processing round.
pub cross_chain_confirmation_batch_size: usize,
}

impl ChainWorkerConfig {
Expand Down Expand Up @@ -71,8 +70,9 @@ impl Default for ChainWorkerConfig {
ttl: Default::default(),
sender_chain_ttl: Duration::from_secs(1),
chain_info_max_received_log_entries: CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
regular_request_batch_size: 1,
regular_request_batch_size: 10,
cross_chain_update_batch_size: 1000,
cross_chain_confirmation_batch_size: 500,
}
}
}
17 changes: 15 additions & 2 deletions linera-core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,7 @@ where
/// Endpoint for cross-chain update requests.
cross_chain_updates: mpsc::UnboundedSender<(CrossChainUpdateRequest, tracing::Span, Instant)>,
/// Endpoint for confirmation requests.
confirmations:
mpsc::UnboundedSender<(ConfirmUpdatedRecipientRequest, tracing::Span, Instant)>,
confirmations: mpsc::UnboundedSender<(ConfirmUpdatedRecipientRequest, tracing::Span, Instant)>,
}

impl<StorageClient> Clone for ChainActorEndpoint<StorageClient>
Expand Down Expand Up @@ -605,6 +604,20 @@ where
self
}

/// Returns an instance with the specified cross-chain confirmation batch size.
///
/// Maximum number of cross-chain confirmations to batch together in a single processing round.
/// Confirmations are lighter than updates, so this can be set higher.
#[instrument(level = "trace", skip(self))]
pub fn with_cross_chain_confirmation_batch_size(
mut self,
cross_chain_confirmation_batch_size: usize,
) -> Self {
self.chain_worker_config.cross_chain_confirmation_batch_size =
cross_chain_confirmation_batch_size;
self
}

#[instrument(level = "trace", skip(self))]
pub fn nickname(&self) -> &str {
&self.nickname
Expand Down
19 changes: 14 additions & 5 deletions linera-service/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct ServerContext {
chain_info_max_received_log_entries: usize,
regular_request_batch_size: usize,
cross_chain_update_batch_size: usize,
cross_chain_confirmation_batch_size: usize,
}

impl ServerContext {
Expand Down Expand Up @@ -106,7 +107,8 @@ impl ServerContext {
.with_chain_worker_ttl(self.chain_worker_ttl)
.with_chain_info_max_received_log_entries(self.chain_info_max_received_log_entries)
.with_regular_request_batch_size(self.regular_request_batch_size)
.with_cross_chain_update_batch_size(self.cross_chain_update_batch_size);
.with_cross_chain_update_batch_size(self.cross_chain_update_batch_size)
.with_cross_chain_confirmation_batch_size(self.cross_chain_confirmation_batch_size);
(state, shard_id, shard.clone())
}

Expand Down Expand Up @@ -415,24 +417,29 @@ enum ServerCommand {
chain_info_max_received_log_entries: usize,

/// Maximum number of regular requests to handle per round in the rotation.
/// The worker rotates between regular requests, cross-chain updates, and confirmations,
/// processing up to this many regular requests per turn.
#[arg(
long,
default_value = "1",
default_value = "10",
env = "LINERA_SERVER_REGULAR_REQUEST_BATCH_SIZE"
)]
regular_request_batch_size: usize,

/// Maximum number of cross-chain updates to batch together in a single processing round.
/// Higher values improve throughput but increase latency for individual updates.
#[arg(
long,
default_value = "1000",
env = "LINERA_SERVER_CROSS_CHAIN_UPDATE_BATCH_SIZE"
)]
cross_chain_update_batch_size: usize,

/// Maximum number of cross-chain confirmations to batch together in a single processing round.
#[arg(
long,
default_value = "500",
env = "LINERA_SERVER_CROSS_CHAIN_CONFIRMATION_BATCH_SIZE"
)]
cross_chain_confirmation_batch_size: usize,

/// OpenTelemetry OTLP exporter endpoint (requires opentelemetry feature).
#[arg(long, env = "LINERA_OTLP_EXPORTER_ENDPOINT")]
otlp_exporter_endpoint: Option<String>,
Expand Down Expand Up @@ -564,6 +571,7 @@ async fn run(options: ServerOptions) {
chain_info_max_received_log_entries,
regular_request_batch_size,
cross_chain_update_batch_size,
cross_chain_confirmation_batch_size,
otlp_exporter_endpoint: _,
} => {
linera_version::VERSION_INFO.log();
Expand All @@ -583,6 +591,7 @@ async fn run(options: ServerOptions) {
chain_info_max_received_log_entries,
regular_request_batch_size,
cross_chain_update_batch_size,
cross_chain_confirmation_batch_size,
};
let wasm_runtime = wasm_runtime.with_wasm_default();
let store_config = storage_config
Expand Down
Loading