diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 065e434..78aa62e 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -2,6 +2,7 @@ import { AmpAgentWatcher, ClaudeCodeAgentWatcher, CodexAgentWatcher, + DroidAgentWatcher, OpenCodeAgentWatcher, PiAgentWatcher, PluginLoader, @@ -66,6 +67,7 @@ if (extraProviders.length > 0) { loader.registerWatcher(new AmpAgentWatcher()); loader.registerWatcher(new ClaudeCodeAgentWatcher()); loader.registerWatcher(new CodexAgentWatcher()); +loader.registerWatcher(new DroidAgentWatcher()); loader.registerWatcher(new OpenCodeAgentWatcher()); loader.registerWatcher(new PiAgentWatcher()); diff --git a/packages/runtime/src/agents/watchers/droid.ts b/packages/runtime/src/agents/watchers/droid.ts new file mode 100644 index 0000000..c6147a6 --- /dev/null +++ b/packages/runtime/src/agents/watchers/droid.ts @@ -0,0 +1,461 @@ +/** + * Droid (Factory) agent watcher + * + * Watches ~/.factory/sessions/ for JSONL file changes, + * determines agent status from journal entries, and emits events + * mapped to mux sessions via the project directory encoded in folder names. + * + * Directory structure: ~/.factory/sessions//.jsonl + * Encoded path: /Users/foo/myproject → -Users-foo-myproject + * (Droid replaces `/` with `-` but preserves `.` and `_`) + * + * ## Droid JSONL Lifecycle + * + * Each JSONL file represents one Droid session. Entries are appended + * as the session progresses. The top-level `type` field determines the + * entry category: + * + * ### Entry types: + * - `session_start` — session metadata: id, sessionTitle, cwd, version + * - `message` — user or assistant message with content blocks + * - `todo_state` — task tracking (no status change) + * - `session_end` — terminal: durationMs, toolCount, finalText + * + * ### Message structure: + * Droid writes one complete message entry per turn (not streamed). + * Assistant messages contain all content blocks for the turn: + * - `thinking` blocks (model reasoning) + * - `text` blocks (response text) + * - `tool_use` blocks (tool calls) + * User messages contain: + * - `text` blocks (user prompt) + * - `tool_result` blocks (tool execution results) + * + * ### Status mapping: + * - session_end entry → "done" + * - assistant + content has tool_use → "running" (tool calls in progress) + * - assistant + content has only text → "done" (final answer, no more tools) + * - assistant + content has thinking → "running" (model is reasoning) + * - user + content is tool_result → "running" (tool executed, next turn coming) + * - user + text → "running" (new prompt submitted) + * - todo_state / session_start → null (no status change) + * + * ### Permission prompt detection: + * When Droid awaits permission, the last entry is assistant with tool_use + * and the file stops growing. After TOOL_USE_WAIT_MS with no growth, + * we promote "running" → "waiting". + * + * ### Stuck process detection: + * If status is "running" or "waiting" and the file hasn't grown for + * STUCK_RUNNING_MS, we assume the process died and emit "stale". + */ + +import { watch, type FSWatcher } from "fs"; +import { readdir, stat } from "fs/promises"; +import { join, basename } from "path"; +import { homedir } from "os"; +import type { AgentStatus } from "../../contracts/agent"; +import type { AgentWatcher, AgentWatcherContext } from "../../contracts/agent-watcher"; + +// --- Types --- + +interface ContentItem { + type?: string; + text?: string; +} + +interface DroidEntry { + type?: string; + /** session_start fields */ + id?: string; + sessionTitle?: string; + cwd?: string; + /** message fields */ + message?: { + role?: string; + content?: ContentItem[] | string; + }; +} + +interface SessionState { + status: AgentStatus; + fileSize: number; + threadName?: string; + projectDir?: string; + /** Timestamp when status first became "running" from a tool_use entry */ + toolUseSeenAt?: number; + /** Timestamp when the file was last observed to have grown (for stuck detection) */ + lastGrowthAt?: number; + /** File mtime at last observation — used for seed emission ts instead of Date.now() */ + lastMtimeMs?: number; +} + +const POLL_MS = 2000; +const STALE_MS = 5 * 60 * 1000; +/** How long to wait before promoting tool_use "running" → "waiting" (permission prompt heuristic) */ +const TOOL_USE_WAIT_MS = 3000; +/** How long a "running" session can go without file growth before we assume the process died */ +const STUCK_RUNNING_MS = 15_000; + +// --- Status detection --- + +/** + * Returns the status implied by a journal entry, or `null` if the entry + * is a control/metadata record that should not change the current status. + */ +export function determineStatus(entry: DroidEntry): AgentStatus | null { + // session_end is terminal + if (entry.type === "session_end") return "done"; + + // Skip non-message entries (session_start, todo_state, etc.) + if (entry.type !== "message") return null; + + const msg = entry.message; + if (!msg?.role) return null; + + const content = msg.content; + const items: ContentItem[] = Array.isArray(content) + ? content + : typeof content === "string" + ? [{ type: "text", text: content }] + : []; + + if (msg.role === "assistant") { + // tool_use → running (tool calls in progress) + if (items.some((c) => c.type === "tool_use")) return "running"; + // thinking only → running (model is reasoning, more entries will follow) + if (items.some((c) => c.type === "thinking") && !items.some((c) => c.type === "text")) return "running"; + // text only (no tool_use) → done (final answer) + if (items.some((c) => c.type === "text")) return "done"; + return "running"; + } + + if (msg.role === "user") { + // tool_result → running (tool just executed, next turn coming) + if (items.some((c) => c.type === "tool_result")) return "running"; + // Normal user message → running (new prompt) + return "running"; + } + + return null; +} + +/** Returns true if the entry is an assistant message containing a tool_use block */ +export function isToolUseEntry(entry: DroidEntry): boolean { + const msg = entry.message; + if (msg?.role !== "assistant") return false; + const content = msg.content; + if (!Array.isArray(content)) return false; + return content.some((c) => c.type === "tool_use"); +} + +function extractThreadName(entry: DroidEntry): string | undefined { + // Prefer sessionTitle from session_start + if (entry.type === "session_start" && entry.sessionTitle) { + return entry.sessionTitle.slice(0, 80); + } + + // Fall back to first user message text + const msg = entry.message; + if (msg?.role !== "user") return undefined; + + const content = msg.content; + let text: string | undefined; + + if (typeof content === "string") { + text = content; + } else if (Array.isArray(content)) { + text = content.find((c) => c.type === "text" && c.text)?.text; + } + + if (!text) return undefined; + // Skip system/internal messages + if (text.startsWith("<") || text.startsWith("{") || text.startsWith("[")) return undefined; + return text.slice(0, 80); +} + +function extractProjectDir(entry: DroidEntry): string | undefined { + if (entry.type === "session_start" && entry.cwd) return entry.cwd; + return undefined; +} + +/** + * Decode Droid's encoded project dir name back to a path. + * + * Droid encodes by replacing `/` with `-` but preserves `.` and `_`. + * The encoding is still ambiguous for paths containing literal hyphens. + * We try the naive decode first, then check if the directory exists. + */ +function decodeProjectDir(encoded: string): string { + const naive = encoded.replace(/-/g, "/"); + try { if (require("fs").statSync(naive).isDirectory()) return naive; } catch {} + return `__encoded__:${encoded}`; +} + +// --- Watcher implementation --- + +export class DroidAgentWatcher implements AgentWatcher { + readonly name = "droid"; + + private sessions = new Map(); + private fsWatchers: FSWatcher[] = []; + private pollTimer: ReturnType | null = null; + private ctx: AgentWatcherContext | null = null; + private sessionsDir: string; + private scanning = false; + private seeded = false; + private scanPromise: Promise | null = null; + + constructor() { + this.sessionsDir = join(homedir(), ".factory", "sessions"); + } + + start(ctx: AgentWatcherContext): void { + this.ctx = ctx; + this.setupWatchers(); + setTimeout(() => this.scan(), 50); + this.pollTimer = setInterval(() => this.scan(), POLL_MS); + } + + stop(): void { + for (const w of this.fsWatchers) { try { w.close(); } catch {} } + this.fsWatchers = []; + if (this.pollTimer) { clearInterval(this.pollTimer); this.pollTimer = null; } + this.ctx = null; + } + + /** Trigger an immediate scan and return when complete. + * If a scan is already in flight, waits for it then runs another. */ + async flush(): Promise { + if (this.scanPromise) await this.scanPromise; + await this.scan(); + } + + /** Emit a status change event if we have a valid session mapping */ + private emitStatus(threadId: string, state: SessionState): void { + if (!this.ctx || !this.seeded || !state.projectDir) return; + const session = this.ctx.resolveSession(state.projectDir); + if (!session) return; + this.ctx.emit({ + agent: "droid", + session, + status: state.status, + ts: Date.now(), + threadId, + threadName: state.threadName, + }); + } + + private async processFile(filePath: string, fallbackProjectDir: string): Promise { + if (!this.ctx) return; + + let size: number; + let mtimeMs: number; + try { const s = await stat(filePath); size = s.size; mtimeMs = s.mtimeMs; } catch { return; } + + const threadId = basename(filePath, ".jsonl"); + const prev = this.sessions.get(threadId); + + // --- File unchanged --- + if (prev && size === prev.fileSize) { + const now = Date.now(); + + // Promote tool_use "running" → "waiting" (permission prompt heuristic) + if (prev.status === "running" && prev.toolUseSeenAt && now - prev.toolUseSeenAt >= TOOL_USE_WAIT_MS) { + prev.status = "waiting"; + prev.toolUseSeenAt = undefined; + this.emitStatus(threadId, prev); + } + + // Stuck detection: no file growth while running/waiting → assume process died + if ((prev.status === "running" || prev.status === "waiting") && prev.lastGrowthAt && now - prev.lastGrowthAt >= STUCK_RUNNING_MS) { + prev.status = "stale"; + prev.toolUseSeenAt = undefined; + prev.lastGrowthAt = undefined; + this.emitStatus(threadId, prev); + } + + return; + } + + // --- Seed mode: read full file to capture current status --- + if (!this.seeded) { + let text: string; + try { + text = await Bun.file(filePath).text(); + } catch { return; } + + const lines = text.split("\n").filter(Boolean); + let latestStatus: AgentStatus = "idle"; + let threadName: string | undefined; + let projectDir: string | undefined; + let lastEntryIsToolUse = false; + + for (const line of lines) { + let entry: DroidEntry; + try { entry = JSON.parse(line); } catch { continue; } + + const dir = extractProjectDir(entry); + if (dir) projectDir = dir; + + const name = extractThreadName(entry); + if (name && !threadName) threadName = name; + + const s = determineStatus(entry); + if (s !== null) latestStatus = s; + lastEntryIsToolUse = isToolUseEntry(entry); + } + + this.sessions.set(threadId, { + status: latestStatus, fileSize: size, threadName, + projectDir: projectDir ?? fallbackProjectDir, + toolUseSeenAt: lastEntryIsToolUse && latestStatus === "running" ? mtimeMs : undefined, + lastGrowthAt: (latestStatus === "running" || latestStatus === "waiting") ? mtimeMs : undefined, + lastMtimeMs: mtimeMs, + }); + return; + } + + // --- Incremental read: only new bytes --- + const offset = prev?.fileSize ?? 0; + if (size <= offset) return; + + let text: string; + try { + const buf = await Bun.file(filePath).arrayBuffer(); + text = new TextDecoder().decode(new Uint8Array(buf).subarray(offset, size)); + } catch { + return; + } + + const lines = text.split("\n").filter(Boolean); + let latestStatus: AgentStatus = prev?.status ?? "idle"; + let threadName = prev?.threadName; + let projectDir = prev?.projectDir; + let lastEntryIsToolUse = false; + + for (const line of lines) { + let entry: DroidEntry; + try { entry = JSON.parse(line); } catch { continue; } + + const dir = extractProjectDir(entry); + if (dir) projectDir = dir; + + if (!threadName) { + const name = extractThreadName(entry); + if (name) threadName = name; + } + + const s = determineStatus(entry); + if (s !== null) latestStatus = s; + lastEntryIsToolUse = isToolUseEntry(entry); + } + + const prevStatus = prev?.status; + const prevThreadName = prev?.threadName; + const now = Date.now(); + const toolUseSeenAt = lastEntryIsToolUse && latestStatus === "running" ? now : undefined; + this.sessions.set(threadId, { + status: latestStatus, fileSize: size, threadName, + projectDir: projectDir ?? fallbackProjectDir, + toolUseSeenAt, lastGrowthAt: now, + }); + + if (latestStatus !== prevStatus || threadName !== prevThreadName) { + this.emitStatus(threadId, this.sessions.get(threadId)!); + } + } + + private async scan(): Promise { + if (this.scanning || !this.ctx) return; + this.scanning = true; + + const p = this.scanInternal(); + this.scanPromise = p; + await p; + this.scanPromise = null; + } + + private async scanInternal(): Promise { + try { + let dirs: string[]; + try { dirs = await readdir(this.sessionsDir); } catch { return; } + const now = Date.now(); + + for (const dir of dirs) { + const dirPath = join(this.sessionsDir, dir); + try { if (!(await stat(dirPath)).isDirectory()) continue; } catch { continue; } + + const fallbackProjectDir = decodeProjectDir(dir); + + let files: string[]; + try { files = await readdir(dirPath); } catch { continue; } + + for (const file of files) { + if (!file.endsWith(".jsonl")) continue; + const filePath = join(dirPath, file); + let fileStat; + try { fileStat = await stat(filePath); } catch { continue; } + if (now - fileStat.mtimeMs > STALE_MS) continue; + await this.processFile(filePath, fallbackProjectDir); + } + } + } finally { + if (!this.seeded) { + this.seeded = true; + for (const [threadId, state] of this.sessions) { + if (state.status === "idle" || !state.projectDir) continue; + const session = this.ctx?.resolveSession(state.projectDir); + if (!session) continue; + this.ctx?.emit({ + agent: "droid", + session, + status: state.status, + ts: state.lastMtimeMs ?? Date.now(), + threadId, + threadName: state.threadName, + }); + } + } + this.scanning = false; + } + } + + private setupWatchers(): void { + let dirs: string[]; + try { dirs = require("fs").readdirSync(this.sessionsDir); } catch { return; } + + for (const dir of dirs) { + const dirPath = join(this.sessionsDir, dir); + try { if (!require("fs").statSync(dirPath).isDirectory()) continue; } catch { continue; } + + const fallbackProjectDir = decodeProjectDir(dir); + try { + const w = watch(dirPath, (_eventType, filename) => { + if (!filename?.endsWith(".jsonl")) return; + this.processFile(join(dirPath, filename), fallbackProjectDir); + }); + this.fsWatchers.push(w); + } catch {} + } + + // Watch sessions dir for new project directories + try { + const w = watch(this.sessionsDir, (eventType, filename) => { + if (eventType !== "rename" || !filename) return; + const dirPath = join(this.sessionsDir, filename); + try { if (!require("fs").statSync(dirPath).isDirectory()) return; } catch { return; } + + const fallbackProjectDir = decodeProjectDir(filename); + try { + const sub = watch(dirPath, (_et, fn) => { + if (!fn?.endsWith(".jsonl")) return; + this.processFile(join(dirPath, fn), fallbackProjectDir); + }); + this.fsWatchers.push(sub); + } catch {} + }); + this.fsWatchers.push(w); + } catch {} + } +} diff --git a/packages/runtime/src/index.ts b/packages/runtime/src/index.ts index d9fb0de..cc53528 100644 --- a/packages/runtime/src/index.ts +++ b/packages/runtime/src/index.ts @@ -24,6 +24,7 @@ export { AgentTracker } from "./agents/tracker"; export { AmpAgentWatcher } from "./agents/watchers/amp"; export { ClaudeCodeAgentWatcher } from "./agents/watchers/claude-code"; export { CodexAgentWatcher } from "./agents/watchers/codex"; +export { DroidAgentWatcher } from "./agents/watchers/droid"; export { OpenCodeAgentWatcher } from "./agents/watchers/opencode"; export { PiAgentWatcher } from "./agents/watchers/pi"; export { MuxRegistry } from "./mux/registry"; diff --git a/packages/runtime/src/server/index.ts b/packages/runtime/src/server/index.ts index 7718c8d..ab34c9c 100644 --- a/packages/runtime/src/server/index.ts +++ b/packages/runtime/src/server/index.ts @@ -1495,6 +1495,7 @@ export function startServer(mux: MuxProvider, extraProviders?: MuxProvider[], wa amp: ["amp"], "claude-code": ["claude"], codex: ["codex"], + droid: ["droid", "factory"], opencode: ["opencode"], }; diff --git a/packages/runtime/test/droid-watcher.test.ts b/packages/runtime/test/droid-watcher.test.ts new file mode 100644 index 0000000..94978b0 --- /dev/null +++ b/packages/runtime/test/droid-watcher.test.ts @@ -0,0 +1,220 @@ +import { describe, test, expect, beforeEach, afterEach } from "bun:test"; +import { appendFileSync, mkdirSync, rmSync, writeFileSync } from "fs"; +import { tmpdir } from "os"; +import { join } from "path"; +import { DroidAgentWatcher, determineStatus } from "../src/agents/watchers/droid"; +import type { AgentEvent } from "../src/contracts/agent"; +import type { AgentWatcherContext } from "../src/contracts/agent-watcher"; + +describe("Droid determineStatus", () => { + test("returns running for user text messages", () => { + expect(determineStatus({ + type: "message", + message: { role: "user", content: [{ type: "text", text: "Fix the bug" }] }, + })).toBe("running"); + }); + + test("returns running for user tool_result messages", () => { + expect(determineStatus({ + type: "message", + message: { role: "user", content: [{ type: "tool_result" }] }, + })).toBe("running"); + }); + + test("returns running for assistant tool_use messages", () => { + expect(determineStatus({ + type: "message", + message: { role: "assistant", content: [{ type: "text" }, { type: "tool_use" }] }, + })).toBe("running"); + }); + + test("returns running for assistant thinking-only messages", () => { + expect(determineStatus({ + type: "message", + message: { role: "assistant", content: [{ type: "thinking" }] }, + })).toBe("running"); + }); + + test("returns done for assistant text-only messages", () => { + expect(determineStatus({ + type: "message", + message: { role: "assistant", content: [{ type: "text", text: "Here is the fix." }] }, + })).toBe("done"); + }); + + test("returns done for session_end entries", () => { + expect(determineStatus({ type: "session_end" })).toBe("done"); + }); + + test("returns null for session_start entries", () => { + expect(determineStatus({ type: "session_start", sessionTitle: "test" })).toBeNull(); + }); + + test("returns null for todo_state entries", () => { + expect(determineStatus({ type: "todo_state" })).toBeNull(); + }); + + test("returns null for messages without role", () => { + expect(determineStatus({ type: "message", message: {} })).toBeNull(); + }); +}); + +describe("DroidAgentWatcher", () => { + let tmpDir: string; + let watcher: DroidAgentWatcher; + let events: AgentEvent[]; + let ctx: AgentWatcherContext; + let sessionFile: string; + + beforeEach(() => { + tmpDir = join(tmpdir(), `droid-watcher-test-${Date.now()}`); + const projectDir = join(tmpDir, "sessions", "-projects-myapp"); + mkdirSync(projectDir, { recursive: true }); + + sessionFile = join(projectDir, "abc12345-1234-1234-1234-123456789abc.jsonl"); + writeFileSync(sessionFile, + JSON.stringify({ + type: "session_start", + id: "abc12345-1234-1234-1234-123456789abc", + sessionTitle: "Fix the watcher", + cwd: "/projects/myapp", + version: 2, + }) + "\n" + + JSON.stringify({ + type: "message", + id: "msg-user-1", + timestamp: "2026-04-15T12:00:01.000Z", + message: { + role: "user", + content: [{ type: "text", text: "Fix the watcher status mapping" }], + }, + }) + "\n", + ); + + events = []; + ctx = { + resolveSession: (dir) => dir === "/projects/myapp" ? "myapp-session" : null, + emit: (event) => events.push(event), + }; + + watcher = new DroidAgentWatcher(); + (watcher as any).sessionsDir = join(tmpDir, "sessions"); + }); + + afterEach(() => { + watcher.stop(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + test("seed scan emits events for non-idle sessions", async () => { + watcher.start(ctx); + await new Promise((resolve) => setTimeout(resolve, 200)); + + expect(events).toHaveLength(1); + expect(events[0]!.agent).toBe("droid"); + expect(events[0]!.session).toBe("myapp-session"); + expect(events[0]!.status).toBe("running"); + expect(events[0]!.threadId).toBe("abc12345-1234-1234-1234-123456789abc"); + expect(events[0]!.threadName).toBe("Fix the watcher"); + }); + + test("emits running when assistant uses tools", async () => { + watcher.start(ctx); + await new Promise((resolve) => setTimeout(resolve, 200)); + const seedCount = events.length; + + appendFileSync(sessionFile, + JSON.stringify({ + type: "message", + id: "msg-assistant-1", + timestamp: "2026-04-15T12:00:05.000Z", + message: { + role: "assistant", + content: [{ type: "thinking" }, { type: "tool_use" }], + }, + }) + "\n", + ); + + await new Promise((resolve) => setTimeout(resolve, 2500)); + + // Status stays running (tool_use), so no new event emitted (was already running) + const postSeed = events.slice(seedCount); + // Running → running doesn't emit, which is correct + expect(postSeed).toHaveLength(0); + }); + + test("emits done when session_end is written", async () => { + watcher.start(ctx); + await new Promise((resolve) => setTimeout(resolve, 200)); + const seedCount = events.length; + + appendFileSync(sessionFile, + JSON.stringify({ + type: "message", + id: "msg-assistant-1", + timestamp: "2026-04-15T12:00:05.000Z", + message: { + role: "assistant", + content: [{ type: "text", text: "All done." }], + }, + }) + "\n" + + JSON.stringify({ + type: "session_end", + timestamp: "2026-04-15T12:00:06.000Z", + durationMs: 5000, + toolCount: 3, + finalText: "All done.", + }) + "\n", + ); + + await new Promise((resolve) => setTimeout(resolve, 2500)); + + const postSeed = events.slice(seedCount); + expect(postSeed.length).toBeGreaterThanOrEqual(1); + const last = postSeed[postSeed.length - 1]!; + expect(last.agent).toBe("droid"); + expect(last.session).toBe("myapp-session"); + expect(last.status).toBe("done"); + }); + + test("emits done for assistant text-only response", async () => { + watcher.start(ctx); + await new Promise((resolve) => setTimeout(resolve, 200)); + const seedCount = events.length; + + appendFileSync(sessionFile, + JSON.stringify({ + type: "message", + id: "msg-assistant-1", + timestamp: "2026-04-15T12:00:05.000Z", + message: { + role: "assistant", + content: [{ type: "text", text: "Here is the result." }], + }, + }) + "\n", + ); + + await new Promise((resolve) => setTimeout(resolve, 2500)); + + const postSeed = events.slice(seedCount); + expect(postSeed.length).toBeGreaterThanOrEqual(1); + const last = postSeed[postSeed.length - 1]!; + expect(last.status).toBe("done"); + expect(last.threadName).toBe("Fix the watcher"); + }); + + test("extracts sessionTitle as threadName", async () => { + watcher.start(ctx); + await new Promise((resolve) => setTimeout(resolve, 200)); + + expect(events[0]!.threadName).toBe("Fix the watcher"); + }); + + test("uses cwd from session_start for project dir resolution", async () => { + watcher.start(ctx); + await new Promise((resolve) => setTimeout(resolve, 200)); + + // resolveSession was called with "/projects/myapp" (from cwd in session_start) + expect(events[0]!.session).toBe("myapp-session"); + }); +});