From b59fa270726a6ed22c3e3cd1c31e87e9417acc2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Se=C3=A1n=20C=20McCord?= Date: Thu, 19 Mar 2020 10:33:35 -0400 Subject: [PATCH 1/4] create Bus after Start, too --- client/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index d8629ab..f1ca18b 100644 --- a/client/client.go +++ b/client/client.go @@ -229,15 +229,15 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { opt(c) } - // Create the core bus - c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) - // Start the core, if it is not already started err := c.core.Start() if err != nil { return nil, errors.Wrap(err, "failed to start core") } + // Create the core bus + c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) + // Extract a SubBus from that core bus (NOTE: must come after core is started so that NATS connection exists) c.bus = c.core.bus.SubBus() From 21fe6fb44e6ba5f1980f3198090d5788f4e43927 Mon Sep 17 00:00:00 2001 From: Morten Tryfoss Date: Thu, 2 Jul 2020 12:28:47 +0200 Subject: [PATCH 2/4] Attempt to fix leak using callback from stdbus --- client/bus/bus.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/client/bus/bus.go b/client/bus/bus.go index cbab820..bcf81d6 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -97,13 +97,24 @@ func (b *subBus) Close() { // TODO: Ultimately, we will need to derive a way to check to see if the parent bus is then unused, in which case, the NATS subscription(s) should then be closed. } +func (b *subBus) Cancel(s ari.Subscription) { + for i, si := range b.subs { + if s == si { + b.subs[i] = b.subs[len(b.subs)-1] // replace the current with the end + b.subs[len(b.subs)-1] = nil // remove the end + b.subs = b.subs[:len(b.subs)-1] // lop off the end + break + } + } +} + func (b *subBus) Send(e ari.Event) { b.bus.Send(e) } func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription { sub := b.bus.Subscribe(key, eTypes...) - + sub.SetCallback(b.Cancel) b.subs = append(b.subs, sub) return sub From 8bee712917d4767d41057f34b97e594a330a32db Mon Sep 17 00:00:00 2001 From: Morten Tryfoss Date: Thu, 2 Jul 2020 14:45:19 +0200 Subject: [PATCH 3/4] Add locking --- client/bus/bus.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/client/bus/bus.go b/client/bus/bus.go index bcf81d6..4239ba7 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -85,6 +85,7 @@ func New(prefix string, nc *nats.EncodedConn, log log15.Logger) *Bus { type subBus struct { bus ari.Bus subs []ari.Subscription + mu sync.Mutex } func (b *subBus) Close() { @@ -97,7 +98,9 @@ func (b *subBus) Close() { // TODO: Ultimately, we will need to derive a way to check to see if the parent bus is then unused, in which case, the NATS subscription(s) should then be closed. } +// Used as callback from stdbus func (b *subBus) Cancel(s ari.Subscription) { + b.mu.Lock() for i, si := range b.subs { if s == si { b.subs[i] = b.subs[len(b.subs)-1] // replace the current with the end @@ -106,6 +109,7 @@ func (b *subBus) Cancel(s ari.Subscription) { break } } + b.mu.Unlock() } func (b *subBus) Send(e ari.Event) { @@ -115,7 +119,9 @@ func (b *subBus) Send(e ari.Event) { func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription { sub := b.bus.Subscribe(key, eTypes...) sub.SetCallback(b.Cancel) + b.mu.Lock() b.subs = append(b.subs, sub) + b.mu.Unlock() return sub } From 56edf9574214c102843987d24da599b05db8475d Mon Sep 17 00:00:00 2001 From: Morten Tryfoss Date: Fri, 3 Jul 2020 08:59:34 +0200 Subject: [PATCH 4/4] Changed to fit new structure of ari module --- client/bus/bus.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/bus/bus.go b/client/bus/bus.go index 4239ba7..a658fc5 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -99,7 +99,7 @@ func (b *subBus) Close() { } // Used as callback from stdbus -func (b *subBus) Cancel(s ari.Subscription) { +func (b *subBus) Cancel(s interface{}) { b.mu.Lock() for i, si := range b.subs { if s == si { @@ -118,7 +118,7 @@ func (b *subBus) Send(e ari.Event) { func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription { sub := b.bus.Subscribe(key, eTypes...) - sub.SetCallback(b.Cancel) + sub.AddCancelCallback(b.Cancel) b.mu.Lock() b.subs = append(b.subs, sub) b.mu.Unlock()