Skip to content

Commit 43403d8

Browse files
Handling errors on pubsub
1 parent a1ea9e4 commit 43403d8

File tree

4 files changed

+494
-7
lines changed

4 files changed

+494
-7
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ Thumbs.db
8787
# Coverage Results #
8888
####################
8989
coverage.txt
90+
.coverdata/
91+
pubsub-sub-bench-test
9092

9193
# Profiler Results #
9294
####################

Makefile

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ build-race:
3333
$(GOBUILDRACE) \
3434
-ldflags=$(LDFLAGS) .
3535

36+
build-cover:
37+
@echo "Building binary with coverage instrumentation..."
38+
$(GOBUILD) -cover \
39+
-ldflags=$(LDFLAGS) .
40+
3641
checkfmt:
3742
@echo 'Checking gofmt';\
3843
bash -c "diff -u <(echo -n) <(go fmt .)";\
@@ -52,9 +57,21 @@ fmt:
5257
get:
5358
$(GOGET) -t -v ./...
5459

55-
test: get
60+
test: get build-cover
5661
$(GOFMT) ./...
57-
$(GOTEST) -race -covermode=atomic ./...
62+
@rm -rf .coverdata
63+
@mkdir -p .coverdata
64+
$(GOTEST) -v -race -covermode=atomic ./...
5865

59-
coverage: get test
60-
$(GOTEST) -race -coverprofile=coverage.txt -covermode=atomic .
66+
coverage: get build-cover
67+
$(GOFMT) ./...
68+
@rm -rf .coverdata
69+
@mkdir -p .coverdata
70+
$(GOTEST) -v -race -covermode=atomic .
71+
@if [ -d .coverdata ] && [ -n "$$(ls -A .coverdata 2>/dev/null)" ]; then \
72+
echo "Converting coverage data..."; \
73+
go tool covdata textfmt -i=.coverdata -o coverage.txt; \
74+
else \
75+
echo "No coverage data found, creating empty coverage file"; \
76+
touch coverage.txt; \
77+
fi

subscriber.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ func publisherRoutine(clientName string, channels []string, mode string, measure
126126
publishLatency := time.Now().UnixNano() - startPublish
127127

128128
if err != nil {
129-
log.Printf("Error publishing to channel %s: %v", ch, err)
129+
log.Printf("Publisher %s: error publishing to channel %s: %v", clientName, ch, err)
130+
// Don't send metrics on error, but still count the message attempt
130131
} else {
131132
// Send metrics to channels
132133
publishLatencyChannel <- publishLatency
@@ -218,11 +219,34 @@ func subscriberRoutine(clientName, mode string, channels []string, verbose bool,
218219
// Handle messages
219220
msg, err := pubsub.ReceiveMessage(ctx)
220221
if err != nil {
221-
// Handle Redis connection errors, e.g., reconnect immediately
222+
// Handle Redis connection errors
222223
if err == redis.Nil || err == context.DeadlineExceeded || err == context.Canceled {
223224
continue
224225
}
225-
panic(err)
226+
// Connection error (EOF, network error, etc.) - attempt to reconnect
227+
log.Printf("Subscriber %s: connection error: %v - attempting to reconnect\n", clientName, err)
228+
229+
// Close the bad connection
230+
if pubsub != nil {
231+
pubsub.Close()
232+
atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels)))
233+
}
234+
235+
// Wait a bit before reconnecting
236+
time.Sleep(100 * time.Millisecond)
237+
238+
// Resubscribe
239+
switch mode {
240+
case "ssubscribe":
241+
pubsub = client.SSubscribe(ctx, channels...)
242+
default:
243+
pubsub = client.Subscribe(ctx, channels...)
244+
}
245+
atomic.AddInt64(&totalSubscribedChannels, int64(len(channels)))
246+
atomic.AddUint64(&totalConnects, 1)
247+
248+
log.Printf("Subscriber %s: reconnected successfully\n", clientName)
249+
continue
226250
}
227251
if verbose {
228252
log.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Payload))

0 commit comments

Comments
 (0)