Skip to content
Open

Optim #1931

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion src/lib/server/abortRegistry.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { logger } from "$lib/server/logger";

/**
 * Upper bound on the number of conversation keys kept in the registry map.
 * When a registration pushes the map past this size, the first key in the
 * map's iteration order is evicted and its non-aborted controllers are aborted.
 */
const MAX_TRACKED_CONVERSATIONS = 1000;
/**
 * Upper bound on the number of controllers kept for a single conversation.
 * When a conversation's set is already at this size, the first controller in
 * the set's iteration order is aborted (if not already) and removed before the
 * new one is added.
 */
const MAX_CONTROLLERS_PER_CONVERSATION = 16;

/**
* Tracks active upstream generation requests so they can be cancelled on demand.
* Multiple controllers can be registered per conversation (for threaded/background runs).
Expand All @@ -21,16 +24,50 @@ export class AbortRegistry {
let set = this.controllers.get(key);
if (!set) {
set = new Set();
this.controllers.set(key, set);
}
if (set.size >= MAX_CONTROLLERS_PER_CONVERSATION) {
const oldestController = set.values().next().value as AbortController | undefined;
if (oldestController) {
if (!oldestController.signal.aborted) {
logger.warn(
{ conversationId: key },
"Evicting oldest AbortController after reaching per-conversation limit"
);
oldestController.abort();
}
set.delete(oldestController);
}
}
set.add(controller);
// Refresh insertion order for LRU-style eviction
this.controllers.delete(key);
this.controllers.set(key, set);
controller.signal.addEventListener(
"abort",
() => {
this.unregister(key, controller);
},
{ once: true }
);

if (this.controllers.size > MAX_TRACKED_CONVERSATIONS) {
const oldestKey = this.controllers.keys().next().value as string | undefined;
if (oldestKey) {
const controllers = this.controllers.get(oldestKey);
if (controllers) {
logger.warn(
{ conversationId: oldestKey },
"Evicting AbortRegistry entry after reaching capacity"
);
for (const ctrl of controllers) {
if (!ctrl.signal.aborted) {
ctrl.abort();
}
}
}
this.controllers.delete(oldestKey);
}
}
}

public abort(conversationId: string) {
Expand Down
25 changes: 19 additions & 6 deletions src/lib/server/abortedGenerations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import { logger } from "$lib/server/logger";
import { collections } from "$lib/server/database";
import { onExit } from "./exitHandler";

/**
 * Maximum number of aborted-generation records fetched (newest `createdAt`
 * first) on each refresh of the in-memory cache.
 */
const MAX_ABORTED_GENERATIONS = 1000;

