From 19d988c31169911712126c6d7a9c8c0b9405e17f Mon Sep 17 00:00:00 2001 From: minatoaquaMK2 Date: Thu, 30 Apr 2026 08:44:09 +0000 Subject: [PATCH 1/2] feat: add Devin and Pi agent watchers Signed-off-by: minatoaquaMK2 --- AGENTS.md | 8 +- CONTRACTS.md | 23 +- README.md | 4 +- apps/server/src/main.ts | 2 + docs/explanation/architecture.md | 2 +- docs/reference/configuration.md | 1 + packages/runtime/src/agents/watchers/devin.ts | 344 ++++++++++++++ .../runtime/src/contracts/agent-watcher.ts | 1 + packages/runtime/src/index.ts | 1 + packages/runtime/test/devin-watcher.test.ts | 439 ++++++++++++++++++ 10 files changed, 819 insertions(+), 6 deletions(-) create mode 100644 packages/runtime/src/agents/watchers/devin.ts create mode 100644 packages/runtime/test/devin-watcher.test.ts diff --git a/AGENTS.md b/AGENTS.md index a016216..2f6c9a2 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -26,7 +26,9 @@ opensessions/ │ │ │ │ ├── amp.ts │ │ │ │ ├── claude-code.ts │ │ │ │ ├── codex.ts -│ │ │ │ └── opencode.ts +│ │ │ │ ├── devin.ts +│ │ │ │ ├── opencode.ts +│ │ │ │ └── pi.ts │ │ │ ├── mux/ # Mux registry and detection helpers │ │ │ ├── server/ # WebSocket server internals and launcher │ │ │ ├── shared.ts # Shared types, constants, palette @@ -47,7 +49,7 @@ opensessions/ ## Key Architecture Decisions 1. **Monorepo**: Turborepo + Bun workspaces, with `apps/` for runnable entrypoints and `packages/` for reusable libraries. -2. **Built-in agent watchers**: Core ships with `AmpAgentWatcher`, `ClaudeCodeAgentWatcher`, `CodexAgentWatcher`, and `OpenCodeAgentWatcher` that watch agent data directories directly. External agents integrate via the `AgentWatcher` plugin interface. +2. **Built-in agent watchers**: Core ships with `AmpAgentWatcher`, `ClaudeCodeAgentWatcher`, `CodexAgentWatcher`, `DevinAgentWatcher`, `OpenCodeAgentWatcher`, and `PiAgentWatcher` that watch agent data directories directly. External agents integrate via the `AgentWatcher` plugin interface. 3. 
**Mux-agnostic**: `MuxProvider` interface abstracts all mux operations. `TmuxProvider` is the reference implementation. 4. **MuxProvider is SYNC**: All methods use `Bun.spawnSync` — matches the existing pattern and keeps the server simple. 5. **Auto-detect mux**: `detectMux()` checks `$TMUX`, `$ZELLIJ_SESSION_NAME` env vars. Config file override planned. @@ -99,7 +101,7 @@ interface AgentWatcher { - **Sync mux calls**: MuxProvider methods are synchronous. Don't make them async. - **Preserve optimizations**: Batched tmux calls, 5s git cache with HEAD watchers, lightweight focus-only broadcasts. - **Sidebar resize work**: Before changing sidebar spawning, width sync, tmux resize handling, or `sidebar-coordinator`, read `docs/explanation/sidebar-behavior.md` and preserve those invariants unless you update the doc in the same change. -- **Built-in watchers in runtime**: Amp, Claude Code, Codex, and OpenCode have built-in watchers in `packages/runtime/src/agents/watchers/`. Community agents use the `AgentWatcher` plugin interface. +- **Built-in watchers in runtime**: Amp, Claude Code, Codex, Devin, OpenCode, and Pi have built-in watchers in `packages/runtime/src/agents/watchers/`. Community agents use the `AgentWatcher` plugin interface. - **OpenTUI Solid**: JSX needs `bunfig.toml` preload and `jsxImportSource: "@opentui/solid"` in tsconfig. Build needs `solidPlugin`. - **Never call `process.exit()` directly in TUI**: Use `renderer.destroy()`. diff --git a/CONTRACTS.md b/CONTRACTS.md index 3b5caac..f1824fe 100644 --- a/CONTRACTS.md +++ b/CONTRACTS.md @@ -6,7 +6,7 @@ For end-user setup, start with the docs linked from [README.md](./README.md). Fo ## Built-In Watchers -opensessions currently registers four built-in watchers at server startup. +opensessions currently registers six built-in watchers at server startup. ### Amp @@ -32,6 +32,20 @@ opensessions currently registers four built-in watchers at server startup. 
- Resolves mux sessions from `turn_context.cwd` inside the transcript. - Treats `user_message`, tool activity, and assistant `commentary` as `running`, assistant `final_answer` and `task_complete` as `done`, and `turn_aborted` as `interrupted`. +### Devin + +- Polls `~/.local/share/devin/cli/sessions.db` or `$DEVIN_CLI_DB_PATH`. +- Uses `bun:sqlite` in read-only mode. +- Polls every 3 seconds. +- Skips sessions whose `last_activity_at` is older than 5 minutes (timestamps are stored in seconds). +- Resolves mux sessions from the Devin session row's `working_directory` field. +- Derives status from the head node referenced by `main_chain_id` in the `message_nodes` tree: + - `role=assistant` + `finish_reason=stop` or `length` → `done`; `finish_reason=error` → `error` + - `role=assistant` + `finish_reason=tool_calls` → `running` + - `role=user` / `role=tool` / streaming assistant → `running` + - `role=system` or `role=assistant` with content beginning `[Response interrupted` → `interrupted` +- Promotes `running` → `stale` when `last_activity_at` does not advance for 15 seconds (assumed process death). + ### OpenCode - Polls `~/.local/share/opencode/opencode.db` or `$OPENCODE_DB_PATH`. @@ -39,6 +53,13 @@ opensessions currently registers four built-in watchers at server startup. - Polls every 3 seconds. - Resolves mux sessions from the OpenCode session row's `directory` field. +### Pi + +- Watches `~/.pi/agent/sessions//_.jsonl`. +- Uses recursive `fs.watch` plus a 2 second polling pass. +- Skips stale transcript files older than 5 minutes. +- Resolves mux sessions from the `cwd` recorded on the `session` header entry. + ## Agent Model ### `AgentStatus` diff --git a/README.md b/README.md index bcb392a..b2c0faa 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ Then remove the `set -g @plugin 'Ataraxy-Labs/opensessions'` line from `~/.tmux. ## Today -- Live agent state across sessions for Amp, Claude Code, Codex, and OpenCode. +- Live agent state across sessions for Amp, Claude Code, Codex, Devin, OpenCode, and Pi. 
- Per-thread unseen markers for `done`, `error`, and `interrupted` states. - Session context in the UI: branch in the list, working directory in the detail panel, thread names, and detected localhost ports. - Programmatic metadata API: agents and scripts push status, progress, and logs to the sidebar via HTTP. @@ -144,7 +144,9 @@ For the full tmux workflow with keybindings, troubleshooting, and configuration - Amp watcher reads `~/.local/share/amp/threads/*.json` and clears unseen state from Amp's `session.json` when a thread becomes seen there. - Claude Code watcher reads JSONL transcripts in `~/.claude/projects/`. - Codex watcher reads transcript JSONL files in `~/.codex/sessions/` or `$CODEX_HOME/sessions/` and resolves sessions from `turn_context.cwd`. +- Devin watcher polls the SQLite database in `~/.local/share/devin/cli/sessions.db` (override with `DEVIN_CLI_DB_PATH`) and resolves sessions from each row's `working_directory`. - OpenCode watcher polls the SQLite database in `~/.local/share/opencode/opencode.db`. +- Pi watcher reads JSONL transcripts in `~/.pi/agent/sessions/`. - Hidden sidebars are stashed in a tmux session named `_os_stash`, so they can come back without restarting the sidebar process. - Clicking a detected port opens `http://localhost:`. 
diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 065e434..2df4320 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -2,6 +2,7 @@ import { AmpAgentWatcher, ClaudeCodeAgentWatcher, CodexAgentWatcher, + DevinAgentWatcher, OpenCodeAgentWatcher, PiAgentWatcher, PluginLoader, @@ -66,6 +67,7 @@ if (extraProviders.length > 0) { loader.registerWatcher(new AmpAgentWatcher()); loader.registerWatcher(new ClaudeCodeAgentWatcher()); loader.registerWatcher(new CodexAgentWatcher()); +loader.registerWatcher(new DevinAgentWatcher()); loader.registerWatcher(new OpenCodeAgentWatcher()); loader.registerWatcher(new PiAgentWatcher()); diff --git a/docs/explanation/architecture.md b/docs/explanation/architecture.md index 398cc88..fd23102 100644 --- a/docs/explanation/architecture.md +++ b/docs/explanation/architecture.md @@ -19,7 +19,7 @@ If no healthy server is listening on `127.0.0.1:7391`, `ensureServer()` launches 2. dynamically registers the built-in mux providers from `@opensessions/mux-tmux` and `@opensessions/mux-zellij` 3. loads local plugins and configured package plugins 4. resolves the primary mux provider -5. registers the built-in Amp, Claude Code, Codex, and OpenCode watchers +5. registers the built-in Amp, Claude Code, Codex, Devin, OpenCode, and Pi watchers 6. starts the WebSocket and HTTP control server ## State Assembly diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 998db90..be88cd4 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -132,6 +132,7 @@ All other tmux options fall back to the defaults shown in the table above. 
| Variable | Used by | Notes | | --- | --- | --- | +| `DEVIN_CLI_DB_PATH` | Devin watcher | Overrides the default Devin CLI SQLite path | | `OPENCODE_DB_PATH` | OpenCode watcher | Overrides the default SQLite path | | `OPENSESSIONS_DIR` | tmux helper scripts and server | Helps helper scripts find the repo checkout | | `OPENSESSIONS_HOST` | helper shell scripts | Script-level override only; the app runtime still uses `127.0.0.1` | diff --git a/packages/runtime/src/agents/watchers/devin.ts b/packages/runtime/src/agents/watchers/devin.ts new file mode 100644 index 0000000..af6c2ab --- /dev/null +++ b/packages/runtime/src/agents/watchers/devin.ts @@ -0,0 +1,344 @@ +/** + * Devin agent watcher + * + * Polls the Devin CLI SQLite database (~/.local/share/devin/cli/sessions.db) + * to determine agent status and emits events mapped to mux sessions + * via the `working_directory` field on each Devin session row. + * + * All queries use bun:sqlite in readonly mode. + * + * ## Devin CLI SQLite Schema (observed v2026.4.29-0) + * + * ### Tables used + * - `sessions` — one row per Devin CLI session + * - `id` (TEXT PRIMARY KEY) — slug like "jelly-zucchini" or UUID + * - `working_directory`, `title` + * - `main_chain_id` — INTEGER pointing at the head node of the active chain + * - `last_activity_at` (INTEGER, **seconds since epoch**) + * - `hidden` — INTEGER, 1 means user hid the session + * - other fields: `backend_type`, `model`, `agent_mode`, `created_at`, ... + * - `message_nodes` — tree-structured chat history + * - `(session_id, node_id)` UNIQUE, with `parent_node_id` for the tree + * - `chat_message` (JSON) + * + * Timestamps in this database are in **seconds**, unlike most other watchers + * which use millisecond timestamps. We convert `Date.now()` to seconds before + * comparing against `last_activity_at`. 
+ * + * ### chat_message JSON shape + * ``` + * { + * role: "user" | "assistant" | "tool" | "system", + * content: string | Array<{ type, text }>, + * tool_calls?: [...], // only on assistant messages + * metadata?: { + * finish_reason?: "stop" | "tool_calls" | "error" | "length" | null, + * extensions?: { ... }, + * telemetry?: { source, operation } + * } + * } + * ``` + * + * ## Status Detection + * + * The watcher fetches the head node of each session via `main_chain_id` and + * derives status from the role + finish_reason combination: + * + * | head node | status | + * | ---------------------------------------------- | ------------- | + * | role=user | running | + * | role=tool | running | + * | role=assistant + finish_reason=tool_calls | running | + * | role=assistant + finish_reason=stop | done | + * | role=assistant + finish_reason=error | error | + * | role=assistant + finish_reason=length | done | + * | role=assistant (no finish_reason yet) | running | + * | role=system + content="[Response interrupted]" | interrupted | + * | role=system (system prompt) | idle | + * + * ### Lifecycle (observed) + * 1. `devin` boots → row in `sessions`, system prompt nodes appear + * 2. User submits prompt → `role=user` node appended + * 3. Streaming response → `role=assistant` nodes (finish_reason=null while streaming) + * 4. Tool call → `role=assistant` with finish_reason=tool_calls + `tool_calls` + * array; followed by one or more `role=tool` result nodes + * 5. Final answer → `role=assistant` with finish_reason=stop + * 6. Interrupt (Ctrl+C / Esc) → `role=system` node with content + * `"[Response interrupted by user]"`. Usually followed by another user + * prompt; when the chain ends here, the session is "interrupted". + * + * ### Stuck detection + * When status is "running" but `last_activity_at` hasn't advanced for + * STUCK_MS we promote to "stale" — the Devin process probably died. 
+ */ + +import { existsSync } from "fs"; +import { homedir } from "os"; +import { join } from "path"; +import type { AgentStatus } from "../../contracts/agent"; +import type { AgentWatcher, AgentWatcherContext } from "../../contracts/agent-watcher"; + +// --- Types --- + +interface SessionRow { + id: string; + title: string | null; + working_directory: string; + main_chain_id: number | null; + last_activity_at: number; +} + +interface NodeRow { + chat_message: string; +} + +interface ChatMessage { + role?: string; + content?: string | Array<{ type?: string; text?: string }>; + tool_calls?: unknown[]; + metadata?: { + finish_reason?: string | null; + extensions?: Record; + }; +} + +const POLL_MS = 3000; +/** Sessions older than this (in seconds) are skipped during scans. */ +const STALE_SEC = 5 * 60; +/** How long a "running" session can go without activity before we assume the process died (ms). */ +const STUCK_MS = 15_000; + +const INTERRUPT_PATTERNS = [ + "[Response interrupted by user", + "[Response interrupted", +]; + +// --- Status detection --- + +/** + * Determine the agent status from the head chat message of a session. + * + * Exported for independent testing. 
+ */ +export function determineStatus(msg: ChatMessage | null): AgentStatus { + if (!msg?.role) return "idle"; + + if (msg.role === "user") return "running"; + if (msg.role === "tool") return "running"; + + if (msg.role === "system") { + const text = extractText(msg.content); + if (text && INTERRUPT_PATTERNS.some((p) => text.startsWith(p))) return "interrupted"; + // Other system messages are system prompts / metadata — not user-visible activity + return "idle"; + } + + if (msg.role === "assistant") { + const text = extractText(msg.content); + if (text && INTERRUPT_PATTERNS.some((p) => text.startsWith(p))) return "interrupted"; + + const finish = msg.metadata?.finish_reason; + if (finish === "stop") return "done"; + if (finish === "tool_calls") return "running"; + if (finish === "error") return "error"; + if (finish === "length") return "done"; + // No finish_reason yet → streaming + return "running"; + } + + return "idle"; +} + +function extractText(content: ChatMessage["content"]): string | undefined { + if (typeof content === "string") return content; + if (Array.isArray(content)) { + for (const item of content) { + if (item?.type === "text" && typeof item.text === "string") return item.text; + } + } + return undefined; +} + +// --- Session snapshot --- + +interface SessionSnapshot { + status: AgentStatus; + title: string | null; + workingDirectory: string; + mainChainId: number | null; + /** last_activity_at in **seconds** as observed in the DB. */ + lastActivitySec: number; + /** ms timestamp when we last observed last_activity_at advance. For stuck detection. 
*/ + lastGrowthAt: number; +} + +// --- Watcher implementation --- + +export class DevinAgentWatcher implements AgentWatcher { + readonly name = "devin"; + + private sessions = new Map(); + private pollTimer: ReturnType | null = null; + private ctx: AgentWatcherContext | null = null; + private db: any = null; + private dbPath: string; + private polling = false; + private seeded = false; + + constructor() { + this.dbPath = process.env.DEVIN_CLI_DB_PATH + ?? join(homedir(), ".local", "share", "devin", "cli", "sessions.db"); + } + + start(ctx: AgentWatcherContext): void { + this.ctx = ctx; + setTimeout(() => this.poll(), 50); + this.pollTimer = setInterval(() => this.poll(), POLL_MS); + } + + stop(): void { + if (this.pollTimer) { clearInterval(this.pollTimer); this.pollTimer = null; } + try { this.db?.close(); } catch {} + this.db = null; + this.ctx = null; + } + + /** Emit a status change event if we have a valid session mapping */ + private emitStatus(sessionId: string, snapshot: SessionSnapshot): boolean { + if (!this.ctx || !snapshot.workingDirectory || snapshot.status === "idle") return false; + + const session = this.ctx.resolveThreadOwner?.("devin", sessionId, snapshot.title ?? undefined)?.session + ?? 
this.ctx.resolveSession(snapshot.workingDirectory); + if (!session) return false; + + this.ctx.emit({ + agent: "devin", + session, + status: snapshot.status, + ts: Date.now(), + threadId: sessionId, + ...(snapshot.title && { threadName: snapshot.title }), + }); + return true; + } + + private openDb(): boolean { + if (this.db) return true; + if (!existsSync(this.dbPath)) return false; + try { + const { Database } = require("bun:sqlite"); + this.db = new Database(this.dbPath, { readonly: true }); + return true; + } catch { + return false; + } + } + + /** Fetch the head node and derive its status */ + private readSessionStatus(sessionId: string, mainChainId: number | null): AgentStatus { + if (mainChainId === null || mainChainId === undefined) return "idle"; + + let row: NodeRow | null = null; + try { + row = this.db.query( + `SELECT chat_message FROM message_nodes WHERE session_id = ? AND node_id = ?`, + ).get(sessionId, mainChainId); + } catch { + return "idle"; + } + if (!row) return "idle"; + + let msg: ChatMessage | null = null; + try { msg = JSON.parse(row.chat_message); } catch {} + return determineStatus(msg); + } + + private poll(): void { + if (!this.ctx || this.polling) return; + this.polling = true; + + try { + if (!this.openDb()) return; + + let rows: SessionRow[]; + const staleThresholdSec = Math.floor(Date.now() / 1000) - STALE_SEC; + try { + rows = this.db.query( + `SELECT id, title, working_directory, main_chain_id, last_activity_at + FROM sessions + WHERE hidden = 0 + AND last_activity_at > ? 
+ ORDER BY last_activity_at DESC`, + ).all(staleThresholdSec); + } catch { + try { this.db.close(); } catch {} + this.db = null; + return; + } + + const now = Date.now(); + + // --- Seed: record current state, then emit non-idle sessions --- + if (!this.seeded) { + for (const row of rows) { + const status = this.readSessionStatus(row.id, row.main_chain_id); + this.sessions.set(row.id, { + status, + title: row.title, + workingDirectory: row.working_directory, + mainChainId: row.main_chain_id, + lastActivitySec: row.last_activity_at, + lastGrowthAt: now, + }); + } + this.seeded = true; + + for (const [sessionId, snapshot] of this.sessions) { + this.emitStatus(sessionId, snapshot); + } + return; + } + + // --- Incremental: detect changes via last_activity_at --- + for (const row of rows) { + const prev = this.sessions.get(row.id); + + if (prev && prev.lastActivitySec === row.last_activity_at) { + // Session unchanged — check for stuck detection + if (prev.status === "running" && now - prev.lastGrowthAt >= STUCK_MS) { + prev.status = "stale"; + this.emitStatus(row.id, prev); + } + continue; + } + + // Session changed — read current status + const status = this.readSessionStatus(row.id, row.main_chain_id); + const prevStatus = prev?.status; + const prevTitle = prev?.title; + + const snapshot: SessionSnapshot = { + status, + title: row.title, + workingDirectory: row.working_directory, + mainChainId: row.main_chain_id, + lastActivitySec: row.last_activity_at, + lastGrowthAt: now, + }; + this.sessions.set(row.id, snapshot); + + // Emit when: + // - existing session changed status or title + // - we just discovered a brand-new post-seed session in a non-idle state + // (an idle/system-prompt-only session is not user-visible activity) + const isNewActivity = !prev && status !== "idle"; + const isStateChange = prev && (status !== prevStatus || prevTitle !== row.title); + if (isNewActivity || isStateChange) { + this.emitStatus(row.id, snapshot); + } + } + } finally { + 
this.polling = false; + } + } +} diff --git a/packages/runtime/src/contracts/agent-watcher.ts b/packages/runtime/src/contracts/agent-watcher.ts index c8b03b6..dc951a4 100644 --- a/packages/runtime/src/contracts/agent-watcher.ts +++ b/packages/runtime/src/contracts/agent-watcher.ts @@ -27,6 +27,7 @@ export interface AgentWatcherContext { * - amp: watches ~/.local/share/amp/threads/*.json * - claude-code: watches ~/.claude/projects/ JSONL files * - codex: watches ~/.codex/sessions/ JSONL transcripts + * - devin: polls Devin CLI SQLite database * - opencode: polls OpenCode SQLite database * - pi: watches ~/.pi/agent/sessions/ JSONL transcripts * diff --git a/packages/runtime/src/index.ts b/packages/runtime/src/index.ts index d9fb0de..675b36f 100644 --- a/packages/runtime/src/index.ts +++ b/packages/runtime/src/index.ts @@ -24,6 +24,7 @@ export { AgentTracker } from "./agents/tracker"; export { AmpAgentWatcher } from "./agents/watchers/amp"; export { ClaudeCodeAgentWatcher } from "./agents/watchers/claude-code"; export { CodexAgentWatcher } from "./agents/watchers/codex"; +export { DevinAgentWatcher } from "./agents/watchers/devin"; export { OpenCodeAgentWatcher } from "./agents/watchers/opencode"; export { PiAgentWatcher } from "./agents/watchers/pi"; export { MuxRegistry } from "./mux/registry"; diff --git a/packages/runtime/test/devin-watcher.test.ts b/packages/runtime/test/devin-watcher.test.ts new file mode 100644 index 0000000..a487796 --- /dev/null +++ b/packages/runtime/test/devin-watcher.test.ts @@ -0,0 +1,439 @@ +import { describe, test, expect, beforeEach, afterEach } from "bun:test"; +import { mkdirSync, rmSync } from "fs"; +import { join } from "path"; +import { tmpdir } from "os"; +import { Database } from "bun:sqlite"; +import { DevinAgentWatcher, determineStatus } from "../src/agents/watchers/devin"; +import type { AgentEvent } from "../src/contracts/agent"; +import type { AgentWatcherContext } from "../src/contracts/agent-watcher"; + +// --- 
determineStatus --- + +describe("Devin determineStatus", () => { + test("returns idle for null message", () => { + expect(determineStatus(null)).toBe("idle"); + }); + + test("returns idle for message with no role", () => { + expect(determineStatus({})).toBe("idle"); + }); + + test("user message → running", () => { + expect(determineStatus({ role: "user" })).toBe("running"); + }); + + test("tool message → running", () => { + expect(determineStatus({ role: "tool" })).toBe("running"); + }); + + test("assistant streaming (no finish_reason) → running", () => { + expect(determineStatus({ role: "assistant" })).toBe("running"); + }); + + test("assistant + finish_reason=tool_calls → running", () => { + expect(determineStatus({ + role: "assistant", + metadata: { finish_reason: "tool_calls" }, + })).toBe("running"); + }); + + test("assistant + finish_reason=stop → done", () => { + expect(determineStatus({ + role: "assistant", + metadata: { finish_reason: "stop" }, + })).toBe("done"); + }); + + test("assistant + finish_reason=error → error", () => { + expect(determineStatus({ + role: "assistant", + metadata: { finish_reason: "error" }, + })).toBe("error"); + }); + + test("assistant + finish_reason=length → done", () => { + expect(determineStatus({ + role: "assistant", + metadata: { finish_reason: "length" }, + })).toBe("done"); + }); + + test("system + interrupt content → interrupted", () => { + expect(determineStatus({ + role: "system", + content: "[Response interrupted by user]", + })).toBe("interrupted"); + }); + + test("system + system prompt content → idle (skip)", () => { + expect(determineStatus({ + role: "system", + content: "You are powered by Claude Opus 4.6...", + })).toBe("idle"); + }); + + test("assistant content with interrupt marker → interrupted", () => { + expect(determineStatus({ + role: "assistant", + content: "[Response interrupted by user]", + metadata: { finish_reason: null }, + })).toBe("interrupted"); + }); + + test("array content with interrupt text → 
interrupted", () => { + expect(determineStatus({ + role: "system", + content: [{ type: "text", text: "[Response interrupted by user]" }], + })).toBe("interrupted"); + }); + + test("unknown role → idle", () => { + expect(determineStatus({ role: "weird" })).toBe("idle"); + }); +}); + +// --- DevinAgentWatcher integration --- + +describe("DevinAgentWatcher", () => { + let tmpDir: string; + let dbPath: string; + let db: InstanceType; + let watcher: DevinAgentWatcher; + let events: AgentEvent[]; + let ctx: AgentWatcherContext; + + function createDb() { + db = new Database(dbPath); + db.run(`CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + working_directory TEXT NOT NULL, + backend_type TEXT NOT NULL DEFAULT '', + model TEXT NOT NULL DEFAULT '', + agent_mode TEXT NOT NULL DEFAULT '', + created_at INTEGER NOT NULL, + last_activity_at INTEGER NOT NULL, + title TEXT, + main_chain_id INTEGER, + hidden INTEGER NOT NULL DEFAULT 0 + )`); + db.run(`CREATE TABLE IF NOT EXISTS message_nodes ( + row_id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + node_id INTEGER NOT NULL, + parent_node_id INTEGER, + chat_message TEXT NOT NULL, + created_at INTEGER NOT NULL, + metadata TEXT, + UNIQUE(session_id, node_id) + )`); + } + + function insertSession( + id: string, + workingDir: string, + title: string | null, + mainChainId: number | null, + lastActivitySec = Math.floor(Date.now() / 1000), + hidden = 0, + ) { + db.run( + `INSERT OR REPLACE INTO sessions (id, working_directory, created_at, last_activity_at, title, main_chain_id, hidden) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + [id, workingDir, lastActivitySec - 10, lastActivitySec, title, mainChainId, hidden], + ); + } + + function insertNode(sessionId: string, nodeId: number, parent: number | null, chatMessage: object, createdAtSec = Math.floor(Date.now() / 1000)) { + db.run( + `INSERT OR REPLACE INTO message_nodes (session_id, node_id, parent_node_id, chat_message, created_at) + VALUES (?, ?, ?, ?, ?)`, + 
[sessionId, nodeId, parent, JSON.stringify(chatMessage), createdAtSec], + ); + } + + function bumpActivity(sessionId: string, lastActivitySec = Math.floor(Date.now() / 1000)) { + db.run(`UPDATE sessions SET last_activity_at = ? WHERE id = ?`, [lastActivitySec, sessionId]); + } + + beforeEach(() => { + tmpDir = join(tmpdir(), `devin-watcher-test-${Date.now()}-${Math.random().toString(36).slice(2)}`); + mkdirSync(tmpDir, { recursive: true }); + dbPath = join(tmpDir, "sessions.db"); + createDb(); + events = []; + ctx = { + resolveSession: (dir) => dir === "/projects/myapp" ? "myapp-session" : null, + emit: (event) => events.push(event), + }; + watcher = new DevinAgentWatcher(); + (watcher as any).dbPath = dbPath; + }); + + afterEach(() => { + watcher.stop(); + try { db.close(); } catch {} + rmSync(tmpDir, { recursive: true, force: true }); + }); + + test("seed scan emits running for active session with assistant streaming", async () => { + insertSession("zesty-zebra", "/projects/myapp", "First task", 2); + insertNode("zesty-zebra", 1, null, { role: "user", content: "do thing" }); + insertNode("zesty-zebra", 2, 1, { role: "assistant", content: "" }); // streaming, no finish_reason + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + + expect(events.length).toBe(1); + expect(events[0]!.agent).toBe("devin"); + expect(events[0]!.session).toBe("myapp-session"); + expect(events[0]!.status).toBe("running"); + expect(events[0]!.threadName).toBe("First task"); + expect(events[0]!.threadId).toBe("zesty-zebra"); + }); + + test("seed scan emits done for completed session", async () => { + insertSession("done-deer", "/projects/myapp", "Completed task", 2); + insertNode("done-deer", 1, null, { role: "user", content: "do thing" }); + insertNode("done-deer", 2, 1, { role: "assistant", content: "ok", metadata: { finish_reason: "stop" } }); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + + expect(events.length).toBe(1); + 
expect(events[0]!.status).toBe("done"); + }); + + test("seed scan emits interrupted for system interrupt", async () => { + insertSession("aborted-ape", "/projects/myapp", "Aborted task", 3); + insertNode("aborted-ape", 1, null, { role: "user", content: "do thing" }); + insertNode("aborted-ape", 2, 1, { role: "assistant", content: "starting" }); + insertNode("aborted-ape", 3, 2, { role: "system", content: "[Response interrupted by user]" }); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + + expect(events.length).toBe(1); + expect(events[0]!.status).toBe("interrupted"); + }); + + test("skips session whose working_directory cannot be resolved", async () => { + insertSession("unmapped", "/some/random/path", "Stray", 1); + insertNode("unmapped", 1, null, { role: "user", content: "..." }); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + + expect(events.length).toBe(0); + }); + + test("skips hidden sessions", async () => { + insertSession("hidden-hippo", "/projects/myapp", "Hidden", 1, undefined, 1); + insertNode("hidden-hippo", 1, null, { role: "user", content: "..." }); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + + expect(events.length).toBe(0); + }); + + test("skips sessions older than STALE_SEC", async () => { + const ancient = Math.floor(Date.now() / 1000) - 6 * 60; // 6 minutes ago + insertSession("ancient-aardvark", "/projects/myapp", "Old", 1, ancient); + insertNode("ancient-aardvark", 1, null, { role: "user", content: "..." }); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + + expect(events.length).toBe(0); + }); + + test("emits status change when session transitions running → done", async () => { + const start = Math.floor(Date.now() / 1000); + insertSession("trans-tiger", "/projects/myapp", "Transition", 2, start); + insertNode("trans-tiger", 1, null, { role: "user", content: "..." 
}); + insertNode("trans-tiger", 2, 1, { role: "assistant", content: "" }); // streaming + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + const seedCount = events.length; + + // Now session completes — head node updated to finish_reason=stop + insertNode("trans-tiger", 2, 1, { + role: "assistant", + content: "done", + metadata: { finish_reason: "stop" }, + }); + bumpActivity("trans-tiger", start + 5); + + await new Promise((r) => setTimeout(r, 3500)); + + const post = events.slice(seedCount); + expect(post.length).toBeGreaterThanOrEqual(1); + expect(post[0]!.status).toBe("done"); + }); + + test("emits running through tool-use cycle (no spurious done)", async () => { + const start = Math.floor(Date.now() / 1000); + insertSession("tool-toad", "/projects/myapp", "Tool cycle", 2, start); + insertNode("tool-toad", 1, null, { role: "user", content: "fetch" }); + insertNode("tool-toad", 2, 1, { role: "assistant", content: "" }); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + const seedCount = events.length; + + // Step 1: assistant calls tools (running → still running) + insertNode("tool-toad", 2, 1, { + role: "assistant", + content: "calling", + metadata: { finish_reason: "tool_calls" }, + }); + db.run(`UPDATE sessions SET main_chain_id = ?, last_activity_at = ? WHERE id = ?`, [2, start + 5, "tool-toad"]); + await new Promise((r) => setTimeout(r, 500)); + + // Step 2: tool result (running) + insertNode("tool-toad", 3, 2, { role: "tool", content: "ok" }); + db.run(`UPDATE sessions SET main_chain_id = ?, last_activity_at = ? 
WHERE id = ?`, [3, start + 10, "tool-toad"]); + await new Promise((r) => setTimeout(r, 3500)); + + const post = events.slice(seedCount); + const doneEvents = post.filter((e) => e.status === "done"); + expect(doneEvents.length).toBe(0); + }); + + test("promotes stuck running to stale when activity stops advancing", async () => { + insertSession("stuck-stork", "/projects/myapp", "Stuck", 1); + insertNode("stuck-stork", 1, null, { role: "user", content: "..." }); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + const seedCount = events.length; + + // Backdate lastGrowthAt to simulate process killed 16s ago + const snapshot = (watcher as any).sessions.get("stuck-stork"); + snapshot.lastGrowthAt = Date.now() - 16_000; + + await new Promise((r) => setTimeout(r, 3500)); + + const stale = events.slice(seedCount).filter((e) => e.status === "stale"); + expect(stale.length).toBeGreaterThanOrEqual(1); + }, 10_000); + + test("emits title update when title appears for the first time", async () => { + const start = Math.floor(Date.now() / 1000); + insertSession("untitled-ungulate", "/projects/myapp", null, 1, start); + insertNode("untitled-ungulate", 1, null, { role: "user", content: "hi" }); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + events = []; + + // Advance last_activity_at by at least one second (DB column is seconds-precision) + db.run(`UPDATE sessions SET title = ?, last_activity_at = ? WHERE id = ?`, [ + "Generated title", + start + 5, + "untitled-ungulate", + ]); + + await new Promise((r) => setTimeout(r, 3500)); + + expect(events.length).toBeGreaterThanOrEqual(1); + expect(events[0]!.threadName).toBe("Generated title"); + expect(events[0]!.status).toBe("running"); + }); + + test("does not emit when nothing meaningful changed", async () => { + insertSession("steady-stoat", "/projects/myapp", "Steady", 1); + insertNode("steady-stoat", 1, null, { role: "user", content: "..." 
}); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + events = []; + + // Bump activity but keep status + title the same + bumpActivity("steady-stoat", Math.floor(Date.now() / 1000) + 1); + + await new Promise((r) => setTimeout(r, 3500)); + + expect(events.length).toBe(0); + }); + + test("emits for brand-new sessions appearing after seed", async () => { + // Seed empty — no sessions yet + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + expect(events.length).toBe(0); + + // New session appears post-seed (e.g. user just started `devin -p`) + insertSession("late-llama", "/projects/myapp", "Late session", 1); + insertNode("late-llama", 1, null, { role: "user", content: "..." }); + + await new Promise((r) => setTimeout(r, 3500)); + + // Brand-new post-seed sessions are user-visible activity that should + // appear in the sidebar, so the watcher emits on first detection + // (as long as the head-node status is non-idle). + expect(events.length).toBeGreaterThanOrEqual(1); + expect(events[0]!.threadId).toBe("late-llama"); + expect(events[0]!.status).toBe("running"); + }); + + test("does not emit for new post-seed sessions stuck on a system prompt (idle)", async () => { + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + expect(events.length).toBe(0); + + // Session whose head node is just a system prompt — not actual activity + insertSession("syslog-sloth", "/projects/myapp", "System only", 1); + insertNode("syslog-sloth", 1, null, { + role: "system", + content: "You are powered by ...", + }); + + await new Promise((r) => setTimeout(r, 3500)); + + expect(events.length).toBe(0); + }); + + test("recovers from DB errors by reopening", async () => { + const start = Math.floor(Date.now() / 1000); + insertSession("recover-rabbit", "/projects/myapp", "Recovery", 1, start); + insertNode("recover-rabbit", 1, null, { role: "user", content: "..." 
}); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + + db.close(); + createDb(); + // Advance the timestamp on the recreated row so the watcher detects a change + insertSession("recover-rabbit", "/projects/myapp", "Recovery", 2, start + 5); + insertNode("recover-rabbit", 1, null, { role: "user", content: "..." }); + insertNode("recover-rabbit", 2, 1, { role: "assistant", content: "ok", metadata: { finish_reason: "stop" } }); + + (watcher as any).db = null; + + await new Promise((r) => setTimeout(r, 3500)); + + const doneEvents = events.filter((e) => e.status === "done"); + expect(doneEvents.length).toBeGreaterThanOrEqual(1); + }); + + test("handles missing main_chain_id gracefully (idle, no emit)", async () => { + insertSession("no-chain", "/projects/myapp", "No chain", null); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + + expect(events.length).toBe(0); + }); + + test("handles missing head node gracefully (idle, no emit)", async () => { + insertSession("phantom-chain", "/projects/myapp", "Phantom", 99); + // No corresponding node row for node_id=99 + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + + expect(events.length).toBe(0); + }); +}); From a140fb719ecb2f6d124a2e4b8ed1805299e92168 Mon Sep 17 00:00:00 2001 From: minatoaquaMK2 Date: Mon, 11 May 2026 05:44:17 +0000 Subject: [PATCH 2/2] fix: reset Devin watcher state on stop Signed-off-by: minatoaquaMK2 --- packages/runtime/src/agents/watchers/devin.ts | 3 +++ packages/runtime/test/devin-watcher.test.ts | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/packages/runtime/src/agents/watchers/devin.ts b/packages/runtime/src/agents/watchers/devin.ts index af6c2ab..858e8e6 100644 --- a/packages/runtime/src/agents/watchers/devin.ts +++ b/packages/runtime/src/agents/watchers/devin.ts @@ -201,6 +201,9 @@ export class DevinAgentWatcher implements AgentWatcher { try { this.db?.close(); } catch {} this.db = null; this.ctx = 
null; + this.polling = false; + this.seeded = false; + this.sessions.clear(); } /** Emit a status change event if we have a valid session mapping */ diff --git a/packages/runtime/test/devin-watcher.test.ts b/packages/runtime/test/devin-watcher.test.ts index a487796..a6c750b 100644 --- a/packages/runtime/test/devin-watcher.test.ts +++ b/packages/runtime/test/devin-watcher.test.ts @@ -378,6 +378,25 @@ describe("DevinAgentWatcher", () => { expect(events[0]!.status).toBe("running"); }); + test("reseeds and emits active sessions after stop/start restart", async () => { + insertSession("restart-raven", "/projects/myapp", "Restarted", 1); + insertNode("restart-raven", 1, null, { role: "user", content: "..." }); + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + expect(events.length).toBe(1); + + watcher.stop(); + events = []; + + watcher.start(ctx); + await new Promise((r) => setTimeout(r, 200)); + + expect(events.length).toBe(1); + expect(events[0]!.threadId).toBe("restart-raven"); + expect(events[0]!.status).toBe("running"); + }); + test("does not emit for new post-seed sessions stuck on a system prompt (idle)", async () => { watcher.start(ctx); await new Promise((r) => setTimeout(r, 200));