Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ OPENROUTER_API_KEY=sk-or-...
# Generate at https://agent.tinyfish.ai/api-keys
TINYFISH_API_KEY=

# Populate agent row cap (optional). The populate agent stops when this many
# fully-complete rows have been inserted. Defaults to 20 when unset.
# Increase for larger datasets; decrease for faster/cheaper test runs.
BIGSET_POPULATE_TARGET_ROWS=20

# Generate once after the first `make dev` with:
# docker compose exec convex ./generate_admin_key.sh
# Used by the backend container to call internal Convex functions.
Expand Down
31 changes: 25 additions & 6 deletions backend/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,35 @@ The pipeline is a pure function (`inferSchema(prompt) → DatasetSchema`). It is

`src/mastra/` — wraps pipelines into Mastra workflows. Runs as a separate Docker service on :4111 with `mastra dev`, which provides a Studio UI for inspecting and testing workflows.

- `src/mastra/index.ts` — registers workflows with the `Mastra` instance (the populate agent is built per-run, not registered as a singleton)
- `src/mastra/index.ts` — registers workflows with the `Mastra` instance (agents are built per-run, not registered as singletons)
- `src/mastra/workflows/infer-schema.ts` — `inferSchemaWorkflow`, a single-step workflow wrapping `inferSchema()`
- `src/mastra/workflows/populate.ts` — `populateWorkflow`, 3-step workflow: clear rows → build prompt → run populate agent
- `src/mastra/agents/populate.ts` — `buildPopulateAgent(authorizedDatasetId, authContext, columns)`, builds the orchestrator agent (Claude Sonnet 4.6) with 3 tools: `search_web`, `fetch_page`, `investigate_row`. No write access — all inserts go through investigate subagents.
- `src/mastra/agents/investigate.ts` — `buildInvestigateAgent(authorizedDatasetId, authContext, columns)`, builds a per-entity subagent with `insert_row`, `list_rows`, `search_web`, `fetch_page`. Researches one entity, inserts at most one row, returns structured feedback (`INSERTED/SUMMARY/CLUES/REASON`).
- `src/mastra/tools/investigate-tool.ts` — `buildInvestigateTool(authorizedDatasetId, authContext, columns)` creates the `investigate_row` tool. The orchestrator calls it to hand off a lead; it spawns a fresh investigate agent, runs it (maxSteps: 25), parses the structured output, and returns it to the orchestrator. Errors are caught and returned as structured failures so the orchestrator can self-correct.
- `src/mastra/tools/dataset-tools.ts` — `buildPopulateTools(authorizedDatasetId, authContext)` factory returning 5 Convex-backed tools: `insert_row`, `list_rows`, `get_row`, `update_row`, `delete_row`. The dataset id is captured by closure so the LLM cannot redirect writes to other datasets; `authContext` (Clerk userId + workflow run id) is captured for caller-attribution in security logs and the `CAPABILITY_VIOLATION` PostHog event. See the security note at the top of the file.

### Tri-agent architecture

The populate pipeline uses three layers of agents, each with a narrow scope:

1. **Populate Orchestrator** (`src/mastra/agents/populate.ts`) — `buildPopulateAgent(authorizedDatasetId, authContext, columns, targetRows)`. Per-iteration: (1) runs parallel searches, (2) batches qualifying URLs and calls `extract_rows` in parallel (up to 5 URLs per call), (3) calls `list_rows` once to see all rows and which are incomplete, (4) calls `investigate_entity` in parallel for every incomplete row. Stops when `targetRows` complete rows are reached or 2 consecutive stagnant iterations occur.

2. **Extract Agent** (`src/mastra/agents/extract.ts`) — `buildExtractAgent(columns, primaryKeyColumn, batchInsertRowsTool)`. Receives a batch of 1–5 URLs. Fetches all pages in parallel, extracts every matching entity across all pages, and calls `batch_insert_rows` once with the full combined entity list. Returns leads for the orchestrator's next search round. No triage step, no investigation — purely fetch → extract → insert.

3. **Investigate Agent** (`src/mastra/agents/investigate.ts`) — `buildInvestigateAgent(columns, primaryKeyColumn, updateRowByKeyTool)`. Researches ONE specific entity to fill its missing columns. Has `search_web` + `fetch_page` + `update_row_by_key`. Returns structured output (`INSERTED: false / SUMMARY / CLUES / REASON`).

### Tool factories

