Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions apps/mesh/migrations/016-downstream-token-client-info.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Migration 015: Add client registration info to downstream_tokens
*
* Adds clientId and clientSecret columns to support Dynamic Client Registration
* and token refresh for downstream MCP OAuth flows.
*/

import type { Kysely } from "kysely";

export async function up(db: Kysely<unknown>): Promise<void> {
// Add clientId and clientSecret for Dynamic Client Registration
await db.schema
.alterTable("downstream_tokens")
.addColumn("clientId", "text")
.execute();

await db.schema
.alterTable("downstream_tokens")
.addColumn("clientSecret", "text")
.execute();

// Add tokenEndpoint to know where to refresh
await db.schema
.alterTable("downstream_tokens")
.addColumn("tokenEndpoint", "text")
.execute();

// Create index for faster lookups by connectionId + userId
await db.schema
.createIndex("idx_downstream_tokens_connection_user")
.on("downstream_tokens")
.columns(["connectionId", "userId"])
.execute();
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropIndex("idx_downstream_tokens_connection_user").execute();

await db.schema
.alterTable("downstream_tokens")
.dropColumn("tokenEndpoint")
.execute();

await db.schema
.alterTable("downstream_tokens")
.dropColumn("clientSecret")
.execute();

await db.schema
.alterTable("downstream_tokens")
.dropColumn("clientId")
.execute();
}
2 changes: 2 additions & 0 deletions apps/mesh/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import * as migration012gatewaytoolselectionmode from "./012-gateway-tool-select
import * as migration013monitoringuseragentgateway from "./013-monitoring-user-agent-gateway.ts";
import * as migration014gatewayresourcesprompts from "./014-gateway-resources-prompts.ts";
import * as migration015monitoringproperties from "./015-monitoring-properties.ts";
import * as migration016downstreamtokenclientinfo from "./016-downstream-token-client-info.ts";

const migrations = {
"001-initial-schema": migration001initialschema,
Expand All @@ -31,6 +32,7 @@ const migrations = {
"013-monitoring-user-agent-gateway": migration013monitoringuseragentgateway,
"014-gateway-resources-prompts": migration014gatewayresourcesprompts,
"015-monitoring-properties": migration015monitoringproperties,
"016-downstream-token-client-info": migration016downstreamtokenclientinfo,
} satisfies Record<string, Migration>;

export default migrations;
4 changes: 4 additions & 0 deletions apps/mesh/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { shouldSkipMeshContext, SYSTEM_PATHS } from "./utils/paths";
import { createEventBus, type EventBus } from "../event-bus";
import { meter, prometheusExporter, tracer } from "../observability";
import authRoutes from "./routes/auth";
import downstreamTokenRoutes from "./routes/downstream-token";
import gatewayRoutes from "./routes/gateway";
import managementRoutes from "./routes/management";
import modelsRoutes from "./routes/models";
Expand Down Expand Up @@ -550,6 +551,9 @@ export function createApp(options: CreateAppOptions = {}) {
return c.json({ success: true });
});

// Downstream token management routes
app.route("/api", downstreamTokenRoutes);

// ============================================================================
// 404 Handler
// ============================================================================
Expand Down
95 changes: 95 additions & 0 deletions apps/mesh/src/api/routes/downstream-token.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { describe, it, expect, beforeEach, afterEach, mock } from "bun:test";
import { Hono } from "hono";
import type { MeshContext } from "../../core/mesh-context";
import { CredentialVault } from "../../encryption/credential-vault";
import {
createDatabase,
closeDatabase,
type MeshDatabase,
} from "../../database";
import { createTestSchema } from "../../storage/test-helpers";
import downstreamTokenRoutes from "./downstream-token";

describe("Downstream Token Routes", () => {
let database: MeshDatabase;
let app: Hono<{ Variables: { meshContext: MeshContext } }>;

beforeEach(async () => {
database = createDatabase(":memory:");
await createTestSchema(database.db);

const vault = new CredentialVault(CredentialVault.generateKey());

const ctx = {
db: database.db,
vault,
organization: { id: "org_1" },
auth: { user: { id: "user_1" } },
storage: {
connections: {
findById: mock(async () => ({ id: "conn_1" })),
},
},
} as unknown as MeshContext;

app = new Hono();
app.use("*", async (c, next) => {
c.set("meshContext", ctx);
await next();
});
app.route("/", downstreamTokenRoutes);
});

afterEach(async () => {
await closeDatabase(database);
mock.restore();
});

it("rejects invalid tokenEndpoint", async () => {
const res = await app.request("/connections/conn_1/oauth-token", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
accessToken: "at",
tokenEndpoint: "not-a-url",
}),
});

expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("tokenEndpoint must be a valid URL");
});

it("rejects non-http(s) tokenEndpoint", async () => {
const res = await app.request("/connections/conn_1/oauth-token", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
accessToken: "at",
tokenEndpoint: "javascript:alert(1)",
}),
});

expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("tokenEndpoint must be an http(s) URL");
});

