Skip to content

Commit 61e5e7e

Browse files
committed
Implement Redis Streams for persistent notification delivery
- Add RedisStreams implementation with retry logic and error handling - Support configurable batch processing and connection resilience - Add comprehensive unit tests for functionality and error scenarios - Update configuration to support both "redis" and "redis-streams" options - Maintain backwards compatibility with existing Redis pub/sub - Add configuration parameters for tuning Redis Streams behavior
1 parent 8835bef commit 61e5e7e

File tree

4 files changed

+144
-146
lines changed

4 files changed

+144
-146
lines changed

pkg/syncer/pubsub/redis_streams.go

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727

2828
"github.com/go-redis/redis/v8"
2929
"github.com/rs/zerolog/log"
30-
30+
3131
"github.com/optimizely/agent/pkg/metrics"
3232
)
3333

@@ -37,7 +37,7 @@ type RedisStreams struct {
3737
Password string
3838
Database int
3939
// Stream configuration
40-
MaxLen int64
40+
MaxLen int64
4141
ConsumerGroup string
4242
ConsumerName string
4343
// Batching configuration
@@ -55,7 +55,7 @@ type RedisStreams struct {
5555

5656
func (r *RedisStreams) Publish(ctx context.Context, channel string, message interface{}) error {
5757
streamName := r.getStreamName(channel)
58-
58+
5959
// Convert message to string for consistent handling
6060
var messageStr string
6161
switch v := message.(type) {
@@ -71,7 +71,7 @@ func (r *RedisStreams) Publish(ctx context.Context, channel string, message inte
7171
}
7272
messageStr = string(jsonBytes)
7373
}
74-
74+
7575
// Add message to stream with automatic ID generation
7676
args := &redis.XAddArgs{
7777
Stream: streamName,
@@ -144,34 +144,34 @@ func (r *RedisStreams) Subscribe(ctx context.Context, channel string) (chan stri
144144
Consumer: consumerName,
145145
Streams: []string{streamName, ">"},
146146
Count: int64(batchSize - len(batch)), // Read up to remaining batch size
147-
Block: 100 * time.Millisecond, // Short block to allow flush checking
147+
Block: 100 * time.Millisecond, // Short block to allow flush checking
148148
}).Result()
149149

150150
if err != nil {
151151
if err == redis.Nil {
152152
continue // No messages, continue polling
153153
}
154-
154+
155155
// Handle connection errors with exponential backoff reconnection
156156
if r.isConnectionError(err) {
157157
r.incrementCounter("connection.error")
158158
log.Warn().Err(err).Msg("Redis connection error, attempting reconnection")
159-
159+
160160
// Apply exponential backoff for reconnection
161161
if time.Since(lastReconnect) > reconnectDelay {
162162
r.incrementCounter("connection.reconnect_attempt")
163163
client.Close()
164164
client = r.createClient()
165165
lastReconnect = time.Now()
166-
166+
167167
// Recreate consumer group after reconnection
168168
if groupErr := r.createConsumerGroupWithRetry(ctx, client, streamName, consumerGroup); groupErr != nil {
169169
r.incrementCounter("connection.group_recreate_error")
170170
log.Error().Err(groupErr).Msg("Failed to recreate consumer group after reconnection")
171171
} else {
172172
r.incrementCounter("connection.reconnect_success")
173173
}
174-
174+
175175
// Increase reconnect delay with exponential backoff
176176
reconnectDelay = time.Duration(math.Min(float64(reconnectDelay*2), float64(maxReconnectDelay)))
177177
} else {
@@ -197,12 +197,12 @@ func (r *RedisStreams) Subscribe(ctx context.Context, channel string) (chan stri
197197
if data, ok := message.Values["data"].(string); ok {
198198
batch = append(batch, data)
199199
messageCount++
200-
200+
201201
// Acknowledge the message with retry
202202
if ackErr := r.acknowledgeMessage(ctx, client, streamName, consumerGroup, message.ID); ackErr != nil {
203203
log.Warn().Err(ackErr).Str("messageID", message.ID).Msg("Failed to acknowledge message")
204204
}
205-
205+
206206
// Send batch if it's full
207207
if len(batch) >= batchSize {
208208
r.incrementCounter("batch.sent")
@@ -213,7 +213,7 @@ func (r *RedisStreams) Subscribe(ctx context.Context, channel string) (chan stri
213213
}
214214
}
215215
}
216-
216+
217217
// Track successful message reads
218218
if messageCount > 0 {
219219
r.incrementCounter("messages.read")
@@ -237,7 +237,6 @@ func (r *RedisStreams) sendBatch(ch chan string, batch []string, ctx context.Con
237237
}
238238
}
239239

240-
241240
// Helper methods
242241
func (r *RedisStreams) getStreamName(channel string) string {
243242
return fmt.Sprintf("stream:%s", channel)
@@ -318,13 +317,13 @@ func (r *RedisStreams) executeWithRetry(ctx context.Context, operation func(clie
318317
maxRetries := r.getMaxRetries()
319318
retryDelay := r.getRetryDelay()
320319
maxRetryDelay := r.getMaxRetryDelay()
321-
320+
322321
var lastErr error
323322
for attempt := 0; attempt <= maxRetries; attempt++ {
324323
client := r.createClient()
325324
err := operation(client)
326325
client.Close()
327-
326+
328327
if err == nil {
329328
// Record successful operation metrics
330329
r.incrementCounter("operations.success")
@@ -334,22 +333,22 @@ func (r *RedisStreams) executeWithRetry(ctx context.Context, operation func(clie
334333
}
335334
return nil // Success
336335
}
337-
336+
338337
lastErr = err
339338
r.incrementCounter("operations.error")
340-
339+
341340
// Don't retry on non-recoverable errors
342341
if !r.isRetryableError(err) {
343342
r.incrementCounter("errors.non_retryable")
344343
return fmt.Errorf("non-retryable error: %w", err)
345344
}
346-
345+
347346
// Don't sleep after the last attempt
348347
if attempt < maxRetries {
349348
r.incrementCounter("retries.attempt")
350349
// Calculate delay with exponential backoff
351350
delay := time.Duration(math.Min(float64(retryDelay)*math.Pow(2, float64(attempt)), float64(maxRetryDelay)))
352-
351+
353352
select {
354353
case <-ctx.Done():
355354
r.incrementCounter("operations.canceled")
@@ -359,7 +358,7 @@ func (r *RedisStreams) executeWithRetry(ctx context.Context, operation func(clie
359358
}
360359
}
361360
}
362-
361+
363362
r.incrementCounter("retries.exhausted")
364363
return fmt.Errorf("operation failed after %d retries: %w", maxRetries, lastErr)
365364
}
@@ -379,7 +378,7 @@ func (r *RedisStreams) createConsumerGroupWithRetry(ctx context.Context, _ *redi
379378
func (r *RedisStreams) acknowledgeMessage(ctx context.Context, client *redis.Client, streamName, consumerGroup, messageID string) error {
380379
maxRetries := 2 // Fewer retries for ACK operations
381380
retryDelay := 50 * time.Millisecond
382-
381+
383382
var lastErr error
384383
for attempt := 0; attempt <= maxRetries; attempt++ {
385384
err := client.XAck(ctx, streamName, consumerGroup, messageID).Err()
@@ -390,16 +389,16 @@ func (r *RedisStreams) acknowledgeMessage(ctx context.Context, client *redis.Cli
390389
}
391390
return nil // Success
392391
}
393-
392+
394393
lastErr = err
395394
r.incrementCounter("ack.error")
396-
395+
397396
// Don't retry on non-recoverable errors
398397
if !r.isRetryableError(err) {
399398
r.incrementCounter("ack.non_retryable_error")
400399
return fmt.Errorf("non-retryable ACK error: %w", err)
401400
}
402-
401+
403402
// Don't sleep after the last attempt
404403
if attempt < maxRetries {
405404
r.incrementCounter("ack.retry_attempt")
@@ -411,7 +410,7 @@ func (r *RedisStreams) acknowledgeMessage(ctx context.Context, client *redis.Cli
411410
}
412411
}
413412
}
414-
413+
415414
r.incrementCounter("ack.retry_exhausted")
416415
return fmt.Errorf("ACK failed after %d retries: %w", maxRetries, lastErr)
417416
}
@@ -421,9 +420,9 @@ func (r *RedisStreams) isRetryableError(err error) bool {
421420
if err == nil {
422421
return false
423422
}
424-
423+
425424
errStr := err.Error()
426-
425+
427426
// Network/connection errors that are retryable
428427
retryableErrors := []string{
429428
"connection refused",
@@ -438,20 +437,20 @@ func (r *RedisStreams) isRetryableError(err error) bool {
438437
"context canceled", // Handle graceful shutdowns
439438
"no such host", // DNS lookup failures
440439
}
441-
440+
442441
for _, retryable := range retryableErrors {
443442
if strings.Contains(strings.ToLower(errStr), retryable) {
444443
return true
445444
}
446445
}
447-
446+
448447
// Redis-specific retryable errors
449448
if strings.Contains(errStr, "LOADING") || // Redis is loading data
450449
strings.Contains(errStr, "READONLY") || // Redis is in read-only mode
451450
strings.Contains(errStr, "CLUSTERDOWN") { // Redis cluster is down
452451
return true
453452
}
454-
453+
455454
return false
456455
}
457456

@@ -460,9 +459,9 @@ func (r *RedisStreams) isConnectionError(err error) bool {
460459
if err == nil {
461460
return false
462461
}
463-
462+
464463
errStr := err.Error()
465-
464+
466465
connectionErrors := []string{
467466
"connection refused",
468467
"connection reset",
@@ -471,13 +470,13 @@ func (r *RedisStreams) isConnectionError(err error) bool {
471470
"eof",
472471
"connection pool exhausted",
473472
}
474-
473+
475474
for _, connErr := range connectionErrors {
476475
if strings.Contains(strings.ToLower(errStr), connErr) {
477476
return true
478477
}
479478
}
480-
479+
481480
return false
482481
}
483482

@@ -502,4 +501,4 @@ func (r *RedisStreams) recordTimer(key string, duration float64) {
502501
timer.Update(duration)
503502
}
504503
}
505-
}
504+
}

0 commit comments

Comments
 (0)