Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/adapters/workspace-log-sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const WORKSPACE_EVENTS = new Set<EngineEventType>([
"bundle.crashed",
"bundle.recovered",
"bundle.dead",
"bundle.startFailed",
"data.changed",
"config.changed",
"skill.created",
Expand Down
1 change: 1 addition & 0 deletions src/api/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const SSE_ROUTES: Partial<Record<EngineEventType, SseRoute>> = {
"bundle.crashed": { scope: "workspace", wsIdField: "wsId" },
"bundle.recovered": { scope: "workspace", wsIdField: "wsId" },
"bundle.dead": { scope: "workspace", wsIdField: "wsId" },
"bundle.startFailed": { scope: "workspace", wsIdField: "wsId" },
// Per-principal connection state — workspace-scoped. Drives the
// pending-auth banner; without forwarding here, the banner never auto-clears
// after a user completes interactive OAuth.
Expand Down
4 changes: 3 additions & 1 deletion src/api/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ export function startServer(options: ServerOptions): ServerHandle {
const internalToken = runtime.getInternalToken();

const mcpSources = runtime.mcpSources();
const healthMonitor = new HealthMonitor(mcpSources, runtime.getEventSink());
const healthMonitor = new HealthMonitor(mcpSources, runtime.getEventSink(), {
startFailures: runtime.bundleStartFailures(),
});
healthMonitor.start();

// SSE event manager — listens to runtime events and broadcasts to clients
Expand Down
6 changes: 6 additions & 0 deletions src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ export type EngineEventType =
| "bundle.crashed"
| "bundle.recovered"
| "bundle.dead"
/**
* Bundle failed to start at boot (no McpSource was ever produced).
* Distinct from `bundle.crashed`, which requires a running source that
* went away. Payload: { wsId, serverName, bundleName, error }.
*/
| "bundle.startFailed"
/**
* Per-principal connection state change for a remote URL bundle.
* Payload: { wsId, serverName, principalId, state, authorizationUrl? }.
Expand Down
32 changes: 29 additions & 3 deletions src/runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ export class Runtime {
_systemSource: import("../tools/types.ts").ToolSource | null;
/** Platform sources (home, conversations, files, etc.) — retained for JIT workspace registration. */
private _platformSources: import("../tools/types.ts").ToolSource[] = [];
/**
* Boot-time bundle startup failures recorded by `startWorkspaceBundles`.
* These never produced an McpSource; HealthMonitor reads this list at
* construction so `/v1/health` reports the failures as terminal `dead`
* entries rather than silently omitting them.
*/
private _bundleStartFailures: import("./workspace-runtime.ts").BundleStartFailure[] = [];
/**
* Domain-context getter for the automations bundle. Set by the
* automations source factory; consumed by internal callers (CLI's
Expand Down Expand Up @@ -540,13 +547,24 @@ export class Runtime {

// Phase 3: Start workspace bundles with per-workspace registries
const configDir = config.configPath ? dirname(config.configPath) : undefined;
const { registries: workspaceRegistries, entries: workspaceBundleEntries } =
await startWorkspaceBundles(workspaceStore, platformSources, systemTools, events, configDir, {
const {
registries: workspaceRegistries,
entries: workspaceBundleEntries,
failures: bundleStartFailures,
} = await startWorkspaceBundles(
workspaceStore,
platformSources,
systemTools,
events,
configDir,
{
workDir: resolveWorkDir(config),
allowInsecureRemotes: config.allowInsecureRemotes,
});
},
);
rt._workspaceRegistries = workspaceRegistries;
rt._platformSources = platformSources;
rt._bundleStartFailures = bundleStartFailures;

// Wire the workspace registries into lifecycle so workspace-scope
// startAuth / disconnect / install can add+remove sources without
Expand Down Expand Up @@ -1083,6 +1101,14 @@ export class Runtime {
return [...names];
}

/**
 * Snapshot of the bundle startup failures recorded during boot.
 *
 * The list is handed to HealthMonitor when it is constructed so that
 * bundles which failed to start appear as `dead` in `/v1/health`.
 *
 * @returns A fresh array of the recorded failures. Adding or removing
 *   elements on the returned array does not touch the runtime's own
 *   list; the failure objects themselves are shared, not cloned.
 */
bundleStartFailures(): import("./workspace-runtime.ts").BundleStartFailure[] {
  const snapshot = Array.from(this._bundleStartFailures);
  return snapshot;
}

/** Get MCP sources across all workspace registries (for health monitoring). */
mcpSources(): McpSource[] {
const sources: McpSource[] = [];
Expand Down
31 changes: 29 additions & 2 deletions src/runtime/workspace-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ import type { WorkspaceStore } from "../workspace/workspace-store.ts";
// Types
// ---------------------------------------------------------------------------

/**
* A boot-time bundle startup failure — recorded when `startBundleSource`
* throws. Surfaced via `bundle.startFailed` event (workspace log + SSE)
* and threaded into HealthMonitor so `/v1/health` reports the failed
* bundle as `state: "dead"` rather than omitting it entirely.
*
* The same four fields are used as the `bundle.startFailed` event payload.
*/
export interface BundleStartFailure {
/** Id of the workspace whose bundle failed to start. */
wsId: string;
/** Server name of the inventory entry that failed (`entry.serverName` at the failure site). */
serverName: string;
/** Bundle name derived from the entry's bundle ref via `bundleNameFromRef`. */
bundleName: string;
/** Failure message: `err.message` for `Error` instances, otherwise `String(err)`. */
error: string;
}

/** A single entry in the process inventory — one per (workspace, bundle) pair. */
export interface ProcessInventoryEntry {
/** Workspace id (e.g., "ws_engineering"). */
Expand Down Expand Up @@ -133,7 +146,11 @@ export async function startWorkspaceBundles(
allowInsecureRemotes?: boolean;
workDir?: string;
},
): Promise<{ registries: Map<string, ToolRegistry>; entries: ProcessInventoryEntry[] }> {
): Promise<{
registries: Map<string, ToolRegistry>;
entries: ProcessInventoryEntry[];
failures: BundleStartFailure[];
}> {
const workDir = opts?.workDir ?? join(process.env.NB_WORK_DIR ?? "", ".nimblebrain");
const workspaces = await workspaceStore.list();
const inventory = buildProcessInventory(workspaces, workDir);
Expand Down Expand Up @@ -174,6 +191,7 @@ export async function startWorkspaceBundles(
wsEntries.map((entry) => ({ wsId, entry })),
);
const resultEntries: ProcessInventoryEntry[] = new Array(flat.length);
const failures: BundleStartFailure[] = [];
const concurrency = resolveBundleStartConcurrency();
const startMs = Date.now();

Expand Down Expand Up @@ -260,9 +278,18 @@ export async function startWorkspaceBundles(
resultEntries[idx] = { ...entry, serverName: result.sourceName, meta: result.meta };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
const bundleName = bundleNameFromRef(entry.bundle);
process.stderr.write(
`[workspace-runtime] Failed to start ${entry.serverName} in ${wsId}: ${msg}\n`,
);
// Persistent observability: workspace log (via WorkspaceLogSink) +
// SSE broadcast (via SseEventManager). Without this, operators have
// to grep container stderr to discover a failed boot.
eventSink.emit({
type: "bundle.startFailed",
data: { wsId, serverName: entry.serverName, bundleName, error: msg },
});
failures.push({ wsId, serverName: entry.serverName, bundleName, error: msg });
}
});

Expand All @@ -273,7 +300,7 @@ export async function startWorkspaceBundles(
`[workspace-runtime] Started ${finalEntries.length}/${flat.length} bundles in ${elapsedMs}ms (concurrency=${concurrency})`,
);
}
return { registries, entries: finalEntries };
return { registries, entries: finalEntries, failures };
}

/**
Expand Down
26 changes: 25 additions & 1 deletion src/tools/health-monitor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { EventSink } from "../engine/types.ts";
import type { BundleStartFailure } from "../runtime/workspace-runtime.ts";
import type { McpSource } from "./mcp-source.ts";

export type BundleState = "healthy" | "restarting" | "dead";
Expand All @@ -8,6 +9,12 @@ export interface BundleHealth {
state: BundleState;
uptime: number | null;
restartCount: number;
/**
* Workspace id — populated only for boot-time start failures (live
* sources don't carry a wsId on `McpSource`). Lets `/v1/health`
* consumers distinguish same-named failed bundles across workspaces.
*/
wsId?: string;
}

interface BundleRecord {
Expand All @@ -23,6 +30,13 @@ const DEFAULT_CHECK_INTERVAL_MS = 30_000;
/** Optional settings accepted by the `HealthMonitor` constructor. */
export interface HealthMonitorOptions {
/**
* Presumably the period, in milliseconds, of the periodic health check
* loop — NOTE(review): the consumer is not visible here; confirm, and
* confirm that `DEFAULT_CHECK_INTERVAL_MS` is the fallback.
*/
checkIntervalMs?: number;
/**
* NOTE(review): consumer not visible here; presumably the base delay, in
* milliseconds, used when restarting a bundle — confirm.
*/
baseDelayMs?: number;
/**
* Boot-time start failures from `startWorkspaceBundles`. These never
* produced an `McpSource`, so they can't be restarted; we surface them
* as terminal `dead` entries in `getStatus()` so `/v1/health` reflects
* the failure instead of silently omitting the bundle.
*/
startFailures?: BundleStartFailure[];
}

/**
Expand All @@ -31,6 +45,7 @@ export interface HealthMonitorOptions {
*/
export class HealthMonitor {
private records: BundleRecord[];
private startFailures: BundleStartFailure[];
private timer: ReturnType<typeof setInterval> | null = null;
private checkIntervalMs: number;
private baseDelayMs: number;
Expand All @@ -47,6 +62,7 @@ export class HealthMonitor {
state: "healthy" as BundleState,
restartCount: 0,
}));
this.startFailures = opts.startFailures ?? [];
}

