diff --git a/bothan-bybit/src/api/types.rs b/bothan-bybit/src/api/types.rs index 54c22b32..8a19dbad 100644 --- a/bothan-bybit/src/api/types.rs +++ b/bothan-bybit/src/api/types.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; /// The default URL for the Bybit WebSocket API. pub const DEFAULT_URL: &str = "wss://stream.bybit.com/v5/public/spot"; +pub const MAX_ARGS: u32 = 10; /// Represents the different types of responses from the Bybit API. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] diff --git a/bothan-bybit/src/worker/asset_worker.rs b/bothan-bybit/src/worker/asset_worker.rs index 71017614..e9bcb6b1 100644 --- a/bothan-bybit/src/worker/asset_worker.rs +++ b/bothan-bybit/src/worker/asset_worker.rs @@ -10,7 +10,7 @@ use bothan_core::store::WorkerStore; use bothan_core::types::AssetInfo; use crate::api::error::{MessageError, SendError}; -use crate::api::types::{BybitResponse, Ticker}; +use crate::api::types::{BybitResponse, Ticker, MAX_ARGS}; use crate::api::{BybitWebSocketConnection, BybitWebSocketConnector}; use crate::worker::error::WorkerError; use crate::worker::types::{DEFAULT_TIMEOUT, RECONNECT_BUFFER}; @@ -54,8 +54,13 @@ async fn subscribe( connection: &mut BybitWebSocketConnection, ) -> Result<(), SendError> { if !ids.is_empty() { - let ids_vec = ids.iter().map(|s| s.as_str()).collect::>(); - connection.subscribe_ticker(&ids_vec).await? + for batched_ids in ids.chunks(MAX_ARGS as usize) { + let symbols = batched_ids + .iter() + .map(|s| s.as_str()) + .collect::>(); + connection.subscribe_ticker(&symbols).await? + } } Ok(()) @@ -74,9 +79,13 @@ async fn unsubscribe( connection: &mut BybitWebSocketConnection, ) -> Result<(), SendError> { if !ids.is_empty() { - connection - .unsubscribe_ticker(&ids.iter().map(|s| s.as_str()).collect::>()) - .await? + for batched_ids in ids.chunks(MAX_ARGS as usize) { + let symbols = batched_ids + .iter() + .map(|s| s.as_str()) + .collect::>(); + connection.unsubscribe_ticker(&symbols).await? + } } Ok(()) diff --git a/bothan-bybit/src/worker/builder.rs b/bothan-bybit/src/worker/builder.rs index 6b5fe1a2..c0d31d18 100644 --- a/bothan-bybit/src/worker/builder.rs +++ b/bothan-bybit/src/worker/builder.rs @@ -72,6 +72,17 @@ impl<'a> AssetWorkerBuilder<'a> for BybitWorkerBuilder { let (sub_tx, sub_rx) = channel(ch_size); let (unsub_tx, unsub_rx) = channel(ch_size); + let to_sub = self + .store + .get_query_ids() + .await? + .into_iter() + .collect::>(); + + if !to_sub.is_empty() { + // Unwrap here as the channel is guaranteed to be open + sub_tx.send(to_sub).await.unwrap(); + } let worker = Arc::new(BybitWorker::new(connector, self.store, sub_tx, unsub_tx)); diff --git a/bothan-bybit/src/worker/error.rs b/bothan-bybit/src/worker/error.rs index baf79fa8..25b5a5a1 100644 --- a/bothan-bybit/src/worker/error.rs +++ b/bothan-bybit/src/worker/error.rs @@ -14,6 +14,11 @@ pub(crate) enum WorkerError { SetFailed(#[from] store::error::Error), } -#[derive(Error, Debug)] -#[error(transparent)] -pub struct BuildError(#[from] api::ConnectionError); +#[derive(Debug, Error)] +pub enum BuildError { + #[error("failed to connect: {0}")] + FailedToConnect(#[from] api::ConnectionError), + + #[error("store error: {0}")] + StoreError(#[from] store::error::Error), +} diff --git a/bothan-coinbase/src/worker/builder.rs b/bothan-coinbase/src/worker/builder.rs index 5d0f3a31..88343fca 100644 --- a/bothan-coinbase/src/worker/builder.rs +++ b/bothan-coinbase/src/worker/builder.rs @@ -73,6 +73,18 @@ impl<'a> AssetWorkerBuilder<'a> for CoinbaseWorkerBuilder { let (sub_tx, sub_rx) = channel(ch_size); let (unsub_tx, unsub_rx) = channel(ch_size); + let to_sub = self + .store + .get_query_ids() + .await? + .into_iter() + .collect::>(); + + if !to_sub.is_empty() { + // Unwrap here as the channel is guaranteed to be open + sub_tx.send(to_sub).await.unwrap(); + } + let worker = Arc::new(CoinbaseWorker::new(connector, self.store, sub_tx, unsub_tx)); tokio::spawn(start_asset_worker( diff --git a/bothan-coinbase/src/worker/error.rs b/bothan-coinbase/src/worker/error.rs index 7ccf8bab..ff6cdc33 100644 --- a/bothan-coinbase/src/worker/error.rs +++ b/bothan-coinbase/src/worker/error.rs @@ -11,6 +11,11 @@ pub(crate) enum WorkerError { SetFailed(#[from] store::error::Error), } -#[derive(Error, Debug)] -#[error(transparent)] -pub struct BuildError(#[from] api::ConnectionError); +#[derive(Debug, Error)] +pub enum BuildError { + #[error("failed to connect: {0}")] + FailedToConnect(#[from] api::ConnectionError), + + #[error("store error: {0}")] + StoreError(#[from] store::error::Error), +} diff --git a/bothan-htx/src/worker/asset_worker.rs b/bothan-htx/src/worker/asset_worker.rs index b5d549aa..3ceda7bb 100644 --- a/bothan-htx/src/worker/asset_worker.rs +++ b/bothan-htx/src/worker/asset_worker.rs @@ -119,15 +119,22 @@ async fn handle_reconnect( } /// Parses a tick response into an AssetInfo structure. -fn parse_tick(id: &str, tick: Tick) -> Result { +fn parse_tick(id: &str, tick: Tick, timestamp: i64) -> Result { let price_value = Decimal::from_f64(tick.last_price).ok_or(WorkerError::InvalidPrice(tick.last_price))?; - Ok(AssetInfo::new(id.to_string(), price_value, 0)) + Ok(AssetInfo::new(id.to_string(), price_value, timestamp)) } /// Stores tick information into the worker store. -async fn store_tick(store: &WorkerStore, id: &str, tick: Tick) -> Result<(), WorkerError> { - store.set_asset(id, parse_tick(id, tick)?).await?; +async fn store_tick( + store: &WorkerStore, + id: &str, + tick: Tick, + timestamp: i64, +) -> Result<(), WorkerError> { + store + .set_asset(id, parse_tick(id, tick, timestamp)?) + .await?; debug!("stored data for id {}", id); Ok(()) } @@ -149,7 +156,7 @@ async fn process_response( debug!("received data update from channel {}", data.ch); if let Some(id) = data.ch.split('.').nth(1) { // Handle processing of data update, e.g., storing tick data - match store_tick(store, id, data.tick).await { + match store_tick(store, id, data.tick, data.timestamp).await { Ok(_) => debug!("saved data"), Err(e) => error!("failed to save data: {}", e), } @@ -222,13 +229,13 @@ mod test { }; // Parse the tick into AssetInfo - let result = parse_tick("btcusdt", tick); + let result = parse_tick("btcusdt", tick, 1000); // Expected AssetInfo object let expected = AssetInfo::new( "btcusdt".to_string(), Decimal::from_str("52735.63").unwrap(), - 0, + 1000, ); // Assert that the parsed result matches the expected output @@ -256,6 +263,6 @@ mod test { }; // Assert that parsing the tick with an invalid price results in an error - assert!(parse_tick("btcusdt", tick).is_err()); + assert!(parse_tick("btcusdt", tick, 1000).is_err()); } } diff --git a/bothan-htx/src/worker/builder.rs b/bothan-htx/src/worker/builder.rs index 7d8868de..165f6171 100644 --- a/bothan-htx/src/worker/builder.rs +++ b/bothan-htx/src/worker/builder.rs @@ -73,6 +73,18 @@ impl<'a> AssetWorkerBuilder<'a> for HtxWorkerBuilder { let (sub_tx, sub_rx) = channel(ch_size); let (unsub_tx, unsub_rx) = channel(ch_size); + let to_sub = self + .store + .get_query_ids() + .await? + .into_iter() + .collect::>(); + + if !to_sub.is_empty() { + // Unwrap here as the channel is guaranteed to be open + sub_tx.send(to_sub).await.unwrap(); + } + let worker = Arc::new(HtxWorker::new(connector, self.store, sub_tx, unsub_tx)); tokio::spawn(start_asset_worker( diff --git a/bothan-htx/src/worker/error.rs b/bothan-htx/src/worker/error.rs index c2cc80d9..c2ef356f 100644 --- a/bothan-htx/src/worker/error.rs +++ b/bothan-htx/src/worker/error.rs @@ -11,6 +11,11 @@ pub(crate) enum WorkerError { SetFailed(#[from] store::error::Error), } -#[derive(Error, Debug)] -#[error(transparent)] -pub struct BuildError(#[from] api::ConnectionError); +#[derive(Debug, Error)] +pub enum BuildError { + #[error("failed to connect: {0}")] + FailedToConnect(#[from] api::ConnectionError), + + #[error("store error: {0}")] + StoreError(#[from] store::error::Error), +} diff --git a/bothan-kraken/src/worker/builder.rs b/bothan-kraken/src/worker/builder.rs index b00af56d..5c7a73d7 100644 --- a/bothan-kraken/src/worker/builder.rs +++ b/bothan-kraken/src/worker/builder.rs @@ -73,6 +73,18 @@ impl<'a> AssetWorkerBuilder<'a> for KrakenWorkerBuilder { let (sub_tx, sub_rx) = channel(ch_size); let (unsub_tx, unsub_rx) = channel(ch_size); + let to_sub = self + .store + .get_query_ids() + .await? + .into_iter() + .collect::>(); + + if !to_sub.is_empty() { + // Unwrap here as the channel is guaranteed to be open + sub_tx.send(to_sub).await.unwrap(); + } + let worker = Arc::new(KrakenWorker::new(connector, self.store, sub_tx, unsub_tx)); tokio::spawn(start_asset_worker( diff --git a/bothan-kraken/src/worker/error.rs b/bothan-kraken/src/worker/error.rs index c2cc80d9..c2ef356f 100644 --- a/bothan-kraken/src/worker/error.rs +++ b/bothan-kraken/src/worker/error.rs @@ -11,6 +11,11 @@ pub(crate) enum WorkerError { SetFailed(#[from] store::error::Error), } -#[derive(Error, Debug)] -#[error(transparent)] -pub struct BuildError(#[from] api::ConnectionError); +#[derive(Debug, Error)] +pub enum BuildError { + #[error("failed to connect: {0}")] + FailedToConnect(#[from] api::ConnectionError), + + #[error("store error: {0}")] + StoreError(#[from] store::error::Error), +} diff --git a/bothan-okx/src/worker/builder.rs b/bothan-okx/src/worker/builder.rs index 1076637f..95284552 100644 --- a/bothan-okx/src/worker/builder.rs +++ b/bothan-okx/src/worker/builder.rs @@ -73,6 +73,18 @@ impl<'a> AssetWorkerBuilder<'a> for OkxWorkerBuilder { let (sub_tx, sub_rx) = channel(ch_size); let (unsub_tx, unsub_rx) = channel(ch_size); + let to_sub = self + .store + .get_query_ids() + .await? + .into_iter() + .collect::>(); + + if !to_sub.is_empty() { + // Unwrap here as the channel is guaranteed to be open + sub_tx.send(to_sub).await.unwrap(); + } + let worker = Arc::new(OkxWorker::new(connector, self.store, sub_tx, unsub_tx)); tokio::spawn(start_asset_worker( diff --git a/bothan-okx/src/worker/error.rs b/bothan-okx/src/worker/error.rs index 7ccf8bab..ff6cdc33 100644 --- a/bothan-okx/src/worker/error.rs +++ b/bothan-okx/src/worker/error.rs @@ -11,6 +11,11 @@ pub(crate) enum WorkerError { SetFailed(#[from] store::error::Error), } -#[derive(Error, Debug)] -#[error(transparent)] -pub struct BuildError(#[from] api::ConnectionError); +#[derive(Debug, Error)] +pub enum BuildError { + #[error("failed to connect: {0}")] + FailedToConnect(#[from] api::ConnectionError), + + #[error("store error: {0}")] + StoreError(#[from] store::error::Error), +}