diff --git a/experiments/15-e2e-scenarios-v2/common.sh b/experiments/15-e2e-scenarios-v2/common.sh
index 17cc1b73..a2604b19 100644
--- a/experiments/15-e2e-scenarios-v2/common.sh
+++ b/experiments/15-e2e-scenarios-v2/common.sh
@@ -93,6 +93,7 @@ function start_repeating_olap_runner() {
   local ra_gap_std_s=$3
   local query_indexes=$4
   local results_name=$5
+  local client_offset=$6
 
   local args=(
     --num-clients $ra_clients
@@ -107,6 +108,10 @@ function start_repeating_olap_runner() {
     args+=(--query-frequency-path $ra_query_frequency_path)
   fi
 
+  if [[ ! -z $client_offset ]]; then
+    args+=(--client-offset $client_offset)
+  fi
+
   >&2 echo "[Repeating Analytics] Running with $ra_clients..."
   results_dir=$COND_OUT/$results_name
   mkdir -p $results_dir
diff --git a/experiments/15-e2e-scenarios-v2/scale_up/COND b/experiments/15-e2e-scenarios-v2/scale_up/COND
index 5a14fe81..5721e424 100644
--- a/experiments/15-e2e-scenarios-v2/scale_up/COND
+++ b/experiments/15-e2e-scenarios-v2/scale_up/COND
@@ -1,7 +1,5 @@
 include("../common.cond")
 
-aurora_alt = [99, 56, 32, 92, 91, 49, 30, 83, 94, 38, 87, 86, 76, 37, 31, 46]
-
 run_experiment(
     name="brad_100g",
     run="./run_workload.sh",
@@ -10,9 +8,8 @@ run_experiment(
         "config-file": "config/config_large_100.yml",
         "planner-config-file": "config/planner.yml",
         "schema-name": "imdb_extended_100g",
-        "ra-query-indexes": ",".join(map(str, aurora_alt)),
         "ra-query-bank-file": IMDB_100GB_REGULAR_QUERY_BANK,
-        "num-front-ends": 24,
+        "num-front-ends": 28,
         "dataset-type": "100gb",
     },
 )
@@ -25,9 +22,8 @@ run_command(
         "config-file": "config/config_large_100.yml",
         "planner-config-file": "config/planner.yml",
         "schema-name": "imdb_extended_100g",
-        "ra-query-indexes": ",".join(map(str, aurora_alt)),
         "ra-query-bank-file": IMDB_100GB_REGULAR_QUERY_BANK,
-        "num-front-ends": 24,
+        "num-front-ends": 28,
        "dataset-type": "100gb",
     },
 )
diff --git a/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh b/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh
index ee4b0896..063fd631 100755
--- a/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh
+++ b/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh
@@ -14,6 +14,9 @@ source ../common.sh
 # TODO: This executor file should be adapted to run against the baselines too
 # (TiDB / Serverless Redshift + Aurora)
 
+initial_queries="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46"
+heavier_queries="58,61,62,64,69,70,71,72,73,74"
+
 function step_txns() {
   local lo=$1
   local hi=$2
@@ -36,7 +39,7 @@ function txn_sweep() {
     txn_pid=$runner_pid
 
     sleep $(($gap_minute * 60))
-    if [[ ! -z $keep_last ]] && [[ $t_clients = $keep_last ]]; then
+    if [[ -z $keep_last ]] || [[ $t_clients != $keep_last ]]; then
      kill -INT $txn_pid
       wait $txn_pid
     fi
@@ -44,7 +47,11 @@
 }
 
 function inner_cancel_experiment() {
-  cancel_experiment $rana_pid $txn_pid
+  if [ ! -z $heavy_rana_pid ]; then
+    cancel_experiment $rana_pid $txn_pid $heavy_rana_pid
+  else
+    cancel_experiment $rana_pid $txn_pid
+  fi
 }
 
 trap "inner_cancel_experiment" INT
@@ -55,35 +62,40 @@ log_workload_point "brad_start_initiated"
 sleep 30
 
 # Start with 8 analytical clients.
-log_workload_point "ra_client_starting"
-start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
+log_workload_point "start_rana_8"
+start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
 rana_pid=$runner_pid
 
 # Scale up to 8 transactional clients and hold for 15 minutes.
+log_workload_point "start_increase_txn_4_to_8"
 txn_sweep "4 5 6 7 8" 1 8
+log_workload_point "hold_txn_8_15_min"
 sleep $((15 * 60))
 
-# Scale up to 24 transactional clients.
+# Scale up to 28 transactional clients. Hold for 15 minutes.
+log_workload_point "start_increase_txn_12_to_28"
 kill -INT $txn_pid
 wait $txn_pid
-txn_sweep "12 16 20 24" 2 24
-
-# 5 minutes
-kill -INT $rana_pid
-wait $rana_pid
-start_repeating_olap_runner 16 15 5 $ra_query_indexes "ra_16"
-rana_pid=$runner_pid
-sleep $((5 * 60))
+txn_sweep "12 16 20 24 28" 2 28
+log_workload_point "hold_txn_28_15_min"
+sleep $((15 * 60))
+
+# 15 minutes.
+log_workload_point "start_heavy_rana_8"
+start_repeating_olap_runner 8 5 1 $heavier_queries "ra_8_heavy" 8
+heavy_rana_pid=$runner_pid
+sleep $((15 * 60))
 
-# 20 minutes
-kill -INT $rana_pid
-wait $rana_pid
-start_repeating_olap_runner 24 15 5 $ra_query_indexes "ra_24"
-rana_pid=$runner_pid
+# 20 minutes.
+log_workload_point "start_heavy_rana_20"
+kill -INT $heavy_rana_pid
+wait $heavy_rana_pid
+start_repeating_olap_runner 20 5 1 $heavier_queries "ra_20_heavy" 8
+heavy_rana_pid=$runner_pid
 sleep $((20 * 60))
 log_workload_point "experiment_workload_done"
 
 # Shut down everything now.
 >&2 echo "Experiment done. Shutting down runners..."
-graceful_shutdown $rana_pid $txn_pid
+graceful_shutdown $rana_pid $heavy_rana_pid $txn_pid
 log_workload_point "shutdown_complete"
diff --git a/experiments/15-e2e-scenarios-v2/scale_up/run_workload_debug.sh b/experiments/15-e2e-scenarios-v2/scale_up/run_workload_debug.sh
index 97ac9854..fb010022 100755
--- a/experiments/15-e2e-scenarios-v2/scale_up/run_workload_debug.sh
+++ b/experiments/15-e2e-scenarios-v2/scale_up/run_workload_debug.sh
@@ -1,5 +1,4 @@
 #! /bin/bash
-
 script_loc=$(cd $(dirname $0) && pwd -P)
 cd $script_loc
 source ../common.sh
@@ -14,11 +13,8 @@ source ../common.sh
 # TODO: This executor file should be adapted to run against the baselines too
 # (TiDB / Serverless Redshift + Aurora)
 
-function step_txns() {
-  local lo=$1
-  local hi=$2
-  local gap_minute=$3
-}
+initial_queries="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46"
+heavier_queries="58,61,62,64,69,70,71,72,73,74"
 
 # Arguments:
 # --config-file
@@ -42,7 +38,7 @@ function point_one() {
   # A: 8x
   # T: 4x
   local run_for_minutes=$1
-  start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
+  start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
   rana_pid=$runner_pid
   start_txn_runner 4 # Implicit: --dataset-type
   txn_pid=$runner_pid
@@ -58,7 +54,7 @@ function point_two() {
   # A: 8x
   # T: 8x
   local run_for_minutes=$1
-  start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
+  start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
   rana_pid=$runner_pid
   start_txn_runner 8 # Implicit: --dataset-type
   txn_pid=$runner_pid
@@ -72,11 +68,11 @@
 
 function point_three() {
   # A: 8x
-  # T: 24x
+  # T: 28x
   local run_for_minutes=$1
-  start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
+  start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
   rana_pid=$runner_pid
-  start_txn_runner 24 # Implicit: --dataset-type
+  start_txn_runner 28 # Implicit: --dataset-type
   txn_pid=$runner_pid
 
   sleep $(($run_for_minutes * 60))
@@ -88,41 +84,49 @@
 
 function point_four() {
   # A: 16x
-  # T: 24x
+  # T: 28x
   local run_for_minutes=$1
-  start_repeating_olap_runner 16 15 5 $ra_query_indexes "ra_8"
+  start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
   rana_pid=$runner_pid
-  start_txn_runner 24 # Implicit: --dataset-type
+  start_repeating_olap_runner 8 15 5 $heavier_queries "ra_8_heavy" 8
+  rana_heavy_pid=$runner_pid
+  start_txn_runner 28 # Implicit: --dataset-type
   txn_pid=$runner_pid
 
   sleep $(($run_for_minutes * 60))
   kill -INT $rana_pid
+  kill -INT $rana_heavy_pid
   kill -INT $txn_pid
   wait $rana_pid
+  wait $rana_heavy_pid
   wait $txn_pid
 }
 
 function point_five() {
-  # A: 24x
-  # T: 24x
+  # A: 40x (8x regular + 32x heavy)
+  # T: 28x
   local run_for_minutes=$1
-  start_repeating_olap_runner 24 15 5 $ra_query_indexes "ra_8"
+  start_repeating_olap_runner 8 2 1 $initial_queries "ra_8"
   rana_pid=$runner_pid
-  start_txn_runner 24 # Implicit: --dataset-type
+  start_repeating_olap_runner 32 2 1 $heavier_queries "ra_24_heavy" 8
+  rana_heavy_pid=$runner_pid
+  start_txn_runner 28 # Implicit: --dataset-type
   txn_pid=$runner_pid
 
   sleep $(($run_for_minutes * 60))
   kill -INT $rana_pid
+  kill -INT $rana_heavy_pid
   kill -INT $txn_pid
   wait $rana_pid
+  wait $rana_heavy_pid
   wait $txn_pid
 }
 
 echo "READY -- Running for 1 hour. Hit Ctrl-C to stop."
 # point_one 60
-point_two 60
+# point_two 60
 # point_three 60
 # point_four 60
-# point_five 60
+point_five 60
 
 inner_cancel_experiment
diff --git a/experiments/15-e2e-scenarios-v2/scale_up/set_up_starting_blueprint.py b/experiments/15-e2e-scenarios-v2/scale_up/set_up_starting_blueprint.py
new file mode 100644
index 00000000..aa6f3d7f
--- /dev/null
+++ b/experiments/15-e2e-scenarios-v2/scale_up/set_up_starting_blueprint.py
@@ -0,0 +1,131 @@
+import asyncio
+import argparse
+import logging
+
+from brad.asset_manager import AssetManager
+from brad.blueprint import Blueprint
+from brad.blueprint.manager import BlueprintManager
+from brad.blueprint.provisioning import Provisioning
+from brad.config.engine import Engine
+from brad.config.file import ConfigFile
+from brad.daemon.transition_orchestrator import TransitionOrchestrator
+from brad.planner.enumeration.blueprint import EnumeratedBlueprint
+from brad.query_rep import QueryRep
+from brad.routing.abstract_policy import FullRoutingPolicy
+from brad.routing.cached import CachedLocationPolicy
+from brad.routing.policy import RoutingPolicy
+from brad.routing.tree_based.forest_policy import ForestPolicy
+from brad.utils import set_up_logging
+
+logger = logging.getLogger(__name__)
+
+
+async def run_transition(
+    config: ConfigFile,
+    blueprint_mgr: BlueprintManager,
+    next_blueprint: Blueprint,
+) -> None:
+    logger.info("Starting the transition...")
+    assert next_blueprint is not None
+    await blueprint_mgr.start_transition(next_blueprint, new_score=None)
+    orchestrator = TransitionOrchestrator(config, blueprint_mgr)
+    logger.info("Running the transition...")
+    await orchestrator.run_prepare_then_transition()
+    logger.info("Running the post-transition clean up...")
+    await orchestrator.run_clean_up_after_transition()
+    logger.info("Done!")
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--config-file",
+        type=str,
+        required=True,
+        help="Path to BRAD's configuration file.",
+    )
+    parser.add_argument(
+        "--schema-name",
+        type=str,
+        required=True,
+        help="The name of the schema.",
+    )
+    parser.add_argument("--query-bank-file", type=str)
+    parser.add_argument(
+        "--athena-queries", type=str, help="Comma separated list of indices."
+    )
+    parser.add_argument(
+        "--aurora-queries",
+        type=str,
+        help="Comma separated list of indices.",
+        default="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46",
+    )
+    parser.add_argument(
+        "--redshift-queries", type=str, help="Comma separated list of indices."
+    )
+    args = parser.parse_args()
+    set_up_logging(debug_mode=True)
+
+    # 1. Load the config.
+    config = ConfigFile.load(args.config_file)
+
+    # 2. Load the existing blueprint.
+    assets = AssetManager(config)
+    blueprint_mgr = BlueprintManager(config, assets, args.schema_name)
+    blueprint_mgr.load_sync()
+    blueprint = blueprint_mgr.get_blueprint()
+
+    # 3. Load the query bank.
+    queries = []
+    with open(args.query_bank_file, "r", encoding="UTF-8") as file:
+        for line in file:
+            clean = line.strip()
+            if clean.endswith(";"):
+                clean = clean[:-1]
+            queries.append(QueryRep(clean))
+
+    # 4. Create the fixed routing policy.
+    query_map = {}
+    if args.athena_queries is not None:
+        for qidx_str in args.athena_queries.split(","):
+            qidx = int(qidx_str.strip())
+            query_map[queries[qidx]] = Engine.Athena
+    if args.redshift_queries is not None:
+        for qidx_str in args.redshift_queries.split(","):
+            qidx = int(qidx_str.strip())
+            query_map[queries[qidx]] = Engine.Redshift
+    if args.aurora_queries is not None:
+        for qidx_str in args.aurora_queries.split(","):
+            qidx = int(qidx_str.strip())
+            query_map[queries[qidx]] = Engine.Aurora
+    clp = CachedLocationPolicy(query_map)
+
+    # 5. Replace the policy.
+    enum_blueprint = EnumeratedBlueprint(blueprint)
+    definite_policy = asyncio.run(
+        ForestPolicy.from_assets(
+            args.schema_name, RoutingPolicy.ForestTableSelectivity, assets
+        )
+    )
+    replaced_policy = FullRoutingPolicy(
+        indefinite_policies=[clp], definite_policy=definite_policy
+    )
+    enum_blueprint.set_routing_policy(replaced_policy)
+
+    # Ensure the provisioning is as expected.
+    enum_blueprint.set_aurora_provisioning(Provisioning("db.t4g.medium", 2))
+    enum_blueprint.set_redshift_provisioning(Provisioning("dc2.large", 0))
+
+    # 6. Adjust the placement.
+    new_placement = {}
+    for table in blueprint.tables():
+        new_placement[table.name] = [Engine.Aurora, Engine.Athena]
+    enum_blueprint.set_table_locations(new_placement)
+
+    # 7. Transition to the new blueprint.
+    modified_blueprint = enum_blueprint.to_blueprint()
+    asyncio.run(run_transition(config, blueprint_mgr, modified_blueprint))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/brad/daemon/front_end_metrics.py b/src/brad/daemon/front_end_metrics.py
index d413fe04..c014f763 100644
--- a/src/brad/daemon/front_end_metrics.py
+++ b/src/brad/daemon/front_end_metrics.py
@@ -205,7 +205,8 @@ def handle_metric_report(self, report: MetricsReport) -> None:
                 fe_index
             ].add_sample(report.txn_latency_sketch(), now)
 
-        logger.debug(
+        log_verbose(
+            logger,
             "Received metrics report: [%d] %f (ts: %s)",
             report.fe_index,
             report.txn_completions_per_s,
diff --git a/src/brad/planner/enumeration/provisioning.py b/src/brad/planner/enumeration/provisioning.py
index db82c491..70ca8958 100644
--- a/src/brad/planner/enumeration/provisioning.py
+++ b/src/brad/planner/enumeration/provisioning.py
@@ -73,6 +73,10 @@ def enumerate_nearby(
         ):
             candidate.set_num_nodes(num_nodes)
 
+            if id(self._instances) == id(_REDSHIFT_INSTANCES) and num_nodes == 1:
+                # TODO: Temporarily avoid single node instances.
+                continue
+
             if (
                 abs(self._compute_distance(base_provisioning_value, candidate))
                 <= max_distance
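
A minimal usage sketch for the new set_up_starting_blueprint.py helper, run from the scale_up directory before launching run_workload.sh. The config file, schema name, and Aurora query list come from the COND options and the script's own --aurora-queries default; the query bank path is a placeholder for the file referenced by IMDB_100GB_REGULAR_QUERY_BANK.

    # Assumed invocation; replace the query bank path with your local copy.
    python3 set_up_starting_blueprint.py \
        --config-file config/config_large_100.yml \
        --schema-name imdb_extended_100g \
        --query-bank-file <path to IMDB_100GB_REGULAR_QUERY_BANK> \
        --aurora-queries "99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46"

Queries listed under --aurora-queries (and optionally --athena-queries / --redshift-queries) are pinned to their engines through the CachedLocationPolicy, so the workload begins from a known routing state on a 2-node db.t4g.medium Aurora provisioning with Redshift scaled to zero nodes.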