diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4b06a9c715..08ade56d0e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -244,7 +244,7 @@ jobs: with: restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} envVars: | - RESTATE_experimental_allow_protocol_v7=true + RESTATE_experimental_enable_protocol_v7=true testArtifactOutput: sdk-typescript-protocol-v7 sdkRepoRef: 'new-shared-core' diff --git a/crates/admin-rest-model/src/version.rs b/crates/admin-rest-model/src/version.rs index c9981c7906..1e5941ea11 100644 --- a/crates/admin-rest-model/src/version.rs +++ b/crates/admin-rest-model/src/version.rs @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::ops::RangeInclusive; +use std::{borrow::Cow, collections::HashMap, ops::RangeInclusive}; use serde::{Deserialize, Serialize}; @@ -75,6 +75,12 @@ pub struct VersionInformation { /// /// Ingress endpoint that the Web UI should use to interact with. pub ingress_endpoint: Option>, + + /// # Restate experimental features + /// + /// List experimental features with their + /// enabled state. + pub features: HashMap, bool>, } #[cfg(test)] diff --git a/crates/admin/src/rest_api/version.rs b/crates/admin/src/rest_api/version.rs index 618ee68282..2818562bdf 100644 --- a/crates/admin/src/rest_api/version.rs +++ b/crates/admin/src/rest_api/version.rs @@ -39,5 +39,6 @@ pub async fn version() -> Json { .ingress .advertised_address(tc.address_book()) })), + features: Configuration::pinned().common.experimental.features(), }) } diff --git a/crates/ingress-http/src/handler/service_handler.rs b/crates/ingress-http/src/handler/service_handler.rs index 85ddfc844d..beb9057980 100644 --- a/crates/ingress-http/src/handler/service_handler.rs +++ b/crates/ingress-http/src/handler/service_handler.rs @@ -140,7 +140,12 @@ where }; // Scoped invocations require vqueues to be enabled - if scope.is_some() && !Configuration::pinned().common.experimental_enable_vqueues { + if scope.is_some() + && !Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { return Err(HandlerError::ScopeRequiresVQueues); } diff --git a/crates/ingress-kafka/src/builder.rs b/crates/ingress-kafka/src/builder.rs index 6c65bf3f4e..01bc00ae8b 100644 --- a/crates/ingress-kafka/src/builder.rs +++ b/crates/ingress-kafka/src/builder.rs @@ -250,7 +250,8 @@ impl InvocationBuilder { if invocation_target.scope().is_some() && !restate_types::config::Configuration::pinned() .common - .experimental_enable_vqueues + .experimental + .is_vqueues_enabled() { bail!("Scoped invocations require experimental vqueues to be enabled"); } diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index e209ccfa32..97c1321114 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -264,7 +264,8 @@ impl Service`; +/// - `Experimental::is_foo_enabled()` and `Experimental::set_foo(enable)` accessors; +/// - an entry in [`Experimental::features`] keyed on the bare name `"foo"` (without the +/// `experimental_enable_` prefix), which is what is surfaced through the admin `/version` API. +/// +/// Adding a new experimental flag is therefore a one-line change at the invocation site below: +/// no other code needs to be touched for the flag to show up in `/version`. +macro_rules! experimental { + (@gen_struct [] -> [$($body:tt)*]) => { + #[derive(Debug, Clone, Default, Serialize, Deserialize)] + #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] + #[cfg_attr(feature = "schemars", schemars(default))] + #[serde(rename_all = "kebab-case")] + pub struct Experimental { + $($body)* + } + }; + (@gen_struct [$(#[$($attrss:meta)*])* $feat:ident $(, $($tail:tt)*)?] -> [$($body:tt)*]) => { + paste!{ + experimental!(@gen_struct [$($($tail)*)?] -> [ + $($body)* + + $(#[$($attrss)*])* + #[cfg_attr(feature = "schemars", schemars(skip))] + #[serde(skip_serializing_if = "std::ops::Not::not", default)] + []: bool, + ]); + } + }; + (@gen_features [] -> [$($field:ident)*]) => { + impl Experimental { + pub fn features(&self) -> std::collections::HashMap, bool> { + let mut map = std::collections::HashMap::default(); + $( + paste!{ + map.insert(std::borrow::Cow::Borrowed(stringify!($field)), self.[]); + } + )* + map + } + } + }; + (@gen_features [$(#[$($attrss:meta)*])* $feat:ident $(, $($tail:tt)*)?] -> [$($acc:ident)*]) => { + experimental!(@gen_features [$($($tail)*)?] -> [$($acc)* $feat]); + }; + (@gen_getters [] -> [$($field:ident)*]) => { + impl Experimental { + $( + paste!{ + pub fn [](&self) -> bool { + self.[] + } + + pub fn [](&mut self, enable: bool) { + self.[] = enable; + } + } + )* + } + }; + (@gen_getters [$(#[$($attrss:meta)*])* $feat:ident $(, $($tail:tt)*)?] -> [$($acc:ident)*]) => { + experimental!(@gen_getters [$($($tail)*)?] -> [$($acc)* $feat]); + }; + + + {$($tokens:tt)*} => { + experimental!(@gen_struct [$($tokens)*] -> []); + experimental!(@gen_features [$($tokens)*] -> []); + experimental!(@gen_getters [$($tokens)*] -> []); + }; +} + +// List of experimental features. Add a new identifier below to introduce a flag; the +// `experimental!` macro will generate the `experimental_enable_` config field, the +// `is__enabled()` / `set_()` accessors, and the entry exposed (under the bare +// name, without the `experimental_enable_` prefix) by the admin `/version` API. +experimental! { + /// Current in heavy development, do not enable this feature unless you are a contributor + vqueues, + + /// When enabled, invocations that exhaust their memory budget will yield back to + /// the scheduler instead of consuming retry attempts. Requires all nodes in the + /// cluster to be running v1.7.0 or later because it introduces a new WAL variant. + /// + /// Since v1.7.0 + invoker_yield, + /// # Enables service protocol v7 /// /// Introduced in Restate v1.7 @@ -502,9 +586,7 @@ pub struct CommonOptions { /// /// Once enabled, you **cannot** rollback back to previous versions /// where v7 is not supported < v1.7 - #[cfg_attr(feature = "schemars", schemars(skip))] - #[serde(skip_serializing_if = "std::ops::Not::not", default)] - pub experimental_allow_protocol_v7: bool, + protocol_v7, } serde_with::with_prefix!(pub prefix_tokio_console "tokio_console_"); @@ -740,10 +822,8 @@ impl Default for CommonOptions { initialization_timeout: NonZeroFriendlyDuration::from_secs_unchecked(5 * 60), disable_telemetry: false, gossip: GossipOptions::default(), - experimental_enable_vqueues: false, - experimental_enable_invoker_yield: false, hlc_max_drift: FriendlyDuration::from_millis(5000), - experimental_allow_protocol_v7: false, + experimental: Experimental::default(), } } } diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index 17b5e9f720..6a6b538df9 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -449,7 +449,11 @@ impl LeaderState { ActionEffect::UpsertRuleBook(book) => { // todo(tillrohrmann) also enable the feature once the partition has been migrated // to use vqueues and then rolling back to v1.7 - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { self.self_proposer .self_propose( self.partition_key_range.start(), diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 14595a262d..55a9fc233e 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -464,7 +464,7 @@ where })? .into_guard(); - let scheduler_service = if config.common.experimental_enable_vqueues { + let scheduler_service = if config.common.experimental.is_vqueues_enabled() { let scheduler = SchedulerService::create( ResourceManager::create( partition_store.partition_db().clone(), diff --git a/crates/worker/src/partition/state_machine/lifecycle/paused.rs b/crates/worker/src/partition/state_machine/lifecycle/paused.rs index 0341212b01..dbf9d8bc61 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/paused.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/paused.rs @@ -65,7 +65,11 @@ where // Invoker paused the invocation, let's record the event, then set the status to paused debug_if_leader!(ctx.is_leader, "Paused the invocation"); - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { // todo: use the new status let entry_id = EntryId::from(&invocation_id); let Some(header) = ctx diff --git a/crates/worker/src/partition/state_machine/lifecycle/resume.rs b/crates/worker/src/partition/state_machine/lifecycle/resume.rs index b4a0d63791..15b095ddfa 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/resume.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/resume.rs @@ -41,7 +41,11 @@ where metadata.timestamps.update(ctx.record_created_at); - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { ctx.vqueue_move_invocation_to_inbox_stage(&self.invocation_id) .await?; } else { diff --git a/crates/worker/src/partition/state_machine/lifecycle/suspend.rs b/crates/worker/src/partition/state_machine/lifecycle/suspend.rs index e0a9ad43dd..14ecf6f46f 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/suspend.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/suspend.rs @@ -109,7 +109,11 @@ where .timestamps .update(ctx.record_created_at); - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { let now = UniqueTimestamp::from_unix_millis_unchecked(ctx.record_created_at); let entry_id = EntryId::from(&self.invocation_id); let Some(header) = ctx diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index e34b6a87ac..b2bd84bedd 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -936,7 +936,11 @@ impl StateMachineApplyContext<'_, S> { }); } - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { // skips the rest of this logic and jumps straight to vqueues' implementation return self .vqueue_enqueue( @@ -1557,7 +1561,11 @@ impl StateMachineApplyContext<'_, S> { + WriteLockTable + ReadVQueueTable, { - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { self.vqueue_enqueue_state_mutation(mutation).await?; } else { let service_status = self @@ -1896,7 +1904,11 @@ impl StateMachineApplyContext<'_, S> { Some(&invocation_target), )?; - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { let record_unique_ts = UniqueTimestamp::from_unix_millis_unchecked(self.record_created_at); let new_status = match termination_flavor { @@ -2008,7 +2020,11 @@ impl StateMachineApplyContext<'_, S> { Some(&invocation_target), )?; - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { let record_unique_ts = UniqueTimestamp::from_unix_millis_unchecked(self.record_created_at); let new_status = match termination_flavor { @@ -2871,7 +2887,11 @@ impl StateMachineApplyContext<'_, S> { .await?; } - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { if invocation_target.invocation_target_ty() == InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) { @@ -3235,7 +3255,11 @@ impl StateMachineApplyContext<'_, S> { + WriteJournalTable + journal_table_v2::WriteJournalTable, { - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { return Ok(()); } @@ -4546,7 +4570,11 @@ impl StateMachineApplyContext<'_, S> { .put_invocation_status(&invocation_id, &InvocationStatus::Invoked(metadata)) .map_err(Error::Storage)?; - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { self.vqueue_move_invocation_to_inbox_stage(&invocation_id) .await?; } else { @@ -4587,7 +4615,11 @@ impl StateMachineApplyContext<'_, S> { metadata.timestamps.update(self.record_created_at); - if Configuration::pinned().common.experimental_enable_vqueues { + if Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { let now = UniqueTimestamp::from_unix_millis_unchecked(self.record_created_at); let entry_id = EntryId::from(&invocation_id); let Some(header) = self