Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions agent_actions/output/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import csv
import json
from collections.abc import Callable
from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand All @@ -16,7 +17,6 @@
from agent_actions.processing.error_handling import ProcessorErrorHandlerMixin
from agent_actions.utils.atomic_write import atomic_json_write
from agent_actions.utils.path_safety import assert_path_contained
from agent_actions.utils.path_utils import ensure_directory_exists

if TYPE_CHECKING:
from agent_actions.storage.backend import StorageBackend
Expand Down Expand Up @@ -132,10 +132,7 @@ def do_write() -> int:

self.storage_backend.write_target(self.action_name, relative_path, data)

ensure_directory_exists(file_path, is_file=True)
atomic_json_write(file_path, data)

return file_path.stat().st_size
return len(json.dumps(data, ensure_ascii=False).encode("utf-8"))

self._execute_write("Write target file", do_write)

Expand Down
5 changes: 4 additions & 1 deletion agent_actions/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ def get_storage_backend(

workflow_dir = Path(workflow_path)
db_path = workflow_dir / "agent_io" / "store" / f"{workflow_name}.db"
backend = backend_class.create(db_path=str(db_path), workflow_name=workflow_name)
target_dir = workflow_dir / "agent_io" / "target"
backend = backend_class.create(
db_path=str(db_path), workflow_name=workflow_name, target_dir=str(target_dir)
)

return backend

Expand Down
153 changes: 90 additions & 63 deletions agent_actions/storage/backends/sqlite_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
DispositionRow,
StorageBackend,
)
from agent_actions.utils.atomic_write import atomic_json_write
from agent_actions.utils.path_safety import assert_path_contained

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -168,10 +170,11 @@ class SQLiteBackend(StorageBackend):
# Restrictive as defense-in-depth; all SQL is parameterized.
_VALID_IDENTIFIER_CHARS = set(string.ascii_letters + string.digits + "_-./ ")

def __init__(self, db_path: str, workflow_name: str):
def __init__(self, db_path: str, workflow_name: str, target_dir: str | None = None):
"""Initialize SQLite backend."""
self.db_path = Path(db_path)
self.workflow_name = workflow_name
self.target_dir: Path | None = Path(target_dir) if target_dir else None
self._connection: sqlite3.Connection | None = None
self._lock = (
threading.RLock()
Expand All @@ -184,15 +187,17 @@ def create(cls, **kwargs) -> "SQLiteBackend":
Required kwargs:
db_path: Path to the SQLite database file.
workflow_name: Name of the workflow.
target_dir: Optional path to the agent_io/target directory (defaults to None).
"""
db_path = kwargs.pop("db_path")
workflow_name = kwargs.pop("workflow_name")
target_dir = kwargs.pop("target_dir", None)
if kwargs:
raise ConfigValidationError(
f"Unknown kwargs for SQLiteBackend: {list(kwargs)}",
context={"unknown_kwargs": list(kwargs)},
)
return cls(str(db_path), workflow_name)
return cls(str(db_path), workflow_name, target_dir=str(target_dir) if target_dir else None)

def _validate_identifier(self, name: str, field: str) -> str:
"""Validate and POSIX-normalize an identifier to prevent injection.
Expand Down Expand Up @@ -313,23 +318,33 @@ def _enforce_schema(self, cursor: sqlite3.Cursor) -> None:
)

def write_target(self, action_name: str, relative_path: str, data: list[dict[str, Any]]) -> str:
"""Write target data for a specific node."""
"""Write target data to filesystem and metadata to DB."""
action_name = self._validate_identifier(action_name, "action_name")
relative_path = self._validate_identifier(relative_path, "relative_path")

data_json = json.dumps(data, ensure_ascii=False)
record_count = len(data)

if self.target_dir is None:
raise ValueError(
f"Cannot write target data without target_dir configured "
f"({action_name}/{relative_path})"
)

file_path = self.target_dir / action_name / relative_path
assert_path_contained(file_path, self.target_dir)
file_path.parent.mkdir(parents=True, exist_ok=True)
atomic_json_write(file_path, data)

