Implement the new benefit horizon (#379)
* Update table load/unload throughputs

* Implement new comparator based on the payoff period concept

* Revise comparator based on discussions

* Modify the comparator provider

* Compute the current hourly operating cost

* Add new comparator to the temp config

* Add penalty threshold to the config

* Add debug information, fix check errors

* Update Athena load/extract rates based on our experiments

* Add metrics defaults for backward compatibility

* Better naming
geoffxy authored Nov 21, 2023
1 parent 9fed6dd commit 45e4dc4
Showing 16 changed files with 468 additions and 53 deletions.
19 changes: 13 additions & 6 deletions config/planner.yml
@@ -14,6 +14,7 @@ reinterpret_second_as: 1
 # to be accepted.
 query_dist_change_frac: 0.1

+# Used to aggregate metrics collected in the planning window.
 metrics_agg:
   method: ewm # 'mean' is another option
   alpha: 0.86466472 # 1 - 1 / e^2
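Note: the `alpha` above equals 1 − 1/e², so recent planning windows dominate the aggregate. A minimal sketch of an exponentially weighted aggregation, assuming pandas-style `ewm` semantics (BRAD's actual aggregation code is not part of this diff):

```python
import numpy as np
import pandas as pd

alpha = 1.0 - 1.0 / np.e**2  # ≈ 0.86466472; newer samples get most of the weight

# Hypothetical metric samples collected over the planning window, oldest first.
cpu_util = pd.Series([55.0, 60.0, 58.0, 72.0, 75.0])
aggregated = cpu_util.ewm(alpha=alpha).mean().iloc[-1]  # single smoothed value
print(round(float(aggregated), 2))
```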
@@ -98,15 +99,21 @@ aurora_per_instance_change_time_s: 300 # 5 minutes
 redshift_elastic_resize_time_s: 900 # 15 minutes - https://repost.aws/knowledge-center/resize-redshift-cluster
 redshift_classic_resize_time_s: 7200 # 2 hours (This is very difficult to estimate; it depends on state we have no visibility into.)

-redshift_extract_rate_mb_per_s: 10.0
-redshift_load_rate_mb_per_s: 10.0
+# Minimum of 5 seconds per table, but this amount is negligible since we work
+# with relatively large tables.
+redshift_extract_rate_mb_per_s: 30.0
+redshift_load_rate_mb_per_s: 20.0

-aurora_extract_rate_mb_per_s: 10.0
-aurora_load_rate_mb_per_s: 10.0
+# There is a minimum of 1 second per table, but it is negligible since we work
+# with relatively large tables.
+aurora_extract_rate_mb_per_s: 12.0
+aurora_load_rate_mb_per_s: 12.0

 # Extracting / loading in Athena corresponds to a conversion to/from Iceberg.
-athena_extract_rate_mb_per_s: 10.0
-athena_load_rate_mb_per_s: 10.0
+athena_extract_rate_mb_per_s: 15.0
+# From experiments: load time ≈ data size / (820 MB/s) + 2.2 s.
+# We ignore the constant 2.2 s for simplicity since we work with large tables.
+athena_load_rate_mb_per_s: 820.0

 # Storage costs.
 s3_usd_per_mb_per_month: 0.000023
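Note: a minimal sketch of how these extract/load rates could feed a table-movement time estimate. The helper name and the handling of the per-table minimums are assumptions drawn from the comments above, not BRAD's actual implementation:

```python
def estimated_move_time_s(
    table_size_mb: float,
    extract_rate_mb_per_s: float,
    load_rate_mb_per_s: float,
    min_seconds_per_table: float = 5.0,  # e.g., the Redshift minimum noted above
) -> float:
    """Rough time to extract a table from one engine and load it into another."""
    extract_s = table_size_mb / extract_rate_mb_per_s
    load_s = table_size_mb / load_rate_mb_per_s
    return max(extract_s + load_s, min_seconds_per_table)

# Moving a 10 GB table out of Redshift (30 MB/s) into Aurora (12 MB/s):
print(estimated_move_time_s(10_240.0, 30.0, 12.0))  # ≈ 1194.7 seconds
```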
13 changes: 12 additions & 1 deletion config/temp_config_sample.yml
@@ -1,7 +1,18 @@
-latency_ceiling_s: 30.0
+query_latency_p90_ceiling_s: 30.0
 txn_latency_p50_ceiling_s: 0.020 # Currently unused.
 txn_latency_p90_ceiling_s: 0.030
+
+comparator:
+  type: perf_ceiling # or `benefit_perf_ceiling`
+
+  benefit_horizon: # Only used by the `benefit_perf_ceiling` comparator
+    weeks: 0
+    days: 0
+    hours: 1
+    minutes: 0
+
+  penalty_threshold: 0.8 # Only used by the `benefit_perf_ceiling` comparator

 # Use this instead of the individual paths below.
 std_dataset_path: workloads/IMDB_20GB/regular_test/

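Note: the commit messages frame `benefit_perf_ceiling` around a payoff period. A hedged sketch of that idea, under the assumption that a candidate blueprint should recoup its one-time transition cost within `benefit_horizon`; the exact rule, including how `penalty_threshold` enters it, lives in `src/brad/planner/compare/` and is not shown in this excerpt:

```python
from datetime import timedelta

def pays_off(
    curr_hourly_cost: float,     # current blueprint's operating cost (USD/hour)
    cand_hourly_cost: float,     # candidate blueprint's operating cost (USD/hour)
    transition_cost_usd: float,  # one-time cost of switching blueprints
    benefit_horizon: timedelta,  # e.g., timedelta(hours=1) per the sample above
) -> bool:
    horizon_hours = benefit_horizon.total_seconds() / 3600.0
    savings = (curr_hourly_cost - cand_hourly_cost) * horizon_hours
    return savings > transition_cost_usd

print(pays_off(10.0, 7.0, 2.0, timedelta(hours=1)))  # True: $3 saved > $2 spent
```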
20 changes: 18 additions & 2 deletions src/brad/config/temp_config.py
@@ -1,6 +1,7 @@
 import pathlib
 import yaml
 from typing import Any, Dict, Optional
+from datetime import timedelta


 class TempConfig:
@@ -16,15 +17,30 @@ def load_from_file(cls, file_path: str | pathlib.Path) -> "TempConfig":
     def __init__(self, raw: Dict[str, Any]) -> None:
         self._raw = raw

-    def latency_ceiling_s(self) -> float:
-        return float(self._raw["latency_ceiling_s"])
+    def query_latency_p90_ceiling_s(self) -> float:
+        return float(self._raw["query_latency_p90_ceiling_s"])

     def txn_latency_p50_ceiling_s(self) -> float:
         return float(self._raw["txn_latency_p50_ceiling_s"])

     def txn_latency_p90_ceiling_s(self) -> float:
         return float(self._raw["txn_latency_p90_ceiling_s"])

+    def comparator_type(self) -> str:
+        return self._raw["comparator"]["type"]
+
+    def benefit_horizon(self) -> timedelta:
+        period = self._raw["comparator"]["benefit_horizon"]
+        return timedelta(
+            weeks=period["weeks"],
+            days=period["days"],
+            hours=period["hours"],
+            minutes=period["minutes"],
+        )
+
+    def penalty_threshold(self) -> float:
+        return self._raw["comparator"]["penalty_threshold"]
+
     def std_dataset_path(self) -> Optional[pathlib.Path]:
         if "std_dataset_path" not in self._raw:
             return None
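Note: a short usage sketch for the new accessors (the file path is illustrative):

```python
from brad.config.temp_config import TempConfig

cfg = TempConfig.load_from_file("config/temp_config_sample.yml")
if cfg.comparator_type() == "benefit_perf_ceiling":
    horizon = cfg.benefit_horizon()      # timedelta(hours=1) for the sample config
    threshold = cfg.penalty_threshold()  # 0.8 for the sample config
```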
25 changes: 20 additions & 5 deletions src/brad/daemon/daemon.py
@@ -32,7 +32,11 @@
 from brad.data_sync.execution.executor import DataSyncExecutor
 from brad.front_end.start_front_end import start_front_end
 from brad.planner.abstract import BlueprintPlanner
-from brad.planner.compare.provider import PerformanceCeilingComparatorProvider
+from brad.planner.compare.provider import (
+    BlueprintComparatorProvider,
+    PerformanceCeilingComparatorProvider,
+    BenefitPerformanceCeilingComparatorProvider,
+)
 from brad.planner.estimator import EstimatorProvider
 from brad.planner.factory import BlueprintPlannerFactory
 from brad.planner.metrics import WindowedMetricsFromMonitor
@@ -181,10 +185,21 @@ async def _run_setup(self) -> None:
                 aurora_accessed_pages_path=self._temp_config.aurora_data_access_path(),
                 athena_accessed_bytes_path=self._temp_config.athena_data_access_path(),
             )
-            comparator_provider = PerformanceCeilingComparatorProvider(
-                self._temp_config.latency_ceiling_s(),
-                self._temp_config.txn_latency_p90_ceiling_s(),
-            )
+
+            if self._temp_config.comparator_type() == "benefit_perf_ceiling":
+                comparator_provider: BlueprintComparatorProvider = (
+                    BenefitPerformanceCeilingComparatorProvider(
+                        self._temp_config.query_latency_p90_ceiling_s(),
+                        self._temp_config.txn_latency_p90_ceiling_s(),
+                        self._temp_config.benefit_horizon(),
+                        self._temp_config.penalty_threshold(),
+                    )
+                )
+            else:
+                comparator_provider = PerformanceCeilingComparatorProvider(
+                    self._temp_config.query_latency_p90_ceiling_s(),
+                    self._temp_config.txn_latency_p90_ceiling_s(),
+                )
         else:
             logger.warning(
                 "TempConfig not provided. The planner will not be able to run correctly."
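Note: the `comparator_provider: BlueprintComparatorProvider` annotation on the first branch pins the variable to the common base type, so the `else` branch can assign a different subclass and still pass mypy. A minimal sketch of the pattern (class names illustrative):

```python
class Base: ...
class Fancy(Base): ...
class Plain(Base): ...

def choose(use_fancy: bool) -> Base:
    if use_fancy:
        obj: Base = Fancy()  # annotation widens the inferred type to Base
    else:
        obj = Plain()        # OK: Plain is also a Base
    return obj
```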
2 changes: 0 additions & 2 deletions src/brad/planner/abstract.py
@@ -54,8 +54,6 @@ def __init__(
         for t in self._triggers:
             t.update_blueprint(self._current_blueprint, self._current_blueprint_score)

-        self._comparator = self._providers.comparator_provider.get_comparator()
-
     async def run_forever(self) -> None:
         """
         Called to start the planner. The planner is meant to run until its task
13 changes: 10 additions & 3 deletions src/brad/planner/beam/query_based.py
@@ -135,7 +135,16 @@ async def _run_replan_impl(
         )
         await ctx.simulate_current_workload_routing(planning_router)
         ctx.compute_engine_latency_norm_factor()
+        ctx.compute_current_workload_predicted_hourly_scan_cost()
+        ctx.compute_current_blueprint_provisioning_hourly_cost()

+        comparator = self._providers.comparator_provider.get_comparator(
+            metrics,
+            curr_hourly_cost=(
+                ctx.current_workload_predicted_hourly_scan_cost
+                + ctx.current_blueprint_provisioning_hourly_cost
+            ),
+        )
         beam_size = self._planner_config.beam_size()
         first_query_idx = query_indices[0]
         first_query = analytical_queries[first_query_idx]
@@ -145,9 +154,7 @@
         for routing_engine in Engine.from_bitmap(
             planning_router.run_functionality_routing(first_query)
         ):
-            candidate = BlueprintCandidate.based_on(
-                self._current_blueprint, self._comparator
-            )
+            candidate = BlueprintCandidate.based_on(self._current_blueprint, comparator)
             candidate.add_transactional_tables(ctx)
             candidate.add_query(
                 first_query_idx,
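Note: the baseline handed to `get_comparator` is the current blueprint's hourly operating cost: the predicted hourly scan cost of the current workload plus the current provisioning's hourly cost. An illustrative calculation (the dollar figures are made up):

```python
scan_cost_per_hour = 0.45          # predicted scan spend for the current workload
provisioning_cost_per_hour = 3.26  # current Aurora/Redshift instance pricing
curr_hourly_cost = scan_cost_per_hour + provisioning_cost_per_hour
print(f"${curr_hourly_cost:.2f}/hour")  # $3.71/hour baseline for the comparator
```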
13 changes: 10 additions & 3 deletions src/brad/planner/beam/query_based_legacy.py
@@ -124,7 +124,16 @@ async def _run_replan_impl(
         )
         await ctx.simulate_current_workload_routing(planning_router)
         ctx.compute_engine_latency_norm_factor()
+        ctx.compute_current_workload_predicted_hourly_scan_cost()
+        ctx.compute_current_blueprint_provisioning_hourly_cost()

+        comparator = self._providers.comparator_provider.get_comparator(
+            metrics,
+            curr_hourly_cost=(
+                ctx.current_workload_predicted_hourly_scan_cost
+                + ctx.current_blueprint_provisioning_hourly_cost
+            ),
+        )
         beam_size = self._planner_config.beam_size()
         engines = [Engine.Aurora, Engine.Redshift, Engine.Athena]
         first_query_idx = query_indices[0]
@@ -136,9 +145,7 @@

         # 4. Initialize the top-k set (beam).
         for routing_engine in engines:
-            candidate = BlueprintCandidate.based_on(
-                self._current_blueprint, self._comparator
-            )
+            candidate = BlueprintCandidate.based_on(self._current_blueprint, comparator)
             candidate.add_transactional_tables(ctx)
             query = analytical_queries[first_query_idx]
             candidate.add_query(
13 changes: 10 additions & 3 deletions src/brad/planner/beam/table_based.py
@@ -132,7 +132,16 @@ async def _run_replan_impl(
         )
         await ctx.simulate_current_workload_routing(planning_router)
         ctx.compute_engine_latency_norm_factor()
+        ctx.compute_current_workload_predicted_hourly_scan_cost()
+        ctx.compute_current_blueprint_provisioning_hourly_cost()

+        comparator = self._providers.comparator_provider.get_comparator(
+            metrics,
+            curr_hourly_cost=(
+                ctx.current_workload_predicted_hourly_scan_cost
+                + ctx.current_blueprint_provisioning_hourly_cost
+            ),
+        )
         beam_size = self._planner_config.beam_size()
         placement_options = self._get_table_placement_options_bitmap()
         first_cluster = clusters[0]
@@ -144,9 +153,7 @@

         # 4. Initialize the top-k set (beam).
         for placement_bitmap in placement_options:
-            candidate = BlueprintCandidate.based_on(
-                self._current_blueprint, self._comparator
-            )
+            candidate = BlueprintCandidate.based_on(self._current_blueprint, comparator)
             candidate.add_transactional_tables(ctx)
             tables, queries, _ = first_cluster
             placement_changed = candidate.add_placement(placement_bitmap, tables, ctx)
42 changes: 26 additions & 16 deletions src/brad/planner/compare/cost.py
@@ -85,7 +85,7 @@ def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> bool:


 def best_cost_under_perf_ceilings(
-    max_query_latency_s: float,
+    max_query_p90_latency_s: float,
     max_txn_p90_latency_s: float,
 ) -> BlueprintComparator:
     def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> bool:
@@ -110,18 +110,16 @@ def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> bool:
             return True

         # Query latency ceilings.
-        # left_lat = _get_or_compute_p99_latency(left)
-        # right_lat = _get_or_compute_p99_latency(right)
-        left_lat = _get_or_compute_nth_largest_latency(left, 2)
-        right_lat = _get_or_compute_nth_largest_latency(right, 2)
+        left_lat = _get_or_compute_p90_latency(left)
+        right_lat = _get_or_compute_p90_latency(right)

-        if left_lat > max_query_latency_s and right_lat > max_query_latency_s:
+        if left_lat > max_query_p90_latency_s and right_lat > max_query_p90_latency_s:
             # Both are above the latency ceiling.
             # So the better blueprint is the one that does better on performance.
             return left_lat < right_lat
-        elif left_lat > max_query_latency_s:
+        elif left_lat > max_query_p90_latency_s:
             return False
-        elif right_lat > max_query_latency_s:
+        elif right_lat > max_query_p90_latency_s:
             return True

         # Both are under the performance ceilings.
@@ -208,22 +206,22 @@ def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> bool:


 def _get_or_compute_geomean_latency(bp: ComparableBlueprint) -> float:
-    stored = bp.get_memoized_value("geomean_latency")
+    stored = bp.get_memoized_value("query_geomean_latency")
     if stored is not None:
         return stored
     else:
         geomean_lat = np.exp(np.log(bp.get_predicted_analytical_latencies()).mean())
-        bp.set_memoized_value("geomean_latency", geomean_lat)
+        bp.set_memoized_value("query_geomean_latency", geomean_lat)
         return geomean_lat


 def _get_or_compute_max_latency(bp: ComparableBlueprint) -> float:
-    stored = bp.get_memoized_value("max_latency")
+    stored = bp.get_memoized_value("query_max_latency")
     if stored is not None:
         return stored
     else:
         max_lat = bp.get_predicted_analytical_latencies().max()
-        bp.set_memoized_value("max_latency", max_lat)
+        bp.set_memoized_value("query_max_latency", max_lat)
         return max_lat


@@ -234,27 +232,39 @@ def _get_or_compute_nth_largest_latency(bp: ComparableBlueprint, n: int) -> float:
     else:
         actual_n = n

-    stored = bp.get_memoized_value(f"{actual_n}_largest")
+    stored = bp.get_memoized_value(f"query_{actual_n}_largest")
     if stored is not None:
         return stored

     nth_largest = np.partition(pred_lats, -actual_n)[-actual_n]
-    bp.set_memoized_value(f"{actual_n}_largest", nth_largest)
+    bp.set_memoized_value(f"query_{actual_n}_largest", nth_largest)
     return nth_largest


 def _get_or_compute_p99_latency(bp: ComparableBlueprint) -> float:
-    stored = bp.get_memoized_value("p99_latency")
+    stored = bp.get_memoized_value("query_p99_latency")
     if stored is not None:
         return stored
     else:
         p99_lat = np.quantile(
             bp.get_predicted_analytical_latencies(), 0.99, method="lower"
         )
-        bp.set_memoized_value("p99_latency", p99_lat)
+        bp.set_memoized_value("query_p99_latency", p99_lat)
         return p99_lat


+def _get_or_compute_p90_latency(bp: ComparableBlueprint) -> float:
+    stored = bp.get_memoized_value("query_p90_latency")
+    if stored is not None:
+        return stored
+    else:
+        p90_lat = np.quantile(
+            bp.get_predicted_analytical_latencies(), 0.90, method="lower"
+        )
+        bp.set_memoized_value("query_p90_latency", p90_lat)
+        return p90_lat
+
+
 def _compute_max_ratio(value1: float, value2: float) -> float:
     if value1 == 0.0 or value2 == 0.0:
         return np.inf
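Note: `method="lower"` makes `np.quantile` return an actual predicted latency rather than interpolating between two neighboring predictions:

```python
import numpy as np

pred_lats = np.array([0.2, 0.4, 0.9, 1.5, 3.0])
print(np.quantile(pred_lats, 0.90, method="lower"))  # 1.5, an actual sample
print(np.quantile(pred_lats, 0.90))                  # 2.4, interpolated
```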
(The remaining 7 changed files are not shown.)
