Skip to content

Commit 5b610cf

Browse files
authored
refactor(workflow): enhance proxy and step execution handling (#2163)
* refactor(workflow): enhance proxy and step execution handling - Updated the proxy routes to improve client connection management and error handling. - Refactored the StepDetailPanel to integrate a new StepCodeSection for editing step code. - Enhanced the WorkflowStepCard to reflect execution status based on completed steps. - Modified polling logic in usePollingWorkflowExecution to streamline data retrieval. - Bumped bindings package version to 1.0.7 and added completed_steps tracking in the workflow schema. * revert * wip * wip * bump mesh * wips
1 parent ef682aa commit 5b610cf

File tree

9 files changed

+192
-76
lines changed

9 files changed

+192
-76
lines changed

apps/mesh/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@decocms/mesh",
3-
"version": "1.1.0",
3+
"version": "1.1.1",
44
"description": "MCP Mesh - Self-hostable MCP Gateway for managing AI connections and tools",
55
"author": "Deco team",
66
"license": "MIT",

apps/mesh/src/api/routes/proxy.ts

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,30 @@ import { getMonitoringConfig } from "@/core/config";
1717
import { getStableStdioClient } from "@/stdio/stable-transport";
1818
import {
1919
ConnectionEntity,
20-
isStdioParameters,
2120
type HttpConnectionParameters,
21+
isStdioParameters,
2222
} from "@/tools/connection/schema";
2323
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
2424
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
2525
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
2626
import { WebStandardStreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/webStandardStreamableHttp.js";
2727
import {
28-
CallToolRequestSchema,
29-
GetPromptRequestSchema,
30-
ListPromptsRequestSchema,
31-
ListResourcesRequestSchema,
32-
ListResourceTemplatesRequestSchema,
33-
ListToolsRequestSchema,
34-
ReadResourceRequestSchema,
3528
type CallToolRequest,
29+
CallToolRequestSchema,
3630
type CallToolResult,
3731
type GetPromptRequest,
32+
GetPromptRequestSchema,
3833
type GetPromptResult,
34+
ListPromptsRequestSchema,
3935
type ListPromptsResult,
36+
ListResourcesRequestSchema,
4037
type ListResourcesResult,
38+
ListResourceTemplatesRequestSchema,
4139
type ListResourceTemplatesResult,
40+
ListToolsRequestSchema,
4241
type ListToolsResult,
4342
type ReadResourceRequest,
43+
ReadResourceRequestSchema,
4444
type ReadResourceResult,
4545
type Tool,
4646
} from "@modelcontextprotocol/sdk/types.js";
@@ -452,7 +452,7 @@ async function createMCPProxyDoNotUseDirectly(
452452
throw error;
453453
} finally {
454454
// Close client - stdio connections ignore close() via stable-transport
455-
await client.close();
455+
client.close().catch(console.error);
456456
}
457457
},
458458
);
@@ -475,43 +475,76 @@ async function createMCPProxyDoNotUseDirectly(
475475
}
476476

477477
// Fall back to client for connections without indexed tools
478-
const client = await createClient();
479-
return await client.listTools();
478+
let client: Awaited<ReturnType<typeof createClient>> | undefined;
479+
try {
480+
client = await createClient();
481+
return await client.listTools();
482+
} finally {
483+
client?.close().catch(console.error);
484+
}
480485
};
481486

482487
// List resources from downstream connection
483488
const listResources = async (): Promise<ListResourcesResult> => {
484-
const client = await createClient();
485-
return await client.listResources();
489+
let client: Awaited<ReturnType<typeof createClient>> | undefined;
490+
try {
491+
client = await createClient();
492+
return await client.listResources();
493+
} finally {
494+
client?.close().catch(console.error);
495+
}
486496
};
487497

488498
// Read a specific resource from downstream connection
489499
const readResource = async (
490500
params: ReadResourceRequest["params"],
491501
): Promise<ReadResourceResult> => {
492-
const client = await createClient();
493-
return await client.readResource(params);
502+
let client: Awaited<ReturnType<typeof createClient>> | undefined;
503+
try {
504+
client = await createClient();
505+
return await client.readResource(params);
506+
} finally {
507+
client?.close().catch(console.error);
508+
}
494509
};
495510

496511
// List resource templates from downstream connection
497512
const listResourceTemplates =
498513
async (): Promise<ListResourceTemplatesResult> => {
499-
const client = await createClient();
500-
return await client.listResourceTemplates();
514+
let client: Awaited<ReturnType<typeof createClient>> | undefined;
515+
try {
516+
client = await createClient();
517+
return await client.listResourceTemplates();
518+
} finally {
519+
client?.close().catch(console.error);
520+
}
501521
};
502522

