diff --git a/backend/src/pipeline/populate-collection-runtime.ts b/backend/src/pipeline/populate-collection-runtime.ts new file mode 100644 index 0000000..b0b695a --- /dev/null +++ b/backend/src/pipeline/populate-collection-runtime.ts @@ -0,0 +1,123 @@ +import type { DatasetContext, PopulateColumn } from "./populate.js"; +import type { PopulateRuntimeResult } from "./populate-runtime.js"; +import { + emptyPopulateRuntimeResult, + populateRecipeRunResultFromRuntimeResult, + type PopulateRecipe, + type PopulateRecipeRunResult, + type PopulateRecipeRuntime, +} from "./populate-self-healing.js"; + +export interface CollectionPopulatePipelineColumn { + name: string; + type: PopulateColumn["type"]; + description?: string; +} + +export interface CollectionPopulatePipelineInput { + datasetId: string; + datasetName: string; + description: string; + columns: CollectionPopulatePipelineColumn[]; + requiredColumns: string[]; + prompt: string; + recipeInstructions: string; + targetRows: number; +} + +export type CollectionPopulatePipelineRunner = ( + input: CollectionPopulatePipelineInput +) => Promise; + +export interface CollectionPopulateRecipeRuntimeOptions { + runPipeline: CollectionPopulatePipelineRunner; + targetRows?: number; +} + +export class CollectionPopulateRecipeRuntime implements PopulateRecipeRuntime { + constructor(private readonly input: CollectionPopulateRecipeRuntimeOptions) {} + + async runRecipe(input: { + recipe: PopulateRecipe; + context: DatasetContext; + }): Promise { + const startedAtMs = Date.now(); + const startedAt = new Date(startedAtMs).toISOString(); + let result: PopulateRuntimeResult; + let failureMessage: string | undefined; + + try { + result = await this.input.runPipeline( + collectionPipelineInputFromRecipe({ + recipe: input.recipe, + context: input.context, + targetRows: this.input.targetRows ?? 10, + }) + ); + } catch (error) { + failureMessage = error instanceof Error ? error.message : String(error); + result = emptyPopulateRuntimeResult([failureMessage]); + } + + return populateRecipeRunResultFromRuntimeResult({ + recipe: input.recipe, + context: input.context, + result, + failureMessage, + startedAt, + startedAtMs, + }); + } +} + +export function collectionPipelineInputFromRecipe(input: { + recipe: PopulateRecipe; + context: DatasetContext; + targetRows: number; +}): CollectionPopulatePipelineInput { + const recipeInstructions = input.recipe.runtimeInstructions.trim(); + return { + datasetId: input.context.datasetId, + datasetName: input.context.datasetName, + description: input.context.description, + columns: input.context.columns.map((column) => ({ + name: column.name, + type: column.type, + description: column.description, + })), + requiredColumns: input.context.columns.map((column) => column.name), + prompt: buildCollectionPopulatePrompt({ + context: input.context, + recipeInstructions, + }), + recipeInstructions, + targetRows: input.targetRows, + }; +} + +function buildCollectionPopulatePrompt(input: { + context: DatasetContext; + recipeInstructions: string; +}): string { + const columnLines = input.context.columns.map((column) => { + const description = column.description ? ` - ${column.description}` : ""; + return `- ${column.name} (${column.type})${description}`; + }); + const parts = [ + `Dataset: ${input.context.datasetName}`, + `Task: ${input.context.description}`, + "", + "Requested columns:", + ...columnLines, + ]; + + if (input.recipeInstructions) { + parts.push( + "", + "Durable recipe instructions:", + input.recipeInstructions + ); + } + + return parts.join("\n"); +} diff --git a/backend/src/pipeline/populate-self-healing.ts b/backend/src/pipeline/populate-self-healing.ts index 0a51728..b5f89e2 100644 --- a/backend/src/pipeline/populate-self-healing.ts +++ b/backend/src/pipeline/populate-self-healing.ts @@ -167,32 +167,50 @@ export class MastraPopulateRecipeRuntime implements PopulateRecipeRuntime { result = emptyPopulateRuntimeResult([failureMessage]); } - const productionValidation = validatePopulateRuntimeResult({ - result, + return populateRecipeRunResultFromRuntimeResult({ + recipe: input.recipe, context: input.context, - }); - const artifacts = artifactsForRun({ result, failureMessage, - validationIssues: result.validationIssues, - productionValidation, - }); - const completedAt = new Date().toISOString(); - - return { - ...result, - recipeId: input.recipe.recipeId, - recipeVersion: input.recipe.version, - runStatus: productionValidation.isValid ? "succeeded" : "failed", startedAt, - completedAt, - runtimeMs: Date.now() - startedAtMs, - productionValidation, - artifacts, - }; + startedAtMs, + }); } } +export function populateRecipeRunResultFromRuntimeResult(input: { + recipe: PopulateRecipe; + context: DatasetContext; + result: PopulateRuntimeResult; + failureMessage?: string; + startedAt: string; + startedAtMs: number; +}): PopulateRecipeRunResult { + const productionValidation = validatePopulateRuntimeResult({ + result: input.result, + context: input.context, + }); + const artifacts = artifactsForRun({ + result: input.result, + failureMessage: input.failureMessage, + validationIssues: input.result.validationIssues, + productionValidation, + }); + const completedAt = new Date().toISOString(); + + return { + ...input.result, + recipeId: input.recipe.recipeId, + recipeVersion: input.recipe.version, + runStatus: productionValidation.isValid ? "succeeded" : "failed", + startedAt: input.startedAt, + completedAt, + runtimeMs: Date.now() - input.startedAtMs, + productionValidation, + artifacts, + }; +} + export class DefaultPopulateRecipeAuthor implements PopulateRecipeAuthor { async generateRecipe( input: PopulateRecipeAuthorGenerateInput @@ -836,7 +854,7 @@ function artifactsForRun(input: { return artifacts; } -function emptyPopulateRuntimeResult(validationIssues: string[]): PopulateRuntimeResult { +export function emptyPopulateRuntimeResult(validationIssues: string[]): PopulateRuntimeResult { return { rows: [], validationIssues, diff --git a/backend/test/populate-collection-runtime.test.ts b/backend/test/populate-collection-runtime.test.ts new file mode 100644 index 0000000..162a74f --- /dev/null +++ b/backend/test/populate-collection-runtime.test.ts @@ -0,0 +1,135 @@ +import assert from "node:assert/strict"; +import { test } from "node:test"; + +import { + CollectionPopulateRecipeRuntime, + collectionPipelineInputFromRecipe, + type CollectionPopulatePipelineInput, +} from "../src/pipeline/populate-collection-runtime.js"; +import { + createPopulateRecipe, + type PopulateRecipe, +} from "../src/pipeline/populate-self-healing.js"; +import type { DatasetContext } from "../src/pipeline/populate.js"; + +const context: DatasetContext = { + datasetId: "dataset-ai-posts", + datasetName: "AI posts", + description: "Find latest blog posts from OpenAI.", + columns: [ + { + name: "entity_name", + type: "text", + description: "Company name.", + }, + { + name: "latest_post_title", + type: "text", + description: "Post title.", + }, + { + name: "source_url", + type: "url", + description: "Source URL.", + }, + { + name: "evidence_quote", + type: "text", + description: "Evidence quote.", + }, + ], +}; + +test("collection runtime threads recipe instructions into the collection prompt", async () => { + let capturedInput: CollectionPopulatePipelineInput | undefined; + const runtime = new CollectionPopulateRecipeRuntime({ + targetRows: 3, + runPipeline: async (input) => { + capturedInput = input; + return { + rows: [{ + cells: { + entity_name: "OpenAI", + latest_post_title: "Release notes from OpenAI", + source_url: "https://openai.com/news", + evidence_quote: "Release notes from OpenAI", + }, + sourceUrls: ["https://openai.com/news"], + evidence: [{ + columnName: "latest_post_title", + sourceUrl: "https://openai.com/news", + quote: "Release notes from OpenAI", + }], + needsReview: false, + }], + validationIssues: [], + usage: { + promptTokens: 11, + completionTokens: 7, + totalTokens: 18, + }, + metrics: { + searchCalls: 1, + fetchCalls: 1, + browserCalls: 0, + agentRuns: 1, + agentSteps: 0, + }, + }; + }, + }); + const recipe = collectionRecipe({ + runtimeInstructions: + "Prefer official news pages already known to work. Do not use aggregator pages.", + }); + + const run = await runtime.runRecipe({ recipe, context }); + + assert.ok(capturedInput); + assert.equal(capturedInput.datasetId, context.datasetId); + assert.equal(capturedInput.datasetName, context.datasetName); + assert.equal(capturedInput.targetRows, 3); + assert.deepEqual(capturedInput.requiredColumns, [ + "entity_name", + "latest_post_title", + "source_url", + "evidence_quote", + ]); + assert.match(capturedInput.prompt, /Find latest blog posts from OpenAI/); + assert.match(capturedInput.prompt, /Durable recipe instructions/); + assert.match(capturedInput.prompt, /Do not use aggregator pages/); + assert.equal( + capturedInput.recipeInstructions, + "Prefer official news pages already known to work. Do not use aggregator pages." + ); + assert.equal(run.runStatus, "succeeded"); + assert.equal(run.productionValidation.isValid, true); + assert.equal(run.productionValidation.score, 1); + assert.equal(run.rows[0]?.cells.entity_name, "OpenAI"); +}); + +test("collection pipeline input builder trims empty recipe instructions", () => { + const input = collectionPipelineInputFromRecipe({ + recipe: collectionRecipe({ runtimeInstructions: " " }), + context, + targetRows: 5, + }); + + assert.equal(input.recipeInstructions, ""); + assert.doesNotMatch(input.prompt, /Durable recipe instructions/); +}); + +function collectionRecipe(input: { + runtimeInstructions?: string; +} = {}): PopulateRecipe { + return createPopulateRecipe({ + recipeId: "collection-v1", + datasetId: context.datasetId, + version: 1, + status: "active", + runtimeInstructions: input.runtimeInstructions ?? "", + sourceDescription: context.description, + requestedColumns: context.columns.map((column) => column.name), + createdBy: "system", + }); +}