Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to new Aurora load/resource model #389

Merged
merged 2 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions config/planner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ aurora_txns:


aurora_scaling:
# [Deprecated]
# [coef1(load)^2(resource ratio)^2 + coef2(load)^2 + coef3(resource ratio)^2 + coef4] * base
coef1: 0.2454489
coef2: 0.00841046
Expand All @@ -397,6 +398,17 @@ redshift_scaling:
coef3: 0.49948713
coef4: 0.36256359


aurora_scaling_new:
# Wait time (from queuing theory)
# alpha * avg_query_time * (u / (1 - u)) + base
alpha: 0.0464553

# Resources
# [coef1 (s/d) + coef2] * base
coef1: 0.75851053
coef2: 0.5486482

use_io_optimized_aurora: true
use_recorded_routing_if_available: true
ensure_tables_together_on_one_engine: true
Expand Down
12 changes: 12 additions & 0 deletions src/brad/config/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ def load(cls, path: str) -> "PlannerConfig":
def __init__(self, raw: Dict[str, Any]):
self._raw = raw

self._aurora_new_scaling_coefs: Optional[npt.NDArray] = None

# Deprecated
self._aurora_scaling_coefs: Optional[npt.NDArray] = None
self._redshift_scaling_coefs: Optional[npt.NDArray] = None

Expand Down Expand Up @@ -211,6 +214,15 @@ def aurora_scaling_coefs(self) -> npt.NDArray:
def aurora_txn_coefs(self, schema_name: str) -> Dict[str, float]:
return self._raw["aurora_txns"][schema_name]

def aurora_new_scaling_coefs(self) -> npt.NDArray:
    """Return the fitted ``[coef1, coef2]`` resource-scaling coefficients
    for the new Aurora scaling model.

    The values are read lazily from the ``aurora_scaling_new`` section of
    the raw config and cached as a NumPy array on first access.
    """
    if self._aurora_new_scaling_coefs is None:
        section = self._raw["aurora_scaling_new"]
        self._aurora_new_scaling_coefs = np.array(
            [section["coef1"], section["coef2"]]
        )
    return self._aurora_new_scaling_coefs

def aurora_new_scaling_alpha(self) -> float:
    """Return the fitted queuing-theory ``alpha`` for the new Aurora
    scaling model (from the ``aurora_scaling_new`` config section)."""
    section = self._raw["aurora_scaling_new"]
    return section["alpha"]

