Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ CLERK_SECRET_KEY=sk_test_...
# Generate at https://openrouter.ai/settings/keys
OPENROUTER_API_KEY=sk-or-...

# TinyFish — required by the populate agent for web search/fetch.
# Generate at https://agent.tinyfish.ai/api-keys
TINYFISH_API_KEY=

# Generate once after the first `make dev` with:
# docker compose exec convex ./generate_admin_key.sh
# Used by the backend container to call internal Convex functions.
CONVEX_SELF_HOSTED_ADMIN_KEY=

# Durable store for self-healing populate recipe manifests.
# Docker dev overrides this to /app/.bigset/populate-recipes on a named volume.
POPULATE_RECIPE_STORE_DIR=.bigset/populate-recipes

# PostHog (optional — leave blank to disable analytics entirely in local dev).
# Get from https://us.posthog.com/project/settings/general.
NEXT_PUBLIC_POSTHOG_KEY=
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.DS_Store
node_modules/
backend/node_modules
.env
.env.local
Project_BigSet_brief.md
Expand All @@ -22,6 +23,7 @@ tmp/
temp/

.mastra
.bigset/

# Local tarballs
*.tgz
Expand Down
1 change: 1 addition & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CLIENT_ORIGIN=http://localhost:3500
CONVEX_URL=http://localhost:3210
PORT=3501
POPULATE_RECIPE_STORE_DIR=.bigset/populate-recipes

# Required once the backend starts writing rows via internal Convex mutations.
# Generate with: docker compose exec convex ./generate_admin_key.sh
Expand Down
6 changes: 6 additions & 0 deletions backend/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ export const env = {
CLERK_PUBLISHABLE_KEY: process.env.CLERK_PUBLISHABLE_KEY,

OPENROUTER_API_KEY: process.env.OPENROUTER_API_KEY,
TINYFISH_API_KEY: process.env.TINYFISH_API_KEY,

// Durable recipe manifests for the self-healing populate layer. In Docker
// dev this points at a named volume; locally it defaults under the repo.
POPULATE_RECIPE_STORE_DIR:
process.env.POPULATE_RECIPE_STORE_DIR || ".bigset/populate-recipes",
};
61 changes: 53 additions & 8 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import { env } from "./env.js";
import clerkAuthPlugin, { requireAuth } from "./clerk-auth.js";
import { inferSchema } from "./pipeline/schema-inference.js";
import { datasetContextSchema } from "./pipeline/populate.js";
import { populateWorkflow } from "./mastra/workflows/populate.js";
import { ConvexPopulateDatasetRowWriter } from "./pipeline/populate-convex-writer.js";
import { populateRuntimePrerequisiteError } from "./pipeline/populate-runtime-prerequisites.js";
import { runSelfHealingPopulate } from "./pipeline/populate-self-healing-runner.js";
import { convex, api } from "./convex.js";

const fastify = Fastify({ logger: true });
const populateRowWriter = new ConvexPopulateDatasetRowWriter();