503523
// List prompts from downstream connection
504524
const listPrompts = async (): Promise<ListPromptsResult> => {
505-
const client = await createClient();
506-
return await client.listPrompts();
525+
let client: Awaited<ReturnType<typeof createClient>> | undefined;
526+
try {
527+
client = await createClient();
528+
return await client.listPrompts();
529+
} catch (error) {
530+
console.error("[proxy:listPrompts] Error listing prompts:", error);
531+
throw error;
532+
} finally {
533+
client?.close().catch(console.error);
534+
}
507535
};
508536

509537
// Get a specific prompt from downstream connection
510538
const getPrompt = async (
511539
params: GetPromptRequest["params"],
512540
): Promise<GetPromptResult> => {
513-
const client = await createClient();
514-
return await client.getPrompt(params);
541+
let client: Awaited<ReturnType<typeof createClient>> | undefined;
542+
try {
543+
client = await createClient();
544+
return await client.getPrompt(params);
545+
} finally {
546+
client?.close().catch(console.error);
547+
}
515548
};
516549

517550
// Call tool using fetch directly for streaming support

apps/mesh/src/web/components/details/workflow/components/step-detail-panel.tsx

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ import type { JsonSchema } from "@/web/utils/constants";
3030
import { MonacoCodeEditor } from "./monaco-editor";
3131
import type { Step, ToolCallAction } from "@decocms/bindings/workflow";
3232
import { useMcp } from "@/web/hooks/use-mcp";
33-
import { usePollingWorkflowExecution } from "../hooks";
33+
import { useExecutionCompletedStep } from "../hooks";
34+
import { useState } from "react";
3435

