Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ export type {

export type { AppSettings, NotificationSettings } from './ports/settings.js';

export type { RuntimeAdapterPort } from './ports/runtime-adapter-port.js';
export { createOpenClawGatewayAdapter } from './services/openclaw-gateway-adapter.js';

export type {
ChatContentBlock,
ChatMessage,
Expand Down
13 changes: 13 additions & 0 deletions packages/core/src/ports/runtime-adapter-port.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { RuntimeAdapter } from '@clawwork/shared';

/**
* Dependency injection port for RuntimeAdapter.
* This allows switching between OpenClaw Gateway, ACP, etc.
* without changing core service code.
*/
export interface RuntimeAdapterPort {
getAdapter(runtimeId?: string): RuntimeAdapter;
registerAdapter(adapter: RuntimeAdapter, makeDefault?: boolean): void;
getDefaultAdapter(): RuntimeAdapter;
getAllAdapters(): RuntimeAdapter[];
}
311 changes: 311 additions & 0 deletions packages/core/src/services/openclaw-gateway-adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
import type {
RuntimeAdapter,
RuntimeCapabilities,
RuntimeInfo,
ExecutionRef,
ExecutionInput,
ExecutionEvent,
ExecutionEventCallback,
ExecutionEventSubscription,
} from '@clawwork/shared';
import type { GatewayTransportPort } from '../ports/gateway-transport.js';

/**
* Adapter that wraps the existing GatewayTransportPort behind the RuntimeAdapter interface.
* Bridges OpenClaw Gateway events (chat, agent, approval) into unified ExecutionEvent format.
*/
export function createOpenClawGatewayAdapter(
gatewayId: string,
gatewayName: string,
transport: GatewayTransportPort,
): RuntimeAdapter {
const listeners = new Set<ExecutionEventCallback>();

function emit(event: ExecutionEvent): void {
for (const cb of listeners) {
try {
cb(event);
} catch {
/* swallow listener errors */
}
}
}

function makeRef(sessionKey: string, label?: string): ExecutionRef {
return { executionId: sessionKey, runtimeId: gatewayId, sessionKey, label };
}

// Bridge gateway events to execution events
const removeGatewayEvent = transport.onGatewayEvent((data) => {
if (data.event === 'chat') {
const payload = data.payload as Record<string, unknown>;
const sessionKey = payload.sessionKey as string;
const state = payload.state as string;
const text = (payload.text as string) || '';
const runId = payload.runId as string;

if (!sessionKey) return;

if (state === 'delta') {
emit({
type: 'execution.message.delta',
executionId: sessionKey,
runtimeId: gatewayId,
sessionKey,
timestamp: Date.now(),
text,
runId,
});
} else if (state === 'final') {
emit({
type: 'execution.message.final',
executionId: sessionKey,
runtimeId: gatewayId,
sessionKey,
timestamp: Date.now(),
text,
runId,
});
emit({
type: 'execution.completed',
executionId: sessionKey,
runtimeId: gatewayId,
sessionKey,
timestamp: Date.now(),
runId,
});
} else if (state === 'error') {
const errorMessage =
(payload.errorMessage as string) || (payload.error as { message?: string })?.message || 'Unknown error';
emit({
type: 'execution.error',
executionId: sessionKey,
runtimeId: gatewayId,
sessionKey,
timestamp: Date.now(),
message: errorMessage,
code: (payload.errorCode as string) ?? (payload.error as { code?: string })?.code,
runId,
});
} else if (state === 'aborted') {
emit({
type: 'execution.cancelled',
executionId: sessionKey,
runtimeId: gatewayId,
sessionKey,
timestamp: Date.now(),
});
}
} else if (data.event === 'agent') {
const payload = data.payload as Record<string, unknown>;
const sessionKey = payload.sessionKey as string;
const stream = payload.stream as string;
const d = payload.data as Record<string, unknown>;

if (!sessionKey || !d) return;

if (stream === 'tool') {
const toolData = d as {
phase?: string;
name?: string;
toolCallId?: string;
meta?: string;
isError?: boolean;
args?: string;
};
if (toolData.name && toolData.toolCallId) {
const status = toolData.phase === 'result' ? (toolData.isError ? 'error' : 'done') : 'running';
emit({
type: 'execution.tool.call',
executionId: sessionKey,
runtimeId: gatewayId,
sessionKey,
timestamp: Date.now(),
toolCallId: toolData.toolCallId,
name: toolData.name,
status: status as 'running' | 'done' | 'error',
args: toolData.args ? JSON.parse(toolData.args) : undefined,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using JSON.parse directly on data received from the gateway is unsafe. If the gateway sends malformed JSON or an empty string in the args field, this will throw an exception and crash the entire event listener for this adapter. Consider using a safe JSON parsing utility or wrapping this in a try-catch block.

Suggested change
args: toolData.args ? JSON.parse(toolData.args) : undefined,
args: toolData.args ? (() => { try { return JSON.parse(toolData.args); } catch { return undefined; } })() : undefined,

result: toolData.meta,
});
}
} else if (stream === 'lifecycle') {
const lc = d as {
phase?: string;
selectedProvider?: string;
selectedModel?: string;
activeProvider?: string;
activeModel?: string;
error?: string;
};
emit({
type: 'execution.lifecycle',
executionId: sessionKey,
runtimeId: gatewayId,
sessionKey,
timestamp: Date.now(),
phase: (lc.phase as 'start' | 'end' | 'error' | 'fallback') || 'start',
selectedProvider: lc.selectedProvider,
selectedModel: lc.selectedModel,
activeProvider: lc.activeProvider,
activeModel: lc.activeModel,
error: lc.error,
});
}
} else if (data.event === 'exec.approval.requested') {
const payload = data.payload as {
id: string;
request: Record<string, unknown>;
createdAtMs: number;
expiresAtMs: number;
};
const sessionKey = (data.payload as Record<string, unknown>).sessionKey as string | undefined;
emit({
type: 'execution.approval.requested',
executionId: sessionKey || data.gatewayId,
runtimeId: data.gatewayId,
sessionKey,
timestamp: Date.now(),
id: payload.id,
request: payload.request,
createdAtMs: payload.createdAtMs,
expiresAtMs: payload.expiresAtMs,
});
} else if (data.event === 'exec.approval.resolved') {
const payload = data.payload as { id: string; decision?: string };
emit({
type: 'execution.approval.resolved',
executionId: data.gatewayId,
runtimeId: data.gatewayId,
timestamp: Date.now(),
id: payload.id,
decision: payload.decision,
});
Comment on lines +174 to +182
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The execution.approval.resolved event is missing the sessionKey and uses data.gatewayId as the executionId. This is inconsistent with the execution.approval.requested event (lines 161-172) and will likely break correlation in the UI if the approval was associated with a specific session. You should extract the sessionKey from the payload if available.

      const payload = data.payload as { id: string; decision?: string };
      const sessionKey = (data.payload as Record<string, unknown>).sessionKey as string | undefined;
      emit({
        type: 'execution.approval.resolved',
        executionId: sessionKey || data.gatewayId,
        runtimeId: data.gatewayId,
        sessionKey,
        timestamp: Date.now(),
        id: payload.id,
        decision: payload.decision,
      });

}
});

const capabilities: RuntimeCapabilities = {
streamsText: true,
supportsToolEvents: true,
supportsApprovals: true,
supportsMCP: true,
accessesFilesystem: true,
constrainsNetwork: false,
supportsResume: false,
producesArtifacts: false,
reportsUsage: true,
supportsChildExecutions: true,
streamsThinking: true,
reportsLifecycle: true,
};

let cachedInfo: RuntimeInfo | null = null;
const cachedStatusInfo: { connected: boolean; serverVersion?: string } = { connected: false };

const removeGatewayStatus = transport.onGatewayStatus((s) => {
cachedStatusInfo.connected = s.connected;
cachedStatusInfo.serverVersion = s.serverVersion;
cachedInfo = null; // invalidate cache
});

return {
getRuntimeInfo(): RuntimeInfo {
if (cachedInfo) return cachedInfo;
cachedInfo = {
id: gatewayId,
name: gatewayName,
connected: cachedStatusInfo.connected,
serverVersion: cachedStatusInfo.serverVersion,
};
return cachedInfo;
},

getCapabilities(): RuntimeCapabilities {
return { ...capabilities };
},

async createExecution(params: {
agentId: string;
executionId: string;
sessionKey: string;
initialMessage?: string;
}) {
const res = await transport.createSession(gatewayId, {
key: params.sessionKey,
agentId: params.agentId,
message: params.initialMessage,
});
if (!res.ok) throw new Error(res.error || 'Failed to create execution');
return makeRef(params.sessionKey);
},

async sendInput(executionRef: ExecutionRef, input: ExecutionInput) {
const res = await transport.sendMessage(
executionRef.runtimeId,
executionRef.executionId,
input.message,
input.attachments as Array<{ mimeType: string; fileName: string; content: string }> | undefined,
);
if (!res.ok) throw new Error(res.error || 'Failed to send input');
},

async cancelExecution(executionRef: ExecutionRef) {
const res = await transport.abortChat(executionRef.runtimeId, executionRef.executionId);
if (!res.ok) throw new Error(res.error || 'Failed to cancel execution');
},

async getHealth() {
return { ok: cachedStatusInfo.connected };
},

onExecutionEvent(callback: ExecutionEventCallback): ExecutionEventSubscription {
listeners.add(callback);
return {
unsubscribe: () => {
listeners.delete(callback);
if (listeners.size === 0) {
removeGatewayEvent();
removeGatewayStatus();
}
Comment on lines +265 to +268
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This cleanup logic introduces a bug. Once removeGatewayEvent() and removeGatewayStatus() are called (when the last listener unsubscribes), the transport listeners are permanently detached for this adapter instance. If a new listener is added later via onExecutionEvent, it will never receive events because the transport connection is not re-established.

Since this adapter is intended to be a long-lived service wrapping the transport, it is safer to remove this premature cleanup or implement a reference-counting mechanism that re-subscribes when the first listener is added.

Suggested change
if (listeners.size === 0) {
removeGatewayEvent();
removeGatewayStatus();
}
if (listeners.size === 0) {
// Avoid permanent unsubscription here as it prevents re-subscription later.
// If cleanup is required, implement ref-counting to re-establish listeners.
}

},
};
},

// Optional methods
async listChildren(executionRef: ExecutionRef) {
const res = await transport.listSessionsBySpawner(gatewayId, executionRef.executionId);
if (!res.ok || !res.result) return [];
const sessions = res.result as Array<{ key: string; label?: string }>;
return sessions.map((s: { key: string; label?: string }) => makeRef(s.key, s.label));
},

async listApprovals() {
// Approvals are streamed via events; this is a placeholder
return [];
},

async resolveApproval(_id: string, _decision: 'allow-once' | 'allow-always' | 'deny') {
// The GatewayTransportPort doesn't have a resolveApproval directly;
// this would need to be added or handled via HTTP
throw new Error('resolveApproval not yet implemented on GatewayTransportPort');
},

async listModels() {
const res = await transport.listModels(gatewayId);
if (!res.ok || !res.result) return [];
const data = res.result as { models?: Array<{ id: string; name?: string; provider?: string }> };
return data.models || [];
},

async listAgents() {
const res = await transport.listAgents(gatewayId);
if (!res.ok || !res.result) return [];
const data = res.result as { agents?: Array<{ id: string; name?: string }> };
return data.agents || [];
},

async deleteExecution(executionRef: ExecutionRef) {
const res = await transport.deleteSession(executionRef.runtimeId, executionRef.executionId);
if (!res.ok) throw new Error(res.error || 'Failed to delete execution');
},
};
}
1 change: 1 addition & 0 deletions packages/shared/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from './types.js';
export * from './gateway-protocol.js';
export * from './constants.js';
export * from './debug.js';
export * from './runtime-adapter.js';
Loading
Loading