Add scale up workload adjustments #361

Merged: 7 commits, Nov 12, 2023
Changes from all commits
5 changes: 5 additions & 0 deletions experiments/15-e2e-scenarios-v2/common.sh
@@ -93,6 +93,7 @@ function start_repeating_olap_runner() {
local ra_gap_std_s=$3
local query_indexes=$4
local results_name=$5
local client_offset=$6

local args=(
--num-clients $ra_clients
@@ -107,6 +108,10 @@ args+=(--query-frequency-path $ra_query_frequency_path)
args+=(--query-frequency-path $ra_query_frequency_path)
fi

if [[ ! -z $client_offset ]]; then
args+=(--client-offset $client_offset)
fi

>&2 echo "[Repeating Analytics] Running with $ra_clients..."
results_dir=$COND_OUT/$results_name
mkdir -p $results_dir
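For context, a minimal sketch of how the helper is now called; the sixth argument is optional, so the existing five-argument call sites keep working unchanged (both invocations below appear in the scripts later in this PR):

    # Existing call site: five arguments, no offset.
    start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"

    # New call site: a sixth argument sets the client offset, which the
    # helper forwards to the runner as --client-offset.
    start_repeating_olap_runner 8 5 1 $heavier_queries "ra_8_heavy" 8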
8 changes: 2 additions & 6 deletions experiments/15-e2e-scenarios-v2/scale_up/COND
@@ -1,7 +1,5 @@
include("../common.cond")

aurora_alt = [99, 56, 32, 92, 91, 49, 30, 83, 94, 38, 87, 86, 76, 37, 31, 46]

run_experiment(
name="brad_100g",
run="./run_workload.sh",
@@ -10,9 +8,8 @@ run_experiment(
"config-file": "config/config_large_100.yml",
"planner-config-file": "config/planner.yml",
"schema-name": "imdb_extended_100g",
"ra-query-indexes": ",".join(map(str, aurora_alt)),
"ra-query-bank-file": IMDB_100GB_REGULAR_QUERY_BANK,
"num-front-ends": 24,
"num-front-ends": 28,
"dataset-type": "100gb",
},
)
@@ -25,9 +22,8 @@ run_command(
"config-file": "config/config_large_100.yml",
"planner-config-file": "config/planner.yml",
"schema-name": "imdb_extended_100g",
"ra-query-indexes": ",".join(map(str, aurora_alt)),
"ra-query-bank-file": IMDB_100GB_REGULAR_QUERY_BANK,
"num-front-ends": 24,
"num-front-ends": 28,
"dataset-type": "100gb",
},
)
50 changes: 31 additions & 19 deletions experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh
@@ -14,6 +14,9 @@ source ../common.sh
# TODO: This executor file should be adapted to run against the baselines too
# (TiDB / Serverless Redshift + Aurora)

initial_queries="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46"
heavier_queries="58,61,62,64,69,70,71,72,73,74"

function step_txns() {
local lo=$1
local hi=$2
@@ -36,15 +39,19 @@ function txn_sweep() {
txn_pid=$runner_pid

sleep $(($gap_minute * 60))
if [[ ! -z $keep_last ]] && [[ $t_clients = $keep_last ]]; then
if [[ -z $keep_last ]] || [[ $t_clients != $keep_last ]]; then
kill -INT $txn_pid
wait $txn_pid
fi
done
}

function inner_cancel_experiment() {
cancel_experiment $rana_pid $txn_pid
if [ ! -z $heavy_rana_pid ]; then
cancel_experiment $rana_pid $txn_pid $heavy_rana_pid
else
cancel_experiment $rana_pid $txn_pid
fi
}

trap "inner_cancel_experiment" INT
@@ -55,35 +62,40 @@ log_workload_point "brad_start_initiated"
sleep 30

# Start with 8 analytical clients.
log_workload_point "ra_client_starting"
start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
log_workload_point "start_rana_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid

# Scale up to 8 transactional clients and hold for 15 minutes.
log_workload_point "start_increase_txn_4_to_8"
txn_sweep "4 5 6 7 8" 1 8
log_workload_point "hold_txn_8_15_min"
sleep $((15 * 60))

# Scale up to 24 transactional clients.
# Scale up to 28 transactional clients. Hold for 15 minutes.
log_workload_point "start_increase_txn_12_to_28"
kill -INT $txn_pid
wait $txn_pid
txn_sweep "12 16 20 24" 2 24

# 5 minutes
kill -INT $rana_pid
wait $rana_pid
start_repeating_olap_runner 16 15 5 $ra_query_indexes "ra_16"
rana_pid=$runner_pid
sleep $((5 * 60))
txn_sweep "12 16 20 24 28" 2 28
log_workload_point "hold_txn_28_15_min"
sleep $((15 * 60))

# 15 minutes.
log_workload_point "start_heavy_rana_8"
start_repeating_olap_runner 8 5 1 $heavier_queries "ra_8_heavy" 8
heavy_rana_pid=$runner_pid
sleep $((15 * 60))

# 20 minutes
kill -INT $rana_pid
wait $rana_pid
start_repeating_olap_runner 24 15 5 $ra_query_indexes "ra_24"
rana_pid=$runner_pid
# 20 minutes.
log_workload_point "start_heavy_rana_20"
kill -INT $heavy_rana_pid
wait $heavy_rana_pid
start_repeating_olap_runner 20 5 1 $heavier_queries "ra_20_heavy" 8
heavy_rana_pid=$runner_pid
sleep $((20 * 60))
log_workload_point "experiment_workload_done"

# Shut down everything now.
>&2 echo "Experiment done. Shutting down runners..."
graceful_shutdown $rana_pid $txn_pid
graceful_shutdown $rana_pid $heavy_rana_pid $txn_pid
log_workload_point "shutdown_complete"
44 changes: 24 additions & 20 deletions experiments/15-e2e-scenarios-v2/scale_up/run_workload_debug.sh
@@ -1,5 +1,4 @@
#! /bin/bash

script_loc=$(cd $(dirname $0) && pwd -P)
cd $script_loc
source ../common.sh
@@ -14,11 +13,8 @@ source ../common.sh
# TODO: This executor file should be adapted to run against the baselines too
# (TiDB / Serverless Redshift + Aurora)

function step_txns() {
local lo=$1
local hi=$2
local gap_minute=$3
}
initial_queries="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46"
heavier_queries="58,61,62,64,69,70,71,72,73,74"

# Arguments:
# --config-file
@@ -42,7 +38,7 @@ function point_one() {
# A: 8x
# T: 4x
local run_for_minutes=$1
start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid
start_txn_runner 4 # Implicit: --dataset-type
txn_pid=$runner_pid
@@ -58,7 +54,7 @@ function point_two() {
# A: 8x
# T: 8x
local run_for_minutes=$1
start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid
start_txn_runner 8 # Implicit: --dataset-type
txn_pid=$runner_pid
@@ -72,11 +68,11 @@ function point_two() {

function point_three() {
# A: 8x
# T: 24x
# T: 28x
local run_for_minutes=$1
start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid
start_txn_runner 24 # Implicit: --dataset-type
start_txn_runner 28 # Implicit: --dataset-type
txn_pid=$runner_pid

sleep $(($run_for_minutes * 60))
@@ -88,41 +84,49 @@ function point_three() {

function point_four() {
# A: 16x
# T: 24x
# T: 28x
local run_for_minutes=$1
start_repeating_olap_runner 16 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid
start_txn_runner 24 # Implicit: --dataset-type
start_repeating_olap_runner 8 15 5 $heavier_queries "ra_8_heavy" 8
rana_heavy_pid=$runner_pid
start_txn_runner 28 # Implicit: --dataset-type
txn_pid=$runner_pid

sleep $(($run_for_minutes * 60))
kill -INT $rana_pid
kill -INT $rana_heavy_pid
kill -INT $txn_pid
wait $rana_pid
wait $rana_heavy_pid
wait $txn_pid
}

function point_five() {
# A: 24x
# T: 24x
# A: 28x
# T: 28x
local run_for_minutes=$1
start_repeating_olap_runner 24 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 8 2 1 $initial_queries "ra_8"
rana_pid=$runner_pid
start_txn_runner 24 # Implicit: --dataset-type
start_repeating_olap_runner 32 2 1 $heavier_queries "ra_24_heavy" 8
rana_heavy_pid=$runner_pid
start_txn_runner 28 # Implicit: --dataset-type
txn_pid=$runner_pid

sleep $(($run_for_minutes * 60))
kill -INT $rana_pid
kill -INT $rana_heavy_pid
kill -INT $txn_pid
wait $rana_pid
wait $rana_heavy_pid
wait $txn_pid
}

echo "READY -- Running for 1 hour. Hit Ctrl-C to stop."
# point_one 60
point_two 60
# point_two 60
# point_three 60
# point_four 60
# point_five 60
point_five 60

inner_cancel_experiment
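To exercise one of these operating points by hand, the debug script is run directly; only --config-file is visible in the argument comment above, so the remaining flags here are assumptions mirroring the COND options for this scenario:

    # Hypothetical invocation; all flags other than --config-file are
    # assumed to mirror the COND options above.
    ./run_workload_debug.sh \
        --config-file config/config_large_100.yml \
        --planner-config-file config/planner.yml \
        --schema-name imdb_extended_100g \
        --dataset-type 100gb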
131 changes: 131 additions & 0 deletions experiments/15-e2e-scenarios-v2/scale_up/set_up_starting_blueprint.py
@@ -0,0 +1,131 @@
import asyncio
import argparse
import logging

from brad.asset_manager import AssetManager
from brad.blueprint import Blueprint
from brad.blueprint.manager import BlueprintManager
from brad.blueprint.provisioning import Provisioning
from brad.config.engine import Engine
from brad.config.file import ConfigFile
from brad.daemon.transition_orchestrator import TransitionOrchestrator
from brad.planner.enumeration.blueprint import EnumeratedBlueprint
from brad.query_rep import QueryRep
from brad.routing.abstract_policy import FullRoutingPolicy
from brad.routing.cached import CachedLocationPolicy
from brad.routing.policy import RoutingPolicy
from brad.routing.tree_based.forest_policy import ForestPolicy
from brad.utils import set_up_logging

logger = logging.getLogger(__name__)


async def run_transition(
config: ConfigFile,
blueprint_mgr: BlueprintManager,
next_blueprint: Blueprint,
) -> None:
logger.info("Starting the transition...")
assert next_blueprint is not None
await blueprint_mgr.start_transition(next_blueprint, new_score=None)
orchestrator = TransitionOrchestrator(config, blueprint_mgr)
logger.info("Running the transition...")
await orchestrator.run_prepare_then_transition()
logger.info("Running the post-transition clean up...")
await orchestrator.run_clean_up_after_transition()
logger.info("Done!")


def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--config-file",
type=str,
required=True,
help="Path to BRAD's configuration file.",
)
parser.add_argument(
"--schema-name",
type=str,
required=True,
help="The name of the schema to drop.",
)
parser.add_argument("--query-bank-file", type=str)
parser.add_argument(
"--athena-queries", type=str, help="Comma separated list of indices."
)
parser.add_argument(
"--aurora-queries",
type=str,
help="Comma separated list of indices.",
default="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46",
)
parser.add_argument(
"--redshift-queries", type=str, help="Comma separated list of indices."
)
args = parser.parse_args()
set_up_logging(debug_mode=True)

# 1. Load the config.
config = ConfigFile.load(args.config_file)

# 2. Load the existing blueprint.
assets = AssetManager(config)
blueprint_mgr = BlueprintManager(config, assets, args.schema_name)
blueprint_mgr.load_sync()
blueprint = blueprint_mgr.get_blueprint()

# 3. Load the query bank.
queries = []
with open(args.query_bank_file, "r", encoding="UTF-8") as file:
for line in file:
clean = line.strip()
if clean.endswith(";"):
clean = clean[:-1]
queries.append(QueryRep(clean))

# 4. Create the fixed routing policy.
query_map = {}
if args.athena_queries is not None:
for qidx_str in args.athena_queries.split(","):
qidx = int(qidx_str.strip())
query_map[queries[qidx]] = Engine.Athena
if args.redshift_queries is not None:
for qidx_str in args.redshift_queries.split(","):
qidx = int(qidx_str.strip())
query_map[queries[qidx]] = Engine.Redshift
if args.aurora_queries is not None:
for qidx_str in args.aurora_queries.split(","):
qidx = int(qidx_str.strip())
query_map[queries[qidx]] = Engine.Aurora
clp = CachedLocationPolicy(query_map)

# 5. Replace the policy.
enum_blueprint = EnumeratedBlueprint(blueprint)
definite_policy = asyncio.run(
ForestPolicy.from_assets(
args.schema_name, RoutingPolicy.ForestTableSelectivity, assets
)
)
replaced_policy = FullRoutingPolicy(
indefinite_policies=[clp], definite_policy=definite_policy
)
enum_blueprint.set_routing_policy(replaced_policy)

# Ensure the provisioning is as expected.
enum_blueprint.set_aurora_provisioning(Provisioning("db.t4g.medium", 2))
enum_blueprint.set_redshift_provisioning(Provisioning("dc2.large", 0))

# 6. Adjust the placement.
new_placement = {}
for table in blueprint.tables():
new_placement[table.name] = [Engine.Aurora, Engine.Athena]
enum_blueprint.set_table_locations(new_placement)

# 7. Transition to the new blueprint.
modified_blueprint = enum_blueprint.to_blueprint()
asyncio.run(run_transition(config, blueprint_mgr, modified_blueprint))


if __name__ == "__main__":
main()
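For reference, a sketch of how this setup script might be invoked for the 100 GB scenario. The config path and schema name come from the COND file above; the query bank path is illustrative, and --aurora-queries can be omitted since it defaults to the same sixteen indices used by the workload:

    # The query bank path is a placeholder; point it at the file behind
    # IMDB_100GB_REGULAR_QUERY_BANK in your checkout.
    python3 set_up_starting_blueprint.py \
        --config-file config/config_large_100.yml \
        --schema-name imdb_extended_100g \
        --query-bank-file path/to/imdb_100gb_regular_query_bank.sql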
3 changes: 2 additions & 1 deletion src/brad/daemon/front_end_metrics.py
@@ -205,7 +205,8 @@ def handle_metric_report(self, report: MetricsReport) -> None:
fe_index
].add_sample(report.txn_latency_sketch(), now)

logger.debug(
log_verbose(
logger,
"Received metrics report: [%d] %f (ts: %s)",
report.fe_index,
report.txn_completions_per_s,