diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index 52f2b1597e..a803762476 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -82,6 +82,10 @@ const SERVICE_PROTOCOL_VERSION_V5: HeaderValue = const SERVICE_PROTOCOL_VERSION_V6: HeaderValue = HeaderValue::from_static("application/vnd.restate.invocation.v6"); +#[allow(clippy::declare_interior_mutable_const)] +const SERVICE_PROTOCOL_VERSION_V7: HeaderValue = + HeaderValue::from_static("application/vnd.restate.invocation.v7"); + #[allow(clippy::declare_interior_mutable_const)] const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server"); @@ -517,6 +521,7 @@ fn service_protocol_version_to_header_value( ServiceProtocolVersion::V4 => SERVICE_PROTOCOL_VERSION_V4, ServiceProtocolVersion::V5 => SERVICE_PROTOCOL_VERSION_V5, ServiceProtocolVersion::V6 => SERVICE_PROTOCOL_VERSION_V6, + ServiceProtocolVersion::V7 => SERVICE_PROTOCOL_VERSION_V7, } } diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index 1e18d9004b..664381120e 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -23,7 +23,9 @@ use gardal::futures::StreamExt as GardalStreamExt; use http::uri::PathAndQuery; use http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; use http_body::Frame; +use metrics::counter; use opentelemetry::trace::TraceFlags; +use restate_serde_util::ByteCount; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, trace, warn}; @@ -67,6 +69,7 @@ use crate::invocation_task::{ ResponseChunk, ResponseStream, TerminalLoopState, X_RESTATE_SERVER, collect_eager_state, invocation_id_to_header_value, retry_after, service_protocol_version_to_header_value, }; +use 
crate::metric_definitions::INVOKER_EAGER_STATE_TRUNCATED; /// Provides the value of the invocation id const INVOCATION_ID_HEADER_NAME: HeaderName = HeaderName::from_static("x-restate-invocation-id"); @@ -617,6 +620,43 @@ where duration_since_last_stored_entry: Duration, random_seed: u64, ) -> Result<(), InvokerError> + where + S: Stream> + Send, + E: std::error::Error + Send + Sync + 'static, + { + if self.service_protocol_version >= ServiceProtocolVersion::V7 { + self.write_start_v7( + http_stream_tx, + journal_size, + state, + retry_count_since_last_stored_entry, + duration_since_last_stored_entry, + random_seed, + ) + .await + } else { + self.write_start_legacy( + http_stream_tx, + journal_size, + state, + retry_count_since_last_stored_entry, + duration_since_last_stored_entry, + random_seed, + ) + .await + } + } + + /// V1-V6: Collect all state into a single StartMessage. + async fn write_start_legacy( + &mut self, + http_stream_tx: &mut InvokerRequestStreamSender, + journal_size: u32, + state: Option>, + retry_count_since_last_stored_entry: u32, + duration_since_last_stored_entry: Duration, + random_seed: u64, + ) -> Result<(), InvokerError> where S: Stream> + Send, E: std::error::Error + Send + Sync + 'static, @@ -629,7 +669,6 @@ where ) .await?; - // Send the invoke frame self.write( http_stream_tx, Message::new_start_message( @@ -650,6 +689,111 @@ where .await } + /// V7+: Stream state entries in batches after StartMessage, terminated by + /// EagerStateCompleteMessage. + async fn write_start_v7( + &mut self, + http_stream_tx: &mut InvokerRequestStreamSender, + journal_size: u32, + state: Option>, + retry_count_since_last_stored_entry: u32, + duration_since_last_stored_entry: Duration, + random_seed: u64, + ) -> Result<(), InvokerError> + where + S: Stream> + Send, + E: std::error::Error + Send + Sync + 'static, + { + /// Target byte size per EagerStateEntryMessage batch. 
Each batch stays well + /// under the default max message size limit while being large enough to + /// amortize per-message overhead. + const EAGER_STATE_BATCH_BYTE_TARGET: usize = 1024 * 1024; // 1 MB + + let eager_state_size_limit = self.invocation_task.eager_state_size_limit; + + // Send StartMessage with empty state_map -- state is streamed separately + self.write( + http_stream_tx, + Message::new_start_message( + Bytes::copy_from_slice(&self.invocation_task.invocation_id.to_bytes()), + self.invocation_task.invocation_id.to_string(), + self.invocation_task + .invocation_target + .key() + .map(|bs| bs.as_bytes().clone()), + journal_size, + false, // partial_state on StartMessage is irrelevant for V7 + vec![], // empty -- state streamed via EagerStateEntryMessage + retry_count_since_last_stored_entry, + duration_since_last_stored_entry, + random_seed, + ), + ) + .await?; + + // Stream state entries in batches, respecting the overall size limit + let partial_state = if let Some(state) = state { + let mut is_partial = state.is_partial(); + let mut stream = std::pin::pin!(state.into_inner()); + let mut batch = Vec::new(); + let mut batch_bytes = 0usize; + let mut total_bytes = 0usize; + + while let Some(result) = stream.next().await { + let (key, value) = result.map_err(|e| InvokerError::StateReader(e.into()))?; + let entry_size = key.len() + value.len(); + + // Check if adding this entry would exceed the overall size limit + if total_bytes.saturating_add(entry_size) > eager_state_size_limit { + debug!( + "Eager state size limit reached ({}, limit: {}), \ + sending partial state", + ByteCount::from(total_bytes), + ByteCount::from(eager_state_size_limit), + ); + counter!(INVOKER_EAGER_STATE_TRUNCATED).increment(1); + is_partial = true; + break; + } + + total_bytes = total_bytes.saturating_add(entry_size); + batch_bytes += entry_size; + batch.push(StateEntry { key, value }); + + if batch_bytes >= EAGER_STATE_BATCH_BYTE_TARGET { + self.write( + http_stream_tx, + 
Message::EagerStateEntry(proto::EagerStateEntryMessage { + state_map: std::mem::take(&mut batch), + }), + ) + .await?; + batch_bytes = 0; + } + } + + // Flush remaining entries + if !batch.is_empty() { + self.write( + http_stream_tx, + Message::EagerStateEntry(proto::EagerStateEntryMessage { state_map: batch }), + ) + .await?; + } + + is_partial + } else { + true // no state available -- partial + }; + + // Send terminator + self.write( + http_stream_tx, + Message::EagerStateComplete(proto::EagerStateCompleteMessage { partial_state }), + ) + .await + } + async fn write_entry( &mut self, http_stream_tx: &mut InvokerRequestStreamSender, @@ -808,6 +952,13 @@ where Message::Suspension(suspension) => self.handle_suspension_message(suspension), Message::Error(e) => self.handle_error_message(e), Message::End(_) => TerminalLoopState::Closed, + // Server-to-SDK only; receiving from SDK is a protocol error + Message::EagerStateEntry(_) => TerminalLoopState::Failed( + InvokerError::UnexpectedMessageV4(MessageType::EagerStateEntry), + ), + Message::EagerStateComplete(_) => TerminalLoopState::Failed( + InvokerError::UnexpectedMessageV4(MessageType::EagerStateComplete), + ), // Run completion proposal Message::ProposeRunCompletion(run_completion) => { diff --git a/crates/service-protocol-v4/src/message_codec/mod.rs b/crates/service-protocol-v4/src/message_codec/mod.rs index c62a80be5a..1d38184b1f 100644 --- a/crates/service-protocol-v4/src/message_codec/mod.rs +++ b/crates/service-protocol-v4/src/message_codec/mod.rs @@ -334,6 +334,8 @@ gen_message!( End Control = 0x0003, CommandAck Control = 0x0004, ProposeRunCompletion Control = 0x0005, + EagerStateEntry Control = 0x0006, + EagerStateComplete Control = 0x0007, Input Command noparse allows_ack = 0x0400, Output Command noparse allows_ack = 0x0401, diff --git a/crates/types/service-protocol-v3/dev/restate/service/protocol.proto b/crates/types/service-protocol-v3/dev/restate/service/protocol.proto index 42ac5e6363..f531376596 
100644 --- a/crates/types/service-protocol-v3/dev/restate/service/protocol.proto +++ b/crates/types/service-protocol-v3/dev/restate/service/protocol.proto @@ -37,6 +37,11 @@ enum ServiceProtocolVersion { // * StartMessage.random_seed // * Failure.metadata V6 = 6; + // Added: + // * EagerStateEntryMessage and EagerStateCompleteMessage for streaming state + // entries after the StartMessage, allowing state sizes to exceed the + // maximum message size limit. + V7 = 7; } // --- Core frames --- diff --git a/crates/types/src/service_protocol.rs b/crates/types/src/service_protocol.rs index 8cba91e98b..5c68e51108 100644 --- a/crates/types/src/service_protocol.rs +++ b/crates/types/src/service_protocol.rs @@ -16,7 +16,7 @@ use std::ops::RangeInclusive; pub const MIN_INFLIGHT_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion = ServiceProtocolVersion::V1; pub const MAX_INFLIGHT_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion = - ServiceProtocolVersion::V6; + ServiceProtocolVersion::V7; pub const MIN_DISCOVERABLE_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion = ServiceProtocolVersion::V5; diff --git a/release-notes/unreleased/4344-streaming-eager-state.md b/release-notes/unreleased/4344-streaming-eager-state.md new file mode 100644 index 0000000000..60d190957d --- /dev/null +++ b/release-notes/unreleased/4344-streaming-eager-state.md @@ -0,0 +1,27 @@ +# Release Notes for Issue #4344: Streaming Eager State (Protocol V7) + +## New Feature + +### What Changed +Added service protocol V7, which introduces streaming of eager state entries +to service endpoints. Instead of packing all state key-value pairs into a +single StartMessage (which could exceed the message size limit), the server +now streams state entries in batches via new `EagerStateEntryMessage` and +`EagerStateCompleteMessage` protocol messages. 
+ +### Why This Matters +Previously, services with large accumulated state (exceeding the ~32 MB +default message size limit) would fail to invoke because the state couldn't +fit in a single StartMessage. With V7, eagerly transferred state is no longer bounded by the size of a single message, +as long as each individual key-value pair fits within the +message size limit. State beyond the configured eager state size limit is still marked as partial, and the SDK reads the remaining keys lazily. + +### Impact on Users +- This is an opt-in feature: V7 is only negotiated when both the server + and SDK support it. Existing SDK versions continue to use V1-V6 without + any change in behavior. +- SDKs must be updated to handle V7 to benefit from streaming state. +- No configuration changes required on the server side. + +### Related Issues +- Issue #4344: State can exceed max message size when invoking services diff --git a/service-protocol/dev/restate/service/protocol.proto b/service-protocol/dev/restate/service/protocol.proto index d9423d6e23..e0e279ff16 100644 --- a/service-protocol/dev/restate/service/protocol.proto +++ b/service-protocol/dev/restate/service/protocol.proto @@ -37,6 +37,11 @@ enum ServiceProtocolVersion { // * StartMessage.random_seed // * Failure.metadata V6 = 6; + // Added: + // * EagerStateEntryMessage and EagerStateCompleteMessage for streaming state + // entries after the StartMessage, allowing state sizes to exceed the + // maximum message size limit. + V7 = 7; } // --- Core frames --- @@ -143,6 +148,28 @@ message ProposeRunCompletionMessage { }; } +// Type: 0x0000 + 6 +// Carries a batch of state entries to the SDK during the state transfer +// phase between StartMessage and journal replay entries. +// May be sent zero or more times. Only valid in V7+. +message EagerStateEntryMessage { + // protolint:disable:next REPEATED_FIELD_NAMES_PLURALIZED + repeated StartMessage.StateEntry state_map = 1; +} + +// Type: 0x0000 + 7 +// Signals the end of the eager state transfer phase. +// Sent exactly once, after the last EagerStateEntryMessage (or +// immediately after StartMessage if no entries are streamed).
+// Only valid in V7+. +message EagerStateCompleteMessage { + // When true, the combined state from StartMessage.state_map and all + // preceding EagerStateEntryMessages is only a partial view. + // The SDK must use lazy state commands for missing keys. + bool partial_state = 1; +} + + // --- Commands and Notifications --- // The Journal is modelled as commands and notifications.