Skip to content

Commit

Permalink
Clean up the redshift scoring as well
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy committed Nov 15, 2023
1 parent 8c6bfff commit c94f273
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 45 deletions.
3 changes: 2 additions & 1 deletion config/planner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ query_dist_change_frac: 0.1
###

triggers:
enabled: true
enabled: false
check_period_s: 90 # Triggers are checked every X seconds.
check_period_offset_s: 360 # Wait 6 mins before starting.

Expand Down Expand Up @@ -351,3 +351,4 @@ use_recorded_routing_if_available: true
ensure_tables_together_on_one_engine: true

aurora_initialize_load_fraction: 0.25
redshift_initialize_load_fraction: 0.25
3 changes: 3 additions & 0 deletions src/brad/config/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,5 +238,8 @@ def flag(self, key: str, default: bool = False) -> bool:
def aurora_initialize_load_fraction(self) -> float:
    """
    Fraction used to prime Aurora's predicted load when it is being turned
    on (presumably analogous to `redshift_initialize_load_fraction` — the
    Redshift scorer multiplies its counterpart by the cluster's peak CPU).
    """
    # Coerce to float for consistency with `aurora_storage_index_multiplier`
    # and to guarantee the annotated return type even if the raw config
    # value parsed as a string or int.
    return float(self._raw["aurora_initialize_load_fraction"])

def redshift_initialize_load_fraction(self) -> float:
    """
    Fraction of the Redshift cluster's peak CPU used to prime the predicted
    load when Redshift is being turned on (or was on but idle); read by the
    Redshift provisioning scorer's `predict_cpu_denorm`.
    """
    # Coerce to float for consistency with `aurora_storage_index_multiplier`
    # and to guarantee the annotated return type even if the raw config
    # value parsed as a string or int.
    return float(self._raw["redshift_initialize_load_fraction"])

def aurora_storage_index_multiplier(self) -> float:
    """Multiplier applied to Aurora storage to account for index overhead
    (raw config value, coerced to float)."""
    raw_value = self._raw["aurora_storage_index_multiplier"]
    return float(raw_value)
149 changes: 105 additions & 44 deletions src/brad/planner/scoring/performance/unified_redshift.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import numpy as np
import numpy.typing as npt
from typing import Dict, TYPE_CHECKING
from typing import Dict, TYPE_CHECKING, Optional

from brad.config.engine import Engine
from brad.blueprint.provisioning import Provisioning
Expand All @@ -18,13 +18,11 @@ def __init__(
self,
scaled_run_times: npt.NDArray,
overall_cpu_denorm: float,
for_next_prov: Provisioning,
debug_values: Dict[str, int | float],
) -> None:
self.scaled_run_times = scaled_run_times
self.debug_values = debug_values
self.overall_cpu_denorm = overall_cpu_denorm
self.for_next_prov = for_next_prov

