diff --git a/experiments/16-demo/.gitignore b/experiments/16-demo/.gitignore new file mode 100644 index 00000000..1fcb1529 --- /dev/null +++ b/experiments/16-demo/.gitignore @@ -0,0 +1 @@ +out diff --git a/experiments/16-demo/common.sh b/experiments/16-demo/common.sh new file mode 100644 index 00000000..0a47f833 --- /dev/null +++ b/experiments/16-demo/common.sh @@ -0,0 +1,194 @@ +function start_brad_w_ui() { + system_config_file=$1 + physical_config_file=$2 + curr_dir=$(pwd) + + pushd ../../ + brad daemon \ + --physical-config-file $physical_config_file \ + --system-config-file $curr_dir/$system_config_file \ + --schema-name $schema_name \ + --ui \ + & + brad_pid=$! + popd +} + +function cancel_experiment() { + for pid_var in "$@"; do + kill -INT $pid_var + done + kill -INT $brad_pid +} + +function graceful_shutdown() { + for pid_var in "$@"; do + kill -INT $pid_var + done + for pid_var in "$@"; do + wait $pid_var + done + + kill -INT $brad_pid + wait $brad_pid +} + +function terminate_process_group() { + local pid=$1 + local initial_wait_s=$2 + sleep $2 + if kill -0 $pid >/dev/null 2>&1; then + pkill -KILL -P $pid + pkill -KILL $pid + echo "NOTE: Forced process $pid to stop." + else + echo "Process $pid stopped gracefully." + fi +} + +function log_workload_point() { + msg=$1 + now=$(date --utc "+%Y-%m-%d %H:%M:%S") + echo "$now,$msg" >> $COND_OUT/points.log +} + +function pause_for_s_past_timepoint() { + local timepoint="$1" + local wait_s="$2" + + local curr_tp="$(date -u +%s)" + elapsed_s="$(($curr_tp - $timepoint))" + if (( $elapsed_s < $wait_s )); then + leftover_s=$(($wait_s - $elapsed_s)) + >&2 echo "Waiting $leftover_s seconds before continuing..." 
+ sleep $leftover_s + fi +} + +function poll_file_for_event() { + local file="$1" + local event_name="$2" + local timeout_minutes="$3" + local previous_size=$(stat -c %s "$file") + local current_size + local last_line + + local start_time + local elapsed_time + start_time=$(date +%s) + + while true; do + current_size=$(stat -c %s "$file") + + if [[ $current_size -ne $previous_size ]]; then + last_line=$(tail -n 1 "$file") + + if [[ $last_line == *"$event_name"* ]]; then + >&2 echo "Detected new $event_name!" + break + fi + fi + + elapsed_time=$(( $(date +%s) - $start_time )) + if [[ $elapsed_time -ge $((timeout_minutes * 60)) ]]; then + >&2 echo "Timeout reached. Did not detect $event_name within $timeout_minutes minutes." + log_workload_point "timeout_poll_${event_name}" + break + fi + + sleep 30 + done +} + +function extract_named_arguments() { + # Evaluates any environment variables in this script's arguments. This script + # should only be run on trusted input. + orig_args=($@) + for val in "${orig_args[@]}"; do + phys_arg=$(eval "echo $val") + + if [[ $phys_arg =~ --ra-clients=.+ ]]; then + ra_clients=${phys_arg:13} + fi + + if [[ $phys_arg =~ --t-clients-lo=.+ ]]; then + t_clients_lo=${phys_arg:15} + fi + + if [[ $phys_arg =~ --t-clients-hi=.+ ]]; then + t_clients_hi=${phys_arg:15} + fi + + if [[ $phys_arg =~ --ra-query-indexes=.+ ]]; then + ra_query_indexes=${phys_arg:19} + fi + + if [[ $phys_arg =~ --ra-query-bank-file=.+ ]]; then + ra_query_bank_file=${phys_arg:21} + fi + + if [[ $phys_arg =~ --other-query-bank-file=.+ ]]; then + other_query_bank_file=${phys_arg:24} + fi + + if [[ $phys_arg =~ --ra-gap-s=.+ ]]; then + ra_gap_s=${phys_arg:11} + fi + + if [[ $phys_arg =~ --ra-gap-std-s=.+ ]]; then + ra_gap_std_s=${phys_arg:15} + fi + + if [[ $phys_arg =~ --num-front-ends=.+ ]]; then + num_front_ends=${phys_arg:17} + fi + + if [[ $phys_arg =~ --run-for-s=.+ ]]; then + run_for_s=${phys_arg:12} + fi + + if [[ $phys_arg =~ --physical-config-file=.+ ]]; then + 
physical_config_file=${phys_arg:23} + fi + + if [[ $phys_arg =~ --system-config-file=.+ ]]; then + system_config_file=${phys_arg:21} + fi + + if [[ $phys_arg =~ --skip-replan=.+ ]]; then + skip_replan=${phys_arg:14} + fi + + if [[ $phys_arg =~ --schema-name=.+ ]]; then + schema_name=${phys_arg:14} + fi + + if [[ $phys_arg =~ --dataset-type=.+ ]]; then + dataset_type=${phys_arg:15} + fi + + if [[ $phys_arg =~ --query-sequence-file=.+ ]]; then + query_sequence_file=${phys_arg:22} + fi + + if [[ $phys_arg =~ --snowset-query-frequency-path=.+ ]]; then + snowset_query_frequency_path=${phys_arg:31} + fi + + if [[ $phys_arg =~ --snowset-client-dist-path=.+ ]]; then + snowset_client_dist_path=${phys_arg:27} + fi + + if [[ $phys_arg =~ --snowset-gap-dist-path=.+ ]]; then + snowset_gap_dist_path=${phys_arg:24} + fi + + if [[ $phys_arg =~ --txn-scale-factor=.+ ]]; then + txn_scale_factor=${phys_arg:19} + fi + + if [[ $phys_arg =~ --is-daylong-hd=.+ ]]; then + is_daylong_hd=1 + fi + done +} diff --git a/experiments/16-demo/out/.gitignore b/experiments/16-demo/out/.gitignore new file mode 100644 index 00000000..e69de29b diff --git a/experiments/16-demo/run_scale_down_workload.sh b/experiments/16-demo/run_scale_down_workload.sh new file mode 100755 index 00000000..9e8b61fb --- /dev/null +++ b/experiments/16-demo/run_scale_down_workload.sh @@ -0,0 +1,10 @@ +#! /bin/bash + +script_loc=$(cd $(dirname $0) && pwd -P) +cd $script_loc +source common.sh + +./scale_down_workload_impl.sh \ + --physical-config-file=$1 \ + --system-config-file=scale_down_config.yml \ + --schema-name=imdb_extended_100g diff --git a/experiments/16-demo/scale_down_config.yml b/experiments/16-demo/scale_down_config.yml new file mode 100644 index 00000000..1e6a5959 --- /dev/null +++ b/experiments/16-demo/scale_down_config.yml @@ -0,0 +1,163 @@ +# This file contains configurations that are used by BRAD. These are default +# values and should be customized for specific situations. 
+ +# BRAD's front end servers will listen for client connections on this interface +# and port. If `num_front_ends` is greater than one, subsequent front ends will +# listen on successive ports (e.g., 6584, 6585, etc.). +front_end_interface: "0.0.0.0" +front_end_port: 6583 +num_front_ends: 12 + +# Logging paths. If the value is in ALL_CAPS (with underscores), it is +# interpreted as an environment variable (BRAD will log to the path stored in +# the environment variable). + +# Where BRAD's daemon process will write its logs. +daemon_log_file: /tmp + +# Where BRAD's front end processes will write their logs. +front_end_log_path: /tmp + +# Where BRAD's blueprint planner will write debug logs. +planner_log_path: /tmp + +# Where BRAD's metrics loggers will write their logs. +metrics_log_path: /tmp + +# Probability that each transactional query will be logged. +txn_log_prob: 0.01 + +# Set to a non-zero value to enable automatic data syncing. When this is set to 0, +# automatic syncing is disabled. +data_sync_period_seconds: 0 + +# BRAD's front end servers will report their metrics at regular intervals. +front_end_metrics_reporting_period_seconds: 30 +front_end_query_latency_buffer_size: 100 + +# `default` means to use the policy encoded in the blueprint. Other values will +# override the blueprint. +routing_policy: default + +# Whether to disable table movement for benchmark purposes (i.e., keep all +# tables on all engines.) +disable_table_movement: true + +# Epoch length for metrics and forecasting. This is the granularity at which +# metrics/forecasting will be performed. +epoch_length: + weeks: 0 + days: 0 + hours: 0 + minutes: 1 + +# Blueprint planning strategy. +strategy: fp_query_based_beam + +# Used to specify the period of time over which to use data for planning. +# Currently, this is a "look behind" window for the workload. +planning_window: + weeks: 0 + days: 0 + hours: 1 + minutes: 0 + +# Used to aggregate metrics collected in the planning window. 
+metrics_agg: + method: ewm # 'mean' is another option + alpha: 0.86466472 # 1 - 1 / e^2 + +# Used during planning. +reinterpret_second_as: 1 + +# The query distribution must change by at least this much for a new blueprint +# to be accepted. +query_dist_change_frac: 0.1 + +# The search bound for the provisioning. +max_provisioning_multiplier: 2.5 + +# Flag options for blueprint planning. +use_io_optimized_aurora: true +use_recorded_routing_if_available: true +ensure_tables_together_on_one_engine: true + +# Loads used to prime the system when no information is available. +aurora_initialize_load_fraction: 0.25 +redshift_initialize_load_fraction: 0.25 + +# BRAD will not reduce predicted load lower than these values. Raise these +# values to be more conservative against mispredictions. +aurora_min_load_removal_fraction: 0.8 +redshift_min_load_removal_fraction: 0.8 + +# Blueprint planning performance ceilings. +query_latency_p90_ceiling_s: 30.0 +txn_latency_p90_ceiling_s: 0.030 + +aurora_provisioning_search_distance: 900.0 +redshift_provisioning_search_distance: 900.0 + +# Used for ordering blueprints during planning. +comparator: + type: benefit_perf_ceiling # or `perf_ceiling` + + benefit_horizon: # Only used by the `benefit_perf_ceiling` comparator + weeks: 0 + days: 0 + hours: 24 + minutes: 0 + + penalty_threshold: 0.8 # Only used by the `benefit_perf_ceiling` comparator + penalty_power: 2 # Only used by the `benefit_perf_ceiling` comparator + +aurora_max_query_factor: 4.0 +aurora_max_query_factor_replace: 10000.0 +redshift_peak_load_threshold: 99.0 +redshift_peak_load_multiplier: 1.5 + +planner_max_workers: 16 + +# Used for precomputed predictions. +std_datasets: + - name: regular + path: workloads/IMDB_100GB/regular_test/ + - name: adhoc + path: workloads/IMDB_100GB/adhoc_test/ + +# Blueprint planning trigger configs. + +triggers: + enabled: false # Change to true when running. + check_period_s: 90 # Triggers are checked every X seconds. 
+ check_period_offset_s: 360 # Wait 6 mins before starting. + observe_new_blueprint_mins: 3 + + elapsed_time: + disabled: true + multiplier: 60 # Multiplier over `planning_window`. + + redshift_cpu: + lo: 15 + hi: 85 + sustained_epochs: 3 + + aurora_cpu: + lo: 15 + hi: 85 + sustained_epochs: 3 + + variable_costs: + disabled: true + threshold: 1.0 + + query_latency_ceiling: + ceiling_s: 30.0 + sustained_epochs: 3 + + txn_latency_ceiling: + ceiling_s: 0.030 + sustained_epochs: 3 + + recent_change: + delay_epochs: 5 diff --git a/experiments/16-demo/scale_down_workload_impl.sh b/experiments/16-demo/scale_down_workload_impl.sh new file mode 100755 index 00000000..eee461c4 --- /dev/null +++ b/experiments/16-demo/scale_down_workload_impl.sh @@ -0,0 +1,57 @@ +#! /bin/bash + +if [ -z $1 ]; then + >&2 echo "Usage: $0 path/to/physical/config.yml" + exit 1 +fi + +script_loc=$(cd $(dirname $0) && pwd -P) +cd $script_loc +source common.sh + +extract_named_arguments $@ +start_brad_w_ui $system_config_file $physical_config_file + +>&2 echo "Waiting for BRAD to start up..." +sleep 30 + +num_clients=12 +starting_clients=4 +COND_OUT=out python3 ../../workloads/IMDB_extended/run_transactions_variable_clients.py \ + --num-clients $num_clients \ + --starting-clients $starting_clients \ + --num-front-ends $num_clients \ + & +txn_pid=$! +>&2 echo "Started T runner (PID $txn_pid)" + +qidx="99,56,32,92,91,49,30,83,94,87,86,76,37,31,46" +qbank_file="../../workloads/IMDB_100GB/regular_test/queries.sql" +ra_gap_s=15 +ra_gap_std_s=5 +COND_OUT=out python3 ../../workloads/IMDB_extended/run_variable_clients.py \ + --num-clients $num_clients \ + --starting-clients $starting_clients \ + --num-front-ends $num_clients \ + --query-indexes $qidx \ + --query-bank-file $qbank_file \ + --avg-gap-s $ra_gap_s \ + --avg-gap-std-s $ra_gap_std_s \ + & +rana_pid=$! 
+>&2 echo "Started RA runner (PID $rana_pid)" + +function inner_cancel_experiment() { + cancel_experiment $txn_pid $rana_pid + + wait $txn_pid + wait $rana_pid + wait $brad_pid +} + +trap "inner_cancel_experiment" INT +trap "inner_cancel_experiment" TERM + +wait $txn_pid +wait $rana_pid +wait $brad_pid diff --git a/workloads/IMDB_extended/run_transactions_variable_clients.py b/workloads/IMDB_extended/run_transactions_variable_clients.py index abadcee5..1aa3b5c3 100644 --- a/workloads/IMDB_extended/run_transactions_variable_clients.py +++ b/workloads/IMDB_extended/run_transactions_variable_clients.py @@ -1,10 +1,8 @@ import asyncio import argparse import pathlib -import pickle import random import signal -import threading import time import numpy as np import os @@ -12,17 +10,18 @@ import multiprocessing as mp import logging from datetime import datetime, timedelta -from typing import Optional, List +from typing import Optional from brad.config.engine import Engine from brad.config.file import ConfigFile from brad.grpc_client import BradClientError from brad.provisioning.directory import Directory from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff -from brad.utils.time_periods import universal_now from brad.utils import set_up_logging, create_custom_logger from workload_utils.connect import connect_to_db from workload_utils.transaction_worker import TransactionWorker +from workload_utils.pause_controller import PauseController, get_command_line_input +from workload_utils.change_clients_api import serve logger = logging.getLogger(__name__) STARTUP_FAILED = "startup_failed" @@ -376,80 +375,6 @@ def noop_handler(_signal, _frame): latency_file.close() -class PauseController: - # controlling the pause and resume of clients - def __init__( - self, - total_num_clients: int, - pause_semaphore: List[mp.Semaphore], # type: ignore - resume_semaphore: List[mp.Semaphore], # type: ignore - ): - self.total_num_clients = total_num_clients - 
self.pause_semaphore = pause_semaphore - self.resume_semaphore = resume_semaphore - self.paused_clients: List[int] = [] - self.running_clients: List[int] = list(range(total_num_clients)) - self.num_running_clients: int = total_num_clients - - def adjust_num_running_clients( - self, num_clients: int, verbose: bool = True - ) -> None: - if num_clients > self.total_num_clients: - print( - f"invalid input number of clients {num_clients}, larger than total number of clients" - ) - return - if num_clients == self.num_running_clients: - return - elif num_clients < self.num_running_clients: - pause_clients = np.random.choice( - self.running_clients, - size=self.num_running_clients - num_clients, - replace=False, - ) - for i in pause_clients: - assert ( - i not in self.paused_clients - ), f"trying to pause a client that is already paused: {i}" - if verbose: - print(f"pausing client {i}") - self.pause_semaphore[i].release() - self.pause_semaphore[i].release() - self.paused_clients.append(i) - self.running_clients.remove(i) - self.num_running_clients -= 1 - else: - resume_clients = np.random.choice( - self.paused_clients, - size=num_clients - self.num_running_clients, - replace=False, - ) - for i in resume_clients: - assert ( - i not in self.running_clients - ), f"trying to resume a running client: {i}" - if verbose: - print(f"resuming client {i}") - self.resume_semaphore[i].release() - self.resume_semaphore[i].release() - self.paused_clients.remove(i) - self.running_clients.append(i) - self.num_running_clients += 1 - - -async def get_command_line_input(pause_controller: PauseController) -> None: - while True: - try: - user_input = input() - if user_input.isnumeric(): - num_client = int(user_input) - pause_controller.adjust_num_running_clients(num_client) - elif user_input == "exit": - break - except KeyboardInterrupt: - break - - def main(): parser = argparse.ArgumentParser( "Tool used to run IMDB-extended transactions against BRAD or an ODBC database." 
@@ -532,44 +457,16 @@ def main(): help="The alpha parameter for the Zipfian distribution. Only used if " "--use-zipfian-ids is `True`. Must be strictly greater than 1. ", ) - # These three arguments are used for the day long experiment. - parser.add_argument( - "--num-client-path", - type=str, - default=None, - help="Path to the distribution of number of clients for each period of a day", - ) - parser.add_argument( - "--num-client-multiplier", - type=int, - default=1, - help="The multiplier to the number of clients for each period of a day", - ) - parser.add_argument( - "--time-scale-factor", - type=int, - default=100, - help="trace 1s of simulation as X seconds in real-time to match the num-concurrent-query", - ) parser.add_argument("--brad-host", type=str, default="localhost") parser.add_argument("--brad-port", type=int, default=6583) parser.add_argument("--num-front-ends", type=int, default=1) + parser.add_argument("--adjust-clients-port", type=int, default=8585) + parser.add_argument("--interactive", action="store_true") + parser.add_argument("--starting-clients", type=int) args = parser.parse_args() set_up_logging() - - if ( - args.num_client_path is not None - and os.path.exists(args.num_client_path) - and args.time_scale_factor is not None - ): - # we can only set the num_concurrent_query trace in presence of time_scale_factor - with open(args.num_client_path, "rb") as f: - num_client_trace = pickle.load(f) - logger.info("[T] Preparing to run a time varying workload") - else: - num_client_trace = None - logger.info("[T] Preparing to run a steady workload") + logger.info("[T] Preparing to run a steady workload") mgr = mp.Manager() start_queue = [mgr.Queue() for _ in range(args.num_clients)] @@ -641,100 +538,28 @@ def main(): logger.info("Transactional client abort complete.") return - if num_client_trace is not None: - logger.info("[T] Scaling number of clients by %d", args.num_client_multiplier) - for k in num_client_trace.keys(): - num_client_trace[k] *= 
args.num_client_multiplier - - assert args.time_scale_factor is not None, "Need to set --time-scale-factor" - assert args.run_for_s is not None, "Need to set --run-for-s" - - execute_start_time = universal_now() - num_running_client = 0 - num_client_required = min(num_client_trace[0], args.num_clients) - for add_client in range(num_running_client, num_client_required): - logger.info("[T] Telling client no. %d to start.", add_client) - control_semaphore[add_client].release() - num_running_client += 1 - - finished_one_day = True - curr_day_start_time = datetime.now().astimezone(pytz.utc) - for time_of_day, num_expected_clients in num_client_trace.items(): - if time_of_day == 0: - continue - # at this time_of_day start/shut-down more clients - time_in_s = time_of_day / args.time_scale_factor - now = datetime.now().astimezone(pytz.utc) - curr_time_in_s = (now - curr_day_start_time).total_seconds() - total_exec_time_in_s = (now - execute_start_time).total_seconds() - if args.run_for_s <= total_exec_time_in_s: - finished_one_day = False - break - if args.run_for_s - total_exec_time_in_s <= (time_in_s - curr_time_in_s): - wait_time = args.run_for_s - total_exec_time_in_s - if wait_time > 0: - time.sleep(wait_time) - finished_one_day = False - break - time.sleep(time_in_s - curr_time_in_s) - num_client_required = min(num_expected_clients, args.num_clients) - if num_client_required > num_running_client: - # starting additional clients - for add_client in range(num_running_client, num_client_required): - logger.info("[T] Telling client no. %d to start.", add_client) - control_semaphore[add_client].release() - num_running_client += 1 - elif num_running_client > num_client_required: - # shutting down clients - for delete_client in range(num_running_client, num_client_required, -1): - logger.info( - "[T] Telling client no. 
%d to stop.", (delete_client - 1) - ) - control_semaphore[delete_client - 1].release() - num_running_client -= 1 - now = datetime.now().astimezone(pytz.utc) - total_exec_time_in_s = (now - execute_start_time).total_seconds() - if finished_one_day: - logger.info( - "[T] Finished executing one day of workload in %d s, will ignore the rest of " - "pre-set execution time %d s", - total_exec_time_in_s, - args.run_for_s, - ) - else: - logger.info( - "[T] Executed ended but unable to finish executing the trace of a full day within %d s", - args.run_for_s, - ) - - else: - logger.info("[T] Telling all %d clients to start.", args.num_clients) - for idx in range(args.num_clients): - control_semaphore[idx].release() + logger.info("[T] Telling all %d clients to start.", args.num_clients) + for idx in range(args.num_clients): + control_semaphore[idx].release() pause_controller = PauseController( args.num_clients, pause_semaphore, resume_semaphore ) - if args.run_for_s is not None and num_client_trace is None: - logger.info("[T] Letting the experiment run for %d seconds...", args.run_for_s) - time.sleep(args.run_for_s) + if args.starting_clients is not None: + pause_controller.adjust_num_running_clients(args.starting_clients) - elif num_client_trace is None: - logger.info("[T] Waiting until requested to stop... 
(hit Ctrl-C)") + if args.interactive: logger.info( - "type in an integer smaller than total number of clients and press enter to change number of running client, type in exit to stop dynamically adjusting number of clients...", + "[T] type in an integer smaller than total number of clients and press enter to change number of running client, type in exit to stop dynamically adjusting number of clients...", ) - asyncio.run(get_command_line_input(pause_controller)) - should_shutdown = threading.Event() - - def signal_handler(_signal, _frame): - should_shutdown.set() - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - should_shutdown.wait() + get_command_line_input(pause_controller) + else: + logger.info( + "[T] Listening on %d until requested to stop... (hit Ctrl-C)", + args.adjust_clients_port, + ) + serve(pause_controller, port=args.adjust_clients_port) logger.info("[T] Stopping clients...") for idx in range(args.num_clients): diff --git a/workloads/IMDB_extended/run_variable_clients.py b/workloads/IMDB_extended/run_variable_clients.py index c3f05491..09066b34 100644 --- a/workloads/IMDB_extended/run_variable_clients.py +++ b/workloads/IMDB_extended/run_variable_clients.py @@ -1,91 +1,75 @@ -import asyncio import argparse +import copy +import multiprocessing as mp +import time +import os +import numpy as np import pathlib -import pickle import random +import sys import signal -import threading -import time -import numpy as np -import os import pytz -import multiprocessing as mp import logging +from typing import List, Optional +import numpy.typing as npt from datetime import datetime, timedelta -from typing import Optional, List +from workload_utils.connect import connect_to_db +from workload_utils.pause_controller import PauseController, get_command_line_input +from workload_utils.change_clients_api import serve from brad.config.engine import Engine -from brad.config.file import ConfigFile from brad.grpc_client 
import BradClientError -from brad.provisioning.directory import Directory from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff -from brad.utils.time_periods import universal_now from brad.utils import set_up_logging, create_custom_logger -from workload_utils.connect import connect_to_db -from workload_utils.transaction_worker import TransactionWorker logger = logging.getLogger(__name__) +EXECUTE_START_TIME = datetime.now().astimezone(pytz.utc) +ENGINE_NAMES = ["ATHENA", "AURORA", "REDSHIFT"] + STARTUP_FAILED = "startup_failed" +def get_time_of_the_day_unsimulated( + now: datetime, time_scale_factor: Optional[int] +) -> int: + # Get the time of the day in minute in real-time + assert time_scale_factor is not None, "need to specify args.time_scale_factor" + # time_diff in minutes after scaling + time_diff = int((now - EXECUTE_START_TIME).total_seconds() / 60 * time_scale_factor) + time_unsimulated = time_diff % (24 * 60) # time of the day in minutes + return time_unsimulated + + +def time_in_minute_to_datetime_str(time_unsimulated: Optional[int]) -> str: + if time_unsimulated is None: + return "xxx" + hour = time_unsimulated // 60 + assert hour < 24 + minute = time_unsimulated % 60 + hour_str = str(hour) if hour >= 10 else "0" + str(hour) + minute_str = str(minute) if minute >= 10 else "0" + str(minute) + return f"{hour_str}:{minute_str}" + + def runner( - args, - worker_idx: int, - directory: Optional[Directory], + runner_idx: int, start_queue: mp.Queue, control_semaphore: mp.Semaphore, # type: ignore pause_semaphore: mp.Semaphore, # type: ignore resume_semaphore: mp.Semaphore, # type: ignore + args, + query_bank: List[str], + queries: List[int], + query_frequency: Optional[npt.NDArray] = None, + execution_gap_dist: Optional[npt.NDArray] = None, ) -> None: - """ - Meant to be launched as a subprocess with multiprocessing. 
- """ - - def noop_handler(_signal, _frame): + def noop(_signal, _frame): pass - signal.signal(signal.SIGINT, noop_handler) + signal.signal(signal.SIGINT, noop) set_up_logging() - worker = TransactionWorker( - worker_idx, - args.seed ^ worker_idx, - args.scale_factor, - args.dataset_type, - args.use_zipfian_ids, - args.zipfian_alpha, - ) - - txn_prng = random.Random(~(args.seed ^ worker_idx)) - transactions = [ - worker.purchase_tickets, - worker.add_new_showing, - worker.edit_movie_note, - ] - transaction_weights = [ - 0.70, - 0.20, - 0.10, - ] - lookup_theatre_id_by_name = 0.8 - txn_indexes = list(range(len(transactions))) - commits = [0 for _ in range(len(transactions))] - aborts = [0 for _ in range(len(transactions))] - - # Connect and set the isolation level. - try: - db = connect_to_db( - args, worker_idx, direct_engine=Engine.Aurora, directory=directory - ) - db.execute_sync( - f"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL {args.isolation_level}" - ) - except BradClientError as ex: - logger.error("[T %d] Failed to connect to BRAD: %s", worker_idx, str(ex)) - start_queue.put_nowait(STARTUP_FAILED) - return - # For printing out results. if "COND_OUT" in os.environ: # pylint: disable-next=import-error @@ -98,30 +82,74 @@ def noop_handler(_signal, _frame): verbose_log_dir = out_dir / "verbose_logs" verbose_log_dir.mkdir(exist_ok=True) verbose_logger = create_custom_logger( - "txn_runner_verbose", str(verbose_log_dir / f"runner_{worker_idx}.log") + "repeating_analytics_verbose", str(verbose_log_dir / f"runner_{runner_idx}.log") ) verbose_logger.info("Workload starting...") - # Signal that we are ready to start and wait for other clients. 
- start_queue.put("") - control_semaphore.acquire() # type: ignore + if args.engine is not None: + engine = Engine.from_str(args.engine) + else: + engine = None + + try: + database = connect_to_db( + args, + runner_idx, + direct_engine=engine, + # Ensure we disable the result cache if we are running directly on + # Redshift. + disable_direct_redshift_result_cache=True, + ) + except BradClientError as ex: + logger.error("[RA %d] Failed to connect to BRAD: %s", runner_idx, str(ex)) + start_queue.put_nowait(STARTUP_FAILED) + return + + if query_frequency is not None: + # There are no predictions for query 48 in our test set (query cannot be parsed). + # Set its frequency to 0 so it is never used. + query_frequency[48] = 0.0 + + query_frequency = query_frequency[queries] + query_frequency = query_frequency / np.sum(query_frequency) + + exec_count = 0 + file = open( + out_dir / "repeating_olap_batch_{}.csv".format(runner_idx), + "w", + encoding="UTF-8", + ) - txn_exec_count = 0 - rand_backoff = None - overall_start = time.time() try: - latency_file = open( - out_dir / "oltp_latency_{}.csv".format(worker_idx), "w", encoding="UTF-8" + print( + "timestamp,time_since_execution_s,time_of_day,query_idx,run_time_s,engine", + file=file, + flush=True, + ) + + prng = random.Random(args.seed ^ runner_idx) + rand_backoff = None + + logger.info( + "[Repeating Analytics Runner %d] Queries to run: %s", + runner_idx, + queries, ) - print("txn_idx,timestamp,run_time_s", file=latency_file) + query_order_main = queries.copy() + prng.shuffle(query_order_main) + query_order = query_order_main.copy() + + first_run = True + + # Signal that we're ready to start and wait for the controller. + logger.info("[RA] Runner %d is ready to start running.", runner_idx) + start_queue.put_nowait("") + control_semaphore.acquire() # type: ignore pause = False while True: # Note that `False` means to not block. 
should_exit = control_semaphore.acquire(False) # type: ignore - if should_exit: - logger.info("T Runner %d is exiting.", worker_idx) - break should_pause = pause_semaphore.acquire(False) # type: ignore if should_pause: pause = True @@ -132,160 +160,139 @@ def noop_handler(_signal, _frame): else: time.sleep(1) continue + if should_exit: + logger.info("[RA] Runner %d is exiting.", runner_idx) + break + + if execution_gap_dist is not None: + now = datetime.now().astimezone(pytz.utc) + time_unsimulated = get_time_of_the_day_unsimulated( + now, args.time_scale_factor + ) + wait_for_s = execution_gap_dist[ + int(time_unsimulated / (60 * 24) * len(execution_gap_dist)) + ] + time.sleep(wait_for_s) + elif args.avg_gap_s is not None: + if first_run: + # We wait a uniformly random amount of time at the beginning + # to stagger queries across all the clients that will run + # (e.g., to avoid having all clients issue queries at the + # same time). + first_run = False + wait_for_s = prng.uniform(0.0, args.avg_gap_s) + time.sleep(wait_for_s) + else: + # Wait times are normally distributed if execution_gap_dist is not provided. 
+ wait_for_s = prng.gauss(args.avg_gap_s, args.avg_gap_std_s) + if wait_for_s < 0.0: + wait_for_s = 0.0 + time.sleep(wait_for_s) + + if query_frequency is not None: + qidx = prng.choices(queries, list(query_frequency))[0] + else: + if len(query_order) == 0: + query_order = query_order_main.copy() - txn_exec_count += 1 - txn_idx = txn_prng.choices(txn_indexes, weights=transaction_weights, k=1)[0] - txn = transactions[txn_idx] + qidx = query_order.pop() + logger.debug("Executing qidx: %d", qidx) + query = query_bank[qidx] - now = datetime.now().astimezone(pytz.utc) - txn_start = time.time() try: - # pylint: disable-next=comparison-with-callable - if txn == worker.purchase_tickets: - succeeded = txn( - db, - select_using_name=txn_prng.random() < lookup_theatre_id_by_name, + engine = None + now = datetime.now().astimezone(pytz.utc) + if args.time_scale_factor is not None: + time_unsimulated = get_time_of_the_day_unsimulated( + now, args.time_scale_factor + ) + time_unsimulated_str = time_in_minute_to_datetime_str( + time_unsimulated ) else: - succeeded = txn(db) + time_unsimulated_str = "xxx" + + verbose_logger.info("[RA %d] Issuing query %d", runner_idx, qidx) + start = time.time() + _, engine = database.execute_sync_with_engine(query) + end = time.time() + print( + "{},{},{},{},{},{}".format( + now, + (now - EXECUTE_START_TIME).total_seconds(), + time_unsimulated_str, + qidx, + end - start, + engine.value, + ), + file=file, + flush=True, + ) + + if exec_count % 20 == 0: + # To avoid data loss if this script crashes. 
+ os.fsync(file.fileno()) + exec_count += 1 if rand_backoff is not None: - logger.info("[T %d] Continued after transient errors.", worker_idx) + logger.info("[RA %d] Continued after transient errors.", runner_idx) rand_backoff = None except BradClientError as ex: - succeeded = False if ex.is_transient(): - verbose_logger.warning("Transient txn error: %s", ex.message()) + verbose_logger.warning("Transient query error: %s", ex.message()) if rand_backoff is None: rand_backoff = RandomizedExponentialBackoff( max_retries=100, - base_delay_s=0.1, + base_delay_s=1.0, max_delay_s=timedelta(minutes=1).total_seconds(), ) logger.info( - "[T %d] Backing off due to transient errors.", - worker_idx, + "[RA %d] Backing off due to transient errors.", runner_idx ) # Delay retrying in the case of a transient error (this # happens during blueprint transitions). wait_s = rand_backoff.wait_time_s() if wait_s is None: - logger.info( - "[T %d] Aborting benchmark. Too many transient errors.", - worker_idx, + logger.error( + "[RA %d] Aborting benchmark. Too many transient errors.", + runner_idx, ) break verbose_logger.info( - "[T %d] Backing off for %.4f seconds...", worker_idx, wait_s + "[RA %d] Backing off for %.4f seconds...", runner_idx, wait_s ) time.sleep(wait_s) else: - logger.error( - "[T %d] Encountered an unexpected `BradClientError`. Aborting the workload...", - worker_idx, - ) - raise - except: - succeeded = False - logger.error( - "[T %d] Encountered an unexpected error. Aborting the workload...", - worker_idx, - ) - raise - txn_end = time.time() - - # Record metrics. - if succeeded: - commits[txn_idx] += 1 - else: - aborts[txn_idx] += 1 - - if txn_prng.random() < args.latency_sample_prob: - print( - "{},{},{}".format(txn_idx, now, txn_end - txn_start), - file=latency_file, - ) - if txn_exec_count > 20_000: - latency_file.flush() - txn_exec_count = 0 - - # Warn if the abort rate is high. 
- total_aborts = sum(aborts) - total_commits = sum(commits) - abort_rate = total_aborts / (total_aborts + total_commits) - if abort_rate > 0.15: - logger.info( - "[T %d] Abort rate is higher than expected ({%4f}).", - worker_idx, - abort_rate, - ) + logger.error("Unexpected query error: %s", ex.message()) finally: - overall_end = time.time() - logger.info("[T %d] Done running transactions.", worker_idx) - latency_file.close() - - with open( - out_dir / "oltp_stats_{}.csv".format(worker_idx), "w", encoding="UTF-8" - ) as file: - print("stat,value", file=file) - print(f"overall_run_time_s,{overall_end - overall_start}", file=file) - print(f"purchase_commits,{commits[0]}", file=file) - print(f"add_showing_commits,{commits[1]}", file=file) - print(f"edit_note_commits,{commits[2]}", file=file) - print(f"purchase_aborts,{aborts[0]}", file=file) - print(f"add_showing_aborts,{aborts[1]}", file=file) - print(f"edit_note_aborts,{aborts[2]}", file=file) - - db.close_sync() + os.fsync(file.fileno()) + file.close() + database.close_sync() + logger.info("[RA] Runner %d has exited.", runner_idx) def simulation_runner( - args, - worker_idx: int, + all_query_runtime: npt.NDArray, + runner_idx: int, start_queue: mp.Queue, control_semaphore: mp.Semaphore, # type: ignore pause_semaphore: mp.Semaphore, # type: ignore resume_semaphore: mp.Semaphore, # type: ignore + args, + queries: List[int], + query_frequency_original: Optional[npt.NDArray] = None, + execution_gap_dist: Optional[npt.NDArray] = None, + wait_for_execute: bool = False, ) -> None: - """ - Meant to be launched as a subprocess with multiprocessing. 
- """ - - def noop_handler(_signal, _frame): + def noop(_signal, _frame): pass - signal.signal(signal.SIGINT, noop_handler) - - set_up_logging() - - worker = TransactionWorker( - worker_idx, - args.seed ^ worker_idx, - args.scale_factor, - args.dataset_type, - args.use_zipfian_ids, - args.zipfian_alpha, - ) - - txn_prng = random.Random(~(args.seed ^ worker_idx)) - transactions = [ - worker.purchase_tickets, - worker.add_new_showing, - worker.edit_movie_note, - ] - transaction_weights = [ - 0.70, - 0.20, - 0.10, - ] - txn_indexes = list(range(len(transactions))) - commits = [0 for _ in range(len(transactions))] - aborts = [0 for _ in range(len(transactions))] + signal.signal(signal.SIGINT, noop) # For printing out results. if "COND_OUT" in os.environ: @@ -296,283 +303,329 @@ def noop_handler(_signal, _frame): else: out_dir = pathlib.Path(".") - verbose_log_dir = out_dir / "verbose_logs" - verbose_log_dir.mkdir(exist_ok=True) - verbose_logger = create_custom_logger( - "txn_runner_verbose", str(verbose_log_dir / f"runner_{worker_idx}.log") - ) - verbose_logger.info("Workload starting...") - - # Signal that we are ready to start and wait for other clients. - start_queue.put("") - control_semaphore.acquire() # type: ignore - - txn_exec_count = 0 - rand_backoff = None - - latency_file = open( - out_dir / "oltp_latency_{}.csv".format(worker_idx), "w", encoding="UTF-8" - ) - print("txn_idx,timestamp,run_time_s", file=latency_file) - - pause = False - while True: - # Note that `False` means to not block. 
- should_exit = control_semaphore.acquire(False) # type: ignore - if should_exit: - logger.info("T Runner %d is exiting.", worker_idx) - break - should_pause = pause_semaphore.acquire(False) # type: ignore - if should_pause: - pause = True - if pause: - should_resume = resume_semaphore.acquire(False) # type: ignore - if should_resume: - pause = False - else: - time.sleep(1) - continue - - txn_exec_count += 1 - txn_idx = txn_prng.choices(txn_indexes, weights=transaction_weights, k=1)[0] - - now = datetime.now().astimezone(pytz.utc) + if query_frequency_original is not None: + query_frequency = copy.deepcopy(query_frequency_original) + # There are no predictions for query 48 in our test set (query cannot be parsed). + # Set its frequency to 0 so it is never used. + query_frequency[48] = 0.0 + query_frequency = query_frequency[queries] + query_frequency = query_frequency / np.sum(query_frequency) + else: + query_frequency = None + + with open( + out_dir / "repeating_olap_batch_{}.csv".format(runner_idx), + "w", + encoding="UTF-8", + ) as file: + print( + "timestamp,time_since_execution,time_of_day,query_idx,run_time_s,engine", + file=file, + flush=True, + ) - succeeded = np.random.choice([True, False], p=[0.95, 0.05]) + prng = random.Random(args.seed ^ runner_idx) - if rand_backoff is not None: - logger.info("[T %d] Continued after transient errors.", worker_idx) - rand_backoff = None + logger.info( + "[Repeating Analytics Runner %d] Queries to run: %s", + runner_idx, + queries, + ) + query_order = queries.copy() + prng.shuffle(query_order) - time.sleep(0.01) + # Signal that we're ready to start and wait for the controller. + start_queue.put_nowait("") + control_semaphore.acquire() # type: ignore - # Record metrics. 
- if succeeded: - commits[txn_idx] += 1 - else: - aborts[txn_idx] += 1 + pause = False + while True: + should_exit = control_semaphore.acquire(False) # type: ignore + if should_exit: + break + should_pause = pause_semaphore.acquire(False) # type: ignore + if should_pause: + pause = True + if pause: + should_resume = resume_semaphore.acquire(False) # type: ignore + if should_resume: + pause = False + else: + time.sleep(1) + continue + if execution_gap_dist is not None: + now = datetime.now().astimezone(pytz.utc) + time_unsimulated = get_time_of_the_day_unsimulated( + now, args.time_scale_factor + ) + wait_for_s = execution_gap_dist[ + int(time_unsimulated / (60 * 24) * len(execution_gap_dist)) + ] + time.sleep(wait_for_s) + elif args.avg_gap_s is not None: + # Wait times are normally distributed if execution_gap_dist is not provided. + wait_for_s = prng.gauss(args.avg_gap_s, args.avg_gap_std_s) + if wait_for_s < 0.0: + wait_for_s = 0.0 + time.sleep(wait_for_s) + + if query_frequency is not None: + qidx = prng.choices(queries, list(query_frequency))[0] + else: + if len(query_order) == 0: + query_order = queries.copy() + prng.shuffle(query_order) + + qidx = query_order.pop() + logger.debug("Executing qidx: %d", qidx) + # using the average of the best two engines as approximation of brad runtime + runtime = ( + np.sum(all_query_runtime[qidx]) - np.min(all_query_runtime[qidx]) + ) / 2 + if wait_for_execute: + time.sleep(runtime) + engine = np.argmin(all_query_runtime[qidx]) - if txn_prng.random() < args.latency_sample_prob: + now = datetime.now().astimezone(pytz.utc) + if args.time_scale_factor is not None: + time_unsimulated = get_time_of_the_day_unsimulated( + now, args.time_scale_factor + ) + time_unsimulated_str = time_in_minute_to_datetime_str(time_unsimulated) + else: + time_unsimulated_str = "xxx" print( - "{},{},{}".format(txn_idx, now, 0.01), - file=latency_file, + "{},{},{},{},{},{}".format( + now, + (now - EXECUTE_START_TIME).total_seconds(), + 
time_unsimulated_str, + qidx, + runtime, + ENGINE_NAMES[engine], + ), + file=file, + flush=True, ) - if txn_exec_count > 20_000: - latency_file.flush() - txn_exec_count = 0 - - # Warn if the abort rate is high. - total_aborts = sum(aborts) - total_commits = sum(commits) - abort_rate = total_aborts / (total_aborts + total_commits) - if abort_rate > 0.15: - logger.info( - "[T %d] Abort rate is higher than expected ({%4f}).", - worker_idx, - abort_rate, - ) - logger.info("[T %d] Done running transactions.", worker_idx) - latency_file.close() +def run_warmup(args, query_bank: List[str], queries: List[int]): + if args.engine is not None: + engine = Engine.from_str(args.engine) + else: + engine = None + + database = connect_to_db( + args, + worker_index=0, + direct_engine=engine, + # Ensure we disable the result cache if we are running directly on + # Redshift. + disable_direct_redshift_result_cache=True, + ) -class PauseController: - # controlling the pause and resume of clients - def __init__( - self, - total_num_clients: int, - pause_semaphore: List[mp.Semaphore], # type: ignore - resume_semaphore: List[mp.Semaphore], # type: ignore - ): - self.total_num_clients = total_num_clients - self.pause_semaphore = pause_semaphore - self.resume_semaphore = resume_semaphore - self.paused_clients: List[int] = [] - self.running_clients: List[int] = list(range(total_num_clients)) - self.num_running_clients: int = total_num_clients - - def adjust_num_running_clients( - self, num_clients: int, verbose: bool = True - ) -> None: - if num_clients > self.total_num_clients: - print( - f"invalid input number of clients {num_clients}, larger than total number of clients" - ) - return - if num_clients == self.num_running_clients: - return - elif num_clients < self.num_running_clients: - pause_clients = np.random.choice( - self.running_clients, - size=self.num_running_clients - num_clients, - replace=False, - ) - for i in pause_clients: - assert ( - i not in self.paused_clients - ), f"trying 
to pause a client that is already paused: {i}" - if verbose: - print(f"pausing client {i}") - self.pause_semaphore[i].release() - self.pause_semaphore[i].release() - self.paused_clients.append(i) - self.running_clients.remove(i) - self.num_running_clients -= 1 - else: - resume_clients = np.random.choice( - self.paused_clients, - size=num_clients - self.num_running_clients, - replace=False, - ) - for i in resume_clients: - assert ( - i not in self.running_clients - ), f"trying to resume a running client: {i}" - if verbose: - print(f"resuming client {i}") - self.resume_semaphore[i].release() - self.resume_semaphore[i].release() - self.paused_clients.remove(i) - self.running_clients.append(i) - self.num_running_clients += 1 - - -async def get_command_line_input(pause_controller: PauseController) -> None: - while True: - try: - user_input = input() - if user_input.isnumeric(): - num_client = int(user_input) - pause_controller.adjust_num_running_clients(num_client) - elif user_input == "exit": - break - except KeyboardInterrupt: - break + # For printing out results. + if "COND_OUT" in os.environ: + # pylint: disable-next=import-error + import conductor.lib as cond + + out_dir = cond.get_output_path() + else: + out_dir = pathlib.Path(".") + + try: + print( + f"Starting warmup pass (will run {args.run_warmup_times} times)...", + file=sys.stderr, + flush=True, + ) + with open( + out_dir / "repeating_olap_batch_warmup.csv", "w", encoding="UTF-8" + ) as file: + print("timestamp,query_idx,run_time_s,engine", file=file) + for _ in range(args.run_warmup_times): + for idx, qidx in enumerate(queries): + try: + engine = None + query = query_bank[qidx] + now = datetime.now().astimezone(pytz.utc) + start = time.time() + _, engine = database.execute_sync_with_engine(query) + end = time.time() + run_time_s = end - start + print( + "Warmed up {} of {}. 
Run time (s): {}".format( + idx + 1, len(queries), run_time_s + ), + file=sys.stderr, + flush=True, + ) + print( + "{},{},{},{}".format( + now, + qidx, + run_time_s, + engine.value if engine is not None else "unknown", + ), + file=file, + flush=True, + ) + except BradClientError as ex: + if ex.is_transient(): + print( + "Transient query error:", + ex.message(), + flush=True, + file=sys.stderr, + ) + else: + print( + "Unexpected query error:", + ex.message(), + flush=True, + file=sys.stderr, + ) + finally: + database.close_sync() def main(): - parser = argparse.ArgumentParser( - "Tool used to run IMDB-extended transactions against BRAD or an ODBC database." - ) - parser.add_argument( - "--run-for-s", - type=int, - help="How long to run the workload for. If unset, the experiment will run until Ctrl-C.", - ) + parser = argparse.ArgumentParser() + parser.add_argument("--brad-host", type=str, default="localhost") + parser.add_argument("--brad-port", type=int, default=6583) + parser.add_argument("--seed", type=int, default=42) + parser.add_argument("--num-front-ends", type=int, default=1) + parser.add_argument("--run-warmup", action="store_true") parser.add_argument( "--run-simulation", action="store_true", help="Run the simulation instead of actual execution.", ) parser.add_argument( - "--num-clients", - type=int, - default=1, - help="The number of transactional clients.", - ) - parser.add_argument("--client-offset", type=int, default=0) - parser.add_argument( - "--seed", type=int, default=42, help="Random seed for reproducibility." + "--wait-for-execute-sim", + action="store_true", + help="Waiting for execution in simulation?", ) parser.add_argument( - "--cstr-var", + "--query-runtime-path", type=str, - help="Environment variable that holds a ODBC connection string. 
Set to connect directly (i.e., not through BRAD)", + default=None, + help="path to the query runtime numpy file", ) parser.add_argument( - "--scale-factor", + "--run-warmup-times", type=int, default=1, - help="The scale factor used to generate the dataset.", + help="Run the warmup query list this many times.", ) parser.add_argument( - "--isolation-level", + "--cstr-var", type=str, - default="REPEATABLE READ", - help="The isolation level to use when running the transactions.", + help="Set to connect via ODBC instead of the BRAD client (for use with other baselines).", ) parser.add_argument( - "--brad-direct", - action="store_true", - help="Set to connect directly to Aurora via BRAD's config.", + "--query-bank-file", type=str, required=True, help="Path to a query bank." ) parser.add_argument( - "--config-file", + "--time-scale-factor", type=str, - help="The BRAD config file (if --brad-direct is used).", + required=False, + help="scale the machine time to time of the day", ) parser.add_argument( - "--schema-name", + "--query-frequency-path", type=str, - help="The schema name to use, if connecting directly.", - ) - parser.add_argument( - "--latency-sample-prob", - type=float, - default=0.01, - help="The probability that a transaction's latency will be recorded.", + default=None, + help="path to the frequency to draw each query in query bank", ) parser.add_argument( - "--dataset-type", - choices=["original", "20gb", "100gb"], - default="original", - help="This controls the range of reads the transaction worker performs, " - "depending on the dataset size.", + "--num-client-path", + type=str, + default=None, + help="Path to the distribution of number of clients for each period of a day", ) + parser.add_argument("--num-clients", type=int, default=1) + parser.add_argument("--client-offset", type=int, default=0) + parser.add_argument("--avg-gap-s", type=float) + parser.add_argument("--avg-gap-std-s", type=float, default=0.5) + parser.add_argument("--query-indexes", 
type=str) parser.add_argument( - "--use-zipfian-ids", + "--brad-direct", action="store_true", - help="Whether the transaction worker should draw movie and theatre IDs " - "from a Zipfian distribution.", + help="Set to connect directly to Aurora via BRAD's config.", ) parser.add_argument( - "--zipfian-alpha", - type=float, - default=1.1, - help="The alpha parameter for the Zipfian distribution. Only used if " - "--use-zipfian-ids is `True`. Must be strictly greater than 1. ", + "--config-file", + type=str, + help="The BRAD config file (if --brad-direct is used).", ) - # These three arguments are used for the day long experiment. parser.add_argument( - "--num-client-path", + "--schema-name", type=str, - default=None, - help="Path to the distribution of number of clients for each period of a day", + help="The schema name to use, if connecting directly.", ) parser.add_argument( - "--num-client-multiplier", - type=int, - default=1, - help="The multiplier to the number of clients for each period of a day", + "--engine", type=str, help="The engine to use, if connecting directly." ) + parser.add_argument("--run-for-s", type=int, help="If set, run for this long.") parser.add_argument( - "--time-scale-factor", + "--ff-trace-clients", type=int, - default=100, - help="trace 1s of simulation as X seconds in real-time to match the num-concurrent-query", + help="Start the client trace at the given number of clients. 
Used for debugging only.", ) - parser.add_argument("--brad-host", type=str, default="localhost") - parser.add_argument("--brad-port", type=int, default=6583) - parser.add_argument("--num-front-ends", type=int, default=1) + parser.add_argument("--adjust-clients-port", type=int, default=8586) + parser.add_argument("--interactive", action="store_true") + parser.add_argument("--starting-clients", type=int) args = parser.parse_args() set_up_logging() - if ( - args.num_client_path is not None - and os.path.exists(args.num_client_path) - and args.time_scale_factor is not None + logger.info( + "[Serial RA] Using query bank %s. Query indices: %s", + args.query_bank_file, + args.query_indexes, + ) + + with open(args.query_bank_file, "r", encoding="UTF-8") as file: + query_bank = [line.strip() for line in file] + + if args.query_frequency_path is not None and os.path.exists( + args.query_frequency_path ): - # we can only set the num_concurrent_query trace in presence of time_scale_factor - with open(args.num_client_path, "rb") as f: - num_client_trace = pickle.load(f) - logger.info("[T] Preparing to run a time varying workload") + query_frequency = np.load(args.query_frequency_path) + assert len(query_frequency) == len( + query_bank + ), "query_frequency size does not match total number of queries" else: - num_client_trace = None - logger.info("[T] Preparing to run a steady workload") + query_frequency = None + + execution_gap_dist = None + + if args.query_indexes is None: + queries = list(range(len(query_bank))) + else: + queries = list(map(int, args.query_indexes.split(","))) + + for qidx in queries: + assert qidx < len(query_bank) + assert qidx >= 0 + if args.run_warmup: + run_warmup(args, query_bank, queries) + return + + # Our control protocol is as follows. + # - Runner processes write to their `start_queue` when they have finished + # setting up and are ready to start running. They then wait on the control + # semaphore. 
+ # - The control process blocks and waits on each `start_queue` to ensure + # runners can start together (if needed). + # - The control process signals the control semaphore twice. Once to tell a + # runner to start, once to tell it to stop. + # - If there is an error, a runner is free to exit as long as they have + # written to `start_queue`. mgr = mp.Manager() start_queue = [mgr.Queue() for _ in range(args.num_clients)] + # N.B. `value = 0` since we use this for synchronization, not mutual exclusion. # pylint: disable-next=no-member control_semaphore = [mgr.Semaphore(value=0) for _ in range(args.num_clients)] # pylint: disable-next=no-member @@ -580,49 +633,57 @@ def main(): # pylint: disable-next=no-member resume_semaphore = [mgr.Semaphore(value=0) for _ in range(args.num_clients)] - if args.brad_direct: - assert args.config_file is not None - assert args.schema_name is not None - config = ConfigFile.load(args.config_file) - directory = Directory(config) - asyncio.run(directory.refresh()) - else: - directory = None - - clients = [] if args.run_simulation: + assert ( + args.query_runtime_path is not None + ), "must provide query runtime to run simulation" + all_query_runtime = np.load(args.query_runtime_path) + assert all_query_runtime.shape == ( + len(query_bank), + 3, + ), "incorrect query runtime file format" + processes = [] for idx in range(args.num_clients): p = mp.Process( target=simulation_runner, args=( - args, + all_query_runtime, idx, start_queue[idx], control_semaphore[idx], pause_semaphore[idx], resume_semaphore[idx], + args, + queries, + query_frequency, + execution_gap_dist, + args.wait_for_execute_sim, ), ) p.start() - clients.append(p) + processes.append(p) else: + processes = [] for idx in range(args.num_clients): p = mp.Process( target=runner, args=( - args, idx, - directory, start_queue[idx], control_semaphore[idx], pause_semaphore[idx], resume_semaphore[idx], + args, + query_bank, + queries, + query_frequency, + execution_gap_dist, ), ) 
p.start() - clients.append(p) + processes.append(p) - logger.info("[T] Waiting for startup...") + logger.info("[RA] Waiting for startup...") one_startup_failed = False for i in range(args.num_clients): msg = start_queue[i].get() @@ -631,123 +692,63 @@ def main(): if one_startup_failed: logger.error( - "At least one transactional runner failed to start up. Aborting the experiment.", + "[RA] At least one runner failed to start up. Aborting the experiment." ) for i in range(args.num_clients): + # Ideally we should be able to release twice atomically. control_semaphore[i].release() control_semaphore[i].release() - for p in clients: + for p in processes: p.join() - logger.info("Transactional client abort complete.") + logger.info("[RA] Overall abort complete.") return - if num_client_trace is not None: - logger.info("[T] Scaling number of clients by %d", args.num_client_multiplier) - for k in num_client_trace.keys(): - num_client_trace[k] *= args.num_client_multiplier - - assert args.time_scale_factor is not None, "Need to set --time-scale-factor" - assert args.run_for_s is not None, "Need to set --run-for-s" - - execute_start_time = universal_now() - num_running_client = 0 - num_client_required = min(num_client_trace[0], args.num_clients) - for add_client in range(num_running_client, num_client_required): - logger.info("[T] Telling client no. 
%d to start.", add_client) - control_semaphore[add_client].release() - num_running_client += 1 - - finished_one_day = True - curr_day_start_time = datetime.now().astimezone(pytz.utc) - for time_of_day, num_expected_clients in num_client_trace.items(): - if time_of_day == 0: - continue - # at this time_of_day start/shut-down more clients - time_in_s = time_of_day / args.time_scale_factor - now = datetime.now().astimezone(pytz.utc) - curr_time_in_s = (now - curr_day_start_time).total_seconds() - total_exec_time_in_s = (now - execute_start_time).total_seconds() - if args.run_for_s <= total_exec_time_in_s: - finished_one_day = False - break - if args.run_for_s - total_exec_time_in_s <= (time_in_s - curr_time_in_s): - wait_time = args.run_for_s - total_exec_time_in_s - if wait_time > 0: - time.sleep(wait_time) - finished_one_day = False - break - time.sleep(time_in_s - curr_time_in_s) - num_client_required = min(num_expected_clients, args.num_clients) - if num_client_required > num_running_client: - # starting additional clients - for add_client in range(num_running_client, num_client_required): - logger.info("[T] Telling client no. %d to start.", add_client) - control_semaphore[add_client].release() - num_running_client += 1 - elif num_running_client > num_client_required: - # shutting down clients - for delete_client in range(num_running_client, num_client_required, -1): - logger.info( - "[T] Telling client no. 
%d to stop.", (delete_client - 1) - ) - control_semaphore[delete_client - 1].release() - num_running_client -= 1 - now = datetime.now().astimezone(pytz.utc) - total_exec_time_in_s = (now - execute_start_time).total_seconds() - if finished_one_day: - logger.info( - "[T] Finished executing one day of workload in %d s, will ignore the rest of " - "pre-set execution time %d s", - total_exec_time_in_s, - args.run_for_s, - ) - else: - logger.info( - "[T] Executed ended but unable to finish executing the trace of a full day within %d s", - args.run_for_s, - ) + global EXECUTE_START_TIME # pylint: disable=global-statement + EXECUTE_START_TIME = datetime.now().astimezone( + pytz.utc + ) # pylint: disable=global-statement - else: - logger.info("[T] Telling all %d clients to start.", args.num_clients) - for idx in range(args.num_clients): - control_semaphore[idx].release() + logger.info("[RA] Telling all %d clients to start.", args.num_clients) + for i in range(args.num_clients): + control_semaphore[i].release() pause_controller = PauseController( args.num_clients, pause_semaphore, resume_semaphore ) - if args.run_for_s is not None and num_client_trace is None: - logger.info("[T] Letting the experiment run for %d seconds...", args.run_for_s) - time.sleep(args.run_for_s) + if args.starting_clients is not None: + pause_controller.adjust_num_running_clients(args.starting_clients) - elif num_client_trace is None: - logger.info("[T] Waiting until requested to stop... (hit Ctrl-C)") + # Wait until requested to stop. 
+ + if args.interactive: logger.info( "type in an integer smaller than total number of clients and press enter to change number of running client, type in exit to stop dynamically adjusting number of clients...", ) - asyncio.run(get_command_line_input(pause_controller)) - should_shutdown = threading.Event() - - def signal_handler(_signal, _frame): - should_shutdown.set() - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - should_shutdown.wait() + get_command_line_input(pause_controller) + else: + logger.info( + "Repeating analytics listening on %d until requested to stop... (hit Ctrl-C)", + args.adjust_clients_port, + ) + serve(pause_controller, port=args.adjust_clients_port) - logger.info("[T] Stopping clients...") - for idx in range(args.num_clients): + logger.info("[RA] Stopping all clients...") + for i in range(args.num_clients): # Note that in most cases, one release will have already run. This is OK # because downstream runners will not hang if there is a unconsumed # semaphore value. 
- control_semaphore[idx].release() - control_semaphore[idx].release() + control_semaphore[i].release() + control_semaphore[i].release() + + logger.info("[RA] Waiting for the clients to complete...") + for p in processes: + p.join() + + for idx, p in enumerate(processes): + logger.info("Runner %d exit code: %d", idx, p.exitcode) - logger.info("[T] Waiting for clients to terminate...") - for c in clients: - c.join() - logger.info("[T] Done transactions!") + logger.info("Done repeating analytics!") if __name__ == "__main__": diff --git a/workloads/IMDB_extended/workload_utils/change_clients_api.py b/workloads/IMDB_extended/workload_utils/change_clients_api.py new file mode 100644 index 00000000..a7524850 --- /dev/null +++ b/workloads/IMDB_extended/workload_utils/change_clients_api.py @@ -0,0 +1,61 @@ +import uvicorn +from fastapi import FastAPI +from pydantic import BaseModel +from typing import Optional + +from .pause_controller import PauseController + + +class ClientState(BaseModel): + max_clients: int + curr_clients: int + + +class SetClientState(BaseModel): + curr_clients: int + + +class Manager: + def __init__(self, pc: PauseController) -> None: + self.pc = pc + + +app = FastAPI() +manager: Optional[Manager] = None + + +@app.get("/clients") +def get_clients() -> ClientState: + global manager # pylint: disable=global-variable-not-assigned + assert manager is not None + return ClientState( + curr_clients=manager.pc.num_running_clients, + max_clients=manager.pc.total_num_clients, + ) + + +@app.post("/clients") +def set_clients(set_state: SetClientState) -> ClientState: + global manager # pylint: disable=global-variable-not-assigned + assert manager is not None + manager.pc.adjust_num_running_clients(set_state.curr_clients, verbose=True) + return ClientState( + curr_clients=manager.pc.num_running_clients, + max_clients=manager.pc.total_num_clients, + ) + + +def serve( + pc: PauseController, port: int, host: str = "0.0.0.0", log_level: str = "info" +) -> None: + try: 
+ global manager # pylint: disable=global-statement + manager = Manager(pc) + uvicorn.run( + "workload_utils.change_clients_api:app", + host=host, + port=port, + log_level=log_level, + ) + finally: + manager = None diff --git a/workloads/IMDB_extended/workload_utils/pause_controller.py b/workloads/IMDB_extended/workload_utils/pause_controller.py new file mode 100644 index 00000000..a4674b28 --- /dev/null +++ b/workloads/IMDB_extended/workload_utils/pause_controller.py @@ -0,0 +1,77 @@ +from typing import List +import numpy as np +import multiprocessing as mp + + +class PauseController: + # controlling the pause and resume of clients + def __init__( + self, + total_num_clients: int, + pause_semaphore: List[mp.Semaphore], # type: ignore + resume_semaphore: List[mp.Semaphore], # type: ignore + ): + self.total_num_clients = total_num_clients + self.pause_semaphore = pause_semaphore + self.resume_semaphore = resume_semaphore + self.paused_clients: List[int] = [] + self.running_clients: List[int] = list(range(total_num_clients)) + self.num_running_clients: int = total_num_clients + + def adjust_num_running_clients( + self, num_clients: int, verbose: bool = True + ) -> None: + if num_clients > self.total_num_clients: + print( + f"invalid input number of clients {num_clients}, larger than total number of clients" + ) + return + if num_clients == self.num_running_clients: + return + elif num_clients < self.num_running_clients: + pause_clients = np.random.choice( + self.running_clients, + size=self.num_running_clients - num_clients, + replace=False, + ) + for i in pause_clients: + assert ( + i not in self.paused_clients + ), f"trying to pause a client that is already paused: {i}" + if verbose: + print(f"pausing client {i}") + self.pause_semaphore[i].release() + self.pause_semaphore[i].release() + self.paused_clients.append(i) + self.running_clients.remove(i) + self.num_running_clients -= 1 + else: + resume_clients = np.random.choice( + self.paused_clients, + size=num_clients 
- self.num_running_clients, + replace=False, + ) + for i in resume_clients: + assert ( + i not in self.running_clients + ), f"trying to resume a running client: {i}" + if verbose: + print(f"resuming client {i}") + self.resume_semaphore[i].release() + self.resume_semaphore[i].release() + self.paused_clients.remove(i) + self.running_clients.append(i) + self.num_running_clients += 1 + + +def get_command_line_input(pause_controller: PauseController) -> None: + while True: + try: + user_input = input() + if user_input.isnumeric(): + num_client = int(user_input) + pause_controller.adjust_num_running_clients(num_client) + elif user_input == "exit": + break + except KeyboardInterrupt: + break