Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Add New Binance Websocket Source #11

Merged
merged 36 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c75f157
init
warittornc Feb 2, 2024
b5d1a89
reformat
warittornc Feb 2, 2024
56a24dc
minor cleanup
warittornc Feb 2, 2024
2f01e52
change package name
warittornc Feb 2, 2024
0517535
minor cleanup
warittornc Feb 2, 2024
ee05365
checkpoint
warittornc Feb 5, 2024
8f19455
checkpoint
warittornc Feb 5, 2024
bba1526
minor cleanup
warittornc Feb 5, 2024
f00e402
minor cleanup
warittornc Feb 5, 2024
d09a40b
minor change
warittornc Feb 5, 2024
858c4b3
organize imports
warittornc Feb 5, 2024
7eaa92f
minor cleanup
warittornc Feb 5, 2024
21e2031
minor cleanup
warittornc Feb 6, 2024
d304e88
fix
warittornc Feb 6, 2024
f6038c5
fix
warittornc Feb 6, 2024
8b41fcb
fix linting
warittornc Feb 6, 2024
4052ff9
cleanup cache
warittornc Feb 6, 2024
e3d5206
fix and cleanup
warittornc Feb 6, 2024
ac49660
fixed import format
warittornc Feb 6, 2024
c06cb9e
cleanup and fixed
warittornc Feb 7, 2024
e2337e5
add panic msg
warittornc Feb 7, 2024
c65e6d8
renamed variable for clarity and added comment
warittornc Feb 7, 2024
6451ce1
fixed clippy
warittornc Feb 7, 2024
156d749
fixed
warittornc Feb 12, 2024
8473881
change from set_pending to set_batch_pending
warittornc Feb 12, 2024
ef206be
fixed
warittornc Feb 12, 2024
6143cbc
remove tests
warittornc Feb 12, 2024
4c06bdb
cleanup
warittornc Feb 12, 2024
6ef569f
fixed clippy
warittornc Feb 12, 2024
bfac8e0
restructure out cache
warittornc Feb 13, 2024
156d330
fix
warittornc Feb 13, 2024
36e072b
add core
warittornc Feb 13, 2024
e423854
changes
warittornc Feb 13, 2024
4fc5f11
change cache to generic
warittornc Feb 14, 2024
b77f62e
fix clippy
warittornc Feb 14, 2024
04e1a9a
fix clippy
warittornc Feb 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,4 @@ Cargo.lock
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb


.vscode/

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ edition = "2021"
license = "MIT OR Apache-2.0"

[workspace]
members = ["price-adapter-raw", "price-adapter"]
members = ["price-adapter-raw", "price-adapter", "bothan-binance"]
resolver = "2"

[workspace.dependencies]
price-adapter = { version = "0.1.8", path = "price-adapter" }
price-adapter-raw = { version = "0.1.8", path = "price-adapter-raw" }
bothan-binance = { version = "0.1.8", path = "bothan-binance" }
25 changes: 25 additions & 0 deletions bothan-binance/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "bothan-binance"
version = "0.1.0"
warittornc marked this conversation as resolved.
Show resolved Hide resolved
edition.workspace = true
license.workspace = true


# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
reqwest = { version = "0.11.22", features = ["json"] }
thiserror = "1.0.56"
tracing = { version = "0.1.40", features = [] }
tokio = { version = "1.0.3", features = ["full"] }
tracing-subscriber = "0.3.17"
serde = { version = "1.0.196", features = ["std", "derive", "alloc"] }
serde_json = "1.0.108"
itertools = "0.12.0"
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }
futures-util = "0.3.29"
tokio-util = "0.7.10"
chrono = "0.4.31"
rand = "0.8.4"
dashmap = "5.5.3"
derive_more = { version = "1.0.0-beta.6", features = ["full"] }
RogerKSI marked this conversation as resolved.
Show resolved Hide resolved
17 changes: 17 additions & 0 deletions bothan-binance/examples/dummy_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use tracing_subscriber::fmt::init;

use bothan_binance::BinanceService;

