Skip to content

Commit 7cc238e

Browse files
committed
gofumpt
1 parent c98da01 commit 7cc238e

26 files changed

+15
-69
lines changed

freezer/freezer.go

-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ type asyncMessageSource struct {
116116
}
117117

118118
func (ams *asyncMessageSource) ConsumeMessages(ctx context.Context, messages chan<- substrate.Message, acks <-chan substrate.Message) error {
119-
120119
eg, ctx := errgroup.WithContext(ctx)
121120

122121
ackQ := make(chan substrate.Message)

freezer/freezer_url.go

-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ func init() {
2121
}
2222

2323
func newFreezerSink(u *url.URL) (substrate.AsyncMessageSink, error) {
24-
2524
q := u.Query()
2625

2726
cts := q.Get("compression")
@@ -160,7 +159,6 @@ func newFreezerSource(u *url.URL) (substrate.AsyncMessageSource, error) {
160159
default:
161160
return nil, fmt.Errorf("unsupported scheme : %s", u.Scheme)
162161
}
163-
164162
}
165163

166164
var sourcer = NewAsyncMessageSource

freezer/freezer_url_test.go

-4
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ func TestFreezerSink(t *testing.T) {
7676

7777
for _, tst := range tests {
7878
t.Run(tst.name, func(t *testing.T) {
79-
8079
var conf AsyncMessageSinkConfig
8180
sinker = func(c AsyncMessageSinkConfig) (substrate.AsyncMessageSink, error) {
8281
conf = c
@@ -91,7 +90,6 @@ func TestFreezerSink(t *testing.T) {
9190
assert.Equal(tst.expected, conf)
9291
})
9392
}
94-
9593
}
9694

9795
func TestFreezerSource(t *testing.T) {
@@ -159,7 +157,6 @@ func TestFreezerSource(t *testing.T) {
159157

160158
for _, tst := range tests {
161159
t.Run(tst.name, func(t *testing.T) {
162-
163160
var conf AsyncMessageSourceConfig
164161
sourcer = func(c AsyncMessageSourceConfig) (substrate.AsyncMessageSource, error) {
165162
conf = c
@@ -174,7 +171,6 @@ func TestFreezerSource(t *testing.T) {
174171
assert.Equal(tst.expected, conf)
175172
})
176173
}
177-
178174
}
179175

180176
type mockStore struct {

freezer/integration_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
)
1313

1414
func TestAll(t *testing.T) {
15-
1615
k, err := runServer()
1716
if err != nil {
1817
t.Fatal(err)
@@ -43,7 +42,6 @@ func runServer() (*testServer, error) {
4342
}
4443

4544
func (ks *testServer) NewConsumer(topic string, groupID string) substrate.AsyncMessageSource {
46-
4745
s, err := NewAsyncMessageSource(AsyncMessageSourceConfig{
4846
StreamStore: ks.ss,
4947
FreezerConfig: freezer.MessageSourceConfig{
@@ -75,7 +73,7 @@ func (ks *testServer) TestEnd() {
7573
}
7674

7775
func (ks *testServer) Kill() error {
78-
//log.Printf("TODO: remove %s\n", ks.dir)
76+
// log.Printf("TODO: remove %s\n", ks.dir)
7977
os.RemoveAll(ks.dir)
8078
return nil
8179
}

internal/debug/debug.go

-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ func messageHash(data []byte) string {
1414
_, _ = f.Write(data)
1515
sum := f.Sum(nil)
1616
return base64.StdEncoding.EncodeToString(sum)
17-
1817
}
1918

2019
type Debugger struct {

internal/helper/sink_ack_ordering.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ import (
77
"golang.org/x/sync/errgroup"
88
)
99

10-
var (
11-
_ substrate.AsyncMessageSink = (*AckOrderingSink)(nil)
12-
)
10+
var _ substrate.AsyncMessageSink = (*AckOrderingSink)(nil)
1311

1412
func NewAckOrderingSink(sink substrate.AsyncMessageSink) *AckOrderingSink {
1513
return &AckOrderingSink{sink}
@@ -66,7 +64,7 @@ func (s *AckOrderingSink) PublishMessages(ctx context.Context, acks chan<- subst
6664
for len(gotAcks) > 0 && contains(gotAcks, needed[0]) {
6765
select {
6866
case m := <-needAcks:
69-
needed = append(needed, m)
67+
needed = append(needed, m)
7068
case acks <- needed[0]:
7169
delete(gotAcks, needed[0])
7270
needed = needed[1:]

internal/helper/sink_ack_ordering_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ func TestAckOrderingSink(t *testing.T) {
4646

4747
err := eg.Wait()
4848
assert.NoError(err)
49-
5049
}
5150

5251
type myMessage struct {

internal/testshared/testshared.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ func testOnePublisherOneMessageOneConsumer(t *testing.T, ts TestServer) {
101101
if err := <-prodErrs; err != nil {
102102
t.Errorf("unexpected error from consume : %s", err)
103103
}
104-
105104
}
106105

107106
func testOnePublisherOneConsumerConsumeWithoutAcking(t *testing.T, ts TestServer) {
@@ -214,7 +213,6 @@ func testOnePublisherOneMessageTwoConsumers(t *testing.T, ts TestServer) {
214213
if err := <-cons1Errs; err != nil {
215214
t.Errorf("unexpected error from consume : %s", err)
216215
}
217-
218216
}
219217

220218
func testPublisherShouldNotBlock(t *testing.T, ts TestServer) {
@@ -368,7 +366,7 @@ func testConsumeWithoutAck(t *testing.T, ts TestServer) {
368366
case <-cons1Msgs:
369367
case err := <-cons1Errs:
370368
if err != nil {
371-
t.Error(err) //TODO: prolly not.
369+
t.Error(err) // TODO: prolly not.
372370
}
373371
}
374372

@@ -391,7 +389,6 @@ func testConsumeWithoutAck(t *testing.T, ts TestServer) {
391389
if err := <-cons2Errs; err != nil {
392390
t.Errorf("unexpected error from consume : %s", err)
393391
}
394-
395392
}
396393

397394
func testProduceStatusOk(t *testing.T, ts TestServer) {
@@ -471,7 +468,6 @@ func testPublishMultipleMessagesOneConsumer(t *testing.T, ts TestServer) {
471468
if err := <-prodErrs; err != nil {
472469
t.Errorf("unexpected error from consume : %s", err)
473470
}
474-
475471
}
476472

477473
func testOnePublisherOneConsumerConsumeWithoutAckingDiscardedPayload(t *testing.T, ts TestServer) {
@@ -588,7 +584,6 @@ func produceAndCheckAck(ctx context.Context, t *testing.T, prodMsgs chan<- subst
588584
}
589585

590586
func consumeAndAck(ctx context.Context, t *testing.T, consMsgs <-chan substrate.Message, consAcks chan<- substrate.Message) string {
591-
592587
var msg substrate.Message
593588

594589
// receive it
@@ -615,7 +610,6 @@ func consumeAndAck(ctx context.Context, t *testing.T, consMsgs <-chan substrate.
615610
}
616611

617612
func consumeAndAckDiscard(ctx context.Context, t *testing.T, consMsgs <-chan substrate.Message, consAcks chan<- substrate.Message) string {
618-
619613
var msg substrate.Message
620614

621615
// receive it

kafka/integration_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
)
1919

2020
func TestAll(t *testing.T) {
21-
2221
k, err := runServer()
2322
if err != nil {
2423
t.Fatal(err)
@@ -133,7 +132,6 @@ func (ks *testServer) NewConsumer(topic string, groupID string) substrate.AsyncM
133132
Offset: OffsetOldest,
134133
Version: "2.4.0",
135134
})
136-
137135
if err != nil {
138136
panic(err)
139137
}

kafka/producer.go

-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ type AsyncMessageSinkConfig struct {
2828
}
2929

3030
func NewAsyncMessageSink(config AsyncMessageSinkConfig) (substrate.AsyncMessageSink, error) {
31-
3231
conf, err := config.buildSaramaProducerConfig()
3332
if err != nil {
3433
return nil, err
@@ -60,7 +59,6 @@ type asyncMessageSink struct {
6059
}
6160

6261
func (ams *asyncMessageSink) PublishMessages(ctx context.Context, acks chan<- substrate.Message, messages <-chan substrate.Message) (rerr error) {
63-
6462
producer, err := sarama.NewAsyncProducerFromClient(ams.client)
6563
if err != nil {
6664
return err
@@ -75,7 +73,6 @@ func (ams *asyncMessageSink) PublishMessages(ctx context.Context, acks chan<- su
7573
}
7674

7775
func (ams *asyncMessageSink) doPublishMessages(ctx context.Context, producer sarama.AsyncProducer, acks chan<- substrate.Message, messages <-chan substrate.Message) (rerr error) {
78-
7976
input := producer.Input()
8077
errs := producer.Errors()
8178
successes := producer.Successes()

kafka/status.go

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
)
77

88
func status(client sarama.Client, topic string) (*substrate.Status, error) {
9-
109
status := &substrate.Status{}
1110

1211
writablePartitions, err := client.WritablePartitions(topic)

kafka/url_test.go

-4
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ func TestKafkaSink(t *testing.T) {
6969

7070
for _, tst := range tests {
7171
t.Run(tst.name, func(t *testing.T) {
72-
7372
var conf AsyncMessageSinkConfig
7473
kafkaSinker = func(c AsyncMessageSinkConfig) (substrate.AsyncMessageSink, error) {
7574
conf = c
@@ -84,7 +83,6 @@ func TestKafkaSink(t *testing.T) {
8483
assert.Equal(tst.expected, conf)
8584
})
8685
}
87-
8886
}
8987

9088
func TestKafkaSource(t *testing.T) {
@@ -140,7 +138,6 @@ func TestKafkaSource(t *testing.T) {
140138

141139
for _, tst := range tests {
142140
t.Run(tst.name, func(t *testing.T) {
143-
144141
var conf AsyncMessageSourceConfig
145142
kafkaSourcer = func(c AsyncMessageSourceConfig) (substrate.AsyncMessageSource, error) {
146143
conf = c
@@ -155,5 +152,4 @@ func TestKafkaSource(t *testing.T) {
155152
assert.Equal(tst.expected, conf)
156153
})
157154
}
158-
159155
}

natsstreaming/integration_test.go

+4-7
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ import (
2323
)
2424

2525
func TestAll(t *testing.T) {
26-
2726
k, err := runServer()
28-
2927
if err != nil {
3028
t.Fatal(err)
3129
}
@@ -155,8 +153,8 @@ loop3:
155153
//
156154
if err == nil {
157155
break loop3
158-
//ks.Kill()
159-
//return nil, errors.New("what?")
156+
// ks.Kill()
157+
// return nil, errors.New("what?")
160158
}
161159
if err == cancelled.Err() {
162160
break loop3
@@ -182,7 +180,6 @@ func (ks *testServer) canConsume(topic string, groupID string) error {
182180
}
183181

184182
func TestConsumerErrorOnBackendDisconnect(t *testing.T) {
185-
186183
// seed nats with some test data
187184
stanServerOpts := stand.GetDefaultOptions()
188185
natsServerOpts := stand.DefaultNatsServerOptions
@@ -278,8 +275,8 @@ func TestProducerOnDisconnectedError(t *testing.T) {
278275
ConnectionPingInterval: 1,
279276
ConnectionNumPings: 3,
280277
})
281-
//hnd, err := newNatsStreamingProduceHandler(
282-
//fmt.Sprintf("nats://%s", proxy.Listen), stand.DefaultClusterID, 1, 1, 3)
278+
// hnd, err := newNatsStreamingProduceHandler(
279+
// fmt.Sprintf("nats://%s", proxy.Listen), stand.DefaultClusterID, 1, 1, 3)
283280
require.NoError(t, err)
284281
success := make(chan struct{})
285282
egrp, groupCtx := errgroup.WithContext(ctx)

natsstreaming/nats_streaming.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ type asyncMessageSink struct {
8080
}
8181

8282
func (p *asyncMessageSink) PublishMessages(ctx context.Context, acks chan<- substrate.Message, messages <-chan substrate.Message) (rerr error) {
83-
8483
ctx, cancel := context.WithCancel(ctx)
8584
defer cancel()
8685

@@ -94,7 +93,7 @@ func (p *asyncMessageSink) PublishMessages(ctx context.Context, acks chan<- subs
9493
for {
9594
select {
9695
case <-ctx.Done():
97-
//return ctx.Err()
96+
// return ctx.Err()
9897
break LOOP
9998
case msg := <-messages:
10099
_, err := conn.PublishAsync(p.subject, msg.Data(), func(guid string, err error) {
@@ -201,7 +200,6 @@ func (cm *consumerMessage) Data() []byte {
201200
}
202201

203202
func (c *asyncMessageSource) ConsumeMessages(ctx context.Context, messages chan<- substrate.Message, acks <-chan substrate.Message) error {
204-
205203
msgsToAck := make(chan *consumerMessage)
206204

207205
f := func(msg *stan.Msg) {
@@ -285,7 +283,7 @@ func handleAcks(ctx context.Context, msgsToAck chan *consumerMessage, acks <-cha
285283
}
286284
toAck = toAck[1:]
287285
case <-ctx.Done():
288-
//return ctx.Err()
286+
// return ctx.Err()
289287
return nil
290288
case e, ok := <-disconnected:
291289
if ok {

natsstreaming/nats_streaming_url_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ func TestNatsStreamingURLSink(t *testing.T) {
8888
assert.Equal(tst.expected, c)
8989
})
9090
}
91-
9291
}
9392

9493
func TestNatsStreamingURLSource(t *testing.T) {
@@ -174,5 +173,4 @@ func TestNatsStreamingURLSource(t *testing.T) {
174173
assert.Equal(tst.expected, c)
175174
})
176175
}
177-
178176
}

proximo/integration_test.go

-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
)
1717

1818
func TestAll(t *testing.T) {
19-
2019
k, err := runServer()
2120
if err != nil {
2221
t.Fatal(err)
@@ -40,7 +39,6 @@ func (ts *testServer) NewConsumer(topic string, groupID string) substrate.AsyncM
4039
Offset: OffsetOldest,
4140
Insecure: true,
4241
})
43-
4442
if err != nil {
4543
panic(err)
4644
}
@@ -67,7 +65,6 @@ func (ts *testServer) Kill() error {
6765
}
6866

6967
func runServer() (*testServer, error) {
70-
7168
cmd := exec.CommandContext(
7269
context.Background(),
7370
"proximo-server",

proximo/proximo_sink.go

-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ type AsyncMessageSinkConfig struct {
2929
}
3030

3131
func NewAsyncMessageSink(c AsyncMessageSinkConfig) (substrate.AsyncMessageSink, error) {
32-
3332
conn, err := dialProximo(dialConfig{
3433
broker: c.Broker,
3534
insecure: c.Insecure,
@@ -56,7 +55,6 @@ type asyncMessageSink struct {
5655
}
5756

5857
func (ams *asyncMessageSink) PublishMessages(ctx context.Context, acks chan<- substrate.Message, messages <-chan substrate.Message) (rerr error) {
59-
6058
rg, ctx := rungroup.New(ctx)
6159

6260
client := proto.NewMessageSinkClient(ams.conn)

0 commit comments

Comments
 (0)