diff --git a/Cargo.lock b/Cargo.lock index f920c6cb75..ba28af222b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7336,6 +7336,7 @@ dependencies = [ "restate-futures-util", "restate-invoker-api", "restate-queue", + "restate-serde-util", "restate-service-client", "restate-service-protocol", "restate-service-protocol-v4", diff --git a/crates/invoker-impl/Cargo.toml b/crates/invoker-impl/Cargo.toml index 67f0ee32ef..1bddd77902 100644 --- a/crates/invoker-impl/Cargo.toml +++ b/crates/invoker-impl/Cargo.toml @@ -19,6 +19,7 @@ restate-errors = { workspace = true } restate-futures-util = { workspace = true } restate-invoker-api = { workspace = true } restate-queue = { workspace = true } +restate-serde-util = { workspace = true } restate-service-client = { workspace = true } restate-service-protocol = { workspace = true, features = ["message", "codec"] } restate-service-protocol-v4 = { workspace = true, features = ["message-codec", "entry-codec"] } diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index 1190e48c60..83dadb37db 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -22,18 +22,21 @@ use std::task::{Context, Poll, ready}; use std::time::{Duration, Instant}; use bytes::Bytes; -use futures::{FutureExt, Stream}; +use futures::{FutureExt, Stream, StreamExt}; use http::response::Parts as ResponseParts; use http::{HeaderName, HeaderValue, Response}; use http_body::{Body, Frame}; -use metrics::histogram; +use metrics::{counter, histogram}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_util::task::AbortOnDropHandle; -use tracing::instrument; +use tracing::{debug, instrument}; -use restate_invoker_api::invocation_reader::{InvocationReader, InvocationReaderTransaction}; +use restate_invoker_api::invocation_reader::{ + EagerState, InvocationReader, InvocationReaderTransaction, +}; use restate_invoker_api::{EntryEnricher, InvokeInputJournal}; +use restate_serde_util::ByteCount; use restate_service_client::{Request, ResponseBody, ServiceClient, ServiceClientError}; use restate_types::deployment::PinnedDeployment; use restate_types::identifiers::{InvocationId, PartitionLeaderEpoch}; @@ -51,7 +54,7 @@ use restate_types::service_protocol::ServiceProtocolVersion; use crate::TokenBucket; use crate::error::InvokerError; use crate::invocation_task::service_protocol_runner::ServiceProtocolRunner; -use crate::metric_definitions::{ID_LOOKUP, INVOKER_TASK_DURATION}; +use crate::metric_definitions::{ID_LOOKUP, INVOKER_EAGER_STATE_TRUNCATED, INVOKER_TASK_DURATION}; // Clippy false positive, might be caused by Bytes contained within HeaderValue. // https://github.com/rust-lang/rust/issues/40543#issuecomment-1212981256 @@ -82,6 +85,56 @@ const SERVICE_PROTOCOL_VERSION_V6: HeaderValue = #[allow(clippy::declare_interior_mutable_const)] const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server"); +/// Collects state entries from an [`EagerState`] stream, respecting a size limit. +/// +/// Returns a tuple of `(is_partial, entries)` where: +/// - `is_partial` is true if the state was already partial or if collection stopped due to size limit +/// - `entries` contains the collected and mapped key-value bytes +/// +/// If the first entry already exceeds the size limit, then an empty entries [`Vec`] is returned. +async fn collect_eager_state( + state: Option>, + size_limit: usize, + mut mapper: impl FnMut((Bytes, Bytes)) -> T, +) -> Result<(bool, Vec), InvokerError> +where + S: Stream> + Send, + E: std::error::Error + Send + Sync + 'static, +{ + let Some(state) = state else { + return Ok((true, Vec::new())); + }; + + let mut is_partial = state.is_partial(); + let mut entries = Vec::new(); + let mut total_size: usize = 0; + + let mut stream = std::pin::pin!(state.into_inner()); + while let Some(result) = stream.next().await { + let (key, value) = result.map_err(|e| InvokerError::StateReader(e.into()))?; + let entry_size = key.len() + value.len(); + + // Check if adding this entry would exceed the limit + if total_size.saturating_add(entry_size) > size_limit { + debug!( + "Eager state size limit reached ({}, limit: {}), \ + sending partial state with {} entries", + ByteCount::from(total_size), + ByteCount::from(size_limit), + entries.len() + ); + counter!(INVOKER_EAGER_STATE_TRUNCATED).increment(1); + is_partial = true; + break; + } + + total_size = total_size.saturating_add(entry_size); + entries.push(mapper((key, value))); + } + + Ok((is_partial, entries)) +} + pub(super) struct InvocationTaskOutput { pub(super) partition: PartitionLeaderEpoch, pub(super) invocation_id: InvocationId, @@ -143,7 +196,7 @@ pub(super) struct InvocationTask { invocation_target: InvocationTarget, inactivity_timeout: Duration, abort_timeout: Duration, - disable_eager_state: bool, + eager_state_size_limit: usize, message_size_warning: NonZeroUsize, message_size_limit: NonZeroUsize, retry_count_since_last_stored_entry: u32, @@ -213,7 +266,7 @@ where invocation_target: InvocationTarget, default_inactivity_timeout: Duration, default_abort_timeout: Duration, - disable_eager_state: bool, + eager_state_size_limit: usize, message_size_warning: NonZeroUsize, message_size_limit: NonZeroUsize, retry_count_since_last_stored_entry: u32, @@ -230,7 +283,7 @@ where invocation_target, inactivity_timeout: default_inactivity_timeout, abort_timeout: default_abort_timeout, - disable_eager_state, + eager_state_size_limit, entry_enricher, schemas: deployment_metadata_resolver, invoker_tx, @@ -389,10 +442,17 @@ where ))); } - // Determine if we need to read state + // Resolve the effective eager state size limit: + // Per-handler/service override takes precedence over server-level config. + // 0 means "disable eager state", non-zero values are clamped to the message size limit. + if let Some(limit) = invocation_attempt_options.eager_state_size_limit { + let limit = limit.as_usize(); + self.eager_state_size_limit = limit.min(self.message_size_limit.get()); + } + + // Determine if we need to read state (0 means lazy state / no eager state) let keyed_service_id = if self.invocation_target.as_keyed_service_id().is_some() - && invocation_attempt_options.enable_lazy_state != Some(true) - && !self.disable_eager_state + && self.eager_state_size_limit > 0 { self.invocation_target.as_keyed_service_id() } else { @@ -548,3 +608,132 @@ impl Stream for ResponseStream { } } } + +#[cfg(test)] +mod tests { + use bytes::Bytes; + use futures::stream; + + use restate_invoker_api::invocation_reader::EagerState; + + use super::collect_eager_state; + + type StateResult = Result<(Bytes, Bytes), std::io::Error>; + + // Helper to create a (Bytes, Bytes) pair of known sizes + fn entry(key_size: usize, value_size: usize) -> StateResult { + Ok(( + Bytes::from(vec![b'k'; key_size]), + Bytes::from(vec![b'v'; value_size]), + )) + } + + #[tokio::test] + async fn collect_eager_state_no_state_returns_partial() { + let (is_partial, entries) = collect_eager_state::< + stream::Empty>, + _, + _, + >(None, 1024, std::convert::identity) + .await + .unwrap(); + + assert!(is_partial, "no state should be reported as partial"); + assert!(entries.is_empty()); + } + + #[tokio::test] + async fn collect_eager_state_complete_within_limit() { + let items = vec![entry(10, 20), entry(5, 15)]; + let state = EagerState::new_complete(stream::iter(items)); + + let (is_partial, entries) = collect_eager_state(Some(state), 1024, std::convert::identity) + .await + .unwrap(); + + assert!(!is_partial, "all entries fit within limit"); + assert_eq!(entries.len(), 2); + } + + #[tokio::test] + async fn collect_eager_state_preserves_partial_flag() { + // Stream is pre-flagged as partial even though all entries fit + let items = vec![entry(10, 10)]; + let state = EagerState::new_partial(stream::iter(items)); + + let (is_partial, entries) = collect_eager_state(Some(state), 1024, std::convert::identity) + .await + .unwrap(); + + assert!(is_partial, "partial flag should be preserved from source"); + assert_eq!(entries.len(), 1); + } + + #[tokio::test] + async fn collect_eager_state_truncates_at_limit() { + // 3 entries of 50 bytes each, limit of 120 bytes => should fit 2 + let items = vec![entry(25, 25), entry(25, 25), entry(25, 25)]; + let state = EagerState::new_complete(stream::iter(items)); + + let (is_partial, entries) = collect_eager_state(Some(state), 120, std::convert::identity) + .await + .unwrap(); + + assert!(is_partial, "should be partial after truncation"); + assert_eq!(entries.len(), 2, "only 2 entries should fit (100 bytes)"); + } + + #[tokio::test] + async fn collect_eager_state_first_entry_always_included() { + // Single entry larger than the limit — should return empty entries + let items = vec![entry(100, 101)]; + let state = EagerState::new_complete(stream::iter(items)); + + let (is_partial, entries) = collect_eager_state(Some(state), 200, std::convert::identity) + .await + .unwrap(); + + assert!(is_partial, "first entry exceeded limit so partial state"); + assert!( + entries.is_empty(), + "first entry exceeded limit so empty entries" + ); + } + + #[tokio::test] + async fn collect_eager_state_stream_error_propagated() { + let items: Vec = vec![Err(std::io::Error::other("boom"))]; + let state = EagerState::new_complete(stream::iter(items)); + + let result = collect_eager_state(Some(state), 1024, std::convert::identity).await; + assert!(result.is_err(), "stream error should be propagated"); + } + + #[tokio::test] + async fn collect_eager_state_exact_boundary() { + // 2 entries of exactly 50 bytes each, limit of 100 => both should fit + let items = vec![entry(25, 25), entry(25, 25)]; + let state = EagerState::new_complete(stream::iter(items)); + + let (is_partial, entries) = collect_eager_state(Some(state), 100, std::convert::identity) + .await + .unwrap(); + + assert!(!is_partial, "entries exactly at limit should fit"); + assert_eq!(entries.len(), 2); + } + + #[tokio::test] + async fn collect_eager_state_one_byte_over_limit() { + // 2 entries of 50 bytes each, limit of 99 => only first should fit + let items = vec![entry(25, 25), entry(25, 25)]; + let state = EagerState::new_complete(stream::iter(items)); + + let (is_partial, entries) = collect_eager_state(Some(state), 99, std::convert::identity) + .await + .unwrap(); + + assert!(is_partial, "should be partial when 1 byte over"); + assert_eq!(entries.len(), 1, "only first entry should fit"); + } +} diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs index 1f7d7f7f5d..b6873245fb 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs @@ -13,7 +13,7 @@ use std::convert::Infallible; use std::time::Duration; use bytes::Bytes; -use futures::{Stream, StreamExt, TryStreamExt, stream}; +use futures::{Stream, StreamExt, stream}; use http::uri::PathAndQuery; use http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; use http_body::Frame; @@ -48,7 +48,7 @@ use crate::Notification; use crate::error::{InvocationErrorRelatedEntry, InvokerError, SdkInvocationError}; use crate::invocation_task::{ InvocationTask, InvocationTaskOutputInner, InvokerBodyStream, InvokerRequestStreamSender, - ResponseChunk, ResponseStream, TerminalLoopState, X_RESTATE_SERVER, + ResponseChunk, ResponseStream, TerminalLoopState, X_RESTATE_SERVER, collect_eager_state, invocation_id_to_header_value, service_protocol_version_to_header_value, }; @@ -484,19 +484,13 @@ where S: Stream> + Send, E: std::error::Error + Send + Sync + 'static, { - // Collect state if present, mapping to StateEntry while collecting - let (partial_state, state_map) = if let Some(state) = state { - let is_partial = state.is_partial(); - let entries: Vec = state - .into_inner() - .map_ok(|(key, value)| StateEntry { key, value }) - .try_collect() - .await - .map_err(|e| InvokerError::StateReader(e.into()))?; - (is_partial, entries) - } else { - (true, Vec::new()) - }; + // Collect state entries with size limit + let (partial_state, state_map) = collect_eager_state( + state, + self.invocation_task.eager_state_size_limit, + |(key, value)| StateEntry { key, value }, + ) + .await?; // Send the invoke frame self.write( diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index 74e4e930b6..ef2f69956f 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -18,7 +18,7 @@ use std::time::Duration; use bytes::Bytes; use bytestring::ByteString; -use futures::{Stream, StreamExt, TryStreamExt, stream}; +use futures::{Stream, StreamExt, stream}; use gardal::futures::StreamExt as GardalStreamExt; use http::uri::PathAndQuery; use http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; @@ -64,7 +64,7 @@ use crate::error::{ }; use crate::invocation_task::{ InvocationTask, InvocationTaskOutputInner, InvokerBodyStream, InvokerRequestStreamSender, - ResponseChunk, ResponseStream, TerminalLoopState, X_RESTATE_SERVER, + ResponseChunk, ResponseStream, TerminalLoopState, X_RESTATE_SERVER, collect_eager_state, invocation_id_to_header_value, retry_after, service_protocol_version_to_header_value, }; @@ -599,19 +599,13 @@ where S: Stream> + Send, E: std::error::Error + Send + Sync + 'static, { - // Collect state if present, mapping to StateEntry while collecting - let (partial_state, state_map) = if let Some(state) = state { - let is_partial = state.is_partial(); - let entries: Vec = state - .into_inner() - .map_ok(|(key, value)| StateEntry { key, value }) - .try_collect() - .await - .map_err(|e| InvokerError::StateReader(e.into()))?; - (is_partial, entries) - } else { - (true, Vec::new()) - }; + // Collect state entries with size limit + let (partial_state, state_map) = collect_eager_state( + state, + self.invocation_task.eager_state_size_limit, + |(key, value)| StateEntry { key, value }, + ) + .await?; // Send the invoke frame self.write( diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 66957a5cb1..1361e7e4a3 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -145,7 +145,7 @@ where invocation_target, opts.inactivity_timeout.into(), opts.abort_timeout.into(), - opts.disable_eager_state, + opts.eager_state_size_limit(), opts.message_size_warning.as_non_zero_usize(), opts.message_size_limit(), retry_count_since_last_stored_entry, diff --git a/crates/invoker-impl/src/metric_definitions.rs b/crates/invoker-impl/src/metric_definitions.rs index 358f406d9f..24339920dc 100644 --- a/crates/invoker-impl/src/metric_definitions.rs +++ b/crates/invoker-impl/src/metric_definitions.rs @@ -69,6 +69,7 @@ pub const INVOKER_CONCURRENCY_SLOTS_ACQUIRED: &str = "restate.invoker.concurrenc pub const INVOKER_CONCURRENCY_SLOTS_RELEASED: &str = "restate.invoker.concurrency_slots.released"; pub const INVOKER_CONCURRENCY_LIMIT: &str = "restate.invoker.concurrency_limit"; pub const INVOKER_TASK_DURATION: &str = "restate.invoker.task_duration.seconds"; +pub const INVOKER_EAGER_STATE_TRUNCATED: &str = "restate.invoker.eager_state_truncated.total"; pub const TASK_OP_STARTED: &str = "started"; pub const TASK_OP_SUSPENDED: &str = "suspended"; @@ -111,4 +112,10 @@ pub(crate) fn describe_metrics() { Unit::Seconds, "Time taken to complete an invoker task" ); + + describe_counter!( + INVOKER_EAGER_STATE_TRUNCATED, + Unit::Count, + "Number of invocations where eager state was truncated due to size limit" + ); } diff --git a/crates/serde-util/src/byte_count.rs b/crates/serde-util/src/byte_count.rs index 63e1379004..33f2e45ae4 100644 --- a/crates/serde-util/src/byte_count.rs +++ b/crates/serde-util/src/byte_count.rs @@ -221,6 +221,12 @@ impl From> for u64 { } } +impl From> for ByteCount { + fn from(value: ByteCount) -> Self { + Self(value.0) + } +} + impl Serialize for ByteCount { fn serialize(&self, serializer: S) -> Result where diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 8654f1dc57..2b43c2466e 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -318,7 +318,22 @@ pub struct InvokerOptions { /// Number of concurrent invocations that can be processed by the invoker. concurrent_invocations_limit: Option, + /// # Eager state size limit (since v1.6.3) + /// + /// Maximum total size (in bytes) of state entries to send eagerly in the StartMessage. + /// When the total size of state entries exceeds this limit, only a partial state is sent + /// and the service will fetch remaining state lazily using GetEagerState commands. + /// + /// Set to `0` to disable eager state entirely (equivalent to enabling lazy state). + /// + /// This helps reduce memory pressure on deployments for services with large state. + /// If unset, defaults to `message-size-limit` (clamped to that value if set higher). + #[serde(skip_serializing_if = "Option::is_none")] + eager_state_size_limit: Option, + // -- Private config options (not exposed in the schema) + /// Deprecated since v1.6.3: Use `eager_state_size_limit` with a value of `0` instead. + /// When true, treated as `eager_state_size_limit = 0` (no eager state). #[cfg_attr(feature = "schemars", schemars(skip))] #[serde(skip_serializing_if = "std::ops::Not::not", default)] pub disable_eager_state: bool, @@ -380,12 +395,40 @@ impl InvokerOptions { } } + /// Resolved eager state size limit in bytes. After `merge()`, this is guaranteed + /// to be clamped to the message size limit. `0` means eager state is disabled. + pub fn eager_state_size_limit(&self) -> usize { + self.eager_state_size_limit + .map(|v| v.as_usize()) + .unwrap_or(self.message_size_limit().get()) + } + pub(crate) fn merge(&mut self, opts: &NetworkingOptions) { self.message_size_limit = Some( self.message_size_limit .map(|limit| limit.min(opts.message_size_limit)) .unwrap_or(opts.message_size_limit), ); + + // Fuse deprecated disable_eager_state into eager_state_size_limit + if self.disable_eager_state { + if self.eager_state_size_limit.is_some_and(|v| v.as_u64() > 0) { + warn!( + "Both 'disable-eager-state' and 'eager-state-size-limit' are set; \ + 'eager-state-size-limit' takes precedence. \ + 'disable-eager-state' is deprecated, use 'eager-state-size-limit = \"0\"' instead." + ); + } else if self.eager_state_size_limit.is_none() { + self.eager_state_size_limit = Some(ByteCount::ZERO); + } + } + + // Clamp eager_state_size_limit to the resolved message_size_limit + self.eager_state_size_limit = Some( + self.eager_state_size_limit + .map(|limit| limit.min(opts.message_size_limit.into())) + .unwrap_or(opts.message_size_limit.into()), + ); } } @@ -401,6 +444,7 @@ impl Default for InvokerOptions { message_size_limit: None, tmp_dir: None, concurrent_invocations_limit: Some(NonZeroUsize::new(1000).expect("is non zero")), + eager_state_size_limit: None, disable_eager_state: false, invocation_throttling: None, action_throttling: None, diff --git a/crates/types/src/schema/invocation_target.rs b/crates/types/src/schema/invocation_target.rs index 433836b308..8919436656 100644 --- a/crates/types/src/schema/invocation_target.rs +++ b/crates/types/src/schema/invocation_target.rs @@ -8,19 +8,22 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::invocation::{ - InvocationRetention, InvocationTargetType, ServiceType, WorkflowHandlerType, -}; +use std::str::FromStr; +use std::time::Duration; +use std::{cmp, fmt}; -use crate::identifiers::DeploymentId; -use crate::retries::RetryIter; use bytes::Bytes; use bytestring::ByteString; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use std::str::FromStr; -use std::time::Duration; -use std::{cmp, fmt}; + +use restate_serde_util::ByteCount; + +use crate::identifiers::DeploymentId; +use crate::invocation::{ + InvocationRetention, InvocationTargetType, ServiceType, WorkflowHandlerType, +}; +use crate::retries::RetryIter; pub const DEFAULT_IDEMPOTENCY_RETENTION: Duration = Duration::from_secs(60 * 60 * 24); pub const DEFAULT_WORKFLOW_COMPLETION_RETENTION: Duration = Duration::from_secs(60 * 60 * 24); @@ -123,7 +126,10 @@ impl InvocationTargetMetadata { pub struct InvocationAttemptOptions { pub abort_timeout: Option, pub inactivity_timeout: Option, - pub enable_lazy_state: Option, + /// Per-handler/service override for the eager state size limit. + /// `Some(ByteCount::ZERO)` means no eager state (equivalent to lazy state). + /// `None` means no per-handler override (use server default). + pub eager_state_size_limit: Option, } // --- Input rules diff --git a/crates/types/src/schema/metadata/mod.rs b/crates/types/src/schema/metadata/mod.rs index a14071db87..3c1d673c6e 100644 --- a/crates/types/src/schema/metadata/mod.rs +++ b/crates/types/src/schema/metadata/mod.rs @@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use serde_with::serde_as; -use restate_serde_util::MapAsVecItem; +use restate_serde_util::{ByteCount, MapAsVecItem}; use restate_time_util::FriendlyDuration; use crate::config::{Configuration, InvocationRetryPolicyOptions}; @@ -750,14 +750,21 @@ impl InvocationTargetResolver for Schema { let service_revision = deployment.services.get(service_name)?; let handler = service_revision.handlers.get(handler_name)?; + let enable_lazy_state = handler + .enable_lazy_state + .or(service_revision.enable_lazy_state); + let eager_state_size_limit = if enable_lazy_state == Some(true) { + Some(ByteCount::ZERO) + } else { + None + }; + Some(InvocationAttemptOptions { abort_timeout: handler.abort_timeout.or(service_revision.abort_timeout), inactivity_timeout: handler .inactivity_timeout .or(service_revision.inactivity_timeout), - enable_lazy_state: handler - .enable_lazy_state - .or(service_revision.enable_lazy_state), + eager_state_size_limit, }) } diff --git a/crates/types/src/schema/metadata/serde_hacks.rs b/crates/types/src/schema/metadata/serde_hacks.rs index 5c3bf0d141..1a79ed453e 100644 --- a/crates/types/src/schema/metadata/serde_hacks.rs +++ b/crates/types/src/schema/metadata/serde_hacks.rs @@ -504,6 +504,7 @@ mod conversions { abort_timeout: handler.abort_timeout, documentation: handler.documentation, enable_lazy_state: handler.enable_lazy_state, + metadata: handler.metadata, retry_policy_initial_interval: None, retry_policy_exponentiation_factor: None, @@ -528,6 +529,7 @@ mod conversions { inactivity_timeout: service.inactivity_timeout, abort_timeout: service.abort_timeout, enable_lazy_state: service.enable_lazy_state, + retry_policy_initial_interval: None, retry_policy_exponentiation_factor: None, retry_policy_max_attempts: None, @@ -616,6 +618,7 @@ mod conversions { inactivity_timeout: handler.inactivity_timeout, abort_timeout: handler.abort_timeout, enable_lazy_state: handler.enable_lazy_state, + public: handler.public.unwrap_or(service.public), input_description: handler.input_rules.to_string(), output_description: handler.output_rules.to_string(), @@ -723,6 +726,7 @@ mod conversions { inactivity_timeout: handler.inactivity_timeout, abort_timeout: handler.abort_timeout, enable_lazy_state: handler.enable_lazy_state, + documentation: handler.documentation.clone(), metadata: handler.metadata.clone(), }, @@ -745,6 +749,7 @@ mod conversions { inactivity_timeout: service.inactivity_timeout, abort_timeout: service.abort_timeout, enable_lazy_state: service.enable_lazy_state, + documentation: service.documentation.clone(), metadata: service.metadata.clone(), }, diff --git a/crates/types/src/schema/metadata/updater/tests.rs b/crates/types/src/schema/metadata/updater/tests.rs index 0a6f53a859..577da9b6b6 100644 --- a/crates/types/src/schema/metadata/updater/tests.rs +++ b/crates/types/src/schema/metadata/updater/tests.rs @@ -2217,7 +2217,7 @@ mod endpoint_manifest_options_propagation { eq(InvocationAttemptOptions { abort_timeout: Some(Duration::from_secs(120)), inactivity_timeout: Some(Duration::from_secs(60)), - enable_lazy_state: None, + eager_state_size_limit: None, }) ) } @@ -2242,7 +2242,7 @@ mod endpoint_manifest_options_propagation { eq(InvocationAttemptOptions { abort_timeout: Some(Duration::from_secs(120)), inactivity_timeout: Some(Duration::from_secs(30)), - enable_lazy_state: None, + eager_state_size_limit: None, }) ) } diff --git a/crates/types/src/schema/service.rs b/crates/types/src/schema/service.rs index c184e5b347..4296fc3054 100644 --- a/crates/types/src/schema/service.rs +++ b/crates/types/src/schema/service.rs @@ -557,6 +557,7 @@ pub mod test_util { inactivity_timeout: None, abort_timeout: None, enable_lazy_state: None, + public: true, input_description: "any".to_string(), output_description: "any".to_string(), diff --git a/release-notes/unreleased/4345-eager-state-size-limit.md b/release-notes/unreleased/4345-eager-state-size-limit.md new file mode 100644 index 0000000000..f3e5c199b2 --- /dev/null +++ b/release-notes/unreleased/4345-eager-state-size-limit.md @@ -0,0 +1,45 @@ +# Release Notes for Issue #4345: Eager state size limit + +## New Feature / Deprecation + +### What Changed + +1. **Eager state size limit**: Added a new configuration option `eager-state-size-limit` to control + the amount of state sent eagerly to service endpoints in the StartMessage. + +2. **Deprecated `disable_eager_state`**: The private config option `disable_eager_state` is deprecated + in favor of `eager-state-size-limit = "0"`. Existing configs using `disable_eager_state: true` + continue to work and are internally treated as `eager-state-size-limit = "0"`. + +### Why This Matters + +When eager state is enabled (the default for Virtual Objects and Workflows), the server sends all +state entries to the service endpoint in the StartMessage. For services with large state, this forces +the deployment to hold the entire state in memory, which can be problematic for the service endpoint. + +This new option allows operators to cap the eager state size. When exceeded, the server sends partial +state and the service fetches remaining keys lazily on demand, reducing memory pressure on the +deployment. + +### Configuration + +```toml +[worker.invoker] +# Maximum total size of state entries to send eagerly. +# Set to "0" to disable eager state entirely (equivalent to lazy state). +# Defaults to message-size-limit if unset, and is always clamped to message-size-limit. +eager-state-size-limit = "10MB" +``` + +### Impact on Users + +- **Default behavior unchanged**: If unset, eager state is capped at the message size limit (existing behavior for most users) +- **When limit is set**: Services with state exceeding the limit will receive partial state and + use `GetEagerState`/`GetEagerStateKeys` commands to fetch remaining keys lazily +- **SDK compatibility**: All SDKs already support partial state — no changes required +- **`disable_eager_state` users**: The option still works but is deprecated. Migrate to + `eager-state-size-limit = "0"` for the same behavior. + +### Related Issues + +- #4344 - Stream state entries to service endpoints (longer-term solution)