Switch to new Aurora load/resource model (#389)
geoffxy authored Dec 2, 2023
1 parent a75c253 commit e9d053a
Showing 4 changed files with 105 additions and 52 deletions.
12 changes: 12 additions & 0 deletions config/planner.yml
@@ -382,6 +382,7 @@ aurora_txns:


aurora_scaling:
# [Deprecated]
# [coef1(load)^2(resource ratio)^2 + coef2(load)^2 + coef3(resource ratio)^2 + coef4] * base
coef1: 0.2454489
coef2: 0.00841046
@@ -397,6 +398,17 @@ redshift_scaling:
coef3: 0.49948713
coef4: 0.36256359


aurora_scaling_new:
# Wait time (from queuing theory)
# alpha * avg_query_time * (u / (1 - u)) + base
alpha: 0.0464553

# Resources
# [coef1 (s/d) + coef2] * base
coef1: 0.75851053
coef2: 0.5486482

use_io_optimized_aurora: true
use_recorded_routing_if_available: true
ensure_tables_together_on_one_engine: true
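The sketch below is not part of the commit; it is a minimal illustration of how the aurora_scaling_new entries above combine, assuming u is the predicted CPU utilization (in [0, 1)) and resource_ratio is the base-to-target resource ratio written as (s/d) in the comment. The function name and the simplification of using the query's own scaled latency as avg_query_time are assumptions for illustration only.

# Minimal sketch (not from this commit) of the aurora_scaling_new model:
# predicted = [coef1 * (s/d) + coef2] * base, plus a queuing-theory wait term
# alpha * avg_query_time * (u / (1 - u)).
def sketch_new_aurora_latency(
    base_latency: float,
    resource_ratio: float,  # (s/d): base resources over target resources
    cpu_util: float,        # u, predicted utilization in [0, 1)
    alpha: float = 0.0464553,
    coef1: float = 0.75851053,
    coef2: float = 0.5486482,
) -> float:
    # Resource scaling of the query's standalone execution time.
    alone = (coef1 * resource_ratio + coef2) * base_latency
    # Queuing delay; clamp the denominator to avoid division by zero near u = 1.
    wait = alpha * alone * (cpu_util / max(1e-3, 1.0 - cpu_util))
    return alone + wait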
12 changes: 12 additions & 0 deletions src/brad/config/planner.py
@@ -21,6 +21,9 @@ def load(cls, path: str) -> "PlannerConfig":
def __init__(self, raw: Dict[str, Any]):
self._raw = raw

self._aurora_new_scaling_coefs: Optional[npt.NDArray] = None

# Deprecated
self._aurora_scaling_coefs: Optional[npt.NDArray] = None
self._redshift_scaling_coefs: Optional[npt.NDArray] = None

@@ -211,6 +214,15 @@ def aurora_scaling_coefs(self) -> npt.NDArray:
def aurora_txn_coefs(self, schema_name: str) -> Dict[str, float]:
return self._raw["aurora_txns"][schema_name]

def aurora_new_scaling_coefs(self) -> npt.NDArray:
if self._aurora_new_scaling_coefs is None:
coefs = self._raw["aurora_scaling_new"]
self._aurora_new_scaling_coefs = np.array([coefs["coef1"], coefs["coef2"]])
return self._aurora_new_scaling_coefs

def aurora_new_scaling_alpha(self) -> float:
return self._raw["aurora_scaling_new"]["alpha"]

###
### Unified Redshift scaling
###
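For orientation, here is a hypothetical snippet showing how the two new PlannerConfig accessors might be consumed. The import path follows the file's location (src/brad/config/planner.py) and the config path is the planner.yml shown above, but the surrounding usage and the example resource factor are illustrative assumptions, not part of the commit.

# Hypothetical usage of the new accessors (only the accessor methods
# themselves come from this commit).
import numpy as np
from brad.config.planner import PlannerConfig

config = PlannerConfig.load("config/planner.yml")
coefs = config.aurora_new_scaling_coefs()   # np.array([coef1, coef2])
alpha = config.aurora_new_scaling_alpha()   # wait-time multiplier
resource_factor = 8 / 16                    # e.g., base vCPUs over target vCPUs
print(np.dot(np.array([resource_factor, 1.0]), coefs), alpha)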
57 changes: 43 additions & 14 deletions src/brad/planner/scoring/performance/unified_aurora.py
@@ -40,11 +40,14 @@ def compute(
query_factor = cls.query_movement_factor(
base_query_run_times, query_arrival_counts, ctx
)
txn_cpu_denorm, ana_node_load = cls.predict_loads(
txn_cpu_denorm, ana_node_cpu_denorm = cls.predict_loads(
curr_prov, next_prov, query_factor, ctx
)
scaled_rt = cls.predict_query_latency_load_resources(
base_query_run_times, next_prov, ana_node_load, ctx
base_query_run_times,
next_prov,
ana_node_cpu_denorm / aurora_num_cpus(next_prov),
ctx,
)
scaled_txn_lats = cls.predict_txn_latency(
ctx.metrics.aurora_writer_cpu_avg / 100.0 * aurora_num_cpus(curr_prov),
@@ -61,7 +64,7 @@ def compute(
if query_factor is not None
else 0.0,
"aurora_txn_cpu_denorm": txn_cpu_denorm,
"aurora_ana_node_load": ana_node_load,
"aurora_ana_cpu_denorm": ana_node_cpu_denorm,
},
)

@@ -85,13 +88,12 @@ def predict_loads(
# Output:
# - "CPU denorm" value on `next_prov` on the writer node, for use for
# transaction scaling
# - "load denorm" value on `next_prov` on one node that will be used for
# - "CPU denorm" value on `next_prov` on one node that will be used for
# analytics scaling

current_has_replicas = curr_prov.num_nodes() > 1
next_has_replicas = next_prov.num_nodes() > 1

curr_writer_load = ctx.metrics.aurora_writer_load_minute_avg
curr_writer_cpu_util = ctx.metrics.aurora_writer_cpu_avg / 100
curr_writer_cpu_util_denorm = curr_writer_cpu_util * aurora_num_cpus(curr_prov)

@@ -127,12 +129,12 @@
)
return (
curr_writer_cpu_util_denorm + initialized_load,
curr_writer_load + initialized_load,
curr_writer_cpu_util_denorm + initialized_load,
)
else:
return (
curr_writer_cpu_util_denorm * query_factor_clean,
curr_writer_load * query_factor_clean,
curr_writer_cpu_util_denorm * query_factor_clean,
)
else:
# No replicas -> Yes replicas
@@ -151,16 +153,13 @@
else:
return (
curr_writer_cpu_util_denorm,
(curr_writer_load * query_factor_clean)
(curr_writer_cpu_util_denorm * query_factor_clean)
/ (next_prov.num_nodes() - 1),
)

else:
# We currently have read replicas.
curr_num_read_replicas = curr_prov.num_nodes() - 1
total_reader_load = (
ctx.metrics.aurora_reader_load_minute_avg * curr_num_read_replicas
)
total_reader_cpu_denorm = (
(ctx.metrics.aurora_reader_cpu_avg / 100)
* aurora_num_cpus(curr_prov)
@@ -181,13 +180,14 @@
# Special case.
return (
curr_writer_cpu_util_denorm + initialized_load,
curr_writer_load + initialized_load,
(curr_writer_cpu_util_denorm + initialized_load),
)
else:
return (
curr_writer_cpu_util_denorm
+ (total_reader_cpu_denorm * query_factor_clean),
curr_writer_load + (total_reader_load * query_factor_clean),
curr_writer_cpu_util_denorm
+ (total_reader_cpu_denorm * query_factor_clean),
)

else:
@@ -201,7 +201,7 @@
else:
return (
curr_writer_cpu_util_denorm,
total_reader_load
total_reader_cpu_denorm
* query_factor_clean
/ (next_prov.num_nodes() - 1),
)
@@ -225,6 +225,35 @@ def query_movement_factor(

@staticmethod
def predict_query_latency_load_resources(
base_predicted_latency: npt.NDArray,
to_prov: Provisioning,
cpu_util: float,
ctx: "ScoringContext",
) -> npt.NDArray:
# 1. Compute each query's expected run time on the given provisioning.
resource_factor = _AURORA_BASE_RESOURCE_VALUE / aurora_num_cpus(to_prov)
basis = np.array([resource_factor, 1.0])
coefs = ctx.planner_config.aurora_new_scaling_coefs()
coefs = np.multiply(coefs, basis)
num_coefs = coefs.shape[0]
lat_vals = np.expand_dims(base_predicted_latency, axis=1)
lat_vals = np.repeat(lat_vals, num_coefs, axis=1)
alone_predicted_latency = np.dot(lat_vals, coefs)

# 2. Compute the impact of system load.
mean_service_time = alone_predicted_latency.mean()
denom = max(1e-3, 1.0 - cpu_util) # Want to avoid division by 0.
wait_sf = cpu_util / denom
mean_wait_time = (
mean_service_time * wait_sf * ctx.planner_config.aurora_new_scaling_alpha()
)

# Predicted running time is the query's execution time alone plus the
# expected wait time (due to system load)
return alone_predicted_latency + mean_wait_time

@staticmethod
def predict_query_latency_load_resources_legacy(
base_predicted_latency: npt.NDArray,
to_prov: Provisioning,
overall_load: float,
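As a sanity check on the new scoring path, the following walkthrough (with made-up numbers, not from the commit) traces the two steps of predict_query_latency_load_resources, assuming the base resource value corresponds to 8 vCPUs and the target provisioning has 16 vCPUs.

# Illustrative walkthrough of predict_query_latency_load_resources
# (assumed values: base resource value = 8 vCPUs, target = 16 vCPUs).
import numpy as np

base_predicted_latency = np.array([1.0, 4.0])  # per-query base latencies (s)
coefs = np.array([0.75851053, 0.5486482])      # aurora_scaling_new coef1, coef2
alpha = 0.0464553                               # aurora_scaling_new alpha
cpu_util = 0.8                                  # predicted utilization on target

# Step 1: resource scaling. Each latency becomes (coef1 * rf + coef2) * base.
resource_factor = 8 / 16
scaled_coefs = coefs * np.array([resource_factor, 1.0])
lat_vals = np.repeat(base_predicted_latency[:, np.newaxis], 2, axis=1)
alone = lat_vals @ scaled_coefs                 # ~[0.93, 3.71]

# Step 2: queuing delay. Mean service time scaled by u / (1 - u) and alpha.
wait = alone.mean() * (cpu_util / max(1e-3, 1.0 - cpu_util)) * alpha
print(alone + wait)                             # predicted latencies incl. wait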
76 changes: 38 additions & 38 deletions tests/test_aurora_scoring.py
@@ -65,37 +65,37 @@ def test_single_node() -> None:
)

# Scale up with no change in workload.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)

# Scale up with 2x increase in analytical workload.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 2.0, ctx
)
assert txn_cpu_denorm == pytest.approx(8.0)
assert ana_load == pytest.approx(8.0)
assert ana_cpu_denorm == pytest.approx(8.0)

# Scale up with a 0.5x increase (i.e., a decrease) in the analytical
# workload. We stay conservative here and do not modify the loads.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)

# No queries executed previously. But now we are executing queries on
# Aurora.
ctx.current_query_locations[Engine.Aurora].append(0)
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, None, ctx
)
assert txn_cpu_denorm == pytest.approx(
4.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0
)
assert ana_load == pytest.approx(
assert ana_cpu_denorm == pytest.approx(
4.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0
)

@@ -114,45 +114,45 @@ def test_single_to_multi() -> None:
# Scale up with no change in workload.
# We are conservative here and assume the same load is replicated across
# both nodes.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)

# Scale up with 2x more load.
# Transaction load should be unchanged because we move analytics to a
# replica.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 2.0, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(8.0)
assert ana_cpu_denorm == pytest.approx(8.0)

# Scale up with 0.5x more load.
# We leave the loads unchanged because we are conservative.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)

# 2 replicas.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, Provisioning("db.r6g.xlarge", 3), 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
# Should assume analytical load is split across the replicas.
assert ana_load == pytest.approx(2.0)
assert ana_cpu_denorm == pytest.approx(2.0)

# No queries executed previously. But now we are executing queries on
# Aurora.
ctx.current_query_locations[Engine.Aurora].append(0)
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, None, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(
assert ana_cpu_denorm == pytest.approx(
ctx.planner_config.aurora_initialize_load_fraction() * 4.0
)

@@ -171,44 +171,44 @@ def test_multi_to_single() -> None:

# Scale down with no change in workload.
# All the load should be concentrated on the single node.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(3.0)
assert ana_load == pytest.approx(3.0)
assert ana_cpu_denorm == pytest.approx(3.0)

# Scale down with 2x more load.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 2.0, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0 + 2.0 * 1.0)
assert ana_load == pytest.approx(2.0 + 2.0 * 1.0)
assert ana_cpu_denorm == pytest.approx(2.0 + 2.0 * 1.0)

# Scale down with 0.5x more load.
# We stay conservative here.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(3.0)
assert ana_load == pytest.approx(3.0)
assert ana_cpu_denorm == pytest.approx(3.0)

# Multiple replicas (2) down to one node.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
Provisioning("db.r6g.xlarge", 3), next_prov, 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)

# Special case (no queries executed previously and now we are executing
# queries). Should be rare because there are read replicas.
ctx.current_query_locations[Engine.Aurora].append(0)
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, None, ctx
)
assert txn_cpu_denorm == pytest.approx(
2.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0
)
assert ana_load == pytest.approx(
assert ana_cpu_denorm == pytest.approx(
2.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0
)

@@ -228,42 +228,42 @@ def test_multi_to_multi() -> None:

# Scale up with no change in workload.
# Load should be spread out.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_load == pytest.approx(0.5)
assert ana_cpu_denorm == pytest.approx(0.5)

# Scale up with a 2x change in workload.
# Load should be spread out.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 2.0, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_load == pytest.approx(1.0)
assert ana_cpu_denorm == pytest.approx(1.0)

# Scale down with a 0.5x change in workload.
# Load should be spread out.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_load == pytest.approx(0.5) # Stay conservative.
assert ana_cpu_denorm == pytest.approx(0.5) # Stay conservative.

# Decrease the number of replicas (2 to 1).
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, Provisioning("db.r6g.xlarge", 2), 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_load == pytest.approx(2.0)
assert ana_cpu_denorm == pytest.approx(2.0)

# Special case (no queries executed previously and now we are executing
# queries). Should be rare because there are read replicas.
ctx.current_query_locations[Engine.Aurora].append(0)
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, None, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_load == pytest.approx(
assert ana_cpu_denorm == pytest.approx(
ctx.planner_config.aurora_initialize_load_fraction() * 4.0 * 2.0 / 4.0
)
