Skip to content

Commit c076690

Browse files
committed
Add linter and fix issues it identified.
1 parent 383680a commit c076690

File tree

9 files changed

+36
-13
lines changed

9 files changed

+36
-13
lines changed

.circleci/config.yml

+4
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@ jobs:
77
- image: circleci/golang:1.12
88
environment:
99
GO111MODULE: "on"
10+
GOLANGCI_VERSION: "v1.15.0"
1011
steps:
12+
- run: curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s ${GOLANGCI_VERSION}
13+
- run: mv ./bin/golangci-lint $GOPATH/bin/ && rm -rf bin
1114
- checkout
1215
- run: go get -t -d -v ./...
16+
- run: golangci-lint run --enable-all -D=lll,gochecknoglobals,gosec
1317
- run: go test -v -race -cover ./...
1418
- run: go build -o ./bin/sever ./cmd/proximo-server
1519
- run: go build -o ./bin/client ./cmd/proximo-client

backend/mem/mem.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -60,23 +60,27 @@ func (h memBackend) loop() {
6060

6161
case inm := <-h.incomingMessages:
6262
consumers := subs[inm.topic]
63-
for _, consumer := range consumers {
64-
var remaining []*sub
63+
64+
remainingConsumers := make(map[string][]*sub, len(consumers))
65+
for id, consumer := range consumers {
66+
var remainingSubs []*sub
6567
sentOne := false
6668
for _, sub := range consumer {
6769
if !sentOne {
6870
select {
6971
case <-sub.ctx.Done():
7072
// drop expired consumers
7173
case sub.msgs <- inm.message:
72-
remaining = append(remaining, sub)
74+
remainingSubs = append(remainingSubs, sub)
7375
sentOne = true
7476
}
7577
} else {
76-
remaining = append(remaining, sub)
78+
remainingSubs = append(remainingSubs, sub)
7779
}
7880
}
81+
remainingConsumers[id] = remainingSubs
7982
}
83+
subs[inm.topic] = remainingConsumers
8084

8185
h.last100[inm.topic] = append(h.last100[inm.topic], inm.message)
8286
for len(h.last100[inm.topic]) > 100 {

cmd/proximo-client/main.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"syscall"
1111

1212
"github.com/urfave/cli"
13+
"github.com/uw-labs/proximo/internal/id"
1314
"github.com/uw-labs/substrate"
1415
"github.com/uw-labs/substrate/proximo"
1516
)
@@ -52,6 +53,9 @@ func main() {
5253
}
5354

5455
func consume(endpoint string, topic string, consumerID string) error {
56+
if consumerID == "" {
57+
consumerID = id.Generate()
58+
}
5559
source, err := proximo.NewAsyncMessageSource(proximo.AsyncMessageSourceConfig{
5660
Broker: endpoint,
5761
Topic: topic,
@@ -106,7 +110,7 @@ func produce(endpoint string, topic string) error {
106110
func newContext() context.Context {
107111
ctx, cancel := context.WithCancel(context.Background())
108112
go func() {
109-
sCh := make(chan os.Signal)
113+
sCh := make(chan os.Signal, 1)
110114
signal.Notify(sCh, os.Interrupt, syscall.SIGTERM)
111115
<-sCh
112116
cancel()

cmd/proximo-server/main.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,10 @@ func main() {
168168
if err := app.Run(os.Args); err != nil {
169169
log.Fatal(err)
170170
}
171-
log.Fatal(listenAndServe(sourceFactory, sinkFactory, *port))
171+
if err := listenAndServe(sourceFactory, sinkFactory, *port); err != nil {
172+
log.Fatal(err)
173+
}
174+
log.Println("Server terminated cleanly")
172175
}
173176

174177
func parseEndpoints(endpoints string) map[string]bool {
@@ -186,6 +189,7 @@ func parseEndpoints(endpoints string) map[string]bool {
186189

187190
return enabled
188191
}
192+
189193
func listenAndServe(sourceFactory proximo.AsyncSourceFactory, sinkFactory proximo.AsyncSinkFactory, port int) error {
190194
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
191195
if err != nil {

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ require (
3131
github.com/sirupsen/logrus v1.4.0 // indirect
3232
github.com/stretchr/testify v1.2.2
3333
github.com/urfave/cli v1.20.0
34-
github.com/uw-labs/substrate v0.0.0-20190329123054-36acad44151f
34+
github.com/uw-labs/substrate v0.0.0-20190329132135-be046b27cb7d
3535
github.com/uw-labs/sync v0.0.0-20190307114256-1bb306bf6e71
3636
go.etcd.io/bbolt v1.3.2 // indirect
3737
golang.org/x/net v0.0.0-20190318221613-d196dffd7c2b

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1
9595
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
9696
github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw=
9797
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
98-
github.com/uw-labs/substrate v0.0.0-20190329123054-36acad44151f h1:dZjrXRkei35wu5oHsJjZGJTnLLtjoYRnzEkgiv6XbL0=
99-
github.com/uw-labs/substrate v0.0.0-20190329123054-36acad44151f/go.mod h1:dyyAUJcMCwdJ5EaYf8bh2rCNvSH9wwQEIpfZwt+bOn8=
98+
github.com/uw-labs/substrate v0.0.0-20190329132135-be046b27cb7d h1:SN02TsiAK4YQOc/84xJC35RB4n3mi2SEqn2XWh5k0cE=
99+
github.com/uw-labs/substrate v0.0.0-20190329132135-be046b27cb7d/go.mod h1:dyyAUJcMCwdJ5EaYf8bh2rCNvSH9wwQEIpfZwt+bOn8=
100100
github.com/uw-labs/sync v0.0.0-20190307114256-1bb306bf6e71 h1:1fL6kfRp9ebXBzK3powp5uKM3dCtK62RCiadQDqcR1Q=
101101
github.com/uw-labs/sync v0.0.0-20190307114256-1bb306bf6e71/go.mod h1:zTU/pesbF9d3ZN7+XwKhdE50y0SCCblmzwukXRnZkXs=
102102
go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk=

id.go renamed to internal/id/id.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
package proximo
1+
package id
22

33
import (
44
"crypto/rand"
55
"encoding/hex"
66
)
77

8-
func generateID() string {
8+
// Generate generates random string id and panics in case it fails.
9+
func Generate() string {
910
random := []byte{0, 0, 0, 0, 0, 0, 0, 0}
1011
_, err := rand.Read(random)
1112
if err != nil {

server_source.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"google.golang.org/grpc/codes"
99
"google.golang.org/grpc/status"
1010

11+
"github.com/uw-labs/proximo/internal/id"
1112
"github.com/uw-labs/proximo/proto"
1213
"github.com/uw-labs/substrate"
1314
"github.com/uw-labs/sync/gogroup"
@@ -130,7 +131,7 @@ func (s *SourceServer) sendMessages(ctx context.Context, stream sendSourceStream
130131
return nil
131132
case msg := <-messages:
132133
aMsg := &ackMsg{
133-
id: generateID(),
134+
id: id.Generate(),
134135
msg: msg,
135136
}
136137
pMsg := &proto.Message{

server_test.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package proximo
33
import (
44
"context"
55
"fmt"
6+
"log"
67
"net"
78
"os"
89
"testing"
@@ -42,7 +43,11 @@ func Setup() error {
4243

4344
proto.RegisterMessageSourceServer(grpcServer, &SourceServer{SourceFactory: backend})
4445
proto.RegisterMessageSinkServer(grpcServer, &SinkServer{SinkFactory: backend})
45-
go func() { grpcServer.Serve(lis) }()
46+
go func() {
47+
if err := grpcServer.Serve(lis); err != nil {
48+
log.Printf("gRPC server stopped with error: %s", err)
49+
}
50+
}()
4651

4752
// Wait for server to start
4853
cc, err := grpc.Dial("localhost:6868", grpc.WithInsecure(), grpc.WithBlock())

0 commit comments

Comments
 (0)