diff --git a/pubsub.go b/pubsub.go index 2a0e7a81e..589d19b8e 100644 --- a/pubsub.go +++ b/pubsub.go @@ -3,6 +3,7 @@ package redis import ( "context" "fmt" + "sort" "strings" "sync" "time" @@ -574,6 +575,9 @@ type channel struct { chanSize int chanSendTimeout time.Duration checkInterval time.Duration + + subscriptions map[string]struct{} + subscriptionsMu sync.RWMutex } func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel { @@ -583,6 +587,8 @@ func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel { chanSize: 100, chanSendTimeout: time.Minute, checkInterval: 3 * time.Second, + + subscriptions: make(map[string]struct{}), } for _, opt := range opts { opt(c) @@ -621,6 +627,20 @@ func (c *channel) initHealthCheck() { }() } +// Helper function to format subscription information +func (c *channel) getSubscriptionInfo() string { + if len(c.subscriptions) == 0 { + return "none" + } + + subs := make([]string, 0, len(c.subscriptions)) + for sub := range c.subscriptions { + subs = append(subs, sub) + } + sort.Strings(subs) // Sort for consistent output + return strings.Join(subs, ", ") +} + // initMsgChan must be in sync with initAllChan. func (c *channel) initMsgChan() { ctx := context.TODO() @@ -666,9 +686,13 @@ func (c *channel) initMsgChan() { <-timer.C } case <-timer.C: + c.subscriptionsMu.RLock() + subInfo := c.getSubscriptionInfo() + c.subscriptionsMu.RUnlock() + internal.Logger.Printf( - ctx, "redis: %s channel is full for %s (message is dropped)", - c, c.chanSendTimeout) + ctx, "redis: Channel is full for %s (message is dropped), subscriptions: %s", + c.chanSendTimeout, subInfo) } default: internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg) @@ -720,9 +744,13 @@ func (c *channel) initAllChan() { <-timer.C } case <-timer.C: + c.subscriptionsMu.RLock() + subInfo := c.getSubscriptionInfo() + c.subscriptionsMu.RUnlock() + internal.Logger.Printf( - ctx, "redis: %s channel is full for %s (message is dropped)", - c, c.chanSendTimeout) + ctx, "redis: Channel is full for %s (message is dropped), subscriptions: %s", + c.chanSendTimeout, subInfo) } default: internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)