Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type * as follows from "../follows.js";
import type * as generatedImages from "../generatedImages.js";
import type * as http from "../http.js";
import type * as lib_batchGenerationState from "../lib/batchGenerationState.js";
import type * as lib_cloudflareWorkerHttp from "../lib/cloudflareWorkerHttp.js";
import type * as lib_crypto from "../lib/crypto.js";
import type * as lib_dirtberryCrop from "../lib/dirtberryCrop.js";
import type * as lib_groq from "../lib/groq.js";
Expand Down Expand Up @@ -69,6 +70,7 @@ declare const fullApi: ApiFromModules<{
generatedImages: typeof generatedImages;
http: typeof http;
"lib/batchGenerationState": typeof lib_batchGenerationState;
"lib/cloudflareWorkerHttp": typeof lib_cloudflareWorkerHttp;
"lib/crypto": typeof lib_crypto;
"lib/dirtberryCrop": typeof lib_dirtberryCrop;
"lib/groq": typeof lib_groq;
Expand Down
48 changes: 43 additions & 5 deletions convex/contentAnalysis.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { describe, expect, it } from "vitest"
import {
getNextAnalysisRunDelayMs,
getProviderRecoveryDelayMs,
getVisionFailureDispatchStatus,
shouldRunRecoveryPromptInference,
} from "./contentAnalysis"
Expand All @@ -10,25 +11,62 @@ describe("getNextAnalysisRunDelayMs", () => {
expect(
getNextAnalysisRunDelayMs({
queuedImageCount: 2,
allProvidersRateLimited: false,
providerRecoveryDelayMs: null,
})
).toBe(2100)
})

it("schedules another run when providers are rate-limited", () => {
it("schedules another run when providers have a recovery delay", () => {
expect(
getNextAnalysisRunDelayMs({
queuedImageCount: 2,
allProvidersRateLimited: true,
providerRecoveryDelayMs: 60_000,
})
).toBe(2100)
).toBe(60_000)
})

it("does not schedule another run when there is no lookahead work", () => {
expect(
getNextAnalysisRunDelayMs({
queuedImageCount: 1,
allProvidersRateLimited: false,
providerRecoveryDelayMs: null,
})
).toBeNull()
})
})

