diff --git a/src/lib/server/abortRegistry.ts b/src/lib/server/abortRegistry.ts index fc6de8a4413..5eed5e5c9e3 100644 --- a/src/lib/server/abortRegistry.ts +++ b/src/lib/server/abortRegistry.ts @@ -1,5 +1,8 @@ import { logger } from "$lib/server/logger"; +const MAX_TRACKED_CONVERSATIONS = 1000; +const MAX_CONTROLLERS_PER_CONVERSATION = 16; + /** * Tracks active upstream generation requests so they can be cancelled on demand. * Multiple controllers can be registered per conversation (for threaded/background runs). @@ -21,9 +24,24 @@ export class AbortRegistry { let set = this.controllers.get(key); if (!set) { set = new Set(); - this.controllers.set(key, set); + } + if (set.size >= MAX_CONTROLLERS_PER_CONVERSATION) { + const oldestController = set.values().next().value as AbortController | undefined; + if (oldestController) { + if (!oldestController.signal.aborted) { + logger.warn( + { conversationId: key }, + "Evicting oldest AbortController after reaching per-conversation limit" + ); + oldestController.abort(); + } + set.delete(oldestController); + } } set.add(controller); + // Refresh insertion order for LRU-style eviction + this.controllers.delete(key); + this.controllers.set(key, set); controller.signal.addEventListener( "abort", () => { @@ -31,6 +49,25 @@ export class AbortRegistry { }, { once: true } ); + + if (this.controllers.size > MAX_TRACKED_CONVERSATIONS) { + const oldestKey = this.controllers.keys().next().value as string | undefined; + if (oldestKey) { + const controllers = this.controllers.get(oldestKey); + if (controllers) { + logger.warn( + { conversationId: oldestKey }, + "Evicting AbortRegistry entry after reaching capacity" + ); + for (const ctrl of controllers) { + if (!ctrl.signal.aborted) { + ctrl.abort(); + } + } + } + this.controllers.delete(oldestKey); + } + } } public abort(conversationId: string) { diff --git a/src/lib/server/abortedGenerations.ts b/src/lib/server/abortedGenerations.ts index 57b5f738b91..86c7edc2325 100644 --- 
a/src/lib/server/abortedGenerations.ts +++ b/src/lib/server/abortedGenerations.ts @@ -4,10 +4,12 @@ import { logger } from "$lib/server/logger"; import { collections } from "$lib/server/database"; import { onExit } from "./exitHandler"; +const MAX_ABORTED_GENERATIONS = 1000; + export class AbortedGenerations { private static instance: AbortedGenerations; - private abortedGenerations: Record<string, Date> = {}; + private abortedGenerations = new Map<string, Date>(); private constructor() { const interval = setInterval(() => this.updateList(), 1000); @@ -25,15 +27,26 @@ export class AbortedGenerations { } public getAbortTime(conversationId: string): Date | undefined { - return this.abortedGenerations[conversationId]; + return this.abortedGenerations.get(conversationId); } private async updateList() { try { - const aborts = await collections.abortedGenerations.find({}).sort({ createdAt: 1 }).toArray(); - - this.abortedGenerations = Object.fromEntries( - aborts.map((abort) => [abort.conversationId.toString(), abort.createdAt]) + const aborts = await collections.abortedGenerations + .find({}) + .sort({ createdAt: -1 }) + .limit(MAX_ABORTED_GENERATIONS) + .toArray(); + + if (aborts.length === MAX_ABORTED_GENERATIONS) { + logger.debug( + { count: aborts.length }, + "AbortedGenerations cache reached configured capacity; trimming oldest entries" + ); + } + + this.abortedGenerations = new Map( + aborts.reverse().map((abort) => [abort.conversationId.toString(), abort.createdAt] as const) ); } catch (err) { logger.error(err); diff --git a/src/lib/server/api/routes/groups/conversations.ts b/src/lib/server/api/routes/groups/conversations.ts index 0663110e0ce..96e2d860016 100644 --- a/src/lib/server/api/routes/groups/conversations.ts +++ b/src/lib/server/api/routes/groups/conversations.ts @@ -23,21 +23,37 @@ export const conversationGroup = new Elysia().use(authPlugin).group("/conversati .get( "", async ({ locals, query }) => { - const convs = await collections.conversations - .find(authCondition(locals)) - 
.project<Pick<Conversation, "title" | "updatedAt" | "model">>({ - title: 1, - updatedAt: 1, - model: 1, - }) - .sort({ updatedAt: -1 }) - .skip((query.p ?? 0) * CONV_NUM_PER_PAGE) - .limit(CONV_NUM_PER_PAGE) + const page = query.p ?? 0; + const skip = page * CONV_NUM_PER_PAGE; + const [aggregated] = await collections.conversations + .aggregate<{ + conversations: Pick<Conversation, "_id" | "title" | "updatedAt" | "model">[]; + totals: { count: number }[]; + }>([ + { $match: authCondition(locals) }, + { + $facet: { + conversations: [ + { $sort: { updatedAt: -1 } }, + { $skip: skip }, + { $limit: CONV_NUM_PER_PAGE }, + { + $project: { + _id: 1, + title: 1, + updatedAt: 1, + model: 1, + }, + }, + ], + totals: [{ $count: "count" }], + }, + }, + ]) .toArray(); - const nConversations = await collections.conversations.countDocuments( - authCondition(locals) - ); + const convs = aggregated?.conversations ?? []; + const nConversations = aggregated?.totals?.[0]?.count ?? 0; const res = convs.map((conv) => ({ _id: conv._id, diff --git a/src/lib/server/api/routes/groups/misc.ts b/src/lib/server/api/routes/groups/misc.ts index 7ddc05efc87..c72cb666fb9 100644 --- a/src/lib/server/api/routes/groups/misc.ts +++ b/src/lib/server/api/routes/groups/misc.ts @@ -9,6 +9,7 @@ import yazl from "yazl"; import { downloadFile } from "$lib/server/files/downloadFile"; import mimeTypes from "mime-types"; import { logger } from "$lib/server/logger"; +import type { Document } from "mongodb"; export interface FeatureFlags { enableAssistants: boolean; @@ -28,7 +29,35 @@ export const misc = new Elysia() const messagesBeforeLogin = config.MESSAGES_BEFORE_LOGIN ? 
parseInt(config.MESSAGES_BEFORE_LOGIN) : 0; - const nConversations = await collections.conversations.countDocuments(authCondition(locals)); + let nConversations = 0; + let assistantMessages = 0; + const matchCondition = authCondition(locals); + + if (requiresUser) { + const facetPipelines: Record<string, Document[]> = { + conversationCount: [{ $count: "count" }], + }; + + if (!locals.user && messagesBeforeLogin > 0) { + facetPipelines.assistantMessages = [ + { $project: { messages: 1 } }, + { $unwind: "$messages" }, + { $match: { "messages.from": "assistant" } }, + { $limit: messagesBeforeLogin + 1 }, + { $count: "count" }, + ]; + } + + const aggregation = await collections.conversations + .aggregate<{ + conversationCount: { count: number }[]; + assistantMessages?: { count: number }[]; + }>([{ $match: matchCondition }, { $facet: facetPipelines }]) + .next(); + + nConversations = aggregation?.conversationCount?.[0]?.count ?? 0; + assistantMessages = aggregation?.assistantMessages?.[0]?.count ?? 0; + } if (requiresUser && !locals.user) { if (messagesBeforeLogin === 0) { @@ -36,22 +65,7 @@ export const misc = new Elysia() } else if (nConversations >= messagesBeforeLogin) { loginRequired = true; } else { - // get the number of messages where `from === "assistant"` across all conversations. - const totalMessages = - ( - await collections.conversations - .aggregate([ - { $match: { ...authCondition(locals), "messages.from": "assistant" } }, - { $project: { messages: 1 } }, - { $limit: messagesBeforeLogin + 1 }, - { $unwind: "$messages" }, - { $match: { "messages.from": "assistant" } }, - { $count: "messages" }, - ]) - .toArray() - )[0]?.messages ?? 
0; - - loginRequired = totalMessages >= messagesBeforeLogin; + loginRequired = assistantMessages >= messagesBeforeLogin; } } diff --git a/src/lib/server/files/downloadFile.ts b/src/lib/server/files/downloadFile.ts index d289fc10c85..e0b14c1a71f 100644 --- a/src/lib/server/files/downloadFile.ts +++ b/src/lib/server/files/downloadFile.ts @@ -23,12 +23,26 @@ export async function downloadFile( const fileStream = collections.bucket.openDownloadStream(file._id); - const buffer = await new Promise<Buffer>((resolve, reject) => { - const chunks: Uint8Array[] = []; - fileStream.on("data", (chunk) => chunks.push(chunk)); - fileStream.on("error", reject); - fileStream.on("end", () => resolve(Buffer.concat(chunks))); - }); + try { + const buffer = await new Promise<Buffer>((resolve, reject) => { + const chunks: Uint8Array[] = []; + const onData = (chunk: Uint8Array) => chunks.push(chunk); + const onError = (err: unknown) => { + fileStream.removeListener("data", onData); + reject(err instanceof Error ? err : new Error("Failed to read file stream")); + }; + const onEnd = () => { + fileStream.removeListener("data", onData); + resolve(Buffer.concat(chunks)); + }; - return { type: "base64", name, value: buffer.toString("base64"), mime }; + fileStream.on("data", onData); + fileStream.once("error", onError); + fileStream.once("end", onEnd); + }); + + return { type: "base64", name, value: buffer.toString("base64"), mime }; + } finally { + fileStream.destroy(); + } } diff --git a/src/lib/server/files/uploadFile.ts b/src/lib/server/files/uploadFile.ts index 97b335beaf0..15af68b4836 100644 --- a/src/lib/server/files/uploadFile.ts +++ b/src/lib/server/files/uploadFile.ts @@ -20,10 +20,23 @@ export async function uploadFile(file: File, conv: Conversation): Promise<MessageFile> { - upload.once("finish", () => - resolve({ type: "hash", value: sha, mime: file.type, name: file.name }) - ); - upload.once("error", reject); - setTimeout(() => reject(new Error("Upload timed out")), 20_000); + const timeout = setTimeout(() => 
{ + upload.destroy(new Error("Upload timed out")); + }, 20_000); + + const resolveOnce = () => { + clearTimeout(timeout); + resolve({ type: "hash", value: sha, mime: file.type, name: file.name }); + }; + const rejectOnce = (err: unknown) => { + clearTimeout(timeout); + if (typeof upload.destroy === "function" && !upload.destroyed) { + upload.destroy(); + } + reject(err); + }; + + upload.once("finish", resolveOnce); + upload.once("error", rejectOnce); }); }