diff --git a/apps/mesh/package.json b/apps/mesh/package.json index c441b379c1..a063fc7fb0 100644 --- a/apps/mesh/package.json +++ b/apps/mesh/package.json @@ -1,6 +1,6 @@ { "name": "@decocms/mesh", - "version": "1.1.0", + "version": "1.1.1", "description": "MCP Mesh - Self-hostable MCP Gateway for managing AI connections and tools", "author": "Deco team", "license": "MIT", diff --git a/apps/mesh/src/api/routes/proxy.ts b/apps/mesh/src/api/routes/proxy.ts index 302afcd927..eb1f4c2520 100644 --- a/apps/mesh/src/api/routes/proxy.ts +++ b/apps/mesh/src/api/routes/proxy.ts @@ -17,30 +17,30 @@ import { getMonitoringConfig } from "@/core/config"; import { getStableStdioClient } from "@/stdio/stable-transport"; import { ConnectionEntity, - isStdioParameters, type HttpConnectionParameters, + isStdioParameters, } from "@/tools/connection/schema"; import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { WebStandardStreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/webStandardStreamableHttp.js"; import { - CallToolRequestSchema, - GetPromptRequestSchema, - ListPromptsRequestSchema, - ListResourcesRequestSchema, - ListResourceTemplatesRequestSchema, - ListToolsRequestSchema, - ReadResourceRequestSchema, type CallToolRequest, + CallToolRequestSchema, type CallToolResult, type GetPromptRequest, + GetPromptRequestSchema, type GetPromptResult, + ListPromptsRequestSchema, type ListPromptsResult, + ListResourcesRequestSchema, type ListResourcesResult, + ListResourceTemplatesRequestSchema, type ListResourceTemplatesResult, + ListToolsRequestSchema, type ListToolsResult, type ReadResourceRequest, + ReadResourceRequestSchema, type ReadResourceResult, type Tool, } from "@modelcontextprotocol/sdk/types.js"; @@ -452,7 +452,7 @@ async function createMCPProxyDoNotUseDirectly( throw error; } finally { // Close client - stdio connections ignore close() via stable-transport - await client.close(); + client.close().catch(console.error); } }, ); @@ -475,43 +475,76 @@ async function createMCPProxyDoNotUseDirectly( } // Fall back to client for connections without indexed tools - const client = await createClient(); - return await client.listTools(); + let client: Awaited> | undefined; + try { + client = await createClient(); + return await client.listTools(); + } finally { + client?.close().catch(console.error); + } }; // List resources from downstream connection const listResources = async (): Promise => { - const client = await createClient(); - return await client.listResources(); + let client: Awaited> | undefined; + try { + client = await createClient(); + return await client.listResources(); + } finally { + client?.close().catch(console.error); + } }; // Read a specific resource from downstream connection const readResource = async ( params: ReadResourceRequest["params"], ): Promise => { - const client = await createClient(); - return await client.readResource(params); + let client: Awaited> | undefined; + try { + client = await createClient(); + return await client.readResource(params); + } finally { + client?.close().catch(console.error); + } }; // List resource templates from downstream connection const listResourceTemplates = async (): Promise => { - const client = await createClient(); - return await client.listResourceTemplates(); + let client: Awaited> | undefined; + try { + client = await createClient(); + return await client.listResourceTemplates(); + } finally { + client?.close().catch(console.error); + } }; // List prompts from downstream connection const listPrompts = async (): Promise => { - const client = await createClient(); - return await client.listPrompts(); + let client: Awaited> | undefined; + try { + client = await createClient(); + return await client.listPrompts(); + } catch (error) { + console.error("[proxy:listPrompts] Error listing prompts:", error); + throw error; + } finally { + client?.close().catch(console.error); + } }; // Get a specific prompt from downstream connection const getPrompt = async ( params: GetPromptRequest["params"], ): Promise => { - const client = await createClient(); - return await client.getPrompt(params); + let client: Awaited> | undefined; + try { + client = await createClient(); + return await client.getPrompt(params); + } finally { + client?.close().catch(console.error); + } }; // Call tool using fetch directly for streaming support diff --git a/apps/mesh/src/web/components/details/workflow/components/step-detail-panel.tsx b/apps/mesh/src/web/components/details/workflow/components/step-detail-panel.tsx index 5de5c7e78e..2833a3036d 100644 --- a/apps/mesh/src/web/components/details/workflow/components/step-detail-panel.tsx +++ b/apps/mesh/src/web/components/details/workflow/components/step-detail-panel.tsx @@ -30,7 +30,8 @@ import type { JsonSchema } from "@/web/utils/constants"; import { MonacoCodeEditor } from "./monaco-editor"; import type { Step, ToolCallAction } from "@decocms/bindings/workflow"; import { useMcp } from "@/web/hooks/use-mcp"; -import { usePollingWorkflowExecution } from "../hooks"; +import { useExecutionCompletedStep } from "../hooks"; +import { useState } from "react"; interface StepDetailPanelProps { className?: string; @@ -101,6 +102,7 @@ export function StepDetailPanel({ className }: StepDetailPanelProps) { + ); } @@ -203,7 +205,7 @@ function InputSection({ step }: { step: Step }) { const isToolStep = "toolName" in step.action; const toolName = isToolStep && "toolName" in step.action ? step.action.toolName : null; - + const trackingExecutionId = useTrackingExecutionId(); const { tool } = useGatewayTool(toolName ?? ""); if (!tool || !tool.inputSchema) { @@ -220,7 +222,7 @@ function InputSection({ step }: { step: Step }) { @@ -231,6 +233,7 @@ function InputSection({ step }: { step: Step }) { } setInputParams={handleInputChange} @@ -249,11 +252,11 @@ function InputSection({ step }: { step: Step }) { function OutputSection({ step }: { step: Step }) { const outputSchema = step.outputSchema; const trackingExecutionId = useTrackingExecutionId(); - const { step_results } = usePollingWorkflowExecution(trackingExecutionId); - const stepResult = step_results?.find( - (result) => result.step_id === step.name, + const { output, error } = useExecutionCompletedStep( + trackingExecutionId, + step.name, ); - const output = stepResult?.output; + const content = output ? output : error ? { error: error } : null; // Always show the Output section (even if empty) const properties = @@ -283,8 +286,8 @@ function OutputSection({ step }: { step: Step }) {
No output schema defined
- ) : output ? ( - + ) : content ? ( + ) : (
{propertyEntries.map(([key, propSchema]) => ( @@ -458,6 +461,10 @@ export default async function(input: Input): Promise { }); }; + if ("code" in step.action && step.action.code) { + return null; + } + // No transform code → show collapsed with Plus if (!hasTransformCode) { return ( @@ -503,6 +510,60 @@ export default async function(input: Input): Promise { ); } +function StepCodeSection({ step }: { step: Step }) { + const [isOpen, setIsOpen] = useState(false); + const { updateStep } = useWorkflowActions(); + const code = + "code" in step.action && step.action.code ? step.action.code : null; + const trackingExecutionId = useTrackingExecutionId(); + const handleCodeSave = ( + code: string, + outputSchema: Record | null, + ) => { + updateStep(step.name, { + action: { + ...step.action, + code: code, + }, + ...(outputSchema ? { outputSchema } : {}), + }); + }; + if (!code) { + return null; + } + return ( +
+
setIsOpen(!isOpen)} + > +
+

+ Step Code +

+ {isOpen ? ( + + ) : ( + + )} +
+
+ {isOpen && ( +
+ handleCodeSave(code, outputSchema)} + key={`step-code-${step.name}-${trackingExecutionId}`} + code={code} + language="typescript" + height="100%" + readOnly={trackingExecutionId !== undefined} + /> +
+ )} +
+ ); +} + // Helper function to convert JSON Schema types to TypeScript types function jsonSchemaTypeToTS(schema: JsonSchema): string { if (Array.isArray(schema.type)) { diff --git a/apps/mesh/src/web/components/details/workflow/components/workflow-step-card.tsx b/apps/mesh/src/web/components/details/workflow/components/workflow-step-card.tsx index 4805782a50..fb94e11878 100644 --- a/apps/mesh/src/web/components/details/workflow/components/workflow-step-card.tsx +++ b/apps/mesh/src/web/components/details/workflow/components/workflow-step-card.tsx @@ -51,6 +51,10 @@ export function WorkflowStepCard({ const { item: executionItem } = usePollingWorkflowExecution(trackingExecutionId); const executionCompletedAt = executionItem?.completed_at_epoch_ms; + const isCompletedSuccessfully = + executionItem?.completed_steps?.success?.includes(step.name) ?? false; + const isCompletedWithError = + executionItem?.completed_steps?.error?.includes(step.name) ?? false; const isToolStep = "toolName" in step.action; const connectionId = @@ -62,7 +66,11 @@ export function WorkflowStepCard({ const hasToolSelected = Boolean(toolName); const outputSchemaProperties = getOutputSchemaProperties(step); - const status = executionStatus?.status; + const status = isCompletedSuccessfully + ? "success" + : isCompletedWithError + ? "error" + : executionStatus?.status; const isTracking = executionStatus !== undefined; const hasStatusIndicator = status === "success" || status === "error"; diff --git a/apps/mesh/src/web/components/details/workflow/hooks/derived/use-resolved-refs.ts b/apps/mesh/src/web/components/details/workflow/hooks/derived/use-resolved-refs.ts index df16a91d03..3c8e9cee90 100644 --- a/apps/mesh/src/web/components/details/workflow/hooks/derived/use-resolved-refs.ts +++ b/apps/mesh/src/web/components/details/workflow/hooks/derived/use-resolved-refs.ts @@ -3,22 +3,16 @@ import { useTrackingExecutionId } from "../../stores/workflow"; export function useResolvedRefs() { const trackingExecutionId = useTrackingExecutionId(); - const { step_results, item: executionItem } = + const { item: executionItem } = usePollingWorkflowExecution(trackingExecutionId); const resolvedRefs: Record | undefined = - trackingExecutionId && step_results + trackingExecutionId && executionItem ? (() => { const refs: Record = {}; // Add workflow input as "input" if (executionItem?.input) { refs["input"] = executionItem.input; } - // Add each step's output by step_id - for (const result of step_results) { - if (result.step_id && result.output !== undefined) { - refs[result.step_id as string] = result.output; - } - } return refs; })() : undefined; diff --git a/apps/mesh/src/web/components/details/workflow/hooks/derived/use-step-execution-status.ts b/apps/mesh/src/web/components/details/workflow/hooks/derived/use-step-execution-status.ts index 1512765057..2e80170c70 100644 --- a/apps/mesh/src/web/components/details/workflow/hooks/derived/use-step-execution-status.ts +++ b/apps/mesh/src/web/components/details/workflow/hooks/derived/use-step-execution-status.ts @@ -15,8 +15,8 @@ export interface StepExecutionStatus { /** * Returns execution status for all steps when tracking an execution. * Determines step status based on: - * - If step has output in step_results -> success - * - If step has error in step_results -> error + * - If step name is in completed_steps.success -> success + * - If step name is in completed_steps.error -> error * - If execution is running and step is next in line -> running * - Otherwise -> pending */ @@ -25,7 +25,7 @@ export function useStepExecutionStatuses(): | undefined { const trackingExecutionId = useTrackingExecutionId(); const steps = useWorkflowSteps(); - const { step_results, item: executionItem } = + const { item: executionItem } = usePollingWorkflowExecution(trackingExecutionId); if (!trackingExecutionId || !executionItem) { @@ -34,42 +34,26 @@ export function useStepExecutionStatuses(): const statuses: Record = {}; - // Build a map of step results by step_id - const resultsByStepId = new Map< - string, - { output?: unknown; error?: unknown } - >(); - if (step_results) { - for (const result of step_results) { - const stepId = result.step_id as string | undefined; - if (stepId) { - resultsByStepId.set(stepId, { - output: result.output, - error: result.error, - }); - } - } - } + // Get completed step names from the execution + const successSteps = new Set(executionItem.completed_steps?.success ?? []); + const errorSteps = new Set(executionItem.completed_steps?.error ?? []); // Find the last completed step index to determine which step is currently running let lastCompletedIndex = -1; steps.forEach((step, index) => { - const result = resultsByStepId.get(step.name); - if (result?.output !== undefined || result?.error !== undefined) { + if (successSteps.has(step.name) || errorSteps.has(step.name)) { lastCompletedIndex = index; } }); // Determine status for each step steps.forEach((step, index) => { - const result = resultsByStepId.get(step.name); - let status: StepExecutionStatus["status"] = "pending"; - if (result?.error !== undefined) { + if (errorSteps.has(step.name)) { status = "error"; - } else if (result?.output !== undefined) { + } else if (successSteps.has(step.name)) { status = "success"; } else if ( executionItem.status === "running" && @@ -81,8 +65,9 @@ export function useStepExecutionStatuses(): statuses[step.name] = { status, - output: result?.output, - error: typeof result?.error === "string" ? result.error : undefined, + // Note: output/error details need to be fetched separately via useExecutionCompletedStep + output: undefined, + error: undefined, stepIndex: index, }; }); diff --git a/apps/mesh/src/web/components/details/workflow/hooks/queries/use-workflow-collection-item.ts b/apps/mesh/src/web/components/details/workflow/hooks/queries/use-workflow-collection-item.ts index 13b3cb7a04..ecafae4cfa 100644 --- a/apps/mesh/src/web/components/details/workflow/hooks/queries/use-workflow-collection-item.ts +++ b/apps/mesh/src/web/components/details/workflow/hooks/queries/use-workflow-collection-item.ts @@ -6,10 +6,9 @@ import { useToolCallQuery } from "@/web/hooks/use-tool-call"; type ExecutionQueryResult = { item: WorkflowExecution | null; - step_results: Record | null; }; -const POLLING_INTERVALS = [1, 500, 1000, 2000, 3000]; +const POLLING_INTERVALS = [1, 1000, 2000, 3000, 5000, 10000]; export function usePollingWorkflowExecution(executionId?: string) { const connection = useWorkflowBindingConnection(); @@ -44,11 +43,41 @@ export function usePollingWorkflowExecution(executionId?: string) { return { item: data?.item, - step_results: data?.step_results, isLoading, } as { item: WorkflowExecution | null; - step_results: Record[] | null; + isLoading: boolean; + }; +} + +export function useExecutionCompletedStep( + executionId?: string, + stepName?: string, +) { + const connection = useWorkflowBindingConnection(); + const toolCaller = createToolCaller(connection.id); + + const { data, isLoading } = useToolCallQuery< + { executionId: string | undefined; stepId: string | undefined }, + { output: unknown | null; error: string | null } + >({ + toolCaller: toolCaller, + toolName: "COLLECTION_WORKFLOW_EXECUTION_GET_STEP_RESULT", + toolInputParams: { + executionId: executionId, + stepId: stepName, + }, + scope: connection.id, + enabled: !!executionId && !!stepName, + }); + + return { + output: data?.output, + error: data?.error, + isLoading, + } as { + output: unknown | null; + error: string | null; isLoading: boolean; }; } diff --git a/packages/bindings/package.json b/packages/bindings/package.json index e7247ea7e4..a482ac2173 100644 --- a/packages/bindings/package.json +++ b/packages/bindings/package.json @@ -1,6 +1,6 @@ { "name": "@decocms/bindings", - "version": "1.0.6", + "version": "1.0.7", "type": "module", "scripts": { "check": "tsc --noEmit", diff --git a/packages/bindings/src/well-known/workflow.ts b/packages/bindings/src/well-known/workflow.ts index d0a2e8bd2f..c63d3f6b61 100644 --- a/packages/bindings/src/well-known/workflow.ts +++ b/packages/bindings/src/well-known/workflow.ts @@ -164,10 +164,7 @@ export type WorkflowExecutionStatus = z.infer< * Includes lock columns and retry tracking. */ export const WorkflowExecutionSchema = BaseCollectionEntitySchema.extend({ - steps: z - .array(StepSchema) - .describe("Steps that make up the workflow") - .describe("Workflow that was executed"), + steps: z.array(StepSchema).describe("Steps that make up the workflow"), gateway_id: z .string() .describe("ID of the gateway that will be used to execute the workflow"), @@ -203,6 +200,15 @@ export const WorkflowExecutionSchema = BaseCollectionEntitySchema.extend({ error: z .unknown() .describe("Error that occurred during the workflow execution"), + completed_steps: z + .object({ + success: z + .array(z.string()) + .describe("Names of the steps that were completed successfully"), + error: z.array(z.string()).describe("Names of the steps that errored"), + }) + .optional() + .describe("Names of the steps that were completed and their status"), }); export type WorkflowExecution = z.infer;