export class AbortedGenerations {
private static instance: AbortedGenerations;

private abortedGenerations: Record<string, Date> = {};
private abortedGenerations = new Map<string, Date>();

private constructor() {
const interval = setInterval(() => this.updateList(), 1000);
Expand All @@ -25,15 +27,26 @@ export class AbortedGenerations {
}

public getAbortTime(conversationId: string): Date | undefined {
return this.abortedGenerations[conversationId];
return this.abortedGenerations.get(conversationId);
}

private async updateList() {
try {
const aborts = await collections.abortedGenerations.find({}).sort({ createdAt: 1 }).toArray();

this.abortedGenerations = Object.fromEntries(
aborts.map((abort) => [abort.conversationId.toString(), abort.createdAt])
const aborts = await collections.abortedGenerations
.find({})
.sort({ createdAt: -1 })
.limit(MAX_ABORTED_GENERATIONS)
.toArray();

if (aborts.length === MAX_ABORTED_GENERATIONS) {
logger.debug(
{ count: aborts.length },
"AbortedGenerations cache reached configured capacity; trimming oldest entries"
);
}

this.abortedGenerations = new Map(
aborts.reverse().map((abort) => [abort.conversationId.toString(), abort.createdAt] as const)
);
} catch (err) {
logger.error(err);
Expand Down
42 changes: 29 additions & 13 deletions src/lib/server/api/routes/groups/conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,37 @@ export const conversationGroup = new Elysia().use(authPlugin).group("/conversati
.get(
"",
async ({ locals, query }) => {
const convs = await collections.conversations
.find(authCondition(locals))
.project<Pick<Conversation, "_id" | "title" | "updatedAt" | "model">>({
title: 1,
updatedAt: 1,
model: 1,
})
.sort({ updatedAt: -1 })
.skip((query.p ?? 0) * CONV_NUM_PER_PAGE)
.limit(CONV_NUM_PER_PAGE)
const page = query.p ?? 0;
const skip = page * CONV_NUM_PER_PAGE;
const [aggregated] = await collections.conversations
.aggregate<{
conversations: Pick<Conversation, "_id" | "title" | "updatedAt" | "model">[];
totals: { count: number }[];
}>([
{ $match: authCondition(locals) },
{
$facet: {
conversations: [
{ $sort: { updatedAt: -1 } },
{ $skip: skip },
{ $limit: CONV_NUM_PER_PAGE },
{
$project: {
_id: 1,
title: 1,
updatedAt: 1,
model: 1,
},
},
],
totals: [{ $count: "count" }],
},
},
])
.toArray();

const nConversations = await collections.conversations.countDocuments(
authCondition(locals)
);
const convs = aggregated?.conversations ?? [];
const nConversations = aggregated?.totals?.[0]?.count ?? 0;

const res = convs.map((conv) => ({
_id: conv._id,
Expand Down
48 changes: 31 additions & 17 deletions src/lib/server/api/routes/groups/misc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import yazl from "yazl";
import { downloadFile } from "$lib/server/files/downloadFile";
import mimeTypes from "mime-types";
import { logger } from "$lib/server/logger";
import type { Document } from "mongodb";

export interface FeatureFlags {
enableAssistants: boolean;
Expand All @@ -28,30 +29,43 @@ export const misc = new Elysia()
const messagesBeforeLogin = config.MESSAGES_BEFORE_LOGIN
? parseInt(config.MESSAGES_BEFORE_LOGIN)
: 0;
const nConversations = await collections.conversations.countDocuments(authCondition(locals));
let nConversations = 0;
let assistantMessages = 0;
const matchCondition = authCondition(locals);

if (requiresUser) {
const facetPipelines: Record<string, Document[]> = {
conversationCount: [{ $count: "count" }],
};

if (!locals.user && messagesBeforeLogin > 0) {
facetPipelines.assistantMessages = [
{ $project: { messages: 1 } },
{ $unwind: "$messages" },
{ $match: { "messages.from": "assistant" } },
{ $limit: messagesBeforeLogin + 1 },
{ $count: "count" },
];
}

const aggregation = await collections.conversations
.aggregate<{
conversationCount: { count: number }[];
assistantMessages?: { count: number }[];
}>([{ $match: matchCondition }, { $facet: facetPipelines }])
.next();

nConversations = aggregation?.conversationCount?.[0]?.count ?? 0;
assistantMessages = aggregation?.assistantMessages?.[0]?.count ?? 0;
}

if (requiresUser && !locals.user) {
if (messagesBeforeLogin === 0) {
loginRequired = true;
} else if (nConversations >= messagesBeforeLogin) {
loginRequired = true;
} else {
// get the number of messages where `from === "assistant"` across all conversations.
const totalMessages =
(
await collections.conversations
.aggregate([
{ $match: { ...authCondition(locals), "messages.from": "assistant" } },
{ $project: { messages: 1 } },
{ $limit: messagesBeforeLogin + 1 },
{ $unwind: "$messages" },
{ $match: { "messages.from": "assistant" } },
{ $count: "messages" },
])
.toArray()
)[0]?.messages ?? 0;

loginRequired = totalMessages >= messagesBeforeLogin;
loginRequired = assistantMessages >= messagesBeforeLogin;
}
}

Expand Down
28 changes: 21 additions & 7 deletions src/lib/server/files/downloadFile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,26 @@ export async function downloadFile(

const fileStream = collections.bucket.openDownloadStream(file._id);

const buffer = await new Promise<Buffer>((resolve, reject) => {
const chunks: Uint8Array[] = [];
fileStream.on("data", (chunk) => chunks.push(chunk));
fileStream.on("error", reject);
fileStream.on("end", () => resolve(Buffer.concat(chunks)));
});
try {
const buffer = await new Promise<Buffer>((resolve, reject) => {
const chunks: Uint8Array[] = [];
const onData = (chunk: Uint8Array) => chunks.push(chunk);
const onError = (err: unknown) => {
fileStream.removeListener("data", onData);
reject(err instanceof Error ? err : new Error("Failed to read file stream"));
};
const onEnd = () => {
fileStream.removeListener("data", onData);
resolve(Buffer.concat(chunks));
};

return { type: "base64", name, value: buffer.toString("base64"), mime };
fileStream.on("data", onData);
fileStream.once("error", onError);
fileStream.once("end", onEnd);
});

return { type: "base64", name, value: buffer.toString("base64"), mime };
} finally {
fileStream.destroy();
}
}
23 changes: 18 additions & 5 deletions src/lib/server/files/uploadFile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,23 @@ export async function uploadFile(file: File, conv: Conversation): Promise<Messag

// only resolve once the upload emits a finish event; reject on an upload error or if the 20s timeout elapses
return new Promise((resolve, reject) => {
upload.once("finish", () =>
resolve({ type: "hash", value: sha, mime: file.type, name: file.name })
);
upload.once("error", reject);
setTimeout(() => reject(new Error("Upload timed out")), 20_000);
const timeout = setTimeout(() => {
upload.destroy(new Error("Upload timed out"));
}, 20_000);

const resolveOnce = () => {
clearTimeout(timeout);
resolve({ type: "hash", value: sha, mime: file.type, name: file.name });
};
const rejectOnce = (err: unknown) => {
clearTimeout(timeout);
if (typeof upload.destroy === "function" && !upload.destroyed) {
upload.destroy();
}
reject(err);
};

upload.once("finish", resolveOnce);
upload.once("error", rejectOnce);
});
}
Loading