describe("getProviderRecoveryDelayMs", () => {
it("waits until the earliest provider reset instead of hot-looping", () => {
expect(
getProviderRecoveryDelayMs({
now: 1_000,
providerHealths: [
{ isAvailable: false, rateLimitedUntil: 11_000 },
{ isAvailable: false, rateLimitedUntil: 21_000 },
],
})
).toBe(10_000)
})

it("returns the floor delay when reset is too close", () => {
expect(
getProviderRecoveryDelayMs({
now: 1_000,
providerHealths: [
{ isAvailable: false, rateLimitedUntil: 1_500 },
],
})
).toBe(2100)
})

it("returns null when a provider is already available", () => {
expect(
getProviderRecoveryDelayMs({
now: 1_000,
providerHealths: [
{ isAvailable: true },
null,
],
})
).toBeNull()
})
Expand Down
184 changes: 167 additions & 17 deletions convex/contentAnalysis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const DELAY_BETWEEN_REQUESTS_MS = 2100

/** Fetch a small lookahead so we know whether to schedule the next recovery action. */
const ANALYSIS_QUEUE_LOOKAHEAD = 2
const ANALYSIS_RECOVERY_JOB_NAME = "content_analysis_recovery"

const moderationStageValidator = v.union(
v.literal("prompt_inference"),
Expand All @@ -36,6 +37,11 @@ type VisionAnalysisPayload = {
analyzedAt: number
}

/**
 * Minimal view of a provider health record, carrying only the fields needed
 * to compute how long to wait before a rate-limited provider can be retried.
 */
type ProviderHealthSnapshot = {
/** False while the provider is unavailable; only unavailable entries contribute a reset time. */
isAvailable: boolean
/** Timestamp at which the rate limit ends, compared against `Date.now()` values (milliseconds). May be absent. */
rateLimitedUntil?: number
}

/**
 * Reports whether an image already has a definite `isSensitive` value.
 *
 * @param image - the image document, or `null` when none was found
 * @returns `true` only when `image.isSensitive` is neither `undefined` nor
 *   `null`; a `null` image always yields `false`
 */
function isSensitivityResolved(image: Doc<"generatedImages"> | null): boolean {
  const verdict = image?.isSensitive
  // Loose `!= null` rejects exactly `undefined` and `null`, nothing else.
  return verdict != null
}
Expand Down Expand Up @@ -112,17 +118,34 @@ async function upsertVisionAnalysis(

export function getNextAnalysisRunDelayMs(args: {
queuedImageCount: number
allProvidersRateLimited: boolean
providerRecoveryDelayMs: number | null
}): number | null {
if (args.allProvidersRateLimited) {
return DELAY_BETWEEN_REQUESTS_MS
if (args.providerRecoveryDelayMs !== null) {
return args.providerRecoveryDelayMs
}

return args.queuedImageCount === ANALYSIS_QUEUE_LOOKAHEAD
? DELAY_BETWEEN_REQUESTS_MS
: null
}

/**
 * Computes how long to wait before retrying when no provider can be used.
 *
 * A `null` entry (no health record) or an entry with `isAvailable: true`
 * counts as a provider that is usable right now, so no recovery wait applies
 * and `null` is returned. This matches the availability rule used by
 * `analyzeRecentImages` and prevents one rate-limited provider from delaying
 * retries while another provider is still usable.
 *
 * @param args.providerHealths - one snapshot per provider; `null` when the
 *   provider has no health record
 * @param args.now - current time on the same clock as `rateLimitedUntil`
 * @returns the time until the earliest future reset, never less than
 *   `DELAY_BETWEEN_REQUESTS_MS`; or `null` when a provider is usable or no
 *   future reset time is known
 */
export function getProviderRecoveryDelayMs(args: {
  providerHealths: Array<ProviderHealthSnapshot | null>
  now: number
}): number | null {
  const { providerHealths, now } = args

  // Any usable provider means there is nothing to wait for.
  const anyProviderUsable = providerHealths.some(
    (health) => !health || health.isAvailable
  )
  if (anyProviderUsable) {
    return null
  }

  // Only resets strictly in the future are meaningful; missing or already
  // elapsed timestamps give no information about when to retry.
  const futureResets = providerHealths
    .map((health) => health?.rateLimitedUntil)
    .filter((resetAt): resetAt is number => typeof resetAt === "number" && resetAt > now)

  if (futureResets.length === 0) {
    return null
  }

  // Enforce the floor so a reset that is about to happen cannot cause a hot loop.
  return Math.max(DELAY_BETWEEN_REQUESTS_MS, Math.min(...futureResets) - now)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

export function shouldRunRecoveryPromptInference(args: {
hasPrompt: boolean
hasPromptInference: boolean
Expand Down Expand Up @@ -253,6 +276,8 @@ export const claimModerationForWorker = internalMutation({
},
returns: v.object({
claimed: v.boolean(),
prompt: v.optional(v.string()),
imageUrl: v.optional(v.string()),
}),
handler: async (ctx, args) => {
const image = await ctx.db.get(args.imageId)
Expand Down Expand Up @@ -293,7 +318,100 @@ export const claimModerationForWorker = internalMutation({
moderationUpdatedAt: Date.now(),
})

return { claimed: true }
if (args.stage === "prompt_inference") {
return {
claimed: true,
prompt: image.prompt,
}
}

return {
claimed: true,
imageUrl: image.url,
}
},
})

/**
 * Schedules a de-duplicated `analyzeRecentImages` run `delayMs` from now.
 *
 * Scheduling state lives in the `backgroundJobState` row whose `jobName` is
 * `ANALYSIS_RECOVERY_JOB_NAME`. If that row already records a run at or before
 * the requested time, nothing new is scheduled and the existing time is
 * returned. Otherwise the row is created or updated with a fresh token and the
 * action is scheduled with that token.
 *
 * Returns `scheduled` (whether a new run was enqueued) and `nextRunAt` (the
 * time of the run that is now on record).
 */
export const scheduleAnalyzeRecentImagesRun = internalMutation({
args: {
delayMs: v.number(),
},
returns: v.object({
scheduled: v.boolean(),
nextRunAt: v.number(),
}),
handler: async (ctx, args) => {
const now = Date.now()
// Clamp negative delays so the computed run time is never before `now`.
const delayMs = Math.max(0, args.delayMs)
const nextRunAt = now + delayMs
const existingJob = await ctx.db
.query("backgroundJobState")
.withIndex("by_job_name", (q) => q.eq("jobName", ANALYSIS_RECOVERY_JOB_NAME))
.first()

// A run already on record at or before the requested time wins.
// NOTE(review): a `nextRunAt` left in the past (i.e. never cleared by
// `claimScheduledAnalyzeRecentImagesRun`) would satisfy this check for every
// later call — confirm that state cannot persist.
if (existingJob?.nextRunAt !== undefined && existingJob.nextRunAt <= nextRunAt) {
return {
scheduled: false,
nextRunAt: existingJob.nextRunAt,
}
}
Comment thread
Simplereally marked this conversation as resolved.

// The token is stored on the row and passed to the scheduled action; the
// claim mutation compares the two, so a run superseded by a newer token
// does not proceed.
const scheduledToken = `${now}-${nextRunAt}`

if (existingJob) {
await ctx.db.patch(existingJob._id, {
nextRunAt,
scheduledToken,
updatedAt: now,
})
} else {
await ctx.db.insert("backgroundJobState", {
jobName: ANALYSIS_RECOVERY_JOB_NAME,
nextRunAt,
scheduledToken,
updatedAt: now,
})
}
Comment thread
Simplereally marked this conversation as resolved.
Outdated

await ctx.scheduler.runAfter(delayMs, internal.contentAnalysis.analyzeRecentImages, {
scheduleToken: scheduledToken,
})

return {
scheduled: true,
nextRunAt,
}
},
})

/**
 * Claims a previously scheduled recovery run identified by `scheduleToken`.
 *
 * The claim succeeds only when the `backgroundJobState` row for
 * `ANALYSIS_RECOVERY_JOB_NAME` exists, still holds the same token, and its
 * recorded `nextRunAt` (if any) is no more than one second in the future.
 * On success the pending schedule fields are cleared and `lastRunAt` is set.
 *
 * @returns `true` when the caller owns the run and may proceed, else `false`
 */
export const claimScheduledAnalyzeRecentImagesRun = internalMutation({
  args: {
    scheduleToken: v.string(),
  },
  returns: v.boolean(),
  handler: async (ctx, args) => {
    const claimedAt = Date.now()
    const jobState = await ctx.db
      .query("backgroundJobState")
      .withIndex("by_job_name", (q) => q.eq("jobName", ANALYSIS_RECOVERY_JOB_NAME))
      .first()

    // No row at all: nothing was scheduled, so there is nothing to claim.
    if (!jobState) {
      return false
    }

    // A different token means this run was superseded by a newer schedule.
    if (jobState.scheduledToken !== args.scheduleToken) {
      return false
    }

    // Allow one second of slack; anything further out is not yet due.
    const latestAcceptableRunAt = claimedAt + 1000
    const notYetDue =
      jobState.nextRunAt !== undefined && jobState.nextRunAt > latestAcceptableRunAt
    if (notYetDue) {
      return false
    }

    // Clear the pending schedule so later schedule requests are not blocked.
    await ctx.db.patch(jobState._id, {
      nextRunAt: undefined,
      scheduledToken: undefined,
      lastRunAt: claimedAt,
      updatedAt: claimedAt,
    })

    return true
  },
})

Expand Down Expand Up @@ -581,7 +699,24 @@ export const failVisionAnalysisFromWorker = internalMutation({
})

if (nextDispatchStatus === "pending") {
await ctx.scheduler.runAfter(DELAY_BETWEEN_REQUESTS_MS, internal.contentAnalysis.analyzeRecentImages, {})
const providerHealths: Array<ProviderHealthSnapshot | null> = await Promise.all([
ctx.db
.query("providerHealth")
.withIndex("by_provider", (q) => q.eq("provider", "groq"))
.first(),
ctx.db
.query("providerHealth")
.withIndex("by_provider", (q) => q.eq("provider", "openrouter"))
.first(),
])
const recoveryDelayMs = getProviderRecoveryDelayMs({
providerHealths,
now: Date.now(),
}) ?? DELAY_BETWEEN_REQUESTS_MS

await ctx.runMutation(internal.contentAnalysis.scheduleAnalyzeRecentImagesRun, {
delayMs: recoveryDelayMs,
})
}

return { released: true, duplicate: false }
Expand Down Expand Up @@ -613,8 +748,20 @@ export const analyzeImage = internalAction({
* Recovery action: re-dispatch one unanalyzed image without doing external work in Convex.
*/
export const analyzeRecentImages = internalAction({
args: {},
handler: async (ctx: ActionCtx) => {
args: {
scheduleToken: v.optional(v.string()),
},
handler: async (ctx: ActionCtx, args) => {
if (args.scheduleToken) {
const claimed = await ctx.runMutation(internal.contentAnalysis.claimScheduledAnalyzeRecentImagesRun, {
scheduleToken: args.scheduleToken,
})

if (!claimed) {
return
}
}

await ctx.runMutation(internal.lib.providerHealthFunctions.refreshExpiredLimits, {})

const images = await ctx.runQuery(
Expand All @@ -626,7 +773,7 @@ export const analyzeRecentImages = internalAction({
return
}

let allProvidersRateLimited = false
let providerRecoveryDelayMs: number | null = null
const image = images[0]
if (!image) {
return
Expand All @@ -646,17 +793,18 @@ export const analyzeRecentImages = internalAction({
imageId: image._id,
})
} else {
const providersAvailable = await ctx.runQuery(
internal.lib.providerHealthFunctions.checkProvidersAvailable, {}
const providerHealths = await ctx.runQuery(
internal.lib.providerHealthFunctions.getAllHealth, {}
)
const providersAvailable = providerHealths.some((health) => !health || health.isAvailable)

if (!providersAvailable) {
allProvidersRateLimited = true
const now = Date.now()
const [groq, openrouter] = await Promise.all([
ctx.runQuery(internal.lib.providerHealthFunctions.getHealth, { provider: "groq" }),
ctx.runQuery(internal.lib.providerHealthFunctions.getHealth, { provider: "openrouter" }),
])
const [groq, openrouter] = providerHealths
providerRecoveryDelayMs = getProviderRecoveryDelayMs({
providerHealths,
now,
})
console.log(
`[Vision] Rate-limited (Groq: ${formatResetTime(groq?.rateLimitedUntil, now)}, OpenRouter: ${formatResetTime(openrouter?.rateLimitedUntil, now)})`
)
Expand All @@ -669,11 +817,13 @@ export const analyzeRecentImages = internalAction({

const nextRunDelayMs = getNextAnalysisRunDelayMs({
queuedImageCount: images.length,
allProvidersRateLimited,
providerRecoveryDelayMs,
})

if (nextRunDelayMs !== null) {
await ctx.scheduler.runAfter(nextRunDelayMs, internal.contentAnalysis.analyzeRecentImages, {})
await ctx.runMutation(internal.contentAnalysis.scheduleAnalyzeRecentImagesRun, {
delayMs: nextRunDelayMs,
})
}
},
})
Expand Down
1 change: 1 addition & 0 deletions convex/crons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ crons.interval(
"analyze unanalyzed images",
{ hours: 1 },
internal.contentAnalysis.analyzeRecentImages,
{},
);

export default crons;
Loading
Loading