Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"crates/ordo-server",
"crates/ordo-platform",
"crates/ordo-wasm",
"examples/capability-demo",
]

[workspace.package]
Expand Down
72 changes: 72 additions & 0 deletions crates/ordo-core/src/capability/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
mod provider;
mod registry;

pub use provider::{
CapabilityCategory, CapabilityConfig, CapabilityDescriptor, CapabilityInvoker,
CapabilityProvider, CapabilityRequest, CapabilityResponse, CircuitBreakerConfig, RetryPolicy,
};
pub use registry::CapabilityRegistry;

#[cfg(test)]
mod tests {
use super::*;
use crate::prelude::{Action, ActionKind, Expr, RuleExecutor, RuleSet, Step, TerminalResult};
use crate::{context::Value, error::Result};
use std::sync::Arc;

struct EchoProvider;

impl CapabilityProvider for EchoProvider {
fn descriptor(&self) -> CapabilityDescriptor {
CapabilityDescriptor::new("demo.echo", CapabilityCategory::Compute)
.with_description("Echo payloads back to the caller")
}

fn invoke(&self, request: &CapabilityRequest) -> Result<CapabilityResponse> {
Ok(CapabilityResponse::new(request.payload.clone()))
}
}

#[test]
fn executor_external_call_routes_through_capability_registry() {
let registry = Arc::new(CapabilityRegistry::new());
registry.register(Arc::new(EchoProvider));

let mut ruleset = RuleSet::new("capability_demo", "call_echo");
ruleset.add_step(Step::action(
"call_echo",
"Call echo",
vec![Action {
kind: ActionKind::ExternalCall {
service: "demo.echo".to_string(),
method: "echo".to_string(),
params: vec![("amount".to_string(), Expr::field("amount"))],
timeout_ms: 250,
result_variable: Some("capability_result".to_string()),
},
description: String::new(),
}],
"done",
));
ruleset.add_step(Step::terminal(
"done",
"Done",
TerminalResult::new("OK").with_output(
"echoed_amount",
Expr::field("$capability_result.payload.amount"),
),
));

let mut executor = RuleExecutor::new();
executor.set_capability_invoker(registry);

let input: Value = serde_json::from_str(r#"{"amount": 42}"#).unwrap();
let result = executor.execute(&ruleset, input).unwrap();

let amount = result
.output
.get_path("echoed_amount")
.expect("echoed amount missing");
assert_eq!(amount, &Value::int(42));
}
}
231 changes: 231 additions & 0 deletions crates/ordo-core/src/capability/provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
use crate::context::Value;
use crate::error::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// High-level execution tier for a capability.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CapabilityCategory {
Network,
Compute,
Action,
}

/// Retry policy for a capability.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetryPolicy {
pub max_attempts: u32,
pub backoff_ms: u64,
}

impl RetryPolicy {
#[inline]
pub fn disabled() -> Self {
Self {
max_attempts: 1,
backoff_ms: 0,
}
}
}

impl Default for RetryPolicy {
fn default() -> Self {
Self::disabled()
}
}

/// Consecutive-failure breaker for a capability.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
pub failure_threshold: u32,
pub reset_timeout_ms: u64,
}

impl CircuitBreakerConfig {
#[inline]
pub fn disabled() -> Self {
Self {
failure_threshold: 0,
reset_timeout_ms: 0,
}
}

#[inline]
pub fn enabled(&self) -> bool {
self.failure_threshold > 0
}
}

impl Default for CircuitBreakerConfig {
fn default() -> Self {
Self::disabled()
}
}

/// Runtime policy for a registered capability.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CapabilityConfig {
pub category: CapabilityCategory,
pub timeout_ms: Option<u64>,
pub retry: RetryPolicy,
pub circuit_breaker: CircuitBreakerConfig,
}

impl CapabilityConfig {
#[inline]
pub fn new(category: CapabilityCategory) -> Self {
Self {
category,
timeout_ms: None,
retry: RetryPolicy::default(),
circuit_breaker: CircuitBreakerConfig::default(),
}
}

#[inline]
pub fn timeout(mut self, timeout_ms: u64) -> Self {
self.timeout_ms = Some(timeout_ms);
self
}

#[inline]
pub fn retry(mut self, retry: RetryPolicy) -> Self {
self.retry = retry;
self
}

#[inline]
pub fn circuit_breaker(mut self, circuit_breaker: CircuitBreakerConfig) -> Self {
self.circuit_breaker = circuit_breaker;
self
}
}

/// Metadata for a registered capability.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CapabilityDescriptor {
pub name: String,
pub description: String,
pub config: CapabilityConfig,
}

impl CapabilityDescriptor {
#[inline]
pub fn new(name: impl Into<String>, category: CapabilityCategory) -> Self {
Self {
name: name.into(),
description: String::new(),
config: CapabilityConfig::new(category),
}
}

#[inline]
pub fn with_description(mut self, description: impl Into<String>) -> Self {
self.description = description.into();
self
}

#[inline]
pub fn with_config(mut self, config: CapabilityConfig) -> Self {
self.config = config;
self
}
}

/// Normalized request shape passed to capability providers.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CapabilityRequest {
pub capability: String,
pub operation: String,
#[serde(default)]
pub payload: Value,
#[serde(default)]
pub metadata: HashMap<String, String>,
#[serde(default)]
pub timeout_ms: Option<u64>,
#[serde(default)]
pub category: Option<CapabilityCategory>,
}

impl CapabilityRequest {
#[inline]
pub fn new(
capability: impl Into<String>,
operation: impl Into<String>,
payload: Value,
) -> Self {
Self {
capability: capability.into(),
operation: operation.into(),
payload,
metadata: HashMap::new(),
timeout_ms: None,
category: None,
}
}

#[inline]
pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
self.timeout_ms = Some(timeout_ms);
self
}

#[inline]
pub fn with_category(mut self, category: CapabilityCategory) -> Self {
self.category = Some(category);
self
}

#[inline]
pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
}

/// Normalized response shape returned by capability providers.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CapabilityResponse {
#[serde(default)]
pub payload: Value,
#[serde(default)]
pub metadata: HashMap<String, String>,
}

impl CapabilityResponse {
#[inline]
pub fn new(payload: Value) -> Self {
Self {
payload,
metadata: HashMap::new(),
}
}

#[inline]
pub fn empty() -> Self {
Self::new(Value::Null)
}

#[inline]
pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
}

/// Provider implementation for a named capability.
pub trait CapabilityProvider: Send + Sync {
fn descriptor(&self) -> CapabilityDescriptor;
fn invoke(&self, request: &CapabilityRequest) -> Result<CapabilityResponse>;
}

/// Trait used by the rule executor to call a capability registry.
pub trait CapabilityInvoker: Send + Sync {
fn invoke(&self, request: &CapabilityRequest) -> Result<CapabilityResponse>;

fn describe(&self, capability: &str) -> Option<CapabilityDescriptor> {
let _ = capability;
None
}
}
Loading
Loading