Skip to content

Commit aa5d177

Browse files
fix sharded test
1 parent b015b34 commit aa5d177

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

lib/adapter.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,6 @@ class RedisStreamsAdapter extends ClusterAdapter {
276276
readonly #opts: Required<RedisStreamsAdapterOptions>;
277277
readonly #streamName: string;
278278
readonly #publicChannel: string;
279-
readonly #privateChannel: string;
280279

281280
constructor(
282281
nsp: any,
@@ -291,12 +290,12 @@ class RedisStreamsAdapter extends ClusterAdapter {
291290
this.#streamName = computeStreamName(nsp.name, opts);
292291

293292
this.#publicChannel = `${opts.channelPrefix}#${nsp.name}#`;
294-
this.#privateChannel = `${opts.channelPrefix}#${nsp.name}#${this.uid}#`;
293+
const privateChannel = `${opts.channelPrefix}#${nsp.name}#${this.uid}#`;
295294

296295
subClientPromise.then((subClient) => {
297296
(this.#opts.useShardedPubSub ? SSUBSCRIBE : SUBSCRIBE)(
298297
subClient,
299-
[this.#publicChannel, this.#privateChannel],
298+
[this.#publicChannel, privateChannel],
300299
(payload: Buffer) => {
301300
try {
302301
const message = decode(payload) as ClusterMessage;

lib/util.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,14 @@ export function SSUBSCRIBE(
246246
listener: (payload: Buffer) => void
247247
) {
248248
if (isRedisV4Client(subClient)) {
249-
subClient.sSubscribe(channels, listener, RETURN_BUFFERS);
249+
// note: we could also have used a hash tag ({...}) to ensure the channels are mapped to the same slot
250+
for (const channel of channels) {
251+
subClient.sSubscribe(channel, listener, RETURN_BUFFERS);
252+
}
250253
} else {
251-
subClient.ssubscribe(channels);
254+
for (const channel of channels) {
255+
subClient.ssubscribe(channel);
256+
}
252257
subClient.on("smessageBuffer", (channel: Buffer, payload: Buffer) => {
253258
if (channels.includes(channel.toString())) {
254259
listener(payload);

0 commit comments

Comments
 (0)