diff --git a/agent_actions/output/writer.py b/agent_actions/output/writer.py index 7859d7e3..f4a990cf 100644 --- a/agent_actions/output/writer.py +++ b/agent_actions/output/writer.py @@ -3,6 +3,7 @@ from __future__ import annotations import csv +import json from collections.abc import Callable from pathlib import Path from typing import TYPE_CHECKING, Any @@ -16,7 +17,6 @@ from agent_actions.processing.error_handling import ProcessorErrorHandlerMixin from agent_actions.utils.atomic_write import atomic_json_write from agent_actions.utils.path_safety import assert_path_contained -from agent_actions.utils.path_utils import ensure_directory_exists if TYPE_CHECKING: from agent_actions.storage.backend import StorageBackend @@ -132,10 +132,7 @@ def do_write() -> int: self.storage_backend.write_target(self.action_name, relative_path, data) - ensure_directory_exists(file_path, is_file=True) - atomic_json_write(file_path, data) - - return file_path.stat().st_size + return len(json.dumps(data, ensure_ascii=False).encode("utf-8")) self._execute_write("Write target file", do_write) diff --git a/agent_actions/storage/__init__.py b/agent_actions/storage/__init__.py index 4ed99ee7..d4c7e0ff 100644 --- a/agent_actions/storage/__init__.py +++ b/agent_actions/storage/__init__.py @@ -30,7 +30,10 @@ def get_storage_backend( workflow_dir = Path(workflow_path) db_path = workflow_dir / "agent_io" / "store" / f"{workflow_name}.db" - backend = backend_class.create(db_path=str(db_path), workflow_name=workflow_name) + target_dir = workflow_dir / "agent_io" / "target" + backend = backend_class.create( + db_path=str(db_path), workflow_name=workflow_name, target_dir=str(target_dir) + ) return backend diff --git a/agent_actions/storage/backends/sqlite_backend.py b/agent_actions/storage/backends/sqlite_backend.py index a3b64c19..89909de5 100644 --- a/agent_actions/storage/backends/sqlite_backend.py +++ b/agent_actions/storage/backends/sqlite_backend.py @@ -20,6 +20,8 @@ DispositionRow, StorageBackend, ) +from agent_actions.utils.atomic_write import atomic_json_write +from agent_actions.utils.path_safety import assert_path_contained logger = logging.getLogger(__name__) @@ -168,10 +170,11 @@ class SQLiteBackend(StorageBackend): # Restrictive as defense-in-depth; all SQL is parameterized. _VALID_IDENTIFIER_CHARS = set(string.ascii_letters + string.digits + "_-./ ") - def __init__(self, db_path: str, workflow_name: str): + def __init__(self, db_path: str, workflow_name: str, target_dir: str | None = None): """Initialize SQLite backend.""" self.db_path = Path(db_path) self.workflow_name = workflow_name + self.target_dir: Path | None = Path(target_dir) if target_dir else None self._connection: sqlite3.Connection | None = None self._lock = ( threading.RLock() @@ -184,15 +187,17 @@ def create(cls, **kwargs) -> "SQLiteBackend": Required kwargs: db_path: Path to the SQLite database file. workflow_name: Name of the workflow. + target_dir: Path to agent_io/target directory. """ db_path = kwargs.pop("db_path") workflow_name = kwargs.pop("workflow_name") + target_dir = kwargs.pop("target_dir", None) if kwargs: raise ConfigValidationError( f"Unknown kwargs for SQLiteBackend: {list(kwargs)}", context={"unknown_kwargs": list(kwargs)}, ) - return cls(str(db_path), workflow_name) + return cls(str(db_path), workflow_name, target_dir=str(target_dir) if target_dir else None) def _validate_identifier(self, name: str, field: str) -> str: """Validate and POSIX-normalize an identifier to prevent injection. @@ -313,13 +318,23 @@ def _enforce_schema(self, cursor: sqlite3.Cursor) -> None: ) def write_target(self, action_name: str, relative_path: str, data: list[dict[str, Any]]) -> str: - """Write target data for a specific node.""" + """Write target data to filesystem and metadata to DB.""" action_name = self._validate_identifier(action_name, "action_name") relative_path = self._validate_identifier(relative_path, "relative_path") - data_json = json.dumps(data, ensure_ascii=False) record_count = len(data) + if self.target_dir is None: + raise ValueError( + f"Cannot write target data without target_dir configured " + f"({action_name}/{relative_path})" + ) + + file_path = self.target_dir / action_name / relative_path + assert_path_contained(file_path, self.target_dir) + file_path.parent.mkdir(parents=True, exist_ok=True) + atomic_json_write(file_path, data) + with self._lock: cursor = self.connection.cursor() try: @@ -327,9 +342,9 @@ def write_target(self, action_name: str, relative_path: str, data: list[dict[str """ INSERT OR REPLACE INTO target_data (action_name, relative_path, data, record_count, created_at) - VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP) + VALUES (?, ?, '[]', ?, CURRENT_TIMESTAMP) """, - (action_name, relative_path, data_json, record_count), + (action_name, relative_path, record_count), ) self.connection.commit() logger.debug( @@ -354,25 +369,33 @@ def write_target(self, action_name: str, relative_path: str, data: list[dict[str raise def _read_target_raw(self, action_name: str, relative_path: str) -> list[dict[str, Any]]: - """Read raw target data from SQLite. + """Read raw target data from the filesystem. Raises: FileNotFoundError: If no data exists for the given path. """ action_name = self._validate_identifier(action_name, "action_name") relative_path = self._validate_identifier(relative_path, "relative_path") - with self._lock: - cursor = self.connection.cursor() - cursor.execute( - "SELECT data FROM target_data WHERE action_name = ? AND relative_path = ?", - (action_name, relative_path), - ) - row = cursor.fetchone() - if row is None: - raise FileNotFoundError(f"No target data found for {action_name}/{relative_path}") + if self.target_dir is None: + raise FileNotFoundError( + f"No target_dir configured — cannot read {action_name}/{relative_path}" + ) - result: list[dict[str, Any]] = json.loads(row["data"]) + file_path = self.target_dir / action_name / relative_path + assert_path_contained(file_path, self.target_dir) + try: + result: list[dict[str, Any]] = json.loads(file_path.read_text(encoding="utf-8")) + except FileNotFoundError: + raise FileNotFoundError( + f"No target data found for {action_name}/{relative_path}" + ) from None + + if not isinstance(result, list): + raise FileNotFoundError( + f"Target data at {action_name}/{relative_path} is not a list " + f"(got {type(result).__name__})" + ) return result def write_source( @@ -485,11 +508,17 @@ def preview_target( offset: int = 0, relative_path: str | None = None, ) -> dict[str, Any]: - """Preview target data for a node with pagination.""" + """Preview target data for a node with pagination. + + Reads file metadata from DB, actual data from filesystem. + """ action_name = self._validate_identifier(action_name, "action_name") if relative_path is not None: relative_path = self._validate_identifier(relative_path, "relative_path") + if self.target_dir is None: + raise ValueError("Cannot preview target data without target_dir configured") + limit = min(max(1, limit), 1000) offset = max(0, offset) @@ -499,7 +528,7 @@ def preview_target( cursor.execute( """ SELECT relative_path, - COALESCE(record_count, json_array_length(data)) as record_count + COALESCE(record_count, 0) as record_count FROM target_data WHERE action_name = ? ORDER BY relative_path @@ -508,60 +537,58 @@ def preview_target( ) file_metadata = cursor.fetchall() - files = [row["relative_path"] for row in file_metadata] + files = [row["relative_path"] for row in file_metadata] - if relative_path: - if relative_path not in files: - return { - "records": [], - "total_count": 0, - "action_name": action_name, - "files": files, - "error": f"File '{relative_path}' not found for node '{action_name}'", - } - file_metadata = [ - row for row in file_metadata if row["relative_path"] == relative_path - ] + if relative_path: + if relative_path not in files: + return { + "records": [], + "total_count": 0, + "action_name": action_name, + "files": files, + "error": f"File '{relative_path}' not found for node '{action_name}'", + } + file_metadata = [row for row in file_metadata if row["relative_path"] == relative_path] - total_count = sum(row["record_count"] for row in file_metadata) + total_count = sum(row["record_count"] for row in file_metadata) - paginated_records: list[dict[str, Any]] = [] - skipped = 0 - collected = 0 + paginated_records: list[dict[str, Any]] = [] + skipped = 0 + collected = 0 - for row in file_metadata: - if collected >= limit: - break + for row in file_metadata: + if collected >= limit: + break - file_path = row["relative_path"] - file_record_count = row["record_count"] + file_path = row["relative_path"] + file_record_count = row["record_count"] - if skipped + file_record_count <= offset: - skipped += file_record_count - continue + if skipped + file_record_count <= offset: + skipped += file_record_count + continue - cursor.execute( - "SELECT data FROM target_data WHERE action_name = ? AND relative_path = ?", - (action_name, file_path), - ) - data_row = cursor.fetchone() - if not data_row: + fs_path = self.target_dir / action_name / file_path + try: + records = json.loads(fs_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + continue + + if not isinstance(records, list): + continue + + for record in records: + if skipped < offset: + skipped += 1 continue - records = json.loads(data_row["data"]) - for record in records: - if skipped < offset: - skipped += 1 - continue - - if collected < limit: - if isinstance(record, dict): - paginated_records.append({**record, "_file": file_path}) - else: - paginated_records.append({"_file": file_path, "_value": record}) - collected += 1 + if collected < limit: + if isinstance(record, dict): + paginated_records.append({**record, "_file": file_path}) else: - break + paginated_records.append({"_file": file_path, "_value": record}) + collected += 1 + else: + break return { "records": paginated_records, diff --git a/agent_actions/tooling/docs/scanner/data_scanners.py b/agent_actions/tooling/docs/scanner/data_scanners.py index d8205a4e..9ef155c3 100644 --- a/agent_actions/tooling/docs/scanner/data_scanners.py +++ b/agent_actions/tooling/docs/scanner/data_scanners.py @@ -105,7 +105,9 @@ def scan_workflow_data(project_root: Path) -> dict[str, Any]: workflow_name = db_file.stem try: - data = scan_sqlite_readonly(db_file, workflow_name) + data = scan_sqlite_readonly( + db_file, workflow_name, target_dir=agent_io_dir / "target" + ) if data is not None: workflow_data[workflow_name] = data except (OSError, sqlite3.Error) as e: @@ -142,7 +144,9 @@ def _unwrap_record_content(record: dict, action_name: str) -> dict: return record -def scan_sqlite_readonly(db_file: Path, workflow_name: str) -> dict[str, Any] | None: +def scan_sqlite_readonly( + db_file: Path, workflow_name: str, target_dir: Path | None = None +) -> dict[str, Any] | None: """Open a workflow SQLite DB read-only and extract stats + preview data. Uses a direct sqlite3 connection in read-only mode so that scanning @@ -213,27 +217,25 @@ def scan_sqlite_readonly(db_file: Path, workflow_name: str) -> dict[str, Any] | ) files = [row["relative_path"] for row in cursor.fetchall()] - # Preview: iterate the cursor lazily so we never load every - # data blob into memory. Cap at 20 flattened records. - cursor.execute( - "SELECT relative_path, data FROM target_data WHERE action_name = ?", - (action_name,), - ) + # Preview: read from filesystem (source of truth). + # Cap at 20 flattened records. records: list[dict] = [] - for target_row in cursor: + for file_path in files: if len(records) >= 20: break + if target_dir is None: + break + fs_path = target_dir / action_name / file_path try: - row_data = _json.loads(target_row["data"]) - except (ValueError, _json.JSONDecodeError): + row_data = _json.loads(fs_path.read_text(encoding="utf-8")) + except (ValueError, _json.JSONDecodeError, OSError): logger.debug( - "Skipping malformed JSON in %s node %s, file %s", + "Skipping unreadable target file %s/%s/%s", workflow_name, action_name, - target_row["relative_path"], + file_path, ) continue - file_path = target_row["relative_path"] if isinstance(row_data, list): for item in row_data: if len(records) >= 20: diff --git a/agent_actions/workflow/coordinator.py b/agent_actions/workflow/coordinator.py index 27cb5528..c347dd8c 100644 --- a/agent_actions/workflow/coordinator.py +++ b/agent_actions/workflow/coordinator.py @@ -286,8 +286,21 @@ def _clear_for_fresh_run(self) -> None: except Exception as e: logger.warning("Failed to clear stored data for %s: %s", action_name, e) + # Clear target output files from filesystem (rglob for nested paths, + # but skip batch/ — batch artifacts have their own cleanup below) + action_target_dir = target_dir / action_name + batch_dir = action_target_dir / "batch" + if action_target_dir.is_dir(): + for json_file in action_target_dir.rglob("*.json"): + if batch_dir in json_file.parents or json_file.parent == batch_dir: + continue + try: + json_file.unlink() + logger.debug("Removed target file: %s", json_file) + except OSError as e: + logger.warning("Failed to remove %s: %s", json_file, e) + # Batch artifacts live on disk, not in the DB - batch_dir = target_dir / action_name / "batch" if batch_dir.is_dir(): for pattern in [ ".recovery_state_*.json", diff --git a/tests/integration/test_batch_content_materialization.py b/tests/integration/test_batch_content_materialization.py index 25abd80a..7c31a945 100644 --- a/tests/integration/test_batch_content_materialization.py +++ b/tests/integration/test_batch_content_materialization.py @@ -108,7 +108,12 @@ def context_map(upstream_record) -> dict[str, Any]: @pytest.fixture def sqlite_backend(tmp_path) -> SQLiteBackend: - backend = SQLiteBackend(str(tmp_path / "test.db"), workflow_name="test_workflow") + target_dir = tmp_path / "target" + backend = SQLiteBackend( + str(tmp_path / "test.db"), + workflow_name="test_workflow", + target_dir=str(target_dir), + ) backend.initialize() return backend @@ -187,10 +192,10 @@ def test_content_survives_enrichment_and_db_roundtrip( output = _process_batch([batch_result], context_map, action_config, str(tmp_path)) writer = FileWriter( - str(tmp_path / "out.json"), + str(tmp_path / "target" / "verify_answer" / "out.json"), storage_backend=sqlite_backend, action_name="verify_answer", - output_directory=str(tmp_path), + output_directory=str(tmp_path / "target" / "verify_answer"), ) writer.write_target(output) @@ -220,10 +225,10 @@ def test_multiple_records_all_populated( output = _process_batch(records, context_map, action_config, str(tmp_path)) writer = FileWriter( - str(tmp_path / "batch.json"), + str(tmp_path / "target" / "verify_answer" / "batch.json"), storage_backend=sqlite_backend, action_name="verify_answer", - output_directory=str(tmp_path), + output_directory=str(tmp_path / "target" / "verify_answer"), ) writer.write_target(output) @@ -260,10 +265,10 @@ def test_versioned_action_name_in_content(self, context_map, sqlite_backend, tmp output = _process_batch([result], context_map, config, str(tmp_path), "verify_answer_1") writer = FileWriter( - str(tmp_path / "out.json"), + str(tmp_path / "target" / "verify_answer_1" / "out.json"), storage_backend=sqlite_backend, action_name="verify_answer_1", - output_directory=str(tmp_path), + output_directory=str(tmp_path / "target" / "verify_answer_1"), ) writer.write_target(output) @@ -313,10 +318,10 @@ def test_successful_records_populated_despite_failures( output = _process_batch(batch_results, context_map, action_config, str(tmp_path)) writer = FileWriter( - str(tmp_path / "partial.json"), + str(tmp_path / "target" / "verify_answer" / "partial.json"), storage_backend=sqlite_backend, action_name="verify_answer", - output_directory=str(tmp_path), + output_directory=str(tmp_path / "target" / "verify_answer"), ) writer.write_target(output) @@ -336,12 +341,12 @@ def test_target_json_materialized_on_disk( ): output = _process_batch([batch_result], context_map, action_config, str(tmp_path)) - output_file = tmp_path / "target.json" + output_file = tmp_path / "target" / "verify_answer" / "target.json" writer = FileWriter( str(output_file), storage_backend=sqlite_backend, action_name="verify_answer", - output_directory=str(tmp_path), + output_directory=str(tmp_path / "target" / "verify_answer"), ) writer.write_target(output) @@ -419,10 +424,10 @@ def _fetch_raw_results(self, batch_id): output = _process_batch([batch_result], context_map, action_config, str(tmp_path)) writer = FileWriter( - str(tmp_path / "ollama.json"), + str(tmp_path / "target" / "verify_answer" / "ollama.json"), storage_backend=sqlite_backend, action_name="verify_answer", - output_directory=str(tmp_path), + output_directory=str(tmp_path / "target" / "verify_answer"), ) writer.write_target(output) diff --git a/tests/integration/test_storage_backend_integration.py b/tests/integration/test_storage_backend_integration.py index ee9b4d52..976a20ef 100644 --- a/tests/integration/test_storage_backend_integration.py +++ b/tests/integration/test_storage_backend_integration.py @@ -9,6 +9,7 @@ """ import tempfile +import threading from pathlib import Path import pytest @@ -17,27 +18,30 @@ from agent_actions.storage.backends.sqlite_backend import SQLiteBackend +def _make_backend(tmpdir: str, workflow_name: str = "test_workflow") -> SQLiteBackend: + """Create a backend with target_dir set for filesystem reads.""" + db_path = Path(tmpdir) / "test.db" + target_dir = Path(tmpdir) / "target" + return SQLiteBackend(str(db_path), workflow_name, target_dir=str(target_dir)) + + class TestSQLiteBackendLifecycle: """Test SQLite backend initialization and cleanup.""" def test_creates_database_file_on_initialize(self): """Backend creates database file when initialized.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test_workflow" / "agent_io" / "test.db" - - backend = SQLiteBackend(str(db_path), "test_workflow") + backend = _make_backend(tmpdir) backend.initialize() - assert db_path.exists() + assert backend.db_path.exists() assert backend.backend_type == "sqlite" backend.close() def test_creates_tables_on_initialize(self): """Backend creates source_data and target_data tables.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test.db" - - backend = SQLiteBackend(str(db_path), "test_workflow") + backend = _make_backend(tmpdir) backend.initialize() # Verify tables exist @@ -52,9 +56,7 @@ def test_creates_tables_on_initialize(self): def test_context_manager_cleanup(self): """Context manager properly closes connection.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test.db" - - with SQLiteBackend(str(db_path), "test_workflow") as backend: + with _make_backend(tmpdir) as backend: backend.initialize() # Connection is active inside context assert backend._connection is not None @@ -70,8 +72,7 @@ class TestTargetDataOperations: def backend(self): """Create and initialize a test backend.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + backend = _make_backend(tmpdir) backend.initialize() yield backend backend.close() @@ -166,8 +167,7 @@ class TestSourceDataOperations: def backend(self): """Create and initialize a test backend.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + backend = _make_backend(tmpdir) backend.initialize() yield backend backend.close() @@ -255,8 +255,7 @@ class TestPreviewAndStats: def backend_with_data(self): """Create backend with sample data.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + backend = _make_backend(tmpdir) backend.initialize() # Add sample target data @@ -418,9 +417,7 @@ class TestConcurrencyAndResilience: def test_multiple_writes_to_same_node(self): """Multiple sequential writes to same node work correctly.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test.db" - - with SQLiteBackend(str(db_path), "test_workflow") as backend: + with _make_backend(tmpdir) as backend: backend.initialize() # Multiple writes to same node, different files @@ -440,9 +437,7 @@ def test_multiple_writes_to_same_node(self): def test_handles_unicode_data(self): """Backend correctly handles unicode data.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test.db" - - with SQLiteBackend(str(db_path), "test_workflow") as backend: + with _make_backend(tmpdir) as backend: backend.initialize() test_data = [ @@ -461,9 +456,7 @@ def test_handles_unicode_data(self): def test_handles_large_records(self): """Backend handles large JSON records.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test.db" - - with SQLiteBackend(str(db_path), "test_workflow") as backend: + with _make_backend(tmpdir) as backend: backend.initialize() # Create a large record (~1MB of text) @@ -479,11 +472,9 @@ def test_handles_large_records(self): def test_concurrent_writes_from_multiple_threads(self): """Concurrent writes from multiple threads don't cause transaction errors.""" - import threading with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + backend = _make_backend(tmpdir) backend.initialize() errors = [] @@ -543,8 +534,7 @@ class TestBackendTypeProperty: def test_backend_type_returns_sqlite(self): """SQLiteBackend returns 'sqlite' as backend_type.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + backend = _make_backend(tmpdir) assert backend.backend_type == "sqlite" backend.close() @@ -556,9 +546,7 @@ class TestWorkflowIntegration: def test_action_chain_data_flow(self): """Simulates data flowing through action chain.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "workflow.db" - - with SQLiteBackend(str(db_path), "my_workflow") as backend: + with _make_backend(tmpdir, "my_workflow") as backend: backend.initialize() # Action 1: Extract - writes target data @@ -596,9 +584,7 @@ def test_action_chain_data_flow(self): def test_parallel_actions_write_to_different_nodes(self): """Parallel actions can write to different nodes.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "workflow.db" - - with SQLiteBackend(str(db_path), "my_workflow") as backend: + with _make_backend(tmpdir, "my_workflow") as backend: backend.initialize() # Simulate parallel writes from different actions @@ -630,9 +616,7 @@ def test_parallel_actions_write_to_different_nodes(self): def test_merge_pattern_combines_upstream_data(self): """Merge pattern can read from multiple upstream nodes.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "workflow.db" - - with SQLiteBackend(str(db_path), "my_workflow") as backend: + with _make_backend(tmpdir, "my_workflow") as backend: backend.initialize() # Two parallel upstream actions @@ -665,9 +649,7 @@ class TestDispositionLifecycle: def test_passthrough_lifecycle(self): """Passthrough disposition can be set, queried, and cleared.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "workflow.db" - - with SQLiteBackend(str(db_path), "my_workflow") as backend: + with _make_backend(tmpdir, "my_workflow") as backend: backend.initialize() # 1. Set passthrough disposition @@ -691,9 +673,7 @@ def test_passthrough_lifecycle(self): def test_skip_disposition_lifecycle(self): """Skip disposition works end-to-end.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "workflow.db" - - with SQLiteBackend(str(db_path), "my_workflow") as backend: + with _make_backend(tmpdir, "my_workflow") as backend: backend.initialize() # Set skip disposition @@ -710,9 +690,7 @@ def test_skip_disposition_lifecycle(self): def test_multiple_nodes_independent(self): """Dispositions for different nodes are independent.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "workflow.db" - - with SQLiteBackend(str(db_path), "my_workflow") as backend: + with _make_backend(tmpdir, "my_workflow") as backend: backend.initialize() backend.set_disposition("node_a", NODE_LEVEL_RECORD_ID, "passthrough") @@ -730,9 +708,7 @@ def test_multiple_nodes_independent(self): def test_per_record_dispositions(self): """Individual record dispositions work alongside node-level ones.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "workflow.db" - - with SQLiteBackend(str(db_path), "my_workflow") as backend: + with _make_backend(tmpdir, "my_workflow") as backend: backend.initialize() # Node-level @@ -756,9 +732,7 @@ def test_per_record_dispositions(self): def test_disposition_stats_in_storage_stats(self): """Storage stats include disposition count.""" with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "workflow.db" - - with SQLiteBackend(str(db_path), "my_workflow") as backend: + with _make_backend(tmpdir, "my_workflow") as backend: backend.initialize() # Write some target data and dispositions diff --git a/tests/manual/smoke_test/checks/lineage.py b/tests/manual/smoke_test/checks/lineage.py index abfe9317..d9e1fbc9 100644 --- a/tests/manual/smoke_test/checks/lineage.py +++ b/tests/manual/smoke_test/checks/lineage.py @@ -109,39 +109,44 @@ def verify(self, ctx: RunContext) -> list[CheckResult]: with sqlite3.connect(str(db_path)) as conn: conn.row_factory = sqlite3.Row - rows = conn.execute( - "SELECT action_name, data FROM target_data ORDER BY action_name" + # Get action names from DB metadata + action_names_rows = conn.execute( + "SELECT DISTINCT action_name FROM target_data ORDER BY action_name" ).fetchall() - if not rows: + if not action_names_rows: results.append(CheckResult(True, "lineage", "no target data to verify")) return results - # --- Phase 1: Collect all records and build index --- + # --- Phase 1: Collect all records from filesystem --- all_target_ids: set[str] = set() action_records: dict[str, list[tuple[int, dict]]] = {} - for row in rows: - action_name = row["action_name"] - try: - data = json.loads(row["data"]) - except (json.JSONDecodeError, TypeError): - results.append( - CheckResult(False, f"lineage({action_name}): parse", "bad JSON") - ) + for action_row in action_names_rows: + action_name = action_row["action_name"] + action_dir = ctx.target_dir / action_name + if not action_dir.is_dir(): continue - - records = data if isinstance(data, list) else [data] - indexed = [] - for idx, record in enumerate(records): - if not isinstance(record, dict): + for json_file in sorted(action_dir.glob("*.json")): + try: + data = json.loads(json_file.read_text(encoding="utf-8")) + except (json.JSONDecodeError, TypeError, OSError): + results.append( + CheckResult(False, f"lineage({action_name}): parse", "bad JSON") + ) continue - indexed.append((idx, record)) - tid = record.get("target_id") - if isinstance(tid, str) and tid: - all_target_ids.add(tid) - action_records.setdefault(action_name, []).extend(indexed) + records = data if isinstance(data, list) else [data] + indexed = [] + for idx, record in enumerate(records): + if not isinstance(record, dict): + continue + indexed.append((idx, record)) + tid = record.get("target_id") + if isinstance(tid, str) and tid: + all_target_ids.add(tid) + + action_records.setdefault(action_name, []).extend(indexed) total_records = sum(len(recs) for recs in action_records.values()) diff --git a/tests/manual/smoke_test/checks/schema_conformance.py b/tests/manual/smoke_test/checks/schema_conformance.py index e98b1414..aaa00872 100644 --- a/tests/manual/smoke_test/checks/schema_conformance.py +++ b/tests/manual/smoke_test/checks/schema_conformance.py @@ -2,6 +2,7 @@ import json import sqlite3 +from pathlib import Path import yaml @@ -102,28 +103,29 @@ def verify(self, ctx: RunContext) -> list[CheckResult]: if not required_fields: continue + # Discover action names (including versioned: _1, _2, _3) + action_names_to_check = [action_name] cursor = conn.execute( - "SELECT data FROM target_data WHERE action_name = ?", + "SELECT DISTINCT action_name FROM target_data WHERE action_name = ?", (action_name,), ) - rows = cursor.fetchall() - - # Check versioned action names (e.g., classify_severity_1, _2, _3) - if not rows: - for v_name in sorted( + if not cursor.fetchall(): + action_names_to_check = sorted( a for a in all_db_actions if a.startswith(f"{action_name}_") - ): - rows.extend( - conn.execute( - "SELECT data FROM target_data WHERE action_name = ?", - (v_name,), - ).fetchall() - ) + ) if action_name in skipped_actions: continue - if not rows: + # Read target data from filesystem + target_files: list[tuple[str, str]] = [] + for a_name in action_names_to_check: + action_dir = ctx.target_dir / a_name + if action_dir.is_dir(): + for f in sorted(action_dir.glob("*.json")): + target_files.append((a_name, str(f))) + + if not target_files: if action_name in excused_actions: continue results.append( @@ -138,9 +140,9 @@ def verify(self, ctx: RunContext) -> list[CheckResult]: schema_actions_checked += 1 - for row in rows: + for _, file_path in target_files: try: - data = json.loads(row[0]) + data = json.loads(Path(file_path).read_text(encoding="utf-8")) records = data if isinstance(data, list) else [data] for record in records: if not isinstance(record, dict): @@ -156,12 +158,12 @@ def verify(self, ctx: RunContext) -> list[CheckResult]: else "all required fields present", ) ) - except (json.JSONDecodeError, UnicodeDecodeError): + except (json.JSONDecodeError, UnicodeDecodeError, OSError): results.append( CheckResult( passed=False, name=f"schema({action_name})", - message="failed to parse target_data JSON", + message="failed to parse target data file", ) ) diff --git a/tests/unit/storage/test_delete_target.py b/tests/unit/storage/test_delete_target.py index bbeafe9e..94ade173 100644 --- a/tests/unit/storage/test_delete_target.py +++ b/tests/unit/storage/test_delete_target.py @@ -12,7 +12,8 @@ class TestDeleteTarget: def backend(self, tmp_path): """Create a fresh SQLite backend for testing.""" db_path = tmp_path / "agent_io" / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + target_dir = tmp_path / "agent_io" / "target" + backend = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend.initialize() yield backend backend.close() @@ -38,7 +39,8 @@ def test_returns_zero_when_no_matching_rows(self, backend): def test_works_with_real_sqlite_backend(self, tmp_path): """Full roundtrip: write, verify, delete, verify gone.""" db_path = tmp_path / "test.db" - backend = SQLiteBackend(str(db_path), "wf") + target_dir = tmp_path / "target" + backend = SQLiteBackend(str(db_path), "wf", target_dir=str(target_dir)) backend.initialize() try: diff --git a/tests/unit/storage/test_sqlite_backend.py b/tests/unit/storage/test_sqlite_backend.py index 6051e83e..85b696d3 100644 --- a/tests/unit/storage/test_sqlite_backend.py +++ b/tests/unit/storage/test_sqlite_backend.py @@ -43,7 +43,8 @@ class TestSQLiteBackend: def backend(self, tmp_path): """Create a fresh SQLite backend for testing.""" db_path = tmp_path / "agent_io" / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + target_dir = tmp_path / "agent_io" / "target" + backend = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend.initialize() yield backend backend.close() @@ -51,7 +52,8 @@ def backend(self, tmp_path): def test_initialize_creates_tables(self, tmp_path): """Test that initialize creates required tables.""" db_path = tmp_path / "agent_io" / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + target_dir = tmp_path / "agent_io" / "target" + backend = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend.initialize() # Check tables exist @@ -185,7 +187,8 @@ def test_context_manager(self, tmp_path): """Test that backend works as context manager.""" db_path = tmp_path / "agent_io" / "test.db" - with SQLiteBackend(str(db_path), "test_workflow") as backend: + target_dir = tmp_path / "agent_io" / "target" + with SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) as backend: backend.initialize() backend.write_target("node_1", "file.json", [{"id": 1}]) @@ -208,7 +211,8 @@ class TestDispositionMethods: def backend(self, tmp_path): """Create a fresh SQLite backend for testing.""" db_path = tmp_path / "agent_io" / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + target_dir = tmp_path / "agent_io" / "target" + backend = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend.initialize() yield backend backend.close() @@ -353,7 +357,8 @@ def test_connection_raises_if_not_initialized(self, tmp_path): def backend(self, tmp_path): """Create a fresh SQLite backend for testing.""" db_path = tmp_path / "agent_io" / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + target_dir = tmp_path / "agent_io" / "target" + backend = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend.initialize() yield backend backend.close() @@ -410,7 +415,8 @@ class TestWriteSourceDropGuard: @pytest.fixture def backend(self, tmp_path): db_path = tmp_path / "agent_io" / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + target_dir = tmp_path / "agent_io" / "target" + backend = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend.initialize() yield backend backend.close() @@ -450,13 +456,14 @@ class TestPreviewTargetNullRecordCount: @pytest.fixture def backend(self, tmp_path): db_path = tmp_path / "agent_io" / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + target_dir = tmp_path / "agent_io" / "target" + backend = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend.initialize() yield backend backend.close() - def test_null_record_count_uses_json_length(self, backend): - """preview_target computes correct count from JSON when record_count IS NULL.""" + def test_null_record_count_uses_zero(self, backend): + """preview_target uses COALESCE(record_count, 0) when record_count IS NULL.""" import json records = [{"id": 1}, {"id": 2}, {"id": 3}] @@ -469,8 +476,8 @@ def test_null_record_count_uses_json_length(self, backend): backend.connection.commit() result = backend.preview_target("node_1") - assert result["total_count"] == 3 - assert len(result["records"]) == 3 + assert result["total_count"] == 0 # NULL record_count → COALESCE → 0 + assert result["files"] == ["legacy.json"] class TestGetStorageStatsNullRecordCount: @@ -479,7 +486,8 @@ class TestGetStorageStatsNullRecordCount: @pytest.fixture def backend(self, tmp_path): db_path = tmp_path / "agent_io" / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + target_dir = tmp_path / "agent_io" / "target" + backend = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend.initialize() yield backend backend.close() @@ -521,7 +529,8 @@ class TestSetDispositionRecordIdValidation: @pytest.fixture def backend(self, tmp_path): db_path = tmp_path / "agent_io" / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + target_dir = tmp_path / "agent_io" / "target" + backend = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend.initialize() yield backend backend.close() @@ -569,7 +578,8 @@ class TestCloseThreadSafety: def test_double_close_is_safe(self, tmp_path): """Calling close() twice does not raise.""" db_path = tmp_path / "agent_io" / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + target_dir = tmp_path / "agent_io" / "target" + backend = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend.initialize() backend.close() backend.close() # Should not raise @@ -658,7 +668,8 @@ def test_old_schema_migrated_with_alter_table(self, tmp_path): def test_correct_schema_not_dropped(self, tmp_path): """Table with all columns is left intact on initialize().""" db_path = tmp_path / "test.db" - backend = SQLiteBackend(str(db_path), "test_workflow") + target_dir = tmp_path / "target" + backend = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend.initialize() # Write some data @@ -666,7 +677,7 @@ def test_correct_schema_not_dropped(self, tmp_path): # Re-initialize — data should survive backend.close() - backend2 = SQLiteBackend(str(db_path), "test_workflow") + backend2 = SQLiteBackend(str(db_path), "test_workflow", target_dir=str(target_dir)) backend2.initialize() data = backend2.read_target("node1", "file.json") diff --git a/tests/unit/tooling/test_data_scanners.py b/tests/unit/tooling/test_data_scanners.py index 81c38c6d..b092e323 100644 --- a/tests/unit/tooling/test_data_scanners.py +++ b/tests/unit/tooling/test_data_scanners.py @@ -310,7 +310,9 @@ def test_no_llm_events_leaves_defaults(self, tmp_path): # --------------------------------------------------------------------------- -def _create_test_db(db_path: Path, *, with_traces: bool = False) -> None: +def _create_test_db( + db_path: Path, *, with_traces: bool = False, target_dir: Path | None = None +) -> None: """Create a minimal SQLite DB with source/target data and optionally prompt_trace.""" conn = sqlite3.connect(str(db_path)) conn.execute("CREATE TABLE source_data (source_guid TEXT, relative_path TEXT, data TEXT)") @@ -319,13 +321,21 @@ def _create_test_db(db_path: Path, *, with_traces: bool = False) -> None: "(action_name TEXT, relative_path TEXT, data TEXT, record_count INTEGER)" ) # Insert one target record with known source_guid and target_id - record = json.dumps( - [{"source_guid": "guid-001", "target_id": "tid-001", "issue_type": "bug", "lineage": []}] - ) + records_list = [ + {"source_guid": "guid-001", "target_id": "tid-001", "issue_type": "bug", "lineage": []} + ] + record = json.dumps(records_list) conn.execute( "INSERT INTO target_data VALUES (?, ?, ?, ?)", ("classify", "issues.json", record, 1), ) + + # Write filesystem file for the scanner to read + if target_dir is not None: + fs_path = target_dir / "classify" / "issues.json" + fs_path.parent.mkdir(parents=True, exist_ok=True) + fs_path.write_text(record) + # Insert a source row for count conn.execute( "INSERT INTO source_data VALUES (?, ?, ?)", @@ -379,9 +389,10 @@ class TestScanSqliteReadonlyTraceAttachment: def test_trace_attached_when_table_exists(self, tmp_path): db_path = tmp_path / "test.db" - _create_test_db(db_path, with_traces=True) + target_dir = tmp_path / "target" + _create_test_db(db_path, with_traces=True, target_dir=target_dir) - result = scan_sqlite_readonly(db_path, "test_workflow") + result = scan_sqlite_readonly(db_path, "test_workflow", target_dir=target_dir) assert result is not None records = result["nodes"]["classify"]["preview"] assert len(records) == 1 @@ -397,9 +408,10 @@ def test_trace_attached_when_table_exists(self, tmp_path): def test_no_trace_when_table_missing(self, tmp_path): """Old DBs without prompt_trace table should not crash.""" db_path = tmp_path / "test.db" - _create_test_db(db_path, with_traces=False) + target_dir = tmp_path / "target" + _create_test_db(db_path, with_traces=False, target_dir=target_dir) - result = scan_sqlite_readonly(db_path, "test_workflow") + result = scan_sqlite_readonly(db_path, "test_workflow", target_dir=target_dir) assert result is not None records = result["nodes"]["classify"]["preview"] assert len(records) == 1 @@ -408,6 +420,7 @@ def test_no_trace_when_table_missing(self, tmp_path): def test_no_trace_when_no_matching_target_id(self, tmp_path): """Records without target_id should not get traces.""" db_path = tmp_path / "test.db" + target_dir = tmp_path / "target" conn = sqlite3.connect(str(db_path)) conn.execute("CREATE TABLE source_data (source_guid TEXT, relative_path TEXT, data TEXT)") conn.execute( @@ -432,8 +445,12 @@ def test_no_trace_when_no_matching_target_id(self, tmp_path): ) conn.commit() conn.close() + # Write filesystem file + fs_path = target_dir / "act" / "f.json" + fs_path.parent.mkdir(parents=True, exist_ok=True) + fs_path.write_text(record) - result = scan_sqlite_readonly(db_path, "test_wf") + result = scan_sqlite_readonly(db_path, "test_wf", target_dir=target_dir) assert result is not None records = result["nodes"]["act"]["preview"] assert len(records) == 1 @@ -442,7 +459,8 @@ def test_no_trace_when_no_matching_target_id(self, tmp_path): def test_latest_attempt_wins(self, tmp_path): """When multiple attempts exist, only the latest is attached.""" db_path = tmp_path / "test.db" - _create_test_db(db_path, with_traces=True) + target_dir = tmp_path / "target" + _create_test_db(db_path, with_traces=True, target_dir=target_dir) # Add a second attempt with different response conn = sqlite3.connect(str(db_path)) @@ -466,7 +484,7 @@ def test_latest_attempt_wins(self, tmp_path): conn.commit() conn.close() - result = scan_sqlite_readonly(db_path, "test_workflow") + result = scan_sqlite_readonly(db_path, "test_workflow", target_dir=target_dir) records = result["nodes"]["classify"]["preview"] trace = records[0]["_trace"] assert trace["attempt"] == 1 @@ -535,6 +553,7 @@ class TestScanSqliteNamespaceUnwrap: def test_preview_records_show_action_fields(self, tmp_path): """Preview records should have unwrapped content for the action.""" db_path = tmp_path / "test.db" + target_dir = tmp_path / "target" conn = sqlite3.connect(str(db_path)) conn.execute("CREATE TABLE source_data (source_guid TEXT, relative_path TEXT, data TEXT)") conn.execute( @@ -548,15 +567,19 @@ def test_preview_records_show_action_fields(self, tmp_path): "classify": {"genre": "fiction"}, }, } + record_json = json.dumps([namespaced_record]) conn.execute( "INSERT INTO target_data VALUES (?, ?, ?, ?)", - ("extract", "data.json", json.dumps([namespaced_record]), 1), + ("extract", "data.json", record_json, 1), ) conn.execute("INSERT INTO source_data VALUES (?, ?, ?)", ("g1", "data.json", "{}")) conn.commit() conn.close() + fs_path = target_dir / "extract" / "data.json" + fs_path.parent.mkdir(parents=True, exist_ok=True) + fs_path.write_text(record_json) - result = scan_sqlite_readonly(db_path, "test_wf") + result = scan_sqlite_readonly(db_path, "test_wf", target_dir=target_dir) records = result["nodes"]["extract"]["preview"] assert len(records) == 1 # content should be unwrapped to show extract's fields @@ -566,6 +589,7 @@ def test_preview_records_show_action_fields(self, tmp_path): def test_flat_content_records_unchanged(self, tmp_path): """Records without namespaced content pass through unchanged.""" db_path = tmp_path / "test.db" + target_dir = tmp_path / "target" conn = sqlite3.connect(str(db_path)) conn.execute("CREATE TABLE source_data (source_guid TEXT, relative_path TEXT, data TEXT)") conn.execute( @@ -576,15 +600,19 @@ def test_flat_content_records_unchanged(self, tmp_path): "source_guid": "g1", "content": {"genre": "fiction", "confidence": 0.9}, } + record_json = json.dumps([flat_record]) conn.execute( "INSERT INTO target_data VALUES (?, ?, ?, ?)", - ("classify", "data.json", json.dumps([flat_record]), 1), + ("classify", "data.json", record_json, 1), ) conn.execute("INSERT INTO source_data VALUES (?, ?, ?)", ("g1", "data.json", "{}")) conn.commit() conn.close() + fs_path = target_dir / "classify" / "data.json" + fs_path.parent.mkdir(parents=True, exist_ok=True) + fs_path.write_text(record_json) - result = scan_sqlite_readonly(db_path, "test_wf") + result = scan_sqlite_readonly(db_path, "test_wf", target_dir=target_dir) records = result["nodes"]["classify"]["preview"] assert len(records) == 1 assert records[0]["content"] == {"genre": "fiction", "confidence": 0.9} diff --git a/tests/workflow/test_stale_completion_verification.py b/tests/workflow/test_stale_completion_verification.py index 682528e5..34dc05f8 100644 --- a/tests/workflow/test_stale_completion_verification.py +++ b/tests/workflow/test_stale_completion_verification.py @@ -220,7 +220,9 @@ class TestSQLiteBackendThreadSafeReads: def test_list_target_files_acquires_lock(self, tmp_path): from agent_actions.storage.backends.sqlite_backend import SQLiteBackend - backend = SQLiteBackend(str(tmp_path / "test.db"), "test_workflow") + backend = SQLiteBackend( + str(tmp_path / "test.db"), "test_workflow", target_dir=str(tmp_path / "target") + ) backend.initialize() backend.write_target( "action_a", "out.json", [{"_state": "processed", "_state_schema_version": 1, "id": 1}] @@ -242,30 +244,20 @@ def __exit__(self_inner, *args): assert "locked" in acquired, "list_target_files must acquire _lock" - def test_read_target_acquires_lock(self, tmp_path): + def test_read_target_reads_from_filesystem(self, tmp_path): from agent_actions.storage.backends.sqlite_backend import SQLiteBackend - backend = SQLiteBackend(str(tmp_path / "test.db"), "test_workflow") + backend = SQLiteBackend( + str(tmp_path / "test.db"), "test_workflow", target_dir=str(tmp_path / "target") + ) backend.initialize() backend.write_target( "action_a", "out.json", [{"_state": "processed", "_state_schema_version": 1, "id": 1}] ) - acquired = [] - original_lock = backend._lock - - class TrackingLock: - def __enter__(self_inner): - acquired.append("locked") - return original_lock.__enter__() - - def __exit__(self_inner, *args): - return original_lock.__exit__(*args) - - with patch.object(backend, "_lock", TrackingLock()): - backend.read_target("action_a", "out.json") - - assert "locked" in acquired, "read_target must acquire _lock" + result = backend.read_target("action_a", "out.json") + assert len(result) == 1 + assert result[0]["id"] == 1 def test_concurrent_reads_return_correct_data(self, tmp_path): """Multiple threads reading simultaneously must all return correct data.""" @@ -273,7 +265,9 @@ def test_concurrent_reads_return_correct_data(self, tmp_path): from agent_actions.storage.backends.sqlite_backend import SQLiteBackend - backend = SQLiteBackend(str(tmp_path / "test.db"), "test_workflow") + backend = SQLiteBackend( + str(tmp_path / "test.db"), "test_workflow", target_dir=str(tmp_path / "target") + ) backend.initialize() backend.write_target( "upstream", @@ -326,7 +320,9 @@ class TestSQLiteBackendRemainingReadLocks: def _setup_backend(self, tmp_path): from agent_actions.storage.backends.sqlite_backend import SQLiteBackend - backend = SQLiteBackend(str(tmp_path / "test.db"), "test_workflow") + backend = SQLiteBackend( + str(tmp_path / "test.db"), "test_workflow", target_dir=str(tmp_path / "target") + ) backend.initialize() return backend