feat(python): introduce TypedDict for model config#5554
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces bidirectional streaming agents to Genkit Python, adding the core agent runtime, session state management, snapshot persistence, and JSON patch diffing for streaming custom state. It also adds an Artifacts middleware to list and manage session artifacts, updates the reflection server to support bidirectional action execution, and implements cooperative cancellation via abort signals across generate actions and tools. The reviewer feedback focuses on preventing background tasks spawned via asyncio.create_task from being garbage collected prematurely by maintaining strong references to them. Other suggestions include leveraging the newly introduced ModelConfigDict for better type safety and IDE autocompletion, fixing a typo in the documentation of _tools.py, and using model_copy in the Artifacts middleware to avoid discarding metadata or other fields when updating messages.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| if getattr(item, 'detach', False): | ||
| if not detach_future.done(): | ||
| detach_future.set_result(None) | ||
|
|
||
| val = item | ||
|
|
||
| async def _finish_detach_input( | ||
| payload: AgentInput | None = None, | ||
| *, | ||
| bound_item: AgentInput = val, | ||
| ) -> None: | ||
| p = payload if payload is not None else bound_item | ||
| if _agent_input_has_payload(p): | ||
| await self._intake.put(p) | ||
| await self._intake.put(QUEUE_SENTINEL) | ||
|
|
||
| asyncio.create_task(_finish_detach_input()) | ||
| return |
There was a problem hiding this comment.
Spawning an unreferenced background task _finish_detach_input using asyncio.create_task and returning early from _forward is unnecessary and dangerous. The event loop only keeps weak references to tasks, making them vulnerable to garbage collection mid-execution. Additionally, forward_task resolves before the queue operations are actually complete. We can simply await the queue operations directly in _forward and return.
if getattr(item, 'detach', False):
if not detach_future.done():
detach_future.set_result(None)
if _agent_input_has_payload(item):
await self._intake.put(item)
await self._intake.put(QUEUE_SENTINEL)
return
| asyncio.create_task(self._watch_snapshot_abort(pending_snap.snapshot_id, abort_signal)) | ||
| asyncio.create_task(self._finalize_detach(pending_snap, fn_task, forward_task, err_holder, result_holder)) |
There was a problem hiding this comment.
Spawning background tasks _watch_snapshot_abort and _finalize_detach using asyncio.create_task without keeping a strong reference to them makes them vulnerable to garbage collection mid-execution (since _AgentRuntime.run returns immediately in the detach path, destroying the local frame references). We should maintain a strong reference to these background tasks using a module-level set.
| asyncio.create_task(self._watch_snapshot_abort(pending_snap.snapshot_id, abort_signal)) | |
| asyncio.create_task(self._finalize_detach(pending_snap, fn_task, forward_task, err_holder, result_holder)) | |
| self._detached = True | |
| abort_task = asyncio.create_task(self._watch_snapshot_abort(pending_snap.snapshot_id, abort_signal)) | |
| _background_tasks.add(abort_task) | |
| abort_task.add_done_callback(_background_tasks.discard) | |
| finalize_task = asyncio.create_task(self._finalize_detach(pending_snap, fn_task, forward_task, err_holder, result_holder)) | |
| _background_tasks.add(finalize_task) | |
| finalize_task.add_done_callback(_background_tasks.discard) |
| ToolRequestPart, | ||
| TurnEnd, | ||
| ) | ||
|
|
| if not isinstance(chunk, AgentStreamChunk): | ||
| return | ||
| if chunk.artifact is not None: | ||
| asyncio.get_event_loop().create_task(self._session.add_artifacts(chunk.artifact, _suppress_events=True)) |
There was a problem hiding this comment.
Keep a strong reference to the background task created for add_artifacts to prevent it from being garbage collected mid-execution.
| asyncio.get_event_loop().create_task(self._session.add_artifacts(chunk.artifact, _suppress_events=True)) | |
| try: | |
| task = asyncio.create_task(self._session.add_artifacts(chunk.artifact, _suppress_events=True)) | |
| _background_tasks.add(task) | |
| task.add_done_callback(_background_tasks.discard) | |
| except RuntimeError: | |
| pass |
| # Sentinel tells BidiConnection.receive() to stop. | ||
| await out_queue.put(_SENTINEL) | ||
|
|
||
| asyncio.create_task(_run()) |
There was a problem hiding this comment.
Spawning the background task _run in stream_bidi using asyncio.create_task without keeping a strong reference to it makes it vulnerable to garbage collection mid-execution. We should store the task on the returned BidiConnection instance to keep it strongly referenced as long as the caller holds the connection.
| asyncio.create_task(_run()) | |
| conn = BidiConnection(in_queue, out_queue, result_future) | |
| conn._task = asyncio.create_task(_run()) |
| *, | ||
| variant: str | None = None, | ||
| model: str | None = None, | ||
| config: dict[str, object] | ModelConfig | None = None, |
There was a problem hiding this comment.
| import anyio | ||
| import uvicorn | ||
| from pydantic import BaseModel | ||
|
|
| *, | ||
| variant: str | None = None, | ||
| model: str | None = None, | ||
| config: dict[str, object] | ModelConfig | None = None, |
There was a problem hiding this comment.
| """Register a function as a tool. | ||
|
|
||
| Normally, the input_schema and output_schema are inferred from func. However, | ||
| Normally, the input_schema and output_schem are inferred from func. However, |
| def inject_artifact_listing_messages(messages: list[Message], listing: str) -> list[Message]: | ||
| """Strip prior listing parts and append a fresh listing to the system message.""" | ||
| out = list(messages) | ||
|
|
||
| for i, msg in enumerate(out): | ||
| filtered: list[Part] = [] | ||
| for part in msg.content or []: | ||
| root = part.root | ||
| meta = root.metadata if isinstance(root, TextPart) else None | ||
| if isinstance(meta, dict) and meta.get(ARTIFACTS_LISTING_MARKER): | ||
| continue | ||
| filtered.append(part) | ||
| if len(filtered) != len(msg.content or []): | ||
| out[i] = Message(role=msg.role, content=filtered) | ||
|
|
||
| listing_part = Part( | ||
| root=TextPart(text=listing, metadata={ARTIFACTS_LISTING_MARKER: True}), | ||
| ) | ||
|
|
||
| system_idx: int | None = None | ||
| for i, msg in enumerate(out): | ||
| if msg.role == Role.SYSTEM: | ||
| system_idx = i | ||
| break | ||
|
|
||
| if system_idx is not None: | ||
| msg = out[system_idx] | ||
| out[system_idx] = Message( | ||
| role=Role.SYSTEM, | ||
| content=[*msg.content, listing_part], | ||
| ) | ||
| else: | ||
| out.insert(0, Message(role=Role.SYSTEM, content=[listing_part])) | ||
|
|
||
| return out |
There was a problem hiding this comment.
Recreating Message instances using Message(role=msg.role, content=filtered) and Message(role=Role.SYSTEM, content=[...]) discards other fields like metadata or docs on the original messages. Use msg.model_copy(update={'content': ...}) to preserve other fields.
def inject_artifact_listing_messages(messages: list[Message], listing: str) -> list[Message]:
"""Strip prior listing parts and append a fresh listing to the system message."""
out = list(messages)
for i, msg in enumerate(out):
filtered: list[Part] = []
for part in msg.content or []:
root = part.root
meta = root.metadata if isinstance(root, TextPart) else None
if isinstance(meta, dict) and meta.get(ARTIFACTS_LISTING_MARKER):
continue
filtered.append(part)
if len(filtered) != len(msg.content or []):
out[i] = msg.model_copy(update={'content': filtered})
listing_part = Part(
root=TextPart(text=listing, metadata={ARTIFACTS_LISTING_MARKER: True}),
)
system_idx: int | None = None
for i, msg in enumerate(out):
if msg.role == Role.SYSTEM:
system_idx = i
break
if system_idx is not None:
msg = out[system_idx]
out[system_idx] = msg.model_copy(
update={'content': [*(msg.content or []), listing_part]}
)
else:
out.insert(0, Message(role=Role.SYSTEM, content=[listing_part]))
return out
824e800 to
3fb0c8e
Compare
Fixes #5553