Skip to content

Commit b6e712b

Browse files
committed
feat: add proactive push notification processing to WithReader
- Add push notification processing to Conn.WithReader method - Process notifications immediately before every read operation - Provides proactive notification handling vs reactive processing - Add proper error handling with internal.Logger - Non-blocking implementation that doesn't break Redis operations - Complements existing processing in Pool.Put and isHealthyConn Benefits: - Immediate processing when notifications arrive - Called before every read operation for optimal timing - Prevents notification backlog accumulation - More responsive to Redis cluster changes - Better user experience during migrations - Optimal placement for catching asynchronous notifications Implementation: - Type-safe interface assertion for processor - Context-aware error handling with logging - Maintains backward compatibility - Consistent with existing pool patterns - Three-layer processing strategy: WithReader (proactive) + Pool.Put + isHealthyConn (reactive) Use cases: - MOVING/MIGRATING/MIGRATED notifications for slot migrations - FAILING_OVER/FAILED_OVER notifications for failover scenarios - Real-time cluster topology change awareness - Improved connection utilization efficiency
1 parent d820ade commit b6e712b

File tree

3 files changed

+21
-14
lines changed

3 files changed

+21
-14
lines changed

internal/pool/conn.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sync/atomic"
88
"time"
99

10+
"github.com/redis/go-redis/v9/internal"
1011
"github.com/redis/go-redis/v9/internal/proto"
1112
"github.com/redis/go-redis/v9/internal/pushnotif"
1213
)
@@ -77,11 +78,23 @@ func (cn *Conn) RemoteAddr() net.Addr {
7778
func (cn *Conn) WithReader(
7879
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
7980
) error {
81+
// Process any pending push notifications before executing the read function
82+
// This ensures push notifications are handled as soon as they arrive
83+
if cn.PushNotificationProcessor != nil {
84+
// Type assert to the processor interface
85+
if err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); err != nil {
86+
// Log the error but don't fail the read operation
87+
// Push notification processing errors shouldn't break normal Redis operations
88+
internal.Logger.Printf(ctx, "push: error processing pending notifications in WithReader: %v", err)
89+
}
90+
}
91+
8092
if timeout >= 0 {
8193
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
8294
return err
8395
}
8496
}
97+
8598
return fn(cn.rd)
8699
}
87100

redis.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
386386

387387
// for redis-server versions that do not support the HELLO command,
388388
// RESP2 will continue to be used.
389-
if err = conn.Hello(ctx, c.opt.Protocol, username, password, c.opt.ClientName).Err(); err == nil {
389+
if err = conn.Hello(ctx, c.opt.Protocol, username, password, c.opt.ClientName).Err(); err == nil {
390390
// Authentication successful with HELLO command
391391
} else if !isRedisError(err) {
392392
// When the server responds with the RESP protocol and the result is not a normal
@@ -534,12 +534,6 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
534534
readReplyFunc = cmd.readRawReply
535535
}
536536
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
537-
// Check for push notifications before reading the command reply
538-
if c.opt.Protocol == 3 {
539-
if err := c.pushProcessor.ProcessPendingNotifications(ctx, rd); err != nil {
540-
internal.Logger.Printf(ctx, "push: error processing push notifications: %v", err)
541-
}
542-
}
543537
return readReplyFunc(rd)
544538
}); err != nil {
545539
if cmd.readTimeout() == nil {
@@ -813,25 +807,25 @@ func (c *Client) Options() *Options {
813807

814808
// initializePushProcessor initializes the push notification processor for any client type.
815809
// This is a shared helper to avoid duplication across NewClient, NewFailoverClient, and NewSentinelClient.
816-
func initializePushProcessor(opt *Options, useVoidByDefault bool) PushNotificationProcessorInterface {
810+
func initializePushProcessor(opt *Options) PushNotificationProcessorInterface {
817811
// Always use custom processor if provided
818812
if opt.PushNotificationProcessor != nil {
819813
return opt.PushNotificationProcessor
820814
}
821815

822816
// For regular clients, respect the PushNotifications setting
823-
if !useVoidByDefault && opt.PushNotifications {
817+
if opt.PushNotifications {
824818
// Create default processor when push notifications are enabled
825819
return NewPushNotificationProcessor()
826820
}
827821

828-
// Create void processor when push notifications are disabled or for specialized clients
822+
// Create void processor when push notifications are disabled
829823
return NewVoidPushNotificationProcessor()
830824
}
831825

832826
// initializePushProcessor initializes the push notification processor for this client.
833827
func (c *Client) initializePushProcessor() {
834-
c.pushProcessor = initializePushProcessor(c.opt, false)
828+
c.pushProcessor = initializePushProcessor(c.opt)
835829
}
836830

837831
// RegisterPushNotificationHandler registers a handler for a specific push notification name.
@@ -987,7 +981,7 @@ func newConn(opt *Options, connPool pool.Pooler, parentHooks *hooksMixin) *Conn
987981

988982
// Initialize push notification processor using shared helper
989983
// Use void processor by default for connections (typically don't need push notifications)
990-
c.pushProcessor = initializePushProcessor(opt, true)
984+
c.pushProcessor = initializePushProcessor(opt)
991985

992986
c.cmdable = c.Process
993987
c.statefulCmdable = c.Process

sentinel.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
433433

434434
// Initialize push notification processor using shared helper
435435
// Use void processor by default for failover clients (typically don't need push notifications)
436-
rdb.pushProcessor = initializePushProcessor(opt, true)
436+
rdb.pushProcessor = initializePushProcessor(opt)
437437

438438
connPool = newConnPool(opt, rdb.dialHook)
439439
rdb.connPool = connPool
@@ -503,7 +503,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
503503

504504
// Initialize push notification processor using shared helper
505505
// Use void processor by default for sentinel clients (typically don't need push notifications)
506-
c.pushProcessor = initializePushProcessor(opt, true)
506+
c.pushProcessor = initializePushProcessor(opt)
507507

508508
c.initHooks(hooks{
509509
dial: c.baseClient.dial,

0 commit comments

Comments
 (0)