Skip to content

Commit

Permalink
Prevent triggers from firing immediately after a replan (#342)
Browse files Browse the repository at this point in the history
* Prevent triggers from firing immediately after a replan

* Remove unneeded test

* Add units to measurements

* Handle no queries executed edge case

* Add progress logging
  • Loading branch information
geoffxy authored Nov 1, 2023
1 parent 5ce98a0 commit e0fc10e
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 34 deletions.
20 changes: 17 additions & 3 deletions src/brad/planner/beam/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,27 @@ def get_beam_triggers(
et_config = trigger_config["elapsed_time"]
if "disabled" not in et_config:
trigger_list.append(
ElapsedTimeTrigger(planning_window * et_config["multiplier"])
ElapsedTimeTrigger(
planning_window * et_config["multiplier"],
epoch_length=config.epoch_length,
)
)

aurora_cpu = trigger_config["aurora_cpu"]
if "disabled" not in aurora_cpu:
trigger_list.append(AuroraCpuUtilization(monitor, **aurora_cpu))
trigger_list.append(
AuroraCpuUtilization(
monitor, epoch_length=config.epoch_length, **aurora_cpu
)
)

redshift_cpu = trigger_config["redshift_cpu"]
if "disabled" not in redshift_cpu:
trigger_list.append(RedshiftCpuUtilization(monitor, **redshift_cpu))
trigger_list.append(
RedshiftCpuUtilization(
monitor, epoch_length=config.epoch_length, **redshift_cpu
)
)

var_costs = trigger_config["variable_costs"]
if "disabled" not in var_costs:
Expand All @@ -52,6 +63,7 @@ def get_beam_triggers(
data_access_provider,
router_provider,
var_costs["threshold"],
config.epoch_length,
)
)

Expand All @@ -62,6 +74,7 @@ def get_beam_triggers(
monitor,
latency_ceiling["ceiling_s"],
latency_ceiling["sustained_epochs"],
config.epoch_length,
)
)

Expand All @@ -72,6 +85,7 @@ def get_beam_triggers(
monitor,
latency_ceiling["ceiling_s"],
latency_ceiling["sustained_epochs"],
config.epoch_length,
)
)

