diff --git a/client/bus/bus.go b/client/bus/bus.go index cbab820..a658fc5 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -85,6 +85,7 @@ func New(prefix string, nc *nats.EncodedConn, log log15.Logger) *Bus { type subBus struct { bus ari.Bus subs []ari.Subscription + mu sync.Mutex } func (b *subBus) Close() { @@ -97,14 +98,30 @@ func (b *subBus) Close() { // TODO: Ultimately, we will need to derive a way to check to see if the parent bus is then unused, in which case, the NATS subscription(s) should then be closed. } +// Used as callback from stdbus +func (b *subBus) Cancel(s interface{}) { + b.mu.Lock() + for i, si := range b.subs { + if s == si { + b.subs[i] = b.subs[len(b.subs)-1] // replace the current with the end + b.subs[len(b.subs)-1] = nil // remove the end + b.subs = b.subs[:len(b.subs)-1] // lop off the end + break + } + } + b.mu.Unlock() +} + func (b *subBus) Send(e ari.Event) { b.bus.Send(e) } func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription { sub := b.bus.Subscribe(key, eTypes...) - + sub.AddCancelCallback(b.Cancel) + b.mu.Lock() b.subs = append(b.subs, sub) + b.mu.Unlock() return sub } diff --git a/client/client.go b/client/client.go index d8629ab..f1ca18b 100644 --- a/client/client.go +++ b/client/client.go @@ -229,15 +229,15 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { opt(c) } - // Create the core bus - c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) - // Start the core, if it is not already started err := c.core.Start() if err != nil { return nil, errors.Wrap(err, "failed to start core") } + // Create the core bus + c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) + // Extract a SubBus from that core bus (NOTE: must come after core is started so that NATS connection exists) c.bus = c.core.bus.SubBus()