Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/admin-rest-model/src/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct UpsertRuleRequest {
/// Free-form description shown in the rule book; not consulted at
/// runtime.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
pub description: Option<String>,
/// Soft-tombstone toggle. `true` parks the rule (the runtime treats
/// it as absent) without removing it.
#[serde(default)]
Expand Down Expand Up @@ -74,12 +74,12 @@ pub struct RuleResponse {
pub pattern: RulePattern<ReString>,
pub limits: UserLimits,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
pub description: Option<String>,
pub disabled: bool,
/// Per-rule version: bumped on runtime-relevant changes.
#[cfg_attr(feature = "schema", schema(value_type = u32))]
pub version: Version,
/// Seconds since UNIX epoch.
/// Millis since UNIX epoch.
pub last_modified_millis_since_epoch: u64,
}

Expand All @@ -88,7 +88,7 @@ impl From<(RulePattern<ReString>, &PersistedRule)> for RuleResponse {
RuleResponse {
pattern,
limits: rule.limits.clone(),
reason: rule.reason.clone(),
description: rule.description.clone(),
disabled: rule.disabled,
version: rule.version,
last_modified_millis_since_epoch: rule.last_modified.as_u64(),
Expand Down
4 changes: 2 additions & 2 deletions crates/admin/src/rest_api/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub async fn upsert_rules<Metadata, Discovery, Telemetry, Invocations, Transport
entry.pattern.clone(),
RuleChange::Upsert(RuleUpsert {
limits: entry.limits.clone(),
reason: entry.reason.clone(),
description: entry.description.clone(),
disabled: entry.disabled,
precondition: entry.precondition,
}),
Expand Down Expand Up @@ -217,6 +217,6 @@ fn notify_rule_book_observer<Metadata, Discovery, Telemetry, Invocations, Transp
book: RuleBook,
) {
if let Some(observer) = &state.rule_book_observer {
observer(book);
observer.notify_observed(book);
}
}
8 changes: 4 additions & 4 deletions crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;
use std::time::Duration;

use axum::error_handling::HandleErrorLayer;
Expand All @@ -23,6 +24,7 @@ use tracing::{Span, debug, info, info_span};
use restate_admin_rest_model::version::AdminApiVersion;
use restate_core::network::{TransportConnect, net_util};
use restate_core::{MetadataWriter, TaskCenter};
use restate_limiter::rule_book::RuleBookObserver;
use restate_metadata_store::MetadataStoreClient;
use restate_service_client::HttpClient;
use restate_service_protocol_v4::discovery::ServiceDiscovery;
Expand All @@ -39,8 +41,6 @@ use crate::rest_api::{MAX_ADMIN_API_VERSION, MIN_ADMIN_API_VERSION};
use crate::schema_registry_integration::{MetadataService, TelemetryClient};
use crate::{rest_api, state};

pub use crate::state::RuleBookObserver;

#[derive(Debug, thiserror::Error)]
#[error("could not create the service client: {0}")]
pub struct BuildError(#[from] restate_service_client::BuildError);
Expand All @@ -53,7 +53,7 @@ pub struct AdminService<Metadata, Discovery, Telemetry, Invocations, Transport>
invocation_client: Invocations,
query_context: Option<restate_storage_query_datafusion::context::QueryContext>,
metadata_client: MetadataStoreClient,
rule_book_observer: Option<RuleBookObserver>,
rule_book_observer: Option<Arc<dyn RuleBookObserver>>,
}

impl<Invocations, Transport>
Expand Down Expand Up @@ -98,7 +98,7 @@ where
}
}