3536
interface StepDetailPanelProps {
3637
className?: string;
@@ -101,6 +102,7 @@ export function StepDetailPanel({ className }: StepDetailPanelProps) {
101102
<InputSection step={currentStep} />
102103
<OutputSection step={currentStep} />
103104
<TransformCodeSection step={currentStep} />
105+
<StepCodeSection step={currentStep} />
104106
</div>
105107
);
106108
}
@@ -203,7 +205,7 @@ function InputSection({ step }: { step: Step }) {
203205
const isToolStep = "toolName" in step.action;
204206
const toolName =
205207
isToolStep && "toolName" in step.action ? step.action.toolName : null;
206-
208+
const trackingExecutionId = useTrackingExecutionId();
207209
const { tool } = useGatewayTool(toolName ?? "");
208210

209211
if (!tool || !tool.inputSchema) {
@@ -220,7 +222,7 @@ function InputSection({ step }: { step: Step }) {
220222
<Accordion
221223
type="single"
222224
collapsible
223-
defaultValue="input"
225+
defaultValue={trackingExecutionId ? "output" : "input"}
224226
className="border-b border-border shrink-0"
225227
>
226228
<AccordionItem value="input" className="border-b-0">
@@ -231,6 +233,7 @@ function InputSection({ step }: { step: Step }) {
231233
</AccordionTrigger>
232234
<AccordionContent className="px-5 pt-2">
233235
<ToolInput
236+
key={step.name}
234237
inputSchema={tool.inputSchema as JsonSchema}
235238
inputParams={step.input as Record<string, unknown>}
236239
setInputParams={handleInputChange}
@@ -249,11 +252,11 @@ function InputSection({ step }: { step: Step }) {
249252
function OutputSection({ step }: { step: Step }) {
250253
const outputSchema = step.outputSchema;
251254
const trackingExecutionId = useTrackingExecutionId();
252-
const { step_results } = usePollingWorkflowExecution(trackingExecutionId);
253-
const stepResult = step_results?.find(
254-
(result) => result.step_id === step.name,
255+
const { output, error } = useExecutionCompletedStep(
256+
trackingExecutionId,
257+
step.name,
255258
);
256-
const output = stepResult?.output;
259+
const content = output ? output : error ? { error: error } : null;
257260

258261
// Always show the Output section (even if empty)
259262
const properties =
@@ -283,8 +286,8 @@ function OutputSection({ step }: { step: Step }) {
283286
<div className="text-sm text-muted-foreground italic">
284287
No output schema defined
285288
</div>
286-
) : output ? (
287-
<OutputMonacoEditor output={output} />
289+
) : content ? (
290+
<OutputMonacoEditor output={content} />
288291
) : (
289292
<div className="space-y-2">
290293
{propertyEntries.map(([key, propSchema]) => (
@@ -458,6 +461,10 @@ export default async function(input: Input): Promise<Output> {
458461
});
459462
};
460463

464+
if ("code" in step.action && step.action.code) {
465+
return null;
466+
}
467+
461468
// No transform code → show collapsed with Plus
462469
if (!hasTransformCode) {
463470
return (
@@ -503,6 +510,60 @@ export default async function(input: Input): Promise<Output> {
503510
);
504511
}
505512

513+
function StepCodeSection({ step }: { step: Step }) {
514+
const [isOpen, setIsOpen] = useState(false);
515+
const { updateStep } = useWorkflowActions();
516+
const code =
517+
"code" in step.action && step.action.code ? step.action.code : null;
518+
const trackingExecutionId = useTrackingExecutionId();
519+
const handleCodeSave = (
520+
code: string,
521+
outputSchema: Record<string, unknown> | null,
522+
) => {
523+
updateStep(step.name, {
524+
action: {
525+
...step.action,
526+
code: code,
527+
},
528+
...(outputSchema ? { outputSchema } : {}),
529+
});
530+
};
531+
if (!code) {
532+
return null;
533+
}
534+
return (
535+
<div className="flex-1 flex flex-col min-h-0">
536+
<div
537+
className="p-5 shrink-0 cursor-pointer hover:bg-accent/50 transition-colors"
538+
onClick={() => setIsOpen(!isOpen)}
539+
>
540+
<div className="flex items-center justify-between">
541+
<h3 className="text-sm font-medium text-muted-foreground">
542+
Step Code
543+
</h3>
544+
{isOpen ? (
545+
<Minus size={14} className="text-muted-foreground" />
546+
) : (
547+
<Plus size={14} className="text-muted-foreground" />
548+
)}
549+
</div>
550+
</div>
551+
{isOpen && (
552+
<div className="flex-1 min-h-120 h-full">
553+
<MonacoCodeEditor
554+
onSave={(code, outputSchema) => handleCodeSave(code, outputSchema)}
555+
key={`step-code-${step.name}-${trackingExecutionId}`}
556+
code={code}
557+
language="typescript"
558+
height="100%"
559+
readOnly={trackingExecutionId !== undefined}
560+
/>
561+
</div>
562+
)}
563+
</div>
564+
);
565+
}
566+
506567
// Helper function to convert JSON Schema types to TypeScript types
507568
function jsonSchemaTypeToTS(schema: JsonSchema): string {
508569
if (Array.isArray(schema.type)) {

apps/mesh/src/web/components/details/workflow/components/workflow-step-card.tsx

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ export function WorkflowStepCard({
5151
const { item: executionItem } =
5252
usePollingWorkflowExecution(trackingExecutionId);
5353
const executionCompletedAt = executionItem?.completed_at_epoch_ms;
54+
const isCompletedSuccessfully =
55+
executionItem?.completed_steps?.success?.includes(step.name) ?? false;
56+
const isCompletedWithError =
57+
executionItem?.completed_steps?.error?.includes(step.name) ?? false;
5458

5559
const isToolStep = "toolName" in step.action;
5660
const connectionId =
@@ -62,7 +66,11 @@ export function WorkflowStepCard({
6266
const hasToolSelected = Boolean(toolName);
6367
const outputSchemaProperties = getOutputSchemaProperties(step);
6468

65-
const status = executionStatus?.status;
69+
const status = isCompletedSuccessfully
70+
? "success"
71+
: isCompletedWithError
72+
? "error"
73+
: executionStatus?.status;
6674
const isTracking = executionStatus !== undefined;
6775
const hasStatusIndicator = status === "success" || status === "error";
6876

apps/mesh/src/web/components/details/workflow/hooks/derived/use-resolved-refs.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,16 @@ import { useTrackingExecutionId } from "../../stores/workflow";
33

44
export function useResolvedRefs() {
55
const trackingExecutionId = useTrackingExecutionId();
6-
const { step_results, item: executionItem } =
6+
const { item: executionItem } =
77
usePollingWorkflowExecution(trackingExecutionId);
88
const resolvedRefs: Record<string, unknown> | undefined =
9-
trackingExecutionId && step_results
9+
trackingExecutionId && executionItem
1010
? (() => {
1111
const refs: Record<string, unknown> = {};
1212
// Add workflow input as "input"
1313
if (executionItem?.input) {
1414
refs["input"] = executionItem.input;
1515
}
16-
// Add each step's output by step_id
17-
for (const result of step_results) {
18-
if (result.step_id && result.output !== undefined) {
19-
refs[result.step_id as string] = result.output;
20-
}
21-
}
2216
return refs;
2317
})()
2418
: undefined;

0 commit comments

Comments
 (0)