Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions agent-streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ This sample demonstrates a chatbot application that uses [Genkit](https://genkit

![Agent Streaming Chat Screenshot](screenshot.png)

The Genkit code for the streaming flow can be found in `src/server/index.js`.
The Genkit code for the streaming flow can be found in `src/server/index.ts`.

## How it works

- **Backend (Express):** Exposes an endpoint `/api/chat` using Server-Sent Events (SSE). It invokes the Genkit flow `streamingThoughtsFlow`, which streams both intermediate thoughts and the final text chunks.
- **Genkit Flow:** Uses `googleAI.model('gemini-3.5-flash')` with `thinkingConfig` (`includeThoughts: true`) to stream reasoning details. The flow yields custom chunk objects with `type: 'thought'` or `type: 'text'`.
- **Frontend (Vanilla JS):** Reads the Server-Sent Events stream, updates a collapsible "Thinking" card with step labels, and renders the model's Markdown text in real-time.
- **Frontend (React + TypeScript):** Reads the Server-Sent Events stream, updates a collapsible "Thinking" card with step labels, and renders the model's Markdown text in real-time.

## Running the app

Expand Down
24 changes: 2 additions & 22 deletions agent-streaming/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,8 @@
</head>

<body>
<div class="chat-container">
<div class="chat-messages" id="chat-messages">
<!-- Welcome Message -->
<div class="message system-message">
<div class="message-content">
Agent will think out loud and show reasoning
</div>
</div>
</div>

<form class="chat-input-form" id="chat-form">
<input type="text" id="prompt-input" placeholder="Type a message..." autocomplete="off" required />
<button type="submit" id="send-button" aria-label="Send message">
<!-- SVG Send Icon -->
<svg viewBox="0 0 24 24" width="24" height="24">
<path fill="currentColor" d="M2,21L23,12L2,3V10L17,12L2,14V21Z" />
</svg>
</button>
</form>
</div>

<script type="module" src="/src/client/script.js"></script>
<div id="root" class="chat-container"></div>
<script type="module" src="/src/client/main.tsx"></script>
</body>

</html>
87 changes: 86 additions & 1 deletion agent-streaming/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions agent-streaming/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"test": "echo \"Error: no test specified\" && exit 1",
"dev:client": "vite",
"dev:server": "tsx --env-file=.env --watch src/server/index.ts",
"build": "vite build",
"build": "tsc --noEmit && vite build",
"start": "tsx --env-file=.env src/server/index.ts"
},
"keywords": [],
Expand All @@ -20,12 +20,17 @@
"express": "^5.2.1",
"genkit": "^1.36.0",
"helmet": "^8.2.0",
"marked": "^18.0.5"
"marked": "^18.0.5",
"react": "^19.2.7",
"react-dom": "^19.2.7"
},
"devDependencies": {
"@types/dompurify": "^3.0.5",
"@types/express": "^5.0.6",
"@types/node": "^25.9.3",
"@types/react": "^19.2.17",
"@types/react-dom": "^19.2.3",
"@vitejs/plugin-react": "^6.0.2",
"tsx": "^4.22.4",
"typescript": "^6.0.3",
"vite": "^8.0.16"
Expand Down
166 changes: 166 additions & 0 deletions agent-streaming/src/client/App.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import React, { useState, useEffect, useRef } from 'react';
import { ChatInput } from './components/ChatInput.js';
import { ThoughtBox } from './components/ThoughtBox.js';
import { MessageBubble } from './components/MessageBubble.js';
import { Message, StreamChunk } from './types.js';

export const App: React.FC = () => {
const [messages, setMessages] = useState<Message[]>([]);
const [loading, setLoading] = useState(false);
const messagesEndRef = useRef<HTMLDivElement>(null);
const abortControllerRef = useRef<AbortController | null>(null);

// Auto-scroll to bottom on messages update
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: 'auto' });
}, [messages]);

// Clean up pending stream on unmount
useEffect(() => {
return () => {
abortControllerRef.current?.abort();
};
}, []);

const handlePromptSubmit = async (prompt: string) => {
setLoading(true);

// Abort any previous request
abortControllerRef.current?.abort();
const controller = new AbortController();
abortControllerRef.current = controller;

const userMessage: Message = {
id: crypto.randomUUID(), // Generate unique message IDs
role: 'user',
type: 'text',
content: prompt,
};

setMessages((prev) => [...prev, userMessage]);

try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ prompt }),
signal: controller.signal,
});

if (!response.ok) {
throw new Error(`Network response was not ok (Status: ${response.status})`);
}
if (!response.body) {
throw new Error(`Response body is empty (Status: ${response.status})`);
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
const { value, done } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');

// Store the last partial line in the buffer to prevent parsing
// errors if a chunk is split mid-line
buffer = lines.pop() || '';

const dataPrefix = 'data: ';
for (const line of lines) {
const trimmed = line.trim();
if (trimmed.startsWith(dataPrefix)) {
const jsonStr = trimmed.slice(dataPrefix.length);
const chunk = JSON.parse(jsonStr) as StreamChunk;
handleStreamChunk(chunk);
}
}
}
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
return;
}
console.error('Streaming error:', error);
const errorMessage: Message = {
id: crypto.randomUUID(),
role: 'system',
type: 'error',
content: 'Failed to connect to the agent. Please try again.',
};
setMessages((prev) => [...prev, errorMessage]);
} finally {
if (abortControllerRef.current === controller) {
setLoading(false);
}
}
};

const handleStreamChunk = (chunk: StreamChunk) => {
setMessages((prev) => {
const last = prev.at(-1);

if (last && last.id === chunk.messageId) {
// Update the active message in-place
const updated = {
...last,
content: last.content + (chunk.content || ''),
...(last.type === 'thought' ? { stepName: chunk.currentStep || last.stepName } : {}),
} as Message;
return [...prev.slice(0, -1), updated];
} else {
// Create a new message block and append it
const newMessage = {
id: chunk.messageId,
type: chunk.type,
role: chunk.type === 'thought' ? 'model' : chunk.type === 'error' ? 'system' : 'model',
content: chunk.content || '',
...(chunk.type === 'thought' ? { stepName: chunk.currentStep || 'Thinking' } : {}),
} as Message;
return [...prev, newMessage];
}
});
};

return (
<>
<section className="chat-messages" aria-label="Chat History">
{/* Welcome Message */}
<div className="message system-message">
<div className="message-content">
Agent will think out loud and show reasoning
</div>
</div>

{/* Conversation list */}
{messages.map((msg) => {
if (msg.type === 'thought') {
return (
<ThoughtBox
key={msg.id}
content={msg.content}
stepName={msg.stepName}
/>
);
} else {
return (
<MessageBubble
key={msg.id}
{...msg}
/>
);
}
})}
<div ref={messagesEndRef} />
</section>

<ChatInput onSubmit={handlePromptSubmit} disabled={loading} />
</>
);
};

export default App;
Loading