Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve recent change trigger, use exponentially weighted metrics #371

Merged
merged 2 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/planner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ reinterpret_second_as: 1
# to be accepted.
query_dist_change_frac: 0.1

metrics_agg:
method: ewm # 'mean' is another option
alpha: 0.86466472 # 1 - 1 / e^2

###
### Trigger configs.
###
Expand Down
3 changes: 3 additions & 0 deletions src/brad/config/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,6 @@ def redshift_initialize_load_fraction(self) -> float:

def aurora_storage_index_multiplier(self) -> float:
    """Return the configured ``aurora_storage_index_multiplier`` as a float.

    Reads the value from the raw planner config mapping and coerces it,
    so string-typed YAML values are accepted as well.
    """
    raw_value = self._raw["aurora_storage_index_multiplier"]
    return float(raw_value)

def metrics_agg(self) -> Dict[str, Any]:
    """Return the raw ``metrics_agg`` section of the planner config.

    The mapping holds the aggregation method (e.g. ``mean`` or ``ewm``)
    plus any method-specific parameters; it is returned as-is.
    """
    agg_config: Dict[str, Any] = self._raw["metrics_agg"]
    return agg_config
43 changes: 25 additions & 18 deletions src/brad/planner/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
from datetime import datetime
from collections import namedtuple
from typing import Tuple, Optional
from typing import Tuple, Optional, Dict, Any

from brad.blueprint.manager import BlueprintManager
from brad.config.file import ConfigFile
Expand All @@ -31,6 +31,8 @@
],
)

AggCfg = Dict[str, Any]


class MetricsProvider:
"""
Expand Down Expand Up @@ -137,19 +139,19 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
epochs_per_planning_window,
)

agg_type = "mean"
agg_cfg = self._planner_config.metrics_agg()
redshift_cpu = self._aggregate_possibly_missing(
redshift.loc[redshift.index <= most_recent_common, _REDSHIFT_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="redshift_cpu",
)
txn_per_s = self._aggregate_possibly_missing(
front_end.loc[front_end.index <= most_recent_common, _FRONT_END_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="txn_per_s",
)
txn_lat_s_p50 = self._aggregate_possibly_missing(
Expand All @@ -159,7 +161,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="txn_lat_s_p50",
)
txn_lat_s_p90 = self._aggregate_possibly_missing(
Expand All @@ -169,7 +171,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="txn_lat_s_p90",
)

Expand All @@ -180,36 +182,36 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
aurora_writer_rel[_AURORA_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_writer_cpu",
)
aurora_reader_cpu = self._aggregate_possibly_missing(
aurora_reader_rel[_AURORA_METRICS[0]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_reader_cpu",
)

aurora_writer_load_minute = self._recover_load_value(
aurora_writer_rel, aggregate_epochs, agg_type, "aurora_writer_load_minute"
aurora_writer_rel, aggregate_epochs, agg_cfg, "aurora_writer_load_minute"
)
aurora_reader_load_minute = self._recover_load_value(
aurora_reader_rel, aggregate_epochs, agg_type, "aurora_reader_load_minute"
aurora_reader_rel, aggregate_epochs, agg_cfg, "aurora_reader_load_minute"
)

aurora_writer_hit_rate_pct = self._aggregate_possibly_missing(
aurora_writer_rel[_AURORA_METRICS[2]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_writer_hit_rate_pct",
)
aurora_reader_hit_rate_pct = self._aggregate_possibly_missing(
aurora_reader_rel[_AURORA_METRICS[2]],
num_epochs=aggregate_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name="aurora_reader_hit_rate_pct",
)

Expand All @@ -234,7 +236,7 @@ def _aggregate_possibly_missing(
series: pd.Series,
num_epochs: int,
default_value: int | float,
agg_type: str,
agg_cfg: AggCfg,
name: Optional[str] = None,
) -> int | float:
if name is not None and len(series) == 0:
Expand All @@ -246,17 +248,19 @@ def _aggregate_possibly_missing(
return default_value
else:
relevant = series.iloc[-num_epochs:]
if agg_type == "mean":
if agg_cfg["method"] == "mean":
return relevant.mean()
# TODO: Can add other types (e.g., exponentially weighted)
elif agg_cfg["method"] == "ewm":
alpha = agg_cfg["alpha"]
return relevant.ewm(alpha=alpha).mean().iloc[-1]
else:
raise AssertionError()

def _recover_load_value(
self,
aurora_rel: pd.DataFrame,
num_epochs: int,
agg_type: str,
agg_cfg: AggCfg,
metric_name: str,
) -> float:
if len(aurora_rel) < 2:
Expand All @@ -265,7 +269,7 @@ def _recover_load_value(
aurora_rel[_AURORA_METRICS[1]],
num_epochs=num_epochs,
default_value=0.0,
agg_type=agg_type,
agg_cfg=agg_cfg,
name=metric_name,
)

Expand Down Expand Up @@ -296,8 +300,11 @@ def _recover_load_value(
)
relevant = load_minute[-epochs_to_consider:]

if agg_type == "mean":
if agg_cfg["method"] == "mean":
return relevant.mean()
elif agg_cfg["method"] == "ewm":
alpha = agg_cfg["alpha"]
return relevant.ewm(alpha=alpha).mean().iloc[-1]
else:
# TODO: Can add other types (e.g., exponentially weighted)
raise AssertionError()
Expand Down
11 changes: 9 additions & 2 deletions src/brad/planner/triggers/recent_change.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from datetime import timedelta, datetime
from typing import Optional

from brad.config.planner import PlannerConfig
from brad.blueprint import Blueprint
from brad.blueprint.diff.blueprint import BlueprintDiff
from brad.config.planner import PlannerConfig
from brad.daemon.aurora_metrics import AuroraMetrics
from brad.daemon.redshift_metrics import RedshiftMetrics
from brad.planner.scoring.score import Score

from .trigger import Trigger
Expand All @@ -23,6 +25,11 @@ def __init__(self, planner_config: PlannerConfig, epoch_length: timedelta) -> No
self._planner_config = planner_config
self._is_first_change = True
self._last_provisioning_change: Optional[datetime] = None
# Metrics sources can be delayed; we do not want to replan before the
# metrics are available in the planning window.
self._metrics_delay = max(
AuroraMetrics.METRICS_DELAY, RedshiftMetrics.METRICS_DELAY
)

async def should_replan(self) -> bool:
if self._last_provisioning_change is None:
Expand All @@ -31,7 +38,7 @@ async def should_replan(self) -> bool:
window = self._planner_config.planning_window()
now = datetime.now(tz=pytz.utc)

if now >= self._last_provisioning_change + window:
if now > self._last_provisioning_change + window + self._metrics_delay:
self._last_provisioning_change = None
logger.info("Triggering replan because of a recent provisioning change.")
return True
Expand Down
Loading