diff --git a/src/brad/planner/triggers/variable_costs.py b/src/brad/planner/triggers/variable_costs.py index 9ac518be..c583535a 100644 --- a/src/brad/planner/triggers/variable_costs.py +++ b/src/brad/planner/triggers/variable_costs.py @@ -3,7 +3,7 @@ import pytz import pandas as pd from datetime import datetime, timedelta -from typing import List +from typing import List, Tuple from .trigger import Trigger from brad.config.engine import Engine @@ -60,7 +60,13 @@ async def should_replan(self) -> bool: ) return False - current_hourly_cost = await self._estimate_current_scan_hourly_cost() + aurora_cost, athena_cost = await self._estimate_current_scan_hourly_cost() + + if self._planner_config.use_io_optimized_aurora(): + current_hourly_cost = athena_cost + else: + current_hourly_cost = athena_cost + aurora_cost + if current_hourly_cost <= 1e-5: # Treated as 0. logger.debug( @@ -85,9 +91,9 @@ async def should_replan(self) -> bool: return False - async def _estimate_current_scan_hourly_cost(self) -> float: + async def _estimate_current_scan_hourly_cost(self) -> Tuple[float, float]: if self._current_blueprint is None: - return 0.0 + return 0.0, 0.0 # Extract the queries seen in the last window. window_end = datetime.now() @@ -107,7 +113,7 @@ async def _estimate_current_scan_hourly_cost(self) -> float: ) ) if len(workload.analytical_queries()) == 0: - return 0.0 + return 0.0, 0.0 self._data_access_provider.apply_access_statistics(workload) # Compute the scan cost of this last window of queries. @@ -120,7 +126,12 @@ async def _estimate_current_scan_hourly_cost(self) -> float: ) for idx, q in enumerate(workload.analytical_queries()): - engine = await router.engine_for(q) + maybe_engine = q.primary_execution_location() + if maybe_engine is None: + engine = await router.engine_for(q) + else: + engine = maybe_engine + if engine == Engine.Aurora: aurora_query_indices.append(idx) aurora_queries.append(q) @@ -128,6 +139,7 @@ async def _estimate_current_scan_hourly_cost(self) -> float: athena_query_indices.append(idx) athena_queries.append(q) + # NOTE: Ideally we use the actual values. aurora_accessed_pages = compute_aurora_accessed_pages( aurora_queries, workload.get_predicted_aurora_pages_accessed_batch(aurora_query_indices), @@ -139,10 +151,11 @@ async def _estimate_current_scan_hourly_cost(self) -> float: ) # We use the hit rate to estimate Aurora scan costs. + # Note that if we are using I/O optimized Aurora, there is no + # incremental scan cost. lookback_epochs = math.ceil( self._planner_config.planning_window() / self._config.epoch_length ) - # TODO: If there are read replicas, we should use the hit rate from them instead. aurora_reader_metrics = self._monitor.aurora_reader_metrics() if len(aurora_reader_metrics) > 0: reader_hit_rates = [] @@ -168,7 +181,7 @@ async def _estimate_current_scan_hourly_cost(self) -> float: athena_scanned_bytes, self._planner_config ) - return aurora_scan_cost + athena_scan_cost + return aurora_scan_cost, athena_scan_cost _HIT_RATE_METRIC = "BufferCacheHitRatio_Average"