Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/invoker-impl/src/invocation_task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ const SERVICE_PROTOCOL_VERSION_V5: HeaderValue =
const SERVICE_PROTOCOL_VERSION_V6: HeaderValue =
HeaderValue::from_static("application/vnd.restate.invocation.v6");

/// Header value naming service protocol V7; `service_protocol_version_to_header_value`
/// returns it for `ServiceProtocolVersion::V7`.
#[allow(clippy::declare_interior_mutable_const)]
const SERVICE_PROTOCOL_VERSION_V7: HeaderValue =
HeaderValue::from_static("application/vnd.restate.invocation.v7");

/// Name of the `x-restate-server` HTTP header.
#[allow(clippy::declare_interior_mutable_const)]
const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server");

Expand Down Expand Up @@ -517,6 +521,7 @@ fn service_protocol_version_to_header_value(
ServiceProtocolVersion::V4 => SERVICE_PROTOCOL_VERSION_V4,
ServiceProtocolVersion::V5 => SERVICE_PROTOCOL_VERSION_V5,
ServiceProtocolVersion::V6 => SERVICE_PROTOCOL_VERSION_V6,
ServiceProtocolVersion::V7 => SERVICE_PROTOCOL_VERSION_V7,
}
}

Expand Down
153 changes: 152 additions & 1 deletion crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use gardal::futures::StreamExt as GardalStreamExt;
use http::uri::PathAndQuery;
use http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use http_body::Frame;
use metrics::counter;
use opentelemetry::trace::TraceFlags;
use restate_serde_util::ByteCount;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, trace, warn};
Expand Down Expand Up @@ -67,6 +69,7 @@ use crate::invocation_task::{
ResponseChunk, ResponseStream, TerminalLoopState, X_RESTATE_SERVER, collect_eager_state,
invocation_id_to_header_value, retry_after, service_protocol_version_to_header_value,
};
use crate::metric_definitions::INVOKER_EAGER_STATE_TRUNCATED;

/// Provides the value of the invocation id
const INVOCATION_ID_HEADER_NAME: HeaderName = HeaderName::from_static("x-restate-invocation-id");
Expand Down Expand Up @@ -617,6 +620,43 @@ where
duration_since_last_stored_entry: Duration,
random_seed: u64,
) -> Result<(), InvokerError>
where
S: Stream<Item = Result<(Bytes, Bytes), E>> + Send,
E: std::error::Error + Send + Sync + 'static,
{
if self.service_protocol_version >= ServiceProtocolVersion::V7 {
self.write_start_v7(
http_stream_tx,
journal_size,
state,
retry_count_since_last_stored_entry,
duration_since_last_stored_entry,
random_seed,
)
.await
} else {
self.write_start_legacy(
http_stream_tx,
journal_size,
state,
retry_count_since_last_stored_entry,
duration_since_last_stored_entry,
random_seed,
)
.await
}
}

/// V1-V6: Collect all state into a single StartMessage.
async fn write_start_legacy<S, E>(
&mut self,
http_stream_tx: &mut InvokerRequestStreamSender,
journal_size: u32,
state: Option<EagerState<S>>,
retry_count_since_last_stored_entry: u32,
duration_since_last_stored_entry: Duration,
random_seed: u64,
) -> Result<(), InvokerError>
where
S: Stream<Item = Result<(Bytes, Bytes), E>> + Send,
E: std::error::Error + Send + Sync + 'static,
Expand All @@ -629,7 +669,6 @@ where
)
.await?;

// Send the invoke frame
self.write(
http_stream_tx,
Message::new_start_message(
Expand All @@ -650,6 +689,111 @@ where
.await
}

/// Protocol V7 and later: sends a `StartMessage` that carries no state, then streams
/// the eager state as a sequence of `EagerStateEntryMessage` batches, and finally
/// closes the state transfer with exactly one `EagerStateCompleteMessage`.
///
/// Streaming stops early, and the state is reported as partial, as soon as the next
/// entry would push the cumulative key + value byte count past
/// `invocation_task.eager_state_size_limit`. When `state` is `None`, no entries are
/// sent and the state is reported as partial.
///
/// # Errors
///
/// Returns `InvokerError::StateReader` if the state stream yields an error, and
/// propagates any error returned by `self.write`.
async fn write_start_v7<S, E>(
    &mut self,
    http_stream_tx: &mut InvokerRequestStreamSender,
    journal_size: u32,
    state: Option<EagerState<S>>,
    retry_count_since_last_stored_entry: u32,
    duration_since_last_stored_entry: Duration,
    random_seed: u64,
) -> Result<(), InvokerError>
where
    S: Stream<Item = Result<(Bytes, Bytes), E>> + Send,
    E: std::error::Error + Send + Sync + 'static,
{
    /// Accumulated key + value bytes at which the pending batch is flushed as one
    /// `EagerStateEntryMessage`. The check runs after an entry is appended, so a
    /// batch may overshoot this target by up to one entry.
    const EAGER_STATE_BATCH_BYTE_TARGET: usize = 1024 * 1024; // 1 MB

    let eager_state_size_limit = self.invocation_task.eager_state_size_limit;

    // The StartMessage goes out first. Its state_map is left empty because every
    // state entry travels in the EagerStateEntryMessage batches that follow.
    let start_message = Message::new_start_message(
        Bytes::copy_from_slice(&self.invocation_task.invocation_id.to_bytes()),
        self.invocation_task.invocation_id.to_string(),
        self.invocation_task
            .invocation_target
            .key()
            .map(|bs| bs.as_bytes().clone()),
        journal_size,
        // The partial flag is reported by EagerStateCompleteMessage instead.
        false,
        vec![],
        retry_count_since_last_stored_entry,
        duration_since_last_stored_entry,
        random_seed,
    );
    self.write(http_stream_tx, start_message).await?;

    let partial_state = match state {
        // Nothing to stream: tell the SDK it only has a partial view.
        None => true,
        Some(eager_state) => {
            let mut partial = eager_state.is_partial();
            let mut entries = std::pin::pin!(eager_state.into_inner());
            let mut pending = Vec::new();
            let mut pending_bytes = 0usize;
            let mut accepted_bytes = 0usize;

            while let Some(item) = entries.next().await {
                let (key, value) = item.map_err(|e| InvokerError::StateReader(e.into()))?;
                let entry_bytes = key.len() + value.len();
                let accepted_after = accepted_bytes.saturating_add(entry_bytes);

                // Stop before the entry that would cross the overall limit; entries
                // already batched are still flushed below.
                if accepted_after > eager_state_size_limit {
                    debug!(
                        "Eager state size limit reached ({}, limit: {}), \
                         sending partial state",
                        ByteCount::from(accepted_bytes),
                        ByteCount::from(eager_state_size_limit),
                    );
                    counter!(INVOKER_EAGER_STATE_TRUNCATED).increment(1);
                    partial = true;
                    break;
                }

                accepted_bytes = accepted_after;
                pending_bytes += entry_bytes;
                pending.push(StateEntry { key, value });

                if pending_bytes < EAGER_STATE_BATCH_BYTE_TARGET {
                    continue;
                }

                // Batch is full: ship it and start a new one.
                let full_batch = std::mem::take(&mut pending);
                self.write(
                    http_stream_tx,
                    Message::EagerStateEntry(proto::EagerStateEntryMessage {
                        state_map: full_batch,
                    }),
                )
                .await?;
                pending_bytes = 0;
            }

            // Ship whatever is left over from the last, not yet full, batch.
            if !pending.is_empty() {
                self.write(
                    http_stream_tx,
                    Message::EagerStateEntry(proto::EagerStateEntryMessage {
                        state_map: pending,
                    }),
                )
                .await?;
            }

            partial
        }
    };

    // Terminator: always sent, even when no entry message was written.
    let terminator =
        Message::EagerStateComplete(proto::EagerStateCompleteMessage { partial_state });
    self.write(http_stream_tx, terminator).await
}

async fn write_entry(
&mut self,
http_stream_tx: &mut InvokerRequestStreamSender,
Expand Down Expand Up @@ -808,6 +952,13 @@ where
Message::Suspension(suspension) => self.handle_suspension_message(suspension),
Message::Error(e) => self.handle_error_message(e),
Message::End(_) => TerminalLoopState::Closed,
// Server-to-SDK only; receiving from SDK is a protocol error
Message::EagerStateEntry(_) => TerminalLoopState::Failed(
InvokerError::UnexpectedMessageV4(MessageType::EagerStateEntry),
),
Message::EagerStateComplete(_) => TerminalLoopState::Failed(
InvokerError::UnexpectedMessageV4(MessageType::EagerStateComplete),
),

// Run completion proposal
Message::ProposeRunCompletion(run_completion) => {
Expand Down
2 changes: 2 additions & 0 deletions crates/service-protocol-v4/src/message_codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ gen_message!(
End Control = 0x0003,
CommandAck Control = 0x0004,
ProposeRunCompletion Control = 0x0005,
EagerStateEntry Control = 0x0006,
EagerStateComplete Control = 0x0007,

Input Command noparse allows_ack = 0x0400,
Output Command noparse allows_ack = 0x0401,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ enum ServiceProtocolVersion {
// * StartMessage.random_seed
// * Failure.metadata
V6 = 6;
// Added:
// * EagerStateEntryMessage and EagerStateCompleteMessage for streaming state
// entries after the StartMessage, allowing state sizes to exceed the
// maximum message size limit.
V7 = 7;
}

// --- Core frames ---
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/service_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::ops::RangeInclusive;
pub const MIN_INFLIGHT_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion =
ServiceProtocolVersion::V1;
pub const MAX_INFLIGHT_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion =
ServiceProtocolVersion::V6;
ServiceProtocolVersion::V7;

pub const MIN_DISCOVERABLE_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion =
ServiceProtocolVersion::V5;
Expand Down
27 changes: 27 additions & 0 deletions release-notes/unreleased/4344-streaming-eager-state.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Release Notes for Issue #4344: Streaming Eager State (Protocol V7)

## New Feature

### What Changed
Added service protocol V7, which introduces streaming of eager state entries
to service endpoints. Instead of packing all state key-value pairs into a
single StartMessage (which could exceed the message size limit), the server
now streams state entries in batches via new `EagerStateEntryMessage` and
`EagerStateCompleteMessage` protocol messages.

### Why This Matters
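Previously, services with large accumulated state (exceeding the ~32 MB
default message size limit) would fail to invoke because the state couldn't
fit in a single StartMessage. With V7, eager state is no longer bounded by
the size of a single message, as long as each individual key-value pair fits
within the message size limit. The server still applies its eager state size
limit: once that limit is reached it stops streaming, marks the state as
partial, and the SDK reads the missing keys lazily.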

### Impact on Users
- This is an opt-in feature: V7 is only negotiated when both the server
and SDK support it. Existing SDK versions continue to use V1-V6 without
any change in behavior.
- SDKs must be updated to handle V7 to benefit from streaming state.
- No configuration changes required on the server side.

### Related Issues
- Issue #4344: State can exceed max message size when invoking services
27 changes: 27 additions & 0 deletions service-protocol/dev/restate/service/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ enum ServiceProtocolVersion {
// * StartMessage.random_seed
// * Failure.metadata
V6 = 6;
// Added:
// * EagerStateEntryMessage and EagerStateCompleteMessage for streaming state
// entries after the StartMessage, allowing state sizes to exceed the
// maximum message size limit.
V7 = 7;
}

// --- Core frames ---
Expand Down Expand Up @@ -143,6 +148,28 @@ message ProposeRunCompletionMessage {
};
}

// Type: 0x0000 + 6
// Carries a batch of state entries to the SDK during the state transfer
// phase between StartMessage and journal replay entries.
// May be sent zero or more times; the phase is closed by a single
// EagerStateCompleteMessage. Only valid in V7+.
message EagerStateEntryMessage {
// The key-value entries carried by this batch.
// protolint:disable:next REPEATED_FIELD_NAMES_PLURALIZED
repeated StartMessage.StateEntry state_map = 1;
}

// Type: 0x0000 + 7
// Signals the end of the eager state transfer phase.
// Sent exactly once, after the last EagerStateEntryMessage (or
// immediately after StartMessage if no entries are streamed).
// Only valid in V7+.
message EagerStateCompleteMessage {
// When true, the combined state from StartMessage.state_map and all
// preceding EagerStateEntryMessages is only a partial view, for example
// because the server stopped streaming at its eager state size limit.
// The SDK must use lazy state commands for missing keys.
bool partial_state = 1;
}


// --- Commands and Notifications ---

// The Journal is modelled as commands and notifications.
Expand Down
Loading