Skip to content

Commit 9829cdf

Browse files
committed
feat: implement auto-start for STDIO connections and enhance event bus logging
- Added functionality to automatically start STDIO connections based on the AUTO_START_CONNECTIONS environment variable. - Introduced a new helper function, autoStartConnectionsByTitle, to manage the auto-start process. - Enhanced EventBus with additional logging for event processing and subscription matching. - Implemented a mechanism to kill orphaned STDIO processes to prevent stale connections. - Updated error handling in the proxy to avoid logging unnecessary "Method not found" errors. This commit improves the management of STDIO connections and provides better visibility into event bus operations.
1 parent af27cb0 commit 9829cdf

7 files changed

Lines changed: 211 additions & 10 deletions

File tree

apps/mesh/src/api/app.ts

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,72 @@ import {
100100
import { MiddlewareHandler } from "hono/types";
101101
import { getToolsByCategory, MANAGEMENT_TOOLS } from "../tools/registry";
102102
import { Env } from "./env";
103+
import { dangerouslyCreateSuperUserMCPProxy } from "./routes/proxy";
103104
import { resetStdioConnectionPool } from "../stdio/stable-transport";
104105
import { devLogger } from "./utils/dev-logger";
105106

106107
const getHandleOAuthProtectedResourceMetadata = () =>
107108
oAuthProtectedResourceMetadata(auth);
108109
const getHandleOAuthDiscoveryMetadata = () => oAuthDiscoveryMetadata(auth);
109110

111+
/**
112+
* Auto-start STDIO connections by title
113+
* Used with AUTO_START_CONNECTIONS env var
114+
*
115+
* Uses dangerouslyCreateSuperUserMCPProxy to create a system-level proxy
116+
* that spawns the STDIO process with proper credentials.
117+
*/
118+
async function autoStartConnectionsByTitle(
119+
database: MeshDatabase,
120+
titles: string[],
121+
) {
122+
const db = database.db;
123+
124+
// Query all STDIO connections matching the titles
125+
const connections = await db
126+
.selectFrom("connections")
127+
.selectAll()
128+
.where("connection_type", "=", "STDIO")
129+
.where("title", "in", titles)
130+
.execute();
131+
132+
if (connections.length === 0) {
133+
console.log(`[AutoStart] No matching STDIO connections found`);
134+
return;
135+
}
136+
137+
console.log(
138+
`[AutoStart] Found ${connections.length} connections to start: ${connections.map((c) => c.title).join(", ")}`,
139+
);
140+
141+
for (const conn of connections) {
142+
try {
143+
console.log(`[AutoStart] Starting: ${conn.title} (${conn.id})`);
144+
145+
// Create system context and use the superuser proxy
146+
const ctx = await ContextFactory.create();
147+
148+
const proxy = await dangerouslyCreateSuperUserMCPProxy(conn.id, {
149+
...ctx,
150+
auth: { ...ctx.auth, user: { id: "auto-start" } },
151+
});
152+
153+
// listTools() uses cached DB data, doesn't spawn STDIO
154+
// listPrompts() forces actual client connection, triggering spawn
155+
// Ignore "Method not found" - some MCPs don't implement prompts
156+
try {
157+
await proxy.client.listPrompts();
158+
} catch {
159+
// Ignore - the spawn happened, that's what matters
160+
}
161+
162+
console.log(`[AutoStart] ✓ ${conn.title} started`);
163+
} catch (error) {
164+
console.error(`[AutoStart] ✗ ${conn.title} failed:`, error);
165+
}
166+
}
167+
}
168+
110169
/**
111170
* Resource server metadata type
112171
*/
@@ -136,7 +195,8 @@ export function createApp(options: CreateAppOptions = {}) {
136195

137196
// Kill and respawn STDIO connections on restart/HMR
138197
// Old processes have stale credentials, need fresh spawn with new tokens
139-
resetStdioConnectionPool().catch((err) => {
198+
// IMPORTANT: Track this promise so autoStart waits for it to complete
199+
const poolResetPromise = resetStdioConnectionPool().catch((err) => {
140200
console.error("[StableStdio] Error resetting pool:", err);
141201
});
142202

@@ -494,6 +554,35 @@ export function createApp(options: CreateAppOptions = {}) {
494554
console.log("[EventBus] Worker started");
495555
});
496556

557+
// Auto-start connections specified in AUTO_START_CONNECTIONS env var
558+
// Format: comma-separated connection titles, e.g. "Bridge,Pilot"
559+
const autoStartConnections = process.env.AUTO_START_CONNECTIONS;
560+
if (autoStartConnections) {
561+
const connectionTitles = autoStartConnections
562+
.split(",")
563+
.map((s) => s.trim())
564+
.filter(Boolean);
565+
if (connectionTitles.length > 0) {
566+
console.log(
567+
`[AutoStart] Will start connections: ${connectionTitles.join(", ")}`,
568+
);
569+
// Wait for pool reset to complete before starting new connections
570+
// This prevents race conditions where old processes haven't been killed yet
571+
// Also add a small delay to let the app fully initialize
572+
(async () => {
573+
try {
574+
// Wait for pool reset to finish (kills old STDIO processes)
575+
await poolResetPromise;
576+
// Small delay to ensure ports are released
577+
await new Promise((resolve) => setTimeout(resolve, 500));
578+
await autoStartConnectionsByTitle(database, connectionTitles);
579+
} catch (error) {
580+
console.error("[AutoStart] Failed:", error);
581+
}
582+
})();
583+
}
584+
}
585+
497586
// Inject MeshContext into requests
498587
// Skip auth routes, static files, health check, and metrics - they don't need MeshContext
499588
app.use("*", async (c, next) => {

apps/mesh/src/api/routes/proxy.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,10 @@ async function createMCPProxyDoNotUseDirectly(
376376
env.MESH_URL = meshUrl;
377377
}
378378

379+
// Pass the connection ID so STDIO servers can identify themselves
380+
// (needed for event bus subscriptions via gateway)
381+
env.MESH_CONNECTION_ID = connectionId;
382+
379383
// Pass state as JSON for bindings
380384
const state = connection.configuration_state;
381385
if (state && Object.keys(state).length > 0) {
@@ -648,7 +652,16 @@ async function createMCPProxyDoNotUseDirectly(
648652
client = await createClient();
649653
return await client.listPrompts();
650654
} catch (error) {
651-
console.error("[proxy:listPrompts] Error listing prompts:", error);
655+
// Prompts are optional in MCP - don't log "Method not found" errors
656+
// Error code -32601 is "Method not found" which is expected for MCPs without prompts
657+
const isMethodNotFound =
658+
error &&
659+
typeof error === "object" &&
660+
"code" in error &&
661+
error.code === -32601;
662+
if (!isMethodNotFound) {
663+
console.error("[proxy:listPrompts] Error listing prompts:", error);
664+
}
652665
throw error;
653666
} finally {
654667
client?.close().catch(console.error);

apps/mesh/src/event-bus/event-bus.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,17 @@ export class EventBus implements IEventBus {
118118

119119
// Find matching subscriptions and create delivery records
120120
const subscriptions = await this.storage.getMatchingSubscriptions(event);
121+
122+
console.log(
123+
`[EventBus] Event ${event.type} from ${event.source}: ${subscriptions.length} matching subscriptions`,
124+
);
125+
if (subscriptions.length > 0) {
126+
console.log(
127+
`[EventBus] Subscribers:`,
128+
subscriptions.map((s) => s.connectionId).join(", "),
129+
);
130+
}
131+
121132
if (subscriptions.length > 0) {
122133
// Determine when to deliver:
123134
// - deliverAt: use specified time
@@ -134,6 +145,9 @@ export class EventBus implements IEventBus {
134145
// Only notify strategy for immediate delivery (no scheduled time and no cron)
135146
// Scheduled events will be picked up by the polling worker at the right time
136147
if (this.notifyStrategy && !deliverAt) {
148+
console.log(
149+
`[EventBus] Triggering immediate processing for ${event.type}`,
150+
);
137151
await this.notifyStrategy.notify(eventId).catch((error) => {
138152
console.warn("[EventBus] Notify failed (non-critical):", error);
139153
});

apps/mesh/src/event-bus/worker.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ export class EventBusWorker {
109109
private notifySubscriber: NotifySubscriberFn;
110110
private running = false;
111111
private processing = false;
112+
private pendingProcessRequest = false;
112113
private config: Required<EventBusConfig>;
113114

114115
constructor(
@@ -160,10 +161,15 @@ export class EventBusWorker {
160161
* Called by the NotifyStrategy when events are available
161162
*/
162163
async processNow(): Promise<void> {
163-
if (!this.running) return;
164+
if (!this.running) {
165+
return;
166+
}
164167

165-
// Prevent concurrent processing
166-
if (this.processing) return;
168+
// If already processing, mark that we need another run after current finishes
169+
if (this.processing) {
170+
this.pendingProcessRequest = true;
171+
return;
172+
}
167173

168174
this.processing = true;
169175
try {
@@ -172,6 +178,13 @@ export class EventBusWorker {
172178
console.error("[EventBus] Error processing events:", error);
173179
} finally {
174180
this.processing = false;
181+
182+
// If processNow was called while we were processing, run again
183+
if (this.pendingProcessRequest) {
184+
this.pendingProcessRequest = false;
185+
// Use setImmediate to avoid stack overflow on rapid fire events
186+
setImmediate(() => this.processNow());
187+
}
175188
}
176189
}
177190

@@ -186,6 +199,8 @@ export class EventBusWorker {
186199
);
187200
if (pendingDeliveries.length === 0) return;
188201

202+
console.log(`[EventBus] Processing ${pendingDeliveries.length} deliveries`);
203+
189204
// Group by subscription (connection)
190205
const grouped = groupByConnection(pendingDeliveries);
191206

@@ -194,12 +209,21 @@ export class EventBusWorker {
194209

195210
for (const [subscriptionId, batch] of grouped) {
196211
try {
212+
console.log(
213+
`[EventBus] Delivering ${batch.events.length} events to ${batch.connectionId}`,
214+
);
215+
197216
// Call ON_EVENTS on the subscriber connection
198217
const result = await this.notifySubscriber(
199218
batch.connectionId,
200219
batch.events,
201220
);
202221

222+
console.log(
223+
`[EventBus] Delivery result for ${batch.connectionId}:`,
224+
JSON.stringify(result).slice(0, 200),
225+
);
226+
203227
// Check if per-event results were provided
204228
if (result.results && Object.keys(result.results).length > 0) {
205229
// Per-event mode: process each event individually

apps/mesh/src/stdio/stable-transport.ts

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,51 @@ async function forceCloseAllStdioConnections(): Promise<void> {
343343
await new Promise((resolve) => setTimeout(resolve, 100));
344344
}
345345

346+
/**
347+
* Kill orphaned STDIO processes that might be left from previous Mesh instances.
348+
* This handles cases where the connection pool is empty but old processes are still running.
349+
*/
350+
async function killOrphanedStdioProcesses(): Promise<void> {
351+
const { exec } = await import("child_process");
352+
const { promisify } = await import("util");
353+
const execAsync = promisify(exec);
354+
355+
// Patterns for processes spawned by Mesh that might be orphaned
356+
// These are common command patterns for STDIO MCPs
357+
const patterns = [
358+
"mesh-bridge.*server", // Mesh bridge server
359+
"pilot.*server/main", // Pilot server
360+
];
361+
362+
for (const pattern of patterns) {
363+
try {
364+
// Use pkill with -f to match full command line
365+
// -9 for SIGKILL to ensure termination
366+
await execAsync(`pkill -9 -f "${pattern}" 2>/dev/null || true`);
367+
} catch {
368+
// Ignore errors - process might not exist
369+
}
370+
}
371+
372+
// Also kill anything listening on port 9999 (Bridge WebSocket)
373+
try {
374+
const { stdout } = await execAsync(`lsof -t -i:9999 2>/dev/null || true`);
375+
const pids = stdout.trim().split("\n").filter(Boolean);
376+
for (const pid of pids) {
377+
try {
378+
process.kill(Number(pid), "SIGKILL");
379+
console.log(
380+
`[StableStdio] Killed orphaned process on port 9999: PID ${pid}`,
381+
);
382+
} catch {
383+
// Process might already be dead
384+
}
385+
}
386+
} catch {
387+
// Ignore errors
388+
}
389+
}
390+
346391
/**
347392
* Force close all connections and clear the pool
348393
* Used on app startup/HMR to ensure fresh processes with new credentials
@@ -352,6 +397,7 @@ export async function resetStdioConnectionPool(): Promise<void> {
352397
`[StableStdio] Reset requested. Pool size: ${connectionPool.size}, keys: [${Array.from(connectionPool.keys()).join(", ")}]`,
353398
);
354399

400+
// First, close connections we know about in the pool
355401
if (connectionPool.size > 0) {
356402
console.log(
357403
`[StableStdio] Resetting ${connectionPool.size} connections (killing processes)`,
@@ -360,9 +406,11 @@ export async function resetStdioConnectionPool(): Promise<void> {
360406
console.log(
361407
`[StableStdio] Reset complete. Pool size: ${connectionPool.size}`,
362408
);
363-
} else {
364-
console.log(`[StableStdio] Pool was empty, nothing to reset`);
365409
}
410+
411+
// Then, kill any orphaned processes that might be left from previous runs
412+
// (handles case where pool was empty but old processes are still running)
413+
await killOrphanedStdioProcesses();
366414
}
367415

368416
// Register shutdown handlers - clean up connections before exit

apps/mesh/src/tools/eventbus/subscribe.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ export const EVENT_SUBSCRIBE = defineTool({
2222
const organization = requireOrganization(ctx);
2323
await ctx.access.check();
2424

25-
// Get the subscriber connection ID from the caller's token
26-
const connectionId = ctx.connectionId;
25+
// Get the subscriber connection ID
26+
// Use explicit subscriberId if provided (for calls via gateway), otherwise use caller's connection
27+
const connectionId = input.subscriberId || ctx.connectionId;
2728
if (!connectionId) {
2829
throw new Error(
29-
"Connection ID required to subscribe. Use a connection-scoped token.",
30+
"Connection ID required to subscribe. Use a connection-scoped token or provide subscriberId.",
3031
);
3132
}
3233
// Create the subscription

packages/bindings/src/well-known/event-bus.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,18 @@ export const EventSubscribeInputSchema = z.object({
121121
.max(1000)
122122
.optional()
123123
.describe("JSONPath filter expression on event data"),
124+
125+
/**
126+
* Optional: Override the subscriber connection ID.
127+
* When calling through a gateway, use this to specify which connection
128+
* should receive the events (defaults to the caller's connection ID).
129+
*/
130+
subscriberId: z
131+
.string()
132+
.optional()
133+
.describe(
134+
"Override subscriber connection ID (for subscriptions via gateway)",
135+
),
124136
});
125137

126138
export type EventSubscribeInput = z.infer<typeof EventSubscribeInputSchema>;

0 commit comments

Comments
 (0)