Add scale up workload adjustments (#361)
* Increase scale up load: txn and analytics

* Add missing change

* Make metrics logging less verbose

* Add script to reset blueprint

* Check in adjusted workload

* Fix workload start script

* Temp skip single nodes
geoffxy authored Nov 12, 2023
1 parent e7732c2 commit 580638c
Showing 7 changed files with 199 additions and 46 deletions.
5 changes: 5 additions & 0 deletions experiments/15-e2e-scenarios-v2/common.sh
@@ -93,6 +93,7 @@ function start_repeating_olap_runner() {
local ra_gap_std_s=$3
local query_indexes=$4
local results_name=$5
local client_offset=$6

local args=(
--num-clients $ra_clients
@@ -107,6 +108,10 @@
args+=(--query-frequency-path $ra_query_frequency_path)
fi

if [[ ! -z $client_offset ]]; then
args+=(--client-offset $client_offset)
fi

>&2 echo "[Repeating Analytics] Running with $ra_clients..."
results_dir=$COND_OUT/$results_name
mkdir -p $results_dir
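The new optional sixth argument to start_repeating_olap_runner feeds --client-offset, which lets a second analytics runner execute alongside the first without reusing client IDs. A minimal sketch, mirroring the calls made in run_workload.sh below:

  # First runner: no offset argument, so the flag is omitted and its
  # clients are numbered 0-7.
  start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
  rana_pid=$runner_pid

  # Second runner: passing 8 appends --client-offset 8, keeping its
  # client numbering (8-15) disjoint from the first runner's.
  start_repeating_olap_runner 8 5 1 $heavier_queries "ra_8_heavy" 8
  heavy_rana_pid=$runner_pid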
8 changes: 2 additions & 6 deletions experiments/15-e2e-scenarios-v2/scale_up/COND
@@ -1,7 +1,5 @@
include("../common.cond")

aurora_alt = [99, 56, 32, 92, 91, 49, 30, 83, 94, 38, 87, 86, 76, 37, 31, 46]

run_experiment(
name="brad_100g",
run="./run_workload.sh",
@@ -10,9 +8,8 @@ run_experiment(
"config-file": "config/config_large_100.yml",
"planner-config-file": "config/planner.yml",
"schema-name": "imdb_extended_100g",
"ra-query-indexes": ",".join(map(str, aurora_alt)),
"ra-query-bank-file": IMDB_100GB_REGULAR_QUERY_BANK,
"num-front-ends": 24,
"num-front-ends": 28,
"dataset-type": "100gb",
},
)
@@ -25,9 +22,8 @@ run_command(
"config-file": "config/config_large_100.yml",
"planner-config-file": "config/planner.yml",
"schema-name": "imdb_extended_100g",
"ra-query-indexes": ",".join(map(str, aurora_alt)),
"ra-query-bank-file": IMDB_100GB_REGULAR_QUERY_BANK,
"num-front-ends": 24,
"num-front-ends": 28,
"dataset-type": "100gb",
},
)
50 changes: 31 additions & 19 deletions experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh
@@ -14,6 +14,9 @@ source ../common.sh
# TODO: This executor file should be adapted to run against the baselines too
# (TiDB / Serverless Redshift + Aurora)

initial_queries="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46"
heavier_queries="58,61,62,64,69,70,71,72,73,74"

function step_txns() {
local lo=$1
local hi=$2
@@ -36,15 +39,19 @@ function txn_sweep() {
txn_pid=$runner_pid

sleep $(($gap_minute * 60))
if [[ ! -z $keep_last ]] && [[ $t_clients = $keep_last ]]; then
if [[ -z $keep_last ]] || [[ $t_clients != $keep_last ]]; then
kill -INT $txn_pid
wait $txn_pid
fi
done
}

function inner_cancel_experiment() {
cancel_experiment $rana_pid $txn_pid
if [ ! -z $heavy_rana_pid ]; then
cancel_experiment $rana_pid $txn_pid $heavy_rana_pid
else
cancel_experiment $rana_pid $txn_pid
fi
}

trap "inner_cancel_experiment" INT
@@ -55,35 +62,40 @@ log_workload_point "brad_start_initiated"
sleep 30

# Start with 8 analytical clients.
log_workload_point "ra_client_starting"
start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
log_workload_point "start_rana_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid

# Scale up to 8 transactional clients and hold for 15 minutes.
log_workload_point "start_increase_txn_4_to_8"
txn_sweep "4 5 6 7 8" 1 8
log_workload_point "hold_txn_8_15_min"
sleep $((15 * 60))

# Scale up to 24 transactional clients.
# Scale up to 28 transactional clients. Hold for 15 minutes.
log_workload_point "start_increase_txn_12_to_28"
kill -INT $txn_pid
wait $txn_pid
txn_sweep "12 16 20 24" 2 24

# 5 minutes
kill -INT $rana_pid
wait $rana_pid
start_repeating_olap_runner 16 15 5 $ra_query_indexes "ra_16"
rana_pid=$runner_pid
sleep $((5 * 60))
txn_sweep "12 16 20 24 28" 2 28
log_workload_point "hold_txn_28_15_min"
sleep $((15 * 60))

# 15 minutes.
log_workload_point "start_heavy_rana_8"
start_repeating_olap_runner 8 5 1 $heavier_queries "ra_8_heavy" 8
heavy_rana_pid=$runner_pid
sleep $((15 * 60))

# 20 minutes
kill -INT $rana_pid
wait $rana_pid
start_repeating_olap_runner 24 15 5 $ra_query_indexes "ra_24"
rana_pid=$runner_pid
# 20 minutes.
log_workload_point "start_heavy_rana_20"
kill -INT $heavy_rana_pid
wait $heavy_rana_pid
start_repeating_olap_runner 20 5 1 $heavier_queries "ra_20_heavy" 8
heavy_rana_pid=$runner_pid
sleep $((20 * 60))
log_workload_point "experiment_workload_done"

# Shut down everything now.
>&2 echo "Experiment done. Shutting down runners..."
graceful_shutdown $rana_pid $txn_pid
graceful_shutdown $rana_pid $heavy_rana_pid $txn_pid
log_workload_point "shutdown_complete"
44 changes: 24 additions & 20 deletions experiments/15-e2e-scenarios-v2/scale_up/run_workload_debug.sh
@@ -1,5 +1,4 @@
#! /bin/bash

script_loc=$(cd $(dirname $0) && pwd -P)
cd $script_loc
source ../common.sh
@@ -14,11 +13,8 @@ source ../common.sh
# TODO: This executor file should be adapted to run against the baselines too
# (TiDB / Serverless Redshift + Aurora)

function step_txns() {
local lo=$1
local hi=$2
local gap_minute=$3
}
initial_queries="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46"
heavier_queries="58,61,62,64,69,70,71,72,73,74"

# Arguments:
# --config-file
@@ -42,7 +38,7 @@ function point_one() {
# A: 8x
# T: 4x
local run_for_minutes=$1
start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid
start_txn_runner 4 # Implicit: --dataset-type
txn_pid=$runner_pid
@@ -58,7 +54,7 @@ function point_two() {
# A: 8x
# T: 8x
local run_for_minutes=$1
start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid
start_txn_runner 8 # Implicit: --dataset-type
txn_pid=$runner_pid
@@ -72,11 +68,11 @@

function point_three() {
# A: 8x
# T: 24x
# T: 28x
local run_for_minutes=$1
start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid
start_txn_runner 24 # Implicit: --dataset-type
start_txn_runner 28 # Implicit: --dataset-type
txn_pid=$runner_pid

sleep $(($run_for_minutes * 60))
@@ -88,41 +84,49 @@

function point_four() {
# A: 16x
# T: 24x
# T: 28x
local run_for_minutes=$1
start_repeating_olap_runner 16 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid
start_txn_runner 24 # Implicit: --dataset-type
start_repeating_olap_runner 8 15 5 $heavier_queries "ra_8_heavy" 8
rana_heavy_pid=$runner_pid
start_txn_runner 28 # Implicit: --dataset-type
txn_pid=$runner_pid

sleep $(($run_for_minutes * 60))
kill -INT $rana_pid
kill -INT $rana_heavy_pid
kill -INT $txn_pid
wait $rana_pid
wait $rana_heavy_pid
wait $txn_pid
}

function point_five() {
# A: 24x
# T: 24x
# A: 40x (8 regular + 32 heavy)
# T: 28x
local run_for_minutes=$1
start_repeating_olap_runner 24 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 8 2 1 $initial_queries "ra_8"
rana_pid=$runner_pid
start_txn_runner 24 # Implicit: --dataset-type
start_repeating_olap_runner 32 2 1 $heavier_queries "ra_24_heavy" 8
rana_heavy_pid=$runner_pid
start_txn_runner 28 # Implicit: --dataset-type
txn_pid=$runner_pid

sleep $(($run_for_minutes * 60))
kill -INT $rana_pid
kill -INT $rana_heavy_pid
kill -INT $txn_pid
wait $rana_pid
wait $rana_heavy_pid
wait $txn_pid
}

echo "READY -- Running for 1 hour. Hit Ctrl-C to stop."
# point_one 60
point_two 60
# point_two 60
# point_three 60
# point_four 60
# point_five 60
point_five 60

inner_cancel_experiment
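For iterating on a single phase, the debug script can be driven with the same options the COND file supplies. A hedged invocation sketch (file paths and the query bank location are illustrative, not taken from this commit):

  ./run_workload_debug.sh \
    --config-file config/config_large_100.yml \
    --planner-config-file config/planner.yml \
    --schema-name imdb_extended_100g \
    --ra-query-bank-file path/to/regular_query_bank.sql \
    --num-front-ends 28 \
    --dataset-type 100gb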
131 changes: 131 additions & 0 deletions experiments/15-e2e-scenarios-v2/scale_up/set_up_starting_blueprint.py
@@ -0,0 +1,131 @@
import asyncio
import argparse
import logging

from brad.asset_manager import AssetManager
from brad.blueprint import Blueprint
from brad.blueprint.manager import BlueprintManager
from brad.blueprint.provisioning import Provisioning
from brad.config.engine import Engine
from brad.config.file import ConfigFile
from brad.daemon.transition_orchestrator import TransitionOrchestrator
from brad.planner.enumeration.blueprint import EnumeratedBlueprint
from brad.query_rep import QueryRep
from brad.routing.abstract_policy import FullRoutingPolicy
from brad.routing.cached import CachedLocationPolicy
from brad.routing.policy import RoutingPolicy
from brad.routing.tree_based.forest_policy import ForestPolicy
from brad.utils import set_up_logging

logger = logging.getLogger(__name__)


async def run_transition(
config: ConfigFile,
blueprint_mgr: BlueprintManager,
next_blueprint: Blueprint,
) -> None:
logger.info("Starting the transition...")
assert next_blueprint is not None
await blueprint_mgr.start_transition(next_blueprint, new_score=None)
orchestrator = TransitionOrchestrator(config, blueprint_mgr)
logger.info("Running the transition...")
await orchestrator.run_prepare_then_transition()
logger.info("Running the post-transition clean up...")
await orchestrator.run_clean_up_after_transition()
logger.info("Done!")


def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--config-file",
type=str,
required=True,
help="Path to BRAD's configuration file.",
)
parser.add_argument(
"--schema-name",
type=str,
required=True,
help="The name of the schema to drop.",
)
parser.add_argument("--query-bank-file", type=str)
parser.add_argument(
"--athena-queries", type=str, help="Comma separated list of indices."
)
parser.add_argument(
"--aurora-queries",
type=str,
help="Comma separated list of indices.",
default="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46",
)
parser.add_argument(
"--redshift-queries", type=str, help="Comma separated list of indices."
)
args = parser.parse_args()
set_up_logging(debug_mode=True)

# 1. Load the config.
config = ConfigFile.load(args.config_file)

# 2. Load the existing blueprint.
assets = AssetManager(config)
blueprint_mgr = BlueprintManager(config, assets, args.schema_name)
blueprint_mgr.load_sync()
blueprint = blueprint_mgr.get_blueprint()

# 3. Load the query bank.
queries = []
with open(args.query_bank_file, "r", encoding="UTF-8") as file:
for line in file:
clean = line.strip()
if clean.endswith(";"):
clean = clean[:-1]
queries.append(QueryRep(clean))

# 4. Create the fixed routing policy.
query_map = {}
if args.athena_queries is not None:
for qidx_str in args.athena_queries.split(","):
qidx = int(qidx_str.strip())
query_map[queries[qidx]] = Engine.Athena
if args.redshift_queries is not None:
for qidx_str in args.redshift_queries.split(","):
qidx = int(qidx_str.strip())
query_map[queries[qidx]] = Engine.Redshift
if args.aurora_queries is not None:
for qidx_str in args.aurora_queries.split(","):
qidx = int(qidx_str.strip())
query_map[queries[qidx]] = Engine.Aurora
clp = CachedLocationPolicy(query_map)

# 5. Replace the policy.
enum_blueprint = EnumeratedBlueprint(blueprint)
definite_policy = asyncio.run(
ForestPolicy.from_assets(
args.schema_name, RoutingPolicy.ForestTableSelectivity, assets
)
)
replaced_policy = FullRoutingPolicy(
indefinite_policies=[clp], definite_policy=definite_policy
)
enum_blueprint.set_routing_policy(replaced_policy)

# Ensure the provisioning is as expected.
enum_blueprint.set_aurora_provisioning(Provisioning("db.t4g.medium", 2))
enum_blueprint.set_redshift_provisioning(Provisioning("dc2.large", 0))

# 6. Adjust the placement.
new_placement = {}
for table in blueprint.tables():
new_placement[table.name] = [Engine.Aurora, Engine.Athena]
enum_blueprint.set_table_locations(new_placement)

# 7. Transition to the new blueprint.
modified_blueprint = enum_blueprint.to_blueprint()
asyncio.run(run_transition(config, blueprint_mgr, modified_blueprint))


if __name__ == "__main__":
main()
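The new script resets the blueprint to a minimal starting point: a 2-node db.t4g.medium Aurora provisioning, Redshift turned off, and a cached routing policy pinning the listed queries to engines. A hedged invocation sketch (paths are illustrative; note that --query-bank-file is opened unconditionally, so it is effectively required even though argparse does not mark it as such):

  python set_up_starting_blueprint.py \
    --config-file config/config_large_100.yml \
    --schema-name imdb_extended_100g \
    --query-bank-file path/to/regular_query_bank.sql

--aurora-queries already defaults to the 16 initial scale-up queries; pass --athena-queries and/or --redshift-queries to pin additional queries to those engines.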
3 changes: 2 additions & 1 deletion src/brad/daemon/front_end_metrics.py
@@ -205,7 +205,8 @@ def handle_metric_report(self, report: MetricsReport) -> None:
fe_index
].add_sample(report.txn_latency_sketch(), now)

logger.debug(
log_verbose(
logger,
"Received metrics report: [%d] %f (ts: %s)",
report.fe_index,
report.txn_completions_per_s,