Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openarmature-spec
Submodule openarmature-spec updated 23 files
+25 −0 CHANGELOG.md
+2 −7 README.md
+134 −0 docs/open-questions.md
+1 −1 docs/proposals.md
+3 −0 mkdocs.yml
+3 −2 proposals/0009-pipeline-utilities-per-instance-fan-out-resume.md
+0 −44 spec/pipeline-utilities/conformance/028-checkpoint-fan-out-atomic-restart.md
+0 −83 spec/pipeline-utilities/conformance/028-checkpoint-fan-out-atomic-restart.yaml
+46 −0 spec/pipeline-utilities/conformance/048-checkpoint-fan-out-per-instance-resume-skips-completed.md
+96 −0 spec/pipeline-utilities/conformance/048-checkpoint-fan-out-per-instance-resume-skips-completed.yaml
+45 −0 spec/pipeline-utilities/conformance/049-checkpoint-fan-out-per-instance-resume-append-reducer.md
+92 −0 spec/pipeline-utilities/conformance/049-checkpoint-fan-out-per-instance-resume-append-reducer.yaml
+47 −0 spec/pipeline-utilities/conformance/050-checkpoint-fan-out-in-flight-instance-restart.md
+109 −0 spec/pipeline-utilities/conformance/050-checkpoint-fan-out-in-flight-instance-restart.yaml
+48 −0 spec/pipeline-utilities/conformance/051-checkpoint-fan-out-fail-fast-resume.md
+96 −0 spec/pipeline-utilities/conformance/051-checkpoint-fan-out-fail-fast-resume.yaml
+49 −0 spec/pipeline-utilities/conformance/052-checkpoint-fan-out-collect-errors-resume.md
+110 −0 spec/pipeline-utilities/conformance/052-checkpoint-fan-out-collect-errors-resume.yaml
+50 −0 spec/pipeline-utilities/conformance/053-checkpoint-fan-out-instance-middleware-retry-resume.md
+98 −0 spec/pipeline-utilities/conformance/053-checkpoint-fan-out-instance-middleware-retry-resume.yaml
+58 −0 spec/pipeline-utilities/conformance/054-checkpoint-fan-out-batching-buffered-saves-lost-on-crash.md
+139 −0 spec/pipeline-utilities/conformance/054-checkpoint-fan-out-batching-buffered-saves-lost-on-crash.yaml
+245 −71 spec/pipeline-utilities/spec.md
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Repository = "https://github.com/LunarCommand/openarmature-python"
Specification = "https://github.com/LunarCommand/openarmature-spec"

[tool.openarmature]
spec_version = "0.17.1"
spec_version = "0.18.0"

