Skip to content

Commit

Permalink
[fix] Bug Fixes (#100)
Browse files Browse the repository at this point in the history
* fix htx timestamp issue

* fixed bybit max args issue

* fixed error in builders where builder does not subscribe to store on build()

* format
  • Loading branch information
warittornc authored Nov 6, 2024
1 parent d7bdc63 commit 318444c
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 29 deletions.
1 change: 1 addition & 0 deletions bothan-bybit/src/api/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};

/// The default URL for the Bybit WebSocket API.
pub const DEFAULT_URL: &str = "wss://stream.bybit.com/v5/public/spot";
pub const MAX_ARGS: u32 = 10;

/// Represents the different types of responses from the Bybit API.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
Expand Down
21 changes: 15 additions & 6 deletions bothan-bybit/src/worker/asset_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use bothan_core::store::WorkerStore;
use bothan_core::types::AssetInfo;

use crate::api::error::{MessageError, SendError};
use crate::api::types::{BybitResponse, Ticker};
use crate::api::types::{BybitResponse, Ticker, MAX_ARGS};
use crate::api::{BybitWebSocketConnection, BybitWebSocketConnector};
use crate::worker::error::WorkerError;
use crate::worker::types::{DEFAULT_TIMEOUT, RECONNECT_BUFFER};
Expand Down Expand Up @@ -54,8 +54,13 @@ async fn subscribe(
connection: &mut BybitWebSocketConnection,
) -> Result<(), SendError> {
if !ids.is_empty() {
let ids_vec = ids.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
connection.subscribe_ticker(&ids_vec).await?
for batched_ids in ids.chunks(MAX_ARGS as usize) {
let symbols = batched_ids
.iter()
.map(|s| s.as_str())
.collect::<Vec<&str>>();
connection.subscribe_ticker(&symbols).await?
}
}

Ok(())
Expand All @@ -74,9 +79,13 @@ async fn unsubscribe(
connection: &mut BybitWebSocketConnection,
) -> Result<(), SendError> {
if !ids.is_empty() {
connection
.unsubscribe_ticker(&ids.iter().map(|s| s.as_str()).collect::<Vec<&str>>())
.await?
for batched_ids in ids.chunks(MAX_ARGS as usize) {
let symbols = batched_ids
.iter()
.map(|s| s.as_str())
.collect::<Vec<&str>>();
connection.unsubscribe_ticker(&symbols).await?
}
}

Ok(())
Expand Down
11 changes: 11 additions & 0 deletions bothan-bybit/src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ impl<'a> AssetWorkerBuilder<'a> for BybitWorkerBuilder {

let (sub_tx, sub_rx) = channel(ch_size);
let (unsub_tx, unsub_rx) = channel(ch_size);
let to_sub = self
.store
.get_query_ids()
.await?
.into_iter()
.collect::<Vec<String>>();

if !to_sub.is_empty() {
// Unwrap here as the channel is guaranteed to be open
sub_tx.send(to_sub).await.unwrap();
}

let worker = Arc::new(BybitWorker::new(connector, self.store, sub_tx, unsub_tx));

Expand Down
11 changes: 8 additions & 3 deletions bothan-bybit/src/worker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ pub(crate) enum WorkerError {
SetFailed(#[from] store::error::Error),
}

#[derive(Error, Debug)]
#[error(transparent)]
pub struct BuildError(#[from] api::ConnectionError);
#[derive(Debug, Error)]
pub enum BuildError {
#[error("failed to connect: {0}")]
FailedToConnect(#[from] api::ConnectionError),

#[error("store error: {0}")]
StoreError(#[from] store::error::Error),
}
12 changes: 12 additions & 0 deletions bothan-coinbase/src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ impl<'a> AssetWorkerBuilder<'a> for CoinbaseWorkerBuilder {
let (sub_tx, sub_rx) = channel(ch_size);
let (unsub_tx, unsub_rx) = channel(ch_size);

let to_sub = self
.store
.get_query_ids()
.await?
.into_iter()
.collect::<Vec<String>>();

if !to_sub.is_empty() {
// Unwrap here as the channel is guaranteed to be open
sub_tx.send(to_sub).await.unwrap();
}

let worker = Arc::new(CoinbaseWorker::new(connector, self.store, sub_tx, unsub_tx));

tokio::spawn(start_asset_worker(
Expand Down
11 changes: 8 additions & 3 deletions bothan-coinbase/src/worker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ pub(crate) enum WorkerError {
SetFailed(#[from] store::error::Error),
}

#[derive(Error, Debug)]
#[error(transparent)]
pub struct BuildError(#[from] api::ConnectionError);
#[derive(Debug, Error)]
pub enum BuildError {
#[error("failed to connect: {0}")]
FailedToConnect(#[from] api::ConnectionError),

#[error("store error: {0}")]
StoreError(#[from] store::error::Error),
}
23 changes: 15 additions & 8 deletions bothan-htx/src/worker/asset_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,22 @@ async fn handle_reconnect(
}

/// Parses a tick response into an AssetInfo structure.
fn parse_tick(id: &str, tick: Tick) -> Result<AssetInfo, WorkerError> {
fn parse_tick(id: &str, tick: Tick, timestamp: i64) -> Result<AssetInfo, WorkerError> {
let price_value =
Decimal::from_f64(tick.last_price).ok_or(WorkerError::InvalidPrice(tick.last_price))?;
Ok(AssetInfo::new(id.to_string(), price_value, 0))
Ok(AssetInfo::new(id.to_string(), price_value, timestamp))
}

/// Stores tick information into the worker store.
async fn store_tick(store: &WorkerStore, id: &str, tick: Tick) -> Result<(), WorkerError> {
store.set_asset(id, parse_tick(id, tick)?).await?;
async fn store_tick(
store: &WorkerStore,
id: &str,
tick: Tick,
timestamp: i64,
) -> Result<(), WorkerError> {
store
.set_asset(id, parse_tick(id, tick, timestamp)?)
.await?;
debug!("stored data for id {}", id);
Ok(())
}
Expand All @@ -149,7 +156,7 @@ async fn process_response(
debug!("received data update from channel {}", data.ch);
if let Some(id) = data.ch.split('.').nth(1) {
// Handle processing of data update, e.g., storing tick data
match store_tick(store, id, data.tick).await {
match store_tick(store, id, data.tick, data.timestamp).await {
Ok(_) => debug!("saved data"),
Err(e) => error!("failed to save data: {}", e),
}
Expand Down Expand Up @@ -222,13 +229,13 @@ mod test {
};

// Parse the tick into AssetInfo
let result = parse_tick("btcusdt", tick);
let result = parse_tick("btcusdt", tick, 1000);

// Expected AssetInfo object
let expected = AssetInfo::new(
"btcusdt".to_string(),
Decimal::from_str("52735.63").unwrap(),
0,
1000,
);

// Assert that the parsed result matches the expected output
Expand Down Expand Up @@ -256,6 +263,6 @@ mod test {
};

// Assert that parsing the tick with an invalid price results in an error
assert!(parse_tick("btcusdt", tick).is_err());
assert!(parse_tick("btcusdt", tick, 1000).is_err());
}
}
12 changes: 12 additions & 0 deletions bothan-htx/src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ impl<'a> AssetWorkerBuilder<'a> for HtxWorkerBuilder {
let (sub_tx, sub_rx) = channel(ch_size);
let (unsub_tx, unsub_rx) = channel(ch_size);

let to_sub = self
.store
.get_query_ids()
.await?
.into_iter()
.collect::<Vec<String>>();

if !to_sub.is_empty() {
// Unwrap here as the channel is guaranteed to be open
sub_tx.send(to_sub).await.unwrap();
}

let worker = Arc::new(HtxWorker::new(connector, self.store, sub_tx, unsub_tx));

tokio::spawn(start_asset_worker(
Expand Down
11 changes: 8 additions & 3 deletions bothan-htx/src/worker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ pub(crate) enum WorkerError {
SetFailed(#[from] store::error::Error),
}

#[derive(Error, Debug)]
#[error(transparent)]
pub struct BuildError(#[from] api::ConnectionError);
#[derive(Debug, Error)]
pub enum BuildError {
#[error("failed to connect: {0}")]
FailedToConnect(#[from] api::ConnectionError),

#[error("store error: {0}")]
StoreError(#[from] store::error::Error),
}
12 changes: 12 additions & 0 deletions bothan-kraken/src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ impl<'a> AssetWorkerBuilder<'a> for KrakenWorkerBuilder {
let (sub_tx, sub_rx) = channel(ch_size);
let (unsub_tx, unsub_rx) = channel(ch_size);

let to_sub = self
.store
.get_query_ids()
.await?
.into_iter()
.collect::<Vec<String>>();

if !to_sub.is_empty() {
// Unwrap here as the channel is guaranteed to be open
sub_tx.send(to_sub).await.unwrap();
}

let worker = Arc::new(KrakenWorker::new(connector, self.store, sub_tx, unsub_tx));

tokio::spawn(start_asset_worker(
Expand Down
11 changes: 8 additions & 3 deletions bothan-kraken/src/worker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ pub(crate) enum WorkerError {
SetFailed(#[from] store::error::Error),
}

#[derive(Error, Debug)]
#[error(transparent)]
pub struct BuildError(#[from] api::ConnectionError);
#[derive(Debug, Error)]
pub enum BuildError {
#[error("failed to connect: {0}")]
FailedToConnect(#[from] api::ConnectionError),

#[error("store error: {0}")]
StoreError(#[from] store::error::Error),
}
12 changes: 12 additions & 0 deletions bothan-okx/src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ impl<'a> AssetWorkerBuilder<'a> for OkxWorkerBuilder {
let (sub_tx, sub_rx) = channel(ch_size);
let (unsub_tx, unsub_rx) = channel(ch_size);

let to_sub = self
.store
.get_query_ids()
.await?
.into_iter()
.collect::<Vec<String>>();

if !to_sub.is_empty() {
// Unwrap here as the channel is guaranteed to be open
sub_tx.send(to_sub).await.unwrap();
}

let worker = Arc::new(OkxWorker::new(connector, self.store, sub_tx, unsub_tx));

tokio::spawn(start_asset_worker(
Expand Down
11 changes: 8 additions & 3 deletions bothan-okx/src/worker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ pub(crate) enum WorkerError {
SetFailed(#[from] store::error::Error),
}

#[derive(Error, Debug)]
#[error(transparent)]
pub struct BuildError(#[from] api::ConnectionError);
#[derive(Debug, Error)]
pub enum BuildError {
#[error("failed to connect: {0}")]
FailedToConnect(#[from] api::ConnectionError),

#[error("store error: {0}")]
StoreError(#[from] store::error::Error),
}

0 comments on commit 318444c

Please sign in to comment.