/** Start the periodic health check loop. */
Expand All @@ -71,12 +87,20 @@ export class HealthMonitor {

/**
* Get per-bundle health info.
*
* The result lists one entry per monitored source (name and uptime read
* from the source, state and restart count from the monitor's record),
* followed by one entry per boot-time start failure.
*/
getStatus(): BundleHealth[] {
return this.records.map((r) => ({
const live: BundleHealth[] = this.records.map((r) => ({
name: r.source.name,
state: r.state,
uptime: r.source.uptime(),
restartCount: r.restartCount,
}));
// Boot-time failures have no source behind them: always `dead`, no
// uptime, zero restarts. `wsId` is set only on these entries.
const dead: BundleHealth[] = this.startFailures.map((f) => ({
name: f.serverName,
state: "dead" as const,
uptime: null,
restartCount: 0,
wsId: f.wsId,
}));
return [...live, ...dead];
}

private async checkOne(record: BundleRecord): Promise<void> {
Expand Down
70 changes: 70 additions & 0 deletions test/unit/health-monitor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,76 @@ describe("HealthMonitor", () => {
monitor.stop();
});

it("includes boot-time start failures as dead entries in getStatus", async () => {
  // Two failures from different workspaces and no live sources at all.
  const bootFailures = [
    { wsId: "ws_a", serverName: "broken", bundleName: "@nb/broken", error: "no manifest" },
    { wsId: "ws_b", serverName: "remote-x", bundleName: "https://x", error: "ECONNREFUSED" },
  ];
  const collector = makeEventCollector();
  const healthMonitor = new HealthMonitor([], collector, {
    checkIntervalMs: 60_000,
    baseDelayMs: 1,
    startFailures: bootFailures,
  });

  const report = healthMonitor.getStatus();
  expect(report).toHaveLength(2);

  // Look entries up by name so the assertions don't depend on ordering.
  const entryNamed = (name: string) => report.find((entry) => entry.name === name);

  const brokenEntry = entryNamed("broken");
  expect(brokenEntry?.state).toBe("dead");
  expect(brokenEntry?.uptime).toBeNull();
  expect(brokenEntry?.restartCount).toBe(0);
  expect(brokenEntry?.wsId).toBe("ws_a");

  const remoteEntry = entryNamed("remote-x");
  expect(remoteEntry?.state).toBe("dead");
  expect(remoteEntry?.wsId).toBe("ws_b");

  healthMonitor.stop();
});

it("does not attempt to restart boot-time failed bundles", async () => {
  const sink = makeEventCollector();
  const monitor = new HealthMonitor([], sink, {
    checkIntervalMs: 60_000,
    baseDelayMs: 1,
    startFailures: [
      { wsId: "ws_a", serverName: "broken", bundleName: "@nb/broken", error: "no manifest" },
    ],
  });

  await monitor.check();

  // No restart, no extra events — these never produced a source to restart.
  expect(sink.events).toHaveLength(0);

  // Assert the entry count explicitly (as the sibling tests do) so a missing
  // entry fails as an assertion rather than as a TypeError on `undefined`.
  const status = monitor.getStatus();
  expect(status).toHaveLength(1);
  expect(status[0]?.state).toBe("dead");

  monitor.stop();
});

it("merges live sources and start failures in getStatus", async () => {
  // One healthy live source plus one bundle that never started.
  const liveSource = makeMockSource("live-one");
  const collector = makeEventCollector();
  const healthMonitor = new HealthMonitor([liveSource], collector, {
    checkIntervalMs: 60_000,
    baseDelayMs: 1,
    startFailures: [
      { wsId: "ws_a", serverName: "dead-one", bundleName: "@nb/dead", error: "boom" },
    ],
  });

  const report = healthMonitor.getStatus();
  expect(report).toHaveLength(2);

  // Compare names order-independently.
  const sortedNames = report.map((entry) => entry.name).sort();
  expect(sortedNames).toEqual(["dead-one", "live-one"]);

  // Live entries carry no wsId; boot-time failures do.
  const liveEntry = report.find((entry) => entry.name === "live-one");
  expect(liveEntry?.state).toBe("healthy");
  expect(liveEntry?.wsId).toBeUndefined();

  const deadEntry = report.find((entry) => entry.name === "dead-one");
  expect(deadEntry?.state).toBe("dead");
  expect(deadEntry?.wsId).toBe("ws_a");

  healthMonitor.stop();
});

it("stop() clears the interval so no more checks run", async () => {
const source = makeMockSource("interval-bundle");
const sink = makeEventCollector();
Expand Down
Loading