Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket stream skips messages #53

Open
dima-dmytruk23 opened this issue Dec 4, 2024 · 2 comments
Open

Websocket stream skips messages #53

dima-dmytruk23 opened this issue Dec 4, 2024 · 2 comments

Comments

@dima-dmytruk23
Copy link

dima-dmytruk23 commented Dec 4, 2024

Hello. I was looking for a library that would implement websockets functionality. I decided to stop at yours, but since it does not work with futures, I decided to improve its functionality (also, I want to process messages in separate goroutines). During testing, I encountered the following problem.
I have 372 symbols. For each of them, I subscribe to 5 intervals (1h, 4h, 1d, 1w, 1M). As far as I know, Binance does not allow subscribing to more than 200 symbols per stream. Therefore, I made a small method that breaks symbols into chunks.

func ChunkBy[T any](items []T, chunkSize int) [][]T {
	var _chunks = make([][]T, 0, (len(items)/chunkSize)+1)
	for chunkSize < len(items) {
		items, _chunks = items[chunkSize:], append(_chunks, items[0:chunkSize:chunkSize])
	}
	return append(_chunks, items)
}

Also, I create a new stream each time for the interval-chunk.

intervals := []string{"1h", "4h", "1d", "1w", "1M"}
pairsChunks := shared.ChunkBy(pairs, 200)

for _, interval := range intervals {
	for _, pairsChunk := range pairsChunks {
		websocketClient := getWebsocketClient()
		for _, pair := range pairsChunk {
			err := startWebsocketClient(pair, interval, websocketClient, app)
			if err != nil {
				log.Fatal(err)
			}
		}
	}
}

But, I see that some data is skipped for 1w and 1M.
If I reduce the number of intervals (for example, to 3) or reduce the number of symbols, for example, to 50 - all data comes correctly.
Any ideas why this is happening and how it can be handled?

My Websocket client implementation

type WsContinuousContractKlineHandler func(event *WsContinuousContractKlineEvent)

func getWebsocketClient() *FuturesWebsocketStreamClient {
	websocketClient := binanceconnector.NewWebsocketStreamClient(false, "wss://fstream.binance.com")
	return &FuturesWebsocketStreamClient{websocketClient}
}

func startWebsocketClient(pair, interval string, websocketClient *FuturesWebsocketStreamClient, handler MessageHandler) error {
	WsContinuousContractKlineCallback := func(event *WsContinuousContractKlineEvent) {
		go handler.HandleMessage(event)
	}

	errHandler := func(err error) {
		fmt.Println(err)
	}

	go func(s, interval string) {
		doneCh, stopCh, err := websocketClient.WsContinuousContractKlineServe(s, interval, WsContinuousContractKlineCallback, errHandler)
		if err != nil {
			fmt.Printf("Error for symbol %s: %v\n", s, err)
			return
		}
		go func() {
			stopCh <- struct{}{}
		}()

		<-doneCh
	}(pair, interval)
	return nil
}

func newWsConfig(endpoint string) *binanceconnector.WsConfig {
	return &binanceconnector.WsConfig{
		Endpoint: endpoint,
	}
}

func (c *FuturesWebsocketStreamClient) WsContinuousContractKlineServe(
	symbol string,
	interval string,
	handler WsContinuousContractKlineHandler,
	errHandler binanceconnector.ErrHandler,
) (doneCh, stopCh chan struct{}, err error) {
	endpoint := fmt.Sprintf("%s/%s_perpetual@continuousKline_%s", c.Endpoint, strings.ToLower(symbol), interval)
	cfg := newWsConfig(endpoint)
	wsHandler := func(message []byte) {
		event := new(WsContinuousContractKlineEvent)
		err := json.Unmarshal(message, event)
		if err != nil {
			errHandler(err)
			return
		}
		handler(event)
	}
	return wsServe(cfg, wsHandler, errHandler)
}

