Skip to content

Commit 1fe64da

Browse files
feat: per-request toolkit isolation for concurrent multi-user requests
Add clone-based isolation so each Slack/Telegram/WhatsApp request gets fresh toolkit state (creds, service, caches) while sharing heavy resources (DB, GoogleAuth, MCP connections) by reference. Changes: - Toolkit: add clone() with _clone_reset_attrs for per-user state reset - Function: add clone_for() to rebind entrypoints to cloned toolkit - deep_copy_field: share GoogleAuth by reference (like MCP tools), fall back to clone() when deepcopy fails (Google httplib2 clients) - Interface routers: call entity.deep_copy() before arun() so each request runs on an isolated agent copy - AgentOS: wire toolkit._db at init time (_initialize_agents) so the OAuth callback closure has DB access before any request arrives - Cookbook: Slack Gmail agent with DB-backed OAuth flow Depends on: feat/google-auth-db-storage
1 parent 2e9bcb6 commit 1fe64da

10 files changed

Lines changed: 392 additions & 21 deletions

File tree

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
"""
2+
Slack Gmail Agent (DB-backed OAuth)
3+
====================================
4+
5+
Slack bot that can read/send Gmail using DB-backed OAuth tokens.
6+
When a user asks about email, the agent checks the DB for stored tokens.
7+
If none exist, it returns an OAuth URL the user clicks to authenticate.
8+
9+
Setup:
10+
1. Google Cloud: Create OAuth 2.0 Web Application credentials
11+
- Authorized redirect URI: https://<your-ngrok>/google/oauth/callback
12+
2. Set env vars:
13+
- GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET
14+
- GOOGLE_REDIRECT_URI=https://<your-ngrok>/google/oauth/callback
15+
- SLACK_BOT_TOKEN, SLACK_SIGNING_SECRET
16+
- OPENAI_API_KEY
17+
3. Start pgvector: ./cookbook/scripts/run_pgvector.sh
18+
4. Start ngrok: ngrok http 7777
19+
5. Run: .venvs/demo/bin/python cookbook/05_agent_os/interfaces/slack/gmail_agent.py
20+
21+
Flow:
22+
User: "What are my latest emails?"
23+
Bot: Returns OAuth URL (first time) -> user clicks -> Google consent -> callback saves token
24+
User: Retries -> Bot reads Gmail via stored token
25+
"""
26+
27+
from agno.agent import Agent
28+
from agno.db.postgres.postgres import PostgresDb
29+
from agno.models.openai import OpenAIChat
30+
from agno.os.app import AgentOS
31+
from agno.os.interfaces.slack import Slack
32+
from agno.tools.google.auth import GoogleAuth
33+
from agno.tools.google.gmail import GmailTools
34+
35+
# ---------------------------------------------------------------------------
36+
# Database (shared between agent session storage and OAuth token storage)
37+
# ---------------------------------------------------------------------------
38+
39+
db = PostgresDb(
40+
db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
41+
)
42+
43+
# ---------------------------------------------------------------------------
44+
# Google Auth + Gmail
45+
# ---------------------------------------------------------------------------
46+
47+
google_auth = GoogleAuth()
48+
49+
gmail = GmailTools(
50+
google_auth=google_auth,
51+
get_latest_emails=True,
52+
get_emails_from_user=True,
53+
get_unread_emails=True,
54+
search_emails=True,
55+
send_email=True,
56+
create_draft_email=True,
57+
)
58+
59+
# ---------------------------------------------------------------------------
60+
# Agent
61+
# ---------------------------------------------------------------------------
62+
63+
gmail_agent = Agent(
64+
name="Gmail Agent",
65+
model=OpenAIChat(id="gpt-4o"),
66+
tools=[google_auth, gmail],
67+
db=db,
68+
add_history_to_context=True,
69+
num_history_runs=3,
70+
add_datetime_to_context=True,
71+
instructions=[
72+
"You are a helpful email assistant connected to Gmail.",
73+
"When authentication is needed, share the OAuth URL and ask the user to click it.",
74+
"After they authenticate, retry the original request.",
75+
],
76+
)
77+
78+
# ---------------------------------------------------------------------------
79+
# AgentOS + Slack + OAuth callback
80+
# ---------------------------------------------------------------------------
81+
82+
agent_os = AgentOS(
83+
agents=[gmail_agent],
84+
interfaces=[
85+
Slack(
86+
agent=gmail_agent,
87+
reply_to_mentions_only=True,
88+
resolve_user_identity=True,
89+
)
90+
],
91+
)
92+
93+
app = agent_os.get_app()
94+
95+
# Attach the OAuth callback endpoint to the same FastAPI app
96+
# Google redirects here after user consents: /google/oauth/callback?code=...&state=...
97+
app.include_router(google_auth.get_oauth_router())
98+
99+
100+
if __name__ == "__main__":
101+
agent_os.serve(app="gmail_agent:app", reload=True)

