Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { join } from "node:path";
/**
 * A page whose agent follow-up was deferred instead of being run in this phase.
 */
export interface AgentDeferredEntry {
/** Final URL reported by triage, falling back to the fetched page's URL. */
url: string;
/** Triage status the page had when it was deferred. */
status: SourceStatus;
/**
 * Why the page was deferred: `"agent_disabled"` when the agent is turned off
 * and the page was skipped, `"agent_budget"` when the page was queued for the
 * agent but fell beyond the per-phase run budget.
 */
reason: "agent_budget" | "agent_disabled";
}

export interface ProcessPagesResult {
Expand Down Expand Up @@ -216,6 +217,7 @@ export async function processFetchedPages(options: {

const extractPages: { page: FetchedPage; triage: SourceTriageResult }[] = [];
const agentQueue: { page: FetchedPage; triage: SourceTriageResult }[] = [];
const agentDisabledDeferredEntries: AgentDeferredEntry[] = [];

for (const triage of triageResults) {
bumpStatus(summary, triage.status);
Expand All @@ -241,6 +243,11 @@ export async function processFetchedPages(options: {
extractPages.push({ page, triage });
} else if (sourcePolicy.requiresOfficialSource) {
summary.skipped += 1;
agentDisabledDeferredEntries.push({
url: triage.final_url || page.url,
status: triage.status,
reason: "agent_disabled",
});
options.log(
options.label,
`Agent disabled — skip navigation-only official source ${triage.final_url} [${triage.status}]`,
Expand Down Expand Up @@ -296,17 +303,21 @@ export async function processFetchedPages(options: {

const agentBudget = agentEnabled ? config.maxAgentRunsPerPhase : 0;
const toRun = agentQueue.slice(0, agentBudget);
const deferredEntries: AgentDeferredEntry[] = agentQueue
.slice(agentBudget)
.map(({ page, triage }) => ({
url: triage.final_url || page.url,
status: triage.status,
}));
const deferredEntries: AgentDeferredEntry[] = [
...agentDisabledDeferredEntries,
...agentQueue
.slice(agentBudget)
.map(({ page, triage }) => ({
url: triage.final_url || page.url,
status: triage.status,
reason: "agent_budget" as const,
})),
];

if (deferredEntries.length > 0) {
options.log(
options.label,
`Agent budget: running ${toRun.length}/${agentQueue.length} (${deferredEntries.length} deferred)`,
`Agent capability: running ${toRun.length}/${agentQueue.length} (${deferredEntries.length} deferred)`,
);
}

Expand Down
10 changes: 8 additions & 2 deletions backend/BigSet_Data_Collection_Agent/src/quality/build-report.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ export interface BuildSourcesOptions {
fetchedUrls: string[];
triageResults: SourceTriageResult[];
agentRuns: AgentRunRecord[];
agentDeferred: { url: string; status: string }[];
agentDeferred: {
url: string;
status: string;
reason?: "agent_budget" | "agent_disabled";
}[];
}

export function buildSourcesReport(
Expand Down Expand Up @@ -133,7 +137,9 @@ export function buildSourcesReport(
phase: options.phase,
outcome: "agent_deferred",
triage_status: deferred.status,
error: "Exceeded MAX_AGENT_RUNS_PER_PHASE budget",
error: deferred.reason === "agent_disabled"
? "TinyFish Agent disabled for browser/form/detail follow-up"
: "Exceeded MAX_AGENT_RUNS_PER_PHASE budget",
});
}

Expand Down
59 changes: 59 additions & 0 deletions backend/src/pipeline/collection-agent-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ interface CollectionPipelineResult {
quality?: {
records?: CollectionRecordQuality[];
};
sources?: CollectionSourcesReport;
llm_usage?: {
prompt_tokens?: number;
completion_tokens?: number;
Expand Down Expand Up @@ -92,6 +93,21 @@ interface CollectionRecordQuality {
needs_review?: boolean;
}

/**
 * The part of the pipeline report's `sources` section that this module reads.
 * Every field is optional, so a report without source data is still accepted.
 */
interface CollectionSourcesReport {
/** Per-source outcome entries; treated as an empty list when absent. */
outcomes?: CollectionSourceOutcome[];
}

/**
 * One source outcome entry, limited to the fields inspected when building
 * capability diagnostics.
 */
interface CollectionSourceOutcome {
/** Outcome label for the source; `"success"` entries are excluded from diagnostics. */
outcome?: string;
/** Triage status of the source, matched against AGENT_REQUIRED_TRIAGE_STATUSES. */
triage_status?: string;
}

// Triage statuses counted as needing agent follow-up (browser navigation, form
// submission, or detail-page follow-up) when building capability diagnostics.
const AGENT_REQUIRED_TRIAGE_STATUSES = new Set([
"requires_navigation",
"requires_form_submission",
"requires_detail_page_followup",
]);

const DEFAULT_COLLECTION_AGENT_POLL_TIMEOUT_MS = 480_000;

export const runCollectionPopulatePipeline: CollectionPopulatePipelineRunner =
Expand Down Expand Up @@ -119,6 +135,7 @@ export const runCollectionPopulatePipeline: CollectionPopulatePipelineRunner =
return collectionPipelineResultToPopulateRuntimeResult({
pipeline: result,
requiredColumns: input.requiredColumns,
enableTinyfishAgent,
});
};

Expand Down Expand Up @@ -157,6 +174,7 @@ function benchmarkContextFromInput(input: CollectionPopulatePipelineInput) {
function collectionPipelineResultToPopulateRuntimeResult(input: {
pipeline: CollectionPipelineResult;
requiredColumns: string[];
enableTinyfishAgent: boolean;
}): PopulateRuntimeResult {
const records = selectOutputRecords(input.pipeline);
const qualityById = qualityByRecordId(input.pipeline.report.quality?.records);
Expand All @@ -168,18 +186,59 @@ function collectionPipelineResultToPopulateRuntimeResult(input: {
qualityById,
})
);
const capabilityDiagnostics = capabilityDiagnosticsFromReport({
report: input.pipeline.report,
enableTinyfishAgent: input.enableTinyfishAgent,
});

return {
rows,
validationIssues: [
...(input.pipeline.report.errors ?? []),
...capabilityDiagnostics,
...(rows.length === 0 ? ["No rows returned from collection pipeline."] : []),
],
usage: usageFromPipeline(input.pipeline),
metrics: metricsFromReport(input.pipeline.report),
};
}

/**
 * Produces a capability diagnostic when the TinyFish Agent is disabled but the
 * report contains sources whose triage status calls for agent follow-up.
 *
 * @param input.report - Pipeline report; only `sources.outcomes` is read.
 * @param input.enableTinyfishAgent - When `true`, no diagnostic is produced.
 * @returns An empty array when the agent is enabled or no non-successful
 *   agent-required outcome exists; otherwise a single message containing the
 *   total page count and a per-status breakdown.
 */
function capabilityDiagnosticsFromReport(input: {
  report: CollectionPipelineResult["report"];
  enableTinyfishAgent: boolean;
}): string[] {
  if (input.enableTinyfishAgent) {
    return [];
  }
  // The type-predicate filter narrows `triage_status` to `string`, so no
  // assertion is needed when counting below.
  const agentRequiredOutcomes = (input.report.sources?.outcomes ?? []).filter(
    isAgentRequiredSourceOutcome
  );
  if (agentRequiredOutcomes.length === 0) {
    return [];
  }

  // Map preserves insertion order, so statuses appear in first-seen order.
  const statusCounts = new Map<string, number>();
  for (const { triage_status: status } of agentRequiredOutcomes) {
    statusCounts.set(status, (statusCounts.get(status) ?? 0) + 1);
  }
  const statusSummary = Array.from(
    statusCounts.entries(),
    ([status, count]) => `${status}=${count}`
  ).join(", ");

  return [
    `Capability diagnostic: TinyFish Agent disabled; triage requested browser/form/detail follow-up for ${agentRequiredOutcomes.length} page(s) (${statusSummary}). Enable COLLECTION_AGENT_ENABLE_AGENT=true for live navigation.`,
  ];
}

/**
 * Type guard: true when the outcome has a string triage status listed in
 * AGENT_REQUIRED_TRIAGE_STATUSES and the outcome is not `"success"`.
 */
function isAgentRequiredSourceOutcome(
  outcome: CollectionSourceOutcome
): outcome is CollectionSourceOutcome & { triage_status: string } {
  return (
    typeof outcome.triage_status === "string" &&
    AGENT_REQUIRED_TRIAGE_STATUSES.has(outcome.triage_status) &&
    outcome.outcome !== "success"
  );
}

function selectOutputRecords(
pipeline: CollectionPipelineResult
): CollectionExtractedRecord[] {
Expand Down
50 changes: 50 additions & 0 deletions backend/test/collection-agent-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,54 @@ test("collection agent runner requires explicit Agent opt-in and caps poll timeo
}
});

// With the agent opt-in unset, the runner should report one capability
// diagnostic covering only the non-successful agent-required source outcomes.
test("collection agent runner surfaces Agent-required capability diagnostics from source outcomes", async () => {
  const envBeforeTest = snapshotEnv([
    "AGENT_POLL_TIMEOUT_MS",
    "COLLECTION_AGENT_ENABLE_AGENT",
    "COLLECTION_AGENT_PIPELINE_MODULE",
    "COLLECTION_AGENT_POLL_TIMEOUT_MS",
  ]);
  // Clear every agent-related override so the runner falls back to its defaults.
  for (const key of [
    "AGENT_POLL_TIMEOUT_MS",
    "COLLECTION_AGENT_ENABLE_AGENT",
    "COLLECTION_AGENT_POLL_TIMEOUT_MS",
  ]) {
    delete process.env[key];
  }

  // Two entries should be counted; the "success" entry must be ignored.
  const sourceOutcomes = [
    { outcome: "agent_deferred", triage_status: "requires_navigation" },
    { outcome: "no_records", triage_status: "requires_form_submission" },
    { outcome: "success", triage_status: "requires_detail_page_followup" },
  ];
  process.env.COLLECTION_AGENT_PIPELINE_MODULE = fakeCollectionPipelineModuleUrl({
    expectedCalls: [{ agentEnabled: false }],
    sources: { outcomes: sourceOutcomes },
  });

  try {
    const result = await runCollectionPopulatePipeline(collectionPipelineInput());
    const issuesText = result.validationIssues.join("\n");

    assert.equal(result.rows.length, 1);
    const expectedPatterns = [
      /Capability diagnostic: TinyFish Agent disabled/,
      /2 page\(s\)/,
      /requires_navigation=1/,
      /requires_form_submission=1/,
    ];
    for (const pattern of expectedPatterns) {
      assert.match(issuesText, pattern);
    }
    // The diagnostic must not read like a failure report.
    assert.doesNotMatch(
      issuesText,
      /failed|missing|no rows|not found|invented|invalid/i
    );
  } finally {
    restoreEnv(envBeforeTest);
  }
});

function collectionPipelineInput() {
return {
datasetId: "dataset-ai-posts",
Expand Down Expand Up @@ -116,6 +164,7 @@ function fakeCollectionPipelineModuleUrl(input: {
agentEnabled: boolean;
pollTimeoutMs?: number;
}>;
sources?: unknown;
}): string {
const source = `
const moduleLoadPollTimeoutMs = process.env.AGENT_POLL_TIMEOUT_MS ?? null;
Expand Down Expand Up @@ -187,6 +236,7 @@ function fakeCollectionPipelineModuleUrl(input: {
quality: {
records: [{ record_id: "pk:openai", needs_review: true }],
},
sources: ${JSON.stringify(input.sources ?? { outcomes: [] })},
llm_usage: {
prompt_tokens: 1,
completion_tokens: 1,
Expand Down
51 changes: 51 additions & 0 deletions backend/test/populate-collection-runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,57 @@ test("collection runtime threads recipe instructions into the collection prompt"
assert.equal(run.rows[0]?.cells.entity_name, "OpenAI");
});

// A capability diagnostic returned next to a healthy row must show up as a
// warning only: the run succeeds and production validation stays valid.
test("collection runtime treats capability diagnostics as non-fatal warnings for healthy rows", async () => {
  const runtime = new CollectionPopulateRecipeRuntime({
    targetRows: 3,
    runPipeline: async () => {
      const sourceUrl = "https://openai.com/news";
      const postTitle = "Release notes from OpenAI";
      const capabilityDiagnostic =
        "Capability diagnostic: TinyFish Agent disabled; triage requested browser/form/detail follow-up for 2 page(s) (requires_navigation=1, requires_form_submission=1). Enable COLLECTION_AGENT_ENABLE_AGENT=true for live navigation.";

      return {
        rows: [
          {
            cells: {
              entity_name: "OpenAI",
              latest_post_title: postTitle,
              source_url: sourceUrl,
              evidence_quote: postTitle,
            },
            sourceUrls: [sourceUrl],
            evidence: [
              {
                columnName: "latest_post_title",
                sourceUrl: sourceUrl,
                quote: postTitle,
              },
            ],
            needsReview: false,
          },
        ],
        validationIssues: [capabilityDiagnostic],
        usage: {
          promptTokens: 11,
          completionTokens: 7,
          totalTokens: 18,
        },
        metrics: {
          searchCalls: 1,
          fetchCalls: 1,
          browserCalls: 0,
          agentRuns: 0,
          agentSteps: 0,
        },
      };
    },
  });

  const { runStatus, productionValidation } = await runtime.runRecipe({
    recipe: collectionRecipe(),
    context,
  });

  assert.equal(runStatus, "succeeded");
  assert.equal(productionValidation.isValid, true);
  assert.deepEqual(productionValidation.criticalIssues, []);
  const warningText = productionValidation.warnings.join("\n");
  assert.match(warningText, /Capability diagnostic: TinyFish Agent disabled/);
});

test("collection pipeline input builder trims empty recipe instructions", () => {
const input = collectionPipelineInputFromRecipe({
recipe: collectionRecipe({ runtimeInstructions: " " }),
Expand Down
Loading