pub fn with_rule_book_observer(self, observer: RuleBookObserver) -> Self {
pub fn with_rule_book_observer(self, observer: Arc<dyn RuleBookObserver>) -> Self {
Self {
rule_book_observer: Some(observer),
..self
Expand Down
19 changes: 4 additions & 15 deletions crates/admin/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,15 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use restate_core::network::TransportConnect;
use restate_ingestion_client::IngestionClient;
use restate_limiter::RuleBook;
use restate_limiter::rule_book::RuleBookObserver;
use restate_metadata_store::MetadataStoreClient;
use restate_service_protocol_v4::serdes::SerdesClient;
use restate_storage_query_datafusion::context::QueryContext;
use restate_types::schema::registry::SchemaRegistry;
use restate_wal_protocol::Envelope;

/// Fire-and-forget callback that pushes a freshly written rule book
/// into a co-located worker's `RuleBookCache`. `None` on admin-only
/// nodes; the worker then learns about updates via its metadata-store
/// poll loop instead.
///
/// Takes the book by value: the cache only allocates an `Arc` for it
/// on the newer-version branch, so admin handlers don't have to
/// pre-wrap their result.
pub type RuleBookObserver = Arc<dyn Fn(RuleBook) + Send + Sync>;
use std::sync::Arc;

#[derive(Clone, derive_builder::Builder)]
pub struct AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport> {
Expand All @@ -40,7 +29,7 @@ pub struct AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transp
pub metadata_store_client: MetadataStoreClient,
// Some value if the query endpoint is activated
pub query_context: Option<QueryContext>,
pub rule_book_observer: Option<RuleBookObserver>,
pub rule_book_observer: Option<Arc<dyn RuleBookObserver>>,
}

impl<Metadata, Discovery, Telemetry, Invocations, Transport>
Expand All @@ -55,7 +44,7 @@ where
ingestion_client: IngestionClient<Transport, Envelope>,
metadata_store_client: MetadataStoreClient,
query_context: Option<QueryContext>,
rule_book_observer: Option<RuleBookObserver>,
rule_book_observer: Option<Arc<dyn RuleBookObserver>>,
) -> Self {
Self {
schema_registry,
Expand Down
75 changes: 49 additions & 26 deletions crates/limiter/src/rule_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
//! Persistent, versioned rule book that backs the in-memory [`crate::Rules`] store.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use restate_clock::time::MillisSinceEpoch;
use restate_types::{Version, Versioned};
Expand All @@ -27,6 +28,14 @@ pub const MAX_RULES_PER_BOOK: usize = 10_000;

/// One persisted rule entry. Keyed by its [`RulePattern`] in the
/// owning [`RuleBook`].
///
/// `version` advances on runtime-relevant changes only (`limits`,
/// `disabled`). Edits to `description` bump the enclosing [`RuleBook::version`].
/// See [`RuleBook::apply_change`] for the full version-bump contract.
///
/// `disabled` (rather than `enabled`) is the field name so the common case
/// — an active rule — corresponds to bilrost's empty state for `bool`
/// (`false`) and gets omitted from the wire.
#[derive(Debug, Clone, PartialEq, Eq, bilrost::Message)]
pub struct PersistedRule {
/// Per-rule version. Advances on runtime-relevant changes
Expand All @@ -36,10 +45,9 @@ pub struct PersistedRule {
pub version: Version,
#[bilrost(tag(3))]
pub limits: UserLimits,
/// Operator-supplied note explaining why the rule was created or
/// modified.
/// Free-form operator-supplied description for the rule.
#[bilrost(tag(4))]
pub reason: Option<String>,
pub description: Option<String>,
/// Soft tombstone. `true` parks the rule so the runtime treats it
/// as absent without removing the persisted entry.
#[bilrost(tag(5))]
Expand All @@ -50,6 +58,22 @@ pub struct PersistedRule {
pub last_modified: MillisSinceEpoch,
}

/// Fire-and-forget callback that pushes a freshly written rule book
/// into a co-located worker's `RuleBookCache`. `None` on admin-only
/// nodes; the worker then learns about updates via its metadata-store
/// poll loop instead.
///
/// Takes the book by value: the cache only allocates an `Arc` for it
/// on the newer-version branch, so admin handlers don't have to
/// pre-wrap their result.
pub trait RuleBookObserver: Send + Sync {
/// Notify a newly observed rule book.
fn notify_observed(&self, rule_book: RuleBook);

/// Obtain the last known rule book
fn get(&self) -> Arc<RuleBook>;
}

/// The cluster-wide rule book.
///
/// Lives under a single key in the metadata store. The
Expand Down Expand Up @@ -145,7 +169,7 @@ impl RuleBook {
Some(PersistedRule {
version: new_book_version,
limits: upsert.limits,
reason: upsert.reason,
description: upsert.description,
disabled: upsert.disabled,
last_modified: now,
}),
Expand All @@ -155,7 +179,7 @@ impl RuleBook {
let runtime_changed = existing.limits != upsert.limits
|| existing.disabled != upsert.disabled;
let anything_changed =
runtime_changed || existing.reason != upsert.reason;
runtime_changed || existing.description != upsert.description;
if anything_changed {
let (new_version, last_modified) = if runtime_changed {
(existing.version.next(), now)
Expand All @@ -167,7 +191,7 @@ impl RuleBook {
Some(PersistedRule {
version: new_version,
limits: upsert.limits,
reason: upsert.reason,
description: upsert.description,
disabled: upsert.disabled,
last_modified,
}),
Expand Down Expand Up @@ -235,7 +259,7 @@ impl RuleBook {
/// "anything runtime-relevant changed").
/// - Visible in both with equal per-rule `version` emits nothing.
///
/// `last_modified` and `reason` never produce updates on their own.
/// `last_modified` and `description` never produce updates on their own.
pub fn diff(&self, previous: &Self) -> Box<[RuleUpdate]> {
let mut updates = Vec::new();

Expand Down Expand Up @@ -330,7 +354,7 @@ impl RuleBook {
#[derive(Debug, Clone)]
pub struct RuleUpsert {
pub limits: UserLimits,
pub reason: Option<String>,
pub description: Option<String>,
/// Default `false` (rule is active). Set to `true` to write a parked
/// rule that is invisible to the runtime until later toggled.
pub disabled: bool,
Expand Down Expand Up @@ -446,22 +470,21 @@ restate_encoding::bilrost_as_display_from_str!(RulePattern<ReString>);

#[cfg(test)]
mod tests {
use std::num::NonZeroU64;

use bilrost::{Message, OwnedMessage};
use std::num::NonZeroU32;

use super::*;

fn pat(s: &str) -> RulePattern<ReString> {
s.parse().unwrap()
}

fn upsert(concurrency: u64) -> RuleUpsert {
fn upsert(concurrency: u32) -> RuleUpsert {
RuleUpsert {
limits: UserLimits {
action_concurrency: NonZeroU64::new(concurrency),
action_concurrency: NonZeroU32::new(concurrency),
},
reason: None,
description: None,
disabled: false,
precondition: Precondition::None,
}
Expand Down Expand Up @@ -492,9 +515,9 @@ mod tests {
pat("*"),
PersistedRule {
limits: UserLimits {
action_concurrency: NonZeroU64::new(1000),
action_concurrency: NonZeroU32::new(1000),
},
reason: Some("global default".to_owned()),
description: Some("global default".to_owned()),
disabled: false,
last_modified: MillisSinceEpoch::new(42),
version: Version::from(1),
Expand All @@ -504,9 +527,9 @@ mod tests {
pat("scope1/*/tenant1"),
PersistedRule {
limits: UserLimits {
action_concurrency: NonZeroU64::new(10),
action_concurrency: NonZeroU32::new(10),
},
reason: None,
description: None,
disabled: true,
last_modified: MillisSinceEpoch::new(43),
version: Version::from(2),
Expand All @@ -530,7 +553,7 @@ mod tests {
let r = book.get(&pat("*")).unwrap();
assert_eq!(r.version, Version::from(1));
assert!(!r.disabled);
assert_eq!(r.limits.action_concurrency, NonZeroU64::new(1000));
assert_eq!(r.limits.action_concurrency, NonZeroU32::new(1000));
}

#[test]
Expand All @@ -544,7 +567,7 @@ mod tests {
book.apply_change(pat("*"), RuleChange::Upsert(upsert(500)))
.unwrap();
let r = book.get(&pat("*")).unwrap();
assert_eq!(r.limits.action_concurrency, NonZeroU64::new(500));
assert_eq!(r.limits.action_concurrency, NonZeroU32::new(500));
assert_eq!(r.version, v_before_rule.next());
assert_eq!(book.version(), v_before_book.next());
}
Expand All @@ -560,13 +583,13 @@ mod tests {
book.apply_change(
pat("*"),
RuleChange::Upsert(RuleUpsert {
reason: Some("vendor X paused".to_owned()),
description: Some("vendor X paused".to_owned()),
..upsert(1000)
}),
)
.unwrap();
let r = book.get(&pat("*")).unwrap();
assert_eq!(r.reason.as_deref(), Some("vendor X paused"));
assert_eq!(r.description.as_deref(), Some("vendor X paused"));
assert_eq!(r.version, v_before_rule, "per-rule version must NOT bump");
assert_eq!(
book.version(),
Expand Down Expand Up @@ -668,7 +691,7 @@ mod tests {
.unwrap();
assert_eq!(
book.get(&pat("*")).unwrap().limits.action_concurrency,
NonZeroU64::new(500)
NonZeroU32::new(500)
);
}

Expand Down Expand Up @@ -728,7 +751,7 @@ mod tests {
book.apply_change(pat("*"), RuleChange::Upsert(upsert(1000)))
.unwrap();
// bump the rule a few times
for i in 1u64..=3 {
for i in 1u32..=3 {
book.apply_change(pat("*"), RuleChange::Upsert(upsert(1000 + i)))
.unwrap();
}
Expand Down Expand Up @@ -928,7 +951,7 @@ mod tests {
let updates = book.diff(&prev);
assert_eq!(updates_summary(&updates), vec![("*".to_owned(), "upsert")]);
if let RuleUpdate::Upsert { limit, .. } = &updates[0] {
assert_eq!(limit.action_concurrency, NonZeroU64::new(500));
assert_eq!(limit.action_concurrency, NonZeroU32::new(500));
}
}

Expand Down Expand Up @@ -988,15 +1011,15 @@ mod tests {
}

#[test]
fn diff_reason_only_change_emits_nothing() {
fn diff_description_only_change_emits_nothing() {
let mut book = RuleBook::empty();
book.apply_change(pat("*"), RuleChange::Upsert(upsert(1000)))
.unwrap();
let prev = book.clone();
book.apply_change(
pat("*"),
RuleChange::Upsert(RuleUpsert {
reason: Some("audit note".to_owned()),
description: Some("audit note".to_owned()),
..upsert(1000)
}),
)
Expand Down
Loading
Loading