libs/agno/agno/agent/_utils.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -171,18 +171,20 @@ def deep_copy_field(agent: Agent, field_name: str, field_value: Any) -> Any:
171171
copied_tools = []
172172
for tool in field_value: # type: ignore
173173
try:
174-
# Share MCP tools (they maintain server connections)
175-
is_mcp_tool = hasattr(type(tool), "__mro__") and any(
176-
c.__name__ in ["MCPTools", "MultiMCPTools"] for c in type(tool).__mro__
177-
)
178-
if is_mcp_tool:
174+
# Share tools that maintain connections or act as coordinators
175+
mro_names = {c.__name__ for c in type(tool).__mro__} if hasattr(type(tool), "__mro__") else set()
176+
is_shared_tool = bool(mro_names & {"MCPTools", "MultiMCPTools", "GoogleAuth"})
177+
if is_shared_tool:
179178
copied_tools.append(tool)
180179
else:
181180
try:
182181
copied_tools.append(deepcopy(tool))
183182
except Exception:
184-
# Tool can't be deep copied, share by reference
185-
copied_tools.append(tool)
183+
# deepcopy failed (e.g. Google httplib2 clients) — try clone()
184+
if hasattr(tool, "clone") and callable(tool.clone):
185+
copied_tools.append(tool.clone())
186+
else:
187+
copied_tools.append(tool)
186188
except Exception:
187189
# MCP detection failed, share tool by reference to be safe
188190
copied_tools.append(tool)

libs/agno/agno/os/app.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
from agno.registry import Registry
6868
from agno.remote.base import RemoteDb, RemoteKnowledge
6969
from agno.team import RemoteTeam, Team
70+
from agno.tools.toolkit import Toolkit
7071
from agno.utils.log import log_debug, log_error, log_info, log_warning
7172
from agno.utils.string import generate_id, generate_id_from_name
7273
from agno.workflow import RemoteWorkflow, Workflow
@@ -527,6 +528,17 @@ def _initialize_agents(self) -> None:
527528

528529
agent.initialize_agent()
529530

531+
# Wire agent.db to toolkits that declare _db (e.g. GoogleAuth for OAuth token storage).
532+
# Must happen here (init time) so the OAuth callback closure captures a wired instance.
533+
# Only sync DBs that override get_auth_token — skips async and unsupported backends.
534+
if agent.db is not None and isinstance(agent.tools, list):
535+
from agno.db.base import BaseDb
536+
537+
if isinstance(agent.db, BaseDb) and type(agent.db).get_auth_token is not BaseDb.get_auth_token:
538+
for tool in agent.tools:
539+
if isinstance(tool, Toolkit) and hasattr(tool, "_db") and tool._db is None:
540+
tool._db = agent.db
541+
530542
# Required for the built-in routes to work
531543
agent.store_events = True
532544

@@ -556,6 +568,18 @@ def _initialize_teams(self) -> None:
556568
if isinstance(member, Agent):
557569
member.team_id = None
558570
member.initialize_agent()
571+
# Wire DB to member agent toolkits — same guard as _initialize_agents
572+
member_db = member.db or team.db
573+
if member_db is not None and isinstance(member.tools, list):
574+
from agno.db.base import BaseDb
575+
576+
if (
577+
isinstance(member_db, BaseDb)
578+
and type(member_db).get_auth_token is not BaseDb.get_auth_token
579+
):
580+
for tool in member.tools:
581+
if isinstance(tool, Toolkit) and hasattr(tool, "_db") and tool._db is None:
582+
tool._db = member_db
559583
elif isinstance(member, Team):
560584
member.initialize_team()
561585