- `src/mastra/tools/investigate-tool.ts` — `buildExtractTool(authorizedDatasetId, authContext, columns, targetRows)` returns `{ extractRowsTool, listRowsTool, investigateEntityTool }`. All three share a single in-memory `rowIndex` (Map of primary-key → `{rowId, confidence, cells}`) and a `pendingInserts` Set. `extract_rows` dispatches a batch of 1–5 URLs to a fresh extract agent (maxSteps: 40); `list_rows` returns a compact text summary for the orchestrator; `investigate_entity` (exposed to the orchestrator, not to extract agents) spawns a fresh investigate agent (maxSteps: 20). `pendingInserts` prevents two parallel extract agents from double-inserting the same entity — the check+add is atomic in JS's single-threaded event loop. A global `Semaphore(10)` caps concurrent investigate agents. The rowIndex refresh loop at the start of each `extract_rows` call picks up rows written by other parallel agents since the last refresh.
- `src/mastra/tools/dataset-tools.ts` — `buildPopulateTools(authorizedDatasetId, authContext)` factory returning 5 Convex-backed tools: `insert_row`, `list_rows`, `get_row`, `update_row`, `delete_row`. Not used by the populate agent itself — used by other callers. The dataset id is captured by closure so the LLM cannot redirect writes to other datasets; `authContext` (Clerk userId + workflow run id) is captured for caller-attribution in security logs and the `CAPABILITY_VIOLATION` PostHog event. See the security note at the top of the file.
- `src/mastra/tools/web-tools.ts` — 2 TinyFish API tools: `search_web`, `fetch_page`

The populate workflow builds a fresh orchestrator per run via `buildPopulateAgent(...)` and calls `.generate(prompt, { maxSteps: 80 })`. The orchestrator spawns up to 3 investigate subagents in parallel via `investigate_row`. Per-run construction is required by the capability-scoping security model (closure-bound dataset id); do not cache or share agents across runs.
### Confidence and merge semantics

`update_row_by_key` uses per-field blank-aware merge rules, enforced atomically in the `datasetRows.mergeUpdate` Convex mutation:
- **Blank cells**: always filled with any non-empty incoming value, regardless of confidence.
- **Non-blank cells**: only overwritten when the new confidence is strictly higher than the row's existing confidence.

The authoritative check lives in Convex (not in the tool layer) because the in-memory `rowIndex` is stale during parallel agent runs. Two concurrent investigate agents reading the same cached confidence could both pass a client-side check, and the slower/weaker write could win. Moving the compare-and-merge into a single Convex transaction eliminates that race.

The populate workflow builds a fresh orchestrator per run via `buildPopulateAgent(...)` and calls `.generate(prompt, { maxSteps: 80 })`. Per-run construction is required by the capability-scoping security model (closure-bound dataset id); do not cache or share agents across runs.

All tools return structured error messages (not thrown exceptions) so the agent can self-correct.

