diff --git a/backend/src/pipeline/populate-collection-runtime.ts b/backend/src/pipeline/populate-collection-runtime.ts index b0b695a..455fafb 100644 --- a/backend/src/pipeline/populate-collection-runtime.ts +++ b/backend/src/pipeline/populate-collection-runtime.ts @@ -14,7 +14,15 @@ export interface CollectionPopulatePipelineColumn { description?: string; } -export interface CollectionPopulatePipelineInput { +export interface CollectionPopulateBenchmarkMetadata { + promptId?: string; + promptQuality?: string; + persona?: string; + expectedStress?: string; +} + +export interface CollectionPopulatePipelineInput + extends CollectionPopulateBenchmarkMetadata { datasetId: string; datasetName: string; description: string; @@ -32,6 +40,7 @@ export type CollectionPopulatePipelineRunner = ( export interface CollectionPopulateRecipeRuntimeOptions { runPipeline: CollectionPopulatePipelineRunner; targetRows?: number; + benchmarkMetadata?: CollectionPopulateBenchmarkMetadata; } export class CollectionPopulateRecipeRuntime implements PopulateRecipeRuntime { @@ -52,6 +61,7 @@ export class CollectionPopulateRecipeRuntime implements PopulateRecipeRuntime { recipe: input.recipe, context: input.context, targetRows: this.input.targetRows ?? 
10, + benchmarkMetadata: this.input.benchmarkMetadata, }) ); } catch (error) { @@ -74,9 +84,11 @@ export function collectionPipelineInputFromRecipe(input: { recipe: PopulateRecipe; context: DatasetContext; targetRows: number; + benchmarkMetadata?: CollectionPopulateBenchmarkMetadata; }): CollectionPopulatePipelineInput { const recipeInstructions = input.recipe.runtimeInstructions.trim(); return { + ...input.benchmarkMetadata, datasetId: input.context.datasetId, datasetName: input.context.datasetName, description: input.context.description, diff --git a/backend/src/pipeline/populate-runtime-selection.ts b/backend/src/pipeline/populate-runtime-selection.ts index bb19b1a..62c6656 100644 --- a/backend/src/pipeline/populate-runtime-selection.ts +++ b/backend/src/pipeline/populate-runtime-selection.ts @@ -1,5 +1,9 @@ +import { resolve } from "node:path"; +import { pathToFileURL } from "node:url"; + import { CollectionPopulateRecipeRuntime, + type CollectionPopulateBenchmarkMetadata, type CollectionPopulatePipelineRunner, } from "./populate-collection-runtime.js"; import { @@ -42,13 +46,48 @@ export async function createPopulateRecipeRuntime( if (runtimeName === "mastra") { return new MastraPopulateRecipeRuntime({ maxRows: input.maxRows }); } - if (!input.collectionRunner) { + const collectionRunner = + input.collectionRunner ?? await loadCollectionRunnerFromEnv(input.env); + if (!collectionRunner) { throw new Error( - "POPULATE_AGENT_RUNTIME=collection requires a collection pipeline runner." + "POPULATE_AGENT_RUNTIME=collection requires a collection pipeline runner or POPULATE_COLLECTION_RUNNER_MODULE." 
); } return new CollectionPopulateRecipeRuntime({ - runPipeline: input.collectionRunner, + runPipeline: collectionRunner, targetRows: input.maxRows, + benchmarkMetadata: collectionBenchmarkMetadataFromEnv(input.env), }); } + +async function loadCollectionRunnerFromEnv( + env: NodeJS.ProcessEnv +): Promise<CollectionPopulatePipelineRunner | undefined> { + const moduleSpecifier = env.POPULATE_COLLECTION_RUNNER_MODULE; + if (!moduleSpecifier) { + return undefined; + } + + const moduleUrl = moduleSpecifier.startsWith(".") || moduleSpecifier.startsWith("/") + ? pathToFileURL(resolve(moduleSpecifier)).href + : moduleSpecifier; + const loadedModule = await import(moduleUrl); + const runner = loadedModule.runCollectionPopulatePipeline ?? loadedModule.default; + if (typeof runner !== "function") { + throw new Error( + `${moduleSpecifier} must export runCollectionPopulatePipeline(input) or a default runner.` + ); + } + return runner as CollectionPopulatePipelineRunner; +} + +function collectionBenchmarkMetadataFromEnv( + env: NodeJS.ProcessEnv +): CollectionPopulateBenchmarkMetadata { + return { + promptId: env.BIGSET_BENCHMARK_PROMPT_ID, + promptQuality: env.BIGSET_BENCHMARK_PROMPT_QUALITY, + persona: env.BIGSET_BENCHMARK_PERSONA, + expectedStress: env.BIGSET_BENCHMARK_EXPECTED_STRESS, + }; +} diff --git a/backend/test/populate-collection-runtime.test.ts b/backend/test/populate-collection-runtime.test.ts index 162a74f..a9fd9e8 100644 --- a/backend/test/populate-collection-runtime.test.ts +++ b/backend/test/populate-collection-runtime.test.ts @@ -44,6 +44,12 @@ test("collection runtime threads recipe instructions into the collection prompt" let capturedInput: CollectionPopulatePipelineInput | undefined; const runtime = new CollectionPopulateRecipeRuntime({ targetRows: 3, + benchmarkMetadata: { + promptId: "latest-ai-blog-posts", + promptQuality: "easy", + persona: "technical operator", + expectedStress: "Latest dated source pages; date precision matters.", + }, runPipeline: async (input) => { capturedInput = input; 
return { @@ -89,6 +95,13 @@ test("collection runtime threads recipe instructions into the collection prompt" assert.equal(capturedInput.datasetId, context.datasetId); assert.equal(capturedInput.datasetName, context.datasetName); assert.equal(capturedInput.targetRows, 3); + assert.equal(capturedInput.promptId, "latest-ai-blog-posts"); + assert.equal(capturedInput.promptQuality, "easy"); + assert.equal(capturedInput.persona, "technical operator"); + assert.equal( + capturedInput.expectedStress, + "Latest dated source pages; date precision matters." + ); assert.deepEqual(capturedInput.requiredColumns, [ "entity_name", "latest_post_title", @@ -119,6 +132,25 @@ test("collection pipeline input builder trims empty recipe instructions", () => assert.doesNotMatch(input.prompt, /Durable recipe instructions/); }); +test("collection pipeline input builder carries benchmark metadata", () => { + const input = collectionPipelineInputFromRecipe({ + recipe: collectionRecipe(), + context, + targetRows: 5, + benchmarkMetadata: { + promptId: "saas-pricing-pages", + promptQuality: "medium", + persona: "startup founder", + expectedStress: "Official pricing evidence.", + }, + }); + + assert.equal(input.promptId, "saas-pricing-pages"); + assert.equal(input.promptQuality, "medium"); + assert.equal(input.persona, "startup founder"); + assert.equal(input.expectedStress, "Official pricing evidence."); +}); + function collectionRecipe(input: { runtimeInstructions?: string; } = {}): PopulateRecipe { diff --git a/backend/test/populate-runtime-selection.test.ts b/backend/test/populate-runtime-selection.test.ts index 8bdf928..b1a9993 100644 --- a/backend/test/populate-runtime-selection.test.ts +++ b/backend/test/populate-runtime-selection.test.ts @@ -6,7 +6,11 @@ import { selectedPopulateRuntimeName, } from "../src/pipeline/populate-runtime-selection.js"; import { CollectionPopulateRecipeRuntime } from "../src/pipeline/populate-collection-runtime.js"; -import { MastraPopulateRecipeRuntime } from 
"../src/pipeline/populate-self-healing.js"; +import { + createPopulateRecipe, + MastraPopulateRecipeRuntime, +} from "../src/pipeline/populate-self-healing.js"; +import type { DatasetContext } from "../src/pipeline/populate.js"; test("populate runtime selection defaults to Mastra", async () => { assert.equal(selectedPopulateRuntimeName({}), "mastra"); @@ -48,3 +52,77 @@ test("populate runtime selection rejects collection without a runner", async () /requires a collection pipeline runner/ ); }); + +test("populate runtime selection loads collection runner from env module", async () => { + const runtime = await createPopulateRecipeRuntime({ + env: { + POPULATE_AGENT_RUNTIME: "collection", + POPULATE_COLLECTION_RUNNER_MODULE: runnerModuleUrl(), + BIGSET_BENCHMARK_PROMPT_ID: "latest-ai-blog-posts", + BIGSET_BENCHMARK_PROMPT_QUALITY: "easy", + BIGSET_BENCHMARK_PERSONA: "technical operator", + BIGSET_BENCHMARK_EXPECTED_STRESS: "Latest dated source pages.", + }, + }); + const context: DatasetContext = { + datasetId: "dataset-ai-posts", + datasetName: "AI posts", + description: "Find latest blog posts from OpenAI.", + columns: [ + { name: "entity_name", type: "text" }, + { name: "source_url", type: "url" }, + { name: "evidence_quote", type: "text" }, + ], + }; + const run = await runtime.runRecipe({ + context, + recipe: createPopulateRecipe({ + recipeId: "collection-v1", + datasetId: context.datasetId, + version: 1, + status: "active", + runtimeInstructions: "Prefer official sources.", + sourceDescription: context.description, + requestedColumns: context.columns.map((column) => column.name), + createdBy: "system", + }), + }); + + assert.equal(run.runStatus, "succeeded"); + assert.equal(run.rows[0]?.cells.entity_name, "latest-ai-blog-posts"); + assert.equal(run.rows[0]?.cells.evidence_quote, "technical operator"); +}); + +function runnerModuleUrl(): string { + const source = ` + export async function runCollectionPopulatePipeline(input) { + const quote = input.expectedStress 
|| "Loaded runner module."; + return { + rows: [{ + cells: { + entity_name: input.promptId, + source_url: "https://example.com/source", + evidence_quote: input.persona, + }, + sourceUrls: ["https://example.com/source"], + evidence: [ + { columnName: "entity_name", sourceUrl: "https://example.com/source", quote }, + { columnName: "source_url", sourceUrl: "https://example.com/source", quote }, + { columnName: "evidence_quote", sourceUrl: "https://example.com/source", quote }, + ], + needsReview: false, + }], + validationIssues: [], + usage: { promptTokens: 1, completionTokens: 1, totalTokens: 2 }, + metrics: { + searchCalls: 0, + fetchCalls: 0, + browserCalls: 0, + agentRuns: 1, + agentSteps: 0, + }, + }; + } + `; + return `data:text/javascript,${encodeURIComponent(source)}`; +} diff --git a/benchmarks/dataset-agent/README.md b/benchmarks/dataset-agent/README.md index 3321c3c..94525f4 100644 --- a/benchmarks/dataset-agent/README.md +++ b/benchmarks/dataset-agent/README.md @@ -40,6 +40,9 @@ the shell. The runner module must export `runCollectionPopulatePipeline(input)` or a default runner that accepts `CollectionPopulatePipelineInput` and returns a `PopulateRuntimeResult`. +App and CLI collection-runtime runs use the same runner shape, but load it from +`POPULATE_COLLECTION_RUNNER_MODULE` when `POPULATE_AGENT_RUNTIME=collection`. + ## Verify Self-Healing Stack Use this before asking someone else to migrate a new collection agent into the @@ -73,12 +76,17 @@ For each prompt the runner sets: - `BIGSET_BENCHMARK_PROMPT` - `BIGSET_BENCHMARK_PROMPT_ID` - `BIGSET_BENCHMARK_PROMPT_QUALITY` +- `BIGSET_BENCHMARK_PERSONA` +- `BIGSET_BENCHMARK_EXPECTED_STRESS` - `BIGSET_BENCHMARK_REQUIRED_COLUMNS` - `BIGSET_BENCHMARK_MINIMUM_REQUIRED_COLUMNS` `BIGSET_BENCHMARK_REQUIRED_COLUMNS` is the requested table shape. `BIGSET_BENCHMARK_MINIMUM_REQUIRED_COLUMNS` is the hard row identity minimum. -Rows still need at least one source URL and evidence quote. 
+Rows still need at least one source URL and evidence quote. Collection benchmark +runners receive prompt id, quality, persona, expected stress, and required +columns through `CollectionPopulatePipelineInput` so they can build the same +benchmark/spec context that the direct collection lane expects. ## Agent Output Contract diff --git a/benchmarks/dataset-agent/adapters/collection-self-healing-adapter.mjs b/benchmarks/dataset-agent/adapters/collection-self-healing-adapter.mjs index 06e4f0c..c9480ba 100644 --- a/benchmarks/dataset-agent/adapters/collection-self-healing-adapter.mjs +++ b/benchmarks/dataset-agent/adapters/collection-self-healing-adapter.mjs @@ -5,6 +5,8 @@ import { resolve } from "node:path"; const prompt = requiredEnv("BIGSET_BENCHMARK_PROMPT"); const promptId = process.env.BIGSET_BENCHMARK_PROMPT_ID ?? "benchmark-prompt"; const promptQuality = process.env.BIGSET_BENCHMARK_PROMPT_QUALITY ?? "unknown"; +const persona = process.env.BIGSET_BENCHMARK_PERSONA; +const expectedStress = process.env.BIGSET_BENCHMARK_EXPECTED_STRESS; const requiredColumns = columnList( requiredEnv("BIGSET_BENCHMARK_REQUIRED_COLUMNS") ); @@ -74,6 +76,12 @@ const service = new SelfHealingPopulateRecipeService({ runtime: new CollectionPopulateRecipeRuntime({ runPipeline: collectionRunner, targetRows: Number(process.env.BIGSET_COLLECTION_BENCHMARK_MAX_ROWS ?? 
"10"), + benchmarkMetadata: { + promptId, + promptQuality, + persona, + expectedStress, + }, }), author: new DefaultPopulateRecipeAuthor(), }); diff --git a/benchmarks/dataset-agent/run-benchmark.mjs b/benchmarks/dataset-agent/run-benchmark.mjs index 6d8d0d2..2de1099 100755 --- a/benchmarks/dataset-agent/run-benchmark.mjs +++ b/benchmarks/dataset-agent/run-benchmark.mjs @@ -534,6 +534,8 @@ async function runSystemPrompt(input) { BIGSET_BENCHMARK_PROMPT: input.promptDefinition.prompt, BIGSET_BENCHMARK_PROMPT_ID: input.promptDefinition.id, BIGSET_BENCHMARK_PROMPT_QUALITY: input.promptDefinition.quality, + BIGSET_BENCHMARK_PERSONA: input.promptDefinition.persona, + BIGSET_BENCHMARK_EXPECTED_STRESS: input.promptDefinition.expectedStress, BIGSET_BENCHMARK_REQUIRED_COLUMNS: input.promptDefinition.requiredColumns.join(","), BIGSET_BENCHMARK_MINIMUM_REQUIRED_COLUMNS: minimumRequiredColumns.join(","), }, diff --git a/docs/data-collection-agent-migration-plan.md b/docs/data-collection-agent-migration-plan.md index 8fcc7e3..6531984 100644 --- a/docs/data-collection-agent-migration-plan.md +++ b/docs/data-collection-agent-migration-plan.md @@ -9,6 +9,16 @@ the collection pipeline is migrated into BigSet. intentionally stacked and should not be merged out of order. - PR #37 adds `make verify-self-healing`, which is the cheap local gate before touching live data or spending OpenRouter/TinyFish credits. +- PR #38 adds this migration plan and keeps the target boundaries explicit. +- PR #39 adds `CollectionPopulateRecipeRuntime`, an adapter boundary that can + run a collection pipeline through the same `PopulateRecipeRuntime` interface + as Mastra. +- PR #40 adds `POPULATE_AGENT_RUNTIME=collection` selection through the real + HTTP and CLI entrypoints. PR #42 extends that socket so app/CLI runs can load + a runner module from `POPULATE_COLLECTION_RUNNER_MODULE`. 
+- PR #41 adds a `collection-self-heal` benchmark lane that wraps the collection + runtime inside `SelfHealingPopulateRecipeService`. This is the benchmark + socket Meteor can use once the real collection runner is available. - `feat/data-collection-agent-v14` vendors the collection pipeline under `backend/BigSet_Data_Collection_Agent` and includes the memory module. - Clean `feat/data-collection-agent-v14` tests pass once ignored backend @@ -63,9 +73,14 @@ The current layer: Dry-run and benchmark paths intentionally use in-memory stores so they do not pollute durable recipe history. +The current layer can now: + +- run an injected collection runner through the same self-healing runtime + boundary and benchmark harness as Mastra + The current layer does not yet: -- run the collection pipeline as its runtime +- run the real vendored collection pipeline as its runtime in this stack - generate Playwright scripts as a durable production recipe - run a green live Convex canary in this local environment - prove quality on a full real benchmark for the collection runtime @@ -73,7 +88,9 @@ The current layer does not yet: ## Migration Sequence 1. Branch from the top of the self-healing stack. - - Base new work on `codex/self-healing-verification`. + - For any new collection-runner work, base on + `codex/collection-self-healing-benchmark` so PR #39, #40, and #41 stay in + the path. - Do not edit `main` or `feat/data-collection-agent-v14` directly. 2. Fix the collection branch as a clean build source. @@ -88,10 +105,14 @@ The current layer does not yet: - Gate: `npm --prefix backend test` and `npm --prefix backend run build`. 3. Add a collection runtime adapter. + - Status: done in PR #39. - Implement the existing `PopulateRecipeRuntime` interface. - Input: BigSet `DatasetContext`. - Transform `recipe.runtimeInstructions` into the collection pipeline prompt/spec alongside the dataset description and columns. 
+ - Propagate `requiredColumns`, prompt id, prompt quality, persona, and + benchmark stress metadata into the collection pipeline's benchmark/spec + generation path when those fields are available. - Output: rows, source URLs, evidence quotes, usage, metrics, and debug captured sources. - No direct Convex writes inside the adapter. @@ -100,6 +121,7 @@ The current layer does not yet: behavior. 4. Add runtime selection through the real entrypoints. + - Status: done in PR #40 for injected collection runners. - Add a runtime factory for the self-healing runner. - Add an env switch such as `POPULATE_AGENT_RUNTIME=collection`. - Wire both `POST /populate` and `populate:self-heal --dataset-id` through @@ -108,6 +130,7 @@ The current layer does not yet: entrypoints use the selected runtime. 5. Add a self-healing-wrapped benchmark adapter for the collection runtime. + - Status: done in PR #41 for injected collection runners. - Reuse `benchmarks/dataset-agent/run-benchmark.mjs`. - Exercise `SelfHealingPopulateRecipeService` with the collection runtime inside it, not the direct collection pipeline alone. @@ -146,6 +169,8 @@ Before any merge: - `npm --prefix backend run build` passes - adapter test proves `recipe.runtimeInstructions` reaches the collection pipeline prompt/spec +- adapter or runner tests prove benchmark metadata and `requiredColumns` reach + the collection pipeline's spec generation path - HTTP-route and CLI tests prove `POPULATE_AGENT_RUNTIME=collection` reaches the selected runtime through real app entrypoints - benchmark no-key smoke proves blocked with zero spend @@ -155,10 +180,62 @@ Before any merge: - live dataset commit is tested only on a throwaway dataset - backend build does not depend on `frontend/convex/_generated` +## Meteor Handoff Shape + +Meteor does not need to rebuild the self-healing wrapper. 
The socket is now: + +```text +runCollectionPopulatePipeline(CollectionPopulatePipelineInput) + -> Promise<PopulateRuntimeResult> +``` + +`CollectionPopulatePipelineInput.recipeInstructions` is the self-healing signal. +`requiredColumns` and benchmark metadata are the scoring signal. If the +collection runner ignores `recipeInstructions`, repaired recipes cannot change +future behavior. If it ignores `requiredColumns` or benchmark metadata, the +benchmark can stop measuring the same task. + +The real benchmark command after a runner module exists is: + +```bash +BIGSET_COLLECTION_BENCHMARK_RUNNER_MODULE=./backend/src/pipeline/collection-agent-runner.ts \ +node benchmarks/dataset-agent/run-benchmark.mjs \ + --prompt-ids latest-ai-blog-posts,saas-pricing-pages \ + --system collection-self-heal='node --import ./backend/node_modules/tsx/dist/esm/index.mjs benchmarks/dataset-agent/adapters/collection-self-healing-adapter.mjs' +``` + ## Next Engineering Move -Create a fresh branch from `codex/self-healing-verification` and first implement -the collection runtime adapter contract, including the -`recipe.runtimeInstructions` bridge and its unit test. Do not wire it as the -default runtime until the self-healing-wrapped benchmark adapter produces better -evidence than the current Mastra path. +Create a fresh branch from `codex/collection-self-healing-benchmark` and port the +real collection runner behind the existing adapter boundary: + +1. Add a runner module, likely `backend/src/pipeline/collection-agent-runner.ts`, + that exports `runCollectionPopulatePipeline(input)`. +2. Port only the collection pipeline files needed by that runner from + `feat/data-collection-agent-v14`. +3. Convert `CollectionPopulatePipelineInput` into the collection pipeline's + prompt/spec. Include `input.prompt`, `input.recipeInstructions`, + `input.requiredColumns`, prompt id/quality, persona, and expected-stress + benchmark context when available. +4. 
Convert the collection pipeline output into `PopulateRuntimeResult`: rows, + source URLs, evidence quotes, usage, metrics, and debug captured sources. +5. Keep Convex writes, auth, cron scheduling, and durable recipe storage outside + the collection runner. +6. Fix build blockers while porting: TinyFish status typing, OpenRouter provider + declaration leak, backend dependency on generated frontend Convex API, and + AI SDK `maxTokens`. +7. Gate in this order: `npm --prefix backend test`, `npm --prefix backend run + build`, `make verify-self-healing`, 2-prompt `collection-self-heal` + benchmark, then full benchmark only if the 2-prompt run is not obviously + broken. + +When testing the real app or CLI path, set: + +```bash +POPULATE_AGENT_RUNTIME=collection +POPULATE_COLLECTION_RUNNER_MODULE=./backend/src/pipeline/collection-agent-runner.ts +``` + +Do not switch the default runtime from Mastra to collection until the +self-healing-wrapped collection benchmark has better evidence than the current +Mastra lane.