Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions backend/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ The pipeline is a pure function (`inferSchema(prompt) → DatasetSchema`). It is

`src/pipeline/populate-self-healing-runner.ts` — shared route/CLI runner. HTTP populate uses a durable filesystem store and `ConvexPopulateDatasetRowWriter`; benchmark/dry-run paths can inject an in-memory store and skip row commits.

`npm --silent run populate:self-heal -- --context context.json` — operator/cron-friendly dry run. It emits one JSON summary to stdout and does not persist recipe history or commit rows.
`npm --silent run populate:self-heal -- --dataset-id <datasetId>` — operator/cron-friendly dry run. It loads live dataset context with system Convex auth, emits one JSON summary to stdout, and does not persist recipe history or commit rows.

`npm --silent run populate:self-heal -- --context context.json --commit` — commits validated rows through the atomic Convex replace mutation. Requires `CONVEX_SELF_HOSTED_ADMIN_KEY`, `OPENROUTER_API_KEY`, and `TINYFISH_API_KEY`.
`npm --silent run populate:self-heal -- --dataset-id <datasetId> --commit` — commits validated rows through the atomic Convex replace mutation. Requires `CONVEX_URL`, `CONVEX_SELF_HOSTED_ADMIN_KEY`, `OPENROUTER_API_KEY`, and `TINYFISH_API_KEY`.

`npm --silent run populate:self-heal -- --context context.json` — dev harness dry run for a pasted `DatasetContext`. It uses an isolated in-memory recipe store; `--recipe-store-dir` is rejected unless `--commit` is set.

## Mastra (Workflow Orchestration)

