diff --git a/config/planner.yml b/config/planner.yml index a0672045..a8ab9a94 100644 --- a/config/planner.yml +++ b/config/planner.yml @@ -351,3 +351,4 @@ use_recorded_routing_if_available: true ensure_tables_together_on_one_engine: true aurora_initialize_load_fraction: 0.25 +redshift_initialize_load_fraction: 0.25 diff --git a/src/brad/admin/run_planner.py b/src/brad/admin/run_planner.py index eddf049b..a6ff11a6 100644 --- a/src/brad/admin/run_planner.py +++ b/src/brad/admin/run_planner.py @@ -131,7 +131,7 @@ async def run_planner_impl(args) -> None: config = ConfigFile.load(args.config_file) # 2. Load the planner config. - planner_config = PlannerConfig(args.planner_config_file) + planner_config = PlannerConfig.load(args.planner_config_file) # 3. Load the blueprint. assets = AssetManager(config) diff --git a/src/brad/config/planner.py b/src/brad/config/planner.py index 7fa134cb..9cd2474b 100644 --- a/src/brad/config/planner.py +++ b/src/brad/config/planner.py @@ -12,10 +12,14 @@ class PlannerConfig: shared across planning strategies, some are specific to a strategy. """ - def __init__(self, path: str): - self._raw_path = path + @classmethod + def load(cls, path: str) -> "PlannerConfig": with open(path, "r", encoding="UTF-8") as file: - self._raw = yaml.load(file, Loader=yaml.Loader) + raw = yaml.load(file, Loader=yaml.Loader) + return cls(raw) + + def __init__(self, raw: Dict[str, Any]): + self._raw = raw self._aurora_scaling_coefs: Optional[npt.NDArray] = None self._redshift_scaling_coefs: Optional[npt.NDArray] = None @@ -234,5 +238,8 @@ def flag(self, key: str, default: bool = False) -> bool: def aurora_initialize_load_fraction(self) -> float: return self._raw["aurora_initialize_load_fraction"] + def redshift_initialize_load_fraction(self) -> float: + return self._raw["redshift_initialize_load_fraction"] + def aurora_storage_index_multiplier(self) -> float: return float(self._raw["aurora_storage_index_multiplier"]) diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py index da34b336..7bcecdc7 100644 --- a/src/brad/daemon/daemon.py +++ b/src/brad/daemon/daemon.py @@ -86,7 +86,7 @@ def __init__( self._temp_config = temp_config self._schema_name = schema_name self._path_to_planner_config = path_to_planner_config - self._planner_config = PlannerConfig(path_to_planner_config) + self._planner_config = PlannerConfig.load(path_to_planner_config) self._debug_mode = debug_mode self._assets = AssetManager(self._config) diff --git a/src/brad/planner/scoring/performance/unified_aurora.py b/src/brad/planner/scoring/performance/unified_aurora.py index c6b30cd2..f78107a3 100644 --- a/src/brad/planner/scoring/performance/unified_aurora.py +++ b/src/brad/planner/scoring/performance/unified_aurora.py @@ -1,7 +1,7 @@ import logging import numpy as np import numpy.typing as npt -from typing import Dict, TYPE_CHECKING +from typing import Dict, TYPE_CHECKING, Optional, Tuple from brad.config.engine import Engine from brad.blueprint.provisioning import Provisioning @@ -18,23 +18,11 @@ def __init__( self, scaled_run_times: npt.NDArray, scaled_txn_lats: npt.NDArray, - analytics_affected_per_machine_load: float, - analytics_affected_per_machine_cpu_denorm: float, - txn_affected_cpu_denorm: float, - pred_txn_peak_cpu_denorm: float, - for_next_prov: Provisioning, debug_values: Dict[str, int | float], ) -> None: self.scaled_run_times = scaled_run_times - self.debug_values = debug_values - self.analytics_affected_per_machine_load = analytics_affected_per_machine_load - 
self.analytics_affected_per_machine_cpu_denorm = ( - analytics_affected_per_machine_cpu_denorm - ) - self.txn_affected_cpu_denorm = txn_affected_cpu_denorm - self.pred_txn_peak_cpu_denorm = pred_txn_peak_cpu_denorm - self.for_next_prov = for_next_prov self.scaled_txn_lats = scaled_txn_lats + self.debug_values = debug_values @classmethod def compute( @@ -49,198 +37,194 @@ def compute( Computes all of the Aurora provisioning-dependent scoring components in one place. """ + query_factor = cls.query_movement_factor( + base_query_run_times, query_arrival_counts, ctx + ) + txn_cpu_denorm, ana_node_load = cls.predict_loads( + curr_prov, next_prov, query_factor, ctx + ) + scaled_rt = cls.predict_query_latency_load_resources( + base_query_run_times, next_prov, ana_node_load, ctx + ) + scaled_txn_lats = cls.predict_txn_latency( + ctx.metrics.aurora_writer_cpu_avg / 100.0 * aurora_num_cpus(curr_prov), + txn_cpu_denorm, + curr_prov, + next_prov, + ctx, + ) + return cls( + scaled_rt, + scaled_txn_lats, + { + "aurora_query_factor": query_factor + if query_factor is not None + else 0.0, + "aurora_txn_cpu_denorm": txn_cpu_denorm, + "aurora_ana_node_load": ana_node_load, + }, + ) + + @classmethod + def predict_loads( + cls, + curr_prov: Provisioning, + next_prov: Provisioning, + query_factor: Optional[float], + ctx: "ScoringContext", + ) -> Tuple[float, float]: + # Load is computed using the following principles: + # # - Read replicas do not speed up queries, they can only "relieve" the # writer node of load. # - Transactions must run on the writer node; only analytical queries # can run on the read replica(s). # - If there is a read replica, BRAD routes all analytical queries to # the replica. - - current_aurora_has_replicas = curr_prov.num_nodes() > 1 - next_aurora_has_replicas = next_prov.num_nodes() > 1 - no_analytics_queries_executed = ( - len(ctx.current_query_locations[Engine.Aurora]) == 0 - ) - - overall_writer_load = ctx.metrics.aurora_writer_load_minute_avg - overall_writer_cpu_util_pct = ctx.metrics.aurora_writer_cpu_avg - overall_writer_cpu_util = overall_writer_cpu_util_pct / 100 - overall_writer_cpu_util_denorm = overall_writer_cpu_util * aurora_num_cpus( - curr_prov + # + # Output: + # - "CPU denorm" value on `next_prov` on the writer node, for use for + # transaction scaling + # - "load denorm" value on `next_prov` on one node that will be used for + # analytics scaling + + current_has_replicas = curr_prov.num_nodes() > 1 + next_has_replicas = next_prov.num_nodes() > 1 + + curr_writer_load = ctx.metrics.aurora_writer_load_minute_avg + curr_writer_cpu_util = ctx.metrics.aurora_writer_cpu_avg / 100 + curr_writer_cpu_util_denorm = curr_writer_cpu_util * aurora_num_cpus(curr_prov) + + # We take a very conservative approach to query movement. If new queries + # are added onto Aurora, we increase the load. But if queries are + # removed, we do not decrease the load. + query_factor_clean = 1.0 + if query_factor is not None and query_factor > 1.0: + query_factor_clean = query_factor + + # This is true iff we did not run any queries on Aurora with the current + # blueprint and are now going to run queries on Aurora on the next + # blueprint. + adding_ana_first_time = ( + query_factor is None and len(ctx.current_query_locations[Engine.Aurora]) > 0 ) - # 1. Compute the transaction portion of load. - if current_aurora_has_replicas or no_analytics_queries_executed: - # We schedule all analytics on the read replica(s) or no queries - # were routed to Aurora. 
So the metric values on the writer are due - # to the transactional workload. - pred_txn_load = overall_writer_load - pred_txn_cpu_denorm = overall_writer_cpu_util_denorm - else: - model = ctx.planner_config.aurora_txn_coefs(ctx.schema_name) - curr_num_cpus = aurora_num_cpus(curr_prov) - client_txns_per_s = ctx.metrics.txn_completions_per_s - - # Piecewise function; the inflection point appears due to (maybe) - # hyperthreading behavior. - # - # D(client_txns_per_s, curr_cpus) = - # C_1 * client_txns_per_s if C_1 * client_txns_per_s <= curr_cpus / 2 - # C_2 * (client_txns_per_s - curr_cpus / (2 * C_1)) + curr_cpus / 2 otherwise - cpu_denorm_limit = curr_num_cpus / 2 - pred_txn_cpu_denorm = client_txns_per_s * model["C_1"] - if pred_txn_cpu_denorm > cpu_denorm_limit: - pred_txn_cpu_denorm = ( - model["C_2"] * (client_txns_per_s - cpu_denorm_limit / model["C_1"]) - + cpu_denorm_limit - ) - - # In theory, these two should be equal. Empirically, they are mostly close enough. - pred_txn_load = pred_txn_cpu_denorm - - # TODO: Possible edge cases here due to cumulative prediction error (e.g., - # pred_txn_load > overall_writer_load violates our model's assumptions). - # We need a robust way to handle these potential errors. - if not ctx.already_logged_txn_interference_warning: - if pred_txn_load > overall_writer_load: - logger.warning( - "Predicted transactional load higher than the overall " - "writer load. Overall load: %.2f, Client txn thpt: %.2f, " - "Predicted txn load: %.2f", - overall_writer_load, - client_txns_per_s, - pred_txn_load, + # 4 cases: + # - No replicas -> No replicas + # - No replicas -> Yes replicas + # - Yes replicas -> No replicas + # - Yes replicas -> Yes replicas + if not current_has_replicas: + if not next_has_replicas: + # No replicas -> No replicas + if adding_ana_first_time: + # Special case. If no queries ran on Aurora and now we are + # running queries, we need to prime the system with some + # load. We use a constant factor to prime the system. + initialized_load = ( + ctx.planner_config.aurora_initialize_load_fraction() + * aurora_num_cpus(curr_prov) ) - ctx.already_logged_txn_interference_warning = True - - if pred_txn_cpu_denorm > overall_writer_cpu_util_denorm: - logger.warning( - "Predicted transactional CPU denormalized utilization " - "higher than the overall CPU use. Overall use: %.2f, " - "Client txn thpt: %.2f, Predicted CPU use: %.2f", - overall_writer_cpu_util_denorm, - client_txns_per_s, - pred_txn_cpu_denorm, + return ( + curr_writer_cpu_util_denorm + initialized_load, + curr_writer_load + initialized_load, + ) + else: + return ( + curr_writer_cpu_util_denorm * query_factor_clean, + curr_writer_load * query_factor_clean, + ) + else: + # No replicas -> Yes replicas + if adding_ana_first_time: + # Special case. If no queries ran on Aurora and now we are + # running queries, we need to prime the system with some + # load. We use a constant factor to prime the system. + initialized_load = ( + ctx.planner_config.aurora_initialize_load_fraction() + * aurora_num_cpus(curr_prov) + ) + return ( + curr_writer_cpu_util_denorm, + initialized_load / (next_prov.num_nodes() - 1), + ) + else: + return ( + curr_writer_cpu_util_denorm, + (curr_writer_load * query_factor_clean) + / (next_prov.num_nodes() - 1), ) - ctx.already_logged_txn_interference_warning = True - # 2. Adjust the analytical portion of the system load for query movement - # (compute `query_factor``). - if Engine.Aurora not in ctx.engine_latency_norm_factor: - # Special case. 
We cannot reweigh the queries because nothing in the - # current workload ran on Aurora. - query_factor = 1.0 else: - # Query movement scaling factor. - # Captures change in queries routed to this engine. - norm_factor = ctx.engine_latency_norm_factor[Engine.Aurora] - assert norm_factor != 0.0 - total_next_latency = np.dot(base_query_run_times, query_arrival_counts) - query_factor = total_next_latency / norm_factor - - # 3. Compute the analytics portion of the load and adjust it by the query factor. - if current_aurora_has_replicas: + # We currently have read replicas. curr_num_read_replicas = curr_prov.num_nodes() - 1 - total_analytics_load = ( + total_reader_load = ( ctx.metrics.aurora_reader_load_minute_avg * curr_num_read_replicas ) - total_analytics_cpu_denorm = ( + total_reader_cpu_denorm = ( (ctx.metrics.aurora_reader_cpu_avg / 100) * aurora_num_cpus(curr_prov) * curr_num_read_replicas ) - - elif no_analytics_queries_executed: - # This is a special case: no queries executed on Aurora and there - # was no read replica. - total_analytics_load = 0.0 - total_analytics_cpu_denorm = 0.0 - - else: - # Analytics load should never be zero - so we impute a small value - # to work around mispredictions. - eps = 1e-3 if len(base_query_run_times) > 0 else 0.0 - total_analytics_load = max(eps, overall_writer_load - pred_txn_load) - total_analytics_cpu_denorm = max( - eps, overall_writer_cpu_util_denorm - pred_txn_cpu_denorm - ) - - # total_analytics_load *= query_factor - # total_analytics_cpu_denorm *= query_factor - - # 4. Compute the workload-affected metrics. - # Basically, if there are no replicas, both the analytical and - # transactional load fall onto one instance (which we need to capture). - if next_aurora_has_replicas: - next_num_read_replicas = next_prov.num_nodes() - 1 - assert next_num_read_replicas > 0 - - if no_analytics_queries_executed and len(base_query_run_times) > 0: - # We need to use a non-zero load. We use a constant factor to - # prime the system. - total_analytics_load = ( - ctx.planner_config.aurora_initialize_load_fraction() - * aurora_num_cpus(ctx.current_blueprint.aurora_provisioning()) - * ctx.current_blueprint.aurora_provisioning().num_nodes() - ) - total_analytics_cpu_denorm = total_analytics_load - - # Divide by the number of read replicas: we assume the load can - # be equally divided amongst the replicas. - analytics_affected_per_machine_load = ( - total_analytics_load / next_num_read_replicas - ) - analytics_affected_per_machine_cpu_denorm = ( - total_analytics_cpu_denorm / next_num_read_replicas - ) - txn_affected_cpu_denorm = pred_txn_cpu_denorm - else: - analytics_affected_per_machine_load = total_analytics_load + pred_txn_load - analytics_affected_per_machine_cpu_denorm = ( - total_analytics_cpu_denorm + pred_txn_cpu_denorm + # Handling a special case of moving queries onto Aurora where none + # previously executed. Note that this should be very rare here + # because there are already read replicas. + initialized_load = ( + ctx.planner_config.aurora_initialize_load_fraction() + * aurora_num_cpus(curr_prov) + * curr_num_read_replicas ) - # Basically the same as above, because they are running together. - txn_affected_cpu_denorm = analytics_affected_per_machine_cpu_denorm - # 4. Predict query execution times based on load and provisioning. - scaled_rt = cls.query_latency_load_resources( - base_query_run_times, next_prov, analytics_affected_per_machine_load, ctx - ) - - # 5. Compute the expected peak CPU. 
- peak_cpu_denorm = ( - aurora_num_cpus(next_prov) - * ctx.planner_config.aurora_prov_to_peak_cpu_denorm() - ) + if not next_has_replicas: + # Yes replicas -> No replicas + if adding_ana_first_time: + # Special case. + return ( + curr_writer_cpu_util_denorm + initialized_load, + curr_writer_load + initialized_load, + ) + else: + return ( + curr_writer_cpu_util_denorm + + (total_reader_cpu_denorm * query_factor_clean), + curr_writer_load + (total_reader_load * query_factor_clean), + ) - # 6. Compute the transactional latencies. - scaled_txn_lats = cls._scale_txn_latency( - overall_writer_cpu_util_denorm, - txn_affected_cpu_denorm, - curr_prov, - next_prov, - ctx, - ) + else: + # Yes replicas -> Yes replicas + if adding_ana_first_time: + # Special case. + return ( + curr_writer_cpu_util_denorm, + initialized_load / (next_prov.num_nodes() - 1), + ) + else: + return ( + curr_writer_cpu_util_denorm, + total_reader_load + * query_factor_clean + / (next_prov.num_nodes() - 1), + ) - return cls( - scaled_rt, - scaled_txn_lats, - analytics_affected_per_machine_load, - analytics_affected_per_machine_cpu_denorm, - txn_affected_cpu_denorm, - peak_cpu_denorm, - next_prov, - { - "aurora_pred_txn_load": pred_txn_load, - "aurora_pred_txn_cpu_denorm": pred_txn_cpu_denorm, - "aurora_query_factor": query_factor, - "aurora_internal_total_analytics_load": total_analytics_load, - "aurora_internal_total_analytics_cpu_denorm": total_analytics_cpu_denorm, - }, - ) + @staticmethod + def query_movement_factor( + base_query_run_times: npt.NDArray, + query_arrival_counts: npt.NDArray, + ctx: "ScoringContext", + ) -> Optional[float]: + # Query movement scaling factor. + # Captures change in queries routed to this engine. + if Engine.Aurora not in ctx.engine_latency_norm_factor: + # Special case. We cannot reweigh the queries because nothing in the + # current workload ran on Aurora. + return None + norm_factor = ctx.engine_latency_norm_factor[Engine.Aurora] + assert norm_factor != 0.0 + total_next_latency = np.dot(base_query_run_times, query_arrival_counts) + return total_next_latency / norm_factor @staticmethod - def query_latency_load_resources( + def predict_query_latency_load_resources( base_predicted_latency: npt.NDArray, to_prov: Provisioning, overall_load: float, @@ -270,7 +254,7 @@ def query_latency_load_resources( return np.dot(lat_vals, coefs) @staticmethod - def _scale_txn_latency( + def predict_txn_latency( curr_cpu_denorm: float, next_cpu_denorm: float, curr_prov: Provisioning, @@ -308,11 +292,6 @@ def copy(self) -> "AuroraProvisioningScore": return AuroraProvisioningScore( self.scaled_run_times, self.scaled_txn_lats, - self.analytics_affected_per_machine_load, - self.analytics_affected_per_machine_cpu_denorm, - self.txn_affected_cpu_denorm, - self.pred_txn_peak_cpu_denorm, - self.for_next_prov, self.debug_values.copy(), ) @@ -320,14 +299,6 @@ def add_debug_values(self, dest: Dict[str, int | float | str]) -> None: """ Adds this score instance's debug values to the `dest` dict. 
""" - dest[ - "aurora_analytics_affected_per_machine_load" - ] = self.analytics_affected_per_machine_load - dest[ - "aurora_analytics_affected_per_machine_cpu_denorm" - ] = self.analytics_affected_per_machine_cpu_denorm - dest["aurora_txn_affected_cpu_denorm"] = self.txn_affected_cpu_denorm - dest["aurora_pred_txn_peak_cpu_denorm"] = self.pred_txn_peak_cpu_denorm ( dest["aurora_pred_txn_lat_s_p50"], dest["aurora_pred_txn_lat_s_p90"], diff --git a/src/brad/planner/scoring/performance/unified_aurora_legacy.py b/src/brad/planner/scoring/performance/unified_aurora_legacy.py new file mode 100644 index 00000000..8ccb5c20 --- /dev/null +++ b/src/brad/planner/scoring/performance/unified_aurora_legacy.py @@ -0,0 +1,338 @@ +import logging +import numpy as np +import numpy.typing as npt +from typing import Dict, TYPE_CHECKING + +from brad.config.engine import Engine +from brad.blueprint.provisioning import Provisioning +from brad.planner.scoring.provisioning import aurora_num_cpus + +if TYPE_CHECKING: + from brad.planner.scoring.context import ScoringContext + +logger = logging.getLogger(__name__) + + +class AuroraProvisioningScoreLegacy: + def __init__( + self, + scaled_run_times: npt.NDArray, + scaled_txn_lats: npt.NDArray, + analytics_affected_per_machine_load: float, + analytics_affected_per_machine_cpu_denorm: float, + txn_affected_cpu_denorm: float, + pred_txn_peak_cpu_denorm: float, + for_next_prov: Provisioning, + debug_values: Dict[str, int | float], + ) -> None: + self.scaled_run_times = scaled_run_times + self.debug_values = debug_values + self.analytics_affected_per_machine_load = analytics_affected_per_machine_load + self.analytics_affected_per_machine_cpu_denorm = ( + analytics_affected_per_machine_cpu_denorm + ) + self.txn_affected_cpu_denorm = txn_affected_cpu_denorm + self.pred_txn_peak_cpu_denorm = pred_txn_peak_cpu_denorm + self.for_next_prov = for_next_prov + self.scaled_txn_lats = scaled_txn_lats + + @classmethod + def compute( + cls, + base_query_run_times: npt.NDArray, + query_arrival_counts: npt.NDArray, + curr_prov: Provisioning, + next_prov: Provisioning, + ctx: "ScoringContext", + ) -> "AuroraProvisioningScoreLegacy": + """ + Computes all of the Aurora provisioning-dependent scoring components in one + place. + """ + # - Read replicas do not speed up queries, they can only "relieve" the + # writer node of load. + # - Transactions must run on the writer node; only analytical queries + # can run on the read replica(s). + # - If there is a read replica, BRAD routes all analytical queries to + # the replica. + + current_aurora_has_replicas = curr_prov.num_nodes() > 1 + next_aurora_has_replicas = next_prov.num_nodes() > 1 + no_analytics_queries_executed = ( + len(ctx.current_query_locations[Engine.Aurora]) == 0 + ) + + overall_writer_load = ctx.metrics.aurora_writer_load_minute_avg + overall_writer_cpu_util_pct = ctx.metrics.aurora_writer_cpu_avg + overall_writer_cpu_util = overall_writer_cpu_util_pct / 100 + overall_writer_cpu_util_denorm = overall_writer_cpu_util * aurora_num_cpus( + curr_prov + ) + + # 1. Compute the transaction portion of load. + if current_aurora_has_replicas or no_analytics_queries_executed: + # We schedule all analytics on the read replica(s) or no queries + # were routed to Aurora. So the metric values on the writer are due + # to the transactional workload. 
+            pred_txn_load = overall_writer_load
+            pred_txn_cpu_denorm = overall_writer_cpu_util_denorm
+        else:
+            model = ctx.planner_config.aurora_txn_coefs(ctx.schema_name)
+            curr_num_cpus = aurora_num_cpus(curr_prov)
+            client_txns_per_s = ctx.metrics.txn_completions_per_s
+
+            # Piecewise function; the inflection point appears due to (maybe)
+            # hyperthreading behavior.
+            #
+            # D(client_txns_per_s, curr_cpus) =
+            #   C_1 * client_txns_per_s                                if C_1 * client_txns_per_s <= curr_cpus / 2
+            #   C_2 * (client_txns_per_s - curr_cpus / (2 * C_1)) + curr_cpus / 2   otherwise
+            cpu_denorm_limit = curr_num_cpus / 2
+            pred_txn_cpu_denorm = client_txns_per_s * model["C_1"]
+            if pred_txn_cpu_denorm > cpu_denorm_limit:
+                pred_txn_cpu_denorm = (
+                    model["C_2"] * (client_txns_per_s - cpu_denorm_limit / model["C_1"])
+                    + cpu_denorm_limit
+                )
+
+            # In theory, these two should be equal. Empirically, they are mostly close enough.
+            pred_txn_load = pred_txn_cpu_denorm
+
+            # TODO: Possible edge cases here due to cumulative prediction error (e.g.,
+            # pred_txn_load > overall_writer_load violates our model's assumptions).
+            # We need a robust way to handle these potential errors.
+            if not ctx.already_logged_txn_interference_warning:
+                if pred_txn_load > overall_writer_load:
+                    logger.warning(
+                        "Predicted transactional load higher than the overall "
+                        "writer load. Overall load: %.2f, Client txn thpt: %.2f, "
+                        "Predicted txn load: %.2f",
+                        overall_writer_load,
+                        client_txns_per_s,
+                        pred_txn_load,
+                    )
+                    ctx.already_logged_txn_interference_warning = True
+
+                if pred_txn_cpu_denorm > overall_writer_cpu_util_denorm:
+                    logger.warning(
+                        "Predicted transactional CPU denormalized utilization "
+                        "higher than the overall CPU use. Overall use: %.2f, "
+                        "Client txn thpt: %.2f, Predicted CPU use: %.2f",
+                        overall_writer_cpu_util_denorm,
+                        client_txns_per_s,
+                        pred_txn_cpu_denorm,
+                    )
+                    ctx.already_logged_txn_interference_warning = True
+
+        # 2. Adjust the analytical portion of the system load for query movement
+        # (compute `query_factor`).
+        if Engine.Aurora not in ctx.engine_latency_norm_factor:
+            # Special case. We cannot reweigh the queries because nothing in the
+            # current workload ran on Aurora.
+            query_factor = 1.0
+        else:
+            # Query movement scaling factor.
+            # Captures change in queries routed to this engine.
+            norm_factor = ctx.engine_latency_norm_factor[Engine.Aurora]
+            assert norm_factor != 0.0
+            total_next_latency = np.dot(base_query_run_times, query_arrival_counts)
+            query_factor = total_next_latency / norm_factor
+
+        # 3. Compute the analytics portion of the load and adjust it by the query factor.
+        if current_aurora_has_replicas:
+            curr_num_read_replicas = curr_prov.num_nodes() - 1
+            total_analytics_load = (
+                ctx.metrics.aurora_reader_load_minute_avg * curr_num_read_replicas
+            )
+            total_analytics_cpu_denorm = (
+                (ctx.metrics.aurora_reader_cpu_avg / 100)
+                * aurora_num_cpus(curr_prov)
+                * curr_num_read_replicas
+            )
+
+        elif no_analytics_queries_executed:
+            # This is a special case: no queries executed on Aurora and there
+            # was no read replica.
+            total_analytics_load = 0.0
+            total_analytics_cpu_denorm = 0.0
+
+        else:
+            # Analytics load should never be zero - so we impute a small value
+            # to work around mispredictions.
+            eps = 1e-3 if len(base_query_run_times) > 0 else 0.0
+            total_analytics_load = max(eps, overall_writer_load - pred_txn_load)
+            total_analytics_cpu_denorm = max(
+                eps, overall_writer_cpu_util_denorm - pred_txn_cpu_denorm
+            )
+
+        # total_analytics_load *= query_factor
+        # total_analytics_cpu_denorm *= query_factor
+
+        # 4. Compute the workload-affected metrics.
+        # Basically, if there are no replicas, both the analytical and
+        # transactional load fall onto one instance (which we need to capture).
+        if next_aurora_has_replicas:
+            next_num_read_replicas = next_prov.num_nodes() - 1
+            assert next_num_read_replicas > 0
+
+            if no_analytics_queries_executed and len(base_query_run_times) > 0:
+                # We need to use a non-zero load. We use a constant factor to
+                # prime the system.
+                total_analytics_load = (
+                    ctx.planner_config.aurora_initialize_load_fraction()
+                    * aurora_num_cpus(ctx.current_blueprint.aurora_provisioning())
+                    * ctx.current_blueprint.aurora_provisioning().num_nodes()
+                )
+                total_analytics_cpu_denorm = total_analytics_load
+
+            # Divide by the number of read replicas: we assume the load can
+            # be equally divided amongst the replicas.
+            analytics_affected_per_machine_load = (
+                total_analytics_load / next_num_read_replicas
+            )
+            analytics_affected_per_machine_cpu_denorm = (
+                total_analytics_cpu_denorm / next_num_read_replicas
+            )
+            txn_affected_cpu_denorm = pred_txn_cpu_denorm
+        else:
+            analytics_affected_per_machine_load = total_analytics_load + pred_txn_load
+            analytics_affected_per_machine_cpu_denorm = (
+                total_analytics_cpu_denorm + pred_txn_cpu_denorm
+            )
+            # Basically the same as above, because they are running together.
+            txn_affected_cpu_denorm = analytics_affected_per_machine_cpu_denorm
+
+        # 5. Predict query execution times based on load and provisioning.
+        scaled_rt = cls.query_latency_load_resources(
+            base_query_run_times, next_prov, analytics_affected_per_machine_load, ctx
+        )
+
+        # 6. Compute the expected peak CPU.
+        peak_cpu_denorm = (
+            aurora_num_cpus(next_prov)
+            * ctx.planner_config.aurora_prov_to_peak_cpu_denorm()
+        )
+
+        # 7. Compute the transactional latencies.
+        scaled_txn_lats = cls._scale_txn_latency(
+            overall_writer_cpu_util_denorm,
+            txn_affected_cpu_denorm,
+            curr_prov,
+            next_prov,
+            ctx,
+        )
+
+        return cls(
+            scaled_rt,
+            scaled_txn_lats,
+            analytics_affected_per_machine_load,
+            analytics_affected_per_machine_cpu_denorm,
+            txn_affected_cpu_denorm,
+            peak_cpu_denorm,
+            next_prov,
+            {
+                "aurora_pred_txn_load": pred_txn_load,
+                "aurora_pred_txn_cpu_denorm": pred_txn_cpu_denorm,
+                "aurora_query_factor": query_factor,
+                "aurora_internal_total_analytics_load": total_analytics_load,
+                "aurora_internal_total_analytics_cpu_denorm": total_analytics_cpu_denorm,
+            },
+        )
+
+    @staticmethod
+    def query_latency_load_resources(
+        base_predicted_latency: npt.NDArray,
+        to_prov: Provisioning,
+        overall_load: float,
+        ctx: "ScoringContext",
+    ) -> npt.NDArray:
+        # Special case:
+        if (
+            overall_load == 0.0
+            and aurora_num_cpus(to_prov) == _AURORA_BASE_RESOURCE_VALUE
+        ):
+            # No changes.
+            return base_predicted_latency
+
+        # Scale the base predicted latencies by the load and the provisioning change.
+ resource_factor = _AURORA_BASE_RESOURCE_VALUE / aurora_num_cpus(to_prov) + basis = np.array( + [overall_load * resource_factor, overall_load, resource_factor, 1.0] + ) + basis = np.square(basis) + coefs = ctx.planner_config.aurora_scaling_coefs() + coefs = np.multiply(coefs, basis) + num_coefs = coefs.shape[0] + + lat_vals = np.expand_dims(base_predicted_latency, axis=1) + lat_vals = np.repeat(lat_vals, num_coefs, axis=1) + + return np.dot(lat_vals, coefs) + + @staticmethod + def _scale_txn_latency( + curr_cpu_denorm: float, + next_cpu_denorm: float, + curr_prov: Provisioning, + to_prov: Provisioning, + ctx: "ScoringContext", + ) -> npt.NDArray: + observed_lats = np.array([ctx.metrics.txn_lat_s_p50, ctx.metrics.txn_lat_s_p90]) + + # Q(u) = a / (K - u) + b ; u is CPU utilization in [0, 1] + # --> Q(u') = (K - u) / (K - u') (Q(u) - b) + b + + model = ctx.planner_config.aurora_txn_coefs(ctx.schema_name) + K = model["K"] + b = np.array([model["b_p50"], model["b_p90"]]) + + curr_num_cpus = aurora_num_cpus(curr_prov) + next_num_cpus = aurora_num_cpus(to_prov) + curr_cpu_util = min(curr_cpu_denorm / curr_num_cpus, 1.0) + next_cpu_util = min(next_cpu_denorm / next_num_cpus, 1.0) + + # To avoid division by zero in degenerate cases. + denom = max(K - next_cpu_util, 1e-6) + sf = (K - curr_cpu_util) / denom + + without_base = np.clip(observed_lats - b, a_min=0.0, a_max=None) + pred_dest = (without_base * sf) + b + + # If the observed latencies were not defined, we should not make a prediction. + pred_dest[observed_lats == 0.0] = np.nan + pred_dest[~np.isfinite(observed_lats)] = np.nan + + return pred_dest + + def copy(self) -> "AuroraProvisioningScoreLegacy": + return AuroraProvisioningScoreLegacy( + self.scaled_run_times, + self.scaled_txn_lats, + self.analytics_affected_per_machine_load, + self.analytics_affected_per_machine_cpu_denorm, + self.txn_affected_cpu_denorm, + self.pred_txn_peak_cpu_denorm, + self.for_next_prov, + self.debug_values.copy(), + ) + + def add_debug_values(self, dest: Dict[str, int | float | str]) -> None: + """ + Adds this score instance's debug values to the `dest` dict. 
+ """ + dest[ + "aurora_analytics_affected_per_machine_load" + ] = self.analytics_affected_per_machine_load + dest[ + "aurora_analytics_affected_per_machine_cpu_denorm" + ] = self.analytics_affected_per_machine_cpu_denorm + dest["aurora_txn_affected_cpu_denorm"] = self.txn_affected_cpu_denorm + dest["aurora_pred_txn_peak_cpu_denorm"] = self.pred_txn_peak_cpu_denorm + ( + dest["aurora_pred_txn_lat_s_p50"], + dest["aurora_pred_txn_lat_s_p90"], + ) = self.scaled_txn_lats + dest.update(self.debug_values) + + +_AURORA_BASE_RESOURCE_VALUE = aurora_num_cpus(Provisioning("db.r6g.xlarge", 1)) diff --git a/src/brad/planner/scoring/performance/unified_redshift.py b/src/brad/planner/scoring/performance/unified_redshift.py index c0a152bb..d5628fd8 100644 --- a/src/brad/planner/scoring/performance/unified_redshift.py +++ b/src/brad/planner/scoring/performance/unified_redshift.py @@ -1,7 +1,7 @@ import logging import numpy as np import numpy.typing as npt -from typing import Dict, TYPE_CHECKING +from typing import Dict, TYPE_CHECKING, Optional from brad.config.engine import Engine from brad.blueprint.provisioning import Provisioning @@ -18,13 +18,11 @@ def __init__( self, scaled_run_times: npt.NDArray, overall_cpu_denorm: float, - for_next_prov: Provisioning, debug_values: Dict[str, int | float], ) -> None: self.scaled_run_times = scaled_run_times self.debug_values = debug_values self.overall_cpu_denorm = overall_cpu_denorm - self.for_next_prov = for_next_prov @classmethod def compute( @@ -39,55 +37,119 @@ def compute( Computes all of the Redshift provisioning-dependent scoring components in one place. """ - # Special case: If we turn off Redshift (set the number of nodes to 0). - if next_prov.num_nodes() == 0: + query_factor = cls.query_movement_factor( + base_query_run_times, query_arrival_counts, ctx + ) + predicted_cpu_denorm = cls.predict_cpu_denorm( + curr_prov, next_prov, query_factor, ctx + ) + + # Special case (turning off Redshift). + if predicted_cpu_denorm == 0.0: return cls( base_query_run_times, 0.0, - next_prov, { "redshift_query_factor": 0.0, }, ) + else: + scaled_rt = cls.scale_load_resources( + base_query_run_times, next_prov, predicted_cpu_denorm, ctx + ) + return cls( + scaled_rt, + predicted_cpu_denorm, + { + "redshift_query_factor": query_factor + if query_factor is not None + else 0.0, + }, + ) - overall_forecasted_cpu_util_pct = ctx.metrics.redshift_cpu_avg - overall_cpu_util_denorm = ( - (overall_forecasted_cpu_util_pct / 100) - * redshift_num_cpus(curr_prov) - * curr_prov.num_nodes() - ) - - # 1. Adjust the analytical portion of the system load for query movement. - if ( - Engine.Redshift not in ctx.engine_latency_norm_factor - or curr_prov.num_nodes() == 0 - ): - # Special case. We cannot reweigh the queries because nothing in the - # current workload ran on Redshift. - query_factor = 1.0 + @classmethod + def predict_cpu_denorm( + cls, + curr_prov: Provisioning, + next_prov: Provisioning, + query_factor: Optional[float], + ctx: "ScoringContext", + ) -> float: + """ + Returns the predicted overall denormalized CPU utilization across the + entire cluster for `next_prov`. + """ + # 4 cases + # Redshift off -> Redshift off (no-op) + # Redshift off -> Redshift on + # Redshift on -> Redshift on + # Redshift on -> Redshift off + + curr_on = curr_prov.num_nodes() > 0 + next_on = next_prov.num_nodes() > 0 + + # Simple no-op cases. + if (not curr_on and not next_on) or (curr_on and not next_on): + return 0.0 + + if not curr_on and next_on: + # Turning on Redshift. 
+ # We cannot reweigh the queries because nothing in the current + # workload ran on Redshift. We prime the load with a fraction of the + # proposed cluster's peak load. + return ( + redshift_num_cpus(next_prov) + * next_prov.num_nodes() + * ctx.planner_config.redshift_initialize_load_fraction() + ) else: - # Query movement scaling factor. - # Captures change in queries routed to this engine. - norm_factor = ctx.engine_latency_norm_factor[Engine.Redshift] - assert norm_factor != 0.0 - total_next_latency = np.dot(base_query_run_times, query_arrival_counts) - query_factor = total_next_latency / norm_factor - - adjusted_cpu_denorm = query_factor * overall_cpu_util_denorm - - # 2. Scale query execution times based on load and provisioning. - scaled_rt = cls.scale_load_resources( - base_query_run_times, next_prov, adjusted_cpu_denorm, ctx - ) + # Redshift is staying on, but there is a potential provisioning + # change. + assert curr_on and next_on + + # Special case. Redshift was on, but nothing ran on it and now we + # want to run queries on it. We use the same load priming approach + # but on the current cluster. + if ( + query_factor is None + and len(ctx.current_query_locations[Engine.Redshift]) > 0 + ): + return ( + redshift_num_cpus(curr_prov) + * curr_prov.num_nodes() + * ctx.planner_config.redshift_initialize_load_fraction() + ) + + curr_cpu_util_denorm = ( + (ctx.metrics.redshift_cpu_avg / 100.0) + * redshift_num_cpus(curr_prov) + * curr_prov.num_nodes() + ) - return cls( - scaled_rt, - adjusted_cpu_denorm, - next_prov, - { - "redshift_query_factor": query_factor, - }, - ) + # We stay conservative here. + if query_factor is not None and query_factor > 1.0: + adjusted_cpu_denorm = query_factor * curr_cpu_util_denorm + else: + adjusted_cpu_denorm = curr_cpu_util_denorm + + return adjusted_cpu_denorm + + @staticmethod + def query_movement_factor( + base_query_run_times: npt.NDArray, + query_arrival_counts: npt.NDArray, + ctx: "ScoringContext", + ) -> Optional[float]: + # Query movement scaling factor. + # Captures change in queries routed to this engine. + if Engine.Redshift not in ctx.engine_latency_norm_factor: + # Special case. We cannot reweigh the queries because nothing in the + # current workload ran on Redshift (it could have been off). 
+ return None + norm_factor = ctx.engine_latency_norm_factor[Engine.Redshift] + assert norm_factor != 0.0 + total_next_latency = np.dot(base_query_run_times, query_arrival_counts) + return total_next_latency / norm_factor @staticmethod def scale_load_resources( @@ -121,12 +183,11 @@ def copy(self) -> "RedshiftProvisioningScore": return RedshiftProvisioningScore( self.scaled_run_times, self.overall_cpu_denorm, - self.for_next_prov, self.debug_values.copy(), ) def add_debug_values(self, dest: Dict[str, int | float | str]) -> None: - dest["redshift_cpu_denorm"] = self.overall_cpu_denorm + dest["redshift_predicted_cpu_denorm"] = self.overall_cpu_denorm dest.update(self.debug_values) diff --git a/tests/test_aurora_scoring.py b/tests/test_aurora_scoring.py new file mode 100644 index 00000000..c0fa8547 --- /dev/null +++ b/tests/test_aurora_scoring.py @@ -0,0 +1,267 @@ +import pytest +from datetime import timedelta + +from brad.blueprint import Blueprint +from brad.config.engine import Engine +from brad.config.planner import PlannerConfig +from brad.planner.metrics import Metrics +from brad.planner.scoring.context import ScoringContext +from brad.planner.scoring.performance.unified_aurora import AuroraProvisioningScore +from brad.planner.scoring.provisioning import Provisioning +from brad.planner.workload import Workload +from brad.planner.workload.query import Query +from brad.routing.router import FullRoutingPolicy +from brad.routing.round_robin import RoundRobin + + +def get_fixtures( + writer_cpu: float, + reader_cpu: float, + writer_load: float, + reader_load: float, + aurora_prov: Provisioning, +) -> ScoringContext: + metrics = Metrics( + redshift_cpu_avg=0.0, + aurora_writer_cpu_avg=writer_cpu, + aurora_reader_cpu_avg=reader_cpu, + aurora_writer_buffer_hit_pct_avg=100.0, + aurora_reader_buffer_hit_pct_avg=100.0, + aurora_writer_load_minute_avg=writer_load, + aurora_reader_load_minute_avg=reader_load, + txn_completions_per_s=10.0, + txn_lat_s_p50=0.010, + txn_lat_s_p90=0.020, + ) + planner_config = PlannerConfig( + { + "aurora_initialize_load_fraction": 0.25, + } + ) + workload = Workload(timedelta(hours=1), [Query("SELECT 1")], [], {}) + blueprint = Blueprint( + "test", + [], + {}, + aurora_prov, + Provisioning("dc2.large", 0), + FullRoutingPolicy([], RoundRobin()), + ) + ctx = ScoringContext("test", blueprint, workload, workload, metrics, planner_config) + return ctx + + +def test_single_node() -> None: + curr_prov = Provisioning("db.r6g.xlarge", 1) + next_prov = Provisioning("db.r6g.2xlarge", 1) + ctx = get_fixtures( + writer_cpu=100.0, + reader_cpu=0.0, + writer_load=4.0, + reader_load=0.0, + aurora_prov=curr_prov, + ) + + # Scale up with no change in workload. + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, 1.0, ctx + ) + assert txn_cpu_denorm == pytest.approx(4.0) + assert ana_load == pytest.approx(4.0) + + # Scale up with 2x increase in analytical workload. + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, 2.0, ctx + ) + assert txn_cpu_denorm == pytest.approx(8.0) + assert ana_load == pytest.approx(8.0) + + # Scale up with a 0.5x increase (i.e., a decrease) in the analytical + # workload. We stay conservative here and do not modify the loads. + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, 0.5, ctx + ) + assert txn_cpu_denorm == pytest.approx(4.0) + assert ana_load == pytest.approx(4.0) + + # No queries executed previously. 
But now we are executing queries on + # Aurora. + ctx.current_query_locations[Engine.Aurora].append(0) + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, None, ctx + ) + assert txn_cpu_denorm == pytest.approx( + 4.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0 + ) + assert ana_load == pytest.approx( + 4.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0 + ) + + +def test_single_to_multi() -> None: + curr_prov = Provisioning("db.r6g.xlarge", 1) + next_prov = Provisioning("db.r6g.xlarge", 2) + ctx = get_fixtures( + writer_cpu=100.0, + reader_cpu=0.0, + writer_load=4.0, + reader_load=0.0, + aurora_prov=curr_prov, + ) + + # Scale up with no change in workload. + # We are conservative here and assume the same load is replicated across + # both nodes. + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, 1.0, ctx + ) + assert txn_cpu_denorm == pytest.approx(4.0) + assert ana_load == pytest.approx(4.0) + + # Scale up with 2x more load. + # Transaction load should be unchanged because we move analytics to a + # replica. + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, 2.0, ctx + ) + assert txn_cpu_denorm == pytest.approx(4.0) + assert ana_load == pytest.approx(8.0) + + # Scale up with 0.5x more load. + # We leave the loads unchanged because we are conservative. + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, 0.5, ctx + ) + assert txn_cpu_denorm == pytest.approx(4.0) + assert ana_load == pytest.approx(4.0) + + # 2 replicas. + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, Provisioning("db.r6g.xlarge", 3), 1.0, ctx + ) + assert txn_cpu_denorm == pytest.approx(4.0) + # Should assume analytical load is split across the replicas. + assert ana_load == pytest.approx(2.0) + + # No queries executed previously. But now we are executing queries on + # Aurora. + ctx.current_query_locations[Engine.Aurora].append(0) + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, None, ctx + ) + assert txn_cpu_denorm == pytest.approx(4.0) + assert ana_load == pytest.approx( + ctx.planner_config.aurora_initialize_load_fraction() * 4.0 + ) + + +def test_multi_to_single() -> None: + curr_prov = Provisioning("db.r6g.xlarge", 2) + next_prov = Provisioning("db.r6g.xlarge", 1) + + ctx = get_fixtures( + writer_cpu=50.0, + reader_cpu=25.0, + writer_load=2.0, + reader_load=1.0, + aurora_prov=curr_prov, + ) + + # Scale down with no change in workload. + # All the load should be concentrated on the single node. + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, 1.0, ctx + ) + assert txn_cpu_denorm == pytest.approx(3.0) + assert ana_load == pytest.approx(3.0) + + # Scale down with 2x more load. + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, 2.0, ctx + ) + assert txn_cpu_denorm == pytest.approx(2.0 + 2.0 * 1.0) + assert ana_load == pytest.approx(2.0 + 2.0 * 1.0) + + # Scale down with 0.5x more load. + # We stay conservative here. + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, 0.5, ctx + ) + assert txn_cpu_denorm == pytest.approx(3.0) + assert ana_load == pytest.approx(3.0) + + # Multiple replicas (2) down to one node. 
+    txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
+        Provisioning("db.r6g.xlarge", 3), next_prov, 1.0, ctx
+    )
+    assert txn_cpu_denorm == pytest.approx(4.0)
+    assert ana_load == pytest.approx(4.0)
+
+    # Special case (no queries executed previously and now we are executing
+    # queries). Should be rare because there are read replicas.
+    ctx.current_query_locations[Engine.Aurora].append(0)
+    txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
+        curr_prov, next_prov, None, ctx
+    )
+    assert txn_cpu_denorm == pytest.approx(
+        2.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0
+    )
+    assert ana_load == pytest.approx(
+        2.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0
+    )
+
+
+def test_multi_to_multi() -> None:
+    # Doubling the number of replicas (2 to 4).
+    curr_prov = Provisioning("db.r6g.xlarge", 3)
+    next_prov = Provisioning("db.r6g.xlarge", 5)
+
+    ctx = get_fixtures(
+        writer_cpu=50.0,
+        reader_cpu=25.0,
+        writer_load=2.0,
+        reader_load=1.0,
+        aurora_prov=curr_prov,
+    )
+
+    # Scale up with no change in workload.
+    # Load should be spread out.
+    txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
+        curr_prov, next_prov, 1.0, ctx
+    )
+    assert txn_cpu_denorm == pytest.approx(2.0)
+    assert ana_load == pytest.approx(0.5)
+
+    # Scale up with a 2x change in workload.
+    # Load should be spread out.
+    txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
+        curr_prov, next_prov, 2.0, ctx
+    )
+    assert txn_cpu_denorm == pytest.approx(2.0)
+    assert ana_load == pytest.approx(1.0)
+
+    # Scale up with a 0.5x change in workload.
+    # Load should be spread out.
+    txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
+        curr_prov, next_prov, 0.5, ctx
+    )
+    assert txn_cpu_denorm == pytest.approx(2.0)
+    assert ana_load == pytest.approx(0.5)  # Stay conservative.
+
+    # Decrease the number of replicas (2 to 1).
+    txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
+        curr_prov, Provisioning("db.r6g.xlarge", 2), 1.0, ctx
+    )
+    assert txn_cpu_denorm == pytest.approx(2.0)
+    assert ana_load == pytest.approx(2.0)
+
+    # Special case (no queries executed previously and now we are executing
+    # queries). Should be rare because there are read replicas.
+ ctx.current_query_locations[Engine.Aurora].append(0) + txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads( + curr_prov, next_prov, None, ctx + ) + assert txn_cpu_denorm == pytest.approx(2.0) + assert ana_load == pytest.approx( + ctx.planner_config.aurora_initialize_load_fraction() * 4.0 * 2.0 / 4.0 + ) diff --git a/tests/test_redshift_scoring.py b/tests/test_redshift_scoring.py new file mode 100644 index 00000000..a87c7599 --- /dev/null +++ b/tests/test_redshift_scoring.py @@ -0,0 +1,128 @@ +import pytest +from datetime import timedelta + +from brad.blueprint import Blueprint +from brad.config.engine import Engine +from brad.config.planner import PlannerConfig +from brad.planner.metrics import Metrics +from brad.planner.scoring.context import ScoringContext +from brad.planner.scoring.performance.unified_redshift import RedshiftProvisioningScore +from brad.planner.scoring.provisioning import Provisioning +from brad.planner.workload import Workload +from brad.planner.workload.query import Query +from brad.routing.router import FullRoutingPolicy +from brad.routing.round_robin import RoundRobin + + +def get_fixtures(redshift_cpu: float, redshift_prov: Provisioning) -> ScoringContext: + metrics = Metrics( + redshift_cpu_avg=redshift_cpu, + aurora_writer_cpu_avg=0.0, + aurora_reader_cpu_avg=0.0, + aurora_writer_buffer_hit_pct_avg=100.0, + aurora_reader_buffer_hit_pct_avg=100.0, + aurora_writer_load_minute_avg=0.0, + aurora_reader_load_minute_avg=0.0, + txn_completions_per_s=10.0, + txn_lat_s_p50=0.010, + txn_lat_s_p90=0.020, + ) + planner_config = PlannerConfig( + { + "redshift_initialize_load_fraction": 0.25, + } + ) + workload = Workload(timedelta(hours=1), [Query("SELECT 1")], [], {}) + blueprint = Blueprint( + "test", + [], + {}, + Provisioning("db.r6g.xlarge", 1), + redshift_prov, + FullRoutingPolicy([], RoundRobin()), + ) + ctx = ScoringContext("test", blueprint, workload, workload, metrics, planner_config) + return ctx + + +def test_off_to_off() -> None: + curr_prov = Provisioning("dc2.large", 0) + next_prov = Provisioning("dc2.large", 0) + ctx = get_fixtures(redshift_cpu=0.0, redshift_prov=curr_prov) + cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + curr_prov, next_prov, 1.0, ctx + ) + assert cpu_denorm == pytest.approx(0.0) + + +def test_on_to_off() -> None: + curr_prov = Provisioning("dc2.large", 2) + next_prov = Provisioning("dc2.large", 0) + ctx = get_fixtures(redshift_cpu=50.0, redshift_prov=curr_prov) + cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + curr_prov, next_prov, 1.0, ctx + ) + assert cpu_denorm == pytest.approx(0.0) + + +def test_off_to_on() -> None: + curr_prov = Provisioning("dc2.large", 0) + next_prov = Provisioning("dc2.large", 2) + ctx = get_fixtures(redshift_cpu=0.0, redshift_prov=curr_prov) + cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + curr_prov, next_prov, None, ctx + ) + # Special case: we prime the load with a fraction. + assert cpu_denorm == pytest.approx( + 2.0 * 2.0 * ctx.planner_config.redshift_initialize_load_fraction() + ) + + +def test_on_to_on() -> None: + curr_prov = Provisioning("dc2.large", 2) + next_prov = Provisioning("dc2.large", 4) + ctx = get_fixtures(redshift_cpu=50.0, redshift_prov=curr_prov) + + # Scale up, no movement. + cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + curr_prov, next_prov, 1.0, ctx + ) + assert cpu_denorm == pytest.approx(2.0) + + # Scale up, 2x movement. 
+ cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + curr_prov, next_prov, 2.0, ctx + ) + assert cpu_denorm == pytest.approx(4.0) + + # Scale up, 0.5x movement (we stay conservative). + cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + curr_prov, next_prov, 0.5, ctx + ) + assert cpu_denorm == pytest.approx(2.0) + + # Scale down, no movement. + smaller_prov = Provisioning("dc2.large", 1) + cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + curr_prov, smaller_prov, 1.0, ctx + ) + assert cpu_denorm == pytest.approx(2.0) + + # Scale down, 2x movement. + cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + curr_prov, smaller_prov, 2.0, ctx + ) + assert cpu_denorm == pytest.approx(4.0) + + # Scale down, 0.5x movement (stay conservative). + cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + curr_prov, smaller_prov, 0.5, ctx + ) + assert cpu_denorm == pytest.approx(2.0) + + # Special case (no queries executed before, but now there are queries). + ctx.current_query_locations[Engine.Redshift].append(0) + cpu_denorm = RedshiftProvisioningScore.predict_cpu_denorm( + curr_prov, next_prov, None, ctx + ) + assert cpu_denorm == pytest.approx(1.0)
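Note on the `PlannerConfig` change above: splitting construction into a `load()` classmethod (parse YAML from disk) and a plain `__init__` that accepts an already-parsed dict is what allows the new unit tests to build configs inline without touching the filesystem. A minimal sketch of the two construction paths; the variable name `cfg` is illustrative, and the keys shown are just the ones these tests exercise (a real planner config carries the full set in config/planner.yml):

from brad.config.planner import PlannerConfig

# Test-style construction: pass the parsed dict directly.
cfg = PlannerConfig(
    {
        "aurora_initialize_load_fraction": 0.25,
        "redshift_initialize_load_fraction": 0.25,
    }
)
assert cfg.aurora_initialize_load_fraction() == 0.25
assert cfg.redshift_initialize_load_fraction() == 0.25

# Production-style construction: parse the YAML file from disk,
# as run_planner.py and daemon.py now do.
# cfg = PlannerConfig.load("config/planner.yml")

The same fractions drive the load-priming special cases, with the arithmetic the tests assert: in test_off_to_on, for example, a dc2.large x2 cluster primes to num_cpus x num_nodes x fraction = 2.0 x 2.0 x 0.25 = 1.0 denormalized CPU units (per the per-node vCPU count the tests assume).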