Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
node_modules
coverage
coverage
data
22 changes: 20 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: '3.7'
version: "3.7"
services:
rabbitmq:
image: "rabbitmq:3-management"
Expand All @@ -16,4 +16,22 @@ services:
labels:
NAME: "rabbitmq"
volumes:
- "./etc/rabbitmq/definitions.json:/etc/rabbitmq/definitions.json"
- "./etc/rabbitmq/definitions.json:/etc/rabbitmq/definitions.json"

redis:
image: "bitnami/redis:5.0"
container_name: "redis"
hostname: "redis"
restart: always
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL
ports:
- "6379:6379"
volumes:
- "./data/redis:/bitnami/redis/data"

rebrow:
image: "marian/rebrow"
ports:
- "8082:5001"
15 changes: 10 additions & 5 deletions etc/config.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
module.exports = {
pubsub : {
endpoint : process.env.PUBSUB_ENDPOINT || 'localhost:5672',
login : process.env.PUBSUB_LOGIN || 'test',
password : process.env.PUBSUB_PASSWORD || 'test'
}
rabbitPubsub: {
endpoint: process.env.PUBSUB_RABBIT_ENDPOINT || "localhost:5672",
login: process.env.PUBSUB_RABBIT_LOGIN || "test",
password: process.env.PUBSUB_RABBIT_PASSWORD || "test",
},
redisPubsub: {
port: process.env.PUBSUB_REDIS_PORT || 6379,
host: process.env.PUBSUB_REDIS_HOST || "localhost",
database: process.env.PUBSUB_REDIS_DATABASE || 1,
},
};
14 changes: 0 additions & 14 deletions examples/notificator/notify.js

This file was deleted.

14 changes: 14 additions & 0 deletions examples/notificator/notifyRabbit.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env node
const notificator = require("../../lib/notificatorSingletonRabbit");

async function main() {
let iterator = 0;
await notificator.init();

setInterval(() => {
notificator.notify({ text: `iteration ${iterator}` });
++iterator;
}, 5000);
}

main();
14 changes: 14 additions & 0 deletions examples/notificator/notifyRedis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env node
const notificator = require("../../lib/notificatorSingletonRedis");

async function main() {
let iterator = 0;
await notificator.init();

setInterval(() => {
notificator.notify({ text: `iteration ${iterator}` });
++iterator;
}, 5000);
}

main();
14 changes: 0 additions & 14 deletions examples/notificator/receive.js

This file was deleted.

14 changes: 14 additions & 0 deletions examples/notificator/receiveRabbit.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env node
const notificator = require("../../lib/notificatorSingletonRabbit");

async function main() {
await notificator.init();

notificator.receive(customMessageHandler);
}

function customMessageHandler(message) {
console.log(`Via notificator received ${JSON.stringify(message)}`);
}

main();
14 changes: 14 additions & 0 deletions examples/notificator/receiveRedis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env node
const notificator = require("../../lib/notificatorSingletonRedis");

async function main() {
await notificator.init();

notificator.receive(customMessageHandler);
}

function customMessageHandler(message) {
console.log(`Via notificator received ${JSON.stringify(message)}`);
}

main();
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
const notificator = require('./notificatorSingletone');
const notificator = require("./notificatorSingletonRabbit");

