Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scale down scenario adjustments #340

Merged
merged 8 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions config/planner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ planning_window:
hours: 0
minutes: 5

# Each second in the previous workload is treated as 10 seconds (so a 5 min
# planning window is treated as 50 mins). This is to make scenario run times
# Each second in the previous workload is treated as 12 seconds (so a 5 min
# planning window is treated as 60 mins). This is to make scenario run times
# more manageable.
reinterpret_second_as: 10
reinterpret_second_as: 12

# The query distribution must change by at least this much for a new blueprint
# to be accepted.
Expand All @@ -31,12 +31,12 @@ triggers:
multiplier: 60 # Multiplier over `planning_window`.

redshift_cpu:
lo: 10
hi: 90
lo: 15
hi: 85
sustained_epochs: 3

aurora_cpu:
lo: 10
lo: 15
hi: 85
sustained_epochs: 3

Expand Down
31 changes: 20 additions & 11 deletions experiments/15-e2e-scenarios-v2/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@ function start_brad_debug() {
}

function cancel_experiment() {
kill -INT $txn_pid
kill -INT $rana_pid
for pid_var in "$@"; do
kill -INT $pid_var
done
kill -INT $brad_pid
}

function graceful_shutdown() {
kill -INT $txn_pid
kill -INT $rana_pid
wait $txn_pid
wait $rana_pid
for pid_var in "$@"; do
kill -INT $pid_var
done
for pid_var in "$@"; do
wait $pid_var
done

kill -INT $brad_pid
wait $brad_pid
Expand Down Expand Up @@ -88,11 +91,13 @@ function start_repeating_olap_runner() {
local ra_clients=$1
local ra_gap_s=$2
local ra_gap_std_s=$3
local query_indexes=$4
local results_name=$5

local args=(
--num-clients $ra_clients
--num-front-ends $num_front_ends
--query-indexes $ra_query_indexes
--query-indexes $query_indexes
--query-bank-file $ra_query_bank_file
--avg-gap-s $ra_gap_s
--avg-gap-std-s $ra_gap_std_s
Expand All @@ -103,12 +108,14 @@ function start_repeating_olap_runner() {
fi

>&2 echo "[Repeating Analytics] Running with $ra_clients..."
results_dir=$COND_OUT/ra_${ra_clients}
results_dir=$COND_OUT/$results_name
mkdir -p $results_dir

log_workload_point "rana_${ra_clients}"
log_workload_point $results_name
COND_OUT=$results_dir python3 ../../../workloads/IMDB_extended/run_repeating_analytics.py "${args[@]}" &
rana_pid=$!

# Expose the runner's PID to callers via the special return-value variable `runner_pid`.
runner_pid=$!
}

function run_repeating_olap_warmup() {
Expand Down Expand Up @@ -141,7 +148,9 @@ function start_txn_runner() {
--num-clients $t_clients \
--num-front-ends $num_front_ends \
&
txn_pid=$!

# Expose the runner's PID to callers via the special return-value variable `runner_pid`.
runner_pid=$!
}

function extract_named_arguments() {
Expand Down
11 changes: 9 additions & 2 deletions experiments/15-e2e-scenarios-v2/scale_down/COND
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ run_command(
},
)

aurora_acceptable = [58, 61, 62, 64, 69]
aurora_good = [25, 26, 27, 28, 29, 31, 32, 33, 37, 43, 46, 47, 49, 83, 90]

# 16 queries that should be acceptable on Aurora. We have 4 that should run on
# Redshift/Athena.
aurora_alt = [99, 56, 32, 92, 91, 49, 30, 83, 94, 38, 87, 86, 76, 37, 31, 46]

run_experiment(
name="brad_100g",
run="./run_workload.sh",
Expand All @@ -39,7 +46,7 @@ run_experiment(
"planner-config-file": "config/planner.yml",
"schema-name": "imdb_extended_100g",
# TODO: Select regular query indexes
"ra-query-indexes": ",".join(map(str, list(range(25, 35)) + list(range(75, 80)))),
"ra-query-indexes": ",".join(map(str, aurora_alt)),
"ra-query-bank-file": IMDB_100GB_REGULAR_QUERY_BANK,
"num-front-ends": 8,
},
Expand All @@ -53,7 +60,7 @@ run_command(
"config-file": "config/config_large_100.yml",
"planner-config-file": "config/planner.yml",
"schema-name": "imdb_extended_100g",
"ra-query-indexes": ",".join(map(str, list(range(25, 35)) + list(range(75, 80)))),
"ra-query-indexes": ",".join(map(str, aurora_alt)),
"ra-query-bank-file": IMDB_100GB_REGULAR_QUERY_BANK,
"num-front-ends": 8,
},
Expand Down
24 changes: 18 additions & 6 deletions experiments/15-e2e-scenarios-v2/scale_down/run_workload.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,30 @@ source ../common.sh
# --query-indexes
extract_named_arguments $@

trap "cancel_experiment" INT
trap "cancel_experiment" TERM

# Should be removed eventually and we should rely on the blueprint.
export BRAD_INITIAL_ROUTE_REDSHIFT_ONLY=1
start_brad $config_file $planner_config_file
log_workload_point "brad_start_initiated"
sleep 30

log_workload_point "clients_starting"
start_repeating_olap_runner 4 15 5 # Implicit: --query-indexes
start_txn_runner 4
start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
rana_pid=$runner_pid

start_txn_runner 8
txn_pid=$runner_pid

start_repeating_olap_runner 1 70 5 "60,61,71,75" "ra_1_special"
rana2_pid=$runner_pid
log_workload_point "clients_started"

function inner_cancel_experiment() {
cancel_experiment $rana_pid $txn_pid $rana2_pid
}

trap "inner_cancel_experiment" INT
trap "inner_cancel_experiment" TERM

# Wait until a re-plan and transition completes.
# Expected:
# - Downscale Aurora
Expand All @@ -51,5 +63,5 @@ log_workload_point "experiment_workload_done"

# Shut down everything now.
>&2 echo "Experiment done. Shutting down runners..."
graceful_shutdown
graceful_shutdown $rana_pid $txn_pid $rana2_pid
log_workload_point "shutdown_complete"
18 changes: 17 additions & 1 deletion experiments/15-e2e-scenarios-v2/scale_down/run_workload_debug.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,27 @@ trap "cancel_experiment" TERM

# Useful for testing out blueprint planning without executing the transition.
export BRAD_IGNORE_BLUEPRINT=1
# Should be removed eventually and we should rely on the blueprint.
export BRAD_INITIAL_ROUTE_REDSHIFT_ONLY=1
start_brad_debug $config_file $planner_config_file
sleep 10

start_repeating_olap_runner 8 15 5 # Implicit: --query-indexes
start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
rana_pid=$runner_pid

start_txn_runner 8
txn_pid=$runner_pid

start_repeating_olap_runner 1 70 5 "60,61,71,75" "ra_1_special"
rana2_pid=$runner_pid

function inner_cancel_experiment() {
cancel_experiment $rana_pid $txn_pid $rana2_pid
}

trap "inner_cancel_experiment" INT
trap "inner_cancel_experiment" TERM

echo "READY -- Sleeping for 1 hour. Hit Ctrl-C to stop."
sleep $((60 * 60))
inner_cancel_experiment
4 changes: 3 additions & 1 deletion src/brad/daemon/front_end_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from brad.daemon.messages import MetricsReport
from brad.daemon.metrics_logger import MetricsLogger
from brad.utils.streaming_metric import StreamingMetric, StreamingNumericMetric
from brad.utils import log_verbose

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -135,7 +136,8 @@ async def fetch_latest(self) -> None:
# make a copy.
merged = copy.deepcopy(sketch)

logger.debug(
log_verbose(
logger,
"[%s] [%d] Matched %d sketches with range %s -- %s",
metric_key,
fidx,
Expand Down
12 changes: 11 additions & 1 deletion src/brad/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import random
import time
import os
import multiprocessing as mp
from typing import AsyncIterable, Optional, Dict, Any
from datetime import datetime, timezone, timedelta
Expand Down Expand Up @@ -53,6 +54,8 @@

LINESEP = "\n".encode()

INITIAL_ROUTE_REDSHIFT_ONLY_VAR = "BRAD_INITIAL_ROUTE_REDSHIFT_ONLY"


class BradFrontEnd(BradInterface):
def __init__(
Expand Down Expand Up @@ -144,7 +147,7 @@ def __init__(
self._estimator: Optional[Estimator] = None

# Number of transactions that completed.
self._transaction_end_counter = Counter()
self._transaction_end_counter = Counter() # pylint: disable=global-statement
self._reset_latency_sketches()
self._brad_metrics_reporting_task: Optional[asyncio.Task[None]] = None

Expand All @@ -161,6 +164,10 @@ def __init__(
# Used to re-establish engine connections.
self._reestablish_connections_task: Optional[asyncio.Task[None]] = None

# This is temporary for experiment purposes. In the future, this will be
# part of the blueprint.
self._route_redshift_only = INITIAL_ROUTE_REDSHIFT_ONLY_VAR in os.environ

async def serve_forever(self):
await self._run_setup()
try:
Expand Down Expand Up @@ -289,6 +296,8 @@ async def _run_query_impl(
)
if transactional_query:
engine_to_use = Engine.Aurora
elif self._route_redshift_only:
engine_to_use = Engine.Redshift
else:
engine_to_use = await self._router.engine_for(query_rep)

Expand Down Expand Up @@ -493,6 +502,7 @@ async def _read_daemon_messages(self) -> None:
)
# This refreshes any cached state that depends on the old blueprint.
await self._run_blueprint_update(message.version)
self._route_redshift_only = False
# Tell the daemon that we have updated.
self._output_queue.put(
NewBlueprintAck(self._fe_index, message.version), block=False
Expand Down
7 changes: 7 additions & 0 deletions src/brad/planner/beam/query_based.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import heapq
import json
import logging
Expand Down Expand Up @@ -129,6 +130,12 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:

# 5. Run beam search to formulate the table placements.
for j, query_idx in enumerate(query_indices[1:]):
if j % 5 == 0:
# This is a long-running process. We should yield every so often
# to allow other tasks to run on the daemon (e.g., processing
# metrics messages).
await asyncio.sleep(0)

logger.debug("Processing index %d of %d", j, len(query_indices[1:]))

next_top_k: List[BlueprintCandidate] = []
Expand Down
10 changes: 8 additions & 2 deletions src/brad/planner/beam/table_based.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import heapq
import itertools
import json
Expand Down Expand Up @@ -130,8 +131,13 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:

# 5. Run beam search to formulate the rest of the table placements.
for j, cluster in enumerate(clusters[1:]):
if j % 100 == 0:
logger.debug("Processing index %d of %d", j, len(clusters[1:]))
if j % 5 == 0:
# This is a long-running process. We should yield every so often
# to allow other tasks to run on the daemon (e.g., processing
# metrics messages).
await asyncio.sleep(0)

logger.debug("Processing index %d of %d", j, len(clusters[1:]))

next_top_k: List[BlueprintCandidate] = []
tables, queries, _ = cluster
Expand Down