537 changes: 377 additions & 160 deletions docs/streaming/custom-streaming-ws.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions docs/streaming/index.md
@@ -34,6 +34,7 @@ text, audio, and video inputs, and they can provide text and audio output.

<div class="grid cards" markdown>

<!-- this quickstart is outdated
- :material-console-line: **Quickstart (Bidi-streaming)**
---
@@ -42,6 +43,7 @@ text, audio, and video inputs, and they can provide text and audio output.
implement low-latency and bidirectional voice and video communication.
- [Quickstart (Bidi-streaming)](../get-started/streaming/quickstart-streaming.md)
-->

- :material-console-line: **Custom Audio Streaming app sample**

@@ -12,15 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from google.adk.agents import Agent
from google.adk.tools import google_search  # Import the tool

root_agent = Agent(
    # A unique name for the agent.
    name="google_search_agent",
    # The Large Language Model (LLM) that the agent will use.
    model="gemini-2.0-flash-exp",  # if this model does not work, try below
    #model="gemini-2.0-flash-live-001",
    model=os.getenv("DEMO_AGENT_MODEL"),
    # A short description of the agent's purpose.
    description="Agent to answer questions using Google Search.",
    # Instructions to set the agent's behavior.
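The agent now resolves its model from the `DEMO_AGENT_MODEL` environment variable instead of a hard-coded name, so the variable has to be set before the agent module is imported. A minimal sketch of that pattern, assuming the variable lives in the app's `.env` file; the model name in the comment is only an illustrative value:

```python
import os

from dotenv import load_dotenv

# Reads a .env file from the working directory, e.g. a line such as
#   DEMO_AGENT_MODEL=gemini-2.0-flash-live-001   (illustrative value only)
load_dotenv()

model_name = os.getenv("DEMO_AGENT_MODEL")
if not model_name:
    # Fail fast with a clear message instead of passing None to Agent(...)
    raise RuntimeError("DEMO_AGENT_MODEL is not set; add it to .env or the environment")

print(f"Agent will use model: {model_name}")
```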
237 changes: 148 additions & 89 deletions examples/python/snippets/streaming/adk-streaming-ws/app/main.py
@@ -21,20 +21,25 @@
from pathlib import Path
from dotenv import load_dotenv

# Load environment variables BEFORE importing the agent
load_dotenv()

from google.genai import types
from google.genai.types import (
    Part,
    Content,
    Blob,
)

from google.adk.runners import InMemoryRunner
from google.adk.runners import Runner
from google.adk.agents import LiveRequestQueue
from google.adk.agents.run_config import RunConfig
from google.genai import types
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.sessions.in_memory_session_service import InMemorySessionService

from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from fastapi.websockets import WebSocketDisconnect

from google_search_agent.agent import root_agent

@@ -44,40 +49,64 @@
# ADK Streaming
#

# Load Gemini API Key
load_dotenv()
# Application configuration
APP_NAME = "adk-streaming-ws"

APP_NAME = "ADK Streaming example"
# Initialize session service
session_service = InMemorySessionService()

# APP_NAME and session_service are defined in the Initialization section above
runner = Runner(
    app_name=APP_NAME,
    agent=root_agent,
    session_service=session_service,
)

async def start_agent_session(user_id, is_audio=False):
"""Starts an agent session"""

# Create a Runner
runner = InMemoryRunner(
# Get or create session (recommended pattern for production)
session_id = f"{APP_NAME}_{user_id}"
session = await runner.session_service.get_session(
app_name=APP_NAME,
agent=root_agent,
user_id=user_id,
session_id=session_id,
)
if not session:
session = await runner.session_service.create_session(
app_name=APP_NAME,
user_id=user_id,
session_id=session_id,
)

# Create a Session
session = await runner.session_service.create_session(
app_name=APP_NAME,
user_id=user_id, # Replace with actual user ID
)
# Configure response format based on client preference
# IMPORTANT: You must choose exactly ONE modality per session
# Either ["TEXT"] for text responses OR ["AUDIO"] for voice responses
# You cannot use both modalities simultaneously in the same session

# Force AUDIO modality for native audio models regardless of client preference
model_name = root_agent.model if isinstance(root_agent.model, str) else root_agent.model.model
is_native_audio = "native-audio" in model_name.lower()

# Set response modality
modality = "AUDIO" if is_audio else "TEXT"
modality = "AUDIO" if (is_audio or is_native_audio) else "TEXT"

# Enable session resumption for improved reliability
# For audio mode, enable output transcription to get text for UI display
run_config = RunConfig(
streaming_mode=StreamingMode.BIDI,
response_modalities=[modality],
session_resumption=types.SessionResumptionConfig()
session_resumption=types.SessionResumptionConfig(),
output_audio_transcription=types.AudioTranscriptionConfig() if (is_audio or is_native_audio) else None,
)

# Create a LiveRequestQueue for this session
# Create LiveRequestQueue in async context (recommended best practice)
# This ensures the queue uses the correct event loop
live_request_queue = LiveRequestQueue()

# Start agent session
# Start streaming session - returns async iterator for agent responses
live_events = runner.run_live(
session=session,
user_id=user_id,
session_id=session.id,
live_request_queue=live_request_queue,
run_config=run_config,
)
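The comments above stress that a live session takes exactly one response modality. A minimal sketch of the two `RunConfig` variants that rule allows, built from the same types this file imports; the transcription setting mirrors what the diff does for audio mode and is shown only for illustration:

```python
from google.genai import types
from google.adk.agents.run_config import RunConfig, StreamingMode

# Text mode: the model streams partial text events.
text_config = RunConfig(
    streaming_mode=StreamingMode.BIDI,
    response_modalities=["TEXT"],
    session_resumption=types.SessionResumptionConfig(),
)

# Audio mode: the model streams PCM audio; output transcription adds a text
# rendering of the spoken output so a UI can display it.
audio_config = RunConfig(
    streaming_mode=StreamingMode.BIDI,
    response_modalities=["AUDIO"],
    session_resumption=types.SessionResumptionConfig(),
    output_audio_transcription=types.AudioTranscriptionConfig(),
)
```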
@@ -86,69 +115,90 @@ async def start_agent_session(user_id, is_audio=False):

async def agent_to_client_messaging(websocket, live_events):
"""Agent to client communication"""
async for event in live_events:

# If the turn complete or interrupted, send it
if event.turn_complete or event.interrupted:
message = {
"turn_complete": event.turn_complete,
"interrupted": event.interrupted,
}
await websocket.send_text(json.dumps(message))
print(f"[AGENT TO CLIENT]: {message}")
continue

# Read the Content and its first Part
part: Part = (
event.content and event.content.parts and event.content.parts[0]
)
if not part:
continue

# If it's audio, send Base64 encoded audio data
is_audio = part.inline_data and part.inline_data.mime_type.startswith("audio/pcm")
if is_audio:
audio_data = part.inline_data and part.inline_data.data
if audio_data:
try:
async for event in live_events:

# Handle output audio transcription for native audio models
# This provides text representation of audio output for UI display
if event.output_transcription and event.output_transcription.text:
transcript_text = event.output_transcription.text
message = {
"mime_type": "audio/pcm",
"data": base64.b64encode(audio_data).decode("ascii")
"mime_type": "text/plain",
"data": transcript_text,
"is_transcript": True
}
await websocket.send_text(json.dumps(message))
print(f"[AGENT TO CLIENT]: audio/pcm: {len(audio_data)} bytes.")
continue

# If it's text and a partial text, send it
if part.text and event.partial:
message = {
"mime_type": "text/plain",
"data": part.text
}
await websocket.send_text(json.dumps(message))
print(f"[AGENT TO CLIENT]: text/plain: {message}")
print(f"[AGENT TO CLIENT]: audio transcript: {transcript_text}")
# Continue to process audio data if present
# Don't return here as we may want to send both transcript and audio

# Read the Content and its first Part
part: Part = (
event.content and event.content.parts and event.content.parts[0]
)
if part:
# Audio data must be Base64-encoded for JSON transport
is_audio = part.inline_data and part.inline_data.mime_type.startswith("audio/pcm")
if is_audio:
audio_data = part.inline_data and part.inline_data.data
if audio_data:
message = {
"mime_type": "audio/pcm",
"data": base64.b64encode(audio_data).decode("ascii")
}
await websocket.send_text(json.dumps(message))
print(f"[AGENT TO CLIENT]: audio/pcm: {len(audio_data)} bytes.")

# If it's text and a partial text, send it (for cascade audio models or text mode)
if part.text and event.partial:
message = {
"mime_type": "text/plain",
"data": part.text
}
await websocket.send_text(json.dumps(message))
print(f"[AGENT TO CLIENT]: text/plain: {message}")

# If the turn complete or interrupted, send it
if event.turn_complete or event.interrupted:
message = {
"turn_complete": event.turn_complete,
"interrupted": event.interrupted,
}
await websocket.send_text(json.dumps(message))
print(f"[AGENT TO CLIENT]: {message}")
except WebSocketDisconnect:
print("Client disconnected from agent_to_client_messaging")
except Exception as e:
print(f"Error in agent_to_client_messaging: {e}")


async def client_to_agent_messaging(websocket, live_request_queue):
"""Client to agent communication"""
while True:
# Decode JSON message
message_json = await websocket.receive_text()
message = json.loads(message_json)
mime_type = message["mime_type"]
data = message["data"]

# Send the message to the agent
if mime_type == "text/plain":
# Send a text message
content = Content(role="user", parts=[Part.from_text(text=data)])
live_request_queue.send_content(content=content)
print(f"[CLIENT TO AGENT]: {data}")
elif mime_type == "audio/pcm":
# Send an audio data
decoded_data = base64.b64decode(data)
live_request_queue.send_realtime(Blob(data=decoded_data, mime_type=mime_type))
else:
raise ValueError(f"Mime type not supported: {mime_type}")
try:
while True:
message_json = await websocket.receive_text()
message = json.loads(message_json)
mime_type = message["mime_type"]
data = message["data"]

if mime_type == "text/plain":
# send_content() sends text in "turn-by-turn mode"
# This signals a complete turn to the model, triggering immediate response
content = Content(role="user", parts=[Part.from_text(text=data)])
live_request_queue.send_content(content=content)
print(f"[CLIENT TO AGENT]: {data}")
elif mime_type == "audio/pcm":
# send_realtime() sends audio in "realtime mode"
# Data flows continuously without turn boundaries, enabling natural conversation
# Audio is Base64-encoded for JSON transport, decode before sending
decoded_data = base64.b64decode(data)
live_request_queue.send_realtime(Blob(data=decoded_data, mime_type=mime_type))
else:
raise ValueError(f"Mime type not supported: {mime_type}")
except WebSocketDisconnect:
print("Client disconnected from client_to_agent_messaging")
except Exception as e:
print(f"Error in client_to_agent_messaging: {e}")


#
@@ -169,30 +219,39 @@ async def root():

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int, is_audio: str):
"""Client websocket endpoint"""
"""Client websocket endpoint
This async function creates the LiveRequestQueue in an async context,
which is the recommended best practice from the ADK documentation.
This ensures the queue uses the correct event loop.
"""

# Wait for client connection
await websocket.accept()
print(f"Client #{user_id} connected, audio mode: {is_audio}")

# Start agent session
user_id_str = str(user_id)
live_events, live_request_queue = await start_agent_session(user_id_str, is_audio == "true")

# Start tasks
# Run bidirectional messaging concurrently
agent_to_client_task = asyncio.create_task(
agent_to_client_messaging(websocket, live_events)
)
client_to_agent_task = asyncio.create_task(
client_to_agent_messaging(websocket, live_request_queue)
)

# Wait until the websocket is disconnected or an error occurs
tasks = [agent_to_client_task, client_to_agent_task]
await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

# Close LiveRequestQueue
live_request_queue.close()

# Disconnected
print(f"Client #{user_id} disconnected")
try:
# Wait for either task to complete (connection close or error)
tasks = [agent_to_client_task, client_to_agent_task]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

# Check for errors in completed tasks
for task in done:
if task.exception() is not None:
print(f"Task error for client #{user_id}: {task.exception()}")
import traceback
traceback.print_exception(type(task.exception()), task.exception(), task.exception().__traceback__)
finally:
# Clean up resources (always runs, even if asyncio.wait fails)
live_request_queue.close()
print(f"Client #{user_id} disconnected")
@@ -1 +1 @@
google-adk==1.10.0
google-adk==1.17.0
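The sample now pins google-adk 1.17.0. One way to confirm a local environment matches the pin after reinstalling dependencies:

```python
from importlib.metadata import version

# Prints the installed distribution version; expect "1.17.0" once requirements.txt is applied.
print(version("google-adk"))
```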