Skip to content

Commit

Permalink
add monitoring details for processor and post processor with minor re…
Browse files Browse the repository at this point in the history
…factor
  • Loading branch information
warittornc committed Nov 21, 2024
1 parent 911ccaa commit 181a42c
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 49 deletions.
44 changes: 32 additions & 12 deletions bothan-core/src/manager/crypto_asset_info/price/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use crate::manager::crypto_asset_info::price::error::{Error, MissingPrerequisite
use crate::manager::crypto_asset_info::types::{
PriceSignalComputationRecord, PriceState, WorkerMap,
};
use crate::monitoring::records::{OperationRecord, SignalComputationRecord, SourceRecord};
use crate::registry::post_processor::PostProcess;
use crate::registry::processor::Process;
use crate::monitoring::records::{
OperationRecord, ProcessRecord, SignalComputationRecord, SourceRecord,
};
use crate::registry::signal::Signal;
use crate::registry::source::{OperationRoute, SourceQuery};
use crate::registry::{Registry, Valid};
Expand Down Expand Up @@ -89,18 +89,37 @@ async fn compute_signal_result<'a>(
// We can unwrap here because we just pushed the record, so it's guaranteed to be there
let record_ref = records.last_mut().unwrap();

// let processor_ref: Processor = &signal.processor;
let process_signal_result = signal.processor.process(source_results);
record_ref.process_result = Some(process_signal_result.clone());

let processed_signal = process_signal_result?;
record_ref.process_result = Some(ProcessRecord::new(
signal.processor.name().to_string(),
process_signal_result.clone(),
));

let mut processed_signal = process_signal_result?;
let mut post_process_records = Vec::with_capacity(signal.post_processors.len());
for post_processor in &signal.post_processors {
let post_process_signal_result = post_processor.post_process(processed_signal);

post_process_records.push(ProcessRecord::new(
post_processor.name().to_string(),
post_process_signal_result.clone(),
));

match post_process_signal_result {
Ok(post_processed) => {
processed_signal = post_processed;
}
Err(e) => {
record_ref.post_process_result = Some(post_process_records);
return Err(Error::FailedToPostProcessSignal(e));
}
}
}

let post_process_signal_result = signal
.post_processors
.iter()
.try_fold(processed_signal, |acc, post| post.process(acc));
record_ref.post_process_result = Some(post_process_signal_result.clone());
record_ref.post_process_result = Some(post_process_records);

Ok(post_process_signal_result?)
Ok(processed_signal)
}
None => Err(Error::InvalidSignal),
}
Expand Down Expand Up @@ -216,6 +235,7 @@ fn compute_source_routes(
.map(|(r, v)| OperationRecord::new(r.signal_id.clone(), r.operation.clone(), *v))
.collect::<Vec<OperationRecord>>();
record.operations = op_records;
record.final_value = price;

Ok(price)
}
Expand Down
19 changes: 17 additions & 2 deletions bothan-core/src/monitoring/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ where
{
pub signal_id: String,
pub sources: Vec<SourceRecord<T>>,
pub process_result: Option<Result<U, ProcessError>>,
pub post_process_result: Option<Result<U, PostProcessError>>,
pub process_result: Option<ProcessRecord<U, ProcessError>>,
pub post_process_result: Option<Vec<ProcessRecord<U, PostProcessError>>>,
}