Expand Down
24 changes: 21 additions & 3 deletions src/brad/planner/scoring/performance/unified_aurora.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ def compute(

current_aurora_has_replicas = curr_prov.num_nodes() > 1
next_aurora_has_replicas = next_prov.num_nodes() > 1
no_analytics_queries_executed = (
len(ctx.current_query_locations[Engine.Aurora]) == 0
)

overall_writer_load = ctx.metrics.aurora_writer_load_minute_avg
overall_writer_cpu_util_pct = ctx.metrics.aurora_writer_cpu_avg
Expand All @@ -67,9 +70,10 @@ def compute(
)

# 1. Compute the transaction portion of load.
if current_aurora_has_replicas:
# We schedule all analytics on the read replica(s). So the metric
# values on the writer are due to the transactional workload.
if current_aurora_has_replicas or no_analytics_queries_executed:
# We schedule all analytics on the read replica(s) or no queries
# were routed to Aurora. So the metric values on the writer are due
# to the transactional workload.
pred_txn_load = overall_writer_load
pred_txn_cpu_denorm = overall_writer_cpu_util_denorm
else:
Expand Down Expand Up @@ -146,6 +150,12 @@ def compute(
* curr_num_read_replicas
)

elif no_analytics_queries_executed:
# This is a special case: no queries executed on Aurora and there
# was no read replica.
total_analytics_load = 0.0
total_analytics_cpu_denorm = 0.0

else:
# Analytics load should never be zero - so we impute a small value
# to work around mispredictions.
Expand Down Expand Up @@ -225,6 +235,14 @@ def query_latency_load_resources(
overall_load: float,
ctx: "ScoringContext",
) -> npt.NDArray:
# Special case:
if (
overall_load == 0.0
and aurora_num_cpus(to_prov) == _AURORA_BASE_RESOURCE_VALUE
):
# No changes.
return base_predicted_latency

# This method is used to compute the predicted query latencies.
resource_factor = _AURORA_BASE_RESOURCE_VALUE / aurora_num_cpus(to_prov)
basis = np.array(
Expand Down
16 changes: 13 additions & 3 deletions src/brad/planner/triggers/aurora_cpu_utilization.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import timedelta
from typing import Optional

from .metrics_thresholds import MetricsThresholds
Expand All @@ -14,12 +15,14 @@ def __init__(
monitor: Monitor,
lo: float,
hi: float,
epoch_length: timedelta,
sustained_epochs: int = 1,
lookahead_epochs: Optional[int] = None,
) -> None:
super().__init__()
super().__init__(epoch_length)
self._monitor = monitor
self._impl = MetricsThresholds(lo, hi, sustained_epochs)
self._epoch_length = epoch_length
self._sustained_epochs = sustained_epochs
self._lookahead_epochs = lookahead_epochs

Expand All @@ -39,23 +42,30 @@ async def should_replan(self) -> bool:
past = self._monitor.aurora_writer_metrics().read_k_most_recent(
k=self._sustained_epochs, metric_ids=[_UTILIZATION_METRIC]
)
relevant = past[past.index > self._cutoff]
if self._impl.exceeds_thresholds(
past[_UTILIZATION_METRIC], "Aurora writer CPU utilization"
relevant[_UTILIZATION_METRIC], "Aurora writer CPU utilization"
):
return True

for idx, reader_metrics in enumerate(self._monitor.aurora_reader_metrics()):
past = reader_metrics.read_k_most_recent(
k=self._sustained_epochs, metric_ids=[_UTILIZATION_METRIC]
)
relevant = past[past.index > self._cutoff]
if self._impl.exceeds_thresholds(
past[_UTILIZATION_METRIC], f"Aurora reader {idx} CPU utilization"
relevant[_UTILIZATION_METRIC], f"Aurora reader {idx} CPU utilization"
):
return True

if self._lookahead_epochs is None:
return False

if not self._passed_n_epochs_since_cutoff(self._sustained_epochs):
# We do not trigger based on a forecast if `sustained_epochs` has
# not passed since the last cutoff.
return False

future = self._monitor.aurora_writer_metrics().read_k_upcoming(
k=self._lookahead_epochs, metric_ids=[_UTILIZATION_METRIC]
)
Expand Down
14 changes: 11 additions & 3 deletions src/brad/planner/triggers/elapsed_time.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import logging
from datetime import timedelta, datetime
from typing import Optional

from brad.blueprint import Blueprint
from brad.planner.scoring.score import Score

from .trigger import Trigger

logger = logging.getLogger(__name__)


class ElapsedTimeTrigger(Trigger):
def __init__(self, period: timedelta) -> None:
super().__init__()
def __init__(self, period: timedelta, epoch_length: timedelta) -> None:
super().__init__(epoch_length)
self._period = period
self._reset_trigger_next()

Expand All @@ -22,5 +26,9 @@ async def should_replan(self) -> bool:
return True
return False

def update_blueprint(self, blueprint: Blueprint, score: Optional[Score]) -> None:
    """
    Record the new blueprint and score, then restart the elapsed-time
    countdown.

    The parent implementation runs first; the next trigger time is
    recomputed only after it returns.
    """
    super().update_blueprint(blueprint, score)
    self._reset_trigger_next()

def _reset_trigger_next(self) -> None:
self._trigger_next = datetime.now() + self._period
self._trigger_next = self._cutoff + self._period
1 change: 1 addition & 0 deletions src/brad/planner/triggers/metrics_thresholds.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def exceeds_thresholds(self, metric_values: pd.Series, log_desc: str) -> bool:
rel = metric_values[-self._sustained_epochs :]
if len(rel) < self._sustained_epochs:
# Not enough data.
logger.debug("Not enough data for trigger based on %s.", log_desc)
return False

if (rel < self._lo).all():
Expand Down
10 changes: 8 additions & 2 deletions src/brad/planner/triggers/query_latency_ceiling.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from typing import Optional
from datetime import timedelta

from brad.config.metrics import FrontEndMetric
from brad.daemon.monitor import Monitor
Expand All @@ -14,9 +15,10 @@ def __init__(
monitor: Monitor,
latency_ceiling_s: float,
sustained_epochs: int,
epoch_length: timedelta,
lookahead_epochs: Optional[int] = None,
) -> None:
super().__init__()
super().__init__(epoch_length)
self._monitor = monitor
self._latency_ceiling_s = latency_ceiling_s
self._sustained_epochs = sustained_epochs
Expand All @@ -27,7 +29,8 @@ async def should_replan(self) -> bool:
k=self._sustained_epochs,
metric_ids=[FrontEndMetric.QueryLatencySecondP90.value],
)
rel = past[FrontEndMetric.QueryLatencySecondP90.value]
rel_past = past[past.index > self._cutoff]
rel = rel_past[FrontEndMetric.QueryLatencySecondP90.value]
if len(rel) >= self._sustained_epochs and (rel > self._latency_ceiling_s).all():
p90_lat_s = rel.iloc[-1]
logger.info(
Expand All @@ -40,6 +43,9 @@ async def should_replan(self) -> bool:
if self._lookahead_epochs is None:
return False

if not self._passed_n_epochs_since_cutoff(self._sustained_epochs):
return False

future = self._monitor.front_end_metrics().read_k_upcoming(
k=self._lookahead_epochs,
metric_ids=[FrontEndMetric.QueryLatencySecondP90.value],
Expand Down
12 changes: 10 additions & 2 deletions src/brad/planner/triggers/redshift_cpu_utilization.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import timedelta
from typing import Optional

from .metrics_thresholds import MetricsThresholds
Expand All @@ -14,10 +15,11 @@ def __init__(
monitor: Monitor,
lo: float,
hi: float,
epoch_length: timedelta,
sustained_epochs: int = 1,
lookahead_epochs: Optional[int] = None,
) -> None:
super().__init__()
super().__init__(epoch_length)
self._monitor = monitor
self._impl = MetricsThresholds(lo, hi, sustained_epochs)
self._sustained_epochs = sustained_epochs
Expand All @@ -39,14 +41,20 @@ async def should_replan(self) -> bool:
past = self._monitor.redshift_metrics().read_k_most_recent(
k=self._sustained_epochs, metric_ids=[_UTILIZATION_METRIC]
)
relevant = past[past.index > self._cutoff]
if self._impl.exceeds_thresholds(
past[_UTILIZATION_METRIC], "Redshift CPU utilization"
relevant[_UTILIZATION_METRIC], "Redshift CPU utilization"
):
return True

if self._lookahead_epochs is None:
return False

if not self._passed_n_epochs_since_cutoff(self._sustained_epochs):
# We do not trigger based on a forecast if `sustained_epochs` has
# not passed since the last cutoff.
return False

future = self._monitor.redshift_metrics().read_k_upcoming(
k=self._lookahead_epochs, metric_ids=[_UTILIZATION_METRIC]
)
Expand Down
16 changes: 15 additions & 1 deletion src/brad/planner/triggers/trigger.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import pytz
from typing import Optional
from datetime import datetime, timedelta
from brad.blueprint import Blueprint
from brad.planner.scoring.score import Score


class Trigger:
def __init__(self) -> None:
def __init__(self, epoch_length: timedelta) -> None:
self._current_blueprint: Optional[Blueprint] = None
self._current_score: Optional[Score] = None
self._epoch_length = epoch_length
self._reset_cutoff()

async def should_replan(self) -> bool:
"""
Expand All @@ -18,9 +22,19 @@ async def should_replan(self) -> bool:
def update_blueprint(self, blueprint: Blueprint, score: Optional[Score]) -> None:
    """
    Store the newly active blueprint and its score, and reset the cutoff.

    The cutoff reset is used by triggers that want to avoid firing
    immediately after a blueprint transition.
    """
    self._current_blueprint, self._current_score = blueprint, score
    self._reset_cutoff()

def name(self) -> str:
    """
    Return the trigger's name, which is the name of its concrete class.
    """
    trigger_cls = self.__class__
    return trigger_cls.__name__

def _reset_cutoff(self) -> None:
    """
    Set the cutoff to the current time as a timezone-aware UTC datetime.
    """
    now_utc = datetime.now(tz=pytz.utc)
    self._cutoff = now_utc

def _passed_n_epochs_since_cutoff(self, n: int) -> bool:
    """
    Return True when at least `n` epochs have elapsed since the cutoff.

    The elapsed time is measured from `self._cutoff` to the current UTC
    time and compared against `n * self._epoch_length`.
    """
    time_since_cutoff = datetime.now(tz=pytz.utc) - self._cutoff
    required_wait = n * self._epoch_length
    # Equivalent to `time_since_cutoff >= required_wait`.
    return not (time_since_cutoff < required_wait)
10 changes: 8 additions & 2 deletions src/brad/planner/triggers/txn_latency_ceiling.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import timedelta
from typing import Optional

from brad.config.metrics import FrontEndMetric
Expand All @@ -14,9 +15,10 @@ def __init__(
monitor: Monitor,
latency_ceiling_s: float,
sustained_epochs: int,
epoch_length: timedelta,
lookahead_epochs: Optional[int] = None,
) -> None:
super().__init__()
super().__init__(epoch_length)
self._monitor = monitor
self._latency_ceiling_s = latency_ceiling_s
self._sustained_epochs = sustained_epochs
Expand All @@ -27,7 +29,8 @@ async def should_replan(self) -> bool:
k=self._sustained_epochs,
metric_ids=[FrontEndMetric.TxnLatencySecondP90.value],
)
rel = past[FrontEndMetric.TxnLatencySecondP90.value]
rel_past = past[past.index > self._cutoff]
rel = rel_past[FrontEndMetric.TxnLatencySecondP90.value]
if len(rel) >= self._sustained_epochs and (rel > self._latency_ceiling_s).all():
p90_lat_s = rel.iloc[-1]
logger.info(
Expand All @@ -40,6 +43,9 @@ async def should_replan(self) -> bool:
if self._lookahead_epochs is None:
return False

if not self._passed_n_epochs_since_cutoff(self._sustained_epochs):
return False

future = self._monitor.front_end_metrics().read_k_upcoming(
k=self._lookahead_epochs,
metric_ids=[FrontEndMetric.TxnLatencySecondP90.value],
Expand Down
3 changes: 2 additions & 1 deletion src/brad/planner/triggers/variable_costs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(
data_access_provider: DataAccessProvider,
router_provider: RouterProvider,
threshold_frac: float,
epoch_length: timedelta,
) -> None:
"""
This will trigger a replan if the current variable costs (currently,
Expand All @@ -42,7 +43,7 @@ def __init__(
For example, if `threshold_frac` is 0.2, then replanning is triggered if
the estimated cost is +/- 20% of the previously estimated cost.
"""
super().__init__()
super().__init__(epoch_length)
self._config = config
self._planner_config = planner_config
self._monitor = monitor
Expand Down
11 changes: 0 additions & 11 deletions tests/test_workload.py

This file was deleted.

3 changes: 1 addition & 2 deletions tools/run-tests.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#! /bin/bash

set -e
pytest \
--ignore=tests/test_workload.py
pytest
Loading

0 comments on commit e0fc10e

Please sign in to comment.