diff --git a/backend/src/pipeline/collection-agent-runner.ts b/backend/src/pipeline/collection-agent-runner.ts index 9321a06..5c85a4f 100644 --- a/backend/src/pipeline/collection-agent-runner.ts +++ b/backend/src/pipeline/collection-agent-runner.ts @@ -7,9 +7,11 @@ import type { CollectionPopulatePipelineInput, CollectionPopulatePipelineRunner, } from "./populate-collection-runtime.js"; -import type { - PopulateCellValue, - PopulateRuntimeResult, +import { + populateProcessTraceFromSteps, + type PopulateCellValue, + type PopulateRuntimeResult, + type PopulateRuntimeTraceStep, } from "./populate-runtime.js"; type CollectionPipelineModule = { @@ -36,14 +38,27 @@ interface CollectionPipelineOptions { } interface CollectionPipelineResult { + runId?: string; + paths?: { + root?: string; + reportPath?: string; + }; report: { errors?: string[]; dataset_spec?: CollectionDatasetSpec; stats?: CollectionPhaseStats; - initial?: CollectionPhaseStats; + initial?: CollectionPhaseStats & { + search_queries?: string[]; + fetched_urls?: string[]; + failed_urls?: string[]; + }; repair?: { stats?: CollectionPhaseStats; + loops?: CollectionRepairLoopReport[]; }; + search_queries?: string[]; + fetched_urls?: string[]; + failed_urls?: string[]; quality?: { records?: CollectionRecordQuality[]; }; @@ -98,8 +113,18 @@ interface CollectionSourcesReport { } interface CollectionSourceOutcome { + url?: string; + phase?: string; outcome?: string; triage_status?: string; + error?: string; + records_extracted?: number; +} + +interface CollectionRepairLoopReport { + loop_index?: number; + repair_queries?: string[]; + stats?: CollectionPhaseStats; } const AGENT_REQUIRED_TRIAGE_STATUSES = new Set([ @@ -200,6 +225,16 @@ function collectionPipelineResultToPopulateRuntimeResult(input: { ], usage: usageFromPipeline(input.pipeline), metrics: metricsFromReport(input.pipeline.report), + debug: { + capturedRows: [], + capturedSources: [], + selectedRowSource: rows.length > 0 ? 
"collection_pipeline" : "none", + notes: collectionDebugNotes(input.pipeline.report), + processTrace: collectionProcessTrace({ + pipeline: input.pipeline, + rows, + }), + }, }; } @@ -231,6 +266,122 @@ function capabilityDiagnosticsFromReport(input: { ]; } +function collectionProcessTrace(input: { + pipeline: CollectionPipelineResult; + rows: Array>; +}) { + const report = input.pipeline.report; + const steps: PopulateRuntimeTraceStep[] = []; + + for (const query of report.search_queries ?? report.initial?.search_queries ?? []) { + steps.push({ + kind: "search", + label: "collection-search-query", + status: "succeeded", + input: { query }, + }); + } + + for (const url of report.fetched_urls ?? report.initial?.fetched_urls ?? []) { + steps.push({ + kind: "fetch", + label: "collection-fetched-url", + status: "succeeded", + input: { url }, + }); + } + + for (const url of report.failed_urls ?? report.initial?.failed_urls ?? []) { + steps.push({ + kind: "fetch", + label: "collection-failed-url", + status: "failed", + input: { url }, + }); + } + + for (const loop of report.repair?.loops ?? []) { + for (const query of loop.repair_queries ?? []) { + steps.push({ + kind: "repair", + label: "collection-repair-query", + status: "succeeded", + input: { + loopIndex: loop.loop_index, + query, + }, + }); + } + } + + for (const outcome of report.sources?.outcomes ?? []) { + if (!outcome.url) { + continue; + } + steps.push({ + kind: sourceOutcomeTraceKind(outcome), + label: `collection-source-${outcome.outcome ?? "unknown"}`, + status: sourceOutcomeTraceStatus(outcome), + input: { + url: outcome.url, + phase: outcome.phase, + triageStatus: outcome.triage_status, + }, + output: { + recordsExtracted: outcome.records_extracted, + }, + error: outcome.error, + }); + } + + return populateProcessTraceFromSteps({ + runtime: "collection", + steps, + selectedRowSource: input.rows.length > 0 ? 
"collection_pipeline" : "none", + notes: collectionDebugNotes(report), + artifactRoot: input.pipeline.paths?.root, + runReportPath: input.pipeline.paths?.reportPath, + }); +} + +function collectionDebugNotes(report: CollectionPipelineResult["report"]): string[] { + const notes = []; + if (report.stats) { + notes.push( + `collection stats: searches=${numberValue(report.stats.search_queries_executed)}, ` + + `fetches=${numberValue(report.stats.pages_fetched)}` + ); + } + if (report.repair?.loops && report.repair.loops.length > 0) { + notes.push(`collection repair loops=${report.repair.loops.length}`); + } + return notes; +} + +function sourceOutcomeTraceKind(outcome: CollectionSourceOutcome): PopulateRuntimeTraceStep["kind"] { + if (outcome.outcome?.startsWith("agent_")) { + return "agent"; + } + if (outcome.outcome === "fetch_failed") { + return "fetch"; + } + return "validation"; +} + +function sourceOutcomeTraceStatus( + outcome: CollectionSourceOutcome +): PopulateRuntimeTraceStep["status"] { + if ( + outcome.outcome && + ["fetch_failed", "skipped", "agent_failed", "agent_deferred", "no_records"].includes( + outcome.outcome + ) + ) { + return "failed"; + } + return "succeeded"; +} + function isAgentRequiredSourceOutcome(outcome: CollectionSourceOutcome): boolean { return ( typeof outcome.triage_status === "string" && diff --git a/backend/src/pipeline/populate-runtime.ts b/backend/src/pipeline/populate-runtime.ts index a91dbe3..0a3cff0 100644 --- a/backend/src/pipeline/populate-runtime.ts +++ b/backend/src/pipeline/populate-runtime.ts @@ -39,13 +39,61 @@ export interface PopulateRuntimeCapturedInsertedRow { export interface PopulateRuntimeCapturedSource { url: string; text: string; + source: "search" | "fetch" | "synthetic"; +} + +export type PopulateRuntimeTraceStepKind = + | "search" + | "fetch" + | "insert_row" + | "agent" + | "extract" + | "repair" + | "validation"; + +export interface PopulateRuntimeTraceStep { + kind: PopulateRuntimeTraceStepKind; + label: 
string; + status: "succeeded" | "failed" | "skipped"; + input?: Record; + output?: Record; + error?: string; +} + +export interface PopulateProcessTraceSourceArtifact { + url: string; + status: "succeeded" | "failed" | "skipped"; + source: "search" | "fetch" | "agent" | "collection" | "unknown"; + label?: string; + error?: string; +} + +export interface PopulateProcessTrace { + runtime: "mastra" | "mastra-injected" | "collection" | "unknown"; + searchQueries: string[]; + fetchedUrls: string[]; + sourceArtifacts: PopulateProcessTraceSourceArtifact[]; + selectedRowSource: + | "insert_row" + | "structured_recovery" + | "collection_pipeline" + | "none"; + notes: string[]; + steps: PopulateRuntimeTraceStep[]; + artifactRoot?: string; + runReportPath?: string; } export interface PopulateRuntimeDebug { capturedRows: PopulateRuntimeCapturedInsertedRow[]; capturedSources: PopulateRuntimeCapturedSource[]; - selectedRowSource: "insert_row" | "structured_recovery" | "none"; + selectedRowSource: + | "insert_row" + | "structured_recovery" + | "collection_pipeline" + | "none"; notes: string[]; + processTrace: PopulateProcessTrace; } export interface PopulateRuntimeResult { @@ -119,6 +167,7 @@ export async function runPopulateRuntime(input: { const capturedRows: PopulateRuntimeCapturedInsertedRow[] = []; const capturedSources: PopulateRuntimeCapturedSource[] = []; + const processTraceSteps: PopulateRuntimeTraceStep[] = []; const validationIssues: string[] = []; const debugNotes: string[] = []; const metrics = emptyMetrics(); @@ -131,6 +180,7 @@ export async function runPopulateRuntime(input: { metrics, webTools, maxRows: input.maxRows ?? 
10, + processTraceSteps, }); const prompt = buildPopulatePrompt(parsedContext); let agentOutput: unknown; @@ -139,16 +189,64 @@ export async function runPopulateRuntime(input: { try { agentOutput = await input.agentRunner({ prompt, tools }); metrics.agentRuns += 1; + processTraceSteps.push({ + kind: "agent", + label: "populate-agent-injected", + status: "succeeded", + input: { + promptCharacters: prompt.length, + toolNames: Object.keys(tools), + }, + output: { + capturedRowCount: capturedRows.length, + capturedSourceCount: capturedSources.length, + }, + }); } catch (error) { - validationIssues.push(populateAgentFailureMessage(error)); + const message = populateAgentFailureMessage(error); + validationIssues.push(message); + processTraceSteps.push({ + kind: "agent", + label: "populate-agent-injected", + status: "failed", + input: { + promptCharacters: prompt.length, + toolNames: Object.keys(tools), + }, + error: message, + }); } } else { try { const agent = createRuntimePopulateAgent({ tools }); agentOutput = await agent.generate(prompt); metrics.agentRuns += 1; + processTraceSteps.push({ + kind: "agent", + label: "populate-agent-mastra", + status: "succeeded", + input: { + promptCharacters: prompt.length, + toolNames: Object.keys(tools), + }, + output: { + capturedRowCount: capturedRows.length, + capturedSourceCount: capturedSources.length, + }, + }); } catch (error) { - validationIssues.push(populateAgentFailureMessage(error)); + const message = populateAgentFailureMessage(error); + validationIssues.push(message); + processTraceSteps.push({ + kind: "agent", + label: "populate-agent-mastra", + status: "failed", + input: { + promptCharacters: prompt.length, + toolNames: Object.keys(tools), + }, + error: message, + }); } } @@ -173,12 +271,28 @@ export async function runPopulateRuntime(input: { capturedSources, }); metrics.agentRuns += 1; + processTraceSteps.push({ + kind: "extract", + label: "structured-row-recovery", + status: "succeeded", + input: { + 
capturedSourceCount: capturedSources.length, + }, + }); } catch (error) { - validationIssues.push( - `Structured row generation failed: ${ - error instanceof Error ? error.message : String(error) - }` - ); + const message = `Structured row generation failed: ${ + error instanceof Error ? error.message : String(error) + }`; + validationIssues.push(message); + processTraceSteps.push({ + kind: "extract", + label: "structured-row-recovery", + status: "failed", + input: { + capturedSourceCount: capturedSources.length, + }, + error: message, + }); } } @@ -214,6 +328,13 @@ export async function runPopulateRuntime(input: { insertedRows, structuredRows, }); + const processTrace = populateProcessTraceFromSteps({ + runtime: input.agentRunner ? "mastra-injected" : "mastra", + steps: processTraceSteps, + capturedSources, + selectedRowSource, + notes: debugNotes, + }); validationIssues.push(...validateRuntimeRows(rows)); return { @@ -226,6 +347,7 @@ export async function runPopulateRuntime(input: { capturedSources, selectedRowSource, notes: debugNotes, + processTrace, }, }; } @@ -281,6 +403,15 @@ function emptyClarificationResult(validationIssues: string[]): PopulateRuntimeRe capturedSources: [], selectedRowSource: "none", notes: [], + processTrace: { + runtime: "unknown", + searchQueries: [], + fetchedUrls: [], + sourceArtifacts: [], + selectedRowSource: "none", + notes: [], + steps: [], + }, }, }; } @@ -327,6 +458,7 @@ async function enrichCapturedSourcesForStructuredFallback(input: { newSources.push({ url: result.url, text: [result.title, result.snippet].filter(Boolean).join("\n"), + source: "search", }); input.metrics.fetchCalls += 1; try { @@ -334,6 +466,7 @@ async function enrichCapturedSourcesForStructuredFallback(input: { newSources.push({ url: result.url, text: [page.title, page.text].filter(Boolean).join("\n"), + source: "fetch", }); } catch (error) { input.validationIssues.push( @@ -360,6 +493,7 @@ async function captureDirectOfficialSource(input: { 
input.newSources.push({ url: input.url, text: `${input.entity} official source\n${input.url}`, + source: "synthetic", }); input.input.metrics.fetchCalls += 1; try { @@ -367,6 +501,7 @@ async function captureDirectOfficialSource(input: { input.newSources.push({ url: input.url, text: [page.title, page.text].filter(Boolean).join("\n"), + source: "fetch", }); } catch (error) { input.input.validationIssues.push( @@ -691,6 +826,107 @@ function selectedRowSourceForRows(input: { return "none"; } +export function populateProcessTraceFromSteps(input: { + runtime: PopulateProcessTrace["runtime"]; + steps: PopulateRuntimeTraceStep[]; + capturedSources?: PopulateRuntimeCapturedSource[]; + selectedRowSource: PopulateProcessTrace["selectedRowSource"]; + notes?: string[]; + artifactRoot?: string; + runReportPath?: string; +}): PopulateProcessTrace { + const searchQueries = input.steps.flatMap((step) => { + const query = step.kind === "search" + ? stringValue(step.input?.query) + : undefined; + return query ? [query] : []; + }); + const fetchedUrls = input.steps.flatMap((step) => { + const url = step.kind === "fetch" + ? stringValue(step.input?.url) + : undefined; + return url ? [url] : []; + }); + const sourceArtifacts: PopulateProcessTraceSourceArtifact[] = [ + ...(input.capturedSources ?? []).map((source) => ({ + url: source.url, + status: "succeeded" as const, + source: capturedSourceArtifactSource(source.source), + label: "captured-source", + })), + ...input.steps + .filter((step) => step.kind === "search" && Array.isArray(step.output?.urls)) + .flatMap((step) => + (step.output?.urls as unknown[]).flatMap((url) => { + const sourceUrl = stringValue(url); + return sourceUrl + ? [{ + url: sourceUrl, + status: step.status, + source: "search" as const, + label: step.label, + error: step.error, + }] + : []; + }) + ), + ...input.steps + .filter((step) => step.kind === "fetch") + .flatMap((step) => { + const sourceUrl = stringValue(step.input?.url); + return sourceUrl + ? 
[{ + url: sourceUrl, + status: step.status, + source: "fetch" as const, + label: step.label, + error: step.error, + }] + : []; + }), + ]; + + return { + runtime: input.runtime, + searchQueries: Array.from(new Set(searchQueries)), + fetchedUrls: uniqueHttpUrls(fetchedUrls), + sourceArtifacts: dedupeProcessTraceSourceArtifacts(sourceArtifacts), + selectedRowSource: input.selectedRowSource, + notes: input.notes ?? [], + steps: input.steps, + artifactRoot: input.artifactRoot, + runReportPath: input.runReportPath, + }; +} + +function capturedSourceArtifactSource( + source: PopulateRuntimeCapturedSource["source"] +): PopulateProcessTraceSourceArtifact["source"] { + if (source === "search" || source === "fetch") { + return source; + } + return "unknown"; +} + +function dedupeProcessTraceSourceArtifacts( + artifacts: PopulateProcessTraceSourceArtifact[] +): PopulateProcessTraceSourceArtifact[] { + const seen = new Set(); + const uniqueArtifacts: PopulateProcessTraceSourceArtifact[] = []; + for (const artifact of artifacts) { + if (!/^https?:\/\//i.test(artifact.url)) { + continue; + } + const key = `${artifact.url}|${artifact.status}|${artifact.source}|${artifact.label ?? 
""}`; + if (seen.has(key)) { + continue; + } + seen.add(key); + uniqueArtifacts.push(artifact); + } + return uniqueArtifacts; +} + function createPopulateRuntimeTools(input: { datasetId: string; capturedRows: PopulateRuntimeCapturedInsertedRow[]; @@ -699,6 +935,7 @@ function createPopulateRuntimeTools(input: { metrics: PopulateRuntimeResult["metrics"]; webTools: PopulateRuntimeWebTools; maxRows: number; + processTraceSteps: PopulateRuntimeTraceStep[]; }) { return { insert_row: createTool({ @@ -714,18 +951,50 @@ function createPopulateRuntimeTools(input: { }), execute: async ({ datasetId, data }) => { if (datasetId !== input.datasetId) { + input.processTraceSteps.push({ + kind: "insert_row", + label: "insert_row", + status: "failed", + input: { + datasetId, + columnNames: Object.keys(data), + }, + error: `datasetId must be ${input.datasetId}.`, + }); return { success: false, error: `datasetId must be ${input.datasetId}.`, }; } if (input.capturedRows.length >= input.maxRows) { + input.processTraceSteps.push({ + kind: "insert_row", + label: "insert_row", + status: "failed", + input: { + datasetId, + columnNames: Object.keys(data), + }, + error: `Row cap reached for this benchmark run (${input.maxRows}).`, + }); return { success: false, error: `Row cap reached for this benchmark run (${input.maxRows}).`, }; } input.capturedRows.push({ datasetId, data }); + input.processTraceSteps.push({ + kind: "insert_row", + label: "insert_row", + status: "succeeded", + input: { + datasetId, + columnNames: Object.keys(data), + }, + output: { + capturedRowCount: input.capturedRows.length, + }, + }); return { success: true }; }, }), @@ -749,12 +1018,30 @@ function createPopulateRuntimeTools(input: { ...results.map((result) => ({ url: result.url, text: [result.title, result.snippet].filter(Boolean).join("\n"), + source: "search" as const, })) ); + input.processTraceSteps.push({ + kind: "search", + label: "search_web", + status: "succeeded", + input: { query }, + output: { + resultCount: 
results.length, + urls: results.map((result) => result.url).slice(0, 10), + }, + }); return { results }; } catch (error) { const message = error instanceof Error ? error.message : String(error); input.validationIssues.push(`search_web failed: ${message}`); + input.processTraceSteps.push({ + kind: "search", + label: "search_web", + status: "failed", + input: { query }, + error: message, + }); return { error: message }; } }, @@ -775,11 +1062,29 @@ function createPopulateRuntimeTools(input: { input.capturedSources.push({ url, text: [page.title, page.text].filter(Boolean).join("\n"), + source: "fetch", + }); + input.processTraceSteps.push({ + kind: "fetch", + label: "fetch_page", + status: "succeeded", + input: { url }, + output: { + title: page.title, + textCharacters: page.text?.length ?? 0, + }, }); return page; } catch (error) { const message = error instanceof Error ? error.message : String(error); input.validationIssues.push(`fetch_page failed: ${message}`); + input.processTraceSteps.push({ + kind: "fetch", + label: "fetch_page", + status: "failed", + input: { url }, + error: message, + }); return { error: message }; } }, diff --git a/backend/src/pipeline/populate-self-healing.ts b/backend/src/pipeline/populate-self-healing.ts index b5f89e2..2ba75ba 100644 --- a/backend/src/pipeline/populate-self-healing.ts +++ b/backend/src/pipeline/populate-self-healing.ts @@ -2,6 +2,7 @@ import { mkdir, readFile, writeFile } from "node:fs/promises"; import { join } from "node:path"; import { + type PopulateProcessTrace, type PopulateRuntimeAgentRunner, type PopulateRuntimeResult, type PopulateRuntimeRow, @@ -25,9 +26,17 @@ export type PopulateRecipeArtifactKind = | "text" | "stderr" | "source-transcript" - | "captured-rows"; + | "captured-rows" + | "process-trace" + | "playwright-candidate-script"; const MAX_ARTIFACT_TEXT_LENGTH = 20_000; +const PROCESS_TRACE_ARTIFACT_LIMITS = [ + { maxItems: 100, maxNestedItems: 25, maxStringLength: 500 }, + { maxItems: 50, maxNestedItems: 
10, maxStringLength: 240 }, + { maxItems: 25, maxNestedItems: 8, maxStringLength: 120 }, + { maxItems: 10, maxNestedItems: 5, maxStringLength: 80 }, +] as const; export interface PopulateRecipe { recipeId: string; @@ -103,6 +112,7 @@ export interface StoredPopulateRecipeRunRecord { runStatus: PopulateRecipeRunStatus; completedAt: string; productionValidation: PopulateRecipeProductionValidation; + artifacts: PopulateRecipeArtifact[]; } export interface PopulateRecipeStoreSnapshot { @@ -454,7 +464,10 @@ export class FileSystemPopulateRecipeStore implements PopulateRecipeStore { return { datasetId, recipes: parsed.recipes ?? [], - runRecords: parsed.runRecords ?? [], + runRecords: (parsed.runRecords ?? []).map((record) => ({ + ...record, + artifacts: record.artifacts ?? [], + })), }; } catch (error) { if (isNodeError(error) && error.code === "ENOENT") { @@ -828,6 +841,15 @@ function artifactsForRun(input: { } const capturedSources = input.result.debug?.capturedSources ?? []; const capturedRows = input.result.debug?.capturedRows ?? []; + const processTrace = input.result.debug?.processTrace ?? 
{ + runtime: "unknown", + searchQueries: [], + fetchedUrls: [], + sourceArtifacts: [], + selectedRowSource: "none", + notes: [], + steps: [], + }; if (capturedSources.length > 0) { artifacts.push({ kind: "source-transcript", @@ -851,9 +873,131 @@ function artifactsForRun(input: { .slice(0, MAX_ARTIFACT_TEXT_LENGTH), }); } + if ( + processTrace.steps.length > 0 || + processTrace.searchQueries.length > 0 || + processTrace.fetchedUrls.length > 0 || + processTrace.sourceArtifacts.length > 0 + ) { + artifacts.push({ + kind: "process-trace", + label: "populate-process-trace", + content: processTraceArtifactContent(processTrace), + }); + } return artifacts; } +function processTraceArtifactContent(processTrace: PopulateProcessTrace): string { + let content = ""; + for (const limits of PROCESS_TRACE_ARTIFACT_LIMITS) { + content = JSON.stringify(truncatedProcessTrace(processTrace, limits), null, 2); + if (content.length <= MAX_ARTIFACT_TEXT_LENGTH) { + return content; + } + } + return content; +} + +function truncatedProcessTrace( + processTrace: PopulateProcessTrace, + limits: typeof PROCESS_TRACE_ARTIFACT_LIMITS[number] +) { + return { + ...processTrace, + truncated: hasProcessTraceOverflow(processTrace, limits), + searchQueries: processTrace.searchQueries + .slice(0, limits.maxItems) + .map((query) => truncateArtifactString(query, limits)), + fetchedUrls: processTrace.fetchedUrls + .slice(0, limits.maxItems) + .map((url) => truncateArtifactString(url, limits)), + sourceArtifacts: processTrace.sourceArtifacts.slice(0, limits.maxItems).map((artifact) => ({ + ...artifact, + url: truncateArtifactString(artifact.url, limits), + label: artifact.label + ? truncateArtifactString(artifact.label, limits) + : artifact.label, + error: artifact.error + ? 
truncateArtifactString(artifact.error, limits) + : artifact.error, + })), + notes: processTrace.notes + .slice(0, limits.maxItems) + .map((note) => truncateArtifactString(note, limits)), + steps: processTrace.steps.slice(0, limits.maxItems).map((step) => ({ + ...step, + label: truncateArtifactString(step.label, limits), + input: truncateArtifactJson(step.input, limits), + output: truncateArtifactJson(step.output, limits), + error: step.error ? truncateArtifactString(step.error, limits) : step.error, + })), + }; +} + +function hasProcessTraceOverflow( + processTrace: PopulateProcessTrace, + limits: typeof PROCESS_TRACE_ARTIFACT_LIMITS[number] +): boolean { + return ( + processTrace.searchQueries.length > limits.maxItems || + processTrace.fetchedUrls.length > limits.maxItems || + processTrace.sourceArtifacts.length > limits.maxItems || + processTrace.notes.length > limits.maxItems || + processTrace.steps.length > limits.maxItems || + processTrace.searchQueries.some((query) => query.length > limits.maxStringLength) || + processTrace.fetchedUrls.some((url) => url.length > limits.maxStringLength) || + processTrace.notes.some((note) => note.length > limits.maxStringLength) || + processTrace.sourceArtifacts.some((artifact) => + [ + artifact.url, + artifact.label ?? "", + artifact.error ?? "", + ].some((value) => value.length > limits.maxStringLength) + ) || + processTrace.steps.some((step) => + [ + step.label, + step.error ?? 
"", + ].some((value) => value.length > limits.maxStringLength) + ) + ); +} + +function truncateArtifactJson( + value: unknown, + limits: typeof PROCESS_TRACE_ARTIFACT_LIMITS[number] +): unknown { + if (typeof value === "string") { + return truncateArtifactString(value, limits); + } + if (Array.isArray(value)) { + return value + .slice(0, limits.maxNestedItems) + .map((nestedValue) => truncateArtifactJson(nestedValue, limits)); + } + if (value && typeof value === "object") { + return Object.fromEntries( + Object.entries(value as Record) + .slice(0, limits.maxNestedItems) + .map(([key, nestedValue]) => [ + key, + truncateArtifactJson(nestedValue, limits), + ]) + ); + } + return value; +} + +function truncateArtifactString( + value: string, + limits: typeof PROCESS_TRACE_ARTIFACT_LIMITS[number] +): string { + return value.length > limits.maxStringLength + ? `${value.slice(0, limits.maxStringLength)}\n[truncated]` + : value; +} + export function emptyPopulateRuntimeResult(validationIssues: string[]): PopulateRuntimeResult { return { rows: [], @@ -875,6 +1019,15 @@ export function emptyPopulateRuntimeResult(validationIssues: string[]): Populate capturedSources: [], selectedRowSource: "none", notes: [], + processTrace: { + runtime: "unknown", + searchQueries: [], + fetchedUrls: [], + sourceArtifacts: [], + selectedRowSource: "none", + notes: [], + steps: [], + }, }, }; } @@ -936,6 +1089,7 @@ function runRecordFromRunResult( runStatus: runResult.runStatus, completedAt: runResult.completedAt, productionValidation: runResult.productionValidation, + artifacts: runResult.artifacts, }; } diff --git a/backend/test/collection-agent-runner.test.ts b/backend/test/collection-agent-runner.test.ts index 1b88c6e..4907f91 100644 --- a/backend/test/collection-agent-runner.test.ts +++ b/backend/test/collection-agent-runner.test.ts @@ -36,6 +36,23 @@ test("collection agent runner maps vendored pipeline output into populate runtim assert.equal(result.metrics.browserCalls, 3); 
assert.equal(result.metrics.agentRuns, 3); assert.equal(result.metrics.agentSteps, 3); + assert.equal(result.debug?.selectedRowSource, "collection_pipeline"); + assert.equal(result.debug?.processTrace.runtime, "collection"); + assert.deepEqual(result.debug?.processTrace.searchQueries, [ + "OpenAI latest AI blog posts", + "OpenAI release notes", + ]); + assert.deepEqual(result.debug?.processTrace.fetchedUrls, [ + "https://openai.com/news", + "https://openai.com/research", + ]); + assert.equal( + result.debug?.processTrace.sourceArtifacts.some((artifact) => + artifact.url === "https://openai.com/news" && + artifact.status === "succeeded" + ), + true + ); } finally { restoreEnv(previousEnv); } @@ -202,6 +219,11 @@ function fakeCollectionPipelineModuleUrl(input: { throw new Error("required columns missing from benchmark context"); } return { + runId: "fake-run-1", + paths: { + root: "/tmp/fake-run-1", + reportPath: "/tmp/fake-run-1/run_report.json", + }, report: { errors: [], dataset_spec: { @@ -218,6 +240,15 @@ function fakeCollectionPipelineModuleUrl(input: { }, }, initial: { + search_queries: [ + "OpenAI latest AI blog posts", + "OpenAI release notes", + ], + fetched_urls: [ + "https://openai.com/news", + "https://openai.com/research", + ], + failed_urls: [], triage: { agent_dispatched: 1, agent_succeeded: 1, @@ -225,6 +256,10 @@ function fakeCollectionPipelineModuleUrl(input: { }, }, repair: { + loops: [{ + loop_index: 1, + repair_queries: ["OpenAI blog official source_url evidence"], + }], stats: { triage: { agent_dispatched: 2, @@ -236,7 +271,24 @@ function fakeCollectionPipelineModuleUrl(input: { quality: { records: [{ record_id: "pk:openai", needs_review: true }], }, - sources: ${JSON.stringify(input.sources ?? { outcomes: [] })}, + search_queries: [ + "OpenAI latest AI blog posts", + "OpenAI release notes", + ], + fetched_urls: [ + "https://openai.com/news", + "https://openai.com/research", + ], + failed_urls: [], + sources: ${JSON.stringify(input.sources ?? 
{ + outcomes: [{ + url: "https://openai.com/news", + outcome: "success", + phase: "initial", + triage_status: "extract_now", + records_extracted: 1, + }], + })}, llm_usage: { prompt_tokens: 1, completion_tokens: 1, diff --git a/backend/test/populate-self-healing.test.ts b/backend/test/populate-self-healing.test.ts index e1be40d..7544460 100644 --- a/backend/test/populate-self-healing.test.ts +++ b/backend/test/populate-self-healing.test.ts @@ -108,6 +108,15 @@ test("Mastra populate recipe runtime maps populate rows into a healthy recipe ru assert.equal(run.debug?.selectedRowSource, "insert_row"); assert.ok(run.artifacts.some((artifact) => artifact.kind === "source-transcript")); assert.ok(run.artifacts.some((artifact) => artifact.kind === "captured-rows")); + const traceArtifact = run.artifacts.find((artifact) => + artifact.kind === "process-trace" + ); + assert.ok(traceArtifact); + const trace = JSON.parse(traceArtifact.content); + assert.equal(trace.runtime, "mastra-injected"); + assert.deepEqual(trace.searchQueries, ["OpenAI latest blog"]); + assert.deepEqual(trace.fetchedUrls, ["https://openai.com/news"]); + assert.equal(trace.selectedRowSource, "insert_row"); }); test("Mastra populate recipe runtime keeps supplemental fetch misses non-blocking", async () => { @@ -133,6 +142,55 @@ test("Mastra populate recipe runtime keeps supplemental fetch misses non-blockin assert.match(run.productionValidation.warnings.join("\n"), /timeout/); }); +test("process trace artifacts stay parseable when trace content is large", async () => { + const runtime = new MastraPopulateRecipeRuntime({ + runPopulate: async () => ({ + rows: validRows(), + validationIssues: [], + usage: emptyUsage(), + metrics: emptyMetrics(), + debug: { + capturedRows: [], + capturedSources: [], + selectedRowSource: "collection_pipeline", + notes: [], + processTrace: { + runtime: "collection", + searchQueries: Array.from({ length: 125 }, (_, index) => + `query-${index}-${"x".repeat(1_000)}` + ), + 
fetchedUrls: [], + sourceArtifacts: [], + selectedRowSource: "collection_pipeline", + notes: ["n".repeat(1_000)], + steps: Array.from({ length: 125 }, (_, index) => ({ + kind: "search" as const, + label: `collection-search-query-${index}`, + status: "succeeded" as const, + input: { query: "x".repeat(1_000) }, + })), + }, + }, + }), + }); + + const run = await runtime.runRecipe({ + recipe: recipe({ recipeId: "recipe-v1" }), + context, + }); + const traceArtifact = run.artifacts.find((artifact) => + artifact.kind === "process-trace" + ); + + assert.ok(traceArtifact); + assert.ok(traceArtifact.content.length <= 20_000); + const parsedTrace = JSON.parse(traceArtifact.content); + assert.equal(parsedTrace.truncated, true); + assert.ok(parsedTrace.steps.length > 0); + assert.ok(parsedTrace.steps.length <= 100); + assert.match(parsedTrace.searchQueries[0], /\[truncated\]/); +}); + test("Mastra populate recipe runtime blocks missing expected entities", async () => { const runtime = new MastraPopulateRecipeRuntime({ runPopulate: async () => ({ @@ -370,7 +428,7 @@ test("file store reloads populate recipes and run records", async () => { const service = new SelfHealingPopulateRecipeService({ store, runtime: new FakePopulateRecipeRuntime({ - "persisted-v1": validRun(generatedRecipe), + "persisted-v1": validRun(generatedRecipe, 1, [processTraceArtifact()]), }), author: new FakeRecipeAuthor({ generatedRecipe }), }); @@ -384,6 +442,11 @@ test("file store reloads populate recipes and run records", async () => { assert.equal(snapshot.recipes[0]?.status, "active"); assert.equal(snapshot.runRecords.length, 1); assert.equal(snapshot.runRecords[0]?.runStatus, "succeeded"); + assert.equal(snapshot.runRecords[0]?.artifacts[0]?.kind, "process-trace"); + assert.match( + snapshot.runRecords[0]?.artifacts[0]?.content ?? 
"", + /collection-search-query/ + ); }); interface ToolLike { @@ -408,12 +471,17 @@ function recipe(input: { }); } -function validRun(recipe: PopulateRecipe, score = 1): PopulateRecipeRunResult { +function validRun( + recipe: PopulateRecipe, + score = 1, + artifacts: PopulateRecipeRunResult["artifacts"] = [] +): PopulateRecipeRunResult { return runResult({ recipe, rows: validRows(), isValid: true, score, + artifacts, }); } @@ -435,6 +503,7 @@ function runResult(input: { criticalIssues?: string[]; isValid: boolean; score: number; + artifacts?: PopulateRecipeRunResult["artifacts"]; }): PopulateRecipeRunResult { return { rows: input.rows, @@ -470,7 +539,32 @@ function runResult(input: { criticalIssues: input.criticalIssues ?? [], warnings: input.validationIssues ?? [], }, - artifacts: [], + artifacts: input.artifacts ?? [], + }; +} + +function processTraceArtifact(): PopulateRecipeRunResult["artifacts"][number] { + return { + kind: "process-trace", + label: "populate-process-trace", + content: JSON.stringify({ + runtime: "collection", + searchQueries: ["OpenAI latest blog"], + fetchedUrls: ["https://openai.com/news"], + sourceArtifacts: [{ + url: "https://openai.com/news", + status: "succeeded", + source: "collection", + }], + selectedRowSource: "collection_pipeline", + notes: [], + steps: [{ + kind: "search", + label: "collection-search-query", + status: "succeeded", + input: { query: "OpenAI latest blog" }, + }], + }), }; } diff --git a/docs/data-collection-agent-migration-plan.md b/docs/data-collection-agent-migration-plan.md index 2bb1847..8c0394c 100644 --- a/docs/data-collection-agent-migration-plan.md +++ b/docs/data-collection-agent-migration-plan.md @@ -28,6 +28,9 @@ the collection pipeline is migrated into BigSet. without injecting answer-key URLs at runtime. - PR #46 surfaces no-Agent browser/form/detail follow-up as a safe capability diagnostic instead of hiding it as generic bad data or infra failure. 
+- PRs #47-#52 document and improve collection benchmark evidence, source + coherence, official-source support, and URL-like source evidence. PR #52 fixes + the `official_website` / `company_website` / `product_url` scoring class. - `feat/data-collection-agent-v14` is no longer the branch to build on directly. It was the source of the collection pipeline port. New work should branch on top of the current draft stack, not edit Meteor's branch or the dirty main @@ -63,6 +66,8 @@ The current layer: - stores active recipes and run records in a filesystem recipe store on the durable app/commit path +- persists each run's artifacts on the run record, including a structured + `process-trace` artifact when the runtime exposes one - reruns the active recipe when one exists - generates an initial recipe when no active recipe exists - repairs a failed active recipe through `DefaultPopulateRecipeAuthor` @@ -84,12 +89,20 @@ The current layer now can: - run the real vendored collection pipeline through that same boundary - preserve `recipe.runtimeInstructions`, required columns, and benchmark metadata through the collection runner +- expose structured trace data for both Mastra and collection runs: + `runtime`, `searchQueries`, `fetchedUrls`, `sourceArtifacts`, + `selectedRowSource`, `notes`, and ordered `steps` - emit a capability diagnostic when no-Agent mode sees pages that need browser, form, or detail-page follow-up The current layer does not yet: - generate Playwright scripts as a durable production recipe +- emit `playwright-candidate-script`; that artifact kind is reserved for the + future compiler and is not produced yet +- run cron from compiled Playwright scripts +- repair or promote Playwright scripts; repair still changes durable runtime + instructions only - run a green live Convex canary in this local environment - prove Agent-enabled collection quality on a full real benchmark - prove the collection runtime should replace Mastra as the default app runtime