From bba1526944baf0b530fbeb4833d7b12c91c34d2a Mon Sep 17 00:00:00 2001 From: Warittorn Cheevachaipimol Date: Mon, 5 Feb 2024 17:29:50 +0700 Subject: [PATCH] minor cleanup --- bothan-binance/src/cache/cache.rs | 4 +- bothan-binance/src/service.rs | 84 ++++++++++++++++--------------- 2 files changed, 46 insertions(+), 42 deletions(-) diff --git a/bothan-binance/src/cache/cache.rs b/bothan-binance/src/cache/cache.rs index 8b0a4e5e..ae19c0d3 100644 --- a/bothan-binance/src/cache/cache.rs +++ b/bothan-binance/src/cache/cache.rs @@ -50,8 +50,8 @@ impl Cache { let keys = cloned_price_map.iter().filter_map(|r| { let (k, v) = r.pair(); if check_timeout(v.last_used) { - cloned_price_map.remove(k); - cloned_subscription_map.remove(k) + cloned_price_map.remove(k); + cloned_subscription_map.remove(k) } else { None } diff --git a/bothan-binance/src/service.rs b/bothan-binance/src/service.rs index 89146517..fa33baed 100644 --- a/bothan-binance/src/service.rs +++ b/bothan-binance/src/service.rs @@ -46,53 +46,57 @@ impl BinanceService { Some(cmd) = command_rx.recv() => { match &cmd { Command::Subscribe(ids) => { - let dd = ids.iter().map(|x| x.as_str()).collect::>(); + let vec_ids = ids.iter().map(|x| x.as_str()).collect::>(); + if ws.subscribe(vec_ids.as_slice()).await.is_err() { + warn!("Failed to subscribe to ids: {:?}", ids); + } + for id in ids { cloned_cache.set_pending(id.clone()); } - if ws.subscribe(dd.as_slice()).await.is_err() { - warn!("Failed to subscribe to ids: {:?}", ids); - } } } }, result = timeout(Duration::new(120, 0), ws.next()) => { - if let Ok(binance_result) = &result { - match binance_result { - Ok(BinanceResponse::Stream(resp)) => { - match &resp.data { - Data::MiniTicker(ticker) => { - let price_data = PriceData { - id: ticker.symbol.clone(), - price: ticker.close_price.clone().to_string(), - timestamp: ticker.event_time.clone() - }; - info!("received prices: {:?}", price_data); - cloned_cache.set_data(ticker.symbol.clone(), price_data); - }, + match &result { + Ok(binance_result) => { + match binance_result { + Ok(BinanceResponse::Stream(resp)) => { + match &resp.data { + Data::MiniTicker(ticker) => { + let price_data = PriceData { + id: ticker.symbol.clone(), + price: ticker.close_price.clone().to_string(), + timestamp: ticker.event_time.clone() + }; + info!("received prices: {:?}", price_data); + cloned_cache.set_data(ticker.symbol.clone(), price_data); + }, + } + } + Ok(BinanceResponse::Success(_)) => { + // TODO: better logging + info!("subscribed to ids"); + } + Ok(BinanceResponse::Error(_)) => { + // TODO: better logging + error!("error received from binance"); + } + Err(e) => { + error!("unable able to parse message from binance: {:?}", e); } } - Ok(BinanceResponse::Success(_)) => { - // TODO: better logging - info!("subscribed to ids"); - } - Ok(BinanceResponse::Error(_)) => { - // TODO: better logging - error!("error received from binance"); - } - Err(e) => { - error!("unable able to parse message from binance: {:?}", e); - } - } - } else { - error!("timeout waiting for response from binance, attempting to reconnect"); - _ = ws.disconnect().await; - _ = ws.connect().await; - // resub - let keys = cloned_cache.keys(); - if !keys.is_empty() { - if let Err(_) = cloned_command_tx.send(Command::Subscribe(keys)).await { - warn!("Failed to send subscribe command"); + }, + Err(e) => { + error!("timeout waiting for response from binance, attempting to reconnect"); + _ = ws.disconnect().await; + _ = ws.connect().await; + // resub + let keys = cloned_cache.keys(); + if !keys.is_empty() { + if let Err(_) = cloned_command_tx.send(Command::Subscribe(keys)).await { + warn!("Failed to send subscribe command"); + } } } } @@ -101,8 +105,8 @@ impl BinanceService { // TODO: this may reinit data that is meant to be removed if // a packet comes in before the unsubscribe is completed. Need // to resolve this later - let dd = ids.iter().map(|x| x.as_str()).collect::>(); - if ws.unsubscribe(dd.as_slice()).await.is_err() { + let vec_ids = ids.iter().map(|x| x.as_str()).collect::>(); + if ws.unsubscribe(vec_ids.as_slice()).await.is_err() { warn!("failed to unsubscribe to ids: {:?}", ids); } }