Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Add New Binance Websocket Source #11

Merged
merged 36 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c75f157
init
warittornc Feb 2, 2024
b5d1a89
reformat
warittornc Feb 2, 2024
56a24dc
minor cleanup
warittornc Feb 2, 2024
2f01e52
change package name
warittornc Feb 2, 2024
0517535
minor cleanup
warittornc Feb 2, 2024
ee05365
checkpoint
warittornc Feb 5, 2024
8f19455
checkpoint
warittornc Feb 5, 2024
bba1526
minor cleanup
warittornc Feb 5, 2024
f00e402
minor cleanup
warittornc Feb 5, 2024
d09a40b
minor change
warittornc Feb 5, 2024
858c4b3
organize imports
warittornc Feb 5, 2024
7eaa92f
minor cleanup
warittornc Feb 5, 2024
21e2031
minor cleanup
warittornc Feb 6, 2024
d304e88
fix
warittornc Feb 6, 2024
f6038c5
fix
warittornc Feb 6, 2024
8b41fcb
fix linting
warittornc Feb 6, 2024
4052ff9
cleanup cache
warittornc Feb 6, 2024
e3d5206
fix and cleanup
warittornc Feb 6, 2024
ac49660
fixed import format
warittornc Feb 6, 2024
c06cb9e
cleanup and fixed
warittornc Feb 7, 2024
e2337e5
add panic msg
warittornc Feb 7, 2024
c65e6d8
renamed variable for clarity and added comment
warittornc Feb 7, 2024
6451ce1
fixed clippy
warittornc Feb 7, 2024
156d749
fixed
warittornc Feb 12, 2024
8473881
change from set_pending to set_batch_pending
warittornc Feb 12, 2024
ef206be
fixed
warittornc Feb 12, 2024
6143cbc
remove tests
warittornc Feb 12, 2024
4c06bdb
cleanup
warittornc Feb 12, 2024
6ef569f
fixed clippy
warittornc Feb 12, 2024
bfac8e0
restructure out cache
warittornc Feb 13, 2024
156d330
fix
warittornc Feb 13, 2024
36e072b
add core
warittornc Feb 13, 2024
e423854
changes
warittornc Feb 13, 2024
4fc5f11
change cache to generic
warittornc Feb 14, 2024
b77f62e
fix clippy
warittornc Feb 14, 2024
04e1a9a
fix clippy
warittornc Feb 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,4 @@ Cargo.lock
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb


.vscode/

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ edition = "2021"
license = "MIT OR Apache-2.0"

[workspace]
members = ["price-adapter-raw", "price-adapter"]
members = ["price-adapter-raw", "price-adapter", "bothan-core", "bothan-binance", "bothan-coin-gecko"]
resolver = "2"

[workspace.dependencies]
price-adapter = { version = "0.1.8", path = "price-adapter" }
price-adapter-raw = { version = "0.1.8", path = "price-adapter-raw" }
bothan-core = { version = "0.1.0", path = "bothan-core" }
bothan-binance = { version = "0.1.0", path = "bothan-binance" }
bothan-coin-gecko = { version = "0.1.0", path = "bothan-coin-gecko" }
28 changes: 28 additions & 0 deletions bothan-binance/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "bothan-binance"
version = "0.1.0"
warittornc marked this conversation as resolved.
Show resolved Hide resolved
edition.workspace = true
license.workspace = true


# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
reqwest = { version = "0.11.22", features = ["json"] }
thiserror = "1.0.56"
tracing = { version = "0.1.40", features = [] }
tokio = { version = "1.36.0", features = ["full"] }
tracing-subscriber = "0.3.17"
serde = { version = "1.0.196", features = ["std", "derive", "alloc"] }
serde_json = "1.0.108"
itertools = "0.12.0"
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }
futures-util = "0.3.29"
tokio-util = "0.7.10"
chrono = "0.4.31"
rand = "0.8.4"
dashmap = "5.5.3"
derive_more = { version = "1.0.0-beta.6", features = ["full"] }
futures = "0.3.30"
bothan-core = { path = "../bothan-core" }
async-trait = "0.1.77"
16 changes: 16 additions & 0 deletions bothan-binance/examples/dummy_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use bothan_binance::Binance;
use bothan_core::service::Service;
use tracing_subscriber::fmt::init;

