diff --git a/examples/00-hello-world/main.py b/examples/00-hello-world/main.py index 53eb2de..0aa0c8a 100644 --- a/examples/00-hello-world/main.py +++ b/examples/00-hello-world/main.py @@ -12,6 +12,11 @@ - Pydantic class (``Classification``, ``Summary``): typed instance on ``Response.parsed``. - JSON Schema dict (``research``): raw dict on ``Response.parsed``. +- ``RuntimeConfig`` for per-call sampling knobs — every ``complete()`` + here passes ``config=RuntimeConfig(temperature=0.0)`` to reduce + sampling variance across runs. Temperature 0 isn't a strict + determinism guarantee (providers vary at the infra level) but it's + the standard tuning knob for "as reproducible as the API allows." - Conditional routing on a parsed field (``route`` reads ``state.classification.intent``). - ``attach_observer`` for boundary visibility. @@ -49,7 +54,7 @@ append, merge, ) -from openarmature.llm import OpenAIProvider, UserMessage +from openarmature.llm import OpenAIProvider, RuntimeConfig, UserMessage # Pydantic schemas the model is constrained to produce. Passing a @@ -84,6 +89,16 @@ class PipelineState(State): # builders, IDE inspection) import this module without running main(). _provider_instance: OpenAIProvider | None = None +# Per-call sampling knobs. The demo sets temperature 0 to reduce +# variance across invocations — the run is "as reproducible as the +# API allows" but not strictly deterministic (providers vary at the +# infra level even at temp 0). Useful for tutorial output; production +# usually wants some sampling variety. +# RuntimeConfig also surfaces max_tokens, top_p, and seed; only +# temperature is set here so the others fall through to provider +# defaults. +_DETERMINISTIC = RuntimeConfig(temperature=0.0) + def _get_provider() -> OpenAIProvider: global _provider_instance @@ -113,6 +128,7 @@ async def classify(state: PipelineState) -> Mapping[str, Any]: ) ], response_schema=Classification, + config=_DETERMINISTIC, ) return {"classification": response.parsed, "metadata": {"classified_by": "llm"}} @@ -140,6 +156,7 @@ async def research(state: PipelineState) -> Mapping[str, Any]: "required": ["topics", "follow_up_questions"], "additionalProperties": False, }, + config=_DETERMINISTIC, ) return { "research_plan": response.parsed, @@ -161,6 +178,7 @@ async def summarize(state: PipelineState) -> Mapping[str, Any]: ) ], response_schema=Summary, + config=_DETERMINISTIC, ) return { "summary": response.parsed, diff --git a/examples/05-fan-out-with-retry/main.py b/examples/05-fan-out-with-retry/main.py index 88cb485..9620c7f 100644 --- a/examples/05-fan-out-with-retry/main.py +++ b/examples/05-fan-out-with-retry/main.py @@ -27,12 +27,29 @@ per-instance: a failure on headline 3 doesn't restart headlines 0-2. - ``concurrency=3`` caps how many instances run in flight at once. Use this to be polite to the upstream API. +- ``error_policy`` defaults to ``"fail_fast"`` — the first instance + failure (after retries exhaust) raises and cancels siblings. Set + the ``COLLECT_MODE`` env var to switch to ``"collect"``: each + instance runs independently and per-instance failures land in + ``state.instance_errors`` instead of aborting the batch. The + ``errors_field="instance_errors"`` knob names where the records go. + Under COLLECT_MODE, the demo prepends a sentinel headline + (``[FORCE_FAIL] ...``) that ``summarize`` raises + ``ProviderUnavailable`` on; retry exhausts, the error lands in + ``instance_errors``, and the rest of the batch completes. Without + the sentinel, ``COLLECT_MODE`` would have nothing to capture. - A ``TimingRecord`` is captured per instance via an ``on_complete`` callback. ``TimingRecord`` carries the per-call duration but not the ``fan_out_index`` — that index lives on observer NodeEvents instead. The demo prints captured durations in completion order plus a wall-clock vs sum-of-durations comparison that shows concurrency actually parallelized the work. +- A ``fan_out_config_observer`` reads ``NodeEvent.fan_out_config`` on + the fan-out node's dispatch event. Inner-instance events carry + ``fan_out_index`` but not ``fan_out_config``; the config lives on + the fan-out node's own started / completed pair and gives observers + a record of the resolved item_count, concurrency, and error_policy + at dispatch time. **Configuration** (env vars; OpenAI defaults shown): @@ -61,6 +78,7 @@ END, CompiledGraph, GraphBuilder, + NodeEvent, State, append, ) @@ -70,7 +88,7 @@ TimingRecord, deterministic_backoff, ) -from openarmature.llm import OpenAIProvider, SystemMessage, UserMessage +from openarmature.llm import OpenAIProvider, ProviderUnavailable, SystemMessage, UserMessage _provider_instance: OpenAIProvider | None = None @@ -114,11 +132,14 @@ async def _chat(system: str, user: str) -> str: class BatchState(State): """Outer graph: list of headlines goes in, parallel lists of summaries - and topic tags come out.""" + and topic tags come out. ``instance_errors`` only populates under + ``error_policy="collect"`` — each failed instance contributes one + record naming its ``fan_out_index`` and the exception category.""" headlines: list[str] = Field(default_factory=list) summaries: Annotated[list[str], append] = Field(default_factory=list) topics: Annotated[list[str], append] = Field(default_factory=list) + instance_errors: Annotated[list[dict[str, Any]], append] = Field(default_factory=list[dict[str, Any]]) trace: Annotated[list[str], append] = Field(default_factory=list) @@ -137,6 +158,16 @@ class HeadlineState(State): async def summarize(s: HeadlineState) -> Mapping[str, Any]: + # Sentinel for the COLLECT_MODE demo. Raising a transient error + # (ProviderUnavailable carries the ``provider_unavailable`` + # category, which retry's default classifier recognizes as + # retryable) lets the retry middleware exhaust its 3 attempts; + # the final failure then surfaces according to the fan-out's + # error_policy. Under fail_fast (default), the batch aborts. + # Under collect, the failure lands in instance_errors and the + # batch produces partial results. + if "[FORCE_FAIL]" in s.headline: + raise ProviderUnavailable("synthetic failure: provider unavailable (COLLECT_MODE demo)") content = await _chat( system=( "Rewrite the headline as one short sentence (~15 words) that would work as a lead. No preamble." @@ -216,7 +247,16 @@ async def present(s: BatchState) -> Mapping[str, Any]: return {"trace": ["present"]} -def build_graph() -> CompiledGraph[BatchState]: +def build_graph(error_policy: str = "fail_fast") -> CompiledGraph[BatchState]: + """Build the fan-out demo graph. + + ``error_policy`` switches between ``"fail_fast"`` (default; first + exhausted-retry failure raises and cancels the rest) and + ``"collect"`` (each instance runs independently; failures land in + ``state.instance_errors`` and the batch produces partial results). + The smoke test calls this with no argument, exercising the default + path; main() lets the COLLECT_MODE env var flip to collect. + """ headline_subgraph = build_headline_subgraph() retry = RetryMiddleware( @@ -244,6 +284,8 @@ def build_graph() -> CompiledGraph[BatchState]: extra_outputs={"topics": "topic"}, concurrency=3, instance_middleware=(retry, timing), + error_policy=error_policy, + errors_field="instance_errors", ) .add_node("present", present) .add_edge("announce", "headline_runs") @@ -254,6 +296,30 @@ def build_graph() -> CompiledGraph[BatchState]: ) +async def fan_out_config_observer(event: NodeEvent) -> None: + """Print the fan-out node's resolved config when its dispatch event + fires. + + NodeEvent carries ``fan_out_config`` ONLY on the fan-out node's own + started / completed pair (the dispatch wrapper); inner-instance + events carry ``fan_out_index`` but not ``fan_out_config``. Reading + the config gives observability layers a record of how the dispatch + actually resolved at runtime — useful when ``count`` or + ``concurrency`` are callable resolvers whose value isn't visible + in code. + """ + if event.fan_out_config is None: + return + if event.phase != "started": + return + cfg = event.fan_out_config + print( + f" [observer] fan-out node {event.node_name!r} dispatching: " + f"item_count={cfg.item_count} concurrency={cfg.concurrency} " + f"error_policy={cfg.error_policy!r}" + ) + + # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- @@ -264,12 +330,32 @@ async def main() -> None: # doesn't accumulate timings across invocations. _timings.clear() - graph = build_graph() - - initial = BatchState(headlines=HEADLINES) + # Set COLLECT_MODE=1 to switch the fan-out error policy from the + # default fail_fast to collect. Under collect, each instance runs + # independently and per-instance failures (after retries exhaust) + # land in state.instance_errors instead of aborting the batch. + error_policy = "collect" if os.environ.get("COLLECT_MODE") else "fail_fast" + graph = build_graph(error_policy=error_policy) + graph.attach_observer(fan_out_config_observer) + + # Under COLLECT_MODE, prepend a deliberately-failing headline so + # the collect path is exercised end-to-end: retry middleware + # exhausts on the sentinel, the failure lands in + # state.instance_errors, and the rest of the batch completes. + # Default (fail_fast) keeps the headline list clean so the demo's + # happy path runs to completion. + if error_policy == "collect": + headlines = [ + "[FORCE_FAIL] Synthetic failing headline for the COLLECT_MODE demo", + *HEADLINES, + ] + else: + headlines = list(HEADLINES) + initial = BatchState(headlines=headlines) print("=" * 72) - print(f"Summarizing {len(HEADLINES)} headlines in parallel (concurrency=3)") + print(f"Summarizing {len(headlines)} headlines in parallel (concurrency=3)") + print(f"error_policy={error_policy!r}") print("=" * 72) print() @@ -277,12 +363,28 @@ async def main() -> None: try: final = await graph.invoke(initial) wall_ms = (time.monotonic() - wall_start) * 1000.0 + # Under collect, failed instances are absent from summaries / + # topics (their projections don't fire on failure). Pull the + # failed fan_out_indices out of instance_errors so the print + # loop can align successes to original positions and mark the + # gaps for the reader. + failed_indices = {int(e["fan_out_index"]) for e in final.instance_errors} + success_iter = iter(zip(final.summaries, final.topics, strict=True)) print("Results (in input order):") print() - for i, (h, s, t) in enumerate(zip(final.headlines, final.summaries, final.topics, strict=True)): - print(f" [{i}] {h}") - print(f" summary: {s}") - print(f" topic: {t}") + for i, headline in enumerate(final.headlines): + print(f" [{i}] {headline}") + if i in failed_indices: + print(" (failed after retries; see instance_errors below)") + else: + s, t = next(success_iter) + print(f" summary: {s}") + print(f" topic: {t}") + print() + if final.instance_errors: + print(f"Captured {len(final.instance_errors)} per-instance error(s):") + for err in final.instance_errors: + print(f" {err}") print() print("Per-instance timings (in completion order):") for nth, record in enumerate(_timings): diff --git a/examples/06-parallel-branches/main.py b/examples/06-parallel-branches/main.py index b53c80b..78a2ed1 100644 --- a/examples/06-parallel-branches/main.py +++ b/examples/06-parallel-branches/main.py @@ -35,6 +35,12 @@ mapping (not in completion order). The three branches here write disjoint parent fields, so the order doesn't affect the result — but the property holds and would matter if they overlapped. +- A ``branch_attribution_observer`` reads ``NodeEvent.branch_name`` + on inner-node events. ``branch_name`` is populated only for + events INSIDE a branch's subgraph; outermost nodes (receive, + enrich, present) have ``branch_name=None``. This is the + per-event attribution that lets observability backends route + metrics / spans by branch. **Configuration** (env vars; OpenAI defaults shown): @@ -64,6 +70,7 @@ BranchSpec, CompiledGraph, GraphBuilder, + NodeEvent, State, append, ) @@ -233,6 +240,21 @@ async def present(s: ArticleState) -> Mapping[str, Any]: return {"trace": ["present"]} +async def branch_attribution_observer(event: NodeEvent) -> None: + """Print which branch each inner-node event came from. + + NodeEvent carries ``branch_name`` on events from nodes that + execute INSIDE a parallel-branches branch — it's the per-event + attribution that says "this came from branch X." Outermost-graph + nodes (receive, enrich, present) carry no branch_name. The + observer skips events with no branch attribution and prints + ``(branch=…) node_name`` for the rest. + """ + if event.branch_name is None or event.phase != "started": + return + print(f" [observer] (branch={event.branch_name}) inner node {event.node_name!r} started") + + def build_graph() -> CompiledGraph[ArticleState]: summary = build_summary_subgraph() sentiment = build_sentiment_subgraph() @@ -287,6 +309,7 @@ def build_graph() -> CompiledGraph[ArticleState]: async def main() -> None: graph = build_graph() + graph.attach_observer(branch_attribution_observer) print("=" * 72) print("Lunar-mission article enrichment — three independent analyses in parallel") diff --git a/examples/07-multimodal-prompt/main.py b/examples/07-multimodal-prompt/main.py index 48028f9..697e559 100644 --- a/examples/07-multimodal-prompt/main.py +++ b/examples/07-multimodal-prompt/main.py @@ -1,55 +1,77 @@ -"""openarmature demo: caption a historical lunar photograph using a -versioned prompt template plus a multimodal user message. - -**Use case:** Given a photograph from a lunar mission and the mission's -name, describe what's visible in the image. The text instructions are -loaded from a versioned prompt template on disk so they can be edited, -diffed, and rolled out independently of the code. The image is passed -to the model alongside the rendered text as a multimodal user message. - -This is the "prompt management + image input" shape — two openarmature -surfaces that compose cleanly. The prompt manager gives you traceable, -hashable, version-tagged instruction text; content blocks give you the -multimodal payload alongside it. +"""openarmature demo: two independent analyses of a lunar-mission +photograph using versioned prompt templates, a fallback prompt +backend, and a multimodal user message. + +**Use case:** Given a photograph from a lunar mission, run two +independent analyses: describe the lunar surface visible +(``describe-surface``) and identify the equipment (``describe-equipment``). +Both prompts take the mission name as their only variable; neither +depends on the other's output. Both renders are grouped under one +observability ``PromptGroup`` so a trace UI can render the analyses +as one logical unit. + +The image can come from a public URL (default) or a local file (set +``IMAGE_PATH`` to use the inline base64 source instead). The +``PromptManager`` is wired with a primary + fallback +``FilesystemPromptBackend`` to demonstrate composite-backend +configuration; the fallback path fires only when the primary raises +``PromptStoreUnavailable`` (e.g., a remote Langfuse backend off-line). **What's interesting in the implementation:** -- ``FilesystemPromptBackend`` loads ``caption-lunar-image.j2`` from - ``prompts/production/``. The layout is ``/