Skip to content

[wip] resp3 notification handlers #3418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b02eed6
feat: add general push notification system
ndyakov Jun 26, 2025
1ff0ded
feat: enforce single handler per notification type
ndyakov Jun 26, 2025
e6e2cea
feat: remove global handlers and enable push notifications by default
ndyakov Jun 26, 2025
d7fbe18
feat: fix connection health check interference with push notifications
ndyakov Jun 26, 2025
1331fb9
fix: remove unused fields and ensure push notifications work in clone…
ndyakov Jun 26, 2025
4747610
test: add comprehensive unit tests for 100% coverage
ndyakov Jun 26, 2025
70231ae
refactor: simplify push notification interface
ndyakov Jun 26, 2025
958fb1a
fix: resolve data race in PushNotificationProcessor
ndyakov Jun 26, 2025
79f6df2
remove: push-notification-demo
ndyakov Jun 26, 2025
c33b157
feat: add protected handler support and rename command to pushNotific…
ndyakov Jun 26, 2025
fdfcf94
feat: add VoidPushNotificationProcessor for disabled push notifications
ndyakov Jun 26, 2025
be9b6dd
refactor: remove unnecessary enabled field and IsEnabled/SetEnabled m…
ndyakov Jun 26, 2025
8006fab
fix: ensure push notification processor is never nil in newConn
ndyakov Jun 26, 2025
d1d4529
fix: initialize push notification processor in SentinelClient
ndyakov Jun 26, 2025
a2de263
fix: copy push notification processor to transaction baseClient
ndyakov Jun 26, 2025
ad16b21
fix: initialize push notification processor in NewFailoverClient
ndyakov Jun 27, 2025
d3f6197
feat: add GetHandler method and improve push notification API encapsu…
ndyakov Jun 27, 2025
e6c5590
feat: enable real push notification processors for SentinelClient and…
ndyakov Jun 27, 2025
03bfd9f
feat: remove GetRegistry from PushNotificationProcessorInterface for …
ndyakov Jun 27, 2025
9a7a5c8
fix: add nil reader check in ProcessPendingNotifications to prevent p…
ndyakov Jun 27, 2025
ada72ce
refactor: move push notification logic to pusnotif package
ndyakov Jun 27, 2025
91805bc
refactor: remove handlerWrapper and use separate maps in registry
ndyakov Jun 27, 2025
e31987f
Fixes tests:
ndyakov Jun 27, 2025
075b930
fix: update coverage test to expect errors for disabled push notifica…
ndyakov Jun 27, 2025
f7948b5
fix: address pr review
ndyakov Jun 27, 2025
3473c1e
fix: simplify api
ndyakov Jun 27, 2025
d820ade
test: add comprehensive test coverage for pushnotif package
ndyakov Jun 27, 2025
b6e712b
feat: add proactive push notification processing to WithReader
ndyakov Jun 27, 2025
f66518c
feat: add pub/sub message filtering to push notification processor
ndyakov Jun 27, 2025
f4ff2d6
feat: expand notification filtering to include streams, keyspace, and…
ndyakov Jun 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"sync/atomic"
"time"

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/pushnotif"
)

var noDeadline = time.Time{}
Expand All @@ -25,6 +27,10 @@ type Conn struct {
createdAt time.Time

onClose func() error

// Push notification processor for handling push notifications on this connection
// This is set when the connection is created and is a reference to the processor
PushNotificationProcessor pushnotif.ProcessorInterface
}

func NewConn(netConn net.Conn) *Conn {
Expand Down Expand Up @@ -72,11 +78,23 @@ func (cn *Conn) RemoteAddr() net.Addr {
func (cn *Conn) WithReader(
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
) error {
// Process any pending push notifications before executing the read function
// This ensures push notifications are handled as soon as they arrive
if cn.PushNotificationProcessor != nil {
// Type assert to the processor interface
if err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); err != nil {
// Log the error but don't fail the read operation
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications in WithReader: %v", err)
}
}

if timeout >= 0 {
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
return err
}
}

return fn(cn.rd)
}

Expand Down
55 changes: 50 additions & 5 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pushnotif"
)

