diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 7ff5bab..38eb942 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -27,9 +27,11 @@ The pipeline is a pure function (`inferSchema(prompt) → DatasetSchema`). It is `src/pipeline/populate-self-healing-runner.ts` — shared route/CLI runner. HTTP populate uses a durable filesystem store and `ConvexPopulateDatasetRowWriter`; benchmark/dry-run paths can inject an in-memory store and skip row commits. -`npm --silent run populate:self-heal -- --context context.json` — operator/cron-friendly dry run. It emits one JSON summary to stdout and does not persist recipe history or commit rows. +`npm --silent run populate:self-heal -- --dataset-id <dataset-id>` — operator/cron-friendly dry run. It loads live dataset context with system Convex auth, emits one JSON summary to stdout, and does not persist recipe history or commit rows. Requires `CONVEX_URL`, `CONVEX_SELF_HOSTED_ADMIN_KEY`, `OPENROUTER_API_KEY`, and `TINYFISH_API_KEY`. -`npm --silent run populate:self-heal -- --context context.json --commit` — commits validated rows through the atomic Convex replace mutation. Requires `CONVEX_SELF_HOSTED_ADMIN_KEY`, `OPENROUTER_API_KEY`, and `TINYFISH_API_KEY`. +`npm --silent run populate:self-heal -- --dataset-id <dataset-id> --commit` — commits validated rows through the atomic Convex replace mutation. Requires `CONVEX_URL`, `CONVEX_SELF_HOSTED_ADMIN_KEY`, `OPENROUTER_API_KEY`, and `TINYFISH_API_KEY`. + +`npm --silent run populate:self-heal -- --context context.json` — dev harness dry run for a pasted `DatasetContext`. It uses an isolated in-memory recipe store; `--recipe-store-dir` is rejected unless `--commit` is set. 
## Mastra (Workflow Orchestration) diff --git a/backend/src/index.ts b/backend/src/index.ts index e8fd196..8f413a9 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -76,6 +76,7 @@ await fastify.register(async (instance) => { return reply.code(403).send({ error: "Not authorized to populate this dataset" }); } const prerequisiteError = populateRuntimePrerequisiteError({ + convexUrl: env.CONVEX_URL, convexAdminKey: env.CONVEX_ADMIN_KEY, openRouterApiKey: env.OPENROUTER_API_KEY, tinyFishApiKey: env.TINYFISH_API_KEY, diff --git a/backend/src/pipeline/populate-dataset-context-loader.ts b/backend/src/pipeline/populate-dataset-context-loader.ts new file mode 100644 index 0000000..f306e7a --- /dev/null +++ b/backend/src/pipeline/populate-dataset-context-loader.ts @@ -0,0 +1,56 @@ +import { ConvexHttpClient } from "convex/browser"; +import { anyApi } from "convex/server"; + +import { + datasetContextSchema, + type DatasetContext, +} from "./populate.js"; + +export interface PopulateDatasetContextQueryClient { + query(functionReference: unknown, args: unknown): Promise; +} + +export class ConvexPopulateDatasetContextLoader { + constructor( + private readonly input: { + convexClient: PopulateDatasetContextQueryClient; + internalApi?: typeof anyApi; + } + ) {} + + async loadContext(datasetId: string): Promise { + const internalApi = this.input.internalApi ?? 
anyApi; + const dataset = await this.input.convexClient.query( + internalApi.datasets.getForSystemPopulate, + { id: datasetId } + ); + + if (!dataset || typeof dataset !== "object") { + throw new Error(`Dataset ${datasetId} not found.`); + } + const record = dataset as { + name?: unknown; + description?: unknown; + columns?: unknown; + }; + + return datasetContextSchema.parse({ + datasetId, + datasetName: record.name, + description: record.description, + columns: record.columns, + }); + } +} + +export function createConvexPopulateDatasetContextLoader(input: { + convexUrl: string; + convexAdminKey: string; +}): ConvexPopulateDatasetContextLoader { + const convexClient = new ConvexHttpClient(input.convexUrl); + (convexClient as unknown as { + setAdminAuth(adminKey: string): void; + }).setAdminAuth(input.convexAdminKey); + + return new ConvexPopulateDatasetContextLoader({ convexClient }); +} diff --git a/backend/src/pipeline/populate-runtime-prerequisites.ts b/backend/src/pipeline/populate-runtime-prerequisites.ts index f231670..7292f13 100644 --- a/backend/src/pipeline/populate-runtime-prerequisites.ts +++ b/backend/src/pipeline/populate-runtime-prerequisites.ts @@ -1,15 +1,18 @@ export interface PopulateRuntimePrerequisites { + convexUrl?: string; convexAdminKey?: string; openRouterApiKey?: string; tinyFishApiKey?: string; shouldCommitRows?: boolean; + shouldLoadDatasetContext?: boolean; } export function missingPopulateRuntimePrerequisites( input: PopulateRuntimePrerequisites ): string[] { const requiredKeys: Array<[string, string | undefined]> = []; - if (input.shouldCommitRows ?? true) { + if ((input.shouldCommitRows ?? 
true) || input.shouldLoadDatasetContext) { + requiredKeys.push(["CONVEX_URL", input.convexUrl]); requiredKeys.push(["CONVEX_SELF_HOSTED_ADMIN_KEY", input.convexAdminKey]); } requiredKeys.push( diff --git a/backend/src/pipeline/populate-self-healing-command.ts b/backend/src/pipeline/populate-self-healing-command.ts index d8ad1d3..f363d4a 100644 --- a/backend/src/pipeline/populate-self-healing-command.ts +++ b/backend/src/pipeline/populate-self-healing-command.ts @@ -13,6 +13,7 @@ import { } from "./populate-self-healing-runner.js"; export interface PopulateSelfHealingCliOptions { + datasetId?: string; contextPath?: string; shouldReadStdin: boolean; shouldCommitRows: boolean; @@ -28,6 +29,7 @@ export interface PopulateSelfHealingCliDependencies { writeStdout?: (text: string) => void; writeStderr?: (text: string) => void; runSelfHealing?: typeof runSelfHealingPopulate; + loadDatasetContextById?: (datasetId: string) => Promise; createRowWriter?: () => Promise; } @@ -40,7 +42,11 @@ export async function runPopulateSelfHealingCli( try { const options = parsePopulateSelfHealingCliArgs(input.argv); const prerequisiteError = populateRuntimePrerequisiteError( - prerequisitesFromEnv(input.env, options.shouldCommitRows) + prerequisitesFromEnv({ + env: input.env, + shouldCommitRows: options.shouldCommitRows, + shouldLoadDatasetContext: Boolean(options.datasetId), + }) ); if (prerequisiteError) { writeStdout(JSON.stringify({ @@ -51,10 +57,13 @@ export async function runPopulateSelfHealingCli( return 1; } - const context = await readDatasetContext({ + const context = await resolveDatasetContext({ options, readFileText: input.readFileText ?? ((path) => readFile(path, "utf8")), readStdinText: input.readStdinText ?? readProcessStdin, + loadDatasetContextById: + input.loadDatasetContextById ?? + ((datasetId) => defaultLoadDatasetContextById(datasetId, input.env)), }); const rowWriter = options.shouldCommitRows ? await (input.createRowWriter ?? 
defaultCreateRowWriter)() @@ -91,6 +100,7 @@ export function parsePopulateSelfHealingCliArgs( shouldReadStdin: false, shouldCommitRows: false, }; + const contextSources: string[] = []; for (let index = 0; index < argv.length; index += 1) { const arg = argv[index]; @@ -101,10 +111,20 @@ export function parsePopulateSelfHealingCliArgs( } options.contextPath = value; options.shouldReadStdin = value === "-"; + contextSources.push(arg); index += 1; } else if (arg === "--stdin") { options.shouldReadStdin = true; options.contextPath = "-"; + contextSources.push(arg); + } else if (arg === "--dataset-id") { + const value = argv[index + 1]; + if (!value) { + throw new Error("--dataset-id requires a dataset id."); + } + options.datasetId = value; + contextSources.push(arg); + index += 1; } else if (arg === "--commit") { options.shouldCommitRows = true; } else if (arg === "--recipe-store-dir") { @@ -127,8 +147,13 @@ export function parsePopulateSelfHealingCliArgs( } } - if (!options.contextPath && !options.shouldReadStdin) { - throw new Error("Missing --context or --stdin."); + if (contextSources.length === 0) { + throw new Error("Missing --dataset-id , --context , or --stdin."); + } + if (contextSources.length > 1) { + throw new Error( + `Choose exactly one context source: ${contextSources.join(", ")}.` + ); } if (!options.shouldCommitRows && options.recipeStoreDirectory) { throw new Error("--recipe-store-dir requires --commit."); @@ -136,29 +161,50 @@ export function parsePopulateSelfHealingCliArgs( return options; } -async function readDatasetContext(input: { +async function resolveDatasetContext(input: { options: PopulateSelfHealingCliOptions; readFileText: (path: string) => Promise; readStdinText: () => Promise; + loadDatasetContextById: (datasetId: string) => Promise; }): Promise { + if (input.options.datasetId) { + return input.loadDatasetContextById(input.options.datasetId); + } const text = input.options.shouldReadStdin ? 
await input.readStdinText() : await input.readFileText(input.options.contextPath!); return datasetContextSchema.parse(JSON.parse(text)); } -function prerequisitesFromEnv( - env: NodeJS.ProcessEnv, - shouldCommitRows: boolean -): PopulateRuntimePrerequisites { +function prerequisitesFromEnv(input: { + env: NodeJS.ProcessEnv; + shouldCommitRows: boolean; + shouldLoadDatasetContext: boolean; +}): PopulateRuntimePrerequisites { return { - convexAdminKey: env.CONVEX_SELF_HOSTED_ADMIN_KEY, - openRouterApiKey: env.OPENROUTER_API_KEY, - tinyFishApiKey: env.TINYFISH_API_KEY, - shouldCommitRows, + convexUrl: input.env.CONVEX_URL, + convexAdminKey: input.env.CONVEX_SELF_HOSTED_ADMIN_KEY, + openRouterApiKey: input.env.OPENROUTER_API_KEY, + tinyFishApiKey: input.env.TINYFISH_API_KEY, + shouldCommitRows: input.shouldCommitRows, + shouldLoadDatasetContext: input.shouldLoadDatasetContext, }; } +async function defaultLoadDatasetContextById( + datasetId: string, + env: NodeJS.ProcessEnv +): Promise { + const { createConvexPopulateDatasetContextLoader } = await import( + "./populate-dataset-context-loader.js" + ); + const loader = createConvexPopulateDatasetContextLoader({ + convexUrl: env.CONVEX_URL!, + convexAdminKey: env.CONVEX_SELF_HOSTED_ADMIN_KEY!, + }); + return loader.loadContext(datasetId); +} + async function defaultCreateRowWriter(): Promise { const { ConvexPopulateDatasetRowWriter } = await import( "./populate-convex-writer.js" diff --git a/backend/test/populate-dataset-context-loader.test.ts b/backend/test/populate-dataset-context-loader.test.ts new file mode 100644 index 0000000..1cf4113 --- /dev/null +++ b/backend/test/populate-dataset-context-loader.test.ts @@ -0,0 +1,67 @@ +import assert from "node:assert/strict"; +import { test } from "node:test"; + +import { ConvexPopulateDatasetContextLoader } from "../src/pipeline/populate-dataset-context-loader.js"; + +test("Convex dataset context loader maps system dataset to populate context", async () => { + const 
getForSystemPopulate = Symbol("getForSystemPopulate"); + const calls: Array<{ functionReference: unknown; args: unknown }> = []; + const loader = new ConvexPopulateDatasetContextLoader({ + internalApi: { + datasets: { + getForSystemPopulate, + }, + }, + convexClient: { + async query(functionReference, args) { + calls.push({ functionReference, args }); + return { + name: "AI posts", + description: "Find latest blog posts from OpenAI.", + columns: [{ + name: "entity_name", + type: "text", + description: "Company name.", + }], + }; + }, + }, + }); + + const context = await loader.loadContext("dataset-ai-posts"); + + assert.deepEqual(calls, [{ + functionReference: getForSystemPopulate, + args: { id: "dataset-ai-posts" }, + }]); + assert.deepEqual(context, { + datasetId: "dataset-ai-posts", + datasetName: "AI posts", + description: "Find latest blog posts from OpenAI.", + columns: [{ + name: "entity_name", + type: "text", + description: "Company name.", + }], + }); +}); + +test("Convex dataset context loader rejects missing dataset", async () => { + const loader = new ConvexPopulateDatasetContextLoader({ + internalApi: { + datasets: { + getForSystemPopulate: Symbol("getForSystemPopulate"), + }, + }, + convexClient: { + async query() { + return null; + }, + }, + }); + + await assert.rejects( + loader.loadContext("missing-dataset"), + /Dataset missing-dataset not found/ + ); +}); diff --git a/backend/test/populate-runtime-prerequisites.test.ts b/backend/test/populate-runtime-prerequisites.test.ts index 5a77e3f..eb55222 100644 --- a/backend/test/populate-runtime-prerequisites.test.ts +++ b/backend/test/populate-runtime-prerequisites.test.ts @@ -8,6 +8,7 @@ import { test("populate runtime prerequisite check reports every missing key", () => { assert.deepEqual(missingPopulateRuntimePrerequisites({}), [ + "CONVEX_URL", "CONVEX_SELF_HOSTED_ADMIN_KEY", "OPENROUTER_API_KEY", "TINYFISH_API_KEY", @@ -27,6 +28,7 @@ test("populate runtime prerequisite check skips Convex admin key 
for dry runs", test("populate runtime prerequisite check passes when all keys are configured", () => { const input = { + convexUrl: "http://convex:3210", convexAdminKey: "convex", openRouterApiKey: "openrouter", tinyFishApiKey: "tinyfish", @@ -35,3 +37,15 @@ test("populate runtime prerequisite check passes when all keys are configured", assert.deepEqual(missingPopulateRuntimePrerequisites(input), []); assert.equal(populateRuntimePrerequisiteError(input), undefined); }); + +test("populate runtime prerequisite check requires Convex keys for dataset-id dry runs", () => { + assert.deepEqual( + missingPopulateRuntimePrerequisites({ + openRouterApiKey: "openrouter", + tinyFishApiKey: "tinyfish", + shouldCommitRows: false, + shouldLoadDatasetContext: true, + }), + ["CONVEX_URL", "CONVEX_SELF_HOSTED_ADMIN_KEY"] + ); +}); diff --git a/backend/test/populate-self-healing-command.test.ts b/backend/test/populate-self-healing-command.test.ts index 46092ab..c8b0310 100644 --- a/backend/test/populate-self-healing-command.test.ts +++ b/backend/test/populate-self-healing-command.test.ts @@ -33,6 +33,74 @@ test("self-healing CLI parses context and dry-run mode", () => { }); }); +test("self-healing CLI parses dataset-id mode", () => { + assert.deepEqual(parsePopulateSelfHealingCliArgs([ + "--dataset-id", + "dataset-ai-posts", + "--commit", + ]), { + datasetId: "dataset-ai-posts", + shouldReadStdin: false, + shouldCommitRows: true, + }); +}); + +test("self-healing CLI rejects dataset-id mixed with context input", () => { + assert.throws( + () => parsePopulateSelfHealingCliArgs([ + "--dataset-id", + "dataset-ai-posts", + "--context", + "context.json", + ]), + /Choose exactly one context source/ + ); + assert.throws( + () => parsePopulateSelfHealingCliArgs([ + "--context", + "context.json", + "--dataset-id", + "dataset-ai-posts", + ]), + /Choose exactly one context source/ + ); + assert.throws( + () => parsePopulateSelfHealingCliArgs([ + "--dataset-id", + "dataset-ai-posts", + "--stdin", 
+ ]), + /Choose exactly one context source/ + ); + assert.throws( + () => parsePopulateSelfHealingCliArgs([ + "--stdin", + "--dataset-id", + "dataset-ai-posts", + ]), + /Choose exactly one context source/ + ); +}); + +test("self-healing CLI rejects context and stdin mixed in any order", () => { + assert.throws( + () => parsePopulateSelfHealingCliArgs([ + "--context", + "context.json", + "--stdin", + ]), + /Choose exactly one context source/ + ); + assert.throws( + () => parsePopulateSelfHealingCliArgs([ + "--stdin", + "--context", + "context.json", + ]), + /Choose exactly one context source/ + ); +}); + test("self-healing CLI dry run does not require Convex admin key or create writer", async () => { const stdout: string[] = []; let runCalls = 0; @@ -70,6 +138,144 @@ test("self-healing CLI dry run does not require Convex admin key or create write assert.equal(output.rowCount, 1); }); +test("self-healing CLI dataset-id dry run loads context before running", async () => { + const stdout: string[] = []; + let loadedDatasetId = ""; + let didReadFile = false; + const exitCode = await runPopulateSelfHealingCli({ + argv: ["--dataset-id", "dataset-ai-posts"], + env: { + CONVEX_URL: "http://convex:3210", + CONVEX_SELF_HOSTED_ADMIN_KEY: "convex-admin", + OPENROUTER_API_KEY: "openrouter", + TINYFISH_API_KEY: "tinyfish", + }, + readFileText: async () => { + didReadFile = true; + return JSON.stringify(context); + }, + loadDatasetContextById: async (datasetId) => { + loadedDatasetId = datasetId; + return context; + }, + writeStdout: (text) => stdout.push(text), + writeStderr: () => undefined, + runSelfHealing: async (input) => { + assert.equal(input.context.datasetId, context.datasetId); + assert.equal(input.shouldCommitRows, false); + assert.ok(input.store); + assert.equal(input.rowWriter, undefined); + return successfulResult(input.context.datasetId); + }, + }); + + assert.equal(exitCode, 0); + assert.equal(loadedDatasetId, "dataset-ai-posts"); + assert.equal(didReadFile, 
false); + assert.equal(JSON.parse(stdout[0]!).success, true); +}); + +test("self-healing CLI dataset-id commit loads context and creates writer", async () => { + const stdout: string[] = []; + let writerCalls = 0; + const exitCode = await runPopulateSelfHealingCli({ + argv: ["--dataset-id", "dataset-ai-posts", "--commit"], + env: { + CONVEX_URL: "http://convex:3210", + CONVEX_SELF_HOSTED_ADMIN_KEY: "convex-admin", + OPENROUTER_API_KEY: "openrouter", + TINYFISH_API_KEY: "tinyfish", + POPULATE_RECIPE_STORE_DIR: ".bigset/populate-recipes", + }, + loadDatasetContextById: async (datasetId) => ({ + ...context, + datasetId, + }), + createRowWriter: async () => { + writerCalls += 1; + return { + async replaceRows() { + return { insertedRowCount: 1 }; + }, + }; + }, + writeStdout: (text) => stdout.push(text), + writeStderr: () => undefined, + runSelfHealing: async (input) => { + assert.equal(input.context.datasetId, "dataset-ai-posts"); + assert.equal(input.shouldCommitRows, true); + assert.equal(input.store, undefined); + assert.equal(input.recipeStoreDirectory, ".bigset/populate-recipes"); + assert.ok(input.rowWriter); + return successfulResult(input.context.datasetId); + }, + }); + + assert.equal(exitCode, 0); + assert.equal(writerCalls, 1); + assert.equal(JSON.parse(stdout[0]!).success, true); +}); + +test("self-healing CLI dataset-id mode preflights Convex keys before loading context", async () => { + const stdout: string[] = []; + let loadCalls = 0; + const exitCode = await runPopulateSelfHealingCli({ + argv: ["--dataset-id", "dataset-ai-posts"], + env: { + OPENROUTER_API_KEY: "openrouter", + TINYFISH_API_KEY: "tinyfish", + }, + loadDatasetContextById: async () => { + loadCalls += 1; + return context; + }, + writeStdout: (text) => stdout.push(text), + writeStderr: () => undefined, + }); + + assert.equal(exitCode, 1); + assert.equal(loadCalls, 0); + assert.match(stdout[0]!, /CONVEX_URL/); + assert.match(stdout[0]!, /CONVEX_SELF_HOSTED_ADMIN_KEY/); +}); + 
+test("self-healing CLI dataset-id loader failures skip runtime and writer", async () => { + const stdout: string[] = []; + let runCalls = 0; + let writerCalls = 0; + const exitCode = await runPopulateSelfHealingCli({ + argv: ["--dataset-id", "not-a-convex-id", "--commit"], + env: { + CONVEX_URL: "http://convex:3210", + CONVEX_SELF_HOSTED_ADMIN_KEY: "convex-admin", + OPENROUTER_API_KEY: "openrouter", + TINYFISH_API_KEY: "tinyfish", + }, + loadDatasetContextById: async () => { + throw new Error("Invalid dataset id: not-a-convex-id."); + }, + createRowWriter: async () => { + writerCalls += 1; + return { + async replaceRows() { + return { insertedRowCount: 0 }; + }, + }; + }, + writeStdout: (text) => stdout.push(text), + writeStderr: () => undefined, + runSelfHealing: async () => { + runCalls += 1; + throw new Error("runtime should not run"); + }, + }); + + assert.equal(exitCode, 1); + assert.equal(runCalls, 0); + assert.equal(writerCalls, 0); + assert.match(stdout[0]!, /Invalid dataset id/); +}); + test("self-healing CLI rejects durable recipe store on dry run", async () => { const stdout: string[] = []; const stderr: string[] = []; diff --git a/frontend/convex/datasets.ts b/frontend/convex/datasets.ts index 95050e7..e944e51 100644 --- a/frontend/convex/datasets.ts +++ b/frontend/convex/datasets.ts @@ -1,4 +1,4 @@ -import { query, mutation } from "./_generated/server.js"; +import { query, mutation, internalQuery } from "./_generated/server.js"; import type { QueryCtx } from "./_generated/server.js"; import { v } from "convex/values"; import type { Doc } from "./_generated/dataModel.js"; @@ -82,6 +82,13 @@ export const get = query({ }, }); +export const getForSystemPopulate = internalQuery({ + args: { id: v.id("datasets") }, + handler: async (ctx, args) => { + return await ctx.db.get(args.id); + }, +}); + export const create = mutation({ args: { name: v.string(),