with self._lock:
cursor = self.connection.cursor()
try:
cursor.execute(
"""
INSERT OR REPLACE INTO target_data
(action_name, relative_path, data, record_count, created_at)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
VALUES (?, ?, '[]', ?, CURRENT_TIMESTAMP)
""",
(action_name, relative_path, data_json, record_count),
(action_name, relative_path, record_count),
)
self.connection.commit()
logger.debug(
Expand All @@ -354,25 +369,33 @@ def write_target(self, action_name: str, relative_path: str, data: list[dict[str
raise

def _read_target_raw(self, action_name: str, relative_path: str) -> list[dict[str, Any]]:
"""Read raw target data from SQLite.
"""Read raw target data from the filesystem.

Raises:
FileNotFoundError: If target_dir is not configured, no data exists for
    the given path, or the stored JSON is not a list.
"""
action_name = self._validate_identifier(action_name, "action_name")
relative_path = self._validate_identifier(relative_path, "relative_path")
with self._lock:
cursor = self.connection.cursor()
cursor.execute(
"SELECT data FROM target_data WHERE action_name = ? AND relative_path = ?",
(action_name, relative_path),
)
row = cursor.fetchone()

if row is None:
raise FileNotFoundError(f"No target data found for {action_name}/{relative_path}")
if self.target_dir is None:
raise FileNotFoundError(
f"No target_dir configured — cannot read {action_name}/{relative_path}"
)

result: list[dict[str, Any]] = json.loads(row["data"])
file_path = self.target_dir / action_name / relative_path
assert_path_contained(file_path, self.target_dir)
try:
result: list[dict[str, Any]] = json.loads(file_path.read_text(encoding="utf-8"))
except FileNotFoundError:
raise FileNotFoundError(
f"No target data found for {action_name}/{relative_path}"
) from None

if not isinstance(result, list):
raise FileNotFoundError(
f"Target data at {action_name}/{relative_path} is not a list "
f"(got {type(result).__name__})"
)
return result

def write_source(
Expand Down Expand Up @@ -485,11 +508,17 @@ def preview_target(
offset: int = 0,
relative_path: str | None = None,
) -> dict[str, Any]:
"""Preview target data for a node with pagination."""
"""Preview target data for a node with pagination.

Reads file metadata from DB, actual data from filesystem.
"""
action_name = self._validate_identifier(action_name, "action_name")
if relative_path is not None:
relative_path = self._validate_identifier(relative_path, "relative_path")

if self.target_dir is None:
raise ValueError("Cannot preview target data without target_dir configured")

limit = min(max(1, limit), 1000)
offset = max(0, offset)

Expand All @@ -499,7 +528,7 @@ def preview_target(
cursor.execute(
"""
SELECT relative_path,
COALESCE(record_count, json_array_length(data)) as record_count
COALESCE(record_count, 0) as record_count
FROM target_data
WHERE action_name = ?
ORDER BY relative_path
Expand All @@ -508,60 +537,58 @@ def preview_target(
)
file_metadata = cursor.fetchall()

files = [row["relative_path"] for row in file_metadata]
files = [row["relative_path"] for row in file_metadata]

if relative_path:
if relative_path not in files:
return {
"records": [],
"total_count": 0,
"action_name": action_name,
"files": files,
"error": f"File '{relative_path}' not found for node '{action_name}'",
}
file_metadata = [
row for row in file_metadata if row["relative_path"] == relative_path
]
if relative_path:
if relative_path not in files:
return {
"records": [],
"total_count": 0,
"action_name": action_name,
"files": files,
"error": f"File '{relative_path}' not found for node '{action_name}'",
}
file_metadata = [row for row in file_metadata if row["relative_path"] == relative_path]

total_count = sum(row["record_count"] for row in file_metadata)
total_count = sum(row["record_count"] for row in file_metadata)

paginated_records: list[dict[str, Any]] = []
skipped = 0
collected = 0
paginated_records: list[dict[str, Any]] = []
skipped = 0
collected = 0

for row in file_metadata:
if collected >= limit:
break
for row in file_metadata:
if collected >= limit:
break

file_path = row["relative_path"]
file_record_count = row["record_count"]
file_path = row["relative_path"]
file_record_count = row["record_count"]

if skipped + file_record_count <= offset:
skipped += file_record_count
continue
if skipped + file_record_count <= offset:
skipped += file_record_count
continue

cursor.execute(
"SELECT data FROM target_data WHERE action_name = ? AND relative_path = ?",
(action_name, file_path),
)
data_row = cursor.fetchone()
if not data_row:
fs_path = self.target_dir / action_name / file_path
try:
records = json.loads(fs_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
continue

if not isinstance(records, list):
continue

for record in records:
if skipped < offset:
skipped += 1
continue

records = json.loads(data_row["data"])
for record in records:
if skipped < offset:
skipped += 1
continue

if collected < limit:
if isinstance(record, dict):
paginated_records.append({**record, "_file": file_path})
else:
paginated_records.append({"_file": file_path, "_value": record})
collected += 1
if collected < limit:
if isinstance(record, dict):
paginated_records.append({**record, "_file": file_path})
else:
break
paginated_records.append({"_file": file_path, "_value": record})
collected += 1
else:
break

return {
"records": paginated_records,
Expand Down
30 changes: 16 additions & 14 deletions agent_actions/tooling/docs/scanner/data_scanners.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ def scan_workflow_data(project_root: Path) -> dict[str, Any]:
workflow_name = db_file.stem

try:
data = scan_sqlite_readonly(db_file, workflow_name)
data = scan_sqlite_readonly(
db_file, workflow_name, target_dir=agent_io_dir / "target"
)
if data is not None:
workflow_data[workflow_name] = data
except (OSError, sqlite3.Error) as e:
Expand Down Expand Up @@ -142,7 +144,9 @@ def _unwrap_record_content(record: dict, action_name: str) -> dict:
return record


def scan_sqlite_readonly(db_file: Path, workflow_name: str) -> dict[str, Any] | None:
def scan_sqlite_readonly(
db_file: Path, workflow_name: str, target_dir: Path | None = None
) -> dict[str, Any] | None:
"""Open a workflow SQLite DB read-only and extract stats + preview data.

Uses a direct sqlite3 connection in read-only mode so that scanning
Expand Down Expand Up @@ -213,27 +217,25 @@ def scan_sqlite_readonly(db_file: Path, workflow_name: str) -> dict[str, Any] |
)
files = [row["relative_path"] for row in cursor.fetchall()]

# Preview: iterate the cursor lazily so we never load every
# data blob into memory. Cap at 20 flattened records.
cursor.execute(
"SELECT relative_path, data FROM target_data WHERE action_name = ?",
(action_name,),
)
# Preview: read from filesystem (source of truth).
# Cap at 20 flattened records.
records: list[dict] = []
for target_row in cursor:
for file_path in files:
if len(records) >= 20:
break
if target_dir is None:
break
fs_path = target_dir / action_name / file_path
try:
row_data = _json.loads(target_row["data"])
except (ValueError, _json.JSONDecodeError):
row_data = _json.loads(fs_path.read_text(encoding="utf-8"))
except (ValueError, _json.JSONDecodeError, OSError):
logger.debug(
"Skipping malformed JSON in %s node %s, file %s",
"Skipping unreadable target file %s/%s/%s",
workflow_name,
action_name,
target_row["relative_path"],
file_path,
)
continue
file_path = target_row["relative_path"]
if isinstance(row_data, list):
for item in row_data:
if len(records) >= 20:
Expand Down
15 changes: 14 additions & 1 deletion agent_actions/workflow/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,21 @@ def _clear_for_fresh_run(self) -> None:
except Exception as e:
logger.warning("Failed to clear stored data for %s: %s", action_name, e)

# Clear target output files from filesystem (rglob for nested paths,
# but skip batch/ — batch artifacts have their own cleanup below)
action_target_dir = target_dir / action_name
batch_dir = action_target_dir / "batch"
if action_target_dir.is_dir():
for json_file in action_target_dir.rglob("*.json"):
if batch_dir in json_file.parents or json_file.parent == batch_dir:
continue
try:
json_file.unlink()
logger.debug("Removed target file: %s", json_file)
except OSError as e:
logger.warning("Failed to remove %s: %s", json_file, e)

# Batch artifacts live on disk, not in the DB
batch_dir = target_dir / action_name / "batch"
if batch_dir.is_dir():
for pattern in [
".recovery_state_*.json",
Expand Down
Loading
Loading