Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ export interface TinyfishAgentJob {
goal: string;
}

/** Per-call overrides accepted by the TinyFish agent run/poll helpers. */
export interface TinyfishAgentRunOptions {
/**
 * Maximum time, in milliseconds, to keep polling a run before a cancel is
 * requested. When omitted, `config.agentPollTimeoutMs` is used instead.
 */
pollTimeoutMs?: number;
}

function runToResult(run: Run): TinyfishAgentRunResult {
const errorMessage =
run.error?.message ??
Expand Down Expand Up @@ -114,8 +118,10 @@ export async function queueTinyfishAgent(
/** Poll `runs.get` until the run reaches a terminal status or times out. */
export async function pollTinyfishAgentUntilDone(
runId: string,
options: TinyfishAgentRunOptions = {},
): Promise<TinyfishAgentRunResult> {
const startedAt = Date.now();
const pollTimeoutMs = options.pollTimeoutMs ?? config.agentPollTimeoutMs;
let lastStatus = RunStatus.PENDING;

while (true) {
Expand All @@ -134,7 +140,7 @@ export async function pollTinyfishAgentUntilDone(
return runToResult(run);
}

if (Date.now() - startedAt >= config.agentPollTimeoutMs) {
if (Date.now() - startedAt >= pollTimeoutMs) {
await cancelTinyfishAgentRun(runId);

try {
Expand All @@ -146,7 +152,7 @@ export async function pollTinyfishAgentUntilDone(
...result,
error:
result.error ??
`Agent run cancelled after ${config.agentPollTimeoutMs}ms (was ${lastStatus})`,
`Agent run cancelled after ${pollTimeoutMs}ms (was ${lastStatus})`,
};
}
return result;
Expand All @@ -159,7 +165,7 @@ export async function pollTinyfishAgentUntilDone(
run_id: runId,
status: "TIMEOUT",
result: null,
error: `Agent run timed out after ${config.agentPollTimeoutMs}ms (last status: ${lastStatus}); cancel requested`,
error: `Agent run timed out after ${pollTimeoutMs}ms (last status: ${lastStatus}); cancel requested`,
};
}

Expand All @@ -173,6 +179,7 @@ export async function pollTinyfishAgentUntilDone(
export async function runTinyfishAgent(
url: string,
goal: string,
options: TinyfishAgentRunOptions = {},
): Promise<TinyfishAgentRunResult> {
const queued = await queueTinyfishAgent(url, goal);
if (queued.error || !queued.run_id) {
Expand All @@ -183,14 +190,15 @@ export async function runTinyfishAgent(
error: queued.error ?? "Failed to queue agent run",
};
}
return pollTinyfishAgentUntilDone(queued.run_id);
return pollTinyfishAgentUntilDone(queued.run_id, options);
}

/**
* Queue all jobs quickly, then poll in parallel — better overlap than sync `/run` waves.
*/
export async function runTinyfishAgentsBatch(
jobs: TinyfishAgentJob[],
options: TinyfishAgentRunOptions = {},
): Promise<TinyfishAgentRunResult[]> {
if (jobs.length === 0) return [];

Expand Down Expand Up @@ -224,7 +232,7 @@ export async function runTinyfishAgentsBatch(
pollTargets,
config.agentPollConcurrency,
async ({ index, run_id }) => {
results[index] = await pollTinyfishAgentUntilDone(run_id);
results[index] = await pollTinyfishAgentUntilDone(run_id, options);
},
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export async function runAcquisitionPhase(options: {
knownEntityKeys?: string[];
enableTriage?: boolean;
enableTinyfishAgent?: boolean;
agentPollTimeoutMs?: number;
memory?: WorkflowMemory;
forceAgent?: boolean;
/** Fetch outbound links from high-value pages (repair). */
Expand Down Expand Up @@ -222,6 +223,7 @@ export async function runAcquisitionPhase(options: {
enableTinyfishAgent:
options.enableTinyfishAgent ??
(options.forceAgent ? true : config.enableTinyfishAgent),
agentPollTimeoutMs: options.agentPollTimeoutMs,
memory: options.memory,
log: options.log,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ export interface PipelineOptions {
refreshInPlace?: boolean;
/** When refreshing, re-fetch URLs already seen in the source run. */
refetchUrls?: boolean;
/** Per-run TinyFish Agent poll timeout. Defaults to vendored config. */
agentPollTimeoutMs?: number;
/** Override pipeline logging (benchmark adapters should log to stderr). */
onLog?: (stage: string, message: string) => void;
/** Set when invoked from the dataset-agent benchmark harness. */
Expand Down Expand Up @@ -262,6 +264,7 @@ async function executeRunPipeline(
pageIndexStart: pageIndex,
enableTriage,
enableTinyfishAgent,
agentPollTimeoutMs: options.agentPollTimeoutMs,
memory: useMemory ? memory : undefined,
log,
});
Expand Down Expand Up @@ -366,6 +369,7 @@ async function executeRunPipeline(
allFailedUrls,
enableTriage,
enableTinyfishAgent,
agentPollTimeoutMs: options.agentPollTimeoutMs,
targetRowCap,
log,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export async function processFetchedPages(options: {
knownEntityKeys?: string[];
enableTriage?: boolean;
enableTinyfishAgent?: boolean;
agentPollTimeoutMs?: number;
memory?: WorkflowMemory;
log: (stage: string, message: string) => void;
}): Promise<ProcessPagesResult> {
Expand Down Expand Up @@ -319,7 +320,9 @@ export async function processFetchedPages(options: {
queueJobIndices.push(index);
}

const agentRunResults = await runTinyfishAgentsBatch(queueJobs);
const agentRunResults = await runTinyfishAgentsBatch(queueJobs, {
pollTimeoutMs: options.agentPollTimeoutMs,
});

const jobsToExtract = queueJobIndices.map((jobIndex, batchIndex) => ({
job: jobsWithGoals[jobIndex]!,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export interface RepairLoopContext {
allFailedUrls: string[];
enableTriage: boolean;
enableTinyfishAgent: boolean;
agentPollTimeoutMs?: number;
targetRowCap: number;
log: (stage: string, message: string) => void;
}
Expand Down Expand Up @@ -162,6 +163,7 @@ export async function runRepairLoops(options: {
knownEntityKeys: entityKeysFromRecords(ctx.spec, recordsBeforeLoop),
enableTriage: ctx.enableTriage,
enableTinyfishAgent: ctx.enableTinyfishAgent,
agentPollTimeoutMs: ctx.agentPollTimeoutMs,
memory: ctx.memory,
forceAgent: preferAgent,
enableLinkFollow: config.enableRepairLinkFollow,
Expand Down
41 changes: 40 additions & 1 deletion backend/src/pipeline/collection-agent-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ interface CollectionPipelineOptions {
enableRepair?: boolean;
enableTriage?: boolean;
enableTinyfishAgent?: boolean;
agentPollTimeoutMs?: number;
benchmark?: {
promptId?: string;
promptQuality?: string;
Expand Down Expand Up @@ -91,9 +92,12 @@ interface CollectionRecordQuality {
needs_review?: boolean;
}

/**
 * Poll timeout in milliseconds (480 000 ms = 8 minutes) that
 * `collectionAgentPollTimeoutMs` falls back to when neither
 * `AGENT_POLL_TIMEOUT_MS` nor `COLLECTION_AGENT_POLL_TIMEOUT_MS` is set.
 */
const DEFAULT_COLLECTION_AGENT_POLL_TIMEOUT_MS = 480_000;

export const runCollectionPopulatePipeline: CollectionPopulatePipelineRunner =
async (input) => {
const outputDir = await mkdtemp(join(tmpdir(), "bigset-collection-"));
const enableTinyfishAgent = boolEnv("COLLECTION_AGENT_ENABLE_AGENT", false);
const pipeline = await loadCollectionPipelineModule();
const result = await pipeline.runPipeline({
prompt: input.prompt,
Expand All @@ -102,7 +106,10 @@ export const runCollectionPopulatePipeline: CollectionPopulatePipelineRunner =
memoryDir: join(outputDir, "memory"),
enableRepair: boolEnv("COLLECTION_AGENT_ENABLE_REPAIR", false),
enableTriage: boolEnv("COLLECTION_AGENT_ENABLE_TRIAGE", true),
enableTinyfishAgent: boolEnv("COLLECTION_AGENT_ENABLE_AGENT", true),
enableTinyfishAgent,
agentPollTimeoutMs: enableTinyfishAgent
? collectionAgentPollTimeoutMs()
: undefined,
benchmark: benchmarkContextFromInput(input),
onLog: (stage, message) => {
console.error(`[collection:${stage}] ${message}`);
Expand Down Expand Up @@ -307,3 +314,35 @@ function boolEnv(name: string, fallback: boolean): boolean {
}
return ["1", "true", "yes", "on"].includes(raw.toLowerCase());
}

/**
 * Reads a positive integer from the environment variable `name`.
 *
 * @param name - Environment variable to read.
 * @param fallback - Returned unchanged when the variable is unset or empty.
 * @returns The parsed value, or `fallback` when the variable is unset/empty.
 * @throws Error when the variable is set but is not a positive decimal
 *   integer within the safe-integer range.
 */
function intEnv(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw === "") {
    return fallback;
  }
  // `Number.parseInt` stops at the first non-digit character, so on its own it
  // would accept "30s" as 30 and "1e3" or "1.5" as 1. Require the whole
  // (whitespace-trimmed) value to be digits so a typo fails loudly instead of
  // silently producing a much shorter number than intended.
  const trimmed = raw.trim();
  const value = /^\+?\d+$/.test(trimmed)
    ? Number.parseInt(trimmed, 10)
    : Number.NaN;
  if (!Number.isSafeInteger(value) || value <= 0) {
    throw new Error(`Invalid ${name}: expected positive integer, got "${raw}"`);
  }
  return value;
}

/**
 * Reads an optional positive integer from the environment variable `name`.
 *
 * @param name - Environment variable to read.
 * @returns The parsed value, or `undefined` when the variable is unset or empty.
 * @throws Error when the variable is set but is not a positive decimal
 *   integer within the safe-integer range.
 */
function optionalIntEnv(name: string): number | undefined {
  const raw = process.env[name];
  if (raw === undefined || raw === "") {
    return undefined;
  }
  // `Number.parseInt` stops at the first non-digit character, so on its own it
  // would accept "30s" as 30 and "1e3" or "1.5" as 1. Require the whole
  // (whitespace-trimmed) value to be digits so a typo fails loudly instead of
  // silently producing a much shorter number than intended.
  const trimmed = raw.trim();
  const value = /^\+?\d+$/.test(trimmed)
    ? Number.parseInt(trimmed, 10)
    : Number.NaN;
  if (!Number.isSafeInteger(value) || value <= 0) {
    throw new Error(`Invalid ${name}: expected positive integer, got "${raw}"`);
  }
  return value;
}

/**
 * Resolves the agent poll timeout (ms) for collection runs.
 *
 * `AGENT_POLL_TIMEOUT_MS` wins when it is set; otherwise
 * `COLLECTION_AGENT_POLL_TIMEOUT_MS` is read, falling back to
 * `DEFAULT_COLLECTION_AGENT_POLL_TIMEOUT_MS`. The second variable is only
 * read (and validated) when the first one is absent.
 */
function collectionAgentPollTimeoutMs(): number {
  const sharedOverride = optionalIntEnv("AGENT_POLL_TIMEOUT_MS");
  if (sharedOverride !== undefined) {
    return sharedOverride;
  }
  return intEnv(
    "COLLECTION_AGENT_POLL_TIMEOUT_MS",
    DEFAULT_COLLECTION_AGENT_POLL_TIMEOUT_MS,
  );
}
Loading