module.exports = notificator;
module.exports = notificator;
66 changes: 33 additions & 33 deletions lib/Notificator.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,45 @@
class Notificator {
constructor(args) {
this.pubsub = args.pubsub;
this.isInited = false;
}
constructor(args) {
this.pubsub = args.pubsub;
this.isInitialized = false;
}

async init() {
if (this.isInited) return;
try {
console.info('Notificator initialization started...');
await this.pubsub.connect();
await this.pubsub.createChannel('notifications');
this.isInited = true;
console.info('Notificator initialization completed.');
} catch (error) {
console.error('Notificator initialization failend.');
console.error(error.message);
}
async init() {
if (this.isInitialized) return;
try {
console.info("Notificator initialization started...");
await this.pubsub.connect();
await this.pubsub.createChannel("notifications");
this.isInitialized = true;
console.info("Notificator initialization completed.");
} catch (error) {
console.error("Notificator initialization failend.");
console.error(error.message);
}
}

notify(message) {
if (!this.isInited) {
console.warn('Can not notify. Notificator not inited');
notify(message) {
if (!this.isInitialized) {
console.warn("Can not notify. Notificator not inited");

return;
}
try {
this.pubsub.publish('notifications', message);
} catch (error) {
console.error('Failed to notify');
console.error(error.message);
}
return;
}
try {
this.pubsub.publish("notifications", message);
} catch (error) {
console.error("Failed to notify");
console.error(error.message);
}
}

receive(messageHandler) {
if (!this.isInited) {
console.warn('Can not receive. Notificator not inited');
receive(messageHandler) {
if (!this.isInitialized) {
console.warn("Can not receive. Notificator not inited");

return;
}
this.pubsub.subscribe('notifications', messageHandler);
return;
}
this.pubsub.subscribe("notifications", messageHandler);
}
}

module.exports = Notificator;
157 changes: 157 additions & 0 deletions lib/drivers/Redis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
const redis = require("redis");
const { formatMessage, parseMessage } = require("../utils");
const PubSubDriverInterface = require("./PubSubDriverInterface");

class Redis extends PubSubDriverInterface {
constructor(args) {
super(args);
if (!args.port) throw new Error('"port" is required');
if (!args.host) throw new Error('"host" is required');
if (!args.database) throw new Error('"database" is required');
this.isReconnecting = false;
this.port = args.port;
this.host = args.host;
this.database = args.database;
}

async connect() {
return new Promise(async (resolve) => {
try {
this.connection = await new Promise((res) => {
const subscriber = redis.createClient(this.port, this.host, {
db: this.database,
});
const publisher = redis.createClient(this.port, this.host, {
db: this.database,
});
res({ subscriber, publisher });
});
} catch (error) {
console.error(`Failed to connect to ${this.host}:${this.port}`);
await new Promise((res) => setTimeout(() => res(), 5000));
console.info("Trying to reconnect...");

return this.connect();
}

this.connection.subscriber.on("error", (error) => {
console.error(error);
});

this.connection.publisher.on("error", (error) => {
console.error(error);
});

this.connection.subscriber.on("ready", () => {
console.info(
`Subscriber connected to Redis on ${this.host}:${this.port}`
);
});

this.connection.publisher.on("ready", () => {
console.info(
`Publisher connected to Redis on ${this.host}:${this.port}`
);
});

if (this.isReconnecting) {
await this._recreateChannels();
await this._reassignHandlers();
console.info("Reconnected successfully.");
this.isReconnecting = false;
}

resolve(this.connection);
});
}

async _recreateChannels() {
console.info("Recreating channels...");
for (const channelName in this.channels) {
if (!this.channels[channelName]) continue;
await this.createChannel(channelName);
}
console.info("Recreating channels completed.");
}

_reassignHandlers() {
console.info("Reassigning handlers...");
for (const channelName in this.handlers) {
if (!this.handlers[channelName]) continue;
console.info(`For channel: "${channelName}"`);
for (const handler of this.handlers[channelName]) {
console.info(`Subscribing for handler: "${handler.name}"`);
this.subscribe(channelName, handler, true);
}
}
console.info("Reassign handlers completed.");
}

async createChannel(channelName) {
this.channels[channelName] = await new Promise((res, rej) => {
this.connection.subscriber.on("subscribe", function (channel, count) {
console.info(`Created channel "${channelName}"`);
res(channel);
});

this.connection.subscriber.subscribe(channelName);
});

if (!this.handlers[channelName]) this.handlers[channelName] = [];

return this.channels[channelName];
}

publish(exchange, message) {
try {
const formattedMessage = formatMessage(message);

console.info(
`Publishing message '${formattedMessage.slice(
0,
40
)}...' to channel "${exchange}"`
);
if (!this.channels[exchange])
throw Error(`Channel for exchange ${exchange} not exists`);
this.connection.publisher.publish(
exchange,
Buffer.from(formattedMessage)
);
} catch (error) {
console.log(error);
throw error;
}
}

subscribe(exchange, messageHandler, isReconnecting = false) {
console.log("subscribe()");
if (!this.channels[exchange])
throw Error(`Channel for queue ${exchange} not exists`);

this.connection.subscriber.on("message", (channel, message) => {
this._messageHandler({ channel, message }, messageHandler);
});

if (!isReconnecting) this.handlers[exchange].push(messageHandler);
}

close() {
console.log("close()");
this.connection.subscriber.end(true);
this.connection.publisher.end(true);
console.info("Closed connection.");
}

_messageHandler({ channel, message }, messageHandler) {
const messageString = message.toString();

console.info(
` [x] Received on channel ${channel}: "${messageString.slice(0, 40)}...`
);
if (typeof messageHandler === "function")
messageHandler(parseMessage(messageString));
}
}

module.exports = Redis;
Loading