Commit f86b702: Add A2A server (#1537)

Authored by Kludex and dmontagu
Co-authored-by: David Montague <[email protected]>
1 parent f6b4502 commit f86b702
21 files changed: +2042 −6 lines
.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion

```diff
@@ -60,6 +60,6 @@ repos:
     rev: v2.3.0
     hooks:
       - id: codespell
-        args: ['--skip', 'tests/models/cassettes/*']
+        args: ['--skip', 'tests/models/cassettes/*,docs/a2a/fasta2a.md']
         additional_dependencies:
           - tomli
```

docs/a2a.md

Lines changed: 96 additions & 0 deletions

# Agent2Agent (A2A) Protocol

The [Agent2Agent (A2A) Protocol](https://google.github.io/A2A/) is an open standard introduced by Google that enables
communication and interoperability between AI agents, regardless of the framework or vendor they are built on.

At Pydantic, we built the [FastA2A](#fasta2a) library to make it easier to implement the A2A protocol in Python.

We also built a convenience method that exposes PydanticAI agents as A2A servers - let's have a quick look at how to use it:

```py {title="agent_to_a2a.py" hl_lines="4"}
from pydantic_ai import Agent

agent = Agent('openai:gpt-4.1', instructions='Be fun!')
app = agent.to_a2a()
```

_You can run the example with `uvicorn agent_to_a2a:app --host 0.0.0.0 --port 8000`_

This will expose the agent as an A2A server, and you can start sending requests to it.

See more about [exposing PydanticAI agents as A2A servers](#pydanticai-agent-to-a2a-server).

## FastA2A

**FastA2A** is a framework-agnostic implementation of the A2A protocol in Python.
The library is designed to be used with any agentic framework, and is **not exclusive to PydanticAI**.

### Design

**FastA2A** is built on top of [Starlette](https://starlette.io), which means it's fully compatible with any ASGI server.

Given the nature of the A2A protocol, it's important to understand its design before using it; as a developer,
you'll need to provide some components:

- [`Storage`][fasta2a.Storage]: to save and load tasks
- [`Broker`][fasta2a.Broker]: to schedule tasks
- [`Worker`][fasta2a.Worker]: to execute tasks

Let's have a look at how those components fit together:

```mermaid
flowchart TB
    Server["HTTP Server"] <--> |Sends Requests/<br>Receives Results| TM

    subgraph CC[Core Components]
        direction RL
        TM["TaskManager<br>(coordinates)"] --> |Schedules Tasks| Broker
        TM <--> Storage
        Broker["Broker<br>(queues & schedules)"] <--> Storage["Storage<br>(persistence)"]
        Broker --> |Delegates Execution| Worker
    end

    Worker["Worker<br>(implementation)"]
```

FastA2A allows you to bring your own [`Storage`][fasta2a.Storage], [`Broker`][fasta2a.Broker] and [`Worker`][fasta2a.Worker].
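To make that division of responsibilities concrete, here is a simplified, framework-free sketch of how a task could flow through the three components. The class and method names below are illustrative stand-ins, not the real `fasta2a` interfaces:

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class InMemoryStorage:
    """Stand-in for Storage: persists task state by id."""

    tasks: dict[str, dict] = field(default_factory=dict)

    def save(self, task_id: str, state: str) -> None:
        self.tasks[task_id] = {'id': task_id, 'state': state}

    def load(self, task_id: str) -> dict:
        return self.tasks[task_id]


@dataclass
class QueueBroker:
    """Stand-in for Broker: queues task ids for workers to pick up."""

    queue: asyncio.Queue = field(default_factory=asyncio.Queue)

    async def schedule(self, task_id: str) -> None:
        await self.queue.put(task_id)


async def worker(storage: InMemoryStorage, broker: QueueBroker) -> None:
    """Stand-in for Worker: executes one queued task and records the result."""
    task_id = await broker.queue.get()
    storage.save(task_id, 'completed')


async def main() -> dict:
    storage = InMemoryStorage()
    broker = QueueBroker()
    # The TaskManager role: record the task as submitted, then schedule it.
    storage.save('task-1', 'submitted')
    await broker.schedule('task-1')
    await worker(storage, broker)
    return storage.load('task-1')


print(asyncio.run(main()))  # → {'id': 'task-1', 'state': 'completed'}
```

The real library splits things the same way: the `TaskManager` coordinates, the `Broker` only queues, and the `Worker` is the only component that actually runs agent code, which is what makes remote workers possible.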
### Installation

FastA2A is available on PyPI as [`fasta2a`](https://pypi.org/project/fasta2a/), so installation is as simple as:

```bash
pip/uv-add fasta2a
```

The only dependencies are:

- [starlette](https://starlette.io): to expose the A2A server as an [ASGI application](https://asgi.readthedocs.io/en/latest/)
- [pydantic](https://pydantic.dev): to validate the request/response messages
- [opentelemetry-api](https://opentelemetry-python.readthedocs.io/en/latest): to provide tracing capabilities

You can install PydanticAI with the `a2a` extra to include **FastA2A**:

```bash
pip/uv-add 'pydantic-ai[a2a]'
```

### PydanticAI Agent to A2A Server

To expose a PydanticAI agent as an A2A server, you can use the `to_a2a` method:

```python {title="agent_to_a2a.py"}
from pydantic_ai import Agent

agent = Agent('openai:gpt-4.1', instructions='Be fun!')
app = agent.to_a2a()
```

Since `app` is an ASGI application, it can be used with any ASGI server:

```bash
uvicorn agent_to_a2a:app --host 0.0.0.0 --port 8000
```

Since `to_a2a` is a convenience method, it accepts the same arguments as the [`FastA2A`][fasta2a.FastA2A] constructor.
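Those constructor arguments populate the agent card that the server publishes at `/.well-known/agent.json`. As a rough sketch of the card's content (the values mirror the `FastA2A` constructor defaults; field names follow the Python-side schema, and the serialized card may use different casing or aliases):

```python
import json

# Hypothetical agent card, mirroring the FastA2A constructor defaults.
agent_card = {
    'name': 'Agent',
    'url': 'http://localhost:8000',
    'version': '1.0.0',
    'skills': [],
    'default_input_modes': ['application/json'],
    'default_output_modes': ['application/json'],
}

# Serialize as the /.well-known/agent.json endpoint would.
print(json.dumps(agent_card, indent=2))
```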

docs/api/fasta2a.md

Lines changed: 7 additions & 0 deletions

# `fasta2a`

::: fasta2a

::: fasta2a.schema

::: fasta2a.client

fasta2a/README.md

Lines changed: 12 additions & 0 deletions

# FastA2A

[![CI](https://github.com/pydantic/pydantic-ai/actions/workflows/ci.yml/badge.svg?event=push)](https://github.com/pydantic/pydantic-ai/actions/workflows/ci.yml?query=branch%3Amain)
[![Coverage](https://coverage-badge.samuelcolvin.workers.dev/pydantic/pydantic-ai.svg)](https://coverage-badge.samuelcolvin.workers.dev/redirect/pydantic/pydantic-ai)
[![PyPI](https://img.shields.io/pypi/v/fasta2a.svg)](https://pypi.python.org/pypi/fasta2a)
[![python versions](https://img.shields.io/pypi/pyversions/fasta2a.svg)](https://github.com/pydantic/pydantic-ai)
[![license](https://img.shields.io/github/license/pydantic/pydantic-ai.svg)](https://github.com/pydantic/pydantic-ai/blob/main/LICENSE)

To make it easier to implement A2A servers, we've implemented FastA2A,
a library built on top of Starlette and Pydantic to bring A2A to Python.

See [the docs](https://ai.pydantic.dev/a2a/) for more information.

fasta2a/fasta2a/__init__.py

Lines changed: 7 additions & 0 deletions

```python
from .applications import FastA2A
from .broker import Broker
from .schema import Skill
from .storage import Storage
from .worker import Worker

__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker']
```

fasta2a/fasta2a/applications.py

Lines changed: 124 additions & 0 deletions

```python
from __future__ import annotations as _annotations

from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from typing import Any

from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Route
from starlette.types import ExceptionHandler, Lifespan, Receive, Scope, Send

from .broker import Broker
from .schema import AgentCard, Provider, Skill, a2a_request_ta, a2a_response_ta, agent_card_ta
from .storage import Storage
from .task_manager import TaskManager


class FastA2A(Starlette):
    """The main class for the FastA2A library."""

    def __init__(
        self,
        *,
        storage: Storage,
        broker: Broker,
        # Agent card
        name: str | None = None,
        url: str = 'http://localhost:8000',
        version: str = '1.0.0',
        description: str | None = None,
        provider: Provider | None = None,
        skills: list[Skill] | None = None,
        # Starlette
        debug: bool = False,
        routes: Sequence[Route] | None = None,
        middleware: Sequence[Middleware] | None = None,
        exception_handlers: dict[Any, ExceptionHandler] | None = None,
        lifespan: Lifespan[FastA2A] | None = None,
    ):
        if lifespan is None:
            lifespan = _default_lifespan

        super().__init__(
            debug=debug,
            routes=routes,
            middleware=middleware,
            exception_handlers=exception_handlers,
            lifespan=lifespan,
        )

        self.name = name or 'Agent'
        self.url = url
        self.version = version
        self.description = description
        self.provider = provider
        self.skills = skills or []
        # NOTE: For now, I don't think there's any reason to support any other input/output modes.
        self.default_input_modes = ['application/json']
        self.default_output_modes = ['application/json']

        self.task_manager = TaskManager(broker=broker, storage=storage)

        # Setup
        self._agent_card_json_schema: bytes | None = None
        self.router.add_route('/.well-known/agent.json', self._agent_card_endpoint, methods=['HEAD', 'GET', 'OPTIONS'])
        self.router.add_route('/', self._agent_run_endpoint, methods=['POST'])

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope['type'] == 'http' and not self.task_manager.is_running:
            raise RuntimeError('TaskManager was not properly initialized.')
        await super().__call__(scope, receive, send)

    async def _agent_card_endpoint(self, request: Request) -> Response:
        if self._agent_card_json_schema is None:
            agent_card = AgentCard(
                name=self.name,
                url=self.url,
                version=self.version,
                skills=self.skills,
                default_input_modes=self.default_input_modes,
                default_output_modes=self.default_output_modes,
            )
            if self.description is not None:
                agent_card['description'] = self.description
            if self.provider is not None:
                agent_card['provider'] = self.provider
            self._agent_card_json_schema = agent_card_ta.dump_json(agent_card)
        return Response(content=self._agent_card_json_schema, media_type='application/json')

    async def _agent_run_endpoint(self, request: Request) -> Response:
        """This is the main endpoint for the A2A server.

        Although the specification allows some freedom of implementation, this server
        makes the following decisions:

        1. The server will always either send a "submitted" or a "failed" on `tasks/send`.
            Never a "completed" on the first message.
        2. There are three possible ends for the task:
            2.1. The task was "completed" successfully.
            2.2. The task was "canceled".
            2.3. The task "failed".
        3. The server will send a "working" on the first chunk on `tasks/pushNotification/get`.
        """
        data = await request.body()
        a2a_request = a2a_request_ta.validate_json(data)

        if a2a_request['method'] == 'tasks/send':
            jsonrpc_response = await self.task_manager.send_task(a2a_request)
        elif a2a_request['method'] == 'tasks/get':
            jsonrpc_response = await self.task_manager.get_task(a2a_request)
        elif a2a_request['method'] == 'tasks/cancel':
            jsonrpc_response = await self.task_manager.cancel_task(a2a_request)
        else:
            raise NotImplementedError(f'Method {a2a_request["method"]} not implemented.')
        return Response(
            content=a2a_response_ta.dump_json(jsonrpc_response, by_alias=True), media_type='application/json'
        )


@asynccontextmanager
async def _default_lifespan(app: FastA2A) -> AsyncIterator[None]:
    async with app.task_manager:
        yield
```
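The endpoint above dispatches on the JSON-RPC `method` field. A `tasks/send` request body could look roughly like this (a hand-written sketch of the A2A JSON-RPC envelope with a hypothetical task id, not output generated by the library):

```python
import json

tasks_send_request = {
    'jsonrpc': '2.0',
    'id': 1,
    'method': 'tasks/send',
    'params': {
        'id': 'task-123',  # hypothetical task id
        'message': {
            'role': 'user',
            'parts': [{'type': 'text', 'text': 'Tell me a joke.'}],
        },
    },
}

body = json.dumps(tasks_send_request)
# The server routes this to TaskManager.send_task and replies "submitted" (or "failed"),
# never "completed" on the first message, per the docstring above.
print(json.loads(body)['method'])  # → tasks/send
```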

fasta2a/fasta2a/broker.py

Lines changed: 98 additions & 0 deletions

```python
from __future__ import annotations as _annotations

from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack
from dataclasses import dataclass
from typing import Annotated, Any, Generic, Literal, TypeVar

import anyio
from opentelemetry.trace import Span, get_current_span, get_tracer
from pydantic import Discriminator
from typing_extensions import Self, TypedDict

from .schema import TaskIdParams, TaskSendParams

tracer = get_tracer(__name__)


@dataclass
class Broker(ABC):
    """The broker class is in charge of scheduling the tasks.

    The HTTP server uses the broker to schedule tasks.

    The simplest implementation is `InMemoryBroker`, which runs the tasks in the same
    process as the HTTP server. That said, this class can be extended to support
    remote workers.
    """

    @abstractmethod
    async def run_task(self, params: TaskSendParams) -> None:
        """Send a task to be executed by the worker."""
        raise NotImplementedError('run_task is not implemented yet.')

    @abstractmethod
    async def cancel_task(self, params: TaskIdParams) -> None:
        """Cancel a task."""
        raise NotImplementedError('cancel_task is not implemented yet.')

    @abstractmethod
    async def __aenter__(self) -> Self: ...

    @abstractmethod
    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any): ...

    @abstractmethod
    def receive_task_operations(self) -> AsyncIterator[TaskOperation]:
        """Receive task operations from the broker.

        On a multi-worker setup, the broker will need to round-robin the task operations
        between the workers.
        """


OperationT = TypeVar('OperationT')
ParamsT = TypeVar('ParamsT')


class _TaskOperation(TypedDict, Generic[OperationT, ParamsT]):
    """A task operation."""

    operation: OperationT
    params: ParamsT
    _current_span: Span


_RunTask = _TaskOperation[Literal['run'], TaskSendParams]
_CancelTask = _TaskOperation[Literal['cancel'], TaskIdParams]

TaskOperation = Annotated['_RunTask | _CancelTask', Discriminator('operation')]


class InMemoryBroker(Broker):
    """A broker that schedules tasks in memory."""

    async def __aenter__(self):
        self.aexit_stack = AsyncExitStack()
        await self.aexit_stack.__aenter__()

        self._write_stream, self._read_stream = anyio.create_memory_object_stream[TaskOperation]()
        await self.aexit_stack.enter_async_context(self._read_stream)
        await self.aexit_stack.enter_async_context(self._write_stream)

        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any):
        await self.aexit_stack.__aexit__(exc_type, exc_value, traceback)

    async def run_task(self, params: TaskSendParams) -> None:
        await self._write_stream.send(_RunTask(operation='run', params=params, _current_span=get_current_span()))

    async def cancel_task(self, params: TaskIdParams) -> None:
        await self._write_stream.send(_CancelTask(operation='cancel', params=params, _current_span=get_current_span()))

    async def receive_task_operations(self) -> AsyncIterator[TaskOperation]:
        """Receive task operations from the broker."""
        async for task_operation in self._read_stream:
            yield task_operation
```
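The `TaskOperation` union is discriminated on the `operation` key, so a worker loop can dispatch on a plain dict lookup. A minimal stdlib sketch of that consumption pattern, using `asyncio.Queue` in place of the `anyio` memory object stream and simplified `params` dicts (the real `TaskSendParams`/`TaskIdParams` carry more fields):

```python
import asyncio


async def worker_loop(queue: asyncio.Queue, handled: list) -> None:
    # Consume task operations until a sentinel None arrives, dispatching on 'operation'.
    while (op := await queue.get()) is not None:
        if op['operation'] == 'run':
            handled.append(f"running {op['params']['id']}")
        elif op['operation'] == 'cancel':
            handled.append(f"canceling {op['params']['id']}")


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    handled: list = []
    # Mirror what Broker.run_task / Broker.cancel_task enqueue, minus the span.
    await queue.put({'operation': 'run', 'params': {'id': 'task-1'}})
    await queue.put({'operation': 'cancel', 'params': {'id': 'task-1'}})
    await queue.put(None)
    await worker_loop(queue, handled)
    return handled


print(asyncio.run(main()))  # → ['running task-1', 'canceling task-1']
```

Carrying `_current_span` in the real operation dict serves the same purpose as propagating trace context across the queue boundary, so a worker's spans attach to the originating request.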
