fragres/toolfan

toolfan — async fan-out for agent workflows

Run dozens of tool calls in parallel. Coalesce small LLM calls into batches. Cap concurrency. Survive partial failures.

Why?

Agent frameworks tend to call tools sequentially. That's fine for two tools. With ten tools at roughly 200 ms each, it's the difference between about 200 ms (all calls in flight at once) and 2 s (one call after another). The fix is obvious (asyncio.gather) until you remember you also need:

  • Concurrency limits so you don't DoS your own search backend
  • Per-task timeouts so one stuck call doesn't block the rest
  • Retries with backoff for flaky HTTP tools
  • Failure isolation — one tool exception shouldn't kill the others
  • Batching — turning 20 small classification prompts into one LLM call

toolfan is the small library (~600 LoC) that wraps all of that.
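
To make the list above concrete, here is a minimal sketch of three of those concerns (a concurrency cap, per-call timeouts, and failure isolation) in plain asyncio. It is not toolfan's implementation: `Outcome`, `bounded_gather`, and `demo` are hypothetical names invented for this illustration, and retries and batching are left out to keep it short.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class Outcome:
    """Result of one call: either a value or the exception that ended it."""
    name: str
    value: Any = None
    error: Optional[BaseException] = None

    @property
    def ok(self) -> bool:
        return self.error is None


async def bounded_gather(
    calls: dict[str, Callable[[], Awaitable[Any]]],
    max_concurrency: int = 4,
    timeout: Optional[float] = None,
) -> list[Outcome]:
    """Run zero-argument coroutine functions with a concurrency cap and a per-call timeout."""
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(name: str, factory: Callable[[], Awaitable[Any]]) -> Outcome:
        async with sem:  # at most max_concurrency calls are in flight at once
            try:
                # The timeout clock starts once the semaphore slot is acquired.
                value = await asyncio.wait_for(factory(), timeout)
                return Outcome(name, value=value)
            except Exception as exc:  # isolate the failure to this one call
                return Outcome(name, error=exc)

    return await asyncio.gather(*(run_one(n, f) for n, f in calls.items()))


async def demo() -> list[Outcome]:
    async def fast() -> str:
        await asyncio.sleep(0.01)
        return "done"

    async def stuck() -> None:
        await asyncio.sleep(10)  # will be cut off by the timeout

    async def broken() -> None:
        raise ValueError("boom")

    return await bounded_gather(
        {"fast": fast, "stuck": stuck, "broken": broken},
        max_concurrency=2,
        timeout=0.1,
    )


if __name__ == "__main__":
    for outcome in asyncio.run(demo()):
        print(outcome.name, "ok" if outcome.ok else f"failed: {outcome.error!r}")
```

Catching the exception inside each wrapper, rather than letting asyncio.gather propagate it, is what keeps one failed or timed-out call from hiding the results of the others.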

Install & Run

pip install toolfan

import asyncio

from toolfan import Scheduler, Task

async def search(q): ...   # your search tool
async def fetch(url): ...  # your HTTP fetch tool

async def main():
    results = await Scheduler(max_concurrency=8).run([
        Task("web",   search,  args=("agent observability",), timeout=5.0, retries=2),
        Task("docs",  fetch,   args=("https://example.com",), timeout=3.0),
        Task("graph", fetch,   args=("https://example.org/g",), timeout=3.0),
    ])

    # Each result reports success or failure on its own
    for r in results:
        if r.ok:
            print(r.name, "→", r.duration_s, "s")
        else:
            print(r.name, "FAILED:", r.error)

asyncio.run(main())

Configuration

| Field | Default | What it does |
| --- | --- | --- |
| max_concurrency | 10 | Max simultaneous in-flight tasks across the Scheduler. |
| Task.timeout | None | Per-task hard timeout (seconds). |
| Task.retries | 0 | Retries on any exception (exponential backoff). |
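
For a sense of what "retries on any exception (exponential backoff)" can look like, here is a small sketch in plain asyncio. The base delay and growth factor toolfan actually uses are not documented above, so `base=0.1` and `factor=2.0` are assumptions, and `backoff_delays` and `call_with_retries` are hypothetical helpers, not part of the toolfan API.

```python
import asyncio
from typing import Any, Awaitable, Callable


def backoff_delays(retries: int, base: float = 0.1, factor: float = 2.0) -> list[float]:
    """Delay before each retry: base, base*factor, base*factor**2, ..."""
    # base and factor are assumed values chosen for illustration only
    return [base * factor**i for i in range(retries)]


async def call_with_retries(
    fn: Callable[..., Awaitable[Any]],
    *args: Any,
    retries: int = 0,
    base: float = 0.1,
) -> Any:
    """Call fn(*args); on any exception, sleep and try again up to `retries` times."""
    delays = backoff_delays(retries, base)
    for attempt in range(retries + 1):
        try:
            return await fn(*args)
        except Exception:
            if attempt == retries:  # out of retries: surface the last error
                raise
            await asyncio.sleep(delays[attempt])
```

With `retries=2` a call is attempted at most three times in total: the initial attempt plus two retries. Production retry loops often add random jitter to the delays so that many clients do not retry in lockstep.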

Examples

Coalescing small LLM calls

import asyncio

from toolfan.batch import MicroBatcher

async def classify_batch(texts: list[str]) -> list[str]:
    # one batched call to your LLM
    ...

batcher = MicroBatcher(classify_batch, max_size=16, max_wait=0.020)

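# Inside a coroutine, with `texts` holding 50 strings:
# 50 callers each await `batcher.submit(text)`, and toolfan turns that
# into a handful of batched LLM calls (at least 4, since max_size=16).
labels = await asyncio.gather(*(batcher.submit(t) for t in texts))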
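
If you are curious how this kind of coalescing can work, the sketch below shows one possible approach in plain asyncio: flush when the batch is full, or when a timer started by the first pending item expires. It is not the source of toolfan's MicroBatcher. `TinyBatcher` and `demo` are hypothetical names, and the behavior (including how errors propagate to callers) is an assumption made for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class TinyBatcher:
    """Collects submitted items and sends them to batch_fn as one call."""

    def __init__(
        self,
        batch_fn: Callable[[list[Any]], Awaitable[list[Any]]],
        max_size: int = 16,
        max_wait: float = 0.020,
    ) -> None:
        self._batch_fn = batch_fn
        self._max_size = max_size
        self._max_wait = max_wait
        self._pending: list[tuple[Any, asyncio.Future]] = []
        self._timer: Optional[asyncio.TimerHandle] = None
        self._tasks: set[asyncio.Task] = set()  # keep running flushes referenced

    async def submit(self, item: Any) -> Any:
        loop = asyncio.get_running_loop()
        fut: asyncio.Future = loop.create_future()
        self._pending.append((item, fut))
        if len(self._pending) >= self._max_size:
            self._flush()  # batch is full: send it now
        elif self._timer is None:
            # first item of a new batch: start the max_wait clock
            self._timer = loop.call_later(self._max_wait, self._flush)
        return await fut

    def _flush(self) -> None:
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        batch, self._pending = self._pending, []
        if batch:
            task = asyncio.get_running_loop().create_task(self._run(batch))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def _run(self, batch: list[tuple[Any, asyncio.Future]]) -> None:
        items = [item for item, _ in batch]
        try:
            results = await self._batch_fn(items)
            if len(results) != len(items):
                raise ValueError("batch_fn must return one result per item")
        except Exception as exc:
            # a failed batch fails every caller that was waiting on it
            for _, fut in batch:
                if not fut.done():
                    fut.set_exception(exc)
            return
        for (_, fut), result in zip(batch, results):
            if not fut.done():  # skip callers that were cancelled meanwhile
                fut.set_result(result)


async def demo() -> tuple[list[str], list[int]]:
    batch_sizes: list[int] = []

    async def classify_batch(texts: list[str]) -> list[str]:
        batch_sizes.append(len(texts))  # record how many items each call saw
        await asyncio.sleep(0.001)      # stand-in for one LLM round trip
        return [t.upper() for t in texts]

    batcher = TinyBatcher(classify_batch, max_size=16, max_wait=0.020)
    texts = [f"text-{i}" for i in range(50)]
    labels = await asyncio.gather(*(batcher.submit(t) for t in texts))
    return list(labels), batch_sizes


if __name__ == "__main__":
    labels, sizes = asyncio.run(demo())
    print(len(labels), "labels from batches of sizes", sizes)
```

In this sketch, 50 submissions made at once with `max_size=16` produce three full batches of 16, plus a final batch of 2 that is flushed when the `max_wait` timer fires. Each caller still gets back only its own result, in the order gather was given.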

Bounded fan-out from an agent step

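from toolfan import Scheduler, Task

# `llm` and TOOL_REGISTRY (tool name -> async callable) come from your application.
async def agent_step(query):
    plan = await llm("plan: " + query)
    # Run every planned step concurrently, at most 4 at a time.
    results = await Scheduler(max_concurrency=4).run([
        Task(s.tool, TOOL_REGISTRY[s.tool], args=(s.args,), timeout=10, retries=1)
        for s in plan.steps
    ])
    # Only successful results feed the synthesis prompt; failed steps are skipped.
    return await llm("synthesize", inputs=[r.value for r in results if r.ok])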

License

BSD-3-Clause.
