diff --git a/mcp-studio/README.md b/mcp-studio/README.md index 5431bf88..752b1d12 100644 --- a/mcp-studio/README.md +++ b/mcp-studio/README.md @@ -1,3 +1,493 @@ # MCP Studio -Your MCP server description goes here. +MCP server for managing workflows, executions, assistants, and prompts. Supports both HTTP and stdio transports. + +## Features + +- **Workflows**: Create, update, delete, and list workflow definitions +- **Executions**: View and manage workflow execution history +- **Assistants**: Manage AI assistant configurations +- **Prompts**: Store and retrieve prompt templates + +## Workflow Scripting Language + +MCP Studio provides a declarative JSON-based language for defining multi-step tool call workflows. Workflows automatically handle dependency resolution, parallel execution, and data flow between steps. + +### Core Concepts + +#### Steps + +A workflow is a sequence of **steps**. Each step has: + +- `name`: Unique identifier (other steps reference it as `@name.field`) +- `action`: What the step does (tool call, code transform, or wait for signal) +- `input`: Data passed to the action (can include `@ref` references) +- `outputSchema`: Optional JSON Schema for output validation +- `config`: Optional retry/timeout settings + +```json +{ + "name": "fetch_user", + "action": { "toolName": "GET_USER" }, + "input": { "userId": "@input.user_id" } +} +``` + +#### Automatic Parallel Execution + +Steps run **in parallel** unless they reference each other. The execution order is auto-determined from `@ref` dependencies: + +```json +{ + "title": "Parallel Fetch", + "steps": [ + { "name": "fetch_users", "action": { "toolName": "GET_USERS" } }, + { "name": "fetch_orders", "action": { "toolName": "GET_ORDERS" } }, + { + "name": "merge", + "action": { "code": "..." }, + "input": { + "users": "@fetch_users.data", + "orders": "@fetch_orders.data" + } + } + ] +} +``` + +In this example: +- `fetch_users` and `fetch_orders` run **in parallel** (no dependencies) +- `merge` waits for **both** to complete (references both via `@ref`) + +### The `@ref` Syntax + +The `@ref` syntax wires data between steps: + +| Reference | Description | +|-----------|-------------| +| `@input.field` | Workflow input data | +| `@stepName.field` | Output from a previous step | +| `@stepName.nested.path` | Nested path into step output | +| `@item` | Current item in forEach loop | +| `@index` | Current index in forEach loop | + +#### Examples + +```json +// Direct reference - entire value +{ "user": "@fetch_user" } + +// Nested path +{ "userName": "@fetch_user.profile.name" } + +// String interpolation +{ "message": "Hello @fetch_user.name, your order @fetch_order.id is ready" } + +// Array access +{ "firstItem": "@fetch_list.items.0" } +``` + +### Action Types + +#### 1. Tool Call Action + +Invokes an MCP tool through the configured gateway: + +```json +{ + "name": "get_weather", + "action": { + "toolName": "WEATHER_GET_FORECAST" + }, + "input": { + "city": "@input.city", + "units": "celsius" + } +} +``` + +With optional result transformation: + +```json +{ + "name": "get_weather", + "action": { + "toolName": "WEATHER_GET_FORECAST", + "transformCode": "interface Output { temp: number } export default function(input) { return { temp: input.temperature.current } }" + }, + "input": { "city": "@input.city" } +} +``` + +#### 2. Code Action + +Pure TypeScript for data transformation. 
Runs in a sandboxed QuickJS environment: + +```json +{ + "name": "merge_data", + "action": { + "code": "interface Input { users: User[]; orders: Order[] } interface Output { combined: Array<{ user: User; orderCount: number }> } export default function(input: Input): Output { return { combined: input.users.map(u => ({ user: u, orderCount: input.orders.filter(o => o.userId === u.id).length })) } }" + }, + "input": { + "users": "@fetch_users.data", + "orders": "@fetch_orders.data" + } +} +``` + +Code requirements: +- Must export a `default` function +- Optionally declare `Input` and `Output` interfaces for type extraction +- Runs in isolated sandbox (no network, filesystem, or non-deterministic APIs) + +#### 3. Wait for Signal Action (Human-in-the-Loop) + +Pauses execution until an external signal is received: + +```json +{ + "name": "await_approval", + "action": { + "signalName": "approval" + }, + "config": { + "timeoutMs": 86400000 + } +} +``` + +Use `SEND_SIGNAL` tool to resume: +```json +{ "executionId": "...", "signalName": "approval", "payload": { "approved": true } } +``` + +### Step Configuration + +Optional retry and timeout settings: + +```json +{ + "name": "flaky_api_call", + "action": { "toolName": "EXTERNAL_API" }, + "config": { + "maxAttempts": 3, + "backoffMs": 1000, + "timeoutMs": 30000 + } +} +``` + +| Option | Default | Description | +|--------|---------|-------------| +| `maxAttempts` | 1 | Maximum retry attempts on failure | +| `backoffMs` | - | Initial delay between retries (doubles each attempt) | +| `timeoutMs` | 30000 | Maximum execution time before timeout | + +### Output Schema + +Define expected output structure with JSON Schema: + +```json +{ + "name": "extract_info", + "action": { "toolName": "LLM_EXTRACT" }, + "outputSchema": { + "type": "object", + "properties": { + "name": { "type": "string" }, + "age": { "type": "number" } + }, + "required": ["name"] + } +} +``` + +When `outputSchema` is provided (without `transformCode`), the step output is automatically filtered to only include properties defined in the schema. 
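+
+For example, assuming the tool's raw output contains extra fields (the values below are hypothetical), only the properties declared in the schema survive:
+
+```json
+// Raw tool output (hypothetical)
+{ "name": "Ada Lovelace", "age": 36, "confidence": 0.93, "source": "llm" }
+
+// Step output after outputSchema filtering
+{ "name": "Ada Lovelace", "age": 36 }
+```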
+ +### Complete Workflow Example + +```json +{ + "id": "enrich-contact", + "title": "Enrich Contact Information", + "description": "Fetches contact data from multiple sources and merges into unified profile", + "steps": [ + { + "name": "lookup_email", + "description": "Find email by name", + "action": { "toolName": "CLEARBIT_LOOKUP" }, + "input": { "name": "@input.contact_name" } + }, + { + "name": "lookup_linkedin", + "description": "Find LinkedIn profile", + "action": { "toolName": "LINKEDIN_SEARCH" }, + "input": { "query": "@input.contact_name @input.company" } + }, + { + "name": "get_company_info", + "description": "Fetch company details", + "action": { "toolName": "CRUNCHBASE_COMPANY" }, + "input": { "name": "@input.company" } + }, + { + "name": "merge_profile", + "description": "Combine all data sources", + "action": { + "code": "interface Input { email: { address: string }; linkedin: { url: string; title: string }; company: { size: string; funding: string } } interface Output { profile: { email: string; linkedinUrl: string; title: string; companySize: string; funding: string } } export default function(input: Input): Output { return { profile: { email: input.email.address, linkedinUrl: input.linkedin.url, title: input.linkedin.title, companySize: input.company.size, funding: input.company.funding } } }" + }, + "input": { + "email": "@lookup_email", + "linkedin": "@lookup_linkedin", + "company": "@get_company_info" + }, + "outputSchema": { + "type": "object", + "properties": { + "profile": { + "type": "object", + "properties": { + "email": { "type": "string" }, + "linkedinUrl": { "type": "string" }, + "title": { "type": "string" }, + "companySize": { "type": "string" }, + "funding": { "type": "string" } + } + } + } + } + } + ] +} +``` + +Execution flow: +1. `lookup_email`, `lookup_linkedin`, and `get_company_info` run **in parallel** +2. `merge_profile` waits for all three, then combines results + +### DAG Visualization + +The workflow engine builds a **Directed Acyclic Graph (DAG)** from step dependencies. Functions available for visualization: + +- `computeStepLevels(steps)` - Get execution level for each step +- `groupStepsByLevel(steps)` - Group steps by parallel execution level +- `buildDependencyEdges(steps)` - Get `[from, to]` edges for graph visualization +- `validateNoCycles(steps)` - Check for circular dependencies + +--- + +## Filesystem Workflow Loading + +Load workflows from JSON files instead of (or alongside) the database. This enables version-controlled workflows, MCP packaging, and database-free operation. + +### Configuration + +```bash +# Set workflow directory (scans recursively for *.json files) +WORKFLOW_DIR=/path/to/workflows bun run stdio + +# Or specify individual files +WORKFLOW_FILES=./workflows/enrich.json,./workflows/notify.json bun run stdio + +# Combine with database (workflows from both sources are merged) +WORKFLOW_DIR=/path/to/workflows DATABASE_URL=... 
bun run stdio +``` + +### Directory Structure + +``` +workflows/ +├── enrich-contact.json # Single workflow file +├── notify-team.workflow.json # Alternative naming convention +└── my-mcp/ # MCPs can package workflows + ├── workflow-a.json + └── bundled.json # Can contain multiple workflows +``` + +### File Formats + +**Single workflow:** +```json +{ + "id": "enrich-contact", + "title": "Enrich Contact", + "description": "Fetch and merge contact data", + "steps": [ + { "name": "lookup", "action": { "toolName": "LOOKUP_CONTACT" } } + ] +} +``` + +**Multiple workflows in one file:** +```json +{ + "workflows": [ + { "id": "workflow-a", "title": "...", "steps": [...] }, + { "id": "workflow-b", "title": "...", "steps": [...] } + ] +} +``` + +**Array format:** +```json +[ + { "id": "workflow-a", "title": "...", "steps": [...] }, + { "id": "workflow-b", "title": "...", "steps": [...] } +] +``` + +### Filesystem-Specific Tools + +When filesystem mode is enabled, additional tools become available: + +- `WORKFLOW_RELOAD` - Reload all workflows from disk (after editing files) +- `WORKFLOW_SOURCE_INFO` - Show where workflows are loaded from + +### Hot Reload + +When `WORKFLOW_DIR` is set, file changes are automatically detected and workflows are reloaded. Edit a JSON file and the changes are immediately available. + +### Source Filtering + +The `COLLECTION_WORKFLOW_LIST` tool accepts a `source` parameter: + +```json +{ "source": "filesystem" } // Only filesystem workflows +{ "source": "database" } // Only database workflows +{ "source": "all" } // Both (default) +``` + +Each workflow in the response includes `_source: "filesystem" | "database"` to identify its origin. + +### Use Cases + +1. **Version Control**: Store workflows in git alongside code +2. **MCP Packaging**: MCPs can ship pre-built workflows in their package +3. **Local Development**: Edit JSON files with hot-reload +4. **Database-Free**: Run without PostgreSQL for simple setups +5. **CI/CD**: Deploy workflows from repository as code + +### Example: MCP with Bundled Workflows + +An MCP package can include workflows that are automatically available: + +``` +my-mcp/ +├── package.json +├── src/ +│ └── index.ts +└── workflows/ + ├── enrich.json + └── notify.json +``` + +Start with: `WORKFLOW_DIR=./workflows bun run src/index.ts` + +--- + +## Usage + +### HTTP Transport (Mesh Web Connection) + +```bash +# Development with hot reload +bun run dev + +# Production +bun run build:server +node dist/server/main.js +``` + +### Stdio Transport (Mesh Custom Command) + +```bash +# Run directly +bun run stdio + +# Development with hot reload +bun run dev:stdio +``` + +#### Adding to Mesh as Custom Command + +In Mesh, add a new custom command connection: + +1. Go to **MCPs** → **Add MCP** → **Custom Command** +2. Configure the command: + - **Command**: `bun` + - **Args**: `--watch /path/to/mcp-studio/server/stdio.ts` +3. Click **Save** - Mesh will spawn the process and fetch the tools +4. Go to the **Settings** tab to configure the database binding +5. Select a PostgreSQL connection from the **Database** dropdown +6. 
Click **Save Changes** + +This enables: +- Live reloading during development +- Mesh bindings UI for database configuration (same dropdowns as HTTP connections) +- Automatic migrations when bindings are configured + +## Bindings + +The stdio transport supports Mesh bindings via: + +- `MCP_CONFIGURATION` - Returns the state schema for the bindings UI (uses `BindingOf` format) +- `ON_MCP_CONFIGURATION` - Receives configured bindings (connection IDs) and runs migrations + +The binding schema uses the same format as HTTP mode: +```typescript +const StdioStateSchema = z.object({ + DATABASE: BindingOf("@deco/postgres").describe("PostgreSQL database binding"), +}); +``` + +This renders as a dropdown in Mesh UI showing all connections that implement `@deco/postgres`. + +## Environment Variables + +| Variable | Required | Description | +|----------|----------|-------------| +| `DATABASE_URL` | No* | PostgreSQL connection string (e.g., from Supabase project settings) | +| `WORKFLOW_DIR` | No | Directory to scan for workflow JSON files (recursive) | +| `WORKFLOW_FILES` | No | Comma-separated list of specific workflow JSON file paths | + +*`DATABASE_URL` is required unless using filesystem mode (`WORKFLOW_DIR` or `WORKFLOW_FILES`). + +**Note**: STDIO connections receive binding configuration (connection IDs) from Mesh, but still require `DATABASE_URL` env var for actual database connectivity. This is because STDIO processes cannot proxy database calls through Mesh. + +## Available Tools + +### Workflow Collection +- `COLLECTION_WORKFLOW_LIST` - List all workflows +- `COLLECTION_WORKFLOW_GET` - Get a single workflow by ID +- `COLLECTION_WORKFLOW_CREATE` - Create a new workflow +- `COLLECTION_WORKFLOW_UPDATE` - Update an existing workflow +- `COLLECTION_WORKFLOW_DELETE` - Delete a workflow + +### Execution Collection +- `COLLECTION_WORKFLOW_EXECUTION_LIST` - List workflow executions +- `COLLECTION_WORKFLOW_EXECUTION_GET` - Get execution details with step results + +### Assistant Collection +- `COLLECTION_ASSISTANT_LIST` - List all assistants +- `COLLECTION_ASSISTANT_GET` - Get a single assistant by ID +- `COLLECTION_ASSISTANT_CREATE` - Create a new assistant +- `COLLECTION_ASSISTANT_UPDATE` - Update an existing assistant +- `COLLECTION_ASSISTANT_DELETE` - Delete an assistant + +### Prompt Collection +- `COLLECTION_PROMPT_LIST` - List all prompts +- `COLLECTION_PROMPT_GET` - Get a single prompt by ID + +## Development + +```bash +# Install dependencies +bun install + +# Type check +bun run check + +# Format code +bun run fmt +``` diff --git a/mcp-studio/package.json b/mcp-studio/package.json index 5607d721..a8d6dc14 100644 --- a/mcp-studio/package.json +++ b/mcp-studio/package.json @@ -6,6 +6,8 @@ "type": "module", "scripts": { "dev": "bun run --hot server/main.ts", + "dev:stdio": "bun --watch server/stdio.ts", + "stdio": "bun server/stdio.ts", "configure": "deco configure", "gen": "deco gen --output=shared/deco.gen.ts", "check": "tsc --noEmit", @@ -15,6 +17,7 @@ }, "dependencies": { "@ai-sdk/mcp": "^1.0.1", + "postgres": "^3.4.5", "@decocms/bindings": "^1.0.3", "@decocms/runtime": "^1.0.3", "@jitl/quickjs-wasmfile-release-sync": "^0.31.0", @@ -41,7 +44,8 @@ "tailwind-merge": "^3.0.2", "tailwindcss": "^4.0.6", "tailwindcss-animate": "^1.0.7", - "zod": "^3.24.3" + "zod": "^3.24.3", + "zod-to-json-schema": "^3.24.5" }, "devDependencies": { "deco-cli": "^0.28.0", diff --git a/mcp-studio/server/stdio-tools.ts b/mcp-studio/server/stdio-tools.ts new file mode 100644 index 00000000..fda80a94 --- /dev/null +++ 
b/mcp-studio/server/stdio-tools.ts @@ -0,0 +1,1355 @@ +/** + * MCP Studio - Stdio Tool Registration + * + * Adapts the runtime-based tools for standalone stdio transport. + * Uses Mesh bindings to connect to database via Mesh's proxy API. + * + * Supports Mesh bindings via: + * - MCP_CONFIGURATION: Returns the state schema for the bindings UI + * - ON_MCP_CONFIGURATION: Receives configured bindings, mesh token, and mesh URL + * + * When bindings are configured, calls Mesh's API to run SQL queries. + * The mesh token provides authentication and the binding's connection ID + * routes the query to the correct database. + */ + +import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js"; +import { z } from "zod"; +import zodToJsonSchema from "zod-to-json-schema"; +import { + isFilesystemMode, + loadWorkflows, + getCachedWorkflows, + getWorkflowById, + startWatching, + getWorkflowSource, + type LoadedWorkflow, +} from "./workflow-loader.ts"; + +// ============================================================================ +// Configuration State (Bindings) +// ============================================================================ + +/** + * Creates a binding schema compatible with Mesh UI. + * This produces the same format as @decocms/runtime's BindingOf. + */ +const BindingOf = (bindingType: string) => + z.object({ + __type: z.literal(bindingType).default(bindingType), + value: z.string().describe("Connection ID"), + }); + +/** + * State schema for stdio mode bindings. + * Matches HTTP mode's StateSchema for UI parity. + */ +const StdioStateSchema = z.object({ + DATABASE: BindingOf("@deco/postgres").describe("PostgreSQL database binding"), + EVENT_BUS: BindingOf("@deco/event-bus").describe( + "Event bus for workflow events", + ), + CONNECTION: BindingOf("@deco/connection").describe("Connection management"), +}); + +// ============================================================================ +// Mesh Configuration (from ON_MCP_CONFIGURATION) +// ============================================================================ + +interface MeshConfig { + meshUrl: string; + meshToken: string; + databaseConnectionId: string; +} + +let meshConfig: MeshConfig | null = null; +let migrationsRan = false; + +// ============================================================================ +// Database Connection via Mesh API +// ============================================================================ + +/** + * Call a tool on a Mesh connection via the proxy API. + * This allows STDIO MCPs to use bindings just like HTTP MCPs. + */ +async function callMeshTool( + connectionId: string, + toolName: string, + args: Record, +): Promise { + if (!meshConfig) { + throw new Error( + "Database not configured. 
Configure bindings in Mesh UI first.", + ); + } + + const endpoint = `${meshConfig.meshUrl}/mcp/${connectionId}`; + + const response = await fetch(endpoint, { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json, text/event-stream", + Authorization: `Bearer ${meshConfig.meshToken}`, + }, + body: JSON.stringify({ + jsonrpc: "2.0", + id: Date.now(), + method: "tools/call", + params: { + name: toolName, + arguments: args, + }, + }), + }); + + if (!response.ok) { + const text = await response.text(); + throw new Error(`Mesh API error (${response.status}): ${text}`); + } + + // Handle both JSON and SSE responses + const contentType = response.headers.get("Content-Type") || ""; + let json: { + result?: { structuredContent?: T; content?: { text: string }[] }; + error?: { message: string }; + }; + + if (contentType.includes("text/event-stream")) { + // Parse SSE response - extract JSON from data lines + const text = await response.text(); + const lines = text.split("\n"); + const dataLines = lines.filter((line) => line.startsWith("data: ")); + const lastData = dataLines[dataLines.length - 1]; + if (!lastData) { + throw new Error("Empty SSE response from Mesh API"); + } + json = JSON.parse(lastData.slice(6)); // Remove "data: " prefix + } else { + json = await response.json(); + } + + if (json.error) { + throw new Error(`Mesh tool error: ${json.error.message}`); + } + + return (json.result?.structuredContent ?? + JSON.parse(json.result?.content?.[0]?.text ?? "null")) as T; +} + +/** + * Run SQL query via Mesh's database binding proxy. + * Uses DATABASES_RUN_SQL tool on the configured database connection. + */ +async function runSQL( + query: string, + params: unknown[] = [], +): Promise { + if (!meshConfig) { + throw new Error( + "Database not configured. Configure bindings in Mesh UI first.", + ); + } + + const result = await callMeshTool<{ + result: { results?: T[] }[]; + }>(meshConfig.databaseConnectionId, "DATABASES_RUN_SQL", { + sql: query, + params, + }); + + return result.result?.[0]?.results ?? []; +} + +// ============================================================================ +// Database Migrations +// ============================================================================ + +/** + * Run migrations to ensure all tables exist. + * This mirrors the `configuration.onChange` behavior from HTTP mode. 
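+ * Every statement uses CREATE TABLE / CREATE INDEX IF NOT EXISTS, so running
+ * this on every configuration change is idempotent and safe.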
+ */ +async function runMigrations(): Promise { + console.error("[mcp-studio] Running migrations..."); + + // workflow_collection table + await runSQL(` + CREATE TABLE IF NOT EXISTS workflow_collection ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + input JSONB, + gateway_id TEXT NOT NULL, + description TEXT, + steps JSONB NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_by TEXT, + updated_by TEXT + ) + `); + + await runSQL(` + CREATE INDEX IF NOT EXISTS idx_workflow_collection_created_at ON workflow_collection(created_at DESC); + CREATE INDEX IF NOT EXISTS idx_workflow_collection_updated_at ON workflow_collection(updated_at DESC); + CREATE INDEX IF NOT EXISTS idx_workflow_collection_title ON workflow_collection(title); + `); + + // workflow table + await runSQL(` + CREATE TABLE IF NOT EXISTS workflow ( + id TEXT PRIMARY KEY, + workflow_collection_id TEXT, + steps JSONB NOT NULL DEFAULT '{}', + input JSONB, + gateway_id TEXT NOT NULL, + created_at_epoch_ms BIGINT NOT NULL, + created_by TEXT + ) + `); + + await runSQL(` + CREATE INDEX IF NOT EXISTS idx_workflow_created_at_epoch ON workflow(created_at_epoch_ms DESC); + CREATE INDEX IF NOT EXISTS idx_workflow_collection_id ON workflow(workflow_collection_id); + CREATE INDEX IF NOT EXISTS idx_workflow_gateway_id ON workflow(gateway_id); + `); + + // workflow_execution table + await runSQL(` + CREATE TABLE IF NOT EXISTS workflow_execution ( + id TEXT PRIMARY KEY, + workflow_id TEXT NOT NULL, + status TEXT NOT NULL CHECK(status IN ('enqueued', 'cancelled', 'success', 'error', 'running')), + input JSONB, + output JSONB, + created_at BIGINT NOT NULL DEFAULT (EXTRACT(EPOCH FROM now())*1000)::bigint, + updated_at BIGINT NOT NULL DEFAULT (EXTRACT(EPOCH FROM now())*1000)::bigint, + start_at_epoch_ms BIGINT, + started_at_epoch_ms BIGINT, + completed_at_epoch_ms BIGINT, + timeout_ms BIGINT, + deadline_at_epoch_ms BIGINT, + error JSONB, + created_by TEXT + ) + `); + + await runSQL(` + CREATE INDEX IF NOT EXISTS idx_workflow_execution_status ON workflow_execution(status); + CREATE INDEX IF NOT EXISTS idx_workflow_execution_created_at ON workflow_execution(created_at DESC); + CREATE INDEX IF NOT EXISTS idx_workflow_execution_start_at ON workflow_execution(start_at_epoch_ms); + `); + + // workflow_execution_step_result table + await runSQL(` + CREATE TABLE IF NOT EXISTS workflow_execution_step_result ( + execution_id TEXT NOT NULL, + step_id TEXT NOT NULL, + started_at_epoch_ms BIGINT, + completed_at_epoch_ms BIGINT, + output JSONB, + error JSONB, + PRIMARY KEY (execution_id, step_id), + FOREIGN KEY (execution_id) REFERENCES workflow_execution(id) + ) + `); + + await runSQL(` + CREATE INDEX IF NOT EXISTS idx_workflow_execution_step_result_execution ON workflow_execution_step_result(execution_id); + CREATE INDEX IF NOT EXISTS idx_workflow_execution_step_result_started ON workflow_execution_step_result(started_at_epoch_ms DESC); + CREATE INDEX IF NOT EXISTS idx_workflow_execution_step_result_completed ON workflow_execution_step_result(completed_at_epoch_ms DESC); + `); + + // assistants table + await runSQL(` + CREATE TABLE IF NOT EXISTS assistants ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_by TEXT, + updated_by TEXT, + description TEXT NOT NULL, + instructions TEXT NOT NULL, + tool_set JSONB NOT NULL DEFAULT '{}', + avatar TEXT NOT NULL DEFAULT '', + system_prompt 
TEXT NOT NULL DEFAULT '', + gateway_id TEXT NOT NULL DEFAULT '', + model JSONB NOT NULL DEFAULT '{"id":"","connectionId":""}'::jsonb + ) + `); + + await runSQL(` + CREATE INDEX IF NOT EXISTS idx_assistants_created_at ON assistants(created_at DESC); + CREATE INDEX IF NOT EXISTS idx_assistants_updated_at ON assistants(updated_at DESC); + CREATE INDEX IF NOT EXISTS idx_assistants_title ON assistants(title); + `); + + // prompts table + await runSQL(` + CREATE TABLE IF NOT EXISTS prompts ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_by TEXT, + updated_by TEXT, + description TEXT, + arguments JSONB NOT NULL DEFAULT '[]', + icons JSONB NOT NULL DEFAULT '[]', + messages JSONB NOT NULL DEFAULT '[]' + ) + `); + + await runSQL(` + CREATE INDEX IF NOT EXISTS idx_prompts_created_at ON prompts(created_at DESC); + CREATE INDEX IF NOT EXISTS idx_prompts_updated_at ON prompts(updated_at DESC); + CREATE INDEX IF NOT EXISTS idx_prompts_title ON prompts(title); + `); + + console.error("[mcp-studio] Migrations complete"); +} + +// ============================================================================ +// Tool Logging +// ============================================================================ + +function logTool(name: string, args: Record) { + const argStr = Object.entries(args) + .map(([k, v]) => `${k}=${JSON.stringify(v)?.slice(0, 50)}`) + .join(" "); + console.error(`[mcp-studio] ${name}${argStr ? ` ${argStr}` : ""}`); +} + +function withLogging>( + toolName: string, + handler: (args: T) => Promise, +): (args: T) => Promise { + return async (args: T) => { + logTool(toolName, args as Record); + return handler(args); + }; +} + +// ============================================================================ +// Tool Registration +// ============================================================================ + +export async function registerStdioTools(server: McpServer): Promise { + // ========================================================================= + // Initialize Filesystem Workflow Loading (if configured) + // ========================================================================= + + const filesystemMode = isFilesystemMode(); + if (filesystemMode) { + console.error("[mcp-studio] Filesystem workflow mode enabled"); + await loadWorkflows(); + + // Start watching for changes + const source = getWorkflowSource(); + if (source.workflowDir) { + await startWatching({ + ...source, + watch: true, + onChange: (workflows) => { + console.error( + `[mcp-studio] Workflows reloaded: ${workflows.length} workflow(s)`, + ); + }, + }); + } + } + + // ========================================================================= + // MCP Configuration Tools (for Mesh bindings UI) + // ========================================================================= + + server.registerTool( + "MCP_CONFIGURATION", + { + title: "MCP Configuration", + description: + "Returns the configuration schema for this MCP server. 
Used by Mesh to show the bindings UI.", + inputSchema: {}, + annotations: { readOnlyHint: true }, + }, + withLogging("MCP_CONFIGURATION", async () => { + const stateSchema = zodToJsonSchema(StdioStateSchema, { + $refStrategy: "none", + }); + + const result = { + stateSchema, + scopes: [ + "DATABASE::DATABASES_RUN_SQL", + "EVENT_BUS::*", + "CONNECTION::*", + ], + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + // Binding schema for ON_MCP_CONFIGURATION input + const BindingInputSchema = z + .object({ + __type: z.string(), + value: z.string(), + }) + .optional(); + + server.registerTool( + "ON_MCP_CONFIGURATION", + { + title: "On MCP Configuration", + description: + "Called by Mesh when the user saves binding configuration. Applies the configured state and mesh credentials.", + inputSchema: { + state: z + .object({ + DATABASE: BindingInputSchema, + EVENT_BUS: BindingInputSchema, + CONNECTION: BindingInputSchema, + }) + .passthrough() + .describe("The configured state from the bindings UI"), + scopes: z.array(z.string()).describe("List of authorized scopes"), + // Mesh credentials for STDIO connections to call back to Mesh API + meshToken: z + .string() + .optional() + .describe("JWT token for authenticating with Mesh API"), + meshUrl: z + .string() + .optional() + .describe("Base URL of the Mesh instance"), + }, + annotations: { readOnlyHint: false }, + }, + withLogging("ON_MCP_CONFIGURATION", async (args) => { + console.error("[mcp-studio] Received configuration"); + + const state = args.state || {}; + const databaseConnectionId = state.DATABASE?.value; + + // Store mesh configuration if provided + if (args.meshToken && args.meshUrl && databaseConnectionId) { + meshConfig = { + meshToken: args.meshToken, + meshUrl: args.meshUrl, + databaseConnectionId, + }; + console.error( + `[mcp-studio] Mesh binding configured: ${args.meshUrl} -> ${databaseConnectionId}`, + ); + + // Run migrations via Mesh API + if (!migrationsRan) { + try { + await runMigrations(); + migrationsRan = true; + console.error("[mcp-studio] Migrations completed via Mesh API"); + } catch (error) { + console.error("[mcp-studio] Migration error:", error); + } + } + } else if (databaseConnectionId) { + console.error( + `[mcp-studio] Database binding configured to: ${databaseConnectionId}`, + ); + console.error( + "[mcp-studio] Warning: No meshToken/meshUrl provided - database operations will fail", + ); + } + + if (state.EVENT_BUS?.value) { + console.error( + `[mcp-studio] Event bus binding: ${state.EVENT_BUS.value}`, + ); + } + if (state.CONNECTION?.value) { + console.error( + `[mcp-studio] Connection binding: ${state.CONNECTION.value}`, + ); + } + + const result = { success: true, configured: !!meshConfig }; + return { + content: [{ type: "text", text: JSON.stringify(result) }], + structuredContent: result, + }; + }), + ); + + // ========================================================================= + // Workflow Collection Tools + // ========================================================================= + + server.registerTool( + "COLLECTION_WORKFLOW_LIST", + { + title: "List Workflows", + description: "List all workflows with optional pagination", + inputSchema: { + limit: z.number().default(50), + offset: z.number().default(0), + source: z + .enum(["all", "filesystem", "database"]) + .default("all") + .describe( + "Filter by source: all (both), filesystem (from files), database (from PostgreSQL)", + ), + }, + annotations: { 
readOnlyHint: true }, + }, + withLogging("COLLECTION_WORKFLOW_LIST", async (args) => { + const includeFilesystem = + args.source === "all" || args.source === "filesystem"; + const includeDatabase = + args.source === "all" || args.source === "database"; + + let allItems: Record[] = []; + + // Get filesystem workflows + if (includeFilesystem && filesystemMode) { + const fsWorkflows = getCachedWorkflows().map((w) => ({ + ...w, + _source: "filesystem", + })); + allItems.push(...fsWorkflows); + } + + // Get database workflows (only if we have mesh config) + if (includeDatabase && meshConfig) { + try { + const dbItems = await runSQL>( + "SELECT * FROM workflow_collection ORDER BY updated_at DESC", + [], + ); + const transformed = dbItems.map((item) => ({ + ...transformWorkflow(item), + _source: "database", + })); + allItems.push(...transformed); + } catch (error) { + // Database not available, skip silently + console.error( + "[mcp-studio] Database query failed, using filesystem only", + ); + } + } + + // Apply pagination + const totalCount = allItems.length; + const paginatedItems = allItems.slice( + args.offset, + args.offset + args.limit, + ); + + const result = { + items: paginatedItems, + totalCount, + hasMore: args.offset + paginatedItems.length < totalCount, + mode: filesystemMode ? "filesystem" : "database", + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + server.registerTool( + "COLLECTION_WORKFLOW_GET", + { + title: "Get Workflow", + description: "Get a single workflow by ID", + inputSchema: { + id: z.string().describe("Workflow ID"), + }, + annotations: { readOnlyHint: true }, + }, + withLogging("COLLECTION_WORKFLOW_GET", async (args) => { + // Try filesystem first + if (filesystemMode) { + const fsWorkflow = getWorkflowById(args.id); + if (fsWorkflow) { + const result = { + item: { ...fsWorkflow, _source: "filesystem" }, + }; + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + } + } + + // Fall back to database + if (meshConfig) { + const items = await runSQL>( + "SELECT * FROM workflow_collection WHERE id = ? LIMIT 1", + [args.id], + ); + + const result = { + item: items[0] + ? 
{ ...transformWorkflow(items[0]), _source: "database" } + : null, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + } + + // No workflow found + const result = { item: null }; + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + server.registerTool( + "COLLECTION_WORKFLOW_CREATE", + { + title: "Create Workflow", + description: "Create a new workflow", + inputSchema: { + data: z.object({ + id: z.string().optional(), + title: z.string(), + description: z.string().optional(), + steps: z.array(z.unknown()).optional(), + gateway_id: z.string().optional(), + }), + }, + annotations: { readOnlyHint: false }, + }, + withLogging("COLLECTION_WORKFLOW_CREATE", async (args) => { + const now = new Date().toISOString(); + const id = args.data.id || crypto.randomUUID(); + + await runSQL( + `INSERT INTO workflow_collection (id, title, description, steps, gateway_id, created_at, updated_at, created_by, updated_by) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + [ + id, + args.data.title, + args.data.description || null, + JSON.stringify(args.data.steps || []), + args.data.gateway_id || "", + now, + now, + "stdio-user", + "stdio-user", + ], + ); + + const items = await runSQL>( + "SELECT * FROM workflow_collection WHERE id = ? LIMIT 1", + [id], + ); + + const result = { + item: items[0] ? transformWorkflow(items[0]) : null, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + server.registerTool( + "COLLECTION_WORKFLOW_UPDATE", + { + title: "Update Workflow", + description: "Update an existing workflow", + inputSchema: { + id: z.string(), + data: z.object({ + title: z.string().optional(), + description: z.string().optional(), + steps: z.array(z.unknown()).optional(), + }), + }, + annotations: { readOnlyHint: false }, + }, + withLogging("COLLECTION_WORKFLOW_UPDATE", async (args) => { + const now = new Date().toISOString(); + const setClauses: string[] = ["updated_at = ?", "updated_by = ?"]; + const params: unknown[] = [now, "stdio-user"]; + + if (args.data.title !== undefined) { + setClauses.push("title = ?"); + params.push(args.data.title); + } + if (args.data.description !== undefined) { + setClauses.push("description = ?"); + params.push(args.data.description); + } + if (args.data.steps !== undefined) { + setClauses.push("steps = ?"); + params.push(JSON.stringify(args.data.steps)); + } + + params.push(args.id); + + await runSQL( + `UPDATE workflow_collection SET ${setClauses.join(", ")} WHERE id = ?`, + params, + ); + + const items = await runSQL>( + "SELECT * FROM workflow_collection WHERE id = ? LIMIT 1", + [args.id], + ); + + const result = { + item: items[0] ? transformWorkflow(items[0]) : null, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + server.registerTool( + "COLLECTION_WORKFLOW_DELETE", + { + title: "Delete Workflow", + description: "Delete a workflow by ID", + inputSchema: { + id: z.string(), + }, + annotations: { readOnlyHint: false, destructiveHint: true }, + }, + withLogging("COLLECTION_WORKFLOW_DELETE", async (args) => { + const items = await runSQL>( + "DELETE FROM workflow_collection WHERE id = ? RETURNING *", + [args.id], + ); + + const result = { + item: items[0] ? 
transformWorkflow(items[0]) : null, + success: items.length > 0, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + // ========================================================================= + // Workflow Execution Tools + // ========================================================================= + + server.registerTool( + "COLLECTION_WORKFLOW_EXECUTION_LIST", + { + title: "List Executions", + description: "List workflow executions with pagination", + inputSchema: { + limit: z.number().default(50), + offset: z.number().default(0), + workflow_id: z.string().optional(), + }, + annotations: { readOnlyHint: true }, + }, + withLogging("COLLECTION_WORKFLOW_EXECUTION_LIST", async (args) => { + let sql = + "SELECT * FROM workflow_execution ORDER BY created_at DESC LIMIT ? OFFSET ?"; + const params: unknown[] = [args.limit, args.offset]; + + if (args.workflow_id) { + sql = + "SELECT * FROM workflow_execution WHERE workflow_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"; + params.unshift(args.workflow_id); + } + + const items = await runSQL>(sql, params); + + let countSql = "SELECT COUNT(*) as count FROM workflow_execution"; + const countParams: unknown[] = []; + + if (args.workflow_id) { + countSql = + "SELECT COUNT(*) as count FROM workflow_execution WHERE workflow_id = ?"; + countParams.push(args.workflow_id); + } + + const countResult = await runSQL<{ count: string }>( + countSql, + countParams, + ); + const totalCount = parseInt(countResult[0]?.count || "0", 10); + + const result = { + items: items.map(transformExecution), + totalCount, + hasMore: args.offset + items.length < totalCount, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + server.registerTool( + "COLLECTION_WORKFLOW_EXECUTION_GET", + { + title: "Get Execution", + description: "Get a single workflow execution by ID with step results", + inputSchema: { + id: z.string().describe("Execution ID"), + }, + annotations: { readOnlyHint: true }, + }, + withLogging("COLLECTION_WORKFLOW_EXECUTION_GET", async (args) => { + const executions = await runSQL>( + "SELECT * FROM workflow_execution WHERE id = ? LIMIT 1", + [args.id], + ); + + const stepResults = await runSQL>( + "SELECT * FROM workflow_step_result WHERE execution_id = ? ORDER BY created_at ASC", + [args.id], + ); + + const result = { + item: executions[0] ? transformExecution(executions[0]) : null, + step_results: stepResults.map(transformStepResult), + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + // ========================================================================= + // Assistant Collection Tools + // ========================================================================= + + server.registerTool( + "COLLECTION_ASSISTANT_LIST", + { + title: "List Assistants", + description: "List all assistants with pagination", + inputSchema: { + limit: z.number().default(50), + offset: z.number().default(0), + }, + annotations: { readOnlyHint: true }, + }, + withLogging("COLLECTION_ASSISTANT_LIST", async (args) => { + const items = await runSQL>( + "SELECT * FROM assistants ORDER BY updated_at DESC LIMIT ? 
OFFSET ?", + [args.limit, args.offset], + ); + + const countResult = await runSQL<{ count: string }>( + "SELECT COUNT(*) as count FROM assistants", + ); + const totalCount = parseInt(countResult[0]?.count || "0", 10); + + const result = { + items: items.map(transformAssistant), + totalCount, + hasMore: args.offset + items.length < totalCount, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + server.registerTool( + "COLLECTION_ASSISTANT_GET", + { + title: "Get Assistant", + description: "Get a single assistant by ID", + inputSchema: { + id: z.string().describe("Assistant ID"), + }, + annotations: { readOnlyHint: true }, + }, + withLogging("COLLECTION_ASSISTANT_GET", async (args) => { + const items = await runSQL>( + "SELECT * FROM assistants WHERE id = ? LIMIT 1", + [args.id], + ); + + const result = { + item: items[0] ? transformAssistant(items[0]) : null, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + server.registerTool( + "COLLECTION_ASSISTANT_CREATE", + { + title: "Create Assistant", + description: "Create a new assistant", + inputSchema: { + data: z.object({ + id: z.string().optional(), + title: z.string(), + description: z.string().optional(), + avatar: z.string().optional(), + system_prompt: z.string().optional(), + gateway_id: z.string().optional(), + model: z + .object({ + id: z.string(), + connectionId: z.string(), + }) + .optional(), + }), + }, + annotations: { readOnlyHint: false }, + }, + withLogging("COLLECTION_ASSISTANT_CREATE", async (args) => { + const now = new Date().toISOString(); + const id = args.data.id || crypto.randomUUID(); + const defaultAvatar = + "https://assets.decocache.com/decocms/fd07a578-6b1c-40f1-bc05-88a3b981695d/f7fc4ffa81aec04e37ae670c3cd4936643a7b269.png"; + + await runSQL( + `INSERT INTO assistants (id, title, description, avatar, system_prompt, gateway_id, model, created_at, updated_at, created_by, updated_by) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + [ + id, + args.data.title, + args.data.description || null, + args.data.avatar || defaultAvatar, + args.data.system_prompt || "", + args.data.gateway_id || "", + JSON.stringify(args.data.model || { id: "", connectionId: "" }), + now, + now, + "stdio-user", + "stdio-user", + ], + ); + + const items = await runSQL>( + "SELECT * FROM assistants WHERE id = ? LIMIT 1", + [id], + ); + + const result = { + item: items[0] ? 
transformAssistant(items[0]) : null, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + server.registerTool( + "COLLECTION_ASSISTANT_UPDATE", + { + title: "Update Assistant", + description: "Update an existing assistant", + inputSchema: { + id: z.string(), + data: z.object({ + title: z.string().optional(), + description: z.string().optional(), + avatar: z.string().optional(), + system_prompt: z.string().optional(), + gateway_id: z.string().optional(), + model: z + .object({ + id: z.string(), + connectionId: z.string(), + }) + .optional(), + }), + }, + annotations: { readOnlyHint: false }, + }, + withLogging("COLLECTION_ASSISTANT_UPDATE", async (args) => { + const now = new Date().toISOString(); + const setClauses: string[] = ["updated_at = ?", "updated_by = ?"]; + const params: unknown[] = [now, "stdio-user"]; + + if (args.data.title !== undefined) { + setClauses.push("title = ?"); + params.push(args.data.title); + } + if (args.data.description !== undefined) { + setClauses.push("description = ?"); + params.push(args.data.description); + } + if (args.data.avatar !== undefined) { + setClauses.push("avatar = ?"); + params.push(args.data.avatar); + } + if (args.data.system_prompt !== undefined) { + setClauses.push("system_prompt = ?"); + params.push(args.data.system_prompt); + } + if (args.data.gateway_id !== undefined) { + setClauses.push("gateway_id = ?"); + params.push(args.data.gateway_id); + } + if (args.data.model !== undefined) { + setClauses.push("model = ?"); + params.push(JSON.stringify(args.data.model)); + } + + params.push(args.id); + + await runSQL( + `UPDATE assistants SET ${setClauses.join(", ")} WHERE id = ?`, + params, + ); + + const items = await runSQL>( + "SELECT * FROM assistants WHERE id = ? LIMIT 1", + [args.id], + ); + + const result = { + item: items[0] ? transformAssistant(items[0]) : null, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + server.registerTool( + "COLLECTION_ASSISTANT_DELETE", + { + title: "Delete Assistant", + description: "Delete an assistant by ID", + inputSchema: { + id: z.string(), + }, + annotations: { readOnlyHint: false, destructiveHint: true }, + }, + withLogging("COLLECTION_ASSISTANT_DELETE", async (args) => { + const items = await runSQL>( + "DELETE FROM assistants WHERE id = ? RETURNING *", + [args.id], + ); + + const result = { + item: items[0] ? transformAssistant(items[0]) : null, + success: items.length > 0, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + // ========================================================================= + // Prompt Collection Tools + // ========================================================================= + + server.registerTool( + "COLLECTION_PROMPT_LIST", + { + title: "List Prompts", + description: "List all prompts with pagination", + inputSchema: { + limit: z.number().default(50), + offset: z.number().default(0), + }, + annotations: { readOnlyHint: true }, + }, + withLogging("COLLECTION_PROMPT_LIST", async (args) => { + const items = await runSQL>( + "SELECT * FROM prompts ORDER BY updated_at DESC LIMIT ? 
OFFSET ?", + [args.limit, args.offset], + ); + + const countResult = await runSQL<{ count: string }>( + "SELECT COUNT(*) as count FROM prompts", + ); + const totalCount = parseInt(countResult[0]?.count || "0", 10); + + const result = { + items: items.map(transformPrompt), + totalCount, + hasMore: args.offset + items.length < totalCount, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + server.registerTool( + "COLLECTION_PROMPT_GET", + { + title: "Get Prompt", + description: "Get a single prompt by ID", + inputSchema: { + id: z.string().describe("Prompt ID"), + }, + annotations: { readOnlyHint: true }, + }, + withLogging("COLLECTION_PROMPT_GET", async (args) => { + const items = await runSQL>( + "SELECT * FROM prompts WHERE id = ? LIMIT 1", + [args.id], + ); + + const result = { + item: items[0] ? transformPrompt(items[0]) : null, + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + // ========================================================================= + // Filesystem Workflow Tools + // ========================================================================= + + if (filesystemMode) { + server.registerTool( + "WORKFLOW_RELOAD", + { + title: "Reload Workflows", + description: + "Reload all workflows from the filesystem. Use this after editing workflow JSON files.", + inputSchema: {}, + annotations: { readOnlyHint: true }, + }, + withLogging("WORKFLOW_RELOAD", async () => { + const workflows = await loadWorkflows(); + + const result = { + success: true, + count: workflows.length, + workflows: workflows.map((w) => ({ + id: w.id, + title: w.title, + sourceFile: w._sourceFile, + stepCount: w.steps.length, + })), + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + + server.registerTool( + "WORKFLOW_SOURCE_INFO", + { + title: "Workflow Source Info", + description: + "Get information about where workflows are loaded from (filesystem paths, file counts)", + inputSchema: {}, + annotations: { readOnlyHint: true }, + }, + withLogging("WORKFLOW_SOURCE_INFO", async () => { + const source = getWorkflowSource(); + const workflows = getCachedWorkflows(); + + // Group by source file + const byFile = new Map(); + for (const w of workflows) { + const file = w._sourceFile; + if (!byFile.has(file)) { + byFile.set(file, []); + } + byFile.get(file)!.push(w.id); + } + + const result = { + mode: "filesystem", + workflowDir: source.workflowDir || null, + workflowFiles: source.workflowFiles || [], + totalWorkflows: workflows.length, + files: Array.from(byFile.entries()).map(([file, ids]) => ({ + path: file, + workflows: ids, + })), + }; + + return { + content: [{ type: "text", text: JSON.stringify(result, null, 2) }], + structuredContent: result, + }; + }), + ); + } + + console.error("[mcp-studio] All stdio tools registered"); + if (filesystemMode) { + console.error( + "[mcp-studio] Filesystem mode: WORKFLOW_RELOAD and WORKFLOW_SOURCE_INFO available", + ); + } +} + +// ============================================================================ +// Transform Functions +// ============================================================================ + +function transformWorkflow(row: Record) { + let steps: unknown[] = []; + if (row.steps) { + const parsed = + typeof row.steps === "string" ? JSON.parse(row.steps) : row.steps; + // Handle legacy { phases: [...] 
} format + if (parsed && typeof parsed === "object" && "phases" in parsed) { + steps = (parsed as { phases: unknown[] }).phases; + } else if (Array.isArray(parsed)) { + steps = parsed; + } + } + + // Ensure each step has required properties (action, name) to prevent UI crashes + const normalizedSteps = steps.map((step, index) => { + const s = step as Record; + return { + name: s.name || `Step_${index + 1}`, + description: s.description, + action: s.action || { toolName: "" }, // Default to empty tool step if missing + input: s.input || {}, + outputSchema: s.outputSchema || {}, + config: s.config, + }; + }); + + return { + id: row.id, + title: row.title, + description: row.description, + steps: normalizedSteps, + gateway_id: row.gateway_id, + created_at: row.created_at, + updated_at: row.updated_at, + created_by: row.created_by, + updated_by: row.updated_by, + }; +} + +function transformExecution(row: Record) { + return { + id: row.id, + workflow_id: row.workflow_id, + status: row.status, + input: typeof row.input === "string" ? JSON.parse(row.input) : row.input, + output: row.output + ? typeof row.output === "string" + ? JSON.parse(row.output) + : row.output + : null, + error: row.error, + created_at: row.created_at, + updated_at: row.updated_at, + started_at: row.started_at, + completed_at: row.completed_at, + }; +} + +function transformStepResult(row: Record) { + return { + id: row.id, + execution_id: row.execution_id, + step_name: row.step_name, + status: row.status, + input: row.input + ? typeof row.input === "string" + ? JSON.parse(row.input) + : row.input + : null, + output: row.output + ? typeof row.output === "string" + ? JSON.parse(row.output) + : row.output + : null, + error: row.error, + created_at: row.created_at, + completed_at: row.completed_at, + }; +} + +function transformAssistant(row: Record) { + const defaultAvatar = + "https://assets.decocache.com/decocms/fd07a578-6b1c-40f1-bc05-88a3b981695d/f7fc4ffa81aec04e37ae670c3cd4936643a7b269.png"; + const model = row.model + ? typeof row.model === "string" + ? JSON.parse(row.model) + : row.model + : { id: "", connectionId: "" }; + + return { + id: row.id, + title: row.title, + description: row.description, + avatar: row.avatar || defaultAvatar, + system_prompt: row.system_prompt || "", + gateway_id: row.gateway_id || "", + model, + created_at: row.created_at, + updated_at: row.updated_at, + created_by: row.created_by, + updated_by: row.updated_by, + }; +} + +function transformPrompt(row: Record) { + return { + id: row.id, + title: row.title, + description: row.description, + content: row.content, + variables: row.variables + ? typeof row.variables === "string" + ? JSON.parse(row.variables) + : row.variables + : [], + created_at: row.created_at, + updated_at: row.updated_at, + created_by: row.created_by, + updated_by: row.updated_by, + }; +} diff --git a/mcp-studio/server/stdio.ts b/mcp-studio/server/stdio.ts new file mode 100644 index 00000000..101fee4d --- /dev/null +++ b/mcp-studio/server/stdio.ts @@ -0,0 +1,57 @@ +#!/usr/bin/env node +/** + * MCP Studio - Stdio Entry Point + * + * This is the main entry point for running the MCP server via stdio, + * which is the standard transport for CLI-based MCP servers. 
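+ * All diagnostic logging is written to stderr so that stdout stays reserved
+ * for the MCP JSON-RPC protocol.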
+ * + * Usage: + * bun run server/stdio.ts # Run directly + * bun --watch server/stdio.ts # Run with hot reload + * + * In Mesh, add as custom command: + * Command: bun + * Args: --watch /path/to/mcp-studio/server/stdio.ts + * + * Environment variables: + * DATABASE_URL - PostgreSQL connection string (required for workflow operations) + */ + +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; +import { registerStdioTools } from "./stdio-tools.ts"; + +/** + * Create and start the MCP server with stdio transport + */ +async function main() { + // Create MCP server + const server = new McpServer({ + name: "mcp-studio", + version: "1.0.0", + }); + + // Register all tools + await registerStdioTools(server); + + // Connect to stdio transport + const transport = new StdioServerTransport(); + await server.connect(transport); + + // Log startup (goes to stderr so it doesn't interfere with stdio protocol) + console.error("[mcp-studio] MCP server running via stdio"); + console.error( + "[mcp-studio] Available: Workflow, Execution, Assistant, and Prompt tools", + ); + + if (!process.env.DATABASE_URL) { + console.error( + "[mcp-studio] Warning: DATABASE_URL not set - database operations will fail", + ); + } +} + +main().catch((error) => { + console.error("Fatal error:", error); + process.exit(1); +}); diff --git a/mcp-studio/server/workflow-loader.ts b/mcp-studio/server/workflow-loader.ts new file mode 100644 index 00000000..27b452bd --- /dev/null +++ b/mcp-studio/server/workflow-loader.ts @@ -0,0 +1,315 @@ +/** + * Filesystem Workflow Loader + * + * Loads workflow definitions from JSON files on the filesystem. + * This enables: + * - Version-controlled workflows (store in git) + * - MCP packaging (MCPs can ship workflows) + * - Local development (edit files, hot-reload) + * - Database-free operation (no PostgreSQL required) + * + * Environment variables: + * - WORKFLOW_DIR: Directory to scan for *.workflow.json or *.json files + * - WORKFLOW_FILES: Comma-separated list of specific workflow files + * + * File formats supported: + * - Single workflow: { "id": "...", "title": "...", "steps": [...] } + * - Multiple workflows: { "workflows": [...] 
} + * + * Example directory structure: + * workflows/ + * ├── enrich-contact.workflow.json + * ├── notify-team.workflow.json + * └── my-mcp/ + * └── bundled-workflows.json (can contain multiple) + */ + +import { readdir, readFile, stat, watch } from "node:fs/promises"; +import { join, extname, basename, dirname } from "node:path"; +import { WorkflowSchema, type Workflow } from "@decocms/bindings/workflow"; + +export interface LoadedWorkflow extends Workflow { + /** Source file path */ + _sourceFile: string; + /** Whether this is a filesystem workflow (vs database) */ + _isFilesystem: true; +} + +export interface WorkflowLoaderOptions { + /** Directory to scan for workflow files */ + workflowDir?: string; + /** Specific workflow files to load */ + workflowFiles?: string[]; + /** Enable file watching for hot reload */ + watch?: boolean; + /** Callback when workflows change */ + onChange?: (workflows: LoadedWorkflow[]) => void; +} + +/** + * In-memory cache of loaded workflows + */ +let cachedWorkflows: LoadedWorkflow[] = []; +let watchAbortController: AbortController | null = null; + +/** + * Get the configured workflow source from environment + */ +export function getWorkflowSource(): WorkflowLoaderOptions { + const options: WorkflowLoaderOptions = {}; + + if (process.env.WORKFLOW_DIR) { + options.workflowDir = process.env.WORKFLOW_DIR; + } + + if (process.env.WORKFLOW_FILES) { + options.workflowFiles = process.env.WORKFLOW_FILES.split(",").map((f) => + f.trim(), + ); + } + + return options; +} + +/** + * Check if filesystem workflow loading is enabled + */ +export function isFilesystemMode(): boolean { + const source = getWorkflowSource(); + return !!(source.workflowDir || source.workflowFiles?.length); +} + +/** + * Parse a workflow file and extract workflow(s) + */ +async function parseWorkflowFile(filePath: string): Promise { + const content = await readFile(filePath, "utf-8"); + let parsed: unknown; + + try { + parsed = JSON.parse(content); + } catch (error) { + console.error(`[workflow-loader] Failed to parse ${filePath}:`, error); + return []; + } + + const workflows: LoadedWorkflow[] = []; + + // Handle array of workflows + if (Array.isArray(parsed)) { + for (const item of parsed) { + const validated = validateWorkflow(item, filePath); + if (validated) workflows.push(validated); + } + return workflows; + } + + // Handle object with "workflows" key + if ( + typeof parsed === "object" && + parsed !== null && + "workflows" in parsed && + Array.isArray((parsed as { workflows: unknown }).workflows) + ) { + for (const item of (parsed as { workflows: unknown[] }).workflows) { + const validated = validateWorkflow(item, filePath); + if (validated) workflows.push(validated); + } + return workflows; + } + + // Handle single workflow + const validated = validateWorkflow(parsed, filePath); + if (validated) workflows.push(validated); + + return workflows; +} + +/** + * Validate a workflow object against the schema + */ +function validateWorkflow( + data: unknown, + sourceFile: string, +): LoadedWorkflow | null { + const result = WorkflowSchema.safeParse(data); + + if (!result.success) { + console.error( + `[workflow-loader] Invalid workflow in ${sourceFile}:`, + result.error.format(), + ); + return null; + } + + // Generate ID from filename if not present + let id = result.data.id; + if (!id) { + const base = basename(sourceFile, extname(sourceFile)); + // Remove .workflow suffix if present + id = base.replace(/\.workflow$/, ""); + } + + return { + ...result.data, + id, + _sourceFile: sourceFile, 
+ _isFilesystem: true, + }; +} + +/** + * Scan a directory for workflow files + */ +async function scanDirectory(dir: string): Promise { + const files: string[] = []; + + try { + const entries = await readdir(dir); + + for (const entry of entries) { + const fullPath = join(dir, entry); + const stats = await stat(fullPath); + + if (stats.isDirectory()) { + // Recursively scan subdirectories + const subFiles = await scanDirectory(fullPath); + files.push(...subFiles); + } else if (stats.isFile()) { + // Include .json and .workflow.json files + if (entry.endsWith(".json")) { + files.push(fullPath); + } + } + } + } catch (error) { + console.error(`[workflow-loader] Failed to scan ${dir}:`, error); + } + + return files; +} + +/** + * Load all workflows from configured sources + */ +export async function loadWorkflows( + options?: WorkflowLoaderOptions, +): Promise { + const source = options || getWorkflowSource(); + const allWorkflows: LoadedWorkflow[] = []; + const filesToLoad: string[] = []; + + // Collect files from directory + if (source.workflowDir) { + const dirFiles = await scanDirectory(source.workflowDir); + filesToLoad.push(...dirFiles); + console.error( + `[workflow-loader] Found ${dirFiles.length} files in ${source.workflowDir}`, + ); + } + + // Add explicitly specified files + if (source.workflowFiles) { + filesToLoad.push(...source.workflowFiles); + } + + // Load each file + for (const file of filesToLoad) { + const workflows = await parseWorkflowFile(file); + allWorkflows.push(...workflows); + } + + // Cache the results + cachedWorkflows = allWorkflows; + + console.error( + `[workflow-loader] Loaded ${allWorkflows.length} workflow(s) from filesystem`, + ); + + // Log workflow IDs for debugging + if (allWorkflows.length > 0) { + console.error( + `[workflow-loader] Workflows: ${allWorkflows.map((w) => w.id).join(", ")}`, + ); + } + + return allWorkflows; +} + +/** + * Get cached workflows (call loadWorkflows first) + */ +export function getCachedWorkflows(): LoadedWorkflow[] { + return cachedWorkflows; +} + +/** + * Get a specific workflow by ID + */ +export function getWorkflowById(id: string): LoadedWorkflow | undefined { + return cachedWorkflows.find((w) => w.id === id); +} + +/** + * Start watching for file changes + */ +export async function startWatching( + options: WorkflowLoaderOptions, +): Promise { + const source = options || getWorkflowSource(); + + // Stop any existing watcher + stopWatching(); + + watchAbortController = new AbortController(); + + if (source.workflowDir) { + console.error( + `[workflow-loader] Watching ${source.workflowDir} for changes`, + ); + + try { + const watcher = watch(source.workflowDir, { + recursive: true, + signal: watchAbortController.signal, + }); + + (async () => { + try { + for await (const event of watcher) { + if (event.filename?.endsWith(".json")) { + console.error( + `[workflow-loader] File changed: ${event.filename}`, + ); + await loadWorkflows(options); + options.onChange?.(cachedWorkflows); + } + } + } catch (error) { + if ((error as { name?: string }).name !== "AbortError") { + console.error("[workflow-loader] Watch error:", error); + } + } + })(); + } catch (error) { + console.error("[workflow-loader] Failed to start watcher:", error); + } + } +} + +/** + * Stop watching for file changes + */ +export function stopWatching(): void { + if (watchAbortController) { + watchAbortController.abort(); + watchAbortController = null; + } +} + +/** + * Reload workflows from disk + */ +export async function reloadWorkflows(): Promise { + 
return loadWorkflows(getWorkflowSource()); +}