Expand Down
1 change: 1 addition & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ await fastify.register(async (instance) => {
return reply.code(403).send({ error: "Not authorized to populate this dataset" });
}
const prerequisiteError = populateRuntimePrerequisiteError({
convexUrl: env.CONVEX_URL,
convexAdminKey: env.CONVEX_ADMIN_KEY,
openRouterApiKey: env.OPENROUTER_API_KEY,
tinyFishApiKey: env.TINYFISH_API_KEY,
Expand Down
56 changes: 56 additions & 0 deletions backend/src/pipeline/populate-dataset-context-loader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { ConvexHttpClient } from "convex/browser";
import { anyApi } from "convex/server";

import {
datasetContextSchema,
type DatasetContext,
} from "./populate.js";

/**
 * Minimal query surface the dataset context loader needs from a Convex client.
 * Only `query` is declared, so any object exposing a compatible `query` method
 * can be supplied in place of a full client.
 */
export interface PopulateDatasetContextQueryClient {
/** Runs the query identified by `functionReference` with `args` and resolves with its untyped result. */
query(functionReference: unknown, args: unknown): Promise<unknown>;
}

/**
 * Builds a populate `DatasetContext` from a dataset record fetched through a
 * Convex-style query client.
 *
 * The client and (optionally) the API object used to resolve the function
 * reference are injected; when no API object is given, `anyApi` is used.
 */
export class ConvexPopulateDatasetContextLoader {
  constructor(
    private readonly input: {
      convexClient: PopulateDatasetContextQueryClient;
      internalApi?: typeof anyApi;
    }
  ) {}

  /**
   * Queries `datasets.getForSystemPopulate` for `datasetId` and validates the
   * result against `datasetContextSchema`.
   *
   * @param datasetId - Id forwarded to the query as `{ id }` and echoed into the context.
   * @returns The parsed dataset context.
   * @throws Error when the query result is not a non-null object.
   * @throws Whatever `datasetContextSchema.parse` throws when the record does not match the schema.
   */
  async loadContext(datasetId: string): Promise<DatasetContext> {
    const api = this.input.internalApi ?? anyApi;
    const queryArgs = { id: datasetId };
    const result = await this.input.convexClient.query(
      api.datasets.getForSystemPopulate,
      queryArgs
    );

    // Anything other than a non-null object (null, undefined, primitives) counts as "not found".
    const isObjectResult = typeof result === "object" && result !== null;
    if (!isObjectResult) {
      throw new Error(`Dataset ${datasetId} not found.`);
    }

    // Field types are left as `unknown`; the schema parse below is what validates them.
    const { name, description, columns } = result as {
      name?: unknown;
      description?: unknown;
      columns?: unknown;
    };

    const candidate = {
      datasetId,
      datasetName: name,
      description,
      columns,
    };
    return datasetContextSchema.parse(candidate);
  }
}

/**
 * Creates a dataset context loader backed by a `ConvexHttpClient` that is
 * authenticated with an admin key.
 *
 * @param input.convexUrl - URL passed to the `ConvexHttpClient` constructor.
 * @param input.convexAdminKey - Key handed to the client's `setAdminAuth`.
 * @returns A loader wired to the authenticated client.
 */
export function createConvexPopulateDatasetContextLoader(input: {
  convexUrl: string;
  convexAdminKey: string;
}): ConvexPopulateDatasetContextLoader {
  // NOTE(review): the cast implies `setAdminAuth` is not visible on the client's
  // declared type — confirm against the installed `convex` typings.
  type AdminAuthCapable = {
    setAdminAuth(adminKey: string): void;
  };

  const httpClient = new ConvexHttpClient(input.convexUrl);
  const adminCapableClient = httpClient as unknown as AdminAuthCapable;
  adminCapableClient.setAdminAuth(input.convexAdminKey);

  return new ConvexPopulateDatasetContextLoader({ convexClient: httpClient });
}
5 changes: 4 additions & 1 deletion backend/src/pipeline/populate-runtime-prerequisites.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
/**
 * Configuration values and mode flags inspected by the populate runtime
 * prerequisite check. Every field is optional; an absent value is what the
 * check reports as missing when that value is required.
 */
export interface PopulateRuntimePrerequisites {
/** Convex deployment URL; reported as `CONVEX_URL` when required but absent. */
convexUrl?: string;
/** Convex admin key; reported as `CONVEX_SELF_HOSTED_ADMIN_KEY` when required but absent. */
convexAdminKey?: string;
/** OpenRouter API key (presumably reported as `OPENROUTER_API_KEY` — the push is outside this view). */
openRouterApiKey?: string;
/** TinyFish API key (presumably reported as `TINYFISH_API_KEY` — the push is outside this view). */
tinyFishApiKey?: string;
/** Whether rows will be committed; treated as `true` when omitted, which makes the Convex values required. */
shouldCommitRows?: boolean;
/** Whether dataset context is loaded from Convex; when truthy the Convex values are required even for dry runs. */
shouldLoadDatasetContext?: boolean;
}

export function missingPopulateRuntimePrerequisites(
input: PopulateRuntimePrerequisites
): string[] {
const requiredKeys: Array<[string, string | undefined]> = [];
if (input.shouldCommitRows ?? true) {
if ((input.shouldCommitRows ?? true) || input.shouldLoadDatasetContext) {
requiredKeys.push(["CONVEX_URL", input.convexUrl]);
requiredKeys.push(["CONVEX_SELF_HOSTED_ADMIN_KEY", input.convexAdminKey]);
}
requiredKeys.push(
Expand Down
72 changes: 59 additions & 13 deletions backend/src/pipeline/populate-self-healing-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from "./populate-self-healing-runner.js";

export interface PopulateSelfHealingCliOptions {
datasetId?: string;
contextPath?: string;
shouldReadStdin: boolean;
shouldCommitRows: boolean;
Expand All @@ -28,6 +29,7 @@ export interface PopulateSelfHealingCliDependencies {
writeStdout?: (text: string) => void;
writeStderr?: (text: string) => void;
runSelfHealing?: typeof runSelfHealingPopulate;
loadDatasetContextById?: (datasetId: string) => Promise<DatasetContext>;
createRowWriter?: () => Promise<PopulateDatasetRowWriter>;
}

Expand All @@ -40,7 +42,11 @@ export async function runPopulateSelfHealingCli(
try {
const options = parsePopulateSelfHealingCliArgs(input.argv);
const prerequisiteError = populateRuntimePrerequisiteError(
prerequisitesFromEnv(input.env, options.shouldCommitRows)
prerequisitesFromEnv({
env: input.env,
shouldCommitRows: options.shouldCommitRows,
shouldLoadDatasetContext: Boolean(options.datasetId),
})
);
if (prerequisiteError) {
writeStdout(JSON.stringify({
Expand All @@ -51,10 +57,13 @@ export async function runPopulateSelfHealingCli(
return 1;
}

const context = await readDatasetContext({
const context = await resolveDatasetContext({
options,
readFileText: input.readFileText ?? ((path) => readFile(path, "utf8")),
readStdinText: input.readStdinText ?? readProcessStdin,
loadDatasetContextById:
input.loadDatasetContextById ??
((datasetId) => defaultLoadDatasetContextById(datasetId, input.env)),
});
const rowWriter = options.shouldCommitRows
? await (input.createRowWriter ?? defaultCreateRowWriter)()
Expand Down Expand Up @@ -91,6 +100,7 @@ export function parsePopulateSelfHealingCliArgs(
shouldReadStdin: false,
shouldCommitRows: false,
};
const contextSources: string[] = [];

for (let index = 0; index < argv.length; index += 1) {
const arg = argv[index];
Expand All @@ -101,10 +111,20 @@ export function parsePopulateSelfHealingCliArgs(
}
options.contextPath = value;
options.shouldReadStdin = value === "-";
contextSources.push(arg);
index += 1;
} else if (arg === "--stdin") {
options.shouldReadStdin = true;
options.contextPath = "-";
contextSources.push(arg);
} else if (arg === "--dataset-id") {
const value = argv[index + 1];
if (!value) {
throw new Error("--dataset-id requires a dataset id.");
}
options.datasetId = value;
contextSources.push(arg);
index += 1;
} else if (arg === "--commit") {
options.shouldCommitRows = true;
} else if (arg === "--recipe-store-dir") {
Expand All @@ -127,38 +147,64 @@ export function parsePopulateSelfHealingCliArgs(
}
}

if (!options.contextPath && !options.shouldReadStdin) {
throw new Error("Missing --context <file> or --stdin.");
if (contextSources.length === 0) {
throw new Error("Missing --dataset-id <id>, --context <file>, or --stdin.");
}
if (contextSources.length > 1) {
throw new Error(
`Choose exactly one context source: ${contextSources.join(", ")}.`
);
}
if (!options.shouldCommitRows && options.recipeStoreDirectory) {
throw new Error("--recipe-store-dir requires --commit.");
}
return options;
}

async function readDatasetContext(input: {
async function resolveDatasetContext(input: {
options: PopulateSelfHealingCliOptions;
readFileText: (path: string) => Promise<string>;
readStdinText: () => Promise<string>;
loadDatasetContextById: (datasetId: string) => Promise<DatasetContext>;
}): Promise<DatasetContext> {
if (input.options.datasetId) {
return input.loadDatasetContextById(input.options.datasetId);
}
const text = input.options.shouldReadStdin
? await input.readStdinText()
: await input.readFileText(input.options.contextPath!);
return datasetContextSchema.parse(JSON.parse(text));
}

function prerequisitesFromEnv(
env: NodeJS.ProcessEnv,
shouldCommitRows: boolean
): PopulateRuntimePrerequisites {
function prerequisitesFromEnv(input: {
env: NodeJS.ProcessEnv;
shouldCommitRows: boolean;
shouldLoadDatasetContext: boolean;
}): PopulateRuntimePrerequisites {
return {
convexAdminKey: env.CONVEX_SELF_HOSTED_ADMIN_KEY,
openRouterApiKey: env.OPENROUTER_API_KEY,
tinyFishApiKey: env.TINYFISH_API_KEY,
shouldCommitRows,
convexUrl: input.env.CONVEX_URL,
convexAdminKey: input.env.CONVEX_SELF_HOSTED_ADMIN_KEY,
openRouterApiKey: input.env.OPENROUTER_API_KEY,
tinyFishApiKey: input.env.TINYFISH_API_KEY,
shouldCommitRows: input.shouldCommitRows,
shouldLoadDatasetContext: input.shouldLoadDatasetContext,
};
}

/**
 * Default `--dataset-id` context source: loads the dataset context from Convex
 * using connection settings taken from `env`.
 *
 * The environment is validated here rather than asserted with `!`, so a call
 * that bypasses the earlier prerequisite check fails with a message naming the
 * missing variables instead of passing `undefined` into the Convex client.
 * The check runs before the dynamic import so misconfiguration fails fast.
 *
 * @param datasetId - Id of the dataset whose context should be loaded.
 * @param env - Environment providing `CONVEX_URL` and `CONVEX_SELF_HOSTED_ADMIN_KEY`.
 * @returns The dataset context produced by the Convex context loader.
 * @throws Error when either required environment variable is unset or empty.
 */
async function defaultLoadDatasetContextById(
  datasetId: string,
  env: NodeJS.ProcessEnv
): Promise<DatasetContext> {
  const convexUrl = env.CONVEX_URL;
  const convexAdminKey = env.CONVEX_SELF_HOSTED_ADMIN_KEY;

  if (!convexUrl || !convexAdminKey) {
    const missingKeys: string[] = [];
    if (!convexUrl) {
      missingKeys.push("CONVEX_URL");
    }
    if (!convexAdminKey) {
      missingKeys.push("CONVEX_SELF_HOSTED_ADMIN_KEY");
    }
    throw new Error(
      `Cannot load dataset ${datasetId} from Convex: missing ${missingKeys.join(", ")}.`
    );
  }

  // Imported lazily so the Convex client is only loaded when a dataset id is actually used.
  const { createConvexPopulateDatasetContextLoader } = await import(
    "./populate-dataset-context-loader.js"
  );
  const loader = createConvexPopulateDatasetContextLoader({
    convexUrl,
    convexAdminKey,
  });
  return loader.loadContext(datasetId);
}

async function defaultCreateRowWriter(): Promise<PopulateDatasetRowWriter> {
const { ConvexPopulateDatasetRowWriter } = await import(
"./populate-convex-writer.js"
Expand Down
67 changes: 67 additions & 0 deletions backend/test/populate-dataset-context-loader.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import assert from "node:assert/strict";
import { test } from "node:test";

import { ConvexPopulateDatasetContextLoader } from "../src/pipeline/populate-dataset-context-loader.js";

// Verifies that the loader forwards the injected function reference and `{ id }`
// to the client, and maps the returned record's name/description/columns into a context.
test("Convex dataset context loader maps system dataset to populate context", async () => {
  // A unique symbol stands in for the function reference so the assertion can
  // check the loader passed through exactly the reference it was given.
  const queryReference = Symbol("getForSystemPopulate");
  const recordedCalls: Array<{ functionReference: unknown; args: unknown }> = [];

  const stubClient = {
    async query(functionReference: unknown, args: unknown) {
      recordedCalls.push({ functionReference, args });
      return {
        name: "AI posts",
        description: "Find latest blog posts from OpenAI.",
        columns: [{
          name: "entity_name",
          type: "text",
          description: "Company name.",
        }],
      };
    },
  };

  const loader = new ConvexPopulateDatasetContextLoader({
    internalApi: {
      datasets: {
        getForSystemPopulate: queryReference,
      },
    },
    convexClient: stubClient,
  });

  const loadedContext = await loader.loadContext("dataset-ai-posts");

  const expectedCalls = [{
    functionReference: queryReference,
    args: { id: "dataset-ai-posts" },
  }];
  const expectedContext = {
    datasetId: "dataset-ai-posts",
    datasetName: "AI posts",
    description: "Find latest blog posts from OpenAI.",
    columns: [{
      name: "entity_name",
      type: "text",
      description: "Company name.",
    }],
  };

  assert.deepEqual(recordedCalls, expectedCalls);
  assert.deepEqual(loadedContext, expectedContext);
});

// Verifies that a `null` query result is surfaced as a "not found" rejection.
test("Convex dataset context loader rejects missing dataset", async () => {
  // The client reports "no such dataset" by resolving with null.
  const emptyClient = {
    query: async () => null,
  };
  const loader = new ConvexPopulateDatasetContextLoader({
    internalApi: {
      datasets: {
        getForSystemPopulate: Symbol("getForSystemPopulate"),
      },
    },
    convexClient: emptyClient,
  });

  const pendingLoad = loader.loadContext("missing-dataset");

  await assert.rejects(pendingLoad, /Dataset missing-dataset not found/);
});
14 changes: 14 additions & 0 deletions backend/test/populate-runtime-prerequisites.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {

test("populate runtime prerequisite check reports every missing key", () => {
assert.deepEqual(missingPopulateRuntimePrerequisites({}), [
"CONVEX_URL",
"CONVEX_SELF_HOSTED_ADMIN_KEY",
"OPENROUTER_API_KEY",
"TINYFISH_API_KEY",
Expand All @@ -27,6 +28,7 @@ test("populate runtime prerequisite check skips Convex admin key for dry runs",

test("populate runtime prerequisite check passes when all keys are configured", () => {
const input = {
convexUrl: "http://convex:3210",
convexAdminKey: "convex",
openRouterApiKey: "openrouter",
tinyFishApiKey: "tinyfish",
Expand All @@ -35,3 +37,15 @@ test("populate runtime prerequisite check passes when all keys are configured",
assert.deepEqual(missingPopulateRuntimePrerequisites(input), []);
assert.equal(populateRuntimePrerequisiteError(input), undefined);
});

test("populate runtime prerequisite check requires Convex keys for dataset-id dry runs", () => {
assert.deepEqual(
missingPopulateRuntimePrerequisites({
openRouterApiKey: "openrouter",
tinyFishApiKey: "tinyfish",
shouldCommitRows: false,
shouldLoadDatasetContext: true,
}),
["CONVEX_URL", "CONVEX_SELF_HOSTED_ADMIN_KEY"]
);
});
Loading