diff --git a/backend/BigSet_Data_Collection_Agent/src/integrations/tinyfish-agent.ts b/backend/BigSet_Data_Collection_Agent/src/integrations/tinyfish-agent.ts index 01c9da8..4e337f3 100644 --- a/backend/BigSet_Data_Collection_Agent/src/integrations/tinyfish-agent.ts +++ b/backend/BigSet_Data_Collection_Agent/src/integrations/tinyfish-agent.ts @@ -37,6 +37,10 @@ export interface TinyfishAgentJob { goal: string; } +export interface TinyfishAgentRunOptions { + pollTimeoutMs?: number; +} + function runToResult(run: Run): TinyfishAgentRunResult { const errorMessage = run.error?.message ?? @@ -114,8 +118,10 @@ export async function queueTinyfishAgent( /** Poll `runs.get` until the run reaches a terminal status or times out. */ export async function pollTinyfishAgentUntilDone( runId: string, + options: TinyfishAgentRunOptions = {}, ): Promise { const startedAt = Date.now(); + const pollTimeoutMs = options.pollTimeoutMs ?? config.agentPollTimeoutMs; let lastStatus = RunStatus.PENDING; while (true) { @@ -134,7 +140,7 @@ export async function pollTinyfishAgentUntilDone( return runToResult(run); } - if (Date.now() - startedAt >= config.agentPollTimeoutMs) { + if (Date.now() - startedAt >= pollTimeoutMs) { await cancelTinyfishAgentRun(runId); try { @@ -146,7 +152,7 @@ export async function pollTinyfishAgentUntilDone( ...result, error: result.error ?? - `Agent run cancelled after ${config.agentPollTimeoutMs}ms (was ${lastStatus})`, + `Agent run cancelled after ${pollTimeoutMs}ms (was ${lastStatus})`, }; } return result; @@ -159,7 +165,7 @@ export async function pollTinyfishAgentUntilDone( run_id: runId, status: "TIMEOUT", result: null, - error: `Agent run timed out after ${config.agentPollTimeoutMs}ms (last status: ${lastStatus}); cancel requested`, + error: `Agent run timed out after ${pollTimeoutMs}ms (last status: ${lastStatus}); cancel requested`, }; } @@ -173,6 +179,7 @@ export async function pollTinyfishAgentUntilDone( export async function runTinyfishAgent( url: string, goal: string, + options: TinyfishAgentRunOptions = {}, ): Promise { const queued = await queueTinyfishAgent(url, goal); if (queued.error || !queued.run_id) { @@ -183,7 +190,7 @@ export async function runTinyfishAgent( error: queued.error ?? "Failed to queue agent run", }; } - return pollTinyfishAgentUntilDone(queued.run_id); + return pollTinyfishAgentUntilDone(queued.run_id, options); } /** @@ -191,6 +198,7 @@ export async function runTinyfishAgent( */ export async function runTinyfishAgentsBatch( jobs: TinyfishAgentJob[], + options: TinyfishAgentRunOptions = {}, ): Promise { if (jobs.length === 0) return []; @@ -224,7 +232,7 @@ export async function runTinyfishAgentsBatch( pollTargets, config.agentPollConcurrency, async ({ index, run_id }) => { - results[index] = await pollTinyfishAgentUntilDone(run_id); + results[index] = await pollTinyfishAgentUntilDone(run_id, options); }, ); diff --git a/backend/BigSet_Data_Collection_Agent/src/orchestrator/acquisition.ts b/backend/BigSet_Data_Collection_Agent/src/orchestrator/acquisition.ts index ca169e0..6dd748c 100644 --- a/backend/BigSet_Data_Collection_Agent/src/orchestrator/acquisition.ts +++ b/backend/BigSet_Data_Collection_Agent/src/orchestrator/acquisition.ts @@ -87,6 +87,7 @@ export async function runAcquisitionPhase(options: { knownEntityKeys?: string[]; enableTriage?: boolean; enableTinyfishAgent?: boolean; + agentPollTimeoutMs?: number; memory?: WorkflowMemory; forceAgent?: boolean; /** Fetch outbound links from high-value pages (repair). */ @@ -222,6 +223,7 @@ export async function runAcquisitionPhase(options: { enableTinyfishAgent: options.enableTinyfishAgent ?? (options.forceAgent ? true : config.enableTinyfishAgent), + agentPollTimeoutMs: options.agentPollTimeoutMs, memory: options.memory, log: options.log, }); diff --git a/backend/BigSet_Data_Collection_Agent/src/orchestrator/pipeline.ts b/backend/BigSet_Data_Collection_Agent/src/orchestrator/pipeline.ts index 016566b..ae6af0d 100644 --- a/backend/BigSet_Data_Collection_Agent/src/orchestrator/pipeline.ts +++ b/backend/BigSet_Data_Collection_Agent/src/orchestrator/pipeline.ts @@ -63,6 +63,8 @@ export interface PipelineOptions { refreshInPlace?: boolean; /** When refreshing, re-fetch URLs already seen in the source run. */ refetchUrls?: boolean; + /** Per-run TinyFish Agent poll timeout. Defaults to vendored config. */ + agentPollTimeoutMs?: number; /** Override pipeline logging (benchmark adapters should log to stderr). */ onLog?: (stage: string, message: string) => void; /** Set when invoked from the dataset-agent benchmark harness. */ @@ -262,6 +264,7 @@ async function executeRunPipeline( pageIndexStart: pageIndex, enableTriage, enableTinyfishAgent, + agentPollTimeoutMs: options.agentPollTimeoutMs, memory: useMemory ? memory : undefined, log, }); @@ -366,6 +369,7 @@ async function executeRunPipeline( allFailedUrls, enableTriage, enableTinyfishAgent, + agentPollTimeoutMs: options.agentPollTimeoutMs, targetRowCap, log, }, diff --git a/backend/BigSet_Data_Collection_Agent/src/orchestrator/process-pages.ts b/backend/BigSet_Data_Collection_Agent/src/orchestrator/process-pages.ts index 649a1d0..99e2e52 100644 --- a/backend/BigSet_Data_Collection_Agent/src/orchestrator/process-pages.ts +++ b/backend/BigSet_Data_Collection_Agent/src/orchestrator/process-pages.ts @@ -71,6 +71,7 @@ export async function processFetchedPages(options: { knownEntityKeys?: string[]; enableTriage?: boolean; enableTinyfishAgent?: boolean; + agentPollTimeoutMs?: number; memory?: WorkflowMemory; log: (stage: string, message: string) => void; }): Promise { @@ -319,7 +320,9 @@ export async function processFetchedPages(options: { queueJobIndices.push(index); } - const agentRunResults = await runTinyfishAgentsBatch(queueJobs); + const agentRunResults = await runTinyfishAgentsBatch(queueJobs, { + pollTimeoutMs: options.agentPollTimeoutMs, + }); const jobsToExtract = queueJobIndices.map((jobIndex, batchIndex) => ({ job: jobsWithGoals[jobIndex]!, diff --git a/backend/BigSet_Data_Collection_Agent/src/orchestrator/repair-loop.ts b/backend/BigSet_Data_Collection_Agent/src/orchestrator/repair-loop.ts index 4bff7b9..892f531 100644 --- a/backend/BigSet_Data_Collection_Agent/src/orchestrator/repair-loop.ts +++ b/backend/BigSet_Data_Collection_Agent/src/orchestrator/repair-loop.ts @@ -41,6 +41,7 @@ export interface RepairLoopContext { allFailedUrls: string[]; enableTriage: boolean; enableTinyfishAgent: boolean; + agentPollTimeoutMs?: number; targetRowCap: number; log: (stage: string, message: string) => void; } @@ -162,6 +163,7 @@ export async function runRepairLoops(options: { knownEntityKeys: entityKeysFromRecords(ctx.spec, recordsBeforeLoop), enableTriage: ctx.enableTriage, enableTinyfishAgent: ctx.enableTinyfishAgent, + agentPollTimeoutMs: ctx.agentPollTimeoutMs, memory: ctx.memory, forceAgent: preferAgent, enableLinkFollow: config.enableRepairLinkFollow, diff --git a/backend/src/pipeline/collection-agent-runner.ts b/backend/src/pipeline/collection-agent-runner.ts index 67b7eba..bb9d90b 100644 --- a/backend/src/pipeline/collection-agent-runner.ts +++ b/backend/src/pipeline/collection-agent-runner.ts @@ -24,6 +24,7 @@ interface CollectionPipelineOptions { enableRepair?: boolean; enableTriage?: boolean; enableTinyfishAgent?: boolean; + agentPollTimeoutMs?: number; benchmark?: { promptId?: string; promptQuality?: string; @@ -91,9 +92,12 @@ interface CollectionRecordQuality { needs_review?: boolean; } +const DEFAULT_COLLECTION_AGENT_POLL_TIMEOUT_MS = 480_000; + export const runCollectionPopulatePipeline: CollectionPopulatePipelineRunner = async (input) => { const outputDir = await mkdtemp(join(tmpdir(), "bigset-collection-")); + const enableTinyfishAgent = boolEnv("COLLECTION_AGENT_ENABLE_AGENT", false); const pipeline = await loadCollectionPipelineModule(); const result = await pipeline.runPipeline({ prompt: input.prompt, @@ -102,7 +106,10 @@ export const runCollectionPopulatePipeline: CollectionPopulatePipelineRunner = memoryDir: join(outputDir, "memory"), enableRepair: boolEnv("COLLECTION_AGENT_ENABLE_REPAIR", false), enableTriage: boolEnv("COLLECTION_AGENT_ENABLE_TRIAGE", true), - enableTinyfishAgent: boolEnv("COLLECTION_AGENT_ENABLE_AGENT", true), + enableTinyfishAgent, + agentPollTimeoutMs: enableTinyfishAgent + ? collectionAgentPollTimeoutMs() + : undefined, benchmark: benchmarkContextFromInput(input), onLog: (stage, message) => { console.error(`[collection:${stage}] ${message}`); @@ -307,3 +314,35 @@ function boolEnv(name: string, fallback: boolean): boolean { } return ["1", "true", "yes", "on"].includes(raw.toLowerCase()); } + +function intEnv(name: string, fallback: number): number { + const raw = process.env[name]; + if (raw === undefined || raw === "") { + return fallback; + } + const value = Number.parseInt(raw, 10); + if (!Number.isFinite(value) || value <= 0) { + throw new Error(`Invalid ${name}: expected positive integer, got "${raw}"`); + } + return value; +} + +function optionalIntEnv(name: string): number | undefined { + const raw = process.env[name]; + if (raw === undefined || raw === "") { + return undefined; + } + const value = Number.parseInt(raw, 10); + if (!Number.isFinite(value) || value <= 0) { + throw new Error(`Invalid ${name}: expected positive integer, got "${raw}"`); + } + return value; +} + +function collectionAgentPollTimeoutMs(): number { + return optionalIntEnv("AGENT_POLL_TIMEOUT_MS") ?? + intEnv( + "COLLECTION_AGENT_POLL_TIMEOUT_MS", + DEFAULT_COLLECTION_AGENT_POLL_TIMEOUT_MS + ); +} diff --git a/backend/test/collection-agent-runner.test.ts b/backend/test/collection-agent-runner.test.ts index 0d68cc6..5c16465 100644 --- a/backend/test/collection-agent-runner.test.ts +++ b/backend/test/collection-agent-runner.test.ts @@ -4,33 +4,20 @@ import { test } from "node:test"; import { runCollectionPopulatePipeline } from "../src/pipeline/collection-agent-runner.js"; test("collection agent runner maps vendored pipeline output into populate runtime result", async () => { - const previousModule = process.env.COLLECTION_AGENT_PIPELINE_MODULE; - process.env.COLLECTION_AGENT_PIPELINE_MODULE = fakeCollectionPipelineModuleUrl(); + const previousEnv = snapshotEnv([ + "AGENT_POLL_TIMEOUT_MS", + "COLLECTION_AGENT_ENABLE_AGENT", + "COLLECTION_AGENT_PIPELINE_MODULE", + "COLLECTION_AGENT_POLL_TIMEOUT_MS", + ]); + delete process.env.AGENT_POLL_TIMEOUT_MS; + delete process.env.COLLECTION_AGENT_ENABLE_AGENT; + delete process.env.COLLECTION_AGENT_POLL_TIMEOUT_MS; + process.env.COLLECTION_AGENT_PIPELINE_MODULE = fakeCollectionPipelineModuleUrl({ + expectedCalls: [{ agentEnabled: false }], + }); try { - const result = await runCollectionPopulatePipeline({ - datasetId: "dataset-ai-posts", - datasetName: "AI posts", - description: "Find latest AI blog posts.", - columns: [ - { name: "entity_name", type: "text" }, - { name: "source_url", type: "url" }, - { name: "evidence_quote", type: "text" }, - ], - requiredColumns: ["entity_name", "source_url", "evidence_quote"], - prompt: [ - "Dataset: AI posts", - "Task: Find latest AI blog posts.", - "", - "Durable recipe instructions:", - "Prefer official source pages.", - ].join("\n"), - recipeInstructions: "Prefer official source pages.", - targetRows: 3, - promptId: "latest-ai-blog-posts", - promptQuality: "easy", - persona: "technical operator", - expectedStress: "Latest dated source pages.", - }); + const result = await runCollectionPopulatePipeline(collectionPipelineInput()); assert.equal(result.rows.length, 1); assert.equal(result.rows[0]?.cells.entity_name, "OpenAI"); @@ -50,17 +37,106 @@ test("collection agent runner maps vendored pipeline output into populate runtim assert.equal(result.metrics.agentRuns, 3); assert.equal(result.metrics.agentSteps, 3); } finally { - if (previousModule === undefined) { - delete process.env.COLLECTION_AGENT_PIPELINE_MODULE; - } else { - process.env.COLLECTION_AGENT_PIPELINE_MODULE = previousModule; - } + restoreEnv(previousEnv); } }); -function fakeCollectionPipelineModuleUrl(): string { +test("collection agent runner requires explicit Agent opt-in and caps poll timeout per warm process call", async () => { + const previousEnv = snapshotEnv([ + "AGENT_POLL_TIMEOUT_MS", + "COLLECTION_AGENT_ENABLE_AGENT", + "COLLECTION_AGENT_PIPELINE_MODULE", + "COLLECTION_AGENT_POLL_TIMEOUT_MS", + ]); + delete process.env.AGENT_POLL_TIMEOUT_MS; + delete process.env.COLLECTION_AGENT_ENABLE_AGENT; + delete process.env.COLLECTION_AGENT_POLL_TIMEOUT_MS; + process.env.COLLECTION_AGENT_PIPELINE_MODULE = fakeCollectionPipelineModuleUrl({ + expectedModuleLoadPollTimeoutMs: null, + expectedCalls: [ + { agentEnabled: false }, + { agentEnabled: true, pollTimeoutMs: 12345 }, + { agentEnabled: true, pollTimeoutMs: 23456 }, + ], + }); + + try { + assert.equal( + (await runCollectionPopulatePipeline(collectionPipelineInput())).rows.length, + 1 + ); + + process.env.COLLECTION_AGENT_ENABLE_AGENT = "true"; + process.env.COLLECTION_AGENT_POLL_TIMEOUT_MS = "12345"; + assert.equal( + (await runCollectionPopulatePipeline(collectionPipelineInput())).rows.length, + 1 + ); + + process.env.COLLECTION_AGENT_POLL_TIMEOUT_MS = "23456"; + assert.equal( + (await runCollectionPopulatePipeline(collectionPipelineInput())).rows.length, + 1 + ); + } finally { + restoreEnv(previousEnv); + } +}); + +function collectionPipelineInput() { + return { + datasetId: "dataset-ai-posts", + datasetName: "AI posts", + description: "Find latest AI blog posts.", + columns: [ + { name: "entity_name", type: "text" as const }, + { name: "source_url", type: "url" as const }, + { name: "evidence_quote", type: "text" as const }, + ], + requiredColumns: ["entity_name", "source_url", "evidence_quote"], + prompt: [ + "Dataset: AI posts", + "Task: Find latest AI blog posts.", + "", + "Durable recipe instructions:", + "Prefer official source pages.", + ].join("\n"), + recipeInstructions: "Prefer official source pages.", + targetRows: 3, + promptId: "latest-ai-blog-posts", + promptQuality: "easy", + persona: "technical operator", + expectedStress: "Latest dated source pages.", + }; +} + +function fakeCollectionPipelineModuleUrl(input: { + expectedModuleLoadPollTimeoutMs?: string | null; + expectedCalls: Array<{ + agentEnabled: boolean; + pollTimeoutMs?: number; + }>; +}): string { const source = ` + const moduleLoadPollTimeoutMs = process.env.AGENT_POLL_TIMEOUT_MS ?? null; + const expectedModuleLoadPollTimeoutMs = ${JSON.stringify(input.expectedModuleLoadPollTimeoutMs ?? null)}; + const expectedCalls = ${JSON.stringify(input.expectedCalls)}; + let callIndex = 0; + export async function runPipeline(options) { + if (moduleLoadPollTimeoutMs !== expectedModuleLoadPollTimeoutMs) { + throw new Error("unexpected module-load poll timeout"); + } + const expected = expectedCalls[callIndex++]; + if (!expected) { + throw new Error("unexpected extra pipeline call"); + } + if (options.enableTinyfishAgent !== expected.agentEnabled) { + throw new Error("unexpected TinyFish Agent setting"); + } + if ((options.agentPollTimeoutMs ?? null) !== (expected.pollTimeoutMs ?? null)) { + throw new Error("bounded agent poll timeout missing"); + } if (!options.prompt.includes("Durable recipe instructions")) { throw new Error("recipe instructions missing from prompt"); } @@ -140,3 +216,17 @@ function fakeCollectionPipelineModuleUrl(): string { `; return `data:text/javascript,${encodeURIComponent(source)}`; } + +function snapshotEnv(names: string[]): Map { + return new Map(names.map((name) => [name, process.env[name]])); +} + +function restoreEnv(snapshot: Map): void { + for (const [name, value] of snapshot) { + if (value === undefined) { + delete process.env[name]; + } else { + process.env[name] = value; + } + } +} diff --git a/benchmarks/dataset-agent/README.md b/benchmarks/dataset-agent/README.md index e9a56d7..dac804c 100644 --- a/benchmarks/dataset-agent/README.md +++ b/benchmarks/dataset-agent/README.md @@ -42,6 +42,10 @@ Real collection benchmark runs require `OPENROUTER_API_KEY`, module must export `runCollectionPopulatePipeline(input)` or a default runner that accepts `CollectionPopulatePipelineInput` and returns a `PopulateRuntimeResult`. The pipeline module must export `runPipeline(options)`. +The BigSet runner keeps TinyFish Agent/browser calls off by default so the +benchmark stays cheap and bounded. Set `COLLECTION_AGENT_ENABLE_AGENT=true` to +opt in; Agent polling is capped by `AGENT_POLL_TIMEOUT_MS`, or by +`COLLECTION_AGENT_POLL_TIMEOUT_MS` when the generic timeout is unset. App and CLI collection-runtime runs use the same runner shape, but load it from `POPULATE_COLLECTION_RUNNER_MODULE` when `POPULATE_AGENT_RUNTIME=collection`. diff --git a/docs/data-collection-agent-migration-plan.md b/docs/data-collection-agent-migration-plan.md index 7110815..1833d02 100644 --- a/docs/data-collection-agent-migration-plan.md +++ b/docs/data-collection-agent-migration-plan.md @@ -238,6 +238,12 @@ POPULATE_COLLECTION_RUNNER_MODULE=./backend/src/pipeline/collection-agent-runner COLLECTION_AGENT_PIPELINE_MODULE=./backend/BigSet_Data_Collection_Agent/src/orchestrator/pipeline.ts ``` +The BigSet runner keeps TinyFish Agent/browser calls disabled unless +`COLLECTION_AGENT_ENABLE_AGENT=true`. This makes cron and benchmark reruns cheap +and repeatable first. Agent-enabled runs should also set +`COLLECTION_AGENT_POLL_TIMEOUT_MS` or `AGENT_POLL_TIMEOUT_MS` so a browser run +cannot outlive the benchmark/job budget. + Do not switch the default runtime from Mastra to collection until the self-healing-wrapped collection benchmark has better evidence than the current Mastra lane.