Skip to content

Commit ff9cb45

Browse files
committed
Add cross chain confirmation batch size option
1 parent 5f23cf2 commit ff9cb45

4 files changed

Lines changed: 38 additions & 15 deletions

File tree

linera-core/src/chain_worker/actor.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,8 @@ where
417417
trace!("Starting `ChainWorkerActor`");
418418

419419
let regular_batch_size = self.config.regular_request_batch_size;
420-
let cross_chain_batch_size = self.config.cross_chain_update_batch_size;
420+
let cross_chain_update_batch_size = self.config.cross_chain_update_batch_size;
421+
let confirmation_batch_size = self.config.cross_chain_confirmation_batch_size;
421422

422423
// The first iteration waits indefinitely; subsequent iterations have a timeout.
423424
let mut first_iteration = true;
@@ -444,12 +445,12 @@ where
444445
loop {
445446
// Check which streams have data ready.
446447
let types = [
447-
(RequestType::Confirmation, is_ready(&mut confirmations)),
448448
(
449449
RequestType::CrossChainUpdate,
450450
is_ready(&mut cross_chain_updates),
451451
),
452452
(RequestType::Regular, is_ready(&mut requests)),
453+
(RequestType::Confirmation, is_ready(&mut confirmations)),
453454
];
454455

455456
// Find the next ready queue in rotation order.
@@ -549,7 +550,7 @@ where
549550
let mut callbacks_by_origin: CrossChainUpdateCallbacks = BTreeMap::new();
550551
let mut count = 0;
551552

552-
while count < cross_chain_batch_size {
553+
while count < cross_chain_update_batch_size {
553554
match Pin::new(&mut cross_chain_updates).next().now_or_never() {
554555
Some(Some((req, _span, _enqueued_at))) => {
555556
#[cfg(with_metrics)]
@@ -611,7 +612,7 @@ where
611612
let mut callbacks = Vec::new();
612613
let mut count = 0;
613614

614-
while count < cross_chain_batch_size {
615+
while count < confirmation_batch_size {
615616
match Pin::new(&mut confirmations).next().now_or_never() {
616617
Some(Some((req, _span, _enqueued_at))) => {
617618
#[cfg(with_metrics)]

linera-core/src/chain_worker/config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,11 @@ pub struct ChainWorkerConfig {
3232
/// The size to truncate receive log entries in chain info responses.
3333
pub chain_info_max_received_log_entries: usize,
3434
/// Maximum number of regular requests to handle per round in the rotation.
35-
/// The worker rotates between regular requests, cross-chain updates, and confirmations,
36-
/// processing up to this many regular requests per turn.
3735
pub regular_request_batch_size: usize,
3836
/// Maximum number of cross-chain updates to batch together in a single processing round.
39-
/// Higher values improve throughput but increase latency for individual updates.
4037
pub cross_chain_update_batch_size: usize,
38+
/// Maximum number of cross-chain confirmations to batch together in a single processing round.
39+
pub cross_chain_confirmation_batch_size: usize,
4140
}
4241

4342
impl ChainWorkerConfig {
@@ -71,8 +70,9 @@ impl Default for ChainWorkerConfig {
7170
ttl: Default::default(),
7271
sender_chain_ttl: Duration::from_secs(1),
7372
chain_info_max_received_log_entries: CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
74-
regular_request_batch_size: 1,
73+
regular_request_batch_size: 10,
7574
cross_chain_update_batch_size: 1000,
75+
cross_chain_confirmation_batch_size: 500,
7676
}
7777
}
7878
}

linera-core/src/worker.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -423,8 +423,7 @@ where
423423
/// Endpoint for cross-chain update requests.
424424
cross_chain_updates: mpsc::UnboundedSender<(CrossChainUpdateRequest, tracing::Span, Instant)>,
425425
/// Endpoint for confirmation requests.
426-
confirmations:
427-
mpsc::UnboundedSender<(ConfirmUpdatedRecipientRequest, tracing::Span, Instant)>,
426+
confirmations: mpsc::UnboundedSender<(ConfirmUpdatedRecipientRequest, tracing::Span, Instant)>,
428427
}
429428

430429
impl<StorageClient> Clone for ChainActorEndpoint<StorageClient>
@@ -605,6 +604,20 @@ where
605604
self
606605
}
607606

607+
/// Returns an instance with the specified cross-chain confirmation batch size.
608+
///
609+
/// Maximum number of cross-chain confirmations to batch together in a single processing round.
610+
/// Confirmations are lighter than updates, so this can be set higher.
611+
#[instrument(level = "trace", skip(self))]
612+
pub fn with_cross_chain_confirmation_batch_size(
613+
mut self,
614+
cross_chain_confirmation_batch_size: usize,
615+
) -> Self {
616+
self.chain_worker_config.cross_chain_confirmation_batch_size =
617+
cross_chain_confirmation_batch_size;
618+
self
619+
}
620+
608621
#[instrument(level = "trace", skip(self))]
609622
pub fn nickname(&self) -> &str {
610623
&self.nickname

linera-service/src/server.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ struct ServerContext {
7575
chain_info_max_received_log_entries: usize,
7676
regular_request_batch_size: usize,
7777
cross_chain_update_batch_size: usize,
78+
cross_chain_confirmation_batch_size: usize,
7879
}
7980

8081
impl ServerContext {
@@ -106,7 +107,8 @@ impl ServerContext {
106107
.with_chain_worker_ttl(self.chain_worker_ttl)
107108
.with_chain_info_max_received_log_entries(self.chain_info_max_received_log_entries)
108109
.with_regular_request_batch_size(self.regular_request_batch_size)
109-
.with_cross_chain_update_batch_size(self.cross_chain_update_batch_size);
110+
.with_cross_chain_update_batch_size(self.cross_chain_update_batch_size)
111+
.with_cross_chain_confirmation_batch_size(self.cross_chain_confirmation_batch_size);
110112
(state, shard_id, shard.clone())
111113
}
112114

@@ -415,24 +417,29 @@ enum ServerCommand {
415417
chain_info_max_received_log_entries: usize,
416418

417419
/// Maximum number of regular requests to handle per round in the rotation.
418-
/// The worker rotates between regular requests, cross-chain updates, and confirmations,
419-
/// processing up to this many regular requests per turn.
420420
#[arg(
421421
long,
422-
default_value = "1",
422+
default_value = "10",
423423
env = "LINERA_SERVER_REGULAR_REQUEST_BATCH_SIZE"
424424
)]
425425
regular_request_batch_size: usize,
426426

427427
/// Maximum number of cross-chain updates to batch together in a single processing round.
428-
/// Higher values improve throughput but increase latency for individual updates.
429428
#[arg(
430429
long,
431430
default_value = "1000",
432431
env = "LINERA_SERVER_CROSS_CHAIN_UPDATE_BATCH_SIZE"
433432
)]
434433
cross_chain_update_batch_size: usize,
435434

435+
/// Maximum number of cross-chain confirmations to batch together in a single processing round.
436+
#[arg(
437+
long,
438+
default_value = "500",
439+
env = "LINERA_SERVER_CROSS_CHAIN_CONFIRMATION_BATCH_SIZE"
440+
)]
441+
cross_chain_confirmation_batch_size: usize,
442+
436443
/// OpenTelemetry OTLP exporter endpoint (requires opentelemetry feature).
437444
#[arg(long, env = "LINERA_OTLP_EXPORTER_ENDPOINT")]
438445
otlp_exporter_endpoint: Option<String>,
@@ -564,6 +571,7 @@ async fn run(options: ServerOptions) {
564571
chain_info_max_received_log_entries,
565572
regular_request_batch_size,
566573
cross_chain_update_batch_size,
574+
cross_chain_confirmation_batch_size,
567575
otlp_exporter_endpoint: _,
568576
} => {
569577
linera_version::VERSION_INFO.log();
@@ -583,6 +591,7 @@ async fn run(options: ServerOptions) {
583591
chain_info_max_received_log_entries,
584592
regular_request_batch_size,
585593
cross_chain_update_batch_size,
594+
cross_chain_confirmation_batch_size,
586595
};
587596
let wasm_runtime = wasm_runtime.with_wasm_default();
588597
let store_config = storage_config

0 commit comments

Comments
 (0)