diff --git a/docs/acp-message-flow.md b/docs/acp-message-flow.md new file mode 100644 index 000000000..d5dd66c5f --- /dev/null +++ b/docs/acp-message-flow.md @@ -0,0 +1,208 @@ +# ACP Message Flow - Sequence Diagram + +## Complete Round-Trip Flow: User Message → OpenCode → UI Response + +```mermaid +sequenceDiagram + autonumber + participant UI as InputArea.tsx
(Renderer) + participant App as App.tsx
(Renderer) + participant IPC as IPC Layer
(Main) + participant Handler as process.ts
IPC Handler + participant PM as ProcessManager
(Main) + participant ACP_Proc as ACPProcess
(Main) + participant ACP_Client as ACPClient
(Main) + participant Adapter as ACP Adapter
(Main) + participant OpenCode as OpenCode Process
(External) + participant Terminal as Terminal.tsx
(Renderer) + + rect rgb(240, 248, 255) + Note over UI,Terminal: PHASE 1: User Sends Message + UI->>App: handleSubmit(message) + App->>App: setSessions({ state: 'busy' }) + App->>IPC: window.maestro.process.spawn({
prompt, useACP: true, acpShowStreaming }) + end + + rect rgb(255, 250, 240) + Note over IPC,Handler: PHASE 2: IPC Routing + IPC->>Handler: ipcMain.handle('process:spawn') + Handler->>Handler: Check agentConfigValues.useACP + Handler->>PM: processManager.spawn({
useACP: true, acpShowStreaming }) + end + + rect rgb(240, 255, 240) + Note over PM,ACP_Proc: PHASE 3: ACP Process Creation + PM->>ACP_Proc: new ACPProcess(config) + PM->>ACP_Proc: acpProcess.start() + PM->>PM: Wire event handlers:
acpProcess.on('data') + Note over PM: Event handler converts
ParsedEvent → string + end + + rect rgb(255, 240, 240) + Note over ACP_Proc,ACP_Client: PHASE 4: ACP Client Initialization + ACP_Proc->>ACP_Client: client.connect() + ACP_Client->>ACP_Client: spawn('opencode', ['acp']) + ACP_Client->>OpenCode: stdin: {"jsonrpc":"2.0","method":"initialize",...} + Note over ACP_Client: [ACP Transport]
OUTBOUND REQUEST
method: initialize + + OpenCode->>OpenCode: Start ACP server + OpenCode->>ACP_Client: stdout: {"jsonrpc":"2.0","result":{agentInfo,...}} + Note over ACP_Client: [ACP Transport]
INBOUND RESPONSE
initialized + + ACP_Client->>ACP_Proc: resolve(initResponse) + end + + rect rgb(240, 240, 255) + Note over ACP_Proc,OpenCode: PHASE 5: Session Creation + ACP_Proc->>ACP_Client: client.newSession(cwd) + ACP_Client->>OpenCode: stdin: {"jsonrpc":"2.0","method":"session/new",...} + Note over ACP_Client: [ACP Transport]
OUTBOUND REQUEST
method: session/new + + OpenCode->>OpenCode: Create session in
~/.local/share/opencode/storage/ + OpenCode->>ACP_Client: stdout: {"jsonrpc":"2.0","result":{sessionId:"..."}} + Note over ACP_Client: [ACP Transport]
INBOUND RESPONSE
sessionId returned + + ACP_Client->>ACP_Proc: resolve({ sessionId }) + ACP_Proc->>ACP_Proc: this.acpSessionId = sessionId + ACP_Proc->>Adapter: createSessionIdEvent(sessionId) + ACP_Proc->>PM: emit('data', {type:'init', sessionId}) + PM->>IPC: emit('session-id', sessionId) + IPC->>App: window.maestro.process.onSessionId() + end + + rect rgb(255, 240, 255) + Note over ACP_Proc,OpenCode: PHASE 6: Send Prompt + ACP_Proc->>ACP_Proc: Reset tracking:
streamedText = ''
emittedTextLength = 0 + ACP_Proc->>ACP_Client: client.prompt(sessionId, text) + ACP_Client->>OpenCode: stdin: {"jsonrpc":"2.0","method":"session/prompt",
params:{sessionId, messages:[{role:"user",...}]}} + Note over ACP_Client: [ACP Transport]
OUTBOUND REQUEST
method: session/prompt + end + + rect rgb(240, 255, 255) + Note over OpenCode,Terminal: PHASE 7: Streaming Response (Loop) + loop For each text chunk + OpenCode->>OpenCode: Generate response chunk + OpenCode->>ACP_Client: stdout: {"jsonrpc":"2.0","method":"session/update",
params:{sessionUpdate:"agent_message_chunk",
content:{type:"text",text:"chunk"}}} + Note over ACP_Client: [ACP Transport]
INBOUND NOTIFICATION
method: session/update + + ACP_Client->>ACP_Client: handleNotification() + ACP_Client->>ACP_Client: normalizeSessionUpdate() + Note over ACP_Client: Convert OpenCode format to ACP spec:
{sessionUpdate:"agent_message_chunk",...}
→ {agent_message_chunk:{content:...}} + + ACP_Client->>ACP_Proc: emit('session:update', sessionId, update) + ACP_Proc->>Adapter: acpUpdateToParseEvent(update) + Adapter->>Adapter: extractText(chunk.content) + Adapter->>ACP_Proc: {type:'text', text:'chunk', isPartial:true} + + ACP_Proc->>ACP_Proc: Accumulation & Deduplication:
streamedText += text
if (length > emittedTextLength) {
newText = substring(emittedTextLength)
emittedTextLength = length
emit delta
} + + ACP_Proc->>PM: emit('data', sessionId, {type:'text', text:deltaText}) + + PM->>PM: Event handler logic:
if (acpShowStreaming) {
emit('data', text)
}
if (isPartial) {
emit('thinking-chunk', text)
} + + alt Streaming Enabled + PM->>IPC: webContents.send('process:data', sessionId, deltaText) + IPC->>App: window.maestro.process.onData(sessionId, data) + App->>App: batchedUpdater.appendLog(
sessionId, tabId, true, data) + App->>App: setSessions: append to aiTabs[].logs[] + App->>Terminal: React re-render with new log entry + Terminal->>Terminal: Display chunk to user + end + end + end + + rect rgb(255, 245, 230) + Note over OpenCode,Terminal: PHASE 8: Completion + OpenCode->>OpenCode: Response complete + OpenCode->>ACP_Client: stdout: {"jsonrpc":"2.0","id":3,
result:{stopReason:"end_turn"}} + Note over ACP_Client: [ACP Transport]
INBOUND RESPONSE
prompt completed + + ACP_Client->>ACP_Proc: resolve({ stopReason: 'end_turn' }) + ACP_Proc->>Adapter: createResultEvent(sessionId, streamedText, stopReason) + ACP_Proc->>PM: emit('data', sessionId, {type:'result', text:streamedText}) + + alt Streaming Disabled + PM->>IPC: webContents.send('process:data', sessionId, fullText) + IPC->>App: window.maestro.process.onData(sessionId, fullText) + App->>App: batchedUpdater.appendLog(
sessionId, tabId, true, fullText) + App->>App: setSessions: append to aiTabs[].logs[] + App->>Terminal: React re-render with complete response + end + + ACP_Proc->>PM: emit('exit', sessionId, 0) + PM->>IPC: webContents.send('process:exit', sessionId, 0) + IPC->>App: window.maestro.process.onExit(sessionId, 0) + App->>App: setSessions({ state: 'idle' }) + end + + rect rgb(245, 245, 245) + Note over UI,Terminal: PHASE 9: Follow-up Message (Reuses Session) + UI->>App: handleSubmit(nextMessage) + App->>App: setSessions({ state: 'busy' }) + App->>IPC: window.maestro.process.write(sessionId, nextMessage) + IPC->>PM: processManager.write(sessionId, data) + PM->>ACP_Proc: acpProcess.write(data) + ACP_Proc->>ACP_Proc: Reset tracking:
streamedText = ''
emittedTextLength = 0 + ACP_Proc->>ACP_Client: client.prompt(acpSessionId, nextMessage) + Note over ACP_Client,OpenCode: Repeat PHASE 6-8 + end +``` + +## Key Components + +### 1. **Deduplication Logic** (ACP Process) +```typescript +// Track what we've accumulated vs emitted +streamedText += event.text; // Accumulate ALL +if (currentLength > emittedTextLength) { + newText = streamedText.substring(emittedTextLength); // Extract delta + emittedTextLength = currentLength; // Update tracker + emit('data', deltaEvent); // Emit only new portion +} +``` + +### 2. **Streaming Control** (Process Manager) +```typescript +if (event.type === 'text' && acpShowStreaming) { + emit('data', sid, event.text); // Stream to UI +} +if (event.type === 'result' && !acpShowStreaming) { + emit('data', sid, event.text); // Final text only +} +``` + +### 3. **Transport Layer Logging** +All JSON-RPC messages logged with `[ACP Transport]` category: +- **OUTBOUND REQUEST**: `initialize`, `session/new`, `session/prompt` +- **INBOUND RESPONSE**: Method responses with results +- **INBOUND NOTIFICATION**: `session/update` events +- **OUTBOUND RESPONSE**: Responses to OpenCode's requests + +### 4. **Session Persistence** +- Each `session/new` creates persistent session in OpenCode's storage +- Follow-up messages reuse same `sessionId` +- Session contains full conversation history +- Can be resumed later with `session/load` + +### 5. **UI State Management** +- **Busy State**: Set when message sent, cleared on exit +- **Logs Array**: Accumulated in `aiTabs[].logs[]` +- **Batched Updates**: Multiple chunks batched for performance +- **Tab Isolation**: Each tab has own `agentSessionId` + +## Config Flags + +| Flag | Default | Effect | +|------|---------|--------| +| `useACP` | `false` | Enable ACP protocol (vs JSON stdout) | +| `acpShowStreaming` | `false` | Show chunks as they arrive (vs final only) | + +## Debug Logging Categories + +| Category | Content | +|----------|---------| +| `[ACP Transport]` | All JSON-RPC messages in/out | +| `[ACPClient]` | Connection, session lifecycle | +| `[ACPProcess]` | Process orchestration | +| `[ACPAdapter]` | Event conversion | +| `[ProcessManager]` | Process management | diff --git a/package-lock.json b/package-lock.json index 117d8bb59..44e810d7a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "maestro", - "version": "0.11.2", + "version": "0.12.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "maestro", - "version": "0.11.2", + "version": "0.12.0", "hasInstallScript": true, "license": "AGPL 3.0", "dependencies": { diff --git a/package.json b/package.json index be766f886..584811906 100644 --- a/package.json +++ b/package.json @@ -17,8 +17,10 @@ }, "scripts": { "dev": "concurrently \"npm run dev:main\" \"npm run dev:renderer\"", + "dev:debug": "concurrently \"npm run dev:main:debug\" \"npm run dev:renderer\"", "dev:demo": "MAESTRO_DEMO_DIR=/tmp/maestro-demo npm run dev", "dev:main": "tsc -p tsconfig.main.json && NODE_ENV=development electron .", + "dev:main:debug": "tsc -p tsconfig.main.json && MAESTRO_LOG_LEVEL=debug NODE_ENV=development electron .", "dev:renderer": "vite", "dev:web": "vite --config vite.config.web.mts", "build": "npm run build:main && npm run build:renderer && npm run build:web && npm run build:cli", diff --git a/src/__tests__/integration/acp-opencode.integration.test.ts b/src/__tests__/integration/acp-opencode.integration.test.ts new file mode 100644 index 000000000..1ea3e1468 --- /dev/null +++ b/src/__tests__/integration/acp-opencode.integration.test.ts @@ -0,0 +1,497 @@ +/** + * ACP OpenCode Integration Tests + * + * Tests for ACP (Agent Client Protocol) communication with OpenCode. + * These tests verify that Maestro can communicate with OpenCode via ACP + * instead of the custom JSON format. + */ + +import { describe, it, expect, beforeAll, afterAll, vi } from 'vitest'; +import { ACPClient } from '../../main/acp/acp-client'; +import { acpUpdateToParseEvent, createSessionIdEvent } from '../../main/acp/acp-adapter'; +import type { SessionUpdate } from '../../main/acp/types'; +import { execSync } from 'child_process'; +import * as path from 'path'; +import * as os from 'os'; + +// Test timeout for ACP operations +const ACP_TIMEOUT = 30000; + +// Check if OpenCode is available +function isOpenCodeAvailable(): boolean { + try { + execSync('which opencode', { encoding: 'utf-8' }); + return true; + } catch { + return false; + } +} + +// Check if integration tests should run +const SHOULD_RUN = process.env.RUN_INTEGRATION_TESTS === 'true' && isOpenCodeAvailable(); + +describe.skipIf(!SHOULD_RUN)('ACP OpenCode Integration Tests', () => { + const TEST_CWD = os.tmpdir(); + + describe('ACPClient connection', () => { + it('should connect to OpenCode via ACP and initialize', async () => { + const client = new ACPClient({ + command: 'opencode', + args: ['acp'], + cwd: TEST_CWD, + clientInfo: { + name: 'maestro-test', + version: '0.0.1', + }, + }); + + try { + const response = await client.connect(); + + expect(response.protocolVersion).toBeGreaterThanOrEqual(1); + expect(client.getIsConnected()).toBe(true); + expect(client.getAgentInfo()).toBeDefined(); + + console.log(`✅ Connected to: ${client.getAgentInfo()?.name} v${client.getAgentInfo()?.version}`); + console.log(`📋 Protocol version: ${response.protocolVersion}`); + console.log(`🔧 Capabilities:`, response.agentCapabilities); + } finally { + client.disconnect(); + } + }, ACP_TIMEOUT); + + it('should create a new session', async () => { + const client = new ACPClient({ + command: 'opencode', + args: ['acp'], + cwd: TEST_CWD, + }); + + try { + await client.connect(); + const session = await client.newSession(TEST_CWD); + + expect(session.sessionId).toBeDefined(); + expect(typeof session.sessionId).toBe('string'); + expect(session.sessionId.length).toBeGreaterThan(0); + + console.log(`✅ Created session: ${session.sessionId}`); + if (session.modes) { + console.log(`📋 Available modes: ${session.modes.availableModes.map((m) => m.name).join(', ')}`); + console.log(`📋 Current mode: ${session.modes.currentModeId}`); + } + } finally { + client.disconnect(); + } + }, ACP_TIMEOUT); + + it('should send a prompt and receive streaming updates', async () => { + const client = new ACPClient({ + command: 'opencode', + args: ['acp'], + cwd: TEST_CWD, + }); + + const updates: SessionUpdate[] = []; + + try { + await client.connect(); + const session = await client.newSession(TEST_CWD); + + // Listen for updates + client.on('session:update', (sessionId, update) => { + console.log(`📥 Update (${sessionId}):`, JSON.stringify(update).substring(0, 200)); + updates.push(update); + }); + + // Auto-approve permission requests in YOLO mode + client.on('session:permission_request', (request, respond) => { + console.log(`🔐 Permission request: ${request.toolCall.title}`); + // Find the "allow" option and select it + const allowOption = request.options.find( + (o) => o.kind === 'allow_once' || o.kind === 'allow_always' + ); + if (allowOption) { + respond({ outcome: { selected: { optionId: allowOption.optionId } } }); + } else { + respond({ outcome: { cancelled: {} } }); + } + }); + + console.log(`🚀 Sending prompt to session ${session.sessionId}...`); + const response = await client.prompt(session.sessionId, 'Say "hello" and nothing else.'); + + expect(response.stopReason).toBeDefined(); + console.log(`✅ Stop reason: ${response.stopReason}`); + console.log(`📊 Received ${updates.length} updates`); + + // Check we received some text updates + const textUpdates = updates.filter( + (u) => 'agent_message_chunk' in u || 'agent_thought_chunk' in u + ); + expect(textUpdates.length).toBeGreaterThan(0); + } finally { + client.disconnect(); + } + }, ACP_TIMEOUT); + }); + + describe('ACP to ParsedEvent adapter', () => { + it('should convert agent_message_chunk to text event', () => { + const update: SessionUpdate = { + agent_message_chunk: { + content: { + text: { text: 'Hello, world!' }, + }, + }, + }; + + const event = acpUpdateToParseEvent('test-session', update); + + expect(event).toBeDefined(); + expect(event?.type).toBe('text'); + expect(event?.text).toBe('Hello, world!'); + expect(event?.isPartial).toBe(true); + }); + + it('should convert agent_thought_chunk to thinking event', () => { + const update: SessionUpdate = { + agent_thought_chunk: { + content: { + text: { text: 'Let me think about this...' }, + }, + }, + }; + + const event = acpUpdateToParseEvent('test-session', update); + + expect(event).toBeDefined(); + expect(event?.type).toBe('text'); // Mapped to 'text' type since ParsedEvent doesn't have 'thinking' + expect(event?.text).toBe('[thinking] Let me think about this...'); + }); + + it('should convert tool_call to tool_use event', () => { + const update: SessionUpdate = { + tool_call: { + toolCallId: 'tc-123', + title: 'read_file', + kind: 'read', + status: 'in_progress', + rawInput: { path: '/tmp/test.txt' }, + }, + }; + + const event = acpUpdateToParseEvent('test-session', update); + + expect(event).toBeDefined(); + expect(event?.type).toBe('tool_use'); + expect(event?.toolName).toBe('read_file'); + expect((event?.toolState as any)?.id).toBe('tc-123'); + expect((event?.toolState as any)?.status).toBe('running'); + }); + + it('should convert tool_call_update with output', () => { + const update: SessionUpdate = { + tool_call_update: { + toolCallId: 'tc-123', + status: 'completed', + rawOutput: { content: 'file contents here' }, + }, + }; + + const event = acpUpdateToParseEvent('test-session', update); + + expect(event).toBeDefined(); + expect(event?.type).toBe('tool_use'); + expect((event?.toolState as any)?.status).toBe('completed'); + expect((event?.toolState as any)?.output).toEqual({ content: 'file contents here' }); + }); + + it('should create session_id event', () => { + const event = createSessionIdEvent('ses_abc123'); + + expect(event.type).toBe('init'); // Mapped to 'init' type since ParsedEvent doesn't have 'session_id' + expect(event.sessionId).toBe('ses_abc123'); + }); + }); + + // ============================================================================ + // ACP Provider Tests - These replace the legacy provider integration tests + // ============================================================================ + describe('ACP Provider Tests (replacing legacy format)', () => { + it('should send initial message and receive session ID via ACP', async () => { + const client = new ACPClient({ + command: 'opencode', + args: ['acp'], + cwd: TEST_CWD, + }); + + try { + await client.connect(); + const session = await client.newSession(TEST_CWD); + + // Verify session ID format + expect(session.sessionId).toBeDefined(); + expect(session.sessionId).toMatch(/^ses_[a-zA-Z0-9]+$/); + + console.log(`✅ ACP Session ID: ${session.sessionId}`); + + // Send a prompt + const response = await client.prompt(session.sessionId, 'Say "hello" and nothing else.'); + + expect(response.stopReason).toBe('end_turn'); + console.log(`✅ Response received with stop reason: ${response.stopReason}`); + } finally { + client.disconnect(); + } + }, ACP_TIMEOUT); + + it('should resume session with follow-up message via ACP', async () => { + const client = new ACPClient({ + command: 'opencode', + args: ['acp'], + cwd: TEST_CWD, + }); + + let collectedText = ''; + + try { + await client.connect(); + const session = await client.newSession(TEST_CWD); + + // Listen for text updates - handle both content formats: + // Format 1: { text: { text: '...' } } (ACP spec) + // Format 2: { type: 'text', text: '...' } (OpenCode actual) + client.on('session:update', (_sessionId, update) => { + if ('agent_message_chunk' in update) { + const content = update.agent_message_chunk.content as any; + if (content) { + // Handle both formats + if (content.text && typeof content.text === 'object' && 'text' in content.text) { + collectedText += content.text.text; + } else if (content.type === 'text' && typeof content.text === 'string') { + collectedText += content.text; + } + } + } + }); + + console.log(`🚀 Initial message to session ${session.sessionId}`); + await client.prompt(session.sessionId, 'Remember the number 42. Say only "Got it."'); + + // Clear text for next prompt + collectedText = ''; + + console.log(`🔄 Follow-up message to same session`); + const response = await client.prompt( + session.sessionId, + 'What number did I ask you to remember? Reply with just the number.' + ); + + expect(response.stopReason).toBe('end_turn'); + console.log(`💬 Response: ${collectedText}`); + + // The response should contain "42" + expect(collectedText).toContain('42'); + } finally { + client.disconnect(); + } + }, ACP_TIMEOUT); + + it('should stream text updates via ACP', async () => { + const client = new ACPClient({ + command: 'opencode', + args: ['acp'], + cwd: TEST_CWD, + }); + + const textChunks: string[] = []; + + try { + await client.connect(); + const session = await client.newSession(TEST_CWD); + + // Collect streaming text updates - handle both content formats + client.on('session:update', (_sessionId, update) => { + if ('agent_message_chunk' in update) { + const content = update.agent_message_chunk.content as any; + if (content) { + // Handle both formats + if (content.text && typeof content.text === 'object' && 'text' in content.text) { + textChunks.push(content.text.text); + } else if (content.type === 'text' && typeof content.text === 'string') { + textChunks.push(content.text); + } + } + } + }); + + await client.prompt(session.sessionId, 'Count from 1 to 5, one number per line.'); + + console.log(`📊 Received ${textChunks.length} text chunks`); + console.log(`📝 Combined text: ${textChunks.join('')}`); + + // Should have received multiple streaming chunks + expect(textChunks.length).toBeGreaterThan(0); + + // Combined text should have the numbers + const combinedText = textChunks.join(''); + expect(combinedText).toContain('1'); + expect(combinedText).toContain('5'); + } finally { + client.disconnect(); + } + }, ACP_TIMEOUT); + + it('should handle tool calls via ACP', async () => { + const client = new ACPClient({ + command: 'opencode', + args: ['acp'], + cwd: TEST_CWD, + }); + + const toolCalls: Array<{ name: string; status: string }> = []; + + try { + await client.connect(); + const session = await client.newSession(TEST_CWD); + + // Track tool calls + client.on('session:update', (_sessionId, update) => { + if ('tool_call' in update) { + toolCalls.push({ + name: update.tool_call.title, + status: update.tool_call.status || 'unknown', + }); + } + if ('tool_call_update' in update) { + const existing = toolCalls.find((t) => t.name === update.tool_call_update.title); + if (existing) { + existing.status = update.tool_call_update.status || 'unknown'; + } + } + }); + + // Auto-approve any tool permission requests + client.on('session:permission_request', (request, respond) => { + console.log(`🔐 Auto-approving: ${request.toolCall.title}`); + const allowOption = request.options.find( + (o: { kind: string; optionId: string }) => o.kind === 'allow_once' || o.kind === 'allow_always' + ); + if (allowOption) { + respond({ outcome: { selected: { optionId: allowOption.optionId } } }); + } else { + respond({ outcome: { cancelled: {} } }); + } + }); + + // Request something that might trigger tool use + await client.prompt(session.sessionId, 'What files are in the current directory? Use ls command.'); + + console.log(`🔧 Tool calls observed: ${toolCalls.length}`); + toolCalls.forEach((tc) => console.log(` - ${tc.name}: ${tc.status}`)); + + // Note: Tool usage depends on agent behavior, so we just verify the mechanism works + // The test passes if no errors occur + } finally { + client.disconnect(); + } + }, ACP_TIMEOUT); + + it('should convert ACP updates to ParsedEvent format', async () => { + const client = new ACPClient({ + command: 'opencode', + args: ['acp'], + cwd: TEST_CWD, + }); + + const parsedEvents: Array<{ type: string; text?: string }> = []; + + try { + await client.connect(); + const session = await client.newSession(TEST_CWD); + + // Convert updates to ParsedEvent format + client.on('session:update', (sessionId, update) => { + const event = acpUpdateToParseEvent(sessionId, update); + if (event) { + parsedEvents.push({ type: event.type, text: event.text }); + } + }); + + await client.prompt(session.sessionId, 'Say "test" and nothing else.'); + + console.log(`📊 Parsed ${parsedEvents.length} events:`); + parsedEvents.forEach((e) => console.log(` - ${e.type}: ${e.text?.substring(0, 50) || '(no text)'}`)); + + // Should have text events + const textEvents = parsedEvents.filter((e) => e.type === 'text'); + expect(textEvents.length).toBeGreaterThan(0); + + // Should have received "test" somewhere + const allText = textEvents.map((e) => e.text).join(''); + expect(allText.toLowerCase()).toContain('test'); + } finally { + client.disconnect(); + } + }, ACP_TIMEOUT); + + it('should handle multiple sessions via ACP', async () => { + const client = new ACPClient({ + command: 'opencode', + args: ['acp'], + cwd: TEST_CWD, + }); + + try { + await client.connect(); + + // Create first session + const session1 = await client.newSession(TEST_CWD); + console.log(`📋 Session 1: ${session1.sessionId}`); + + // Create second session + const session2 = await client.newSession(TEST_CWD); + console.log(`📋 Session 2: ${session2.sessionId}`); + + // Sessions should have different IDs + expect(session1.sessionId).not.toBe(session2.sessionId); + + // Both should be valid session ID format + expect(session1.sessionId).toMatch(/^ses_/); + expect(session2.sessionId).toMatch(/^ses_/); + } finally { + client.disconnect(); + } + }, ACP_TIMEOUT); + + it('should report available modes via ACP', async () => { + const client = new ACPClient({ + command: 'opencode', + args: ['acp'], + cwd: TEST_CWD, + }); + + try { + await client.connect(); + const session = await client.newSession(TEST_CWD); + + if (session.modes) { + console.log(`📋 Available modes: ${session.modes.availableModes.map((m) => m.name).join(', ')}`); + console.log(`📋 Current mode: ${session.modes.currentModeId}`); + + expect(session.modes.availableModes.length).toBeGreaterThan(0); + expect(session.modes.currentModeId).toBeDefined(); + + // OpenCode typically has 'build' and 'plan' modes + const modeNames = session.modes.availableModes.map((m) => m.name); + expect(modeNames).toContain('build'); + } else { + console.log(`⚠️ No modes reported by agent`); + } + } finally { + client.disconnect(); + } + }, ACP_TIMEOUT); + }); +}); diff --git a/src/__tests__/main/agent-capabilities.test.ts b/src/__tests__/main/agent-capabilities.test.ts index 75c1a532d..b869fc217 100644 --- a/src/__tests__/main/agent-capabilities.test.ts +++ b/src/__tests__/main/agent-capabilities.test.ts @@ -258,6 +258,7 @@ describe('agent-capabilities', () => { 'supportsModelSelection', 'requiresPromptToStart', 'supportsThinkingDisplay', + 'supportsACP', ]; const defaultKeys = Object.keys(DEFAULT_CAPABILITIES); diff --git a/src/main/acp/acp-adapter.ts b/src/main/acp/acp-adapter.ts new file mode 100644 index 000000000..b9ce13457 --- /dev/null +++ b/src/main/acp/acp-adapter.ts @@ -0,0 +1,248 @@ +/** + * ACP to ParsedEvent Adapter + * + * Converts ACP session updates to Maestro's internal ParsedEvent format, + * enabling seamless integration with existing UI components. + */ + +import type { ParsedEvent } from '../parsers/agent-output-parser'; +import type { + SessionUpdate, + SessionId, + ContentBlock, + ToolCallStatus, +} from './types'; +import { logger } from '../utils/logger'; + +const LOG_CONTEXT = '[ACPAdapter]'; + +/** + * Extract text from a ContentBlock + * Handles both ACP spec format and OpenCode's actual format: + * - ACP spec: { text: { text: '...' } } + * - OpenCode: { type: 'text', text: '...' } + */ +function extractText(block: ContentBlock): string { + const content = block as any; + + logger.debug('Extracting text from content block', LOG_CONTEXT, { content }); + + // OpenCode format: { type: 'text', text: '...' } + if (content.type === 'text' && typeof content.text === 'string') { + return content.text; + } + + // ACP spec format: { text: { text: '...' } } + if ('text' in content && content.text && typeof content.text === 'object' && 'text' in content.text) { + return content.text.text; + } + + // Simple text field + if ('text' in content && typeof content.text === 'string') { + return content.text; + } + + // Direct string content + if (typeof content === 'string') { + return content; + } + + if ('image' in content) { + return '[image]'; + } + if ('resource_link' in content) { + return `[resource: ${content.resource_link.name}]`; + } + if ('resource' in content) { + const res = content.resource.resource; + if ('text' in res) { + return res.text; + } + return '[binary resource]'; + } + + // Fallback: try to stringify + logger.warn('Unknown content block format', LOG_CONTEXT, { content }); + return typeof content === 'object' ? JSON.stringify(content) : String(content); +} + +/** + * Map ACP ToolCallStatus to Maestro status + */ +function mapToolStatus(status?: ToolCallStatus): string { + switch (status) { + case 'pending': + return 'pending'; + case 'in_progress': + return 'running'; + case 'completed': + return 'completed'; + case 'failed': + return 'error'; + default: + return 'pending'; + } +} + +/** + * Convert an ACP SessionUpdate to a Maestro ParsedEvent + */ +export function acpUpdateToParseEvent( + sessionId: SessionId, + update: SessionUpdate +): ParsedEvent | null { + logger.debug('Converting ACP update to ParsedEvent', LOG_CONTEXT, { + sessionId, + updateKeys: Object.keys(update), + update + }); + + // Agent message chunk (streaming text) + if ('agent_message_chunk' in update) { + const chunk = update.agent_message_chunk; + logger.debug('Processing agent_message_chunk', LOG_CONTEXT, { chunk }); + const text = extractText(chunk.content); + return { + type: 'text', + text, + isPartial: true, + sessionId, + raw: update, + }; + } + + // Agent thought chunk (thinking/reasoning) - map to 'text' type with marker + if ('agent_thought_chunk' in update) { + const text = extractText(update.agent_thought_chunk.content); + return { + type: 'text', + text: `[thinking] ${text}`, + isPartial: true, + sessionId, + raw: update, + }; + } + + // User message chunk (echo of user input) + if ('user_message_chunk' in update) { + // Usually not displayed, but can be used for confirmation + return null; + } + + // Tool call started + if ('tool_call' in update) { + const tc = update.tool_call; + return { + type: 'tool_use', + toolName: tc.title, + toolState: { + id: tc.toolCallId, + input: tc.rawInput, + status: mapToolStatus(tc.status), + }, + sessionId, + raw: update, + }; + } + + // Tool call update + if ('tool_call_update' in update) { + const tc = update.tool_call_update; + return { + type: 'tool_use', + toolName: tc.title || '', + toolState: { + id: tc.toolCallId, + input: tc.rawInput, + output: tc.rawOutput, + status: mapToolStatus(tc.status), + }, + sessionId, + raw: update, + }; + } + + // Plan update - map to system message + if ('plan' in update) { + const entries = update.plan.entries.map((e) => `- [${e.status}] ${e.content}`).join('\n'); + return { + type: 'system', + text: `Plan:\n${entries}`, + sessionId, + raw: update, + }; + } + + // Available commands update + if ('available_commands_update' in update) { + // Map to slash commands for UI + return { + type: 'init', + slashCommands: update.available_commands_update.availableCommands.map((c) => c.name), + sessionId, + raw: update, + }; + } + + // Mode update + if ('current_mode_update' in update) { + // Could emit a mode change event + return null; + } + + return null; +} + +/** + * Create an init event from ACP session creation + */ +export function createSessionIdEvent(sessionId: SessionId): ParsedEvent { + return { + type: 'init', + sessionId, + raw: { type: 'session_created', sessionId }, + }; +} + +/** + * Create a result event from ACP prompt response + */ +export function createResultEvent( + sessionId: SessionId, + text: string, + _stopReason: string, + usage?: { inputTokens?: number; outputTokens?: number; totalTokens?: number } +): ParsedEvent { + // Convert ACP usage format to Maestro's ParsedEvent usage format + const eventUsage = usage ? { + inputTokens: usage.inputTokens || 0, + outputTokens: usage.outputTokens || 0, + // ACP doesn't provide cache tokens, so default to 0 + cacheReadTokens: 0, + cacheCreationTokens: 0, + // ACP doesn't provide cost, calculated separately based on model + costUsd: 0, + // Context window should be configured in agent settings + contextWindow: 0, + } : undefined; + + return { + type: 'result', + text, + sessionId, + usage: eventUsage, + raw: { type: 'prompt_response', stopReason: _stopReason, usage }, + }; +} + +/** + * Create an error event + */ +export function createErrorEvent(sessionId: SessionId, message: string): ParsedEvent { + return { + type: 'error', + text: message, + sessionId, + raw: { type: 'error', message }, + }; +} diff --git a/src/main/acp/acp-client.ts b/src/main/acp/acp-client.ts new file mode 100644 index 000000000..e341ce4d2 --- /dev/null +++ b/src/main/acp/acp-client.ts @@ -0,0 +1,595 @@ +/** + * ACP (Agent Client Protocol) Client Implementation + * + * A client for communicating with ACP-compatible agents like OpenCode. + * Uses JSON-RPC 2.0 over stdio to communicate with the agent process. + * + * @see https://agentclientprotocol.com/protocol/overview + */ + +import { spawn, ChildProcess } from 'child_process'; +import { EventEmitter } from 'events'; +import { createInterface, Interface } from 'readline'; +import { logger } from '../utils/logger'; +import type { + RequestId, + JsonRpcRequest, + JsonRpcResponse, + JsonRpcNotification, + Implementation, + ClientCapabilities, + AgentCapabilities, + InitializeRequest, + InitializeResponse, + SessionId, + NewSessionRequest, + NewSessionResponse, + LoadSessionRequest, + LoadSessionResponse, + PromptRequest, + PromptResponse, + ContentBlock, + SessionNotification, + SessionUpdate, + CancelNotification, + RequestPermissionRequest, + RequestPermissionResponse, + ReadTextFileRequest, + ReadTextFileResponse, + WriteTextFileRequest, + WriteTextFileResponse, + CreateTerminalRequest, + CreateTerminalResponse, + TerminalOutputRequest, + TerminalOutputResponse, +} from './types'; +import { CURRENT_PROTOCOL_VERSION } from './types'; + +const LOG_CONTEXT = '[ACPClient]'; + +/** + * Events emitted by the ACP client + */ +export interface ACPClientEvents { + /** Session update notification from agent */ + 'session:update': (sessionId: SessionId, update: SessionUpdate) => void; + /** Permission request from agent */ + 'session:permission_request': ( + request: RequestPermissionRequest, + respond: (response: RequestPermissionResponse) => void + ) => void; + /** File read request from agent */ + 'fs:read': ( + request: ReadTextFileRequest, + respond: (response: ReadTextFileResponse) => void + ) => void; + /** File write request from agent */ + 'fs:write': ( + request: WriteTextFileRequest, + respond: (response: WriteTextFileResponse) => void + ) => void; + /** Terminal create request from agent */ + 'terminal:create': ( + request: CreateTerminalRequest, + respond: (response: CreateTerminalResponse) => void + ) => void; + /** Terminal output request from agent */ + 'terminal:output': ( + request: TerminalOutputRequest, + respond: (response: TerminalOutputResponse) => void + ) => void; + /** Error occurred */ + error: (error: Error) => void; + /** Client disconnected */ + disconnected: () => void; +} + +/** + * Configuration for ACP client + */ +export interface ACPClientConfig { + /** Command to spawn the agent (e.g., 'opencode') */ + command: string; + /** Arguments for the agent command (e.g., ['acp']) */ + args: string[]; + /** Working directory for the agent process */ + cwd: string; + /** Environment variables for the agent process */ + env?: Record; + /** Client info to send during initialization */ + clientInfo?: Implementation; + /** Client capabilities */ + clientCapabilities?: ClientCapabilities; +} + +/** + * ACP Client for communicating with agents via JSON-RPC over stdio + */ +export class ACPClient extends EventEmitter { + private process: ChildProcess | null = null; + private readline: Interface | null = null; + private requestId = 0; + private pendingRequests = new Map< + RequestId, + { + resolve: (result: unknown) => void; + reject: (error: Error) => void; + method: string; + } + >(); + private config: ACPClientConfig; + private isConnected = false; + private agentCapabilities: AgentCapabilities | null = null; + private agentInfo: Implementation | null = null; + + constructor(config: ACPClientConfig) { + super(); + this.config = config; + } + + /** + * Get the underlying child process (for PID access) + */ + getProcess(): ChildProcess | null { + return this.process; + } + + /** + * Start the agent process and initialize the connection + */ + async connect(): Promise { + if (this.isConnected) { + throw new Error('Already connected'); + } + + const fullCommand = `${this.config.command} ${this.config.args.join(' ')}`; + logger.info(`Starting ACP agent: ${fullCommand}`, LOG_CONTEXT); + + // Build environment with extended PATH + // Electron doesn't inherit the user's shell PATH, so we need to add common paths + // where node, npm, and other tools are typically installed + const env = { ...process.env, ...this.config.env }; + const isWin = process.platform === 'win32'; + + if (isWin) { + const appData = process.env.APPDATA || ''; + const localAppData = process.env.LOCALAPPDATA || ''; + const programFiles = process.env.ProgramFiles || 'C:\\Program Files'; + const standardPaths = [ + `${appData}\\npm`, + `${localAppData}\\npm`, + `${programFiles}\\nodejs`, + `${programFiles}\\Git\\cmd`, + ].join(';'); + env.PATH = env.PATH ? `${standardPaths};${env.PATH}` : standardPaths; + } else { + // macOS/Linux: Add Homebrew, nvm, volta, fnm, and standard paths + const home = process.env.HOME || ''; + const standardPaths = [ + '/opt/homebrew/bin', // Homebrew on Apple Silicon + '/usr/local/bin', // Homebrew on Intel, many CLI tools + `${home}/.nvm/versions/node`, // nvm (we'll expand this below) + `${home}/.volta/bin`, // Volta + `${home}/.fnm`, // fnm + `${home}/.local/bin`, // pipx, etc. + '/usr/bin', + '/bin', + '/usr/sbin', + '/sbin', + ].join(':'); + + // Also try to detect the active nvm node version + const nvmDir = process.env.NVM_DIR || `${home}/.nvm`; + let nvmNodePath = ''; + try { + // Try to read the default version + const fs = require('fs'); + const path = require('path'); + const defaultVersionFile = path.join(nvmDir, 'alias', 'default'); + if (fs.existsSync(defaultVersionFile)) { + const version = fs.readFileSync(defaultVersionFile, 'utf8').trim(); + const nodePath = path.join(nvmDir, 'versions', 'node', `v${version.replace(/^v/, '')}`, 'bin'); + if (fs.existsSync(nodePath)) { + nvmNodePath = nodePath; + } + } + } catch { + // Ignore errors - nvm might not be installed + } + + const allPaths = nvmNodePath ? `${nvmNodePath}:${standardPaths}` : standardPaths; + env.PATH = env.PATH ? `${allPaths}:${env.PATH}` : allPaths; + } + + // Spawn the agent process + this.process = spawn(this.config.command, this.config.args, { + cwd: this.config.cwd, + env, + stdio: ['pipe', 'pipe', 'pipe'], + }); + + if (!this.process.stdout || !this.process.stdin) { + throw new Error('Failed to create agent process with stdio'); + } + + // Set up readline for line-by-line JSON-RPC parsing + this.readline = createInterface({ + input: this.process.stdout, + crlfDelay: Infinity, + }); + + this.readline.on('line', (line) => this.handleLine(line)); + + this.process.stderr?.on('data', (data) => { + logger.warn(`Agent stderr: ${data.toString()}`, LOG_CONTEXT); + }); + + this.process.on('close', (code) => { + logger.info(`Agent process exited with code ${code}`, LOG_CONTEXT); + this.handleDisconnect(); + }); + + this.process.on('error', (error) => { + logger.error(`Agent process error: ${error.message}`, LOG_CONTEXT); + this.emit('error', error); + }); + + // Send initialize request + const initRequest: InitializeRequest = { + protocolVersion: CURRENT_PROTOCOL_VERSION, + clientInfo: this.config.clientInfo || { + name: 'maestro', + version: '0.12.0', + title: 'Maestro', + }, + clientCapabilities: this.config.clientCapabilities || { + fs: { + readTextFile: true, + writeTextFile: true, + }, + terminal: true, + }, + }; + + const response = (await this.sendRequest('initialize', initRequest)) as InitializeResponse; + + this.agentCapabilities = response.agentCapabilities || null; + this.agentInfo = response.agentInfo || null; + this.isConnected = true; + + logger.info( + `Connected to agent: ${this.agentInfo?.name || 'unknown'} v${this.agentInfo?.version || '?'}`, + LOG_CONTEXT + ); + + return response; + } + + /** + * Disconnect from the agent + */ + disconnect(): void { + if (this.process) { + this.process.kill(); + this.process = null; + } + this.handleDisconnect(); + } + + /** + * Create a new session + */ + async newSession(cwd: string): Promise { + const request: NewSessionRequest = { + cwd, + mcpServers: [], // No MCP servers for now + }; + return (await this.sendRequest('session/new', request)) as NewSessionResponse; + } + + /** + * Load an existing session + */ + async loadSession(sessionId: SessionId, cwd: string): Promise { + if (!this.agentCapabilities?.loadSession) { + throw new Error('Agent does not support loading sessions'); + } + const request: LoadSessionRequest = { + sessionId, + cwd, + mcpServers: [], + }; + return (await this.sendRequest('session/load', request)) as LoadSessionResponse; + } + + /** + * Send a prompt to the agent + * + * Note: ACP ContentBlock format is { type: 'text', text: 'content' } + * not { text: { text: 'content' } } as the union type suggests + */ + async prompt(sessionId: SessionId, text: string): Promise { + // ACP uses a simpler content block format for text + const contentBlock = { + type: 'text', + text, + }; + const request: PromptRequest = { + sessionId, + prompt: [contentBlock as unknown as ContentBlock], + }; + return (await this.sendRequest('session/prompt', request)) as PromptResponse; + } + + /** + * Send a prompt with images + */ + async promptWithImages( + sessionId: SessionId, + text: string, + images: Array<{ data: string; mimeType: string }> + ): Promise { + const contentBlocks: ContentBlock[] = [{ text: { text } }]; + + for (const image of images) { + contentBlocks.push({ + image: { + data: image.data, + mimeType: image.mimeType, + }, + }); + } + + const request: PromptRequest = { + sessionId, + prompt: contentBlocks, + }; + return (await this.sendRequest('session/prompt', request)) as PromptResponse; + } + + /** + * Cancel ongoing operations for a session + */ + cancel(sessionId: SessionId): void { + const notification: CancelNotification = { sessionId }; + this.sendNotification('session/cancel', notification); + } + + /** + * Get agent capabilities + */ + getAgentCapabilities(): AgentCapabilities | null { + return this.agentCapabilities; + } + + /** + * Get agent info + */ + getAgentInfo(): Implementation | null { + return this.agentInfo; + } + + /** + * Check if connected + */ + getIsConnected(): boolean { + return this.isConnected; + } + + // ============================================================================ + // Private methods + // ============================================================================ + + private handleLine(line: string): void { + if (!line.trim()) return; + + try { + const message = JSON.parse(line); + + // Log all inbound messages at transport layer + if ('id' in message && message.id !== null) { + if ('result' in message || 'error' in message) { + // Response to our request + logger.debug('[INBOUND RESPONSE]', '[ACP Transport]', { id: message.id, hasResult: 'result' in message, hasError: 'error' in message, data: message }); + this.handleResponse(message as JsonRpcResponse); + } else { + // Request from the agent to us + logger.debug('[INBOUND REQUEST]', '[ACP Transport]', { method: message.method, id: message.id, data: message }); + this.handleAgentRequest(message as JsonRpcRequest); + } + } else if ('method' in message) { + // Notification + logger.debug('[INBOUND NOTIFICATION]', '[ACP Transport]', { method: message.method, data: message }); + this.handleNotification(message as JsonRpcNotification); + } + } catch (error) { + logger.error(`Failed to parse JSON-RPC message: ${line}`, LOG_CONTEXT); + } + } + + private handleResponse(response: JsonRpcResponse): void { + const pending = this.pendingRequests.get(response.id); + if (!pending) { + logger.warn(`Received response for unknown request: ${response.id}`, LOG_CONTEXT); + return; + } + + this.pendingRequests.delete(response.id); + + if (response.error) { + pending.reject(new Error(`${response.error.message} (code: ${response.error.code})`)); + } else { + pending.resolve(response.result); + } + } + + private handleNotification(notification: JsonRpcNotification): void { + switch (notification.method) { + case 'session/update': { + const params = notification.params as SessionNotification; + // OpenCode uses a slightly different format: { sessionUpdate: 'type', ...data } + // Convert to standard format if needed + const update = this.normalizeSessionUpdate(params.update || params); + this.emit('session:update', params.sessionId, update); + break; + } + default: + logger.debug(`Unhandled notification: ${notification.method}`, LOG_CONTEXT); + } + } + + /** + * Normalize session update format from OpenCode + * OpenCode sends: { sessionUpdate: 'agent_message_chunk', content: {...} } + * We need: { agent_message_chunk: { content: {...} } } + */ + private normalizeSessionUpdate(update: unknown): SessionUpdate { + const raw = update as Record; + + if ('sessionUpdate' in raw) { + const updateType = raw.sessionUpdate as string; + const { sessionUpdate, ...rest } = raw; + + // Convert to standard ACP format + return { [updateType]: rest } as unknown as SessionUpdate; + } + + return update as SessionUpdate; + } + + private handleAgentRequest(request: JsonRpcRequest): void { + const respond = (result: unknown) => { + this.sendResponse(request.id, result); + }; + + const respondError = (code: number, message: string) => { + this.sendErrorResponse(request.id, code, message); + }; + + switch (request.method) { + case 'session/request_permission': { + const params = request.params as RequestPermissionRequest; + this.emit('session:permission_request', params, (response: RequestPermissionResponse) => { + respond(response); + }); + break; + } + case 'fs/read_text_file': { + const params = request.params as ReadTextFileRequest; + this.emit('fs:read', params, (response: ReadTextFileResponse) => { + respond(response); + }); + break; + } + case 'fs/write_text_file': { + const params = request.params as WriteTextFileRequest; + this.emit('fs:write', params, (response: WriteTextFileResponse) => { + respond(response); + }); + break; + } + case 'terminal/create': { + const params = request.params as CreateTerminalRequest; + this.emit('terminal:create', params, (response: CreateTerminalResponse) => { + respond(response); + }); + break; + } + case 'terminal/output': { + const params = request.params as TerminalOutputRequest; + this.emit('terminal:output', params, (response: TerminalOutputResponse) => { + respond(response); + }); + break; + } + default: + logger.warn(`Unhandled agent request: ${request.method}`, LOG_CONTEXT); + respondError(-32601, `Method not found: ${request.method}`); + } + } + + private sendRequest(method: string, params: unknown): Promise { + return new Promise((resolve, reject) => { + const id = ++this.requestId; + const request: JsonRpcRequest = { + jsonrpc: '2.0', + id, + method, + params, + }; + + this.pendingRequests.set(id, { resolve, reject, method }); + + const line = JSON.stringify(request) + '\n'; + logger.debug(`Sending request: ${method} (id: ${id})`, LOG_CONTEXT); + logger.debug('[OUTBOUND REQUEST]', '[ACP Transport]', { method, id, data: request }); + + if (!this.process?.stdin?.writable) { + reject(new Error('Agent process is not writable')); + return; + } + + this.process.stdin.write(line); + }); + } + + private sendNotification(method: string, params: unknown): void { + const notification: JsonRpcNotification = { + jsonrpc: '2.0', + method, + params, + }; + + const line = JSON.stringify(notification) + '\n'; + logger.debug(`Sending notification: ${method}`, LOG_CONTEXT); + logger.debug('[OUTBOUND NOTIFICATION]', '[ACP Transport]', { method, data: notification }); + + if (this.process?.stdin?.writable) { + this.process.stdin.write(line); + } + } + + private sendResponse(id: RequestId, result: unknown): void { + const response: JsonRpcResponse = { + jsonrpc: '2.0', + id, + result, + }; + + const line = JSON.stringify(response) + '\n'; + logger.debug('[OUTBOUND RESPONSE]', '[ACP Transport]', { id, data: response }); + + if (this.process?.stdin?.writable) { + this.process.stdin.write(line); + } + } + + private sendErrorResponse(id: RequestId, code: number, message: string): void { + const response: JsonRpcResponse = { + jsonrpc: '2.0', + id, + error: { code, message }, + }; + + const line = JSON.stringify(response) + '\n'; + logger.debug('[OUTBOUND ERROR RESPONSE]', '[ACP Transport]', { id, code, message, data: response }); + + if (this.process?.stdin?.writable) { + this.process.stdin.write(line); + } + } + + private handleDisconnect(): void { + this.isConnected = false; + this.readline?.close(); + this.readline = null; + + // Reject all pending requests + for (const [id, pending] of this.pendingRequests) { + pending.reject(new Error('Connection closed')); + this.pendingRequests.delete(id); + } + + this.emit('disconnected'); + } +} diff --git a/src/main/acp/acp-process.ts b/src/main/acp/acp-process.ts new file mode 100644 index 000000000..4b5806674 --- /dev/null +++ b/src/main/acp/acp-process.ts @@ -0,0 +1,407 @@ +/** + * ACP Process Wrapper + * + * Wraps an ACPClient to provide the same interface as ProcessManager + * for ACP-enabled agents like OpenCode. + * + * This allows seamless switching between: + * - Standard mode: spawn process → parse stdout JSON + * - ACP mode: ACPClient → JSON-RPC over stdio + */ + +import { EventEmitter } from 'events'; +import { ACPClient, type ACPClientConfig } from './acp-client'; +import { + acpUpdateToParseEvent, + createSessionIdEvent, + createResultEvent, +} from './acp-adapter'; +import type { SessionUpdate, SessionId } from './types'; +import { logger } from '../utils/logger'; + +const LOG_CONTEXT = '[ACPProcess]'; + +export interface ACPProcessConfig { + /** Maestro session ID */ + sessionId: string; + /** Agent type (e.g., 'opencode') */ + toolType: string; + /** Working directory */ + cwd: string; + /** Command to run (e.g., 'opencode') */ + command: string; + /** Initial prompt to send */ + prompt?: string; + /** Images to include with prompt */ + images?: Array<{ data: string; mimeType: string }>; + /** ACP session ID for resume */ + acpSessionId?: string; + /** Custom environment variables */ + customEnvVars?: Record; + /** Context window size */ + contextWindow?: number; +} + +/** + * Represents an ACP-based agent process. + * Emits the same events as ProcessManager for compatibility: + * - 'data': parsed event data + * - 'exit': process exit + * - 'agent-error': agent errors + */ +export class ACPProcess extends EventEmitter { + private client: ACPClient; + private config: ACPProcessConfig; + private acpSessionId: SessionId | null = null; + private streamedText = ''; // Text accumulated during current prompt + private emittedTextLength = 0; // Track how much text we've emitted (for deduplication within a response) + private totalAccumulatedText = ''; // All text across all prompts (for cross-prompt deduplication) + private startTime: number; + private isLoadingSession = false; // Track if we're loading a session (to ignore historical messages) + + constructor(config: ACPProcessConfig) { + super(); + this.config = config; + this.startTime = Date.now(); + + // Create ACP client + const clientConfig: ACPClientConfig = { + command: config.command, + args: ['acp'], // ACP mode + cwd: config.cwd, + env: config.customEnvVars, + clientInfo: { + name: 'maestro', + version: '0.12.0', + title: 'Maestro', + }, + clientCapabilities: { + fs: { + readTextFile: true, + writeTextFile: true, + }, + terminal: true, + }, + }; + + this.client = new ACPClient(clientConfig); + + // Wire up ACP events + this.setupEventHandlers(); + } + + /** + * Get the PID of the spawned OpenCode process + */ + get pid(): number { + const process = this.client.getProcess(); + return process?.pid ?? -1; + } + + /** + * Get session info for compatibility + */ + getInfo(): { + sessionId: string; + toolType: string; + pid: number; + cwd: string; + isTerminal: boolean; + isBatchMode: boolean; + startTime: number; + command: string; + args: string[]; + } { + return { + sessionId: this.config.sessionId, + toolType: this.config.toolType, + pid: this.pid, + cwd: this.config.cwd, + isTerminal: false, + isBatchMode: true, + startTime: this.startTime, + command: this.config.command, + args: ['acp'], + }; + } + + /** + * Connect to ACP agent and run the initial prompt + */ + async start(): Promise<{ pid: number; success: boolean }> { + try { + logger.info(`Starting ACP process for ${this.config.toolType}`, LOG_CONTEXT, { + sessionId: this.config.sessionId, + command: this.config.command, + hasPrompt: !!this.config.prompt, + }); + + // Connect to the ACP agent + const initResponse = await this.client.connect(); + + logger.info(`ACP connected to ${initResponse.agentInfo?.name}`, LOG_CONTEXT, { + version: initResponse.agentInfo?.version, + protocolVersion: initResponse.protocolVersion, + }); + + // Create or load session + if (this.config.acpSessionId) { + // Resume existing session + // Set flag to ignore historical messages during session load + this.isLoadingSession = true; + await this.client.loadSession(this.config.acpSessionId, this.config.cwd); + this.acpSessionId = this.config.acpSessionId; + // Clear flag after session load completes + this.isLoadingSession = false; + logger.debug('Session loaded, ignoring historical messages received during load', LOG_CONTEXT); + } else { + // Create new session + const sessionResponse = await this.client.newSession(this.config.cwd); + this.acpSessionId = sessionResponse.sessionId; + } + + // Emit session_id event + const sessionIdEvent = createSessionIdEvent(this.acpSessionId); + this.emit('data', this.config.sessionId, sessionIdEvent); + + // If we have a prompt, send it + if (this.config.prompt) { + this.sendPrompt(this.config.prompt); + } + + return { pid: this.pid, success: true }; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + logger.error(`Failed to start ACP process: ${message}`, LOG_CONTEXT); + + this.emit('agent-error', this.config.sessionId, { + type: 'unknown', + message, + recoverable: false, + }); + this.emit('exit', this.config.sessionId, 1); + + return { pid: -1, success: false }; + } + } + + /** + * Send a prompt to the agent + */ + async sendPrompt(text: string): Promise { + if (!this.acpSessionId) { + logger.error('Cannot send prompt: no ACP session', LOG_CONTEXT); + return; + } + + // Clear streamed text for new prompt (but keep totalAccumulatedText for cross-prompt dedup) + this.streamedText = ''; + this.emittedTextLength = 0; // Reset emission tracker for new prompt + + try { + logger.debug(`Sending prompt to ACP agent`, LOG_CONTEXT, { + sessionId: this.config.sessionId, + promptLength: text.length, + }); + + let response; + if (this.config.images && this.config.images.length > 0) { + response = await this.client.promptWithImages( + this.acpSessionId, + text, + this.config.images + ); + } else { + response = await this.client.prompt(this.acpSessionId, text); + } + + logger.debug('Received prompt response from ACP agent', LOG_CONTEXT, { + stopReason: response.stopReason, + hasUsage: !!response.usage, + usage: response.usage, + }); + + // Workaround for OpenCode bug: Remove previous response if it's repeated + // OpenCode may send cumulative text (all previous messages + new message) + let finalText = this.streamedText; + if (this.totalAccumulatedText && finalText.startsWith(this.totalAccumulatedText)) { + // Agent repeated the previous response - extract only the new part + const newContent = finalText.substring(this.totalAccumulatedText.length); + logger.debug('Detected cumulative response, extracting delta', LOG_CONTEXT, { + previousLength: this.totalAccumulatedText.length, + totalLength: finalText.length, + deltaLength: newContent.length, + }); + finalText = newContent; + } + + // Update total accumulated text for next deduplication check + this.totalAccumulatedText += finalText; + + // Emit final result event to signal completion + // Include finalText (deduplicated) so ProcessManager can emit it if streaming was disabled + const resultEvent = createResultEvent( + this.config.sessionId, + finalText, // Use deduplicated text + response.stopReason, + response.usage // Include usage stats from response + ); + this.emit('data', this.config.sessionId, resultEvent); + + // If stop reason indicates completion, emit exit + if (response.stopReason === 'end_turn' || response.stopReason === 'cancelled') { + this.emit('exit', this.config.sessionId, 0); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + logger.error(`ACP prompt failed: ${message}`, LOG_CONTEXT); + + this.emit('agent-error', this.config.sessionId, { + type: 'unknown', + message, + recoverable: false, + }); + } + } + + /** + * Write data to the agent (for follow-up prompts) + */ + async write(data: string): Promise { + // In ACP mode, writing is sending a new prompt + await this.sendPrompt(data); + } + + /** + * Cancel ongoing operations + */ + cancel(): void { + if (this.acpSessionId) { + this.client.cancel(this.acpSessionId); + } + } + + /** + * Kill the ACP process + */ + kill(): void { + this.client.disconnect(); + this.emit('exit', this.config.sessionId, 0); + } + + /** + * Interrupt (same as cancel for ACP) + */ + interrupt(): void { + this.cancel(); + } + + /** + * Set up event handlers for ACP client + */ + private setupEventHandlers(): void { + // Handle session updates + this.client.on('session:update', (sessionId: SessionId, update: SessionUpdate) => { + // Ignore updates during session load - these are historical messages + if (this.isLoadingSession) { + logger.debug('Ignoring session update during session load (historical message)', LOG_CONTEXT, { + updateKeys: Object.keys(update) + }); + return; + } + + const event = acpUpdateToParseEvent(sessionId, update); + if (event) { + // Accumulate text for final result + if (event.type === 'text' && event.text) { + this.streamedText += event.text; + + // Deduplication: Check against totalAccumulatedText (cross-prompt) + within-prompt tracking + // Calculate the absolute position in the total accumulated text + const absolutePosition = this.totalAccumulatedText.length + this.emittedTextLength; + const totalCurrentLength = this.totalAccumulatedText.length + this.streamedText.length; + + if (totalCurrentLength > absolutePosition) { + // Extract only the new portion that hasn't been emitted yet + const fullText = this.totalAccumulatedText + this.streamedText; + const newText = fullText.substring(absolutePosition); + this.emittedTextLength = this.streamedText.length; // Update within-prompt tracker + + // Emit only the delta + const deltaEvent = { ...event, text: newText }; + this.emit('data', this.config.sessionId, deltaEvent); + } + // Skip emitting if we've already sent this text + } else { + // Non-text events - emit as-is + this.emit('data', this.config.sessionId, event); + } + } + }); + + // Handle permission requests (auto-approve in YOLO mode) + this.client.on('session:permission_request', (request, respond) => { + logger.debug(`ACP permission request: ${request.toolCall.title}`, LOG_CONTEXT); + + // Find allow option and auto-approve + const allowOption = request.options.find( + (o: { kind: string; optionId: string }) => o.kind === 'allow_once' || o.kind === 'allow_always' + ); + if (allowOption) { + respond({ outcome: { selected: { optionId: allowOption.optionId } } }); + } else { + respond({ outcome: { cancelled: {} } }); + } + }); + + // Handle file system read requests + this.client.on('fs:read', async (request, respond) => { + try { + const fs = await import('fs'); + const content = fs.readFileSync(request.path, 'utf-8'); + respond({ content }); + } catch { + logger.error(`Failed to read file: ${request.path}`, LOG_CONTEXT); + respond({ content: '' }); + } + }); + + // Handle file system write requests + this.client.on('fs:write', async (request, respond) => { + try { + const fs = await import('fs'); + fs.writeFileSync(request.path, request.content, 'utf-8'); + respond({}); + } catch { + logger.error(`Failed to write file: ${request.path}`, LOG_CONTEXT); + respond({}); + } + }); + + // Handle disconnection + this.client.on('disconnected', () => { + logger.info('ACP client disconnected', LOG_CONTEXT); + }); + + // Handle errors + this.client.on('error', (error) => { + logger.error(`ACP client error: ${error.message}`, LOG_CONTEXT); + this.emit('agent-error', this.config.sessionId, { + type: 'unknown', + message: error.message, + recoverable: false, + }); + }); + } +} + +/** + * Create and start an ACP process + */ +export async function spawnACPProcess( + config: ACPProcessConfig +): Promise<{ process: ACPProcess; pid: number; success: boolean }> { + const process = new ACPProcess(config); + const result = await process.start(); + return { process, ...result }; +} diff --git a/src/main/acp/index.ts b/src/main/acp/index.ts new file mode 100644 index 000000000..403fa3f7e --- /dev/null +++ b/src/main/acp/index.ts @@ -0,0 +1,19 @@ +/** + * ACP (Agent Client Protocol) Module + * + * Provides standardized communication with ACP-compatible agents like OpenCode. + * This enables Maestro to use a single protocol for all agents instead of + * custom parsers for each agent type. + * + * @see https://agentclientprotocol.com/ + */ + +export { ACPClient, type ACPClientConfig, type ACPClientEvents } from './acp-client'; +export * from './types'; +export { + acpUpdateToParseEvent, + createSessionIdEvent, + createResultEvent, + createErrorEvent, +} from './acp-adapter'; +export { ACPProcess, spawnACPProcess, type ACPProcessConfig } from './acp-process'; diff --git a/src/main/acp/types.ts b/src/main/acp/types.ts new file mode 100644 index 000000000..228bfff77 --- /dev/null +++ b/src/main/acp/types.ts @@ -0,0 +1,456 @@ +/** + * ACP (Agent Client Protocol) Type Definitions + * + * Based on the ACP specification at https://agentclientprotocol.com/protocol/schema + * These types define the JSON-RPC messages for communicating with ACP-compatible agents. + */ + +// ============================================================================ +// Core JSON-RPC Types +// ============================================================================ + +export type RequestId = string | number | null; + +export interface JsonRpcRequest { + jsonrpc: '2.0'; + id: RequestId; + method: string; + params?: unknown; +} + +export interface JsonRpcResponse { + jsonrpc: '2.0'; + id: RequestId; + result?: unknown; + error?: JsonRpcError; +} + +export interface JsonRpcNotification { + jsonrpc: '2.0'; + method: string; + params?: unknown; +} + +export interface JsonRpcError { + code: number; + message: string; + data?: unknown; +} + +// ============================================================================ +// Protocol Version +// ============================================================================ + +export type ProtocolVersion = number; +export const CURRENT_PROTOCOL_VERSION: ProtocolVersion = 1; + +// ============================================================================ +// Implementation Info +// ============================================================================ + +export interface Implementation { + name: string; + version: string; + title?: string; +} + +// ============================================================================ +// Capabilities +// ============================================================================ + +export interface ClientCapabilities { + fs?: { + readTextFile?: boolean; + writeTextFile?: boolean; + }; + terminal?: boolean; +} + +export interface AgentCapabilities { + loadSession?: boolean; + mcpCapabilities?: { + http?: boolean; + sse?: boolean; + }; + promptCapabilities?: { + audio?: boolean; + embeddedContext?: boolean; + image?: boolean; + }; + sessionCapabilities?: Record; +} + +// ============================================================================ +// Initialize +// ============================================================================ + +export interface InitializeRequest { + protocolVersion: ProtocolVersion; + clientInfo?: Implementation; + clientCapabilities?: ClientCapabilities; +} + +export interface InitializeResponse { + protocolVersion: ProtocolVersion; + agentInfo?: Implementation; + agentCapabilities?: AgentCapabilities; + authMethods?: AuthMethod[]; +} + +export interface AuthMethod { + id: string; + name: string; + description?: string; +} + +// ============================================================================ +// Session Management +// ============================================================================ + +export type SessionId = string; + +export interface McpServerStdio { + name: string; + command: string; + args: string[]; + env: EnvVariable[]; +} + +export interface EnvVariable { + name: string; + value: string; +} + +export type McpServer = { stdio: McpServerStdio }; + +export interface NewSessionRequest { + cwd: string; + mcpServers: McpServer[]; +} + +export interface NewSessionResponse { + sessionId: SessionId; + modes?: SessionModeState; +} + +export interface LoadSessionRequest { + sessionId: SessionId; + cwd: string; + mcpServers: McpServer[]; +} + +export interface LoadSessionResponse { + modes?: SessionModeState; +} + +export interface SessionModeState { + availableModes: SessionMode[]; + currentModeId: SessionModeId; +} + +export type SessionModeId = string; + +export interface SessionMode { + id: SessionModeId; + name: string; + description?: string; +} + +// ============================================================================ +// Content Blocks +// ============================================================================ + +export interface TextContent { + text: string; + annotations?: Annotations; +} + +export interface ImageContent { + data: string; + mimeType: string; + uri?: string; + annotations?: Annotations; +} + +export interface ResourceLink { + uri: string; + name: string; + mimeType?: string; + description?: string; + title?: string; + size?: number; + annotations?: Annotations; +} + +export interface EmbeddedResource { + resource: TextResourceContents | BlobResourceContents; + annotations?: Annotations; +} + +export interface TextResourceContents { + uri: string; + text: string; + mimeType?: string; +} + +export interface BlobResourceContents { + uri: string; + blob: string; + mimeType?: string; +} + +export interface Annotations { + audience?: Role[]; + lastModified?: string; + priority?: number; +} + +export type Role = 'assistant' | 'user'; + +export type ContentBlock = + | { text: TextContent } + | { image: ImageContent } + | { resource_link: ResourceLink } + | { resource: EmbeddedResource }; + +// ============================================================================ +// Prompt +// ============================================================================ + +export interface PromptRequest { + sessionId: SessionId; + prompt: ContentBlock[]; +} + +export type StopReason = + | 'end_turn' + | 'max_tokens' + | 'max_turn_requests' + | 'refusal' + | 'cancelled'; + +export interface PromptResponse { + stopReason: StopReason; + usage?: { + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + }; +} + +// ============================================================================ +// Session Updates (Notifications) +// ============================================================================ + +export interface SessionNotification { + sessionId: SessionId; + update: SessionUpdate; +} + +export type SessionUpdate = + | { user_message_chunk: ContentChunk } + | { agent_message_chunk: ContentChunk } + | { agent_thought_chunk: ContentChunk } + | { tool_call: ToolCall } + | { tool_call_update: ToolCallUpdate } + | { plan: Plan } + | { available_commands_update: AvailableCommandsUpdate } + | { current_mode_update: CurrentModeUpdate }; + +export interface ContentChunk { + content: ContentBlock; +} + +// ============================================================================ +// Tool Calls +// ============================================================================ + +export type ToolCallId = string; + +export type ToolKind = + | 'read' + | 'edit' + | 'delete' + | 'move' + | 'search' + | 'execute' + | 'think' + | 'fetch' + | 'switch_mode' + | 'other'; + +export type ToolCallStatus = 'pending' | 'in_progress' | 'completed' | 'failed'; + +export interface ToolCall { + toolCallId: ToolCallId; + title: string; + kind?: ToolKind; + status?: ToolCallStatus; + rawInput?: unknown; + rawOutput?: unknown; + content?: ToolCallContent[]; + locations?: ToolCallLocation[]; +} + +export interface ToolCallUpdate { + toolCallId: ToolCallId; + title?: string; + kind?: ToolKind; + status?: ToolCallStatus; + rawInput?: unknown; + rawOutput?: unknown; + content?: ToolCallContent[]; + locations?: ToolCallLocation[]; +} + +export type ToolCallContent = + | { content: { content: ContentBlock } } + | { diff: Diff } + | { terminal: Terminal }; + +export interface Diff { + path: string; + oldText?: string; + newText: string; +} + +export interface Terminal { + terminalId: string; +} + +export interface ToolCallLocation { + path: string; + line?: number; +} + +// ============================================================================ +// Plan +// ============================================================================ + +export interface Plan { + entries: PlanEntry[]; +} + +export interface PlanEntry { + content: string; + priority: PlanEntryPriority; + status: PlanEntryStatus; +} + +export type PlanEntryPriority = 'high' | 'medium' | 'low'; +export type PlanEntryStatus = 'pending' | 'in_progress' | 'completed'; + +// ============================================================================ +// Commands +// ============================================================================ + +export interface AvailableCommandsUpdate { + availableCommands: AvailableCommand[]; +} + +export interface AvailableCommand { + name: string; + description: string; + input?: AvailableCommandInput; +} + +export interface AvailableCommandInput { + hint: string; +} + +export interface CurrentModeUpdate { + currentModeId: SessionModeId; +} + +// ============================================================================ +// Permission Request (Client → Agent response) +// ============================================================================ + +export interface RequestPermissionRequest { + sessionId: SessionId; + toolCall: ToolCallUpdate; + options: PermissionOption[]; +} + +export interface PermissionOption { + optionId: PermissionOptionId; + name: string; + kind: PermissionOptionKind; +} + +export type PermissionOptionId = string; +export type PermissionOptionKind = 'allow_once' | 'allow_always' | 'reject_once' | 'reject_always'; + +export interface RequestPermissionResponse { + outcome: RequestPermissionOutcome; +} + +export type RequestPermissionOutcome = + | { cancelled: Record } + | { selected: { optionId: PermissionOptionId } }; + +// ============================================================================ +// File System (Client methods) +// ============================================================================ + +export interface ReadTextFileRequest { + sessionId: SessionId; + path: string; + line?: number; + limit?: number; +} + +export interface ReadTextFileResponse { + content: string; +} + +export interface WriteTextFileRequest { + sessionId: SessionId; + path: string; + content: string; +} + +export interface WriteTextFileResponse { + // Empty response +} + +// ============================================================================ +// Terminal (Client methods) +// ============================================================================ + +export interface CreateTerminalRequest { + sessionId: SessionId; + command: string; + args?: string[]; + cwd?: string; + env?: EnvVariable[]; + outputByteLimit?: number; +} + +export interface CreateTerminalResponse { + terminalId: string; +} + +export interface TerminalOutputRequest { + sessionId: SessionId; + terminalId: string; +} + +export interface TerminalOutputResponse { + output: string; + truncated: boolean; + exitStatus?: TerminalExitStatus; +} + +export interface TerminalExitStatus { + exitCode?: number; + signal?: string; +} + +// ============================================================================ +// Cancel +// ============================================================================ + +export interface CancelNotification { + sessionId: SessionId; +} diff --git a/src/main/agent-capabilities.ts b/src/main/agent-capabilities.ts index 96a9acee8..8f36895d7 100644 --- a/src/main/agent-capabilities.ts +++ b/src/main/agent-capabilities.ts @@ -63,6 +63,9 @@ export interface AgentCapabilities { /** Agent emits streaming thinking/reasoning content that can be displayed */ supportsThinkingDisplay: boolean; + + /** Agent supports ACP (Agent Client Protocol) for standardized communication */ + supportsACP: boolean; } /** @@ -87,6 +90,7 @@ export const DEFAULT_CAPABILITIES: AgentCapabilities = { supportsModelSelection: false, supportsStreamJsonInput: false, supportsThinkingDisplay: false, + supportsACP: false, }; /** @@ -123,6 +127,7 @@ export const AGENT_CAPABILITIES: Record = { supportsModelSelection: false, // Model is configured via Anthropic account supportsStreamJsonInput: true, // --input-format stream-json for images via stdin supportsThinkingDisplay: true, // Emits streaming assistant messages + supportsACP: false, // ACP adapter available via @zed-industries/claude-code-acp but not native }, /** @@ -147,6 +152,7 @@ export const AGENT_CAPABILITIES: Record = { supportsModelSelection: false, supportsStreamJsonInput: false, supportsThinkingDisplay: false, // Terminal is not an AI agent + supportsACP: false, }, /** @@ -174,6 +180,7 @@ export const AGENT_CAPABILITIES: Record = { supportsModelSelection: true, // -m, --model flag - Documented supportsStreamJsonInput: false, // Uses -i, --image flag instead supportsThinkingDisplay: true, // Emits reasoning tokens (o3/o4-mini) + supportsACP: false, // ACP adapter available via Zed but not native }, /** @@ -200,6 +207,7 @@ export const AGENT_CAPABILITIES: Record = { supportsModelSelection: false, // Not yet investigated supportsStreamJsonInput: false, supportsThinkingDisplay: false, // Not yet investigated + supportsACP: false, // Not investigated }, /** @@ -226,6 +234,7 @@ export const AGENT_CAPABILITIES: Record = { supportsModelSelection: false, // Not yet investigated supportsStreamJsonInput: false, supportsThinkingDisplay: false, // Not yet investigated + supportsACP: false, // Not investigated }, /** @@ -253,6 +262,7 @@ export const AGENT_CAPABILITIES: Record = { supportsModelSelection: true, // --model flag supportsStreamJsonInput: false, supportsThinkingDisplay: false, // Not yet investigated + supportsACP: false, // Not investigated }, /** @@ -280,6 +290,7 @@ export const AGENT_CAPABILITIES: Record = { supportsModelSelection: true, // --model provider/model (e.g., 'ollama/qwen3:8b') - Verified supportsStreamJsonInput: false, // Uses -f, --file flag instead supportsThinkingDisplay: true, // Emits streaming text chunks + supportsACP: true, // Native ACP via `opencode acp` command - Verified }, }; diff --git a/src/main/agent-detector.ts b/src/main/agent-detector.ts index 48e7b1233..5aab7d085 100644 --- a/src/main/agent-detector.ts +++ b/src/main/agent-detector.ts @@ -153,6 +153,20 @@ const AGENT_DEFINITIONS: Omit) => { app.whenReady().then(async () => { // Load logger settings first - const logLevel = store.get('logLevel', 'info'); + // Environment variable takes precedence over settings (useful for development) + const envLogLevel = process.env.MAESTRO_LOG_LEVEL as 'debug' | 'info' | 'warn' | 'error' | undefined; + const logLevel = envLogLevel || store.get('logLevel', 'info'); logger.setLogLevel(logLevel); const maxLogBuffer = store.get('maxLogBuffer', 1000); logger.setMaxLogBuffer(maxLogBuffer); @@ -681,7 +683,8 @@ app.whenReady().then(async () => { logger.info('Maestro application starting', 'Startup', { version: app.getVersion(), platform: process.platform, - logLevel + logLevel, + ...(envLogLevel && { logLevelSource: 'env' }) }); // Initialize core services diff --git a/src/main/ipc/handlers/process.ts b/src/main/ipc/handlers/process.ts index 4ac13d4be..c28d5d5a5 100644 --- a/src/main/ipc/handlers/process.ts +++ b/src/main/ipc/handlers/process.ts @@ -173,25 +173,49 @@ export function registerProcessHandlers(deps: ProcessHandlerDependencies): void ? finalArgs[sessionArgIndex + 1] : config.agentSessionId; + // Get contextWindow: session-level override takes priority over agent-level config + // Falls back to the agent's configOptions default (e.g., 400000 for Codex, 128000 for OpenCode) + const contextWindow = getContextWindowValue(agent, agentConfigValues, config.sessionCustomContextWindow); + + // Check if ACP mode is enabled for this agent (currently only OpenCode supports it) + const supportsACP = agent?.capabilities?.supportsACP; + const useACPConfig = agentConfigValues.useACP; + const useACP = supportsACP && useACPConfig === true; + const acpShowStreaming = agentConfigValues.acpShowStreaming === true; + logger.debug('ACP mode check', LOG_CONTEXT, { + toolType: config.toolType, + supportsACP, + useACPConfig, + useACP, + acpShowStreaming, + agentConfigValues + }); + + // Build command string for logging (ACP vs CLI format) + const fullCommand = useACP && config.prompt + ? `${config.command} acp` + : `${config.command} ${finalArgs.join(' ')}`; + logger.info(`Spawning process: ${config.command}`, LOG_CONTEXT, { sessionId: config.sessionId, toolType: config.toolType, cwd: config.cwd, command: config.command, - fullCommand: `${config.command} ${finalArgs.join(' ')}`, - args: finalArgs, + fullCommand, + args: useACP && config.prompt ? ['acp'] : finalArgs, requiresPty: agent?.requiresPty || false, shell: shellToUse, ...(agentSessionId && { agentSessionId }), ...(config.readOnlyMode && { readOnlyMode: true }), ...(config.yoloMode && { yoloMode: true }), ...(config.modelId && { modelId: config.modelId }), - ...(config.prompt && { prompt: config.prompt.length > 500 ? config.prompt.substring(0, 500) + '...' : config.prompt }) + ...(config.prompt && { prompt: config.prompt.length > 500 ? config.prompt.substring(0, 500) + '...' : config.prompt }), + ...(useACP && { useACP: true, acpShowStreaming }) }); - // Get contextWindow: session-level override takes priority over agent-level config - // Falls back to the agent's configOptions default (e.g., 400000 for Codex, 128000 for OpenCode) - const contextWindow = getContextWindowValue(agent, agentConfigValues, config.sessionCustomContextWindow); + if (useACP) { + logger.info('ACP mode enabled for agent', LOG_CONTEXT, { toolType: config.toolType, acpShowStreaming }); + } const result = processManager.spawn({ ...config, @@ -205,6 +229,9 @@ export function registerProcessHandlers(deps: ProcessHandlerDependencies): void customEnvVars: effectiveCustomEnvVars, // Pass custom env vars (session-level or agent-level) imageArgs: agent?.imageArgs, // Function to build image CLI args (for Codex, OpenCode) noPromptSeparator: agent?.noPromptSeparator, // OpenCode doesn't support '--' before prompt + useACP, // Use ACP protocol if enabled in agent config + acpShowStreaming, // Show streaming output in ACP mode + acpSessionId: config.agentSessionId, // ACP session ID for resume }); logger.info(`Process spawned successfully`, LOG_CONTEXT, { diff --git a/src/main/process-manager.ts b/src/main/process-manager.ts index f11020cdd..b9ccc0873 100644 --- a/src/main/process-manager.ts +++ b/src/main/process-manager.ts @@ -10,6 +10,7 @@ import { getOutputParser, type ParsedEvent, type AgentOutputParser } from './par import { aggregateModelUsage } from './parsers/usage-aggregator'; import type { AgentError } from '../shared/types'; import { getAgentCapabilities } from './agent-capabilities'; +import { ACPProcess, type ACPProcessConfig } from './acp'; // Re-export parser types for consumers export type { ParsedEvent, AgentOutputParser } from './parsers'; @@ -58,6 +59,9 @@ interface ProcessConfig { contextWindow?: number; // Configured context window size (0 or undefined = not configured, hide UI) customEnvVars?: Record; // Custom environment variables from user configuration noPromptSeparator?: boolean; // If true, don't add '--' before the prompt (e.g., OpenCode doesn't support it) + useACP?: boolean; // If true, use ACP protocol instead of stdout JSON parsing (for OpenCode) + acpShowStreaming?: boolean; // If true, show streaming text output in ACP mode (default: false to avoid duplicates) + acpSessionId?: string; // ACP session ID for resuming sessions } interface ManagedProcess { @@ -65,11 +69,13 @@ interface ManagedProcess { toolType: string; ptyProcess?: pty.IPty; childProcess?: ChildProcess; + acpProcess?: ACPProcess; // ACP-based process (for OpenCode with useACP flag) cwd: string; pid: number; isTerminal: boolean; isBatchMode?: boolean; // True for agents that run in batch mode (exit after response) isStreamJsonMode?: boolean; // True when using stream-json input/output (for images) + isACPMode?: boolean; // True when using ACP protocol jsonBuffer?: string; // Buffer for accumulating JSON output in batch mode lastCommand?: string; // Last command sent to terminal (for filtering command echoes) sessionIdEmitted?: boolean; // True after session_id has been emitted (prevents duplicate emissions) @@ -271,7 +277,120 @@ export class ProcessManager extends EventEmitter { * Spawn a new process for a session */ spawn(config: ProcessConfig): { pid: number; success: boolean } { - const { sessionId, toolType, cwd, command, args, requiresPty, prompt, shell, shellArgs, shellEnvVars, images, imageArgs, contextWindow, customEnvVars, noPromptSeparator } = config; + const { sessionId, toolType, cwd, command, args, requiresPty, prompt, shell, shellArgs, shellEnvVars, images, imageArgs, contextWindow, customEnvVars, noPromptSeparator, useACP, acpShowStreaming, acpSessionId } = config; + + // ======================================================================== + // ACP Mode: Use Agent Client Protocol instead of stdout JSON parsing + // This is controlled by the useACP flag in agent config + // ======================================================================== + if (useACP && prompt) { + logger.info('[ProcessManager] Using ACP mode for agent', 'ProcessManager', { + sessionId, + toolType, + command, + }); + + // Convert image data URLs to ACP format + const acpImages = images?.map(dataUrl => { + const match = dataUrl.match(/^data:(image\/[^;]+);base64,(.+)$/); + if (match) { + return { data: match[2], mimeType: match[1] }; + } + return null; + }).filter((img): img is { data: string; mimeType: string } => img !== null); + + const acpConfig: ACPProcessConfig = { + sessionId, + toolType, + cwd, + command, + prompt, + images: acpImages, + acpSessionId, + customEnvVars, + contextWindow, + }; + + // Create ACP process + const acpProcess = new ACPProcess(acpConfig); + + // Create managed process entry + const managedProcess: ManagedProcess = { + sessionId, + toolType, + acpProcess, + cwd, + pid: -1, // Will be updated after start + isTerminal: false, + isBatchMode: true, + isACPMode: true, + startTime: Date.now(), + contextWindow, + command, + args: ['acp'], + }; + + this.processes.set(sessionId, managedProcess); + + // Wire up ACP events to ProcessManager events + // ACP emits ParsedEvent objects, but the renderer expects strings + // For text events, emit the text content; for other events, emit as JSON + // acpShowStreaming controls whether streaming text is shown (default: false to avoid duplicates) + acpProcess.on('data', (sid: string, event: ParsedEvent) => { + if (event.type === 'text' && event.text) { + // Only emit streaming text if acpShowStreaming is enabled + if (acpShowStreaming) { + this.emit('data', sid, event.text); + } + // Always emit as thinking chunk if partial (for showThinking mode) + if (event.isPartial) { + this.emit('thinking-chunk', sid, event.text); + } + } else if (event.type === 'result') { + // Emit result text only if streaming was disabled (otherwise it was already shown) + // This ensures text is displayed exactly once + if (!acpShowStreaming && event.text) { + this.emit('data', sid, event.text); + } + // Don't emit empty result events or JSON - just signal completion via exit event + } else if (event.type === 'init' && event.sessionId) { + // Emit session ID event + this.emit('session-id', sid, event.sessionId); + } else if (event.type === 'tool_use' || event.type === 'system' || event.type === 'error') { + // For actionable event types, emit as JSON for rendering + this.emit('data', sid, JSON.stringify(event)); + } + // Ignore other event types (usage, etc.) - don't emit as raw JSON + }); + + acpProcess.on('agent-error', (sid: string, error: AgentError) => { + this.emit('agent-error', sid, error); + }); + + acpProcess.on('exit', (sid: string, code: number) => { + this.emit('exit', sid, code); + this.processes.delete(sid); + }); + + // Start the ACP process asynchronously + acpProcess.start().then(result => { + managedProcess.pid = result.pid; + if (!result.success) { + logger.error('[ProcessManager] ACP process failed to start', 'ProcessManager', { sessionId }); + } + }).catch(error => { + logger.error('[ProcessManager] ACP process start error', 'ProcessManager', { + sessionId, + error: String(error), + }); + }); + + return { pid: -1, success: true }; // Return immediately, events will follow + } + + // ======================================================================== + // Standard Mode: Spawn process and parse stdout JSON + // ======================================================================== // For batch mode with images, use stream-json mode and send message via stdin // For batch mode without images, append prompt to args with -- separator (unless noPromptSeparator is true) @@ -1065,15 +1184,24 @@ export class ProcessManager extends EventEmitter { sessionId, toolType: process.toolType, isTerminal: process.isTerminal, + isACPMode: process.isACPMode, pid: process.pid, hasPtyProcess: !!process.ptyProcess, hasChildProcess: !!process.childProcess, + hasACPProcess: !!process.acpProcess, hasStdin: !!process.childProcess?.stdin, dataLength: data.length, dataPreview: data.substring(0, 50) }); try { + // Handle ACP process + if (process.isACPMode && process.acpProcess) { + logger.debug('[ProcessManager] Writing to ACP process', 'ProcessManager', { sessionId }); + process.acpProcess.write(data); + return true; + } + if (process.isTerminal && process.ptyProcess) { logger.debug('[ProcessManager] Writing to PTY process', 'ProcessManager', { sessionId, pid: process.pid }); // Track the command for filtering echoes (remove trailing newline for comparison) @@ -1124,6 +1252,13 @@ export class ProcessManager extends EventEmitter { } try { + // Handle ACP process + if (process.isACPMode && process.acpProcess) { + logger.debug('[ProcessManager] Cancelling ACP process', 'ProcessManager', { sessionId }); + process.acpProcess.interrupt(); + return true; + } + if (process.isTerminal && process.ptyProcess) { // For PTY processes, send Ctrl+C character logger.debug('[ProcessManager] Sending Ctrl+C to PTY process', 'ProcessManager', { sessionId, pid: process.pid }); @@ -1151,7 +1286,10 @@ export class ProcessManager extends EventEmitter { if (!process) return false; try { - if (process.isTerminal && process.ptyProcess) { + // Handle ACP process + if (process.isACPMode && process.acpProcess) { + process.acpProcess.kill(); + } else if (process.isTerminal && process.ptyProcess) { process.ptyProcess.kill(); } else if (process.childProcess) { process.childProcess.kill('SIGTERM'); diff --git a/src/renderer/App.tsx b/src/renderer/App.tsx index ca2386057..5e31144d1 100644 --- a/src/renderer/App.tsx +++ b/src/renderer/App.tsx @@ -1499,6 +1499,7 @@ export default function MaestroConsole() { // Suppress toast if user is already viewing this tab (they'll see the response directly) // Only show toasts for out-of-view completions (different session or different tab) + // ALSO suppress toasts for ACP mode sessions (they exit immediately after each message) const currentActiveSession = sessionsRef.current.find(s => s.id === activeSessionIdRef.current); const isViewingCompletedTab = currentActiveSession?.id === actualSessionId && (!tabIdFromSession || currentActiveSession.activeTabId === tabIdFromSession); @@ -4340,6 +4341,33 @@ export default function MaestroConsole() { console.error('Failed to delete playbooks:', error); } + // Stop watching AutoRun folder if configured + // Only unwatch if no other session is using the same folder + if (session.autoRunFolderPath) { + const otherSessionsUsingSameFolder = sessions.filter( + s => s.id !== id && s.autoRunFolderPath === session.autoRunFolderPath + ); + if (otherSessionsUsingSameFolder.length === 0) { + try { + await window.maestro.autorun.unwatchFolder(session.autoRunFolderPath); + } catch (error) { + console.error('Failed to unwatch AutoRun folder:', error); + } + } + } + + // TODO: Delete agent's session storage (OpenCode/Codex sessions) + // Currently these sessions remain in the agent's storage (~/.local/share/opencode/storage/) + // even after the Maestro session is deleted, causing them to appear in Agent Sessions Browser. + // Once ACP supports session deletion, we should: + // if (session.agentSessionId && session.toolType) { + // try { + // await window.maestro.agentSessions.delete(session.toolType, session.agentSessionId); + // } catch (error) { + // console.error('Failed to delete agent session:', error); + // } + // } + // If this is a worktree session, track its path to prevent re-discovery if (session.worktreeParentPath && session.cwd) { setRemovedWorktreePaths(prev => new Set([...prev, session.cwd])); @@ -4389,6 +4417,24 @@ export default function MaestroConsole() { } catch (error) { console.error('Failed to delete playbooks:', error); } + + // Stop watching AutoRun folder if configured + // Only unwatch if no other session is using the same folder + if (session.autoRunFolderPath) { + const otherSessionsUsingSameFolder = groupSessions.filter( + s => s.id !== session.id && s.autoRunFolderPath === session.autoRunFolderPath + ); + const sessionsOutsideGroup = sessions.filter( + s => s.groupId !== groupId && s.autoRunFolderPath === session.autoRunFolderPath + ); + if (otherSessionsUsingSameFolder.length === 0 && sessionsOutsideGroup.length === 0) { + try { + await window.maestro.autorun.unwatchFolder(session.autoRunFolderPath); + } catch (error) { + console.error('Failed to unwatch AutoRun folder:', error); + } + } + } } // Track all removed paths to prevent re-discovery diff --git a/src/renderer/components/EditGroupChatModal.tsx b/src/renderer/components/EditGroupChatModal.tsx index 2784e02da..469b33ca5 100644 --- a/src/renderer/components/EditGroupChatModal.tsx +++ b/src/renderer/components/EditGroupChatModal.tsx @@ -343,10 +343,11 @@ export function EditGroupChatModal({ agentConfigRef.current = newConfig; setConfigWasModified(true); }} - onConfigBlur={async () => { + onConfigBlur={async (immediateConfig) => { if (selectedAgent) { - // Use ref to get latest config (state may be stale in async callback) - await window.maestro.agents.setConfig(selectedAgent, agentConfigRef.current); + // Use immediate config if provided (for checkbox), otherwise use ref + const configToSave = immediateConfig || agentConfigRef.current; + await window.maestro.agents.setConfig(selectedAgent, configToSave); setConfigWasModified(true); } }} diff --git a/src/renderer/components/NewGroupChatModal.tsx b/src/renderer/components/NewGroupChatModal.tsx index 111294a08..791373374 100644 --- a/src/renderer/components/NewGroupChatModal.tsx +++ b/src/renderer/components/NewGroupChatModal.tsx @@ -314,10 +314,11 @@ export function NewGroupChatModal({ setAgentConfig(newConfig); agentConfigRef.current = newConfig; }} - onConfigBlur={async () => { + onConfigBlur={async (immediateConfig) => { if (selectedAgent) { - // Use ref to get latest config (state may be stale in async callback) - await window.maestro.agents.setConfig(selectedAgent, agentConfigRef.current); + // Use immediate config if provided (for checkbox), otherwise use ref + const configToSave = immediateConfig || agentConfigRef.current; + await window.maestro.agents.setConfig(selectedAgent, configToSave); } }} availableModels={availableModels} diff --git a/src/renderer/components/NewInstanceModal.tsx b/src/renderer/components/NewInstanceModal.tsx index 1fd4faae7..3d0a06a0b 100644 --- a/src/renderer/components/NewInstanceModal.tsx +++ b/src/renderer/components/NewInstanceModal.tsx @@ -512,8 +512,9 @@ export function NewInstanceModal({ isOpen, onClose, onCreate, theme, existingSes } })); }} - onConfigBlur={() => { - const currentConfig = agentConfigs[agent.id] || {}; + onConfigBlur={(immediateConfig) => { + // Use immediate config if provided (for checkbox), otherwise use state + const currentConfig = immediateConfig || agentConfigs[agent.id] || {}; window.maestro.agents.setConfig(agent.id, currentConfig); }} availableModels={availableModels[agent.id] || []} @@ -978,10 +979,12 @@ export function EditAgentModal({ isOpen, onClose, onSave, theme, session, existi onConfigChange={(key, value) => { setAgentConfig(prev => ({ ...prev, [key]: value })); }} - onConfigBlur={() => { + onConfigBlur={(immediateConfig) => { + // Use immediate config if provided (for checkbox), otherwise use state + const configToSave = immediateConfig || agentConfig; // Both model and contextWindow are now saved per-session on modal save // Other config options (if any) can still be saved at agent level - const { model: _model, contextWindow: _contextWindow, ...otherConfig } = agentConfig; + const { model: _model, contextWindow: _contextWindow, ...otherConfig } = configToSave; if (Object.keys(otherConfig).length > 0) { window.maestro.agents.setConfig(session.toolType, otherConfig); } diff --git a/src/renderer/components/Wizard/screens/AgentSelectionScreen.tsx b/src/renderer/components/Wizard/screens/AgentSelectionScreen.tsx index 4d08110be..7300af7d9 100644 --- a/src/renderer/components/Wizard/screens/AgentSelectionScreen.tsx +++ b/src/renderer/components/Wizard/screens/AgentSelectionScreen.tsx @@ -762,8 +762,10 @@ export function AgentSelectionScreen({ theme }: AgentSelectionScreenProps): JSX. onConfigChange={(key, value) => { setAgentConfig(prev => ({ ...prev, [key]: value })); }} - onConfigBlur={async () => { - await window.maestro.agents.setConfig(configuringAgentId!, agentConfig); + onConfigBlur={async (immediateConfig) => { + // Use immediate config if provided (for checkbox), otherwise use state + const configToSave = immediateConfig || agentConfig; + await window.maestro.agents.setConfig(configuringAgentId!, configToSave); }} availableModels={availableModels} loadingModels={loadingModels} diff --git a/src/renderer/components/shared/AgentConfigPanel.tsx b/src/renderer/components/shared/AgentConfigPanel.tsx index 7ad03d0e6..fab5e064b 100644 --- a/src/renderer/components/shared/AgentConfigPanel.tsx +++ b/src/renderer/components/shared/AgentConfigPanel.tsx @@ -243,7 +243,7 @@ export interface AgentConfigPanelProps { // Agent-specific config options agentConfig: Record; onConfigChange: (key: string, value: any) => void; - onConfigBlur: () => void; + onConfigBlur: (immediateConfig?: Record) => void; // Model selection (if supported) availableModels?: string[]; loadingModels?: boolean; @@ -601,9 +601,11 @@ export function AgentConfigPanel({ type="checkbox" checked={agentConfig[option.key] ?? option.default} onChange={(e) => { - onConfigChange(option.key, e.target.checked); - // Immediately persist checkbox changes - onConfigBlur(); + const newValue = e.target.checked; + onConfigChange(option.key, newValue); + // Immediately persist checkbox changes with the new value + // We pass the updated config directly since React state updates are async + onConfigBlur({ ...agentConfig, [option.key]: newValue }); }} className="w-4 h-4" style={{ accentColor: theme.colors.accent }} diff --git a/src/renderer/hooks/useAgentExecution.ts b/src/renderer/hooks/useAgentExecution.ts index 961959324..20e679103 100644 --- a/src/renderer/hooks/useAgentExecution.ts +++ b/src/renderer/hooks/useAgentExecution.ts @@ -409,17 +409,41 @@ export function useAgentExecution( // Spawn with session resume - the IPC handler will use the agent's resumeArgs builder const commandToUse = agent.path || agent.command; - window.maestro.process.spawn({ - sessionId: targetSessionId, - toolType, - cwd, - command: commandToUse, - args: agent.args || [], - prompt, - agentSessionId: resumeAgentSessionId, // This triggers the agent's resume mechanism - }).catch(() => { - cleanup(); - resolve({ success: false }); + + // Get agent configuration to check if ACP is enabled + window.maestro.agentConfigs.get(toolType).then((agentConfig) => { + const useACP = agentConfig?.useACP ?? false; + const acpShowStreaming = agentConfig?.acpShowStreaming ?? false; + + window.maestro.process.spawn({ + sessionId: targetSessionId, + toolType, + cwd, + command: commandToUse, + args: agent.args || [], + prompt, + agentSessionId: resumeAgentSessionId, // This triggers the agent's resume mechanism + useACP, + acpShowStreaming, + }).catch(() => { + cleanup(); + resolve({ success: false }); + }); + }).catch((err) => { + console.error('[spawnBackgroundSynopsis] Failed to get agent config:', err); + // Fallback to spawn without ACP + window.maestro.process.spawn({ + sessionId: targetSessionId, + toolType, + cwd, + command: commandToUse, + args: agent.args || [], + prompt, + agentSessionId: resumeAgentSessionId, + }).catch(() => { + cleanup(); + resolve({ success: false }); + }); }); }); } catch (error) {