Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openarmature-spec
Submodule openarmature-spec updated 23 files
+25 −0 CHANGELOG.md
+2 −7 README.md
+134 −0 docs/open-questions.md
+1 −1 docs/proposals.md
+3 −0 mkdocs.yml
+3 −2 proposals/0009-pipeline-utilities-per-instance-fan-out-resume.md
+0 −44 spec/pipeline-utilities/conformance/028-checkpoint-fan-out-atomic-restart.md
+0 −83 spec/pipeline-utilities/conformance/028-checkpoint-fan-out-atomic-restart.yaml
+46 −0 spec/pipeline-utilities/conformance/048-checkpoint-fan-out-per-instance-resume-skips-completed.md
+96 −0 spec/pipeline-utilities/conformance/048-checkpoint-fan-out-per-instance-resume-skips-completed.yaml
+45 −0 spec/pipeline-utilities/conformance/049-checkpoint-fan-out-per-instance-resume-append-reducer.md
+92 −0 spec/pipeline-utilities/conformance/049-checkpoint-fan-out-per-instance-resume-append-reducer.yaml
+47 −0 spec/pipeline-utilities/conformance/050-checkpoint-fan-out-in-flight-instance-restart.md
+109 −0 spec/pipeline-utilities/conformance/050-checkpoint-fan-out-in-flight-instance-restart.yaml
+48 −0 spec/pipeline-utilities/conformance/051-checkpoint-fan-out-fail-fast-resume.md
+96 −0 spec/pipeline-utilities/conformance/051-checkpoint-fan-out-fail-fast-resume.yaml
+49 −0 spec/pipeline-utilities/conformance/052-checkpoint-fan-out-collect-errors-resume.md
+110 −0 spec/pipeline-utilities/conformance/052-checkpoint-fan-out-collect-errors-resume.yaml
+50 −0 spec/pipeline-utilities/conformance/053-checkpoint-fan-out-instance-middleware-retry-resume.md
+98 −0 spec/pipeline-utilities/conformance/053-checkpoint-fan-out-instance-middleware-retry-resume.yaml
+58 −0 spec/pipeline-utilities/conformance/054-checkpoint-fan-out-batching-buffered-saves-lost-on-crash.md
+139 −0 spec/pipeline-utilities/conformance/054-checkpoint-fan-out-batching-buffered-saves-lost-on-crash.yaml
+245 −71 spec/pipeline-utilities/spec.md
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Repository = "https://github.com/LunarCommand/openarmature-python"
Specification = "https://github.com/LunarCommand/openarmature-spec"

[tool.openarmature]
spec_version = "0.17.1"
spec_version = "0.18.0"

