diff --git a/Cargo.toml b/Cargo.toml index b1540014..4d01fc71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,12 +4,23 @@ edition = "2021" license = "MIT OR Apache-2.0" [workspace] -members = ["price-adapter-raw", "price-adapter", "bothan-core", "bothan-binance", "bothan-coin-gecko"] +members = ["bothan-binance", "bothan-coingecko", "bothan-core", "price-adapter", "price-adapter-raw"] resolver = "2" [workspace.dependencies] +futures = "0.3.30" +async-trait = "0.1.77" +reqwest = { version = "0.11.24", features = ["json"] } +thiserror = "1.0.57" +tracing = "0.1.40" +tracing-subscriber = "0.3.18" +serde = { version = "1.0.197", features = ["std", "derive", "alloc"] } +tokio = { version = "1.36.0", features = ["full"] } +tokio-util = "0.7.10" +derive_more = { version = "1.0.0-beta.6", features = ["full"] } + price-adapter = { version = "0.1.8", path = "price-adapter" } price-adapter-raw = { version = "0.1.8", path = "price-adapter-raw" } bothan-core = { version = "0.1.0", path = "bothan-core" } bothan-binance = { version = "0.1.0", path = "bothan-binance" } -bothan-coin-gecko = { version = "0.1.0", path = "bothan-coin-gecko" } +bothan-coingecko = { version = "0.1.0", path = "bothan-coingecko" } diff --git a/bothan-binance/Cargo.toml b/bothan-binance/Cargo.toml index d470a407..21ffea65 100644 --- a/bothan-binance/Cargo.toml +++ b/bothan-binance/Cargo.toml @@ -4,25 +4,18 @@ version = "0.1.0" edition.workspace = true license.workspace = true - # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -reqwest = { version = "0.11.22", features = ["json"] } -thiserror = "1.0.56" -tracing = { version = "0.1.40", features = [] } -tokio = { version = "1.36.0", features = ["full"] } -tracing-subscriber = "0.3.17" -serde = { version = "1.0.196", features = ["std", "derive", "alloc"] } -serde_json = "1.0.108" -itertools = "0.12.0" +serde_json = "1.0.114" tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] } futures-util = "0.3.29" -tokio-util = "0.7.10" -chrono = "0.4.31" rand = "0.8.4" -dashmap = "5.5.3" -derive_more = { version = "1.0.0-beta.6", features = ["full"] } -futures = "0.3.30" -bothan-core = { path = "../bothan-core" } -async-trait = "0.1.77" + +bothan-core = { workspace = true } +async-trait = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +serde = { workspace = true } +tokio = { workspace = true } diff --git a/bothan-coin-gecko/Cargo.toml b/bothan-coin-gecko/Cargo.toml deleted file mode 100644 index b52fec36..00000000 --- a/bothan-coin-gecko/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "bothan-coin-gecko" -version = "0.1.0" -edition.workspace = true -license.workspace = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -reqwest = { version = "0.11.24", features = ["json"] } -thiserror = "1.0.56" -serde = { version = "1.0.196", features = ["std", "derive", "alloc"] } -derive_more = { version = "1.0.0-beta.6", features = ["full"] } -tokio = { version = "1.36.0", features = ["full"] } -futures = "0.3.30" -url = "2.5.0" -bothan-core = { path = "../bothan-core" } -async-trait = "0.1.77" -chrono = "0.4.34" -tracing = "0.1.40" -tracing-subscriber = "0.3.18" diff --git a/bothan-coin-gecko/src/api/rest.rs b/bothan-coin-gecko/src/api/rest.rs deleted file mode 100644 index f5d0f063..00000000 --- a/bothan-coin-gecko/src/api/rest.rs +++ /dev/null @@ -1,85 +0,0 @@ -use std::collections::HashMap; - -use reqwest::{Client, RequestBuilder, Response, Url}; - -use crate::api::error::Error; -use crate::api::types::{Coin, Market, MAX_PAGE_SIZE}; - -pub struct CoinGeckoRestAPI { - url: Url, - client: Client, -} - -impl CoinGeckoRestAPI { - pub fn new(url: Url, client: Client) -> Self { - Self { url, client } - } - - pub async fn get_coins_list(&self) -> Result, Error> { - let url = format!("{}coins/list", self.url); - let builder = self.client.get(url); - let response = send_request(builder).await?; - - Ok(response.json::>().await?) - } - - pub async fn get_coins_market(&self, ids: &[&str]) -> Vec> { - let ids_per_pages = ids - .chunks(MAX_PAGE_SIZE) - .enumerate() - .collect::>(); - - let url = format!("{}coins/markets", self.url); - let base_params = vec![ - ("vs_currency", "usd".to_string()), - ("per_page", MAX_PAGE_SIZE.to_string()), - ("ids", ids.join(",")), - ]; - - let mut markets = Vec::with_capacity(ids.len()); - for (page, page_ids) in ids_per_pages { - let mut params = base_params.clone(); - params.push(("page", (page + 1).to_string())); - - let builder_with_query = self.client.get(&url).query(¶ms); - let market_data = match send_request(builder_with_query).await { - Ok(response) => match parse_response::>(response).await { - Ok(markets) => { - let map: HashMap = - HashMap::from_iter(markets.into_iter().map(|m| (m.id.clone(), m))); - page_ids - .iter() - .map(|id| { - let val = map.get(*id).cloned(); - if let Some(market) = val { - Ok(market) - } else { - Err(Error::InvalidID) - } - }) - .collect::>>() - } - Err(e) => vec![Err(e.clone()); page_ids.len()], - }, - Err(e) => vec![Err(e.clone()); page_ids.len()], - }; - markets.extend(market_data); - } - markets - } -} - -async fn send_request(request_builder: RequestBuilder) -> Result { - let response = request_builder.send().await?; - - let status = response.status(); - if status.is_client_error() || status.is_server_error() { - return Err(Error::Http(status)); - } - - Ok(response) -} - -async fn parse_response(response: Response) -> Result { - Ok(response.json::().await?) -} diff --git a/bothan-coingecko/Cargo.toml b/bothan-coingecko/Cargo.toml new file mode 100644 index 00000000..16cccff6 --- /dev/null +++ b/bothan-coingecko/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "bothan-coingecko" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +url = "2.5.0" +chrono = "0.4.34" + +bothan-core = { workspace = true } +async-trait = { workspace = true } +reqwest = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +serde ={ workspace = true } +tokio = { workspace = true } diff --git a/bothan-coin-gecko/examples/dummy_example.rs b/bothan-coingecko/examples/dummy_example.rs similarity index 92% rename from bothan-coin-gecko/examples/dummy_example.rs rename to bothan-coingecko/examples/dummy_example.rs index 41a1c4d4..ea7999b9 100644 --- a/bothan-coin-gecko/examples/dummy_example.rs +++ b/bothan-coingecko/examples/dummy_example.rs @@ -2,7 +2,7 @@ use std::time::Duration; use tracing_subscriber::fmt::init; -use bothan_coin_gecko::CoinGeckoServiceBuilder; +use bothan_coingecko::CoinGeckoServiceBuilder; use bothan_core::service::Service; #[tokio::main] diff --git a/bothan-coin-gecko/src/api.rs b/bothan-coingecko/src/api.rs similarity index 100% rename from bothan-coin-gecko/src/api.rs rename to bothan-coingecko/src/api.rs diff --git a/bothan-coin-gecko/src/api/builder.rs b/bothan-coingecko/src/api/builder.rs similarity index 95% rename from bothan-coin-gecko/src/api/builder.rs rename to bothan-coingecko/src/api/builder.rs index 19cfe97b..b09cfee8 100644 --- a/bothan-coin-gecko/src/api/builder.rs +++ b/bothan-coingecko/src/api/builder.rs @@ -6,7 +6,6 @@ use crate::api::error::Error; use crate::api::types::{DEFAULT_PRO_URL, DEFAULT_URL, DEFAULT_USER_AGENT}; use crate::api::CoinGeckoRestAPI; -#[derive(Default)] pub struct CoinGeckoRestAPIBuilder { url: Option, api_key: Option, @@ -14,14 +13,6 @@ pub struct CoinGeckoRestAPIBuilder { } impl CoinGeckoRestAPIBuilder { - pub fn new() -> Self { - CoinGeckoRestAPIBuilder { - url: None, - api_key: None, - user_agent: DEFAULT_USER_AGENT.into(), - } - } - pub fn set_url(&mut self, url: &str) -> &Self { self.url = Some(url.into()); self @@ -61,3 +52,13 @@ impl CoinGeckoRestAPIBuilder { Ok(CoinGeckoRestAPI::new(parsed_url, client)) } } + +impl Default for CoinGeckoRestAPIBuilder { + fn default() -> Self { + CoinGeckoRestAPIBuilder { + url: None, + api_key: None, + user_agent: DEFAULT_USER_AGENT.into(), + } + } +} diff --git a/bothan-coin-gecko/src/api/error.rs b/bothan-coingecko/src/api/error.rs similarity index 100% rename from bothan-coin-gecko/src/api/error.rs rename to bothan-coingecko/src/api/error.rs diff --git a/bothan-coingecko/src/api/rest.rs b/bothan-coingecko/src/api/rest.rs new file mode 100644 index 00000000..c489217f --- /dev/null +++ b/bothan-coingecko/src/api/rest.rs @@ -0,0 +1,65 @@ +use std::collections::HashMap; + +use reqwest::{Client, RequestBuilder, Response, Url}; + +use crate::api::error::Error; +use crate::api::types::{Coin, Market}; + +pub struct CoinGeckoRestAPI { + url: Url, + client: Client, +} + +impl CoinGeckoRestAPI { + pub fn new(url: Url, client: Client) -> Self { + Self { url, client } + } + + pub async fn get_coins_list(&self) -> Result, Error> { + let url = format!("{}coins/list", self.url); + let builder = self.client.get(url); + let response = send_request(builder).await?; + + Ok(response.json::>().await?) + } + + pub async fn get_coins_market( + &self, + ids: &[&str], + page_size: usize, + page: usize, + ) -> Result>, Error> { + let url = format!("{}coins/markets", self.url); + let params = vec![ + ("vs_currency", "usd".to_string()), + ("per_page", page_size.to_string()), + ("ids", ids.join(",")), + ("page", page.to_string()), + ]; + + let builder_with_query = self.client.get(&url).query(¶ms); + let response = send_request(builder_with_query).await?; + let market_data = parse_response::>(response).await?; + let mut market_data_map: HashMap = + HashMap::from_iter(market_data.into_iter().map(|m| (m.id.clone(), m))); + Ok(ids + .iter() + .map(|id| market_data_map.remove(&id.to_string())) + .collect()) + } +} + +async fn send_request(request_builder: RequestBuilder) -> Result { + let response = request_builder.send().await?; + + let status = response.status(); + if status.is_client_error() || status.is_server_error() { + return Err(Error::Http(status)); + } + + Ok(response) +} + +async fn parse_response(response: Response) -> Result { + Ok(response.json::().await?) +} diff --git a/bothan-coin-gecko/src/api/types.rs b/bothan-coingecko/src/api/types.rs similarity index 93% rename from bothan-coin-gecko/src/api/types.rs rename to bothan-coingecko/src/api/types.rs index 3b32d0a8..8346ed44 100644 --- a/bothan-coin-gecko/src/api/types.rs +++ b/bothan-coingecko/src/api/types.rs @@ -1,6 +1,5 @@ use serde::{Deserialize, Serialize}; -pub(crate) const MAX_PAGE_SIZE: usize = 250; pub(crate) const DEFAULT_USER_AGENT: &str = "Bothan"; pub(crate) const DEFAULT_URL: &str = "https://api.coingecko.com/api/v3/"; pub(crate) const DEFAULT_PRO_URL: &str = "https://pro-api.coingecko.com/api/v3/"; diff --git a/bothan-coin-gecko/src/builder.rs b/bothan-coingecko/src/builder.rs similarity index 72% rename from bothan-coin-gecko/src/builder.rs rename to bothan-coingecko/src/builder.rs index 871c3aaa..c969a928 100644 --- a/bothan-coin-gecko/src/builder.rs +++ b/bothan-coingecko/src/builder.rs @@ -3,28 +3,22 @@ use tokio::time::Duration; use crate::api::types::DEFAULT_USER_AGENT; use crate::api::CoinGeckoRestAPIBuilder; use crate::error::Error; -use crate::types::{DEFAULT_UPDATE_INTERVAL, DEFAULT_UPDATE_SUPPORTED_ASSETS_INTERVAL}; +use crate::types::{ + DEFAULT_PAGE_SIZE, DEFAULT_UPDATE_INTERVAL, DEFAULT_UPDATE_SUPPORTED_ASSETS_INTERVAL, +}; use crate::CoinGeckoService; -#[derive(Default)] pub struct CoinGeckoServiceBuilder { url: Option, api_key: Option, user_agent: String, update_interval: Duration, update_supported_assets_interval: Duration, + page_size: usize, + page_query_delay: Option, } impl CoinGeckoServiceBuilder { - pub fn new() -> Self { - CoinGeckoServiceBuilder { - url: None, - api_key: None, - user_agent: DEFAULT_USER_AGENT.into(), - update_interval: DEFAULT_UPDATE_INTERVAL, - update_supported_assets_interval: DEFAULT_UPDATE_SUPPORTED_ASSETS_INTERVAL, - } - } pub fn set_url(mut self, url: &str) -> Self { self.url = Some(url.into()); self @@ -53,6 +47,16 @@ impl CoinGeckoServiceBuilder { self } + pub fn set_page_size(mut self, page_size: usize) -> Self { + self.page_size = page_size; + self + } + + pub fn set_page_query_delay(mut self, page_query_delay: Duration) -> Self { + self.page_query_delay = Some(page_query_delay); + self + } + pub async fn build(self) -> Result { let mut api_builder = CoinGeckoRestAPIBuilder::default(); if let Some(url) = &self.url { @@ -64,11 +68,29 @@ impl CoinGeckoServiceBuilder { api_builder.set_user_agent(&self.user_agent); let api = api_builder.build()?; - Ok(CoinGeckoService::new( + let service = CoinGeckoService::new( api, self.update_interval, self.update_supported_assets_interval, + self.page_size, + self.page_query_delay, ) - .await) + .await; + + Ok(service) + } +} + +impl Default for CoinGeckoServiceBuilder { + fn default() -> Self { + CoinGeckoServiceBuilder { + url: None, + api_key: None, + user_agent: DEFAULT_USER_AGENT.into(), + update_interval: DEFAULT_UPDATE_INTERVAL, + update_supported_assets_interval: DEFAULT_UPDATE_SUPPORTED_ASSETS_INTERVAL, + page_size: DEFAULT_PAGE_SIZE, + page_query_delay: None, + } } } diff --git a/bothan-coin-gecko/src/error.rs b/bothan-coingecko/src/error.rs similarity index 100% rename from bothan-coin-gecko/src/error.rs rename to bothan-coingecko/src/error.rs diff --git a/bothan-coin-gecko/src/lib.rs b/bothan-coingecko/src/lib.rs similarity index 100% rename from bothan-coin-gecko/src/lib.rs rename to bothan-coingecko/src/lib.rs diff --git a/bothan-coin-gecko/src/service.rs b/bothan-coingecko/src/service.rs similarity index 70% rename from bothan-coin-gecko/src/service.rs rename to bothan-coingecko/src/service.rs index 9f706cd8..0e1b13e4 100644 --- a/bothan-coin-gecko/src/service.rs +++ b/bothan-coingecko/src/service.rs @@ -25,6 +25,8 @@ impl CoinGeckoService { rest_api: CoinGeckoRestAPI, update_interval: Duration, update_supported_assets_interval: Duration, + page_size: usize, + page_query_delay: Option, ) -> Self { let cache = Arc::new(Cache::new(None)); let coin_list = Arc::new(RwLock::new(HashSet::::new())); @@ -32,11 +34,13 @@ impl CoinGeckoService { let update_supported_assets_interval = interval(update_supported_assets_interval); start_service( - rest_api, + Arc::new(rest_api), cache.clone(), update_price_interval, update_supported_assets_interval, coin_list.clone(), + page_size, + page_query_delay, ) .await; @@ -66,7 +70,7 @@ impl Service for CoinGeckoService { Err(ServiceError::InvalidSymbol) } } - Err(CacheError::Invalid) => Err(ServiceError::Pending), + Err(CacheError::Invalid) => Err(ServiceError::InvalidSymbol), Err(_) => Err(ServiceError::Pending), }) .collect(); @@ -80,11 +84,13 @@ impl Service for CoinGeckoService { } pub async fn start_service( - rest_api: CoinGeckoRestAPI, + rest_api: Arc, cache: Arc>, mut update_price_interval: Interval, mut update_supported_assets_interval: Interval, coin_list: Arc>>, + page_size: usize, + page_query_delay: Option, ) { update_coin_list(&rest_api, &coin_list).await; @@ -92,7 +98,7 @@ pub async fn start_service( loop { select! { _ = update_price_interval.tick() => { - update_price_data(&rest_api, &cache).await; + update_price_data(&rest_api, &cache, page_size, page_query_delay).await; }, _ = update_supported_assets_interval.tick() => { update_coin_list(&rest_api, &coin_list).await; @@ -102,20 +108,60 @@ pub async fn start_service( }); } -async fn update_price_data(rest_api: &CoinGeckoRestAPI, cache: &Arc>) { +async fn update_price_data( + rest_api: &Arc, + cache: &Arc>, + page_size: usize, + page_query_delay: Option, +) { + let mut delay = page_query_delay.map(interval); + + let pages = { + let keys = cache.keys().await; + keys.len().div_ceil(page_size) + }; + + for page in 1..=pages { + if let Some(interval) = delay.as_mut() { + interval.tick().await; + } + + let cloned_api = rest_api.clone(); + let cloned_cache = cache.clone(); + tokio::spawn(async move { + update_price_data_from_api(&cloned_api, &cloned_cache, page, page_size).await; + }); + } +} + +async fn update_price_data_from_api( + rest_api: &Arc, + cache: &Arc>, + page: usize, + page_size: usize, +) { let keys = cache.keys().await; let ids = keys.iter().map(|x| x.as_str()).collect::>(); - let market_results = rest_api.get_coins_market(&ids).await; - for market_result in market_results { - if let Ok(market) = market_result { - process_market_data(&market, cache).await; - } else { - warn!("failed to get market data"); + if let Ok(markets) = rest_api + .get_coins_market(ids.as_slice(), page_size, page) + .await + { + for (id, market) in ids.iter().zip(markets.iter()) { + if let Some(m) = market { + process_market_data(m, cache).await; + } else { + warn!("id {} is missing market data", id); + } } + } else { + warn!("failed to get market data"); } } -async fn update_coin_list(rest_api: &CoinGeckoRestAPI, coin_list: &Arc>>) { +async fn update_coin_list( + rest_api: &Arc, + coin_list: &Arc>>, +) { if let Ok(new_coin_list) = rest_api.get_coins_list().await { let new_coin_set = HashSet::::from_iter(new_coin_list.into_iter().map(|x| x.id)); let mut locked = coin_list.write().await; diff --git a/bothan-coin-gecko/src/types.rs b/bothan-coingecko/src/types.rs similarity index 80% rename from bothan-coin-gecko/src/types.rs rename to bothan-coingecko/src/types.rs index 1bc091ed..10b68263 100644 --- a/bothan-coin-gecko/src/types.rs +++ b/bothan-coingecko/src/types.rs @@ -2,3 +2,4 @@ use tokio::time::Duration; pub(crate) const DEFAULT_UPDATE_INTERVAL: Duration = Duration::from_secs(60); pub(crate) const DEFAULT_UPDATE_SUPPORTED_ASSETS_INTERVAL: Duration = Duration::from_secs(86400); +pub(crate) const DEFAULT_PAGE_SIZE: usize = 250; diff --git a/bothan-core/Cargo.toml b/bothan-core/Cargo.toml index 68fda9cb..58e1fcb9 100644 --- a/bothan-core/Cargo.toml +++ b/bothan-core/Cargo.toml @@ -7,12 +7,11 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -thiserror = "1.0.57" -futures = "0.3.30" -tokio = { version = "1.36.0", features = ["full"] } -tokio-util = "0.7.10" -tracing = "0.1.40" -log = "0.4.20" -serde = { version = "1.0.196", features = ["std", "derive", "alloc"] } -derive_more = { version = "1.0.0-beta.6", features = ["full"] } -async-trait = "0.1.77" +thiserror = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } +tracing = { workspace = true } +serde = { workspace = true } +derive_more = { workspace = true } +async-trait = { workspace = true }