Skip to content

Commit

Permalink
[feat] Add Monitoring Details (#102)
Browse files Browse the repository at this point in the history
* add monitoring details for processor and post processor with minor refactor

* fix unit tests

* remove typo

* [feat] Add Asset State to Monitoring (#103)

* add asset state to monitoring

* remove unnecessary trait requirements

* minor bug fix

* minor bug fix
  • Loading branch information
warittornc authored Nov 28, 2024
1 parent 911ccaa commit 20918d3
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 106 deletions.
1 change: 1 addition & 0 deletions bothan-binance/src/worker/asset_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ async fn handle_reconnect(

if subscribe(&ids_vec, connection).await.is_ok() {
info!("resubscribed to all ids");
return;
} else {
error!("failed to resubscribe to all ids");
}
Expand Down
164 changes: 111 additions & 53 deletions bothan-core/src/manager/crypto_asset_info/price/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use crate::manager::crypto_asset_info::price::error::{Error, MissingPrerequisite
use crate::manager::crypto_asset_info::types::{
PriceSignalComputationRecord, PriceState, WorkerMap,
};
use crate::monitoring::records::{OperationRecord, SignalComputationRecord, SourceRecord};
use crate::registry::post_processor::PostProcess;
use crate::registry::processor::Process;
use crate::monitoring::records::{
OperationRecord, ProcessRecord, SignalComputationRecord, SourceRecord,
};
use crate::registry::signal::Signal;
use crate::registry::source::{OperationRoute, SourceQuery};
use crate::registry::{Registry, Valid};
Expand Down Expand Up @@ -89,18 +89,37 @@ async fn compute_signal_result<'a>(
// We can unwrap here because we just pushed the record, so it's guaranteed to be there
let record_ref = records.last_mut().unwrap();

// let processor_ref: Processor = &signal.processor;
let process_signal_result = signal.processor.process(source_results);
record_ref.process_result = Some(process_signal_result.clone());

let processed_signal = process_signal_result?;
record_ref.process_result = Some(ProcessRecord::new(
signal.processor.name().to_string(),
process_signal_result.clone(),
));

let mut processed_signal = process_signal_result?;
let mut post_process_records = Vec::with_capacity(signal.post_processors.len());
for post_processor in &signal.post_processors {
let post_process_signal_result = post_processor.post_process(processed_signal);

post_process_records.push(ProcessRecord::new(
post_processor.name().to_string(),
post_process_signal_result.clone(),
));

match post_process_signal_result {
Ok(post_processed) => {
processed_signal = post_processed;
}
Err(e) => {
record_ref.post_process_result = Some(post_process_records);
return Err(Error::FailedToPostProcessSignal(e));
}
}
}

let post_process_signal_result = signal
.post_processors
.iter()
.try_fold(processed_signal, |acc, post| post.process(acc));
record_ref.post_process_result = Some(post_process_signal_result.clone());
record_ref.post_process_result = Some(post_process_records);

Ok(post_process_signal_result?)
Ok(processed_signal)
}
None => Err(Error::InvalidSignal),
}
Expand Down Expand Up @@ -151,30 +170,39 @@ async fn process_source_query<'a>(
source_query: &SourceQuery,
stale_cutoff: i64,
cache: &PriceCache<String>,
source_records: &mut Vec<SourceRecord<Decimal>>,
source_records: &mut Vec<SourceRecord<AssetState, Decimal>>,
) -> Result<Option<(String, Decimal)>, MissingPrerequisiteError> {
let source_id = &source_query.source_id;
let query_id = &source_query.query_id;

// Create a record for the specific source
let source_record = SourceRecord::new(source_id.clone(), query_id.clone(), None, vec![], None);
source_records.push(source_record);
// We can unwrap here because we just pushed the value, so it's guaranteed to be there
let record = source_records.last_mut().unwrap();

match worker.get_asset(query_id).await {
Ok(AssetState::Available(a)) if a.timestamp.ge(&stale_cutoff) => {
// Create a record for the specific source
let source_record =
SourceRecord::new(source_id.clone(), query_id.clone(), a.price, vec![], None);
source_records.push(source_record);
// We can unwrap here because we just pushed the value, so it's guaranteed to be there
let record = source_records.last_mut().unwrap();

// Calculate the source route
compute_source_routes(&source_query.routes, a.price, cache, record)
.map(|opt| opt.map(|price| (source_id.clone(), price)))
}
Ok(AssetState::Available(_)) => {
warn!("asset state for {query_id} from {source_id} has timed out");
Ok(None)
}
Ok(_) => {
warn!("asset state for {query_id} from {source_id} is unavailable");
Ok(None)
Ok(asset_state) => {
record.raw_source_value = Some(asset_state.clone());
match asset_state {
AssetState::Available(a) if a.timestamp >= stale_cutoff => {
// Calculate the source route
compute_source_routes(&source_query.routes, a.price, cache, record)
.map(|opt| opt.map(|price| (source_id.clone(), price)))
}
AssetState::Available(_) => {
warn!("asset state for {query_id} from {source_id} is stale");
Ok(None)
}
AssetState::Unsupported => {
warn!("asset state for {query_id} from {source_id} is unsupported");
Ok(None)
}
AssetState::Pending => {
info!("asset state for {query_id} from {source_id} is pending");
Ok(None)
}
}
}
Err(_) => {
warn!("error while querying source {source_id} for {query_id}");
Expand All @@ -187,7 +215,7 @@ fn compute_source_routes(
routes: &Vec<OperationRoute>,
start: Decimal,
cache: &PriceCache<String>,
record: &mut SourceRecord<Decimal>,
record: &mut SourceRecord<AssetState, Decimal>,
) -> Result<Option<Decimal>, MissingPrerequisiteError> {
// Get all pre requisites
let mut missing = Vec::with_capacity(routes.len());
Expand Down Expand Up @@ -216,6 +244,7 @@ fn compute_source_routes(
.map(|(r, v)| OperationRecord::new(r.signal_id.clone(), r.operation.clone(), *v))
.collect::<Vec<OperationRecord>>();
record.operations = op_records;
record.final_value = price;

Ok(price)
}
Expand Down Expand Up @@ -406,10 +435,9 @@ mod tests {
let signal = Signal::new(source_queries, processor, vec![]);

let mut test_source_worker = MockWorker::default();
test_source_worker.add_expected_query(
"testusd".to_string(),
AssetState::Available(AssetInfo::new("testusd".to_string(), Decimal::default(), 0)),
);
let asset_state =
AssetState::Available(AssetInfo::new("testusd".to_string(), Decimal::default(), 0));
test_source_worker.add_expected_query("testusd".to_string(), asset_state.clone());

let workers = mock_workers(vec![("test-source", test_source_worker)]);

Expand All @@ -425,9 +453,9 @@ mod tests {
sources: vec![SourceRecord::new(
"test-source".to_string(),
"testusd".to_string(),
Decimal::default(),
Some(asset_state),
vec![],
None,
Some(Decimal::default()),
)],
process_result: None,
post_process_result: None,
Expand Down Expand Up @@ -474,8 +502,9 @@ mod tests {
async fn test_process_source_query() {
let mut worker = MockWorker::default();
let id = "testusd".to_string();
let asset_info = AssetInfo::new(id.clone(), Decimal::new(1000, 0), 10);
worker.add_expected_query("testusd".to_string(), AssetState::Available(asset_info));
let asset_state =
AssetState::Available(AssetInfo::new(id.clone(), Decimal::new(1000, 0), 10));
worker.add_expected_query("testusd".to_string(), asset_state.clone());

let source_query = SourceQuery::new("test-source".to_string(), id.clone(), vec![]);
let stale_cutoff = 5;
Expand All @@ -490,9 +519,9 @@ mod tests {
let expected_source_records = vec![SourceRecord::new(
"test-source".to_string(),
"testusd".to_string(),
Decimal::new(1000, 0),
Some(asset_state),
vec![],
None,
Some(Decimal::new(1000, 0)),
)];
assert_eq!(res, expected_res);
assert_eq!(source_records, &expected_source_records);
Expand All @@ -503,7 +532,10 @@ mod tests {
let mut worker = MockWorker::default();
let id = "testusd".to_string();
let asset_info = AssetInfo::new(id.clone(), Decimal::default(), 0);
worker.add_expected_query("testusd".to_string(), AssetState::Available(asset_info));
worker.add_expected_query(
"testusd".to_string(),
AssetState::Available(asset_info.clone()),
);

let source_query = SourceQuery::new("test-source".to_string(), id.clone(), vec![]);
let stale_cutoff = 1000;
Expand All @@ -513,8 +545,17 @@ mod tests {
let res =
process_source_query(&worker, &source_query, stale_cutoff, &cache, source_records)
.await;

let expected = vec![SourceRecord::new(
"test-source".to_string(),
"testusd".to_string(),
Some(AssetState::Available(asset_info)),
vec![],
None,
)];

assert_eq!(res, Ok(None));
assert_eq!(source_records, &vec![]);
assert_eq!(source_records, &expected);
}

#[tokio::test]
Expand All @@ -531,8 +572,17 @@ mod tests {
let res =
process_source_query(&worker, &source_query, stale_cutoff, &cache, source_records)
.await;

let expected = vec![SourceRecord::new(
"test-source".to_string(),
"testusd".to_string(),
Some(AssetState::Unsupported),
vec![],
None,
)];

assert_eq!(res, Ok(None));
assert_eq!(source_records, &vec![]);
assert_eq!(source_records, &expected);
}

#[test]
Expand All @@ -552,27 +602,29 @@ mod tests {
cache.set_available("C".to_string(), Decimal::from(13));
cache.set_available("D".to_string(), Decimal::from(89));

let asset_state =
AssetState::Available(AssetInfo::new("test".to_string(), Decimal::one(), 0));
let mut record = SourceRecord::new(
"test-source".to_string(),
"test".to_string(),
Decimal::one(),
Some(asset_state.clone()),
vec![],
None,
);
let res = compute_source_routes(&routes, start, &cache, &mut record);

let expected_value = Some(Decimal::from_str_exact("76.4").unwrap());
let expected_value = Some(Decimal::new(764, 1));
let expected_record = SourceRecord::new(
"test-source".to_string(),
"test".to_string(),
Decimal::one(),
Some(asset_state),
vec![
OperationRecord::new("A".to_string(), Operation::Multiply, Decimal::from(2)),
OperationRecord::new("B".to_string(), Operation::Divide, Decimal::from(5)),
OperationRecord::new("C".to_string(), Operation::Subtract, Decimal::from(13)),
OperationRecord::new("D".to_string(), Operation::Add, Decimal::from(89)),
],
None,
expected_value,
);

assert_eq!(res, Ok(expected_value));
Expand All @@ -589,10 +641,12 @@ mod tests {
let mut cache = PriceCache::new();
cache.set_available("A".to_string(), Decimal::from(2));

let asset_state =
AssetState::Available(AssetInfo::new("test".to_string(), Decimal::one(), 0));
let mut record = SourceRecord::new(
"test-source".to_string(),
"test".to_string(),
Decimal::one(),
Some(asset_state),
vec![],
None,
);
Expand All @@ -617,10 +671,12 @@ mod tests {
cache.set_available("A".to_string(), Decimal::from(1337));
cache.set_unavailable("B".to_string());

let asset_state =
AssetState::Available(AssetInfo::new("test".to_string(), Decimal::one(), 0));
let mut record = SourceRecord::new(
"test-source".to_string(),
"test".to_string(),
Decimal::one(),
Some(asset_state),
vec![],
None,
);
Expand All @@ -645,10 +701,12 @@ mod tests {
cache.set_unsupported("B".to_string());
cache.set_available("C".to_string(), Decimal::from(10000));

let asset_state =
AssetState::Available(AssetInfo::new("test".to_string(), Decimal::one(), 0));
let mut record = SourceRecord::new(
"test-source".to_string(),
"test".to_string(),
Decimal::one(),
Some(asset_state),
vec![],
None,
);
Expand Down
4 changes: 2 additions & 2 deletions bothan-core/src/manager/crypto_asset_info/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use std::time::Duration;
use rust_decimal::Decimal;

use crate::monitoring::records::SignalComputationRecord;
use crate::worker::AssetWorker;
use crate::worker::{AssetState, AssetWorker};

pub const MONITORING_TTL: Duration = Duration::from_secs(60);
pub const HEARTBEAT: Duration = Duration::from_secs(60);

pub type WorkerMap<'a> = HashMap<String, Arc<dyn AssetWorker + 'a>>;
pub type PriceSignalComputationRecord = SignalComputationRecord<Decimal, Decimal>;
pub type PriceSignalComputationRecord = SignalComputationRecord<AssetState, Decimal>;

#[derive(Debug, Clone, PartialEq)]
pub enum PriceState {
Expand Down
36 changes: 22 additions & 14 deletions bothan-core/src/monitoring/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@ pub struct SignalRecordsWithTxHash<T, U> {
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SignalComputationRecord<T, U>
where
T: Sized,
U: Sized,
{
pub struct SignalComputationRecord<T, U> {
pub signal_id: String,
pub sources: Vec<SourceRecord<T>>,
pub process_result: Option<Result<U, ProcessError>>,
pub post_process_result: Option<Result<U, PostProcessError>>,
pub sources: Vec<SourceRecord<T, U>>,
pub process_result: Option<ProcessRecord<U, ProcessError>>,
pub post_process_result: Option<Vec<ProcessRecord<U, PostProcessError>>>,
}

impl<T, U> SignalComputationRecord<T, U> {
Expand All @@ -37,21 +33,33 @@ impl<T, U> SignalComputationRecord<T, U> {
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SourceRecord<T: Sized> {
pub struct ProcessRecord<T, E> {
pub function: String,
pub result: Result<T, E>,
}

impl<T, E> ProcessRecord<T, E> {
pub fn new(function: String, result: Result<T, E>) -> Self {
ProcessRecord { function, result }
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SourceRecord<T, U> {
pub source_id: String,
pub query_id: String,
pub raw_source_value: T,
pub raw_source_value: Option<T>,
pub operations: Vec<OperationRecord>,
pub final_value: Option<T>,
pub final_value: Option<U>,
}

impl<T> SourceRecord<T> {
impl<T, U> SourceRecord<T, U> {
pub fn new(
source_id: String,
query_id: String,
raw_source_value: T,
raw_source_value: Option<T>,
operations: Vec<OperationRecord>,
final_value: Option<T>,
final_value: Option<U>,
) -> Self {
SourceRecord {
source_id,
Expand Down
Loading

0 comments on commit 20918d3

Please sign in to comment.