From 47371ec6517da6c1ba8809a3e6e591fd9164d7ea Mon Sep 17 00:00:00 2001 From: Yuval Tal Date: Wed, 5 Feb 2025 19:50:47 -0500 Subject: [PATCH 1/2] Fix timezone issue by using the Date() class instead of SQL NOW() --- prisma/schema.prisma | 2 +- src/PrismaQueue.ts | 15 +++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/prisma/schema.prisma b/prisma/schema.prisma index f2efcff..72b8b67 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -23,7 +23,7 @@ model QueueJob { priority Int @default(0) attempts Int @default(0) maxAttempts Int? - runAt DateTime @default(now()) + runAt DateTime notBefore DateTime? finishedAt DateTime? processedAt DateTime? diff --git a/src/PrismaQueue.ts b/src/PrismaQueue.ts index 31a6ce3..99028db 100644 --- a/src/PrismaQueue.ts +++ b/src/PrismaQueue.ts @@ -180,10 +180,11 @@ export class PrismaQueue< debug(`enqueue`, this.name, payloadOrFunction, options); const { name: queueName, config } = this; const { key = null, cron = null, maxAttempts = config.maxAttempts, priority = 0, runAt } = options; + const currentDate = new Date(); const record = await this.#prisma.$transaction(async (client) => { const payload = payloadOrFunction instanceof Function ? await payloadOrFunction(client) : payloadOrFunction; - const data = { queue: queueName, cron, payload, maxAttempts, priority, key }; + const data = { queue: queueName, cron, payload, maxAttempts, priority, key, createdAt: currentDate }; if (key && runAt) { const { count } = await this.model.deleteMany({ where: { @@ -198,14 +199,14 @@ export class PrismaQueue< if (count > 0) { debug(`deleted ${count} conflicting upcoming queue jobs`); } - const update = { ...data, ...(runAt ? { runAt } : {}) }; + const update = { ...data, ...(runAt ? { runAt } : { runAt: currentDate }) }; return await this.model.upsert({ where: { key_runAt: { key, runAt } }, create: { ...update }, update, }); } - return await this.model.create({ data }); + return await this.model.create({ data: { ...data, runAt: currentDate } }); }); const job = new PrismaJob(record as DatabaseJob, { model: this.model, client: this.#prisma }); this.emit("enqueue", job); @@ -295,6 +296,7 @@ export class PrismaQueue< const { tableName: tableNameRaw, deleteOn, alignTimeZone } = this.config; const tableName = escape(tableNameRaw); const queueJobKey = uncapitalize(this.config.modelName) as "queueJob"; + const currentDate = new Date().toISOString(); const job = await this.#prisma.$transaction( async (client) => { if (alignTimeZone) { @@ -306,15 +308,16 @@ export class PrismaQueue< await client.$executeRawUnsafe(`SET LOCAL TIME ZONE '${localTimeZone}';`); } } + const rows = await client.$queryRawUnsafe[]>( - `UPDATE ${tableName} SET "processedAt" = NOW(), "attempts" = "attempts" + 1 + `UPDATE ${tableName} SET "processedAt" = '${currentDate}', "attempts" = "attempts" + 1 WHERE id = ( SELECT id FROM ${tableName} WHERE (${tableName}."queue" = $1) AND (${tableName}."finishedAt" IS NULL) - AND (${tableName}."runAt" < NOW()) - AND (${tableName}."notBefore" IS NULL OR ${tableName}."notBefore" < NOW()) + AND (${tableName}."runAt" < '${currentDate}') + AND (${tableName}."notBefore" IS NULL OR ${tableName}."notBefore" < '${currentDate}') ORDER BY ${tableName}."priority" ASC, ${tableName}."runAt" ASC FOR UPDATE SKIP LOCKED LIMIT 1 From 85f5cfcab35b64ba1ddb84a6d102dc39134d4663 Mon Sep 17 00:00:00 2001 From: Yuval Tal Date: Wed, 5 Mar 2025 11:15:38 -0500 Subject: [PATCH 2/2] change stop() to abort the main loop so we dont to have to wait too long --- src/PrismaQueue.ts | 36 ++++++++++++++++++++++++++---------- src/utils/time.ts | 21 ++++++++++++++++++--- 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/src/PrismaQueue.ts b/src/PrismaQueue.ts index 99028db..04611c3 100644 --- a/src/PrismaQueue.ts +++ b/src/PrismaQueue.ts @@ -6,6 +6,7 @@ import assert from "node:assert"; import { PrismaJob } from "./PrismaJob"; import type { DatabaseJob, JobCreator, JobPayload, JobResult, JobWorker } from "./types"; import { + AbortError, calculateDelay, debug, escape, @@ -75,6 +76,7 @@ export class PrismaQueue< private concurrency = 0; private stopped = true; + private abortController = new AbortController(); /** * Constructs a PrismaQueue object with specified options and a worker function. @@ -153,11 +155,14 @@ export class PrismaQueue< * Stops the job processing in the queue. */ public async stop(): Promise { - const { pollInterval } = this.config; - debug(`stopping queue named="${this.name}"...`); + // const { pollInterval } = this.config; + debug(`stopping queue named="${this.name}"...`); this.stopped = true; + this.abortController.abort(); + // Wait for the queue to stop - await waitFor(pollInterval); + + //await waitFor(pollInterval); } /** @@ -238,16 +243,19 @@ export class PrismaQueue< `polling queue named="${this.name}" with pollInterval=${pollInterval} maxConcurrency=${maxConcurrency}...`, ); - while (!this.stopped) { - // Wait for the queue to be ready - if (this.concurrency >= maxConcurrency) { - await waitFor(pollInterval); + // If we get AbortError, we will stop the polling, but make sure we don't throw an error + + try { + while (!this.stopped) { + // Wait for the queue to be ready + if (this.concurrency >= maxConcurrency) { + await waitFor(pollInterval, this.abortController.signal); continue; } // Query the queue size only when needed to reduce database load. let estimatedQueueSize = await this.size(true); if (estimatedQueueSize === 0) { - await waitFor(pollInterval); + await waitFor(pollInterval, this.abortController.signal); continue; } @@ -276,12 +284,20 @@ export class PrismaQueue< this.concurrency--; }); }); - await waitFor(jobInterval); + await waitFor(jobInterval, this.abortController.signal); } - await waitFor(jobInterval * 2); + await waitFor(jobInterval * 2, this.abortController.signal); } + } + } catch (error) { + if (error instanceof AbortError) { + debug(`polling for queue named="${this.name}" was aborted`); + } + else { + throw error; } } +} /** * Dequeues and processes the next job in the queue. Handles locking and error management internally. diff --git a/src/utils/time.ts b/src/utils/time.ts index d9304c3..9dc4aea 100644 --- a/src/utils/time.ts +++ b/src/utils/time.ts @@ -1,6 +1,21 @@ -export const waitFor = async (ms: number) => - new Promise((resolve) => { - setTimeout(resolve, ms); + +export class AbortError extends Error { + constructor(message: string) { + super(message); + this.name = 'AbortError'; + } +} + +export const waitFor = async (ms: number, signal: AbortSignal) => + new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + resolve(undefined); + }, ms); + + signal.addEventListener('abort', () => { + clearTimeout(timeout); + reject(new AbortError('Aborted')); + }); }); export const calculateDelay = (attempts: number): number =>