Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ jobs:
with:
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
envVars: |
RESTATE_experimental_allow_protocol_v7=true
RESTATE_experimental_enable_protocol_v7=true
testArtifactOutput: sdk-typescript-protocol-v7
sdkRepoRef: 'new-shared-core'

Expand Down
8 changes: 7 additions & 1 deletion crates/admin-rest-model/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::ops::RangeInclusive;
use std::{borrow::Cow, collections::HashMap, ops::RangeInclusive};

use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -75,6 +75,12 @@ pub struct VersionInformation {
///
/// Ingress endpoint that the Web UI should use to interact with.
pub ingress_endpoint: Option<AdvertisedAddress<HttpIngressPort>>,

/// # Restate experimental features
///
/// List experimental features with their
/// enabled state.
pub features: HashMap<Cow<'static, str>, bool>,
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions crates/admin/src/rest_api/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ pub async fn version() -> Json<VersionInformation> {
.ingress
.advertised_address(tc.address_book())
})),
features: Configuration::pinned().common.experimental.features(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect that this feature map won't solely be created based on the Configuration as we might be rolling back from v1.8 where we ran the auto migration to enable vqueues. Once the migration has ran, v1.7 needs to use them as there is no way going back.

Since the migration will most likely run on a per partition basis and one might even end up with a partially migrated cluster when rolling back, I think that we are going to use the NodesConfiguration to signal to all nodes whether vqueues should be used or not. Details to be figured out.

})
}
7 changes: 6 additions & 1 deletion crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ where
};