var (
Expand Down Expand Up @@ -71,6 +72,13 @@ type Options struct {
MaxActiveConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration

// Push notification processor for connections
// This is an interface to avoid circular imports
PushNotificationProcessor pushnotif.ProcessorInterface

// Protocol version for optimization (3 = RESP3 with push notifications, 2 = RESP2 without)
Protocol int
}

type lastDialErrorWrap struct {
Expand Down Expand Up @@ -228,6 +236,12 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {

cn := NewConn(netConn)
cn.pooled = pooled

// Set push notification processor if available
if p.cfg.PushNotificationProcessor != nil {
cn.PushNotificationProcessor = p.cfg.PushNotificationProcessor
}

return cn, nil
}

Expand Down Expand Up @@ -377,9 +391,24 @@ func (p *ConnPool) popIdle() (*Conn, error) {

func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
if cn.rd.Buffered() > 0 {
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
// Check if this might be push notification data
if cn.PushNotificationProcessor != nil && p.cfg.Protocol == 3 {
// Only process for RESP3 clients (push notifications only available in RESP3)
err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd)
if err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications: %v", err)
}
// Check again if there's still unread data after processing push notifications
if cn.rd.Buffered() > 0 {
internal.Logger.Printf(ctx, "Conn has unread data after processing push notifications")
p.Remove(ctx, cn, BadConnError{})
return
}
} else {
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
}
}

if !cn.pooled {
Expand Down Expand Up @@ -523,8 +552,24 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
return false
}

if connCheck(cn.netConn) != nil {
return false
// Check connection health, but be aware of push notifications
if err := connCheck(cn.netConn); err != nil {
// If there's unexpected data and we have push notification support,
// it might be push notifications (only for RESP3)
if err == errUnexpectedRead && cn.PushNotificationProcessor != nil && p.cfg.Protocol == 3 {
// Try to process any pending push notifications (only for RESP3)
ctx := context.Background()
if procErr := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); procErr != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications during health check: %v", procErr)
return false
}
// Check again after processing push notifications
if connCheck(cn.netConn) != nil {
return false
}
} else {
return false
}
}

cn.SetUsedAt(now)
Expand Down
21 changes: 21 additions & 0 deletions internal/proto/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,27 @@ func (r *Reader) PeekReplyType() (byte, error) {
return b[0], nil
}

func (r *Reader) PeekPushNotificationName() (string, error) {
// peek 32 bytes, should be enough to read the push notification name
Copy link
Preview

Copilot AI Jun 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider verifying that a fixed peek size of 32 bytes is always sufficient for push notification names, or adjust the buffer size to avoid potential truncation issues.

Copilot uses AI. Check for mistakes.

buf, err := r.rd.Peek(32)
if err != nil {
return "", err
}
if buf[0] != RespPush {
return "", fmt.Errorf("redis: can't parse push notification: %q", buf)
}
// remove push notification type and length
nextLine := buf[2:]
for i := 1; i < len(buf); i++ {
if buf[i] == '\r' && buf[i+1] == '\n' {
nextLine = buf[i+2:]
break
}
}
// return notification name or error
return r.readStringReply(nextLine)
}

