@robojs/mq - Plugin for persistent message queue. #306

Pkmmte opened this issue Oct 11, 2024 — with Volta.net · 4 comments

Pkmmte commented Oct 11, 2024

Description

A plugin to provide a dead-simple, fully functional message queue API for Robo.js projects without relying on heavyweight external services or binaries, and without requiring separate worker processes. By default, it ships with an in-memory queue engine (optionally persisting messages). Users may easily swap out this engine with a custom one (e.g., Redis-based) using a single plugin option, engine.

This plugin aims to offer:

  1. Lightweight Setup: No extra binaries or complicated installs (unlike BullMQ's Redis requirement).
  2. Functionality: Enqueue, subscribe, acknowledge, fail, and concurrency control.
  3. Zero Worker Requirements: Everything runs in the same context/runtime.
  4. Customizability: Override the default engine by providing a class instance that extends MqEngine.

Goals

  1. Lightweight, Minimal Footprint

    • Use an in-memory queue by default.
    • Persistence is optional (e.g., with Flashcore or another approach).
  2. Functional + Subscription API

    • Provide standalone functions for enqueueing and acknowledging.
    • Include a subscription model (subscribe) to continuously handle incoming messages without manual polling or intervals.
  3. Custom Engine Support

    • Users can replace the default engine by passing an instance of a class that extends the MqEngine abstract class.
    • Example: Redis-based engine (see Custom Engine Example).
  4. Message Acknowledgements

    • Optionally requires explicit ack/fail, preventing re-delivery unless fail is called or the engine times out.
    • Concurrency support ensures we only pull a limited number of messages at once.
  5. Type-Safe Payloads

    • Generics in TypeScript to ensure your message payloads are properly typed.

API Overview

1. Plugin Options

export interface Message<T = any> {
  id: string
  payload: T
  queuedAt: number
  // Could store more metadata, e.g., attempts, etc.
}

export interface DequeueOptions {
  limit?: number      // Max number of messages to pull
}

export interface SubscribeOptions {
  concurrency?: number
}

export abstract class MqEngine {
  abstract enqueue<T>(queueName: string, data: T): Promise<void>

  // Under the hood, subscribe might poll or use an event-based approach
  // We'll unify this with repeated dequeue calls or push notifications
  abstract dequeue<T>(queueName: string, options?: DequeueOptions): Promise<Message<T>[]>

  abstract ack(messageId: string): Promise<void>

  abstract fail(messageId: string): Promise<void>
}

export interface MqPluginOptions {
  engine?: MqEngine
}
  • engine: If not provided, uses a default in-memory engine (optionally with internal persistence).
  • subscribe will rely on repeated dequeue calls or an event-based mechanism within the engine to pull messages (a rough sketch follows below).
  • concurrency: Number of in-flight messages we’ll process at once for a given subscription.
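
For illustration only, here is a rough sketch (not a prescribed implementation) of how subscribe could be built on repeated dequeue calls while capping in-flight work at concurrency. The createSubscribe helper name is hypothetical.

import type { Message, MqEngine, SubscribeOptions } from '@robojs/mq'

// Hypothetical sketch: poll engine.dequeue() and never exceed `concurrency` in-flight handlers
function createSubscribe(engine: MqEngine) {
  return function subscribe<T>(
    queueName: string,
    onMessage: (message: Message<T>) => Promise<void>,
    options?: SubscribeOptions
  ): () => void {
    const concurrency = options?.concurrency ?? 1
    let inFlight = 0
    let stopped = false

    const poll = async () => {
      while (!stopped) {
        const slots = concurrency - inFlight
        if (slots > 0) {
          const messages = await engine.dequeue<T>(queueName, { limit: slots })
          for (const message of messages) {
            inFlight++
            onMessage(message)
              .catch(() => {
                // In this sketch, handlers are expected to call fail() themselves
              })
              .finally(() => inFlight--)
          }
        }
        // Small delay between polls; an event-based engine could push instead
        await new Promise((resolve) => setTimeout(resolve, 50))
      }
    }
    void poll()

    // Returning a function lets callers stop the polling loop
    return () => {
      stopped = true
    }
  }
}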

2. Functional API

// Provided by the plugin
export declare function enqueue<T>(queueName: string, data: T): Promise<void>

export declare function subscribe<T>(
  queueName: string,
  onMessage: (message: Message<T>) => Promise<void>,
  options?: SubscribeOptions
): () => void

export declare function ack(messageId: string): Promise<void>
export declare function fail(messageId: string): Promise<void>
  1. enqueue: Add a new message to the specified queue.
  2. subscribe: Continuously listen for incoming messages on queueName and call onMessage each time.
    • Returns an unsubscribe function to stop listening when needed (see the short example below).
    • Respects the concurrency option to avoid overloading your app.
  3. ack: Acknowledge successful processing of a message, removing it from the queue.
  4. fail: Indicate a failure, allowing the engine to requeue or discard the message based on your configuration.
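
Because subscribe returns an unsubscribe function, stopping a listener later is a one-liner. A minimal illustration (handleOrder and the queue name are hypothetical):

import { subscribe, ack } from '@robojs/mq'

// Hypothetical processing function for this example
const handleOrder = async (payload: unknown) => console.log('Processing order', payload)

const unsubscribe = subscribe('orders', async (msg) => {
  await handleOrder(msg.payload)
  await ack(msg.id)
})

// Later, e.g., during shutdown, stop receiving new messages
unsubscribe()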

3. Type-Safe Payloads

You can specify the payload type when you subscribe or enqueue:

interface UserUpdate {
  userId: number
  changes: Record<string, any>
}

// Enqueue typed data
await enqueue<UserUpdate>("userUpdates", {
  userId: 42,
  changes: { name: "Alice" },
})

// Subscribe with typed messages
subscribe<UserUpdate>("userUpdates", async (msg) => {
  console.log(`Received update for user ${msg.payload.userId}`)
  // Process data...
  await ack(msg.id)
})

4. Concurrency Control in subscribe

concurrency determines how many messages can be processed simultaneously for a given queue subscription. If concurrency is set to 1, each message must finish processing (via ack or fail) before the next message is delivered.

subscribe<UserUpdate>(
  "userUpdates", 
  async (msg) => {
    // This will be called up to 'concurrency' times in parallel
    try {
      // Process the message
      await doWork(msg.payload)
      await ack(msg.id)
    } catch (err) {
      await fail(msg.id)
    }
  },
  { concurrency: 2 }
)

5. Same-Context Runtime

No separate workers or extra processes needed. Everything runs in the same environment as the Robo.js project, making setup and debugging simpler.

6. Persistence (Optional)

  • The default in-memory engine may optionally persist messages using Flashcore to survive restarts or crashes (a sketch of this follows below).
  • For a more robust solution, devs can create their own engine with built-in persistence (e.g., Redis or RabbitMQ).
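
A minimal sketch of what that optional persistence could look like inside the default engine, assuming Flashcore's get/set API from robo.js. The key naming and helper functions here are illustrative, not part of any existing API:

import { Flashcore } from 'robo.js'
import type { Message } from '@robojs/mq'

// Illustrative key naming for persisted queue snapshots
const queueKey = (queueName: string) => `mq:queue:${queueName}`

// Snapshot the pending messages so they survive a restart
async function persistQueue(queueName: string, messages: Message[]) {
  await Flashcore.set(queueKey(queueName), messages)
}

// Restore whatever was persisted before the last shutdown
async function restoreQueue(queueName: string): Promise<Message[]> {
  const stored = (await Flashcore.get(queueKey(queueName))) as Message[] | undefined
  return stored ?? []
}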

7. Example: Custom Redis Engine

Below is a simplified example demonstrating how a Redis-based engine might be implemented. This shows how you can override the default in-memory queue by extending the MqEngine class.

import Redis from 'ioredis'
import { MqEngine, Message, DequeueOptions } from '@robojs/mq'

export class RedisEngine extends MqEngine {
  private client: Redis

  constructor(redisUrl: string) {
    super()
    this.client = new Redis(redisUrl)
  }

  async enqueue<T>(queueName: string, data: T): Promise<void> {
    const messageId = this.generateId()
    const msg: Message<T> = {
      id: messageId,
      payload: data,
      queuedAt: Date.now(),
    }
    // Store message in a Redis list
    await this.client.lpush(queueName, JSON.stringify(msg))
  }

  async dequeue<T>(queueName: string, options?: DequeueOptions): Promise<Message<T>[]> {
    const limit = options?.limit ?? 1
    const messages: Message<T>[] = []

    for (let i = 0; i < limit; i++) {
      // RPOP to get from the "end" of the list
      const rawMsg = await this.client.rpop(queueName)
      if (!rawMsg) break
      messages.push(JSON.parse(rawMsg))
    }
    return messages
  }

  async ack(messageId: string): Promise<void> {
    // With a robust approach, you'd track "in-flight" messages
    // in a Redis structure. Once acked, remove from in-flight.
    // Implementation details may vary.
  }

  async fail(messageId: string): Promise<void> {
    // Re-queue the message or put it into a dead-letter queue, etc.
    // Implementation details may vary.
  }

  private generateId() {
    return Math.random().toString(16).slice(2)
  }
}
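
The ack/fail stubs above could be filled in by parking each dequeued message in a Redis hash keyed by message id, then deleting it on ack or pushing it back onto its queue on fail. A purely illustrative sketch of that approach (one of several possible designs; key names and helper functions are made up):

import Redis from 'ioredis'
import type { Message } from '@robojs/mq'

const IN_FLIGHT_KEY = 'mq:in-flight'

// Call this right after RPOP inside dequeue(), before returning the message
async function markInFlight(client: Redis, queueName: string, raw: string, msg: Message) {
  await client.hset(IN_FLIGHT_KEY, msg.id, JSON.stringify({ queueName, raw }))
}

// ack: the message was processed, so forget the in-flight entry
async function ackInFlight(client: Redis, messageId: string) {
  await client.hdel(IN_FLIGHT_KEY, messageId)
}

// fail: push the original message back onto its queue for re-delivery
async function failInFlight(client: Redis, messageId: string) {
  const entry = await client.hget(IN_FLIGHT_KEY, messageId)
  if (!entry) return
  const { queueName, raw } = JSON.parse(entry)
  await client.lpush(queueName, raw)
  await client.hdel(IN_FLIGHT_KEY, messageId)
}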

Usage

Plugin config:

import { RedisEngine } from './redisEngine.js'

// Start the plugin with a custom Redis engine
export default {
  engine: new RedisEngine("redis://localhost:6379")
}

Start event usage:

import { enqueue, subscribe } from '@robojs/mq'

export default async () => {
  // Now we can enqueue or subscribe with a consistent API
  await enqueue("alerts", { message: "Hello from Redis" })

  subscribe("alerts", async (msg) => {
    console.log("Redis-based engine message:", msg.payload)
    // ...
  })
}

The @robojs/mq plugin provides:

  1. A functional interface:
    • enqueue, subscribe, ack, fail
  2. In-Memory default engine that does not require external services.
  3. Optional persistence, with Flashcore as one possible backing store rather than a requirement.
  4. Concurrency options to prevent overloading.
  5. Continuous message handling in the same context with subscribe.
  6. A pluggable MqEngine interface to seamlessly switch to more robust or remote solutions (Redis, RabbitMQ, etc.).
Pkmmte added the good first issue label on Oct 25, 2024

branberry commented Jan 1, 2025

Hi @Pkmmte! I'd be interested in looking into this feature. If I understand correctly, you'd ideally like to use some message queue system that would use Flashcore as the tool to persist the messages, correct?

You've referenced BullMQ, which I'm looking into. If we decide to use that tool, this package could be a wrapper around BullMQ, but with the ability to persist the messages sent to it. If that's the case, I'm curious to know how you'd like to be able to retrieve the persisted messages. Would that be through using Flashcore directly? If you could briefly describe use cases for this, that would be amazing. Thanks!!

Pkmmte commented Jan 4, 2025

Hi @branberry and thanks for your interest!

You've referenced BullMQ, which I'm looking into. If we decide to use that tool, this package could be a wrapper around BullMQ, but with the ability to persist the messages sent to it.

Yup, sounds about right! However, after looking more into BullMQ, I would like to retract my earlier comment and suggest something else instead, like the queue package. The reason is that BullMQ relies on Redis, which detracts from the main selling point of this plugin: ease of use.

Packages like queue are more barebones but don't require a Redis instance to be running on the same machine. This allows users to just install and use the plugin right away, which is our biggest goal for this.

If that's the case, I'm curious to know how you'd like to be able to retrieve the persisted messages. Would that be through using Flashcore directly?

Also yes. Ideally, this would take place automatically when Robo projects start, using the _start event handler to restore data and _stop or _restart to save it. That way, users can be sure that whatever messages were in the queue won't disappear if their process dies or restarts. For example, a bot running on a production server shouldn't lose the user messages it was processing when it restarts for an update.
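
A rough sketch of that lifecycle, purely illustrative (the _start/_stop event names come from Robo.js; the storage key and engine hooks are hypothetical):

import { Flashcore } from 'robo.js'

// src/events/_start.ts — restore messages persisted before the last shutdown
export const onStart = async () => {
  const pending = ((await Flashcore.get('mq:pending')) as unknown[] | undefined) ?? []
  // Hypothetical internal call to refill the in-memory queue:
  // engine.restore(pending)
}

// src/events/_stop.ts — persist whatever is still queued before the process exits
export const onStop = async (pendingMessages: unknown[]) => {
  await Flashcore.set('mq:pending', pendingMessages)
}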

Pkmmte commented Jan 4, 2025

@branberry I've updated this issue's description to be more detailed. You can think of the above spec as a strong recommendation rather than a strict requirement. Feel free to improve upon it or suggest an alternative API syntax, as long as it avoids OOP for consistency with the rest of our plugins.

branberry commented

Hey @Pkmmte, thank you for the thorough response! I appreciate the insights into what you're looking for. I think I should be equipped with enough info to get started. Thanks again!
