Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions backend/src/pipeline/populate-collection-runtime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import type { DatasetContext, PopulateColumn } from "./populate.js";
import type { PopulateRuntimeResult } from "./populate-runtime.js";
import {
emptyPopulateRuntimeResult,
populateRecipeRunResultFromRuntimeResult,
type PopulateRecipe,
type PopulateRecipeRunResult,
type PopulateRecipeRuntime,
} from "./populate-self-healing.js";

export interface CollectionPopulatePipelineColumn {
name: string;
type: PopulateColumn["type"];
description?: string;
}

export interface CollectionPopulatePipelineInput {
datasetId: string;
datasetName: string;
description: string;
columns: CollectionPopulatePipelineColumn[];
requiredColumns: string[];
prompt: string;
recipeInstructions: string;
targetRows: number;
}

export type CollectionPopulatePipelineRunner = (
input: CollectionPopulatePipelineInput
) => Promise<PopulateRuntimeResult>;

export interface CollectionPopulateRecipeRuntimeOptions {
runPipeline: CollectionPopulatePipelineRunner;
targetRows?: number;
}

export class CollectionPopulateRecipeRuntime implements PopulateRecipeRuntime {
constructor(private readonly input: CollectionPopulateRecipeRuntimeOptions) {}

async runRecipe(input: {
recipe: PopulateRecipe;
context: DatasetContext;
}): Promise<PopulateRecipeRunResult> {
const startedAtMs = Date.now();
const startedAt = new Date(startedAtMs).toISOString();
let result: PopulateRuntimeResult;
let failureMessage: string | undefined;

try {
result = await this.input.runPipeline(
collectionPipelineInputFromRecipe({
recipe: input.recipe,
context: input.context,
targetRows: this.input.targetRows ?? 10,
})
);
} catch (error) {
failureMessage = error instanceof Error ? error.message : String(error);
result = emptyPopulateRuntimeResult([failureMessage]);
}

return populateRecipeRunResultFromRuntimeResult({
recipe: input.recipe,
context: input.context,
result,
failureMessage,
startedAt,
startedAtMs,
});
}
}

export function collectionPipelineInputFromRecipe(input: {
recipe: PopulateRecipe;
context: DatasetContext;
targetRows: number;
}): CollectionPopulatePipelineInput {
const recipeInstructions = input.recipe.runtimeInstructions.trim();
return {
datasetId: input.context.datasetId,
datasetName: input.context.datasetName,
description: input.context.description,
columns: input.context.columns.map((column) => ({
name: column.name,
type: column.type,
description: column.description,
})),
requiredColumns: input.context.columns.map((column) => column.name),
prompt: buildCollectionPopulatePrompt({
context: input.context,
recipeInstructions,
}),
recipeInstructions,
targetRows: input.targetRows,
};
}

function buildCollectionPopulatePrompt(input: {
context: DatasetContext;
recipeInstructions: string;
}): string {
const columnLines = input.context.columns.map((column) => {
const description = column.description ? ` - ${column.description}` : "";
return `- ${column.name} (${column.type})${description}`;
});
const parts = [
`Dataset: ${input.context.datasetName}`,
`Task: ${input.context.description}`,
"",
"Requested columns:",
...columnLines,
];

if (input.recipeInstructions) {
parts.push(
"",
"Durable recipe instructions:",
input.recipeInstructions
);
}

return parts.join("\n");
}
58 changes: 38 additions & 20 deletions backend/src/pipeline/populate-self-healing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,32 +167,50 @@ export class MastraPopulateRecipeRuntime implements PopulateRecipeRuntime {
result = emptyPopulateRuntimeResult([failureMessage]);
}

const productionValidation = validatePopulateRuntimeResult({
result,
return populateRecipeRunResultFromRuntimeResult({
recipe: input.recipe,
context: input.context,
});
const artifacts = artifactsForRun({
result,
failureMessage,
validationIssues: result.validationIssues,
productionValidation,
});
const completedAt = new Date().toISOString();

return {
...result,
recipeId: input.recipe.recipeId,
recipeVersion: input.recipe.version,
runStatus: productionValidation.isValid ? "succeeded" : "failed",
startedAt,
completedAt,
runtimeMs: Date.now() - startedAtMs,
productionValidation,
artifacts,
};
startedAtMs,
});
}
}

export function populateRecipeRunResultFromRuntimeResult(input: {
recipe: PopulateRecipe;
context: DatasetContext;
result: PopulateRuntimeResult;
failureMessage?: string;
startedAt: string;
startedAtMs: number;
}): PopulateRecipeRunResult {
const productionValidation = validatePopulateRuntimeResult({
result: input.result,
context: input.context,
});
const artifacts = artifactsForRun({
result: input.result,
failureMessage: input.failureMessage,
validationIssues: input.result.validationIssues,
productionValidation,
});
const completedAt = new Date().toISOString();

return {
...input.result,
recipeId: input.recipe.recipeId,
recipeVersion: input.recipe.version,
runStatus: productionValidation.isValid ? "succeeded" : "failed",
startedAt: input.startedAt,
completedAt,
runtimeMs: Date.now() - input.startedAtMs,
productionValidation,
artifacts,
};
}

