diff --git a/lib/index.d.ts b/lib/index.d.ts
index 28f176d..66e3c16 100644
--- a/lib/index.d.ts
+++ b/lib/index.d.ts
@@ -1,6 +1,7 @@
///
-import { Redis as _Redis, Cluster } from "ioredis";
+import IoRedis, { Redis as _Redis, Cluster } from "ioredis";
import EventEmitter from "events";
+import { Registry, Counter, Histogram } from "prom-client";
interface RedisConfig {
/** provide host ip/url, default - localhost */
host?: string;
@@ -45,15 +46,43 @@ declare class Redis {
emitter: EventEmitter;
config: RedisConfig;
client: Cluster | _Redis;
+ commandTimeout?: number;
+ metrics?: {
+ register: Registry;
+ labels: {
+ [key: string]: string;
+ };
+ };
+ trackers?: {
+ commands?: Counter;
+ errors?: Counter;
+ latencies?: Histogram;
+ };
/**
* @param {string} name - unique name to this service
* @param {EventEmitter} emitter
* @param {RedisConfig} config - configuration object of service
+ * @param {Registry} metrics - prometheus client
*/
- constructor(name: string, emitter: EventEmitter, config: RedisConfig);
+ constructor(name: string, emitter: EventEmitter, config: RedisConfig, metrics?: {
+ register: Registry;
+ labels: {
+ [key: string]: string;
+ };
+ });
log(message: string, data: unknown): void;
success(message: string, data: unknown): void;
error(err: Error, data: unknown): void;
+ makeError(message: string, data: unknown): Error;
+ trackCommand(command: string): void;
+ trackErrors(command: string, errorMessage: string): void;
+ trackLatencies(command: string, startTime: number): void;
+ createTimeoutPromise(ms: number, command: string): {
+ timeoutPromise: Promise;
+ clear: () => void;
+ };
+ executeCommand(target: any, prop: any, args: any): Promise;
+ makeProxy(client: Cluster | _Redis): Cluster | IoRedis;
/**
* Connect to redis server with the config
*
diff --git a/lib/index.js b/lib/index.js
index b2ad13d..8eb50f7 100644
--- a/lib/index.js
+++ b/lib/index.js
@@ -23,6 +23,9 @@ var __importStar = (this && this.__importStar) || function (mod) {
return result;
};
const ioredis_1 = __importStar(require("ioredis"));
+const commands_1 = require("@ioredis/commands");
+const prom_client_1 = require("prom-client");
+const perf_hooks_1 = require("perf_hooks");
function retryStrategy(times) {
if (times > 1000) {
// eslint-disable-next-line no-console
@@ -53,14 +56,49 @@ class Redis {
emitter;
config;
client;
+ commandTimeout;
+ metrics;
+ trackers;
/**
* @param {string} name - unique name to this service
* @param {EventEmitter} emitter
* @param {RedisConfig} config - configuration object of service
+ * @param {Registry} metrics - prometheus client
*/
- constructor(name, emitter, config) {
+ constructor(name, emitter, config, metrics) {
this.name = name;
this.emitter = emitter;
+ this.commandTimeout = config.commandTimeout;
+ this.metrics = metrics;
+ if (this.metrics) {
+ // register counters
+ this.trackers = {};
+ // create counter for tracking the number of times redis commands are called
+ this.trackers.commands = new prom_client_1.Counter({
+ name: `${this.name.replaceAll("-", "_")}:commands`,
+ help: "keep track of all redis commands",
+ labelNames: [...Object.keys(this.metrics.labels), "command"],
+ registers: [this.metrics.register],
+ });
+ // create counter for tracking the number of times redis commands have failed
+ this.trackers.errors = new prom_client_1.Counter({
+ name: `${this.name.replaceAll("-", "_")}:errors`,
+ help: "keep track of all redis command errors",
+ labelNames: [
+ ...Object.keys(this.metrics.labels),
+ "command",
+ "errorMessage",
+ ],
+ registers: [this.metrics.register],
+ });
+ // create histogram for tracking latencies of redis commands
+ this.trackers.latencies = new prom_client_1.Histogram({
+ name: `${this.name.replaceAll("-", "_")}:latencies`,
+ help: "keep track of redis command latencies",
+ labelNames: [...Object.keys(this.metrics.labels), "command"],
+ registers: [this.metrics.register],
+ });
+ }
this.config = Object.assign({
host: "localhost",
port: 6379,
@@ -99,6 +137,101 @@ class Redis {
err,
});
}
+ makeError(message, data) {
+ const error = new Error(message);
+ this.error(error, data);
+ return error;
+ }
+ trackCommand(command) {
+ if (this.trackers?.commands) {
+ this.trackers.commands.inc({
+ ...this.metrics.labels,
+ command,
+ }, 1);
+ }
+ }
+ trackErrors(command, errorMessage) {
+ if (this.trackers?.errors) {
+ this.trackers.errors.inc({
+ ...this.metrics.labels,
+ command,
+ errorMessage,
+ }, 1);
+ }
+ }
+ trackLatencies(command, startTime) {
+ if (this.trackers?.latencies) {
+ const endTime = perf_hooks_1.performance.now();
+ this.trackers.latencies.observe({
+ ...this.metrics.labels,
+ command,
+ }, endTime - startTime);
+ }
+ }
+ createTimeoutPromise(ms, command) {
+ let timeoutId;
+ const timeoutPromise = new Promise((_, reject) => {
+ timeoutId = setTimeout(() => {
+ reject(this.makeError("redis.COMMAND_TIMEOUT", {
+ command,
+ timeout: ms,
+ }));
+ }, ms);
+ });
+ return {
+ timeoutPromise,
+ clear: () => {
+ clearTimeout(timeoutId);
+ },
+ };
+ }
+ async executeCommand(target, prop, args) {
+ const startTime = perf_hooks_1.performance.now();
+ try {
+ this.trackCommand(String(prop));
+ const result = await target[prop](...args);
+ this.trackLatencies(String(prop), startTime);
+ return result;
+ }
+ catch (err) {
+ this.trackLatencies(String(prop), startTime);
+ this.trackErrors(String(prop), err.message);
+ throw this.makeError("redis.COMMAND_ERROR", {
+ command: prop,
+ args,
+ error: err,
+ });
+ }
+ }
+ makeProxy(client) {
+ return new Proxy(client, {
+ get: (target, prop) => {
+ // check if a command or not
+ if (!(0, commands_1.exists)(String(prop))) {
+ return target[prop];
+ }
+ // check if client in ready state
+ if (this.client.status !== "ready") {
+ throw this.makeError("redis.NOT_READY", {
+ command: prop,
+ });
+ }
+ return (...args) => {
+ // If timeout is set, apply Promise.race
+ if (this.client.isCluster && this.commandTimeout) {
+ const { timeoutPromise, clear } = this.createTimeoutPromise(this.commandTimeout, String(prop));
+ return Promise.race([
+ this.executeCommand(target, prop, args),
+ timeoutPromise,
+ ]).finally(clear);
+ }
+ else {
+ return this.executeCommand(target, prop, args);
+ }
+ };
+ },
+ });
+ }
/**
* Connect to redis server with the config
*
@@ -198,6 +331,7 @@ class Redis {
// single node finish
}
this.log(`Connecting in ${infoObj.mode} mode`, infoObj);
+ client = this.makeProxy(client);
// common events
client.on("connect", () => {
this.success(`Successfully connected in ${infoObj.mode} mode`, null);
@@ -249,4 +383,4 @@ class Redis {
}
}
module.exports = Redis;
-//# sourceMappingURL=data:application/json;base64,
\ No newline at end of file
+//# sourceMappingURL=data:application/json;base64,
\ No newline at end of file
diff --git a/package.json b/package.json
index 62c0135..2fb0fb0 100644
--- a/package.json
+++ b/package.json
@@ -23,7 +23,9 @@
},
"homepage": "https://github.com/akshendra/redis-wrapper#readme",
"dependencies": {
- "ioredis": "5.3.2"
+ "@ioredis/commands": "^1.2.0",
+ "ioredis": "5.3.2",
+ "prom-client": "^15.0.0"
},
"devDependencies": {
"@types/node": "^16.11.7",
@@ -36,4 +38,4 @@
"eslint-plugin-import": "^2.23.4",
"typescript": "^4.4.4"
}
-}
\ No newline at end of file
+}
diff --git a/src/index.ts b/src/index.ts
index 2900adf..1db290e 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -5,6 +5,9 @@ import IoRedis, {
RedisOptions,
} from "ioredis";
import EventEmitter from "events";
+import { exists as isCommand } from "@ioredis/commands";
+import { Registry, Counter, Histogram } from "prom-client";
+import { performance } from "perf_hooks";
function retryStrategy(times: number): number {
if (times > 1000) {
@@ -67,15 +70,66 @@ class Redis {
emitter: EventEmitter;
config: RedisConfig;
client: Cluster | _Redis;
+ commandTimeout?: number;
+ metrics?: {
+ register: Registry;
+ labels: { [key: string]: string };
+ };
+ trackers?: { commands?: Counter; errors?: Counter; latencies?: Histogram };
/**
* @param {string} name - unique name to this service
* @param {EventEmitter} emitter
* @param {RedisConfig} config - configuration object of service
+ * @param {Registry} metrics - prometheus client
*/
- constructor(name: string, emitter: EventEmitter, config: RedisConfig) {
+ constructor(
+ name: string,
+ emitter: EventEmitter,
+ config: RedisConfig,
+ metrics?: {
+ register: Registry;
+ labels: { [key: string]: string };
+ }
+ ) {
this.name = name;
this.emitter = emitter;
+ this.commandTimeout = config.commandTimeout;
+ this.metrics = metrics;
+
+ if (this.metrics) {
+ // register counters
+ this.trackers = {};
+
+ // create counter for tracking the number of times redis commands are called
+ this.trackers.commands = new Counter({
+ name: `${this.name.replaceAll("-", "_")}:commands`,
+ help: "keep track of all redis commands",
+ labelNames: [...Object.keys(this.metrics.labels), "command"],
+ registers: [this.metrics.register],
+ });
+
+ // create counter for tracking the number of times redis commands have failed
+ this.trackers.errors = new Counter({
+ name: `${this.name.replaceAll("-", "_")}:errors`,
+ help: "keep track of all redis command errors",
+ labelNames: [
+ ...Object.keys(this.metrics.labels),
+ "command",
+ "errorMessage",
+ ],
+ registers: [this.metrics.register],
+ });
+
+ // create histogram for tracking latencies of redis commands
+ this.trackers.latencies = new Histogram({
+ name: `${this.name.replaceAll("-", "_")}:latencies`,
+ help: "keep track of redis command latencies",
+ labelNames: [...Object.keys(this.metrics.labels), "command"],
+ registers: [this.metrics.register],
+ });
+ }
+
this.config = Object.assign(
{
host: "localhost",
@@ -131,6 +185,128 @@ class Redis {
});
}
+ makeError(message: string, data: unknown): Error {
+ const error = new Error(message);
+ this.error(error, data);
+ return error;
+ }
+
+ trackCommand(command: string): void {
+ if (this.trackers?.commands) {
+ this.trackers.commands.inc(
+ {
+ ...this.metrics.labels,
+ command,
+ },
+ 1
+ );
+ }
+ }
+
+ trackErrors(command: string, errorMessage: string): void {
+ if (this.trackers?.errors) {
+ this.trackers.errors.inc(
+ {
+ ...this.metrics.labels,
+ command,
+ errorMessage,
+ },
+ 1
+ );
+ }
+ }
+
+ trackLatencies(command: string, startTime: number): void {
+ if (this.trackers?.latencies) {
+ const endTime = performance.now();
+ this.trackers.latencies.observe(
+ {
+ ...this.metrics.labels,
+ command,
+ },
+ endTime - startTime
+ );
+ }
+ }
+
+ createTimeoutPromise(
+ ms: number,
+ command: string
+ ): {
+ timeoutPromise: Promise;
+ clear: () => void;
+ } {
+ let timeoutId: NodeJS.Timeout;
+ const timeoutPromise = new Promise((_, reject) => {
+ timeoutId = setTimeout(() => {
+ reject(
+ this.makeError("redis.COMMAND_TIMEOUT", {
+ command,
+ timeout: ms,
+ })
+ );
+ }, ms);
+ });
+ return {
+ timeoutPromise,
+ clear: () => {
+ clearTimeout(timeoutId);
+ },
+ };
+ }
+
+ async executeCommand(target, prop, args): Promise {
+ const startTime = performance.now();
+ try {
+ this.trackCommand(String(prop));
+ const result = await target[prop](...args);
+ this.trackLatencies(String(prop), startTime);
+ return result;
+ } catch (err) {
+ this.trackLatencies(String(prop), startTime);
+ this.trackErrors(String(prop), err.message);
+ throw this.makeError("redis.COMMAND_ERROR", {
+ command: prop,
+ args,
+ error: err,
+ });
+ }
+ }
+
+ makeProxy(client: Cluster | _Redis) {
+ return new Proxy(client, {
+ get: (target, prop) => {
+ // check if a command or not
+ if (!isCommand(String(prop))) {
+ return target[prop];
+ }
+
+ // check if client in ready state
+ if (this.client.status !== "ready") {
+ throw this.makeError("redis.NOT_READY", {
+ command: prop,
+ });
+ }
+
+ return (...args: unknown[]): Promise => {
+ // If timeout is set, apply Promise.race
+ if (this.client.isCluster && this.commandTimeout) {
+ const { timeoutPromise, clear } = this.createTimeoutPromise(
+ this.commandTimeout,
+ String(prop)
+ );
+ return Promise.race([
+ this.executeCommand(target, prop, args),
+ timeoutPromise,
+ ]).finally(clear);
+ } else {
+ return this.executeCommand(target, prop, args);
+ }
+ };
+ },
+ });
+ }
+
/**
* Connect to redis server with the config
*
@@ -236,6 +412,8 @@ class Redis {
this.log(`Connecting in ${infoObj.mode} mode`, infoObj);
+ client = this.makeProxy(client);
+
// common events
client.on("connect", () => {
this.success(`Successfully connected in ${infoObj.mode} mode`, null);
diff --git a/tests/single.js b/tests/single.js
index aaee6be..45ab77a 100644
--- a/tests/single.js
+++ b/tests/single.js
@@ -1,65 +1,77 @@
-
-
-const Redis = require('../lib/index');
-const { EventEmitter } = require('events');
+const Redis = require("../lib/index");
+const { EventEmitter } = require("events");
+const promClient = require("prom-client");
const emitter = new EventEmitter();
-emitter.on('log', console.log.bind(console));
-emitter.on('success', console.log.bind(console));
-emitter.on('error', console.error.bind(console));
+emitter.on("log", console.log.bind(console));
+emitter.on("success", console.log.bind(console));
+emitter.on("error", console.error.bind(console));
async function doSome(client) {
for (let i = 0; i < 1000; i += 1) {
console.log(i);
- await client.set(`Key:${i}`, i);
+ // add timeout of 1s
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ try {
+ await client.set(`Key:${i}`, i);
+ const data = await client.get(`Key:${i}`);
+ console.log(`Fetched: ${data}`);
+ } catch (err) {
+ console.log(err);
+ }
}
}
async function doPPL(redis) {
- await redis.ppl([{
- command: 'hset',
- args: [
- 'Map',
- 'one',
- '1',
- ],
- }, {
- command: 'hmset',
- args: [
- 'Map',
- { 'two': 2, 'three': 3 },
- ],
- }, {
- command: 'set',
- args: [
- 'count',
- '3',
- ],
- }]);
- const response = await redis.ppl([{
- command: 'hgetall',
- args: [
- 'Map',
- ],
- }, {
- command: 'get',
- args: [
- 'count',
- ],
- action(val) {
- return {
- count: val,
- };
+ await redis.ppl([
+ {
+ command: "hset",
+ args: ["Map", "one", "1"],
+ },
+ {
+ command: "hmset",
+ args: ["Map", { two: 2, three: 3 }],
+ },
+ {
+ command: "set",
+ args: ["count", "3"],
},
- }]);
+ ]);
+ const response = await redis.ppl([
+ {
+ command: "hgetall",
+ args: ["Map"],
+ },
+ {
+ command: "get",
+ args: ["count"],
+ action(val) {
+ return {
+ count: val,
+ };
+ },
+ },
+ ]);
console.log(JSON.stringify(response, null, 2));
}
-const redis = new Redis('redis', emitter, {
- host: '127.0.0.1',
- port: 6379,
-});
-redis.init()
+const redis = new Redis(
+ "redis",
+ emitter,
+ {
+ host: "127.0.0.1",
+ port: 6379,
+ commandTimeout: 100,
+ },
+ {
+ register: promClient.register,
+ labels: {
+ service: "test-test_test9",
+ },
+ }
+);
+redis
+ .init()
.then(() => {
const client = redis.client;
return doSome(client);
@@ -68,10 +80,10 @@ redis.init()
return doPPL(redis);
})
.then(() => {
- console.log('Started');
+ console.log("Started");
process.exit(0);
})
- .catch(err => {
+ .catch((err) => {
console.error(err);
process.exit(1);
});