[dependency-groups]
dev = [
Expand Down
2 changes: 1 addition & 1 deletion src/openarmature/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""OpenArmature: workflow framework for LLM pipelines and tool-calling agents."""

__version__ = "0.8.0"
__spec_version__ = "0.17.1"
__spec_version__ = "0.18.0"
7 changes: 6 additions & 1 deletion src/openarmature/checkpoint/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
restores from a prior record.
"""

from .backends import InMemoryCheckpointer, SerializationMode, SQLiteCheckpointer
from .backends import FanOutInternalSaveBatching, InMemoryCheckpointer, SerializationMode, SQLiteCheckpointer
from .errors import (
CheckpointError,
CheckpointNotFound,
Expand All @@ -36,6 +36,8 @@
CheckpointFilter,
CheckpointRecord,
CheckpointSummary,
FanOutInstanceProgress,
FanOutProgress,
NodePosition,
)

Expand All @@ -51,6 +53,9 @@
"CheckpointStateMigrationMissing",
"CheckpointSummary",
"Checkpointer",
"FanOutInstanceProgress",
"FanOutInternalSaveBatching",
"FanOutProgress",
"InMemoryCheckpointer",
"MigrationRegistry",
"NodePosition",
Expand Down
3 changes: 2 additions & 1 deletion src/openarmature/checkpoint/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
from openarmature.checkpoint import InMemoryCheckpointer, SQLiteCheckpointer
"""

from .memory import InMemoryCheckpointer
from .memory import FanOutInternalSaveBatching, InMemoryCheckpointer
from .sqlite import SerializationMode, SQLiteCheckpointer

__all__ = [
"FanOutInternalSaveBatching",
"InMemoryCheckpointer",
"SQLiteCheckpointer",
"SerializationMode",
Expand Down
137 changes: 133 additions & 4 deletions src/openarmature/checkpoint/backends/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,34 @@

import asyncio
from collections.abc import Iterable
from dataclasses import dataclass

from ..protocol import CheckpointFilter, CheckpointRecord, CheckpointSummary


@dataclass(frozen=True)
class FanOutInternalSaveBatching:
"""Per-Checkpointer-instance configuration for §10.11.4 fan-out
internal save batching.

Applies ONLY to fan-out instance internal saves. Outermost-graph,
subgraph-internal, and fan-out node completion saves remain
synchronous per §10.3.

- ``flush_every``: flush the buffer every N buffered saves. ``0``
/ negative means batching is disabled (every save flushes
immediately). The buffered save count resets at each flush.

Buffered-but-unflushed saves are LOST on crash per §10.11.4;
on resume, instances whose completed state was buffered-only
revert to ``in_flight`` / ``not_started`` and re-run. The §10.11.1
reducer correctness holds because their contributions hadn't
durably committed.
"""

flush_every: int = 0


class InMemoryCheckpointer:
"""Dict-backed Checkpointer.

Expand All @@ -36,6 +60,16 @@ class InMemoryCheckpointer:
reference, so a version mismatch on resume raises
``CheckpointRecordInvalid`` rather than consulting the
migration registry.

**Fan-out internal save batching** (per spec §10.11.4): optional
via the ``fan_out_internal_save_batching`` constructor parameter.
Default is no batching (every save flushes immediately). When
enabled, fan-out instance internal saves buffer in memory and
flush every ``flush_every`` saves. Outermost-graph,
subgraph-internal, and fan-out node completion saves bypass the
buffer entirely (they remain synchronous). On crash, buffered
saves are lost — by design, per §10.11.4's documented cost
trade-off.
"""

# Per spec §10.12.1: in-memory storage holds live typed-state
Expand All @@ -47,20 +81,115 @@ class InMemoryCheckpointer:
# so Pyright accepts a class-attribute override here.
supports_state_migration: bool = False

def __init__(self) -> None:
def __init__(
self,
*,
fan_out_internal_save_batching: FanOutInternalSaveBatching | None = None,
) -> None:
self._records: dict[str, CheckpointRecord] = {}
self._lock = asyncio.Lock()
self._fan_out_batching = fan_out_internal_save_batching
# Buffered fan-out internal saves keyed by invocation_id. Each
# entry holds the latest buffered record for that invocation;
# subsequent buffered saves overwrite (the most recent state
# is what would have flushed). Per-invocation counts of
# buffered saves decide when to flush per ``flush_every``;
# keeping counts per-invocation isolates concurrent
# invocations that share the same checkpointer.
self._fan_out_buffer: dict[str, CheckpointRecord] = {}
self._fan_out_buffer_counts: dict[str, int] = {}

async def save(self, invocation_id: str, record: CheckpointRecord) -> None:
"""Store ``record`` under ``invocation_id``, replacing any
previous record for the same id. Not durable across process
restarts."""
restarts.

Per §10.11.4: outermost-graph, subgraph-internal, and
fan-out node completion saves are synchronous regardless of
the batching configuration. The engine routes fan-out
instance internal saves through :meth:`save_fan_out_internal`
instead; this method bypasses the buffer.
"""
async with self._lock:
# Flush any buffered fan-out internal saves for this
# invocation before recording the (synchronous) save —
# otherwise a fan-out node completion save could land in
# the persistent slot while a more-recent buffered
# in-flight save sits in the buffer, inverting the
# save order.
self._flush_invocation_buffer_locked(invocation_id)
self._records[invocation_id] = record

async def save_fan_out_internal(self, invocation_id: str, record: CheckpointRecord) -> None:
"""Buffer a fan-out instance internal save under the §10.11.4
batching policy. When batching is disabled (default), behaves
identically to :meth:`save` — every save is synchronously
durable. When ``flush_every`` is positive, the save is
buffered; the buffer flushes when the count reaches the
configured threshold.
"""
if self._fan_out_batching is None or self._fan_out_batching.flush_every <= 0:
await self.save(invocation_id, record)
return
async with self._lock:
self._fan_out_buffer[invocation_id] = record
self._fan_out_buffer_counts[invocation_id] = self._fan_out_buffer_counts.get(invocation_id, 0) + 1
if self._fan_out_buffer_counts[invocation_id] >= self._fan_out_batching.flush_every:
self._flush_invocation_buffer_locked(invocation_id)

async def save_fan_out_in_flight_failure(
self,
invocation_id: str,
record: CheckpointRecord,
) -> None:
"""Buffer an "instance failed mid-execution" save under §10.11.4
batching. The failure save records the in_flight state of an
instance whose terminal inner node raised; this save closes the
in_flight observability gap (per §10.11) for instances whose
subgraphs have no sibling-completed save to piggyback on.

Under batching, this save buffers BUT does NOT count toward
the flush threshold. The rationale: this save logically
represents "the moment of crash" — a real crash wouldn't
complete an extra save first; the buffered records (and this
one) would simply be lost. The batching count-trigger mechanism
is meant for steady-state save flow, not the abort path.

Backends without batching route this to a synchronous
:meth:`save` — the failure save is durable in the non-batching
case (fixture 048's in_flight observability requirement).
"""
if self._fan_out_batching is None or self._fan_out_batching.flush_every <= 0:
await self.save(invocation_id, record)
return
async with self._lock:
# Overwrite the buffer slot (the most-recent state is
# what the next flush would capture if one fires later)
# but DO NOT increment the count or trigger a flush.
# On crash, this record is lost along with the rest of
# the buffer — by design per §10.11.4.
self._fan_out_buffer[invocation_id] = record

def _flush_invocation_buffer_locked(self, invocation_id: str) -> None:
"""Caller-holds-lock helper: flush this invocation's buffered
fan-out internal save (if any) into the persistent records
dict. Resets only this invocation's buffer count, leaving
other invocations' accounting untouched so concurrent
invocations sharing the checkpointer don't interfere with
each other's flush thresholds."""
buffered = self._fan_out_buffer.pop(invocation_id, None)
if buffered is not None:
self._records[invocation_id] = buffered
self._fan_out_buffer_counts.pop(invocation_id, None)

async def load(self, invocation_id: str) -> CheckpointRecord | None:
"""Return the saved record for ``invocation_id`` or ``None``
if nothing has been saved under that id."""
if nothing has been saved under that id. Per §10.11.4:
buffered-but-unflushed fan-out internal saves are NOT visible
to ``load`` — that's the crash-loses-buffered contract. To
simulate a crash before the buffer flushes, drop the
Checkpointer reference; the buffer is in-memory only.
"""
async with self._lock:
return self._records.get(invocation_id)

Expand Down Expand Up @@ -91,4 +220,4 @@ async def delete(self, invocation_id: str) -> None:
self._records.pop(invocation_id, None)


__all__ = ["InMemoryCheckpointer"]
__all__ = ["FanOutInternalSaveBatching", "InMemoryCheckpointer"]
Loading
Loading