diff --git a/config/planner.yml b/config/planner.yml index a8ab9a94..90fcc6c9 100644 --- a/config/planner.yml +++ b/config/planner.yml @@ -14,6 +14,10 @@ reinterpret_second_as: 1 # to be accepted. query_dist_change_frac: 0.1 +metrics_agg: + method: ewm # 'mean' is another option + alpha: 0.86466472 # 1 - 1 / e^2 + ### ### Trigger configs. ### diff --git a/src/brad/config/planner.py b/src/brad/config/planner.py index 9cd2474b..77d83c65 100644 --- a/src/brad/config/planner.py +++ b/src/brad/config/planner.py @@ -243,3 +243,6 @@ def redshift_initialize_load_fraction(self) -> float: def aurora_storage_index_multiplier(self) -> float: return float(self._raw["aurora_storage_index_multiplier"]) + + def metrics_agg(self) -> Dict[str, Any]: + return self._raw["metrics_agg"] diff --git a/src/brad/planner/metrics.py b/src/brad/planner/metrics.py index c5402d94..92b0159d 100644 --- a/src/brad/planner/metrics.py +++ b/src/brad/planner/metrics.py @@ -5,7 +5,7 @@ import numpy as np from datetime import datetime from collections import namedtuple -from typing import Tuple, Optional +from typing import Tuple, Optional, Dict, Any from brad.blueprint.manager import BlueprintManager from brad.config.file import ConfigFile @@ -31,6 +31,8 @@ ], ) +AggCfg = Dict[str, Any] + class MetricsProvider: """ @@ -137,19 +139,19 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: epochs_per_planning_window, ) - agg_type = "mean" + agg_cfg = self._planner_config.metrics_agg() redshift_cpu = self._aggregate_possibly_missing( redshift.loc[redshift.index <= most_recent_common, _REDSHIFT_METRICS[0]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="redshift_cpu", ) txn_per_s = self._aggregate_possibly_missing( front_end.loc[front_end.index <= most_recent_common, _FRONT_END_METRICS[0]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="txn_per_s", ) txn_lat_s_p50 = self._aggregate_possibly_missing( @@ -159,7 +161,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: ], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="txn_lat_s_p50", ) txn_lat_s_p90 = self._aggregate_possibly_missing( @@ -169,7 +171,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: ], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="txn_lat_s_p90", ) @@ -180,36 +182,36 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: aurora_writer_rel[_AURORA_METRICS[0]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="aurora_writer_cpu", ) aurora_reader_cpu = self._aggregate_possibly_missing( aurora_reader_rel[_AURORA_METRICS[0]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="aurora_reader_cpu", ) aurora_writer_load_minute = self._recover_load_value( - aurora_writer_rel, aggregate_epochs, agg_type, "aurora_writer_load_minute" + aurora_writer_rel, aggregate_epochs, agg_cfg, "aurora_writer_load_minute" ) aurora_reader_load_minute = self._recover_load_value( - aurora_reader_rel, aggregate_epochs, agg_type, "aurora_reader_load_minute" + aurora_reader_rel, aggregate_epochs, agg_cfg, "aurora_reader_load_minute" ) aurora_writer_hit_rate_pct = self._aggregate_possibly_missing( aurora_writer_rel[_AURORA_METRICS[2]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="aurora_writer_hit_rate_pct", ) aurora_reader_hit_rate_pct = self._aggregate_possibly_missing( aurora_reader_rel[_AURORA_METRICS[2]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="aurora_reader_hit_rate_pct", ) @@ -234,7 +236,7 @@ def _aggregate_possibly_missing( series: pd.Series, num_epochs: int, default_value: int | float, - agg_type: str, + agg_cfg: AggCfg, name: Optional[str] = None, ) -> int | float: if name is not None and len(series) == 0: @@ -246,9 +248,11 @@ def _aggregate_possibly_missing( return default_value else: relevant = series.iloc[-num_epochs:] - if agg_type == "mean": + if agg_cfg["method"] == "mean": return relevant.mean() - # TODO: Can add other types (e.g., exponentially weighted) + elif agg_cfg["method"] == "ewm": + alpha = agg_cfg["alpha"] + return relevant.ewm(alpha=alpha).mean().iloc[-1] else: raise AssertionError() @@ -256,7 +260,7 @@ def _recover_load_value( self, aurora_rel: pd.DataFrame, num_epochs: int, - agg_type: str, + agg_cfg: AggCfg, metric_name: str, ) -> float: if len(aurora_rel) < 2: @@ -265,7 +269,7 @@ def _recover_load_value( aurora_rel[_AURORA_METRICS[1]], num_epochs=num_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name=metric_name, ) @@ -296,8 +300,11 @@ def _recover_load_value( ) relevant = load_minute[-epochs_to_consider:] - if agg_type == "mean": + if agg_cfg["method"] == "mean": return relevant.mean() + elif agg_cfg["method"] == "ewm": + alpha = agg_cfg["alpha"] + return relevant.ewm(alpha=alpha).mean().iloc[-1] else: # TODO: Can add other types (e.g., exponentially weighted) raise AssertionError()