Skip to content

Commit

Permalink
orderbook visualization and server updates
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunpat committed Jul 21, 2023
1 parent b515e9b commit 1150081
Show file tree
Hide file tree
Showing 23 changed files with 18,148 additions and 329 deletions.
25 changes: 24 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,24 @@
/target
.DS_Store
# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.

# dependencies
/node_modules
/.pnp
.pnp.js

# testing
/coverage

# production
/build

# misc
.DS_Store
.env.local
.env.development.local
.env.test.local
.env.production.local

npm-debug.log*
yarn-debug.log*
yarn-error.log*
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
# exchange-rs

High performance orderbook matching engine written in Rust with a React.js visualization.

![til](https://raw.githubusercontent.com/arjunpat/exchange-rs/main/assets/orderbookvisualization.gif)
Binary file added assets/orderbookvisualization.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions exchange-server/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/target
2 changes: 1 addition & 1 deletion Cargo.lock → exchange-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml → exchange-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "exchange-rs"
name = "exchange_server"
version = "0.1.0"
edition = "2021"

Expand Down
File renamed without changes.
187 changes: 187 additions & 0 deletions exchange-server/src/exchange.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
use crate::{
order::{Order, Side},
orderbook::OrderBook,
utils,
};
use anyhow::{Context, Result};
use axum::extract::ws::{Message, WebSocket};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::{
sync::{broadcast, mpsc, Mutex},
time::Duration,
};

pub struct Exchange {
book: Mutex<OrderBook>,
// update connected clients on all events
broadcast: broadcast::Sender<ServerMsg>,
// send order to exchange task
order_sender: mpsc::Sender<Order>,
// only for exchange_loop
order_receiver: Mutex<mpsc::Receiver<Order>>,
// id counter
id_counter: Mutex<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
enum ServerMsg {
// set username at connection
SetUsername {
username: String,
},
Trade {
price_cents: u64,
size: u64,
ts: u64,
},
Depths {
bids: HashMap<u64, u64>,
asks: HashMap<u64, u64>,
},
}

impl From<ServerMsg> for Message {
fn from(msg: ServerMsg) -> Self {
// this should never fail
let json = serde_json::to_string(&msg).expect("Failed to serialize json");
Message::Text(json)
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
enum ClientMsg {
// client Order
Order {
size: u64,
price_cents: u64,
buy: bool,
},
}

impl Exchange {
pub fn new() -> Self {
let (order_sender, order_receiver) = mpsc::channel(32);
let (broadcast, _) = broadcast::channel(32);

Exchange {
book: Mutex::new(OrderBook::new()),
order_sender,
order_receiver: Mutex::new(order_receiver),
broadcast,
id_counter: Mutex::new(1),
}
}

pub async fn start(&self) {
tokio::select! {
_ = self.exchange_loop() => {}
_ = self.send_depths_loop() => {}
}
println!("Exited start");
}

pub async fn send_depths_loop(&self) {
loop {
tokio::time::sleep(Duration::from_millis(250)).await;
let (bids, asks);
{
(bids, asks) = self.book.lock().await.get_depth();
}
let _ = self.broadcast.send(ServerMsg::Depths { bids, asks });
}
}

pub async fn exchange_loop(&self) {
// also prevents two loops from ever existing
let mut order_receiver = self.order_receiver.lock().await;
let mut orders_received = 0;

while let Some(order) = order_receiver.recv().await {
orders_received += 1;
if orders_received % 1000 == 0 {
println!("Have received {} orders", orders_received);
}
{
for t in self.book.lock().await.place(order) {
// can fail when there are no broadcast listeners
let _ = self.broadcast.send(ServerMsg::Trade {
price_cents: t.price,
size: t.size,
ts: t.ts,
});
}
} // essential to release the lock
}
}

pub async fn handle_connection(&self, socket: &mut WebSocket) -> Result<()> {
let username;
{
let mut id_counter = self.id_counter.lock().await;
username = format!("user-{}", *id_counter);
*id_counter += 1;
} // release mutex lock

// send client their username
let msg = ServerMsg::SetUsername {
username: username.clone(),
};
socket.send(msg.into()).await?;

let mut broadcast_rx = self.broadcast.subscribe();

loop {
tokio::select! {
update = broadcast_rx.recv() => {
socket.send(update?.into()).await?
}
msg = socket.recv() => {
if let Some(msg) = msg {
if let Ok(msg) = msg {
if let Message::Text(text) = msg { self.handle_client_msg(&username, text).await? }
} else {
println!("Client disconnected!!!!");
break;
}
} else {
println!("Stream has closed");
break;
}
}
}
}

Ok(())
}

pub async fn handle_client_msg(&self, username: &str, msg: String) -> Result<()> {
let msg: ClientMsg = serde_json::from_str(&msg)
.context(format!("Failed to deserialize client msg: {}", msg))?;
// println!("received client messgae = {:?}", msg);

match msg {
ClientMsg::Order {
size,
price_cents,
buy,
} => {
if size == 0 {
return Ok(());
}
let order = Order {
side: if buy { Side::Buy } else { Side::Sell },
created_at: utils::now(),
creator: username.to_owned(),
size,
price: price_cents,
};

// send to exchange tokio task/thread
self.order_sender.send(order).await?;
}
}

Ok(())
}
}
22 changes: 16 additions & 6 deletions src/main.rs → exchange-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::{
};

use axum::{
extract::{ws::WebSocket, State, WebSocketUpgrade},
extract::{
ws::{Message, WebSocket},
State, WebSocketUpgrade,
},
response::{Html, IntoResponse},
routing::get,
Router,
Expand All @@ -22,11 +25,15 @@ mod utils;

#[tokio::main]
async fn main() {
let exchange = Arc::new(Exchange::new());
let exch = Arc::new(Exchange::new());
let exch_c = exch.clone();
tokio::spawn(async move {
exch_c.start().await;
});
let app = Router::new()
.route("/", get(root))
.route("/ws", get(ws_handler))
.with_state(exchange);
.with_state(exch);

let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let socket = SocketAddr::new(ip, 8080);
Expand All @@ -48,8 +55,11 @@ async fn ws_handler(ws: WebSocketUpgrade, State(ex): State<Arc<Exchange>>) -> im
ws.on_upgrade(move |socket| handle_socket(socket, ex))
}

async fn handle_socket(socket: WebSocket, ex: Arc<Exchange>) {
if ex.handle_connection(socket).await.is_err() {
println!("Failed connection");
async fn handle_socket(mut socket: WebSocket, ex: Arc<Exchange>) {
let _ = socket.send(Message::Ping(vec![])).await;

let result = ex.handle_connection(&mut socket).await;
if result.is_err() {
println!("Failed connection: {:?}", result.unwrap_err());
}
}
19 changes: 9 additions & 10 deletions src/order.rs → exchange-server/src/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ pub enum Side {
pub struct Order {
pub created_at: u64,
pub creator: String,
pub size: i64,
pub price: f64,
pub size: u64,
pub price: u64,
pub side: Side,
}

impl PartialEq for Order {
fn eq(&self, other: &Self) -> bool {
return self.cmp(other) == Ordering::Equal;
self.cmp(other) == Ordering::Equal
}
}

Expand Down Expand Up @@ -52,21 +52,20 @@ impl PartialOrd for Order {
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Transaction {
pub struct Trade {
pub from: String,
pub to: String,
pub security: String,
pub size: i64,
pub price: f64,
pub size: u64,
pub price: u64,
pub ts: u64,
}

impl Display for Transaction {
impl Display for Trade {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} -> {}: {} of {} @ ${}",
self.from, self.to, self.size, self.security, self.price
"{} -> {}: {} @ ${}",
self.from, self.to, self.size, self.price
)
}
}
Loading

0 comments on commit 1150081

Please sign in to comment.