Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,16 @@ PROVIDER_TIMEOUT_MS=45000

# =============================================================================
# Worker tuning
#
# WORKER_CLAIM_BATCH_SIZE is reserved: the worker loop currently claims one
# job at a time. This key is accepted for forward compatibility with a future
# batch-claiming path and is otherwise inert.
# WORKER_RECLAIM_BATCH_SIZE caps how many expired leases are reclaimed per
# reclaim tick.
# =============================================================================
WORKER_ID=worker-1
WORKER_CLAIM_BATCH_SIZE=5
WORKER_RECLAIM_BATCH_SIZE=25
WORKER_LEASE_SECONDS=60
WORKER_POLL_MS=2000
WORKER_HEARTBEAT_MS=15000
Expand Down
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ Auth is implemented — see `docs/auth-plan.md` for the current auth status and
- Mutations require `Idempotency-Key` header
- Processing phases are monotonic (rank only moves forward)
- Inbound and outbound webhooks use timestamped HMAC headers
- The worker loop claims one job at a time despite `WORKER_CLAIM_BATCH_SIZE` existing in config
- The worker loop claims one job at a time; `WORKER_CLAIM_BATCH_SIZE` is reserved/dormant and accepted only for forward compatibility. The reclaim tick uses `WORKER_RECLAIM_BATCH_SIZE`.
- Soft delete with delayed `cleanup_artifacts` job (5 min)
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,12 @@ That starts:
5. worker queues `transcribe_video`, then `generate_ai` when eligible
6. frontend polls status and shows playback, transcript, edits, and enrichments

## Tooling


## Where to look next

