diff --git a/src/openhuman/skills/bus.rs b/src/openhuman/skills/bus.rs index 07fb965e28..5718ff0885 100644 --- a/src/openhuman/skills/bus.rs +++ b/src/openhuman/skills/bus.rs @@ -1,16 +1,399 @@ -//! Legacy no-op event bus hooks retained while call-sites are cleaned up. +//! Event bus subscriber for event-triggered skills. +//! +//! Skills that declare a `triggers:` list in their `SKILL.md` frontmatter are +//! indexed at startup by [`TriggeredSkillIndex`]. A [`TriggeredSkillSubscriber`] +//! is then registered on the global event bus; when a matching [`DomainEvent`] +//! arrives it logs which skill(s) should be activated. +//! +//! The actual agent-session launch for triggered skills is intentionally out of +//! scope here — it requires the full harness context (provider, memory, config) +//! that is wired up by the channel runtime after bus initialization. This module +//! provides the **type plumbing and observer** so the integration layer can hook +//! in without touching the bus machinery. +use crate::core::event_bus::{subscribe_global, DomainEvent, EventHandler, SubscriptionHandle}; +use crate::openhuman::skills::Skill; +use async_trait::async_trait; +use std::sync::Arc; + +// ── Trigger pattern ─────────────────────────────────────────────────────────── + +/// A parsed trigger pattern from a skill's `triggers:` frontmatter list. +/// +/// Patterns take the form `"domain"` or `"domain/event_slug"`. A bare domain +/// (no `/`) matches **any** event in that domain; with a slug only events whose +/// discriminant name (lower-kebab-cased) equals the slug are matched. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TriggerPattern { + /// The event domain, e.g. `"composio"`, `"cron"`, `"channel"`. + pub domain: String, + /// Optional event slug; `None` means match the entire domain. + pub event_slug: Option, +} + +impl TriggerPattern { + /// Parse a raw trigger string like `"composio/trigger_received"` or `"cron"`. + pub fn parse(raw: &str) -> Option { + let raw = raw.trim(); + if raw.is_empty() { + return None; + } + match raw.split_once('/') { + Some((domain, slug)) => { + let domain = domain.trim().to_ascii_lowercase(); + let slug = slug.trim().to_ascii_lowercase(); + if domain.is_empty() { + return None; + } + Some(Self { + domain, + event_slug: if slug.is_empty() || slug == "*" { + None + } else { + Some(slug) + }, + }) + } + None => Some(Self { + domain: raw.to_ascii_lowercase(), + event_slug: None, + }), + } + } + + /// Returns true when this pattern matches the given event. + /// + /// Slug-qualified patterns (e.g. `"agent/task_complete"`) are rejected + /// until [`DomainEvent`] exposes a stable `slug()` method — returning + /// `true` here would silently match the entire domain, firing for every + /// event regardless of the declared slug. + pub fn matches(&self, event: &DomainEvent) -> bool { + if event.domain() != self.domain { + return false; + } + // Slug-qualified patterns cannot be matched precisely yet. + // TODO(#skills-triggers): replace with `event.slug() == slug` once + // DomainEvent exposes slug(). + if self.event_slug.is_some() { + return false; + } + true + } +} + +// ── Triggered skill index ───────────────────────────────────────────────────── + +/// Index of skills that declare event triggers, built from discovered skills. +/// +/// Call [`TriggeredSkillIndex::build`] after the skill discovery pass, then +/// pass the result to [`register_triggered_skill_subscriber`]. +#[derive(Debug, Default)] +pub struct TriggeredSkillIndex { + /// Sorted `(skill_name, patterns)` pairs. Sorted for deterministic logging. + entries: Vec<(String, Vec)>, +} + +impl TriggeredSkillIndex { + /// Build an index from a slice of discovered skills. + /// + /// Skills with an empty `triggers:` list are skipped. + pub fn build(skills: &[Skill]) -> Self { + let mut entries: Vec<(String, Vec)> = skills + .iter() + .filter_map(|skill| { + let patterns: Vec = skill + .frontmatter + .triggers + .iter() + .filter_map(|t| { + let p = TriggerPattern::parse(t); + if p.is_none() { + log::warn!( + "[skills::triggered] skill '{}': malformed trigger {:?} — skipping", + skill.name, + t + ); + } + p + }) + .collect(); + if patterns.is_empty() { + None + } else { + Some((skill.name.clone(), patterns)) + } + }) + .collect(); + entries.sort_by(|a, b| a.0.cmp(&b.0)); + Self { entries } + } + + /// Returns `true` when no skills have declared triggers. + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + /// Returns the number of skills with at least one trigger pattern. + pub fn len(&self) -> usize { + self.entries.len() + } + + /// Returns all unique domain strings across every trigger pattern. + pub fn domains(&self) -> Vec { + let mut seen = std::collections::BTreeSet::new(); + for (_, patterns) in &self.entries { + for p in patterns { + seen.insert(p.domain.clone()); + } + } + seen.into_iter().collect() + } + + /// Returns the names of skills whose trigger patterns match `event`. + pub fn matching_skills<'a>(&'a self, event: &DomainEvent) -> Vec<&'a str> { + self.entries + .iter() + .filter(|(_, patterns)| patterns.iter().any(|p| p.matches(event))) + .map(|(name, _)| name.as_str()) + .collect() + } +} + +// ── Subscriber ──────────────────────────────────────────────────────────────── + +struct TriggeredSkillSubscriber { + index: Arc, +} + +#[async_trait] +impl EventHandler for TriggeredSkillSubscriber { + fn name(&self) -> &str { + "skills::triggered_skill" + } + + // No `domains()` filter — the domain list is dynamic (built from skill + // triggers at startup) and the `EventHandler` trait returns `&[&str]` + // which cannot point into an owned Vec. Filtering in `handle()` + // is equivalent and avoids an unsafe lifetime trick. + + async fn handle(&self, event: &DomainEvent) { + let matched = self.index.matching_skills(event); + if matched.is_empty() { + return; + } + tracing::debug!( + domain = event.domain(), + skills = ?matched, + "[skills::triggered] event matches {} skill trigger(s); \ + activation handoff to integration layer pending", + matched.len() + ); + } +} + +// ── Public API ──────────────────────────────────────────────────────────────── + +/// Register a subscriber for all skills that declare `triggers:` patterns. +/// +/// Call this once at startup **after** skill discovery is complete. Skills with +/// an empty `triggers:` list are ignored. Returns `None` when no skills have +/// triggers (no subscription is created). The returned [`SubscriptionHandle`] +/// must be kept alive for the duration of the process. +/// +/// ```text +/// // In channel runtime startup, after load_skills(): +/// static SKILL_TRIGGER_HANDLE: OnceLock> = OnceLock::new(); +/// SKILL_TRIGGER_HANDLE.get_or_init(|| { +/// skills::bus::register_triggered_skill_subscriber(&discovered_skills) +/// }); +/// ``` +pub fn register_triggered_skill_subscriber(skills: &[Skill]) -> Option { + let index = TriggeredSkillIndex::build(skills); + if index.is_empty() { + return None; + } + log::info!( + "[skills::triggered] registering subscriber for {} skill(s) with event triggers (domains: {:?})", + index.len(), + index.domains() + ); + subscribe_global(Arc::new(TriggeredSkillSubscriber { + index: Arc::new(index), + })) +} + +/// Legacy no-op retained while call-sites migrate to +/// [`register_triggered_skill_subscriber`]. Safe to call multiple times. pub fn register_skill_cleanup_subscriber() {} #[cfg(test)] mod tests { use super::*; + use crate::openhuman::skills::ops_types::{Skill, SkillFrontmatter}; + + fn skill_with_triggers(name: &str, triggers: Vec<&str>) -> Skill { + Skill { + name: name.to_string(), + frontmatter: SkillFrontmatter { + triggers: triggers.iter().map(|s| s.to_string()).collect(), + ..Default::default() + }, + ..Default::default() + } + } + + // ── TriggerPattern::parse ──────────────────────────────────────────────── + + #[test] + fn parse_bare_domain() { + let p = TriggerPattern::parse("composio").unwrap(); + assert_eq!(p.domain, "composio"); + assert!(p.event_slug.is_none()); + } + + #[test] + fn parse_domain_with_slug() { + let p = TriggerPattern::parse("composio/trigger_received").unwrap(); + assert_eq!(p.domain, "composio"); + assert_eq!(p.event_slug.as_deref(), Some("trigger_received")); + } + + #[test] + fn parse_domain_with_wildcard_slug_is_bare() { + let p = TriggerPattern::parse("cron/*").unwrap(); + assert_eq!(p.domain, "cron"); + assert!(p.event_slug.is_none()); + } + + #[test] + fn parse_normalises_to_lowercase() { + let p = TriggerPattern::parse("Composio/TRIGGER_RECEIVED").unwrap(); + assert_eq!(p.domain, "composio"); + assert_eq!(p.event_slug.as_deref(), Some("trigger_received")); + } + + #[test] + fn parse_empty_is_none() { + assert!(TriggerPattern::parse("").is_none()); + assert!(TriggerPattern::parse(" ").is_none()); + } + + #[test] + fn parse_empty_domain_with_slug_is_none() { + assert!(TriggerPattern::parse("/event_slug").is_none()); + } + + // ── TriggerPattern::matches ────────────────────────────────────────────── + + #[test] + fn bare_domain_matches_any_event_in_domain() { + let p = TriggerPattern::parse("cron").unwrap(); + let event = DomainEvent::CronJobTriggered { + job_id: "j1".into(), + job_name: "test".into(), + job_type: "shell".into(), + }; + assert!(p.matches(&event)); + } + + #[test] + fn bare_domain_does_not_match_other_domain() { + let p = TriggerPattern::parse("cron").unwrap(); + let event = DomainEvent::SystemStartup { + component: "core".into(), + }; + assert!(!p.matches(&event)); + } + + #[test] + fn slugged_pattern_rejected_until_slug_api_exists() { + // A slug-qualified pattern like "cron/job_triggered" must NOT match + // the entire cron domain — returning true here would over-fire for + // every cron event regardless of the declared slug. + let p = TriggerPattern::parse("cron/job_triggered").unwrap(); + assert_eq!(p.event_slug.as_deref(), Some("job_triggered")); + let event = DomainEvent::CronJobTriggered { + job_id: "j1".into(), + job_name: "test".into(), + job_type: "shell".into(), + }; + assert!( + !p.matches(&event), + "slugged pattern must not match until DomainEvent::slug() exists" + ); + } + + // ── TriggeredSkillIndex ────────────────────────────────────────────────── + + #[test] + fn build_ignores_skills_without_triggers() { + let skills = vec![ + skill_with_triggers("no_triggers", vec![]), + skill_with_triggers("with_trigger", vec!["cron"]), + ]; + let idx = TriggeredSkillIndex::build(&skills); + assert_eq!(idx.len(), 1); + assert!(!idx.is_empty()); + } + + #[test] + fn build_empty_skills_list_is_empty() { + let idx = TriggeredSkillIndex::build(&[]); + assert!(idx.is_empty()); + } + + #[test] + fn build_sorts_entries_by_skill_name() { + let skills = vec![ + skill_with_triggers("zzz_skill", vec!["cron"]), + skill_with_triggers("aaa_skill", vec!["channel"]), + ]; + let idx = TriggeredSkillIndex::build(&skills); + assert_eq!(idx.entries[0].0, "aaa_skill"); + assert_eq!(idx.entries[1].0, "zzz_skill"); + } + + #[test] + fn domains_returns_unique_sorted_set() { + let skills = vec![ + skill_with_triggers("a", vec!["composio", "cron"]), + skill_with_triggers("b", vec!["composio", "channel"]), + ]; + let idx = TriggeredSkillIndex::build(&skills); + let domains = idx.domains(); + assert_eq!(domains, vec!["channel", "composio", "cron"]); + } + + #[test] + fn matching_skills_returns_correct_names() { + let skills = vec![ + skill_with_triggers("cron_watcher", vec!["cron"]), + skill_with_triggers("composio_watcher", vec!["composio"]), + skill_with_triggers("multi_watcher", vec!["cron", "composio"]), + ]; + let idx = TriggeredSkillIndex::build(&skills); + let event = DomainEvent::CronJobTriggered { + job_id: "j1".into(), + job_name: "test".into(), + job_type: "shell".into(), + }; + let mut matched = idx.matching_skills(&event); + matched.sort_unstable(); + assert_eq!(matched, vec!["cron_watcher", "multi_watcher"]); + } + + #[test] + fn matching_skills_returns_empty_when_no_match() { + let skills = vec![skill_with_triggers("composio_watcher", vec!["composio"])]; + let idx = TriggeredSkillIndex::build(&skills); + let event = DomainEvent::SystemStartup { + component: "core".into(), + }; + assert!(idx.matching_skills(&event).is_empty()); + } #[test] fn register_skill_cleanup_subscriber_is_a_safe_noop() { - // The function is intentionally empty while call-sites migrate - // off the legacy bus hook — calling it repeatedly must remain - // side-effect free. register_skill_cleanup_subscriber(); register_skill_cleanup_subscriber(); } diff --git a/src/openhuman/skills/ops_types.rs b/src/openhuman/skills/ops_types.rs index 79793bf5fd..cb28329140 100644 --- a/src/openhuman/skills/ops_types.rs +++ b/src/openhuman/skills/ops_types.rs @@ -66,6 +66,17 @@ pub struct SkillFrontmatter { /// (non-binding hint; the host decides what to expose). #[serde(default, rename = "allowed-tools", alias = "allowed_tools")] pub allowed_tools: Vec, + /// Domain events that should activate this skill. + /// + /// Each entry is a trigger pattern of the form `"domain"` or + /// `"domain/event_slug"` (e.g. `"composio"`, + /// `"composio/trigger_received"`, `"cron"`, `"channel/inbound_message"`). + /// A bare domain (no slash) matches any event in that domain. + /// The host builds a [`TriggeredSkillIndex`] at startup and registers a + /// subscriber that logs matching skills; the actual agent-session launch + /// is handled by the integration layer (see `skills::bus`). + #[serde(default)] + pub triggers: Vec, /// Forward-compat hatch for spec additions. Non-spec top-level keys /// (including legacy `version`, `author`, `tags`) land here and trigger /// a migration warning when read. diff --git a/src/openhuman/tool_registry/ops.rs b/src/openhuman/tool_registry/ops.rs index 7897ddbf1c..0873a974ed 100644 --- a/src/openhuman/tool_registry/ops.rs +++ b/src/openhuman/tool_registry/ops.rs @@ -510,6 +510,29 @@ mod tests { assert!(err.contains("missing.tool")); } + #[test] + fn all_registry_entries_have_non_empty_name_and_description() { + let entries = registry_entries(); + assert!( + !entries.is_empty(), + "registry must contain at least one tool" + ); + let mut violations: Vec = Vec::new(); + for entry in &entries { + if entry.name.trim().is_empty() { + violations.push(format!("tool_id='{}' has empty name", entry.tool_id)); + } + if entry.description.trim().is_empty() { + violations.push(format!("tool_id='{}' has empty description", entry.tool_id)); + } + } + assert!( + violations.is_empty(), + "registry integrity violations:\n{}", + violations.join("\n") + ); + } + #[test] fn controller_json_schema_marks_required_and_optional_fields() { let schema = schema_fields_to_json_schema(&[ diff --git a/src/openhuman/tools/impl/cron/add.rs b/src/openhuman/tools/impl/cron/add.rs index 07ed1b9c1a..5c4d17b3ca 100644 --- a/src/openhuman/tools/impl/cron/add.rs +++ b/src/openhuman/tools/impl/cron/add.rs @@ -747,4 +747,96 @@ mod tests { std::fs::create_dir_all(&config.workspace_dir).unwrap(); Arc::new(config) } + + // ── Schedule serde roundtrip tests ────────────────────────────────────── + // + // These tests verify that the JSON shapes documented in `parameters_schema()` + // actually deserialize into the `Schedule` enum. A mismatch between the schema + // and the serde struct silently breaks tool calls at runtime (same root cause + // as the `window_days` / `time_window_days` field name drift in issue #2252). + + #[test] + fn schedule_cron_variant_deserializes_from_schema_shape() { + let s: Schedule = serde_json::from_value(json!({ + "kind": "cron", + "expr": "0 9 * * *" + })) + .expect("cron schedule must deserialize from schema-documented shape"); + assert!(matches!(s, Schedule::Cron { .. })); + } + + #[test] + fn schedule_cron_variant_accepts_optional_tz() { + let s: Schedule = serde_json::from_value(json!({ + "kind": "cron", + "expr": "0 9 * * *", + "tz": "America/Los_Angeles" + })) + .expect("cron schedule with tz must deserialize"); + match s { + Schedule::Cron { tz, .. } => { + assert_eq!(tz.as_deref(), Some("America/Los_Angeles")) + } + _ => panic!("expected Cron variant"), + } + } + + #[test] + fn schedule_at_variant_deserializes_from_schema_shape() { + let s: Schedule = serde_json::from_value(json!({ + "kind": "at", + "at": "2024-06-01T09:00:00Z" + })) + .expect("at schedule must deserialize from schema-documented shape"); + assert!(matches!(s, Schedule::At { .. })); + } + + #[test] + fn schedule_every_variant_deserializes_from_schema_shape() { + let s: Schedule = serde_json::from_value(json!({ + "kind": "every", + "every_ms": 60000u64 + })) + .expect("every schedule must deserialize from schema-documented shape"); + assert!(matches!(s, Schedule::Every { every_ms: 60000 })); + } + + #[test] + fn schedule_fails_when_kind_is_missing() { + let result = serde_json::from_value::(json!({"expr": "0 9 * * *"})); + assert!( + result.is_err(), + "Schedule must reject a payload without 'kind'" + ); + } + + #[test] + fn schedule_fails_when_kind_is_unknown() { + let result = serde_json::from_value::(json!({"kind": "daily"})); + assert!( + result.is_err(), + "Schedule must reject an unrecognised 'kind' value" + ); + } + + #[test] + fn cron_add_tool_schema_requires_name_and_schedule() { + // Use the real schema from CronAddTool::parameters_schema() so a + // future change that removes or renames a required field breaks this + // test rather than silently passing against a hardcoded fixture. + let cfg = Arc::new(Config::default()); + let tool = CronAddTool::new(cfg.clone(), test_security(&cfg)); + let schema = tool.parameters_schema(); + let required = schema["required"] + .as_array() + .expect("CronAddTool schema must have a 'required' array"); + assert!( + required.iter().any(|v| v.as_str() == Some("name")), + "'name' must appear in CronAddTool schema required list" + ); + assert!( + required.iter().any(|v| v.as_str() == Some("schedule")), + "'schedule' must appear in CronAddTool schema required list" + ); + } }