Skip to content

Commit b84aadd

Browse files
authored
Merge pull request #133 from uw-labs/instrumented-sink-detect-context-cancel
Instrumented async message sink: do not increment error counter if the context was cancelled
2 parents f5a9d4c + 7a6ea28 commit b84aadd

File tree

2 files changed

+70
-2
lines changed

2 files changed

+70
-2
lines changed

instrumented/sink.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package instrumented
22

33
import (
44
"context"
5+
"errors"
56

67
"github.com/prometheus/client_golang/prometheus"
78
"github.com/uw-labs/substrate"
@@ -57,22 +58,34 @@ func (ams *AsyncMessageSink) PublishMessages(ctx context.Context, acks chan<- su
5758
case <-ctx.Done():
5859
return <-errs
5960
case err := <-errs:
60-
if err != nil {
61+
if isUnexpectedError(err) {
6162
ams.counter.WithLabelValues("error", ams.topic).Inc()
6263
}
6364
return err
6465
}
6566
case <-ctx.Done():
6667
return <-errs
6768
case err := <-errs:
68-
if err != nil {
69+
if isUnexpectedError(err) {
6970
ams.counter.WithLabelValues("error", ams.topic).Inc()
7071
}
7172
return err
7273
}
7374
}
7475
}
7576

77+
func isUnexpectedError(err error) bool {
78+
switch {
79+
case err == nil:
80+
return false
81+
case errors.Is(err, context.Canceled):
82+
// we're expecting producers to mark the stopping of producing by cancelling the context
83+
return false
84+
default:
85+
return true
86+
}
87+
}
88+
7689
// Close closes the message sink
7790
func (ams *AsyncMessageSink) Close() error {
7891
return ams.impl.Close()

instrumented/sink_test.go

+55
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/prometheus/client_golang/prometheus"
1010
dto "github.com/prometheus/client_model/go"
1111
"github.com/stretchr/testify/assert"
12+
1213
"github.com/uw-labs/substrate"
1314
)
1415

@@ -79,6 +80,60 @@ func TestPublishMessagesSuccessfully(t *testing.T) {
7980
}
8081
}
8182

83+
func TestPublishMessagesSuccessfully_WithContextError(t *testing.T) {
84+
sink := AsyncMessageSink{
85+
impl: &asyncMessageSinkMock{
86+
publishMessageMock: func(ctx context.Context, acks chan<- substrate.Message, messages <-chan substrate.Message) error {
87+
for {
88+
select {
89+
case <-ctx.Done():
90+
return ctx.Err()
91+
case msg := <-messages:
92+
acks <- msg
93+
}
94+
}
95+
},
96+
},
97+
counter: prometheus.NewCounterVec(
98+
prometheus.CounterOpts{
99+
Help: "sink_counter",
100+
Name: "sink_counter",
101+
}, []string{"status", "topic"}),
102+
topic: "testTopic",
103+
}
104+
105+
acks := make(chan substrate.Message)
106+
messages := make(chan substrate.Message)
107+
108+
sinkContext, sinkCancel := context.WithCancel(context.Background())
109+
defer sinkCancel()
110+
111+
errs := make(chan error)
112+
go func() {
113+
defer close(errs)
114+
errs <- sink.PublishMessages(sinkContext, acks, messages)
115+
}()
116+
117+
messages <- Message{}
118+
119+
for {
120+
select {
121+
case err := <-errs:
122+
assert.True(t, errors.Is(err, context.Canceled))
123+
// the error counter should not be increased when producing stops by context canceling
124+
var metric dto.Metric
125+
assert.NoError(t, sink.counter.WithLabelValues("error", "testTopic").Write(&metric))
126+
assert.Equal(t, 0, int(*metric.Counter.Value))
127+
return
128+
case <-acks:
129+
var metric dto.Metric
130+
assert.NoError(t, sink.counter.WithLabelValues("success", "testTopic").Write(&metric))
131+
assert.Equal(t, 1, int(*metric.Counter.Value))
132+
sinkCancel()
133+
}
134+
}
135+
}
136+
82137
func TestPublishMessagesWithError(t *testing.T) {
83138
producingError := errors.New("message producing error")
84139
sink := AsyncMessageSink{

0 commit comments

Comments
 (0)