From 36e072b4face1c8b1c15afaa52e8275a9c8969a6 Mon Sep 17 00:00:00 2001 From: Warittorn Cheevachaipimol Date: Tue, 13 Feb 2024 15:55:11 +0700 Subject: [PATCH] add core --- bothan-core/Cargo.toml | 18 ++++ bothan-core/src/cache.rs | 6 ++ bothan-core/src/cache/error.rs | 14 +++ bothan-core/src/cache/hashmap.rs | 152 +++++++++++++++++++++++++++++++ bothan-core/src/cache/types.rs | 30 ++++++ bothan-core/src/lib.rs | 3 + bothan-core/src/service.rs | 26 ++++++ bothan-core/src/types.rs | 10 ++ 8 files changed, 259 insertions(+) create mode 100644 bothan-core/Cargo.toml create mode 100644 bothan-core/src/cache.rs create mode 100644 bothan-core/src/cache/error.rs create mode 100644 bothan-core/src/cache/hashmap.rs create mode 100644 bothan-core/src/cache/types.rs create mode 100644 bothan-core/src/lib.rs create mode 100644 bothan-core/src/service.rs create mode 100644 bothan-core/src/types.rs diff --git a/bothan-core/Cargo.toml b/bothan-core/Cargo.toml new file mode 100644 index 00000000..68fda9cb --- /dev/null +++ b/bothan-core/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "bothan-core" +version.workspace = true +edition.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +thiserror = "1.0.57" +futures = "0.3.30" +tokio = { version = "1.36.0", features = ["full"] } +tokio-util = "0.7.10" +tracing = "0.1.40" +log = "0.4.20" +serde = { version = "1.0.196", features = ["std", "derive", "alloc"] } +derive_more = { version = "1.0.0-beta.6", features = ["full"] } +async-trait = "0.1.77" diff --git a/bothan-core/src/cache.rs b/bothan-core/src/cache.rs new file mode 100644 index 00000000..2cb6bff8 --- /dev/null +++ b/bothan-core/src/cache.rs @@ -0,0 +1,6 @@ +pub use error::Error; +pub use hashmap::Cache; + +pub mod error; +pub mod hashmap; +mod types; diff --git a/bothan-core/src/cache/error.rs b/bothan-core/src/cache/error.rs new file mode 100644 index 00000000..4f2a624d --- /dev/null +++ b/bothan-core/src/cache/error.rs @@ -0,0 +1,14 @@ +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("symbol value does not exist")] + DoesNotExist, + + #[error("symbol is invalid")] + Invalid, + + #[error("symbol has already been set")] + AlreadySet, + + #[error("symbol has not been set to pending")] + PendingNotSet, +} diff --git a/bothan-core/src/cache/hashmap.rs b/bothan-core/src/cache/hashmap.rs new file mode 100644 index 00000000..3e1a1454 --- /dev/null +++ b/bothan-core/src/cache/hashmap.rs @@ -0,0 +1,152 @@ +use std::collections::hash_map::{Entry, HashMap}; +use std::ops::Sub; +use std::sync::Arc; + +use futures::future::join_all; +use tokio::select; +use tokio::sync::mpsc::Sender; +use tokio::sync::{Mutex, MutexGuard}; +use tokio::time::{interval, Instant}; +use tokio_util::sync::CancellationToken; +use tracing::info; + +use crate::cache::error::Error; +use crate::cache::types::{StoredPriceData, DEFAULT_EVICTION_CHECK_INTERVAL, DEFAULT_TIMEOUT}; +use crate::types::PriceData; + +type Map = HashMap>; +type Store = Mutex; + +pub struct Cache { + store: Arc, + token: CancellationToken, +} + +impl Drop for Cache { + fn drop(&mut self) { + self.token.cancel() + } +} + +impl Cache { + pub fn new(sender: Option>>) -> Self { + let store: Arc = Arc::new(Mutex::new(HashMap::new())); + let token = CancellationToken::new(); + + start_eviction_process(store.clone(), token.clone(), sender); + + Self { store, token } + } + + pub async fn set_pending(&self, id: String) { + self.store + .lock() + .await + .entry(id.to_ascii_lowercase()) + .or_insert(None); + } + + pub async fn set_batch_pending(&self, ids: Vec) { + let handles = ids.into_iter().map(|id| self.set_pending(id)); + join_all(handles).await; + } + + pub async fn set_data(&self, id: String, data: PriceData) -> Result<(), Error> { + match self.store.lock().await.entry(id.to_ascii_lowercase()) { + Entry::Occupied(mut entry) => { + match entry.get_mut() { + Some(stored) => { + stored.update(data); + } + None => { + entry.insert(Some(StoredPriceData::new(data))); + } + }; + Ok(()) + } + Entry::Vacant(_) => Err(Error::PendingNotSet), + } + } + + pub async fn get(&self, id: &str) -> Result { + get_value(id, &mut self.store.lock().await) + } + + pub async fn get_batch(&self, ids: &[&str]) -> Vec> { + let mut locked_map = self.store.lock().await; + ids.iter() + .map(|id| get_value(id, &mut locked_map)) + .collect() + } + + pub async fn keys(&self) -> Vec { + self.store + .lock() + .await + .iter() + .map(|(k, _)| k.clone()) + .collect() + } +} + +fn start_eviction_process( + store: Arc, + token: CancellationToken, + sender: Option>>, +) { + tokio::spawn(async move { + let mut interval = interval(DEFAULT_EVICTION_CHECK_INTERVAL); + loop { + select! { + _ = interval.tick() => { + remove_timed_out_data(&store, &sender).await; + } + _ = token.cancelled() => { + break + } + } + } + }); +} + +fn is_timed_out(last_used: Instant) -> bool { + Instant::now().sub(last_used) > DEFAULT_TIMEOUT +} + +async fn remove_timed_out_data(store: &Store, sender: &Option>>) { + let mut locked_map = store.lock().await; + let mut evicted_keys = Vec::new(); + + // Remove entries that needs to be evicted and collect their keys into a vec + // to be sent to the sender to unsubscribe + locked_map.retain(|k, v| { + if let Some(price_data) = v { + if is_timed_out(price_data.last_used) { + evicted_keys.push(k.clone()); + return false; + } + } + true + }); + + if !evicted_keys.is_empty() { + info!("evicting timed out symbols: {:?}", evicted_keys); + if let Some(sender) = sender { + // TODO: Handle this + let _res = sender.send(evicted_keys).await; + } + } +} + +fn get_value(id: &str, locked_map: &mut MutexGuard) -> Result { + match locked_map.entry(id.to_ascii_lowercase()) { + Entry::Occupied(mut entry) => match entry.get_mut() { + Some(stored) => { + stored.bump_last_used(); + Ok(stored.data.clone()) + } + None => Err(Error::Invalid), + }, + Entry::Vacant(_) => Err(Error::DoesNotExist), + } +} diff --git a/bothan-core/src/cache/types.rs b/bothan-core/src/cache/types.rs new file mode 100644 index 00000000..43c40d23 --- /dev/null +++ b/bothan-core/src/cache/types.rs @@ -0,0 +1,30 @@ +use tokio::time::{Duration, Instant}; + +use crate::types::PriceData; + +pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(6000); +pub const DEFAULT_EVICTION_CHECK_INTERVAL: Duration = Duration::from_secs(10); + +#[derive(Debug, Clone)] +pub struct StoredPriceData { + pub data: PriceData, + pub last_used: Instant, +} + +impl StoredPriceData { + pub fn new(data: PriceData) -> Self { + Self { + data, + last_used: Instant::now(), + } + } + + pub fn update(&mut self, data: PriceData) { + self.data = data; + self.last_used = Instant::now(); + } + + pub fn bump_last_used(&mut self) { + self.last_used = Instant::now(); + } +} diff --git a/bothan-core/src/lib.rs b/bothan-core/src/lib.rs new file mode 100644 index 00000000..4fa08872 --- /dev/null +++ b/bothan-core/src/lib.rs @@ -0,0 +1,3 @@ +pub mod cache; +pub mod service; +pub mod types; diff --git a/bothan-core/src/service.rs b/bothan-core/src/service.rs new file mode 100644 index 00000000..e1773f7f --- /dev/null +++ b/bothan-core/src/service.rs @@ -0,0 +1,26 @@ +use crate::types::PriceData; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("unknown error: {0}")] + Unknown(String), + + #[error("pending result")] + Pending, + + #[error("invalid symbol")] + InvalidSymbol, + + #[error("websocket error: {0}")] + Websocket(String), + + #[error("rest error: {0}")] + Rest(String), +} + +pub type ServiceResult = Result; + +#[async_trait::async_trait] +pub trait Service { + async fn get_price_data(&mut self, ids: &[&str]) -> Vec>; +} diff --git a/bothan-core/src/types.rs b/bothan-core/src/types.rs new file mode 100644 index 00000000..cb670d6e --- /dev/null +++ b/bothan-core/src/types.rs @@ -0,0 +1,10 @@ +use derive_more::Display; +use serde::Deserialize; + +#[derive(Clone, Debug, Deserialize, Display)] +#[display("id: {}, price: {}, timestamp: {}", id, price, timestamp)] +pub struct PriceData { + pub id: String, + pub price: String, + pub timestamp: u64, +}