Adjust query factor computation, change the vector workload #392

Merged
4 commits merged on Dec 4, 2023
@@ -22,16 +22,22 @@ extract_named_arguments $@
# Touch `title`:
# 14, 54, 59, 75

# General scenario:
# Aurora is being used for queries involving `title` because of the vector
# similarity queries that also touch `title`. After deploying BRAD, it realizes
# that it's better to replicate `title` and route the rest of the queries onto
# Redshift.

query_indices="62,64,65,66,69,72,73,74,91,59"
heavier_queries="14,54,60,71,75"
all_queries="${query_indices},${heavier_queries}"

# Should be removed eventually and we should rely on the blueprint.
start_brad $config_file $planner_config_file
log_workload_point "brad_start_initiated"
sleep 30

log_workload_point "clients_starting"
start_repeating_olap_runner 8 5 5 $query_indices "ra_8"
start_repeating_olap_runner 8 5 5 $all_queries "ra_8"
rana_pid=$runner_pid

start_other_repeating_runner 2 8 5 "ra_vector" 8
@@ -48,15 +54,10 @@ function inner_cancel_experiment() {
trap "inner_cancel_experiment" INT
trap "inner_cancel_experiment" TERM

sleep $((16 * 60)) # Wait for 16 mins.
start_repeating_olap_runner 4 5 5 $heavier_queries "ra_4_heavy" 10
heavy_pid=$runner_pid
log_workload_point "heavier_queries_started"

sleep $((60 * 60)) # Wait for 60 mins.
log_workload_point "experiment_done"

# Shut down everything now.
>&2 echo "Experiment done. Shutting down runners..."
graceful_shutdown $rana_pid $txn_pid $other_pid $heavy_pid
graceful_shutdown $rana_pid $txn_pid $other_pid
log_workload_point "shutdown_complete"
@@ -58,10 +58,13 @@ def main():
"--aurora-queries",
type=str,
help="Comma separated list of indices.",
default="62,64,65,66,69,72,73,74,91,59",
default="65,69,73,14,54,59,75",
)
parser.add_argument(
"--redshift-queries", type=str, help="Comma separated list of indices."
"--redshift-queries",
type=str,
help="Comma separated list of indices.",
default="62,64,66,72,74,91,59,60,71",
)
args = parser.parse_args()
set_up_logging(debug_mode=True)
@@ -113,16 +116,16 @@ def main():
enum_blueprint.set_routing_policy(replaced_policy)

# Ensure the provisioning is as expected.
enum_blueprint.set_aurora_provisioning(Provisioning("db.r6g.xlarge", 2))
enum_blueprint.set_redshift_provisioning(Provisioning("dc2.large", 0))
enum_blueprint.set_aurora_provisioning(Provisioning("db.r6g.2xlarge", 2))
enum_blueprint.set_redshift_provisioning(Provisioning("dc2.large", 2))

# 6. Adjust the placement.
new_placement = {}
for table in blueprint.tables():
new_placement[table.name] = [Engine.Aurora, Engine.Athena]
new_placement[table.name] = [Engine.Aurora, Engine.Athena, Engine.Redshift]
if table.name == "telemetry":
new_placement[table.name] = [Engine.Athena]
if table.name == "embeddings":
if table.name == "embeddings" or table.name == "title":
new_placement[table.name] = [Engine.Aurora, Engine.Athena]
enum_blueprint.set_table_locations(new_placement)

33 changes: 24 additions & 9 deletions src/brad/planner/scoring/context.py
@@ -14,6 +14,8 @@
compute_aurora_hourly_operational_cost,
compute_redshift_hourly_operational_cost,
)
from brad.planner.scoring.performance.unified_aurora import AuroraProvisioningScore
from brad.planner.scoring.performance.unified_redshift import RedshiftProvisioningScore

logger = logging.getLogger(__name__)

@@ -130,21 +132,34 @@ def compute_engine_latency_norm_factor(self) -> None:
)
)

# N.B. Using the actual recorded run times is slightly problematic
# here. We use this normalization factor as a part of predicting a
# query's execution time on a different blueprint. We need to use
# query execution times on the same provisioning (and system load)
# as the recorded run times. Scaling the recorded run times to a
# base provisioning or vice-versa is difficult to do accurately
# without having this normalization factor.
# 2. Adjust to the provisioning if needed.
if engine == Engine.Aurora:
adjusted_latencies = (
AuroraProvisioningScore.predict_query_latency_resources(
predicted_base_latencies,
self.current_blueprint.aurora_provisioning(),
self,
)
)
elif engine == Engine.Redshift:
adjusted_latencies = (
RedshiftProvisioningScore.predict_query_latency_resources(
predicted_base_latencies,
self.current_blueprint.redshift_provisioning(),
self,
)
)
elif engine == Engine.Athena:
# No provisioning.
adjusted_latencies = predicted_base_latencies

# 2. Extract query weights (based on arrival frequency) and scale
# 3. Extract query weights (based on arrival frequency) and scale
# the run times.
query_weights = self.current_workload.get_arrival_counts_batch(
self.current_query_locations[engine]
)
assert query_weights.shape == predicted_base_latencies.shape

self.engine_latency_norm_factor[engine] = np.dot(
predicted_base_latencies, query_weights
adjusted_latencies, query_weights
)
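
Note on the `context.py` change: the normalization factor is now built from latencies that have been scaled to the *current* provisioning before being weighted by arrival counts, so the factor and the per-blueprint predictions are on the same footing. A minimal sketch of that computation, with `adjust_to_current_provisioning` standing in for the engine-specific `predict_query_latency_resources` calls (illustrative only, not the BRAD source):

```python
import numpy as np

def engine_latency_norm_factor(predicted_base_latencies: np.ndarray,
                               arrival_counts: np.ndarray,
                               adjust_to_current_provisioning) -> float:
    # Step 2 (new in this PR): scale base latency predictions to the current
    # provisioning. Aurora/Redshift use their ProvisioningScore helpers;
    # Athena has no provisioning, so its latencies pass through unchanged.
    adjusted_latencies = adjust_to_current_provisioning(predicted_base_latencies)
    # Step 3: weight each query by how often it arrives and sum.
    assert arrival_counts.shape == adjusted_latencies.shape
    return float(np.dot(adjusted_latencies, arrival_counts))

# e.g. two queries predicted at 2.0s and 5.0s after adjustment, arriving
# 10 and 3 times respectively: 2*10 + 5*3 = 35.0
engine_latency_norm_factor(np.array([2.0, 5.0]), np.array([10.0, 3.0]),
                           adjust_to_current_provisioning=lambda lats: lats)
```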
59 changes: 41 additions & 18 deletions src/brad/planner/scoring/performance/unified_aurora.py
@@ -99,10 +99,17 @@ def predict_loads(

# We take a very conservative approach to query movement. If new queries
# are added onto Aurora, we increase the load. But if queries are
# removed, we do not decrease the load.
# removed, we only decrease the load if we are using replicas. This
# ensures we do not mix transactional and analytical load.
query_factor_clean = 1.0
if query_factor is not None and query_factor > 1.0:
query_factor_clean = query_factor
if query_factor is not None:
if query_factor >= 1.0:
query_factor_clean = query_factor
else:
min_query_factor = (
1.0 - ctx.planner_config.aurora_initialize_load_fraction()
)
query_factor_clean = max(min_query_factor, query_factor)

# This is true iff we did not run any queries on Aurora with the current
# blueprint and are now going to run queries on Aurora on the next
@@ -206,8 +213,9 @@ def predict_loads(
/ (next_prov.num_nodes() - 1),
)

@staticmethod
@classmethod
def query_movement_factor(
cls,
base_query_run_times: npt.NDArray,
query_arrival_counts: npt.NDArray,
ctx: "ScoringContext",
@@ -218,13 +226,17 @@ def query_movement_factor(
# Special case. We cannot reweigh the queries because nothing in the
# current workload ran on Aurora.
return None
curr_query_run_times = cls.predict_query_latency_resources(
base_query_run_times, ctx.current_blueprint.aurora_provisioning(), ctx
)
norm_factor = ctx.engine_latency_norm_factor[Engine.Aurora]
assert norm_factor != 0.0
total_next_latency = np.dot(base_query_run_times, query_arrival_counts)
total_next_latency = np.dot(curr_query_run_times, query_arrival_counts)
return total_next_latency / norm_factor

@staticmethod
@classmethod
def predict_query_latency_load_resources(
cls,
base_predicted_latency: npt.NDArray,
to_prov: Provisioning,
cpu_util: float,
@@ -233,18 +245,11 @@ def predict_query_latency_load_resources(
if base_predicted_latency.shape[0] == 0:
return base_predicted_latency

# 1. Compute each query's expected run time on the given provisioning.
resource_factor = _AURORA_BASE_RESOURCE_VALUE / aurora_num_cpus(to_prov)
basis = np.array([resource_factor, 1.0])
coefs = ctx.planner_config.aurora_new_scaling_coefs()
coefs = np.multiply(coefs, basis)
num_coefs = coefs.shape[0]
lat_vals = np.expand_dims(base_predicted_latency, axis=1)
lat_vals = np.repeat(lat_vals, num_coefs, axis=1)
alone_predicted_latency = np.dot(lat_vals, coefs)
prov_predicted_latency = cls.predict_query_latency_resources(
base_predicted_latency, to_prov, ctx
)

# 2. Compute the impact of system load.
mean_service_time = alone_predicted_latency.mean()
mean_service_time = prov_predicted_latency.mean()
denom = max(1e-3, 1.0 - cpu_util) # Want to avoid division by 0.
wait_sf = cpu_util / denom
mean_wait_time = (
@@ -253,7 +258,25 @@

# Predicted running time is the query's execution time alone plus the
# expected wait time (due to system load)
return alone_predicted_latency + mean_wait_time
return prov_predicted_latency + mean_wait_time

@staticmethod
def predict_query_latency_resources(
base_predicted_latency: npt.NDArray,
to_prov: Provisioning,
ctx: "ScoringContext",
) -> npt.NDArray:
if base_predicted_latency.shape[0] == 0:
return base_predicted_latency

resource_factor = _AURORA_BASE_RESOURCE_VALUE / aurora_num_cpus(to_prov)
basis = np.array([resource_factor, 1.0])
coefs = ctx.planner_config.aurora_new_scaling_coefs()
coefs = np.multiply(coefs, basis)
num_coefs = coefs.shape[0]
lat_vals = np.expand_dims(base_predicted_latency, axis=1)
lat_vals = np.repeat(lat_vals, num_coefs, axis=1)
return np.dot(lat_vals, coefs)

@staticmethod
def predict_query_latency_load_resources_legacy(
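
The key behavioral change in `unified_aurora.py` is the query-factor clamp: previously a factor below 1.0 was ignored outright, whereas now the analytical load may shrink, but only down to a configured floor. A small sketch of the rule, assuming a hypothetical `initialize_load_fraction` of 0.25 (the value the updated tests appear to use; in BRAD it comes from `ctx.planner_config.aurora_initialize_load_fraction()`):

```python
from typing import Optional

def clamp_query_factor(query_factor: Optional[float],
                       initialize_load_fraction: float = 0.25) -> float:
    """Illustrative version of the clamping introduced in this PR."""
    if query_factor is None:
        return 1.0
    if query_factor >= 1.0:
        # Load increased: accept the full increase (conservative).
        return query_factor
    # Load decreased: floor the reduction at 1 - initialize_load_fraction.
    return max(1.0 - initialize_load_fraction, query_factor)

clamp_query_factor(1.8)   # 1.8  -> increases still pass through unchanged
clamp_query_factor(0.5)   # 0.75 -> a 0.5x workload is floored at 1 - 0.25
clamp_query_factor(None)  # 1.0  -> no information, leave the load as-is
```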
65 changes: 44 additions & 21 deletions src/brad/planner/scoring/performance/unified_redshift.py
@@ -126,16 +126,22 @@ def predict_cpu_denorm(
* curr_prov.num_nodes()
)

# We stay conservative here.
if query_factor is not None and query_factor > 1.0:
adjusted_cpu_denorm = query_factor * curr_cpu_util_denorm
else:
adjusted_cpu_denorm = curr_cpu_util_denorm
# We stay conservative here. See `AuroraProvisioningScore`.
query_factor_clean = 1.0
if query_factor is not None:
if query_factor >= 1.0:
query_factor_clean = query_factor
else:
min_query_factor = (
1.0 - ctx.planner_config.redshift_initialize_load_fraction()
)
query_factor_clean = max(min_query_factor, query_factor)

return adjusted_cpu_denorm
return query_factor_clean * curr_cpu_util_denorm

@staticmethod
@classmethod
def query_movement_factor(
cls,
base_query_run_times: npt.NDArray,
query_arrival_counts: npt.NDArray,
ctx: "ScoringContext",
@@ -146,13 +152,17 @@ def query_movement_factor(
# Special case. We cannot reweigh the queries because nothing in the
# current workload ran on Redshift (it could have been off).
return None
curr_query_run_times = cls.predict_query_latency_resources(
base_query_run_times, ctx.current_blueprint.redshift_provisioning(), ctx
)
norm_factor = ctx.engine_latency_norm_factor[Engine.Redshift]
assert norm_factor != 0.0
total_next_latency = np.dot(base_query_run_times, query_arrival_counts)
total_next_latency = np.dot(curr_query_run_times, query_arrival_counts)
return total_next_latency / norm_factor

@staticmethod
@classmethod
def predict_query_latency_load_resources(
cls,
base_predicted_latency: npt.NDArray,
to_prov: Provisioning,
overall_cpu_denorm: float,
@@ -161,20 +171,13 @@ def predict_query_latency_load_resources(
if base_predicted_latency.shape[0] == 0:
return base_predicted_latency

# 1. Compute each query's expected run time on the given provisioning.
resource_factor = _REDSHIFT_BASE_RESOURCE_VALUE / (
redshift_num_cpus(to_prov) * to_prov.num_nodes()
# 1. Compute the impact of the provisioning.
prov_predicted_latency = cls.predict_query_latency_resources(
base_predicted_latency, to_prov, ctx
)
basis = np.array([resource_factor, 1.0])
coefs = ctx.planner_config.redshift_new_scaling_coefs()
coefs = np.multiply(coefs, basis)
num_coefs = coefs.shape[0]
lat_vals = np.expand_dims(base_predicted_latency, axis=1)
lat_vals = np.repeat(lat_vals, num_coefs, axis=1)
alone_predicted_latency = np.dot(lat_vals, coefs)

# 2. Compute the impact of system load.
mean_service_time = alone_predicted_latency.mean()
mean_service_time = prov_predicted_latency.mean()
cpu_util = overall_cpu_denorm / (
redshift_num_cpus(to_prov) * to_prov.num_nodes()
)
@@ -188,7 +191,27 @@

# Predicted running time is the query's execution time alone plus the
# expected wait time (due to system load).
return alone_predicted_latency + mean_wait_time
return prov_predicted_latency + mean_wait_time

@staticmethod
def predict_query_latency_resources(
base_predicted_latency: npt.NDArray,
to_prov: Provisioning,
ctx: "ScoringContext",
) -> npt.NDArray:
if base_predicted_latency.shape[0] == 0:
return base_predicted_latency

resource_factor = _REDSHIFT_BASE_RESOURCE_VALUE / (
redshift_num_cpus(to_prov) * to_prov.num_nodes()
)
basis = np.array([resource_factor, 1.0])
coefs = ctx.planner_config.redshift_new_scaling_coefs()
coefs = np.multiply(coefs, basis)
num_coefs = coefs.shape[0]
lat_vals = np.expand_dims(base_predicted_latency, axis=1)
lat_vals = np.repeat(lat_vals, num_coefs, axis=1)
return np.dot(lat_vals, coefs)

@staticmethod
def scale_load_resources_legacy(
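
Both scoring classes now share the same resource-scaling step via the new `predict_query_latency_resources` helper: each base latency is multiplied by a linear combination of a resource factor (the base resource value divided by the target provisioning's total vCPUs) and a constant term. A rough, self-contained sketch of that step, with the base resource value and coefficients as placeholder numbers rather than BRAD's configured values:

```python
import numpy as np

def predict_latency_for_provisioning(base_latency: np.ndarray,
                                     total_cpus: int,
                                     coefs: np.ndarray = np.array([0.7, 0.3]),
                                     base_resource_value: float = 8.0) -> np.ndarray:
    if base_latency.shape[0] == 0:
        return base_latency
    # Smaller provisioning -> larger resource factor -> longer predicted latency.
    resource_factor = base_resource_value / total_cpus
    scaled_coefs = coefs * np.array([resource_factor, 1.0])
    # Each latency becomes base * (c0 * resource_factor + c1).
    lat_vals = np.repeat(np.expand_dims(base_latency, axis=1),
                         scaled_coefs.shape[0], axis=1)
    return np.dot(lat_vals, scaled_coefs)

# e.g. halving the vCPUs relative to the base provisioning scales each
# latency by (0.7 * 2.0 + 0.3) = 1.7x:
predict_latency_for_provisioning(np.array([2.0, 5.0]), total_cpus=4)
```

The load-aware variants (`predict_query_latency_load_resources`) then add an expected wait time on top of this provisioning-adjusted latency, rather than recomputing the scaling inline as before.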
15 changes: 8 additions & 7 deletions tests/test_aurora_scoring.py
@@ -79,12 +79,13 @@ def test_single_node() -> None:
assert ana_cpu_denorm == pytest.approx(8.0)

# Scale up with a 0.5x increase (i.e., a decrease) in the analytical
# workload. We stay conservative here and do not modify the loads.
# workload. We stay conservative here and use a fixed lower bound on the
# query factor.
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)
assert txn_cpu_denorm == pytest.approx(4.0 * (1 - 0.25))
assert ana_cpu_denorm == pytest.approx(4.0 * (1 - 0.25))

# No queries executed previously. But now we are executing queries on
# Aurora.
@@ -135,7 +136,7 @@ def test_single_to_multi() -> None:
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0)
assert ana_cpu_denorm == pytest.approx(4.0 * (1 - 0.25))

# 2 replicas.
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
@@ -189,8 +190,8 @@ def test_multi_to_single() -> None:
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(3.0)
assert ana_cpu_denorm == pytest.approx(3.0)
assert txn_cpu_denorm == pytest.approx(2.0 + (1 - 0.25))
assert ana_cpu_denorm == pytest.approx(2.0 + (1 - 0.25))

# Multiple replicas (2) down to one node.
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
@@ -248,7 +249,7 @@ def test_multi_to_multi() -> None:
curr_prov, next_prov, 0.5, ctx
)
assert txn_cpu_denorm == pytest.approx(2.0)
assert ana_cpu_denorm == pytest.approx(0.5) # Stay conservative.
assert ana_cpu_denorm == pytest.approx(0.5 * (1 - 0.25)) # Stay conservative.

# Decrease the number of replicas (2 to 1).
txn_cpu_denorm, ana_cpu_denorm = AuroraProvisioningScore.predict_loads(
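
For readers following the new numbers: the changed assertions fall directly out of the clamping rule, assuming the test fixture configures `aurora_initialize_load_fraction` to 0.25 (which is what the `(1 - 0.25)` terms suggest):

```python
floored = max(1.0 - 0.25, 0.5)  # a 0.5x analytical workload is floored at 0.75
assert 4.0 * floored == 3.0     # test_single_node / test_single_to_multi
assert 0.5 * floored == 0.375   # test_multi_to_multi analytical load
assert 2.0 + floored == 2.75    # test_multi_to_single expectation
```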