Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion backend/src/pipeline/populate-collection-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@ export interface CollectionPopulatePipelineColumn {
description?: string;
}

export interface CollectionPopulatePipelineInput {
export interface CollectionPopulateBenchmarkMetadata {
promptId?: string;
promptQuality?: string;
persona?: string;
expectedStress?: string;
}

export interface CollectionPopulatePipelineInput
extends CollectionPopulateBenchmarkMetadata {
datasetId: string;
datasetName: string;
description: string;
Expand All @@ -32,6 +40,7 @@ export type CollectionPopulatePipelineRunner = (
export interface CollectionPopulateRecipeRuntimeOptions {
runPipeline: CollectionPopulatePipelineRunner;
targetRows?: number;
benchmarkMetadata?: CollectionPopulateBenchmarkMetadata;
}

export class CollectionPopulateRecipeRuntime implements PopulateRecipeRuntime {
Expand All @@ -52,6 +61,7 @@ export class CollectionPopulateRecipeRuntime implements PopulateRecipeRuntime {
recipe: input.recipe,
context: input.context,
targetRows: this.input.targetRows ?? 10,
benchmarkMetadata: this.input.benchmarkMetadata,
})
);
} catch (error) {
Expand All @@ -74,9 +84,11 @@ export function collectionPipelineInputFromRecipe(input: {
recipe: PopulateRecipe;
context: DatasetContext;
targetRows: number;
benchmarkMetadata?: CollectionPopulateBenchmarkMetadata;
}): CollectionPopulatePipelineInput {
const recipeInstructions = input.recipe.runtimeInstructions.trim();
return {
...input.benchmarkMetadata,
datasetId: input.context.datasetId,
datasetName: input.context.datasetName,
description: input.context.description,
Expand Down
45 changes: 42 additions & 3 deletions backend/src/pipeline/populate-runtime-selection.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { resolve } from "node:path";
import { pathToFileURL } from "node:url";

import {
CollectionPopulateRecipeRuntime,
type CollectionPopulateBenchmarkMetadata,
type CollectionPopulatePipelineRunner,
} from "./populate-collection-runtime.js";
import {
Expand Down Expand Up @@ -42,13 +46,48 @@ export async function createPopulateRecipeRuntime(
if (runtimeName === "mastra") {
return new MastraPopulateRecipeRuntime({ maxRows: input.maxRows });
}
if (!input.collectionRunner) {
const collectionRunner =
input.collectionRunner ?? await loadCollectionRunnerFromEnv(input.env);
if (!collectionRunner) {
throw new Error(
"POPULATE_AGENT_RUNTIME=collection requires a collection pipeline runner."
"POPULATE_AGENT_RUNTIME=collection requires a collection pipeline runner or POPULATE_COLLECTION_RUNNER_MODULE."
);
}
return new CollectionPopulateRecipeRuntime({
runPipeline: input.collectionRunner,
runPipeline: collectionRunner,
targetRows: input.maxRows,
benchmarkMetadata: collectionBenchmarkMetadataFromEnv(input.env),
});
}

async function loadCollectionRunnerFromEnv(
env: NodeJS.ProcessEnv
): Promise<CollectionPopulatePipelineRunner | undefined> {
const moduleSpecifier = env.POPULATE_COLLECTION_RUNNER_MODULE;
if (!moduleSpecifier) {
return undefined;
}

const moduleUrl = moduleSpecifier.startsWith(".") || moduleSpecifier.startsWith("/")
? pathToFileURL(resolve(moduleSpecifier)).href
: moduleSpecifier;
const loadedModule = await import(moduleUrl);
const runner = loadedModule.runCollectionPopulatePipeline ?? loadedModule.default;
if (typeof runner !== "function") {
throw new Error(
`${moduleSpecifier} must export runCollectionPopulatePipeline(input) or a default runner.`
);
}
return runner as CollectionPopulatePipelineRunner;
}

function collectionBenchmarkMetadataFromEnv(
env: NodeJS.ProcessEnv
): CollectionPopulateBenchmarkMetadata {
return {
promptId: env.BIGSET_BENCHMARK_PROMPT_ID,
promptQuality: env.BIGSET_BENCHMARK_PROMPT_QUALITY,
persona: env.BIGSET_BENCHMARK_PERSONA,
expectedStress: env.BIGSET_BENCHMARK_EXPECTED_STRESS,
};
}
32 changes: 32 additions & 0 deletions backend/test/populate-collection-runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ test("collection runtime threads recipe instructions into the collection prompt"
let capturedInput: CollectionPopulatePipelineInput | undefined;
const runtime = new CollectionPopulateRecipeRuntime({
targetRows: 3,
benchmarkMetadata: {
promptId: "latest-ai-blog-posts",
promptQuality: "easy",
persona: "technical operator",
expectedStress: "Latest dated source pages; date precision matters.",
},
runPipeline: async (input) => {
capturedInput = input;
return {
Expand Down Expand Up @@ -89,6 +95,13 @@ test("collection runtime threads recipe instructions into the collection prompt"
assert.equal(capturedInput.datasetId, context.datasetId);
assert.equal(capturedInput.datasetName, context.datasetName);
assert.equal(capturedInput.targetRows, 3);
assert.equal(capturedInput.promptId, "latest-ai-blog-posts");
assert.equal(capturedInput.promptQuality, "easy");
assert.equal(capturedInput.persona, "technical operator");
assert.equal(
capturedInput.expectedStress,
"Latest dated source pages; date precision matters."
);
assert.deepEqual(capturedInput.requiredColumns, [
"entity_name",
"latest_post_title",
Expand Down Expand Up @@ -119,6 +132,25 @@ test("collection pipeline input builder trims empty recipe instructions", () =>
assert.doesNotMatch(input.prompt, /Durable recipe instructions/);
});

test("collection pipeline input builder carries benchmark metadata", () => {
const input = collectionPipelineInputFromRecipe({
recipe: collectionRecipe(),
context,
targetRows: 5,
benchmarkMetadata: {
promptId: "saas-pricing-pages",
promptQuality: "medium",
persona: "startup founder",
expectedStress: "Official pricing evidence.",
},
});

assert.equal(input.promptId, "saas-pricing-pages");
assert.equal(input.promptQuality, "medium");
assert.equal(input.persona, "startup founder");
assert.equal(input.expectedStress, "Official pricing evidence.");
});

function collectionRecipe(input: {
runtimeInstructions?: string;
} = {}): PopulateRecipe {
Expand Down
80 changes: 79 additions & 1 deletion backend/test/populate-runtime-selection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import {
selectedPopulateRuntimeName,
} from "../src/pipeline/populate-runtime-selection.js";
import { CollectionPopulateRecipeRuntime } from "../src/pipeline/populate-collection-runtime.js";
import { MastraPopulateRecipeRuntime } from "../src/pipeline/populate-self-healing.js";
import {
createPopulateRecipe,
MastraPopulateRecipeRuntime,
} from "../src/pipeline/populate-self-healing.js";
import type { DatasetContext } from "../src/pipeline/populate.js";

test("populate runtime selection defaults to Mastra", async () => {
assert.equal(selectedPopulateRuntimeName({}), "mastra");
Expand Down Expand Up @@ -48,3 +52,77 @@ test("populate runtime selection rejects collection without a runner", async ()
/requires a collection pipeline runner/
);
});

test("populate runtime selection loads collection runner from env module", async () => {
const runtime = await createPopulateRecipeRuntime({
env: {
POPULATE_AGENT_RUNTIME: "collection",
POPULATE_COLLECTION_RUNNER_MODULE: runnerModuleUrl(),
BIGSET_BENCHMARK_PROMPT_ID: "latest-ai-blog-posts",
BIGSET_BENCHMARK_PROMPT_QUALITY: "easy",
BIGSET_BENCHMARK_PERSONA: "technical operator",
BIGSET_BENCHMARK_EXPECTED_STRESS: "Latest dated source pages.",
},
});
const context: DatasetContext = {
datasetId: "dataset-ai-posts",
datasetName: "AI posts",
description: "Find latest blog posts from OpenAI.",
columns: [
{ name: "entity_name", type: "text" },
{ name: "source_url", type: "url" },
{ name: "evidence_quote", type: "text" },
],
};
const run = await runtime.runRecipe({
context,
recipe: createPopulateRecipe({
recipeId: "collection-v1",
datasetId: context.datasetId,
version: 1,
status: "active",
runtimeInstructions: "Prefer official sources.",
sourceDescription: context.description,
requestedColumns: context.columns.map((column) => column.name),
createdBy: "system",
}),
});

assert.equal(run.runStatus, "succeeded");
assert.equal(run.rows[0]?.cells.entity_name, "latest-ai-blog-posts");
assert.equal(run.rows[0]?.cells.evidence_quote, "technical operator");
});

function runnerModuleUrl(): string {
const source = `
export async function runCollectionPopulatePipeline(input) {
const quote = input.expectedStress || "Loaded runner module.";
return {
rows: [{
cells: {
entity_name: input.promptId,
source_url: "https://example.com/source",
evidence_quote: input.persona,
},
sourceUrls: ["https://example.com/source"],
evidence: [
{ columnName: "entity_name", sourceUrl: "https://example.com/source", quote },
{ columnName: "source_url", sourceUrl: "https://example.com/source", quote },
{ columnName: "evidence_quote", sourceUrl: "https://example.com/source", quote },
],
needsReview: false,
}],
validationIssues: [],
usage: { promptTokens: 1, completionTokens: 1, totalTokens: 2 },
metrics: {
searchCalls: 0,
fetchCalls: 0,
browserCalls: 0,
agentRuns: 1,
agentSteps: 0,
},
};
}
`;
return `data:text/javascript,${encodeURIComponent(source)}`;
}
10 changes: 9 additions & 1 deletion benchmarks/dataset-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ the shell. The runner module must export `runCollectionPopulatePipeline(input)`
or a default runner that accepts `CollectionPopulatePipelineInput` and returns a
`PopulateRuntimeResult`.

App and CLI collection-runtime runs use the same runner shape, but load it from
`POPULATE_COLLECTION_RUNNER_MODULE` when `POPULATE_AGENT_RUNTIME=collection`.

## Verify Self-Healing Stack

Use this before asking someone else to migrate a new collection agent into the
Expand Down Expand Up @@ -73,12 +76,17 @@ For each prompt the runner sets:
- `BIGSET_BENCHMARK_PROMPT`
- `BIGSET_BENCHMARK_PROMPT_ID`
- `BIGSET_BENCHMARK_PROMPT_QUALITY`
- `BIGSET_BENCHMARK_PERSONA`
- `BIGSET_BENCHMARK_EXPECTED_STRESS`
- `BIGSET_BENCHMARK_REQUIRED_COLUMNS`
- `BIGSET_BENCHMARK_MINIMUM_REQUIRED_COLUMNS`

`BIGSET_BENCHMARK_REQUIRED_COLUMNS` is the requested table shape.
`BIGSET_BENCHMARK_MINIMUM_REQUIRED_COLUMNS` is the hard row identity minimum.
Rows still need at least one source URL and evidence quote.
Rows still need at least one source URL and evidence quote. Collection benchmark
runners receive prompt id, quality, persona, expected stress, and required
columns through `CollectionPopulatePipelineInput` so they can build the same
benchmark/spec context that the direct collection lane expects.

## Agent Output Contract

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { resolve } from "node:path";
const prompt = requiredEnv("BIGSET_BENCHMARK_PROMPT");
const promptId = process.env.BIGSET_BENCHMARK_PROMPT_ID ?? "benchmark-prompt";
const promptQuality = process.env.BIGSET_BENCHMARK_PROMPT_QUALITY ?? "unknown";
const persona = process.env.BIGSET_BENCHMARK_PERSONA;
const expectedStress = process.env.BIGSET_BENCHMARK_EXPECTED_STRESS;
const requiredColumns = columnList(
requiredEnv("BIGSET_BENCHMARK_REQUIRED_COLUMNS")
);
Expand Down Expand Up @@ -74,6 +76,12 @@ const service = new SelfHealingPopulateRecipeService({
runtime: new CollectionPopulateRecipeRuntime({
runPipeline: collectionRunner,
targetRows: Number(process.env.BIGSET_COLLECTION_BENCHMARK_MAX_ROWS ?? "10"),
benchmarkMetadata: {
promptId,
promptQuality,
persona,
expectedStress,
},
}),
author: new DefaultPopulateRecipeAuthor(),
});
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/dataset-agent/run-benchmark.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,8 @@ async function runSystemPrompt(input) {
BIGSET_BENCHMARK_PROMPT: input.promptDefinition.prompt,
BIGSET_BENCHMARK_PROMPT_ID: input.promptDefinition.id,
BIGSET_BENCHMARK_PROMPT_QUALITY: input.promptDefinition.quality,
BIGSET_BENCHMARK_PERSONA: input.promptDefinition.persona,
BIGSET_BENCHMARK_EXPECTED_STRESS: input.promptDefinition.expectedStress,
BIGSET_BENCHMARK_REQUIRED_COLUMNS: input.promptDefinition.requiredColumns.join(","),
BIGSET_BENCHMARK_MINIMUM_REQUIRED_COLUMNS: minimumRequiredColumns.join(","),
},
Expand Down
Loading