diff --git a/src/brad/planner/beam/triggers.py b/src/brad/planner/beam/triggers.py index 90de0cb1..078b8c0c 100644 --- a/src/brad/planner/beam/triggers.py +++ b/src/brad/planner/beam/triggers.py @@ -31,16 +31,27 @@ def get_beam_triggers( et_config = trigger_config["elapsed_time"] if "disabled" not in et_config: trigger_list.append( - ElapsedTimeTrigger(planning_window * et_config["multiplier"]) + ElapsedTimeTrigger( + planning_window * et_config["multiplier"], + epoch_length=config.epoch_length, + ) ) aurora_cpu = trigger_config["aurora_cpu"] if "disabled" not in aurora_cpu: - trigger_list.append(AuroraCpuUtilization(monitor, **aurora_cpu)) + trigger_list.append( + AuroraCpuUtilization( + monitor, epoch_length=config.epoch_length, **aurora_cpu + ) + ) redshift_cpu = trigger_config["redshift_cpu"] if "disabled" not in redshift_cpu: - trigger_list.append(RedshiftCpuUtilization(monitor, **redshift_cpu)) + trigger_list.append( + RedshiftCpuUtilization( + monitor, epoch_length=config.epoch_length, **redshift_cpu + ) + ) var_costs = trigger_config["variable_costs"] if "disabled" not in var_costs: @@ -52,6 +63,7 @@ def get_beam_triggers( data_access_provider, router_provider, var_costs["threshold"], + config.epoch_length, ) ) @@ -62,6 +74,7 @@ def get_beam_triggers( monitor, latency_ceiling["ceiling_s"], latency_ceiling["sustained_epochs"], + config.epoch_length, ) ) @@ -72,6 +85,7 @@ def get_beam_triggers( monitor, latency_ceiling["ceiling_s"], latency_ceiling["sustained_epochs"], + config.epoch_length, ) ) diff --git a/src/brad/planner/scoring/performance/unified_aurora.py b/src/brad/planner/scoring/performance/unified_aurora.py index 586ce516..18315e89 100644 --- a/src/brad/planner/scoring/performance/unified_aurora.py +++ b/src/brad/planner/scoring/performance/unified_aurora.py @@ -58,6 +58,9 @@ def compute( current_aurora_has_replicas = curr_prov.num_nodes() > 1 next_aurora_has_replicas = next_prov.num_nodes() > 1 + no_analytics_queries_executed = ( + len(ctx.current_query_locations[Engine.Aurora]) == 0 + ) overall_writer_load = ctx.metrics.aurora_writer_load_minute_avg overall_writer_cpu_util_pct = ctx.metrics.aurora_writer_cpu_avg @@ -67,9 +70,10 @@ def compute( ) # 1. Compute the transaction portion of load. - if current_aurora_has_replicas: - # We schedule all analytics on the read replica(s). So the metric - # values on the writer are due to the transactional workload. + if current_aurora_has_replicas or no_analytics_queries_executed: + # We schedule all analytics on the read replica(s) or no queries + # were routed to Aurora. So the metric values on the writer are due + # to the transactional workload. pred_txn_load = overall_writer_load pred_txn_cpu_denorm = overall_writer_cpu_util_denorm else: @@ -146,6 +150,12 @@ def compute( * curr_num_read_replicas ) + elif no_analytics_queries_executed: + # This is a special case: no queries executed on Aurora and there + # was no read replica. + total_analytics_load = 0.0 + total_analytics_cpu_denorm = 0.0 + else: # Analytics load should never be zero - so we impute a small value # to work around mispredictions. @@ -225,6 +235,14 @@ def query_latency_load_resources( overall_load: float, ctx: "ScoringContext", ) -> npt.NDArray: + # Special case: + if ( + overall_load == 0.0 + and aurora_num_cpus(to_prov) == _AURORA_BASE_RESOURCE_VALUE + ): + # No changes. + return base_predicted_latency + # This method is used to compute the predicted query latencies. resource_factor = _AURORA_BASE_RESOURCE_VALUE / aurora_num_cpus(to_prov) basis = np.array( diff --git a/src/brad/planner/triggers/aurora_cpu_utilization.py b/src/brad/planner/triggers/aurora_cpu_utilization.py index d91b1820..1b5d36eb 100644 --- a/src/brad/planner/triggers/aurora_cpu_utilization.py +++ b/src/brad/planner/triggers/aurora_cpu_utilization.py @@ -1,4 +1,5 @@ import logging +from datetime import timedelta from typing import Optional from .metrics_thresholds import MetricsThresholds @@ -14,12 +15,14 @@ def __init__( monitor: Monitor, lo: float, hi: float, + epoch_length: timedelta, sustained_epochs: int = 1, lookahead_epochs: Optional[int] = None, ) -> None: - super().__init__() + super().__init__(epoch_length) self._monitor = monitor self._impl = MetricsThresholds(lo, hi, sustained_epochs) + self._epoch_length = epoch_length self._sustained_epochs = sustained_epochs self._lookahead_epochs = lookahead_epochs @@ -39,8 +42,9 @@ async def should_replan(self) -> bool: past = self._monitor.aurora_writer_metrics().read_k_most_recent( k=self._sustained_epochs, metric_ids=[_UTILIZATION_METRIC] ) + relevant = past[past.index > self._cutoff] if self._impl.exceeds_thresholds( - past[_UTILIZATION_METRIC], "Aurora writer CPU utilization" + relevant[_UTILIZATION_METRIC], "Aurora writer CPU utilization" ): return True @@ -48,14 +52,20 @@ async def should_replan(self) -> bool: past = reader_metrics.read_k_most_recent( k=self._sustained_epochs, metric_ids=[_UTILIZATION_METRIC] ) + relevant = past[past.index > self._cutoff] if self._impl.exceeds_thresholds( - past[_UTILIZATION_METRIC], f"Aurora reader {idx} CPU utilization" + relevant[_UTILIZATION_METRIC], f"Aurora reader {idx} CPU utilization" ): return True if self._lookahead_epochs is None: return False + if not self._passed_n_epochs_since_cutoff(self._sustained_epochs): + # We do not trigger based on a forecast if `sustained_epochs` has + # not passed since the last cutoff. + return False + future = self._monitor.aurora_writer_metrics().read_k_upcoming( k=self._lookahead_epochs, metric_ids=[_UTILIZATION_METRIC] ) diff --git a/src/brad/planner/triggers/elapsed_time.py b/src/brad/planner/triggers/elapsed_time.py index 7f9a4d95..aff2a6f5 100644 --- a/src/brad/planner/triggers/elapsed_time.py +++ b/src/brad/planner/triggers/elapsed_time.py @@ -1,5 +1,9 @@ import logging from datetime import timedelta, datetime +from typing import Optional + +from brad.blueprint import Blueprint +from brad.planner.scoring.score import Score from .trigger import Trigger @@ -7,8 +11,8 @@ class ElapsedTimeTrigger(Trigger): - def __init__(self, period: timedelta) -> None: - super().__init__() + def __init__(self, period: timedelta, epoch_length: timedelta) -> None: + super().__init__(epoch_length) self._period = period self._reset_trigger_next() @@ -22,5 +26,9 @@ async def should_replan(self) -> bool: return True return False + def update_blueprint(self, blueprint: Blueprint, score: Optional[Score]) -> None: + super().update_blueprint(blueprint, score) + self._reset_trigger_next() + def _reset_trigger_next(self) -> None: - self._trigger_next = datetime.now() + self._period + self._trigger_next = self._cutoff + self._period diff --git a/src/brad/planner/triggers/metrics_thresholds.py b/src/brad/planner/triggers/metrics_thresholds.py index 83595694..cc976f37 100644 --- a/src/brad/planner/triggers/metrics_thresholds.py +++ b/src/brad/planner/triggers/metrics_thresholds.py @@ -19,6 +19,7 @@ def exceeds_thresholds(self, metric_values: pd.Series, log_desc: str) -> bool: rel = metric_values[-self._sustained_epochs :] if len(rel) < self._sustained_epochs: # Not enough data. + logger.debug("Not enough data for trigger based on %s.", log_desc) return False if (rel < self._lo).all(): diff --git a/src/brad/planner/triggers/query_latency_ceiling.py b/src/brad/planner/triggers/query_latency_ceiling.py index 9121a19c..c5b6ac2a 100644 --- a/src/brad/planner/triggers/query_latency_ceiling.py +++ b/src/brad/planner/triggers/query_latency_ceiling.py @@ -1,5 +1,6 @@ import logging from typing import Optional +from datetime import timedelta from brad.config.metrics import FrontEndMetric from brad.daemon.monitor import Monitor @@ -14,9 +15,10 @@ def __init__( monitor: Monitor, latency_ceiling_s: float, sustained_epochs: int, + epoch_length: timedelta, lookahead_epochs: Optional[int] = None, ) -> None: - super().__init__() + super().__init__(epoch_length) self._monitor = monitor self._latency_ceiling_s = latency_ceiling_s self._sustained_epochs = sustained_epochs @@ -27,7 +29,8 @@ async def should_replan(self) -> bool: k=self._sustained_epochs, metric_ids=[FrontEndMetric.QueryLatencySecondP90.value], ) - rel = past[FrontEndMetric.QueryLatencySecondP90.value] + rel_past = past[past.index > self._cutoff] + rel = rel_past[FrontEndMetric.QueryLatencySecondP90.value] if len(rel) >= self._sustained_epochs and (rel > self._latency_ceiling_s).all(): p90_lat_s = rel.iloc[-1] logger.info( @@ -40,6 +43,9 @@ async def should_replan(self) -> bool: if self._lookahead_epochs is None: return False + if not self._passed_n_epochs_since_cutoff(self._sustained_epochs): + return False + future = self._monitor.front_end_metrics().read_k_upcoming( k=self._lookahead_epochs, metric_ids=[FrontEndMetric.QueryLatencySecondP90.value], diff --git a/src/brad/planner/triggers/redshift_cpu_utilization.py b/src/brad/planner/triggers/redshift_cpu_utilization.py index 6bbe4d33..7982d43c 100644 --- a/src/brad/planner/triggers/redshift_cpu_utilization.py +++ b/src/brad/planner/triggers/redshift_cpu_utilization.py @@ -1,4 +1,5 @@ import logging +from datetime import timedelta from typing import Optional from .metrics_thresholds import MetricsThresholds @@ -14,10 +15,11 @@ def __init__( monitor: Monitor, lo: float, hi: float, + epoch_length: timedelta, sustained_epochs: int = 1, lookahead_epochs: Optional[int] = None, ) -> None: - super().__init__() + super().__init__(epoch_length) self._monitor = monitor self._impl = MetricsThresholds(lo, hi, sustained_epochs) self._sustained_epochs = sustained_epochs @@ -39,14 +41,20 @@ async def should_replan(self) -> bool: past = self._monitor.redshift_metrics().read_k_most_recent( k=self._sustained_epochs, metric_ids=[_UTILIZATION_METRIC] ) + relevant = past[past.index > self._cutoff] if self._impl.exceeds_thresholds( - past[_UTILIZATION_METRIC], "Redshift CPU utilization" + relevant[_UTILIZATION_METRIC], "Redshift CPU utilization" ): return True if self._lookahead_epochs is None: return False + if not self._passed_n_epochs_since_cutoff(self._sustained_epochs): + # We do not trigger based on a forecast if `sustained_epochs` has + # not passed since the last cutoff. + return False + future = self._monitor.redshift_metrics().read_k_upcoming( k=self._lookahead_epochs, metric_ids=[_UTILIZATION_METRIC] ) diff --git a/src/brad/planner/triggers/trigger.py b/src/brad/planner/triggers/trigger.py index dd17023e..8b5558b0 100644 --- a/src/brad/planner/triggers/trigger.py +++ b/src/brad/planner/triggers/trigger.py @@ -1,12 +1,16 @@ +import pytz from typing import Optional +from datetime import datetime, timedelta from brad.blueprint import Blueprint from brad.planner.scoring.score import Score class Trigger: - def __init__(self) -> None: + def __init__(self, epoch_length: timedelta) -> None: self._current_blueprint: Optional[Blueprint] = None self._current_score: Optional[Score] = None + self._epoch_length = epoch_length + self._reset_cutoff() async def should_replan(self) -> bool: """ @@ -18,9 +22,19 @@ async def should_replan(self) -> bool: def update_blueprint(self, blueprint: Blueprint, score: Optional[Score]) -> None: self._current_blueprint = blueprint self._current_score = score + # Used by triggers that want to avoid firing immediately after a + # blueprint transition. + self._reset_cutoff() def name(self) -> str: """ The name of the trigger. """ return self.__class__.__name__ + + def _reset_cutoff(self) -> None: + self._cutoff = datetime.now(tz=pytz.utc) + + def _passed_n_epochs_since_cutoff(self, n: int) -> bool: + elapsed = datetime.now(tz=pytz.utc) - self._cutoff + return elapsed >= n * self._epoch_length diff --git a/src/brad/planner/triggers/txn_latency_ceiling.py b/src/brad/planner/triggers/txn_latency_ceiling.py index 2fab7c3f..766aafbc 100644 --- a/src/brad/planner/triggers/txn_latency_ceiling.py +++ b/src/brad/planner/triggers/txn_latency_ceiling.py @@ -1,4 +1,5 @@ import logging +from datetime import timedelta from typing import Optional from brad.config.metrics import FrontEndMetric @@ -14,9 +15,10 @@ def __init__( monitor: Monitor, latency_ceiling_s: float, sustained_epochs: int, + epoch_length: timedelta, lookahead_epochs: Optional[int] = None, ) -> None: - super().__init__() + super().__init__(epoch_length) self._monitor = monitor self._latency_ceiling_s = latency_ceiling_s self._sustained_epochs = sustained_epochs @@ -27,7 +29,8 @@ async def should_replan(self) -> bool: k=self._sustained_epochs, metric_ids=[FrontEndMetric.TxnLatencySecondP90.value], ) - rel = past[FrontEndMetric.TxnLatencySecondP90.value] + rel_past = past[past.index > self._cutoff] + rel = rel_past[FrontEndMetric.TxnLatencySecondP90.value] if len(rel) >= self._sustained_epochs and (rel > self._latency_ceiling_s).all(): p90_lat_s = rel.iloc[-1] logger.info( @@ -40,6 +43,9 @@ async def should_replan(self) -> bool: if self._lookahead_epochs is None: return False + if not self._passed_n_epochs_since_cutoff(self._sustained_epochs): + return False + future = self._monitor.front_end_metrics().read_k_upcoming( k=self._lookahead_epochs, metric_ids=[FrontEndMetric.TxnLatencySecondP90.value], diff --git a/src/brad/planner/triggers/variable_costs.py b/src/brad/planner/triggers/variable_costs.py index 7b1a3faa..9ac518be 100644 --- a/src/brad/planner/triggers/variable_costs.py +++ b/src/brad/planner/triggers/variable_costs.py @@ -33,6 +33,7 @@ def __init__( data_access_provider: DataAccessProvider, router_provider: RouterProvider, threshold_frac: float, + epoch_length: timedelta, ) -> None: """ This will trigger a replan if the current variable costs (currently, @@ -42,7 +43,7 @@ def __init__( For example, if `threshold_frac` is 0.2, then replanning is triggered if the estimated cost is +/- 20% of the previously estimated cost. """ - super().__init__() + super().__init__(epoch_length) self._config = config self._planner_config = planner_config self._monitor = monitor diff --git a/tests/test_workload.py b/tests/test_workload.py deleted file mode 100644 index 6fb58543..00000000 --- a/tests/test_workload.py +++ /dev/null @@ -1,11 +0,0 @@ -from brad.planner.workload.legacy_utils import workload_from_s3_logs -from brad.config.file import ConfigFile - - -def test_read_from_s3(): - config = ConfigFile.load("./config/config.yml") - - workload = workload_from_s3_logs(config, 3) - - print(workload.analytical_queries()) - print(workload.transactional_queries()) diff --git a/tools/run-tests.sh b/tools/run-tests.sh index a0163a09..faefe814 100755 --- a/tools/run-tests.sh +++ b/tools/run-tests.sh @@ -1,5 +1,4 @@ #! /bin/bash set -e -pytest \ - --ignore=tests/test_workload.py +pytest diff --git a/workloads/IMDB_extended/run_repeating_analytics.py b/workloads/IMDB_extended/run_repeating_analytics.py index 7663fec9..70a1475c 100644 --- a/workloads/IMDB_extended/run_repeating_analytics.py +++ b/workloads/IMDB_extended/run_repeating_analytics.py @@ -98,7 +98,7 @@ def noop(_signal, _frame): encoding="UTF-8", ) as file: print( - "timestamp,time_since_execution,time_of_day,query_idx,run_time,engine", + "timestamp,time_since_execution_s,time_of_day,query_idx,run_time_s,engine", file=file, flush=True, )