Adjust query factor computation, change the vector workload (#392)
* Adjust vector workload again

* Adjust query factor computation

* Use all queries

* Update tests
geoffxy authored Dec 4, 2023
1 parent bf6226e commit 286323d
Showing 7 changed files with 137 additions and 71 deletions.
@@ -22,16 +22,22 @@ extract_named_arguments $@
# Touch `title`:
# 14, 54, 59, 75

# General scenario:
# Aurora is being used for queries involving `title` because of the vector
# similarity queries that also touch `title`. After BRAD is deployed, it
# realizes that it's better to replicate `title` and route the rest of the
# queries to Redshift.

query_indices="62,64,65,66,69,72,73,74,91,59"
heavier_queries="14,54,60,71,75"
all_queries="${query_indices},${heavier_queries}"

# This should eventually be removed; we should rely on the blueprint instead.
start_brad $config_file $planner_config_file
log_workload_point "brad_start_initiated"
sleep 30

log_workload_point "clients_starting"
start_repeating_olap_runner 8 5 5 $query_indices "ra_8"
start_repeating_olap_runner 8 5 5 $all_queries "ra_8"
rana_pid=$runner_pid

start_other_repeating_runner 2 8 5 "ra_vector" 8
@@ -48,15 +54,10 @@ function inner_cancel_experiment() {
trap "inner_cancel_experiment" INT
trap "inner_cancel_experiment" TERM

sleep $((16 * 60)) # Wait for 16 mins.
start_repeating_olap_runner 4 5 5 $heavier_queries "ra_4_heavy" 10
heavy_pid=$runner_pid
log_workload_point "heavier_queries_started"

sleep $((60 * 60)) # Wait for 60 mins.
log_workload_point "experiment_done"

# Shut down everything now.
>&2 echo "Experiment done. Shutting down runners..."
graceful_shutdown $rana_pid $txn_pid $other_pid $heavy_pid
graceful_shutdown $rana_pid $txn_pid $other_pid
log_workload_point "shutdown_complete"
@@ -58,10 +58,13 @@ def main():
"--aurora-queries",
type=str,
help="Comma separated list of indices.",
default="62,64,65,66,69,72,73,74,91,59",
default="65,69,73,14,54,59,75",
)
parser.add_argument(
"--redshift-queries", type=str, help="Comma separated list of indices."
"--redshift-queries",
type=str,
help="Comma separated list of indices.",
default="62,64,66,72,74,91,59,60,71",
)
args = parser.parse_args()
set_up_logging(debug_mode=True)
@@ -113,16 +116,16 @@ def main():
enum_blueprint.set_routing_policy(replaced_policy)

# Ensure the provisioning is as expected.
enum_blueprint.set_aurora_provisioning(Provisioning("db.r6g.xlarge", 2))
enum_blueprint.set_redshift_provisioning(Provisioning("dc2.large", 0))
enum_blueprint.set_aurora_provisioning(Provisioning("db.r6g.2xlarge", 2))
enum_blueprint.set_redshift_provisioning(Provisioning("dc2.large", 2))

# 6. Adjust the placement.
new_placement = {}
for table in blueprint.tables():
new_placement[table.name] = [Engine.Aurora, Engine.Athena]
new_placement[table.name] = [Engine.Aurora, Engine.Athena, Engine.Redshift]
if table.name == "telemetry":
new_placement[table.name] = [Engine.Athena]
if table.name == "embeddings":
if table.name == "embeddings" or table.name == "title":
new_placement[table.name] = [Engine.Aurora, Engine.Athena]
enum_blueprint.set_table_locations(new_placement)
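For illustration, a minimal sketch (not part of the commit) of the placement the loop above produces. The schema here is assumed: only `telemetry`, `embeddings`, and `title` appear in the commit, and `movie_info` is a made-up example table; engine names are written as plain strings for brevity (the real code uses the Engine enum).

# Hypothetical resulting placement under the rules above.
expected_placement = {
    "telemetry": ["Athena"],                          # telemetry lives on Athena only
    "embeddings": ["Aurora", "Athena"],               # vector table stays off Redshift
    "title": ["Aurora", "Athena"],                    # now replicated like embeddings
    "movie_info": ["Aurora", "Athena", "Redshift"],   # every other table goes to all three engines
}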

33 changes: 24 additions & 9 deletions src/brad/planner/scoring/context.py
@@ -14,6 +14,8 @@
compute_aurora_hourly_operational_cost,
compute_redshift_hourly_operational_cost,
)
from brad.planner.scoring.performance.unified_aurora import AuroraProvisioningScore
from brad.planner.scoring.performance.unified_redshift import RedshiftProvisioningScore

logger = logging.getLogger(__name__)

@@ -130,21 +132,34 @@ def compute_engine_latency_norm_factor(self) -> None:
)
)

# N.B. Using the actual recorded run times is slightly problematic
# here. We use this normalization factor as a part of predicting a
# query's execution time on a different blueprint. We need to use
# query execution times on the same provisioning (and system load)
# as the recorded run times. Scaling the recorded run times to a
# base provisioning or vice-versa is difficult to do accurately
# without having this normalization factor.
# 2. Adjust to the provisioning if needed.
if engine == Engine.Aurora:
adjusted_latencies = (
AuroraProvisioningScore.predict_query_latency_resources(
predicted_base_latencies,
self.current_blueprint.aurora_provisioning(),
self,
)
)
elif engine == Engine.Redshift:
adjusted_latencies = (
RedshiftProvisioningScore.predict_query_latency_resources(
predicted_base_latencies,
self.current_blueprint.redshift_provisioning(),
self,
)
)
elif engine == Engine.Athena:
# No provisioning.
adjusted_latencies = predicted_base_latencies

# 2. Extract query weights (based on arrival frequency) and scale
# 3. Extract query weights (based on arrival frequency) and scale
# the run times.
query_weights = self.current_workload.get_arrival_counts_batch(
self.current_query_locations[engine]
)
assert query_weights.shape == predicted_base_latencies.shape

self.engine_latency_norm_factor[engine] = np.dot(
predicted_base_latencies, query_weights
adjusted_latencies, query_weights
)
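As a self-contained sketch of what this hunk computes (all numbers below are made up for illustration; the real coefficients come from the planner config and the weights from get_arrival_counts_batch):

import numpy as np

# Assumed example inputs (not from the commit).
predicted_base_latencies = np.array([2.0, 5.0, 1.0])  # seconds at the base provisioning
resource_factor = 8 / 16                               # base vCPUs / current-provisioning vCPUs (assumed)
coefs = np.array([0.8, 0.2]) * np.array([resource_factor, 1.0])  # hypothetical scaling coefficients

# Same shape trick as predict_query_latency_resources(): every latency is
# scaled by (c0 * resource_factor + c1).
lat_vals = np.repeat(np.expand_dims(predicted_base_latencies, axis=1), coefs.shape[0], axis=1)
adjusted_latencies = np.dot(lat_vals, coefs)           # -> [1.2, 3.0, 0.6]

query_weights = np.array([10.0, 3.0, 50.0])            # per-query arrival counts (assumed)
norm_factor = np.dot(adjusted_latencies, query_weights)  # -> 51.0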
59 changes: 41 additions & 18 deletions src/brad/planner/scoring/performance/unified_aurora.py
@@ -99,10 +99,17 @@ def predict_loads(

# We take a very conservative approach to query movement. If new queries
# are added onto Aurora, we increase the load. But if queries are
# removed, we do not decrease the load.
# removed, we only decrease the load if we are using replicas. This
# ensures we do not mix transactional and analytical load.
query_factor_clean = 1.0
if query_factor is not None and query_factor > 1.0:
query_factor_clean = query_factor
if query_factor is not None:
if query_factor >= 1.0:
query_factor_clean = query_factor
else:
min_query_factor = (
1.0 - ctx.planner_config.aurora_initialize_load_fraction()
)
query_factor_clean = max(min_query_factor, query_factor)

# This is true iff we did not run any queries on Aurora with the current
# blueprint and are now going to run queries on Aurora on the next
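A short sketch of the clamping introduced in this hunk: an increase in the query factor is applied as-is, while a decrease is bounded below by 1.0 - aurora_initialize_load_fraction(). The 0.25 fraction below is an assumption taken from the updated tests, which assert against (1 - 0.25).

def clamp_query_factor(query_factor, initialize_load_fraction=0.25):
    # Mirrors the logic above; 0.25 is an assumed config value.
    query_factor_clean = 1.0
    if query_factor is not None:
        if query_factor >= 1.0:
            query_factor_clean = query_factor
        else:
            query_factor_clean = max(1.0 - initialize_load_fraction, query_factor)
    return query_factor_clean

clamp_query_factor(1.5)   # -> 1.5  (increases pass through unchanged)
clamp_query_factor(0.5)   # -> 0.75 (decreases are clamped at 1 - 0.25)
clamp_query_factor(None)  # -> 1.0  (no movement information: leave load as-is)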
Expand Down Expand Up @@ -206,8 +213,9 @@ def predict_loads(
/ (next_prov.num_nodes() - 1),
)

@staticmethod
@classmethod
def query_movement_factor(
cls,
base_query_run_times: npt.NDArray,
query_arrival_counts: npt.NDArray,
ctx: "ScoringContext",
@@ -218,13 +226,17 @@
# Special case. We cannot reweigh the queries because nothing in the
# current workload ran on Aurora.
return None
curr_query_run_times = cls.predict_query_latency_resources(
base_query_run_times, ctx.current_blueprint.aurora_provisioning(), ctx
)
norm_factor = ctx.engine_latency_norm_factor[Engine.Aurora]
assert norm_factor != 0.0
total_next_latency = np.dot(base_query_run_times, query_arrival_counts)
total_next_latency = np.dot(curr_query_run_times, query_arrival_counts)
return total_next_latency / norm_factor

@staticmethod
@classmethod
def predict_query_latency_load_resources(
cls,
base_predicted_latency: npt.NDArray,
to_prov: Provisioning,
cpu_util: float,
@@ -233,18 +245,11 @@
if base_predicted_latency.shape[0] == 0:
return base_predicted_latency

# 1. Compute each query's expected run time on the given provisioning.
resource_factor = _AURORA_BASE_RESOURCE_VALUE / aurora_num_cpus(to_prov)
basis = np.array([resource_factor, 1.0])
coefs = ctx.planner_config.aurora_new_scaling_coefs()
coefs = np.multiply(coefs, basis)
num_coefs = coefs.shape[0]
lat_vals = np.expand_dims(base_predicted_latency, axis=1)
lat_vals = np.repeat(lat_vals, num_coefs, axis=1)
alone_predicted_latency = np.dot(lat_vals, coefs)
prov_predicted_latency = cls.predict_query_latency_resources(
base_predicted_latency, to_prov, ctx
)

# 2. Compute the impact of system load.
mean_service_time = alone_predicted_latency.mean()
mean_service_time = prov_predicted_latency.mean()
denom = max(1e-3, 1.0 - cpu_util) # Want to avoid division by 0.
wait_sf = cpu_util / denom
mean_wait_time = (
@@ -253,7 +258,25 @@

# Predicted running time is the query's execution time alone plus the
# expected wait time (due to system load)
return alone_predicted_latency + mean_wait_time
return prov_predicted_latency + mean_wait_time

@staticmethod
def predict_query_latency_resources(
base_predicted_latency: npt.NDArray,
to_prov: Provisioning,
ctx: "ScoringContext",
) -> npt.NDArray:
if base_predicted_latency.shape[0] == 0:
return base_predicted_latency

resource_factor = _AURORA_BASE_RESOURCE_VALUE / aurora_num_cpus(to_prov)
basis = np.array([resource_factor, 1.0])
coefs = ctx.planner_config.aurora_new_scaling_coefs()
coefs = np.multiply(coefs, basis)
num_coefs = coefs.shape[0]
lat_vals = np.expand_dims(base_predicted_latency, axis=1)
lat_vals = np.repeat(lat_vals, num_coefs, axis=1)
return np.dot(lat_vals, coefs)
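The helper factored out here is a linear model in the CPU ratio: each base latency is scaled by c0 * (_AURORA_BASE_RESOURCE_VALUE / vCPUs(to_prov)) + c1, where (c0, c1) come from aurora_new_scaling_coefs(). A minimal sketch with made-up coefficients (the Redshift variant later in this commit mirrors it, using cluster-wide vCPUs, i.e. per-node vCPUs times node count):

import numpy as np

def predict_latency_sketch(base_latency, base_cpus, target_cpus, coefs=(0.8, 0.2)):
    # coefs and base_cpus are hypothetical; the planner reads both from its config.
    resource_factor = base_cpus / target_cpus
    c0, c1 = coefs
    return np.asarray(base_latency) * (c0 * resource_factor + c1)

# Doubling the vCPUs roughly halves the resource-bound part of the latency:
predict_latency_sketch([4.0], base_cpus=8, target_cpus=8)   # -> [4.0]
predict_latency_sketch([4.0], base_cpus=8, target_cpus=16)  # -> [2.4]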

@staticmethod
def predict_query_latency_load_resources_legacy(
65 changes: 44 additions & 21 deletions src/brad/planner/scoring/performance/unified_redshift.py
@@ -126,16 +126,22 @@ def predict_cpu_denorm(
* curr_prov.num_nodes()
)

# We stay conservative here.
if query_factor is not None and query_factor > 1.0:
adjusted_cpu_denorm = query_factor * curr_cpu_util_denorm
else:
adjusted_cpu_denorm = curr_cpu_util_denorm
# We stay conservative here. See `AuroraProvisioningScore`.
query_factor_clean = 1.0
if query_factor is not None:
if query_factor >= 1.0:
query_factor_clean = query_factor
else:
min_query_factor = (
1.0 - ctx.planner_config.redshift_initialize_load_fraction()
)
query_factor_clean = max(min_query_factor, query_factor)

return adjusted_cpu_denorm
return query_factor_clean * curr_cpu_util_denorm
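A worked example of the adjusted value returned here, assuming redshift_initialize_load_fraction() is 0.25 (an assumption; the tests in this commit only pin the Aurora-side value) and a current denormalized CPU utilization of 8.0:

# Assumed inputs for illustration (not from the commit).
curr_cpu_util_denorm = 8.0
query_factor = 0.5
min_query_factor = 1.0 - 0.25                              # 1 - redshift_initialize_load_fraction()
query_factor_clean = max(min_query_factor, query_factor)   # 0.75
adjusted_cpu_denorm = query_factor_clean * curr_cpu_util_denorm  # 6.0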

@staticmethod
@classmethod
def query_movement_factor(
cls,
base_query_run_times: npt.NDArray,
query_arrival_counts: npt.NDArray,
ctx: "ScoringContext",
@@ -146,13 +152,17 @@
# Special case. We cannot reweigh the queries because nothing in the
# current workload ran on Redshift (it could have been off).
return None
curr_query_run_times = cls.predict_query_latency_resources(
base_query_run_times, ctx.current_blueprint.redshift_provisioning(), ctx
)
norm_factor = ctx.engine_latency_norm_factor[Engine.Redshift]
assert norm_factor != 0.0
total_next_latency = np.dot(base_query_run_times, query_arrival_counts)
total_next_latency = np.dot(curr_query_run_times, query_arrival_counts)
return total_next_latency / norm_factor

@staticmethod
@classmethod
def predict_query_latency_load_resources(
cls,
base_predicted_latency: npt.NDArray,
to_prov: Provisioning,
overall_cpu_denorm: float,
@@ -161,20 +171,13 @@
if base_predicted_latency.shape[0] == 0:
return base_predicted_latency

# 1. Compute each query's expected run time on the given provisioning.
resource_factor = _REDSHIFT_BASE_RESOURCE_VALUE / (
redshift_num_cpus(to_prov) * to_prov.num_nodes()
# 1. Compute the impact of the provisioning.
prov_predicted_latency = cls.predict_query_latency_resources(
base_predicted_latency, to_prov, ctx
)
basis = np.array([resource_factor, 1.0])
coefs = ctx.planner_config.redshift_new_scaling_coefs()
coefs = np.multiply(coefs, basis)
num_coefs = coefs.shape[0]
lat_vals = np.expand_dims(base_predicted_latency, axis=1)
lat_vals = np.repeat(lat_vals, num_coefs, axis=1)
alone_predicted_latency = np.dot(lat_vals, coefs)

# 2. Compute the impact of system load.
mean_service_time = alone_predicted_latency.mean()
mean_service_time = prov_predicted_latency.mean()
cpu_util = overall_cpu_denorm / (
redshift_num_cpus(to_prov) * to_prov.num_nodes()
)
@@ -188,7 +191,27 @@

# Predicted running time is the query's execution time alone plus the
# expected wait time (due to system load).
return alone_predicted_latency + mean_wait_time
return prov_predicted_latency + mean_wait_time

@staticmethod
def predict_query_latency_resources(
base_predicted_latency: npt.NDArray,
to_prov: Provisioning,
ctx: "ScoringContext",
) -> npt.NDArray:
if base_predicted_latency.shape[0] == 0:
return base_predicted_latency

resource_factor = _REDSHIFT_BASE_RESOURCE_VALUE / (
redshift_num_cpus(to_prov) * to_prov.num_nodes()
)
basis = np.array([resource_factor, 1.0])
coefs = ctx.planner_config.redshift_new_scaling_coefs()
coefs = np.multiply(coefs, basis)
num_coefs = coefs.shape[0]
lat_vals = np.expand_dims(base_predicted_latency, axis=1)
lat_vals = np.repeat(lat_vals, num_coefs, axis=1)
return np.dot(lat_vals, coefs)

@staticmethod
def scale_load_resources_legacy(
15 changes: 8 additions & 7 deletions tests/test_aurora_scoring.py
@@ -79,12 +79,13 @@ def test_single_node() -> None:
assert ana_cpu_denorm == pytest.approx(8.0)

# Scale up with a 0.5x increase (i.e., a decrease) in the analytical
# workload. We stay conservative here and do not modify the loads.
# workload. We stay conservative here and use a fixed lower bound on the
# query factor.
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)
assert txn_cpu_denorm == pytest.approx(4.0 * (1 - 0.25))
assert ana_cpu_denorm == pytest.approx(4.0 * (1 - 0.25))

# No queries executed previously. But now we are executing queries on
# Aurora.
@@ -135,7 +136,7 @@ def test_single_to_multi() -> None:
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0 * (1 - 0.25))

# 2 replicas.
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
@@ -189,8 +190,8 @@ def test_multi_to_single() -> None:
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(3.0)
assert ana_cpu_denorm == pytest.approx(3.0)
assert txn_cpu_denorm == pytest.approx(2.0 + (1 - 0.25))
assert ana_cpu_denorm == pytest.approx(2.0 + (1 - 0.25))

# Multiple replicas (2) down to one node.
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
@@ -248,7 +249,7 @@ def test_multi_to_multi() -> None:
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_cpu_denorm == pytest.approx(0.5) # Stay conservative.
assert ana_cpu_denorm == pytest.approx(0.5 * (1 - 0.25)) # Stay conservative.

# Decrease the number of replicas (2 to 1).
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(