Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions backend/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,35 @@ The pipeline is a pure function (`inferSchema(prompt) → DatasetSchema`). It is

`src/mastra/` — wraps pipelines into Mastra workflows. Runs as a separate Docker service on :4111 with `mastra dev`, which provides a Studio UI for inspecting and testing workflows.

- `src/mastra/index.ts` — registers workflows with the `Mastra` instance (the populate agent is built per-run, not registered as a singleton)
- `src/mastra/index.ts` — registers workflows with the `Mastra` instance (agents are built per-run, not registered as singletons)
- `src/mastra/workflows/infer-schema.ts` — `inferSchemaWorkflow`, a single-step workflow wrapping `inferSchema()`
- `src/mastra/workflows/populate.ts` — `populateWorkflow`, 3-step workflow: clear rows → build prompt → run populate agent
- `src/mastra/agents/populate.ts` — `buildPopulateAgent(authorizedDatasetId, authContext, columns)`, builds the orchestrator agent (Claude Sonnet 4.6) with 3 tools: `search_web`, `fetch_page`, `investigate_row`. No write access — all inserts go through investigate subagents.
- `src/mastra/agents/investigate.ts` — `buildInvestigateAgent(authorizedDatasetId, authContext, columns)`, builds a per-entity subagent with `insert_row`, `list_rows`, `search_web`, `fetch_page`. Researches one entity, inserts at most one row, returns structured feedback (`INSERTED/SUMMARY/CLUES/REASON`).
- `src/mastra/tools/investigate-tool.ts` — `buildInvestigateTool(authorizedDatasetId, authContext, columns)` creates the `investigate_row` tool. The orchestrator calls it to hand off a lead; it spawns a fresh investigate agent, runs it (maxSteps: 25), parses the structured output, and returns it to the orchestrator. Errors are caught and returned as structured failures so the orchestrator can self-correct.
- `src/mastra/tools/dataset-tools.ts` — `buildPopulateTools(authorizedDatasetId, authContext)` factory returning 5 Convex-backed tools: `insert_row`, `list_rows`, `get_row`, `update_row`, `delete_row`. The dataset id is captured by closure so the LLM cannot redirect writes to other datasets; `authContext` (Clerk userId + workflow run id) is captured for caller-attribution in security logs and the `CAPABILITY_VIOLATION` PostHog event. See the security note at the top of the file.

### Tri-agent architecture

The populate pipeline uses three layers of agents, each with a narrow scope:

1. **Populate Orchestrator** (`src/mastra/agents/populate.ts`) — `buildPopulateAgent(authorizedDatasetId, authContext, columns, targetRows)`. Searches the web only; has no write tools. Dispatches URLs to triage-extract agents via `extract_rows`, tracks progress via `list_rows`. Runs 5 parallel searches for the first batch, up to 20 for subsequent batches. Stops when `targetRows` complete rows are reached or 2 consecutive stagnant batches occur.

2. **Triage-Extract Agent** (`src/mastra/agents/triage-extract.ts`) — `buildTriageExtractAgent(columns, primaryKeyColumn, insertRowTool, updateRowByKeyTool, investigateEntityTool)`. Receives ONE URL, fetches it, classifies the page (extract_now / needs_browser_agent / needs_form_fill / low_value / blocked), extracts ALL matching entities, then dispatches `investigate_entity` for rows with missing columns. The triage step enables future routing to TinyFish browser agents or other specialized fetchers based on triage status. No `search_web` — fetch only.

3. **Investigate Agent** (`src/mastra/agents/investigate.ts`) — `buildInvestigateAgent(columns, primaryKeyColumn, updateRowByKeyTool)`. Researches ONE specific entity to fill its missing columns. Has `search_web` + `fetch_page` + `update_row_by_key`. Returns structured output (`INSERTED: false / SUMMARY / CLUES / REASON`).

### Tool factories

