Skip to content

Commit

Permalink
Use exponentially weighted metrics during planning
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy committed Nov 15, 2023
1 parent eb94e94 commit 41381c1
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 18 deletions.
4 changes: 4 additions & 0 deletions config/planner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ reinterpret_second_as: 1
# to be accepted.
query_dist_change_frac: 0.1

metrics_agg:
method: ewm # 'mean' is another option
alpha: 0.86466472 # 1 - 1 / e^2

###
### Trigger configs.
###
Expand Down
3 changes: 3 additions & 0 deletions src/brad/config/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,6 @@ def redshift_initialize_load_fraction(self) -> float:

def aurora_storage_index_multiplier(self) -> float:
return float(self._raw["aurora_storage_index_multiplier"])

def metrics_agg(self) -> Dict[str, Any]:
    """Return the planner's metrics-aggregation settings.

    The mapping comes straight from the raw parsed config (the
    ``metrics_agg`` key, e.g. ``{"method": "ewm", "alpha": ...}``).
    """
    agg_settings: Dict[str, Any] = self._raw["metrics_agg"]
    return agg_settings
43 changes: 25 additions & 18 deletions src/brad/planner/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
from datetime import datetime
from collections import namedtuple
from typing import Tuple, Optional
from typing import Tuple, Optional, Dict, Any

from brad.blueprint.manager import BlueprintManager
from brad.config.file import ConfigFile
Expand All @@ -31,6 +31,8 @@
],
)

AggCfg = Dict[str, Any]


class MetricsProvider:
"""
Expand Down Expand Up @@ -137,19 +139,19 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
epochs_per_planning_window,
)

agg_type = "mean"
agg_cfg = self._planner_config.metrics_agg()
redshift_cpu = self._aggregate_possibly_missing(
redshift.loc[redshift.index <= most_recent_common, _REDSHIFT_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="redshift_cpu",
)
txn_per_s = self._aggregate_possibly_missing(
front_end.loc[front_end.index <= most_recent_common, _FRONT_END_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="txn_per_s",
)
txn_lat_s_p50 = self._aggregate_possibly_missing(
Expand All @@ -159,7 +161,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="txn_lat_s_p50",
)
txn_lat_s_p90 = self._aggregate_possibly_missing(
Expand All @@ -169,7 +171,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="txn_lat_s_p90",
)

Expand All @@ -180,36 +182,36 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
aurora_writer_rel[_AURORA_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_writer_cpu",
)
aurora_reader_cpu = self._aggregate_possibly_missing(
aurora_reader_rel[_AURORA_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_reader_cpu",
)

aurora_writer_load_minute = self._recover_load_value(
aurora_writer_rel, aggregate_epochs, agg_type, "aurora_writer_load_minute"
aurora_writer_rel, aggregate_epochs, agg_cfg, "aurora_writer_load_minute"
)
aurora_reader_load_minute = self._recover_load_value(
aurora_reader_rel, aggregate_epochs, agg_type, "aurora_reader_load_minute"
aurora_reader_rel, aggregate_epochs, agg_cfg, "aurora_reader_load_minute"
)

aurora_writer_hit_rate_pct = self._aggregate_possibly_missing(
aurora_writer_rel[_AURORA_METRICS[2]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_writer_hit_rate_pct",
)
aurora_reader_hit_rate_pct = self._aggregate_possibly_missing(
aurora_reader_rel[_AURORA_METRICS[2]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_reader_hit_rate_pct",
)

Expand All @@ -234,7 +236,7 @@ def _aggregate_possibly_missing(
series: pd.Series,
num_epochs: int,
default_value: int | float,
agg_type: str,
agg_cfg: AggCfg,
name: Optional[str] = None,
) -> int | float:
if name is not None and len(series) == 0:
Expand All @@ -246,17 +248,19 @@ def _aggregate_possibly_missing(
return default_value
else:
relevant = series.iloc[-num_epochs:]
if agg_type == "mean":
if agg_cfg["method"] == "mean":
return relevant.mean()
# TODO: Can add other types (e.g., exponentially weighted)
elif agg_cfg["method"] == "ewm":
alpha = agg_cfg["alpha"]
return relevant.ewm(alpha=alpha).mean().iloc[-1]
else:
raise AssertionError()

def _recover_load_value(
self,
aurora_rel: pd.DataFrame,
num_epochs: int,
agg_type: str,
agg_cfg: AggCfg,
metric_name: str,
) -> float:
if len(aurora_rel) < 2:
Expand All @@ -265,7 +269,7 @@ def _recover_load_value(
aurora_rel[_AURORA_METRICS[1]],
num_epochs=num_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name=metric_name,
)

Expand Down Expand Up @@ -296,8 +300,11 @@ def _recover_load_value(
)
relevant = load_minute[-epochs_to_consider:]

if agg_type == "mean":
if agg_cfg["method"] == "mean":
return relevant.mean()
elif agg_cfg["method"] == "ewm":
alpha = agg_cfg["alpha"]
return relevant.ewm(alpha=alpha).mean().iloc[-1]
else:
# TODO: Can add other types (e.g., exponentially weighted)
raise AssertionError()
Expand Down

0 comments on commit 41381c1

Please sign in to comment.