Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions src/l0/drift.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import math
import re
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Literal

Expand Down Expand Up @@ -133,9 +134,9 @@ class DriftConfig:
class _DriftHistory:
"""Internal history tracking for drift detection."""

entropy: list[float] = field(default_factory=list)
tokens: list[str] = field(default_factory=list)
last_content: str = ""
entropy: deque[float] = field(default_factory=lambda: deque(maxlen=50))
tokens: deque[str] = field(default_factory=lambda: deque(maxlen=50))
last_window: str = "" # Store only the window, not full content


class DriftDetector:
Expand Down Expand Up @@ -163,7 +164,10 @@ def __init__(self, config: DriftConfig | None = None) -> None:
config: Detection configuration (uses defaults if not provided)
"""
self.config = config or DriftConfig()
self._history = _DriftHistory()
self._history = _DriftHistory(
entropy=deque(maxlen=self.config.entropy_window),
tokens=deque(maxlen=self.config.entropy_window),
)

def _get_window(self, content: str) -> str:
"""Get sliding window of content for analysis.
Expand Down Expand Up @@ -191,13 +195,11 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:

# Use sliding window for content analysis (O(window_size) instead of O(content_length))
window = self._get_window(content)
last_window = self._get_window(self._history.last_content)
last_window = self._history.last_window

# Update history
# Update history (deque handles maxlen automatically)
if delta:
self._history.tokens.append(delta)
if len(self._history.tokens) > self.config.entropy_window:
self._history.tokens.pop(0)

# Check for meta commentary (on window only)
if self.config.detect_meta_commentary:
Expand All @@ -224,8 +226,6 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:
if self.config.detect_entropy_spike and delta:
entropy = self._calculate_entropy(delta)
self._history.entropy.append(entropy)
if len(self._history.entropy) > self.config.entropy_window:
self._history.entropy.pop(0)

if self._detect_entropy_spike():
types.append("entropy_spike")
Expand All @@ -250,8 +250,8 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:
confidence = max(confidence, 0.5)
details.append("Excessive hedging detected")

# Update last content
self._history.last_content = content
# Update last window (store only the window, not full content)
self._history.last_window = window

return DriftResult(
detected=len(types) > 0,
Expand Down Expand Up @@ -396,14 +396,17 @@ def _detect_excessive_hedging(self, content: str) -> bool:

def reset(self) -> None:
"""Reset detector state."""
self._history = _DriftHistory()
self._history = _DriftHistory(
entropy=deque(maxlen=self.config.entropy_window),
tokens=deque(maxlen=self.config.entropy_window),
)

def get_history(self) -> dict[str, Any]:
"""Get detection history."""
return {
"entropy": self._history.entropy.copy(),
"tokens": self._history.tokens.copy(),
"last_content": self._history.last_content,
"entropy": list(self._history.entropy),
"tokens": list(self._history.tokens),
"last_content": self._history.last_window,
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: get_history()["last_content"] now returns only the sliding window, which silently truncates long outputs behind the existing API name.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/l0/drift.py, line 409:

<comment>`get_history()["last_content"]` now returns only the sliding window, which silently truncates long outputs behind the existing API name.</comment>

<file context>
@@ -396,14 +396,17 @@ def _detect_excessive_hedging(self, content: str) -> bool:
-            "last_content": self._history.last_content,
+            "entropy": list(self._history.entropy),
+            "tokens": list(self._history.tokens),
+            "last_content": self._history.last_window,
         }
 
</file context>
Fix with Cubic

}


Expand Down
46 changes: 34 additions & 12 deletions src/l0/guardrails.py
Original file line number Diff line number Diff line change
Expand Up @@ -1560,19 +1560,23 @@ def json_rule() -> GuardrailRule:
# Incremental state for O(delta) streaming checks
incremental_state = IncrementalJsonState()
last_content_length = 0
is_json_content: bool | None = None # Cache: None=unknown, True/False=determined
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.

def check(state: State) -> list[GuardrailViolation]:
nonlocal incremental_state, last_content_length
nonlocal incremental_state, last_content_length, is_json_content

content = state.content
if not content.strip():
# Reset state when content is empty (new stream starting)
incremental_state = IncrementalJsonState()
last_content_length = 0
is_json_content = None
return []

