diff --git a/experiments/15-e2e-scenarios-v2/scale_up/scale_up_config.yml b/experiments/15-e2e-scenarios-v2/scale_up/scale_up_config.yml index 3069786a..6df4c718 100644 --- a/experiments/15-e2e-scenarios-v2/scale_up/scale_up_config.yml +++ b/experiments/15-e2e-scenarios-v2/scale_up/scale_up_config.yml @@ -89,7 +89,7 @@ redshift_initialize_load_fraction: 0.25 # BRAD will not reduce predicted load lower than these values. Raise these # values to be more conservative against mispredictions. aurora_min_load_removal_fraction: 0.8 -redshift_min_load_removal_fraction: 0.8 +redshift_min_load_removal_fraction: 0.9 # Blueprint planning performance ceilings. query_latency_p90_ceiling_s: 30.0 @@ -101,16 +101,16 @@ use_preset_redshift_clusters: true # Used for ordering blueprints during planning. comparator: - type: perf_ceiling # or `perf_ceiling` + type: benefit_perf_ceiling # or `perf_ceiling` benefit_horizon: # Only used by the `benefit_perf_ceiling` comparator weeks: 0 - days: 0 - hours: 2 + days: 1 + hours: 0 minutes: 0 penalty_threshold: 0.8 # Only used by the `benefit_perf_ceiling` comparator - penalty_power: 8 # Only used by the `benefit_perf_ceiling` comparator + penalty_power: 2 # Only used by the `benefit_perf_ceiling` comparator # Used for precomputed predictions. std_datasets: @@ -119,6 +119,12 @@ std_datasets: - name: adhoc path: workloads/IMDB_100GB/adhoc_test/ +aurora_max_query_factor: 4.0 +aurora_max_query_factor_replace: 10000.0 + +redshift_peak_load_threshold: 95.0 +redshift_peak_load_multiplier: 2.0 + # Blueprint planning trigger configs. triggers: diff --git a/src/brad/config/planner.py b/src/brad/config/planner.py index 72ea22cb..a18a62a2 100644 --- a/src/brad/config/planner.py +++ b/src/brad/config/planner.py @@ -1,10 +1,11 @@ +import math import yaml import logging import numpy as np import numpy.typing as npt import importlib.resources as pkg_resources from datetime import timedelta -from typing import Dict, Optional, Any +from typing import Dict, Optional, Any, Tuple from brad.planner.strategy import PlanningStrategy import brad.planner as brad_planner @@ -311,3 +312,21 @@ def redshift_min_load_removal_fraction(self) -> float: except KeyError: logger.warning("Using default Redshift min load removal fraction: 0.75") return 0.75 + + def aurora_max_query_factor(self) -> Tuple[float, float]: + try: + return ( + self._raw["aurora_max_query_factor"], + self._raw["aurora_max_query_factor_replace"], + ) + except KeyError: + return math.inf, math.inf + + def redshift_peak_load_multiplier(self) -> Tuple[float, float]: + try: + return ( + self._raw["redshift_peak_load_threshold"], + self._raw["redshift_peak_load_multiplier"], + ) + except KeyError: + return 110.0, 1.0 diff --git a/src/brad/planner/beam/query_based.py b/src/brad/planner/beam/query_based.py index be20b433..4b189c2e 100644 --- a/src/brad/planner/beam/query_based.py +++ b/src/brad/planner/beam/query_based.py @@ -357,7 +357,8 @@ async def _run_replan_impl( "Selected blueprint details: %s", json.dumps(debug_values, indent=2) ) logger.info( - "Metrics used during planning: %s", json.dumps(metrics._asdict(), indent=2) + "Metrics used during planning: %s", + json.dumps(metrics._asdict(), indent=2, default=str), ) return best_blueprint, best_blueprint_score diff --git a/src/brad/planner/beam/table_based.py b/src/brad/planner/beam/table_based.py index cb603e54..3b13a8d6 100644 --- a/src/brad/planner/beam/table_based.py +++ b/src/brad/planner/beam/table_based.py @@ -370,7 +370,8 @@ async def _run_replan_impl( "Selected blueprint details: %s", json.dumps(debug_values, indent=2) ) logger.info( - "Metrics used during planning: %s", json.dumps(metrics._asdict(), indent=2) + "Metrics used during planning: %s", + json.dumps(metrics._asdict(), indent=2, default=str), ) return best_blueprint, best_blueprint_score diff --git a/src/brad/planner/metrics.py b/src/brad/planner/metrics.py index f9096542..cf13b91e 100644 --- a/src/brad/planner/metrics.py +++ b/src/brad/planner/metrics.py @@ -12,7 +12,10 @@ from brad.config.metrics import FrontEndMetric from brad.config.planner import PlannerConfig from brad.daemon.monitor import Monitor -from brad.daemon.redshift_metrics import relevant_redshift_node_dimensions +from brad.daemon.redshift_metrics import ( + relevant_redshift_node_dimensions, + MAX_REDSHIFT_NODES, +) from brad.utils.time_periods import elapsed_time, universal_now logger = logging.getLogger(__name__) @@ -677,6 +680,12 @@ def _extract_metrics_from_monitor( redshift = redshift_source.read_k_most_recent( k=epochs_to_extract, metric_ids=redshift_metric_ids ) + if blueprint.redshift_provisioning().num_nodes() > MAX_REDSHIFT_NODES: + logger.warning( + "Running with %d Redshift nodes. Only capturing metrics on %d.", + blueprint.redshift_provisioning().num_nodes(), + MAX_REDSHIFT_NODES, + ) else: redshift_metric_ids = _REDSHIFT_METRICS.copy() redshift = pd.DataFrame([], columns=_REDSHIFT_METRICS) diff --git a/src/brad/planner/scoring/data/redshift_instances.json b/src/brad/planner/scoring/data/redshift_instances.json index 20e6b12f..92be81b0 100644 --- a/src/brad/planner/scoring/data/redshift_instances.json +++ b/src/brad/planner/scoring/data/redshift_instances.json @@ -16,5 +16,32 @@ "min_nodes": 2, "max_nodes": 128, "usd_per_hour": 4.80 + }, + { + "instance_type": "ra3.xlplus", + "vcpus": 4, + "memory_mib": 32768, + "io_gb_s": 0.65, + "min_nodes": 1, + "max_nodes": 16, + "usd_per_hour": 1.086 + }, + { + "instance_type": "ra3.4xlarge", + "vcpus": 12, + "memory_mib": 98304, + "io_gb_s": 2, + "min_nodes": 2, + "max_nodes": 32, + "usd_per_hour": 3.26 + }, + { + "instance_type": "ra3.16xlarge", + "vcpus": 48, + "memory_mib": 393216, + "io_gb_s": 8, + "min_nodes": 2, + "max_nodes": 128, + "usd_per_hour": 13.04 } ] diff --git a/src/brad/planner/scoring/performance/unified_aurora.py b/src/brad/planner/scoring/performance/unified_aurora.py index 8d1be9e1..73bdb14c 100644 --- a/src/brad/planner/scoring/performance/unified_aurora.py +++ b/src/brad/planner/scoring/performance/unified_aurora.py @@ -42,6 +42,9 @@ def compute( query_factor = cls.query_movement_factor( base_query_run_times, query_arrival_counts, ctx ) + max_factor, max_factor_replace = ctx.planner_config.aurora_max_query_factor() + if query_factor is not None and query_factor > max_factor: + query_factor = max_factor_replace has_queries = base_query_run_times.shape[0] > 0 txn_cpu_denorm, ana_node_cpu_denorm = cls.predict_loads( has_queries, curr_prov, next_prov, query_factor, ctx, debug_dict @@ -285,7 +288,7 @@ def predict_query_latency_load_resources( ) # Predicted running time is the query's execution time alone plus the # expected wait time (due to system load) - return mean_service_time + wait_time + return prov_predicted_latency + wait_time @staticmethod def predict_query_latency_resources( diff --git a/src/brad/planner/scoring/performance/unified_redshift.py b/src/brad/planner/scoring/performance/unified_redshift.py index 1f0939f8..9b1a2a31 100644 --- a/src/brad/planner/scoring/performance/unified_redshift.py +++ b/src/brad/planner/scoring/performance/unified_redshift.py @@ -1,4 +1,5 @@ import logging +import math import numpy as np import numpy.typing as npt from typing import Dict, TYPE_CHECKING, Optional @@ -18,12 +19,12 @@ class RedshiftProvisioningScore: def __init__( self, scaled_run_times: npt.NDArray, - overall_cpu_denorm: float, + max_node_cpu_util: float, debug_values: Dict[str, int | float], ) -> None: self.scaled_run_times = scaled_run_times self.debug_values = debug_values - self.overall_cpu_denorm = overall_cpu_denorm + self.max_node_cpu_util = max_node_cpu_util @classmethod def compute( @@ -41,35 +42,40 @@ def compute( query_factor = cls.query_movement_factor( base_query_run_times, query_arrival_counts, ctx ) - predicted_cpu_denorm = cls.predict_cpu_denorm( + predicted_max_node_cpu_util = cls.predict_max_node_cpu_util( curr_prov, next_prov, query_factor, ctx ) # Special case (turning off Redshift). - if predicted_cpu_denorm == 0.0: + if predicted_max_node_cpu_util == 0.0: return cls( base_query_run_times, 0.0, { "redshift_query_factor": 0.0, + "redshift_skew_adjustment": np.nan, }, ) else: scaled_rt = cls.predict_query_latency_load_resources( - base_query_run_times, next_prov, predicted_cpu_denorm, ctx + base_query_run_times, next_prov, predicted_max_node_cpu_util, ctx + ) + debug_skew_adjustment = cls.compute_skew_adjustment( + ctx.metrics.redshift_cpu_list / 100.0 ) return cls( scaled_rt, - predicted_cpu_denorm, + predicted_max_node_cpu_util, { "redshift_query_factor": query_factor if query_factor is not None - else 0.0, + else np.nan, + "redshift_skew_adjustment": debug_skew_adjustment, }, ) @classmethod - def predict_cpu_denorm( + def predict_max_node_cpu_util( cls, curr_prov: Provisioning, next_prov: Provisioning, @@ -77,8 +83,8 @@ def predict_cpu_denorm( ctx: "ScoringContext", ) -> float: """ - Returns the predicted overall denormalized CPU utilization across the - entire cluster for `next_prov`. + Returns the predicted maximum node CPU utilization across the entire + cluster for `next_prov`. """ # 4 cases # Redshift off -> Redshift off (no-op) @@ -95,19 +101,19 @@ def predict_cpu_denorm( if not curr_on and next_on: # Turning on Redshift. - # We cannot reweigh the queries because nothing in the current + # We cannot reweigh the load because nothing in the current # workload ran on Redshift. We prime the load with a fraction of the # proposed cluster's peak load. - return ( - redshift_num_cpus(next_prov) - * next_prov.num_nodes() - * ctx.planner_config.redshift_initialize_load_fraction() - ) + max_peak_util = ctx.planner_config.redshift_initialize_load_fraction() + return max_peak_util else: # Redshift is staying on, but there is a potential provisioning # change. assert curr_on and next_on + curr_nodes = curr_prov.num_nodes() + next_nodes = next_prov.num_nodes() + # Special case. Redshift was on, but nothing ran on it and now we # want to run queries on it. We use the same load priming approach # but on the current cluster. @@ -115,17 +121,47 @@ def predict_cpu_denorm( query_factor is None and len(ctx.current_query_locations[Engine.Redshift]) > 0 ): - return ( - redshift_num_cpus(curr_prov) - * curr_prov.num_nodes() - * ctx.planner_config.redshift_initialize_load_fraction() - ) + max_peak_util = ctx.planner_config.redshift_initialize_load_fraction() + return max_peak_util - curr_cpu_util_denorm = ( - (ctx.metrics.redshift_cpu_avg / 100.0) - * redshift_num_cpus(curr_prov) - * curr_prov.num_nodes() - ) + curr_cpu_util: npt.NDArray = ctx.metrics.redshift_cpu_list.copy() / 100.0 + assert curr_cpu_util.shape[0] > 0, "Must have Redshift CPU metrics." + curr_cpu_util.sort() # In place. + curr_cpu_denorm = curr_cpu_util * redshift_num_cpus(curr_prov) + curr_max_cpu_denorm = curr_cpu_denorm.max() + + ( + peak_load, + peak_load_multiplier, + ) = ctx.planner_config.redshift_peak_load_multiplier() + if curr_cpu_util.max() > (peak_load / 100.0): + curr_max_cpu_denorm *= peak_load_multiplier + + # First step: Adjust load based on a change in the number of nodes. + # Key observation is that we're interested in the node with the + # maximum load. This node will stay the maximum after our + # adjustments because we always multiply by a positive constant + # (linear scaling) or add the same value to each node. + if next_nodes > curr_nodes: + # When this value is close to 0, it indicates high load skew. + # Thus adding an instance of the same kind of node should not + # affect the load as much. + skew_adjustment = cls.compute_skew_adjustment(curr_cpu_util) + if skew_adjustment < 0.5: + next_max_cpu_denorm = curr_max_cpu_denorm + else: + next_max_cpu_denorm = curr_max_cpu_denorm * math.pow( + curr_nodes / next_nodes, skew_adjustment + ) + elif next_nodes < curr_nodes: + removed_nodes = curr_nodes - next_nodes + load_to_redist = curr_cpu_denorm[:removed_nodes].sum() + next_max_cpu_denorm = curr_max_cpu_denorm + ( + load_to_redist / next_nodes + ) + else: + # Number of nodes unchanged. + next_max_cpu_denorm = curr_max_cpu_denorm # We stay conservative here. See `AuroraProvisioningScore`. query_factor_clean = 1.0 @@ -138,7 +174,14 @@ def predict_cpu_denorm( ) query_factor_clean = max(min_query_factor, query_factor) - return query_factor_clean * curr_cpu_util_denorm + # We divide by the CPU count on the next provisioning to adjust for + # instance type changes. + next_util = (query_factor_clean * next_max_cpu_denorm) / redshift_num_cpus( + next_prov + ) + + # Clip to [0, 1]. + return min(max(next_util, 0.0), 1.0) @classmethod def query_movement_factor( @@ -166,7 +209,7 @@ def predict_query_latency_load_resources( cls, base_predicted_latency: npt.NDArray, to_prov: Provisioning, - overall_cpu_denorm: float, + max_node_cpu_util: float, ctx: "ScoringContext", ) -> npt.NDArray: if base_predicted_latency.shape[0] == 0: @@ -179,12 +222,11 @@ def predict_query_latency_load_resources( # 2. Compute the impact of system load. mean_service_time = prov_predicted_latency.mean() - cpu_util = overall_cpu_denorm / ( - redshift_num_cpus(to_prov) * to_prov.num_nodes() - ) # Note the use of p90. The predictions we make are specifically p90 latency. wait_time = predict_mm1_wait_time( - mean_service_time_s=mean_service_time, utilization=cpu_util, quantile=0.9 + mean_service_time_s=mean_service_time, + utilization=max_node_cpu_util, + quantile=0.9, ) # Predicted running time is the query's execution time alone plus the # expected wait time (due to system load). @@ -241,14 +283,35 @@ def scale_load_resources_legacy( def copy(self) -> "RedshiftProvisioningScore": return RedshiftProvisioningScore( self.scaled_run_times, - self.overall_cpu_denorm, + self.max_node_cpu_util, self.debug_values.copy(), ) def add_debug_values(self, dest: Dict[str, int | float | str]) -> None: - dest["redshift_predicted_cpu_denorm"] = self.overall_cpu_denorm + dest["redshift_predicted_max_node_cpu_util"] = self.max_node_cpu_util dest.update(self.debug_values) + @classmethod + def compute_skew_adjustment(cls, cpu_utils: npt.NDArray) -> float: + """ + Returns a value between 0 and 1 where 0 represents maximum CPU + utilization skew and 1 represents no skew (equal CPU utilization among + entries). + """ + num_entries = cpu_utils.shape[0] + if num_entries <= 1: + return 1.0 + + cpu_utils = cpu_utils.copy() + cpu_utils.sort() # In place, ascending. + max_val = cpu_utils.max() + rest = cpu_utils[:-1] + diff_from_max = np.sqrt(np.square(rest - max_val).mean()) + + # We want no skew to be a factor of 1.0. Because this takes in CPU + # utils, the maximum possible diff is 1.0. + return 1.0 - diff_from_max + _REDSHIFT_BASE_PROV = Provisioning("dc2.large", 2) _REDSHIFT_BASE_RESOURCE_VALUE = ( diff --git a/tests/test_redshift_scoring.py b/tests/test_redshift_scoring.py index 8f8250df..cdd46d6a 100644 --- a/tests/test_redshift_scoring.py +++ b/tests/test_redshift_scoring.py @@ -1,5 +1,7 @@ import pytest +import numpy as np from datetime import timedelta +from typing import List from brad.blueprint import Blueprint from brad.config.engine import Engine @@ -14,9 +16,12 @@ from brad.routing.round_robin import RoundRobin -def get_fixtures(redshift_cpu: float, redshift_prov: Provisioning) -> ScoringContext: +def get_fixtures( + redshift_cpu: List[float], redshift_prov: Provisioning +) -> ScoringContext: + cpus = np.array(redshift_cpu) metrics = Metrics( - redshift_cpu_avg=redshift_cpu, + redshift_cpu_avg=cpus.max() if cpus.shape[0] > 0 else 0.0, aurora_writer_cpu_avg=0.0, aurora_reader_cpu_avg=0.0, aurora_writer_buffer_hit_pct_avg=100.0, @@ -28,10 +33,13 @@ def get_fixtures(redshift_cpu: float, redshift_prov: Provisioning) -> ScoringCon txn_lat_s_p90=0.020, query_lat_s_p50=10.0, query_lat_s_p90=20.0, + redshift_cpu_list=cpus, ) planner_config = PlannerConfig( { "redshift_initialize_load_fraction": 0.25, + "redshift_peak_load_threshold": 0.0, + "redshift_peak_load_multiplier": 1.0, } ) workload = Workload(timedelta(hours=1), [Query("SELECT 1")], [], {}) @@ -50,81 +58,99 @@ def get_fixtures(redshift_cpu: float, redshift_prov: Provisioning) -> ScoringCon def test_off_to_off() -> None: curr_prov = Provisioning("dc2.large", 0) next_prov = Provisioning("dc2.large", 0) - ctx = get_fixtures(redshift_cpu=0.0, redshift_prov=curr_prov) - cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + ctx = get_fixtures(redshift_cpu=[], redshift_prov=curr_prov) + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( curr_prov, next_prov, 1.0, ctx ) - assert cpu_denorm == pytest.approx(0.0) + assert cpu_util == pytest.approx(0.0) def test_on_to_off() -> None: curr_prov = Provisioning("dc2.large", 2) next_prov = Provisioning("dc2.large", 0) - ctx = get_fixtures(redshift_cpu=50.0, redshift_prov=curr_prov) - cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + ctx = get_fixtures(redshift_cpu=[50.0], redshift_prov=curr_prov) + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( curr_prov, next_prov, 1.0, ctx ) - assert cpu_denorm == pytest.approx(0.0) + assert cpu_util == pytest.approx(0.0) def test_off_to_on() -> None: curr_prov = Provisioning("dc2.large", 0) next_prov = Provisioning("dc2.large", 2) - ctx = get_fixtures(redshift_cpu=0.0, redshift_prov=curr_prov) - cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + ctx = get_fixtures(redshift_cpu=[], redshift_prov=curr_prov) + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( curr_prov, next_prov, None, ctx ) # Special case: we prime the load with a fraction. - assert cpu_denorm == pytest.approx( - 2.0 * 2.0 * ctx.planner_config.redshift_initialize_load_fraction() + assert cpu_util == pytest.approx( + ctx.planner_config.redshift_initialize_load_fraction() ) def test_on_to_on() -> None: curr_prov = Provisioning("dc2.large", 2) next_prov = Provisioning("dc2.large", 4) - ctx = get_fixtures(redshift_cpu=50.0, redshift_prov=curr_prov) + ctx = get_fixtures(redshift_cpu=[50.0, 50.0], redshift_prov=curr_prov) # Scale up, no movement. - cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( curr_prov, next_prov, 1.0, ctx ) - assert cpu_denorm == pytest.approx(2.0) + assert cpu_util == pytest.approx(0.25) # Scale up, 2x movement. - cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( curr_prov, next_prov, 2.0, ctx ) - assert cpu_denorm == pytest.approx(4.0) + assert cpu_util == pytest.approx(0.5) # Scale up, 0.5x movement (we stay conservative). - cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( curr_prov, next_prov, 0.5, ctx ) - assert cpu_denorm == pytest.approx(2.0 * (1 - 0.25)) + assert cpu_util == pytest.approx(2.0 * (1 - 0.25) / (4 * 2.0)) # Scale down, no movement. smaller_prov = Provisioning("dc2.large", 1) - cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( curr_prov, smaller_prov, 1.0, ctx ) - assert cpu_denorm == pytest.approx(2.0) + assert cpu_util == pytest.approx(1.0) # Scale down, 2x movement. - cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( curr_prov, smaller_prov, 2.0, ctx ) - assert cpu_denorm == pytest.approx(4.0) + assert cpu_util == pytest.approx(1.0) # Scale down, 0.5x movement (stay conservative). - cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( curr_prov, smaller_prov, 0.5, ctx ) - assert cpu_denorm == pytest.approx(2.0 * (1 - 0.25)) + assert cpu_util == pytest.approx(2.0 * (1 - 0.25) / 2.0) # Special case (no queries executed before, but now there are queries). ctx.current_query_locations[Engine.Redshift].append(0) - cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( curr_prov, next_prov, None, ctx ) - assert cpu_denorm == pytest.approx(1.0) + assert cpu_util == pytest.approx(0.25) + + +def test_on_to_on_with_skew() -> None: + curr_prov = Provisioning("dc2.large", 2) + next_prov = Provisioning("dc2.large", 4) + ctx = get_fixtures(redshift_cpu=[0.0, 100.0], redshift_prov=curr_prov) + + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( + curr_prov, next_prov, 1.0, ctx + ) + # Captures the effect of skewed utilization. + assert cpu_util == pytest.approx(1.0) + + next_prov_instance = Provisioning("ra3.xlplus", 2) + cpu_util = RedshiftProvisioningScore.predict_max_node_cpu_util( + curr_prov, next_prov_instance, 1.0, ctx + ) + assert cpu_util == pytest.approx(0.5)