feat(storage): delta storage for target_data — 20x reduction#679

Open
Muizzkolapo wants to merge 12 commits into main from feat/543-delta-storage


@Muizzkolapo

Owner

Summary

  • Each action stores only its own content namespace instead of the full accumulated record, so per-record storage across N actions grows as O(N) instead of O(N²)
  • Reconstruction happens transparently inside read_target() — consumers see identical list[dict]
  • Delta logic lives in the abstract StorageBackend base class — future backends (Postgres, Mongo) get it for free
  • 3 special-case write paths are tagged _delta_mode="full": carry-forward, version correlation, FILE expansion
  • Format version check prevents silent corruption on downgrade

Architecture

  • write_target() → concrete in base class, extracts delta, delegates to _write_target_raw()
  • read_target() → concrete in base class, reconstructs from upstream deltas via _read_target_raw_batch()
  • _delta_mode field (delta/first/full) stripped before returning to consumers
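
The split described above can be sketched with an in-memory backend. This is a minimal illustration, not the PR's implementation: SketchBackend, MemoryBackend, and the single-file, no-cache simplification are assumptions made for the example, and only the write_target / read_target / _write_target_raw / _read_target_raw names come from the PR.

```python
from abc import ABC, abstractmethod
from typing import Any

Record = dict[str, Any]


class SketchBackend(ABC):
    """Hypothetical base class: concrete delta logic, abstract raw storage."""

    def __init__(self, execution_order: list[str]) -> None:
        self.execution_order = execution_order

    def write_target(self, action: str, records: list[Record]) -> None:
        first = self.execution_order[0] == action
        deltas = []
        for rec in records:
            content = rec["content"]
            keep = {action: content[action]}  # only this action's namespace
            if first and "source" in content:
                keep = {"source": content["source"], **keep}
            mode = "first" if first else "delta"
            deltas.append({**rec, "content": keep, "_delta_mode": mode})
        self._write_target_raw(action, deltas)

    def read_target(self, action: str) -> list[Record]:
        upstream = self.execution_order[: self.execution_order.index(action)]
        tables = [
            {r["source_guid"]: r["content"] for r in self._read_target_raw(a)}
            for a in upstream
        ]
        out = []
        for rec in self._read_target_raw(action):
            content: dict[str, Any] = {}
            for table in tables:  # join upstream deltas by source_guid
                content.update(table.get(rec["source_guid"], {}))
            content.update(rec["content"])
            full = {k: v for k, v in rec.items() if k != "_delta_mode"}
            full["content"] = content
            out.append(full)
        return out

    @abstractmethod
    def _write_target_raw(self, action: str, records: list[Record]) -> None: ...

    @abstractmethod
    def _read_target_raw(self, action: str) -> list[Record]: ...


class MemoryBackend(SketchBackend):
    """Hypothetical backend that only implements raw storage."""

    def __init__(self, execution_order: list[str]) -> None:
        super().__init__(execution_order)
        self.rows: dict[str, list[Record]] = {}

    def _write_target_raw(self, action: str, records: list[Record]) -> None:
        self.rows[action] = records

    def _read_target_raw(self, action: str) -> list[Record]:
        return self.rows[action]


backend = MemoryBackend(["extract", "classify"])
backend.write_target(
    "extract",
    [{"source_guid": "g1", "content": {"source": {"text": "hi"}, "extract": {"q": "Q1"}}}],
)
full = backend.read_target("extract")
full[0]["content"]["classify"] = {"label": "easy"}  # data bus accumulates
backend.write_target("classify", full)
```

In this sketch the stored row for classify holds only the classify namespace, while read_target("classify") returns the merged record with the internal marker removed.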

Storage reduction

  • qana_quiz (45 actions, 254 records): 440MB → ~22MB (20x)
  • Write latency for final action: ~1s → ~20ms (50x)
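
A back-of-the-envelope check of the reduction, assuming every action contributes one equally sized namespace and ignoring envelope overhead (both are simplifications, not measurements from this PR):

```python
def stored_units(num_actions: int) -> tuple[int, int]:
    """Return (full, delta) namespace counts stored per record.

    Full storage: action k stores all k namespaces accumulated so far.
    Delta storage: each action stores exactly one namespace.
    """
    full = sum(range(1, num_actions + 1))
    delta = num_actions
    return full, delta


full_units, delta_units = stored_units(45)
ratio = full_units / delta_units
```

For 45 actions this gives 1035 versus 45 namespace copies, a ratio of 23, which is in the same range as the measured ~20x.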

Safety

  • FM7: reconstruction uses only abstract methods
  • FM8: missing upstream → _reconstruction_incomplete flag + warning
  • FM9: untagged multi-namespace records → warning
  • FM10: format version check → ConfigValidationError when older code reads a newer-format (delta) DB
  • Self-reviewed: fixed async_run path, corrupt metadata guard, cross-file early exit

Verification

  • 7455 tests pass, 2 skipped
  • ruff check and ruff format --check clean
  • Manual round-trip verified: write 3-action pipeline as deltas, read back full records with all namespaces

Each action now stores only its own content namespace instead of the
full accumulated record. Reconstruction happens transparently inside
read_target() by joining upstream deltas.

Architecture:
- write_target() concrete in StorageBackend base class — extracts delta
  then delegates to _write_target_raw() (abstract, each backend implements)
- read_target() reconstructs full records from upstream deltas via
  _read_target_raw_batch() — consumers see identical list[dict]
- _delta_mode field (delta/first/full) tracks storage mode per record,
  stripped before return — never leaks to business logic

Safety (11 failure modes from spec review):
- FM7: reconstruction uses only abstract methods, no backend-specific access
- FM8: missing upstream flagged as _reconstruction_incomplete + warning log
- FM9: multi-namespace records without _delta_mode tag produce warning
- FM10: format version check: older code raises ConfigValidationError on a delta DB
- FM11: raw SQL shows partial records (documented DX tradeoff)

Tagged write paths:
- Carry-forward records: _delta_mode="full" (processing.py)
- Version correlation: _delta_mode="full" (loop.py)
- FILE expansion/synthetic: _delta_mode="full" (file_tool.py)

New infrastructure:
- workflow_metadata table stores execution_order for reconstruction
- _read_target_raw_batch() batched IN query in SQLiteBackend
- Reconstruction cache (instance-level, cleared on write)
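
A batched IN query of the kind mentioned above could look like the following sketch. The target_data column layout (action_name, relative_path, data as a JSON string) and the read_raw_batch helper are assumptions for illustration; the real SQLiteBackend schema is not shown in this PR.

```python
import json
import sqlite3
from typing import Any


def read_raw_batch(
    conn: sqlite3.Connection, action_names: list[str], relative_path: str
) -> dict[str, list[dict[str, Any]]]:
    """Fetch raw rows for several actions in one query (hypothetical schema)."""
    if not action_names:
        return {}
    placeholders = ",".join("?" for _ in action_names)
    rows = conn.execute(
        "SELECT action_name, data FROM target_data "
        f"WHERE relative_path = ? AND action_name IN ({placeholders})",
        [relative_path, *action_names],
    ).fetchall()
    # Actions with no row for this file are simply absent from the result.
    return {action: json.loads(data) for action, data in rows}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target_data (action_name TEXT, relative_path TEXT, data TEXT)")
conn.executemany(
    "INSERT INTO target_data VALUES (?, ?, ?)",
    [
        ("extract", "a.json", json.dumps([{"source_guid": "g1"}])),
        ("classify", "a.json", json.dumps([{"source_guid": "g1"}])),
        ("extract", "b.json", json.dumps([{"source_guid": "g2"}])),
    ],
)
batch = read_raw_batch(conn, ["extract", "classify", "missing"], "a.json")
```

Only the placeholders are interpolated into the SQL text; the values themselves are bound as parameters.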
@Muizzkolapo

Owner Author

Code review

Found 3 issues:

  1. No tests for delta extraction, reconstruction, format version check, or preview reconstruction path. ~300 lines of new logic with zero new tests. (CLAUDE.md Gate 1: "New behavior must have new tests." quality-gates.md Gate 6: "For every happy path test, immediately write the corresponding failure/edge case test.")

        relative_path: str,
        data: list[dict[str, Any]],
        *,
        is_first_action: bool | None = None,
    ) -> str:
        """Write target data with delta extraction.
        Concrete method — subclasses do NOT override. Subclasses implement
        _write_target_raw() for the actual storage.
        """
        if is_first_action is None:
            execution_order = self._get_execution_order()
            is_first_action = bool(execution_order) and execution_order[0] == action_name
        delta_records = []
        for record in data:
            if record.get("_delta_mode") == "full":
                delta_records.append(record)
            else:
                delta_records.append(
                    self._extract_delta(record, action_name, is_first_action=is_first_action)
                )
        if not self._format_version_written:
            self.save_metadata("storage_format_version", str(self._STORAGE_FORMAT_VERSION))
            self._format_version_written = True
        self._reconstruction_cache.clear()
        return self._write_target_raw(action_name, relative_path, delta_records)
    def read_target(self, action_name: str, relative_path: str) -> list[dict[str, Any]]:
        """Read target data, reconstruct from deltas, validate lifecycle, reset for downstream.
        Concrete method — subclasses do NOT override. Subclasses implement
        _read_target_raw() for the actual storage.
        """
        if not self._format_version_checked:
            stored_version = self.load_metadata("storage_format_version")
            if stored_version is not None:
                try:
                    version_int = int(stored_version)
                except ValueError:
                    from agent_actions.errors.configuration import ConfigValidationError
                    raise ConfigValidationError(
                        f"Corrupt storage_format_version in workflow_metadata: {stored_version!r}. "
                        f"Expected an integer. Re-run with --fresh to reset.",
                        context={"stored_version": stored_version},
                    ) from None
                if version_int > self._STORAGE_FORMAT_VERSION:
                    from agent_actions.errors.configuration import ConfigValidationError
                    raise ConfigValidationError(
                        f"Database uses storage format version {version_int}, "
                        f"but this code supports up to version {self._STORAGE_FORMAT_VERSION}. "
                        f"Please upgrade agent-actions.",
                        context={
                            "stored_version": version_int,
                            "supported": self._STORAGE_FORMAT_VERSION,
                        },
                    )
            self._format_version_checked = True
        cache_key = (action_name, relative_path)
        if cache_key in self._reconstruction_cache:
            return copy.deepcopy(self._reconstruction_cache[cache_key])
        result = self._read_target_raw(action_name, relative_path)
        result = self._reconstruct_from_deltas(action_name, relative_path, result)
        validate_lifecycle_batch(result, action_name=action_name)
        reset_for_downstream(result, action_name=action_name)
        self._reconstruction_cache[cache_key] = result
        return copy.deepcopy(result)
    # ------------------------------------------------------------------
    # Abstract methods subclasses must implement
    # ------------------------------------------------------------------
    @abstractmethod
    def _write_target_raw(
        self, action_name: str, relative_path: str, data: list[dict[str, Any]]
    ) -> str:
        """Store data to the backend. Called with delta-extracted records."""
        ...
    @abstractmethod
    def _read_target_raw(self, action_name: str, relative_path: str) -> list[dict[str, Any]]:
        """Read raw target data from storage. Subclasses implement this."""
        ...
    def _read_target_raw_batch(
        self, action_names: list[str], relative_path: str
    ) -> dict[str, list[dict[str, Any]]]:
        """Fetch raw target data for multiple actions in one call.
        Default loops over _read_target_raw. Backends may override with
        a batched query (e.g., SQL IN clause) for efficiency.
        """
        result: dict[str, list[dict[str, Any]]] = {}
        for action in action_names:
            try:
                result[action] = self._read_target_raw(action, relative_path)
            except FileNotFoundError:
                logger.debug(
                    "Upstream action '%s' has no data for file '%s' — "
                    "will be flagged as incomplete during reconstruction.",
                    action,
                    relative_path,
                )
        return result
    @abstractmethod
    def save_metadata(self, key: str, value: str) -> None:
        """Store a metadata key-value pair (e.g., execution_order)."""
        ...
    @abstractmethod
    def load_metadata(self, key: str) -> str | None:
        """Load a metadata value by key. Returns None if not found."""
        ...
    # ------------------------------------------------------------------
    # Delta extraction and reconstruction (concrete, backend-agnostic)
    # ------------------------------------------------------------------
    def _extract_delta(
        self, record: dict[str, Any], action_name: str, *, is_first_action: bool = False
    ) -> dict[str, Any]:
        """Extract delta: preserve entire envelope, strip content to this action's namespace."""
        content = record.get("content")
        if not isinstance(content, dict):
            # No content dict — store as full (raw records, test data, etc.)
            return {**record, "_delta_mode": "full"}
        # FM9: Warn on multi-namespace records that weren't pre-tagged
        if (
            record.get("_delta_mode") != "full"
            and action_name in content
            and not is_first_action
            and len({k for k in content if k != "source"}) > 1
        ):
            logger.warning(
                "Record for '%s' has %d content namespaces but was not tagged as full. "
                "If this is a carry-forward, correlation, or expansion record, "
                "tag it with _delta_mode='full' before calling write_target. "
                "Namespaces: %s",
                action_name,
                len(content),
                sorted(content.keys()),
            )
        if action_name not in content:
            return {**record, "_delta_mode": "full"}
        if is_first_action:
            delta_content: dict[str, Any] = {}
            if "source" in content:
                delta_content["source"] = content["source"]
            delta_content[action_name] = content[action_name]
            mode = "first"
        else:
            delta_content = {action_name: content[action_name]}
            mode = "delta"
        delta = {k: v for k, v in record.items() if k != "content"}
        delta["content"] = delta_content
        delta["_delta_mode"] = mode
        return delta
    def _reconstruct_from_deltas(
        self,
        action_name: str,
        relative_path: str,
        delta_records: list[dict[str, Any]],
    ) -> list[dict[str, Any]]:
        """Reconstruct full records by joining upstream deltas.
        Uses only abstract methods — no backend-specific access.
        Strips _delta_mode from ALL returned records.
        Flags records with missing upstream data as _reconstruction_incomplete.
        """
        if not delta_records or "_delta_mode" not in delta_records[0]:
            return delta_records  # Legacy DB — no reconstruction needed
        execution_order = self._get_execution_order()
        try:
            idx = execution_order.index(action_name)
        except ValueError:
            return [{k: v for k, v in r.items() if k != "_delta_mode"} for r in delta_records]
        upstream_actions = execution_order[:idx]
        if not upstream_actions:
            return [{k: v for k, v in r.items() if k != "_delta_mode"} for r in delta_records]
        # Batch fetch upstream deltas via abstract method
        upstream_data = self._read_target_raw_batch(upstream_actions, relative_path)
        # Index by (action_name, source_guid)
        upstream: dict[str, dict[str, dict[str, Any]]] = {}
        for act, records in upstream_data.items():
            guid_map: dict[str, dict[str, Any]] = {}
            for rec in records:
                guid = rec.get("source_guid")
                if guid:
                    rec_content = rec.get("content")
                    if rec_content is None:
                        logger.warning("Upstream record %s in '%s' has no content.", guid, act)
                        rec_content = {}
                    guid_map[guid] = rec_content
            upstream[act] = guid_map
        # Cross-file fallback for missing source_guids
        all_guids = {r.get("source_guid") for r in delta_records if r.get("source_guid")}
        found_guids: set[str] = set()
        for guid_map in upstream.values():
            found_guids.update(guid_map.keys())
        missing_guids = all_guids - found_guids
        if missing_guids:
            logger.warning(
                "Delta reconstruction for '%s': %d of %d source_guids not found in "
                "same-file upstream deltas. Attempting cross-file lookup.",
                action_name,
                len(missing_guids),
                len(all_guids),
            )
            still_missing = set(missing_guids)
            for act in upstream_actions:
                if not still_missing:
                    break  # All guids found
                try:
                    all_files = self.list_target_files(act)
                except FileNotFoundError:
                    logger.warning(
                        "Cross-file lookup: list_target_files('%s') failed.",
                        act,
                    )
                    continue
                for file_path in all_files:
                    if file_path == relative_path:
                        continue
                    try:
                        file_records = self._read_target_raw(act, file_path)
                    except FileNotFoundError:
                        continue
                    for rec in file_records:
                        guid = rec.get("source_guid")
                        if guid and guid in still_missing:
                            rec_content = rec.get("content")
                            if rec_content is None:
                                rec_content = {}
                            upstream.setdefault(act, {})[guid] = rec_content
                            still_missing.discard(guid)
        # Merge and strip _delta_mode
        reconstructed: list[dict[str, Any]] = []
        for record in delta_records:
            mode = record.get("_delta_mode")
            if mode in ("first", "full") or mode is None:
                clean = {k: v for k, v in record.items() if k != "_delta_mode"}
                reconstructed.append(clean)
                continue
            guid = record.get("source_guid")
            full_content: dict[str, Any] = {}
            missing_upstream: list[str] = []
            for act in upstream_actions:
                act_deltas = upstream.get(act, {})
                delta_content = act_deltas.get(guid) if guid else None
                if delta_content is None:
                    missing_upstream.append(act)
                else:
                    full_content.update(delta_content)
            current_content = record.get("content")
            if current_content is not None:
                full_content.update(current_content)
            else:
                logger.error("Delta record %s in '%s' has no content key.", guid, action_name)
            full_record = {k: v for k, v in record.items() if k != "content" and k != "_delta_mode"}
            full_record["content"] = full_content
            if missing_upstream:
                logger.warning(
                    "Record %s in '%s' missing upstream deltas from: %s. "
                    "Content may be incomplete.",
                    guid,
                    action_name,

  2. FM9 multi-namespace warning fires as a false positive on every normal record at every action beyond the first. After read_target reconstructs full records upstream, every record passed to write_target at action N has N namespaces; this is the normal data bus accumulation, not an untagged carry-forward. The warning will spam logs on every write in every multi-action pipeline, and the suggested fix ("tag as full") defeats delta storage entirely.

    def _extract_delta(
        self, record: dict[str, Any], action_name: str, *, is_first_action: bool = False
    ) -> dict[str, Any]:
        """Extract delta: preserve entire envelope, strip content to this action's namespace."""
        content = record.get("content")
        if not isinstance(content, dict):
            # No content dict — store as full (raw records, test data, etc.)
            return {**record, "_delta_mode": "full"}
        # FM9: Warn on multi-namespace records that weren't pre-tagged
        if (
            record.get("_delta_mode") != "full"

  3. Guard-skip tombstone records from build_skipped() are not tagged _delta_mode="full". These records have content = {**upstream, action_name: None} (multi-namespace) but flow through write_target without a full tag. _extract_delta strips all upstream namespaces, keeping only {action_name: None}. Carry-forward, correlation, and expansion records are tagged, but guard-skip tombstones from unified.py / result_collector.py are not. (bug due to record_helpers.py:build_tombstone and envelope.py:build_skipped producing multi-namespace content without tagging)

# Tag expansion records as full — new source_guids can't be
# reconstructed from upstream deltas.
if is_expansion:
    for item in structured_data:
        item["_delta_mode"] = "full"
has_synthetic = source_mapping and any(v is None for v in source_mapping.values())
if has_synthetic:
    for item in structured_data:
        item["_delta_mode"] = "full"
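
One way to check the tombstone concern is a round-trip: strip a tombstone to its delta, then rebuild it from an upstream delta that shares its source_guid. The helpers below are simplified stand-ins written for this illustration, not the PR's _extract_delta or _reconstruct_from_deltas.

```python
from typing import Any


def extract_delta(record: dict[str, Any], action: str) -> dict[str, Any]:
    """Keep only this action's namespace (non-first action, simplified)."""
    return {**record, "content": {action: record["content"][action]}}


def reconstruct(
    delta: dict[str, Any], upstream_deltas: list[dict[str, Any]]
) -> dict[str, Any]:
    """Merge upstream namespaces, then this action's own namespace."""
    content: dict[str, Any] = {}
    for up in upstream_deltas:
        content.update(up["content"])
    content.update(delta["content"])
    return {**delta, "content": content}


# Upstream action already stored its own namespace for this guid.
upstream = {"source_guid": "g1", "content": {"extract": {"q": "Q1"}}}
# Guard-skip tombstone: upstream content plus a None namespace for the skipped action.
tombstone = {"source_guid": "g1", "content": {"extract": {"q": "Q1"}, "review": None}}

stored = extract_delta(tombstone, "review")
restored = reconstruct(stored, [upstream])
```

Under these assumptions the stored delta is {"review": None} and the restored content matches the original tombstone, so data would only be lost if the upstream delta for the same source_guid were absent.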

Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

…delta storage tests

Code review found 3 issues:

1. FM9 multi-namespace warning fired on every normal record at every
   action beyond the first — false positive. Removed entirely. The data
   bus normally accumulates namespaces; _extract_delta correctly strips
   them to deltas. This is the expected behavior, not an anomaly.

2. Guard-skip tombstones not tagged _delta_mode="full" — NOT a bug.
   Tombstone delta is {action_name: None}. Upstream content is stored
   in upstream action deltas and reconstructed on read. Verified with
   test_tombstone_reconstruction_includes_upstream.

3. No tests — FIXED. Added 20 tests across 7 test classes:
   - TestDeltaExtraction (5 tests): first action, subsequent, full mode,
     no content, auto-detect
   - TestDeltaReconstruction (4 tests): full round-trip, delta_mode
     stripping, cache invalidation, missing upstream flag
   - TestGuardSkipTombstone (2 tests): stored as delta, reconstructed
     with upstream
   - TestBackwardCompatibility (1 test): legacy records returned as-is
   - TestMetadata (4 tests): save/load round-trip, missing key, overwrite,
     execution order
   - TestFormatVersionCheck (3 tests): version stored, future version
     rejected, corrupt version rejected
   - TestPreviewReconstruction (1 test): preview shows full content

_delta_mode tags set on FILE-mode expansion/synthetic records were
dropped during enrichment because RecordEnvelope.build() only carries
_PERSISTENT_FIELDS. Add _delta_mode to RECORD_LIFECYCLE_FIELDS so
the tag survives through the pipeline to write_target.

Without this fix, FILE tools that produce new records (3→13 flatten,
N→M grouping) had their _delta_mode="full" tag dropped, causing
delta extraction to strip upstream content from records whose
source_guids don't exist in upstream actions.

The _delta_mode="full" tag set in file_tool.py was dropped during
enrichment because RecordEnvelope.build() creates new records.
Move tagging to enrichment.py where expansion records get fresh
source_guids — this is AFTER record creation, so the tag survives
to write_target.

Also add force_full parameter to write_target for callers that
know their data should not be delta-extracted.

Root cause: when a FILE tool expands 3→13 records with new GUIDs,
downstream actions tried to find those GUIDs in ALL upstream actions
(including pre-expansion ones where the GUIDs don't exist). This
produced _reconstruction_incomplete warnings for every record at
every action after the expansion.

Fix: track which GUIDs have a _delta_mode="full" upstream record.
Full records are self-contained — they already embed all upstream
content. Reconstruction for a given GUID starts at the full record
and merges forward, skipping actions before the expansion boundary.
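
The boundary rule can be sketched as follows. The chain layout (an ordered list of (action, records-by-guid) pairs) and the reconstruct_content name are assumptions for this example only.

```python
from typing import Any

Chain = list[tuple[str, dict[str, dict[str, Any]]]]


def reconstruct_content(guid: str, chain: Chain) -> tuple[dict[str, Any], list[str]]:
    """Merge content for one guid, starting at its latest 'full' record.

    Returns (content, missing), where missing lists actions at or after the
    boundary that had no record for this guid.
    """
    start = 0
    for i, (_, by_guid) in enumerate(chain):
        rec = by_guid.get(guid)
        if rec is not None and rec["_delta_mode"] == "full":
            start = i  # full records are self-contained
    content: dict[str, Any] = {}
    missing: list[str] = []
    for action, by_guid in chain[start:]:
        rec = by_guid.get(guid)
        if rec is None:
            missing.append(action)
        else:
            content.update(rec["content"])
    return content, missing


chain: Chain = [
    ("extract", {"g1": {"_delta_mode": "first",
                        "content": {"source": {"doc": "d"}, "extract": {"n": 3}}}}),
    # Expansion: new guid x1, tagged full, embeds everything upstream.
    ("flatten", {"x1": {"_delta_mode": "full",
                        "content": {"source": {"doc": "d"}, "extract": {"n": 3},
                                    "flatten": {"item": 1}}}}),
    ("score", {"x1": {"_delta_mode": "delta", "content": {"score": {"value": 0.9}}}}),
]
content, missing = reconstruct_content("x1", chain)
```

Here x1 never existed in extract, but because merging starts at the full record in flatten, nothing is reported missing.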

This is the architectural fix: the execution order is a flat list
but GUIDs change at expansion boundaries. Reconstruction must
respect those boundaries instead of blindly walking the full chain.

The execution order is a flat list where parallel actions are
serialized in arbitrary order. Using execution_order[:idx] as
"upstream" incorrectly includes parallel peers (e.g., verify_answer_3
treated validate_final_question_1/2/3 as upstream, but they're
siblings at the same level).

Fix: store the actual dependency DAG in workflow_metadata alongside
execution_order. Reconstruction uses _get_upstream_actions() which
reads the DAG — only true transitive dependencies are fetched.
Falls back to flat execution order for legacy DBs.
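
A sketch of the lookup, assuming the stored graph maps each action to its direct dependencies (if it already stores transitive ones, the walk is simply a no-op beyond the first hop). The upstream_actions function and the action names are illustrative, not the PR's _get_upstream_actions.

```python
from typing import Optional


def upstream_actions(
    action: str,
    execution_order: list[str],
    dep_graph: Optional[dict[str, list[str]]] = None,
) -> list[str]:
    """Return transitive dependencies of `action`, in execution order."""
    if dep_graph is None or action not in dep_graph:
        # Legacy fallback: everything earlier in the flat order.
        if action not in execution_order:
            return []
        return execution_order[: execution_order.index(action)]
    seen: set[str] = set()
    stack = list(dep_graph[action])
    while stack:
        dep = stack.pop()
        if dep not in seen:
            seen.add(dep)
            stack.extend(dep_graph.get(dep, []))
    return [a for a in execution_order if a in seen]


order = ["extract", "validate_1", "validate_2", "verify_1"]
graph = {
    "extract": [],
    "validate_1": ["extract"],
    "validate_2": ["extract"],
    "verify_1": ["validate_1"],
}
with_dag = upstream_actions("validate_2", order, graph)
legacy = upstream_actions("validate_2", order)
```

With the graph, validate_2 sees only extract; the flat fallback would also pull in its sibling validate_1.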
…_actions

_get_reachable_actions walks action_configs using depends_on names.
For versioned actions (extract_raw_qa_1/2/3), the config declares
dependencies using base names (extract_raw_qa) but action_configs
uses versioned names. The lookup fails silently, producing empty
dependency lists for ALL actions.

Fix: build the graph from execution levels instead. Actions at
level N depend on all actions at levels 0..N-1. The level
orchestrator already handles versioned name resolution correctly.
This produces the right graph for both versioned and non-versioned
actions, and correctly excludes parallel peers.
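
The levels-based construction can be sketched like this. The graph_from_levels name and the sample action names are hypothetical, and the levels are assumed to arrive as a list of lists with versioned names already resolved.

```python
def graph_from_levels(levels: list[list[str]]) -> dict[str, list[str]]:
    """Each action depends on every action in all earlier levels."""
    graph: dict[str, list[str]] = {}
    earlier: list[str] = []
    for level in levels:
        for action in level:
            graph[action] = sorted(earlier)
        # Extend only after the whole level is processed, so peers in the
        # same level never appear in each other's dependency lists.
        earlier.extend(level)
    return graph


levels = [
    ["extract_raw_qa_1", "extract_raw_qa_2"],
    ["validate_1", "validate_2"],
    ["verify"],
]
graph = graph_from_levels(levels)
```

This over-approximates true dependencies (an action is linked to every earlier level, not just the ones it declared), which trades a few extra upstream reads for not needing name resolution.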
@Muizzkolapo

Owner Author

Code review (post-fix pass)

Found 1 issue:

  1. async_run() still builds dependency graph using _get_reachable_actions() which silently produces empty dependency lists for versioned actions. The sync run() path was fixed in commit 86389e0 to use execution levels instead, with an explicit comment explaining why _get_reachable_actions fails for versioned actions (base-name mismatch). The async path was not updated. Any workflow using async_run() with versioned actions gets wrong reconstruction — the same production bug that was fixed for sync. (CLAUDE.md staff-thinking: "After fixing a pattern, grep for the same pattern elsewhere. Fix all instances.")

try:
    backend = getattr(self, "storage_backend", None)
    if backend is not None:
        backend.save_metadata("execution_order", json.dumps(self.execution_order))
        dep_graph = {
            name: sorted(self._get_reachable_actions(name))
            for name in self.execution_order
        }
        backend.save_metadata("dependency_graph", json.dumps(dep_graph))
    levels = self.services.core.action_level_orchestrator.compute_execution_levels()
    self.services.core.action_level_orchestrator.log_execution_levels(

Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

Three bugs fixed:

1. save_metadata now clears the in-memory dependency_graph_cache and
   execution_order_cache. Without this, the cache loaded stale data
   from a previous run and never refreshed — causing reconstruction
   to use empty dependency lists even though the DB had correct data.

2. async_run now uses levels-based dep graph (same as sync run).
   Previously used _get_reachable_actions which fails for versioned
   actions due to base-name mismatch.

3. save_metadata becomes concrete in base class (cache clearing),
   delegates to _save_metadata_raw (abstract, each backend implements).
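
The cache-clearing pattern in items 1 and 3 can be sketched as below. MetadataBackend, MemoryMetadataBackend, and get_execution_order are hypothetical names for this example; only save_metadata, load_metadata, and _save_metadata_raw appear in the PR.

```python
import json
from abc import ABC, abstractmethod
from typing import Optional


class MetadataBackend(ABC):
    """Hypothetical base: concrete save_metadata invalidates cached reads."""

    def __init__(self) -> None:
        self._execution_order_cache: Optional[list[str]] = None

    def save_metadata(self, key: str, value: str) -> None:
        self._save_metadata_raw(key, value)
        self._execution_order_cache = None  # never serve stale data after a write

    def get_execution_order(self) -> list[str]:
        if self._execution_order_cache is None:
            raw = self.load_metadata("execution_order")
            self._execution_order_cache = json.loads(raw) if raw else []
        return self._execution_order_cache

    @abstractmethod
    def _save_metadata_raw(self, key: str, value: str) -> None: ...

    @abstractmethod
    def load_metadata(self, key: str) -> Optional[str]: ...


class MemoryMetadataBackend(MetadataBackend):
    def __init__(self) -> None:
        super().__init__()
        self._store: dict[str, str] = {}

    def _save_metadata_raw(self, key: str, value: str) -> None:
        self._store[key] = value

    def load_metadata(self, key: str) -> Optional[str]:
        return self._store.get(key)


meta = MemoryMetadataBackend()
meta.save_metadata("execution_order", json.dumps(["a"]))
first_read = meta.get_execution_order()
meta.save_metadata("execution_order", json.dumps(["a", "b"]))
second_read = meta.get_execution_order()
```

Keeping the invalidation in the base class means a new backend cannot forget it.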
- Extract duplicated metadata-saving logic from async_run and
  _run_workflow_with_context into _persist_execution_metadata()
- Remove inconsistent logging (was in sync only)
- Add invariant comment on records[0] check in _reconstruct_from_deltas
… leak, false incomplete

4 fixes from code review:

- #6: _get_upstream_actions logs warning when action not in dep graph
  and falls back to flat order, instead of silently using wrong upstream
- #7: Partitioned pipelines no longer get false _reconstruction_incomplete
  flags — upstream actions with no data for this file are skipped (they
  process different record partitions)
- #8: writer.py strips _delta_mode from records before disk write so
  filesystem files don't leak the internal storage marker
- #9: json.loads on corrupt dependency_graph/execution_order metadata
  now logs warning and degrades gracefully instead of crashing with
  opaque JSONDecodeError
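
The graceful-degradation behavior in #9 might look like the following sketch. The load_json_metadata helper and its default argument are assumptions, not the PR's code.

```python
import json
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)


def load_json_metadata(raw: Optional[str], key: str, default: Any) -> Any:
    """Parse a stored metadata value, falling back to `default` when corrupt."""
    if raw is None:
        return default
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        logger.warning("Corrupt %s metadata; falling back to default.", key)
        return default


order = load_json_metadata('["a", "b"]', "execution_order", [])
graph = load_json_metadata("{not json", "dependency_graph", {})
absent = load_json_metadata(None, "dependency_graph", {})
```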
Two remaining review findings:

- #2: Records without source_guid are now stored as _delta_mode="full"
  instead of being delta-extracted. Without a join key, upstream
  content is unrecoverable — store the full record.

- #3: Removed _reconstruction_incomplete flag entirely. Partitioned
  pipelines (where upstream actions process different guid subsets)
  produced false flags on correct records. Missing-guid cases are
  now logged at debug level, not flagged on the record. No consumer
  ever read the flag — removing it is the clean solution.
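
The rule in #2 reduces to one extra guard before extraction. The function below is a simplified stand-in (non-first actions only) written for this illustration rather than the PR's _extract_delta.

```python
from typing import Any


def extract_delta(record: dict[str, Any], action: str) -> dict[str, Any]:
    """Store full when there is no join key or no namespace to extract."""
    content = record.get("content")
    if (
        not record.get("source_guid")  # no join key: upstream is unrecoverable
        or not isinstance(content, dict)
        or action not in content
    ):
        return {**record, "_delta_mode": "full"}
    return {**record, "content": {action: content[action]}, "_delta_mode": "delta"}


with_guid = extract_delta(
    {"source_guid": "g1", "content": {"extract": {"q": 1}, "score": {"v": 2}}}, "score"
)
without_guid = extract_delta(
    {"content": {"extract": {"q": 1}, "score": {"v": 2}}}, "score"
)
```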