Skip to content

Commit

Permalink
minor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
warittornc committed Feb 5, 2024
1 parent 8f19455 commit bba1526
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 42 deletions.
4 changes: 2 additions & 2 deletions bothan-binance/src/cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ impl Cache {
let keys = cloned_price_map.iter().filter_map(|r| {
let (k, v) = r.pair();
if check_timeout(v.last_used) {
cloned_price_map.remove(k);
cloned_subscription_map.remove(k)
cloned_price_map.remove(k);
cloned_subscription_map.remove(k)
} else {
None
}
Expand Down
84 changes: 44 additions & 40 deletions bothan-binance/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,53 +46,57 @@ impl BinanceService {
Some(cmd) = command_rx.recv() => {
match &cmd {
Command::Subscribe(ids) => {
let dd = ids.iter().map(|x| x.as_str()).collect::<Vec<&str>>();
let vec_ids = ids.iter().map(|x| x.as_str()).collect::<Vec<&str>>();
if ws.subscribe(vec_ids.as_slice()).await.is_err() {
warn!("Failed to subscribe to ids: {:?}", ids);
}

for id in ids {
cloned_cache.set_pending(id.clone());
}
if ws.subscribe(dd.as_slice()).await.is_err() {
warn!("Failed to subscribe to ids: {:?}", ids);
}
}
}
},
result = timeout(Duration::new(120, 0), ws.next()) => {
if let Ok(binance_result) = &result {
match binance_result {
Ok(BinanceResponse::Stream(resp)) => {
match &resp.data {
Data::MiniTicker(ticker) => {
let price_data = PriceData {
id: ticker.symbol.clone(),
price: ticker.close_price.clone().to_string(),
timestamp: ticker.event_time.clone()
};
info!("received prices: {:?}", price_data);
cloned_cache.set_data(ticker.symbol.clone(), price_data);
},
match &result {
Ok(binance_result) => {
match binance_result {
Ok(BinanceResponse::Stream(resp)) => {
match &resp.data {
Data::MiniTicker(ticker) => {
let price_data = PriceData {
id: ticker.symbol.clone(),
price: ticker.close_price.clone().to_string(),
timestamp: ticker.event_time.clone()
};
info!("received prices: {:?}", price_data);
cloned_cache.set_data(ticker.symbol.clone(), price_data);
},
}
}
Ok(BinanceResponse::Success(_)) => {
// TODO: better logging
info!("subscribed to ids");
}
Ok(BinanceResponse::Error(_)) => {
// TODO: better logging
error!("error received from binance");
}
Err(e) => {
error!("unable able to parse message from binance: {:?}", e);
}
}
Ok(BinanceResponse::Success(_)) => {
// TODO: better logging
info!("subscribed to ids");
}
Ok(BinanceResponse::Error(_)) => {
// TODO: better logging
error!("error received from binance");
}
Err(e) => {
error!("unable able to parse message from binance: {:?}", e);
}
}
} else {
error!("timeout waiting for response from binance, attempting to reconnect");
_ = ws.disconnect().await;
_ = ws.connect().await;
// resub
let keys = cloned_cache.keys();
if !keys.is_empty() {
if let Err(_) = cloned_command_tx.send(Command::Subscribe(keys)).await {
warn!("Failed to send subscribe command");
},
Err(e) => {
error!("timeout waiting for response from binance, attempting to reconnect");
_ = ws.disconnect().await;
_ = ws.connect().await;
// resub
let keys = cloned_cache.keys();
if !keys.is_empty() {
if let Err(_) = cloned_command_tx.send(Command::Subscribe(keys)).await {
warn!("Failed to send subscribe command");
}
}
}
}
Expand All @@ -101,8 +105,8 @@ impl BinanceService {
// TODO: this may reinit data that is meant to be removed if
// a packet comes in before the unsubscribe is completed. Need
// to resolve this later
let dd = ids.iter().map(|x| x.as_str()).collect::<Vec<&str>>();
if ws.unsubscribe(dd.as_slice()).await.is_err() {
let vec_ids = ids.iter().map(|x| x.as_str()).collect::<Vec<&str>>();
if ws.unsubscribe(vec_ids.as_slice()).await.is_err() {
warn!("failed to unsubscribe to ids: {:?}", ids);
}
}
Expand Down

0 comments on commit bba1526

Please sign in to comment.