diff --git a/backend/src/index.ts b/backend/src/index.ts index a606b45..e4e6155 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -1,10 +1,11 @@ -import Fastify from "fastify"; +import Fastify, { type FastifyBaseLogger } from "fastify"; import fastifyCors from "@fastify/cors"; +import type { ClerkClient } from "@clerk/backend"; import { env } from "./env.js"; import clerkAuthPlugin, { requireAuth, getUserEmail } from "./clerk-auth.js"; import { inferSchema } from "./pipeline/schema-inference.js"; -import { datasetContextSchema } from "./pipeline/populate.js"; +import { datasetContextSchema, type DatasetContext } from "./pipeline/populate.js"; import { populateWorkflow } from "./mastra/workflows/populate.js"; import { updateWorkflow } from "./mastra/workflows/update.js"; import { convex, internal } from "./convex.js"; @@ -19,6 +20,211 @@ function emailDomain(email: string): string { return at >= 0 ? email.slice(at + 1).toLowerCase() : "unknown"; } +type DatasetPopulateStatus = "building" | "live" | "failed"; +type DatasetPopulateBeginOutcome = + | "started" + | "not_found" + | "forbidden" + | "already_building"; +type PopulateWorkflowRun = Awaited>; + +function statusErrorMessage(err: unknown): string { + const message = err instanceof Error ? err.message : String(err); + return message.slice(0, 500); +} + +async function setDatasetPopulateStatus( + datasetId: string, + status: DatasetPopulateStatus, + lastStatusError?: string, +): Promise { + await convex.mutation(internal.datasets.setStatusInternal, { + id: datasetId, + status, + lastStatusError, + }); +} + +async function beginDatasetPopulate( + datasetId: string, + ownerId: string, +): Promise { + const claim = await convex.mutation(internal.datasets.beginPopulateInternal, { + id: datasetId, + ownerId, + }); + + return claim.outcome; +} + +async function sendDatasetReadyNotification({ + logger, + clerk, + userId, + datasetId, + datasetName, + rowCount, +}: { + logger: FastifyBaseLogger; + clerk: ClerkClient; + userId: string; + datasetId: string; + datasetName: string; + rowCount: number; +}): Promise { + const baseProps = { + datasetId, + datasetName, + rowCount, + workflowType: "populate" as const, + }; + + try { + const email = await getUserEmail(clerk, userId); + if (!email) { + logger.warn( + { userId }, + "No primary email on Clerk record; skipping dataset-ready notification", + ); + capture({ + distinctId: userId, + event: EVENTS.DATASET_READY_EMAIL_FAILED, + properties: { ...baseProps, error_kind: "no_recipient" }, + }); + return; + } + + try { + await sendTransactionalEmail( + email, + datasetReadyTemplate({ + datasetName, + rowCount, + datasetUrl: `${env.CLIENT_ORIGIN}/dataset/${datasetId}`, + }), + ); + capture({ + distinctId: userId, + event: EVENTS.DATASET_READY_EMAIL_SENT, + properties: { + ...baseProps, + recipientDomain: emailDomain(email), + }, + }); + } catch (sendErr) { + logger.error( + { err: sendErr, datasetId }, + "Failed to send dataset-ready email; populate already succeeded", + ); + capture({ + distinctId: userId, + event: EVENTS.DATASET_READY_EMAIL_FAILED, + properties: { ...baseProps, error_kind: "send_failed" }, + }); + } + } catch (notifyErr) { + logger.error( + { err: notifyErr, datasetId }, + "Notify block crashed unexpectedly; populate already succeeded", + ); + } +} + +async function runPopulateWorkflowInBackground({ + input, + run, + authorizedUserId, + logger, + clerk, +}: { + input: DatasetContext; + run: PopulateWorkflowRun; + authorizedUserId: string; + logger: FastifyBaseLogger; + clerk: ClerkClient; +}): Promise { + const datasetId = input.datasetId; + + try { + const result = await run.start({ + inputData: { + ...input, + authContext: { + authorizedUserId, + workflowRunId: run.runId, + }, + }, + }); + + logger.info( + { + workflowStatus: result.status, + steps: JSON.stringify(result.steps).slice(0, 2000), + }, + "Populate workflow completed", + ); + + if (result.status !== "success") { + throw new Error(`Workflow ended with status: ${result.status}`); + } + + const currentDataset = await convex.query(internal.datasets.getInternal, { + id: datasetId, + }); + if (!currentDataset) { + logger.info( + { datasetId }, + "Dataset no longer exists post-workflow; skipping status transition and notification", + ); + return; + } + + const rowCount = await convex.query( + internal.datasetRows.countByDataset, + { datasetId }, + ); + if (rowCount === 0) { + throw new Error("Populate workflow completed with 0 rows"); + } + + await setDatasetPopulateStatus(datasetId, "live"); + await sendDatasetReadyNotification({ + logger, + clerk, + userId: authorizedUserId, + datasetId, + datasetName: currentDataset.name, + rowCount, + }); + } catch (err) { + const lastStatusError = statusErrorMessage(err); + logger.error( + { err, datasetId }, + "Populate background workflow failed", + ); + + try { + const currentDataset = await convex.query(internal.datasets.getInternal, { + id: datasetId, + }); + if (!currentDataset) { + logger.info( + { datasetId }, + "Dataset no longer exists after failed populate; skipping failed status transition", + ); + return; + } + + await setDatasetPopulateStatus(datasetId, "failed", lastStatusError); + } catch (statusErr) { + logger.error( + { err: statusErr, datasetId }, + "Failed to transition dataset status to 'failed'", + ); + } + } +} + const fastify = Fastify({ logger: true }); await fastify.register(fastifyCors, { @@ -83,165 +289,42 @@ await fastify.register(async (instance) => { return reply.code(401).send({ error: "Authentication required" }); } - // Ownership check uses the INTERNAL (admin-callable, no-authz) getter. - // We can't use `api.datasets.get` here because that runs through - // `loadReadableDataset`, which requires either a Clerk-identified - // caller OR visibility="public". The backend's ConvexHttpClient is - // admin-authed but does NOT impersonate a user, so private datasets - // (the typical case) get rejected as `anonymous_private`. - // - // The /populate route enforces ownership against `req.auth.userId` - // (from the verified Clerk JWT) immediately below — that's the - // authoritative check, not Convex's user-identity authz. - const dataset = await convex.query(internal.datasets.getInternal, { - id: parsed.data.datasetId, - }); - if (!dataset) { + const populateOutcome = await beginDatasetPopulate( + parsed.data.datasetId, + auth.userId, + ); + + if (populateOutcome === "not_found") { return reply.code(404).send({ error: "Dataset not found" }); } - if (dataset.ownerId !== auth.userId) { + if (populateOutcome === "forbidden") { return reply.code(403).send({ error: "Not authorized to populate this dataset" }); } - - const run = await populateWorkflow.createRun(); - // Server-set auth/run context — threaded through every step so the - // dataset-tools layer can attribute capability-violation logs and - // PostHog events to a specific user + workflow run. NOT validated - // against the client request body (see populateInputSchema in - // mastra/workflows/populate.ts). - const result = await run.start({ - inputData: { - ...parsed.data, - authContext: { - authorizedUserId: auth.userId, - workflowRunId: run.runId, - }, - }, - }); - - req.log.info({ workflowStatus: result.status, steps: JSON.stringify(result.steps).slice(0, 2000) }, "Populate workflow completed"); - - if (result.status !== "success") { - throw new Error(`Workflow ended with status: ${result.status}`); + if (populateOutcome === "already_building") { + return reply.code(409).send({ error: "Dataset is already being populated" }); + } + if (populateOutcome !== "started") { + throw new Error(`Unexpected populate claim outcome: ${populateOutcome}`); } - // Fire the "dataset ready" email. Best-effort: any failure here - // is logged + tracked but does NOT fail the API response. The - // dataset is ready regardless of whether we managed to notify. - // - // Order of guards (all must pass to send): - // 1. Dataset still exists (delete-race protection) - // 2. Dataset has at least one row (no "ready" email for empty datasets) - // 3. User has a primary email on their Clerk record - // 4. Resend accepts the send - // - // The dataset doc is re-read from Convex so we use the CURRENT name - // in the email subject + body (rename-race protection) — the value - // in `parsed.data.datasetName` came from the request body and could - // be stale by the time the workflow finishes. - const notifyUserId = req.auth!.userId; - const notifyDatasetId = parsed.data.datasetId; + let run: Awaited>; try { - const currentDataset = await convex.query( - internal.datasets.getInternal, - { id: notifyDatasetId }, - ); - if (!currentDataset) { - req.log.info( - { datasetId: notifyDatasetId }, - "Dataset no longer exists post-workflow; skipping notification", - ); - } else { - const rowCount = await convex.query( - internal.datasetRows.countByDataset, - { datasetId: notifyDatasetId }, - ); - if (rowCount === 0) { - req.log.info( - { datasetId: notifyDatasetId }, - "Populate workflow succeeded but produced 0 rows; skipping notification", - ); - } else { - // ── Lifecycle transition ───────────────────────────────── - // Dataset has rows + is usable → flip status from "building" - // to "live". Patch is idempotent; safe to call when status - // is already "live" (e.g. a manual repopulate of an existing - // live dataset). Done BEFORE the email so a Resend hiccup - // can't leave a usable dataset stuck in "building". - try { - await convex.mutation(internal.datasets.setStatusInternal, { - id: notifyDatasetId, - status: "live", - }); - } catch (statusErr) { - // Status update failure is logged but doesn't block the - // rest of the notify flow — the dataset is still usable, - // the badge just stays "building" until the next populate. - req.log.error( - { err: statusErr, datasetId: notifyDatasetId }, - "Failed to transition dataset status to 'live'; populate already succeeded", - ); - } - - const email = await getUserEmail(req.server.clerk, notifyUserId); - const baseProps = { - datasetId: notifyDatasetId, - datasetName: currentDataset.name, - rowCount, - workflowType: "populate" as const, - }; - if (!email) { - req.log.warn( - { userId: notifyUserId }, - "No primary email on Clerk record; skipping dataset-ready notification", - ); - capture({ - distinctId: notifyUserId, - event: EVENTS.DATASET_READY_EMAIL_FAILED, - properties: { ...baseProps, error_kind: "no_recipient" }, - }); - } else { - try { - await sendTransactionalEmail( - email, - datasetReadyTemplate({ - datasetName: currentDataset.name, - rowCount, - datasetUrl: `${env.CLIENT_ORIGIN}/dataset/${notifyDatasetId}`, - }), - ); - capture({ - distinctId: notifyUserId, - event: EVENTS.DATASET_READY_EMAIL_SENT, - properties: { - ...baseProps, - recipientDomain: emailDomain(email), - }, - }); - } catch (sendErr) { - req.log.error( - { err: sendErr, datasetId: notifyDatasetId }, - "Failed to send dataset-ready email; populate already succeeded", - ); - capture({ - distinctId: notifyUserId, - event: EVENTS.DATASET_READY_EMAIL_FAILED, - properties: { ...baseProps, error_kind: "send_failed" }, - }); - } - } - } - } - } catch (notifyErr) { - // Catch-all for unexpected errors in the notify flow itself - // (e.g. Convex query failure). Already logged; never re-thrown. - req.log.error( - { err: notifyErr, datasetId: notifyDatasetId }, - "Notify block crashed unexpectedly; populate already succeeded", - ); + run = await populateWorkflow.createRun(); + } catch (runErr) { + req.log.error(runErr, "Failed to create workflow run; releasing dataset claim"); + await setDatasetPopulateStatus(parsed.data.datasetId, "failed", statusErrorMessage(runErr)); + return reply.code(502).send({ error: "Failed to populate dataset. Please try again." }); } - return { success: true, result: result.result }; + void runPopulateWorkflowInBackground({ + input: parsed.data, + run, + authorizedUserId: auth.userId, + logger: req.log, + clerk: req.server.clerk, + }); + + return reply.code(202).send({ success: true, runId: run.runId }); } catch (err) { const msg = err instanceof Error ? err.message : String(err); if (msg.includes("validator") || msg.includes("Invalid")) { diff --git a/backend/src/mastra/workflows/populate.ts b/backend/src/mastra/workflows/populate.ts index ae518af..ee64d5c 100644 --- a/backend/src/mastra/workflows/populate.ts +++ b/backend/src/mastra/workflows/populate.ts @@ -125,7 +125,7 @@ const agentStep = createStep({ } catch (err) { const msg = err instanceof Error ? err.message : String(err); console.error(`[populate-agent] agent.generate failed: ${msg}`); - return { text: `Agent failed: ${msg}` }; + throw err; } }, }); diff --git a/frontend/app/dataset/[id]/page.tsx b/frontend/app/dataset/[id]/page.tsx index b5d9048..56a473a 100644 --- a/frontend/app/dataset/[id]/page.tsx +++ b/frontend/app/dataset/[id]/page.tsx @@ -93,7 +93,7 @@ export default function DatasetPage() { } async function handleUpdate() { - if (!dataset || updating) return; + if (!dataset || updating || dataset.status === "building") return; setUpdating(true); try { const token = await getToken(); @@ -118,22 +118,23 @@ export default function DatasetPage() { } async function handlePopulate() { - if (!dataset || populating) return; + if (!dataset || populating || dataset.status === "building") return; setPopulating(true); try { const token = await getToken(); if (!token) throw new Error("Not authenticated"); - await populate( + const startedRun = await populate( dataset._id, dataset.name, dataset.description, dataset.columns, token, ); - track(EVENTS.DATASET_POPULATED, { + track(EVENTS.DATASET_POPULATE_STARTED, { datasetId: dataset._id, column_count: dataset.columns.length, + runId: startedRun.runId, }); } catch (err) { console.error("[populate] failed", err); @@ -159,6 +160,21 @@ export default function DatasetPage() { // the "Dataset not found" UI. const exportDisabled = exporting !== null || rows.length === 0; + const isDatasetBuilding = dataset.status === "building"; + const updateDisabled = updating || isDatasetBuilding; + const populateDisabled = populating || isDatasetBuilding; + const updateLabel = isDatasetBuilding + ? "Building…" + : updating + ? "Updating…" + : "Update Dataset"; + const populateLabel = isDatasetBuilding + ? "Building…" + : populating + ? "Starting…" + : dataset.status === "failed" + ? "Retry Populate" + : "Clear & Populate"; const csvLabel = exporting === "csv" ? "Exporting…" @@ -216,17 +232,17 @@ export default function DatasetPage() {
@@ -234,9 +250,16 @@ export default function DatasetPage() {
-

- {dataset.description} -

+
+

+ {dataset.description} +

+ {dataset.status === "failed" && dataset.lastStatusError && ( +

+ Last populate failed: {dataset.lastStatusError} +

+ )} +
{selectedCount > 0 && ( <> diff --git a/frontend/app/dataset/new/page.tsx b/frontend/app/dataset/new/page.tsx index 1333798..2112be2 100644 --- a/frontend/app/dataset/new/page.tsx +++ b/frontend/app/dataset/new/page.tsx @@ -218,7 +218,7 @@ export default function NewDatasetPage() { Create a new dataset

- Describe what data you want to collect. Our agents will figure out the schema and start populating it. + Describe what data you want to collect. Our agents will figure out the schema; you can start populating it from the dataset page.

diff --git a/frontend/components/dataset/StatusBadge.tsx b/frontend/components/dataset/StatusBadge.tsx index e2ca159..5fd7cf2 100644 --- a/frontend/components/dataset/StatusBadge.tsx +++ b/frontend/components/dataset/StatusBadge.tsx @@ -1,15 +1,17 @@ -export type DatasetStatus = "live" | "paused" | "building"; +export type DatasetStatus = "live" | "paused" | "building" | "failed"; const STYLES: Record = { live: "border-emerald-600/20 bg-emerald-600/5 text-emerald-700 dark:text-emerald-400", paused: "border-border bg-background text-muted", building: "border-amber-600/20 bg-amber-600/5 text-amber-700 dark:text-amber-400", + failed: "border-red-600/20 bg-red-600/5 text-red-700 dark:text-red-400", }; const LABELS: Record = { live: "Live", paused: "Paused", building: "Building...", + failed: "Failed", }; export function StatusBadge({ status }: { status: DatasetStatus }) { @@ -23,6 +25,9 @@ export function StatusBadge({ status }: { status: DatasetStatus }) { {status === "building" && ( )} + {status === "failed" && ( + + )} {LABELS[status]} ); diff --git a/frontend/components/table/types.ts b/frontend/components/table/types.ts index 903ea6f..54b2b01 100644 --- a/frontend/components/table/types.ts +++ b/frontend/components/table/types.ts @@ -10,7 +10,8 @@ export interface DatasetMeta { _id: string; name: string; description: string; - status: "live" | "paused" | "building"; + status: "live" | "paused" | "building" | "failed"; + lastStatusError?: string; cadence: string; columns: DatasetColumn[]; } diff --git a/frontend/convex/datasets.ts b/frontend/convex/datasets.ts index 281420b..017f90f 100644 --- a/frontend/convex/datasets.ts +++ b/frontend/convex/datasets.ts @@ -114,6 +114,38 @@ export const getInternal = internalQuery({ }, }); +/** + * Atomically claims a user-requested populate run for a dataset. + * + * This is the concurrency gate for backend /populate calls. The workflow + * starts by clearing existing rows, so duplicate background runs for the same + * dataset must be rejected before either one reaches the row-clearing step. + */ +export const beginPopulateInternal = internalMutation({ + args: { + id: v.id("datasets"), + ownerId: v.string(), + }, + handler: async (ctx, args) => { + const dataset = await ctx.db.get(args.id); + if (!dataset) { + return { outcome: "not_found" as const }; + } + if (dataset.ownerId !== args.ownerId) { + return { outcome: "forbidden" as const }; + } + if (dataset.status === "building") { + return { outcome: "already_building" as const }; + } + + await ctx.db.patch(dataset._id, { + status: "building", + lastStatusError: undefined, + }); + return { outcome: "started" as const }; + }, +}); + /** * Admin-only status transition. Used by the backend orchestration layer * to move a dataset between lifecycle states after a workflow completes. @@ -123,16 +155,10 @@ export const getInternal = internalQuery({ * run). This mutation is purely a controlled patch on the `status` field. * * Lifecycle today: - * - "building" : set by `datasets.create`, before any rows exist - * - "live" : set by /populate handler after successful population - * - "paused" : reserved for the future user-facing Pause/Resume UI - * - * Future statuses (extend the schema's `status` union when they land — - * the validator below auto-picks up new values since it points at the - * same union): - * - "refreshing" : scheduled refresh in progress (Inngest / cron) - * - "failed" : last populate / refresh failed - * - "quota_exceeded" : last attempt blocked by quota + * - "paused" : default for newly created datasets before first run + * - "building" : set by beginPopulateInternal after ownership passes + * - "live" : set by background populate after rows exist + * - "failed" : set by background populate on workflow failure * * NOTE: the public `datasets.updateStatus` mutation still exists for * user-initiated transitions (Pause/Resume) — that one goes through @@ -145,10 +171,15 @@ export const setStatusInternal = internalMutation({ v.literal("live"), v.literal("paused"), v.literal("building"), + v.literal("failed"), ), + lastStatusError: v.optional(v.string()), }, handler: async (ctx, args) => { - await ctx.db.patch(args.id, { status: args.status }); + await ctx.db.patch(args.id, { + status: args.status, + lastStatusError: args.status === "failed" ? args.lastStatusError : undefined, + }); }, }); @@ -170,7 +201,7 @@ export const create = mutation({ return await ctx.db.insert("datasets", { ...args, ownerId: identity.subject, - status: "building", + status: "paused", visibility: "private", rowCount: 0, }); diff --git a/frontend/convex/schema.ts b/frontend/convex/schema.ts index c1346a3..3f29482 100644 --- a/frontend/convex/schema.ts +++ b/frontend/convex/schema.ts @@ -9,8 +9,10 @@ export default defineSchema({ status: v.union( v.literal("live"), v.literal("paused"), - v.literal("building") + v.literal("building"), + v.literal("failed") ), + lastStatusError: v.optional(v.string()), cadence: v.string(), // Optional for backward compat with rows seeded before this field existed. // Treat undefined as "private" in authorization helpers. diff --git a/frontend/lib/analytics.ts b/frontend/lib/analytics.ts index 6b620f7..440587b 100644 --- a/frontend/lib/analytics.ts +++ b/frontend/lib/analytics.ts @@ -32,7 +32,7 @@ export const EVENTS = { // Dataset interaction DATASET_OPENED: "dataset_opened", DATASET_EXPORTED: "dataset_exported", - DATASET_POPULATED: "dataset_populated", + DATASET_POPULATE_STARTED: "dataset_populate_started", // Creation flow DATASET_CREATION_STARTED: "dataset_creation_started", diff --git a/frontend/lib/backend.ts b/frontend/lib/backend.ts index e5018f1..a55c7ba 100644 --- a/frontend/lib/backend.ts +++ b/frontend/lib/backend.ts @@ -23,7 +23,12 @@ export interface PopulateColumn { description?: string; } -export interface PopulateResult { +export interface PopulateStartResult { + success: boolean; + runId: string; +} + +export interface WorkflowResult { success: boolean; result: unknown; } @@ -59,7 +64,7 @@ export async function populate( description: string, columns: PopulateColumn[], token: string, -): Promise { +): Promise { const res = await fetch(`${BACKEND_URL}/populate`, { method: "POST", headers: { @@ -84,7 +89,7 @@ export async function update( description: string, columns: PopulateColumn[], token: string, -): Promise { +): Promise { const res = await fetch(`${BACKEND_URL}/update`, { method: "POST", headers: {