impl<T, U> SignalComputationRecord<T, U> {
Expand All @@ -36,6 +36,21 @@ impl<T, U> SignalComputationRecord<T, U> {
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessRecord<T, E>
where
T: Sized,
{
pub function: String,
pub result: Result<T, E>,
}

impl<T, E> ProcessRecord<T, E> {
pub fn new(function: String, result: Result<T, E>) -> Self {
ProcessRecord { function, result }
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SourceRecord<T: Sized> {
pub source_id: String,
Expand Down
15 changes: 8 additions & 7 deletions bothan-core/src/registry/post_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@ impl PostProcessError {
}
}

/// The PostProcessor trait defines the methods that a post-processor must implement.
pub trait PostProcess<T> {
fn process(&self, data: T) -> Result<T, PostProcessError>;
}

/// The PostProcess enum represents the different types of post-processors that can be used.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)]
#[serde(rename_all = "snake_case", tag = "function", content = "params")]
pub enum PostProcessor {
TickConvertor(tick::TickPostProcessor),
}

impl PostProcess<Decimal> for PostProcessor {
fn process(&self, data: Decimal) -> Result<Decimal, PostProcessError> {
impl PostProcessor {
pub fn name(&self) -> &str {
match self {
PostProcessor::TickConvertor(_) => "tick_convertor",
}
}

pub fn post_process(&self, data: Decimal) -> Result<Decimal, PostProcessError> {
match self {
PostProcessor::TickConvertor(tick) => tick.process(data),
}
Expand Down
8 changes: 4 additions & 4 deletions bothan-core/src/registry/post_processor/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use num_traits::FromPrimitive;
use rust_decimal::{Decimal, MathematicalOps};
use serde::{Deserialize, Serialize};

use crate::registry::post_processor::{PostProcess, PostProcessError};
use crate::registry::post_processor::PostProcessError;

const TICK: f64 = 1.0001;
const MID_TICK: f64 = 262144.0;
Expand All @@ -14,10 +14,10 @@ const MIN_TICK: f64 = 1.0;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)]
pub struct TickPostProcessor {}

impl PostProcess<Decimal> for TickPostProcessor {
impl TickPostProcessor {
/// Processes the given data into its tick value and returns it. If the data is out of bounds,
/// it returns an error.
fn process(&self, data: Decimal) -> Result<Decimal, PostProcessError> {
pub fn process(&self, data: Decimal) -> Result<Decimal, PostProcessError> {
// Unwrap here is safe because the constants are hardcoded.
let tick = Decimal::from_f64(TICK).unwrap();
let min_tick = Decimal::from_f64(MIN_TICK).unwrap();
Expand All @@ -43,7 +43,7 @@ mod tests {
#[test]
fn test_process() {
let tick_convertor = PostProcessor::TickConvertor(TickPostProcessor {});
let result = tick_convertor.process(Decimal::from(20));
let result = tick_convertor.post_process(Decimal::from(20));
assert_eq!(
result.unwrap(),
Decimal::from_str_exact("292102.82057671349939971087257").unwrap()
Expand Down
19 changes: 5 additions & 14 deletions bothan-core/src/registry/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ impl ProcessError {
}
}

/// The Processor trait defines the methods that a processor must implement.
pub trait Process<T, U> {
fn process(&self, data: Vec<T>) -> Result<U, ProcessError>;
}

/// The Process enum represents the different types of processors that can be used.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)]
#[serde(rename_all = "snake_case", tag = "function", content = "params")]
Expand All @@ -31,19 +26,15 @@ pub enum Processor {
WeightedMedian(weighted_median::WeightedMedianProcessor),
}

impl Process<Decimal, Decimal> for Processor {
fn process(&self, data: Vec<Decimal>) -> Result<Decimal, ProcessError> {
impl Processor {
pub fn name(&self) -> &str {
match self {
Processor::Median(median) => median.process(data),
Processor::WeightedMedian(_) => Err(ProcessError::new(
"Weighted median not implemented for T: Decimal",
)),
Processor::Median(_) => "median",
Processor::WeightedMedian(_) => "weighted_median",
}
}
}

impl Process<(String, Decimal), Decimal> for Processor {
fn process(&self, data: Vec<(String, Decimal)>) -> Result<Decimal, ProcessError> {
pub fn process(&self, data: Vec<(String, Decimal)>) -> Result<Decimal, ProcessError> {
match self {
Processor::Median(median) => {
let data = data.into_iter().map(|(_, value)| value).collect();
Expand Down
8 changes: 2 additions & 6 deletions bothan-core/src/registry/processor/median.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use num_traits::FromPrimitive;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};

use crate::registry::processor::{Process, ProcessError};
use crate::registry::processor::ProcessError;

/// The `MedianProcessor` finds the median of a given data set. It also has a `min_source_count` which
/// is the minimum number of sources required to calculate the median. If the given data set has less
Expand All @@ -21,12 +21,8 @@ impl MedianProcessor {
pub fn new(min_source_count: usize) -> Self {
MedianProcessor { min_source_count }
}
}

impl Process<Decimal, Decimal> for MedianProcessor {
/// Processes the given data and returns the median. If there are not enough sources, it
/// returns an error.
fn process(&self, data: Vec<Decimal>) -> Result<Decimal, ProcessError> {
pub fn process(&self, data: Vec<Decimal>) -> Result<Decimal, ProcessError> {
if data.len() < max(self.min_source_count, 1) {
Err(ProcessError::new("Not enough sources to calculate median"))
} else {
Expand Down
6 changes: 3 additions & 3 deletions bothan-core/src/registry/processor/weighted_median.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use num_traits::{FromPrimitive, Zero};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};

use crate::registry::processor::{Process, ProcessError};
use crate::registry::processor::ProcessError;

/// The `WeightedMedianProcessor` finds the weighted median of a given data set where the dataset
/// contains the source and the value. It also has a `minimum_cumulative_weight` which is the
Expand All @@ -30,11 +30,11 @@ impl WeightedMedianProcessor {
}
}

impl Process<(String, Decimal), Decimal> for WeightedMedianProcessor {
impl WeightedMedianProcessor {
/// Processes the given data and returns the weighted median. If the cumulative weights of the
/// data sources are less than the minimum cumulative weight or the source associated
/// with the data does not have an assigned weight, it returns an error.
fn process(&self, data: Vec<(String, Decimal)>) -> Result<Decimal, ProcessError> {
pub fn process(&self, data: Vec<(String, Decimal)>) -> Result<Decimal, ProcessError> {
let cumulative_weight = data.iter().try_fold(0, |acc, (source, _)| {
self.source_weights
.get(source)
Expand Down
2 changes: 1 addition & 1 deletion proto/bothan/v1/bothan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ message GetInfoResponse {
string registry_ipfs_hash = 2;
// The version requirements for the registry.
string registry_version_requirement = 3;
// The active sources the Bothan instance is using
git // The active sources the Bothan instance is using
repeated string active_sources = 4;
// Whether or not the Bothan instance has monitoring enabled
bool monitoring_enabled = 5;
Expand Down

0 comments on commit 181a42c

Please sign in to comment.