// Scoped invocations require vqueues to be enabled
if scope.is_some() && !Configuration::pinned().common.experimental_enable_vqueues {
if scope.is_some()
&& !Configuration::pinned()
.common
.experimental
.is_vqueues_enabled()
{
return Err(HandlerError::ScopeRequiresVQueues);
}

Expand Down
3 changes: 2 additions & 1 deletion crates/ingress-kafka/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ impl InvocationBuilder {
if invocation_target.scope().is_some()
&& !restate_types::config::Configuration::pinned()
.common
.experimental_enable_vqueues
.experimental
.is_vqueues_enabled()
{
bail!("Scoped invocations require experimental vqueues to be enabled");
}
Expand Down
8 changes: 5 additions & 3 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ impl<StorageReader, TEntryEnricher, Schemas> Service<StorageReader, TEntryEnrich
action_token_bucket,
allow_protocol_v7: Configuration::pinned()
.common
.experimental_allow_protocol_v7,
.experimental
.is_protocol_v7_enabled(),
},
schemas,
invocation_tasks: Default::default(),
Expand Down Expand Up @@ -1302,7 +1303,8 @@ where
// their memory.
if Configuration::pinned()
.common
.experimental_enable_invoker_yield
.experimental
.is_invoker_yield_enabled()
{
debug!(
restate.invocation.target = %ism.invocation_target,
Expand Down Expand Up @@ -3062,7 +3064,7 @@ mod tests {
async fn yield_flag_enabled_sends_yield_effect() {
// Enable the experimental yield flag
let mut config = Configuration::default();
config.common.experimental_enable_invoker_yield = true;
config.common.experimental.set_invoker_yield(true);
restate_types::config::set_current_config(config);

let invoker_options = InvokerOptionsBuilder::default()
Expand Down
120 changes: 100 additions & 20 deletions crates/types/src/config/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::sync::LazyLock;
use std::time::Duration;

use enumset::EnumSet;
use paste::paste;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;

Expand Down Expand Up @@ -463,20 +464,6 @@ pub struct CommonOptions {
#[serde(flatten)]
pub gossip: GossipOptions,

/// Current in heavy development, do not enable this feature unless you are a contributor
#[cfg_attr(feature = "schemars", schemars(skip))]
#[serde(skip_serializing_if = "std::ops::Not::not", default)]
pub experimental_enable_vqueues: bool,

/// When enabled, invocations that exhaust their memory budget will yield back to
/// the scheduler instead of consuming retry attempts. Requires all nodes in the
/// cluster to be running v1.7.0 or later because it introduces a new WAL variant.
///
/// Since v1.6.3
#[cfg_attr(feature = "schemars", schemars(skip))]
#[serde(skip_serializing_if = "std::ops::Not::not", default)]
pub experimental_enable_invoker_yield: bool,

/// # HLC maximum drift
///
/// Restate uses an internal hybrid-logical-clock (HLC) to track causality between
Expand All @@ -494,6 +481,103 @@ pub struct CommonOptions {
#[serde(default)]
hlc_max_drift: FriendlyDuration,

#[serde(flatten)]
pub experimental: Experimental,
}

/// Declares the [`Experimental`] feature-flag struct from a list of feature names.
///
/// Each entry is a bare identifier (optionally preceded by doc comments / attributes) inside
/// `experimental! { ... }`. For a feature `foo` the macro generates:
/// - a `experimental_enable_foo: bool` field on [`Experimental`] — this is the on-disk /
/// JSON-schema name, so the configuration schema always exposes flags as
/// `experimental_enable_<feature>`;
/// - `Experimental::is_foo_enabled()` and `Experimental::set_foo(enable)` accessors;
/// - an entry in [`Experimental::features`] keyed on the bare name `"foo"` (without the
/// `experimental_enable_` prefix), which is what is surfaced through the admin `/version` API.
///
/// Adding a new experimental flag is therefore a one-line change at the invocation site below:
/// no other code needs to be touched for the flag to show up in `/version`.
macro_rules! experimental {
(@gen_struct [] -> [$($body:tt)*]) => {
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "schemars", schemars(default))]
#[serde(rename_all = "kebab-case")]
pub struct Experimental {
$($body)*
}
};
(@gen_struct [$(#[$($attrss:meta)*])* $feat:ident $(, $($tail:tt)*)?] -> [$($body:tt)*]) => {
paste!{
experimental!(@gen_struct [$($($tail)*)?] -> [
$($body)*

$(#[$($attrss)*])*
#[cfg_attr(feature = "schemars", schemars(skip))]
#[serde(skip_serializing_if = "std::ops::Not::not", default)]
[<experimental_enable_ $feat>]: bool,
]);
}
};
(@gen_features [] -> [$($field:ident)*]) => {
impl Experimental {
pub fn features(&self) -> std::collections::HashMap<std::borrow::Cow<'static, str>, bool> {
let mut map = std::collections::HashMap::default();
$(
paste!{
map.insert(std::borrow::Cow::Borrowed(stringify!($field)), self.[<experimental_enable_ $field>]);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the key of map be &'static str?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I was going for at first. But unfortunately this won't work with the VersionInformation struct which must implement serde::Deserialize. To avoid copying strings I used Cow instead.

}
)*
map
}
}
};
(@gen_features [$(#[$($attrss:meta)*])* $feat:ident $(, $($tail:tt)*)?] -> [$($acc:ident)*]) => {
experimental!(@gen_features [$($($tail)*)?] -> [$($acc)* $feat]);
};
(@gen_getters [] -> [$($field:ident)*]) => {
impl Experimental {
$(
paste!{
pub fn [<is_ $field _enabled>](&self) -> bool {
self.[<experimental_enable_ $field>]
}

pub fn [<set_ $field>](&mut self, enable: bool) {
self.[<experimental_enable_ $field>] = enable;
}
}
)*
}
};
(@gen_getters [$(#[$($attrss:meta)*])* $feat:ident $(, $($tail:tt)*)?] -> [$($acc:ident)*]) => {
experimental!(@gen_getters [$($($tail)*)?] -> [$($acc)* $feat]);
};


{$($tokens:tt)*} => {
experimental!(@gen_struct [$($tokens)*] -> []);
experimental!(@gen_features [$($tokens)*] -> []);
experimental!(@gen_getters [$($tokens)*] -> []);
};
}

// List of experimental features. Add a new identifier below to introduce a flag; the
// `experimental!` macro will generate the `experimental_enable_<name>` config field, the
// `is_<name>_enabled()` / `set_<name>()` accessors, and the entry exposed (under the bare
// name, without the `experimental_enable_` prefix) by the admin `/version` API.
experimental! {
/// Current in heavy development, do not enable this feature unless you are a contributor
vqueues,

/// When enabled, invocations that exhaust their memory budget will yield back to
/// the scheduler instead of consuming retry attempts. Requires all nodes in the
/// cluster to be running v1.7.0 or later because it introduces a new WAL variant.
///
/// Since v1.7.0
invoker_yield,

/// # Enables service protocol v7
///
/// Introduced in Restate v1.7
Expand All @@ -502,9 +586,7 @@ pub struct CommonOptions {
///
/// Once enabled, you **cannot** rollback back to previous versions
/// where v7 is not supported < v1.7
#[cfg_attr(feature = "schemars", schemars(skip))]
#[serde(skip_serializing_if = "std::ops::Not::not", default)]
pub experimental_allow_protocol_v7: bool,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want to update ci.yml to set the new env variable to enable the protocol v7.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good catch! thanks

protocol_v7,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve protocol-v7 flag name compatibility

Keep the protocol-v7 config field backward-compatible: declaring protocol_v7 here makes the macro generate experimental_enable_protocol_v7, but the previous public config key was experimental_allow_protocol_v7. Existing configs that still set the old key will no longer enable v7 after this change (unknown fields are ignored during deserialization), which can silently keep nodes on the old protocol behavior. Add an alias/compat path for the old name or keep this flag explicitly named.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The protocol_v7 is an unreleased feature so it's okay to rename the flag

}

serde_with::with_prefix!(pub prefix_tokio_console "tokio_console_");
Expand Down Expand Up @@ -740,10 +822,8 @@ impl Default for CommonOptions {
initialization_timeout: NonZeroFriendlyDuration::from_secs_unchecked(5 * 60),
disable_telemetry: false,
gossip: GossipOptions::default(),
experimental_enable_vqueues: false,
experimental_enable_invoker_yield: false,
hlc_max_drift: FriendlyDuration::from_millis(5000),
experimental_allow_protocol_v7: false,
experimental: Experimental::default(),
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/worker/src/partition/leadership/leader_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,11 @@ impl LeaderState {
ActionEffect::UpsertRuleBook(book) => {
// todo(tillrohrmann) also enable the feature once the partition has been migrated
// to use vqueues and then rolling back to v1.7
if Configuration::pinned().common.experimental_enable_vqueues {
if Configuration::pinned()
.common
.experimental
.is_vqueues_enabled()
{
self.self_proposer
.self_propose(
self.partition_key_range.start(),
Expand Down
2 changes: 1 addition & 1 deletion crates/worker/src/partition/leadership/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ where
})?
.into_guard();

let scheduler_service = if config.common.experimental_enable_vqueues {
let scheduler_service = if config.common.experimental.is_vqueues_enabled() {
let scheduler = SchedulerService::create(
ResourceManager::create(
partition_store.partition_db().clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ where
// Invoker paused the invocation, let's record the event, then set the status to paused
debug_if_leader!(ctx.is_leader, "Paused the invocation");

if Configuration::pinned().common.experimental_enable_vqueues {
if Configuration::pinned()
.common
.experimental
.is_vqueues_enabled()
{
// todo: use the new status
let entry_id = EntryId::from(&invocation_id);
let Some(header) = ctx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ where

metadata.timestamps.update(ctx.record_created_at);

if Configuration::pinned().common.experimental_enable_vqueues {
if Configuration::pinned()
.common
.experimental
.is_vqueues_enabled()
{
ctx.vqueue_move_invocation_to_inbox_stage(&self.invocation_id)
.await?;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ where
.timestamps
.update(ctx.record_created_at);

if Configuration::pinned().common.experimental_enable_vqueues {
if Configuration::pinned()
.common
.experimental
.is_vqueues_enabled()
{
let now = UniqueTimestamp::from_unix_millis_unchecked(ctx.record_created_at);
let entry_id = EntryId::from(&self.invocation_id);
let Some(header) = ctx
Expand Down
Loading
Loading