Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

interceptor for pre and post work around commands #5

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added prometheus metrics
anaskhan96 committed Nov 29, 2023
commit 2c37d66d70270a6e7a36198bd9ec15d5cf5a431b
15 changes: 14 additions & 1 deletion lib/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/// <reference types="node" />
import { Redis as _Redis, Cluster } from "ioredis";
import EventEmitter from "events";
import { Registry } from "prom-client";
interface RedisConfig {
/** provide host ip/url, default - localhost */
host?: string;
@@ -46,12 +47,24 @@ declare class Redis {
config: RedisConfig;
client: Cluster | _Redis;
commandTimeout?: number;
metrics?: {
register: Registry;
labels: {
[key: string]: string;
};
};
/**
* @param {string} name - unique name to this service
* @param {EventEmitter} emitter
* @param {RedisConfig} config - configuration object of service
* @param {Registry} metrics - prometheus client
*/
constructor(name: string, emitter: EventEmitter, config: RedisConfig);
constructor(name: string, emitter: EventEmitter, config: RedisConfig, metrics?: {
register: Registry;
labels: {
[key: string]: string;
};
});
log(message: string, data: unknown): void;
success(message: string, data: unknown): void;
error(err: Error, data: unknown): void;
20 changes: 17 additions & 3 deletions lib/index.js
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -24,7 +24,8 @@
"homepage": "https://github.com/akshendra/redis-wrapper#readme",
"dependencies": {
"@ioredis/commands": "^1.2.0",
"ioredis": "5.3.2"
"ioredis": "5.3.2",
"prom-client": "^15.0.0"
},
"devDependencies": {
"@types/node": "^16.11.7",
176 changes: 129 additions & 47 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -6,6 +6,8 @@ import IoRedis, {
} from "ioredis";
import EventEmitter from "events";
import { exists as isCommand } from "@ioredis/commands";
import { Registry, Counter, Histogram } from "prom-client";
import { performance } from "perf_hooks";

function retryStrategy(times: number): number {
if (times > 1000) {
@@ -69,16 +71,60 @@ class Redis {
config: RedisConfig;
client: Cluster | _Redis;
commandTimeout?: number;
metrics?: {
register: Registry;
labels: { [key: string]: string };
};
trackers?: { [key: string]: Counter | Histogram };

/**
* @param {string} name - unique name to this service
* @param {EventEmitter} emitter
* @param {RedisConfig} config - configuration object of service
* @param {Registry} metrics - prometheus client
*/
constructor(name: string, emitter: EventEmitter, config: RedisConfig) {
constructor(
name: string,
emitter: EventEmitter,
config: RedisConfig,
metrics?: {
register: Registry;
labels: { [key: string]: string };
}
) {
this.name = name;
this.emitter = emitter;
this.commandTimeout = config.commandTimeout;
this.metrics = metrics;
this.trackers = {};

if (this.metrics) {
// register counters
// create counter for tracking the number of times redis commands are called
this.trackers["commands"] = new Counter({
name: "redis_command_counter",
help: "keep track of all redis commands",
labelNames: [...Object.keys(this.metrics.labels), "command"],
registers: [this.metrics.register],
});

// create counter for tracking the number of times redis commands have failed
this.trackers["errors"] = new Counter({
name: "redis_command_error_counter",
help: "keep track of all redis command errors",
labelNames: [...Object.keys(this.metrics.labels), "command"],
registers: [this.metrics.register],
});

// create histogram for tracking latencies of redis commands
this.trackers["latencies"] = new Histogram({
name: "redis_command_latency",
help: "keep track of redis command latencies",
labelNames: [...Object.keys(this.metrics.labels), "command"],
registers: [this.metrics.register],
});
}

this.config = Object.assign(
{
host: "localhost",
@@ -140,6 +186,87 @@ class Redis {
return error;
}

makeProxy() {
return new Proxy(this.client, {
get: (target, prop) => {
if (isCommand(String(prop))) {
// check if client in ready state
if (this.client.status !== "ready") {
throw this.makeError("redis.NOT_READY", {
command: prop,
});
}

// check if cluster and command timeout is set
let promiseTimeout;
if (this.client.isCluster && this.commandTimeout) {
promiseTimeout = (ms) =>
new Promise((_, reject) =>
setTimeout(() => {
reject(
this.makeError("redis.COMMAND_TIMEOUT", {
command: prop,
timeout: ms,
})
);
}, ms)
);
}

return async (...args) => {
const startTime = performance.now();
try {
const promises = [];
promises.push(target[prop](...args));
(this.trackers["commands"] as Counter).inc(
{
...this.metrics.labels,
command: String(prop),
},
1
);
if (promiseTimeout) {
promises.push(promiseTimeout(this.commandTimeout));
}
const result = await Promise.race(promises);
const endTime = performance.now();
(this.trackers["latencies"] as Histogram).observe(
{
...this.metrics.labels,
command: String(prop),
},
endTime - startTime
);
return result;
} catch (err) {
const endTime = performance.now();
(this.trackers["latencies"] as Histogram).observe(
{
...this.metrics.labels,
command: String(prop),
},
endTime - startTime
);
(this.trackers["errors"] as Counter).inc(
{
...this.metrics.labels,
command: String(prop),
},
1
);
throw this.makeError("redis.COMMAND_ERROR", {
command: prop,
args,
error: err,
});
}
};
}
return target[prop];
},
});
}

/**
* Connect to redis server with the config
*
@@ -245,52 +372,7 @@ class Redis {

this.log(`Connecting in ${infoObj.mode} mode`, infoObj);

client = new Proxy(client, {
get: (target, prop) => {
if (isCommand(String(prop))) {
// check if client in ready state
if (this.client.status !== "ready") {
throw this.makeError("redis.NOT_READY", {
command: prop,
});
}

// check if cluster and command timeout is set
let promiseTimeout;
if (this.client.isCluster && this.commandTimeout) {
promiseTimeout = (ms) =>
new Promise((_, reject) =>
setTimeout(() => {
reject(
this.makeError("redis.COMMAND_TIMEOUT", {
command: prop,
timeout: ms,
})
);
}, ms)
);
}

return async (...args) => {
try {
const promises = [];
promises.push(target[prop](...args));
if (promiseTimeout) {
promises.push(promiseTimeout(this.commandTimeout));
}
return await Promise.race(promises);
} catch (err) {
throw this.makeError("redis.COMMAND_ERROR", {
command: prop,
args,
error: err,
});
}
};
}
return target[prop];
},
});
client = this.makeProxy();

// common events
client.on("connect", () => {