#[tokio::main]
async fn main() {
init();

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
warittornc marked this conversation as resolved.
Show resolved Hide resolved
if let Ok(mut service) = BinanceService::new(None).await {
loop {
let data = service.get_price_data(&["btcusdt", "ethusdt"]).await;
println!("{:?}", data);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
}
3 changes: 3 additions & 0 deletions bothan-binance/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod error;
pub mod types;
pub mod websocket;
22 changes: 22 additions & 0 deletions bothan-binance/src/api/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use tokio_tungstenite::tungstenite::{self, http::StatusCode};

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("unknown error")]
Unknown,

#[error("failed to connect with response code {0}")]
Connection(StatusCode),
warittornc marked this conversation as resolved.
Show resolved Hide resolved

#[error("failed to parse")]
Parse(#[from] serde_json::Error),

#[error("not connected")]
NotConnected(),

#[error("already connected")]
AlreadyConnected(),

#[error("tungstenite error")]
Tungstenite(#[from] tungstenite::Error),
}
62 changes: 62 additions & 0 deletions bothan-binance/src/api/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SuccessResponse {
pub result: Option<String>,
pub id: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
pub code: u16,
pub msg: String,
pub id: String,
}
warittornc marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "e")]
pub enum Data {
#[serde(rename = "24hrMiniTicker")]
MiniTicker(MiniTickerInfo),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MiniTickerInfo {
#[serde(rename = "E")]
pub event_time: u64,

#[serde(rename = "s")]
pub symbol: String,

#[serde(rename = "c")]
pub close_price: String,

#[serde(rename = "o")]
pub open_price: String,

#[serde(rename = "h")]
pub high_price: String,

#[serde(rename = "l")]
pub low_price: String,

#[serde(rename = "v")]
pub base_volume: String,

#[serde(rename = "q")]
pub quote_volume: String,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StreamResponse {
pub stream: String,
pub data: Data,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)]
pub enum BinanceResponse {
Success(SuccessResponse),
Error(ErrorResponse),
Stream(StreamResponse),
}
106 changes: 106 additions & 0 deletions bothan-binance/src/api/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::http::StatusCode;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tracing::warn;

use crate::api::error::Error;
use crate::api::types::BinanceResponse;

const DEFAULT_URL: &str = "wss://stream.binance.com:9443/stream";

pub struct BinanceWebsocket {
url: String,
sender: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
receiver: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
}

impl BinanceWebsocket {
pub fn new(url: &str) -> Self {
warittornc marked this conversation as resolved.
Show resolved Hide resolved
Self {
url: url.to_string(),
sender: None,
receiver: None,
}
}

pub async fn connect(&mut self) -> Result<(), Error> {
let (socket, response) = connect_async(&self.url).await?;

let status = response.status();
if StatusCode::is_server_error(&status) || StatusCode::is_client_error(&status) {
return Err(Error::Connection(status));
}

let (sender, receiver) = socket.split();
self.sender = Some(sender);
self.receiver = Some(receiver);

Ok(())
}

pub async fn disconnect(&mut self) -> Result<(), Error> {
let mut sender = self.sender.take().ok_or(Error::NotConnected())?;
// Ignore result as we just want to send a close message
if sender.send(Message::Close(None)).await.is_err() {
warn!("unable to send close frame")
}

self.receiver = None;
Ok(())
}

pub async fn subscribe(&mut self, ids: &[&str]) -> Result<(), Error> {
let sender = self.sender.as_mut().ok_or(Error::NotConnected())?;

let stream_ids = ids
.iter()
.map(|id| format!("{}@miniTicker", id))
.collect::<Vec<_>>();

let payload = json!({
"method": "SUBSCRIBE",
"params": stream_ids,
"id": rand::random::<u32>()
});
let message = Message::Text(payload.to_string());
Ok(sender.send(message).await?)
RogerKSI marked this conversation as resolved.
Show resolved Hide resolved
}

pub async fn unsubscribe(&mut self, ids: &[&str]) -> Result<(), Error> {
let sender = self.sender.as_mut().ok_or(Error::NotConnected())?;
let stream_ids = ids
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if we try to unscribe symbols that we never subscribe? will we see error anywhere?

.iter()
.map(|id| format!("{}@miniTicker", id))
.collect::<Vec<_>>();
let payload = json!({
"method": "UNSUBSCRIBE",
"params": stream_ids,
"id": rand::random::<u32>()
});

let message = Message::Text(payload.to_string());

warittornc marked this conversation as resolved.
Show resolved Hide resolved
sender.send(message).await?;

Ok(())
}

pub async fn next(&mut self) -> Result<BinanceResponse, Error> {
let receiver = self.receiver.as_mut().ok_or(Error::NotConnected())?;
if let Some(Ok(msg)) = receiver.next().await {
Ok(serde_json::from_str::<BinanceResponse>(&msg.to_string())?)
} else {
Err(Error::Unknown)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be error::unknown, we should propagate whatever error we got from receiver.next

}
}
}

impl Default for BinanceWebsocket {
fn default() -> Self {
Self::new(DEFAULT_URL)
}
}
8 changes: 8 additions & 0 deletions bothan-binance/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// TODO: move into own crate

pub use dash_map::Cache;
pub use error::Error;

pub mod dash_map;
pub mod error;
mod types;
108 changes: 108 additions & 0 deletions bothan-binance/src/cache/dash_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use std::ops::Sub;
use std::sync::Arc;

use dashmap::{DashMap, DashSet};
use tokio::select;
use tokio::sync::mpsc::Sender;
use tokio::time::{interval, Duration, Instant};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::cache::error::Error;
use crate::cache::types::{StoredPriceData, DEFAULT_TIMEOUT};
use crate::types::PriceData;

pub struct Cache {
price_map: Arc<DashMap<String, StoredPriceData>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should use DashMap. They do not offer async api. Let's use RwLock<HashMap<..>> (RwLock from tokio::sync)

in this specific case the critical path is small so shouldn't matter but still if we are using tokio i think we should use their locking mechanism.

also i prefer we just call this variable prices or cached_prices. the fact that it is a map is already in its type

subscription_map: Arc<DashSet<String>>,
warittornc marked this conversation as resolved.
Show resolved Hide resolved
token: CancellationToken,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make the name more descriptive. cxl? cancel? cancel_token? any of these works for me.

}

impl Drop for Cache {
fn drop(&mut self) {
self.token.cancel()
}
}

impl Cache {
pub fn new(sender: Sender<Vec<String>>) -> Self {
let price_map = Arc::new(DashMap::<String, StoredPriceData>::new());
let subscription_map = Arc::new(DashSet::<String>::new());
let token = CancellationToken::new();

let cloned_token = token.clone();
let cloned_price_map = price_map.clone();
let cloned_subscription_map = subscription_map.clone();

tokio::spawn(async move {
warittornc marked this conversation as resolved.
Show resolved Hide resolved
let mut interval = interval(Duration::from_secs(10));
warittornc marked this conversation as resolved.
Show resolved Hide resolved
loop {
select! {
_ = interval.tick() => {
let keys = cloned_price_map.iter().filter_map(|r| {
let (k, v) = r.pair();
if check_timeout(v.last_used) {
cloned_price_map.remove(k);
cloned_subscription_map.remove(k)
} else {
None
}
}).collect::<Vec<String>>();
if !keys.is_empty() {
info!("Removing unused keys: {:?}", keys);
warittornc marked this conversation as resolved.
Show resolved Hide resolved
let _res = sender.send(keys).await;
}
}
_ = cloned_token.cancelled() => {
break
}
}
}
});

Self {
price_map,
subscription_map,
token,
}
}

pub fn set_pending(&self, id: String) {
self.subscription_map.insert(id.to_ascii_lowercase());
}

pub fn set_data(&self, id: String, data: PriceData) {
if let Some((k, mut v)) = self.price_map.remove(&id) {
v.data = data;
self.price_map.insert(k, v);
} else {
self.price_map.insert(
id.to_ascii_lowercase(),
StoredPriceData {
data,
last_used: Instant::now(),
},
);
}
}

pub fn get(&self, id: &str) -> Result<PriceData, Error> {
if let Some(r) = self.price_map.get(&id.to_ascii_lowercase()) {
return Ok(r.data.clone());
}

if self.subscription_map.contains(&id.to_ascii_lowercase()) {
Err(Error::Invalid)
} else {
Err(Error::DoesNotExist)
}
}

pub fn keys(&self) -> Vec<String> {
self.price_map.iter().map(|r| r.key().clone()).collect()
}
}

fn check_timeout(last_used: Instant) -> bool {
Instant::now().sub(last_used) > Duration::from_secs(DEFAULT_TIMEOUT)
}
4 changes: 4 additions & 0 deletions bothan-binance/src/cache/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub enum Error {
DoesNotExist,
Invalid,
}
10 changes: 10 additions & 0 deletions bothan-binance/src/cache/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use crate::types::PriceData;
use tokio::time::Instant;

pub const DEFAULT_TIMEOUT: u64 = 6000;
warittornc marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Debug, Clone)]
pub struct StoredPriceData {
pub data: PriceData,
pub last_used: Instant,
}
Loading
Loading