Skip to content

Commit

Permalink
add core
Browse files Browse the repository at this point in the history
  • Loading branch information
warittornc committed Feb 13, 2024
1 parent 156d330 commit 36e072b
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 0 deletions.
18 changes: 18 additions & 0 deletions bothan-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "bothan-core"
version.workspace = true
edition.workspace = true
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
thiserror = "1.0.57"
futures = "0.3.30"
tokio = { version = "1.36.0", features = ["full"] }
tokio-util = "0.7.10"
tracing = "0.1.40"
log = "0.4.20"
serde = { version = "1.0.196", features = ["std", "derive", "alloc"] }
derive_more = { version = "1.0.0-beta.6", features = ["full"] }
async-trait = "0.1.77"
6 changes: 6 additions & 0 deletions bothan-core/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub use error::Error;
pub use hashmap::Cache;

pub mod error;
pub mod hashmap;
mod types;
14 changes: 14 additions & 0 deletions bothan-core/src/cache/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("symbol value does not exist")]
DoesNotExist,

#[error("symbol is invalid")]
Invalid,

#[error("symbol has already been set")]
AlreadySet,

#[error("symbol has not been set to pending")]
PendingNotSet,
}
152 changes: 152 additions & 0 deletions bothan-core/src/cache/hashmap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use std::collections::hash_map::{Entry, HashMap};
use std::ops::Sub;
use std::sync::Arc;

use futures::future::join_all;
use tokio::select;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, MutexGuard};
use tokio::time::{interval, Instant};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::cache::error::Error;
use crate::cache::types::{StoredPriceData, DEFAULT_EVICTION_CHECK_INTERVAL, DEFAULT_TIMEOUT};
use crate::types::PriceData;

type Map = HashMap<String, Option<StoredPriceData>>;
type Store = Mutex<Map>;

pub struct Cache {
store: Arc<Store>,
token: CancellationToken,
}

impl Drop for Cache {
fn drop(&mut self) {
self.token.cancel()
}
}

impl Cache {
pub fn new(sender: Option<Sender<Vec<String>>>) -> Self {
let store: Arc<Store> = Arc::new(Mutex::new(HashMap::new()));
let token = CancellationToken::new();

start_eviction_process(store.clone(), token.clone(), sender);

Self { store, token }
}

pub async fn set_pending(&self, id: String) {
self.store
.lock()
.await
.entry(id.to_ascii_lowercase())
.or_insert(None);
}

pub async fn set_batch_pending(&self, ids: Vec<String>) {
let handles = ids.into_iter().map(|id| self.set_pending(id));
join_all(handles).await;
}

pub async fn set_data(&self, id: String, data: PriceData) -> Result<(), Error> {
match self.store.lock().await.entry(id.to_ascii_lowercase()) {
Entry::Occupied(mut entry) => {
match entry.get_mut() {
Some(stored) => {
stored.update(data);
}
None => {
entry.insert(Some(StoredPriceData::new(data)));
}
};
Ok(())
}
Entry::Vacant(_) => Err(Error::PendingNotSet),
}
}

pub async fn get(&self, id: &str) -> Result<PriceData, Error> {
get_value(id, &mut self.store.lock().await)
}

pub async fn get_batch(&self, ids: &[&str]) -> Vec<Result<PriceData, Error>> {
let mut locked_map = self.store.lock().await;
ids.iter()
.map(|id| get_value(id, &mut locked_map))
.collect()
}

pub async fn keys(&self) -> Vec<String> {
self.store
.lock()
.await
.iter()
.map(|(k, _)| k.clone())
.collect()
}
}

fn start_eviction_process(
store: Arc<Store>,
token: CancellationToken,
sender: Option<Sender<Vec<String>>>,
) {
tokio::spawn(async move {
let mut interval = interval(DEFAULT_EVICTION_CHECK_INTERVAL);
loop {
select! {
_ = interval.tick() => {
remove_timed_out_data(&store, &sender).await;
}
_ = token.cancelled() => {
break
}
}
}
});
}

fn is_timed_out(last_used: Instant) -> bool {
Instant::now().sub(last_used) > DEFAULT_TIMEOUT
}

async fn remove_timed_out_data(store: &Store, sender: &Option<Sender<Vec<String>>>) {
let mut locked_map = store.lock().await;
let mut evicted_keys = Vec::new();

// Remove entries that needs to be evicted and collect their keys into a vec
// to be sent to the sender to unsubscribe
locked_map.retain(|k, v| {
if let Some(price_data) = v {
if is_timed_out(price_data.last_used) {
evicted_keys.push(k.clone());
return false;
}
}
true
});

if !evicted_keys.is_empty() {
info!("evicting timed out symbols: {:?}", evicted_keys);
if let Some(sender) = sender {
// TODO: Handle this
let _res = sender.send(evicted_keys).await;
}
}
}

fn get_value(id: &str, locked_map: &mut MutexGuard<Map>) -> Result<PriceData, Error> {
match locked_map.entry(id.to_ascii_lowercase()) {
Entry::Occupied(mut entry) => match entry.get_mut() {
Some(stored) => {
stored.bump_last_used();
Ok(stored.data.clone())
}
None => Err(Error::Invalid),
},
Entry::Vacant(_) => Err(Error::DoesNotExist),
}
}
30 changes: 30 additions & 0 deletions bothan-core/src/cache/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use tokio::time::{Duration, Instant};

use crate::types::PriceData;

pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(6000);
pub const DEFAULT_EVICTION_CHECK_INTERVAL: Duration = Duration::from_secs(10);

#[derive(Debug, Clone)]
pub struct StoredPriceData {
pub data: PriceData,
pub last_used: Instant,
}

impl StoredPriceData {
pub fn new(data: PriceData) -> Self {
Self {
data,
last_used: Instant::now(),
}
}

pub fn update(&mut self, data: PriceData) {
self.data = data;
self.last_used = Instant::now();
}

pub fn bump_last_used(&mut self) {
self.last_used = Instant::now();
}
}
3 changes: 3 additions & 0 deletions bothan-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod cache;
pub mod service;
pub mod types;
26 changes: 26 additions & 0 deletions bothan-core/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::types::PriceData;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("unknown error: {0}")]
Unknown(String),

#[error("pending result")]
Pending,

#[error("invalid symbol")]
InvalidSymbol,

#[error("websocket error: {0}")]
Websocket(String),

#[error("rest error: {0}")]
Rest(String),
}

pub type ServiceResult<T> = Result<T, Error>;

#[async_trait::async_trait]
pub trait Service {
async fn get_price_data(&mut self, ids: &[&str]) -> Vec<ServiceResult<PriceData>>;
}
10 changes: 10 additions & 0 deletions bothan-core/src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use derive_more::Display;
use serde::Deserialize;

#[derive(Clone, Debug, Deserialize, Display)]
#[display("id: {}, price: {}, timestamp: {}", id, price, timestamp)]
pub struct PriceData {
pub id: String,
pub price: String,
pub timestamp: u64,
}

0 comments on commit 36e072b

Please sign in to comment.