Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@electric-sql/client": "^1.3.1",
"@electric-sql/client": "1.4.0",
"@linear/sdk": "^68.1.0",
"@sentry/nextjs": "^10.32.1",
"@superset/auth": "workspace:*",
Expand All @@ -23,6 +23,7 @@
"@upstash/qstash": "^2.8.4",
"@vercel/blob": "^2.0.0",
"better-auth": "^1.4.9",
"date-fns": "^4.1.0",
"drizzle-orm": "0.45.1",
"import-in-the-middle": "2.0.1",
"jose": "^6.1.3",
Expand Down
27 changes: 13 additions & 14 deletions apps/api/src/app/api/electric/[...path]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,18 @@ export async function GET(request: Request): Promise<Response> {
originUrl.searchParams.set(`params[${index + 1}]`, String(value));
});

const response = await fetch(originUrl.toString());

const headers = new Headers();
response.headers.forEach((value, key) => {
const lower = key.toLowerCase();
if (lower !== "content-encoding" && lower !== "content-length") {
headers.set(key, value);
}
});
let response = await fetch(originUrl.toString());

if (response.headers.get("content-encoding")) {
const headers = new Headers(response.headers);
headers.delete("content-encoding");
headers.delete("content-length");
response = new Response(response.body, {
status: response.status,
statusText: response.statusText,
headers,
});
}

return new Response(response.body, {
status: response.status,
statusText: response.statusText,
headers,
});
return response;
}
5 changes: 5 additions & 0 deletions apps/api/src/app/api/electric/[...path]/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
members,
organizations,
repositories,
taskStatuses,
tasks,
users,
} from "@superset/db/schema";
Expand All @@ -12,6 +13,7 @@ import { QueryBuilder } from "drizzle-orm/pg-core";

export type AllowedTable =
| "tasks"
| "task_statuses"
| "repositories"
| "auth.members"
| "auth.organizations"
Expand Down Expand Up @@ -42,6 +44,9 @@ export async function buildWhereClause(
case "tasks":
return build(tasks, tasks.organizationId, organizationId);

case "task_statuses":
return build(taskStatuses, taskStatuses.organizationId, organizationId);

case "repositories":
return build(repositories, repositories.organizationId, organizationId);

Expand Down
3 changes: 1 addition & 2 deletions apps/api/src/app/api/integrations/linear/callback/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,9 @@ export async function GET(request: Request) {
},
});

