diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 931b8f49..78550b3e 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -21,6 +21,9 @@ export type { export type { AppSettings, NotificationSettings } from './ports/settings.js'; +export type { RuntimeAdapterPort } from './ports/runtime-adapter-port.js'; +export { createOpenClawGatewayAdapter } from './services/openclaw-gateway-adapter.js'; + export type { ChatContentBlock, ChatMessage, diff --git a/packages/core/src/ports/runtime-adapter-port.ts b/packages/core/src/ports/runtime-adapter-port.ts new file mode 100644 index 00000000..0c2aaac1 --- /dev/null +++ b/packages/core/src/ports/runtime-adapter-port.ts @@ -0,0 +1,13 @@ +import type { RuntimeAdapter } from '@clawwork/shared'; + +/** + * Dependency injection port for RuntimeAdapter. + * This allows switching between OpenClaw Gateway, ACP, etc. + * without changing core service code. + */ +export interface RuntimeAdapterPort { + getAdapter(runtimeId?: string): RuntimeAdapter; + registerAdapter(adapter: RuntimeAdapter, makeDefault?: boolean): void; + getDefaultAdapter(): RuntimeAdapter; + getAllAdapters(): RuntimeAdapter[]; +} diff --git a/packages/core/src/services/openclaw-gateway-adapter.ts b/packages/core/src/services/openclaw-gateway-adapter.ts new file mode 100644 index 00000000..84a7bacf --- /dev/null +++ b/packages/core/src/services/openclaw-gateway-adapter.ts @@ -0,0 +1,311 @@ +import type { + RuntimeAdapter, + RuntimeCapabilities, + RuntimeInfo, + ExecutionRef, + ExecutionInput, + ExecutionEvent, + ExecutionEventCallback, + ExecutionEventSubscription, +} from '@clawwork/shared'; +import type { GatewayTransportPort } from '../ports/gateway-transport.js'; + +/** + * Adapter that wraps the existing GatewayTransportPort behind the RuntimeAdapter interface. + * Bridges OpenClaw Gateway events (chat, agent, approval) into unified ExecutionEvent format. + */ +export function createOpenClawGatewayAdapter( + gatewayId: string, + gatewayName: string, + transport: GatewayTransportPort, +): RuntimeAdapter { + const listeners = new Set(); + + function emit(event: ExecutionEvent): void { + for (const cb of listeners) { + try { + cb(event); + } catch { + /* swallow listener errors */ + } + } + } + + function makeRef(sessionKey: string, label?: string): ExecutionRef { + return { executionId: sessionKey, runtimeId: gatewayId, sessionKey, label }; + } + + // Bridge gateway events to execution events + const removeGatewayEvent = transport.onGatewayEvent((data) => { + if (data.event === 'chat') { + const payload = data.payload as Record; + const sessionKey = payload.sessionKey as string; + const state = payload.state as string; + const text = (payload.text as string) || ''; + const runId = payload.runId as string; + + if (!sessionKey) return; + + if (state === 'delta') { + emit({ + type: 'execution.message.delta', + executionId: sessionKey, + runtimeId: gatewayId, + sessionKey, + timestamp: Date.now(), + text, + runId, + }); + } else if (state === 'final') { + emit({ + type: 'execution.message.final', + executionId: sessionKey, + runtimeId: gatewayId, + sessionKey, + timestamp: Date.now(), + text, + runId, + }); + emit({ + type: 'execution.completed', + executionId: sessionKey, + runtimeId: gatewayId, + sessionKey, + timestamp: Date.now(), + runId, + }); + } else if (state === 'error') { + const errorMessage = + (payload.errorMessage as string) || (payload.error as { message?: string })?.message || 'Unknown error'; + emit({ + type: 'execution.error', + executionId: sessionKey, + runtimeId: gatewayId, + sessionKey, + timestamp: Date.now(), + message: errorMessage, + code: (payload.errorCode as string) ?? (payload.error as { code?: string })?.code, + runId, + }); + } else if (state === 'aborted') { + emit({ + type: 'execution.cancelled', + executionId: sessionKey, + runtimeId: gatewayId, + sessionKey, + timestamp: Date.now(), + }); + } + } else if (data.event === 'agent') { + const payload = data.payload as Record; + const sessionKey = payload.sessionKey as string; + const stream = payload.stream as string; + const d = payload.data as Record; + + if (!sessionKey || !d) return; + + if (stream === 'tool') { + const toolData = d as { + phase?: string; + name?: string; + toolCallId?: string; + meta?: string; + isError?: boolean; + args?: string; + }; + if (toolData.name && toolData.toolCallId) { + const status = toolData.phase === 'result' ? (toolData.isError ? 'error' : 'done') : 'running'; + emit({ + type: 'execution.tool.call', + executionId: sessionKey, + runtimeId: gatewayId, + sessionKey, + timestamp: Date.now(), + toolCallId: toolData.toolCallId, + name: toolData.name, + status: status as 'running' | 'done' | 'error', + args: toolData.args ? JSON.parse(toolData.args) : undefined, + result: toolData.meta, + }); + } + } else if (stream === 'lifecycle') { + const lc = d as { + phase?: string; + selectedProvider?: string; + selectedModel?: string; + activeProvider?: string; + activeModel?: string; + error?: string; + }; + emit({ + type: 'execution.lifecycle', + executionId: sessionKey, + runtimeId: gatewayId, + sessionKey, + timestamp: Date.now(), + phase: (lc.phase as 'start' | 'end' | 'error' | 'fallback') || 'start', + selectedProvider: lc.selectedProvider, + selectedModel: lc.selectedModel, + activeProvider: lc.activeProvider, + activeModel: lc.activeModel, + error: lc.error, + }); + } + } else if (data.event === 'exec.approval.requested') { + const payload = data.payload as { + id: string; + request: Record; + createdAtMs: number; + expiresAtMs: number; + }; + const sessionKey = (data.payload as Record).sessionKey as string | undefined; + emit({ + type: 'execution.approval.requested', + executionId: sessionKey || data.gatewayId, + runtimeId: data.gatewayId, + sessionKey, + timestamp: Date.now(), + id: payload.id, + request: payload.request, + createdAtMs: payload.createdAtMs, + expiresAtMs: payload.expiresAtMs, + }); + } else if (data.event === 'exec.approval.resolved') { + const payload = data.payload as { id: string; decision?: string }; + emit({ + type: 'execution.approval.resolved', + executionId: data.gatewayId, + runtimeId: data.gatewayId, + timestamp: Date.now(), + id: payload.id, + decision: payload.decision, + }); + } + }); + + const capabilities: RuntimeCapabilities = { + streamsText: true, + supportsToolEvents: true, + supportsApprovals: true, + supportsMCP: true, + accessesFilesystem: true, + constrainsNetwork: false, + supportsResume: false, + producesArtifacts: false, + reportsUsage: true, + supportsChildExecutions: true, + streamsThinking: true, + reportsLifecycle: true, + }; + + let cachedInfo: RuntimeInfo | null = null; + const cachedStatusInfo: { connected: boolean; serverVersion?: string } = { connected: false }; + + const removeGatewayStatus = transport.onGatewayStatus((s) => { + cachedStatusInfo.connected = s.connected; + cachedStatusInfo.serverVersion = s.serverVersion; + cachedInfo = null; // invalidate cache + }); + + return { + getRuntimeInfo(): RuntimeInfo { + if (cachedInfo) return cachedInfo; + cachedInfo = { + id: gatewayId, + name: gatewayName, + connected: cachedStatusInfo.connected, + serverVersion: cachedStatusInfo.serverVersion, + }; + return cachedInfo; + }, + + getCapabilities(): RuntimeCapabilities { + return { ...capabilities }; + }, + + async createExecution(params: { + agentId: string; + executionId: string; + sessionKey: string; + initialMessage?: string; + }) { + const res = await transport.createSession(gatewayId, { + key: params.sessionKey, + agentId: params.agentId, + message: params.initialMessage, + }); + if (!res.ok) throw new Error(res.error || 'Failed to create execution'); + return makeRef(params.sessionKey); + }, + + async sendInput(executionRef: ExecutionRef, input: ExecutionInput) { + const res = await transport.sendMessage( + executionRef.runtimeId, + executionRef.executionId, + input.message, + input.attachments as Array<{ mimeType: string; fileName: string; content: string }> | undefined, + ); + if (!res.ok) throw new Error(res.error || 'Failed to send input'); + }, + + async cancelExecution(executionRef: ExecutionRef) { + const res = await transport.abortChat(executionRef.runtimeId, executionRef.executionId); + if (!res.ok) throw new Error(res.error || 'Failed to cancel execution'); + }, + + async getHealth() { + return { ok: cachedStatusInfo.connected }; + }, + + onExecutionEvent(callback: ExecutionEventCallback): ExecutionEventSubscription { + listeners.add(callback); + return { + unsubscribe: () => { + listeners.delete(callback); + if (listeners.size === 0) { + removeGatewayEvent(); + removeGatewayStatus(); + } + }, + }; + }, + + // Optional methods + async listChildren(executionRef: ExecutionRef) { + const res = await transport.listSessionsBySpawner(gatewayId, executionRef.executionId); + if (!res.ok || !res.result) return []; + const sessions = res.result as Array<{ key: string; label?: string }>; + return sessions.map((s: { key: string; label?: string }) => makeRef(s.key, s.label)); + }, + + async listApprovals() { + // Approvals are streamed via events; this is a placeholder + return []; + }, + + async resolveApproval(_id: string, _decision: 'allow-once' | 'allow-always' | 'deny') { + // The GatewayTransportPort doesn't have a resolveApproval directly; + // this would need to be added or handled via HTTP + throw new Error('resolveApproval not yet implemented on GatewayTransportPort'); + }, + + async listModels() { + const res = await transport.listModels(gatewayId); + if (!res.ok || !res.result) return []; + const data = res.result as { models?: Array<{ id: string; name?: string; provider?: string }> }; + return data.models || []; + }, + + async listAgents() { + const res = await transport.listAgents(gatewayId); + if (!res.ok || !res.result) return []; + const data = res.result as { agents?: Array<{ id: string; name?: string }> }; + return data.agents || []; + }, + + async deleteExecution(executionRef: ExecutionRef) { + const res = await transport.deleteSession(executionRef.runtimeId, executionRef.executionId); + if (!res.ok) throw new Error(res.error || 'Failed to delete execution'); + }, + }; +} diff --git a/packages/pwa/src/gateway/client.ts b/packages/pwa/src/gateway/client.ts index 6df1b57a..16802537 100644 --- a/packages/pwa/src/gateway/client.ts +++ b/packages/pwa/src/gateway/client.ts @@ -400,7 +400,7 @@ export class BrowserGatewayClient { const params: GatewayConnectParams = { minProtocol: 3, - maxProtocol: 3, + maxProtocol: 4, client: { id: 'gateway-client', displayName: 'ClawWork PWA', diff --git a/packages/shared/src/gateway-protocol.ts b/packages/shared/src/gateway-protocol.ts index 6b34ae35..fbf8d17c 100644 --- a/packages/shared/src/gateway-protocol.ts +++ b/packages/shared/src/gateway-protocol.ts @@ -50,7 +50,7 @@ export type GatewayAuth = export interface GatewayConnectParams { minProtocol: 3; - maxProtocol: 3; + maxProtocol: 4; client: { id: string; displayName: string; diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 06ca5c26..d4682341 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -2,3 +2,4 @@ export * from './types.js'; export * from './gateway-protocol.js'; export * from './constants.js'; export * from './debug.js'; +export * from './runtime-adapter.js'; diff --git a/packages/shared/src/runtime-adapter.ts b/packages/shared/src/runtime-adapter.ts new file mode 100644 index 00000000..a49174a0 --- /dev/null +++ b/packages/shared/src/runtime-adapter.ts @@ -0,0 +1,318 @@ +/** + * Runtime execution boundary — abstracts any agent runtime + * (OpenClaw Gateway, ACP, Codex, Claude Code, etc.) + */ + +// ── Runtime Capabilities ── + +export interface RuntimeCapabilities { + /** Can stream text deltas */ + streamsText: boolean; + /** Supports tool call/result events */ + supportsToolEvents: boolean; + /** Supports approval requests */ + supportsApprovals: boolean; + /** Supports MCP/tool integration */ + supportsMCP: boolean; + /** Has filesystem access */ + accessesFilesystem: boolean; + /** Network constrained (sandboxed) */ + constrainsNetwork: boolean; + /** Supports execution resume */ + supportsResume: boolean; + /** Produces artifacts (files, images, etc.) */ + producesArtifacts: boolean; + /** Reports usage/quota */ + reportsUsage: boolean; + /** Supports child executions (subagents) */ + supportsChildExecutions: boolean; + /** Supports streaming thinking content */ + streamsThinking: boolean; + /** Can produce agent lifecycle events (start/end/fallback) */ + reportsLifecycle: boolean; +} + +// ── Execution ── + +export type ExecutionStatus = 'created' | 'running' | 'errored' | 'completed' | 'cancelled'; + +export interface ExecutionRef { + /** Runtime-scoped execution identifier */ + executionId: string; + /** Gateway/Runtime instance identifier */ + runtimeId: string; + /** OpenClaw session key (if applicable) */ + sessionKey?: string; + /** Human-readable label */ + label?: string; +} + +export interface ExecutionInfo { + ref: ExecutionRef; + status: ExecutionStatus; + agentId?: string; + model?: string; + modelProvider?: string; + createdAt: number; + updatedAt: number; + inputTokens?: number; + outputTokens?: number; + contextTokens?: number; +} + +// ── Execution Events (normalized) ── + +export type ExecutionEventType = + | 'execution.created' + | 'execution.started' + | 'execution.progress' + | 'execution.message.delta' + | 'execution.message.final' + | 'execution.thinking.delta' + | 'execution.tool.call' + | 'execution.tool.result' + | 'execution.approval.requested' + | 'execution.approval.resolved' + | 'execution.artifact.created' + | 'execution.warning' + | 'execution.error' + | 'execution.completed' + | 'execution.cancelled' + | 'execution.child.spawned' + | 'execution.lifecycle' + | 'execution.status'; + +export interface ExecutionEventBase { + type: ExecutionEventType; + executionId: string; + runtimeId: string; + sessionKey?: string; + timestamp: number; +} + +export interface ExecutionMessageDeltaEvent extends ExecutionEventBase { + type: 'execution.message.delta'; + text: string; + runId?: string; +} + +export interface ExecutionMessageFinalEvent extends ExecutionEventBase { + type: 'execution.message.final'; + text: string; + runId?: string; +} + +export interface ExecutionThinkingDeltaEvent extends ExecutionEventBase { + type: 'execution.thinking.delta'; + text: string; +} + +export interface ExecutionToolCallEvent extends ExecutionEventBase { + type: 'execution.tool.call'; + toolCallId: string; + name: string; + status: 'running' | 'done' | 'error'; + args?: Record; + result?: string; +} + +export interface ExecutionToolResultEvent extends ExecutionEventBase { + type: 'execution.tool.result'; + toolCallId: string; + name: string; + result?: string; + isError?: boolean; +} + +export interface ExecutionApprovalRequestedEvent extends ExecutionEventBase { + type: 'execution.approval.requested'; + id: string; + request: Record; + createdAtMs: number; + expiresAtMs: number; +} + +export interface ExecutionApprovalResolvedEvent extends ExecutionEventBase { + type: 'execution.approval.resolved'; + id: string; + decision?: string | null; +} + +export interface ExecutionErrorEvent extends ExecutionEventBase { + type: 'execution.error'; + message: string; + code?: string; + runId?: string; +} + +export interface ExecutionCompletedEvent extends ExecutionEventBase { + type: 'execution.completed'; + runId?: string; +} + +export interface ExecutionCancelledEvent extends ExecutionEventBase { + type: 'execution.cancelled'; +} + +export interface ExecutionLifecycleEvent extends ExecutionEventBase { + type: 'execution.lifecycle'; + phase: 'start' | 'end' | 'error' | 'fallback'; + selectedProvider?: string; + selectedModel?: string; + activeProvider?: string; + activeModel?: string; + error?: string; +} + +export interface ExecutionCreatedEvent extends ExecutionEventBase { + type: 'execution.created'; +} + +export interface ExecutionStartedEvent extends ExecutionEventBase { + type: 'execution.started'; +} + +export interface ExecutionChildSpawnedEvent extends ExecutionEventBase { + type: 'execution.child.spawned'; + childExecutionId: string; + childSessionKey?: string; +} + +export interface ExecutionStatusChangeEvent extends ExecutionEventBase { + type: 'execution.status'; + status: ExecutionStatus; +} + +export type ExecutionEvent = + | ExecutionCreatedEvent + | ExecutionStartedEvent + | ExecutionMessageDeltaEvent + | ExecutionMessageFinalEvent + | ExecutionThinkingDeltaEvent + | ExecutionToolCallEvent + | ExecutionToolResultEvent + | ExecutionApprovalRequestedEvent + | ExecutionApprovalResolvedEvent + | ExecutionErrorEvent + | ExecutionCompletedEvent + | ExecutionCancelledEvent + | ExecutionLifecycleEvent + | ExecutionChildSpawnedEvent + | ExecutionStatusChangeEvent; + +// ── Event Stream ── + +export interface ExecutionEventSubscription { + unsubscribe(): void; +} + +export type ExecutionEventCallback = (event: ExecutionEvent) => void; + +// ── Execution Input ── + +export interface ExecutionInput { + message: string; + attachments?: Array<{ + mimeType: string; + fileName: string; + content: string; + }>; + idempotencyKey?: string; +} + +// ── Runtime Info ── + +export interface RuntimeInfo { + id: string; + name: string; + version?: string; + connected: boolean; + serverVersion?: string; +} + +// ── RuntimeAdapter Interface ── + +export interface RuntimeAdapter { + /** Get basic info about this runtime */ + getRuntimeInfo(): RuntimeInfo; + + /** Get runtime capabilities */ + getCapabilities(): RuntimeCapabilities; + + /** Create a new execution */ + createExecution(params: { + agentId: string; + executionId: string; + sessionKey: string; + initialMessage?: string; + }): Promise; + + /** Send user input to an active execution */ + sendInput(executionRef: ExecutionRef, input: ExecutionInput): Promise; + + /** Cancel an active execution */ + cancelExecution(executionRef: ExecutionRef): Promise; + + /** Resume a paused execution (approval pending, etc.) */ + resumeExecution?(executionRef: ExecutionRef): Promise; + + /** Subscribe to execution events */ + onExecutionEvent(callback: ExecutionEventCallback): ExecutionEventSubscription; + + /** List child executions (subagent sessions) */ + listChildren?(executionRef: ExecutionRef): Promise; + + /** List pending approvals */ + listApprovals?(): Promise }>>; + + /** Resolve an approval request */ + resolveApproval?(id: string, decision: 'allow-once' | 'allow-always' | 'deny'): Promise; + + /** List artifacts produced by an execution */ + listArtifacts?( + executionRef: ExecutionRef, + ): Promise>; + + /** Get usage/cost info */ + getUsage?(): Promise>; + + /** Get health status */ + getHealth(): Promise<{ ok: boolean; error?: string }>; + + /** Get execution info by ref */ + getExecutionInfo?(executionRef: ExecutionRef): Promise; + + /** Get chat history for an execution */ + getChatHistory?( + executionRef: ExecutionRef, + limit?: number, + ): Promise<{ messages: Array<{ role: string; content: string; timestamp: string }> }>; + + /** List models available on this runtime */ + listModels?(): Promise>; + + /** List agents available on this runtime */ + listAgents?(): Promise>; + + /** Delete a session/execution */ + deleteExecution?(executionRef: ExecutionRef): Promise; +} + +// ── Execution Registry ── + +export interface ExecutionRegistryEntry { + taskId: string; + executionRef: ExecutionRef; + status: ExecutionStatus; + createdAt: number; + updatedAt: number; + agentId?: string; +} + +export interface ExecutionRegistry { + entries: Map; + addEntry(taskId: string, entry: ExecutionRegistryEntry): void; + getEntry(executionId: string): ExecutionRegistryEntry | undefined; + updateStatus(executionId: string, status: ExecutionStatus): void; + getEntriesForTask(taskId: string): ExecutionRegistryEntry[]; +}