###
### Unified Redshift scaling
###
Expand Down
57 changes: 43 additions & 14 deletions src/brad/planner/scoring/performance/unified_aurora.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ def compute(
query_factor = cls.query_movement_factor(
base_query_run_times, query_arrival_counts, ctx
)
txn_cpu_denorm, ana_node_load = cls.predict_loads(
txn_cpu_denorm, ana_node_cpu_denorm = cls.predict_loads(
curr_prov, next_prov, query_factor, ctx
)
scaled_rt = cls.predict_query_latency_load_resources(
base_query_run_times, next_prov, ana_node_load, ctx
base_query_run_times,
next_prov,
ana_node_cpu_denorm / aurora_num_cpus(next_prov),
ctx,
)
scaled_txn_lats = cls.predict_txn_latency(
ctx.metrics.aurora_writer_cpu_avg / 100.0 * aurora_num_cpus(curr_prov),
Expand All @@ -61,7 +64,7 @@ def compute(
if query_factor is not None
else 0.0,
"aurora_txn_cpu_denorm": txn_cpu_denorm,
"aurora_ana_node_load": ana_node_load,
"aurora_ana_cpu_denorm": ana_node_cpu_denorm,
},
)

Expand All @@ -85,13 +88,12 @@ def predict_loads(
# Output:
# - "CPU denorm" value on `next_prov` on the writer node, for use for
# transaction scaling
# - "load denorm" value on `next_prov` on one node that will be used for
# - "CPU denorm" value on `next_prov` on one node that will be used for
# analytics scaling

current_has_replicas = curr_prov.num_nodes() > 1
next_has_replicas = next_prov.num_nodes() > 1

curr_writer_load = ctx.metrics.aurora_writer_load_minute_avg
curr_writer_cpu_util = ctx.metrics.aurora_writer_cpu_avg / 100
curr_writer_cpu_util_denorm = curr_writer_cpu_util * aurora_num_cpus(curr_prov)

Expand Down Expand Up @@ -127,12 +129,12 @@ def predict_loads(
)
return (
curr_writer_cpu_util_denorm + initialized_load,
curr_writer_load + initialized_load,
curr_writer_cpu_util_denorm + initialized_load,
)
else:
return (
curr_writer_cpu_util_denorm * query_factor_clean,
curr_writer_load * query_factor_clean,
curr_writer_cpu_util_denorm * query_factor_clean,
)
else:
# No replicas -> Yes replicas
Expand All @@ -151,16 +153,13 @@ def predict_loads(
else:
return (
curr_writer_cpu_util_denorm,
(curr_writer_load * query_factor_clean)
(curr_writer_cpu_util_denorm * query_factor_clean)
/ (next_prov.num_nodes() - 1),
)

else:
# We currently have read replicas.
curr_num_read_replicas = curr_prov.num_nodes() - 1
total_reader_load = (
ctx.metrics.aurora_reader_load_minute_avg * curr_num_read_replicas
)
total_reader_cpu_denorm = (
(ctx.metrics.aurora_reader_cpu_avg / 100)
* aurora_num_cpus(curr_prov)
Expand All @@ -181,13 +180,14 @@ def predict_loads(
# Special case.
return (
curr_writer_cpu_util_denorm + initialized_load,
curr_writer_load + initialized_load,
(curr_writer_cpu_util_denorm + initialized_load),
)
else:
return (
curr_writer_cpu_util_denorm
+ (total_reader_cpu_denorm * query_factor_clean),
curr_writer_load + (total_reader_load * query_factor_clean),
curr_writer_cpu_util_denorm
+ (total_reader_cpu_denorm * query_factor_clean),
)

else:
Expand All @@ -201,7 +201,7 @@ def predict_loads(
else:
return (
curr_writer_cpu_util_denorm,
total_reader_load
total_reader_cpu_denorm
* query_factor_clean
/ (next_prov.num_nodes() - 1),
)
Expand All @@ -225,6 +225,35 @@ def query_movement_factor(

@staticmethod
def predict_query_latency_load_resources(
    base_predicted_latency: npt.NDArray,
    to_prov: Provisioning,
    cpu_util: float,
    ctx: "ScoringContext",
) -> npt.NDArray:
    """Predict per-query latencies on ``to_prov`` under the given load.

    Combines a resource-scaling model (queries alone on the target
    provisioning) with a queuing-theory wait-time term driven by CPU
    utilization. Returns an array aligned with ``base_predicted_latency``.
    """
    planner_config = ctx.planner_config

    # Step 1: scale each query's isolated run time to the target
    # provisioning: [coef1 * (base_cpus / target_cpus) + coef2] * base.
    resource_factor = _AURORA_BASE_RESOURCE_VALUE / aurora_num_cpus(to_prov)
    scaled_coefs = np.multiply(
        planner_config.aurora_new_scaling_coefs(),
        np.array([resource_factor, 1.0]),
    )
    repeated_lats = np.repeat(
        np.expand_dims(base_predicted_latency, axis=1),
        scaled_coefs.shape[0],
        axis=1,
    )
    alone_predicted_latency = np.dot(repeated_lats, scaled_coefs)

    # Step 2: expected wait time due to system load, per
    # alpha * avg_service_time * (u / (1 - u)). The denominator is
    # clamped so utilization near 1.0 cannot divide by zero.
    utilization_ratio = cpu_util / max(1e-3, 1.0 - cpu_util)
    mean_wait_time = (
        alone_predicted_latency.mean()
        * utilization_ratio
        * planner_config.aurora_new_scaling_alpha()
    )

    # Each query's predicted run time is its execution time alone plus
    # the shared expected wait time.
    return alone_predicted_latency + mean_wait_time

@staticmethod
def predict_query_latency_load_resources_legacy(
base_predicted_latency: npt.NDArray,
to_prov: Provisioning,
overall_load: float,
Expand Down
76 changes: 38 additions & 38 deletions tests/test_aurora_scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,37 +65,37 @@ def test_single_node() -> None:
)

# Scale up with no change in workload.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)

# Scale up with 2x increase in analytical workload.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 2.0, ctx
)
assert txn_cpu_denorm == pytest.approx(8.0)
assert ana_load == pytest.approx(8.0)
assert ana_cpu_denorm == pytest.approx(8.0)

# Scale up with a 0.5x increase (i.e., a decrease) in the analytical
# workload. We stay conservative here and do not modify the loads.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)

# No queries executed previously. But now we are executing queries on
# Aurora.
ctx.current_query_locations[Engine.Aurora].append(0)
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, None, ctx
)
assert txn_cpu_denorm == pytest.approx(
4.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0
)
assert ana_load == pytest.approx(
assert ana_cpu_denorm == pytest.approx(
4.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0
)

Expand All @@ -114,45 +114,45 @@ def test_single_to_multi() -> None:
# Scale up with no change in workload.
# We are conservative here and assume the same load is replicated across
# both nodes.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)

# Scale up with 2x more load.
# Transaction load should be unchanged because we move analytics to a
# replica.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 2.0, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(8.0)
assert ana_cpu_denorm == pytest.approx(8.0)

# Scale up with 0.5x more load.
# We leave the loads unchanged because we are conservative.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)

# 2 replicas.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, Provisioning("db.r6g.xlarge", 3), 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
# Should assume analytical load is split across the replicas.
assert ana_load == pytest.approx(2.0)
assert ana_cpu_denorm == pytest.approx(2.0)

# No queries executed previously. But now we are executing queries on
# Aurora.
ctx.current_query_locations[Engine.Aurora].append(0)
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, None, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(
assert ana_cpu_denorm == pytest.approx(
ctx.planner_config.aurora_initialize_load_fraction() * 4.0
)

Expand All @@ -171,44 +171,44 @@ def test_multi_to_single() -> None:

# Scale down with no change in workload.
# All the load should be concentrated on the single node.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(3.0)
assert ana_load == pytest.approx(3.0)
assert ana_cpu_denorm == pytest.approx(3.0)

# Scale down with 2x more load.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 2.0, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0 + 2.0 * 1.0)
assert ana_load == pytest.approx(2.0 + 2.0 * 1.0)
assert ana_cpu_denorm == pytest.approx(2.0 + 2.0 * 1.0)

# Scale down with 0.5x more load.
# We stay conservative here.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(3.0)
assert ana_load == pytest.approx(3.0)
assert ana_cpu_denorm == pytest.approx(3.0)

# Multiple replicas (2) down to one node.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
Provisioning("db.r6g.xlarge", 3), next_prov, 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_load == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)

# Special case (no queries executed previously and now we are executing
# queries). Should be rare because there are read replicas.
ctx.current_query_locations[Engine.Aurora].append(0)
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, None, ctx
)
assert txn_cpu_denorm == pytest.approx(
2.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0
)
assert ana_load == pytest.approx(
assert ana_cpu_denorm == pytest.approx(
2.0 + ctx.planner_config.aurora_initialize_load_fraction() * 4.0
)

Expand All @@ -228,42 +228,42 @@ def test_multi_to_multi() -> None:

# Scale up with no change in workload.
# Load should be spread out.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_load == pytest.approx(0.5)
assert ana_cpu_denorm == pytest.approx(0.5)

# Scale up with a 2x change in workload.
# Load should be spread out.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 2.0, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_load == pytest.approx(1.0)
assert ana_cpu_denorm == pytest.approx(1.0)

# Scale down with a 0.5x change in workload.
# Load should be spread out.
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_load == pytest.approx(0.5) # Stay conservative.
assert ana_cpu_denorm == pytest.approx(0.5) # Stay conservative.

# Decrease the number of replicas (2 to 1).
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, Provisioning("db.r6g.xlarge", 2), 1.0, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_load == pytest.approx(2.0)
assert ana_cpu_denorm == pytest.approx(2.0)

# Special case (no queries executed previously and now we are executing
# queries). Should be rare because there are read replicas.
ctx.current_query_locations[Engine.Aurora].append(0)
txn_cpu_denorm, ana_load = AuroraProvisioningScore.predict_loads(
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, None, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_load == pytest.approx(
assert ana_cpu_denorm == pytest.approx(
ctx.planner_config.aurora_initialize_load_fraction() * 4.0 * 2.0 / 4.0
)
Loading