@classmethod
def compute(
Expand All @@ -39,55 +37,119 @@ def compute(
Computes all of the Redshift provisioning-dependent scoring components in one
place.
"""
# Special case: If we turn off Redshift (set the number of nodes to 0).
if next_prov.num_nodes() == 0:
query_factor = cls.query_movement_factor(
base_query_run_times, query_arrival_counts, ctx
)
predicted_cpu_denorm = cls.predict_cpu_denorm(
curr_prov, next_prov, query_factor, ctx
)

# Special case (turning off Redshift).
if predicted_cpu_denorm == 0.0:
return cls(
base_query_run_times,
0.0,
next_prov,
{
"redshift_query_factor": 0.0,
},
)
else:
scaled_rt = cls.scale_load_resources(
base_query_run_times, next_prov, predicted_cpu_denorm, ctx
)
return cls(
scaled_rt,
predicted_cpu_denorm,
{
"redshift_query_factor": query_factor
if query_factor is not None
else 0.0,
},
)

overall_forecasted_cpu_util_pct = ctx.metrics.redshift_cpu_avg
overall_cpu_util_denorm = (
(overall_forecasted_cpu_util_pct / 100)
* redshift_num_cpus(curr_prov)
* curr_prov.num_nodes()
)

# 1. Adjust the analytical portion of the system load for query movement.
if (
Engine.Redshift not in ctx.engine_latency_norm_factor
or curr_prov.num_nodes() == 0
):
# Special case. We cannot reweigh the queries because nothing in the
# current workload ran on Redshift.
query_factor = 1.0
@classmethod
def predict_cpu_denorm(
    cls,
    curr_prov: Provisioning,
    next_prov: Provisioning,
    query_factor: Optional[float],
    ctx: "ScoringContext",
) -> float:
    """
    Returns the predicted overall denormalized CPU utilization across the
    entire cluster for `next_prov`.

    `query_factor` captures the change in analytical load routed to
    Redshift; it is None when nothing in the current workload ran on
    Redshift (see `query_movement_factor`).
    """
    # Four transitions to handle:
    #   Redshift off -> Redshift off (no-op)
    #   Redshift off -> Redshift on
    #   Redshift on  -> Redshift on
    #   Redshift on  -> Redshift off
    curr_on = curr_prov.num_nodes() > 0
    next_on = next_prov.num_nodes() > 0

    # Simple no-op cases: Redshift ends up off, so it carries no load.
    if not next_on:
        return 0.0

    if not curr_on:
        # Turning on Redshift. We cannot reweigh the queries because
        # nothing in the current workload ran on Redshift. We prime the
        # load with a fraction of the proposed cluster's peak load.
        return (
            redshift_num_cpus(next_prov)
            * next_prov.num_nodes()
            * ctx.planner_config.redshift_initialize_load_fraction()
        )

    # Redshift is staying on, but there is a potential provisioning change.
    assert curr_on and next_on

    # Special case: Redshift was on, but nothing ran on it, and now we
    # want to run queries on it. We use the same load priming approach,
    # but relative to the current cluster.
    if (
        query_factor is None
        and len(ctx.current_query_locations[Engine.Redshift]) > 0
    ):
        return (
            redshift_num_cpus(curr_prov)
            * curr_prov.num_nodes()
            * ctx.planner_config.redshift_initialize_load_fraction()
        )

    # Denormalized CPU utilization on the current cluster.
    curr_cpu_util_denorm = (
        (ctx.metrics.redshift_cpu_avg / 100.0)
        * redshift_num_cpus(curr_prov)
        * curr_prov.num_nodes()
    )

    # We stay conservative here: only scale the predicted load up for
    # query movement, never down.
    if query_factor is not None and query_factor > 1.0:
        return query_factor * curr_cpu_util_denorm
    return curr_cpu_util_denorm

@staticmethod
def query_movement_factor(
    base_query_run_times: npt.NDArray,
    query_arrival_counts: npt.NDArray,
    ctx: "ScoringContext",
) -> Optional[float]:
    """
    Query movement scaling factor: the predicted total analytical latency
    on Redshift relative to the current workload's normalization factor.

    Returns None when nothing in the current workload ran on Redshift
    (e.g., it could have been off), since the queries cannot be reweighed.
    """
    try:
        norm_factor = ctx.engine_latency_norm_factor[Engine.Redshift]
    except KeyError:
        # Special case: no Redshift baseline to compare against.
        return None
    assert norm_factor != 0.0
    predicted_total_latency = np.dot(base_query_run_times, query_arrival_counts)
    return predicted_total_latency / norm_factor

@staticmethod
def scale_load_resources(
Expand Down Expand Up @@ -121,12 +183,11 @@ def copy(self) -> "RedshiftProvisioningScore":
return RedshiftProvisioningScore(
self.scaled_run_times,
self.overall_cpu_denorm,
self.for_next_prov,
self.debug_values.copy(),
)

def add_debug_values(self, dest: Dict[str, int | float | str]) -> None:
dest["redshift_cpu_denorm"] = self.overall_cpu_denorm
dest["redshift_predicted_cpu_denorm"] = self.overall_cpu_denorm
dest.update(self.debug_values)


Expand Down
128 changes: 128 additions & 0 deletions tests/test_redshift_scoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import pytest
from datetime import timedelta

from brad.blueprint import Blueprint
from brad.config.engine import Engine
from brad.config.planner import PlannerConfig
from brad.planner.metrics import Metrics
from brad.planner.scoring.context import ScoringContext
from brad.planner.scoring.performance.unified_redshift import RedshiftProvisioningScore
from brad.planner.scoring.provisioning import Provisioning
from brad.planner.workload import Workload
from brad.planner.workload.query import Query
from brad.routing.router import FullRoutingPolicy
from brad.routing.round_robin import RoundRobin


def get_fixtures(redshift_cpu: float, redshift_prov: Provisioning) -> ScoringContext:
    """
    Build a minimal ScoringContext for the Redshift scoring tests.

    `redshift_cpu` is the average Redshift CPU utilization (percent) to
    report via metrics; `redshift_prov` is the blueprint's current Redshift
    provisioning. All Aurora/transaction metrics are benign placeholders.
    """
    metrics = Metrics(
        redshift_cpu_avg=redshift_cpu,
        aurora_writer_cpu_avg=0.0,
        aurora_reader_cpu_avg=0.0,
        aurora_writer_buffer_hit_pct_avg=100.0,
        aurora_reader_buffer_hit_pct_avg=100.0,
        aurora_writer_load_minute_avg=0.0,
        aurora_reader_load_minute_avg=0.0,
        txn_completions_per_s=10.0,
        txn_lat_s_p50=0.010,
        txn_lat_s_p90=0.020,
    )
    # Only the key read by the Redshift scoring code under test.
    planner_config = PlannerConfig(
        {
            "redshift_initialize_load_fraction": 0.25,
        }
    )
    # A one-query workload over a one-hour window; used for both the
    # "current" and "next" workload arguments below.
    workload = Workload(timedelta(hours=1), [Query("SELECT 1")], [], {})
    blueprint = Blueprint(
        "test",
        [],
        {},
        Provisioning("db.r6g.xlarge", 1),
        redshift_prov,
        FullRoutingPolicy([], RoundRobin()),
    )
    ctx = ScoringContext("test", blueprint, workload, workload, metrics, planner_config)
    return ctx


def test_off_to_off() -> None:
    """Redshift stays off: predicted load must be zero."""
    off_prov = Provisioning("dc2.large", 0)
    still_off = Provisioning("dc2.large", 0)
    ctx = get_fixtures(redshift_cpu=0.0, redshift_prov=off_prov)
    predicted = RedshiftProvisioningScore.predict_cpu_denorm(
        off_prov, still_off, 1.0, ctx
    )
    assert predicted == pytest.approx(0.0)


def test_on_to_off() -> None:
    """Turning Redshift off: no load remains, regardless of current usage."""
    running = Provisioning("dc2.large", 2)
    turned_off = Provisioning("dc2.large", 0)
    ctx = get_fixtures(redshift_cpu=50.0, redshift_prov=running)
    predicted = RedshiftProvisioningScore.predict_cpu_denorm(
        running, turned_off, 1.0, ctx
    )
    assert predicted == pytest.approx(0.0)


def test_off_to_on() -> None:
    """Turning Redshift on primes the load with a fraction of peak CPU."""
    off_prov = Provisioning("dc2.large", 0)
    turned_on = Provisioning("dc2.large", 2)
    ctx = get_fixtures(redshift_cpu=0.0, redshift_prov=off_prov)
    predicted = RedshiftProvisioningScore.predict_cpu_denorm(
        off_prov, turned_on, None, ctx
    )
    # Special case: 2 nodes * per-node CPUs (presumably 2 for dc2.large)
    # * the configured initialization fraction.
    expected = 2.0 * 2.0 * ctx.planner_config.redshift_initialize_load_fraction()
    assert predicted == pytest.approx(expected)


def test_on_to_on() -> None:
    """Provisioning changes while Redshift stays on."""
    curr = Provisioning("dc2.large", 2)
    bigger = Provisioning("dc2.large", 4)
    smaller = Provisioning("dc2.large", 1)
    # 50% utilization on 2 nodes -> current denormalized CPU of 2.0.
    ctx = get_fixtures(redshift_cpu=50.0, redshift_prov=curr)

    # (target provisioning, query movement factor, expected denorm CPU).
    # Movement factors at or below 1.0 are ignored — the scorer stays
    # conservative and never scales the predicted load down.
    cases = [
        (bigger, 1.0, 2.0),   # Scale up, no movement.
        (bigger, 2.0, 4.0),   # Scale up, 2x movement.
        (bigger, 0.5, 2.0),   # Scale up, 0.5x movement (conservative).
        (smaller, 1.0, 2.0),  # Scale down, no movement.
        (smaller, 2.0, 4.0),  # Scale down, 2x movement.
        (smaller, 0.5, 2.0),  # Scale down, 0.5x movement (conservative).
    ]
    for target, movement, expected in cases:
        predicted = RedshiftProvisioningScore.predict_cpu_denorm(
            curr, target, movement, ctx
        )
        assert predicted == pytest.approx(expected)

    # Special case: nothing executed on Redshift before, but queries are
    # now routed to it — the load is primed from the current cluster.
    ctx.current_query_locations[Engine.Redshift].append(0)
    predicted = RedshiftProvisioningScore.predict_cpu_denorm(
        curr, bigger, None, ctx
    )
    assert predicted == pytest.approx(1.0)

0 comments on commit c94f273

Please sign in to comment.