export class DefaultPopulateRecipeAuthor implements PopulateRecipeAuthor {
async generateRecipe(
input: PopulateRecipeAuthorGenerateInput
Expand Down Expand Up @@ -836,7 +854,7 @@ function artifactsForRun(input: {
return artifacts;
}

function emptyPopulateRuntimeResult(validationIssues: string[]): PopulateRuntimeResult {
export function emptyPopulateRuntimeResult(validationIssues: string[]): PopulateRuntimeResult {
return {
rows: [],
validationIssues,
Expand Down
135 changes: 135 additions & 0 deletions backend/test/populate-collection-runtime.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import assert from "node:assert/strict";
import { test } from "node:test";

import {
CollectionPopulateRecipeRuntime,
collectionPipelineInputFromRecipe,
type CollectionPopulatePipelineInput,
} from "../src/pipeline/populate-collection-runtime.js";
import {
createPopulateRecipe,
type PopulateRecipe,
} from "../src/pipeline/populate-self-healing.js";
import type { DatasetContext } from "../src/pipeline/populate.js";

const context: DatasetContext = {
datasetId: "dataset-ai-posts",
datasetName: "AI posts",
description: "Find latest blog posts from OpenAI.",
columns: [
{
name: "entity_name",
type: "text",
description: "Company name.",
},
{
name: "latest_post_title",
type: "text",
description: "Post title.",
},
{
name: "source_url",
type: "url",
description: "Source URL.",
},
{
name: "evidence_quote",
type: "text",
description: "Evidence quote.",
},
],
};

test("collection runtime threads recipe instructions into the collection prompt", async () => {
let capturedInput: CollectionPopulatePipelineInput | undefined;
const runtime = new CollectionPopulateRecipeRuntime({
targetRows: 3,
runPipeline: async (input) => {
capturedInput = input;
return {
rows: [{
cells: {
entity_name: "OpenAI",
latest_post_title: "Release notes from OpenAI",
source_url: "https://openai.com/news",
evidence_quote: "Release notes from OpenAI",
},
sourceUrls: ["https://openai.com/news"],
evidence: [{
columnName: "latest_post_title",
sourceUrl: "https://openai.com/news",
quote: "Release notes from OpenAI",
}],
needsReview: false,
}],
validationIssues: [],
usage: {
promptTokens: 11,
completionTokens: 7,
totalTokens: 18,
},
metrics: {
searchCalls: 1,
fetchCalls: 1,
browserCalls: 0,
agentRuns: 1,
agentSteps: 0,
},
};
},
});
const recipe = collectionRecipe({
runtimeInstructions:
"Prefer official news pages already known to work. Do not use aggregator pages.",
});

const run = await runtime.runRecipe({ recipe, context });

assert.ok(capturedInput);
assert.equal(capturedInput.datasetId, context.datasetId);
assert.equal(capturedInput.datasetName, context.datasetName);
assert.equal(capturedInput.targetRows, 3);
assert.deepEqual(capturedInput.requiredColumns, [
"entity_name",
"latest_post_title",
"source_url",
"evidence_quote",
]);
assert.match(capturedInput.prompt, /Find latest blog posts from OpenAI/);
assert.match(capturedInput.prompt, /Durable recipe instructions/);
assert.match(capturedInput.prompt, /Do not use aggregator pages/);
assert.equal(
capturedInput.recipeInstructions,
"Prefer official news pages already known to work. Do not use aggregator pages."
);
assert.equal(run.runStatus, "succeeded");
assert.equal(run.productionValidation.isValid, true);
assert.equal(run.productionValidation.score, 1);
assert.equal(run.rows[0]?.cells.entity_name, "OpenAI");
});

test("collection pipeline input builder trims empty recipe instructions", () => {
const input = collectionPipelineInputFromRecipe({
recipe: collectionRecipe({ runtimeInstructions: " " }),
context,
targetRows: 5,
});

assert.equal(input.recipeInstructions, "");
assert.doesNotMatch(input.prompt, /Durable recipe instructions/);
});

function collectionRecipe(input: {
runtimeInstructions?: string;
} = {}): PopulateRecipe {
return createPopulateRecipe({
recipeId: "collection-v1",
datasetId: context.datasetId,
version: 1,
status: "active",
runtimeInstructions: input.runtimeInstructions ?? "",
sourceDescription: context.description,
requestedColumns: context.columns.map((column) => column.name),
createdBy: "system",
});
}