diff --git a/Cargo.lock b/Cargo.lock index 041fef7f..a47d0e00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -121,9 +121,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.94" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" +checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" [[package]] name = "arraydeque" @@ -166,7 +166,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -177,7 +177,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -308,7 +308,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -367,7 +367,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", "syn_derive", ] @@ -433,11 +433,12 @@ dependencies = [ "bothan-core", "futures-util", "humantime-serde", + "opentelemetry", "rand", "rust_decimal", "serde", "serde_json", - "thiserror 2.0.6", + "thiserror 2.0.9", "tokio", "tokio-tungstenite", "tracing", @@ -456,7 +457,7 @@ dependencies = [ "rust_decimal", "serde", "serde_json", - "thiserror 2.0.6", + "thiserror 2.0.9", "tokio", "tokio-tungstenite", "tracing", @@ -489,7 +490,7 @@ dependencies = [ "rust_decimal", "serde", "serde_json", - "thiserror 2.0.6", + "thiserror 2.0.9", "tokio", "tokio-tungstenite", "tracing", @@ -510,7 +511,7 @@ dependencies = [ "rust_decimal", "serde", "serde_json", - "thiserror 2.0.6", + "thiserror 2.0.9", "tokio", "tracing", "tracing-subscriber", @@ -531,7 +532,7 @@ dependencies = [ "rust_decimal", "serde", "serde_json", - "thiserror 2.0.6", + "thiserror 2.0.9", "tokio", "tracing", "tracing-subscriber", @@ -558,7 +559,7 @@ dependencies = [ "semver", "serde", "serde_json", - "thiserror 2.0.6", + "thiserror 2.0.9", "tokio", "tracing", ] @@ -573,7 +574,7 @@ dependencies = [ "rust_decimal", "serde", "serde_json", - "thiserror 2.0.6", + "thiserror 2.0.9", "tokio", "tokio-tungstenite", "tracing", @@ -592,7 +593,7 @@ dependencies = [ "rust_decimal", "serde", "serde_json", - "thiserror 2.0.6", + "thiserror 2.0.9", "tokio", "tokio-tungstenite", "tracing", @@ -612,7 +613,7 @@ dependencies = [ "rust_decimal", "serde", "serde_json", - "thiserror 2.0.6", + "thiserror 2.0.9", "tokio", "tokio-tungstenite", "tracing", @@ -631,7 +632,7 @@ dependencies = [ "rust_decimal", "serde", "serde_json", - "thiserror 2.0.6", + "thiserror 2.0.9", "tokio", "tokio-tungstenite", "tracing", @@ -816,7 +817,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -1009,7 +1010,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -1033,7 +1034,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -1044,7 +1045,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -1104,7 +1105,7 @@ dependencies = [ "convert_case", "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", "unicode-xid", ] @@ -1300,7 +1301,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -2028,7 +2029,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -2049,6 +2050,20 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror 1.0.65", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -2147,7 +2162,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -2178,7 +2193,7 @@ checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -2285,7 +2300,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -2752,14 +2767,14 @@ checksum = "7e85ad2009c50b58e87caa8cd6dac16bdf511bbfb7af6c33df902396aa480fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] name = "serde_json" -version = "1.0.133" +version = "1.0.134" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" +checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d" dependencies = [ "itoa", "memchr", @@ -2815,7 +2830,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -2987,9 +3002,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.90" +version = "2.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31" +checksum = "987bc0be1cdea8b10216bd06e2ca407d40b9543468fafd3ddfb02f36e77f71f3" dependencies = [ "proc-macro2", "quote", @@ -3005,7 +3020,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -3080,11 +3095,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.6" +version = "2.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fec2a1820ebd077e2b90c4df007bebf344cd394098a13c563957d0afc83ea47" +checksum = "f072643fd0190df67a8bab670c20ef5d8737177d6ac6b2e9a236cb096206b2cc" dependencies = [ - "thiserror-impl 2.0.6", + "thiserror-impl 2.0.9", ] [[package]] @@ -3095,18 +3110,18 @@ checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] name = "thiserror-impl" -version = "2.0.6" +version = "2.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d65750cab40f4ff1929fb1ba509e9914eb756131cef4210da8d5d700d26f6312" +checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -3200,7 +3215,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -3391,7 +3406,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] @@ -3627,7 +3642,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", "wasm-bindgen-shared", ] @@ -3661,7 +3676,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3960,7 +3975,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn 2.0.94", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index e8fa645b..b2c4ab1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ futures-util = "0.3.31" humantime-serde = "1.1.1" itertools = "0.13.0" mockito = "1.4.0" +opentelemetry = { version = "0.26.0", features = ["metrics"] } prost = "0.13.1" protoc-gen-prost = "0.4.0" protoc-gen-tonic = "0.4.1" diff --git a/bothan-binance/Cargo.toml b/bothan-binance/Cargo.toml index 35ed9691..e979533c 100644 --- a/bothan-binance/Cargo.toml +++ b/bothan-binance/Cargo.toml @@ -10,6 +10,7 @@ async-trait = { workspace = true } bothan-core = { workspace = true } futures-util = { workspace = true, features = ["sink", "std"] } humantime-serde = { workspace = true } +opentelemetry = { workspace = true } rand = { workspace = true } rust_decimal = { workspace = true } serde = { workspace = true } diff --git a/bothan-binance/src/api/msgs.rs b/bothan-binance/src/api/msgs.rs index 162e0422..d3ab4ed2 100644 --- a/bothan-binance/src/api/msgs.rs +++ b/bothan-binance/src/api/msgs.rs @@ -3,14 +3,14 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct SuccessResponse { pub result: Option, - pub id: u64, + pub id: i64, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ErrorResponse { - pub code: u16, + pub code: i16, pub msg: String, - pub id: u64, + pub id: i64, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/bothan-binance/src/api/websocket.rs b/bothan-binance/src/api/websocket.rs index bd40f032..97cdd389 100644 --- a/bothan-binance/src/api/websocket.rs +++ b/bothan-binance/src/api/websocket.rs @@ -70,10 +70,11 @@ impl BinanceWebSocketConnection { /// ``` pub async fn subscribe_mini_ticker_stream>( &mut self, - ids: &[K], + id: i64, + tickers: &[K], ) -> Result<(), SendError> { // Format the stream IDs for subscription. - let stream_ids = ids + let tickers = tickers .iter() .map(|id| format!("{}@miniTicker", id.as_ref())) .collect::>(); @@ -81,8 +82,8 @@ impl BinanceWebSocketConnection { // Create the subscription payload. let payload = json!({ "method": "SUBSCRIBE", - "params": stream_ids, - "id": rand::random::() + "params": tickers, + "id": id, }); // Send the subscription message. @@ -101,10 +102,11 @@ impl BinanceWebSocketConnection { /// ``` pub async fn unsubscribe_mini_ticker_stream>( &mut self, - ids: &[K], + id: i64, + tickers: &[K], ) -> Result<(), SendError> { // Format the stream IDs for unsubscription. - let stream_ids = ids + let stream_ids = tickers .iter() .map(|id| format!("{}@miniTicker", id.as_ref())) .collect::>(); @@ -113,7 +115,7 @@ impl BinanceWebSocketConnection { let payload = json!({ "method": "UNSUBSCRIBE", "params": stream_ids, - "id": rand::random::() + "id":id, }); // Send the unsubscription message. diff --git a/bothan-binance/src/worker.rs b/bothan-binance/src/worker.rs index 8937e328..7dd53635 100644 --- a/bothan-binance/src/worker.rs +++ b/bothan-binance/src/worker.rs @@ -49,7 +49,7 @@ impl AssetWorker for BinanceWorker { async fn set_query_ids(&self, ids: Vec) -> Result<(), SetQueryIDError> { let (to_sub, to_unsub) = self .store - .set_query_ids(ids) + .compute_query_id_differences(ids) .await .map_err(|e| SetQueryIDError::new(e.to_string()))?; diff --git a/bothan-binance/src/worker/asset_worker.rs b/bothan-binance/src/worker/asset_worker.rs index 75834a47..1bc21f8e 100644 --- a/bothan-binance/src/worker/asset_worker.rs +++ b/bothan-binance/src/worker/asset_worker.rs @@ -1,5 +1,9 @@ +use std::collections::HashMap; use std::sync::Weak; +use opentelemetry::{global, KeyValue}; +use rand::random; +use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; use tokio::select; use tokio::sync::mpsc::Receiver; @@ -9,27 +13,33 @@ use tracing::{debug, error, info, warn}; use bothan_core::store::WorkerStore; use bothan_core::types::AssetInfo; -use crate::api::error::{MessageError, SendError}; -use crate::api::msgs::{BinanceResponse, Data}; +use crate::api::error::MessageError; +use crate::api::msgs::{BinanceResponse, Data, ErrorResponse, SuccessResponse}; use crate::api::{BinanceWebSocketConnection, BinanceWebSocketConnector}; -use crate::worker::error::WorkerError; -use crate::worker::types::{DEFAULT_TIMEOUT, RECONNECT_BUFFER}; +use crate::worker::types::{DEFAULT_TIMEOUT, METER_NAME, RECONNECT_BUFFER}; use crate::worker::BinanceWorker; +enum Event { + Subscribe(Vec), + Unsubscribe(Vec), +} + pub(crate) async fn start_asset_worker( worker: Weak, mut connection: BinanceWebSocketConnection, mut subscribe_rx: Receiver>, mut unsubscribe_rx: Receiver>, ) { + let mut subscription_map = HashMap::new(); while let Some(worker) = worker.upgrade() { select! { - Some(ids) = subscribe_rx.recv() => handle_subscribe_recv(ids, &worker.store, &mut connection).await, - Some(ids) = unsubscribe_rx.recv() => handle_unsubscribe_recv(ids, &worker.store, &mut connection).await, + Some(ids) = subscribe_rx.recv() => handle_subscribe_recv(ids, &mut connection, &mut subscription_map).await, + Some(ids) = unsubscribe_rx.recv() => handle_unsubscribe_recv(ids, &mut connection, &mut subscription_map).await, result = timeout(DEFAULT_TIMEOUT, connection.next()) => { match result { + // Assume that the connection has been closed on timeout and attempt to reconnect Err(_) => handle_reconnect(&worker.connector, &mut connection, &worker.store).await, - Ok(binance_result) => handle_connection_recv(binance_result, &worker.connector, &mut connection, &worker.store).await, + Ok(binance_result) => handle_connection_recv(binance_result, &worker.connector, &mut connection, &worker.store, &mut subscription_map).await, } } } @@ -45,60 +55,103 @@ pub(crate) async fn start_asset_worker( debug!("asset worker has been dropped, stopping asset worker"); } -async fn subscribe>( - ids: &[T], - connection: &mut BinanceWebSocketConnection, -) -> Result<(), SendError> { - if !ids.is_empty() { - connection - .subscribe_mini_ticker_stream(&ids.iter().map(|s| s.as_ref()).collect::>()) - .await? - } - - Ok(()) -} - async fn handle_subscribe_recv( ids: Vec, - worker_store: &WorkerStore, connection: &mut BinanceWebSocketConnection, + subscription_map: &mut HashMap, ) { - match subscribe(&ids, connection).await { - Ok(_) => info!("subscribed to ids {:?}", ids), + if ids.is_empty() { + return; + } + + let packet_id = random(); + let tickers = ids.iter().map(|s| s.as_ref()).collect::>(); + + let meter = global::meter(METER_NAME); + meter.u64_counter("subscribe_attempt").init().add( + 1, + &[ + KeyValue::new("subscription.id", packet_id), + KeyValue::new("subscription.tickers", tickers.join(",")), + ], + ); + + match connection + .subscribe_mini_ticker_stream(packet_id, &tickers) + .await + { + Ok(_) => { + info!("attempt to subscribe to ids {:?}", ids); + subscription_map.insert(packet_id, Event::Subscribe(ids)); + } Err(e) => { - error!("failed to subscribe to ids {:?}: {}", ids, e); - if worker_store.remove_query_ids(ids).await.is_err() { - error!("failed to remove query ids from store") - } + error!("failed attempt to subscribe to ids {:?}: {}", ids, e); + meter + .u64_counter("failed_subscribe_attempt") + .init() + .add(1, &[KeyValue::new("subscription.id", packet_id)]); } } } -async fn unsubscribe>( - ids: &[T], +async fn handle_unsubscribe_recv( + ids: Vec, connection: &mut BinanceWebSocketConnection, -) -> Result<(), SendError> { - if !ids.is_empty() { - connection - .unsubscribe_mini_ticker_stream(&ids.iter().map(|s| s.as_ref()).collect::>()) - .await? + subscription_map: &mut HashMap, +) { + if ids.is_empty() { + return; } - Ok(()) + let packet_id = random(); + let tickers = ids.iter().map(|s| s.as_ref()).collect::>(); + + let meter = global::meter(METER_NAME); + meter.u64_counter("unsubscribe_attempt").init().add( + 1, + &[ + KeyValue::new("subscription.id", packet_id), + KeyValue::new("subscription.tickers", tickers.join(",")), + ], + ); + + match connection + .unsubscribe_mini_ticker_stream(packet_id, &tickers) + .await + { + Ok(_) => { + info!("attempt to unsubscribe to ids {:?}", ids); + subscription_map.insert(packet_id, Event::Unsubscribe(ids)); + } + Err(e) => { + error!("failed attempt to unsubscribe to ids {:?}: {}", ids, e); + meter + .u64_counter("failed_unsubscribe_attempt") + .init() + .add(1, &[KeyValue::new("subscription.id", packet_id)]); + } + } } -async fn handle_unsubscribe_recv( - ids: Vec, - worker_store: &WorkerStore, +async fn handle_connection_recv( + recv_result: Result, + connector: &BinanceWebSocketConnector, connection: &mut BinanceWebSocketConnection, + store: &WorkerStore, + subscription_map: &mut HashMap, ) { - match unsubscribe(&ids, connection).await { - Ok(_) => info!("unsubscribed to ids {:?}", ids), - Err(e) => { - error!("failed to unsubscribe to ids {:?}: {}", ids, e); - if worker_store.add_query_ids(ids).await.is_err() { - error!("failed to add query ids to store") - } + match recv_result { + Ok(resp) => { + process_response(store, resp, subscription_map).await; + } + Err(MessageError::ChannelClosed) => { + handle_reconnect(connector, connection, store).await; + } + Err(MessageError::UnsupportedMessage) => { + error!("unsupported message received from binance"); + } + Err(MessageError::Parse(..)) => { + error!("unable to parse message from binance"); } } } @@ -110,6 +163,9 @@ async fn handle_reconnect( ) { let mut retry_count: usize = 1; loop { + let meter = global::meter(METER_NAME); + meter.u64_counter("reconnect-attempts").init().add(1, &[]); + warn!("reconnecting: attempt {}", retry_count); if let Ok(new_connection) = connector.connect().await { @@ -123,7 +179,18 @@ async fn handle_reconnect( let ids_vec = ids.into_iter().collect::>(); - if subscribe(&ids_vec, connection).await.is_ok() { + if ids_vec.is_empty() { + info!("no ids to resubscribe to"); + return; + } + + let packet_id = random(); + + if connection + .subscribe_mini_ticker_stream(packet_id, &ids_vec) + .await + .is_ok() + { info!("resubscribed to all ids"); return; } else { @@ -138,58 +205,98 @@ async fn handle_reconnect( } } -async fn store_data(store: &WorkerStore, data: Data) -> Result<(), WorkerError> { +async fn process_response( + store: &WorkerStore, + resp: BinanceResponse, + subscription_map: &mut HashMap, +) { + match resp { + BinanceResponse::Stream(r) => store_data(store, r.data).await, + BinanceResponse::Success(r) => process_success(store, r, subscription_map).await, + BinanceResponse::Ping => process_ping(), + BinanceResponse::Error(e) => process_error(e), + } +} + +async fn store_data(store: &WorkerStore, data: Data) { match data { Data::MiniTicker(ticker) => { let id = ticker.symbol.to_lowercase(); - let price = Decimal::from_str_exact(&ticker.close_price)?; + let Ok(price) = Decimal::from_str_exact(&ticker.close_price) else { + error!("failed to parse price for id {}", id); + return; + }; + let timestamp = ticker.event_time / 1000; let asset_info = AssetInfo::new(id.clone(), price, timestamp); - store.set_asset(&id, asset_info).await?; - debug!("stored data for id {}", id); - } - } - - Ok(()) -} - -async fn process_response(store: &WorkerStore, resp: BinanceResponse) { - match resp { - BinanceResponse::Stream(resp) => match store_data(store, resp.data).await { - Ok(_) => debug!("saved data"), - Err(e) => error!("failed to save data: {}", e), - }, - BinanceResponse::Success(_) => { - info!("subscription success"); - } - BinanceResponse::Ping => { - debug!("received ping from binance"); - } - BinanceResponse::Error(e) => { - error!("error code {} received from binance: {}", e.code, e.msg); + match store.set_asset(&id, asset_info).await { + Ok(_) => { + info!("stored data for id {}", id); + global::meter(METER_NAME) + .f64_gauge("asset-prices") + .init() + .record( + price.to_f64().unwrap(), // Prices should never be NaN so unwrap here + &[KeyValue::new("asset.symbol", id)], + ); + } + Err(e) => error!("failed to store data for id {}: {}", id, e), + } } } } -async fn handle_connection_recv( - recv_result: Result, - connector: &BinanceWebSocketConnector, - connection: &mut BinanceWebSocketConnection, +async fn process_success( store: &WorkerStore, + success_response: SuccessResponse, + subscription_map: &mut HashMap, ) { - match recv_result { - Ok(resp) => { - process_response(store, resp).await; - } - Err(MessageError::ChannelClosed) => { - handle_reconnect(connector, connection, store).await; - } - Err(MessageError::UnsupportedMessage) => { - error!("unsupported message received from binance"); + let meter = global::meter(METER_NAME); + + match subscription_map.remove(&success_response.id) { + Some(Event::Subscribe(ids)) => { + info!("subscribed to ids {:?}", ids); + meter + .u64_counter("subscribe_success") + .init() + .add(1, &[KeyValue::new("id", success_response.id)]); + if store.add_query_ids(ids).await.is_err() { + error!("failed to add query ids to store"); + }; } - Err(MessageError::Parse(..)) => { - error!("unable to parse message from binance"); + Some(Event::Unsubscribe(ids)) => { + info!("unsubscribed to ids {:?}", ids); + meter + .u64_counter("unsubscribe_success") + .init() + .add(1, &[KeyValue::new("subscription.id", success_response.id)]); + if store.remove_query_ids(ids).await.is_err() { + error!("failed to remove query ids from store"); + }; } + None => error!("received response for unknown id: {}", success_response.id), } } + +fn process_ping() { + debug!("received ping from binance"); + global::meter(METER_NAME) + .u64_counter("pings") + .init() + .add(1, &[]); +} + +fn process_error(error: ErrorResponse) { + error!( + "error code {} received from binance: {}", + error.code, error.msg + ); + global::meter(METER_NAME).u64_counter("errors").init().add( + 1, + &[ + KeyValue::new("msg.code", error.code as i64), + KeyValue::new("msg.error", error.msg), + ], + ); +} diff --git a/bothan-binance/src/worker/builder.rs b/bothan-binance/src/worker/builder.rs index b987e1db..32333a7f 100644 --- a/bothan-binance/src/worker/builder.rs +++ b/bothan-binance/src/worker/builder.rs @@ -12,7 +12,7 @@ use crate::worker::opts::BinanceWorkerBuilderOpts; use crate::worker::BinanceWorker; /// Builds a `BinanceWorker` with custom options. -/// Methods can be chained to set the configuration values and the +/// Methods can be chained to set the configuration values, and the /// service is constructed by calling the [`build`](BinanceWorkerBuilder::build) method. pub struct BinanceWorkerBuilder { store: WorkerStore, diff --git a/bothan-binance/src/worker/error.rs b/bothan-binance/src/worker/error.rs index a0fe065f..8b75135c 100644 --- a/bothan-binance/src/worker/error.rs +++ b/bothan-binance/src/worker/error.rs @@ -2,15 +2,6 @@ use crate::api; use bothan_core::store; use thiserror::Error; -#[derive(Debug, Error)] -pub(crate) enum WorkerError { - #[error("value is not a valid decimal: {0}")] - InvalidDecimal(#[from] rust_decimal::Error), - - #[error("store error: {0}")] - StoreError(#[from] store::error::Error), -} - #[derive(Debug, Error)] pub enum BuildError { #[error("failed to connect: {0}")] diff --git a/bothan-binance/src/worker/types.rs b/bothan-binance/src/worker/types.rs index 285fad74..db4b7691 100644 --- a/bothan-binance/src/worker/types.rs +++ b/bothan-binance/src/worker/types.rs @@ -3,3 +3,5 @@ use tokio::time::Duration; pub const DEFAULT_CHANNEL_SIZE: usize = 1000; pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(720); pub const RECONNECT_BUFFER: Duration = Duration::from_secs(5); + +pub(crate) const METER_NAME: &str = "binance-worker"; diff --git a/bothan-core/src/store/worker.rs b/bothan-core/src/store/worker.rs index f0b82daf..e472eef3 100644 --- a/bothan-core/src/store/worker.rs +++ b/bothan-core/src/store/worker.rs @@ -46,6 +46,7 @@ impl WorkerStore { self.store.insert_asset_infos(&self.prefix, assets).await } + // TODO: Deprecate when the new query_id system is in place pub async fn set_query_ids(&self, ids: Vec) -> Result<(Vec, Vec), Error> where K: Into + Clone, @@ -67,6 +68,30 @@ impl WorkerStore { Ok((added, removed)) } + // Computes the signals to add and remove from the query set if the given IDs is to replace + // the current query_id set + pub async fn compute_query_id_differences( + &self, + ids: Vec, + ) -> Result<(Vec, Vec), Error> + where + K: Into + Clone, + { + let current_ids = self.get_query_ids().await?; + let new_ids: QueryIDs = HashSet::from_iter(ids.into_iter().map(Into::into)); + + let to_add = new_ids + .difference(¤t_ids) + .cloned() + .collect::>(); + let to_remove = current_ids + .difference(&new_ids) + .cloned() + .collect::>(); + + Ok((to_add, to_remove)) + } + pub async fn add_query_ids(&self, ids: Vec) -> Result, Error> where K: Into + Clone,