var wsServe = func(cfg *binanceconnector.WsConfig, handler binanceconnector.WsHandler, errHandler binanceconnector.ErrHandler) (doneCh, stopCh chan struct{}, err error) {
	Dialer := websocket.Dialer{
		Proxy:             http.ProxyFromEnvironment,
		HandshakeTimeout:  45 * time.Second,
		EnableCompression: false,
	}
	headers := http.Header{}
	headers.Add("User-Agent", fmt.Sprintf("%s/%s", binanceconnector.Name, binanceconnector.Version))
	c, _, err := Dialer.Dial(cfg.Endpoint, headers)
	if err != nil {
		return nil, nil, err
	}
	c.SetReadLimit(655350)
	doneCh = make(chan struct{})
	stopCh = make(chan struct{})
	go func() {
		// This function will exit either on error from
		// websocket.Conn.ReadMessage or when the stopC channel is
		// closed by the client.
		defer close(doneCh)
		if binanceconnector.WebsocketKeepalive {
			keepAlive(c, binanceconnector.WebsocketTimeout)
		}
		// Wait for the stopC channel to be closed.  We do that in a
		// separate goroutine because ReadMessage is a blocking
		// operation.
		silent := false
		go func() {
			for {
				_, message, err := c.ReadMessage()
				if err != nil {
					if !silent {
						errHandler(err)
					}
					stopCh <- struct{}{}
					return
				}

				go func(msg []byte) {
					handler(msg)
				}(message)
			}
		}()

		for {
			select {
			case <-stopCh:
				silent = true
				return
			case <-doneCh:
			}
		}
	}()
	return
}

func keepAlive(c *websocket.Conn, timeout time.Duration) {
	ticker := time.NewTicker(timeout)

	lastResponse := time.Now()
	c.SetPongHandler(func(msg string) error {
		lastResponse = time.Now()
		return nil
	})

	go func() {
		defer ticker.Stop()
		for {
			deadline := time.Now().Add(10 * time.Second)
			err := c.WriteControl(websocket.PingMessage, []byte{}, deadline)
			if err != nil {
				return
			}
			<-ticker.C
			if time.Since(lastResponse) > timeout {
				return
			}
		}
	}()
}
@dima-dmytruk23
Copy link
Author

dima-dmytruk23 commented Dec 4, 2024

After investigating I noticed that I start to exceed the context deadline exceeded and dial tcp 3.114.57.84:443: i/o timeout errors when the number of subscriptions approaches 1000. I increased the wait time between creating subscriptions to 0.5 seconds, but the problem remains.
I understand that I am violating one of these limits

https://developers.binance.com/docs/derivatives/portfolio-margin-pro#websocket-limits

If I understand correctly, then I have no more than 200 streams per connection, i.e. I do not violate this point?

A single connection can listen to a maximum of 1024 streams.

Most likely, this is the limit

WebSocket connections have a limit of 5 incoming messages per second. A message is considered:
A PING frame
A PONG frame
A JSON controlled message (e.g. subscribe, unsubscribe)

But I don't really understand when I violate it. Even if there is a ping, pong and creation of subscriptions (with an interval of 0.5 seconds) during one second - there will be a maximum of 4 messages per second (but keepAlive disabled by deafaut).

@dima-dmytruk23
Copy link
Author

dima-dmytruk23 commented Dec 4, 2024

Also this code on the Python (library — https://github.com/binance/binance-futures-connector-python) work fine for me with the same pairs and intervals (with timeout 0.2 sec between subscriptions creation)

    intervals = ['1h', '4h', '1d', '1w', '1M']
    for interval in intervals:
        for pair_chunk in chunks(pairs, 200):
            manager = UMFuturesWebsocketClient(
                stream_url="wss://fstream.binance.com",
                on_message=g
            )
            for pair in pair_chunk:
                manager.continuous_kline(pair=pair, contractType='perpetual', interval=interval, id=uuid.uuid4().hex)
                time.sleep(0.2)

Upd: It seems that the problem is with my IP address. When I connect via VPN in the go service - everything is ok. It's strange, why then everything works correctly in python?

@dima-dmytruk23 dima-dmytruk23 changed the title Websocket stream skips messages if many symbols are used Websocket stream skips messages Dec 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant