diff --git a/cmd/ockafka/main.go b/cmd/ockafka/main.go index a5515b3..21f8e6a 100644 --- a/cmd/ockafka/main.go +++ b/cmd/ockafka/main.go @@ -59,10 +59,6 @@ func main() { } addresses := strings.Split(*kafka.Addresses, ",") wg := new(sync.WaitGroup) - respChan := make(chan *pb.SubscribeResponse) - defer close(respChan) - errChan := make(chan error) - defer close(errChan) for i, grpcAddr := range grpcAddrs { key := keys[i] p, err := newProducer(addresses, *kafka.Topic, key, grpcAddr) @@ -72,27 +68,31 @@ func main() { glog.Infof("Initialized Kafka producer for %s", grpcAddr) } wg.Add(1) - p.Start() - defer p.Stop() - c, err := client.Dial(config) - if err != nil { - glog.Fatal(err) - } - subscribeOptions := &client.SubscribeOptions{ - Paths: client.SplitPaths(subscriptions), - } - go client.Subscribe(ctx, c, subscribeOptions, respChan, errChan) - for { - select { - case resp, open := <-respChan: - if !open { - return - } - p.Write(resp) - case err := <-errChan: + go func() { + p.Start() + defer p.Stop() + respChan := make(chan *pb.SubscribeResponse) + errChan := make(chan error) + c, err := client.Dial(config) + if err != nil { glog.Fatal(err) } - } + subscribeOptions := &client.SubscribeOptions{ + Paths: client.SplitPaths(subscriptions), + } + go client.Subscribe(ctx, c, subscribeOptions, respChan, errChan) + for { + select { + case resp, open := <-respChan: + if !open { + return + } + p.Write(resp) + case err := <-errChan: + glog.Fatal(err) + } + } + }() } wg.Wait() } diff --git a/gnmi/operation.go b/gnmi/operation.go index e619420..a9b7589 100644 --- a/gnmi/operation.go +++ b/gnmi/operation.go @@ -402,6 +402,7 @@ func Set(ctx context.Context, client pb.GNMIClient, setOps []*Operation, // Deprecated: Use SubscribeErr instead. func Subscribe(ctx context.Context, client pb.GNMIClient, subscribeOptions *SubscribeOptions, respChan chan<- *pb.SubscribeResponse, errChan chan<- error) { + defer close(errChan) if err := SubscribeErr(ctx, client, subscribeOptions, respChan); err != nil { errChan <- err }