diff --git a/config/planner.yml b/config/planner.yml
index fd0194d2..afddc5e0 100644
--- a/config/planner.yml
+++ b/config/planner.yml
@@ -14,6 +14,7 @@ reinterpret_second_as: 1
 # to be accepted.
 query_dist_change_frac: 0.1
 
+# Used to aggregate metrics collected in the planning window.
 metrics_agg:
   method: ewm # 'mean' is another option
   alpha: 0.86466472 # 1 - 1 / e^2
@@ -98,15 +99,21 @@ aurora_per_instance_change_time_s: 300 # 5 minutes
 redshift_elastic_resize_time_s: 900 # 15 minutes - https://repost.aws/knowledge-center/resize-redshift-cluster
 redshift_classic_resize_time_s: 7200 # 2 hours (This is very difficult to estimate; it depends on state we have no visibility into.)
 
-redshift_extract_rate_mb_per_s: 10.0
-redshift_load_rate_mb_per_s: 10.0
+# There is a minimum of 5 seconds per table, but this is negligible since we
+# work with relatively large tables.
+redshift_extract_rate_mb_per_s: 30.0
+redshift_load_rate_mb_per_s: 20.0
 
-aurora_extract_rate_mb_per_s: 10.0
-aurora_load_rate_mb_per_s: 10.0
+# There is a minimum of 1 second per table, but this is negligible since we
+# work with relatively large tables.
+aurora_extract_rate_mb_per_s: 12.0
+aurora_load_rate_mb_per_s: 12.0
 
 # Extracting / loading in Athena corresponds to a conversion to/from Iceberg.
-athena_extract_rate_mb_per_s: 10.0
-athena_load_rate_mb_per_s: 10.0
+athena_extract_rate_mb_per_s: 15.0
+# From experiments: load time is roughly data size / (820 MB/s) + 2.2 s.
+# We ignore the 2.2 s constant for simplicity since we work with large tables.
+athena_load_rate_mb_per_s: 820.0
 
 # Storage costs.
 s3_usd_per_mb_per_month: 0.000023
diff --git a/config/temp_config_sample.yml b/config/temp_config_sample.yml
index cbc7187d..4dab3636 100644
--- a/config/temp_config_sample.yml
+++ b/config/temp_config_sample.yml
@@ -1,7 +1,18 @@
-latency_ceiling_s: 30.0
+query_latency_p90_ceiling_s: 30.0
 txn_latency_p50_ceiling_s: 0.020 # Currently unused.
 txn_latency_p90_ceiling_s: 0.030
 
+comparator:
+  type: perf_ceiling # or `benefit_perf_ceiling`
+
+  benefit_horizon: # Only used by the `benefit_perf_ceiling` comparator
+    weeks: 0
+    days: 0
+    hours: 1
+    minutes: 0
+
+  penalty_threshold: 0.8 # Only used by the `benefit_perf_ceiling` comparator
+
 # Use this instead of the individual paths below.
std_dataset_path: workloads/IMDB_20GB/regular_test/ diff --git a/src/brad/config/temp_config.py b/src/brad/config/temp_config.py index 37f63097..a5ebe82e 100644 --- a/src/brad/config/temp_config.py +++ b/src/brad/config/temp_config.py @@ -1,6 +1,7 @@ import pathlib import yaml from typing import Any, Dict, Optional +from datetime import timedelta class TempConfig: @@ -16,8 +17,8 @@ def load_from_file(cls, file_path: str | pathlib.Path) -> "TempConfig": def __init__(self, raw: Dict[str, Any]) -> None: self._raw = raw - def latency_ceiling_s(self) -> float: - return float(self._raw["latency_ceiling_s"]) + def query_latency_p90_ceiling_s(self) -> float: + return float(self._raw["query_latency_p90_ceiling_s"]) def txn_latency_p50_ceiling_s(self) -> float: return float(self._raw["txn_latency_p50_ceiling_s"]) @@ -25,6 +26,21 @@ def txn_latency_p50_ceiling_s(self) -> float: def txn_latency_p90_ceiling_s(self) -> float: return float(self._raw["txn_latency_p90_ceiling_s"]) + def comparator_type(self) -> str: + return self._raw["comparator"]["type"] + + def benefit_horizon(self) -> timedelta: + period = self._raw["comparator"]["benefit_horizon"] + return timedelta( + weeks=period["weeks"], + days=period["days"], + hours=period["hours"], + minutes=period["minutes"], + ) + + def penalty_threshold(self) -> float: + return self._raw["comparator"]["penalty_threshold"] + def std_dataset_path(self) -> Optional[pathlib.Path]: if "std_dataset_path" not in self._raw: return None diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py index 980b5f4f..f4db4405 100644 --- a/src/brad/daemon/daemon.py +++ b/src/brad/daemon/daemon.py @@ -32,7 +32,11 @@ from brad.data_sync.execution.executor import DataSyncExecutor from brad.front_end.start_front_end import start_front_end from brad.planner.abstract import BlueprintPlanner -from brad.planner.compare.provider import PerformanceCeilingComparatorProvider +from brad.planner.compare.provider import ( + BlueprintComparatorProvider, + PerformanceCeilingComparatorProvider, + BenefitPerformanceCeilingComparatorProvider, +) from brad.planner.estimator import EstimatorProvider from brad.planner.factory import BlueprintPlannerFactory from brad.planner.metrics import WindowedMetricsFromMonitor @@ -181,10 +185,21 @@ async def _run_setup(self) -> None: aurora_accessed_pages_path=self._temp_config.aurora_data_access_path(), athena_accessed_bytes_path=self._temp_config.athena_data_access_path(), ) - comparator_provider = PerformanceCeilingComparatorProvider( - self._temp_config.latency_ceiling_s(), - self._temp_config.txn_latency_p90_ceiling_s(), - ) + + if self._temp_config.comparator_type() == "benefit_perf_ceiling": + comparator_provider: BlueprintComparatorProvider = ( + BenefitPerformanceCeilingComparatorProvider( + self._temp_config.query_latency_p90_ceiling_s(), + self._temp_config.txn_latency_p90_ceiling_s(), + self._temp_config.benefit_horizon(), + self._temp_config.penalty_threshold(), + ) + ) + else: + comparator_provider = PerformanceCeilingComparatorProvider( + self._temp_config.query_latency_p90_ceiling_s(), + self._temp_config.txn_latency_p90_ceiling_s(), + ) else: logger.warning( "TempConfig not provided. The planner will not be able to run correctly." 
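For reference, here is a minimal usage sketch (not part of the diff) of how the new `comparator` section of the temp config drives provider selection in the daemon. The accessors and provider classes are the ones added in this change; the config path and the trailing `get_comparator` call are illustrative only.

```python
from brad.config.temp_config import TempConfig
from brad.planner.compare.provider import (
    BlueprintComparatorProvider,
    PerformanceCeilingComparatorProvider,
    BenefitPerformanceCeilingComparatorProvider,
)

# Hypothetical path; any temp config with the new `comparator` section works.
temp_config = TempConfig.load_from_file("config/temp_config_sample.yml")

provider: BlueprintComparatorProvider
if temp_config.comparator_type() == "benefit_perf_ceiling":
    provider = BenefitPerformanceCeilingComparatorProvider(
        temp_config.query_latency_p90_ceiling_s(),
        temp_config.txn_latency_p90_ceiling_s(),
        temp_config.benefit_horizon(),  # weeks/days/hours/minutes -> timedelta
        temp_config.penalty_threshold(),
    )
else:
    # `perf_ceiling` (the default in the sample) keeps the existing behavior.
    provider = PerformanceCeilingComparatorProvider(
        temp_config.query_latency_p90_ceiling_s(),
        temp_config.txn_latency_p90_ceiling_s(),
    )

# The planners later obtain the comparator closure with the current metrics
# and an estimate of the current blueprint's hourly cost:
#   comparator = provider.get_comparator(metrics, curr_hourly_cost=...)
```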
diff --git a/src/brad/planner/abstract.py b/src/brad/planner/abstract.py index 59e3130d..a82af7a1 100644 --- a/src/brad/planner/abstract.py +++ b/src/brad/planner/abstract.py @@ -54,8 +54,6 @@ def __init__( for t in self._triggers: t.update_blueprint(self._current_blueprint, self._current_blueprint_score) - self._comparator = self._providers.comparator_provider.get_comparator() - async def run_forever(self) -> None: """ Called to start the planner. The planner is meant to run until its task diff --git a/src/brad/planner/beam/query_based.py b/src/brad/planner/beam/query_based.py index 61ec6bb5..be20b433 100644 --- a/src/brad/planner/beam/query_based.py +++ b/src/brad/planner/beam/query_based.py @@ -135,7 +135,16 @@ async def _run_replan_impl( ) await ctx.simulate_current_workload_routing(planning_router) ctx.compute_engine_latency_norm_factor() + ctx.compute_current_workload_predicted_hourly_scan_cost() + ctx.compute_current_blueprint_provisioning_hourly_cost() + comparator = self._providers.comparator_provider.get_comparator( + metrics, + curr_hourly_cost=( + ctx.current_workload_predicted_hourly_scan_cost + + ctx.current_blueprint_provisioning_hourly_cost + ), + ) beam_size = self._planner_config.beam_size() first_query_idx = query_indices[0] first_query = analytical_queries[first_query_idx] @@ -145,9 +154,7 @@ async def _run_replan_impl( for routing_engine in Engine.from_bitmap( planning_router.run_functionality_routing(first_query) ): - candidate = BlueprintCandidate.based_on( - self._current_blueprint, self._comparator - ) + candidate = BlueprintCandidate.based_on(self._current_blueprint, comparator) candidate.add_transactional_tables(ctx) candidate.add_query( first_query_idx, diff --git a/src/brad/planner/beam/query_based_legacy.py b/src/brad/planner/beam/query_based_legacy.py index f3d1f695..f7d81cd6 100644 --- a/src/brad/planner/beam/query_based_legacy.py +++ b/src/brad/planner/beam/query_based_legacy.py @@ -124,7 +124,16 @@ async def _run_replan_impl( ) await ctx.simulate_current_workload_routing(planning_router) ctx.compute_engine_latency_norm_factor() + ctx.compute_current_workload_predicted_hourly_scan_cost() + ctx.compute_current_blueprint_provisioning_hourly_cost() + comparator = self._providers.comparator_provider.get_comparator( + metrics, + curr_hourly_cost=( + ctx.current_workload_predicted_hourly_scan_cost + + ctx.current_blueprint_provisioning_hourly_cost + ), + ) beam_size = self._planner_config.beam_size() engines = [Engine.Aurora, Engine.Redshift, Engine.Athena] first_query_idx = query_indices[0] @@ -136,9 +145,7 @@ async def _run_replan_impl( # 4. Initialize the top-k set (beam). 
for routing_engine in engines: - candidate = BlueprintCandidate.based_on( - self._current_blueprint, self._comparator - ) + candidate = BlueprintCandidate.based_on(self._current_blueprint, comparator) candidate.add_transactional_tables(ctx) query = analytical_queries[first_query_idx] candidate.add_query( diff --git a/src/brad/planner/beam/table_based.py b/src/brad/planner/beam/table_based.py index 4d7aaf20..cb603e54 100644 --- a/src/brad/planner/beam/table_based.py +++ b/src/brad/planner/beam/table_based.py @@ -132,7 +132,16 @@ async def _run_replan_impl( ) await ctx.simulate_current_workload_routing(planning_router) ctx.compute_engine_latency_norm_factor() + ctx.compute_current_workload_predicted_hourly_scan_cost() + ctx.compute_current_blueprint_provisioning_hourly_cost() + comparator = self._providers.comparator_provider.get_comparator( + metrics, + curr_hourly_cost=( + ctx.current_workload_predicted_hourly_scan_cost + + ctx.current_blueprint_provisioning_hourly_cost + ), + ) beam_size = self._planner_config.beam_size() placement_options = self._get_table_placement_options_bitmap() first_cluster = clusters[0] @@ -144,9 +153,7 @@ async def _run_replan_impl( # 4. Initialize the top-k set (beam). for placement_bitmap in placement_options: - candidate = BlueprintCandidate.based_on( - self._current_blueprint, self._comparator - ) + candidate = BlueprintCandidate.based_on(self._current_blueprint, comparator) candidate.add_transactional_tables(ctx) tables, queries, _ = first_cluster placement_changed = candidate.add_placement(placement_bitmap, tables, ctx) diff --git a/src/brad/planner/compare/cost.py b/src/brad/planner/compare/cost.py index 6be748e8..abc1e2a6 100644 --- a/src/brad/planner/compare/cost.py +++ b/src/brad/planner/compare/cost.py @@ -85,7 +85,7 @@ def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> boo def best_cost_under_perf_ceilings( - max_query_latency_s: float, + max_query_p90_latency_s: float, max_txn_p90_latency_s: float, ) -> BlueprintComparator: def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> bool: @@ -110,18 +110,16 @@ def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> boo return True # Query latency ceilings. - # left_lat = _get_or_compute_p99_latency(left) - # right_lat = _get_or_compute_p99_latency(right) - left_lat = _get_or_compute_nth_largest_latency(left, 2) - right_lat = _get_or_compute_nth_largest_latency(right, 2) + left_lat = _get_or_compute_p90_latency(left) + right_lat = _get_or_compute_p90_latency(right) - if left_lat > max_query_latency_s and right_lat > max_query_latency_s: + if left_lat > max_query_p90_latency_s and right_lat > max_query_p90_latency_s: # Both are above the latency ceiling. # So the better blueprint is the one that does better on performance. return left_lat < right_lat - elif left_lat > max_query_latency_s: + elif left_lat > max_query_p90_latency_s: return False - elif right_lat > max_query_latency_s: + elif right_lat > max_query_p90_latency_s: return True # Both are under the performance ceilings. 
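The comparator now keys on the predicted p90 query latency instead of the second-largest predicted latency. A small sketch (illustrative values, not from the repo) of how `np.quantile(..., 0.90, method="lower")` behaves: it returns an actual predicted latency rather than an interpolated value.

```python
import numpy as np

# Illustrative predicted analytical latencies (seconds); not real model output.
predicted_lat_s = np.array([0.5, 1.2, 2.0, 3.5, 4.0, 6.0, 7.5, 9.0, 12.0, 30.0])

# method="lower" selects the element at index floor((n - 1) * q) of the sorted
# array, so the p90 is always one of the predicted latencies.
p90 = np.quantile(predicted_lat_s, 0.90, method="lower")
print(p90)  # 12.0
```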
@@ -208,22 +206,22 @@ def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> boo def _get_or_compute_geomean_latency(bp: ComparableBlueprint) -> float: - stored = bp.get_memoized_value("geomean_latency") + stored = bp.get_memoized_value("query_geomean_latency") if stored is not None: return stored else: geomean_lat = np.exp(np.log(bp.get_predicted_analytical_latencies()).mean()) - bp.set_memoized_value("geomean_latency", geomean_lat) + bp.set_memoized_value("query_geomean_latency", geomean_lat) return geomean_lat def _get_or_compute_max_latency(bp: ComparableBlueprint) -> float: - stored = bp.get_memoized_value("max_latency") + stored = bp.get_memoized_value("query_max_latency") if stored is not None: return stored else: max_lat = bp.get_predicted_analytical_latencies().max() - bp.set_memoized_value("max_latency", max_lat) + bp.set_memoized_value("query_max_latency", max_lat) return max_lat @@ -234,27 +232,39 @@ def _get_or_compute_nth_largest_latency(bp: ComparableBlueprint, n: int) -> floa else: actual_n = n - stored = bp.get_memoized_value(f"{actual_n}_largest") + stored = bp.get_memoized_value(f"query_{actual_n}_largest") if stored is not None: return stored nth_largest = np.partition(pred_lats, -actual_n)[-actual_n] - bp.set_memoized_value(f"{actual_n}_largest", nth_largest) + bp.set_memoized_value(f"query_{actual_n}_largest", nth_largest) return nth_largest def _get_or_compute_p99_latency(bp: ComparableBlueprint) -> float: - stored = bp.get_memoized_value("p99_latency") + stored = bp.get_memoized_value("query_p99_latency") if stored is not None: return stored else: p99_lat = np.quantile( bp.get_predicted_analytical_latencies(), 0.99, method="lower" ) - bp.set_memoized_value("p99_latency", p99_lat) + bp.set_memoized_value("query_p99_latency", p99_lat) return p99_lat +def _get_or_compute_p90_latency(bp: ComparableBlueprint) -> float: + stored = bp.get_memoized_value("query_p90_latency") + if stored is not None: + return stored + else: + p90_lat = np.quantile( + bp.get_predicted_analytical_latencies(), 0.90, method="lower" + ) + bp.set_memoized_value("query_p90_latency", p90_lat) + return p90_lat + + def _compute_max_ratio(value1: float, value2: float) -> float: if value1 == 0.0 or value2 == 0.0: return np.inf diff --git a/src/brad/planner/compare/cost_with_benefit.py b/src/brad/planner/compare/cost_with_benefit.py new file mode 100644 index 00000000..fd0938b5 --- /dev/null +++ b/src/brad/planner/compare/cost_with_benefit.py @@ -0,0 +1,188 @@ +import math +import numpy as np +from datetime import timedelta +from typing import Optional + +from .blueprint import ComparableBlueprint +from .function import BlueprintComparator + + +def best_cost_under_perf_ceilings_with_benefit_horizon( + max_query_p90_latency_s: float, + max_txn_p90_latency_s: float, + curr_query_p90_latency_s: float, + curr_txn_p90_latency_s: float, + curr_hourly_cost: float, + benefit_horizon: timedelta, + penalty_threshold: float, +) -> BlueprintComparator: + def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> bool: + # Transactional latency ceilings (feasibility check). + result = _txn_p90_ceiling(left, right, max_txn_p90_latency_s) + if result is not None: + return result + + # Query latency ceilings (feasibility check). + result = _query_p90_ceiling(left, right, max_query_p90_latency_s) + if result is not None: + return result + + # Transition times (feasibility check). 
+        result = _transition_under_benefit_horizon(left, right, benefit_horizon)
+        if result is not None:
+            return result
+
+        # We compute a scalar score for each blueprint (lower is better):
+        #
+        # Score = Penalty * C_0 * T + TransitionCost + C * (B - T)
+        # Penalty = 1 + max(0, max(curr_query_p90 / max_query_p90, curr_txn_p90 / max_txn_p90) - threshold)
+        # Threshold = 0.8 (hyperparameter)
+        #
+        # Notation:
+        # C_0 is the operating cost of the current blueprint ($/h)
+        # C is the operating cost of the proposed blueprint ($/h)
+        # T is the transition time (h)
+        # B is the benefit horizon (h)
+        #
+        # Intuition: The penalty reweights the first term when we are close to
+        # exceeding the performance objectives, to encourage selecting blueprints
+        # that are faster to transition to.
+        penalty_multiplier = _compute_penalty_multiplier(
+            max_query_p90_latency_s,
+            max_txn_p90_latency_s,
+            curr_query_p90_latency_s,
+            curr_txn_p90_latency_s,
+            penalty_threshold,
+        )
+
+        left_score = _compute_scalar_score(
+            left.get_transition_time_s(),
+            left.get_transition_cost(),
+            curr_hourly_cost,
+            left.get_operational_monetary_cost(),
+            benefit_horizon,
+            penalty_multiplier,
+        )
+        right_score = _compute_scalar_score(
+            right.get_transition_time_s(),
+            right.get_transition_cost(),
+            curr_hourly_cost,
+            right.get_operational_monetary_cost(),
+            benefit_horizon,
+            penalty_multiplier,
+        )
+
+        # For debugging purposes.
+        left.set_memoized_value("benefit_penalty_multiplier", penalty_multiplier)
+        right.set_memoized_value("benefit_penalty_multiplier", penalty_multiplier)
+
+        return left_score < right_score
+
+    return is_better_than
+
+
+def _get_or_compute_query_p90_latency(bp: ComparableBlueprint) -> float:
+    stored = bp.get_memoized_value("query_p90_latency")
+    if stored is not None:
+        return stored
+    else:
+        p90_lat = np.quantile(
+            bp.get_predicted_analytical_latencies(), 0.9, method="lower"
+        )
+        bp.set_memoized_value("query_p90_latency", p90_lat)
+        return p90_lat
+
+
+def _txn_p90_ceiling(
+    left: ComparableBlueprint, right: ComparableBlueprint, txn_lat_s_p90: float
+) -> Optional[bool]:
+    # Check transactional latency ceilings first.
+    left_txn_p90 = left.get_predicted_transactional_latencies()[1]
+    right_txn_p90 = right.get_predicted_transactional_latencies()[1]
+
+    # If one of these candidates has NaN predictions, we need to
+    # consider other factors. NaN indicates that a prediction is not
+    # available (e.g., due to missing metrics).
+    if not math.isnan(left_txn_p90) and not math.isnan(right_txn_p90):
+        # Both above the ceiling: return the blueprint that does better on
+        # performance.
+        if left_txn_p90 > txn_lat_s_p90 and right_txn_p90 > txn_lat_s_p90:
+            return left_txn_p90 < right_txn_p90
+        elif left_txn_p90 > txn_lat_s_p90:
+            return False
+        elif right_txn_p90 > txn_lat_s_p90:
+            return True
+
+    return None
+
+
+def _query_p90_ceiling(
+    left: ComparableBlueprint, right: ComparableBlueprint, query_ceiling_s: float
+) -> Optional[bool]:
+    # Query latency ceilings.
+    left_lat = _get_or_compute_query_p90_latency(left)
+    right_lat = _get_or_compute_query_p90_latency(right)
+
+    if left_lat > query_ceiling_s and right_lat > query_ceiling_s:
+        # Both are above the latency ceiling.
+        # So the better blueprint is the one that does better on performance.
+ return left_lat < right_lat + elif left_lat > query_ceiling_s: + return False + elif right_lat > query_ceiling_s: + return True + + return None + + +def _transition_under_benefit_horizon( + left: ComparableBlueprint, right: ComparableBlueprint, payoff_period: timedelta +) -> Optional[bool]: + left_tr_s = left.get_transition_time_s() + right_tr_s = right.get_transition_time_s() + + # If either are above the payoff period, return the blueprint that does + # better on transition time. + if ( + left_tr_s > payoff_period.total_seconds() + and right_tr_s > payoff_period.total_seconds() + ): + return left_tr_s < right_tr_s + elif left_tr_s > payoff_period.total_seconds(): + return False + elif right_tr_s > payoff_period.total_seconds(): + return True + + return None + + +def _compute_penalty_multiplier( + max_query_p90_latency_s: float, + max_txn_p90_latency_s: float, + curr_query_p90_latency_s: float, + curr_txn_p90_latency_s: float, + threshold: float, +) -> float: + query_val = curr_query_p90_latency_s / max_query_p90_latency_s + txn_val = curr_txn_p90_latency_s / max_txn_p90_latency_s + return 1.0 + max(0.0, max(query_val, txn_val) - threshold) + + +def _compute_scalar_score( + transition_time_s: float, + transition_cost: float, + curr_hourly_cost: float, + next_hourly_cost: float, + benefit_horizon: timedelta, + penalty_multiplier: float, +) -> float: + leftover_time_s = benefit_horizon.total_seconds() - transition_time_s + assert leftover_time_s > 0.0 + leftover_time_hr = leftover_time_s / 60.0 / 60.0 + transition_time_hr = transition_time_s / 60.0 / 60.0 + # Lower is better. + return ( + transition_time_hr * curr_hourly_cost * penalty_multiplier + + transition_cost + + leftover_time_hr * next_hourly_cost + ) diff --git a/src/brad/planner/compare/provider.py b/src/brad/planner/compare/provider.py index c33d429c..f1f58756 100644 --- a/src/brad/planner/compare/provider.py +++ b/src/brad/planner/compare/provider.py @@ -1,5 +1,11 @@ +from datetime import timedelta + from brad.planner.compare.function import BlueprintComparator from brad.planner.compare.cost import best_cost_under_perf_ceilings +from brad.planner.compare.cost_with_benefit import ( + best_cost_under_perf_ceilings_with_benefit_horizon, +) +from brad.planner.metrics import Metrics class BlueprintComparatorProvider: @@ -9,7 +15,9 @@ class BlueprintComparatorProvider: comparator captures state (it is a closure) and cannot be pickled. 
""" - def get_comparator(self) -> BlueprintComparator: + def get_comparator( + self, metrics: Metrics, curr_hourly_cost: float + ) -> BlueprintComparator: raise NotImplementedError @@ -20,7 +28,36 @@ def __init__( self._max_query_latency_s = max_query_latency_s self._max_txn_p90_latency_s = max_txn_p90_latency_s - def get_comparator(self) -> BlueprintComparator: + def get_comparator( + self, metrics: Metrics, curr_hourly_cost: float + ) -> BlueprintComparator: return best_cost_under_perf_ceilings( self._max_query_latency_s, self._max_txn_p90_latency_s ) + + +class BenefitPerformanceCeilingComparatorProvider(BlueprintComparatorProvider): + def __init__( + self, + max_query_p90_latency_s: float, + max_txn_p90_latency_s: float, + benefit_horizon: timedelta, + threshold: float, + ) -> None: + self._max_query_p90_latency_s = max_query_p90_latency_s + self._max_txn_p90_latency_s = max_txn_p90_latency_s + self._benefit_horizon = benefit_horizon + self._threshold = threshold + + def get_comparator( + self, metrics: Metrics, curr_hourly_cost: float + ) -> BlueprintComparator: + return best_cost_under_perf_ceilings_with_benefit_horizon( + max_query_p90_latency_s=self._max_query_p90_latency_s, + max_txn_p90_latency_s=self._max_txn_p90_latency_s, + curr_query_p90_latency_s=metrics.query_lat_s_p90, + curr_txn_p90_latency_s=metrics.txn_lat_s_p90, + curr_hourly_cost=curr_hourly_cost, + benefit_horizon=self._benefit_horizon, + penalty_threshold=self._threshold, + ) diff --git a/src/brad/planner/metrics.py b/src/brad/planner/metrics.py index 5b3bbffa..75491c03 100644 --- a/src/brad/planner/metrics.py +++ b/src/brad/planner/metrics.py @@ -28,7 +28,10 @@ "txn_completions_per_s", "txn_lat_s_p50", "txn_lat_s_p90", + "query_lat_s_p50", + "query_lat_s_p90", ], + defaults=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], ) AggCfg = Dict[str, Any] @@ -106,7 +109,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: if redshift.empty and aurora_writer.empty and front_end.empty: logger.warning("All metrics are empty.") return ( - Metrics(1.0, 1.0, 1.0, 100.0, 100.0, 1.0, 1.0, 1.0, 0.0, 0.0), + Metrics(1.0, 1.0, 1.0, 100.0, 100.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0), universal_now(), ) @@ -185,6 +188,26 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: agg_cfg=agg_cfg, name="txn_lat_s_p90", ) + query_lat_s_p50 = self._aggregate_possibly_missing( + front_end.loc[ + front_end.index <= most_recent_common, + FrontEndMetric.QueryLatencySecondP50.value, + ], + num_epochs=aggregate_epochs, + default_value=0.0, + agg_cfg=agg_cfg, + name="query_lat_s_p50", + ) + query_lat_s_p90 = self._aggregate_possibly_missing( + front_end.loc[ + front_end.index <= most_recent_common, + FrontEndMetric.QueryLatencySecondP90.value, + ], + num_epochs=aggregate_epochs, + default_value=0.0, + agg_cfg=agg_cfg, + name="query_lat_s_p90", + ) aurora_writer_rel = aurora_writer.loc[aurora_writer.index <= most_recent_common] aurora_reader_rel = aurora_reader.loc[aurora_reader.index <= most_recent_common] @@ -238,6 +261,8 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: txn_completions_per_s=txn_per_s, txn_lat_s_p50=txn_lat_s_p50, txn_lat_s_p90=txn_lat_s_p90, + query_lat_s_p50=query_lat_s_p50, + query_lat_s_p90=query_lat_s_p90, ), most_recent_common.to_pydatetime(), ) @@ -339,7 +364,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: if redshift.empty and aurora_writer.empty and front_end.empty: logger.warning("All metrics are empty.") return ( - Metrics(1.0, 1.0, 1.0, 100.0, 100.0, 1.0, 1.0, 1.0, 0.0, 0.0), + Metrics(1.0, 1.0, 
1.0, 100.0, 100.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0), universal_now(), ) @@ -400,6 +425,22 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: default_value=0.0, name="txn_lat_s_p90", ) + query_lat_s_p50 = self._extract_most_recent_possibly_missing( + front_end.loc[ + front_end.index <= most_recent_common, + FrontEndMetric.QueryLatencySecondP50.value, + ], + default_value=0.0, + name="query_lat_s_p50", + ) + query_lat_s_p90 = self._extract_most_recent_possibly_missing( + front_end.loc[ + front_end.index <= most_recent_common, + FrontEndMetric.QueryLatencySecondP90.value, + ], + default_value=0.0, + name="query_lat_s_p90", + ) aurora_writer_rel = aurora_writer.loc[aurora_writer.index <= most_recent_common] aurora_reader_rel = aurora_reader.loc[aurora_reader.index <= most_recent_common] @@ -445,6 +486,8 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: txn_completions_per_s=txn_per_s, txn_lat_s_p50=txn_lat_s_p50, txn_lat_s_p90=txn_lat_s_p90, + query_lat_s_p50=query_lat_s_p50, + query_lat_s_p90=query_lat_s_p90, ), most_recent_common.to_pydatetime(), ) @@ -572,4 +615,6 @@ def _fill_empty_metrics(to_fill: pd.DataFrame, guide: pd.DataFrame) -> pd.DataFr FrontEndMetric.TxnEndPerSecond.value, FrontEndMetric.TxnLatencySecondP50.value, FrontEndMetric.TxnLatencySecondP90.value, + FrontEndMetric.QueryLatencySecondP50.value, + FrontEndMetric.QueryLatencySecondP90.value, ] diff --git a/src/brad/planner/scoring/context.py b/src/brad/planner/scoring/context.py index d2c20bfc..e8de0fcb 100644 --- a/src/brad/planner/scoring/context.py +++ b/src/brad/planner/scoring/context.py @@ -1,6 +1,7 @@ import logging import numpy as np from typing import Dict, List +from datetime import timedelta from brad.config.engine import Engine from brad.blueprint import Blueprint @@ -8,6 +9,11 @@ from brad.planner.metrics import Metrics from brad.planner.workload import Workload from brad.routing.router import Router +from brad.planner.scoring.provisioning import compute_athena_scan_cost_numpy +from brad.planner.scoring.provisioning import ( + compute_aurora_hourly_operational_cost, + compute_redshift_hourly_operational_cost, +) logger = logging.getLogger(__name__) @@ -38,6 +44,11 @@ def __init__( self.current_query_locations[Engine.Redshift] = [] self.current_query_locations[Engine.Athena] = [] + # TODO: This is messy - we should have one place for blueprint scoring + # relative to a workload. + self.current_workload_predicted_hourly_scan_cost = 0.0 + self.current_blueprint_provisioning_hourly_cost = 0.0 + # This is used for reweighing metrics due to query routing changes # across blueprints. self.engine_latency_norm_factor: Dict[Engine, float] = {} @@ -66,6 +77,40 @@ async def simulate_current_workload_routing(self, router: Router) -> None: eng = await router.engine_for(query) self.current_query_locations[eng].append(qidx) + def compute_current_workload_predicted_hourly_scan_cost(self) -> None: + # Note that this is not ideal. We should use the actual _recorded_ scan + # cost (but this is not readily recorded through PyAthena). 
+ # Athena bytes: + bytes_accessed = ( + self.current_workload.get_predicted_athena_bytes_accessed_batch( + self.current_query_locations[Engine.Athena] + ) + ) + arrival_counts = self.current_workload.get_arrival_counts_batch( + self.current_query_locations[Engine.Athena] + ) + period_scan_cost = compute_athena_scan_cost_numpy( + bytes_accessed, arrival_counts, self.planner_config + ) + scaling_factor = ( + timedelta(hours=1).total_seconds() + / self.current_workload.period().total_seconds() + ) + self.current_workload_predicted_hourly_scan_cost = ( + period_scan_cost * scaling_factor + ) + if not self.planner_config.use_io_optimized_aurora(): + logger.warning("Aurora blocks accessed is not implemented.") + + def compute_current_blueprint_provisioning_hourly_cost(self) -> None: + aurora_cost = compute_aurora_hourly_operational_cost( + self.current_blueprint.aurora_provisioning(), self + ) + redshift_cost = compute_redshift_hourly_operational_cost( + self.current_blueprint.redshift_provisioning() + ) + self.current_blueprint_provisioning_hourly_cost = aurora_cost + redshift_cost + def compute_engine_latency_norm_factor(self) -> None: for engine in [Engine.Aurora, Engine.Redshift, Engine.Athena]: if len(self.current_query_locations[engine]) == 0: diff --git a/src/brad/planner/scoring/provisioning.py b/src/brad/planner/scoring/provisioning.py index a02b3718..387dffa5 100644 --- a/src/brad/planner/scoring/provisioning.py +++ b/src/brad/planner/scoring/provisioning.py @@ -1,18 +1,22 @@ import json import math import importlib.resources as pkg_resources +import numpy as np +import numpy.typing as npt from collections import namedtuple -from typing import Dict, Iterable +from typing import Dict, Iterable, TYPE_CHECKING import brad.planner.scoring.data as score_data from brad.blueprint.diff.provisioning import ProvisioningDiff from brad.blueprint.provisioning import Provisioning from brad.config.planner import PlannerConfig -from brad.planner.scoring.context import ScoringContext from brad.planner.workload.query import Query from brad.provisioning.redshift import RedshiftProvisioningManager +if TYPE_CHECKING: + from brad.planner.scoring.context import ScoringContext + ProvisioningResources = namedtuple( "ProvisioningResources", @@ -41,7 +45,7 @@ def _load_instance_specs(file_name: str) -> Dict[str, ProvisioningResources]: def compute_aurora_hourly_operational_cost( - provisioning: Provisioning, ctx: ScoringContext + provisioning: Provisioning, ctx: "ScoringContext" ) -> float: prov = AuroraSpecs[provisioning.instance_type()] if ctx.planner_config.use_io_optimized_aurora(): @@ -87,12 +91,12 @@ def compute_athena_scanned_bytes( ) -> int: # N.B. There is a minimum charge of 10 MB per query. 
min_bytes_per_query = planner_config.athena_min_mb_per_query() * 1000 * 1000 - total_accessed_bytes = 0 - arrival_counts = 0.0 + total_accessed_bytes = 0.0 for query, accessed_bytes in zip(queries, accessed_bytes_per_query): - total_accessed_bytes += max(accessed_bytes, min_bytes_per_query) - arrival_counts += query.arrival_count() - return max(int(total_accessed_bytes * arrival_counts), 1) + total_accessed_bytes += ( + max(accessed_bytes, min_bytes_per_query) * query.arrival_count() + ) + return max(int(total_accessed_bytes), 1) def compute_athena_scan_cost( @@ -103,6 +107,20 @@ def compute_athena_scan_cost( return accessed_mb * planner_config.athena_usd_per_mb_scanned() +def compute_athena_scan_cost_numpy( + bytes_accessed: npt.NDArray, + arrival_counts: npt.NDArray, + planner_config: PlannerConfig, +) -> float: + # Note we use MB instead of MiB + mb_accessed = bytes_accessed / 1000.0 / 1000.0 + mb_accessed = np.clip( + mb_accessed, a_min=float(planner_config.athena_min_mb_per_query()), a_max=None + ) + total_mb = np.dot(mb_accessed, arrival_counts) + return total_mb * planner_config.athena_usd_per_mb_scanned() + + def compute_aurora_transition_time_s( old: Provisioning, new: Provisioning, planner_config: PlannerConfig ) -> float: diff --git a/tests/test_aurora_scoring.py b/tests/test_aurora_scoring.py index c0fa8547..b5340ed2 100644 --- a/tests/test_aurora_scoring.py +++ b/tests/test_aurora_scoring.py @@ -32,6 +32,8 @@ def get_fixtures( txn_completions_per_s=10.0, txn_lat_s_p50=0.010, txn_lat_s_p90=0.020, + query_lat_s_p50=10.0, + query_lat_s_p90=20.0, ) planner_config = PlannerConfig( { diff --git a/tests/test_redshift_scoring.py b/tests/test_redshift_scoring.py index a87c7599..a0b37787 100644 --- a/tests/test_redshift_scoring.py +++ b/tests/test_redshift_scoring.py @@ -26,6 +26,8 @@ def get_fixtures(redshift_cpu: float, redshift_prov: Provisioning) -> ScoringCon txn_completions_per_s=10.0, txn_lat_s_p50=0.010, txn_lat_s_p90=0.020, + query_lat_s_p50=10.0, + query_lat_s_p90=20.0, ) planner_config = PlannerConfig( {
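To make the benefit-horizon scoring concrete, here is a small worked example (hypothetical numbers) mirroring the arithmetic of `_compute_scalar_score` in `cost_with_benefit.py`: Score = Penalty * C_0 * T + TransitionCost + C * (B - T).

```python
from datetime import timedelta

benefit_horizon = timedelta(hours=1)  # B
curr_hourly_cost = 4.0        # C_0: current blueprint, $/h
next_hourly_cost = 2.5        # C:   candidate blueprint, $/h
transition_time_s = 900.0     # T = 0.25 h
transition_cost = 0.10        # one-time data movement cost, $
penalty_multiplier = 1.1      # 1 + (0.9 - 0.8): worst current p90 ratio is 0.9

transition_time_hr = transition_time_s / 3600.0
leftover_hr = (benefit_horizon.total_seconds() - transition_time_s) / 3600.0
score = (
    transition_time_hr * curr_hourly_cost * penalty_multiplier
    + transition_cost
    + leftover_hr * next_hourly_cost
)
print(round(score, 3))  # 3.075 -- lower scores are preferred
```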