From 823fa38c6706f65c084c8318cf8a0c4a790c9ddd Mon Sep 17 00:00:00 2001 From: Edward Tran Date: Fri, 22 May 2026 19:16:29 +0700 Subject: [PATCH] Wire Mastra populate through self-healing --- .env.example | 8 + .gitignore | 2 + backend/.env.example | 1 + backend/src/env.ts | 6 + backend/src/index.ts | 61 ++- .../src/pipeline/populate-convex-writer.ts | 71 ++++ .../populate-runtime-prerequisites.ts | 29 ++ .../pipeline/populate-self-healing-runner.ts | 128 ++++++ backend/src/pipeline/populate-self-healing.ts | 74 ++++ backend/test/populate-convex-writer.test.ts | 63 +++ .../populate-runtime-prerequisites.test.ts | 26 ++ .../test/populate-self-healing-runner.test.ts | 365 ++++++++++++++++++ benchmarks/dataset-agent/README.md | 6 +- .../adapters/mastra-populate-adapter.mjs | 55 ++- docker-compose.dev.yml | 3 + frontend/convex/datasetRows.ts | 32 ++ 16 files changed, 900 insertions(+), 30 deletions(-) create mode 100644 backend/src/pipeline/populate-convex-writer.ts create mode 100644 backend/src/pipeline/populate-runtime-prerequisites.ts create mode 100644 backend/src/pipeline/populate-self-healing-runner.ts create mode 100644 backend/test/populate-convex-writer.test.ts create mode 100644 backend/test/populate-runtime-prerequisites.test.ts create mode 100644 backend/test/populate-self-healing-runner.test.ts diff --git a/.env.example b/.env.example index 42ce2db..8959888 100644 --- a/.env.example +++ b/.env.example @@ -9,11 +9,19 @@ CLERK_SECRET_KEY=sk_test_... # Generate at https://openrouter.ai/settings/keys OPENROUTER_API_KEY=sk-or-... +# TinyFish — required by populate agent web search/fetch. +# Generate at https://agent.tinyfish.ai/api-keys +TINYFISH_API_KEY= + # Generate once after the first `make dev` with: # docker compose exec convex ./generate_admin_key.sh # Used by the backend container to call internal Convex functions. CONVEX_SELF_HOSTED_ADMIN_KEY= +# Durable store for self-healing populate recipe manifests. +# Docker dev overrides this to /app/.bigset/populate-recipes on a named volume. +POPULATE_RECIPE_STORE_DIR=.bigset/populate-recipes + # PostHog (optional — leave blank to disable analytics entirely in local dev). # Get from https://us.posthog.com/project/settings/general. NEXT_PUBLIC_POSTHOG_KEY= diff --git a/.gitignore b/.gitignore index d5b51c3..91c25ee 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .DS_Store node_modules/ +backend/node_modules .env .env.local Project_BigSet_brief.md @@ -22,6 +23,7 @@ tmp/ temp/ .mastra +.bigset/ # Local tarballs *.tgz diff --git a/backend/.env.example b/backend/.env.example index 5f6f461..a56d9df 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -1,6 +1,7 @@ CLIENT_ORIGIN=http://localhost:3500 CONVEX_URL=http://localhost:3210 PORT=3501 +POPULATE_RECIPE_STORE_DIR=.bigset/populate-recipes # Required once the backend starts writing rows via internal Convex mutations. # Generate with: docker compose exec convex ./generate_admin_key.sh diff --git a/backend/src/env.ts b/backend/src/env.ts index cbd44cf..475994b 100644 --- a/backend/src/env.ts +++ b/backend/src/env.ts @@ -24,4 +24,10 @@ export const env = { CLERK_PUBLISHABLE_KEY: process.env.CLERK_PUBLISHABLE_KEY, OPENROUTER_API_KEY: process.env.OPENROUTER_API_KEY, + TINYFISH_API_KEY: process.env.TINYFISH_API_KEY, + + // Durable recipe manifests for the self-healing populate layer. In Docker + // dev this points at a named volume; locally it defaults under the repo. + POPULATE_RECIPE_STORE_DIR: + process.env.POPULATE_RECIPE_STORE_DIR || ".bigset/populate-recipes", }; diff --git a/backend/src/index.ts b/backend/src/index.ts index cbb30ea..e8fd196 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -5,10 +5,13 @@ import { env } from "./env.js"; import clerkAuthPlugin, { requireAuth } from "./clerk-auth.js"; import { inferSchema } from "./pipeline/schema-inference.js"; import { datasetContextSchema } from "./pipeline/populate.js"; -import { populateWorkflow } from "./mastra/workflows/populate.js"; +import { ConvexPopulateDatasetRowWriter } from "./pipeline/populate-convex-writer.js"; +import { populateRuntimePrerequisiteError } from "./pipeline/populate-runtime-prerequisites.js"; +import { runSelfHealingPopulate } from "./pipeline/populate-self-healing-runner.js"; import { convex, api } from "./convex.js"; const fastify = Fastify({ logger: true }); +const populateRowWriter = new ConvexPopulateDatasetRowWriter(); await fastify.register(fastifyCors, { origin: env.CLIENT_ORIGIN, @@ -72,17 +75,42 @@ await fastify.register(async (instance) => { if (dataset.ownerId !== authenticatedUserId) { return reply.code(403).send({ error: "Not authorized to populate this dataset" }); } + const prerequisiteError = populateRuntimePrerequisiteError({ + convexAdminKey: env.CONVEX_ADMIN_KEY, + openRouterApiKey: env.OPENROUTER_API_KEY, + tinyFishApiKey: env.TINYFISH_API_KEY, + }); + if (prerequisiteError) { + return reply.code(500).send({ + error: prerequisiteError, + }); + } - const run = await populateWorkflow.createRun(); - const result = await run.start({ inputData: parsed.data }); - - req.log.info({ workflowStatus: result.status, steps: JSON.stringify(result.steps).slice(0, 2000) }, "Populate workflow completed"); + const result = await runSelfHealingPopulate({ + context: parsed.data, + recipeStoreDirectory: env.POPULATE_RECIPE_STORE_DIR, + rowWriter: populateRowWriter, + shouldCommitRows: true, + }); - if (result.status !== "success") { - throw new Error(`Workflow ended with status: ${result.status}`); + req.log.info({ + action: result.action, + datasetId: result.datasetId, + committedRows: result.committedRows?.insertedRowCount ?? 0, + validationIssues: result.validationIssues.slice(0, 5), + }, "Self-healing populate completed"); + + if (!result.success) { + return reply.code(422).send({ + error: "Self-healing populate failed validation.", + result: responseSafePopulateResult(result), + }); } - return { success: true, result: result.result }; + return { + success: true, + result: responseSafePopulateResult(result), + }; } catch (err) { const msg = err instanceof Error ? err.message : String(err); if (msg.includes("validator") || msg.includes("Invalid")) { @@ -100,3 +128,20 @@ try { fastify.log.error(err); process.exit(1); } + +function responseSafePopulateResult( + result: Awaited> +) { + const diagnosticRun = result.selectedRun ?? result.diagnosticRun; + return { + action: result.action, + datasetId: result.datasetId, + success: result.success, + committedRows: result.committedRows, + rejectionReasons: result.rejectionReasons, + validationIssues: result.validationIssues, + productionValidation: diagnosticRun?.productionValidation, + metrics: diagnosticRun?.metrics, + rowCount: diagnosticRun?.rows.length ?? 0, + }; +} diff --git a/backend/src/pipeline/populate-convex-writer.ts b/backend/src/pipeline/populate-convex-writer.ts new file mode 100644 index 0000000..78335a0 --- /dev/null +++ b/backend/src/pipeline/populate-convex-writer.ts @@ -0,0 +1,71 @@ +import { env } from "../env.js"; +import { convex, internal } from "../convex.js"; +import type { + PopulateDatasetRowWriter, + PopulateDatasetWriteResult, +} from "./populate-self-healing-runner.js"; + +interface ConvexMutationClient { + mutation(functionReference: unknown, args: unknown): Promise; +} + +export class ConvexPopulateDatasetRowWriter implements PopulateDatasetRowWriter { + constructor( + private readonly input: { + convexClient?: ConvexMutationClient; + internalApi?: typeof internal; + } = {} + ) {} + + async replaceRows(input: Parameters[0]): + Promise { + if (!env.CONVEX_ADMIN_KEY) { + throw new Error( + "CONVEX_SELF_HOSTED_ADMIN_KEY is required to commit self-healed populate rows." + ); + } + + const convexClient = this.input.convexClient ?? convex; + const internalApi = this.input.internalApi ?? internal; + const replacement = await convexClient.mutation( + internalApi.datasetRows.replaceByDataset, + { + datasetId: input.datasetId, + rows: input.rows.map((row) => ({ + data: row.cells, + sources: row.sourceUrls, + })), + } + ); + + return normalizeReplacementResult(replacement, input.rows.length); + } +} + +function normalizeReplacementResult( + value: unknown, + fallbackInsertedRowCount: number +): PopulateDatasetWriteResult { + if ( + typeof value === "object" && + value !== null && + "insertedRowCount" in value + ) { + const replacement = value as { + clearedRowCount?: unknown; + insertedRowCount?: unknown; + }; + return { + clearedRowCount: typeof replacement.clearedRowCount === "number" + ? replacement.clearedRowCount + : undefined, + insertedRowCount: typeof replacement.insertedRowCount === "number" + ? replacement.insertedRowCount + : fallbackInsertedRowCount, + }; + } + + return { + insertedRowCount: fallbackInsertedRowCount, + }; +} diff --git a/backend/src/pipeline/populate-runtime-prerequisites.ts b/backend/src/pipeline/populate-runtime-prerequisites.ts new file mode 100644 index 0000000..d334559 --- /dev/null +++ b/backend/src/pipeline/populate-runtime-prerequisites.ts @@ -0,0 +1,29 @@ +export interface PopulateRuntimePrerequisites { + convexAdminKey?: string; + openRouterApiKey?: string; + tinyFishApiKey?: string; +} + +export function missingPopulateRuntimePrerequisites( + input: PopulateRuntimePrerequisites +): string[] { + const requiredKeys: Array<[string, string | undefined]> = [ + ["CONVEX_SELF_HOSTED_ADMIN_KEY", input.convexAdminKey], + ["OPENROUTER_API_KEY", input.openRouterApiKey], + ["TINYFISH_API_KEY", input.tinyFishApiKey], + ]; + + return requiredKeys + .filter(([, value]) => !value) + .map(([name]) => name); +} + +export function populateRuntimePrerequisiteError( + input: PopulateRuntimePrerequisites +): string | undefined { + const missingNames = missingPopulateRuntimePrerequisites(input); + if (missingNames.length === 0) { + return undefined; + } + return `Backend is missing required populate runtime keys: ${missingNames.join(", ")}.`; +} diff --git a/backend/src/pipeline/populate-self-healing-runner.ts b/backend/src/pipeline/populate-self-healing-runner.ts new file mode 100644 index 0000000..3e3347d --- /dev/null +++ b/backend/src/pipeline/populate-self-healing-runner.ts @@ -0,0 +1,128 @@ +import { join } from "node:path"; + +import type { DatasetContext } from "./populate.js"; +import { + DefaultPopulateRecipeAuthor, + FileSystemPopulateRecipeStore, + MastraPopulateRecipeRuntime, + SelfHealingPopulateRecipeService, + type PopulateRecipeAuthor, + type PopulateRecipeRunResult, + type PopulateRecipeRuntime, + type PopulateRecipeStore, + type SelfHealingPopulateTickResult, +} from "./populate-self-healing.js"; + +export interface PopulateDatasetRowWriter { + replaceRows(input: { + datasetId: string; + rows: PopulateRecipeRunResult["rows"]; + }): Promise; +} + +export interface PopulateDatasetWriteResult { + clearedRowCount?: number; + insertedRowCount: number; +} + +export interface RunSelfHealingPopulateInput { + context: DatasetContext; + store?: PopulateRecipeStore; + runtime?: PopulateRecipeRuntime; + author?: PopulateRecipeAuthor; + rowWriter?: PopulateDatasetRowWriter; + shouldCommitRows?: boolean; + recipeStoreDirectory?: string; +} + +export interface RunSelfHealingPopulateResult { + success: boolean; + action: SelfHealingPopulateTickResult["action"]; + datasetId: string; + selectedRun?: PopulateRecipeRunResult; + diagnosticRun?: PopulateRecipeRunResult; + committedRows?: PopulateDatasetWriteResult; + rejectionReasons: string[]; + validationIssues: string[]; + tick: SelfHealingPopulateTickResult; +} + +export async function runSelfHealingPopulate( + input: RunSelfHealingPopulateInput +): Promise { + if (input.shouldCommitRows && !input.rowWriter) { + throw new Error("rowWriter is required when shouldCommitRows is true."); + } + const rowWriter = input.rowWriter; + + const store = input.store ?? new FileSystemPopulateRecipeStore( + input.recipeStoreDirectory ?? defaultPopulateRecipeStoreDirectory() + ); + const service = new SelfHealingPopulateRecipeService({ + store, + runtime: input.runtime ?? new MastraPopulateRecipeRuntime(), + author: input.author ?? new DefaultPopulateRecipeAuthor(), + }); + const tick = await service.tick({ + datasetId: input.context.datasetId, + context: input.context, + }); + const selectedRun = successfulRunForTick(tick); + const diagnosticRun = diagnosticRunForTick(tick); + let committedRows: PopulateDatasetWriteResult | undefined; + + if (input.shouldCommitRows && selectedRun && rowWriter) { + committedRows = await rowWriter.replaceRows({ + datasetId: input.context.datasetId, + rows: selectedRun.rows, + }); + } + + return { + success: Boolean(selectedRun), + action: tick.action, + datasetId: input.context.datasetId, + selectedRun, + diagnosticRun, + committedRows, + rejectionReasons: tick.rejectionReasons, + validationIssues: validationIssuesForSelfHealingTick(tick), + tick, + }; +} + +export function successfulRunForTick( + tick: SelfHealingPopulateTickResult +): PopulateRecipeRunResult | undefined { + if (tick.action === "active_rerun_succeeded") { + return tick.activeRun; + } + if ( + tick.action === "generated_initial_recipe" || + tick.action === "repaired_active_recipe" + ) { + return tick.candidateRun; + } + return undefined; +} + +export function diagnosticRunForTick( + tick: SelfHealingPopulateTickResult +): PopulateRecipeRunResult | undefined { + return successfulRunForTick(tick) ?? tick.candidateRun ?? tick.activeRun; +} + +export function validationIssuesForSelfHealingTick( + tick: SelfHealingPopulateTickResult +): string[] { + const run = diagnosticRunForTick(tick); + return Array.from(new Set([ + ...(run?.validationIssues ?? []), + ...(run?.productionValidation.criticalIssues ?? []), + ...tick.rejectionReasons, + ])); +} + +function defaultPopulateRecipeStoreDirectory(): string { + return join(process.cwd(), ".bigset", "populate-recipes"); +} diff --git a/backend/src/pipeline/populate-self-healing.ts b/backend/src/pipeline/populate-self-healing.ts index 960c1ea..0a51728 100644 --- a/backend/src/pipeline/populate-self-healing.ts +++ b/backend/src/pipeline/populate-self-healing.ts @@ -193,6 +193,36 @@ export class MastraPopulateRecipeRuntime implements PopulateRecipeRuntime { } } +export class DefaultPopulateRecipeAuthor implements PopulateRecipeAuthor { + async generateRecipe( + input: PopulateRecipeAuthorGenerateInput + ): Promise { + return createPopulateRecipe({ + recipeId: populateRecipeId(input.context.datasetId, input.nextVersion), + datasetId: input.context.datasetId, + version: input.nextVersion, + sourceDescription: input.context.description, + requestedColumns: requestedColumnNames(input.context), + runtimeInstructions: initialRuntimeInstructions(input.context), + createdBy: "system", + }); + } + + async repairRecipe( + input: PopulateRecipeAuthorRepairInput + ): Promise { + return createPopulateRecipe({ + recipeId: populateRecipeId(input.context.datasetId, input.nextVersion), + datasetId: input.context.datasetId, + version: input.nextVersion, + sourceDescription: input.context.description, + requestedColumns: requestedColumnNames(input.context), + runtimeInstructions: repairRuntimeInstructions(input), + createdBy: "system", + }); + } +} + export class SelfHealingPopulateRecipeService { constructor( private readonly input: { @@ -504,6 +534,50 @@ function normalizeCandidateRecipe(input: { }; } +function populateRecipeId(datasetId: string, version: number): string { + return `${safePathSegment(datasetId)}-recipe-v${version}`; +} + +function requestedColumnNames(context: DatasetContext): string[] { + return context.columns.map((column) => column.name); +} + +function initialRuntimeInstructions(context: DatasetContext): string { + return [ + "Use search_web before fetch_page unless an official source URL is already obvious.", + "Prefer official docs, pricing, blog, product, or company pages over third-party summaries.", + "Every inserted row must include source_url and evidence_quote cells when those columns exist.", + "Every inserted row must include at least one source URL and one evidence quote.", + `Requested columns: ${requestedColumnNames(context).join(", ")}.`, + ].join("\n"); +} + +function repairRuntimeInstructions(input: PopulateRecipeAuthorRepairInput): string { + const failureSummary = [ + ...input.failedRun.productionValidation.criticalIssues, + ...input.failedRun.validationIssues, + ] + .map((issue) => issue.trim()) + .filter(Boolean) + .slice(0, 8); + const priorInstructions = input.activeRecipe.runtimeInstructions.trim(); + return [ + priorInstructions || initialRuntimeInstructions(input.context), + "", + "Repair focus from previous failed run:", + ...failureSummary.map((issue) => `- ${truncateInstruction(issue, 240)}`), + "- Do not reuse rows that failed validation without fixing source URL and evidence quote coverage.", + "- If expected entities were missing, collect one source-backed row per missing entity before returning.", + ].join("\n"); +} + +function truncateInstruction(value: string, maxLength: number): string { + if (value.length <= maxLength) { + return value; + } + return `${value.slice(0, maxLength - 12)} [truncated]`; +} + function contextWithRecipeInstructions( context: DatasetContext, recipe: PopulateRecipe diff --git a/backend/test/populate-convex-writer.test.ts b/backend/test/populate-convex-writer.test.ts new file mode 100644 index 0000000..d347b9f --- /dev/null +++ b/backend/test/populate-convex-writer.test.ts @@ -0,0 +1,63 @@ +import assert from "node:assert/strict"; +import { test } from "node:test"; + +test("Convex populate row writer uses one atomic replace mutation", async () => { + process.env.CONVEX_URL = process.env.CONVEX_URL ?? "https://example.convex.cloud"; + process.env.CONVEX_SELF_HOSTED_ADMIN_KEY = + process.env.CONVEX_SELF_HOSTED_ADMIN_KEY ?? "test-admin-key"; + const { ConvexPopulateDatasetRowWriter } = await import( + "../src/pipeline/populate-convex-writer.js" + ); + const calls: Array<{ functionReference: unknown; args: unknown }> = []; + const replaceByDataset = Symbol("replaceByDataset"); + const writer = new ConvexPopulateDatasetRowWriter({ + internalApi: { + datasetRows: { + replaceByDataset, + }, + }, + convexClient: { + async mutation(functionReference, args) { + calls.push({ functionReference, args }); + return { + clearedRowCount: 2, + insertedRowCount: 1, + }; + }, + }, + }); + + const result = await writer.replaceRows({ + datasetId: "dataset-ai-posts", + rows: [{ + cells: { + entity_name: "OpenAI", + source_url: "https://openai.com/news", + }, + sourceUrls: ["https://openai.com/news"], + evidence: [{ + columnName: "entity_name", + sourceUrl: "https://openai.com/news", + quote: "OpenAI", + }], + needsReview: true, + }], + }); + + assert.deepEqual(result, { + clearedRowCount: 2, + insertedRowCount: 1, + }); + assert.equal(calls.length, 1); + assert.equal(calls[0]?.functionReference, replaceByDataset); + assert.deepEqual(calls[0]?.args, { + datasetId: "dataset-ai-posts", + rows: [{ + data: { + entity_name: "OpenAI", + source_url: "https://openai.com/news", + }, + sources: ["https://openai.com/news"], + }], + }); +}); diff --git a/backend/test/populate-runtime-prerequisites.test.ts b/backend/test/populate-runtime-prerequisites.test.ts new file mode 100644 index 0000000..76e7d37 --- /dev/null +++ b/backend/test/populate-runtime-prerequisites.test.ts @@ -0,0 +1,26 @@ +import assert from "node:assert/strict"; +import { test } from "node:test"; + +import { + missingPopulateRuntimePrerequisites, + populateRuntimePrerequisiteError, +} from "../src/pipeline/populate-runtime-prerequisites.js"; + +test("populate runtime prerequisite check reports every missing key", () => { + assert.deepEqual(missingPopulateRuntimePrerequisites({}), [ + "CONVEX_SELF_HOSTED_ADMIN_KEY", + "OPENROUTER_API_KEY", + "TINYFISH_API_KEY", + ]); +}); + +test("populate runtime prerequisite check passes when all keys are configured", () => { + const input = { + convexAdminKey: "convex", + openRouterApiKey: "openrouter", + tinyFishApiKey: "tinyfish", + }; + + assert.deepEqual(missingPopulateRuntimePrerequisites(input), []); + assert.equal(populateRuntimePrerequisiteError(input), undefined); +}); diff --git a/backend/test/populate-self-healing-runner.test.ts b/backend/test/populate-self-healing-runner.test.ts new file mode 100644 index 0000000..b63c4c0 --- /dev/null +++ b/backend/test/populate-self-healing-runner.test.ts @@ -0,0 +1,365 @@ +import assert from "node:assert/strict"; +import { mkdtemp } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { test } from "node:test"; + +import type { DatasetContext } from "../src/pipeline/populate.js"; +import { + createPopulateRecipe, + FileSystemPopulateRecipeStore, + InMemoryPopulateRecipeStore, + type PopulateRecipe, + type PopulateRecipeAuthor, + type PopulateRecipeRunResult, + type PopulateRecipeRuntime, + type SelfHealingPopulateTickResult, +} from "../src/pipeline/populate-self-healing.js"; +import { + diagnosticRunForTick, + runSelfHealingPopulate, + validationIssuesForSelfHealingTick, + type PopulateDatasetRowWriter, +} from "../src/pipeline/populate-self-healing-runner.js"; + +const context: DatasetContext = { + datasetId: "dataset-ai-posts", + datasetName: "AI posts", + description: "Find latest blog posts from OpenAI.", + columns: [ + { + name: "entity_name", + type: "text", + description: "Company name.", + }, + { + name: "latest_post_title", + type: "text", + description: "Post title.", + }, + { + name: "source_url", + type: "url", + description: "Source URL.", + }, + { + name: "evidence_quote", + type: "text", + description: "Evidence quote.", + }, + ], +}; + +test("self-healing runner commits rows only after a successful tick", async () => { + const store = new InMemoryPopulateRecipeStore(); + const generatedRecipe = recipe({ recipeId: "generated-v1" }); + const writer = new FakePopulateDatasetRowWriter(); + + const result = await runSelfHealingPopulate({ + context, + store, + runtime: new FakePopulateRecipeRuntime({ + "generated-v1": validRun(generatedRecipe), + }), + author: new FakeRecipeAuthor({ generatedRecipe }), + rowWriter: writer, + shouldCommitRows: true, + }); + + assert.equal(result.success, true); + assert.equal(result.action, "generated_initial_recipe"); + assert.equal(result.committedRows?.insertedRowCount, 1); + assert.equal(writer.replaceCalls.length, 1); + assert.equal(writer.replaceCalls[0]?.datasetId, context.datasetId); + assert.equal(writer.replaceCalls[0]?.rows[0]?.cells.entity_name, "OpenAI"); +}); + +test("self-healing runner requires a row writer before runtime work when committing", async () => { + let runtimeCalls = 0; + + await assert.rejects( + runSelfHealingPopulate({ + context, + runtime: { + async runRecipe(input) { + runtimeCalls += 1; + return validRun(input.recipe); + }, + }, + author: new FakeRecipeAuthor({ + generatedRecipe: recipe({ recipeId: "generated-v1" }), + }), + shouldCommitRows: true, + }), + /rowWriter is required/ + ); + + assert.equal(runtimeCalls, 0); +}); + +test("self-healing runner commits healthy active reruns", async () => { + const store = new InMemoryPopulateRecipeStore(); + const activeRecipe = recipe({ recipeId: "active-v1", status: "active" }); + const writer = new FakePopulateDatasetRowWriter(); + await store.saveRecipe(activeRecipe); + + const result = await runSelfHealingPopulate({ + context, + store, + runtime: new FakePopulateRecipeRuntime({ + "active-v1": validRun(activeRecipe), + }), + author: new FakeRecipeAuthor(), + rowWriter: writer, + shouldCommitRows: true, + }); + + assert.equal(result.success, true); + assert.equal(result.action, "active_rerun_succeeded"); + assert.equal(result.selectedRun?.recipeId, "active-v1"); + assert.equal(writer.replaceCalls.length, 1); +}); + +test("self-healing runner commits promoted repair candidate rows", async () => { + const store = new InMemoryPopulateRecipeStore(); + const activeRecipe = recipe({ recipeId: "active-broken", status: "active" }); + const repairedRecipe = recipe({ recipeId: "repair-v2", version: 2 }); + const writer = new FakePopulateDatasetRowWriter(); + await store.saveRecipe(activeRecipe); + + const result = await runSelfHealingPopulate({ + context, + store, + runtime: new FakePopulateRecipeRuntime({ + "active-broken": invalidRun(activeRecipe, "No source-backed rows."), + "repair-v2": validRun(repairedRecipe), + }), + author: new FakeRecipeAuthor({ repairedRecipe }), + rowWriter: writer, + shouldCommitRows: true, + }); + + assert.equal(result.success, true); + assert.equal(result.action, "repaired_active_recipe"); + assert.equal(result.selectedRun?.recipeId, "repair-v2"); + assert.equal(writer.replaceCalls.length, 1); +}); + +test("self-healing runner does not clear or insert rows when candidate is rejected", async () => { + const store = new InMemoryPopulateRecipeStore(); + const activeRecipe = recipe({ recipeId: "active-broken", status: "active" }); + const rejectedRecipe = recipe({ recipeId: "repair-v2", version: 2 }); + const writer = new FakePopulateDatasetRowWriter(); + await store.saveRecipe(activeRecipe); + + const result = await runSelfHealingPopulate({ + context, + store, + runtime: new FakePopulateRecipeRuntime({ + "active-broken": invalidRun(activeRecipe, "No source-backed rows."), + "repair-v2": invalidRun(rejectedRecipe, "Still no evidence."), + }), + author: new FakeRecipeAuthor({ repairedRecipe: rejectedRecipe }), + rowWriter: writer, + shouldCommitRows: true, + }); + + assert.equal(result.success, false); + assert.equal(result.action, "candidate_rejected"); + assert.equal(result.selectedRun, undefined); + assert.equal(result.diagnosticRun?.recipeId, "repair-v2"); + assert.equal(result.committedRows, undefined); + assert.equal(writer.replaceCalls.length, 0); + assert.match(result.validationIssues.join("\n"), /Still no evidence/); +}); + +test("filesystem store lets the runner reuse an active recipe across calls", async () => { + const rootDirectory = await mkdtemp(join(tmpdir(), "bigset-populate-runner-")); + const store = new FileSystemPopulateRecipeStore(rootDirectory); + const generatedRecipe = recipe({ recipeId: "generated-v1" }); + const writer = new FakePopulateDatasetRowWriter(); + const runtime = new FakePopulateRecipeRuntime({ + "generated-v1": validRun(generatedRecipe), + }); + const author = new FakeRecipeAuthor({ generatedRecipe }); + + const first = await runSelfHealingPopulate({ + context, + store, + runtime, + author, + rowWriter: writer, + shouldCommitRows: true, + }); + const second = await runSelfHealingPopulate({ + context, + store: new FileSystemPopulateRecipeStore(rootDirectory), + runtime, + author, + rowWriter: writer, + shouldCommitRows: true, + }); + + assert.equal(first.action, "generated_initial_recipe"); + assert.equal(second.action, "active_rerun_succeeded"); + assert.equal(author.generateCalls, 1); + assert.equal(writer.replaceCalls.length, 2); +}); + +test("self-healing tick diagnostics expose rejected candidate validation issues", () => { + const candidateRecipe = recipe({ recipeId: "repair-v2", version: 2 }); + const candidateRun = invalidRun(candidateRecipe, "Missing expected entities: Anthropic."); + const tick: SelfHealingPopulateTickResult = { + datasetId: context.datasetId, + action: "candidate_rejected", + candidateRecipe, + candidateRun, + rejectionReasons: ["Candidate validation score is below the active recipe baseline."], + }; + + assert.equal(diagnosticRunForTick(tick)?.recipeId, "repair-v2"); + assert.deepEqual(validationIssuesForSelfHealingTick(tick), [ + "Missing expected entities: Anthropic.", + "Candidate validation score is below the active recipe baseline.", + ]); +}); + +function recipe(input: { + recipeId: string; + version?: number; + status?: PopulateRecipe["status"]; +}): PopulateRecipe { + return createPopulateRecipe({ + recipeId: input.recipeId, + datasetId: context.datasetId, + version: input.version ?? 1, + status: input.status, + sourceDescription: context.description, + requestedColumns: context.columns.map((column) => column.name), + createdAt: "2026-05-22T00:00:00.000Z", + }); +} + +function validRun(recipe: PopulateRecipe): PopulateRecipeRunResult { + return runResult({ + recipe, + rows: [{ + cells: { + entity_name: "OpenAI", + latest_post_title: "Release notes from OpenAI", + source_url: "https://openai.com/news", + evidence_quote: "Release notes from OpenAI", + }, + sourceUrls: ["https://openai.com/news"], + evidence: [{ + columnName: "latest_post_title", + sourceUrl: "https://openai.com/news", + quote: "Release notes from OpenAI", + }], + needsReview: true, + }], + isValid: true, + score: 1, + }); +} + +function invalidRun(recipe: PopulateRecipe, issue: string): PopulateRecipeRunResult { + return runResult({ + recipe, + rows: [], + validationIssues: [issue], + criticalIssues: [issue], + isValid: false, + score: 0, + }); +} + +function runResult(input: { + recipe: PopulateRecipe; + rows: PopulateRecipeRunResult["rows"]; + validationIssues?: string[]; + criticalIssues?: string[]; + isValid: boolean; + score: number; +}): PopulateRecipeRunResult { + return { + rows: input.rows, + validationIssues: input.validationIssues ?? [], + usage: { + promptTokens: 0, + completionTokens: 0, + totalTokens: 0, + }, + metrics: { + searchCalls: 0, + fetchCalls: 0, + browserCalls: 0, + agentRuns: 0, + agentSteps: 0, + }, + recipeId: input.recipe.recipeId, + recipeVersion: input.recipe.version, + runStatus: input.isValid ? "succeeded" : "failed", + startedAt: "2026-05-22T00:00:00.000Z", + completedAt: "2026-05-22T00:00:01.000Z", + runtimeMs: 1_000, + productionValidation: { + isValid: input.isValid, + score: input.score, + rowCount: input.rows.length, + requestedCellCompletenessRatio: input.score, + sourceUrlCoverageRatio: input.score, + evidenceCoverageRatio: input.score, + expectedEntityCoverageRatio: input.score, + expectedEntities: [], + missingExpectedEntities: [], + criticalIssues: input.criticalIssues ?? [], + warnings: input.validationIssues ?? [], + }, + artifacts: [], + }; +} + +class FakePopulateRecipeRuntime implements PopulateRecipeRuntime { + constructor(private readonly runsByRecipeId: Record) {} + + async runRecipe(input: { + recipe: PopulateRecipe; + context: DatasetContext; + }): Promise { + return this.runsByRecipeId[input.recipe.recipeId] ?? + invalidRun(input.recipe, `Missing fake run for ${input.recipe.recipeId}.`); + } +} + +class FakeRecipeAuthor implements PopulateRecipeAuthor { + generateCalls = 0; + + constructor( + private readonly recipes: { + generatedRecipe?: PopulateRecipe; + repairedRecipe?: PopulateRecipe; + } = {} + ) {} + + async generateRecipe(): Promise { + this.generateCalls += 1; + return this.recipes.generatedRecipe ?? recipe({ recipeId: "generated-v1" }); + } + + async repairRecipe(): Promise { + return this.recipes.repairedRecipe ?? recipe({ recipeId: "repair-v2", version: 2 }); + } +} + +class FakePopulateDatasetRowWriter implements PopulateDatasetRowWriter { + readonly replaceCalls: Array[0]> = []; + + async replaceRows(input: Parameters[0]) { + this.replaceCalls.push(input); + return { + clearedRowCount: 7, + insertedRowCount: input.rows.length, + }; + } +} diff --git a/benchmarks/dataset-agent/README.md b/benchmarks/dataset-agent/README.md index 4a4df46..016738d 100644 --- a/benchmarks/dataset-agent/README.md +++ b/benchmarks/dataset-agent/README.md @@ -7,9 +7,9 @@ benchmark env vars, runs one prompt, and prints one JSON object to stdout. ## Run Mastra Populate -The Mastra adapter calls `runPopulateRuntime`, a direct callable runtime around -the Mastra populate agent. It avoids the HTTP/auth route and uses an injected -in-memory row sink so benchmark runs do not clear or insert Convex rows. +The Mastra adapter calls the self-healing populate service around +`runPopulateRuntime`. It avoids the HTTP/auth route, uses an isolated in-memory +recipe store per prompt run, and never clears or inserts Convex rows. ```bash node benchmarks/dataset-agent/run-benchmark.mjs \ diff --git a/benchmarks/dataset-agent/adapters/mastra-populate-adapter.mjs b/benchmarks/dataset-agent/adapters/mastra-populate-adapter.mjs index d6cabbb..24096ce 100644 --- a/benchmarks/dataset-agent/adapters/mastra-populate-adapter.mjs +++ b/benchmarks/dataset-agent/adapters/mastra-populate-adapter.mjs @@ -25,32 +25,49 @@ if (missingRuntimeKeys.length > 0) { process.exit(0); } -const { runPopulateRuntime } = await import( - "../../../backend/src/pipeline/populate-runtime.ts" +const { + diagnosticRunForTick, + validationIssuesForSelfHealingTick, +} = await import( + "../../../backend/src/pipeline/populate-self-healing-runner.ts" +); +const { + DefaultPopulateRecipeAuthor, + InMemoryPopulateRecipeStore, + MastraPopulateRecipeRuntime, + SelfHealingPopulateRecipeService, +} = await import( + "../../../backend/src/pipeline/populate-self-healing.ts" ); -const result = await runPopulateRuntime({ - context: { - datasetId: `benchmark-${safeIdSegment(promptId)}`, - datasetName: `benchmark_${safeIdSegment(promptId)}`, - description: prompt, - columns: requiredColumns.map((columnName) => ({ - name: columnName, - type: inferPopulateColumnType(columnName), - description: `Benchmark requested column for ${promptQuality} prompt.`, - })), - }, - maxRows: Number(process.env.BIGSET_MASTRA_BENCHMARK_MAX_ROWS ?? "10"), +const context = { + datasetId: `benchmark-${safeIdSegment(promptId)}`, + datasetName: `benchmark_${safeIdSegment(promptId)}`, + description: prompt, + columns: requiredColumns.map((columnName) => ({ + name: columnName, + type: inferPopulateColumnType(columnName), + description: `Benchmark requested column for ${promptQuality} prompt.`, + })), +}; +const service = new SelfHealingPopulateRecipeService({ + store: new InMemoryPopulateRecipeStore(), + runtime: new MastraPopulateRecipeRuntime({ + maxRows: Number(process.env.BIGSET_MASTRA_BENCHMARK_MAX_ROWS ?? "10"), + }), + author: new DefaultPopulateRecipeAuthor(), }); +const tick = await service.tick({ datasetId: context.datasetId, context }); +const result = diagnosticRunForTick(tick); console.log(JSON.stringify({ - rows: result.rows, + rows: result?.rows ?? [], validationIssues: [ - ...result.validationIssues, - ...minimumColumnIssues(result.rows), + ...validationIssuesForSelfHealingTick(tick), + ...minimumColumnIssues(result?.rows ?? []), ], - usage: result.usage, - metrics: result.metrics, + usage: result?.usage ?? emptyUsage(), + metrics: result?.metrics ?? emptyMetrics(), })); function minimumColumnIssues(rows) { diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 7a0eec1..05ab9c7 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -24,10 +24,12 @@ services: - "3501:3501" volumes: - ./backend/src:/app/src + - populate_recipe_data:/app/.bigset environment: CLIENT_ORIGIN: http://localhost:3500 CONVEX_URL: http://convex:3210 PORT: 3501 + POPULATE_RECIPE_STORE_DIR: /app/.bigset/populate-recipes CONVEX_SELF_HOSTED_ADMIN_KEY: ${CONVEX_SELF_HOSTED_ADMIN_KEY:-} CLERK_SECRET_KEY: ${CLERK_SECRET_KEY:-} CLERK_PUBLISHABLE_KEY: ${NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY:-} @@ -130,3 +132,4 @@ services: volumes: pgdata: convex_data: + populate_recipe_data: diff --git a/frontend/convex/datasetRows.ts b/frontend/convex/datasetRows.ts index dc3f318..473dfc9 100644 --- a/frontend/convex/datasetRows.ts +++ b/frontend/convex/datasetRows.ts @@ -114,3 +114,35 @@ export const insertBatch = internalMutation({ } }, }); + +export const replaceByDataset = internalMutation({ + args: { + datasetId: v.id("datasets"), + rows: v.array(v.object({ + data: v.record(v.string(), v.any()), + sources: v.optional(v.array(v.string())), + })), + }, + handler: async (ctx, args) => { + const existingRows = await ctx.db + .query("datasetRows") + .withIndex("by_dataset", (q) => q.eq("datasetId", args.datasetId)) + .collect(); + + for (const row of existingRows) { + await ctx.db.delete(row._id); + } + for (const row of args.rows) { + await ctx.db.insert("datasetRows", { + datasetId: args.datasetId, + data: row.data, + sources: row.sources, + }); + } + + return { + clearedRowCount: existingRows.length, + insertedRowCount: args.rows.length, + }; + }, +});