await fastify.register(fastifyCors, {
origin: env.CLIENT_ORIGIN,
Expand Down Expand Up @@ -72,17 +75,42 @@ await fastify.register(async (instance) => {
if (dataset.ownerId !== authenticatedUserId) {
return reply.code(403).send({ error: "Not authorized to populate this dataset" });
}
const prerequisiteError = populateRuntimePrerequisiteError({
convexAdminKey: env.CONVEX_ADMIN_KEY,
openRouterApiKey: env.OPENROUTER_API_KEY,
tinyFishApiKey: env.TINYFISH_API_KEY,
});
if (prerequisiteError) {
return reply.code(500).send({
error: prerequisiteError,
});
}

const run = await populateWorkflow.createRun();
const result = await run.start({ inputData: parsed.data });

req.log.info({ workflowStatus: result.status, steps: JSON.stringify(result.steps).slice(0, 2000) }, "Populate workflow completed");
const result = await runSelfHealingPopulate({
context: parsed.data,
recipeStoreDirectory: env.POPULATE_RECIPE_STORE_DIR,
rowWriter: populateRowWriter,
shouldCommitRows: true,
});

if (result.status !== "success") {
throw new Error(`Workflow ended with status: ${result.status}`);
req.log.info({
action: result.action,
datasetId: result.datasetId,
committedRows: result.committedRows?.insertedRowCount ?? 0,
validationIssues: result.validationIssues.slice(0, 5),
}, "Self-healing populate completed");

if (!result.success) {
return reply.code(422).send({
error: "Self-healing populate failed validation.",
result: responseSafePopulateResult(result),
});
}

return { success: true, result: result.result };
return {
success: true,
result: responseSafePopulateResult(result),
};
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
if (msg.includes("validator") || msg.includes("Invalid")) {
Expand All @@ -100,3 +128,20 @@ try {
fastify.log.error(err);
process.exit(1);
}

/**
 * Builds the subset of a self-healing populate outcome that is returned to
 * the HTTP client.
 *
 * The raw `tick` and the full run objects are not forwarded; only summary
 * fields plus the reported run's production validation, metrics and row
 * count are exposed.
 *
 * @param result - Resolved value of `runSelfHealingPopulate`.
 * @returns A plain object with the client-facing summary fields.
 */
function responseSafePopulateResult(
  result: Awaited<ReturnType<typeof runSelfHealingPopulate>>
) {
  // Report on the accepted run when there is one, otherwise on the diagnostic run.
  const reportedRun = result.selectedRun ?? result.diagnosticRun;
  const {
    action,
    datasetId,
    success,
    committedRows,
    rejectionReasons,
    validationIssues,
  } = result;
  const rowCount = reportedRun?.rows.length ?? 0;

  return {
    action,
    datasetId,
    success,
    committedRows,
    rejectionReasons,
    validationIssues,
    productionValidation: reportedRun?.productionValidation,
    metrics: reportedRun?.metrics,
    rowCount,
  };
}
71 changes: 71 additions & 0 deletions backend/src/pipeline/populate-convex-writer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { env } from "../env.js";
import { convex, internal } from "../convex.js";
import type {
PopulateDatasetRowWriter,
PopulateDatasetWriteResult,
} from "./populate-self-healing-runner.js";

/**
 * Minimal client surface the Convex row writer depends on: a single
 * `mutation` call. The writer's constructor accepts any object of this shape
 * in place of the shared `convex` client.
 */
interface ConvexMutationClient {
/** Runs the mutation identified by `functionReference` with `args` and resolves with whatever it returns. */
mutation(functionReference: unknown, args: unknown): Promise<unknown>;
}

/**
 * `PopulateDatasetRowWriter` that writes rows through the Convex
 * `datasetRows.replaceByDataset` internal mutation.
 *
 * The Convex client and the internal API object can be supplied through the
 * constructor; when omitted, the module-level `convex` and `internal` are used.
 */
export class ConvexPopulateDatasetRowWriter implements PopulateDatasetRowWriter {
  constructor(
    private readonly input: {
      convexClient?: ConvexMutationClient;
      internalApi?: typeof internal;
    } = {}
  ) {}

  /**
   * Sends the given rows for `input.datasetId` to Convex and returns the
   * normalized write counts.
   *
   * Each row is mapped to the mutation's shape: `cells` becomes `data` and
   * `sourceUrls` becomes `sources`.
   *
   * @throws Error when `env.CONVEX_ADMIN_KEY` is not set.
   */
  async replaceRows(
    input: Parameters<PopulateDatasetRowWriter["replaceRows"]>[0]
  ): Promise<PopulateDatasetWriteResult> {
    const hasAdminKey = Boolean(env.CONVEX_ADMIN_KEY);
    if (!hasAdminKey) {
      throw new Error(
        "CONVEX_SELF_HOSTED_ADMIN_KEY is required to commit self-healed populate rows."
      );
    }

    const client = this.input.convexClient ?? convex;
    const apiRoot = this.input.internalApi ?? internal;
    // Resolve the function reference before building the payload, as the
    // call-site argument order would.
    const replaceByDataset = apiRoot.datasetRows.replaceByDataset;
    const payloadRows = input.rows.map(({ cells, sourceUrls }) => ({
      data: cells,
      sources: sourceUrls,
    }));

    const mutationReply = await client.mutation(replaceByDataset, {
      datasetId: input.datasetId,
      rows: payloadRows,
    });

    // If the reply carries no usable inserted count, report the number of rows sent.
    return normalizeReplacementResult(mutationReply, input.rows.length);
  }
}

/**
 * Coerces an untyped mutation reply into a `PopulateDatasetWriteResult`.
 *
 * - If `value` is a non-null object that has an `insertedRowCount` key, both
 *   counts are read from it: a non-number `clearedRowCount` becomes
 *   `undefined`, and a non-number `insertedRowCount` becomes the fallback.
 * - Otherwise only `insertedRowCount` is returned, set to the fallback.
 *
 * @param value - Raw value returned by the mutation.
 * @param fallbackInsertedRowCount - Count to report when the reply has no numeric `insertedRowCount`.
 */
function normalizeReplacementResult(
  value: unknown,
  fallbackInsertedRowCount: number
): PopulateDatasetWriteResult {
  const reportsInsertedCount =
    typeof value === "object" &&
    value !== null &&
    "insertedRowCount" in value;

  if (!reportsInsertedCount) {
    return { insertedRowCount: fallbackInsertedRowCount };
  }

  const { clearedRowCount, insertedRowCount } = value as {
    clearedRowCount?: unknown;
    insertedRowCount?: unknown;
  };

  let cleared: number | undefined;
  if (typeof clearedRowCount === "number") {
    cleared = clearedRowCount;
  }

  let inserted = fallbackInsertedRowCount;
  if (typeof insertedRowCount === "number") {
    inserted = insertedRowCount;
  }

  return {
    clearedRowCount: cleared,
    insertedRowCount: inserted,
  };
}
29 changes: 29 additions & 0 deletions backend/src/pipeline/populate-runtime-prerequisites.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
 * Credential values the populate runtime is checked against before a run.
 * Every field is optional so callers can pass environment values straight
 * through; a missing or empty value is reported by
 * `missingPopulateRuntimePrerequisites`.
 */
export interface PopulateRuntimePrerequisites {
/** Reported as `CONVEX_SELF_HOSTED_ADMIN_KEY` when missing. */
convexAdminKey?: string;
/** Reported as `OPENROUTER_API_KEY` when missing. */
openRouterApiKey?: string;
/** Reported as `TINYFISH_API_KEY` when missing. */
tinyFishApiKey?: string;
}

/**
 * Lists the environment variable names whose values are absent.
 *
 * A value counts as absent when it is falsy, so both `undefined` and the
 * empty string are reported. Names are returned in a fixed order: Convex
 * admin key, OpenRouter key, TinyFish key.
 *
 * @param input - Current values of the required keys.
 * @returns The names of the missing keys; empty when all are present.
 */
export function missingPopulateRuntimePrerequisites(
  input: PopulateRuntimePrerequisites
): string[] {
  const checks: Array<{ envName: string; value: string | undefined }> = [
    { envName: "CONVEX_SELF_HOSTED_ADMIN_KEY", value: input.convexAdminKey },
    { envName: "OPENROUTER_API_KEY", value: input.openRouterApiKey },
    { envName: "TINYFISH_API_KEY", value: input.tinyFishApiKey },
  ];

  const missingNames: string[] = [];
  for (const { envName, value } of checks) {
    if (!value) {
      missingNames.push(envName);
    }
  }
  return missingNames;
}

/**
 * Produces a human-readable error message naming every missing populate
 * runtime key.
 *
 * @param input - Current values of the required keys.
 * @returns The message, or `undefined` when no key is missing.
 */
export function populateRuntimePrerequisiteError(
  input: PopulateRuntimePrerequisites
): string | undefined {
  const absentKeyNames = missingPopulateRuntimePrerequisites(input);
  return absentKeyNames.length > 0
    ? `Backend is missing required populate runtime keys: ${absentKeyNames.join(", ")}.`
    : undefined;
}
128 changes: 128 additions & 0 deletions backend/src/pipeline/populate-self-healing-runner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { join } from "node:path";

import type { DatasetContext } from "./populate.js";
import {
DefaultPopulateRecipeAuthor,
FileSystemPopulateRecipeStore,
MastraPopulateRecipeRuntime,
SelfHealingPopulateRecipeService,
type PopulateRecipeAuthor,
type PopulateRecipeRunResult,
type PopulateRecipeRuntime,
type PopulateRecipeStore,
type SelfHealingPopulateTickResult,
} from "./populate-self-healing.js";

/**
 * Destination for the rows of an accepted populate run.
 * `runSelfHealingPopulate` calls `replaceRows` once, with the selected run's
 * rows, when `shouldCommitRows` is set and a successful run was selected.
 */
export interface PopulateDatasetRowWriter {
/**
 * Writes `rows` for `datasetId` and resolves with the resulting counts.
 * NOTE(review): the name implies existing rows are replaced — confirm against the implementation in use.
 */
replaceRows(input: {
datasetId: string;
rows: PopulateRecipeRunResult["rows"];
}): Promise<PopulateDatasetWriteResult>;
}

/** Counts reported by a `PopulateDatasetRowWriter` after a write. */
export interface PopulateDatasetWriteResult {
/** Optional; presumably the number of pre-existing rows removed — verify against the writer. */
clearedRowCount?: number;
/** Number of rows reported as inserted. */
insertedRowCount: number;
}

/** Options accepted by `runSelfHealingPopulate`. */
export interface RunSelfHealingPopulateInput {
/** Dataset context passed to the service tick; its `datasetId` identifies the dataset. */
context: DatasetContext;
/** Recipe store override. Defaults to a `FileSystemPopulateRecipeStore`. */
store?: PopulateRecipeStore;
/** Recipe runtime override. Defaults to a new `MastraPopulateRecipeRuntime`. */
runtime?: PopulateRecipeRuntime;
/** Recipe author override. Defaults to a new `DefaultPopulateRecipeAuthor`. */
author?: PopulateRecipeAuthor;
/** Receives the selected run's rows. Required when `shouldCommitRows` is true. */
rowWriter?: PopulateDatasetRowWriter;
/** When true, the selected run's rows are handed to `rowWriter`. */
shouldCommitRows?: boolean;
/**
 * Directory for the default file-system store; only read when `store` is
 * omitted. Defaults to `.bigset/populate-recipes` under the current working
 * directory.
 */
recipeStoreDirectory?: string;
}

/** Outcome of one `runSelfHealingPopulate` call. */
export interface RunSelfHealingPopulateResult {
/** True exactly when a successful run was selected (`selectedRun` is set). */
success: boolean;
/** The action reported by the service tick. */
action: SelfHealingPopulateTickResult["action"];
/** Copied from the input context. */
datasetId: string;
/** The run chosen as successful for the tick's action, if any. */
selectedRun?: PopulateRecipeRunResult;
/** `selectedRun` when present; otherwise the tick's candidate run, then its active run. */
diagnosticRun?: PopulateRecipeRunResult;
/** Write counts; only set when rows were handed to the row writer. */
committedRows?: PopulateDatasetWriteResult;
/** The tick's rejection reasons, passed through unchanged. */
rejectionReasons: string[];
/** De-duplicated union of the diagnostic run's validation issues, its critical production issues, and the rejection reasons. */
validationIssues: string[];
/** The raw tick result returned by the service. */
tick: SelfHealingPopulateTickResult;
}

/**
 * Runs one self-healing populate tick for a dataset and optionally commits
 * the resulting rows.
 *
 * Steps:
 * 1. Reject the call when rows should be committed but no writer was given.
 * 2. Assemble the recipe service, using defaults for any collaborator the
 *    caller did not supply.
 * 3. Execute a single `tick` for the dataset in `input.context`.
 * 4. When a successful run was selected and committing is enabled, pass its
 *    rows to the row writer.
 *
 * @param input - Context, optional collaborator overrides and commit options.
 * @returns Summary of the tick, including the selected/diagnostic runs and,
 *   when rows were written, the write counts.
 * @throws Error when `shouldCommitRows` is true and `rowWriter` is missing.
 */
export async function runSelfHealingPopulate(
  input: RunSelfHealingPopulateInput
): Promise<RunSelfHealingPopulateResult> {
  const { context, rowWriter, shouldCommitRows } = input;

  if (shouldCommitRows && !rowWriter) {
    throw new Error("rowWriter is required when shouldCommitRows is true.");
  }

  // Collaborators are resolved in the order store, runtime, author.
  const recipeStore =
    input.store ??
    new FileSystemPopulateRecipeStore(
      input.recipeStoreDirectory ?? defaultPopulateRecipeStoreDirectory()
    );
  const recipeRuntime = input.runtime ?? new MastraPopulateRecipeRuntime();
  const recipeAuthor = input.author ?? new DefaultPopulateRecipeAuthor();
  const recipeService = new SelfHealingPopulateRecipeService({
    store: recipeStore,
    runtime: recipeRuntime,
    author: recipeAuthor,
  });

  const { datasetId } = context;
  const tick = await recipeService.tick({ datasetId, context });

  const selectedRun = successfulRunForTick(tick);
  const diagnosticRun = diagnosticRunForTick(tick);

  // Rows are written only for a selected (successful) run.
  const committedRows: PopulateDatasetWriteResult | undefined =
    shouldCommitRows && selectedRun && rowWriter
      ? await rowWriter.replaceRows({ datasetId, rows: selectedRun.rows })
      : undefined;

  return {
    success: Boolean(selectedRun),
    action: tick.action,
    datasetId,
    selectedRun,
    diagnosticRun,
    committedRows,
    rejectionReasons: tick.rejectionReasons,
    validationIssues: validationIssuesForSelfHealingTick(tick),
    tick,
  };
}

/**
 * Picks the run that counts as the successful outcome of a tick.
 *
 * - `"active_rerun_succeeded"` yields the tick's `activeRun`.
 * - `"generated_initial_recipe"` and `"repaired_active_recipe"` yield the
 *   tick's `candidateRun`.
 * - Any other action yields `undefined`.
 *
 * @param tick - Result of a self-healing service tick.
 */
export function successfulRunForTick(
  tick: SelfHealingPopulateTickResult
): PopulateRecipeRunResult | undefined {
  switch (tick.action) {
    case "active_rerun_succeeded":
      return tick.activeRun;
    case "generated_initial_recipe":
    case "repaired_active_recipe":
      return tick.candidateRun;
    default:
      return undefined;
  }
}

/**
 * Chooses the run to report diagnostics from.
 *
 * Preference order: the tick's successful run, then its `candidateRun`, then
 * its `activeRun`. Returns `undefined` when none is present.
 *
 * @param tick - Result of a self-healing service tick.
 */
export function diagnosticRunForTick(
  tick: SelfHealingPopulateTickResult
): PopulateRecipeRunResult | undefined {
  const acceptedRun = successfulRunForTick(tick);
  if (acceptedRun != null) {
    return acceptedRun;
  }
  return tick.candidateRun ?? tick.activeRun;
}

/**
 * Collects every issue associated with a tick into one de-duplicated list.
 *
 * Issues are gathered, in order, from the diagnostic run's
 * `validationIssues`, the diagnostic run's
 * `productionValidation.criticalIssues`, and the tick's `rejectionReasons`.
 * The first occurrence of each string determines its position.
 *
 * @param tick - Result of a self-healing service tick.
 */
export function validationIssuesForSelfHealingTick(
  tick: SelfHealingPopulateTickResult
): string[] {
  const run = diagnosticRunForTick(tick);
  const uniqueIssues = new Set<string>();

  for (const issue of run?.validationIssues ?? []) {
    uniqueIssues.add(issue);
  }
  for (const issue of run?.productionValidation.criticalIssues ?? []) {
    uniqueIssues.add(issue);
  }
  for (const reason of tick.rejectionReasons) {
    uniqueIssues.add(reason);
  }

  return Array.from(uniqueIssues);
}

/**
 * Resolves the fallback recipe store location: `.bigset/populate-recipes`
 * under the process's current working directory.
 */
function defaultPopulateRecipeStoreDirectory(): string {
  const workingDirectory = process.cwd();
  const storeDirectory = join(workingDirectory, ".bigset", "populate-recipes");
  return storeDirectory;
}
Loading