Skip to content

Commit c98da01

Browse files
authored
Merge pull request #115 from uw-labs/fix_more_channel_and_goroutine_issues_in_sink
Fix more channel and goroutine issues in sink
2 parents c107c81 + 4cce853 commit c98da01

File tree

1 file changed

+47
-27
lines changed

1 file changed

+47
-27
lines changed

kafka/producer.go

+47-27
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/uw-labs/substrate/internal/debug"
1010
"github.com/uw-labs/substrate/internal/helper"
1111
"github.com/uw-labs/substrate/internal/unwrap"
12+
"golang.org/x/sync/errgroup"
1213
)
1314

1415
var (
@@ -79,41 +80,60 @@ func (ams *asyncMessageSink) doPublishMessages(ctx context.Context, producer sar
7980
errs := producer.Errors()
8081
successes := producer.Successes()
8182

82-
go func() {
83-
for suc := range successes {
84-
msg := suc.Metadata.(substrate.Message)
85-
acks <- msg
86-
ams.debugger.Logf("substrate : sent ack to caller for message : %s\n", msg)
87-
}
88-
}()
89-
for {
90-
select {
91-
case m := <-messages:
92-
message := &sarama.ProducerMessage{
93-
Topic: ams.Topic,
94-
}
83+
eg, ctx := errgroup.WithContext(ctx)
9584

96-
message.Value = sarama.ByteEncoder(m.Data())
97-
98-
if ams.KeyFunc != nil {
99-
// Provide original user message to the partition key function.
100-
unwrappedMsg := unwrap.Unwrap(m)
101-
message.Key = sarama.ByteEncoder(ams.KeyFunc(unwrappedMsg))
85+
eg.Go(func() error {
86+
for {
87+
select {
88+
case suc := <-successes:
89+
msg := suc.Metadata.(substrate.Message)
90+
select {
91+
case acks <- msg:
92+
ams.debugger.Logf("substrate : sent ack to caller for message : %s\n", msg)
93+
case <-ctx.Done():
94+
return ctx.Err()
95+
}
96+
case <-ctx.Done():
97+
return ctx.Err()
10298
}
99+
}
100+
})
103101

104-
message.Metadata = m
102+
eg.Go(func() error {
103+
for {
105104
select {
106-
case input <- message:
105+
case m := <-messages:
106+
message := &sarama.ProducerMessage{
107+
Topic: ams.Topic,
108+
}
109+
110+
message.Value = sarama.ByteEncoder(m.Data())
111+
112+
if ams.KeyFunc != nil {
113+
// Provide original user message to the partition key function.
114+
unwrappedMsg := unwrap.Unwrap(m)
115+
message.Key = sarama.ByteEncoder(ams.KeyFunc(unwrappedMsg))
116+
}
117+
118+
message.Metadata = m
119+
select {
120+
case input <- message:
121+
case <-ctx.Done():
122+
return ctx.Err()
123+
}
124+
ams.debugger.Logf("substrate : sent to kafka : %s\n", m)
107125
case <-ctx.Done():
108-
return nil
126+
return ctx.Err()
127+
case err := <-errs:
128+
return err
109129
}
110-
ams.debugger.Logf("substrate : sent to kafka : %s\n", m)
111-
case <-ctx.Done():
112-
return nil
113-
case err := <-errs:
114-
return err
115130
}
131+
})
132+
133+
if err := eg.Wait(); err != nil && err != context.Canceled {
134+
return err
116135
}
136+
return nil
117137
}
118138

119139
func (ams *asyncMessageSink) Status() (*substrate.Status, error) {

0 commit comments

Comments
 (0)