Skip to content

Commit

Permalink
Merge pull request #11 from bandprotocol/revamp
Browse files Browse the repository at this point in the history
[feat] Add New Binance Websocket Source
  • Loading branch information
warittornc authored Feb 15, 2024
2 parents 5e7e574 + 04e1a9a commit 42cde97
Show file tree
Hide file tree
Showing 20 changed files with 769 additions and 3 deletions.
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,4 @@ Cargo.lock
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb


.vscode/

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ edition = "2021"
license = "MIT OR Apache-2.0"

[workspace]
members = ["price-adapter-raw", "price-adapter"]
members = ["price-adapter-raw", "price-adapter", "bothan-core", "bothan-binance"]
resolver = "2"

[workspace.dependencies]
price-adapter = { version = "0.1.8", path = "price-adapter" }
price-adapter-raw = { version = "0.1.8", path = "price-adapter-raw" }
bothan-core = { version = "0.1.0", path = "bothan-core" }
bothan-binance = { version = "0.1.0", path = "bothan-binance" }
28 changes: 28 additions & 0 deletions bothan-binance/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "bothan-binance"
version = "0.1.0"
edition.workspace = true
license.workspace = true


# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
reqwest = { version = "0.11.22", features = ["json"] }
thiserror = "1.0.56"
tracing = { version = "0.1.40", features = [] }
tokio = { version = "1.36.0", features = ["full"] }
tracing-subscriber = "0.3.17"
serde = { version = "1.0.196", features = ["std", "derive", "alloc"] }
serde_json = "1.0.108"
itertools = "0.12.0"
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }
futures-util = "0.3.29"
tokio-util = "0.7.10"
chrono = "0.4.31"
rand = "0.8.4"
dashmap = "5.5.3"
derive_more = { version = "1.0.0-beta.6", features = ["full"] }
futures = "0.3.30"
bothan-core = { path = "../bothan-core" }
async-trait = "0.1.77"
16 changes: 16 additions & 0 deletions bothan-binance/examples/dummy_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use bothan_binance::Binance;
use bothan_core::service::Service;
use tracing_subscriber::fmt::init;

#[tokio::main]
async fn main() {
init();

if let Ok(mut service) = Binance::default().await {
loop {
let data = service.get_price_data(&["btcusdt", "ethusdt"]).await;
println!("{:?}", data);
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
}
}
3 changes: 3 additions & 0 deletions bothan-binance/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod error;
pub mod types;
pub mod websocket;
25 changes: 25 additions & 0 deletions bothan-binance/src/api/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use tokio_tungstenite::tungstenite::{self, http::StatusCode};

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed to connect with response code {0}")]
ConnectionFailure(StatusCode),

#[error("failed to parse")]
Parse(#[from] serde_json::Error),

#[error("not connected")]
NotConnected(),

#[error("already connected")]
AlreadyConnected(),

#[error("tungstenite error")]
Tungstenite(#[from] tungstenite::Error),

#[error("channel closed")]
ChannelClosed,

#[error("unsupported message")]
UnsupportedMessage,
}
63 changes: 63 additions & 0 deletions bothan-binance/src/api/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SuccessResponse {
pub result: Option<String>,
pub id: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
pub code: u16,
pub msg: String,
pub id: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "e")]
pub enum Data {
#[serde(rename = "24hrMiniTicker")]
MiniTicker(MiniTickerInfo),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MiniTickerInfo {
#[serde(rename = "E")]
pub event_time: u64,

#[serde(rename = "s")]
pub symbol: String,

#[serde(rename = "c")]
pub close_price: String,

#[serde(rename = "o")]
pub open_price: String,

#[serde(rename = "h")]
pub high_price: String,

#[serde(rename = "l")]
pub low_price: String,

#[serde(rename = "v")]
pub base_volume: String,

#[serde(rename = "q")]
pub quote_volume: String,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StreamResponse {
pub stream: String,
pub data: Data,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BinanceResponse {
Success(SuccessResponse),
Error(ErrorResponse),
Stream(StreamResponse),
Ping,
}
112 changes: 112 additions & 0 deletions bothan-binance/src/api/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::http::StatusCode;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tracing::warn;

use crate::api::error::Error;
use crate::api::types::BinanceResponse;

const DEFAULT_URL: &str = "wss://stream.binance.com:9443/stream";

pub struct BinanceWebsocket {
url: String,
sender: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
receiver: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
}

impl BinanceWebsocket {
pub fn new(url: impl Into<String>) -> Self {
Self {
url: url.into(),
sender: None,
receiver: None,
}
}

pub async fn connect(&mut self) -> Result<(), Error> {
let (socket, response) = connect_async(&self.url).await?;

let status = response.status();
if StatusCode::is_server_error(&status) || StatusCode::is_client_error(&status) {
return Err(Error::ConnectionFailure(status));
}

let (sender, receiver) = socket.split();
self.sender = Some(sender);
self.receiver = Some(receiver);

Ok(())
}

pub async fn disconnect(&mut self) -> Result<(), Error> {
let mut sender = self.sender.take().ok_or(Error::NotConnected())?;
// Ignore result as we just want to send a close message
if sender.send(Message::Close(None)).await.is_err() {
warn!("unable to send close frame")
}

self.receiver = None;
Ok(())
}

pub async fn subscribe(&mut self, ids: &[&str]) -> Result<(), Error> {
let sender = self.sender.as_mut().ok_or(Error::NotConnected())?;

let stream_ids = ids
.iter()
.map(|id| format!("{}@miniTicker", id))
.collect::<Vec<_>>();

let payload = json!({
"method": "SUBSCRIBE",
"params": stream_ids,
"id": rand::random::<u32>()
});

let message = Message::Text(payload.to_string());
Ok(sender.send(message).await?)
}

pub async fn unsubscribe(&mut self, ids: &[&str]) -> Result<(), Error> {
let sender = self.sender.as_mut().ok_or(Error::NotConnected())?;

let stream_ids = ids
.iter()
.map(|id| format!("{}@miniTicker", id))
.collect::<Vec<_>>();

let payload = json!({
"method": "UNSUBSCRIBE",
"params": stream_ids,
"id": rand::random::<u32>()
});

let message = Message::Text(payload.to_string());
Ok(sender.send(message).await?)
}

pub async fn next(&mut self) -> Result<BinanceResponse, Error> {
let receiver = self.receiver.as_mut().ok_or(Error::NotConnected())?;

if let Some(result_msg) = receiver.next().await {
return match result_msg {
Ok(Message::Text(msg)) => Ok(serde_json::from_str::<BinanceResponse>(&msg)?),
Ok(Message::Ping(_)) => Ok(BinanceResponse::Ping),
Ok(Message::Close(_)) => Err(Error::ChannelClosed),
_ => Err(Error::UnsupportedMessage),
};
}

Err(Error::ChannelClosed)
}
}

impl Default for BinanceWebsocket {
fn default() -> Self {
Self::new(DEFAULT_URL)
}
}
18 changes: 18 additions & 0 deletions bothan-binance/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// TODO: Add more errors apart from catch all
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("unknown error: {0}")]
Unknown(String),

#[error("pending result")]
Pending,

#[error("invalid symbol")]
InvalidSymbol,

#[error("tungstenite error")]
Tungstenite(#[from] tokio_tungstenite::tungstenite::Error),

#[error("api error: {0}")]
Api(#[from] crate::api::error::Error),
}
7 changes: 7 additions & 0 deletions bothan-binance/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pub use api::websocket::BinanceWebsocket;
pub use service::Binance;

mod api;
mod error;
mod service;
mod types;
Loading

0 comments on commit 42cde97

Please sign in to comment.