[dependency-groups]
dev = [
Expand Down
2 changes: 1 addition & 1 deletion src/openarmature/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""OpenArmature: workflow framework for LLM pipelines and tool-calling agents."""

__version__ = "0.8.0"
__spec_version__ = "0.17.1"
__spec_version__ = "0.18.0"
7 changes: 6 additions & 1 deletion src/openarmature/checkpoint/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
restores from a prior record.
"""

from .backends import InMemoryCheckpointer, SerializationMode, SQLiteCheckpointer
from .backends import FanOutInternalSaveBatching, InMemoryCheckpointer, SerializationMode, SQLiteCheckpointer
from .errors import (
CheckpointError,
CheckpointNotFound,
Expand All @@ -36,6 +36,8 @@
CheckpointFilter,
CheckpointRecord,
CheckpointSummary,
FanOutInstanceProgress,
FanOutProgress,
NodePosition,
)

Expand All @@ -51,6 +53,9 @@
"CheckpointStateMigrationMissing",
"CheckpointSummary",
"Checkpointer",
"FanOutInstanceProgress",
"FanOutInternalSaveBatching",
"FanOutProgress",
"InMemoryCheckpointer",
"MigrationRegistry",
"NodePosition",
Expand Down
3 changes: 2 additions & 1 deletion src/openarmature/checkpoint/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
from openarmature.checkpoint import InMemoryCheckpointer, SQLiteCheckpointer
"""

from .memory import InMemoryCheckpointer
from .memory import FanOutInternalSaveBatching, InMemoryCheckpointer
from .sqlite import SerializationMode, SQLiteCheckpointer

__all__ = [
"FanOutInternalSaveBatching",
"InMemoryCheckpointer",
"SQLiteCheckpointer",
"SerializationMode",
Expand Down
136 changes: 132 additions & 4 deletions src/openarmature/checkpoint/backends/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,34 @@

import asyncio
from collections.abc import Iterable
from dataclasses import dataclass

from ..protocol import CheckpointFilter, CheckpointRecord, CheckpointSummary


@dataclass(frozen=True)
class FanOutInternalSaveBatching:
"""Per-Checkpointer-instance configuration for §10.11.4 fan-out
internal save batching.

Applies ONLY to fan-out instance internal saves. Outermost-graph,
subgraph-internal, and fan-out node completion saves remain
synchronous per §10.3.

- ``flush_every``: flush the buffer every N buffered saves. ``0``
/ negative means batching is disabled (every save flushes
immediately). The buffered save count resets at each flush.

Buffered-but-unflushed saves are LOST on crash per §10.11.4;
on resume, instances whose completed state was buffered-only
revert to ``in_flight`` / ``not_started`` and re-run. The §10.11.1
reducer correctness holds because their contributions hadn't
durably committed.
"""

flush_every: int = 0


class InMemoryCheckpointer:
"""Dict-backed Checkpointer.

Expand All @@ -36,6 +60,16 @@ class InMemoryCheckpointer:
reference, so a version mismatch on resume raises
``CheckpointRecordInvalid`` rather than consulting the
migration registry.

**Fan-out internal save batching** (per spec §10.11.4): optional
via the ``fan_out_internal_save_batching`` constructor parameter.
Default is no batching (every save flushes immediately). When
enabled, fan-out instance internal saves buffer in memory and
flush every ``flush_every`` saves. Outermost-graph,
subgraph-internal, and fan-out node completion saves bypass the
buffer entirely (they remain synchronous). On crash, buffered
saves are lost — by design, per §10.11.4's documented cost
trade-off.
"""

# Per spec §10.12.1: in-memory storage holds live typed-state
Expand All @@ -47,20 +81,114 @@ class InMemoryCheckpointer:
# so Pyright accepts a class-attribute override here.
supports_state_migration: bool = False

def __init__(self) -> None:
def __init__(
self,
*,
fan_out_internal_save_batching: FanOutInternalSaveBatching | None = None,
) -> None:
self._records: dict[str, CheckpointRecord] = {}
self._lock = asyncio.Lock()
self._fan_out_batching = fan_out_internal_save_batching
# Buffered fan-out internal saves keyed by invocation_id. Each
# entry holds the latest buffered record for that invocation;
# subsequent buffered saves overwrite (the most recent state
# is what would have flushed). Count of buffered saves
# decides when to flush per ``flush_every``.
self._fan_out_buffer: dict[str, CheckpointRecord] = {}
self._fan_out_buffer_count = 0
Comment thread
chris-colinsky marked this conversation as resolved.
Outdated

async def save(self, invocation_id: str, record: CheckpointRecord) -> None:
"""Store ``record`` under ``invocation_id``, replacing any
previous record for the same id. Not durable across process
restarts."""
restarts.

Per §10.11.4: outermost-graph, subgraph-internal, and
fan-out node completion saves are synchronous regardless of
the batching configuration. The engine routes fan-out
instance internal saves through :meth:`save_fan_out_internal`
instead; this method bypasses the buffer.
"""
async with self._lock:
# Flush any buffered fan-out internal saves for this
# invocation before recording the (synchronous) save —
# otherwise a fan-out node completion save could land in
# the persistent slot while a more-recent buffered
# in-flight save sits in the buffer, inverting the
# save order.
self._flush_invocation_buffer_locked(invocation_id)
self._records[invocation_id] = record

async def save_fan_out_internal(self, invocation_id: str, record: CheckpointRecord) -> None:
"""Buffer a fan-out instance internal save under the §10.11.4
batching policy. When batching is disabled (default), behaves
identically to :meth:`save` — every save is synchronously
durable. When ``flush_every`` is positive, the save is
buffered; the buffer flushes when the count reaches the
configured threshold.
"""
if self._fan_out_batching is None or self._fan_out_batching.flush_every <= 0:
await self.save(invocation_id, record)
return
async with self._lock:
self._fan_out_buffer[invocation_id] = record
self._fan_out_buffer_count += 1
if self._fan_out_buffer_count >= self._fan_out_batching.flush_every:
self._flush_invocation_buffer_locked(invocation_id)

async def save_fan_out_in_flight_failure(
self,
invocation_id: str,
record: CheckpointRecord,
) -> None:
"""Buffer an "instance failed mid-execution" save under §10.11.4
batching. The failure save records the in_flight state of an
instance whose terminal inner node raised; this save closes the
in_flight observability gap (per §10.11) for instances whose
subgraphs have no sibling-completed save to piggyback on.

Under batching, this save buffers BUT does NOT count toward
the flush threshold. The rationale: this save logically
represents "the moment of crash" — a real crash wouldn't
complete an extra save first; the buffered records (and this
one) would simply be lost. The batching count-trigger mechanism
is meant for steady-state save flow, not the abort path.

Backends without batching route this to a synchronous
:meth:`save` — the failure save is durable in the non-batching
case (fixture 048's in_flight observability requirement).
"""
if self._fan_out_batching is None or self._fan_out_batching.flush_every <= 0:
await self.save(invocation_id, record)
return
async with self._lock:
# Overwrite the buffer slot (the most-recent state is
# what the next flush would capture if one fires later)
# but DO NOT increment the count or trigger a flush.
# On crash, this record is lost along with the rest of
# the buffer — by design per §10.11.4.
self._fan_out_buffer[invocation_id] = record

def _flush_invocation_buffer_locked(self, invocation_id: str) -> None:
"""Caller-holds-lock helper: flush this invocation's buffered
fan-out internal save (if any) into the persistent records
dict. Resets the buffer count once flushed."""
buffered = self._fan_out_buffer.pop(invocation_id, None)
if buffered is not None:
self._records[invocation_id] = buffered
# Reset count; the spec doesn't require per-invocation
# counts (most pipelines run one invocation at a time
# against any given checkpointer instance), and resetting on
# any flush keeps the model simple.
self._fan_out_buffer_count = 0

async def load(self, invocation_id: str) -> CheckpointRecord | None:
"""Return the saved record for ``invocation_id`` or ``None``
if nothing has been saved under that id."""
if nothing has been saved under that id. Per §10.11.4:
buffered-but-unflushed fan-out internal saves are NOT visible
to ``load`` — that's the crash-loses-buffered contract. To
simulate a crash before the buffer flushes, drop the
Checkpointer reference; the buffer is in-memory only.
"""
async with self._lock:
return self._records.get(invocation_id)

Expand Down Expand Up @@ -91,4 +219,4 @@ async def delete(self, invocation_id: str) -> None:
self._records.pop(invocation_id, None)


__all__ = ["InMemoryCheckpointer"]
__all__ = ["FanOutInternalSaveBatching", "InMemoryCheckpointer"]
Loading
Loading