@@ -823,7 +847,6 @@ async def general_exception_handler(_: Request, exc: Exception) -> JSONResponse:
823847
# This allows middleware (like JWT) to access these values
824848
fastapi_app.state.agent_os_id = self.id
825849
fastapi_app.state.cors_allowed_origins = self.cors_allowed_origins
826-
827850
# Store internal service token for scheduler auth bypass
828851
if self._internal_service_token:
829852
fastapi_app.state.internal_service_token = self._internal_service_token

libs/agno/agno/os/interfaces/slack/router.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ async def _process_slack_event(data: dict):
191191
if skipped:
192192
notice = "[Skipped files: " + ", ".join(skipped) + "]"
193193
message_text = f"{notice}\n{message_text}"
194+
# Per-request clone isolates mutable toolkit state (creds, service) between users
195+
_entity = entity.deep_copy() if hasattr(entity, "deep_copy") else entity
196+
194197
run_kwargs: Dict[str, Any] = {
195198
"user_id": resolved_user_id,
196199
"session_id": session_id,
@@ -207,7 +210,7 @@ async def _process_slack_event(data: dict):
207210
"audio": audio or None,
208211
}
209212

210-
response = await entity.arun(message_text, **run_kwargs) # type: ignore[union-attr]
213+
response = await _entity.arun(message_text, **run_kwargs) # type: ignore[union-attr]
211214

212215
if response:
213216
if response.status == "ERROR":
@@ -312,6 +315,8 @@ async def _stream_slack_response(data: dict):
312315
if skipped:
313316
notice = "[Skipped files: " + ", ".join(skipped) + "]"
314317
message_text = f"{notice}\n{message_text}"
318+
_entity = entity.deep_copy() if hasattr(entity, "deep_copy") else entity
319+
315320
run_kwargs: Dict[str, Any] = {
316321
"stream": True,
317322
# Enables event-level chunks for task card and tool lifecycle rendering
@@ -331,7 +336,7 @@ async def _stream_slack_response(data: dict):
331336
"audio": audio or None,
332337
}
333338

334-
response_stream = entity.arun(message_text, **run_kwargs) # type: ignore[union-attr]
339+
response_stream = _entity.arun(message_text, **run_kwargs) # type: ignore[union-attr]
335340

336341
if response_stream is None:
337342
try:

libs/agno/agno/os/interfaces/telegram/router.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ async def _stream_response(
204204
message_thread_id: Optional[int],
205205
is_private: bool = False,
206206
) -> None:
207+
_entity = entity.deep_copy() if hasattr(entity, "deep_copy") else entity
208+
207209
is_workflow = entity_type == "workflow"
208210
stream_kwargs: dict = dict(stream=True, stream_events=True, **run_kwargs)
209211
if not is_workflow:
@@ -219,7 +221,7 @@ async def _stream_response(
219221
)
220222

221223
try:
222-
async for event in entity.arun(message_text, **stream_kwargs): # type: ignore[union-attr]
224+
async for event in _entity.arun(message_text, **stream_kwargs): # type: ignore[union-attr]
223225
if isinstance(event, (RunOutput, TeamRunOutput)):
224226
state.final_run_output = event
225227
continue
@@ -261,7 +263,8 @@ async def _sync_response(
261263
reply_to: Optional[int],
262264
message_thread_id: Optional[int],
263265
) -> None:
264-
response = await entity.arun(message_text, **run_kwargs) # type: ignore[union-attr]
266+
_entity = entity.deep_copy() if hasattr(entity, "deep_copy") else entity
267+
response = await _entity.arun(message_text, **run_kwargs) # type: ignore[union-attr]
265268
if not response or response.status == "ERROR":
266269
if response:
267270
log_error(response.content)

libs/agno/agno/os/interfaces/whatsapp/router.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,9 +313,10 @@ async def _keep_typing():
313313
except asyncio.CancelledError:
314314
pass
315315

316+
_entity = entity.deep_copy() if hasattr(entity, "deep_copy") else entity
316317
typing_task = asyncio.create_task(_keep_typing())
317318
try:
318-
response = await entity.arun(parsed.text, **run_kwargs) # type: ignore[union-attr]
319+
response = await _entity.arun(parsed.text, **run_kwargs) # type: ignore[union-attr]
319320
finally:
320321
typing_task.cancel()
321322