it("accepts http(s) tokenEndpoint", async () => {
const res = await app.request("/connections/conn_1/oauth-token", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
accessToken: "at",
refreshToken: "rt",
expiresIn: 3600,
tokenEndpoint: "https://example.com/token",
}),
});

expect(res.status).toBe(200);
const body = (await res.json()) as { success: boolean; expiresAt: string };
expect(body.success).toBe(true);
expect(body.expiresAt).toBeTruthy();
});
});
163 changes: 163 additions & 0 deletions apps/mesh/src/api/routes/downstream-token.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/**
* Downstream Token API Routes
*
* Handles OAuth token management for downstream MCP connections.
* Called from frontend after OAuth authentication to persist tokens.
*/

import { Hono } from "hono";
import type { MeshContext } from "../../core/mesh-context";
import {
DownstreamTokenStorage,
type DownstreamTokenData,
} from "../../storage/downstream-token";

// Define Hono variables type
type Variables = {
meshContext: MeshContext;
};

const app = new Hono<{ Variables: Variables }>();

/**
* POST /api/connections/:connectionId/oauth-token
*
* Save OAuth tokens after authentication.
* Called from frontend after OAuth flow completes.
*/
app.post("/connections/:connectionId/oauth-token", async (c) => {
const ctx = c.get("meshContext");
const connectionId = c.req.param("connectionId");

// Require authentication
const userId = ctx.auth.user?.id ?? ctx.auth.apiKey?.userId ?? null;
if (!userId) {
return c.json({ error: "Unauthorized" }, 401);
}

// Verify connection exists and user has access
// Pass organizationId to ensure the user has access to this connection
// Connections are scoped to organizations, and ctx.storage.connections.findById
// enforces this check if organizationId is provided.
const connection = await ctx.storage.connections.findById(
connectionId,
ctx.organization?.id,
);
if (!connection) {
return c.json({ error: "Connection not found" }, 404);
}

// Parse request body
const body = await c.req.json<{
accessToken: string;
refreshToken?: string | null;
expiresIn?: number | null;
scope?: string | null;
clientId?: string | null;
clientSecret?: string | null;
tokenEndpoint?: string | null;
}>();

if (!body.accessToken) {
return c.json({ error: "accessToken is required" }, 400);
}

if (body.tokenEndpoint) {
let url: URL;
try {
url = new URL(body.tokenEndpoint);
} catch {
return c.json({ error: "tokenEndpoint must be a valid URL" }, 400);
}

if (url.protocol !== "http:" && url.protocol !== "https:") {
return c.json({ error: "tokenEndpoint must be an http(s) URL" }, 400);
}
}

// Calculate expiry time
const expiresAt = body.expiresIn
? new Date(Date.now() + body.expiresIn * 1000)
: null;

// Create storage instance
const tokenStorage = new DownstreamTokenStorage(ctx.db, ctx.vault);

// Save token
const tokenData: DownstreamTokenData = {
connectionId,
userId,
accessToken: body.accessToken,
refreshToken: body.refreshToken ?? null,
scope: body.scope ?? null,
expiresAt,
clientId: body.clientId ?? null,
clientSecret: body.clientSecret ?? null,
tokenEndpoint: body.tokenEndpoint ?? null,
};

const token = await tokenStorage.upsert(tokenData);

return c.json({
success: true,
expiresAt: token.expiresAt,
});
});

/**
* DELETE /api/connections/:connectionId/oauth-token
*
* Delete OAuth token for a connection.
*/
app.delete("/connections/:connectionId/oauth-token", async (c) => {
const ctx = c.get("meshContext");
const connectionId = c.req.param("connectionId");

const userId = ctx.auth.user?.id ?? ctx.auth.apiKey?.userId ?? null;
if (!userId) {
return c.json({ error: "Unauthorized" }, 401);
}

const tokenStorage = new DownstreamTokenStorage(ctx.db, ctx.vault);
await tokenStorage.delete(connectionId, userId);

return c.json({ success: true });
});

/**
* GET /api/connections/:connectionId/oauth-token/status
*
* Check if user has a valid cached token for a connection.
*/
app.get("/connections/:connectionId/oauth-token/status", async (c) => {
const ctx = c.get("meshContext");
const connectionId = c.req.param("connectionId");

const userId = ctx.auth.user?.id ?? ctx.auth.apiKey?.userId ?? null;
if (!userId) {
return c.json({ error: "Unauthorized" }, 401);
}

const tokenStorage = new DownstreamTokenStorage(ctx.db, ctx.vault);
const token = await tokenStorage.get(connectionId, userId);

if (!token) {
return c.json({
hasToken: false,
isExpired: true,
canRefresh: false,
});
}

const isExpired = tokenStorage.isExpired(token);
const canRefresh = !!token.refreshToken && !!token.tokenEndpoint;

return c.json({
hasToken: true,
isExpired,
canRefresh,
expiresAt: token.expiresAt,
});
});

export default app;
Loading