diff --git a/Gradata/src/gradata/enhancements/self_improvement/_oscillation_guard.py b/Gradata/src/gradata/enhancements/self_improvement/_oscillation_guard.py new file mode 100644 index 00000000..26d73218 --- /dev/null +++ b/Gradata/src/gradata/enhancements/self_improvement/_oscillation_guard.py @@ -0,0 +1,190 @@ +""" +Oscillation guard for self-healing rule rewrites. + +When `_patches.observe_patch` is called, this module checks the recent rule +history for a cycle: a previous patch where we wrote *away from* the text +we're now writing *back to*. If detected, signal "abort + lock" so the +patcher skips the rewrite and emits a `rule_patch_cycle_detected` event +instead. + +Without this guard, a poorly-tuned compliance scorer can ping-pong the +same lesson between two phrasings forever (each rewrite gets fresh "100% +reduction" credit because the new text has zero observations yet). Real +example from a production brain 2026-05-21: + lesson 911130b3 oscillated A→B→A→B→A→B across 5 rollbacks in 20 days + with no actual behavioral improvement. + +Design: + - Cycle = the proposed `new_text` MATCHES a recent `old_text` (i.e. we + patched away from it before and now want to go back). + - Lookback: 30 days OR N patches whichever is smaller (default N=5). + - On detection: emit `rule_patch_cycle_detected` event with both + texts, the chain of patches, and the recommended lock-until session. + - Lock duration: 10 sessions (configurable via env). + +The guard is intentionally conservative — it only fires on DIRECT cycles +(A→B then proposed B→A). Higher-order cycles (A→B→C→A) are not detected; +those are rare and false-positive risk on a sensitive guard kills more +value than it protects. +""" + +from __future__ import annotations + +import os +from datetime import UTC, datetime, timedelta +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from gradata.brain import Brain + + +# Default lock duration (in sessions) after a cycle is detected. +_DEFAULT_LOCK_SESSIONS = 10 + +# Default lookback window for cycle detection. +_DEFAULT_LOOKBACK_DAYS = 30 +_DEFAULT_LOOKBACK_PATCHES = 5 + + +def _normalize(text: str) -> str: + """Whitespace-collapse for stable comparison.""" + return " ".join((text or "").split()).lower() + + +def _lock_sessions() -> int: + """Read lock duration from env (`GRADATA_OSCILLATION_LOCK_SESSIONS`).""" + raw = os.environ.get("GRADATA_OSCILLATION_LOCK_SESSIONS") + if raw is None: + return _DEFAULT_LOCK_SESSIONS + try: + value = int(raw) + except (TypeError, ValueError): + return _DEFAULT_LOCK_SESSIONS + return max(1, value) + + +def detect_cycle( + brain: Brain, + category: str, + old_description: str, + new_description: str, + lookback_days: int = _DEFAULT_LOOKBACK_DAYS, + lookback_patches: int = _DEFAULT_LOOKBACK_PATCHES, +) -> dict | None: + """Check whether applying the proposed patch would complete an A→B→A cycle. + + Args: + brain: Brain instance — used to query past rule_patch_observed events. + category: Rule category (e.g. "TONE"). Cycles only apply within a category. + old_description: The current rule text being replaced. + new_description: The proposed replacement text. + lookback_days: Maximum age (in days) of prior patches to consider. + lookback_patches: Maximum number of prior patches to consider. + + Returns: + A dict describing the detected cycle: + { + "matched_event_id": str, # the prior patch that wrote AWAY from new_description + "matched_applied_at": str, # ISO timestamp + "cycle_length": int, # how many patches between the prior write-away and now + "lock_sessions": int, # recommended lock duration + "old_text": str, + "new_text": str, + } + Or None if no cycle is detected (safe to apply patch). + """ + proposed_old = _normalize(old_description) + proposed_new = _normalize(new_description) + + # Trivial guard: never patch a rule to itself. + if proposed_old == proposed_new: + return { + "matched_event_id": None, + "matched_applied_at": None, + "cycle_length": 0, + "lock_sessions": _lock_sessions(), + "old_text": old_description, + "new_text": new_description, + "reason": "self_identity", + } + + cat = (category or "").upper() + cutoff = datetime.now(UTC) - timedelta(days=lookback_days) + + try: + events = brain.query_events( + event_type="rule_patch_observed", + limit=200, + ) + except Exception: + return None # Fail open — never block patches on a query failure. + + relevant = [] + for ev in events: + data = ev.get("data", {}) or {} + if (data.get("category") or "").upper() != cat: + continue + applied_at_raw = data.get("applied_at") + if applied_at_raw: + try: + applied_at = datetime.fromisoformat(applied_at_raw.replace("Z", "+00:00")) + except (ValueError, TypeError): + continue + if applied_at < cutoff: + continue + relevant.append(ev) + + # Newest first, capped by lookback_patches. + relevant.sort( + key=lambda e: (e.get("data", {}) or {}).get("applied_at") or "", + reverse=True, + ) + relevant = relevant[:lookback_patches] + + for idx, ev in enumerate(relevant): + data = ev.get("data", {}) or {} + prior_old = _normalize(data.get("old_rule_text", "")) + prior_new = _normalize(data.get("new_rule_text", "")) + + # Direct A→B then proposed B→A cycle: + # prior patch went FROM prior_old TO prior_new (= proposed_old). + # now we want to go FROM proposed_old BACK TO prior_old (= proposed_new). + if prior_new == proposed_old and prior_old == proposed_new: + return { + "matched_event_id": ev.get("id"), + "matched_applied_at": data.get("applied_at"), + "cycle_length": idx + 1, + "lock_sessions": _lock_sessions(), + "old_text": old_description, + "new_text": new_description, + "reason": "direct_cycle", + } + + return None + + +def emit_cycle_detected(brain: Brain, category: str, cycle: dict) -> dict: + """Emit a `rule_patch_cycle_detected` event so the cycle is auditable. + + Returns the emitted event dict, or {} on failure (never raises). + """ + try: + event = brain.emit( + "rule_patch_cycle_detected", + "_oscillation_guard.detect_cycle", + { + "category": (category or "").upper(), + "old_rule_text": (cycle.get("old_text") or "")[:500], + "new_rule_text": (cycle.get("new_text") or "")[:500], + "matched_event_id": cycle.get("matched_event_id"), + "matched_applied_at": cycle.get("matched_applied_at"), + "cycle_length": cycle.get("cycle_length", 0), + "lock_sessions": cycle.get("lock_sessions", _DEFAULT_LOCK_SESSIONS), + "reason": cycle.get("reason", "direct_cycle"), + "detected_at": datetime.now(UTC).isoformat(), + }, + [f"category:{category}", "self_healing", "cycle_detected", "patch_blocked"], + ) + return event if isinstance(event, dict) else {} + except Exception: + return {} diff --git a/Gradata/src/gradata/enhancements/self_improvement/_patches.py b/Gradata/src/gradata/enhancements/self_improvement/_patches.py new file mode 100644 index 00000000..ed7eca38 --- /dev/null +++ b/Gradata/src/gradata/enhancements/self_improvement/_patches.py @@ -0,0 +1,235 @@ +""" +Rule-patch telemetry — tracks behavioral compliance before and after rule rewrites. + +When brain.patch_rule() applies a rewrite, observe_patch() emits a +``rule_patch_observed`` event capturing: + + - old_rule_text / new_rule_text / applied_at + - observed_compliance_before: RULE_FAILURE count for this rule in the + last 3 sessions before the patch + - observed_compliance_after_3_sessions: None at apply time; filled + by resolve_patch_compliance() three sessions later + +The acceptance rate (% of patches where after < before) is the headline +metric for whether self-healing rule rewrites actually change agent behavior. +""" + +from __future__ import annotations + +from datetime import UTC, datetime +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from gradata.brain import Brain + +_LOOKBACK_SESSIONS = 3 +_MAX_RULE_TEXT = 500 +_PATCH_TAG = "patch_telemetry" + + +def _count_failures_for_rule( + brain: Brain, + category: str, + rule_description: str, + lookback_sessions: int = _LOOKBACK_SESSIONS, +) -> int: + """Count RULE_FAILURE events matching this specific rule in the last N sessions.""" + try: + events = brain.query_events( + event_type="RULE_FAILURE", + last_n_sessions=lookback_sessions, + limit=500, + ) + except Exception: + return 0 + + cat = category.upper() + rule_desc = rule_description.strip() + return sum( + 1 + for e in events + if ( + (e.get("data", {}).get("failed_rule_category") or "").upper() == cat + and (e.get("data", {}).get("failed_rule_description") or "").strip() == rule_desc + ) + ) + + +def _get_current_session(brain: Brain) -> int: + """Return the highest session number seen in recent events, or 1.""" + try: + events = brain.query_events(limit=10) + if events: + return max((e.get("session") or 0) for e in events) + except Exception: + pass + return 1 + + +def observe_patch( + brain: Brain, + category: str, + old_description: str, + new_description: str, +) -> dict: + """Emit a rule_patch_observed event capturing pre-patch compliance. + + Called immediately after brain.patch_rule() succeeds. Records the + RULE_FAILURE count for the old rule text over the last 3 sessions so + we can compare against the post-patch count via resolve_patch_compliance(). + + Oscillation guard: if applying this patch would complete an A→B→A + cycle (we're patching back to a text we patched AWAY from recently), + no `rule_patch_observed` event is emitted. Instead we emit a + `rule_patch_cycle_detected` event and return that. Caller can detect + the abort via the event type field. + + Args: + brain: Brain instance — used to query events and emit. + category: Rule category (e.g. "TONE"). + old_description: Rule text being replaced. + new_description: Replacement rule text. + + Returns: + The emitted event dict. May be a `rule_patch_observed` (normal path) + OR a `rule_patch_cycle_detected` (cycle aborted) event. Empty dict + on emit failure. Never raises. + """ + # Cycle check BEFORE recording the patch. If we'd be completing an + # A→B then proposed B→A cycle, bail and emit a cycle-detected event + # so the dashboard can show the lock state and the patcher can skip + # this lesson for N sessions. + from gradata.enhancements.self_improvement._oscillation_guard import ( + detect_cycle, + emit_cycle_detected, + ) + + cycle = detect_cycle(brain, category, old_description, new_description) + if cycle is not None: + return emit_cycle_detected(brain, category, cycle) + + compliance_before = _count_failures_for_rule(brain, category, old_description) + + try: + event = brain.emit( + "rule_patch_observed", + "_patches.observe_patch", + { + "category": category.upper(), + "old_rule_text": old_description[:_MAX_RULE_TEXT], + "new_rule_text": new_description[:_MAX_RULE_TEXT], + "applied_at": datetime.now(UTC).isoformat(), + "observed_compliance_before": compliance_before, + "observed_compliance_after_3_sessions": None, + }, + [f"category:{category}", "self_healing", _PATCH_TAG], + ) + return event if isinstance(event, dict) else {} + except Exception: + return {} + + +def resolve_patch_compliance( + brain: Brain, + min_session_gap: int = _LOOKBACK_SESSIONS, +) -> list[dict]: + """Fill in observed_compliance_after_3_sessions for pending observations. + + Finds rule_patch_observed events where the after value is still None + and the patch was applied at least min_session_gap sessions ago. For + each, measures current RULE_FAILURE count for the new rule text and + emits a new resolved event. + + Returns: + List of resolution receipts — one per observation resolved this call. + """ + try: + pending_events = brain.query_events(event_type="rule_patch_observed", limit=200) + except Exception: + return [] + + current_session = _get_current_session(brain) + updates: list[dict] = [] + + for ev in pending_events: + data = ev.get("data", {}) + if data.get("observed_compliance_after_3_sessions") is not None: + continue + + patch_session = ev.get("session") or 0 + if current_session - patch_session < min_session_gap: + continue + + category = data.get("category", "") + new_rule_text = data.get("new_rule_text", "") + + compliance_after = _count_failures_for_rule(brain, category, new_rule_text) + compliance_before = data.get("observed_compliance_before") or 0 + improved = compliance_after < compliance_before + + try: + updated = brain.emit( + "rule_patch_observed", + "_patches.resolve_patch_compliance", + { + **data, + "observed_compliance_after_3_sessions": compliance_after, + "compliance_improved": improved, + "resolution_session": current_session, + "original_event_id": ev.get("id"), + }, + [f"category:{category}", "self_healing", _PATCH_TAG, "resolved"], + ) + updates.append( + { + "category": category, + "compliance_before": compliance_before, + "compliance_after": compliance_after, + "improved": improved, + "event": updated if isinstance(updated, dict) else {}, + } + ) + except Exception: + continue + + return updates + + +def patch_acceptance_rate(brain: Brain) -> dict: + """Compute the acceptance rate from all resolved rule_patch_observed events. + + "Accepted" = compliance improved (compliance_after < compliance_before). + + Returns: + { + "total_observed": int, + "total_resolved": int, + "accepted": int, + "rejected": int, + "acceptance_rate": float, # 0.0 if no resolved observations + "pending": int, + } + """ + try: + events = brain.query_events(event_type="rule_patch_observed", limit=500) + except Exception: + events = [] + + resolved = [ + e + for e in events + if e.get("data", {}).get("observed_compliance_after_3_sessions") is not None + ] + pending = len(events) - len(resolved) + accepted = sum(1 for e in resolved if e.get("data", {}).get("compliance_improved", False)) + rejected = len(resolved) - accepted + rate = accepted / len(resolved) if resolved else 0.0 + + return { + "total_observed": len(events), + "total_resolved": len(resolved), + "accepted": accepted, + "rejected": rejected, + "acceptance_rate": round(rate, 3), + "pending": pending, + } diff --git a/Gradata/tests/test_oscillation_guard.py b/Gradata/tests/test_oscillation_guard.py new file mode 100644 index 00000000..eb0e77eb --- /dev/null +++ b/Gradata/tests/test_oscillation_guard.py @@ -0,0 +1,231 @@ +"""Tests for the oscillation guard on self-healing rule patches. + +Without these tests guarding the behavior, a poorly-tuned compliance +scorer can ping-pong the same lesson between two phrasings forever. +""" + +from __future__ import annotations + +from datetime import UTC, datetime +from typing import Any + +import pytest + +# _FakeBrain doesn't implement the full Brain protocol but provides the +# subset detect_cycle / observe_patch actually use. Tell pyright to chill. +# pyright: reportArgumentType=false +from gradata.enhancements.self_improvement._oscillation_guard import ( + _normalize, + detect_cycle, + emit_cycle_detected, +) +from gradata.enhancements.self_improvement._patches import observe_patch + + +# --------------------------------------------------------------------------- +# Test fake brain +# --------------------------------------------------------------------------- + + +class _FakeBrain: + """Minimal in-memory brain stand-in for cycle-guard testing. + + Stores emitted events in a list. `query_events` filters by event_type. + """ + + def __init__(self, events: list[dict] | None = None) -> None: + self._events: list[dict] = list(events or []) + self._next_id = len(self._events) + + def query_events( + self, + event_type: str | None = None, + last_n_sessions: int | None = None, + limit: int = 500, + ) -> list[dict]: + out = self._events + if event_type is not None: + out = [e for e in out if e.get("event_type") == event_type] + return out[:limit] + + def emit( + self, + event_type: str, + source: str, + data: dict, + tags: list[str] | None = None, + ) -> dict: + self._next_id += 1 + ev = { + "id": f"ev_{self._next_id}", + "event_type": event_type, + "source": source, + "data": data, + "tags": tags or [], + "session": 1, + } + self._events.append(ev) + return ev + + +def _patch_event(old_text: str, new_text: str, category: str = "TONE", applied_at: str | None = None) -> dict: + return { + "id": f"patch_{old_text[:5]}_{new_text[:5]}", + "event_type": "rule_patch_observed", + "data": { + "category": category, + "old_rule_text": old_text, + "new_rule_text": new_text, + "applied_at": applied_at or datetime.now(UTC).isoformat(), + "observed_compliance_before": 2, + "observed_compliance_after_3_sessions": None, + }, + "session": 1, + } + + +# --------------------------------------------------------------------------- +# detect_cycle() +# --------------------------------------------------------------------------- + + +def test_no_history_returns_none() -> None: + brain = _FakeBrain([]) + result = detect_cycle(brain, "TONE", "A", "B") + assert result is None + + +def test_unrelated_patches_return_none() -> None: + brain = _FakeBrain([ + _patch_event("X", "Y"), + _patch_event("Y", "Z"), + ]) + # Proposing X → W is not a cycle. + result = detect_cycle(brain, "TONE", "X", "W") + assert result is None + + +def test_direct_cycle_detected() -> None: + """A → B was patched recently. Proposing B → A must be flagged.""" + brain = _FakeBrain([_patch_event("A", "B")]) + result = detect_cycle(brain, "TONE", "B", "A") + assert result is not None + assert result["reason"] == "direct_cycle" + assert result["lock_sessions"] > 0 + assert result["old_text"] == "B" + assert result["new_text"] == "A" + + +def test_self_identity_detected() -> None: + """Patching X → X is meaningless; always flag as a cycle.""" + brain = _FakeBrain([]) + result = detect_cycle(brain, "TONE", "X", "X") + assert result is not None + assert result["reason"] == "self_identity" + + +def test_category_isolation() -> None: + """A cycle in TONE must not affect URL-category patches.""" + brain = _FakeBrain([_patch_event("A", "B", category="TONE")]) + # Same texts, different category → not a cycle. + result = detect_cycle(brain, "URL", "B", "A") + assert result is None + + +def test_whitespace_normalization() -> None: + """Whitespace/case differences must not break cycle detection.""" + brain = _FakeBrain([ + _patch_event( + " Don't give prospects a WAY OUT", + "When prospect EXPRESSED interest, close immediately", + ), + ]) + # Same texts with different casing/whitespace must still trip the guard. + result = detect_cycle( + brain, + "TONE", + "when prospect expressed interest, close immediately", + "don't give prospects a way out", + ) + assert result is not None + assert result["reason"] == "direct_cycle" + + +def test_old_patch_outside_lookback() -> None: + """A patch older than the lookback window must NOT trip the guard.""" + very_old = "2020-01-01T00:00:00+00:00" + brain = _FakeBrain([_patch_event("A", "B", applied_at=very_old)]) + result = detect_cycle(brain, "TONE", "B", "A", lookback_days=7) + assert result is None + + +# --------------------------------------------------------------------------- +# emit_cycle_detected() +# --------------------------------------------------------------------------- + + +def test_emit_cycle_detected_records_event() -> None: + brain = _FakeBrain() + cycle = { + "matched_event_id": "patch_X_Y", + "matched_applied_at": datetime.now(UTC).isoformat(), + "cycle_length": 1, + "lock_sessions": 10, + "old_text": "B", + "new_text": "A", + "reason": "direct_cycle", + } + event = emit_cycle_detected(brain, "TONE", cycle) + assert event["event_type"] == "rule_patch_cycle_detected" + assert event["data"]["lock_sessions"] == 10 + assert event["data"]["reason"] == "direct_cycle" + assert event["data"]["category"] == "TONE" + # The brain stored it. + assert any( + e.get("event_type") == "rule_patch_cycle_detected" for e in brain.query_events() + ) + + +# --------------------------------------------------------------------------- +# observe_patch() integration — the customer-visible behavior +# --------------------------------------------------------------------------- + + +def test_observe_patch_normal_path_emits_observed() -> None: + brain = _FakeBrain() + event = observe_patch(brain, "TONE", "A", "B") + assert event["event_type"] == "rule_patch_observed" + + +def test_observe_patch_blocks_direct_cycle() -> None: + """Real-world bug: lesson 911130b3 oscillated A→B→A→B 5×. + + After this fix, the 2nd patch back to A must emit + `rule_patch_cycle_detected` instead of `rule_patch_observed`, so no + fake "100% reduction" gets recorded and the dashboard shows a lock + badge instead of another rollback entry. + """ + brain = _FakeBrain() + # First patch A → B succeeds normally. + first = observe_patch(brain, "TONE", "A", "B") + assert first["event_type"] == "rule_patch_observed" + # Second patch attempts B → A — must be blocked as a direct cycle. + second = observe_patch(brain, "TONE", "B", "A") + assert second["event_type"] == "rule_patch_cycle_detected" + assert second["data"]["old_rule_text"] == "B" + assert second["data"]["new_rule_text"] == "A" + + +def test_observe_patch_allows_genuine_new_text() -> None: + """A → B then C → D is two unrelated patches, not a cycle.""" + brain = _FakeBrain() + first = observe_patch(brain, "TONE", "A", "B") + second = observe_patch(brain, "TONE", "C", "D") + assert first["event_type"] == "rule_patch_observed" + assert second["event_type"] == "rule_patch_observed" + + +def test_normalize_helper() -> None: + assert _normalize(" Hello WORLD ") == "hello world" + assert _normalize("") == "" + assert _normalize(None) == "" # type: ignore[arg-type]