diff --git a/agent_actions/llm/batch/services/processing.py b/agent_actions/llm/batch/services/processing.py index c41f7437..15f6e4d4 100644 --- a/agent_actions/llm/batch/services/processing.py +++ b/agent_actions/llm/batch/services/processing.py @@ -379,6 +379,8 @@ def _merge_carry_forward( len(carry_records), action_name, ) + for record in carry_records: + record["_delta_mode"] = "full" try: carry_path.unlink() diff --git a/agent_actions/output/writer.py b/agent_actions/output/writer.py index 7859d7e3..d5f5b6e8 100644 --- a/agent_actions/output/writer.py +++ b/agent_actions/output/writer.py @@ -132,8 +132,16 @@ def do_write() -> int: self.storage_backend.write_target(self.action_name, relative_path, data) + # Strip _delta_mode before disk write — it's a storage-layer + # internal that must not leak to filesystem files. + clean_data = [ + {k: v for k, v in record.items() if k != "_delta_mode"} + if isinstance(record, dict) and "_delta_mode" in record + else record + for record in data + ] ensure_directory_exists(file_path, is_file=True) - atomic_json_write(file_path, data) + atomic_json_write(file_path, clean_data) return file_path.stat().st_size diff --git a/agent_actions/processing/enrichment.py b/agent_actions/processing/enrichment.py index c67e183f..ed7aed5a 100644 --- a/agent_actions/processing/enrichment.py +++ b/agent_actions/processing/enrichment.py @@ -72,6 +72,8 @@ def enrich(self, result: ProcessingResult, context: ProcessingContext) -> Proces item["source_guid"] = IDGenerator.generate_source_guid() if old_source_guid: item["parent_source_guid"] = old_source_guid + # New GUIDs have no upstream deltas — store as full + item["_delta_mode"] = "full" if ( result.source_mapping is not None diff --git a/agent_actions/record/envelope.py b/agent_actions/record/envelope.py index 8eb9ca46..59deba78 100644 --- a/agent_actions/record/envelope.py +++ b/agent_actions/record/envelope.py @@ -37,6 +37,7 @@ { "_state_history", "_state_schema_version", + "_delta_mode", } ) diff --git a/agent_actions/storage/backend.py b/agent_actions/storage/backend.py index 87e69c99..79a3ef4f 100644 --- a/agent_actions/storage/backend.py +++ b/agent_actions/storage/backend.py @@ -1,5 +1,8 @@ """Abstract storage backend interface for extensible data persistence.""" +import copy +import json +import logging from abc import ABC, abstractmethod from enum import Enum from types import TracebackType @@ -8,6 +11,8 @@ from agent_actions.config.defaults import StorageDefaults from agent_actions.record.lifecycle_read import reset_for_downstream, validate_lifecycle_batch +logger = logging.getLogger(__name__) + _MAINTENANCE_RETENTION_DEFAULT = StorageDefaults.PROMPT_TRACE_RETENTION_RUNS _MAINTENANCE_TTL_DEFAULT = StorageDefaults.SOURCE_DATA_TTL_DAYS @@ -57,7 +62,23 @@ class Disposition(str, Enum): class StorageBackend(ABC): - """Abstract interface for pluggable storage backends (SQLite, S3, DuckDB, etc.).""" + """Abstract interface for pluggable storage backends (SQLite, S3, DuckDB, etc.). + + Delta storage: write_target() extracts only the current action's content + namespace before delegating to _write_target_raw(). read_target() reconstructs + the full accumulated record from upstream deltas. This is transparent to all + consumers — they see the exact same list[dict] as before. + """ + + _STORAGE_FORMAT_VERSION = 2 # Version 1 = full records, Version 2 = delta storage + + def __init__(self) -> None: + """Initialize base storage backend state.""" + self._reconstruction_cache: dict[tuple[str, str], list[dict[str, Any]]] = {} + self._execution_order_cache: list[str] | None = None + self._dependency_graph_cache: dict[str, list[str]] | None = None + self._format_version_written = False + self._format_version_checked = False @classmethod @abstractmethod @@ -79,31 +100,298 @@ def initialize(self) -> None: """Create tables, indexes, and other infrastructure required by the backend.""" ... - @abstractmethod - def write_target(self, action_name: str, relative_path: str, data: list[dict[str, Any]]) -> str: - """Write target data for a specific node.""" - ... + def write_target( + self, + action_name: str, + relative_path: str, + data: list[dict[str, Any]], + *, + is_first_action: bool | None = None, + force_full: bool = False, + ) -> str: + """Write target data with delta extraction.""" + if force_full: + delta_records = [{**r, "_delta_mode": "full"} for r in data] + else: + if is_first_action is None: + execution_order = self._get_execution_order() + is_first_action = bool(execution_order) and execution_order[0] == action_name + + delta_records = [] + for record in data: + if record.get("_delta_mode") == "full": + delta_records.append(record) + else: + delta_records.append( + self._extract_delta(record, action_name, is_first_action=is_first_action) + ) + + if not self._format_version_written: + self.save_metadata("storage_format_version", str(self._STORAGE_FORMAT_VERSION)) + self._format_version_written = True + + self._reconstruction_cache.clear() + return self._write_target_raw(action_name, relative_path, delta_records) def read_target(self, action_name: str, relative_path: str) -> list[dict[str, Any]]: - """Read target data, validate lifecycle fields, and reset for downstream. - - Subclasses implement _read_target_raw(). This method adds lifecycle - validation and executor boundary reset so every backend gets them - automatically. + """Read target data, reconstruct from deltas, validate lifecycle, reset for downstream. Raises: FileNotFoundError: If the target data doesn't exist. """ + if not self._format_version_checked: + stored_version = self.load_metadata("storage_format_version") + if stored_version is not None: + try: + version_int = int(stored_version) + except ValueError: + from agent_actions.errors.configuration import ConfigValidationError + + raise ConfigValidationError( + f"Corrupt storage_format_version in workflow_metadata: {stored_version!r}. " + f"Expected an integer. Re-run with --fresh to reset.", + context={"stored_version": stored_version}, + ) from None + if version_int > self._STORAGE_FORMAT_VERSION: + from agent_actions.errors.configuration import ConfigValidationError + + raise ConfigValidationError( + f"Database uses storage format version {version_int}, " + f"but this code supports up to version {self._STORAGE_FORMAT_VERSION}. " + f"Please upgrade agent-actions.", + context={ + "stored_version": version_int, + "supported": self._STORAGE_FORMAT_VERSION, + }, + ) + self._format_version_checked = True + + cache_key = (action_name, relative_path) + if cache_key in self._reconstruction_cache: + return copy.deepcopy(self._reconstruction_cache[cache_key]) + result = self._read_target_raw(action_name, relative_path) + result = self._reconstruct_from_deltas(action_name, relative_path, result) validate_lifecycle_batch(result, action_name=action_name) reset_for_downstream(result, action_name=action_name) - return result + self._reconstruction_cache[cache_key] = result + return copy.deepcopy(result) + + @abstractmethod + def _write_target_raw( + self, action_name: str, relative_path: str, data: list[dict[str, Any]] + ) -> str: + """Store delta-extracted records to the backend.""" + ... @abstractmethod def _read_target_raw(self, action_name: str, relative_path: str) -> list[dict[str, Any]]: - """Read raw target data from storage. Subclasses implement this.""" + """Read raw target data from storage.""" + ... + + def _read_target_raw_batch( + self, action_names: list[str], relative_path: str + ) -> dict[str, list[dict[str, Any]]]: + """Fetch target data for multiple actions. Override for batched queries.""" + result: dict[str, list[dict[str, Any]]] = {} + for action in action_names: + try: + result[action] = self._read_target_raw(action, relative_path) + except FileNotFoundError: + pass + return result + + def save_metadata(self, key: str, value: str) -> None: + """Store a metadata key-value pair. Clears related caches.""" + self._save_metadata_raw(key, value) + if key == "execution_order": + self._execution_order_cache = None + elif key == "dependency_graph": + self._dependency_graph_cache = None + + @abstractmethod + def _save_metadata_raw(self, key: str, value: str) -> None: + """Store a metadata key-value pair to the backend.""" ... + @abstractmethod + def load_metadata(self, key: str) -> str | None: + """Load a metadata value by key. Returns None if not found.""" + ... + + def _extract_delta( + self, record: dict[str, Any], action_name: str, *, is_first_action: bool = False + ) -> dict[str, Any]: + """Extract delta: preserve entire envelope, strip content to this action's namespace.""" + content = record.get("content") + if not isinstance(content, dict): + return {**record, "_delta_mode": "full"} + + if not record.get("source_guid"): + return {**record, "_delta_mode": "full"} + + if action_name not in content: + return {**record, "_delta_mode": "full"} + + if is_first_action: + delta_content: dict[str, Any] = {} + if "source" in content: + delta_content["source"] = content["source"] + delta_content[action_name] = content[action_name] + mode = "first" + else: + delta_content = {action_name: content[action_name]} + mode = "delta" + + delta = {k: v for k, v in record.items() if k != "content"} + delta["content"] = delta_content + delta["_delta_mode"] = mode + return delta + + def _reconstruct_from_deltas( + self, + action_name: str, + relative_path: str, + delta_records: list[dict[str, Any]], + ) -> list[dict[str, Any]]: + """Reconstruct full records by joining upstream deltas.""" + # All records in a batch have the same _delta_mode (set uniformly by + # write_target). Checking records[0] is sufficient — mixed batches + # cannot occur through the write_target API. + if not delta_records or "_delta_mode" not in delta_records[0]: + return delta_records + + upstream_actions = self._get_upstream_actions(action_name) + + if not upstream_actions: + return [{k: v for k, v in r.items() if k != "_delta_mode"} for r in delta_records] + + upstream_data = self._read_target_raw_batch(upstream_actions, relative_path) + + # Index upstream deltas by (action_name, source_guid). + # Track which guids have a "full" record — those are self-contained + # and we don't need to look further upstream for that guid. + upstream: dict[str, dict[str, dict[str, Any]]] = {} + full_boundary_guids: dict[str, str] = {} # guid → action that has full record + for act, records in upstream_data.items(): + guid_map: dict[str, dict[str, Any]] = {} + for rec in records: + guid = rec.get("source_guid") + if guid: + rec_content = rec.get("content") + if rec_content is None: + logger.warning("Upstream record %s in '%s' has no content.", guid, act) + rec_content = {} + guid_map[guid] = rec_content + if rec.get("_delta_mode") == "full": + full_boundary_guids[guid] = act + upstream[act] = guid_map + + # For each current record, determine how far back to reconstruct. + # If a "full" upstream record exists for this guid, only merge from + # that point forward — the full record already contains everything + # before it (expansion records embed upstream content). + reconstructed: list[dict[str, Any]] = [] + for record in delta_records: + mode = record.get("_delta_mode") + if mode in ("first", "full") or mode is None: + clean = {k: v for k, v in record.items() if k != "_delta_mode"} + reconstructed.append(clean) + continue + + guid = record.get("source_guid") + + # Find the boundary: if this guid has a full record upstream, + # start merging from that action (inclusive), not from the beginning. + boundary_action = full_boundary_guids.get(guid) if guid else None + if boundary_action and boundary_action in upstream_actions: + boundary_idx = upstream_actions.index(boundary_action) + merge_actions = upstream_actions[boundary_idx:] + else: + merge_actions = upstream_actions + + full_content: dict[str, Any] = {} + for act in merge_actions: + act_deltas = upstream.get(act, {}) + if not act_deltas: + continue + delta_content = act_deltas.get(guid) if guid else None + if delta_content is None: + # Upstream has data for this file but not this guid. + # Could be routing/partitioning (action processes a + # subset of guids) — log at debug, not warning. + logger.debug( + "Record %s not found in upstream '%s' — may be partitioned.", + guid, + act, + ) + else: + full_content.update(delta_content) + + current_content = record.get("content") + if current_content is not None: + full_content.update(current_content) + else: + logger.error("Delta record %s in '%s' has no content key.", guid, action_name) + + full_record = {k: v for k, v in record.items() if k != "content" and k != "_delta_mode"} + full_record["content"] = full_content + + reconstructed.append(full_record) + + return reconstructed + + def _get_upstream_actions(self, action_name: str) -> list[str]: + """Get the transitive upstream actions for a given action. + + Uses the dependency graph if available (correct for DAGs with parallel actions). + Falls back to execution_order[:idx] if no graph stored (legacy). + """ + # Try dependency graph first (correct for parallel pipelines) + if self._dependency_graph_cache is None: + raw = self.load_metadata("dependency_graph") + if raw is not None: + try: + self._dependency_graph_cache = json.loads(raw) + except json.JSONDecodeError: + logger.warning( + "Corrupt dependency_graph in metadata — reconstruction will use flat fallback." + ) + self._dependency_graph_cache = {} + else: + self._dependency_graph_cache = {} + + if self._dependency_graph_cache and action_name in self._dependency_graph_cache: + return self._dependency_graph_cache[action_name] + + if self._dependency_graph_cache and action_name not in self._dependency_graph_cache: + logger.warning( + "Action '%s' not found in dependency graph — using flat execution order fallback. " + "This may include parallel peers as upstream.", + action_name, + ) + + execution_order = self._get_execution_order() + try: + idx = execution_order.index(action_name) + except ValueError: + return [] + return execution_order[:idx] + + def _get_execution_order(self) -> list[str]: + """Get the workflow execution order from metadata. Cached per instance.""" + if self._execution_order_cache is not None: + return self._execution_order_cache + raw = self.load_metadata("execution_order") + if raw is None: + return [] + try: + self._execution_order_cache = json.loads(raw) + except json.JSONDecodeError: + logger.warning("Corrupt execution_order in metadata — delta reconstruction disabled.") + return [] + return self._execution_order_cache + @abstractmethod def write_source( self, @@ -355,7 +643,9 @@ def perform_maintenance( # noqa: B027 def close(self) -> None: # noqa: B027 """Close the storage backend and release resources.""" - pass + self._reconstruction_cache.clear() + self._execution_order_cache = None + self._dependency_graph_cache = None def __enter__(self) -> "StorageBackend": """Context manager entry.""" diff --git a/agent_actions/storage/backends/sqlite_backend.py b/agent_actions/storage/backends/sqlite_backend.py index a3b64c19..43c5992e 100644 --- a/agent_actions/storage/backends/sqlite_backend.py +++ b/agent_actions/storage/backends/sqlite_backend.py @@ -168,8 +168,17 @@ class SQLiteBackend(StorageBackend): # Restrictive as defense-in-depth; all SQL is parameterized. _VALID_IDENTIFIER_CHARS = set(string.ascii_letters + string.digits + "_-./ ") + METADATA_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS workflow_metadata ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP + ) + """ + def __init__(self, db_path: str, workflow_name: str): """Initialize SQLite backend.""" + super().__init__() self.db_path = Path(db_path) self.workflow_name = workflow_name self._connection: sqlite3.Connection | None = None @@ -257,6 +266,7 @@ def initialize(self) -> None: cursor.execute(self.TRACE_INDEX_ACTION_RECORD_SQL) cursor.execute(self.CHECKPOINT_TABLE_SQL) cursor.execute(self.CHECKPOINT_INDEX_SQL) + cursor.execute(self.METADATA_TABLE_SQL) self.connection.commit() logger.info( "Initialized SQLite storage backend: %s", @@ -312,8 +322,10 @@ def _enforce_schema(self, cursor: sqlite3.Cursor) -> None: columns_to_add, ) - def write_target(self, action_name: str, relative_path: str, data: list[dict[str, Any]]) -> str: - """Write target data for a specific node.""" + def _write_target_raw( + self, action_name: str, relative_path: str, data: list[dict[str, Any]] + ) -> str: + """Store records to SQLite.""" action_name = self._validate_identifier(action_name, "action_name") relative_path = self._validate_identifier(relative_path, "relative_path") @@ -375,6 +387,48 @@ def _read_target_raw(self, action_name: str, relative_path: str) -> list[dict[st result: list[dict[str, Any]] = json.loads(row["data"]) return result + def _read_target_raw_batch( + self, action_names: list[str], relative_path: str + ) -> dict[str, list[dict[str, Any]]]: + """Fetch raw target data for multiple actions in one batched query.""" + if not action_names: + return {} + placeholders = ",".join("?" for _ in action_names) + with self._lock: + cursor = self.connection.cursor() + cursor.execute( + f"SELECT action_name, data FROM target_data " + f"WHERE action_name IN ({placeholders}) AND relative_path = ?", + (*action_names, relative_path), + ) + rows = cursor.fetchall() + + result: dict[str, list[dict[str, Any]]] = {} + for row in rows: + result[row["action_name"]] = json.loads(row["data"]) + return result + + def _save_metadata_raw(self, key: str, value: str) -> None: + """Store a metadata key-value pair. Latest value wins (INSERT OR REPLACE).""" + with self._lock: + self.connection.execute( + "INSERT OR REPLACE INTO workflow_metadata (key, value, updated_at) " + "VALUES (?, ?, CURRENT_TIMESTAMP)", + (key, value), + ) + self.connection.commit() + + def load_metadata(self, key: str) -> str | None: + """Load a metadata value by key. Returns None if not found.""" + with self._lock: + cursor = self.connection.cursor() + cursor.execute( + "SELECT value FROM workflow_metadata WHERE key = ?", + (key,), + ) + row = cursor.fetchone() + return row["value"] if row else None + def write_source( self, relative_path: str, @@ -549,6 +603,8 @@ def preview_target( continue records = json.loads(data_row["data"]) + if records and isinstance(records[0], dict) and "_delta_mode" in records[0]: + records = self._reconstruct_from_deltas(action_name, file_path, records) for record in records: if skipped < offset: skipped += 1 diff --git a/agent_actions/workflow/coordinator.py b/agent_actions/workflow/coordinator.py index 27cb5528..079f02f7 100644 --- a/agent_actions/workflow/coordinator.py +++ b/agent_actions/workflow/coordinator.py @@ -3,6 +3,7 @@ from __future__ import annotations import hashlib +import json import logging from datetime import datetime from typing import TYPE_CHECKING @@ -414,6 +415,20 @@ def _initialize_event_context(self) -> None: # ── Execution ─────────────────────────────────────────────────────── + def _persist_execution_metadata(self, levels: list[list[str]]) -> None: + """Store execution order and dependency graph in workflow metadata.""" + backend = getattr(self, "storage_backend", None) + if backend is None: + return + backend.save_metadata("execution_order", json.dumps(self.execution_order)) + prior_actions: list[str] = [] + dep_graph: dict[str, list[str]] = {} + for level_actions in levels: + for action in level_actions: + dep_graph[action] = list(prior_actions) + prior_actions.extend(level_actions) + backend.save_metadata("dependency_graph", json.dumps(dep_graph)) + async def async_run(self, concurrency_limit: int = 5): """Execute workflow level-by-level with parallelism within each level.""" self._initialize_event_context() @@ -425,6 +440,7 @@ async def async_run(self, concurrency_limit: int = 5): with manager.context(): try: levels = self.services.core.action_level_orchestrator.compute_execution_levels() + self._persist_execution_metadata(levels) self.services.core.action_level_orchestrator.log_execution_levels( levels, self.action_indices ) @@ -500,6 +516,7 @@ def _run_workflow_with_context(self, workflow_start): try: total_actions = len(self.execution_order) levels = self.services.core.action_level_orchestrator.compute_execution_levels() + self._persist_execution_metadata(levels) self.services.core.action_level_orchestrator.log_execution_levels( levels, self.action_indices ) diff --git a/agent_actions/workflow/managers/loop.py b/agent_actions/workflow/managers/loop.py index 64db70ad..5495e924 100644 --- a/agent_actions/workflow/managers/loop.py +++ b/agent_actions/workflow/managers/loop.py @@ -385,6 +385,8 @@ def _write_correlated_data( if self.storage_backend is not None and action_name: try: + for record in cleaned_data: + record["_delta_mode"] = "full" self.storage_backend.write_target(action_name, filename, cleaned_data) logger.debug( "Wrote %d correlated records to storage backend for %s/%s", diff --git a/tests/unit/storage/test_delta_storage.py b/tests/unit/storage/test_delta_storage.py new file mode 100644 index 00000000..87e2decf --- /dev/null +++ b/tests/unit/storage/test_delta_storage.py @@ -0,0 +1,1478 @@ +"""Tests for delta storage — extraction, reconstruction, and edge cases. + +Verifies that write_target stores only the current action's content namespace +and read_target reconstructs the full accumulated record transparently. +""" + +import json + +import pytest + +from agent_actions.storage.backends.sqlite_backend import SQLiteBackend + + +def _make_backend(tmp_path): + """Create a backend with workflow_metadata support.""" + db_path = tmp_path / "agent_io" / "test.db" + backend = SQLiteBackend(str(db_path), "test_workflow") + backend.initialize() + return backend + + +def _set_execution_order(backend, actions): + """Store execution order in metadata.""" + backend.save_metadata("execution_order", json.dumps(actions)) + + +class TestDeltaExtraction: + """write_target extracts deltas — stores only the current action's namespace.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["action_1", "action_2", "action_3"]) + yield b + b.close() + + def test_first_action_preserves_source(self, backend): + """First action delta has {source, action_1} — source is preserved.""" + data = [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL Intro"}, + "action_1": {"question": "What is SQL?"}, + }, + } + ] + backend.write_target("action_1", "file.json", data, is_first_action=True) + + # Check raw DB — should have source + action_1 + raw = backend._read_target_raw("action_1", "file.json") + assert "source" in raw[0]["content"] + assert "action_1" in raw[0]["content"] + assert raw[0]["_delta_mode"] == "first" + + def test_subsequent_action_strips_to_delta(self, backend): + """Non-first action stores only its own namespace.""" + data = [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL Intro"}, + "action_1": {"question": "What is SQL?"}, + "action_2": {"difficulty": "easy"}, + }, + } + ] + backend.write_target("action_2", "file.json", data) + + raw = backend._read_target_raw("action_2", "file.json") + assert raw[0]["content"] == {"action_2": {"difficulty": "easy"}} + assert raw[0]["_delta_mode"] == "delta" + # source and action_1 are NOT stored — they're in upstream deltas + assert "source" not in raw[0]["content"] + assert "action_1" not in raw[0]["content"] + + def test_full_mode_preserves_all_content(self, backend): + """Records tagged _delta_mode=full are stored as-is.""" + data = [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "_delta_mode": "full", + "content": { + "source": {"title": "SQL Intro"}, + "action_1": {"question": "What is SQL?"}, + "action_2": {"difficulty": "easy"}, + }, + } + ] + backend.write_target("action_2", "file.json", data) + + raw = backend._read_target_raw("action_2", "file.json") + assert len(raw[0]["content"]) == 3 + assert raw[0]["_delta_mode"] == "full" + + def test_record_without_content_stored_as_full(self, backend): + """Records without a content dict are stored as full (test data, raw records).""" + data = [{"id": 1, "value": "hello"}] + backend.write_target("action_1", "file.json", data, is_first_action=True) + + raw = backend._read_target_raw("action_1", "file.json") + assert raw[0]["_delta_mode"] == "full" + assert raw[0]["id"] == 1 + + def test_is_first_action_auto_detect(self, backend): + """write_target with is_first_action=None detects from execution_order.""" + data = [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "action_1": {"q": "hi"}, + }, + } + ] + # Don't pass is_first_action — should auto-detect action_1 as first + backend.write_target("action_1", "file.json", data) + + raw = backend._read_target_raw("action_1", "file.json") + assert raw[0]["_delta_mode"] == "first" + assert "source" in raw[0]["content"] + + +class TestDeltaReconstruction: + """read_target reconstructs full records from upstream deltas.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["action_1", "action_2", "action_3"]) + yield b + b.close() + + def _write_pipeline(self, backend): + """Write a 3-action pipeline as deltas.""" + backend.write_target( + "action_1", + "file.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "action_1": {"question": "What is SQL?"}, + }, + } + ], + is_first_action=True, + ) + backend.write_target( + "action_2", + "file.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "action_1": {"question": "What is SQL?"}, + "action_2": {"difficulty": "easy"}, + }, + } + ], + ) + backend.write_target( + "action_3", + "file.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "action_1": {"question": "What is SQL?"}, + "action_2": {"difficulty": "easy"}, + "action_3": {"draft": "SQL is..."}, + }, + } + ], + ) + + def test_reconstruction_produces_full_record(self, backend): + """read_target for final action returns full record with all upstream namespaces.""" + self._write_pipeline(backend) + + result = backend.read_target("action_3", "file.json") + content_keys = sorted(result[0]["content"].keys()) + assert content_keys == ["action_1", "action_2", "action_3", "source"] + assert result[0]["content"]["source"]["title"] == "SQL" + assert result[0]["content"]["action_1"]["question"] == "What is SQL?" + assert result[0]["content"]["action_2"]["difficulty"] == "easy" + assert result[0]["content"]["action_3"]["draft"] == "SQL is..." + + def test_delta_mode_stripped_from_all_modes(self, backend): + """_delta_mode never leaks to consumers — stripped from delta, first, and full.""" + self._write_pipeline(backend) + + for action in ["action_1", "action_2", "action_3"]: + result = backend.read_target(action, "file.json") + assert "_delta_mode" not in result[0], f"_delta_mode leaked in {action}" + + def test_reconstruction_cache_invalidated_on_write(self, backend): + """Cache is cleared after write_target — subsequent read returns fresh data.""" + self._write_pipeline(backend) + + # First read — populates cache + result1 = backend.read_target("action_3", "file.json") + assert result1[0]["content"]["action_3"]["draft"] == "SQL is..." + + # Write new data for action_3 + backend.write_target( + "action_3", + "file.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "action_1": {"question": "What is SQL?"}, + "action_2": {"difficulty": "easy"}, + "action_3": {"draft": "UPDATED"}, + }, + } + ], + ) + + # Second read — must return updated data, not cached + result2 = backend.read_target("action_3", "file.json") + assert result2[0]["content"]["action_3"]["draft"] == "UPDATED" + + def test_missing_upstream_guid_not_flagged_as_incomplete(self, backend): + """Upstream with data for other guids but not this one — partitioned, not incomplete.""" + # Write action_1 with a DIFFERENT guid (partitioned pipeline) + backend.write_target( + "action_1", + "file.json", + [ + { + "source_guid": "other_guid", + "_state": "processed", + "_state_schema_version": 1, + "content": {"source": {"title": "Other"}, "action_1": {"q": "Other"}}, + } + ], + is_first_action=True, + ) + # Write action_3 with guid g1 — action_1 has data but NOT for g1 + backend.write_target( + "action_3", + "file.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "action_1": {"question": "What is SQL?"}, + "action_2": {"difficulty": "easy"}, + "action_3": {"draft": "SQL is..."}, + }, + } + ], + ) + + result = backend.read_target("action_3", "file.json") + # Partitioned: upstream has other guids, not g1 — not flagged + assert "_reconstruction_incomplete" not in result[0] + assert "action_3" in result[0]["content"] + + +class TestGuardSkipTombstone: + """Guard-skip tombstones (content[action] = None) work correctly with delta storage.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["action_1", "action_2", "action_3"]) + yield b + b.close() + + def test_tombstone_stored_as_delta(self, backend): + """Guard-skip tombstone stores {action_name: None} as delta.""" + # Write action_1 first + backend.write_target( + "action_1", + "file.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "action_1": {"question": "What is SQL?"}, + }, + } + ], + is_first_action=True, + ) + + # Write action_2 tombstone (guard-skipped) + backend.write_target( + "action_2", + "file.json", + [ + { + "source_guid": "g1", + "_state": "guard_skipped", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "action_1": {"question": "What is SQL?"}, + "action_2": None, + }, + } + ], + ) + + # Raw DB should have only the null marker + raw = backend._read_target_raw("action_2", "file.json") + assert raw[0]["content"] == {"action_2": None} + assert raw[0]["_delta_mode"] == "delta" + + def test_tombstone_reconstruction_includes_upstream(self, backend): + """Reconstructed tombstone has all upstream content + null marker.""" + # Write action_1 + backend.write_target( + "action_1", + "file.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "action_1": {"question": "What is SQL?"}, + }, + } + ], + is_first_action=True, + ) + + # Write guard-skip tombstone + backend.write_target( + "action_2", + "file.json", + [ + { + "source_guid": "g1", + "_state": "guard_skipped", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "action_1": {"question": "What is SQL?"}, + "action_2": None, + }, + } + ], + ) + + # Reconstruct — should have upstream + null marker + result = backend.read_target("action_2", "file.json") + assert result[0]["content"]["source"]["title"] == "SQL" + assert result[0]["content"]["action_1"]["question"] == "What is SQL?" + assert result[0]["content"]["action_2"] is None + + +class TestBackwardCompatibility: + """Legacy records (no _delta_mode) work unchanged.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + yield b + b.close() + + def test_legacy_records_returned_as_is(self, backend): + """Records without _delta_mode are returned without reconstruction.""" + import sqlite3 + + # Insert a legacy full record directly (no _delta_mode) + legacy_data = [ + { + "_state": "active", + "source_guid": "g1", + "content": { + "source": {"title": "SQL"}, + "action_1": {"question": "What is SQL?"}, + }, + } + ] + conn = sqlite3.connect(str(backend.db_path)) + conn.execute( + "INSERT INTO target_data (action_name, relative_path, data, record_count) " + "VALUES (?, ?, ?, ?)", + ("action_1", "file.json", json.dumps(legacy_data), 1), + ) + conn.commit() + conn.close() + + result = backend.read_target("action_1", "file.json") + assert result[0]["content"]["source"]["title"] == "SQL" + assert result[0]["content"]["action_1"]["question"] == "What is SQL?" + assert "_delta_mode" not in result[0] + + +class TestMetadata: + """save_metadata / load_metadata round-trip.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + yield b + b.close() + + def test_save_load_round_trip(self, backend): + """Metadata key-value round-trip works.""" + backend.save_metadata("my_key", "my_value") + assert backend.load_metadata("my_key") == "my_value" + + def test_load_missing_returns_none(self, backend): + """Missing key returns None, not error.""" + assert backend.load_metadata("nonexistent") is None + + def test_save_overwrites(self, backend): + """Second save to same key overwrites.""" + backend.save_metadata("key", "v1") + backend.save_metadata("key", "v2") + assert backend.load_metadata("key") == "v2" + + def test_execution_order_stored_and_retrieved(self, backend): + """Execution order survives save/load cycle.""" + order = ["a1", "a2", "a3"] + backend.save_metadata("execution_order", json.dumps(order)) + loaded = json.loads(backend.load_metadata("execution_order")) + assert loaded == order + + +class TestFormatVersionCheck: + """Format version prevents silent corruption on downgrade.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["a1"]) + yield b + b.close() + + def test_format_version_stored_on_first_write(self, backend): + """First write_target stores format version in metadata.""" + backend.write_target( + "a1", + "f.json", + [{"_state": "active", "content": {"a1": {"x": 1}}}], + is_first_action=True, + ) + ver = backend.load_metadata("storage_format_version") + assert ver == "2" + + def test_old_code_rejects_delta_db(self, backend, tmp_path): + """Code with lower version raises ConfigurationError on delta DB.""" + from agent_actions.errors.configuration import ConfigValidationError + + # Simulate a future version + backend.save_metadata("storage_format_version", "99") + + with pytest.raises(ConfigValidationError, match="storage format version 99"): + backend.read_target("a1", "f.json") + + def test_corrupt_version_raises(self, backend): + """Non-integer version raises ConfigurationError.""" + from agent_actions.errors.configuration import ConfigValidationError + + backend.save_metadata("storage_format_version", "not_a_number") + + with pytest.raises(ConfigValidationError, match="Corrupt"): + backend.read_target("a1", "f.json") + + +class TestPreviewReconstruction: + """preview_target reconstructs deltas before pagination.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["action_1", "action_2"]) + yield b + b.close() + + def test_preview_shows_full_content(self, backend): + """preview_target shows reconstructed content, not raw deltas.""" + backend.write_target( + "action_1", + "file.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": {"source": {"title": "SQL"}, "action_1": {"q": "hi"}}, + } + ], + is_first_action=True, + ) + backend.write_target( + "action_2", + "file.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "action_1": {"q": "hi"}, + "action_2": {"d": "easy"}, + }, + } + ], + ) + + result = backend.preview_target("action_2") + records = result["records"] + assert len(records) == 1 + # Should have all namespaces, not just action_2 + assert "source" in records[0]["content"] + assert "action_1" in records[0]["content"] + assert "action_2" in records[0]["content"] + assert "_delta_mode" not in records[0] + + +# --------------------------------------------------------------------------- +# FILE-mode expansion and contraction through enrichment + delta storage +# --------------------------------------------------------------------------- + + +class TestFileToolExpansionDelta: + """FILE-mode tools that expand 1→N records get stored as full (not delta). + + Expansion records get fresh source_guids in enrichment.py. + These GUIDs don't exist in any upstream action, so delta reconstruction + would fail. The enricher tags them _delta_mode="full" to prevent stripping. + """ + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["extract", "flatten", "classify"]) + yield b + b.close() + + def _simulate_expansion_pipeline(self, backend): + """Simulate: extract (3 records) → flatten (10 records, 1→N expansion).""" + from unittest.mock import MagicMock + + from agent_actions.processing.enrichment import LineageEnricher + from agent_actions.processing.types import ProcessingResult, ProcessingStatus + + # Step 1: Write extract action (3 records, first action) + extract_records = [ + { + "source_guid": f"original-{i}", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": f"Page {i}"}, + "extract": {"questions": [{"q": f"Q{j}"} for j in range(3)]}, + }, + } + for i in range(3) + ] + backend.write_target("extract", "file.json", extract_records, is_first_action=True) + + # Step 2: Simulate flatten tool expanding 3→9 records + # Each input record produces 3 output records (one per question) + expanded_items = [] + for i, rec in enumerate(extract_records): + for j, q in enumerate(rec["content"]["extract"]["questions"]): + expanded_items.append( + { + "source_guid": rec["source_guid"], # parent guid (will be replaced) + "target_id": f"tid-{i}-{j}", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": rec["content"]["source"], + "extract": rec["content"]["extract"], + "flatten": {"question_text": q["q"], "index": j}, + }, + } + ) + + # Run through enrichment with is_expansion=True (like the real pipeline) + result = ProcessingResult( + data=expanded_items, + status=ProcessingStatus.SUCCESS, + is_expansion=True, + ) + ctx = MagicMock() + ctx.action_name = "flatten" + ctx.agent_name = "flatten" + ctx.is_first_stage = False + ctx.source_data = extract_records + ctx.record_index = 0 + ctx.agent_config = {} + + enricher = LineageEnricher() + enriched = enricher.enrich(result, ctx) + + # Write enriched expansion records + backend.write_target("flatten", "file.json", enriched.data) + + return enriched.data + + def test_expansion_records_stored_as_full(self, backend): + """Expansion records (fresh GUIDs) must be stored as _delta_mode=full.""" + self._simulate_expansion_pipeline(backend) + + raw = backend._read_target_raw("flatten", "file.json") + assert len(raw) == 9 + + modes = {r.get("_delta_mode") for r in raw} + assert modes == {"full"}, f"Expected all full, got: {modes}" + + def test_expansion_records_have_fresh_source_guids(self, backend): + """Each expansion record has a unique source_guid, not the parent's.""" + enriched = self._simulate_expansion_pipeline(backend) + + guids = [r["source_guid"] for r in enriched] + assert len(set(guids)) == 9, "Each expansion record should have a unique GUID" + assert all(not g.startswith("original-") for g in guids), "GUIDs should be fresh UUIDs" + + def test_expansion_records_preserve_parent_guid(self, backend): + """Expansion records carry parent_source_guid for lineage tracking.""" + enriched = self._simulate_expansion_pipeline(backend) + + for record in enriched: + assert "parent_source_guid" in record, "Missing parent_source_guid" + assert record["parent_source_guid"].startswith("original-") + + def test_expansion_read_returns_full_content(self, backend): + """read_target for expansion records returns full content (no reconstruction needed).""" + self._simulate_expansion_pipeline(backend) + + result = backend.read_target("flatten", "file.json") + assert len(result) == 9 + + for record in result: + assert "source" in record["content"], "Missing source namespace" + assert "extract" in record["content"], "Missing extract namespace" + assert "flatten" in record["content"], "Missing flatten namespace" + assert "_delta_mode" not in record, "_delta_mode leaked to consumer" + + def test_downstream_after_expansion_uses_delta(self, backend): + """Action after expansion stores deltas (not full) for the expanded records.""" + enriched = self._simulate_expansion_pipeline(backend) + + # Simulate classify action processing the expanded records. + # In the real pipeline, enrichment for a non-expansion action + # does NOT set _delta_mode, so drop it from the simulated input. + classify_records = [] + for rec in enriched: + classify_records.append( + { + **{k: v for k, v in rec.items() if k not in ("content", "_delta_mode")}, + "content": { + **rec["content"], + "classify": {"difficulty": "medium"}, + }, + } + ) + + backend.write_target("classify", "file.json", classify_records) + + raw = backend._read_target_raw("classify", "file.json") + modes = {r.get("_delta_mode") for r in raw} + assert "delta" in modes, f"Expected delta mode for downstream, got: {modes}" + + def test_downstream_reconstruction_uses_expansion_boundary(self, backend): + """Records downstream of expansion reconstruct from the expansion point, not from the start. + + The expansion (flatten) creates new GUIDs. Actions before the expansion + (summarize, extract) have the old GUIDs. Reconstruction for actions after + flatten should start at flatten's full record — not try to find the new + GUIDs in summarize/extract (where they don't exist). + """ + enriched = self._simulate_expansion_pipeline(backend) + + # Write a downstream action that processes the expanded records + classify_records = [] + for rec in enriched: + classify_records.append( + { + **{k: v for k, v in rec.items() if k not in ("content", "_delta_mode")}, + "content": { + **rec["content"], + "classify": {"difficulty": "medium"}, + }, + } + ) + backend.write_target("classify", "file.json", classify_records) + + # Read classify — should reconstruct from flatten (full) + classify (delta) + # Should NOT try to find these GUIDs in extract (they don't exist there) + result = backend.read_target("classify", "file.json") + assert len(result) == 9 + + for record in result: + # Must have flatten + classify content (from flatten's full + classify's delta) + assert "flatten" in record["content"], "Missing flatten namespace" + assert "classify" in record["content"], "Missing classify namespace" + # Must have source + extract content (baked into flatten's full record) + assert "source" in record["content"], "Missing source namespace" + assert "extract" in record["content"], "Missing extract namespace" + # Must NOT be flagged incomplete + assert "_reconstruction_incomplete" not in record, ( + f"Record incorrectly flagged incomplete: {record.get('source_guid')}" + ) + assert "_delta_mode" not in record + + +class TestFileToolContractionDelta: + """FILE-mode tools that contract N→M records (M < N). + + Contraction tools (e.g., grouping 100 records into 5 batches) produce + fewer output records than input. If output records have new GUIDs + (from enrichment expansion path), they should be stored as full. + If they reuse input GUIDs, they should be stored as delta. + """ + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["input_action", "group_action"]) + yield b + b.close() + + def test_contraction_with_reused_guids_stores_delta(self, backend): + """Tool that returns fewer records but reuses input GUIDs → delta storage.""" + # 5 input records + input_records = [ + { + "source_guid": f"g{i}", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"id": i}, + "input_action": {"value": f"v{i}"}, + }, + } + for i in range(5) + ] + backend.write_target("input_action", "file.json", input_records, is_first_action=True) + + # Tool returns 3 records, reusing source_guids g0, g1, g2 + output_records = [ + { + "source_guid": f"g{i}", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"id": i}, + "input_action": {"value": f"v{i}"}, + "group_action": {"group": f"batch_{i}"}, + }, + } + for i in range(3) + ] + backend.write_target("group_action", "file.json", output_records) + + raw = backend._read_target_raw("group_action", "file.json") + modes = {r.get("_delta_mode") for r in raw} + assert modes == {"delta"}, f"Expected delta for reused GUIDs, got: {modes}" + + def test_contraction_reconstruction_works(self, backend): + """Contracted records reconstruct upstream content correctly.""" + input_records = [ + { + "source_guid": f"g{i}", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"id": i}, + "input_action": {"value": f"v{i}"}, + }, + } + for i in range(5) + ] + backend.write_target("input_action", "file.json", input_records, is_first_action=True) + + output_records = [ + { + "source_guid": f"g{i}", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"id": i}, + "input_action": {"value": f"v{i}"}, + "group_action": {"group": f"batch_{i}"}, + }, + } + for i in range(3) + ] + backend.write_target("group_action", "file.json", output_records) + + result = backend.read_target("group_action", "file.json") + assert len(result) == 3 + for i, rec in enumerate(result): + assert rec["content"]["source"]["id"] == i + assert rec["content"]["input_action"]["value"] == f"v{i}" + assert rec["content"]["group_action"]["group"] == f"batch_{i}" + assert "_delta_mode" not in rec + + +class TestForceFullParameter: + """write_target(force_full=True) bypasses delta extraction entirely.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["a1", "a2"]) + yield b + b.close() + + def test_force_full_stores_complete_content(self, backend): + """force_full=True stores all content namespaces regardless of action.""" + data = [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"x": 1}, + "a1": {"q": "hi"}, + "a2": {"level": "easy"}, + }, + } + ] + backend.write_target("a2", "file.json", data, force_full=True) + + raw = backend._read_target_raw("a2", "file.json") + assert raw[0]["_delta_mode"] == "full" + assert len(raw[0]["content"]) == 3 + + def test_force_full_records_read_without_reconstruction(self, backend): + """force_full records are returned as-is, no upstream lookup.""" + data = [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"x": 1}, + "a1": {"q": "hi"}, + "a2": {"level": "easy"}, + }, + } + ] + backend.write_target("a2", "file.json", data, force_full=True) + + result = backend.read_target("a2", "file.json") + assert result[0]["content"]["source"]["x"] == 1 + assert result[0]["content"]["a1"]["q"] == "hi" + assert "_delta_mode" not in result[0] + + +# --------------------------------------------------------------------------- +# E2E tests for every issue found during review +# Each test uses real SQLiteBackend, real data, no mocks. +# --------------------------------------------------------------------------- + + +class TestIssue2_ParallelPeers: + """Parallel actions at the same level must NOT appear in each other's upstream.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + # Pipeline: source → [branch_a, branch_b] (parallel) → merge + _set_execution_order(b, ["source_action", "branch_a", "branch_b", "merge_action"]) + # Dep graph from levels: branch_a and branch_b share the same deps + dep_graph = { + "source_action": [], + "branch_a": ["source_action"], + "branch_b": ["source_action"], + "merge_action": ["source_action", "branch_a", "branch_b"], + } + b.save_metadata("dependency_graph", json.dumps(dep_graph)) + yield b + b.close() + + def test_parallel_branches_dont_include_each_other(self, backend): + """branch_a's upstream must NOT include branch_b (they're parallel peers).""" + backend.write_target( + "source_action", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": {"source": {"x": 1}, "source_action": {"data": "raw"}}, + } + ], + is_first_action=True, + ) + backend.write_target( + "branch_a", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"x": 1}, + "source_action": {"data": "raw"}, + "branch_a": {"score": 5}, + }, + } + ], + ) + backend.write_target( + "branch_b", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"x": 1}, + "source_action": {"data": "raw"}, + "branch_b": {"score": 8}, + }, + } + ], + ) + + # Read branch_a — should NOT contain branch_b's content + result_a = backend.read_target("branch_a", "f.json") + assert "branch_b" not in result_a[0]["content"], ( + "Parallel peer branch_b leaked into branch_a" + ) + assert "source_action" in result_a[0]["content"] + + # Read branch_b — should NOT contain branch_a's content + result_b = backend.read_target("branch_b", "f.json") + assert "branch_a" not in result_b[0]["content"], ( + "Parallel peer branch_a leaked into branch_b" + ) + + def test_merge_action_gets_both_branches(self, backend): + """merge_action depends on both branches — should have all namespaces.""" + backend.write_target( + "source_action", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": {"source": {"x": 1}, "source_action": {"data": "raw"}}, + } + ], + is_first_action=True, + ) + backend.write_target( + "branch_a", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"x": 1}, + "source_action": {"data": "raw"}, + "branch_a": {"score": 5}, + }, + } + ], + ) + backend.write_target( + "branch_b", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"x": 1}, + "source_action": {"data": "raw"}, + "branch_b": {"score": 8}, + }, + } + ], + ) + backend.write_target( + "merge_action", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"x": 1}, + "source_action": {"data": "raw"}, + "branch_a": {"score": 5}, + "branch_b": {"score": 8}, + "merge_action": {"combined": 13}, + }, + } + ], + ) + + result = backend.read_target("merge_action", "f.json") + assert "source_action" in result[0]["content"] + assert "branch_a" in result[0]["content"] + assert "branch_b" in result[0]["content"] + assert "merge_action" in result[0]["content"] + + +class TestIssue3_VersionedActions: + """Versioned actions (action_1, action_2, action_3) must be in the dep graph.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["summarize", "extract_1", "extract_2", "extract_3", "merge"]) + dep_graph = { + "summarize": [], + "extract_1": ["summarize"], + "extract_2": ["summarize"], + "extract_3": ["summarize"], + "merge": ["summarize", "extract_1", "extract_2", "extract_3"], + } + b.save_metadata("dependency_graph", json.dumps(dep_graph)) + yield b + b.close() + + def test_versioned_actions_have_correct_upstream(self, backend): + """Each extract_N depends only on summarize, not on other extract_N peers.""" + backend.write_target( + "summarize", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": {"source": {"title": "SQL"}, "summarize": {"summary": "about SQL"}}, + } + ], + is_first_action=True, + ) + for i in range(1, 4): + backend.write_target( + f"extract_{i}", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"title": "SQL"}, + "summarize": {"summary": "about SQL"}, + f"extract_{i}": {"question": f"Q{i}"}, + }, + } + ], + ) + + # Each extract_N should have source + summarize + its own namespace + for i in range(1, 4): + result = backend.read_target(f"extract_{i}", "f.json") + assert "source" in result[0]["content"] + assert "summarize" in result[0]["content"] + assert f"extract_{i}" in result[0]["content"] + # Should NOT have other extract peers + for j in range(1, 4): + if j != i: + assert f"extract_{j}" not in result[0]["content"], ( + f"extract_{j} leaked into extract_{i}" + ) + + +class TestIssue4_MetadataCacheInvalidation: + """save_metadata must clear the in-memory cache so reads get fresh data.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + yield b + b.close() + + def test_dep_graph_cache_cleared_on_save(self, backend): + """Writing new dep graph invalidates the cached version.""" + # Write initial graph + backend.save_metadata("dependency_graph", json.dumps({"a1": [], "a2": ["a1"]})) + backend.save_metadata("execution_order", json.dumps(["a1", "a2"])) + + # Force cache population by reading + backend.write_target( + "a1", + "f.json", + [{"source_guid": "g1", "_state": "active", "content": {"a1": {"x": 1}}}], + is_first_action=True, + ) + backend.write_target( + "a2", + "f.json", + [ + { + "source_guid": "g1", + "_state": "active", + "content": {"a1": {"x": 1}, "a2": {"y": 2}}, + } + ], + ) + + result1 = backend.read_target("a2", "f.json") + assert "a1" in result1[0]["content"] + + # Now update the dep graph to make a2 have NO deps + backend.save_metadata("dependency_graph", json.dumps({"a1": [], "a2": []})) + + # Read again — should use the NEW graph (no upstream for a2) + result2 = backend.read_target("a2", "f.json") + # a2's delta has only {a2: ...}, and with empty deps, no upstream is merged + assert "a2" in result2[0]["content"] + + +class TestIssue6_MissingSourceGuid: + """Records without source_guid must be stored as full — can't reconstruct without a join key.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["a1", "a2"]) + yield b + b.close() + + def test_no_source_guid_stored_as_full(self, backend): + """Record without source_guid is stored as _delta_mode=full.""" + data = [ + { + "_state": "processed", + "_state_schema_version": 1, + "content": {"source": {"x": 1}, "a2": {"y": 2}}, + } + ] + backend.write_target("a2", "f.json", data) + + raw = backend._read_target_raw("a2", "f.json") + assert raw[0]["_delta_mode"] == "full" + assert "source" in raw[0]["content"] + assert "a2" in raw[0]["content"] + + def test_no_source_guid_read_returns_full_content(self, backend): + """read_target for guid-less record returns all content intact.""" + data = [ + { + "_state": "processed", + "_state_schema_version": 1, + "content": {"source": {"x": 1}, "a1": {"q": "hi"}, "a2": {"y": 2}}, + } + ] + backend.write_target("a2", "f.json", data) + + result = backend.read_target("a2", "f.json") + assert "source" in result[0]["content"] + assert "a1" in result[0]["content"] + assert "a2" in result[0]["content"] + + +class TestIssue7_PartitionedPipeline: + """Diamond DAG where branches process different guids — no false incomplete flags.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["source", "branch_a", "branch_b", "merge"]) + dep_graph = { + "source": [], + "branch_a": ["source"], + "branch_b": ["source"], + "merge": ["source", "branch_a", "branch_b"], + } + b.save_metadata("dependency_graph", json.dumps(dep_graph)) + yield b + b.close() + + def test_partitioned_guids_no_false_incomplete(self, backend): + """branch_a processes g1, branch_b processes g2 — merge sees both without false flags.""" + backend.write_target( + "source", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": {"source": {"x": 1}, "source": {"data": "raw"}}, + }, + { + "source_guid": "g2", + "_state": "processed", + "_state_schema_version": 1, + "content": {"source": {"x": 2}, "source": {"data": "raw2"}}, + }, + ], + is_first_action=True, + ) + # branch_a only processes g1 + backend.write_target( + "branch_a", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"x": 1}, + "source": {"data": "raw"}, + "branch_a": {"score": 5}, + }, + } + ], + ) + # branch_b only processes g2 + backend.write_target( + "branch_b", + "f.json", + [ + { + "source_guid": "g2", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"x": 2}, + "source": {"data": "raw2"}, + "branch_b": {"score": 8}, + }, + } + ], + ) + # merge processes both + backend.write_target( + "merge", + "f.json", + [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"x": 1}, + "source": {"data": "raw"}, + "branch_a": {"score": 5}, + "merge": {"combined": "g1_result"}, + }, + }, + { + "source_guid": "g2", + "_state": "processed", + "_state_schema_version": 1, + "content": { + "source": {"x": 2}, + "source": {"data": "raw2"}, + "branch_b": {"score": 8}, + "merge": {"combined": "g2_result"}, + }, + }, + ], + ) + + result = backend.read_target("merge", "f.json") + for r in result: + assert "_reconstruction_incomplete" not in r, ( + f"False incomplete flag on {r.get('source_guid')}" + ) + assert "merge" in r["content"] + + +class TestIssue8_FilesystemLeak: + """_delta_mode must not appear in filesystem target files.""" + + def test_disk_file_has_no_delta_mode(self, tmp_path): + """FileWriter strips _delta_mode before writing to disk.""" + from unittest.mock import MagicMock + + from agent_actions.output.writer import FileWriter + + backend = _make_backend(tmp_path) + _set_execution_order(backend, ["expand_action"]) + backend.initialize() + + output_dir = tmp_path / "target" / "expand_action" + output_dir.mkdir(parents=True, exist_ok=True) + file_path = output_dir / "out.json" + + writer = FileWriter( + str(file_path), + storage_backend=backend, + action_name="expand_action", + output_directory=str(output_dir), + ) + + data = [ + { + "source_guid": "g1", + "_state": "processed", + "_state_schema_version": 1, + "_delta_mode": "full", + "content": {"source": {"x": 1}, "expand_action": {"expanded": True}}, + } + ] + writer.write_target(data) + + # Read the disk file directly — must NOT have _delta_mode + disk_data = json.loads(file_path.read_text()) + for record in disk_data: + assert "_delta_mode" not in record, "_delta_mode leaked to filesystem" + + backend.close() + + +class TestIssue9_CorruptMetadata: + """Corrupt metadata must degrade gracefully, not crash.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + yield b + b.close() + + def test_corrupt_dependency_graph_falls_back(self, backend): + """Corrupt dep graph → fallback to flat execution order, no crash.""" + backend.save_metadata("dependency_graph", "NOT VALID JSON{{{") + backend.save_metadata("execution_order", json.dumps(["a1", "a2"])) + + backend.write_target( + "a1", + "f.json", + [{"source_guid": "g1", "_state": "active", "content": {"a1": {"x": 1}}}], + is_first_action=True, + ) + backend.write_target( + "a2", + "f.json", + [ + { + "source_guid": "g1", + "_state": "active", + "content": {"a1": {"x": 1}, "a2": {"y": 2}}, + } + ], + ) + + # Should not crash — falls back to flat order + result = backend.read_target("a2", "f.json") + assert "a2" in result[0]["content"] + + def test_corrupt_execution_order_returns_raw(self, backend): + """Corrupt execution order → reconstruction disabled, raw delta returned.""" + backend.save_metadata("execution_order", "CORRUPT!!!") + + backend.write_target( + "a1", + "f.json", + [{"source_guid": "g1", "_state": "active", "content": {"a1": {"x": 1}}}], + is_first_action=True, + ) + + # read_target should not crash + result = backend.read_target("a1", "f.json") + assert "a1" in result[0]["content"] + + +class TestIssue10_UniformDeltaMode: + """write_target must produce uniform _delta_mode across all records in a batch.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["a1", "a2"]) + yield b + b.close() + + def test_batch_has_uniform_delta_mode(self, backend): + """All records in a single write_target call get the same _delta_mode.""" + data = [ + { + "source_guid": f"g{i}", + "_state": "processed", + "_state_schema_version": 1, + "content": {"source": {"x": i}, "a1": {"q": f"Q{i}"}, "a2": {"level": f"L{i}"}}, + } + for i in range(5) + ] + backend.write_target("a2", "f.json", data) + + raw = backend._read_target_raw("a2", "f.json") + modes = {r.get("_delta_mode") for r in raw} + assert len(modes) == 1, f"Non-uniform delta modes in batch: {modes}" + + +class TestIssue11_FallbackWarning: + """Action not in dep graph should warn and fall back to flat order.""" + + @pytest.fixture + def backend(self, tmp_path): + b = _make_backend(tmp_path) + _set_execution_order(b, ["a1", "a2", "a3"]) + # dep graph deliberately missing a3 + b.save_metadata("dependency_graph", json.dumps({"a1": [], "a2": ["a1"]})) + yield b + b.close() + + def test_missing_action_in_graph_still_works(self, backend, capsys): + """Action not in dep graph falls back to flat order with warning.""" + backend.write_target( + "a1", + "f.json", + [{"source_guid": "g1", "_state": "active", "content": {"a1": {"x": 1}}}], + is_first_action=True, + ) + backend.write_target( + "a2", + "f.json", + [ + { + "source_guid": "g1", + "_state": "active", + "content": {"a1": {"x": 1}, "a2": {"y": 2}}, + } + ], + ) + backend.write_target( + "a3", + "f.json", + [ + { + "source_guid": "g1", + "_state": "active", + "content": {"a1": {"x": 1}, "a2": {"y": 2}, "a3": {"z": 3}}, + } + ], + ) + + result = backend.read_target("a3", "f.json") + + # Should still reconstruct (using flat order fallback) + assert "a3" in result[0]["content"] + # Warning fires via logging to stderr + stderr = capsys.readouterr().err + assert "not found in dependency graph" in stderr diff --git a/tests/unit/workflow/test_circuit_breaker.py b/tests/unit/workflow/test_circuit_breaker.py index 350f9920..021bb418 100644 --- a/tests/unit/workflow/test_circuit_breaker.py +++ b/tests/unit/workflow/test_circuit_breaker.py @@ -829,9 +829,6 @@ def backend_type(self): def initialize(self): pass - def write_target(self, *a, **kw): - pass - def _read_target_raw(self, *a, **kw): return [] @@ -856,6 +853,15 @@ def get_storage_stats(self): def delete_target(self, *a, **kw): return 0 + def _write_target_raw(self, *a, **kw): + return "" + + def _save_metadata_raw(self, *a, **kw): + pass + + def load_metadata(self, *a, **kw): + return None + def get_disposition(self, action_name, record_id=None, disposition=None): return [ {