Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the variable costs trigger #343

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions src/brad/planner/triggers/variable_costs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytz
import pandas as pd
from datetime import datetime, timedelta
from typing import List
from typing import List, Tuple

from .trigger import Trigger
from brad.config.engine import Engine
Expand Down Expand Up @@ -60,7 +60,13 @@ async def should_replan(self) -> bool:
)
return False

current_hourly_cost = await self._estimate_current_scan_hourly_cost()
aurora_cost, athena_cost = await self._estimate_current_scan_hourly_cost()

if self._planner_config.use_io_optimized_aurora():
current_hourly_cost = athena_cost
else:
current_hourly_cost = athena_cost + aurora_cost

if current_hourly_cost <= 1e-5:
# Treated as 0.
logger.debug(
Expand All @@ -85,9 +91,9 @@ async def should_replan(self) -> bool:

return False

async def _estimate_current_scan_hourly_cost(self) -> float:
async def _estimate_current_scan_hourly_cost(self) -> Tuple[float, float]:
if self._current_blueprint is None:
return 0.0
return 0.0, 0.0

# Extract the queries seen in the last window.
window_end = datetime.now()
Expand All @@ -107,7 +113,7 @@ async def _estimate_current_scan_hourly_cost(self) -> float:
)
)
if len(workload.analytical_queries()) == 0:
return 0.0
return 0.0, 0.0
self._data_access_provider.apply_access_statistics(workload)

# Compute the scan cost of this last window of queries.
Expand All @@ -120,14 +126,20 @@ async def _estimate_current_scan_hourly_cost(self) -> float:
)

for idx, q in enumerate(workload.analytical_queries()):
engine = await router.engine_for(q)
maybe_engine = q.primary_execution_location()
if maybe_engine is None:
engine = await router.engine_for(q)
else:
engine = maybe_engine

if engine == Engine.Aurora:
aurora_query_indices.append(idx)
aurora_queries.append(q)
elif engine == Engine.Athena:
athena_query_indices.append(idx)
athena_queries.append(q)

# NOTE: Ideally we would use the actual accessed-page values rather than the predicted ones.
aurora_accessed_pages = compute_aurora_accessed_pages(
aurora_queries,
workload.get_predicted_aurora_pages_accessed_batch(aurora_query_indices),
Expand All @@ -139,10 +151,11 @@ async def _estimate_current_scan_hourly_cost(self) -> float:
)

# We use the hit rate to estimate Aurora scan costs.
# Note that if we are using I/O optimized Aurora, there is no
# incremental scan cost.
lookback_epochs = math.ceil(
self._planner_config.planning_window() / self._config.epoch_length
)
# TODO: If there are read replicas, we should use the hit rate from them instead.
aurora_reader_metrics = self._monitor.aurora_reader_metrics()
if len(aurora_reader_metrics) > 0:
reader_hit_rates = []
Expand All @@ -168,7 +181,7 @@ async def _estimate_current_scan_hourly_cost(self) -> float:
athena_scanned_bytes, self._planner_config
)

return aurora_scan_cost + athena_scan_cost
return aurora_scan_cost, athena_scan_cost


_HIT_RATE_METRIC = "BufferCacheHitRatio_Average"