From 2bbb82adfe4bc671af541e4c11605a6120e5179a Mon Sep 17 00:00:00 2001 From: Daniel1984 Date: Tue, 25 May 2021 15:28:34 +0300 Subject: [PATCH 1/7] ability to instantly subscripbe to 20 channels and rate limit afterwards if needed --- pkg/mux/mux.go | 44 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/pkg/mux/mux.go b/pkg/mux/mux.go index 3d6712bd6..2992f3a79 100644 --- a/pkg/mux/mux.go +++ b/pkg/mux/mux.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "sync" + "time" "github.com/bitfinexcom/bitfinex-api-go/pkg/models/event" "github.com/bitfinexcom/bitfinex-api-go/pkg/mux/client" @@ -33,8 +34,15 @@ type Mux struct { publicURL string authURL string online bool + subsRateLimit int } +// api rate limit is 20 calls per minute. 1x3s, 20x1min +const ( + rateLimitDuration = 3 * time.Second + rateLimitQueueSize = 20 +) + // New returns pointer to instance of mux func New() *Mux { return &Mux{ @@ -96,20 +104,26 @@ func (m *Mux) Close() bool { } // Subscribe - given the details in form of event.Subscribe, -// subscribes client to public channels +// queues the subscriptions for eventual submission func (m *Mux) Subscribe(sub event.Subscribe) *Mux { if m.Err != nil { return m } - m.mtx.Lock() - defer m.mtx.Unlock() + // if limit is reached, wait 1 second and recuresively + // call Subscribe again with same subscription details + if m.subsRateLimit == rateLimitQueueSize { + time.Sleep(1 * time.Second) + return m.Subscribe(sub) + } - if subscribed := m.publicClients[m.cid].SubAdded(sub); subscribed { + m.mtx.RLock() + defer m.mtx.RUnlock() + if m.publicClients[m.cid].SubAdded(sub) { return m } - if m.Err = m.publicClients[m.cid].Subscribe(sub); m.Err != nil { + if err := m.publicClients[m.cid].Subscribe(sub); err != nil { return m } @@ -117,6 +131,8 @@ func (m *Mux) Subscribe(sub event.Subscribe) *Mux { log.Printf("subs limit is reached on cid: %d, spawning new conn\n", m.cid) m.addPublicClient() } + + m.subsRateLimit++ return m } @@ -126,6 +142,7 @@ func (m *Mux) Start() *Mux { m.addPrivateClient() } + m.watchRateLimit() return m.addPublicClient() } @@ -138,7 +155,6 @@ func (m *Mux) Listen(cb func(interface{}, error)) error { } m.online = true - for { select { case ms, ok := <-m.publicChan: @@ -286,7 +302,7 @@ func (m *Mux) addPublicClient() *Mux { c, err := client. New(). WithID(m.cid). - WithSubsLimit(20). + WithSubsLimit(30). Public(m.publicURL) if err != nil { m.Err = err @@ -311,3 +327,17 @@ func (m *Mux) addPrivateClient() *Mux { go c.Read(m.privateChan) return m } + +// watchRateLimit will run once every rateLimitDuration +// and free up the queue +func (m *Mux) watchRateLimit() { + go func() { + for { + if m.subsRateLimit > 0 { + m.subsRateLimit-- + } + + time.Sleep(rateLimitDuration) + } + }() +} From 66f38193f5e178b82fdbfda2bab6e534364610d1 Mon Sep 17 00:00:00 2001 From: Daniel1984 Date: Tue, 25 May 2021 15:39:22 +0300 Subject: [PATCH 2/7] chenge in comment and error handling --- pkg/mux/mux.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/mux/mux.go b/pkg/mux/mux.go index 2992f3a79..ac1b7d9f0 100644 --- a/pkg/mux/mux.go +++ b/pkg/mux/mux.go @@ -103,8 +103,8 @@ func (m *Mux) Close() bool { return true } -// Subscribe - given the details in form of event.Subscribe, -// queues the subscriptions for eventual submission +// Subscribe - given the details in form of event.Subscribe, subscribes client to public +// channels. If rate limit is reached, calls itself recursively after 1s with same params func (m *Mux) Subscribe(sub event.Subscribe) *Mux { if m.Err != nil { return m @@ -123,7 +123,7 @@ func (m *Mux) Subscribe(sub event.Subscribe) *Mux { return m } - if err := m.publicClients[m.cid].Subscribe(sub); err != nil { + if m.Err = m.publicClients[m.cid].Subscribe(sub); m.Err != nil { return m } From babf6f6688f730caaa47a297c8856f7f7b3c7cd1 Mon Sep 17 00:00:00 2001 From: Daniel1984 Date: Wed, 26 May 2021 15:39:42 +0300 Subject: [PATCH 3/7] better logging, more comments --- pkg/mux/mux.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/mux/mux.go b/pkg/mux/mux.go index ac1b7d9f0..dcd488ebf 100644 --- a/pkg/mux/mux.go +++ b/pkg/mux/mux.go @@ -76,7 +76,7 @@ func (m *Mux) WithDeadManSwitch() *Mux { return m } -// WithAPISEC accepts and persists api sec +// WithAPISEC accepts and persists api secret func (m *Mux) WithAPISEC(sec string) *Mux { m.apisec = sec return m @@ -143,7 +143,8 @@ func (m *Mux) Start() *Mux { } m.watchRateLimit() - return m.addPublicClient() + m.addPublicClient() + return m } // Listen accepts a callback func that will get called each time mux @@ -159,7 +160,7 @@ func (m *Mux) Listen(cb func(interface{}, error)) error { select { case ms, ok := <-m.publicChan: if !ok { - return errors.New("channel has closed unexpectedly") + return errors.New("public channel has closed unexpectedly") } if ms.Err != nil { cb(nil, fmt.Errorf("conn:%d has failed | err:%s | reconnecting", ms.CID, ms.Err)) @@ -195,7 +196,7 @@ func (m *Mux) Listen(cb func(interface{}, error)) error { cb(nil, fmt.Errorf("unrecognized msg signature: %s", ms.Data)) case ms, ok := <-m.privateChan: if !ok { - return errors.New("channel has closed unexpectedly") + return errors.New("private channel has closed unexpectedly") } if ms.Err != nil { cb(nil, fmt.Errorf("err: %s | reconnecting", ms.Err)) From 53502bf5734943c90d26a58d15b38bbdbd98f4ab Mon Sep 17 00:00:00 2001 From: Daniel1984 Date: Tue, 1 Jun 2021 13:23:36 +0300 Subject: [PATCH 4/7] updating changelog --- CHANGELOG | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG b/CHANGELOG index 22872c853..9cf552b80 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,6 @@ +3.0.5 +- Adds rate limit to avoid 429 HTTP status codes + 3.0.4 - Adds new rest v2 functions - tickers/hist From badc9110c50b27b340929b8fd6b364069d0decc0 Mon Sep 17 00:00:00 2001 From: Daniel1984 Date: Tue, 1 Jun 2021 13:25:40 +0300 Subject: [PATCH 5/7] meaningful naming --- pkg/mux/mux.go | 48 ++++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/pkg/mux/mux.go b/pkg/mux/mux.go index dcd488ebf..24f2e39f3 100644 --- a/pkg/mux/mux.go +++ b/pkg/mux/mux.go @@ -17,30 +17,30 @@ import ( // to all incomming client messages and reconnect client with all its subscriptions // in case of a failure type Mux struct { - cid int - dms int - publicChan chan msg.Msg - publicClients map[int]*client.Client - privateChan chan msg.Msg - closeChan chan bool - privateClient *client.Client - mtx *sync.RWMutex - Err error - transform bool - apikey string - apisec string - subInfo map[int64]event.Info - authenticated bool - publicURL string - authURL string - online bool - subsRateLimit int + cid int + dms int + publicChan chan msg.Msg + publicClients map[int]*client.Client + privateChan chan msg.Msg + closeChan chan bool + privateClient *client.Client + mtx *sync.RWMutex + Err error + transform bool + apikey string + apisec string + subInfo map[int64]event.Info + authenticated bool + publicURL string + authURL string + online bool + rateLimitQueueSize int } // api rate limit is 20 calls per minute. 1x3s, 20x1min const ( - rateLimitDuration = 3 * time.Second - rateLimitQueueSize = 20 + rateLimitDuration = 3 * time.Second + maxRateLimitQueueSize = 20 ) // New returns pointer to instance of mux @@ -112,7 +112,7 @@ func (m *Mux) Subscribe(sub event.Subscribe) *Mux { // if limit is reached, wait 1 second and recuresively // call Subscribe again with same subscription details - if m.subsRateLimit == rateLimitQueueSize { + if m.rateLimitQueueSize == maxRateLimitQueueSize { time.Sleep(1 * time.Second) return m.Subscribe(sub) } @@ -132,7 +132,7 @@ func (m *Mux) Subscribe(sub event.Subscribe) *Mux { m.addPublicClient() } - m.subsRateLimit++ + m.rateLimitQueueSize++ return m } @@ -334,8 +334,8 @@ func (m *Mux) addPrivateClient() *Mux { func (m *Mux) watchRateLimit() { go func() { for { - if m.subsRateLimit > 0 { - m.subsRateLimit-- + if m.rateLimitQueueSize > 0 { + m.rateLimitQueueSize-- } time.Sleep(rateLimitDuration) From 04bf1705bb111645c9b933d8bd4b5695e70a141f Mon Sep 17 00:00:00 2001 From: Daniel1984 Date: Tue, 1 Jun 2021 15:21:05 +0300 Subject: [PATCH 6/7] updating public feed example, fixed auth channel submitting payload event --- examples/ws/public-feed-ingest/main.go | 8 ++++++++ pkg/mux/client/client.go | 1 - 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/examples/ws/public-feed-ingest/main.go b/examples/ws/public-feed-ingest/main.go index 16f944d42..e5f9c46cb 100644 --- a/examples/ws/public-feed-ingest/main.go +++ b/examples/ws/public-feed-ingest/main.go @@ -32,27 +32,32 @@ func main() { for _, pair := range pairs { tradePld := event.Subscribe{ + Event: "subscribe", Channel: "trades", Symbol: "t" + pair, } tickPld := event.Subscribe{ + Event: "subscribe", Channel: "ticker", Symbol: "t" + pair, } candlesPld := event.Subscribe{ + Event: "subscribe", Channel: "candles", Key: "trade:1m:t" + pair, } rawBookPld := event.Subscribe{ + Event: "subscribe", Channel: "book", Precision: "R0", Symbol: "t" + pair, } bookPld := event.Subscribe{ + Event: "subscribe", Channel: "book", Precision: "P0", Frequency: "F0", @@ -67,16 +72,19 @@ func main() { } derivStatusPld := event.Subscribe{ + Event: "subscribe", Channel: "status", Key: "deriv:tBTCF0:USTF0", } liqStatusPld := event.Subscribe{ + Event: "subscribe", Channel: "status", Key: "liq:global", } fundingPairTrade := event.Subscribe{ + Event: "subscribe", Channel: "trades", Symbol: "fUSD", } diff --git a/pkg/mux/client/client.go b/pkg/mux/client/client.go index b70e90b13..a1905b9c5 100644 --- a/pkg/mux/client/client.go +++ b/pkg/mux/client/client.go @@ -88,7 +88,6 @@ func (c *Client) Private(key, sec, url string, dms int) (*Client, error) { // Subscribe takes subscription payload as per docs and subscribes client to it. // We keep track of subscriptions so that when client failes, we can resubscribe. func (c *Client) Subscribe(sub event.Subscribe) error { - sub.Event = "subscribe" if err := c.Send(sub); err != nil { return err } From 22a1b9bf73d4b471cee8f61deb156087cd97e6b0 Mon Sep 17 00:00:00 2001 From: Daniel1984 Date: Tue, 1 Jun 2021 15:23:52 +0300 Subject: [PATCH 7/7] updating changelog --- CHANGELOG | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG b/CHANGELOG index 9cf552b80..47490696b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,8 @@ 3.0.5 -- Adds rate limit to avoid 429 HTTP status codes +- Features + - rate limit to avoid 429 HTTP status codes when subscribing too often +- Fixes + - auth channel payload event name to avoid invalid channel exception 3.0.4 - Adds new rest v2 functions