// ReadLine Return a valid reply, it will check the protocol or redis error,
// and discard the attribute type.
func (r *Reader) ReadLine() ([]byte, error) {
Expand Down
186 changes: 186 additions & 0 deletions internal/pushnotif/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package pushnotif

import (
"context"
"fmt"

"github.com/redis/go-redis/v9/internal/proto"
)

// Processor handles push notifications with a registry of handlers.
type Processor struct {
registry *Registry
}

// NewProcessor creates a new push notification processor.
func NewProcessor() *Processor {
return &Processor{
registry: NewRegistry(),
}
}

// GetHandler returns the handler for a specific push notification name.
// Returns nil if no handler is registered for the given name.
func (p *Processor) GetHandler(pushNotificationName string) Handler {
return p.registry.GetHandler(pushNotificationName)
}

// RegisterHandler registers a handler for a specific push notification name.
// Returns an error if a handler is already registered for this push notification name.
// If protected is true, the handler cannot be unregistered.
func (p *Processor) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error {
return p.registry.RegisterHandler(pushNotificationName, handler, protected)
}

// UnregisterHandler removes a handler for a specific push notification name.
// Returns an error if the handler is protected or doesn't exist.
func (p *Processor) UnregisterHandler(pushNotificationName string) error {
return p.registry.UnregisterHandler(pushNotificationName)
}

// ProcessPendingNotifications checks for and processes any pending push notifications.
func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
// Check for nil reader
if rd == nil {
return nil
}

// Check if there are any buffered bytes that might contain push notifications
if rd.Buffered() == 0 {
return nil
}

// Process all available push notifications
for {
// Peek at the next reply type to see if it's a push notification
replyType, err := rd.PeekReplyType()
if err != nil {
// No more data available or error reading
break
}

// Push notifications use RespPush type in RESP3
if replyType != proto.RespPush {
break
}

notificationName, err := rd.PeekPushNotificationName()
if err != nil {
// Error reading - continue to next iteration
break
}

// Skip notifications that should be handled by other systems
if shouldSkipNotification(notificationName) {
break
}

// Try to read the push notification
reply, err := rd.ReadReply()
if err != nil {
return fmt.Errorf("failed to read push notification: %w", err)
}

// Convert to slice of interfaces
notification, ok := reply.([]interface{})
if !ok {
continue
}

// Handle the notification directly
if len(notification) > 0 {
// Extract the notification type (first element)
if notificationType, ok := notification[0].(string); ok {
// Skip notifications that should be handled by other systems
if shouldSkipNotification(notificationType) {
continue
}

// Get the handler for this notification type
if handler := p.registry.GetHandler(notificationType); handler != nil {
// Handle the notification
handler.HandlePushNotification(ctx, notification)
}
}
}
}

return nil
}

// shouldSkipNotification checks if a notification type should be ignored by the push notification
// processor and handled by other specialized systems instead (pub/sub, streams, keyspace, etc.).
func shouldSkipNotification(notificationType string) bool {
switch notificationType {
// Pub/Sub notifications - handled by pub/sub system
case "message", // Regular pub/sub message
"pmessage", // Pattern pub/sub message
"subscribe", // Subscription confirmation
"unsubscribe", // Unsubscription confirmation
"psubscribe", // Pattern subscription confirmation
"punsubscribe", // Pattern unsubscription confirmation
"smessage", // Sharded pub/sub message (Redis 7.0+)
"ssubscribe", // Sharded subscription confirmation
"sunsubscribe", // Sharded unsubscription confirmation

// Stream notifications - handled by stream consumers
"xread-from", // Stream reading notifications
"xreadgroup-from", // Stream consumer group notifications

// Client tracking notifications - handled by client tracking system
"invalidate", // Client-side caching invalidation

// Keyspace notifications - handled by keyspace notification subscribers
// Note: Keyspace notifications typically have prefixes like "__keyspace@0__:" or "__keyevent@0__:"
// but we'll handle the base notification types here
"expired", // Key expiration events
"evicted", // Key eviction events
"set", // Key set events
"del", // Key deletion events
"rename", // Key rename events
"move", // Key move events
"copy", // Key copy events
"restore", // Key restore events
"sort", // Sort operation events
"flushdb", // Database flush events
"flushall": // All databases flush events
return true
default:
return false
}
}

// VoidProcessor discards all push notifications without processing them.
type VoidProcessor struct{}

// NewVoidProcessor creates a new void push notification processor.
func NewVoidProcessor() *VoidProcessor {
return &VoidProcessor{}
}

// GetHandler returns nil for void processor since it doesn't maintain handlers.
func (v *VoidProcessor) GetHandler(pushNotificationName string) Handler {
return nil
}

// RegisterHandler returns an error for void processor since it doesn't maintain handlers.
// This helps developers identify when they're trying to register handlers on disabled push notifications.
func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error {
return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
}

// UnregisterHandler returns an error for void processor since it doesn't maintain handlers.
// This helps developers identify when they're trying to unregister handlers on disabled push notifications.
func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error {
return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
}

// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
// are only available in RESP3 and this processor is used when they're disabled.
// This avoids unnecessary buffer scanning overhead.
func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
// VoidProcessor is used when push notifications are disabled (typically RESP2 or disabled RESP3).
// Since push notifications only exist in RESP3, we can safely skip all processing
// to avoid unnecessary buffer scanning overhead.
return nil
}
Loading
Loading