From a3f7fb6224f441db0a14c9741fb11a55ad28ded3 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Mon, 18 May 2026 10:54:08 -0700 Subject: [PATCH 1/6] feat(examples): add 05-fan-out-with-retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Realistic fan-out shape: summarize and classify a batch of news headlines in parallel. - ``GraphBuilder.add_fan_out_node`` with ``items_field`` mode and ``extra_outputs`` collecting two parallel per-instance lists (summary + topic). - ``instance_middleware=(RetryMiddleware(3 attempts, deterministic backoff), TimingMiddleware(on_complete=...))`` wraps each instance's whole subgraph invocation — retries are per-instance, timings are captured per-instance. - ``concurrency=3`` caps how many instances run in flight at once. - Final printout shows per-instance durations in completion order alongside a wall-clock total vs sum-of-durations comparison, so the speedup from concurrency is visible. Per-instance subgraph is ``summarize → classify``; both nodes hit the LLM via the shared ``OpenAIProvider`` pattern the rest of the demos use. Smoke test list grows to six demos. --- examples/05-fan-out-with-retry/main.py | 297 +++++++++++++++++++++++++ examples/README.md | 10 + tests/test_examples_smoke.py | 1 + 3 files changed, 308 insertions(+) create mode 100644 examples/05-fan-out-with-retry/main.py diff --git a/examples/05-fan-out-with-retry/main.py b/examples/05-fan-out-with-retry/main.py new file mode 100644 index 0000000..bbf0f94 --- /dev/null +++ b/examples/05-fan-out-with-retry/main.py @@ -0,0 +1,297 @@ +"""openarmature demo: summarize a batch of news headlines in parallel, with +per-headline retries and timing. + +**Use case:** Given a list of news headlines, produce a one-sentence +summary and a topic tag for each one. The headlines are independent, so +fan them out and let them run concurrently. Each per-headline run hits +the LLM, which can transiently fail (rate-limit, timeout, transient 5xx); +wrap each instance in retry middleware so a flaky call doesn't tank the +whole batch. A timing middleware records how long each instance took. + +This is the canonical fan-out shape: N similar tasks, N runtime-determined +from state, the work independent enough to run concurrently. The +per-instance subgraph (summarize → classify) is a complete pipeline in +its own right — it would also work standalone against a single headline. + +**What's interesting in the implementation:** + +- ``GraphBuilder.add_fan_out_node`` with ``items_field`` mode: one + instance per element of ``state.headlines``, ``item_field`` carries the + per-instance input into the subgraph. +- ``extra_outputs`` collects a second per-instance field (``topic``) in + parallel with the primary ``collect_field`` (``summary``). The two + parent lists are index-aligned. +- ``instance_middleware=(RetryMiddleware(...), TimingMiddleware(...))`` + wraps EACH instance's whole subgraph invocation. Retries are + per-instance: a failure on headline 3 doesn't restart headlines 0-2. +- ``concurrency=3`` caps how many instances run in flight at once. Use + this to be polite to the upstream API. +- A ``TimingRecord`` is captured per instance via an ``on_complete`` + callback. ``TimingRecord`` carries the per-call duration but not the + ``fan_out_index`` — that index lives on observer NodeEvents instead. + The demo prints captured durations in completion order plus a + wall-clock vs sum-of-durations comparison that shows concurrency + actually parallelized the work. + +**Configuration** (env vars; OpenAI defaults shown): + +- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root only.** +- ``LLM_MODEL`` defaults to ``gpt-4o-mini``. +- ``LLM_API_KEY`` required (empty for local servers that don't authenticate). + +Run with: + + uv sync --group examples + cd examples/05-fan-out-with-retry + LLM_API_KEY=sk-... uv run python main.py +""" + +from __future__ import annotations + +import asyncio +import os +import time +from collections.abc import Mapping +from typing import Annotated, Any + +from pydantic import Field + +from openarmature.graph import ( + END, + CompiledGraph, + GraphBuilder, + State, + append, +) +from openarmature.graph.middleware import ( + RetryMiddleware, + TimingMiddleware, + TimingRecord, + deterministic_backoff, +) +from openarmature.llm import OpenAIProvider, SystemMessage, UserMessage + +_provider_instance: OpenAIProvider | None = None + + +def _get_provider() -> OpenAIProvider: + global _provider_instance + if _provider_instance is None: + _provider_instance = OpenAIProvider( + base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"), + model=os.environ.get("LLM_MODEL", "gpt-4o-mini"), + api_key=os.environ.get("LLM_API_KEY") or None, + ) + return _provider_instance + + +async def _chat(system: str, user: str) -> str: + response = await _get_provider().complete( + [SystemMessage(content=system), UserMessage(content=user)], + ) + return (response.message.content or "").strip() + + +# --------------------------------------------------------------------------- +# A small batch of headlines. In a real app this would come from an RSS +# feed, a database query, or wherever your batch lives. +# --------------------------------------------------------------------------- + +HEADLINES: list[str] = [ + "City council approves new bike-lane network spanning downtown", + "Researchers report unexpected results from fusion-reactor test run", + "Local bakery wins national award for sourdough loaf", + "Stock market dips after central bank signals slower rate cuts", + "Marathon runner sets new course record under heavy rainfall", +] + + +# --------------------------------------------------------------------------- +# State schemas +# --------------------------------------------------------------------------- + + +class BatchState(State): + """Outer graph: list of headlines goes in, parallel lists of summaries + and topic tags come out.""" + + headlines: list[str] = Field(default_factory=list) + summaries: Annotated[list[str], append] = Field(default_factory=list) + topics: Annotated[list[str], append] = Field(default_factory=list) + trace: Annotated[list[str], append] = Field(default_factory=list) + + +class HeadlineState(State): + """Per-instance subgraph state — one headline, its summary, its topic.""" + + headline: str = "" + summary: str = "" + topic: str = "" + trace: Annotated[list[str], append] = Field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Per-instance subgraph: summarize → classify +# --------------------------------------------------------------------------- + + +async def summarize(s: HeadlineState) -> Mapping[str, Any]: + content = await _chat( + system=( + "Rewrite the headline as one short sentence (~15 words) that would work as a lead. No preamble." + ), + user=s.headline, + ) + return {"summary": content, "trace": ["summarize"]} + + +async def classify(s: HeadlineState) -> Mapping[str, Any]: + content = await _chat( + system=( + "Tag the topic of the headline below with ONE word from this set: " + "politics, science, business, sports, food, technology, other. " + "Reply with just the word, lowercase, no punctuation." + ), + user=s.headline, + ) + tag = content.strip().lower().strip(".") + return {"topic": tag, "trace": ["classify"]} + + +def build_headline_subgraph() -> CompiledGraph[HeadlineState]: + return ( + GraphBuilder(HeadlineState) + .add_node("summarize", summarize) + .add_node("classify", classify) + .add_edge("summarize", "classify") + .add_edge("classify", END) + .set_entry("summarize") + .compile() + ) + + +# --------------------------------------------------------------------------- +# Instance middleware: retry + timing +# --------------------------------------------------------------------------- +# Both middlewares wrap each instance's whole subgraph invocation. Retry's +# loop is per-instance: if headline 3's first attempt raises a transient +# error, the retry middleware re-invokes the subgraph for headline 3 only. +# Headlines 0-2 (already complete) and 4 (still running) are unaffected. +# +# Timing's on_complete callback fires once per successful (or final-failure) +# instance. ``TimingRecord`` carries duration + outcome but not +# ``fan_out_index`` — the index lives on observer NodeEvents, not in the +# middleware's record. The demo prints the captured timings in completion +# order to show "this is what middleware-level timing gives you out of the +# box." For per-instance correlation against the input list, use an +# observer instead (see example 03). + + +# Captured timings, populated by the on_complete callback below. +_timings: list[TimingRecord] = [] + + +async def _record_timing(record: TimingRecord) -> None: + _timings.append(record) + + +# --------------------------------------------------------------------------- +# Outer graph +# --------------------------------------------------------------------------- + + +async def announce(s: BatchState) -> Mapping[str, Any]: + del s + return {"trace": ["announce"]} + + +async def present(s: BatchState) -> Mapping[str, Any]: + """Marker node so the trace shows the outer presented results. + + The summaries and topics are already on parent state from the fan-out's + projection; this node just appends to the trace. + """ + del s + return {"trace": ["present"]} + + +def build_graph() -> CompiledGraph[BatchState]: + headline_subgraph = build_headline_subgraph() + + retry = RetryMiddleware( + max_attempts=3, + # Short fixed delay so the demo isn't slow. A production app would + # use exponential_jitter_backoff (the default). + backoff=deterministic_backoff(0.2), + ) + timing = TimingMiddleware( + node_name="headline_run", + on_complete=_record_timing, + clock=time.monotonic, + ) + + return ( + GraphBuilder(BatchState) + .add_node("announce", announce) + .add_fan_out_node( + "headline_runs", + subgraph=headline_subgraph, + items_field="headlines", + item_field="headline", + collect_field="summary", + target_field="summaries", + extra_outputs={"topics": "topic"}, + concurrency=3, + instance_middleware=(retry, timing), + ) + .add_node("present", present) + .add_edge("announce", "headline_runs") + .add_edge("headline_runs", "present") + .add_edge("present", END) + .set_entry("announce") + .compile() + ) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +async def main() -> None: + graph = build_graph() + + initial = BatchState(headlines=HEADLINES) + + print("=" * 72) + print(f"Summarizing {len(HEADLINES)} headlines in parallel (concurrency=3)") + print("=" * 72) + print() + + wall_start = time.monotonic() + try: + final = await graph.invoke(initial) + wall_ms = (time.monotonic() - wall_start) * 1000.0 + print("Results (in input order):") + print() + for i, (h, s, t) in enumerate(zip(final.headlines, final.summaries, final.topics, strict=True)): + print(f" [{i}] {h}") + print(f" summary: {s}") + print(f" topic: {t}") + print() + print("Per-instance timings (in completion order):") + for nth, record in enumerate(_timings): + print(f" #{nth} {record.duration_ms:7.1f} ms outcome={record.outcome}") + sum_ms = sum(record.duration_ms for record in _timings) + print() + print(f" wall-clock total: {wall_ms:7.1f} ms") + print(f" sum of per-instance: {sum_ms:7.1f} ms") + print(f" → concurrency speedup: {sum_ms / wall_ms:5.2f}x") + finally: + await graph.drain() + if _provider_instance is not None: + await _provider_instance.aclose() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/README.md b/examples/README.md index e2d0db4..8da6474 100644 --- a/examples/README.md +++ b/examples/README.md @@ -42,6 +42,16 @@ levels of subgraph nesting: outer coordinator → doc-QA subgraph → section-extract subgraph. A depth-aware observer prints the descent and return. +### [`05-fan-out-with-retry/`](./05-fan-out-with-retry/main.py) + +Summarize a batch of news headlines in parallel. Each per-headline +run goes through a `summarize → classify` subgraph wrapped in retry +middleware (transient failures don't tank the batch) and timing +middleware (per-instance duration captured alongside the fan-out +index). Demonstrates: `add_fan_out_node` with `items_field` mode, +`extra_outputs` collecting a parallel list, `instance_middleware`, +concurrency cap. + ## Configuration All demos configure their LLM client via env vars; OpenAI public-API diff --git a/tests/test_examples_smoke.py b/tests/test_examples_smoke.py index fc95d5d..e4bf4d2 100644 --- a/tests/test_examples_smoke.py +++ b/tests/test_examples_smoke.py @@ -35,6 +35,7 @@ "02-explicit-subgraph-mapping", "03-observer-hooks", "04-nested-subgraphs", + "05-fan-out-with-retry", ] From f4a8af39b7a883447bbfe5d6b0e322d0bf32f6ad Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Mon, 18 May 2026 11:18:51 -0700 Subject: [PATCH 2/6] feat(examples): add 06-parallel-branches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enrich a lunar-mission news article with three independent analyses (summary, sentiment, topic tags) running concurrently. - ``GraphBuilder.add_parallel_branches_node`` registers M ``BranchSpec``s under named keys (``summary`` / ``sentiment`` / ``topics``). Each spec carries its own compiled subgraph, its own input/output projection, and optionally its own middleware. - The three branches have DIFFERENT state schemas — each is scoped to its analysis's inputs and outputs. The projection mapping translates the parent's ``article`` field into each branch's input field name. - The sentiment branch wraps its subgraph in ``RetryMiddleware`` to show per-branch middleware composition. The other two run bare. - Wall-clock total prints alongside the results so the parallelism benefit is visible. Sample article is a narrative of Artemis II's splashdown on April 10, 2026 — the first crewed flight beyond low Earth orbit since Apollo 17. Also tightens the 05 entry in examples/README.md to drop a stale mention of fan-out-index correlation (the demo doesn't claim that anymore; the timing record carries no index field). Smoke test list grows to seven demos. --- examples/06-parallel-branches/main.py | 323 ++++++++++++++++++++++++++ examples/README.md | 17 +- tests/test_examples_smoke.py | 1 + 3 files changed, 337 insertions(+), 4 deletions(-) create mode 100644 examples/06-parallel-branches/main.py diff --git a/examples/06-parallel-branches/main.py b/examples/06-parallel-branches/main.py new file mode 100644 index 0000000..b53c80b --- /dev/null +++ b/examples/06-parallel-branches/main.py @@ -0,0 +1,323 @@ +"""openarmature demo: enrich a lunar-mission news article with three +independent analyses running concurrently. + +**Use case:** Given a news article about a lunar mission, produce three +side-by-side outputs: a one-sentence summary, an overall sentiment label, +and a short list of topic tags. The three analyses don't depend on each +other, so dispatch them in parallel. Each analysis is its own subgraph +with its own state schema (the summary subgraph doesn't care about +sentiment, the topic extractor doesn't care about either) — which is +exactly the shape parallel-branches is for. + +Where fan-out (example 05) runs N copies of ONE subgraph against +different inputs, parallel-branches runs M heterogeneous subgraphs +against the same input. Different schemas, different middleware, +different topologies per branch; one dispatch. + +**What's interesting in the implementation:** + +- ``GraphBuilder.add_parallel_branches_node`` registers M + ``BranchSpec``s under named keys (``summary``, ``sentiment``, + ``topics`` here). Each spec carries its own compiled subgraph, + its own input/output projection, and optionally its own middleware. +- The branches have DIFFERENT state schemas. The summary subgraph's + state has a ``summary`` field; the sentiment subgraph's has a + ``label`` field; the topics subgraph's has a ``tags`` list. Each is + scoped to its job. The projection mapping translates the parent's + ``article`` into each branch's input field name. +- The sentiment branch wraps its subgraph in ``RetryMiddleware`` to + show per-branch middleware composition. The other two branches run + bare. Per-branch middleware is heterogeneous — branch A may have + retry + timing, branch B nothing, branch C something custom. +- Branch insertion order determines fan-in order: when two branches + contribute to the same parent field, the parent's reducer applies + them in the order the branches were declared in the ``branches`` + mapping (not in completion order). The three branches here write + disjoint parent fields, so the order doesn't affect the result — + but the property holds and would matter if they overlapped. + +**Configuration** (env vars; OpenAI defaults shown): + +- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root only.** +- ``LLM_MODEL`` defaults to ``gpt-4o-mini``. +- ``LLM_API_KEY`` required (empty for local servers that don't authenticate). + +Run with: + + uv sync --group examples + cd examples/06-parallel-branches + LLM_API_KEY=sk-... uv run python main.py +""" + +from __future__ import annotations + +import asyncio +import os +import time +from collections.abc import Mapping +from typing import Annotated, Any + +from pydantic import Field + +from openarmature.graph import ( + END, + BranchSpec, + CompiledGraph, + GraphBuilder, + State, + append, +) +from openarmature.graph.middleware import ( + RetryMiddleware, + deterministic_backoff, +) +from openarmature.llm import OpenAIProvider, SystemMessage, UserMessage + +_provider_instance: OpenAIProvider | None = None + + +def _get_provider() -> OpenAIProvider: + global _provider_instance + if _provider_instance is None: + _provider_instance = OpenAIProvider( + base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"), + model=os.environ.get("LLM_MODEL", "gpt-4o-mini"), + api_key=os.environ.get("LLM_API_KEY") or None, + ) + return _provider_instance + + +async def _chat(system: str, user: str) -> str: + response = await _get_provider().complete( + [SystemMessage(content=system), UserMessage(content=user)], + ) + return (response.message.content or "").strip() + + +# --------------------------------------------------------------------------- +# Sample article. A real app would pull this from a feed, a queue, an API. +# --------------------------------------------------------------------------- + +ARTICLE = ( + "NASA's Artemis II crew capsule Integrity splashed down in the Pacific " + "Ocean this evening, ending a ten-day flight that carried four " + "astronauts on a free-return trajectory around the Moon and back. The " + "flight was the first crewed mission beyond low Earth orbit since " + "Apollo 17 in 1972. Agency officials described the result as a " + "successful test of the Orion spacecraft's deep-space systems and " + "cautioned that the Artemis III surface-landing timeline remains " + "dependent on the on-ground refurbishment cadence and lander-system " + "milestones. Even so, the splashdown was greeted with relief by " + "partner space agencies and renewed calls in policy circles for " + "sustained federal funding of the lunar return program." +) + + +# --------------------------------------------------------------------------- +# State schemas +# --------------------------------------------------------------------------- + + +class ArticleState(State): + """Outer: an article goes in, three enrichment fields come out.""" + + article: str = "" + summary: str = "" + sentiment: str = "" + topics: list[str] = Field(default_factory=list) + trace: Annotated[list[str], append] = Field(default_factory=list) + + +class SummaryState(State): + """Summary branch: one-sentence rewrite of the article.""" + + text: str = "" + summary: str = "" + + +class SentimentState(State): + """Sentiment branch: overall tone of the article.""" + + text: str = "" + label: str = "" + + +class TopicsState(State): + """Topics branch: a short list of topic tags.""" + + text: str = "" + tags: list[str] = Field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Branch subgraphs — each is one node, but each has its own scope. +# --------------------------------------------------------------------------- + + +async def write_summary(s: SummaryState) -> Mapping[str, Any]: + content = await _chat( + system=("Summarize the article in one tight sentence (~20 words). No preamble, no quoting."), + user=s.text, + ) + return {"summary": content} + + +async def classify_sentiment(s: SentimentState) -> Mapping[str, Any]: + content = await _chat( + system=( + "Classify the overall sentiment of the article. Reply with ONE " + "word from this set: positive, negative, neutral, mixed. " + "Lowercase, no punctuation." + ), + user=s.text, + ) + label = content.strip().lower().strip(".") + return {"label": label} + + +async def extract_topics(s: TopicsState) -> Mapping[str, Any]: + content = await _chat( + system=( + "Extract three short topic tags for the article. Reply with " + "exactly three lines, one tag per line, no numbering or bullets. " + "Tags should be 1-3 words each." + ), + user=s.text, + ) + tags = [line.strip(" -*•\t") for line in content.splitlines() if line.strip()][:3] + return {"tags": tags} + + +def build_summary_subgraph() -> CompiledGraph[SummaryState]: + return ( + GraphBuilder(SummaryState) + .add_node("write_summary", write_summary) + .add_edge("write_summary", END) + .set_entry("write_summary") + .compile() + ) + + +def build_sentiment_subgraph() -> CompiledGraph[SentimentState]: + return ( + GraphBuilder(SentimentState) + .add_node("classify_sentiment", classify_sentiment) + .add_edge("classify_sentiment", END) + .set_entry("classify_sentiment") + .compile() + ) + + +def build_topics_subgraph() -> CompiledGraph[TopicsState]: + return ( + GraphBuilder(TopicsState) + .add_node("extract_topics", extract_topics) + .add_edge("extract_topics", END) + .set_entry("extract_topics") + .compile() + ) + + +# --------------------------------------------------------------------------- +# Outer graph +# --------------------------------------------------------------------------- + + +async def receive(s: ArticleState) -> Mapping[str, Any]: + del s + return {"trace": ["receive"]} + + +async def present(s: ArticleState) -> Mapping[str, Any]: + del s + return {"trace": ["present"]} + + +def build_graph() -> CompiledGraph[ArticleState]: + summary = build_summary_subgraph() + sentiment = build_sentiment_subgraph() + topics = build_topics_subgraph() + + # Only the sentiment branch retries. Realistic in production: the + # classification call is short and cheap to retry, but you may not want + # the same policy on a longer summarize call (where a retry doubles + # cost) or on a topic-extract that has different transient profile. + sentiment_retry = RetryMiddleware( + max_attempts=3, + backoff=deterministic_backoff(0.2), + ) + + return ( + GraphBuilder(ArticleState) + .add_node("receive", receive) + .add_parallel_branches_node( + "enrich", + branches={ + "summary": BranchSpec( + subgraph=summary, + inputs={"text": "article"}, + outputs={"summary": "summary"}, + ), + "sentiment": BranchSpec( + subgraph=sentiment, + inputs={"text": "article"}, + outputs={"sentiment": "label"}, + middleware=(sentiment_retry,), + ), + "topics": BranchSpec( + subgraph=topics, + inputs={"text": "article"}, + outputs={"topics": "tags"}, + ), + }, + ) + .add_node("present", present) + .add_edge("receive", "enrich") + .add_edge("enrich", "present") + .add_edge("present", END) + .set_entry("receive") + .compile() + ) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +async def main() -> None: + graph = build_graph() + + print("=" * 72) + print("Lunar-mission article enrichment — three independent analyses in parallel") + print("=" * 72) + print() + print(f"Article ({len(ARTICLE)} chars):") + print() + print(ARTICLE) + print() + + wall_start = time.monotonic() + try: + final = await graph.invoke(ArticleState(article=ARTICLE)) + wall_ms = (time.monotonic() - wall_start) * 1000.0 + print("=" * 72) + print("Enrichment results") + print("=" * 72) + print() + print(f" summary: {final.summary}") + print(f" sentiment: {final.sentiment}") + print(f" topics: {final.topics}") + print() + print(f" wall-clock: {wall_ms:7.1f} ms") + print() + print("The three branches ran in parallel — wall-clock is closer to the") + print("slowest single branch than to the sum of all three.") + finally: + await graph.drain() + if _provider_instance is not None: + await _provider_instance.aclose() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/README.md b/examples/README.md index 8da6474..1f38992 100644 --- a/examples/README.md +++ b/examples/README.md @@ -47,10 +47,19 @@ and return. Summarize a batch of news headlines in parallel. Each per-headline run goes through a `summarize → classify` subgraph wrapped in retry middleware (transient failures don't tank the batch) and timing -middleware (per-instance duration captured alongside the fan-out -index). Demonstrates: `add_fan_out_node` with `items_field` mode, -`extra_outputs` collecting a parallel list, `instance_middleware`, -concurrency cap. +middleware (per-instance duration captured). Demonstrates: +`add_fan_out_node` with `items_field` mode, `extra_outputs` +collecting a parallel list, `instance_middleware`, concurrency cap. + +### [`06-parallel-branches/`](./06-parallel-branches/main.py) + +Enrich an article with three independent analyses (summary, +sentiment, topic tags) running concurrently. Each analysis is a +separate subgraph with its own state schema. The sentiment branch +wraps its subgraph in retry middleware; the other two run bare. +Demonstrates: `add_parallel_branches_node`, `BranchSpec` per branch +with input/output projection, heterogeneous branch state schemas, +per-branch middleware. ## Configuration diff --git a/tests/test_examples_smoke.py b/tests/test_examples_smoke.py index e4bf4d2..428c468 100644 --- a/tests/test_examples_smoke.py +++ b/tests/test_examples_smoke.py @@ -36,6 +36,7 @@ "03-observer-hooks", "04-nested-subgraphs", "05-fan-out-with-retry", + "06-parallel-branches", ] From 1109514632f893c71e916041ee770a556266416d Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Mon, 18 May 2026 11:19:20 -0700 Subject: [PATCH 3/6] chore(examples): moon-themed subject matter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sweep across 00-05 so every example's queries, articles, baked-in corpora, and headlines are moon-related. The company is Lunar Command; consistent lunar framing makes the demo set feel like a coherent surface rather than a grab bag. - 00 hello-world: query → "why did Apollo 13 abort its lunar landing?" - 01 routing-and-subgraphs: default question → "why is the lunar south pole strategically important?" (the moon-landing-year question stays) - 02 explicit-subgraph-mapping: default topic pair → Apollo 11 vs Apollo 17; docstring and Run-with section updated - 03 observer-hooks: second default question → "explain why NASA is returning to the moon with Artemis" (the moon-landing-year question stays) - 04 nested-subgraphs: corpus → Apollo 11 (kept), Apollo 13 (new), Artemis II (new — narrative of the April 2026 splashdown); default questions updated to match - 05 fan-out-with-retry: five headlines all swapped for accurate lunar-mission news (Artemis II splashdown, Lunar Gateway pause, IM-3 prep, LRO crater find, south-pole water-ice confirmation); classify tag set tightened to lunar-relevant categories (crew / lander / orbiter / science / hardware / policy / other) Factual accuracy checked as of 2026-05-17: Artemis II splashed down 2026-04-10; the Lunar Gateway program was paused 2026-03-24 in favor of a lunar surface base; Intuitive Machines IM-2 ended on its side in March 2025 and IM-3 is scheduled for second half of 2026. --- examples/00-hello-world/main.py | 2 +- examples/01-routing-and-subgraphs/main.py | 4 +-- examples/02-explicit-subgraph-mapping/main.py | 11 ++++--- examples/03-observer-hooks/main.py | 2 +- examples/04-nested-subgraphs/main.py | 30 ++++++++++-------- examples/05-fan-out-with-retry/main.py | 31 ++++++++++--------- 6 files changed, 43 insertions(+), 37 deletions(-) diff --git a/examples/00-hello-world/main.py b/examples/00-hello-world/main.py index 82b0cc6..53eb2de 100644 --- a/examples/00-hello-world/main.py +++ b/examples/00-hello-world/main.py @@ -205,7 +205,7 @@ async def main() -> None: graph = build_graph() graph.attach_observer(trace) try: - final = await graph.invoke(PipelineState(query="what is RAG?")) + final = await graph.invoke(PipelineState(query="why did Apollo 13 abort its lunar landing?")) print(f"\nclassification: {final.classification}") if final.research_plan is not None: print(f"research_plan: {final.research_plan}") diff --git a/examples/01-routing-and-subgraphs/main.py b/examples/01-routing-and-subgraphs/main.py index bfe0821..5afdbae 100644 --- a/examples/01-routing-and-subgraphs/main.py +++ b/examples/01-routing-and-subgraphs/main.py @@ -37,7 +37,7 @@ uv sync --group examples cd examples/01-routing-and-subgraphs LLM_API_KEY=sk-... uv run python main.py "what year did the moon landing happen" - LLM_API_KEY=sk-... uv run python main.py "is espresso actually more caffeinated than drip?" + LLM_API_KEY=sk-... uv run python main.py "why is the lunar south pole strategically important?" """ from __future__ import annotations @@ -454,7 +454,7 @@ def build_graph() -> CompiledGraph[AssistantState]: async def main() -> None: - question = " ".join(sys.argv[1:]) or "is espresso actually more caffeinated than drip coffee?" + question = " ".join(sys.argv[1:]) or "why is the lunar south pole strategically important?" graph = build_graph() try: final = await graph.invoke(AssistantState(question=question)) diff --git a/examples/02-explicit-subgraph-mapping/main.py b/examples/02-explicit-subgraph-mapping/main.py index 717bf06..6e8b3a4 100644 --- a/examples/02-explicit-subgraph-mapping/main.py +++ b/examples/02-explicit-subgraph-mapping/main.py @@ -1,8 +1,9 @@ """openarmature demo: same compiled subgraph reused at two sites in one parent graph, each site with its own ExplicitMapping. -**Use case:** Compare two topics ("rust vs go", "espresso vs drip coffee") -by running the same analysis subgraph on each, then synthesizing a verdict. +**Use case:** Compare two topics ("Apollo program vs Artemis program", +"Apollo 11 vs Apollo 17") by running the same analysis subgraph on each, +then synthesizing a verdict. **Demonstrates:** One compiled subgraph reused at two parent sites with per-site `ExplicitMapping` — the canonical way to express "run the same @@ -27,8 +28,8 @@ uv sync --group examples cd examples/02-explicit-subgraph-mapping - LLM_API_KEY=sk-... uv run python main.py "rust" "go" - LLM_API_KEY=sk-... uv run python main.py "espresso vs drip coffee" + LLM_API_KEY=sk-... uv run python main.py "Apollo 11" "Apollo 17" + LLM_API_KEY=sk-... uv run python main.py "Apollo program vs Artemis program" """ from __future__ import annotations @@ -262,7 +263,7 @@ async def main() -> None: elif len(args) == 1 and " vs " in args[0].lower(): topic_a, topic_b = re.split(r" vs ", args[0], maxsplit=1, flags=re.IGNORECASE) else: - topic_a, topic_b = "rust", "go" + topic_a, topic_b = "Apollo 11", "Apollo 17" graph = build_graph() try: diff --git a/examples/03-observer-hooks/main.py b/examples/03-observer-hooks/main.py index abf0a47..146e387 100644 --- a/examples/03-observer-hooks/main.py +++ b/examples/03-observer-hooks/main.py @@ -32,7 +32,7 @@ uv sync --group examples --all-extras cd examples/03-observer-hooks LLM_API_KEY=sk-... uv run python main.py "what year did the moon landing happen" - LLM_API_KEY=sk-... uv run python main.py "explain the rise of espresso culture" + LLM_API_KEY=sk-... uv run python main.py "explain why NASA is returning to the moon with Artemis" (``--all-extras`` pulls in ``opentelemetry-sdk`` for the OTel observer.) """ diff --git a/examples/04-nested-subgraphs/main.py b/examples/04-nested-subgraphs/main.py index 6441199..f607029 100644 --- a/examples/04-nested-subgraphs/main.py +++ b/examples/04-nested-subgraphs/main.py @@ -30,7 +30,8 @@ uv sync --group examples cd examples/04-nested-subgraphs LLM_API_KEY=sk-... uv run python main.py "what year did humans first land on the moon?" - LLM_API_KEY=sk-... uv run python main.py "how is espresso different from drip coffee?" + LLM_API_KEY=sk-... uv run python main.py "what happened on Apollo 13?" + LLM_API_KEY=sk-... uv run python main.py "who was on the Artemis II crew?" """ from __future__ import annotations @@ -93,23 +94,26 @@ async def _chat(system: str, user: str) -> str: ), }, { - "title": "Espresso", + "title": "Apollo 13", "body": ( - "Espresso is a coffee brewing method of Italian origin. It is made by forcing pressurized " - "hot water through finely ground coffee. The resulting shot is more concentrated than coffee " - "brewed by other methods, with a layer of crema on top. Espresso has more caffeine per " - "unit volume than most coffee beverages but a typical serving is one-tenth the volume of a " - "drip coffee, so a single espresso usually contains less total caffeine than a drip cup." + "Apollo 13 was the seventh crewed mission in the Apollo program and the third intended " + "to land on the Moon. The lunar landing was aborted after an oxygen tank in the service " + "module ruptured two days after launch in April 1970, crippling power and life support. " + "The crew of Jim Lovell, Jack Swigert, and Fred Haise used the lunar module Aquarius as " + "a lifeboat and looped around the Moon on a free-return trajectory before splashing down " + "safely in the Pacific. The mission is remembered as a successful failure." ), }, { - "title": "Walking", + "title": "Artemis II", "body": ( - "Walking is the most common form of human locomotion and is associated with a range of " - "health benefits including reduced risk of cardiovascular disease, improved mood, and " - "lower mortality. A moderate pace of around 100 steps per minute is often cited as a " - "useful threshold. Walking as a deliberate practice has long been associated with " - "thinking and writing — many writers credit long walks as part of their creative process." + "Artemis II was the first crewed mission of NASA's Artemis program, launching from " + "Kennedy Space Center on April 1, 2026 atop the Space Launch System rocket. The " + "ten-day flight carried astronauts Reid Wiseman, Victor Glover, Christina Koch, and " + "Jeremy Hansen aboard the Orion spacecraft Integrity on a free-return trajectory around " + "the Moon and back. It was the first crewed flight beyond low Earth orbit since Apollo " + "17 in 1972. The capsule splashed down in the Pacific Ocean on April 10, 2026, marking " + "a successful test flight ahead of the Artemis III lunar landing mission." ), }, ] diff --git a/examples/05-fan-out-with-retry/main.py b/examples/05-fan-out-with-retry/main.py index bbf0f94..3aa7234 100644 --- a/examples/05-fan-out-with-retry/main.py +++ b/examples/05-fan-out-with-retry/main.py @@ -1,12 +1,13 @@ -"""openarmature demo: summarize a batch of news headlines in parallel, with -per-headline retries and timing. +"""openarmature demo: summarize a batch of lunar-mission headlines in +parallel, with per-headline retries and timing. -**Use case:** Given a list of news headlines, produce a one-sentence -summary and a topic tag for each one. The headlines are independent, so -fan them out and let them run concurrently. Each per-headline run hits -the LLM, which can transiently fail (rate-limit, timeout, transient 5xx); -wrap each instance in retry middleware so a flaky call doesn't tank the -whole batch. A timing middleware records how long each instance took. +**Use case:** Given a list of lunar-mission news headlines, produce a +one-sentence summary and a topic tag for each one. The headlines are +independent, so fan them out and let them run concurrently. Each +per-headline run hits the LLM, which can transiently fail (rate-limit, +timeout, transient 5xx); wrap each instance in retry middleware so a +flaky call doesn't tank the whole batch. A timing middleware records how +long each instance took. This is the canonical fan-out shape: N similar tasks, N runtime-determined from state, the work independent enough to run concurrently. The @@ -98,11 +99,11 @@ async def _chat(system: str, user: str) -> str: # --------------------------------------------------------------------------- HEADLINES: list[str] = [ - "City council approves new bike-lane network spanning downtown", - "Researchers report unexpected results from fusion-reactor test run", - "Local bakery wins national award for sourdough loaf", - "Stock market dips after central bank signals slower rate cuts", - "Marathon runner sets new course record under heavy rainfall", + "Artemis II splashes down in Pacific after ten-day lunar flyby", + "NASA pauses Lunar Gateway program in favor of crewed surface base", + "Intuitive Machines prepares IM-3 lander for Reiner Gamma touchdown", + "Lunar Reconnaissance Orbiter spots fresh impact crater on far side", + "Researchers confirm abundant water ice in permanently shadowed south-pole craters", ] @@ -148,8 +149,8 @@ async def summarize(s: HeadlineState) -> Mapping[str, Any]: async def classify(s: HeadlineState) -> Mapping[str, Any]: content = await _chat( system=( - "Tag the topic of the headline below with ONE word from this set: " - "politics, science, business, sports, food, technology, other. " + "Tag the topic of the lunar-mission headline below with ONE word " + "from this set: crew, lander, orbiter, science, hardware, policy, other. " "Reply with just the word, lowercase, no punctuation." ), user=s.headline, From 8c358de09414792dd978a771936c4fce6ccb36f6 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Mon, 18 May 2026 11:32:46 -0700 Subject: [PATCH 4/6] feat(examples): add 07-multimodal-prompt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Caption a historical lunar photograph using a versioned prompt template plus a multimodal user message. - ``FilesystemPromptBackend`` loads ``prompts/production/caption-lunar-image.j2`` from disk. The layout is ``/