Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ model QueueJob {
priority Int @default(0)
attempts Int @default(0)
maxAttempts Int?
runAt DateTime @default(now())
runAt DateTime
notBefore DateTime?
finishedAt DateTime?
processedAt DateTime?
Expand Down
51 changes: 35 additions & 16 deletions src/PrismaQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import assert from "node:assert";
import { PrismaJob } from "./PrismaJob";
import type { DatabaseJob, JobCreator, JobPayload, JobResult, JobWorker } from "./types";
import {
AbortError,
calculateDelay,
debug,
escape,
Expand Down Expand Up @@ -75,6 +76,7 @@ export class PrismaQueue<

private concurrency = 0;
private stopped = true;
private abortController = new AbortController();

/**
* Constructs a PrismaQueue object with specified options and a worker function.
Expand Down Expand Up @@ -153,11 +155,14 @@ export class PrismaQueue<
* Stops the job processing in the queue.
*/
public async stop(): Promise<void> {
const { pollInterval } = this.config;
debug(`stopping queue named="${this.name}"...`);
// const { pollInterval } = this.config;
debug(`stopping queue named="${this.name}"...`);
this.stopped = true;
this.abortController.abort();

// Wait for the queue to stop
await waitFor(pollInterval);

//await waitFor(pollInterval);
}

/**
Expand All @@ -180,10 +185,11 @@ export class PrismaQueue<
debug(`enqueue`, this.name, payloadOrFunction, options);
const { name: queueName, config } = this;
const { key = null, cron = null, maxAttempts = config.maxAttempts, priority = 0, runAt } = options;
const currentDate = new Date();
const record = await this.#prisma.$transaction(async (client) => {
const payload =
payloadOrFunction instanceof Function ? await payloadOrFunction(client) : payloadOrFunction;
const data = { queue: queueName, cron, payload, maxAttempts, priority, key };
const data = { queue: queueName, cron, payload, maxAttempts, priority, key, createdAt: currentDate };
if (key && runAt) {
const { count } = await this.model.deleteMany({
where: {
Expand All @@ -198,14 +204,14 @@ export class PrismaQueue<
if (count > 0) {
debug(`deleted ${count} conflicting upcoming queue jobs`);
}
const update = { ...data, ...(runAt ? { runAt } : {}) };
const update = { ...data, ...(runAt ? { runAt } : { runAt: currentDate }) };
return await this.model.upsert({
where: { key_runAt: { key, runAt } },
create: { ...update },
update,
});
}
return await this.model.create({ data });
return await this.model.create({ data: { ...data, runAt: currentDate } });
});
const job = new PrismaJob(record as DatabaseJob<T, U>, { model: this.model, client: this.#prisma });
this.emit("enqueue", job);
Expand Down Expand Up @@ -237,16 +243,19 @@ export class PrismaQueue<
`polling queue named="${this.name}" with pollInterval=${pollInterval} maxConcurrency=${maxConcurrency}...`,
);

while (!this.stopped) {
// Wait for the queue to be ready
if (this.concurrency >= maxConcurrency) {
await waitFor(pollInterval);
// If we get AbortError, we will stop the polling, but make sure we don't throw an error

try {
while (!this.stopped) {
// Wait for the queue to be ready
if (this.concurrency >= maxConcurrency) {
await waitFor(pollInterval, this.abortController.signal);
continue;
}
// Query the queue size only when needed to reduce database load.
let estimatedQueueSize = await this.size(true);
if (estimatedQueueSize === 0) {
await waitFor(pollInterval);
await waitFor(pollInterval, this.abortController.signal);
continue;
}

Expand Down Expand Up @@ -275,12 +284,20 @@ export class PrismaQueue<
this.concurrency--;
});
});
await waitFor(jobInterval);
await waitFor(jobInterval, this.abortController.signal);
}
await waitFor(jobInterval * 2);
await waitFor(jobInterval * 2, this.abortController.signal);
}
}
} catch (error) {
if (error instanceof AbortError) {
debug(`polling for queue named="${this.name}" was aborted`);
}
else {
throw error;
}
}
}

/**
* Dequeues and processes the next job in the queue. Handles locking and error management internally.
Expand All @@ -295,6 +312,7 @@ export class PrismaQueue<
const { tableName: tableNameRaw, deleteOn, alignTimeZone } = this.config;
const tableName = escape(tableNameRaw);
const queueJobKey = uncapitalize(this.config.modelName) as "queueJob";
const currentDate = new Date().toISOString();
const job = await this.#prisma.$transaction(
async (client) => {
if (alignTimeZone) {
Expand All @@ -306,15 +324,16 @@ export class PrismaQueue<
await client.$executeRawUnsafe(`SET LOCAL TIME ZONE '${localTimeZone}';`);
}
}

const rows = await client.$queryRawUnsafe<DatabaseJob<T, U>[]>(
`UPDATE ${tableName} SET "processedAt" = NOW(), "attempts" = "attempts" + 1
`UPDATE ${tableName} SET "processedAt" = '${currentDate}', "attempts" = "attempts" + 1
WHERE id = (
SELECT id
FROM ${tableName}
WHERE (${tableName}."queue" = $1)
AND (${tableName}."finishedAt" IS NULL)
AND (${tableName}."runAt" < NOW())
AND (${tableName}."notBefore" IS NULL OR ${tableName}."notBefore" < NOW())
AND (${tableName}."runAt" < '${currentDate}')
AND (${tableName}."notBefore" IS NULL OR ${tableName}."notBefore" < '${currentDate}')
ORDER BY ${tableName}."priority" ASC, ${tableName}."runAt" ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
Expand Down
21 changes: 18 additions & 3 deletions src/utils/time.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
export const waitFor = async (ms: number) =>
new Promise((resolve) => {
setTimeout(resolve, ms);

export class AbortError extends Error {
constructor(message: string) {
super(message);
this.name = 'AbortError';
}
}

export const waitFor = async (ms: number, signal: AbortSignal) =>
new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
resolve(undefined);
}, ms);

signal.addEventListener('abort', () => {
clearTimeout(timeout);
reject(new AbortError('Aborted'));
});
});

export const calculateDelay = (attempts: number): number =>
Expand Down