# Only check if it looks like JSON
if not looks_like_json(content):
# Only check if it looks like JSON (cache after first determination)
if is_json_content is None:
is_json_content = looks_like_json(content)
if not is_json_content:
# Reset state when content doesn't look like JSON
incremental_state = IncrementalJsonState()
last_content_length = 0
Expand Down Expand Up @@ -1745,18 +1749,18 @@ def markdown_rule() -> GuardrailRule:
"""

def check(state: State) -> list[GuardrailViolation]:
# During streaming, markdown is always incomplete — skip expensive analysis
if not state.completed:
return []

content = state.content
if not content.strip():
return []

analysis = analyze_markdown_structure(content)
violations = []

# During streaming, only warn about unclosed fences
if not state.completed:
# This is expected during streaming, don't report
pass
else:
if True:
# On completion, report issues
for issue in analysis.issues:
severity: Severity = "warning"
Expand Down Expand Up @@ -1881,17 +1885,35 @@ def pattern_rule(
for cat_patterns in categories.values():
patterns.extend(cat_patterns)

# Pre-compile all patterns into a single combined regex for O(1) pass
combined = re.compile(
"|".join(f"(?:{p})" for p in patterns),
re.IGNORECASE | re.MULTILINE,
)
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
last_scanned_length = 0

def check(state: State) -> list[GuardrailViolation]:
nonlocal last_scanned_length
content = state.content

# Reset tracking if content was replaced (e.g. new stream)
if len(content) < last_scanned_length:
last_scanned_length = 0

# Only scan new content since last check (delta scanning)
scan_start = max(0, last_scanned_length - 50) # small overlap for boundary matches
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
scan_region = content[scan_start:]
last_scanned_length = len(content)

violations = []
matches = find_bad_patterns(state.content, patterns)
for pattern, match in matches:
for match in combined.finditer(scan_region):
violations.append(
GuardrailViolation(
rule="pattern",
message=f"Matched unwanted pattern: {match.group()}",
severity="warning",
position=match.start(),
context={"pattern": pattern, "matched": match.group()},
position=scan_start + match.start(),
context={"matched": match.group()},
)
)
return violations
Expand Down
118 changes: 66 additions & 52 deletions src/l0/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from .events import EventBus, ObservabilityEventType
from .logging import logger
from .retry import RetryManager
from .state import append_token, create_state, mark_completed, update_checkpoint
from .state import append_token, create_state, flush_content, mark_completed, update_checkpoint
from .types import (
AwaitableStreamFactory,
CheckIntervals,
Expand Down Expand Up @@ -482,6 +482,7 @@ async def run_stream() -> AsyncIterator[Event]:
# Checkpoint invalid - start fresh
logger.debug("Checkpoint validation failed, starting fresh")
state.content = ""
state._content_buffer.clear()
state.token_count = 0
pending_checkpoint = None

Expand Down Expand Up @@ -718,82 +719,94 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]:
state.token_count,
)

# Fire on_event callback for token events
_fire_callback(cb.on_event, event)

# Fire on_token callback
_fire_callback(cb.on_token, token_text)
# Fire per-token callbacks (skip function call overhead when None)
if cb.on_event is not None:
_fire_callback(cb.on_event, event)
if cb.on_token is not None:
_fire_callback(cb.on_token, token_text)

# Check guardrails periodically
if (
state.token_count % guardrail_interval == 0
and guardrails
):
phase_start_time = time.perf_counter()
event_bus.emit(
ObservabilityEventType.GUARDRAIL_PHASE_START,
phase="post",
ruleCount=len(guardrails),
)
flush_content(state)
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Flushing the full content string inside periodic checks makes long streams O(n²) again.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/l0/runtime.py, line 733:

<comment>Flushing the full content string inside periodic checks makes long streams O(n²) again.</comment>

<file context>
@@ -718,82 +719,94 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]:
-                                    phase="post",
-                                    ruleCount=len(guardrails),
-                                )
+                                flush_content(state)
+                                _has_obs = event_bus._handler is not None
 
</file context>
Fix with Cubic

_has_obs = event_bus._handler is not None

all_violations = []
for idx, rule in enumerate(guardrails):
callback_id = _next_callback_id()
rule_start_time = time.perf_counter()
if _has_obs:
phase_start_time = time.perf_counter()
event_bus.emit(
ObservabilityEventType.GUARDRAIL_RULE_START,
index=idx,
ruleId=rule.name,
callbackId=callback_id,
ObservabilityEventType.GUARDRAIL_PHASE_START,
phase="post",
ruleCount=len(guardrails),
)

all_violations = []
for idx, rule in enumerate(guardrails):
if _has_obs:
callback_id = _next_callback_id()
rule_start_time = time.perf_counter()
event_bus.emit(
ObservabilityEventType.GUARDRAIL_RULE_START,
index=idx,
ruleId=rule.name,
callbackId=callback_id,
)

rule_violations = rule.check(state)
passed = len(rule_violations) == 0
rule_duration_ms = int(
(time.perf_counter() - rule_start_time) * 1000
)
# Emit result for each rule
event_bus.emit(
ObservabilityEventType.GUARDRAIL_RULE_RESULT,
index=idx,
ruleId=rule.name,
passed=passed,
violation=rule_violations[0].__dict__
if rule_violations
else None,
)

if _has_obs:
passed = len(rule_violations) == 0
rule_duration_ms = int(
(time.perf_counter() - rule_start_time) * 1000
)
event_bus.emit(
ObservabilityEventType.GUARDRAIL_RULE_RESULT,
index=idx,
ruleId=rule.name,
passed=passed,
violation=rule_violations[0].__dict__
if rule_violations
else None,
)

if rule_violations:
all_violations.extend(rule_violations)
event_bus.emit(
ObservabilityEventType.GUARDRAIL_RULE_END,
index=idx,
ruleId=rule.name,
passed=passed,
callbackId=callback_id,
durationMs=rule_duration_ms,
)

if _has_obs:
event_bus.emit(
ObservabilityEventType.GUARDRAIL_RULE_END,
index=idx,
ruleId=rule.name,
passed=passed,
callbackId=callback_id,
durationMs=rule_duration_ms,
)

if all_violations:
state.violations.extend(all_violations)
# Fire on_violation callback for each violation
for v in all_violations:
_fire_callback(cb.on_violation, v)

phase_duration_ms = int(
(time.perf_counter() - phase_start_time) * 1000
)
event_bus.emit(
ObservabilityEventType.GUARDRAIL_PHASE_END,
phase="post",
passed=len(all_violations) == 0,
violations=[v.__dict__ for v in all_violations],
durationMs=phase_duration_ms,
)
if _has_obs:
phase_duration_ms = int(
(time.perf_counter() - phase_start_time) * 1000
)
event_bus.emit(
ObservabilityEventType.GUARDRAIL_PHASE_END,
phase="post",
passed=len(all_violations) == 0,
violations=[v.__dict__ for v in all_violations],
durationMs=phase_duration_ms,
)

# Check drift periodically
if (
drift_detector is not None
and state.token_count % drift_interval == 0
):
flush_content(state)
drift_result = drift_detector.check(
state.content, token_text
)
Expand Down Expand Up @@ -1189,6 +1202,7 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]:
else:
# Reset state for fresh retry (no continuation)
state.content = ""
state._content_buffer.clear()
state.token_count = 0
state.checkpoint = ""
state.completed = False
Expand Down
14 changes: 12 additions & 2 deletions src/l0/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,33 @@ def create_state() -> State:
return State()


def flush_content(state: State) -> None:
"""Materialize content buffer into content string. Call before reading state.content."""
buf = state._content_buffer
if buf:
state.content = state.content + "".join(buf)
buf.clear()


def update_checkpoint(state: State) -> None:
"""Save current content as checkpoint."""
flush_content(state)
state.checkpoint = state.content


def append_token(state: State, token: str) -> None:
"""Append token to content and update timing."""
"""Append token to content buffer and update timing."""
now = time.time()
if state.first_token_at is None:
state.first_token_at = now
state.last_token_at = now
state.content += token
state._content_buffer.append(token)
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Buffering tokens here leaves state.content stale on abort paths, so post-abort reads and abort telemetry can drop the most recent tokens.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/l0/state.py, line 35:

<comment>Buffering tokens here leaves `state.content` stale on abort paths, so post-abort reads and abort telemetry can drop the most recent tokens.</comment>

<file context>
@@ -12,23 +12,33 @@ def create_state() -> State:
         state.first_token_at = now
     state.last_token_at = now
-    state.content += token
+    state._content_buffer.append(token)
     state.token_count += 1
 
</file context>
Fix with Cubic

state.token_count += 1


def mark_completed(state: State) -> None:
"""Mark stream as completed and calculate duration."""
flush_content(state)
state.completed = True
if state.first_token_at is not None:
state.duration = (state.last_token_at or time.time()) - state.first_token_at
1 change: 1 addition & 0 deletions src/l0/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ class State:
"""Runtime state tracking."""

content: str = ""
_content_buffer: list[str] = field(default_factory=list, init=False, repr=False, compare=False)
checkpoint: str = "" # Last known good slice for continuation
token_count: int = 0
model_retry_count: int = 0
Expand Down
Loading