- [docs/system.md](docs/system.md) — runtime topology, architecture decisions, and capacity guidance
- [docs/development.md](docs/development.md) — run, debug, incident response, and safe repo changes
- [docs/contracts.md](docs/contracts.md) — API/webhook contracts, versioning stance, and contract changelog
- [docs/status.md](docs/status.md) — current gaps and next improvement areas
- [docs/auth-plan.md](docs/auth-plan.md) — current auth status and constraints
- [docs/review-auth-system.md](docs/review-auth-system.md) — auth review notes and follow-up suggestions
- [docs/review-auth-system.md](docs/review-auth-system.md) — dated auth-system code-review snapshot
- [docs/review-2026-04-10.md](docs/review-2026-04-10.md) — dated full-repo review + changelog (most recent)
3 changes: 2 additions & 1 deletion apps/web-api/src/lib/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ export {
// ---------------------------------------------------------------------------

export function getS3ClientAndBucket() {
const publicEndpoint = process.env.S3_PUBLIC_ENDPOINT ?? "http://localhost:9000";
// Default aligned with packages/config (host-mapped MinIO API port in docker-compose).
const publicEndpoint = process.env.S3_PUBLIC_ENDPOINT ?? "http://localhost:8922";
const signingEndpoint = publicEndpoint;
const region = process.env.S3_REGION ?? "us-east-1";
const accessKeyId = process.env.S3_ACCESS_KEY;
Expand Down
40 changes: 18 additions & 22 deletions apps/web-api/src/routes/auth.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
import Fastify from "fastify";
import cookie from "@fastify/cookie";
import type { Logger } from "@cap/logger";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

// Minimal shape the auth routes reach for; cast through `unknown` to avoid
// pulling in every Logger method in this test mock.
const mockServiceLogger = (): Logger =>
({
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
trace: vi.fn(),
withContext: vi.fn(),
logger: {},
context: {},
logRequest: vi.fn()
}) as unknown as Logger;

const queryMock = vi.fn();
const verifyPasswordMock = vi.fn();
const signTokenMock = vi.fn(() => "signed-token");
Expand Down Expand Up @@ -41,17 +57,7 @@ describe("authRoutes login hardening", () => {
verifyPasswordMock.mockResolvedValue(false);

const app = Fastify();
app.decorate("serviceLogger", {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
trace: vi.fn(),
withContext: vi.fn(),
logger: {},
context: {},
logRequest: vi.fn()
} as any);
app.decorate("serviceLogger", mockServiceLogger());
await app.register(cookie);
const { authRoutes } = await import("./auth.js");
await app.register(authRoutes);
Expand Down Expand Up @@ -88,17 +94,7 @@ describe("authRoutes login hardening", () => {
.mockResolvedValueOnce(true);

const app = Fastify();
app.decorate("serviceLogger", {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
trace: vi.fn(),
withContext: vi.fn(),
logger: {},
context: {},
logRequest: vi.fn()
} as any);
app.decorate("serviceLogger", mockServiceLogger());
await app.register(cookie);
const { authRoutes } = await import("./auth.js");
await app.register(authRoutes);
Expand Down
9 changes: 5 additions & 4 deletions apps/web/e2e/layout.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ test.describe("Responsive layout", () => {
await expect(
page.getByRole("heading", { name: "Building APIs: Architecture and Best Practices", exact: true })
).toBeVisible();
await expect(page.getByRole("button", { name: "Notes", exact: true })).toBeVisible();
await expect(page.getByRole("button", { name: "Summary", exact: true })).toBeVisible();
await expect(page.getByRole("button", { name: "Transcript", exact: true })).toBeVisible();
// VideoRail tabs are now proper ARIA tabs (role="tab"), not plain buttons.
await expect(page.getByRole("tab", { name: "Notes", exact: true })).toBeVisible();
await expect(page.getByRole("tab", { name: "Summary", exact: true })).toBeVisible();
await expect(page.getByRole("tab", { name: "Transcript", exact: true })).toBeVisible();
await expect(page.getByText("Generated by Cap5 AI")).toBeVisible();
await expect(page.getByRole("heading", { name: "Chapters", exact: true })).toBeVisible();
});
Expand All @@ -32,7 +33,7 @@ test.describe("Responsive layout", () => {
await expect(
page.getByRole("heading", { name: "Building APIs: Architecture and Best Practices", exact: true })
).toBeVisible();
await expect(page.getByRole("button", { name: "Transcript", exact: true })).toBeVisible();
await expect(page.getByRole("tab", { name: "Transcript", exact: true })).toBeVisible();

await page.getByRole("button").nth(1).click();
await expect(page.getByRole("link", { name: "Home" })).toBeVisible();
Expand Down
69 changes: 66 additions & 3 deletions apps/web/e2e/player.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ test.describe("Video watch page", () => {
await page.goto(VIDEO_URL);
await page.waitForLoadState("networkidle");

await expect(page.getByRole("button", { name: "Transcript", exact: true })).toHaveClass(/rail-tab-active/);
await expect(page.getByRole("tab", { name: "Transcript", exact: true })).toHaveAttribute("aria-selected", "true");
await expect(page.getByPlaceholder("Search transcript…")).toBeVisible();
await expect(page.getByRole("button", { name: "Current", exact: true })).toBeVisible();
await expect(page.getByRole("button", { name: "Original", exact: true })).toBeVisible();
Expand All @@ -24,9 +24,9 @@ test.describe("Video watch page", () => {
await page.goto(VIDEO_URL);
await page.waitForLoadState("networkidle");

await page.getByRole("button", { name: "Summary", exact: true }).click();
await page.getByRole("tab", { name: "Summary", exact: true }).click();

const summaryPanel = page.locator(".rail-tab-panel-enter").filter({
const summaryPanel = page.getByRole("tabpanel", { name: "Summary", exact: true }).filter({
has: page.getByText("Generated by Cap5 AI"),
});

Expand All @@ -52,4 +52,67 @@ test.describe("Video watch page", () => {
await expect(chapterSection.getByRole("button", { name: /00:20 define endpoints clearly/i })).toBeVisible();
await expect(chapterSection.getByRole("button", { name: /rate limiting/i })).toBeVisible();
});

test("selected-speaker playback skips deselected speakers and persists per video", async ({ page }) => {
await page.goto(VIDEO_URL);
await page.waitForLoadState("networkidle");

await page.locator("video").evaluate((video) => {
let currentTime = 0;
let paused = false;

Object.defineProperty(video, "duration", {
configurable: true,
get: () => 68,
});
Object.defineProperty(video, "currentTime", {
configurable: true,
get: () => currentTime,
set: (value: number) => {
currentTime = value;
},
});
Object.defineProperty(video, "paused", {
configurable: true,
get: () => paused,
});
Object.defineProperty(video, "pause", {
configurable: true,
value: () => {
paused = true;
},
});
Object.defineProperty(video, "play", {
configurable: true,
value: async () => {
paused = false;
},
});

video.dispatchEvent(new Event("loadedmetadata"));
});

const speakerChips = page.locator("button.speaker-filter-chip");
await speakerChips.filter({ hasText: "Guest" }).click();

await expect(page.getByText("Today we cover API architecture.")).toHaveCount(0);
await expect(page.getByText("Let us begin with the basics.")).toBeVisible();
await expect(page.getByText("1 of 2 selected")).toBeVisible();

await speakerChips.filter({ hasText: "Host" }).click();

await expect(page.getByText("No speakers selected.", { exact: true })).toBeVisible();
await expect(page.getByText("No speakers selected. Re-enable at least one speaker to resume filtered playback.")).toBeVisible();

const isPaused = await page.locator("video").evaluate((video) => video.paused);
expect(isPaused).toBe(true);

await page.reload();
await page.waitForLoadState("networkidle");

await expect(page.getByText("No speakers selected.", { exact: true })).toBeVisible();
await expect(page.getByText("Welcome to this demonstration.")).toHaveCount(0);
await expect(page.getByText("Today we cover API architecture.")).toHaveCount(0);
await expect(page.getByText("Let us begin with the basics.")).toBeVisible();
});
});
19 changes: 9 additions & 10 deletions apps/web/src/lib/format.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/**
* Format a duration in seconds as "mm:ss" (or "hh:mm:ss" for durations >= 1h).
*
* `formatTimestamp` is kept as an alias for historical call sites that treat
* the same formatter as a "media timecode" rather than a "duration". Both names
* share one implementation so output is guaranteed to stay consistent.
*/
export function formatDuration(totalSeconds: number): string {
const seconds = Math.max(0, Math.floor(totalSeconds));
const hrs = Math.floor(seconds / 3600);
Expand All @@ -8,6 +15,8 @@ export function formatDuration(totalSeconds: number): string {
return `${String(mins).padStart(2, "0")}:${String(secs).padStart(2, "0")}`;
}

export const formatTimestamp = formatDuration;

export function formatBytes(bytes: number): string {
if (!Number.isFinite(bytes) || bytes <= 0) return "0 B";
const units = ["B", "KB", "MB", "GB"];
Expand Down Expand Up @@ -41,13 +50,3 @@ export function buildPublicObjectUrl(key: string): string {
return `${base}/${encodedKey}`;
}

export function formatTimestamp(secondsInput: number): string {
const totalSeconds = Math.max(0, Math.floor(secondsInput));
const hours = Math.floor(totalSeconds / 3600);
const minutes = Math.floor((totalSeconds % 3600) / 60);
const seconds = totalSeconds % 60;
if (hours > 0) {
return `${String(hours).padStart(2, "0")}:${String(minutes).padStart(2, "0")}:${String(seconds).padStart(2, "0")}`;
}
return `${String(minutes).padStart(2, "0")}:${String(seconds).padStart(2, "0")}`;
}
134 changes: 134 additions & 0 deletions apps/worker/src/queue/claim.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

const withTransactionMock = vi.fn();

vi.mock("@cap/db", () => ({
withTransaction: withTransactionMock
}));

beforeEach(() => {
process.env.DATABASE_URL = "postgres://cap5:cap5_test@localhost:5432/cap5_test";
process.env.MEDIA_SERVER_WEBHOOK_SECRET = "test-webhook-secret-with-32-plus-chars";
process.env.DEEPGRAM_API_KEY = "test-deepgram";
process.env.GROQ_API_KEY = "test-groq";
process.env.S3_ACCESS_KEY = "test-access-key";
process.env.S3_SECRET_KEY = "test-secret-key";
process.env.WORKER_ID = "worker-test";
process.env.WORKER_LEASE_SECONDS = "60";
process.env.WORKER_RECLAIM_BATCH_SIZE = "25";
});

afterEach(() => {
vi.resetModules();
vi.clearAllMocks();
});

describe("claimOne", () => {
it("returns the first row from the claim SQL and forwards the lease params", async () => {
const queryMock = vi.fn().mockResolvedValue({
rows: [
{
id: 42,
video_id: "22222222-2222-2222-2222-222222222222",
job_type: "process_video",
lease_token: "tok",
payload: {},
attempts: 1,
max_attempts: 6
}
]
});
withTransactionMock.mockImplementation(async (_dbUrl, fn) => fn({ query: queryMock }));

const { claimOne } = await import("./claim.js");
const result = await claimOne();

expect(result).not.toBeNull();
expect(result?.id).toBe(42);
expect(queryMock).toHaveBeenCalledTimes(1);
const params = queryMock.mock.calls[0]?.[1] as unknown[];
// [limit, worker_id, lease_interval]
expect(params[0]).toBe(1);
expect(params[1]).toBe("worker-test");
expect(params[2]).toBe("60 seconds");
});

it("returns null when no job is available", async () => {
const queryMock = vi.fn().mockResolvedValue({ rows: [] });
withTransactionMock.mockImplementation(async (_dbUrl, fn) => fn({ query: queryMock }));

const { claimOne } = await import("./claim.js");
const result = await claimOne();

expect(result).toBeNull();
});

it("appends exclude types as trailing params when excludeTypes is non-empty", async () => {
const queryMock = vi.fn().mockResolvedValue({ rows: [] });
withTransactionMock.mockImplementation(async (_dbUrl, fn) => fn({ query: queryMock }));

const { claimOne } = await import("./claim.js");
await claimOne(["deliver_webhook", "cleanup_artifacts"]);

expect(queryMock).toHaveBeenCalledTimes(1);
const sql = queryMock.mock.calls[0]?.[0] as string;
const params = queryMock.mock.calls[0]?.[1] as unknown[];
// The excluded-types SQL variant references job_type NOT IN (...).
expect(sql).toContain("job_type NOT IN");
expect(params).toEqual([1, "worker-test", "60 seconds", "deliver_webhook", "cleanup_artifacts"]);
});
});

describe("reclaimExpiredLeases", () => {
it("uses WORKER_RECLAIM_BATCH_SIZE as the LIMIT parameter", async () => {
const queryMock = vi.fn().mockResolvedValue({ rows: [] });
withTransactionMock.mockImplementation(async (_dbUrl, fn) => fn({ query: queryMock }));

const { reclaimExpiredLeases } = await import("./claim.js");
await reclaimExpiredLeases();

expect(queryMock).toHaveBeenCalledTimes(1);
const params = queryMock.mock.calls[0]?.[1] as unknown[];
expect(params).toEqual([25]);
});

it("returns the rows the reclaim SQL surfaced (retryable + dead)", async () => {
const stale = [
{
id: 101,
video_id: "33333333-3333-3333-3333-333333333333",
job_type: "process_video",
status: "queued"
},
{
id: 102,
video_id: "44444444-4444-4444-4444-444444444444",
job_type: "transcribe_audio",
status: "dead"
}
];
const queryMock = vi.fn().mockResolvedValue({ rows: stale });
withTransactionMock.mockImplementation(async (_dbUrl, fn) => fn({ query: queryMock }));

const { reclaimExpiredLeases } = await import("./claim.js");
const result = await reclaimExpiredLeases();

expect(result).toEqual(stale);
// Verify we're actually executing the reclaim statement, not the claim one.
const sql = queryMock.mock.calls[0]?.[0] as string;
expect(sql).toContain("WITH stale AS");
expect(sql).toContain("locked_until < now()");
});

it("honors a custom WORKER_RECLAIM_BATCH_SIZE from env", async () => {
process.env.WORKER_RECLAIM_BATCH_SIZE = "7";
const queryMock = vi.fn().mockResolvedValue({ rows: [] });
withTransactionMock.mockImplementation(async (_dbUrl, fn) => fn({ query: queryMock }));

const { reclaimExpiredLeases } = await import("./claim.js");
await reclaimExpiredLeases();

const params = queryMock.mock.calls[0]?.[1] as unknown[];
expect(params).toEqual([7]);
});
});
Loading
Loading