#[tokio::main]
async fn main() {
init();

if let Ok(mut service) = Binance::default().await {
loop {
let data = service.get_price_data(&["btcusdt", "ethusdt"]).await;
println!("{:?}", data);
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
}
}
3 changes: 3 additions & 0 deletions bothan-binance/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod error;
pub mod types;
pub mod websocket;
25 changes: 25 additions & 0 deletions bothan-binance/src/api/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use tokio_tungstenite::tungstenite::{self, http::StatusCode};

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed to connect with response code {0}")]
ConnectionFailure(StatusCode),

#[error("failed to parse")]
Parse(#[from] serde_json::Error),

#[error("not connected")]
NotConnected(),

#[error("already connected")]
AlreadyConnected(),

#[error("tungstenite error")]
Tungstenite(#[from] tungstenite::Error),

#[error("channel closed")]
ChannelClosed,

#[error("unsupported message")]
UnsupportedMessage,
}
63 changes: 63 additions & 0 deletions bothan-binance/src/api/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SuccessResponse {
pub result: Option<String>,
pub id: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
pub code: u16,
pub msg: String,
pub id: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "e")]
pub enum Data {
#[serde(rename = "24hrMiniTicker")]
MiniTicker(MiniTickerInfo),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MiniTickerInfo {
#[serde(rename = "E")]
pub event_time: u64,

#[serde(rename = "s")]
pub symbol: String,

#[serde(rename = "c")]
pub close_price: String,

#[serde(rename = "o")]
pub open_price: String,

#[serde(rename = "h")]
pub high_price: String,

#[serde(rename = "l")]
pub low_price: String,

#[serde(rename = "v")]
pub base_volume: String,

#[serde(rename = "q")]
pub quote_volume: String,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StreamResponse {
pub stream: String,
pub data: Data,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BinanceResponse {
Success(SuccessResponse),
Error(ErrorResponse),
Stream(StreamResponse),
Ping,
}
112 changes: 112 additions & 0 deletions bothan-binance/src/api/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::http::StatusCode;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tracing::warn;

use crate::api::error::Error;
use crate::api::types::BinanceResponse;

const DEFAULT_URL: &str = "wss://stream.binance.com:9443/stream";

pub struct BinanceWebsocket {
url: String,
sender: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
receiver: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
}

impl BinanceWebsocket {
pub fn new(url: impl Into<String>) -> Self {
Self {
url: url.into(),
sender: None,
receiver: None,
}
}

pub async fn connect(&mut self) -> Result<(), Error> {
let (socket, response) = connect_async(&self.url).await?;

let status = response.status();
if StatusCode::is_server_error(&status) || StatusCode::is_client_error(&status) {
return Err(Error::ConnectionFailure(status));
}

let (sender, receiver) = socket.split();
self.sender = Some(sender);
self.receiver = Some(receiver);

Ok(())
}

pub async fn disconnect(&mut self) -> Result<(), Error> {
let mut sender = self.sender.take().ok_or(Error::NotConnected())?;
// Ignore result as we just want to send a close message
if sender.send(Message::Close(None)).await.is_err() {
warn!("unable to send close frame")
}

self.receiver = None;
Ok(())
}

pub async fn subscribe(&mut self, ids: &[&str]) -> Result<(), Error> {
let sender = self.sender.as_mut().ok_or(Error::NotConnected())?;

let stream_ids = ids
.iter()
.map(|id| format!("{}@miniTicker", id))
.collect::<Vec<_>>();

let payload = json!({
"method": "SUBSCRIBE",
"params": stream_ids,
"id": rand::random::<u32>()
});

let message = Message::Text(payload.to_string());
Ok(sender.send(message).await?)
RogerKSI marked this conversation as resolved.
Show resolved Hide resolved
}

pub async fn unsubscribe(&mut self, ids: &[&str]) -> Result<(), Error> {
let sender = self.sender.as_mut().ok_or(Error::NotConnected())?;

let stream_ids = ids
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if we try to unscribe symbols that we never subscribe? will we see error anywhere?

.iter()
.map(|id| format!("{}@miniTicker", id))
.collect::<Vec<_>>();

let payload = json!({
"method": "UNSUBSCRIBE",
"params": stream_ids,
"id": rand::random::<u32>()
});

let message = Message::Text(payload.to_string());
Ok(sender.send(message).await?)
RogerKSI marked this conversation as resolved.
Show resolved Hide resolved
}

pub async fn next(&mut self) -> Result<BinanceResponse, Error> {
let receiver = self.receiver.as_mut().ok_or(Error::NotConnected())?;

if let Some(result_msg) = receiver.next().await {
return match result_msg {
Ok(Message::Text(msg)) => Ok(serde_json::from_str::<BinanceResponse>(&msg)?),
Ok(Message::Ping(_)) => Ok(BinanceResponse::Ping),
Ok(Message::Close(_)) => Err(Error::ChannelClosed),
_ => Err(Error::UnsupportedMessage),
};
}

Err(Error::ChannelClosed)
}
}

impl Default for BinanceWebsocket {
fn default() -> Self {
Self::new(DEFAULT_URL)
}
}
18 changes: 18 additions & 0 deletions bothan-binance/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// TODO: Add more errors apart from catch all
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("unknown error: {0}")]
Unknown(String),

#[error("pending result")]
Pending,

#[error("invalid symbol")]
InvalidSymbol,

#[error("tungstenite error")]
Tungstenite(#[from] tokio_tungstenite::tungstenite::Error),

#[error("api error: {0}")]
Api(#[from] crate::api::error::Error),
}
7 changes: 7 additions & 0 deletions bothan-binance/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pub use api::websocket::BinanceWebsocket;
pub use service::Binance;

mod api;
mod error;
mod service;
mod types;
Loading
Loading