Skip to content

Commit

Permalink
Improve recent change trigger, use exponentially weighted metrics (#371)
Browse files Browse the repository at this point in the history
* Prevent RecentChange from firing too soon after a replan, due to metrics delays

* Use exponentially weighted metrics during planning
  • Loading branch information
geoffxy authored Nov 15, 2023
1 parent a8c8e15 commit 6da97f9
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 20 deletions.
4 changes: 4 additions & 0 deletions config/planner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ reinterpret_second_as: 1
# to be accepted.
query_dist_change_frac: 0.1

metrics_agg:
method: ewm # 'mean' is another option
alpha: 0.86466472 # 1 - 1 / e^2

###
### Trigger configs.
###
Expand Down
3 changes: 3 additions & 0 deletions src/brad/config/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,6 @@ def redshift_initialize_load_fraction(self) -> float:

def aurora_storage_index_multiplier(self) -> float:
    """Multiplier applied to Aurora storage estimates to account for index space.

    Reads the `aurora_storage_index_multiplier` entry from the raw planner
    config and coerces it to a float (the YAML value may be int or str).
    """
    raw_value = self._raw["aurora_storage_index_multiplier"]
    return float(raw_value)

def metrics_agg(self) -> Dict[str, Any]:
    """Metrics-aggregation settings used during planning.

    Returns the raw `metrics_agg` mapping from the planner config
    (e.g. {"method": "ewm", "alpha": ...} or {"method": "mean"}).
    """
    agg_config: Dict[str, Any] = self._raw["metrics_agg"]
    return agg_config
43 changes: 25 additions & 18 deletions src/brad/planner/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
from datetime import datetime
from collections import namedtuple
from typing import Tuple, Optional
from typing import Tuple, Optional, Dict, Any

from brad.blueprint.manager import BlueprintManager
from brad.config.file import ConfigFile
Expand All @@ -31,6 +31,8 @@
],
)

AggCfg = Dict[str, Any]


class MetricsProvider:
"""
Expand Down Expand Up @@ -137,19 +139,19 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
epochs_per_planning_window,
)

agg_type = "mean"
agg_cfg = self._planner_config.metrics_agg()
redshift_cpu = self._aggregate_possibly_missing(
redshift.loc[redshift.index <= most_recent_common, _REDSHIFT_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="redshift_cpu",
)
txn_per_s = self._aggregate_possibly_missing(
front_end.loc[front_end.index <= most_recent_common, _FRONT_END_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="txn_per_s",
)
txn_lat_s_p50 = self._aggregate_possibly_missing(
Expand All @@ -159,7 +161,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="txn_lat_s_p50",
)
txn_lat_s_p90 = self._aggregate_possibly_missing(
Expand All @@ -169,7 +171,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="txn_lat_s_p90",
)

Expand All @@ -180,36 +182,36 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
aurora_writer_rel[_AURORA_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_writer_cpu",
)
aurora_reader_cpu = self._aggregate_possibly_missing(
aurora_reader_rel[_AURORA_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_reader_cpu",
)

aurora_writer_load_minute = self._recover_load_value(
aurora_writer_rel, aggregate_epochs, agg_type, "aurora_writer_load_minute"
aurora_writer_rel, aggregate_epochs, agg_cfg, "aurora_writer_load_minute"
)
aurora_reader_load_minute = self._recover_load_value(
aurora_reader_rel, aggregate_epochs, agg_type, "aurora_reader_load_minute"
aurora_reader_rel, aggregate_epochs, agg_cfg, "aurora_reader_load_minute"
)

aurora_writer_hit_rate_pct = self._aggregate_possibly_missing(
aurora_writer_rel[_AURORA_METRICS[2]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_writer_hit_rate_pct",
)
aurora_reader_hit_rate_pct = self._aggregate_possibly_missing(
aurora_reader_rel[_AURORA_METRICS[2]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_reader_hit_rate_pct",
)

Expand All @@ -234,7 +236,7 @@ def _aggregate_possibly_missing(
series: pd.Series,
num_epochs: int,
default_value: int | float,
agg_type: str,
agg_cfg: AggCfg,
name: Optional[str] = None,
) -> int | float:
if name is not None and len(series) == 0:
Expand All @@ -246,17 +248,19 @@ def _aggregate_possibly_missing(
return default_value
else:
relevant = series.iloc[-num_epochs:]
if agg_type == "mean":
if agg_cfg["method"] == "mean":
return relevant.mean()
# TODO: Can add other types (e.g., exponentially weighted)
elif agg_cfg["method"] == "ewm":
alpha = agg_cfg["alpha"]
return relevant.ewm(alpha=alpha).mean().iloc[-1]
else:
raise AssertionError()

def _recover_load_value(
self,
aurora_rel: pd.DataFrame,
num_epochs: int,
agg_type: str,
agg_cfg: AggCfg,
metric_name: str,
) -> float:
if len(aurora_rel) < 2:
Expand All @@ -265,7 +269,7 @@ def _recover_load_value(
aurora_rel[_AURORA_METRICS[1]],
num_epochs=num_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name=metric_name,
)

Expand Down Expand Up @@ -296,8 +300,11 @@ def _recover_load_value(
)
relevant = load_minute[-epochs_to_consider:]

if agg_type == "mean":
if agg_cfg["method"] == "mean":
return relevant.mean()
elif agg_cfg["method"] == "ewm":
alpha = agg_cfg["alpha"]
return relevant.ewm(alpha=alpha).mean().iloc[-1]
else:
# TODO: Can add other types (e.g., exponentially weighted)
raise AssertionError()
Expand Down
11 changes: 9 additions & 2 deletions src/brad/planner/triggers/recent_change.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from datetime import timedelta, datetime
from typing import Optional

from brad.config.planner import PlannerConfig
from brad.blueprint import Blueprint
from brad.blueprint.diff.blueprint import BlueprintDiff
from brad.config.planner import PlannerConfig
from brad.daemon.aurora_metrics import AuroraMetrics
from brad.daemon.redshift_metrics import RedshiftMetrics
from brad.planner.scoring.score import Score

from .trigger import Trigger
Expand All @@ -23,6 +25,11 @@ def __init__(self, planner_config: PlannerConfig, epoch_length: timedelta) -> No
self._planner_config = planner_config
self._is_first_change = True
self._last_provisioning_change: Optional[datetime] = None
# Metrics sources can be delayed; we do not want to replan before the
# metrics are available in the planning window.
self._metrics_delay = max(
AuroraMetrics.METRICS_DELAY, RedshiftMetrics.METRICS_DELAY
)

async def should_replan(self) -> bool:
if self._last_provisioning_change is None:
Expand All @@ -31,7 +38,7 @@ async def should_replan(self) -> bool:
window = self._planner_config.planning_window()
now = datetime.now(tz=pytz.utc)

if now >= self._last_provisioning_change + window:
if now > self._last_provisioning_change + window + self._metrics_delay:
self._last_provisioning_change = None
logger.info("Triggering replan because of a recent provisioning change.")
return True
Expand Down

0 comments on commit 6da97f9

Please sign in to comment.