6
6
"strings"
7
7
"testing"
8
8
9
+ "github.com/redis/go-redis/v9/internal"
9
10
"github.com/redis/go-redis/v9/internal/proto"
10
11
)
11
12
@@ -40,6 +41,7 @@ func (h *TestHandler) Reset() {
40
41
// TestReaderInterface defines the interface needed for testing
41
42
type TestReaderInterface interface {
42
43
PeekReplyType () (byte , error )
44
+ PeekPushNotificationName () (string , error )
43
45
ReadReply () (interface {}, error )
44
46
}
45
47
@@ -95,6 +97,29 @@ func (m *MockReader) ReadReply() (interface{}, error) {
95
97
return reply , err
96
98
}
97
99
100
+ func (m * MockReader ) PeekPushNotificationName () (string , error ) {
101
+ // return the notification name from the next read reply
102
+ if m .readIndex >= len (m .readReplies ) {
103
+ return "" , io .EOF
104
+ }
105
+ reply := m .readReplies [m .readIndex ]
106
+ if reply == nil {
107
+ return "" , nil
108
+ }
109
+ notification , ok := reply .([]interface {})
110
+ if ! ok {
111
+ return "" , nil
112
+ }
113
+ if len (notification ) == 0 {
114
+ return "" , nil
115
+ }
116
+ name , ok := notification [0 ].(string )
117
+ if ! ok {
118
+ return "" , nil
119
+ }
120
+ return name , nil
121
+ }
122
+
98
123
func (m * MockReader ) Reset () {
99
124
m .readIndex = 0
100
125
m .peekIndex = 0
@@ -119,10 +144,22 @@ func testProcessPendingNotifications(processor *Processor, ctx context.Context,
119
144
break
120
145
}
121
146
147
+ notificationName , err := reader .PeekPushNotificationName ()
148
+ if err != nil {
149
+ // Error reading - continue to next iteration
150
+ break
151
+ }
152
+
153
+ // Skip pub/sub messages - they should be handled by the pub/sub system
154
+ if isPubSubMessage (notificationName ) {
155
+ break
156
+ }
157
+
122
158
// Read the push notification
123
159
reply , err := reader .ReadReply ()
124
160
if err != nil {
125
161
// Error reading - continue to next iteration
162
+ internal .Logger .Printf (ctx , "push: error reading push notification: %v" , err )
126
163
continue
127
164
}
128
165
@@ -420,7 +457,7 @@ func TestProcessor(t *testing.T) {
420
457
// Test with mock reader - push notification with ReadReply error
421
458
mockReader = NewMockReader ()
422
459
mockReader .AddPeekReplyType (proto .RespPush , nil )
423
- mockReader .AddReadReply (nil , io .ErrUnexpectedEOF ) // ReadReply fails
460
+ mockReader .AddReadReply (nil , io .ErrUnexpectedEOF ) // ReadReply fails
424
461
mockReader .AddPeekReplyType (proto .RespString , io .EOF ) // No more push notifications
425
462
err = testProcessPendingNotifications (processor , ctx , mockReader )
426
463
if err != nil {
@@ -430,7 +467,7 @@ func TestProcessor(t *testing.T) {
430
467
// Test with mock reader - push notification with invalid reply type
431
468
mockReader = NewMockReader ()
432
469
mockReader .AddPeekReplyType (proto .RespPush , nil )
433
- mockReader .AddReadReply ("not-a-slice" , nil ) // Invalid reply type
470
+ mockReader .AddReadReply ("not-a-slice" , nil ) // Invalid reply type
434
471
mockReader .AddPeekReplyType (proto .RespString , io .EOF ) // No more push notifications
435
472
err = testProcessPendingNotifications (processor , ctx , mockReader )
436
473
if err != nil {
@@ -620,4 +657,112 @@ func TestVoidProcessor(t *testing.T) {
620
657
t .Errorf ("VoidProcessor ProcessPendingNotifications should never error, got: %v" , err )
621
658
}
622
659
})
623
- }
660
+ }
661
+
662
+ // TestIsPubSubMessage tests the isPubSubMessage function
663
+ func TestIsPubSubMessage (t * testing.T ) {
664
+ t .Run ("PubSubMessages" , func (t * testing.T ) {
665
+ pubSubMessages := []string {
666
+ "message" , // Regular pub/sub message
667
+ "pmessage" , // Pattern pub/sub message
668
+ "subscribe" , // Subscription confirmation
669
+ "unsubscribe" , // Unsubscription confirmation
670
+ "psubscribe" , // Pattern subscription confirmation
671
+ "punsubscribe" , // Pattern unsubscription confirmation
672
+ "smessage" , // Sharded pub/sub message (Redis 7.0+)
673
+ }
674
+
675
+ for _ , msgType := range pubSubMessages {
676
+ if ! isPubSubMessage (msgType ) {
677
+ t .Errorf ("isPubSubMessage(%q) should return true" , msgType )
678
+ }
679
+ }
680
+ })
681
+
682
+ t .Run ("NonPubSubMessages" , func (t * testing.T ) {
683
+ nonPubSubMessages := []string {
684
+ "MOVING" , // Cluster slot migration
685
+ "MIGRATING" , // Cluster slot migration
686
+ "MIGRATED" , // Cluster slot migration
687
+ "FAILING_OVER" , // Cluster failover
688
+ "FAILED_OVER" , // Cluster failover
689
+ "unknown" , // Unknown message type
690
+ "" , // Empty string
691
+ "MESSAGE" , // Case sensitive - should not match
692
+ "PMESSAGE" , // Case sensitive - should not match
693
+ }
694
+
695
+ for _ , msgType := range nonPubSubMessages {
696
+ if isPubSubMessage (msgType ) {
697
+ t .Errorf ("isPubSubMessage(%q) should return false" , msgType )
698
+ }
699
+ }
700
+ })
701
+ }
702
+
703
+ // TestPubSubFiltering tests that pub/sub messages are filtered out during processing
704
+ func TestPubSubFiltering (t * testing.T ) {
705
+ t .Run ("PubSubMessagesIgnored" , func (t * testing.T ) {
706
+ processor := NewProcessor ()
707
+ handler := NewTestHandler ("test" , true )
708
+ ctx := context .Background ()
709
+
710
+ // Register a handler for a non-pub/sub notification
711
+ err := processor .RegisterHandler ("MOVING" , handler , false )
712
+ if err != nil {
713
+ t .Fatalf ("Failed to register handler: %v" , err )
714
+ }
715
+
716
+ // Test with mock reader - pub/sub message should be ignored
717
+ mockReader := NewMockReader ()
718
+ mockReader .AddPeekReplyType (proto .RespPush , nil )
719
+ pubSubNotification := []interface {}{"message" , "channel" , "data" }
720
+ mockReader .AddReadReply (pubSubNotification , nil )
721
+ mockReader .AddPeekReplyType (proto .RespString , io .EOF ) // No more push notifications
722
+
723
+ handler .Reset ()
724
+ err = testProcessPendingNotifications (processor , ctx , mockReader )
725
+ if err != nil {
726
+ t .Errorf ("ProcessPendingNotifications should handle pub/sub messages gracefully, got: %v" , err )
727
+ }
728
+
729
+ // Check that handler was NOT called for pub/sub message
730
+ handled := handler .GetHandledNotifications ()
731
+ if len (handled ) != 0 {
732
+ t .Errorf ("Expected 0 handled notifications for pub/sub message, got: %d" , len (handled ))
733
+ }
734
+ })
735
+
736
+ t .Run ("NonPubSubMessagesProcessed" , func (t * testing.T ) {
737
+ processor := NewProcessor ()
738
+ handler := NewTestHandler ("test" , true )
739
+ ctx := context .Background ()
740
+
741
+ // Register a handler for a non-pub/sub notification
742
+ err := processor .RegisterHandler ("MOVING" , handler , false )
743
+ if err != nil {
744
+ t .Fatalf ("Failed to register handler: %v" , err )
745
+ }
746
+
747
+ // Test with mock reader - non-pub/sub message should be processed
748
+ mockReader := NewMockReader ()
749
+ mockReader .AddPeekReplyType (proto .RespPush , nil )
750
+ clusterNotification := []interface {}{"MOVING" , "slot" , "12345" }
751
+ mockReader .AddReadReply (clusterNotification , nil )
752
+ mockReader .AddPeekReplyType (proto .RespString , io .EOF ) // No more push notifications
753
+
754
+ handler .Reset ()
755
+ err = testProcessPendingNotifications (processor , ctx , mockReader )
756
+ if err != nil {
757
+ t .Errorf ("ProcessPendingNotifications should handle cluster notifications, got: %v" , err )
758
+ }
759
+
760
+ // Check that handler WAS called for cluster notification
761
+ handled := handler .GetHandledNotifications ()
762
+ if len (handled ) != 1 {
763
+ t .Errorf ("Expected 1 handled notification for cluster message, got: %d" , len (handled ))
764
+ } else if len (handled [0 ]) != 3 || handled [0 ][0 ] != "MOVING" {
765
+ t .Errorf ("Expected MOVING notification, got: %v" , handled [0 ])
766
+ }
767
+ })
768
+ }
0 commit comments