diff --git a/config/planner.yml b/config/planner.yml index a8ab9a94..90fcc6c9 100644 --- a/config/planner.yml +++ b/config/planner.yml @@ -14,6 +14,10 @@ reinterpret_second_as: 1 # to be accepted. query_dist_change_frac: 0.1 +metrics_agg: + method: ewm # 'mean' is another option + alpha: 0.86466472 # 1 - 1 / e^2 + ### ### Trigger configs. ### diff --git a/src/brad/config/planner.py b/src/brad/config/planner.py index 9cd2474b..77d83c65 100644 --- a/src/brad/config/planner.py +++ b/src/brad/config/planner.py @@ -243,3 +243,6 @@ def redshift_initialize_load_fraction(self) -> float: def aurora_storage_index_multiplier(self) -> float: return float(self._raw["aurora_storage_index_multiplier"]) + + def metrics_agg(self) -> Dict[str, Any]: + return self._raw["metrics_agg"] diff --git a/src/brad/planner/metrics.py b/src/brad/planner/metrics.py index c5402d94..92b0159d 100644 --- a/src/brad/planner/metrics.py +++ b/src/brad/planner/metrics.py @@ -5,7 +5,7 @@ import numpy as np from datetime import datetime from collections import namedtuple -from typing import Tuple, Optional +from typing import Tuple, Optional, Dict, Any from brad.blueprint.manager import BlueprintManager from brad.config.file import ConfigFile @@ -31,6 +31,8 @@ ], ) +AggCfg = Dict[str, Any] + class MetricsProvider: """ @@ -137,19 +139,19 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: epochs_per_planning_window, ) - agg_type = "mean" + agg_cfg = self._planner_config.metrics_agg() redshift_cpu = self._aggregate_possibly_missing( redshift.loc[redshift.index <= most_recent_common, _REDSHIFT_METRICS[0]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="redshift_cpu", ) txn_per_s = self._aggregate_possibly_missing( front_end.loc[front_end.index <= most_recent_common, _FRONT_END_METRICS[0]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="txn_per_s", ) txn_lat_s_p50 = self._aggregate_possibly_missing( @@ -159,7 
+161,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: ], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="txn_lat_s_p50", ) txn_lat_s_p90 = self._aggregate_possibly_missing( @@ -169,7 +171,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: ], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="txn_lat_s_p90", ) @@ -180,36 +182,36 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: aurora_writer_rel[_AURORA_METRICS[0]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="aurora_writer_cpu", ) aurora_reader_cpu = self._aggregate_possibly_missing( aurora_reader_rel[_AURORA_METRICS[0]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="aurora_reader_cpu", ) aurora_writer_load_minute = self._recover_load_value( - aurora_writer_rel, aggregate_epochs, agg_type, "aurora_writer_load_minute" + aurora_writer_rel, aggregate_epochs, agg_cfg, "aurora_writer_load_minute" ) aurora_reader_load_minute = self._recover_load_value( - aurora_reader_rel, aggregate_epochs, agg_type, "aurora_reader_load_minute" + aurora_reader_rel, aggregate_epochs, agg_cfg, "aurora_reader_load_minute" ) aurora_writer_hit_rate_pct = self._aggregate_possibly_missing( aurora_writer_rel[_AURORA_METRICS[2]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="aurora_writer_hit_rate_pct", ) aurora_reader_hit_rate_pct = self._aggregate_possibly_missing( aurora_reader_rel[_AURORA_METRICS[2]], num_epochs=aggregate_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name="aurora_reader_hit_rate_pct", ) @@ -234,7 +236,7 @@ def _aggregate_possibly_missing( series: pd.Series, num_epochs: int, default_value: int | float, - agg_type: str, + agg_cfg: AggCfg, name: Optional[str] = None, ) -> int | float: if name is not None and len(series) == 0: @@ -246,9 +248,11 @@ def 
_aggregate_possibly_missing( return default_value else: relevant = series.iloc[-num_epochs:] - if agg_type == "mean": + if agg_cfg["method"] == "mean": return relevant.mean() - # TODO: Can add other types (e.g., exponentially weighted) + elif agg_cfg["method"] == "ewm": + alpha = agg_cfg["alpha"] + return relevant.ewm(alpha=alpha).mean().iloc[-1] else: raise AssertionError() @@ -256,7 +260,7 @@ def _recover_load_value( self, aurora_rel: pd.DataFrame, num_epochs: int, - agg_type: str, + agg_cfg: AggCfg, metric_name: str, ) -> float: if len(aurora_rel) < 2: @@ -265,7 +269,7 @@ def _recover_load_value( aurora_rel[_AURORA_METRICS[1]], num_epochs=num_epochs, default_value=0.0, - agg_type=agg_type, + agg_cfg=agg_cfg, name=metric_name, ) @@ -296,8 +300,11 @@ def _recover_load_value( ) relevant = load_minute[-epochs_to_consider:] - if agg_type == "mean": + if agg_cfg["method"] == "mean": return relevant.mean() + elif agg_cfg["method"] == "ewm": + alpha = agg_cfg["alpha"] + return pd.Series(relevant).ewm(alpha=alpha).mean().iloc[-1] else: # TODO: Can add other types (e.g., exponentially weighted) raise AssertionError() diff --git a/src/brad/planner/triggers/recent_change.py b/src/brad/planner/triggers/recent_change.py index a45eb140..f6d51a8e 100644 --- a/src/brad/planner/triggers/recent_change.py +++ b/src/brad/planner/triggers/recent_change.py @@ -3,9 +3,11 @@ from datetime import timedelta, datetime from typing import Optional -from brad.config.planner import PlannerConfig from brad.blueprint import Blueprint from brad.blueprint.diff.blueprint import BlueprintDiff +from brad.config.planner import PlannerConfig +from brad.daemon.aurora_metrics import AuroraMetrics +from brad.daemon.redshift_metrics import RedshiftMetrics from brad.planner.scoring.score import Score from .trigger import Trigger @@ -23,6 +25,11 @@ def __init__(self, planner_config: PlannerConfig, epoch_length: timedelta) -> No self._planner_config = planner_config self._is_first_change = True 
self._last_provisioning_change: Optional[datetime] = None + # Metrics sources can be delayed; we do not want to replan before the + # metrics are available in the planning window. + self._metrics_delay = max( + AuroraMetrics.METRICS_DELAY, RedshiftMetrics.METRICS_DELAY + ) async def should_replan(self) -> bool: if self._last_provisioning_change is None: @@ -31,7 +38,7 @@ async def should_replan(self) -> bool: window = self._planner_config.planning_window() now = datetime.now(tz=pytz.utc) - if now >= self._last_provisioning_change + window: + if now > self._last_provisioning_change + window + self._metrics_delay: self._last_provisioning_change = None logger.info("Triggering replan because of a recent provisioning change.") return True