-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpydantic_mcp.py
41 lines (32 loc) · 913 Bytes
/
pydantic_mcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
import os
import logfire
from dotenv import load_dotenv
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStdio
load_dotenv()
# Configure logging to logfire if LOGFIRE_TOKEN is set in environment
logfire.configure(send_to_logfire="if-token-present")
logfire.instrument_mcp()
logfire.instrument_pydantic_ai()
server = MCPServerStdio(
command="uv",
args=[
"run",
"run_server.py",
"stdio",
],
)
agent = Agent("gemini-2.5-pro-preview-03-25", mcp_servers=[server])
Agent.instrument_all()
async def main(query: str = "Greet Andrew and give him the current time") -> None:
"""
Main function to run the agent
Args:
query (str): The query to run the agent with
"""
async with agent.run_mcp_servers():
result = await agent.run(query)
print(result.output)
if __name__ == "__main__":
asyncio.run(main())