const qstashBaseUrl = env.NEXT_PUBLIC_API_URL;
try {
await qstash.publishJSON({
url: `${qstashBaseUrl}/api/integrations/linear/jobs/initial-sync`,
url: `${env.NEXT_PUBLIC_API_URL}/api/integrations/linear/jobs/initial-sync`,
body: { organizationId, creatorUserId: userId },
retries: 3,
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import { LinearClient } from "@linear/sdk";
import { buildConflictUpdateColumns, db } from "@superset/db";
import { integrationConnections, tasks, users } from "@superset/db/schema";
import {
integrationConnections,
taskStatuses,
tasks,
users,
} from "@superset/db/schema";
import { Receiver } from "@upstash/qstash";
import { and, eq, inArray } from "drizzle-orm";
import chunk from "lodash.chunk";
import { z } from "zod";
import { env } from "@/env";
import { syncWorkflowStates } from "./syncWorkflowStates";
import { fetchAllIssues, mapIssueToTask } from "./utils";

const BATCH_SIZE = 100;
Expand All @@ -28,11 +34,10 @@ export async function POST(request: Request) {
return Response.json({ error: "Missing signature" }, { status: 401 });
}

const qstashBaseUrl = env.NEXT_PUBLIC_API_URL;
const isValid = await receiver.verify({
body,
signature,
url: `${qstashBaseUrl}/api/integrations/linear/jobs/initial-sync`,
url: `${env.NEXT_PUBLIC_API_URL}/api/integrations/linear/jobs/initial-sync`,
});

if (!isValid) {
Expand Down Expand Up @@ -68,6 +73,21 @@ async function performInitialSync(
organizationId: string,
creatorUserId: string,
) {
await syncWorkflowStates({ client, organizationId });

const statusByExternalId = new Map<string, string>();
const statuses = await db.query.taskStatuses.findMany({
where: and(
eq(taskStatuses.organizationId, organizationId),
eq(taskStatuses.externalProvider, "linear"),
),
});
for (const status of statuses) {
if (status.externalId) {
statusByExternalId.set(status.externalId, status.id);
}
}

const issues = await fetchAllIssues(client);

if (issues.length === 0) {
Expand All @@ -90,7 +110,13 @@ async function performInitialSync(
const userByEmail = new Map(matchedUsers.map((u) => [u.email, u.id]));

const taskValues = issues.map((issue) =>
mapIssueToTask(issue, organizationId, creatorUserId, userByEmail),
mapIssueToTask(
issue,
organizationId,
creatorUserId,
userByEmail,
statusByExternalId,
),
);

const batches = chunk(taskValues, BATCH_SIZE);
Expand All @@ -100,15 +126,17 @@ async function performInitialSync(
.insert(tasks)
.values(batch)
.onConflictDoUpdate({
target: [tasks.externalProvider, tasks.externalId],
target: [
tasks.organizationId,
tasks.externalProvider,
tasks.externalId,
],
set: {
...buildConflictUpdateColumns(tasks, [
"slug",
"title",
"description",
"status",
"statusColor",
"statusType",
"statusId",
"priority",
"assigneeId",
"estimate",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import type { LinearClient } from "@linear/sdk";
import { buildConflictUpdateColumns } from "@superset/db";
import { db } from "@superset/db/client";
import { taskStatuses } from "@superset/db/schema";
import { calculateProgressForStates } from "./utils";

export async function syncWorkflowStates({
client,
organizationId,
}: {
client: LinearClient;
organizationId: string;
}): Promise<void> {
const teams = await client.teams();

for (const team of teams.nodes) {
const states = await team.states();

const statesByType = new Map<string, typeof states.nodes>();
for (const state of states.nodes) {
if (!statesByType.has(state.type)) {
statesByType.set(state.type, []);
}
statesByType.get(state.type)?.push(state);
}

const startedStates = statesByType.get("started") || [];
const progressMap = calculateProgressForStates(
startedStates.map((s) => ({ name: s.name, position: s.position })),
);

const values = states.nodes.map((state) => ({
organizationId,
name: state.name,
color: state.color,
type: state.type,
position: state.position,
progressPercent:
state.type === "started" ? (progressMap.get(state.name) ?? null) : null,
externalProvider: "linear" as const,
externalId: state.id,
}));

if (values.length > 0) {
await db
.insert(taskStatuses)
.values(values)
.onConflictDoUpdate({
target: [
taskStatuses.organizationId,
taskStatuses.externalProvider,
taskStatuses.externalId,
],
set: {
...buildConflictUpdateColumns(taskStatuses, [
"name",
"color",
"type",
"position",
"progressPercent",
]),
updatedAt: new Date(),
},
});
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { LinearClient } from "@linear/sdk";
import { mapPriorityFromLinear } from "@superset/trpc/integrations/linear";
import { subMonths } from "date-fns";

export interface LinearIssue {
id: string;
Expand All @@ -9,11 +10,18 @@ export interface LinearIssue {
priority: number;
estimate: number | null;
dueDate: string | null;
createdAt: string;
url: string;
startedAt: string | null;
completedAt: string | null;
assignee: { id: string; email: string } | null;
state: { id: string; name: string; color: string; type: string };
state: {
id: string;
name: string;
color: string;
type: string;
position: number;
};
labels: { nodes: Array<{ id: string; name: string }> };
}

Expand All @@ -24,6 +32,51 @@ interface IssuesQueryResponse {
};
}

interface WorkflowStateWithPosition {
name: string;
position: number;
}

/**
* Calculates progress percentage for "started" type workflow states
* using Linear's rendering formula:
* - 1 state: 50%
* - 2 states: [50%, 75%]
* - 3+ states: evenly spaced using (index + 1) / (total + 1)
*/
export function calculateProgressForStates(
states: WorkflowStateWithPosition[],
): Map<string, number> {
const progressMap = new Map<string, number>();

if (states.length === 0) {
return progressMap;
}

const sorted = [...states].sort((a, b) => a.position - b.position);

const total = sorted.length;

for (let i = 0; i < total; i++) {
const state = sorted[i];
if (!state) continue;

let progress: number;

if (total === 1) {
progress = 50;
} else if (total === 2) {
progress = i === 0 ? 50 : 75;
} else {
progress = ((i + 1) / (total + 1)) * 100;
}

progressMap.set(state.name, Math.round(progress));
}

return progressMap;
}

const ISSUES_QUERY = `
query Issues($first: Int!, $after: String, $filter: IssueFilter) {
issues(first: $first, after: $after, filter: $filter) {
Expand All @@ -39,6 +92,7 @@ const ISSUES_QUERY = `
priority
estimate
dueDate
createdAt
url
startedAt
completedAt
Expand All @@ -51,6 +105,7 @@ const ISSUES_QUERY = `
name
color
type
position
}
labels {
nodes {
Expand All @@ -68,6 +123,7 @@ export async function fetchAllIssues(
): Promise<LinearIssue[]> {
const allIssues: LinearIssue[] = [];
let cursor: string | undefined;
const threeMonthsAgo = subMonths(new Date(), 3);

do {
const response = await client.client.request<
Expand All @@ -76,7 +132,7 @@ export async function fetchAllIssues(
>(ISSUES_QUERY, {
first: 100,
after: cursor,
filter: { state: { type: { nin: ["canceled", "completed"] } } },
filter: { updatedAt: { gte: threeMonthsAgo.toISOString() } },
});
allIssues.push(...response.issues.nodes);
cursor =
Expand All @@ -93,27 +149,32 @@ export function mapIssueToTask(
organizationId: string,
creatorId: string,
userByEmail: Map<string, string>,
statusByExternalId: Map<string, string>,
) {
const assigneeId = issue.assignee?.email
? (userByEmail.get(issue.assignee.email) ?? null)
: null;

const statusId = statusByExternalId.get(issue.state.id);
if (!statusId) {
throw new Error(`Status not found for state ${issue.state.id}`);
}
Comment on lines +158 to +161
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how mapIssueToTask is called and error handling context
cd apps/api
rg -n -B5 -A10 'mapIssueToTask' --type ts

Repository: superset-sh/superset

Length of output: 4174


🏁 Script executed:

#!/bin/bash
# Read the full route.ts file to understand the complete sync flow
cd apps/api
cat -n src/app/api/integrations/linear/jobs/initial-sync/route.ts

Repository: superset-sh/superset

Length of output: 4848


🏁 Script executed:

#!/bin/bash
# Read syncWorkflowStates implementation
cd apps/api
cat -n src/app/api/integrations/linear/jobs/initial-sync/syncWorkflowStates.ts

Repository: superset-sh/superset

Length of output: 2251


Replace Error with null-return pattern for graceful handling of missing workflow states.

When an issue references a workflow state that wasn't synced (due to deletion, archival, or race conditions), throwing an Error aborts the entire sync batch. syncWorkflowStates is already called before fetchAllIssues, but this doesn't guarantee all states referenced by existing issues will be present in the database.

Return null from mapIssueToTask and filter nulls at the call site to allow partial sync success, or log the missing state and skip that issue:

const taskValues = issues
  .map((issue) =>
    mapIssueToTask(
      issue,
      organizationId,
      creatorUserId,
      userByEmail,
      statusByExternalId,
    ),
  )
  .filter((task) => task !== null);
🤖 Prompt for AI Agents
In @apps/api/src/app/api/integrations/linear/jobs/initial-sync/utils.ts around
lines 158 - 161, The current mapIssueToTask throws an Error when
statusByExternalId.get(issue.state.id) is missing; change mapIssueToTask to
return null instead of throwing, and log a warning including identifying info
(e.g., issue.id or issue.key and issue.state.id) so the missing workflow state
is visible; update callers that map issues (the code that calls mapIssueToTask
in fetchAllIssues/initial sync) to filter out nulls (e.g., .map(...).filter(task
=> task !== null)) so missing-state issues are skipped and the batch continues.


return {
organizationId,
creatorId,
slug: issue.identifier,
title: issue.title,
description: issue.description,
status: issue.state.name,
statusColor: issue.state.color,
statusType: issue.state.type,
statusId,
priority: mapPriorityFromLinear(issue.priority),
assigneeId,
estimate: issue.estimate,
dueDate: issue.dueDate ? new Date(issue.dueDate) : null,
labels: issue.labels.nodes.map((l) => l.name),
startedAt: issue.startedAt ? new Date(issue.startedAt) : null,
completedAt: issue.completedAt ? new Date(issue.completedAt) : null,
createdAt: new Date(issue.createdAt),
externalProvider: "linear" as const,
externalId: issue.id,
externalKey: issue.identifier,
Expand Down
Loading