Skip to content

Commit

Permalink
cmd/ockafka: Start the gnmi subscription in a goroutine.
Browse files Browse the repository at this point in the history
Change-Id: I8425c2ba7a8591bec5bec69238bb26ba4f557362
  • Loading branch information
tdacquet-arista committed May 12, 2020
1 parent 1940253 commit baa7c0d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 23 deletions.
46 changes: 23 additions & 23 deletions cmd/ockafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ func main() {
}
addresses := strings.Split(*kafka.Addresses, ",")
wg := new(sync.WaitGroup)
respChan := make(chan *pb.SubscribeResponse)
defer close(respChan)
errChan := make(chan error)
defer close(errChan)
for i, grpcAddr := range grpcAddrs {
key := keys[i]
p, err := newProducer(addresses, *kafka.Topic, key, grpcAddr)
Expand All @@ -72,27 +68,31 @@ func main() {
glog.Infof("Initialized Kafka producer for %s", grpcAddr)
}
wg.Add(1)
p.Start()
defer p.Stop()
c, err := client.Dial(config)
if err != nil {
glog.Fatal(err)
}
subscribeOptions := &client.SubscribeOptions{
Paths: client.SplitPaths(subscriptions),
}
go client.Subscribe(ctx, c, subscribeOptions, respChan, errChan)
for {
select {
case resp, open := <-respChan:
if !open {
return
}
p.Write(resp)
case err := <-errChan:
go func() {
p.Start()
defer p.Stop()
respChan := make(chan *pb.SubscribeResponse)
errChan := make(chan error)
c, err := client.Dial(config)
if err != nil {
glog.Fatal(err)
}
}
subscribeOptions := &client.SubscribeOptions{
Paths: client.SplitPaths(subscriptions),
}
go client.Subscribe(ctx, c, subscribeOptions, respChan, errChan)
for {
select {
case resp, open := <-respChan:
if !open {
return
}
p.Write(resp)
case err := <-errChan:
glog.Fatal(err)
}
}
}()
}
wg.Wait()
}
1 change: 1 addition & 0 deletions gnmi/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ func Set(ctx context.Context, client pb.GNMIClient, setOps []*Operation,
// Deprecated: Use SubscribeErr instead.
func Subscribe(ctx context.Context, client pb.GNMIClient, subscribeOptions *SubscribeOptions,
respChan chan<- *pb.SubscribeResponse, errChan chan<- error) {
defer close(errChan)
if err := SubscribeErr(ctx, client, subscribeOptions, respChan); err != nil {
errChan <- err
}
Expand Down

0 comments on commit baa7c0d

Please sign in to comment.