libs/agno/agno/team/_utils.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,18 +183,20 @@ def _deep_copy_field(team: Team, field_name: str, field_value: Any) -> Any:
183183
copied_tools = []
184184
for tool in field_value:
185185
try:
186-
# Share MCP tools (they maintain server connections)
187-
is_mcp_tool = hasattr(type(tool), "__mro__") and any(
188-
c.__name__ in ["MCPTools", "MultiMCPTools"] for c in type(tool).__mro__
189-
)
190-
if is_mcp_tool:
186+
# Share tools that maintain connections or act as coordinators
187+
mro_names = {c.__name__ for c in type(tool).__mro__} if hasattr(type(tool), "__mro__") else set()
188+
is_shared_tool = bool(mro_names & {"MCPTools", "MultiMCPTools", "GoogleAuth"})
189+
if is_shared_tool:
191190
copied_tools.append(tool)
192191
else:
193192
try:
194193
copied_tools.append(deepcopy(tool))
195194
except Exception:
196-
# Tool can't be deep copied, share by reference
197-
copied_tools.append(tool)
195+
# deepcopy failed (e.g. Google httplib2 clients) — try clone()
196+
if hasattr(tool, "clone") and callable(tool.clone):
197+
copied_tools.append(tool.clone())
198+
else:
199+
copied_tools.append(tool)
198200
except Exception:
199201
# MCP detection failed, share tool by reference to be safe
200202
copied_tools.append(tool)
@@ -216,7 +218,6 @@ def _deep_copy_field(team: Team, field_name: str, field_value: Any) -> Any:
216218
"session_summary_manager",
217219
"compression_manager",
218220
"learning",
219-
"skills",
220221
):
221222
return field_value
222223

libs/agno/agno/tools/function.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,21 @@ def from_dict(cls, data: Dict[str, Any]) -> "Function":
234234
approval_type=data.get("approval_type"),
235235
)
236236

237+
def clone_for(self, new_owner: Any) -> "Function":
238+
"""Shallow-copy this Function with its entrypoint rebound to new_owner.
239+
240+
Used by Toolkit.clone() so tool methods operate on the cloned toolkit's
241+
state instead of the original's. Only rebinds bound methods (__self__);
242+
closures and standalone functions are left as-is.
243+
"""
244+
import types
245+
246+
clone = self.model_copy()
247+
ep = self.entrypoint
248+
if ep is not None and hasattr(ep, "__self__"):
249+
clone.entrypoint = types.MethodType(ep.__func__, new_owner) # type: ignore[attr-defined]
250+
return clone
251+
237252
def model_copy(self, *, deep: bool = False) -> "Function":
238253
"""
239254
Override model_copy to handle callable fields that can't be deep copied (pickled).

libs/agno/agno/tools/toolkit.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ class Toolkit:
1212
# When True, the Agent will automatically call connect() before using tools and close() after
1313
_requires_connect: bool = False
1414

15+
# Subclasses override with attr names that hold per-user mutable state (creds, service, etc.).
16+
# clone() resets these to None so each request rebuilds them for the current user.
17+
_clone_reset_attrs: tuple = ()
18+
1519
def __init__(
1620
self,
1721
name: str = "toolkit",
@@ -345,6 +349,30 @@ def close(self) -> None:
345349
"""
346350
pass
347351

352+
def clone(self) -> "Toolkit":
353+
"""Shallow-copy this toolkit and reset per-user mutable state.
354+
355+
Used by deep_copy_field() as a fallback when deepcopy() fails (e.g. Google
356+
API clients hold unpicklable httplib2 transports). Config attrs (scopes,
357+
credentials_path, etc.) are shared; mutable attrs listed in _clone_reset_attrs
358+
are set to None so the auth decorator rebuilds them for the current user.
359+
"""
360+
import copy
361+
362+
clone = copy.copy(self)
363+
for attr in self._clone_reset_attrs:
364+
setattr(clone, attr, None)
365+
366+
# Rebuild function dicts so entrypoints are bound to the clone, not the original
367+
clone.functions = OrderedDict()
368+
clone.async_functions = OrderedDict()
369+
for name, fn in self.functions.items():
370+
clone.functions[name] = fn.clone_for(clone)
371+
for name, fn in self.async_functions.items():
372+
clone.async_functions[name] = fn.clone_for(clone)
373+
374+
return clone
375+
348376
def _check_path(self, file_name: str, base_dir: Path, restrict_to_base_dir: bool = True) -> Tuple[bool, Path]:
349377
"""Check if the file path is within the base directory.
350378

0 commit comments

Comments
 (0)