Expand Down
39 changes: 13 additions & 26 deletions backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@
"@types/node": "^22.0.0",
"mastra": "^1.10.0",
"tsx": "^4.0.0",
"typescript": "^5.0.0"
"typescript": "^5.8.3"
}
}
9 changes: 9 additions & 0 deletions backend/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ export const env = {

OPENROUTER_API_KEY: process.env.OPENROUTER_API_KEY,

// Hard cap on the number of fully-complete rows the populate agent will
// insert per run. The agent stops as soon as this count is reached.
// Override with BIGSET_POPULATE_TARGET_ROWS=N in the root .env file.
// Invalid values (NaN, ≤0, non-integer) fall back to the default of 20.
POPULATE_TARGET_ROWS: (() => {
const parsed = Number(process.env.BIGSET_POPULATE_TARGET_ROWS);
return Number.isFinite(parsed) && parsed > 0 ? Math.floor(parsed) : 20;
})(),

// Resend (transactional email). Optional — when RESEND_API_KEY is unset
// the email module no-ops with a log line, so local dev works without
// a Resend account. EMAIL_FROM must be a domain that's verified in the
Expand Down
27 changes: 27 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import { sendTransactionalEmail } from "./email/send.js";
import { datasetReadyTemplate } from "./email/templates/dataset-ready.js";
import { capture, shutdown as shutdownAnalytics } from "./analytics/posthog.js";
import { EVENTS } from "./analytics/events.js";
import { patchMastraSanitizeToolCallInput } from "./mastra/tools/model-middleware.js";

// Patch JSON.parse globally so that double-encoded tool-call inputs from kimi-k2
// (e.g. `"{"key":"val"}"` instead of `{"key":"val"}`) are recovered before
// Mastra's stream parser throws "Error converting tool call input to JSON".
// Must run before any agent or workflow is executed.
await patchMastraSanitizeToolCallInput();

/** Domain part of an email, for analytics (we never log full addresses). */
function emailDomain(email: string): string {
Expand Down Expand Up @@ -179,6 +186,26 @@ async function runPopulateWorkflowInBackground({
return;
}

// ── Prune incomplete rows ────────────────────────────────────────
// Delete any row the agent inserted but never fully filled, so only
// complete rows appear in the live dataset. Best-effort: log on
// failure but don't block the status transition.
try {
const columnNames = input.columns.map((c) => c.name);
const { deletedCount } = await convex.mutation(
internal.datasetRows.deleteIncomplete,
{ datasetId, columnNames },
);
if (deletedCount > 0) {
logger.info({ deletedCount, datasetId }, "Pruned incomplete rows post-workflow");
}
} catch (pruneErr) {
logger.warn(
{ err: pruneErr, datasetId },
"Failed to prune incomplete rows; proceeding with status transition",
);
}

const rowCount = await convex.query(
internal.datasetRows.countByDataset,
{ datasetId },
Expand Down
95 changes: 95 additions & 0 deletions backend/src/mastra/agents/extract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { Agent } from "@mastra/core/agent";
import { createOpenRouter } from "@openrouter/ai-sdk-provider";
import { fetchPageTool } from "../tools/web-tools.js";
import type { PopulateColumn } from "../../pipeline/populate.js";

const openrouter = createOpenRouter({
apiKey: process.env.OPENROUTER_API_KEY!,
});

function buildExtractInstructions(
columns: PopulateColumn[],
primaryKeyColumn: string,
): string {
const columnNames = columns.map((c) => c.name);
const columnsDesc = columns
.map(
(c) =>
`- "${c.name}" (${c.type})${c.description ? `: ${c.description}` : ""}`,
)
.join("\n");

return `You receive a batch of URLs. Fetch all pages in parallel, extract every matching entity, and insert them in one call.

━━ DATASET SCHEMA ━━
Columns:
${columnsDesc}

Primary key column: "${primaryKeyColumn}"
Tool call data/sources keys MUST be exactly: ${JSON.stringify(columnNames)}

━━ STEP 1: FETCH (parallel) ━━
Call fetch_page for ALL URLs simultaneously in a single response.
Wait for ALL fetches to complete before proceeding.

━━ STEP 2: EXTRACT ━━
Read the full content of every successfully fetched page.
Identify ALL entities that match the dataset schema across all pages.
If the same entity appears on multiple pages, prefer the most complete data
(use non-empty values from any page; do not discard data from secondary pages).

━━ STEP 3: BATCH INSERT ━━
Call batch_insert_rows ONCE with ALL entities combined from all pages.
- Include every entity you found — do not omit any.
- For columns you cannot confirm from any page, use "" — never fabricate.
- For every column you DO fill, record the source URL.
- If no matching entities were found on any page, skip this step.

━━ RULES ━━
1. REAL VALUES ONLY. Never fabricate — use "" for unverifiable columns.
2. SOURCE ATTRIBUTION. Record the source URL for every column you fill.
3. READ ALL PAGES FIRST. Identify all entities before calling batch_insert_rows.
4. ONE CALL ONLY. Call batch_insert_rows exactly once with all entities combined.

━━ FINAL OUTPUT ━━
After all work is done, write a summary with exactly these labels:

LEADS: <URLs of other pages you noticed that likely contain more matching entities;
list each URL on its own line with a dash (- https://...);
also suggest search queries that might find more entities of this type>
SOURCE_QUALITY: <brief assessment of the pages: data richness, entity coverage, reliability>`;
}

/**
* Build a fresh extract Agent for one extract_rows call.
*
* The agent receives a batch of URLs, fetches all of them in parallel,
* extracts every matching entity across all pages, and calls batch_insert_rows
* once with the full combined entity list. It does NOT spawn investigation
* agents — that is the orchestrator's responsibility after list_rows.
*
* Tools: fetch_page, batch_insert_rows.
* No search capability — it only fetches the URLs provided.
*
* batch_insert_rows is passed in from the buildExtractTool closure so the
* shared rowIndex and pendingInserts are maintained across all agents in one
* workflow run.
*
* A fresh agent instance is constructed per extract_rows call; do not cache.
*/
export function buildExtractAgent(
columns: PopulateColumn[],
primaryKeyColumn: string,
batchInsertRowsTool: ReturnType<typeof import("@mastra/core/tools").createTool>,
): Agent {
return new Agent({
id: "extract-agent",
name: "Dataset Extract Agent",
instructions: buildExtractInstructions(columns, primaryKeyColumn),
model: openrouter("moonshotai/kimi-k2-0905"),
tools: {
fetch_page: fetchPageTool,
batch_insert_rows: batchInsertRowsTool,
},
});
}
Loading