- `src/mastra/tools/investigate-tool.ts` — `buildExtractTool(authorizedDatasetId, authContext, columns, targetRows)` returns `{ extractRowsTool, listRowsTool }`. Both tools share a single in-memory `rowIndex` (Map of primary-key → `{rowId, confidence, cells}`) that serves as the canonical state for the run — no Convex round-trip needed for deduplication checks. `extract_rows` dispatches one URL to a fresh triage-extract agent (maxSteps: 40); `list_rows` returns a compact text summary of all rows for the orchestrator. Also builds `investigate_entity` internally, which spawns a fresh investigate agent (maxSteps: 20) and shares the same `rowIndex`.
- `src/mastra/tools/dataset-tools.ts` — `buildPopulateTools(authorizedDatasetId, authContext)` factory returning 5 Convex-backed tools: `insert_row`, `list_rows`, `get_row`, `update_row`, `delete_row`. Not used by the populate agent itself — used by other callers. The dataset id is captured by closure so the LLM cannot redirect writes to other datasets; `authContext` (Clerk userId + workflow run id) is captured for caller-attribution in security logs and the `CAPABILITY_VIOLATION` PostHog event. See the security note at the top of the file.
- `src/mastra/tools/web-tools.ts` — 2 TinyFish API tools: `search_web`, `fetch_page`

The populate workflow builds a fresh orchestrator per run via `buildPopulateAgent(...)` and calls `.generate(prompt, { maxSteps: 80 })`. The orchestrator spawns up to 3 investigate subagents in parallel via `investigate_row`. Per-run construction is required by the capability-scoping security model (closure-bound dataset id); do not cache or share agents across runs.
### Confidence and merge semantics

`update_row_by_key` uses per-field blank-aware merge rules, enforced atomically in the `datasetRows.mergeUpdate` Convex mutation:
- **Blank cells**: always filled with any non-empty incoming value, regardless of confidence.
- **Non-blank cells**: only overwritten when the new confidence is strictly higher than the row's existing confidence.

The authoritative check lives in Convex (not in the tool layer) because the in-memory `rowIndex` is stale during parallel agent runs. Two concurrent investigate agents reading the same cached confidence could both pass a client-side check, and the slower/weaker write could win. Moving the compare-and-merge into a single Convex transaction eliminates that race.

The populate workflow builds a fresh orchestrator per run via `buildPopulateAgent(...)` and calls `.generate(prompt, { maxSteps: 80 })`. Per-run construction is required by the capability-scoping security model (closure-bound dataset id); do not cache or share agents across runs.

All tools return structured error messages (not thrown exceptions) so the agent can self-correct.

Expand Down
39 changes: 13 additions & 26 deletions backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@
"@types/node": "^22.0.0",
"mastra": "^1.10.0",
"tsx": "^4.0.0",
"typescript": "^5.0.0"
"typescript": "^5.8.3"
}
}
9 changes: 9 additions & 0 deletions backend/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ export const env = {

OPENROUTER_API_KEY: process.env.OPENROUTER_API_KEY,

// Hard cap on the number of fully-complete rows the populate agent will
// insert per run. The agent stops as soon as this count is reached.
// Override with BIGSET_POPULATE_TARGET_ROWS=N in the root .env file.
// Invalid values (NaN, ≤0, non-integer) fall back to the default of 20.
POPULATE_TARGET_ROWS: (() => {
const parsed = Number(process.env.BIGSET_POPULATE_TARGET_ROWS);
return Number.isFinite(parsed) && parsed > 0 ? Math.floor(parsed) : 20;
})(),

// Resend (transactional email). Optional — when RESEND_API_KEY is unset
// the email module no-ops with a log line, so local dev works without
// a Resend account. EMAIL_FROM must be a domain that's verified in the
Expand Down
20 changes: 20 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,26 @@ async function runPopulateWorkflowInBackground({
return;
}

// ── Prune incomplete rows ────────────────────────────────────────
// Delete any row the agent inserted but never fully filled, so only
// complete rows appear in the live dataset. Best-effort: log on
// failure but don't block the status transition.
try {
const columnNames = input.columns.map((c) => c.name);
const { deletedCount } = await convex.mutation(
internal.datasetRows.deleteIncomplete,
{ datasetId, columnNames },
);
if (deletedCount > 0) {
logger.info({ deletedCount, datasetId }, "Pruned incomplete rows post-workflow");
}
} catch (pruneErr) {
logger.warn(
{ err: pruneErr, datasetId },
"Failed to prune incomplete rows; proceeding with status transition",
);
}

const rowCount = await convex.query(
internal.datasetRows.countByDataset,
{ datasetId },
Expand Down
Loading