diff --git a/Cargo.lock b/Cargo.lock index 0a55630f..c54e03ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -380,6 +380,14 @@ dependencies = [ "serde", ] +[[package]] +name = "capability-demo" +version = "0.1.0" +dependencies = [ + "ordo-core", + "serde_json", +] + [[package]] name = "cast" version = "0.3.0" @@ -2380,6 +2388,7 @@ dependencies = [ "clap", "dashmap", "futures", + "hashbrown 0.14.5", "hex", "hmac", "hostname", diff --git a/Cargo.toml b/Cargo.toml index 54b521c9..9adc4530 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/ordo-server", "crates/ordo-platform", "crates/ordo-wasm", + "examples/capability-demo", ] [workspace.package] diff --git a/crates/ordo-core/src/capability/mod.rs b/crates/ordo-core/src/capability/mod.rs new file mode 100644 index 00000000..8898a762 --- /dev/null +++ b/crates/ordo-core/src/capability/mod.rs @@ -0,0 +1,72 @@ +mod provider; +mod registry; + +pub use provider::{ + CapabilityCategory, CapabilityConfig, CapabilityDescriptor, CapabilityInvoker, + CapabilityProvider, CapabilityRequest, CapabilityResponse, CircuitBreakerConfig, RetryPolicy, +}; +pub use registry::CapabilityRegistry; + +#[cfg(test)] +mod tests { + use super::*; + use crate::prelude::{Action, ActionKind, Expr, RuleExecutor, RuleSet, Step, TerminalResult}; + use crate::{context::Value, error::Result}; + use std::sync::Arc; + + struct EchoProvider; + + impl CapabilityProvider for EchoProvider { + fn descriptor(&self) -> CapabilityDescriptor { + CapabilityDescriptor::new("demo.echo", CapabilityCategory::Compute) + .with_description("Echo payloads back to the caller") + } + + fn invoke(&self, request: &CapabilityRequest) -> Result { + Ok(CapabilityResponse::new(request.payload.clone())) + } + } + + #[test] + fn executor_external_call_routes_through_capability_registry() { + let registry = Arc::new(CapabilityRegistry::new()); + registry.register(Arc::new(EchoProvider)); + + let mut ruleset = RuleSet::new("capability_demo", "call_echo"); + ruleset.add_step(Step::action( + "call_echo", + "Call echo", + vec![Action { + kind: ActionKind::ExternalCall { + service: "demo.echo".to_string(), + method: "echo".to_string(), + params: vec![("amount".to_string(), Expr::field("amount"))], + timeout_ms: 250, + result_variable: Some("capability_result".to_string()), + }, + description: String::new(), + }], + "done", + )); + ruleset.add_step(Step::terminal( + "done", + "Done", + TerminalResult::new("OK").with_output( + "echoed_amount", + Expr::field("$capability_result.payload.amount"), + ), + )); + + let mut executor = RuleExecutor::new(); + executor.set_capability_invoker(registry); + + let input: Value = serde_json::from_str(r#"{"amount": 42}"#).unwrap(); + let result = executor.execute(&ruleset, input).unwrap(); + + let amount = result + .output + .get_path("echoed_amount") + .expect("echoed amount missing"); + assert_eq!(amount, &Value::int(42)); + } +} diff --git a/crates/ordo-core/src/capability/provider.rs b/crates/ordo-core/src/capability/provider.rs new file mode 100644 index 00000000..b8ac130a --- /dev/null +++ b/crates/ordo-core/src/capability/provider.rs @@ -0,0 +1,231 @@ +use crate::context::Value; +use crate::error::Result; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// High-level execution tier for a capability. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum CapabilityCategory { + Network, + Compute, + Action, +} + +/// Retry policy for a capability. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RetryPolicy { + pub max_attempts: u32, + pub backoff_ms: u64, +} + +impl RetryPolicy { + #[inline] + pub fn disabled() -> Self { + Self { + max_attempts: 1, + backoff_ms: 0, + } + } +} + +impl Default for RetryPolicy { + fn default() -> Self { + Self::disabled() + } +} + +/// Consecutive-failure breaker for a capability. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct CircuitBreakerConfig { + pub failure_threshold: u32, + pub reset_timeout_ms: u64, +} + +impl CircuitBreakerConfig { + #[inline] + pub fn disabled() -> Self { + Self { + failure_threshold: 0, + reset_timeout_ms: 0, + } + } + + #[inline] + pub fn enabled(&self) -> bool { + self.failure_threshold > 0 + } +} + +impl Default for CircuitBreakerConfig { + fn default() -> Self { + Self::disabled() + } +} + +/// Runtime policy for a registered capability. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct CapabilityConfig { + pub category: CapabilityCategory, + pub timeout_ms: Option, + pub retry: RetryPolicy, + pub circuit_breaker: CircuitBreakerConfig, +} + +impl CapabilityConfig { + #[inline] + pub fn new(category: CapabilityCategory) -> Self { + Self { + category, + timeout_ms: None, + retry: RetryPolicy::default(), + circuit_breaker: CircuitBreakerConfig::default(), + } + } + + #[inline] + pub fn timeout(mut self, timeout_ms: u64) -> Self { + self.timeout_ms = Some(timeout_ms); + self + } + + #[inline] + pub fn retry(mut self, retry: RetryPolicy) -> Self { + self.retry = retry; + self + } + + #[inline] + pub fn circuit_breaker(mut self, circuit_breaker: CircuitBreakerConfig) -> Self { + self.circuit_breaker = circuit_breaker; + self + } +} + +/// Metadata for a registered capability. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct CapabilityDescriptor { + pub name: String, + pub description: String, + pub config: CapabilityConfig, +} + +impl CapabilityDescriptor { + #[inline] + pub fn new(name: impl Into, category: CapabilityCategory) -> Self { + Self { + name: name.into(), + description: String::new(), + config: CapabilityConfig::new(category), + } + } + + #[inline] + pub fn with_description(mut self, description: impl Into) -> Self { + self.description = description.into(); + self + } + + #[inline] + pub fn with_config(mut self, config: CapabilityConfig) -> Self { + self.config = config; + self + } +} + +/// Normalized request shape passed to capability providers. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct CapabilityRequest { + pub capability: String, + pub operation: String, + #[serde(default)] + pub payload: Value, + #[serde(default)] + pub metadata: HashMap, + #[serde(default)] + pub timeout_ms: Option, + #[serde(default)] + pub category: Option, +} + +impl CapabilityRequest { + #[inline] + pub fn new( + capability: impl Into, + operation: impl Into, + payload: Value, + ) -> Self { + Self { + capability: capability.into(), + operation: operation.into(), + payload, + metadata: HashMap::new(), + timeout_ms: None, + category: None, + } + } + + #[inline] + pub fn with_timeout(mut self, timeout_ms: u64) -> Self { + self.timeout_ms = Some(timeout_ms); + self + } + + #[inline] + pub fn with_category(mut self, category: CapabilityCategory) -> Self { + self.category = Some(category); + self + } + + #[inline] + pub fn with_metadata(mut self, key: impl Into, value: impl Into) -> Self { + self.metadata.insert(key.into(), value.into()); + self + } +} + +/// Normalized response shape returned by capability providers. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct CapabilityResponse { + #[serde(default)] + pub payload: Value, + #[serde(default)] + pub metadata: HashMap, +} + +impl CapabilityResponse { + #[inline] + pub fn new(payload: Value) -> Self { + Self { + payload, + metadata: HashMap::new(), + } + } + + #[inline] + pub fn empty() -> Self { + Self::new(Value::Null) + } + + #[inline] + pub fn with_metadata(mut self, key: impl Into, value: impl Into) -> Self { + self.metadata.insert(key.into(), value.into()); + self + } +} + +/// Provider implementation for a named capability. +pub trait CapabilityProvider: Send + Sync { + fn descriptor(&self) -> CapabilityDescriptor; + fn invoke(&self, request: &CapabilityRequest) -> Result; +} + +/// Trait used by the rule executor to call a capability registry. +pub trait CapabilityInvoker: Send + Sync { + fn invoke(&self, request: &CapabilityRequest) -> Result; + + fn describe(&self, capability: &str) -> Option { + let _ = capability; + None + } +} diff --git a/crates/ordo-core/src/capability/registry.rs b/crates/ordo-core/src/capability/registry.rs new file mode 100644 index 00000000..4755140f --- /dev/null +++ b/crates/ordo-core/src/capability/registry.rs @@ -0,0 +1,313 @@ +use super::provider::{ + CapabilityDescriptor, CapabilityInvoker, CapabilityProvider, CapabilityRequest, + CapabilityResponse, +}; +use crate::error::OrdoError; +use crate::error::Result; +use parking_lot::{Mutex, RwLock}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +#[derive(Debug, Default)] +struct CircuitState { + consecutive_failures: u32, + opened_at: Option, +} + +struct RegisteredCapability { + descriptor: CapabilityDescriptor, + provider: Arc, + state: Mutex, +} + +/// In-memory capability registry used by the executor and tests. +#[derive(Default)] +pub struct CapabilityRegistry { + providers: RwLock>>, +} + +impl CapabilityRegistry { + #[inline] + pub fn new() -> Self { + Self::default() + } + + pub fn register( + &self, + provider: Arc, + ) -> Option> { + let descriptor = provider.descriptor(); + let name = descriptor.name.clone(); + let registered = Arc::new(RegisteredCapability { + descriptor, + provider, + state: Mutex::new(CircuitState::default()), + }); + + self.providers + .write() + .insert(name, registered) + .map(|previous| previous.provider.clone()) + } + + #[inline] + pub fn contains(&self, capability: &str) -> bool { + self.providers.read().contains_key(capability) + } + + pub fn list(&self) -> Vec { + self.providers + .read() + .values() + .map(|entry| entry.descriptor.clone()) + .collect() + } + + fn lookup(&self, capability: &str) -> Option> { + self.providers.read().get(capability).cloned() + } + + fn ensure_circuit_closed(entry: &RegisteredCapability) -> Result<()> { + let config = &entry.descriptor.config.circuit_breaker; + if !config.enabled() { + return Ok(()); + } + + let mut state = entry.state.lock(); + if let Some(opened_at) = state.opened_at { + let elapsed = opened_at.elapsed(); + if elapsed >= Duration::from_millis(config.reset_timeout_ms) { + state.consecutive_failures = 0; + state.opened_at = None; + return Ok(()); + } + + let remaining = config + .reset_timeout_ms + .saturating_sub(elapsed.as_millis().min(u128::from(u64::MAX)) as u64); + return Err(OrdoError::CircuitOpen { + capability: entry.descriptor.name.clone(), + retry_after_ms: Some(remaining), + }); + } + + Ok(()) + } + + fn mark_success(entry: &RegisteredCapability) { + let mut state = entry.state.lock(); + state.consecutive_failures = 0; + state.opened_at = None; + } + + fn mark_failure(entry: &RegisteredCapability) { + let config = &entry.descriptor.config.circuit_breaker; + if !config.enabled() { + return; + } + + let mut state = entry.state.lock(); + state.consecutive_failures = state.consecutive_failures.saturating_add(1); + if state.consecutive_failures >= config.failure_threshold { + state.opened_at = Some(Instant::now()); + } + } + + fn is_retryable_error(error: &OrdoError) -> bool { + matches!( + error, + OrdoError::Timeout { .. } | OrdoError::CapabilityInvocation { .. } + ) + } +} + +impl CapabilityInvoker for CapabilityRegistry { + fn invoke(&self, request: &CapabilityRequest) -> Result { + let entry = + self.lookup(&request.capability) + .ok_or_else(|| OrdoError::CapabilityNotFound { + capability: request.capability.clone(), + })?; + + Self::ensure_circuit_closed(&entry)?; + + let attempts = entry.descriptor.config.retry.max_attempts.max(1); + let timeout_ms = request.timeout_ms.or(entry.descriptor.config.timeout_ms); + + for attempt in 0..attempts { + let start = Instant::now(); + let response = entry.provider.invoke(request); + let response = match (timeout_ms, response) { + (Some(limit), Ok(_)) if start.elapsed().as_millis() as u64 > limit => { + Err(OrdoError::Timeout { timeout_ms: limit }) + } + (_, other) => other, + }; + + match response { + Ok(response) => { + Self::mark_success(&entry); + return Ok(response); + } + Err(error) => { + Self::mark_failure(&entry); + let should_retry = attempt + 1 < attempts && Self::is_retryable_error(&error); + if should_retry { + #[cfg(not(target_arch = "wasm32"))] + if entry.descriptor.config.retry.backoff_ms > 0 { + std::thread::sleep(Duration::from_millis( + entry.descriptor.config.retry.backoff_ms, + )); + } + continue; + } + return Err(error); + } + } + } + + Err(OrdoError::internal_error_static( + "capability registry retry loop exited unexpectedly", + )) + } + + fn describe(&self, capability: &str) -> Option { + self.lookup(capability) + .map(|entry| entry.descriptor.clone()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::capability::{ + CapabilityCategory, CapabilityConfig, CapabilityDescriptor, CircuitBreakerConfig, + RetryPolicy, + }; + use crate::context::Value; + use std::collections::VecDeque; + use std::sync::atomic::{AtomicUsize, Ordering}; + + struct SequenceProvider { + descriptor: CapabilityDescriptor, + calls: AtomicUsize, + responses: Mutex>>, + } + + impl SequenceProvider { + fn new( + descriptor: CapabilityDescriptor, + responses: Vec>, + ) -> Self { + Self { + descriptor, + calls: AtomicUsize::new(0), + responses: Mutex::new(responses.into()), + } + } + } + + impl CapabilityProvider for SequenceProvider { + fn descriptor(&self) -> CapabilityDescriptor { + self.descriptor.clone() + } + + fn invoke(&self, _request: &CapabilityRequest) -> Result { + self.calls.fetch_add(1, Ordering::Relaxed); + self.responses + .lock() + .pop_front() + .unwrap_or_else(|| Ok(CapabilityResponse::new(Value::string("default")))) + } + } + + #[test] + fn registers_and_invokes_provider() { + let registry = CapabilityRegistry::new(); + registry.register(Arc::new(SequenceProvider::new( + CapabilityDescriptor::new("demo.echo", CapabilityCategory::Compute), + vec![Ok(CapabilityResponse::new(Value::string("ok")))], + ))); + + let response = registry + .invoke(&CapabilityRequest::new( + "demo.echo", + "run", + Value::object(std::collections::HashMap::new()), + )) + .unwrap(); + + assert_eq!(response.payload, Value::string("ok")); + assert!(registry.contains("demo.echo")); + assert_eq!(registry.list().len(), 1); + } + + #[test] + fn retries_transient_errors() { + let registry = CapabilityRegistry::new(); + let descriptor = CapabilityDescriptor::new("demo.retry", CapabilityCategory::Network) + .with_config( + CapabilityConfig::new(CapabilityCategory::Network).retry(RetryPolicy { + max_attempts: 2, + backoff_ms: 0, + }), + ); + let provider = Arc::new(SequenceProvider::new( + descriptor, + vec![ + Err(OrdoError::CapabilityInvocation { + capability: "demo.retry".to_string(), + message: "temporary".into(), + }), + Ok(CapabilityResponse::new(Value::string("recovered"))), + ], + )); + let provider_ref = provider.clone(); + registry.register(provider); + + let response = registry + .invoke(&CapabilityRequest::new("demo.retry", "fetch", Value::Null)) + .unwrap(); + + assert_eq!(response.payload, Value::string("recovered")); + assert_eq!(provider_ref.calls.load(Ordering::Relaxed), 2); + } + + #[test] + fn opens_circuit_after_repeated_failures() { + let registry = CapabilityRegistry::new(); + registry.register(Arc::new(SequenceProvider::new( + CapabilityDescriptor::new("demo.breaker", CapabilityCategory::Action).with_config( + CapabilityConfig::new(CapabilityCategory::Action).circuit_breaker( + CircuitBreakerConfig { + failure_threshold: 2, + reset_timeout_ms: 1000, + }, + ), + ), + vec![ + Err(OrdoError::CapabilityInvocation { + capability: "demo.breaker".to_string(), + message: "boom".into(), + }), + Err(OrdoError::CapabilityInvocation { + capability: "demo.breaker".to_string(), + message: "boom".into(), + }), + ], + ))); + + assert!(registry + .invoke(&CapabilityRequest::new("demo.breaker", "emit", Value::Null)) + .is_err()); + assert!(registry + .invoke(&CapabilityRequest::new("demo.breaker", "emit", Value::Null)) + .is_err()); + + let error = registry + .invoke(&CapabilityRequest::new("demo.breaker", "emit", Value::Null)) + .unwrap_err(); + assert!(matches!(error, OrdoError::CircuitOpen { .. })); + } +} diff --git a/crates/ordo-core/src/context/store.rs b/crates/ordo-core/src/context/store.rs index 0ff6ef04..1f8c5342 100644 --- a/crates/ordo-core/src/context/store.rs +++ b/crates/ordo-core/src/context/store.rs @@ -61,8 +61,12 @@ impl Context { /// - `_index`: get the current iteration index (if set) pub fn get(&self, path: &str) -> Option<&Value> { if let Some(var_name) = path.strip_prefix('$') { - // Variable reference - self.variables.get(var_name) + // Variable reference, with optional nested path access. + if let Some((name, nested)) = var_name.split_once('.') { + self.variables.get(name)?.get_path(nested) + } else { + self.variables.get(var_name) + } } else if let Some(item_path) = path.strip_prefix("item.") { // Current iteration item field self.current_item.as_ref()?.get_path(item_path) @@ -178,6 +182,28 @@ mod tests { assert_eq!(ctx.get("$score"), None); } + #[test] + fn test_context_nested_variable_paths() { + let mut ctx = Context::new(Value::Null); + ctx.set_variable( + "result", + Value::object({ + let mut m = std::collections::HashMap::new(); + m.insert( + "payload".to_string(), + Value::object({ + let mut nested = std::collections::HashMap::new(); + nested.insert("score".to_string(), Value::int(7)); + nested + }), + ); + m + }), + ); + + assert_eq!(ctx.get("$result.payload.score"), Some(&Value::int(7))); + } + #[test] fn test_context_item() { let mut ctx = Context::new(Value::Null); diff --git a/crates/ordo-core/src/error.rs b/crates/ordo-core/src/error.rs index 134aceac..8cd3de57 100644 --- a/crates/ordo-core/src/error.rs +++ b/crates/ordo-core/src/error.rs @@ -54,6 +54,24 @@ pub enum OrdoError { #[error("RuleSet not found: {name}")] RuleSetNotFound { name: String }, + /// Capability not found + #[error("Capability not found: {capability}")] + CapabilityNotFound { capability: String }, + + /// Capability circuit breaker is open + #[error("Capability circuit open: {capability}")] + CircuitOpen { + capability: String, + retry_after_ms: Option, + }, + + /// Capability invocation failed + #[error("Capability invocation failed for {capability}: {message}")] + CapabilityInvocation { + capability: String, + message: Cow<'static, str>, + }, + /// Step not found #[error("Step not found: {step_id}")] StepNotFound { step_id: String }, @@ -171,6 +189,17 @@ impl OrdoError { } } + /// Create a capability invocation error + pub fn capability_invocation( + capability: impl Into, + message: impl Into>, + ) -> Self { + Self::CapabilityInvocation { + capability: capability.into(), + message: message.into(), + } + } + /// Create an internal error from a static string (no allocation) #[inline] pub fn internal_error_static(message: &'static str) -> Self { diff --git a/crates/ordo-core/src/lib.rs b/crates/ordo-core/src/lib.rs index 2c5a243e..ef59ae15 100644 --- a/crates/ordo-core/src/lib.rs +++ b/crates/ordo-core/src/lib.rs @@ -50,6 +50,7 @@ #![allow(missing_docs)] #![warn(clippy::all)] +pub mod capability; pub mod context; pub mod error; pub mod expr; @@ -62,6 +63,11 @@ pub mod trace; /// Prelude module for convenient imports pub mod prelude { + pub use crate::capability::{ + CapabilityCategory, CapabilityConfig, CapabilityDescriptor, CapabilityInvoker, + CapabilityProvider, CapabilityRegistry, CapabilityRequest, CapabilityResponse, + CircuitBreakerConfig, RetryPolicy, + }; pub use crate::context::{Context, Value}; pub use crate::error::{OrdoError, Result}; pub use crate::expr::{ diff --git a/crates/ordo-core/src/rule/compiled_executor.rs b/crates/ordo-core/src/rule/compiled_executor.rs index d823cdfc..e5afba22 100644 --- a/crates/ordo-core/src/rule/compiled_executor.rs +++ b/crates/ordo-core/src/rule/compiled_executor.rs @@ -5,6 +5,7 @@ use super::compiled::{ }; use super::metrics::{MetricSink, NoOpMetricSink}; use super::{ExecutionResult, TerminalResult}; +use crate::capability::{CapabilityInvoker, CapabilityRequest}; use crate::context::{Context, IString, Value}; use crate::error::{OrdoError, Result}; use crate::expr::BytecodeVM; @@ -36,6 +37,7 @@ use wasm_time::Instant; pub struct CompiledRuleExecutor { vm: BytecodeVM, metric_sink: Arc, + capability_invoker: Option>, } impl Default for CompiledRuleExecutor { @@ -49,6 +51,7 @@ impl CompiledRuleExecutor { Self { vm: BytecodeVM::new(), metric_sink: Arc::new(NoOpMetricSink), + capability_invoker: None, } } @@ -56,9 +59,18 @@ impl CompiledRuleExecutor { Self { vm: BytecodeVM::new(), metric_sink, + capability_invoker: None, } } + pub fn set_capability_invoker(&mut self, capability_invoker: Arc) { + self.capability_invoker = Some(capability_invoker); + } + + pub fn capability_invoker(&self) -> Option> { + self.capability_invoker.clone() + } + pub fn execute(&self, ruleset: &CompiledRuleSet, input: Value) -> Result { let start_time = Instant::now(); let mut ctx = Context::new(input); @@ -234,12 +246,38 @@ impl CompiledRuleExecutor { )) }) .collect::>>()?; - self.metric_sink.record_gauge(name, metric_value, &tags); + self.record_metric(name, metric_value, &tags)?; } } Ok(()) } + fn record_metric(&self, name: &str, value: f64, tags: &[(String, String)]) -> Result<()> { + if let Some(capability_invoker) = &self.capability_invoker { + let mut tag_values = std::collections::HashMap::with_capacity(tags.len()); + for (key, value) in tags { + tag_values.insert(key.clone(), Value::string(value)); + } + + let mut payload = std::collections::HashMap::with_capacity(3); + payload.insert("name".to_string(), Value::string(name)); + payload.insert("value".to_string(), Value::float(value)); + payload.insert("tags".to_string(), Value::object(tag_values)); + + let request = + CapabilityRequest::new("metrics.prometheus", "gauge", Value::object(payload)); + + match capability_invoker.invoke(&request) { + Ok(_) => return Ok(()), + Err(OrdoError::CapabilityNotFound { .. }) => {} + Err(error) => return Err(error), + } + } + + self.metric_sink.record_gauge(name, value, tags); + Ok(()) + } + fn build_output( &self, ruleset: &CompiledRuleSet, @@ -273,3 +311,82 @@ impl CompiledRuleExecutor { Ok(Value::object_optimized(output)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::capability::{ + CapabilityCategory, CapabilityDescriptor, CapabilityProvider, CapabilityRegistry, + CapabilityResponse, + }; + use crate::expr::Expr; + use crate::rule::metrics::MetricSink; + use crate::rule::{Action, ActionKind, RuleSet, RuleSetCompiler, Step, TerminalResult}; + use std::sync::atomic::{AtomicUsize, Ordering}; + + struct TestMetricSink { + gauge_calls: AtomicUsize, + } + + impl MetricSink for TestMetricSink { + fn record_gauge(&self, _name: &str, _value: f64, _tags: &[(String, String)]) { + self.gauge_calls.fetch_add(1, Ordering::SeqCst); + } + + fn record_counter(&self, _name: &str, _value: f64, _tags: &[(String, String)]) {} + } + + struct TestMetricCapability { + calls: AtomicUsize, + } + + impl CapabilityProvider for TestMetricCapability { + fn descriptor(&self) -> CapabilityDescriptor { + CapabilityDescriptor::new("metrics.prometheus", CapabilityCategory::Action) + } + + fn invoke(&self, _request: &CapabilityRequest) -> Result { + self.calls.fetch_add(1, Ordering::SeqCst); + Ok(CapabilityResponse::empty()) + } + } + + #[test] + fn compiled_executor_prefers_capability_metrics_when_available() { + let mut ruleset = RuleSet::new("compiled_metric_test", "record_metric"); + ruleset.add_step(Step::action( + "record_metric", + "Record Metric", + vec![Action { + kind: ActionKind::Metric { + name: "compiled_metric".to_string(), + value: Expr::literal(9.0f64), + tags: vec![("env".to_string(), "test".to_string())], + }, + description: String::new(), + }], + "done", + )); + ruleset.add_step(Step::terminal("done", "Done", TerminalResult::new("OK"))); + let compiled = RuleSetCompiler::compile(&ruleset).unwrap(); + + let sink = Arc::new(TestMetricSink { + gauge_calls: AtomicUsize::new(0), + }); + let mut executor = CompiledRuleExecutor::with_metric_sink(sink.clone()); + let registry = Arc::new(CapabilityRegistry::new()); + let capability = Arc::new(TestMetricCapability { + calls: AtomicUsize::new(0), + }); + let capability_ref = capability.clone(); + registry.register(capability); + executor.set_capability_invoker(registry); + + let input = serde_json::from_str(r#"{}"#).unwrap(); + let result = executor.execute(&compiled, input).unwrap(); + + assert_eq!(result.code, "OK"); + assert_eq!(capability_ref.calls.load(Ordering::SeqCst), 1); + assert_eq!(sink.gauge_calls.load(Ordering::SeqCst), 0); + } +} diff --git a/crates/ordo-core/src/rule/executor.rs b/crates/ordo-core/src/rule/executor.rs index e7c3dde5..4815b9c4 100644 --- a/crates/ordo-core/src/rule/executor.rs +++ b/crates/ordo-core/src/rule/executor.rs @@ -5,6 +5,7 @@ use super::metrics::{MetricSink, NoOpMetricSink}; use super::model::{FieldMissingBehavior, RuleSet}; use super::step::{ActionKind, Condition, LogLevel, Step, StepKind, TerminalResult}; +use crate::capability::{CapabilityInvoker, CapabilityRequest}; use crate::context::{Context, Value}; use crate::error::{OrdoError, Result}; use crate::expr::{Evaluator, ExprParser}; @@ -90,6 +91,8 @@ pub struct RuleExecutor { metric_sink: Arc, /// Optional resolver for CallRuleSet actions resolver: Option>, + /// Optional capability invoker for ExternalCall actions + capability_invoker: Option>, /// Maximum nesting depth for CallRuleSet (prevents unbounded recursion) max_call_depth: usize, } @@ -101,6 +104,9 @@ impl Default for RuleExecutor { } impl RuleExecutor { + const METRIC_CAPABILITY: &'static str = "metrics.prometheus"; + const METRIC_OPERATION_GAUGE: &'static str = "gauge"; + /// Create a new executor pub fn new() -> Self { Self { @@ -108,6 +114,7 @@ impl RuleExecutor { trace_config: TraceConfig::default(), metric_sink: Arc::new(NoOpMetricSink), resolver: None, + capability_invoker: None, max_call_depth: 10, } } @@ -119,6 +126,7 @@ impl RuleExecutor { trace_config, metric_sink: Arc::new(NoOpMetricSink), resolver: None, + capability_invoker: None, max_call_depth: 10, } } @@ -130,6 +138,7 @@ impl RuleExecutor { trace_config: TraceConfig::default(), metric_sink, resolver: None, + capability_invoker: None, max_call_depth: 10, } } @@ -144,6 +153,7 @@ impl RuleExecutor { trace_config, metric_sink, resolver: None, + capability_invoker: None, max_call_depth: 10, } } @@ -153,6 +163,16 @@ impl RuleExecutor { self.resolver = Some(resolver); } + /// Set an invoker for ExternalCall actions + pub fn set_capability_invoker(&mut self, capability_invoker: Arc) { + self.capability_invoker = Some(capability_invoker); + } + + /// Get the configured capability invoker + pub fn capability_invoker(&self) -> Option> { + self.capability_invoker.clone() + } + /// Get the metric sink pub fn metric_sink(&self) -> &Arc { &self.metric_sink @@ -544,8 +564,7 @@ impl RuleExecutor { return Ok(()); } }; - // Record metric via sink - self.metric_sink.record_gauge(name, metric_value, tags); + self.record_metric(name, metric_value, tags)?; tracing::debug!(metric = %name, value = %metric_value, tags = ?tags, "Metric recorded"); } @@ -599,11 +618,78 @@ impl RuleExecutor { ctx.set_variable(result_variable, result_obj); } - ActionKind::ExternalCall { .. } => { - // TODO: Implement external calls - tracing::warn!("External calls not yet implemented"); + ActionKind::ExternalCall { + service, + method, + params, + result_variable, + timeout_ms, + } => { + let capability_invoker = self.capability_invoker.as_ref().ok_or_else(|| { + OrdoError::eval_error("ExternalCall requires a capability invoker") + })?; + + let mut payload = std::collections::HashMap::with_capacity(params.len()); + for (name, expr) in params { + payload.insert(name.clone(), self.evaluator.eval(expr, ctx)?); + } + + let mut request = + CapabilityRequest::new(service.clone(), method.clone(), Value::object(payload)); + if *timeout_ms > 0 { + request = request.with_timeout(*timeout_ms); + } + + let response = capability_invoker.invoke(&request)?; + if let Some(result_variable) = result_variable { + let response_obj = Value::object({ + let mut m = std::collections::HashMap::new(); + m.insert("capability".to_string(), Value::string(service)); + m.insert("operation".to_string(), Value::string(method)); + m.insert("payload".to_string(), response.payload); + let metadata = Value::object( + response + .metadata + .into_iter() + .map(|(key, value)| (key, Value::string(value))) + .collect(), + ); + m.insert("metadata".to_string(), metadata); + m + }); + ctx.set_variable(result_variable, response_obj); + } + } + } + Ok(()) + } + + fn record_metric(&self, name: &str, value: f64, tags: &[(String, String)]) -> Result<()> { + if let Some(capability_invoker) = &self.capability_invoker { + let mut tag_values = std::collections::HashMap::with_capacity(tags.len()); + for (key, value) in tags { + tag_values.insert(key.clone(), Value::string(value)); + } + + let mut payload = std::collections::HashMap::with_capacity(3); + payload.insert("name".to_string(), Value::string(name)); + payload.insert("value".to_string(), Value::float(value)); + payload.insert("tags".to_string(), Value::object(tag_values)); + + let request = CapabilityRequest::new( + Self::METRIC_CAPABILITY, + Self::METRIC_OPERATION_GAUGE, + Value::object(payload), + ); + + match capability_invoker.invoke(&request) { + Ok(_) => return Ok(()), + Err(OrdoError::CapabilityNotFound { .. }) => {} + Err(error) => return Err(error), } } + + self.metric_sink.record_gauge(name, value, tags); Ok(()) } @@ -867,6 +953,79 @@ mod tests { assert_eq!(sink.gauge_calls.load(Ordering::SeqCst), 1); } + #[test] + fn test_execute_metric_via_capability_invoker() { + use crate::capability::{ + CapabilityCategory, CapabilityDescriptor, CapabilityProvider, CapabilityRegistry, + CapabilityRequest, CapabilityResponse, + }; + use crate::rule::metrics::MetricSink; + use crate::rule::step::{Action, ActionKind}; + use std::sync::atomic::{AtomicUsize, Ordering}; + + struct TestMetricSink { + gauge_calls: AtomicUsize, + } + + impl MetricSink for TestMetricSink { + fn record_gauge(&self, _name: &str, _value: f64, _tags: &[(String, String)]) { + self.gauge_calls.fetch_add(1, Ordering::SeqCst); + } + + fn record_counter(&self, _name: &str, _value: f64, _tags: &[(String, String)]) {} + } + + struct TestMetricCapability { + calls: AtomicUsize, + } + + impl CapabilityProvider for TestMetricCapability { + fn descriptor(&self) -> CapabilityDescriptor { + CapabilityDescriptor::new("metrics.prometheus", CapabilityCategory::Action) + } + + fn invoke(&self, _request: &CapabilityRequest) -> Result { + self.calls.fetch_add(1, Ordering::SeqCst); + Ok(CapabilityResponse::empty()) + } + } + + let sink = Arc::new(TestMetricSink { + gauge_calls: AtomicUsize::new(0), + }); + let mut executor = RuleExecutor::with_metric_sink(sink.clone()); + let registry = Arc::new(CapabilityRegistry::new()); + let capability = Arc::new(TestMetricCapability { + calls: AtomicUsize::new(0), + }); + let capability_ref = capability.clone(); + registry.register(capability); + executor.set_capability_invoker(registry); + + let mut ruleset = RuleSet::new("metric_capability_test", "record_metric"); + ruleset.add_step(Step::action( + "record_metric", + "Record Metric", + vec![Action { + kind: ActionKind::Metric { + name: "cap_metric".to_string(), + value: Expr::literal(7.0f64), + tags: vec![("env".to_string(), "test".to_string())], + }, + description: String::new(), + }], + "done", + )); + ruleset.add_step(Step::terminal("done", "Done", TerminalResult::new("OK"))); + + let input = serde_json::from_str(r#"{}"#).unwrap(); + let result = executor.execute(&ruleset, input).unwrap(); + + assert_eq!(result.code, "OK"); + assert_eq!(capability_ref.calls.load(Ordering::SeqCst), 1); + assert_eq!(sink.gauge_calls.load(Ordering::SeqCst), 0); + } + #[test] fn test_execute_batch_sequential() { let ruleset = create_test_ruleset(); diff --git a/crates/ordo-core/src/rule/step.rs b/crates/ordo-core/src/rule/step.rs index 6b88b2b9..023a2502 100644 --- a/crates/ordo-core/src/rule/step.rs +++ b/crates/ordo-core/src/rule/step.rs @@ -332,11 +332,13 @@ pub enum ActionKind { }, /// External call (future) - #[serde(skip)] ExternalCall { service: String, method: String, params: Vec<(String, Expr)>, + #[serde(default)] + result_variable: Option, + #[serde(default)] timeout_ms: u64, }, } diff --git a/crates/ordo-server/Cargo.toml b/crates/ordo-server/Cargo.toml index 5fd06dcf..ba75f99b 100644 --- a/crates/ordo-server/Cargo.toml +++ b/crates/ordo-server/Cargo.toml @@ -14,6 +14,7 @@ once_cell = "1.19" ordo-core = { version = "0.4.2", path = "../ordo-core" } ordo-proto = { version = "0.4.2", path = "../ordo-proto" } parking_lot.workspace = true +hashbrown.workspace = true prost.workspace = true rand = "0.8" rayon.workspace = true diff --git a/crates/ordo-server/src/api.rs b/crates/ordo-server/src/api.rs index 8486b306..ac6f4d7c 100644 --- a/crates/ordo-server/src/api.rs +++ b/crates/ordo-server/src/api.rs @@ -14,6 +14,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Instant; +use crate::capability_registry::emit_rule_execution_audit; use crate::error::ApiError; use crate::json::SimdJson; use crate::metrics; @@ -486,9 +487,14 @@ pub async fn execute_ruleset( // Log audit event (with sampling) let source_ip = connect_info.map(|ci| ci.0.ip().to_string()); let rule_id = format!("{}/{}", tenant.id, name); - state - .audit_logger - .log_execution(&rule_id, result.duration_us, &result.code, source_ip); + emit_rule_execution_audit( + state.executor.capability_invoker(), + &state.audit_logger, + &rule_id, + result.duration_us, + &result.code, + source_ip, + ); result } @@ -504,7 +510,9 @@ pub async fn execute_ruleset( // Log audit event for errors (with sampling) let source_ip = connect_info.map(|ci| ci.0.ip().to_string()); let rule_id = format!("{}/{}", tenant.id, name); - state.audit_logger.log_execution( + emit_rule_execution_audit( + state.executor.capability_invoker(), + &state.audit_logger, &rule_id, start.elapsed().as_micros() as u64, "error", @@ -1437,6 +1445,9 @@ pub async fn execute_pipeline( let mut pipeline_executor = RuleExecutor::with_trace_and_metrics(TraceConfig::minimal(), state.metric_sink.clone()); pipeline_executor.set_resolver(Arc::new(snapshot)); + if let Some(capability_invoker) = state.executor.capability_invoker() { + pipeline_executor.set_capability_invoker(capability_invoker); + } let mut current_input = request.input; let mut stages = Vec::with_capacity(resolved.len()); diff --git a/crates/ordo-server/src/api_integration_tests.rs b/crates/ordo-server/src/api_integration_tests.rs index d9b3ed36..00d34c6c 100644 --- a/crates/ordo-server/src/api_integration_tests.rs +++ b/crates/ordo-server/src/api_integration_tests.rs @@ -27,6 +27,7 @@ use tower_http::trace::TraceLayer; use crate::{ api, audit::AuditLogger, + capability_registry::build_server_executor, debug::DebugSessionManager, metrics::PrometheusMetricSink, middleware, @@ -36,14 +37,13 @@ use crate::{ tenant::{TenantDefaults, TenantManager}, AppState, ServerConfig, }; -use ordo_core::prelude::RuleExecutor; /// Build a full test app with all API routes and middleware (matching main.rs router). async fn build_full_test_app() -> Router { let store = Arc::new(RwLock::new(RuleStore::new())); - let executor = Arc::new(RuleExecutor::new()); let metric_sink = Arc::new(PrometheusMetricSink::new()); let audit_logger = Arc::new(AuditLogger::new(None, 10)); + let executor = build_server_executor(metric_sink.clone(), Some(audit_logger.clone())); let debug_sessions = Arc::new(DebugSessionManager::new()); let defaults = TenantDefaults { default_qps_limit: None, diff --git a/crates/ordo-server/src/api_tests.rs b/crates/ordo-server/src/api_tests.rs index 0b1bf4ac..7e9d6fc2 100644 --- a/crates/ordo-server/src/api_tests.rs +++ b/crates/ordo-server/src/api_tests.rs @@ -16,6 +16,7 @@ use tower_http::trace::TraceLayer; use crate::{ api, audit::AuditLogger, + capability_registry::build_server_executor, debug::DebugSessionManager, metrics::PrometheusMetricSink, middleware, @@ -25,13 +26,12 @@ use crate::{ tenant::{TenantDefaults, TenantManager}, AppState, ServerConfig, }; -use ordo_core::prelude::RuleExecutor; async fn build_test_app() -> Router { let store = Arc::new(RwLock::new(RuleStore::new())); - let executor = Arc::new(RuleExecutor::new()); let metric_sink = Arc::new(PrometheusMetricSink::new()); let audit_logger = Arc::new(AuditLogger::new(None, 0)); + let executor = build_server_executor(metric_sink.clone(), Some(audit_logger.clone())); let debug_sessions = Arc::new(DebugSessionManager::new()); let defaults = TenantDefaults { default_qps_limit: None, @@ -372,9 +372,9 @@ async fn test_admin_reload_with_persistence() { let store = Arc::new(RwLock::new(RuleStore::new_with_persistence( dir.path().to_path_buf(), ))); - let executor = Arc::new(RuleExecutor::new()); let metric_sink = Arc::new(PrometheusMetricSink::new()); let audit_logger = Arc::new(AuditLogger::new(None, 0)); + let executor = build_server_executor(metric_sink.clone(), Some(audit_logger.clone())); let debug_sessions = Arc::new(DebugSessionManager::new()); let defaults = TenantDefaults { default_qps_limit: None, diff --git a/crates/ordo-server/src/capability_registry.rs b/crates/ordo-server/src/capability_registry.rs new file mode 100644 index 00000000..a0dbb58c --- /dev/null +++ b/crates/ordo-server/src/capability_registry.rs @@ -0,0 +1,576 @@ +use crate::audit::AuditLogger; +use crate::metrics::PrometheusMetricSink; +use ordo_core::context::Value; +use ordo_core::prelude::{ + CapabilityCategory, CapabilityConfig, CapabilityDescriptor, CapabilityInvoker, + CapabilityProvider, CapabilityRegistry, CapabilityRequest, CapabilityResponse, MetricSink, + OrdoError, Result, RuleExecutor, TraceConfig, +}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +/// Server-side runtime registry wrapper that adds tracing around capability calls. +#[derive(Default)] +pub struct ServerCapabilityRegistry { + inner: CapabilityRegistry, +} + +impl ServerCapabilityRegistry { + #[inline] + pub fn new() -> Self { + Self::default() + } + + #[inline] + pub fn register( + &self, + provider: Arc, + ) -> Option> { + self.inner.register(provider) + } + + pub fn register_metric_sink(&self, sink: Arc) { + self.register(Arc::new(PrometheusMetricCapability { sink })); + } + + pub fn register_audit_logger(&self, audit_logger: Arc) { + self.register(Arc::new(AuditCapability { audit_logger })); + } + + pub fn register_http_client(&self) { + self.register(Arc::new(HttpCapability::new())); + } +} + +impl CapabilityInvoker for ServerCapabilityRegistry { + fn invoke(&self, request: &CapabilityRequest) -> Result { + let span = tracing::info_span!( + "capability.invoke", + capability = %request.capability, + operation = %request.operation + ); + let _guard = span.enter(); + self.inner.invoke(request) + } + + fn describe(&self, capability: &str) -> Option { + self.inner.describe(capability) + } +} + +pub fn build_server_capability_invoker( + metric_sink: Arc, + audit_logger: Option>, +) -> Arc { + let registry = Arc::new(ServerCapabilityRegistry::new()); + registry.register_http_client(); + registry.register_metric_sink(metric_sink); + if let Some(audit_logger) = audit_logger { + registry.register_audit_logger(audit_logger); + } + registry +} + +pub fn build_rule_executor( + metric_sink: Arc, + capability_invoker: Option>, +) -> RuleExecutor { + let mut executor = RuleExecutor::with_trace_and_metrics(TraceConfig::minimal(), metric_sink); + if let Some(capability_invoker) = capability_invoker { + executor.set_capability_invoker(capability_invoker); + } + executor +} + +#[cfg(test)] +pub fn build_server_executor( + metric_sink: Arc, + audit_logger: Option>, +) -> Arc { + let capability_invoker = build_server_capability_invoker(metric_sink.clone(), audit_logger); + let metric_sink_trait: Arc = metric_sink; + Arc::new(build_rule_executor( + metric_sink_trait, + Some(capability_invoker), + )) +} + +pub fn emit_rule_execution_audit( + capability_invoker: Option>, + audit_logger: &AuditLogger, + rule_name: &str, + duration_us: u64, + result: &str, + source_ip: Option, +) { + if let Some(capability_invoker) = capability_invoker { + let payload = Value::object({ + let mut m = std::collections::HashMap::new(); + m.insert("rule_name".to_string(), Value::string(rule_name)); + m.insert("duration_us".to_string(), Value::int(duration_us as i64)); + m.insert("result".to_string(), Value::string(result)); + if let Some(source_ip) = &source_ip { + m.insert("source_ip".to_string(), Value::string(source_ip)); + } + m + }); + let request = CapabilityRequest::new("audit.logger", "rule_executed", payload); + match capability_invoker.invoke(&request) { + Ok(_) => return, + Err(OrdoError::CapabilityNotFound { .. }) => {} + Err(error) => { + tracing::warn!(rule = %rule_name, error = %error, "Audit capability invocation failed, falling back to direct logger"); + } + } + } + + audit_logger.log_execution(rule_name, duration_us, result, source_ip); +} + +pub fn invoke_http_json( + capability_invoker: Option>, + method: &str, + url: &str, + headers: HashMap, + json_body: &serde_json::Value, + timeout_ms: Option, +) -> Result> { + let Some(capability_invoker) = capability_invoker else { + return Ok(None); + }; + + let mut payload = std::collections::HashMap::new(); + payload.insert("url".to_string(), Value::string(url)); + payload.insert( + "headers".to_string(), + Value::object( + headers + .into_iter() + .map(|(key, value)| (key, Value::string(value))) + .collect(), + ), + ); + let body = serde_json::from_value(json_body.clone()).map_err(|error| { + OrdoError::capability_invocation("network.http", format!("invalid json body: {}", error)) + })?; + payload.insert("json_body".to_string(), body); + + let mut request = CapabilityRequest::new( + "network.http", + method.to_ascii_lowercase(), + Value::object(payload), + ); + if let Some(timeout_ms) = timeout_ms { + request = request.with_timeout(timeout_ms); + } + + match capability_invoker.invoke(&request) { + Ok(response) => Ok(Some(response)), + Err(OrdoError::CapabilityNotFound { .. }) => Ok(None), + Err(error) => Err(error), + } +} + +pub fn http_response_status(response: &CapabilityResponse) -> Option { + match response.payload.get_path("status") { + Some(Value::Int(status)) => (*status).try_into().ok(), + _ => None, + } +} + +struct PrometheusMetricCapability { + sink: Arc, +} + +impl CapabilityProvider for PrometheusMetricCapability { + fn descriptor(&self) -> CapabilityDescriptor { + CapabilityDescriptor::new("metrics.prometheus", CapabilityCategory::Action) + .with_description("Bridge capability calls into the Prometheus rule metric sink") + .with_config(CapabilityConfig::new(CapabilityCategory::Action)) + } + + fn invoke(&self, request: &CapabilityRequest) -> Result { + let payload = expect_object(&request.payload, "metrics.prometheus")?; + let name = required_string(payload, "name", "metrics.prometheus")?; + let value = required_number(payload, "value", "metrics.prometheus")?; + let tags = optional_tags(payload, "tags", "metrics.prometheus")?; + + match request.operation.as_str() { + "counter" => self.sink.record_counter(name, value, &tags), + "gauge" => self.sink.record_gauge(name, value, &tags), + other => { + return Err(OrdoError::capability_invocation( + "metrics.prometheus", + format!("unsupported operation '{}'", other), + )); + } + } + + Ok(CapabilityResponse::empty() + .with_metadata("metric", name.to_string()) + .with_metadata("operation", request.operation.clone())) + } +} + +struct AuditCapability { + audit_logger: Arc, +} + +impl CapabilityProvider for AuditCapability { + fn descriptor(&self) -> CapabilityDescriptor { + CapabilityDescriptor::new("audit.logger", CapabilityCategory::Action) + .with_description("Bridge capability calls into the structured audit logger") + .with_config(CapabilityConfig::new(CapabilityCategory::Action)) + } + + fn invoke(&self, request: &CapabilityRequest) -> Result { + let payload = expect_object(&request.payload, "audit.logger")?; + match request.operation.as_str() { + "rule_executed" => { + let rule_name = required_string(payload, "rule_name", "audit.logger")?; + let duration_us = required_i64(payload, "duration_us", "audit.logger")?; + let result = required_string(payload, "result", "audit.logger")?; + let source_ip = optional_string(payload, "source_ip"); + self.audit_logger.log_execution( + rule_name, + duration_us.max(0) as u64, + result, + source_ip, + ); + Ok(CapabilityResponse::empty() + .with_metadata("event", "rule_executed") + .with_metadata("rule_name", rule_name.to_string())) + } + other => Err(OrdoError::capability_invocation( + "audit.logger", + format!("unsupported operation '{}'", other), + )), + } + } +} + +struct HttpCapability { + client: reqwest::Client, +} + +impl HttpCapability { + fn new() -> Self { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(10)) + .connect_timeout(Duration::from_secs(5)) + .build() + .unwrap_or_else(|_| reqwest::Client::new()); + Self { client } + } +} + +impl CapabilityProvider for HttpCapability { + fn descriptor(&self) -> CapabilityDescriptor { + CapabilityDescriptor::new("network.http", CapabilityCategory::Network) + .with_description("Issue outbound HTTP requests through a capability provider") + .with_config(CapabilityConfig::new(CapabilityCategory::Network).timeout(10_000)) + } + + fn invoke(&self, request: &CapabilityRequest) -> Result { + let payload = expect_object(&request.payload, "network.http")?; + let url = required_string(payload, "url", "network.http")?; + let method = request + .operation + .parse::() + .map_err(|error| { + OrdoError::capability_invocation( + "network.http", + format!("invalid method '{}': {}", request.operation, error), + ) + })?; + let headers = optional_tags(payload, "headers", "network.http")?; + + let json_body = payload + .get("json_body") + .map(|body| { + serde_json::to_value(body).map_err(|error| { + OrdoError::capability_invocation( + "network.http", + format!("failed to serialize request body: {}", error), + ) + }) + }) + .transpose()?; + let (status, body_text) = execute_http_request( + self.client.clone(), + method, + url.to_string(), + headers, + json_body, + request.timeout_ms, + )?; + + let mut payload = std::collections::HashMap::new(); + payload.insert("status".to_string(), Value::int(status as i64)); + payload.insert("body".to_string(), Value::string(&body_text)); + if let Ok(json_body) = serde_json::from_str::(&body_text) { + let json_body = serde_json::from_value(json_body).map_err(|error| { + OrdoError::capability_invocation( + "network.http", + format!("failed to convert response body: {}", error), + ) + })?; + payload.insert("json_body".to_string(), json_body); + } + + Ok(CapabilityResponse::new(Value::object(payload)) + .with_metadata("status", status.to_string())) + } +} + +fn execute_http_request( + client: reqwest::Client, + method: reqwest::Method, + url: String, + headers: Vec<(String, String)>, + json_body: Option, + timeout_ms: Option, +) -> Result<(u16, String)> { + async fn send( + client: reqwest::Client, + method: reqwest::Method, + url: String, + headers: Vec<(String, String)>, + json_body: Option, + timeout_ms: Option, + ) -> std::result::Result<(u16, String), reqwest::Error> { + let mut builder = client.request(method, url); + if let Some(timeout_ms) = timeout_ms { + builder = builder.timeout(Duration::from_millis(timeout_ms)); + } + for (name, value) in headers { + builder = builder.header(name, value); + } + if let Some(json_body) = json_body { + builder = builder.json(&json_body); + } + let response = builder.send().await?; + let status = response.status().as_u16(); + let body = response.text().await.unwrap_or_default(); + Ok((status, body)) + } + + let current = tokio::runtime::Handle::try_current(); + match current { + Ok(handle) => match handle.runtime_flavor() { + tokio::runtime::RuntimeFlavor::MultiThread => tokio::task::block_in_place(|| { + handle.block_on(send(client, method, url, headers, json_body, timeout_ms)) + }) + .map_err(|error| OrdoError::capability_invocation("network.http", error.to_string())), + tokio::runtime::RuntimeFlavor::CurrentThread | _ => std::thread::spawn(move || { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|error| { + OrdoError::capability_invocation( + "network.http", + format!("failed to build runtime: {}", error), + ) + })?; + runtime + .block_on(send(client, method, url, headers, json_body, timeout_ms)) + .map_err(|error| { + OrdoError::capability_invocation("network.http", error.to_string()) + }) + }) + .join() + .map_err(|_| { + OrdoError::capability_invocation("network.http", "http worker thread panicked") + })?, + }, + Err(_) => { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|error| { + OrdoError::capability_invocation( + "network.http", + format!("failed to build runtime: {}", error), + ) + })?; + runtime + .block_on(send(client, method, url, headers, json_body, timeout_ms)) + .map_err(|error| { + OrdoError::capability_invocation("network.http", error.to_string()) + }) + } + } +} + +fn expect_object<'a>( + value: &'a Value, + capability: &str, +) -> Result<&'a hashbrown::HashMap> { + value.as_object().ok_or_else(|| { + OrdoError::capability_invocation(capability, "expected object payload for capability") + }) +} + +fn required_string<'a>( + object: &'a hashbrown::HashMap, + field: &str, + capability: &str, +) -> Result<&'a str> { + object + .get(field) + .and_then(Value::as_str) + .ok_or_else(|| OrdoError::capability_invocation(capability, format!("missing '{}'", field))) +} + +fn optional_string( + object: &hashbrown::HashMap, + field: &str, +) -> Option { + object + .get(field) + .and_then(Value::as_str) + .map(ToString::to_string) +} + +fn required_number( + object: &hashbrown::HashMap, + field: &str, + capability: &str, +) -> Result { + match object.get(field) { + Some(Value::Int(value)) => Ok(*value as f64), + Some(Value::Float(value)) => Ok(*value), + Some(Value::Bool(value)) => Ok(if *value { 1.0 } else { 0.0 }), + _ => Err(OrdoError::capability_invocation( + capability, + format!("field '{}' must be numeric", field), + )), + } +} + +fn required_i64( + object: &hashbrown::HashMap, + field: &str, + capability: &str, +) -> Result { + match object.get(field) { + Some(Value::Int(value)) => Ok(*value), + _ => Err(OrdoError::capability_invocation( + capability, + format!("field '{}' must be an integer", field), + )), + } +} + +fn optional_tags( + object: &hashbrown::HashMap, + field: &str, + capability: &str, +) -> Result> { + let Some(Value::Object(tags)) = object.get(field) else { + return Ok(Vec::new()); + }; + + let mut result = Vec::with_capacity(tags.len()); + for (key, value) in tags { + let value = value.as_str().ok_or_else(|| { + OrdoError::capability_invocation(capability, format!("tag '{}' must be a string", key)) + })?; + result.push((key.to_string(), value.to_string())); + } + Ok(result) +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::{body::Bytes, http::Method, routing::any, Json, Router}; + use serde_json::json; + use tokio::{net::TcpListener, sync::mpsc, time::Duration}; + + #[test] + fn metric_capability_accepts_gauge_requests() { + let registry = ServerCapabilityRegistry::new(); + registry.register_metric_sink(Arc::new(PrometheusMetricSink::new())); + + let payload = Value::object({ + let mut m = std::collections::HashMap::new(); + m.insert("name".to_string(), Value::string("score")); + m.insert("value".to_string(), Value::int(3)); + m + }); + + let response = registry + .invoke(&CapabilityRequest::new( + "metrics.prometheus", + "gauge", + payload, + )) + .unwrap(); + + assert_eq!(response.metadata.get("metric"), Some(&"score".to_string())); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn http_capability_posts_json() { + let listener = match TcpListener::bind("127.0.0.1:0").await { + Ok(listener) => listener, + Err(error) if error.kind() == std::io::ErrorKind::PermissionDenied => return, + Err(error) => panic!("failed to bind test listener: {}", error), + }; + let addr = listener.local_addr().unwrap(); + let (tx, mut rx) = mpsc::unbounded_channel(); + let app = Router::new().route( + "/hook", + any(move |method: Method, body: Bytes| { + let tx = tx.clone(); + async move { + let json_body = serde_json::from_slice::(&body) + .unwrap_or(serde_json::Value::Null); + let _ = tx.send((method.clone(), json_body.clone())); + + Json(json!({ + "method": method.as_str(), + "received": json_body, + "ok": true + })) + } + }), + ); + let server = tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + let capability_invoker = + build_server_capability_invoker(Arc::new(PrometheusMetricSink::new()), None); + let response = invoke_http_json( + Some(capability_invoker), + "post", + &format!("http://{}/hook", addr), + HashMap::new(), + &json!({"hello": "world"}), + Some(1_000), + ) + .unwrap() + .unwrap(); + + assert_eq!(http_response_status(&response), Some(200)); + let (method, received_body) = tokio::time::timeout(Duration::from_secs(1), rx.recv()) + .await + .expect("timed out waiting for test server request") + .expect("test server did not receive request"); + assert_eq!(method, Method::POST); + assert_eq!(received_body.get("hello"), Some(&json!("world"))); + assert_eq!( + response.payload.get_path("json_body.received.hello"), + Some(&Value::string("world")) + ); + assert_eq!( + response.payload.get_path("json_body.method"), + Some(&Value::string("POST")) + ); + + server.abort(); + } +} diff --git a/crates/ordo-server/src/grpc.rs b/crates/ordo-server/src/grpc.rs index 4a3c16e6..3dfc7d30 100644 --- a/crates/ordo-server/src/grpc.rs +++ b/crates/ordo-server/src/grpc.rs @@ -582,12 +582,17 @@ impl OrdoService for OrdoGrpcService { #[cfg(test)] mod tests { use super::*; + use crate::audit::AuditLogger; + use crate::capability_registry::build_server_executor; + use crate::metrics::PrometheusMetricSink; use crate::rate_limiter::RateLimiter; use crate::tenant::{TenantDefaults, TenantManager}; async fn create_test_service() -> OrdoGrpcService { let store = Arc::new(RwLock::new(RuleStore::new())); - let executor = Arc::new(RuleExecutor::new()); + let metric_sink = Arc::new(PrometheusMetricSink::new()); + let audit_logger = Arc::new(AuditLogger::new(None, 0)); + let executor = build_server_executor(metric_sink, Some(audit_logger)); let defaults = TenantDefaults { default_qps_limit: Some(1000), default_burst_limit: Some(100), diff --git a/crates/ordo-server/src/main.rs b/crates/ordo-server/src/main.rs index c1ecd3ac..78a9c440 100644 --- a/crates/ordo-server/src/main.rs +++ b/crates/ordo-server/src/main.rs @@ -49,6 +49,7 @@ use tracing::{info, warn}; mod api; mod audit; +mod capability_registry; mod config; pub mod debug; mod error; @@ -67,10 +68,11 @@ pub mod wal; pub mod webhook; use audit::AuditLogger; +use capability_registry::{build_rule_executor, build_server_capability_invoker, invoke_http_json}; use config::ServerConfig; use grpc::OrdoGrpcService; use metrics::PrometheusMetricSink; -use ordo_core::prelude::{RuleExecutor, TraceConfig}; +use ordo_core::prelude::RuleExecutor; use ordo_core::signature::ed25519::decode_public_key; use ordo_core::signature::RuleVerifier; use rate_limiter::RateLimiter; @@ -225,13 +227,29 @@ async fn main() -> anyhow::Result<()> { let metric_sink = Arc::new(PrometheusMetricSink::new()); info!("Initialized Prometheus metric sink for custom rule metrics"); - // Initialize shared executor (moved out of RuleStore for lock-free execution) - let executor = Arc::new(RuleExecutor::with_trace_and_metrics( - TraceConfig::minimal(), - metric_sink.clone(), + let signature_verifier = build_signature_verifier(&config)?; + + // Initialize audit logger + let audit_logger = Arc::new(AuditLogger::new( + config.audit_dir.clone(), + config.audit_sample_rate, )); - let signature_verifier = build_signature_verifier(&config)?; + // Log audit configuration + if config.audit_dir.is_some() { + info!( + "Audit logging enabled: dir={:?}, sample_rate={}%", + config.audit_dir, config.audit_sample_rate + ); + } else { + info!( + "Audit logging to stdout only, sample_rate={}%", + config.audit_sample_rate + ); + } + + let capability_invoker = + build_server_capability_invoker(metric_sink.clone(), Some(audit_logger.clone())); // Initialize shared store (with or without persistence) let store = if let Some(ref rules_dir) = config.rules_dir { @@ -244,10 +262,11 @@ async fn main() -> anyhow::Result<()> { "Initializing store with persistence at {:?} (max {} versions)", store_dir, config.max_versions ); - let mut store = RuleStore::new_with_persistence_and_metrics( + let mut store = RuleStore::new_with_persistence_and_metrics_and_capabilities( store_dir, config.max_versions, metric_sink.clone(), + Some(capability_invoker.clone()), ); if let Some(verifier) = signature_verifier.clone() { store.set_signature_verifier(verifier, config.signature_allow_unsigned_local); @@ -335,7 +354,10 @@ async fn main() -> anyhow::Result<()> { Arc::new(RwLock::new(store)) } else { info!("Initializing in-memory store (no persistence)"); - let mut store = RuleStore::new_with_metrics(metric_sink.clone()); + let mut store = RuleStore::new_with_metrics_and_capabilities( + metric_sink.clone(), + Some(capability_invoker.clone()), + ); if let Some(verifier) = signature_verifier.clone() { store.set_signature_verifier(verifier, config.signature_allow_unsigned_local); } @@ -357,25 +379,13 @@ async fn main() -> anyhow::Result<()> { ); } - // Initialize audit logger - let audit_logger = Arc::new(AuditLogger::new( - config.audit_dir.clone(), - config.audit_sample_rate, + // Initialize shared executor (moved out of RuleStore for lock-free execution) + let metric_sink_trait: Arc = metric_sink.clone(); + let executor = Arc::new(build_rule_executor( + metric_sink_trait, + Some(capability_invoker.clone()), )); - // Log audit configuration - if config.audit_dir.is_some() { - info!( - "Audit logging enabled: dir={:?}, sample_rate={}%", - config.audit_dir, config.audit_sample_rate - ); - } else { - info!( - "Audit logging to stdout only, sample_rate={}%", - config.audit_sample_rate - ); - } - // Initialize debug session manager let debug_sessions = Arc::new(debug::DebugSessionManager::new()); // Initialize tenant manager @@ -408,7 +418,10 @@ async fn main() -> anyhow::Result<()> { // Shutdown broadcast channel — signal handlers and servers share this. let (shutdown_tx, shutdown_rx) = watch::channel(false); - let webhook_manager = webhook::WebhookManager::new(shutdown_rx.clone()); + let webhook_manager = webhook::WebhookManager::new_with_capabilities( + shutdown_rx.clone(), + Some(capability_invoker.clone()), + ); // Log server started event { @@ -632,6 +645,7 @@ async fn main() -> anyhow::Result<()> { let http_version = version.clone(); let http_token = token.clone(); let http_reg_secret = reg_secret.clone(); + let http_capability_invoker = capability_invoker.clone(); let log_url = platform_url.clone(); let log_name = http_server_name.clone(); @@ -652,25 +666,49 @@ async fn main() -> anyhow::Result<()> { }); let hb_payload = serde_json::json!({ "server_id": http_server_id }); - let hb_req_builder = || { - let mut r = client.post(&hb_url).json(&hb_payload); - if let Some(ref secret) = http_reg_secret { - r = r.header("x-registration-secret", secret.as_str()); - } - r - }; - let mut interval = tokio::time::interval(Duration::from_secs(30)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { - let mut r = client.post(®_url).json(&payload); + let mut headers = std::collections::HashMap::new(); if let Some(ref secret) = http_reg_secret { - r = r.header("x-registration-secret", secret.as_str()); + headers.insert("x-registration-secret".to_string(), secret.to_string()); + } + match invoke_http_json( + Some(http_capability_invoker.clone()), + "post", + ®_url, + headers.clone(), + &payload, + Some(5_000), + ) { + Ok(Some(_)) => {} + Ok(None) | Err(_) => { + let mut r = client.post(®_url).json(&payload); + if let Some(ref secret) = http_reg_secret { + r = r.header("x-registration-secret", secret.as_str()); + } + let _ = r.send().await; + } } - let _ = r.send().await; interval.tick().await; - let _ = hb_req_builder().send().await; + match invoke_http_json( + Some(http_capability_invoker.clone()), + "post", + &hb_url, + headers.clone(), + &hb_payload, + Some(5_000), + ) { + Ok(Some(_)) => {} + Ok(None) | Err(_) => { + let mut r = client.post(&hb_url).json(&hb_payload); + if let Some(ref secret) = http_reg_secret { + r = r.header("x-registration-secret", secret.as_str()); + } + let _ = r.send().await; + } + } } }); diff --git a/crates/ordo-server/src/store.rs b/crates/ordo-server/src/store.rs index 4afe5ebb..f892ee25 100644 --- a/crates/ordo-server/src/store.rs +++ b/crates/ordo-server/src/store.rs @@ -9,7 +9,7 @@ use crate::sync::event::SyncEvent; use crate::sync::file_watcher::RecentWrites; use crate::wal::{WalManager, WalOpKind}; use once_cell::sync::Lazy; -use ordo_core::prelude::{MetricSink, RuleExecutor, RuleSet, TraceConfig}; +use ordo_core::prelude::{CapabilityInvoker, MetricSink, RuleExecutor, RuleSet, TraceConfig}; use ordo_core::signature::{strip_signature, RuleVerifier}; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -114,11 +114,27 @@ pub struct VersionListResponse { } impl RuleStore { + fn build_executor( + metric_sink: Option>, + capability_invoker: Option>, + ) -> RuleExecutor { + let mut executor = match metric_sink { + Some(metric_sink) => { + RuleExecutor::with_trace_and_metrics(TraceConfig::minimal(), metric_sink) + } + None => RuleExecutor::with_trace(TraceConfig::minimal()), + }; + if let Some(capability_invoker) = capability_invoker { + executor.set_capability_invoker(capability_invoker); + } + executor + } + /// Create a new in-memory store (no persistence) pub fn new() -> Self { Self { rulesets: HashMap::new(), - executor: RuleExecutor::with_trace(TraceConfig::minimal()), + executor: Self::build_executor(None, None), rules_dir: None, multi_tenancy_enabled: false, default_tenant: "default".to_string(), @@ -137,9 +153,16 @@ impl RuleStore { /// Create a new in-memory store with a custom metric sink pub fn new_with_metrics(metric_sink: Arc) -> Self { + Self::new_with_metrics_and_capabilities(metric_sink, None) + } + + pub fn new_with_metrics_and_capabilities( + metric_sink: Arc, + capability_invoker: Option>, + ) -> Self { Self { rulesets: HashMap::new(), - executor: RuleExecutor::with_trace_and_metrics(TraceConfig::minimal(), metric_sink), + executor: Self::build_executor(Some(metric_sink), capability_invoker), rules_dir: None, multi_tenancy_enabled: false, default_tenant: "default".to_string(), @@ -166,7 +189,7 @@ impl RuleStore { pub fn new_with_persistence_and_versions(rules_dir: PathBuf, max_versions: usize) -> Self { Self { rulesets: HashMap::new(), - executor: RuleExecutor::with_trace(TraceConfig::minimal()), + executor: Self::build_executor(None, None), rules_dir: Some(rules_dir), multi_tenancy_enabled: false, default_tenant: "default".to_string(), @@ -188,10 +211,24 @@ impl RuleStore { rules_dir: PathBuf, max_versions: usize, metric_sink: Arc, + ) -> Self { + Self::new_with_persistence_and_metrics_and_capabilities( + rules_dir, + max_versions, + metric_sink, + None, + ) + } + + pub fn new_with_persistence_and_metrics_and_capabilities( + rules_dir: PathBuf, + max_versions: usize, + metric_sink: Arc, + capability_invoker: Option>, ) -> Self { Self { rulesets: HashMap::new(), - executor: RuleExecutor::with_trace_and_metrics(TraceConfig::minimal(), metric_sink), + executor: Self::build_executor(Some(metric_sink), capability_invoker), rules_dir: Some(rules_dir), multi_tenancy_enabled: false, default_tenant: "default".to_string(), diff --git a/crates/ordo-server/src/uds.rs b/crates/ordo-server/src/uds.rs index 253d0ef2..925e5adf 100644 --- a/crates/ordo-server/src/uds.rs +++ b/crates/ordo-server/src/uds.rs @@ -84,6 +84,9 @@ pub fn cleanup_uds(uds_path: &Path) { #[cfg(test)] mod tests { use super::*; + use crate::audit::AuditLogger; + use crate::capability_registry::build_server_executor; + use crate::metrics::PrometheusMetricSink; use crate::tenant::{TenantDefaults, TenantManager}; use std::time::Duration; use tempfile::tempdir; @@ -94,7 +97,9 @@ mod tests { let socket_path = temp_dir.path().join("test.sock"); let store = Arc::new(tokio::sync::RwLock::new(RuleStore::new())); - let executor = Arc::new(RuleExecutor::new()); + let metric_sink = Arc::new(PrometheusMetricSink::new()); + let audit_logger = Arc::new(AuditLogger::new(None, 0)); + let executor = build_server_executor(metric_sink, Some(audit_logger)); let defaults = TenantDefaults { default_qps_limit: Some(1000), default_burst_limit: Some(100), @@ -126,11 +131,30 @@ mod tests { .await }); - // Give server time to start - tokio::time::sleep(Duration::from_millis(100)).await; + // Wait for the socket file to appear rather than assuming a fixed startup latency. + let mut started = false; + for _ in 0..20 { + if socket_path.exists() { + started = true; + break; + } + if server_handle.is_finished() { + let result = server_handle.await.expect("UDS task join failed"); + match result { + Err(error) + if error + .downcast_ref::() + .is_some_and(|e| e.kind() == std::io::ErrorKind::PermissionDenied) => + { + return; + } + other => panic!("UDS server exited before socket creation: {:?}", other), + } + } + tokio::time::sleep(Duration::from_millis(50)).await; + } - // Verify socket file exists - assert!(socket_path.exists()); + assert!(started, "UDS socket was not created in time"); // Abort server server_handle.abort(); diff --git a/crates/ordo-server/src/webhook.rs b/crates/ordo-server/src/webhook.rs index 3ae3c13d..7ec006e6 100644 --- a/crates/ordo-server/src/webhook.rs +++ b/crates/ordo-server/src/webhook.rs @@ -5,6 +5,7 @@ //! to avoid blocking the main request path. use chrono::{DateTime, Utc}; +use ordo_core::prelude::CapabilityInvoker; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; @@ -12,6 +13,8 @@ use std::time::Duration; use tokio::sync::{mpsc, RwLock}; use tracing::{debug, error, info, warn}; +use crate::capability_registry::{http_response_status, invoke_http_json}; + /// Events that can trigger webhooks. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -99,6 +102,13 @@ pub struct WebhookManager { impl WebhookManager { /// Create a new WebhookManager and spawn the background delivery task. pub fn new(shutdown_rx: tokio::sync::watch::Receiver) -> Arc { + Self::new_with_capabilities(shutdown_rx, None) + } + + pub fn new_with_capabilities( + shutdown_rx: tokio::sync::watch::Receiver, + capability_invoker: Option>, + ) -> Arc { let (tx, rx) = mpsc::channel::(1024); let client = reqwest::Client::builder() .timeout(Duration::from_secs(10)) @@ -113,7 +123,7 @@ impl WebhookManager { }); // Spawn background delivery task - tokio::spawn(delivery_loop(rx, client, shutdown_rx)); + tokio::spawn(delivery_loop(rx, client, capability_invoker, shutdown_rx)); manager } @@ -198,13 +208,14 @@ impl WebhookManager { async fn delivery_loop( mut rx: mpsc::Receiver, client: reqwest::Client, + capability_invoker: Option>, mut shutdown_rx: tokio::sync::watch::Receiver, ) { loop { tokio::select! { job = rx.recv() => { match job { - Some(job) => deliver_with_retry(&client, job).await, + Some(job) => deliver_with_retry(&client, capability_invoker.clone(), job).await, None => break, // Channel closed } } @@ -212,7 +223,7 @@ async fn delivery_loop( info!("Webhook delivery task shutting down"); // Drain remaining jobs while let Ok(job) = rx.try_recv() { - deliver_with_retry(&client, job).await; + deliver_with_retry(&client, capability_invoker.clone(), job).await; } break; } @@ -222,55 +233,117 @@ async fn delivery_loop( } /// Deliver a single webhook with exponential backoff retry. -async fn deliver_with_retry(client: &reqwest::Client, job: DeliveryJob) { +async fn deliver_with_retry( + client: &reqwest::Client, + capability_invoker: Option>, + job: DeliveryJob, +) { let webhook_id = &job.webhook.id; let max_retries = job.webhook.max_retries as u32; + let body_json = serde_json::to_value(&job.payload).unwrap_or(serde_json::Value::Null); + let mut headers = HashMap::new(); + headers.insert("Content-Type".to_string(), "application/json".to_string()); + headers.insert( + "User-Agent".to_string(), + format!("ordo-webhook/{}", ordo_core::VERSION), + ); + headers.insert( + "X-Ordo-Event".to_string(), + format!("{:?}", job.payload.event).to_lowercase(), + ); + headers.insert("X-Ordo-Webhook-ID".to_string(), webhook_id.clone()); for attempt in 0..=max_retries { - let mut request = client - .post(&job.webhook.url) - .header("Content-Type", "application/json") - .header("User-Agent", format!("ordo-webhook/{}", ordo_core::VERSION)) - .header( - "X-Ordo-Event", - format!("{:?}", job.payload.event).to_lowercase(), - ) - .header("X-Ordo-Webhook-ID", webhook_id.as_str()) - .header("X-Ordo-Delivery-Attempt", (attempt + 1).to_string()); + let mut attempt_headers = headers.clone(); + attempt_headers.insert( + "X-Ordo-Delivery-Attempt".to_string(), + (attempt + 1).to_string(), + ); // HMAC signature if secret is set if let Some(ref secret) = job.webhook.secret { if let Ok(body_bytes) = serde_json::to_vec(&job.payload) { let signature = hmac_sha256(secret.as_bytes(), &body_bytes); - request = request.header("X-Ordo-Signature", format!("sha256={}", signature)); + attempt_headers.insert( + "X-Ordo-Signature".to_string(), + format!("sha256={}", signature), + ); } } - match request.json(&job.payload).send().await { - Ok(resp) if resp.status().is_success() => { - debug!( - webhook_id = %webhook_id, - status = %resp.status(), - attempt = attempt + 1, - "Webhook delivered" - ); - return; + let capability_result = invoke_http_json( + capability_invoker.clone(), + "post", + &job.webhook.url, + attempt_headers.clone(), + &body_json, + Some(10_000), + ); + + let mut delivered = false; + match capability_result { + Ok(Some(response)) => { + if let Some(status) = http_response_status(&response) { + if (200..300).contains(&status) { + debug!( + webhook_id = %webhook_id, + status = status, + attempt = attempt + 1, + "Webhook delivered" + ); + return; + } + warn!( + webhook_id = %webhook_id, + status = status, + attempt = attempt + 1, + "Webhook delivery failed" + ); + delivered = true; + } } - Ok(resp) => { + Ok(None) => {} + Err(error) => { warn!( webhook_id = %webhook_id, - status = %resp.status(), + error = %error, attempt = attempt + 1, - "Webhook delivery failed" + "Webhook capability delivery error" ); } - Err(e) => { - warn!( - webhook_id = %webhook_id, - error = %e, - attempt = attempt + 1, - "Webhook delivery error" - ); + } + + if !delivered { + let mut request = client.post(&job.webhook.url); + for (name, value) in &attempt_headers { + request = request.header(name, value); + } + match request.json(&job.payload).send().await { + Ok(resp) if resp.status().is_success() => { + debug!( + webhook_id = %webhook_id, + status = %resp.status(), + attempt = attempt + 1, + "Webhook delivered" + ); + return; + } + Ok(resp) => { + warn!( + webhook_id = %webhook_id, + status = %resp.status(), + attempt = attempt + 1, + "Webhook delivery failed" + ); + } + Err(e) => { + warn!( + webhook_id = %webhook_id, + error = %e, + attempt = attempt + 1, + "Webhook delivery error" + ); + } } } @@ -310,6 +383,8 @@ fn generate_id() -> String { #[cfg(test)] mod tests { use super::*; + use crate::capability_registry::build_server_capability_invoker; + use crate::metrics::PrometheusMetricSink; #[test] fn test_generate_id() { @@ -353,7 +428,13 @@ mod tests { #[tokio::test] async fn test_webhook_manager_crud() { let (_tx, rx) = tokio::sync::watch::channel(false); - let manager = WebhookManager::new(rx); + let manager = WebhookManager::new_with_capabilities( + rx, + Some(build_server_capability_invoker( + Arc::new(PrometheusMetricSink::new()), + None, + )), + ); // Register let id = manager @@ -408,7 +489,13 @@ mod tests { #[tokio::test] async fn test_fire_filters_inactive_and_events() { let (_tx, rx) = tokio::sync::watch::channel(false); - let manager = WebhookManager::new(rx); + let manager = WebhookManager::new_with_capabilities( + rx, + Some(build_server_capability_invoker( + Arc::new(PrometheusMetricSink::new()), + None, + )), + ); // Register inactive hook manager diff --git a/examples/capability-demo/Cargo.toml b/examples/capability-demo/Cargo.toml new file mode 100644 index 00000000..01c9c8e3 --- /dev/null +++ b/examples/capability-demo/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "capability-demo" +version = "0.1.0" +edition = "2021" + +[dependencies] +ordo-core = { path = "../../crates/ordo-core", default-features = false } +serde_json = "1" diff --git a/examples/capability-demo/src/main.rs b/examples/capability-demo/src/main.rs new file mode 100644 index 00000000..6ad80771 --- /dev/null +++ b/examples/capability-demo/src/main.rs @@ -0,0 +1,51 @@ +use ordo_core::prelude::*; +use std::sync::Arc; + +struct EchoProvider; + +impl CapabilityProvider for EchoProvider { + fn descriptor(&self) -> CapabilityDescriptor { + CapabilityDescriptor::new("demo.echo", CapabilityCategory::Compute) + .with_description("Echo payloads back to the caller") + } + + fn invoke(&self, request: &CapabilityRequest) -> Result { + Ok(CapabilityResponse::new(request.payload.clone())) + } +} + +fn main() { + let registry = Arc::new(CapabilityRegistry::new()); + registry.register(Arc::new(EchoProvider)); + + let mut ruleset = RuleSet::new("capability_demo", "call_echo"); + ruleset.add_step(Step::action( + "call_echo", + "Call capability", + vec![Action { + kind: ActionKind::ExternalCall { + service: "demo.echo".to_string(), + method: "echo".to_string(), + params: vec![("amount".to_string(), Expr::field("amount"))], + result_variable: Some("result".to_string()), + timeout_ms: 250, + }, + description: "Capability call demo".to_string(), + }], + "done", + )); + ruleset.add_step(Step::terminal( + "done", + "Done", + TerminalResult::new("OK") + .with_output("echoed_amount", Expr::field("$result.payload.amount")), + )); + + let mut executor = RuleExecutor::new(); + executor.set_capability_invoker(registry); + + let input: Value = serde_json::from_str(r#"{"amount": 42}"#).unwrap(); + let result = executor.execute(&ruleset, input).unwrap(); + + println!("{:?}", result.output); +} diff --git a/ordo-editor/apps/docs/.vitepress/config.mts b/ordo-editor/apps/docs/.vitepress/config.mts index dd535f24..2c821ca4 100644 --- a/ordo-editor/apps/docs/.vitepress/config.mts +++ b/ordo-editor/apps/docs/.vitepress/config.mts @@ -85,6 +85,7 @@ export default withMermaid(defineConfig({ { text: 'Rule Persistence', link: '/en/guide/persistence' }, { text: 'Version Management', link: '/en/guide/versioning' }, { text: 'Audit Logging', link: '/en/guide/audit-logging' }, + { text: 'Capabilities & External Calls', link: '/en/guide/capabilities' }, { text: 'Rule Signing', link: '/en/guide/rule-signing' }, { text: 'Decision Table', link: '/en/guide/decision-table' }, { text: 'Editor Store & Undo/Redo', link: '/en/guide/editor-store' }, @@ -179,6 +180,7 @@ export default withMermaid(defineConfig({ { text: '版本管理', link: '/zh/guide/versioning' }, { text: '规则签名', link: '/zh/guide/rule-signing' }, { text: '审计日志', link: '/zh/guide/audit-logging' }, + { text: '能力与外部调用', link: '/zh/guide/capabilities' }, { text: '决策表', link: '/zh/guide/decision-table' }, { text: '编辑器状态管理', link: '/zh/guide/editor-store' }, { text: '分布式部署', link: '/zh/guide/distributed-deployment' }, diff --git a/ordo-editor/apps/docs/en/guide/capabilities.md b/ordo-editor/apps/docs/en/guide/capabilities.md new file mode 100644 index 00000000..0670ee31 --- /dev/null +++ b/ordo-editor/apps/docs/en/guide/capabilities.md @@ -0,0 +1,161 @@ +# Capabilities and External Calls + +Ordo uses a capability boundary for outbound side effects and runtime integrations. This keeps rule execution deterministic inside the engine while still allowing rules and server components to call metrics, audit sinks, HTTP endpoints, and other providers through a stable interface. + +## What a capability is + +A capability provider exposes a named runtime service plus one or more operations. + +- The provider name is the capability name, such as `metrics.prometheus`, `audit.logger`, or `network.http`. +- The operation is the method invoked on that capability, such as `gauge`, `rule_executed`, or `post`. +- The payload is a typed object that the provider receives and returns. + +At runtime, `ExternalCall` actions are translated into a capability request: + +```json +{ + "action": "external_call", + "service": "demo.echo", + "method": "echo", + "params": [["amount", { "Field": "amount" }]], + "result_variable": "echo_result", + "timeout_ms": 250 +} +``` + +If `result_variable` is set, Ordo stores the capability response under that variable: + +- `$echo_result.capability` +- `$echo_result.operation` +- `$echo_result.payload` +- `$echo_result.metadata` + +## Built-in server capabilities + +The server currently registers these capability providers by default: + +| Capability | Category | Typical operations | Purpose | +| -------------------- | --------- | ---------------------------------------------------------- | ----------------------------------------------- | +| `metrics.prometheus` | `action` | `gauge`, `counter` | Record rule metrics through the Prometheus sink | +| `audit.logger` | `action` | `rule_executed` | Emit structured execution audit events | +| `network.http` | `network` | `get`, `post`, `put`, `patch`, `delete`, `head`, `options` | Send outbound HTTP requests | + +## Studio `externalCalls` mapping + +Studio action steps can define `externalCalls`. The editor adapter now converts them into engine `external_call` actions using these rules. + +### HTTP calls + +Use `type: "http"` when the target is an outbound HTTP endpoint. + +```ts +{ + type: 'http', + target: 'PATCH https://api.example.com/score', + params: { + applicantId: Expr.variable('$.applicant.id'), + score: Expr.number(720), + }, + resultVariable: 'http_result', + timeout: 1500, +} +``` + +This becomes: + +```json +{ + "action": "external_call", + "service": "network.http", + "method": "patch", + "params": [ + ["url", { "Literal": "https://api.example.com/score" }], + [ + "json_body", + { + "Object": [ + ["applicantId", { "Field": "applicant.id" }], + ["score", { "Literal": 720 }] + ] + } + ] + ], + "result_variable": "http_result", + "timeout_ms": 1500 +} +``` + +Rules: + +- If `target` starts with `METHOD URL`, that HTTP method is used. +- If no method prefix is provided, Ordo defaults to `POST`. +- `params` are sent as `json_body`. +- `target` becomes the `url` payload field for `network.http`. + +### Function and gRPC calls + +For `type: "function"` and `type: "grpc"`, the editor treats `target` as a capability reference. + +Supported target forms: + +- `demo.echo` +- `demo.echo#echo` +- `demo.echo::echo` + +Rules: + +- `service` is the capability name. +- `method` is parsed from `#` or `::` when present. +- If no method is supplied, Ordo defaults to `invoke` for `function` and `call` for `grpc`. +- `params` are passed through as the capability payload object. + +Example: + +```ts +{ + type: 'function', + target: 'demo.echo#echo', + params: { + payload: Expr.object({ + amount: Expr.variable('$.amount'), + approved: Expr.boolean(true), + }), + }, + resultVariable: 'echo_result', +} +``` + +## Expression support in capability payloads + +Capability payload values can use the same expression model as normal rule actions. The editor adapter now serializes: + +- literals +- field references +- arrays +- objects +- binary and unary expressions +- conditional expressions +- function calls +- simple member paths like `$.user.profile.id` + +## Current limitations + +Studio already models some fields that the engine does not execute yet: + +- `retry` +- `onError` +- `fallbackValue` + +These fields remain editor-level metadata today. They are preserved in the Studio model, but they are not translated into runtime behavior by the engine adapter yet. + +That means: + +- retries are not applied automatically by `ExternalCall` +- fallback values are not injected automatically on failure +- error handling modes are not yet mapped to engine semantics + +If you need those behaviors today, implement them inside the capability provider itself or keep the rule logic explicit in separate steps. + +## Example provider + +The repository includes a minimal provider example in [`examples/capability-demo`](https://github.com/Pama-Lee/Ordo/tree/main/examples/capability-demo). It registers a `demo.echo` provider, invokes it from an `ExternalCall`, and reads the result through `$result.payload`. diff --git a/ordo-editor/apps/docs/en/guide/what-is-ordo.md b/ordo-editor/apps/docs/en/guide/what-is-ordo.md index 60584c72..4c577948 100644 --- a/ordo-editor/apps/docs/en/guide/what-is-ordo.md +++ b/ordo-editor/apps/docs/en/guide/what-is-ordo.md @@ -14,13 +14,13 @@ Most teams start with a rule engine. Then they realize the hard part isn't execu Ordo addresses the full lifecycle: -| Stage | What Ordo provides | -|-------|--------------------| -| **Author** | Studio flow editor, decision tables, template library | -| **Test** | Per-ruleset test cases, run in CI, export to YAML | -| **Govern** | Fact catalog, typed contracts, version history, audit log | -| **Execute** | Fast engine, hot reload, multi-tenancy | -| **Observe** | Execution traces, Prometheus metrics, structured logs | +| Stage | What Ordo provides | +| ----------- | --------------------------------------------------------- | +| **Author** | Studio flow editor, decision tables, template library | +| **Test** | Per-ruleset test cases, run in CI, export to YAML | +| **Govern** | Fact catalog, typed contracts, version history, audit log | +| **Execute** | Fast engine, hot reload, multi-tenancy | +| **Observe** | Execution traces, Prometheus metrics, structured logs | ## Architecture diff --git a/ordo-editor/apps/docs/en/index.md b/ordo-editor/apps/docs/en/index.md index ed9c1c14..a8446214 100644 --- a/ordo-editor/apps/docs/en/index.md +++ b/ordo-editor/apps/docs/en/index.md @@ -70,4 +70,3 @@ features: } } ``` - diff --git a/ordo-editor/apps/docs/en/roadmap.md b/ordo-editor/apps/docs/en/roadmap.md index fcbc961a..9d2a6d91 100644 --- a/ordo-editor/apps/docs/en/roadmap.md +++ b/ordo-editor/apps/docs/en/roadmap.md @@ -11,18 +11,18 @@ outline: [2, 3] Ordo already ships a production-grade core: -| Module | Capabilities | -|--------|-------------| -| **Engine** | Sub-microsecond rule execution, bytecode VM + Cranelift JIT, expression optimizer | -| **Transports** | HTTP REST, gRPC (with TLS/mTLS), Unix Domain Socket | -| **Visual Editor** | Three editing modes (Form / Flow Graph / JSON), decision tables, execution & performance panels | -| **CLI** | `ordo eval`, `ordo exec`, `ordo test` | -| **WASM** | Run the engine in browsers | -| **SDKs** | Go, Java, Python | -| **Studio** | Org/project/member management, fact catalog, concept registry, decision contracts, version history | -| **Multi-tenancy** | Per-tenant QPS limits, burst control, timeouts | -| **Observability** | Prometheus metrics, OTLP tracing, JSON Lines audit log, WAL crash-safe persistence | -| **i18n** | English, Simplified Chinese, Traditional Chinese | +| Module | Capabilities | +| ----------------- | -------------------------------------------------------------------------------------------------- | +| **Engine** | Sub-microsecond rule execution, bytecode VM + Cranelift JIT, expression optimizer | +| **Transports** | HTTP REST, gRPC (with TLS/mTLS), Unix Domain Socket | +| **Visual Editor** | Three editing modes (Form / Flow Graph / JSON), decision tables, execution & performance panels | +| **CLI** | `ordo eval`, `ordo exec`, `ordo test` | +| **WASM** | Run the engine in browsers | +| **SDKs** | Go, Java, Python | +| **Studio** | Org/project/member management, fact catalog, concept registry, decision contracts, version history | +| **Multi-tenancy** | Per-tenant QPS limits, burst control, timeouts | +| **Observability** | Prometheus metrics, OTLP tracing, JSON Lines audit log, WAL crash-safe persistence | +| **i18n** | English, Simplified Chinese, Traditional Chinese | --- @@ -34,12 +34,12 @@ Ordo already ships a production-grade core: Pre-built industry templates — each includes a complete RuleSet, pre-defined Facts & Concepts, sample input data, and a side-by-side "from if/else to Ordo" migration guide. -| Template | Scenario | Showcases | -|----------|----------|-----------| -| E-commerce Pricing | Discount tiers + VIP levels + time windows | Decision tables, hit policies | -| Loan Approval | Multi-condition branches + scorecard | Decision graph, multi-step flow | -| API Routing | Weighted routing + region + fallback | Action nodes, score aggregation | -| Permission Check | RBAC + attribute conditions | Policy layer, DENY_OVERRIDES | +| Template | Scenario | Showcases | +| ------------------ | ------------------------------------------ | ------------------------------- | +| E-commerce Pricing | Discount tiers + VIP levels + time windows | Decision tables, hit policies | +| Loan Approval | Multi-condition branches + scorecard | Decision graph, multi-step flow | +| API Routing | Weighted routing + region + fallback | Action nodes, score aggregation | +| Permission Check | RBAC + attribute conditions | Policy layer, DENY_OVERRIDES | ### Guided Onboarding @@ -124,12 +124,12 @@ Search, filter, and visualize execution traces: Configurable alerts with webhook notification: -| Condition | Example | -|-----------|---------| -| Error rate spike | Expression evaluation failures > 1% | -| Latency anomaly | P99 > threshold for 5 minutes | -| Traffic drop | QPS suddenly falls (upstream issue?) | -| Result shift | Reject rate jumps from 10% to 40% | +| Condition | Example | +| ---------------- | ------------------------------------ | +| Error rate spike | Expression evaluation failures > 1% | +| Latency anomaly | P99 > threshold for 5 minutes | +| Traffic drop | QPS suddenly falls (upstream issue?) | +| Result shift | Reject rate jumps from 10% to 40% | --- @@ -204,17 +204,17 @@ Interactive organization-wide graph: ### What Cloud Adds -| Capability | Self-hosted (OSS) | Ordo Cloud | -|-----------|-------------------|------------| -| Rule editing & publishing | :white_check_mark: | :white_check_mark: | -| Self-managed Engine | :white_check_mark: | :white_check_mark: | -| **Hosted Engine** (shared or dedicated) | — | :white_check_mark: | -| **Bring your own Engine** (register to Cloud) | — | :white_check_mark: | -| **Real-time collaborative editing** | — | :white_check_mark: | -| **SSO / SAML** | — | :white_check_mark: | -| **Long-term metrics & custom dashboards** | — | :white_check_mark: | -| **Compliance report export** | — | :white_check_mark: | -| **SLA guarantee + priority support** | — | :white_check_mark: | +| Capability | Self-hosted (OSS) | Ordo Cloud | +| --------------------------------------------- | ------------------ | ------------------ | +| Rule editing & publishing | :white_check_mark: | :white_check_mark: | +| Self-managed Engine | :white_check_mark: | :white_check_mark: | +| **Hosted Engine** (shared or dedicated) | — | :white_check_mark: | +| **Bring your own Engine** (register to Cloud) | — | :white_check_mark: | +| **Real-time collaborative editing** | — | :white_check_mark: | +| **SSO / SAML** | — | :white_check_mark: | +| **Long-term metrics & custom dashboards** | — | :white_check_mark: | +| **Compliance report export** | — | :white_check_mark: | +| **SLA guarantee + priority support** | — | :white_check_mark: | --- diff --git a/ordo-editor/apps/docs/zh/guide/capabilities.md b/ordo-editor/apps/docs/zh/guide/capabilities.md new file mode 100644 index 00000000..0aa682d0 --- /dev/null +++ b/ordo-editor/apps/docs/zh/guide/capabilities.md @@ -0,0 +1,161 @@ +# 能力与外部调用 + +Ordo 用 capability 边界来承接外部副作用和运行时集成。这样规则引擎内部仍然保持确定性执行,而指标、审计、HTTP 外调等运行时行为则通过统一接口接入。 + +## 什么是 capability + +一个 capability provider 暴露一个具名运行时服务,以及该服务上的一个或多个操作。 + +- provider 名称就是 capability 名,例如 `metrics.prometheus`、`audit.logger`、`network.http` +- operation 是 capability 上调用的方法,例如 `gauge`、`rule_executed`、`post` +- payload 是传给 provider 的对象数据,同时也是返回结果的主要承载体 + +运行时里,`ExternalCall` action 会被翻译成 capability 请求: + +```json +{ + "action": "external_call", + "service": "demo.echo", + "method": "echo", + "params": [["amount", { "Field": "amount" }]], + "result_variable": "echo_result", + "timeout_ms": 250 +} +``` + +如果设置了 `result_variable`,Ordo 会把响应存到这个变量下面: + +- `$echo_result.capability` +- `$echo_result.operation` +- `$echo_result.payload` +- `$echo_result.metadata` + +## Server 内置 capability + +当前 server 默认会注册这些 provider: + +| Capability | 分类 | 常见 operation | 用途 | +| -------------------- | --------- | ---------------------------------------------------------- | --------------------------------- | +| `metrics.prometheus` | `action` | `gauge`、`counter` | 通过 Prometheus sink 记录规则指标 | +| `audit.logger` | `action` | `rule_executed` | 发出结构化执行审计事件 | +| `network.http` | `network` | `get`、`post`、`put`、`patch`、`delete`、`head`、`options` | 发送出站 HTTP 请求 | + +## Studio `externalCalls` 如何映射 + +Studio 的 action step 可以定义 `externalCalls`。现在 editor adapter 会按下面这套规则把它转成 engine 的 `external_call`。 + +### HTTP 调用 + +当目标是 HTTP 端点时,使用 `type: "http"`。 + +```ts +{ + type: 'http', + target: 'PATCH https://api.example.com/score', + params: { + applicantId: Expr.variable('$.applicant.id'), + score: Expr.number(720), + }, + resultVariable: 'http_result', + timeout: 1500, +} +``` + +会被转成: + +```json +{ + "action": "external_call", + "service": "network.http", + "method": "patch", + "params": [ + ["url", { "Literal": "https://api.example.com/score" }], + [ + "json_body", + { + "Object": [ + ["applicantId", { "Field": "applicant.id" }], + ["score", { "Literal": 720 }] + ] + } + ] + ], + "result_variable": "http_result", + "timeout_ms": 1500 +} +``` + +规则如下: + +- 如果 `target` 以 `METHOD + 空格 + URL` 开头,就使用这个 HTTP method +- 如果没有 method 前缀,默认使用 `POST` +- `params` 会被打包成 `json_body` +- `target` 会变成 `network.http` payload 里的 `url` + +### Function 与 gRPC 调用 + +对于 `type: "function"` 和 `type: "grpc"`,editor 会把 `target` 当成 capability 引用。 + +支持的 target 形式: + +- `demo.echo` +- `demo.echo#echo` +- `demo.echo::echo` + +规则如下: + +- `service` 是 capability 名称 +- 如果 target 里带了 `#` 或 `::`,就把后半段解析成 `method` +- 如果没有显式 method,`function` 默认用 `invoke`,`grpc` 默认用 `call` +- `params` 会原样变成 capability payload + +示例: + +```ts +{ + type: 'function', + target: 'demo.echo#echo', + params: { + payload: Expr.object({ + amount: Expr.variable('$.amount'), + approved: Expr.boolean(true), + }), + }, + resultVariable: 'echo_result', +} +``` + +## capability payload 里支持的表达式 + +现在 editor adapter 可以把下列表达式序列化进 capability payload: + +- 字面量 +- 字段引用 +- 数组 +- 对象 +- 二元与一元表达式 +- 条件表达式 +- 函数调用 +- 类似 `$.user.profile.id` 这样的简单 member path + +## 当前限制 + +Studio 模型里已经有一些字段,但引擎暂时还没有执行语义: + +- `retry` +- `onError` +- `fallbackValue` + +这些字段目前仍然只停留在 editor 模型层,还不会被 adapter 翻译成真正的 runtime 行为。 + +这意味着: + +- `ExternalCall` 不会自动重试 +- capability 调用失败时不会自动写入 fallback 值 +- `onError` 还没有映射成引擎语义 + +如果你现在就需要这些行为,应该把它们实现在 capability provider 内部,或者在规则里拆成显式步骤处理。 + +## 示例 provider + +仓库里有一个最小示例 [`examples/capability-demo`](https://github.com/Pama-Lee/Ordo/tree/main/examples/capability-demo)。它注册了 `demo.echo` provider,通过 `ExternalCall` 调用它,并用 `$result.payload` 读取返回值。 diff --git a/ordo-editor/apps/docs/zh/guide/what-is-ordo.md b/ordo-editor/apps/docs/zh/guide/what-is-ordo.md index 8a4cbc74..d3fc0599 100644 --- a/ordo-editor/apps/docs/zh/guide/what-is-ordo.md +++ b/ordo-editor/apps/docs/zh/guide/what-is-ordo.md @@ -14,13 +14,13 @@ Ordo 覆盖规则的完整生命周期: -| 阶段 | Ordo 提供的能力 | -|------|----------------| -| **编写** | Studio 流程编辑器、决策表、模板库 | -| **测试** | 规则集级别测试用例、CI 集成、导出 YAML | +| 阶段 | Ordo 提供的能力 | +| -------- | ------------------------------------------ | +| **编写** | Studio 流程编辑器、决策表、模板库 | +| **测试** | 规则集级别测试用例、CI 集成、导出 YAML | | **治理** | 事实目录、带类型的契约、版本历史、审计日志 | -| **执行** | 高性能引擎、热重载、多租户 | -| **观测** | 执行追踪、Prometheus 指标、结构化日志 | +| **执行** | 高性能引擎、热重载、多租户 | +| **观测** | 执行追踪、Prometheus 指标、结构化日志 | ## 架构 diff --git a/ordo-editor/apps/docs/zh/index.md b/ordo-editor/apps/docs/zh/index.md index d853ba53..cc5f3fc7 100644 --- a/ordo-editor/apps/docs/zh/index.md +++ b/ordo-editor/apps/docs/zh/index.md @@ -70,4 +70,3 @@ features: } } ``` - diff --git a/ordo-editor/apps/docs/zh/roadmap.md b/ordo-editor/apps/docs/zh/roadmap.md index 1e01476e..d26fb7b7 100644 --- a/ordo-editor/apps/docs/zh/roadmap.md +++ b/ordo-editor/apps/docs/zh/roadmap.md @@ -11,18 +11,18 @@ outline: [2, 3] Ordo 已具备生产级的核心能力: -| 模块 | 能力 | -|------|------| -| **执行引擎** | 亚微秒级规则执行、字节码 VM + Cranelift JIT、表达式优化器 | -| **传输协议** | HTTP REST、gRPC(支持 TLS/mTLS)、Unix Domain Socket | -| **可视化编辑器** | 三种编辑模式(表单 / 流程图 / JSON)、决策表、执行与性能面板 | -| **CLI** | `ordo eval`、`ordo exec`、`ordo test` | -| **WASM** | 在浏览器中运行引擎 | -| **SDK** | Go、Java、Python | -| **Studio** | 组织/项目/成员管理、事实目录、概念注册、决策契约、版本历史 | -| **多租户** | 租户级 QPS 限流、突发控制、超时管理 | -| **可观测性** | Prometheus 指标、OTLP 链路追踪、JSON Lines 审计日志、WAL 崩溃安全持久化 | -| **国际化** | 英文、简体中文、繁体中文 | +| 模块 | 能力 | +| ---------------- | ----------------------------------------------------------------------- | +| **执行引擎** | 亚微秒级规则执行、字节码 VM + Cranelift JIT、表达式优化器 | +| **传输协议** | HTTP REST、gRPC(支持 TLS/mTLS)、Unix Domain Socket | +| **可视化编辑器** | 三种编辑模式(表单 / 流程图 / JSON)、决策表、执行与性能面板 | +| **CLI** | `ordo eval`、`ordo exec`、`ordo test` | +| **WASM** | 在浏览器中运行引擎 | +| **SDK** | Go、Java、Python | +| **Studio** | 组织/项目/成员管理、事实目录、概念注册、决策契约、版本历史 | +| **多租户** | 租户级 QPS 限流、突发控制、超时管理 | +| **可观测性** | Prometheus 指标、OTLP 链路追踪、JSON Lines 审计日志、WAL 崩溃安全持久化 | +| **国际化** | 英文、简体中文、繁体中文 | --- @@ -34,12 +34,12 @@ Ordo 已具备生产级的核心能力: 预置行业模板——每个模板包含完整的 RuleSet、预定义的 Facts 和 Concepts、示例输入数据,以及"从 if/else 到 Ordo"的迁移对比说明。 -| 模板 | 场景 | 展示能力 | -|------|------|----------| -| 电商优惠券发放 | 满减 + VIP 等级 + 时间窗口 | 决策表、命中策略 | -| 贷款审批 | 多条件分支 + 评分卡 | 决策图、多步骤流程 | -| API 路由选择 | 权重路由 + 地域 + 降级 | Action 节点、评分聚合 | -| 权限判定 | RBAC + 属性条件 | 策略层、DENY_OVERRIDES | +| 模板 | 场景 | 展示能力 | +| -------------- | -------------------------- | ---------------------- | +| 电商优惠券发放 | 满减 + VIP 等级 + 时间窗口 | 决策表、命中策略 | +| 贷款审批 | 多条件分支 + 评分卡 | 决策图、多步骤流程 | +| API 路由选择 | 权重路由 + 地域 + 降级 | Action 节点、评分聚合 | +| 权限判定 | RBAC + 属性条件 | 策略层、DENY_OVERRIDES | ### 引导式 Onboarding @@ -124,12 +124,12 @@ Ordo 已具备生产级的核心能力: 可配置的告警 + Webhook 通知: -| 条件 | 示例 | -|------|------| -| 错误率飙升 | 表达式执行失败率 > 1% | -| 延迟异常 | P99 连续 5 分钟超过阈值 | -| 流量骤降 | QPS 突然下降(可能上游出问题) | -| 结果偏移 | 拒绝率从 10% 跳到 40% | +| 条件 | 示例 | +| ---------- | ------------------------------ | +| 错误率飙升 | 表达式执行失败率 > 1% | +| 延迟异常 | P99 连续 5 分钟超过阈值 | +| 流量骤降 | QPS 突然下降(可能上游出问题) | +| 结果偏移 | 拒绝率从 10% 跳到 40% | --- @@ -204,17 +204,17 @@ Ordo 已具备生产级的核心能力: ### Cloud 增值功能 -| 能力 | 自部署(开源) | Ordo Cloud | -|------|--------------|------------| -| 规则编辑与发布 | :white_check_mark: | :white_check_mark: | -| 自管理 Engine | :white_check_mark: | :white_check_mark: | -| **托管 Engine**(共享或独占) | — | :white_check_mark: | -| **接入自有 Engine**(注册到 Cloud) | — | :white_check_mark: | -| **实时协同编辑** | — | :white_check_mark: | -| **SSO / SAML** | — | :white_check_mark: | -| **长期指标存储 + 自定义 Dashboard** | — | :white_check_mark: | -| **合规报告导出** | — | :white_check_mark: | -| **SLA 保证 + 优先支持** | — | :white_check_mark: | +| 能力 | 自部署(开源) | Ordo Cloud | +| ----------------------------------- | ------------------ | ------------------ | +| 规则编辑与发布 | :white_check_mark: | :white_check_mark: | +| 自管理 Engine | :white_check_mark: | :white_check_mark: | +| **托管 Engine**(共享或独占) | — | :white_check_mark: | +| **接入自有 Engine**(注册到 Cloud) | — | :white_check_mark: | +| **实时协同编辑** | — | :white_check_mark: | +| **SSO / SAML** | — | :white_check_mark: | +| **长期指标存储 + 自定义 Dashboard** | — | :white_check_mark: | +| **合规报告导出** | — | :white_check_mark: | +| **SLA 保证 + 优先支持** | — | :white_check_mark: | --- diff --git a/ordo-editor/apps/studio/src/App.vue b/ordo-editor/apps/studio/src/App.vue index 6651274c..d89970a1 100644 --- a/ordo-editor/apps/studio/src/App.vue +++ b/ordo-editor/apps/studio/src/App.vue @@ -1,30 +1,32 @@