diff --git a/README.md b/README.md index 8024a703..dfe25751 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ engines. bulk_load --config-file path/to/config.yml --manifest-file path/to/manifest.yml` to execute the bulk load (this time may take some time, depending on how much data you are loading). -- Start the BRAD server `brad server --config-file path/to/config.yml +- Start the BRAD server `brad daemon --config-file path/to/config.yml --schema-name your_schema_name --planner-config-file path/to/planner.yml`. - Run queries through the CLI `brad cli`. diff --git a/workloads/IMDB_extended/run_repeating_analytics.py b/workloads/IMDB_extended/run_repeating_analytics.py index 216e28a0..6f1f383e 100644 --- a/workloads/IMDB_extended/run_repeating_analytics.py +++ b/workloads/IMDB_extended/run_repeating_analytics.py @@ -1,7 +1,9 @@ import argparse +import copy import multiprocessing as mp import time import os +import pickle import numpy as np import pathlib import random @@ -21,6 +23,30 @@ from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff logger = logging.getLogger(__name__) +EXECUTE_START_TIME = datetime.now().astimezone(pytz.utc) +ENGINE_NAMES = ["ATHENA", "AURORA", "REDSHIFT"] + + +def get_time_of_the_day_unsimulated( + now: datetime, time_scale_factor: Optional[int] +) -> int: + # Get the time of the day in minute in real-time + assert time_scale_factor is not None, "need to specify args.time_scale_factor" + # time_diff in minutes after scaling + time_diff = int((now - EXECUTE_START_TIME).total_seconds() / 60 * time_scale_factor) + time_unsimulated = time_diff % (24 * 60) # time of the day in minutes + return time_unsimulated + + +def time_in_minute_to_datetime_str(time_unsimulated: Optional[int]) -> str: + if time_unsimulated is None: + return "xxx" + hour = time_unsimulated // 60 + assert hour < 24 + minute = time_unsimulated % 60 + hour_str = str(hour) if hour >= 10 else "0" + str(hour) + minute_str = str(minute) if minute >= 10 else "0" + str(minute) + return f"{hour_str}:{minute_str}" def runner( @@ -31,6 +57,7 @@ def runner( query_bank: List[str], queries: List[int], query_frequency: Optional[npt.NDArray] = None, + execution_gap_dist: Optional[npt.NDArray] = None, ) -> None: def noop(_signal, _frame): pass @@ -70,7 +97,11 @@ def noop(_signal, _frame): "w", encoding="UTF-8", ) as file: - print("timestamp,query_idx,run_time_s,engine", file=file, flush=True) + print( + "timestamp,time_since_execution,time_of_day,query_idx,run_time,engine", + file=file, + flush=True, + ) prng = random.Random(args.seed ^ runner_idx) rand_backoff = None @@ -88,10 +119,17 @@ def noop(_signal, _frame): _ = stop_queue.get() while True: - if args.avg_gap_s is not None: - # Wait times are normally distributed right now. - # TODO: Consider using a different distribution (e.g., exponential). - # TODO: load gap distribution from a path that mimics snowset + if execution_gap_dist is not None: + now = datetime.now().astimezone(pytz.utc) + time_unsimulated = get_time_of_the_day_unsimulated( + now, args.time_scale_factor + ) + wait_for_s = execution_gap_dist[ + int(time_unsimulated / (60 * 24) * len(execution_gap_dist)) + ] + time.sleep(wait_for_s) + elif args.avg_gap_s is not None: + # Wait times are normally distributed if execution_gap_dist is not provided. 
wait_for_s = prng.gauss(args.avg_gap_s, args.avg_gap_std_s) if wait_for_s < 0.0: wait_for_s = 0.0 @@ -111,15 +149,26 @@ def noop(_signal, _frame): try: engine = None now = datetime.now().astimezone(pytz.utc) + if args.time_scale_factor is not None: + time_unsimulated = get_time_of_the_day_unsimulated( + now, args.time_scale_factor + ) + else: + time_unsimulated = None + time_unsimulated_str = time_in_minute_to_datetime_str( + time_unsimulated + ) start = time.time() _, engine = database.execute_sync_with_engine(query) end = time.time() print( - "{},{},{},{}".format( + "{},{},{},{},{},{}".format( now, + (now - EXECUTE_START_TIME).total_seconds(), + time_unsimulated_str, qidx, end - start, - engine.value if engine is not None else "unknown", + ENGINE_NAMES[engine], ), file=file, flush=True, @@ -167,6 +216,126 @@ def noop(_signal, _frame): database.close_sync() +def simulation_runner( + all_query_runtime: npt.NDArray, + runner_idx: int, + start_queue: mp.Queue, + stop_queue: mp.Queue, + args, + query_bank: List[str], + queries: List[int], + query_frequency_original: Optional[npt.NDArray] = None, + execution_gap_dist: Optional[npt.NDArray] = None, + wait_for_execute: bool = False, +) -> None: + def noop(_signal, _frame): + pass + + signal.signal(signal.SIGINT, noop) + + # For printing out results. + if "COND_OUT" in os.environ: + # pylint: disable-next=import-error + import conductor.lib as cond + + out_dir = cond.get_output_path() + else: + out_dir = pathlib.Path(".") + + if query_frequency_original is not None: + query_frequency = copy.deepcopy(query_frequency_original[queries]) + query_frequency = query_frequency / np.sum(query_frequency) + else: + query_frequency = None + + with open( + out_dir / "repeating_olap_batch_{}.csv".format(runner_idx), + "w", + encoding="UTF-8", + ) as file: + print( + "timestamp,time_since_execution,time_of_day,query_idx,run_time_s,engine", + file=file, + flush=True, + ) + + prng = random.Random(args.seed ^ runner_idx) + + logger.info( + "[Repeating Analytics Runner %d] Queries to run: %s", + runner_idx, + queries, + ) + query_order = queries.copy() + prng.shuffle(query_order) + + # Signal that we're ready to start and wait for the controller. + start_queue.put_nowait("") + _ = stop_queue.get() + + while True: + if execution_gap_dist is not None: + now = datetime.now().astimezone(pytz.utc) + time_unsimulated = get_time_of_the_day_unsimulated( + now, args.time_scale_factor + ) + wait_for_s = execution_gap_dist[ + int(time_unsimulated / (60 * 24) * len(execution_gap_dist)) + ] + time.sleep(wait_for_s) + elif args.avg_gap_s is not None: + # Wait times are normally distributed if execution_gap_dist is not provided. 
+ wait_for_s = prng.gauss(args.avg_gap_s, args.avg_gap_std_s) + if wait_for_s < 0.0: + wait_for_s = 0.0 + time.sleep(wait_for_s) + + if query_frequency is not None: + qidx = prng.choices(queries, list(query_frequency))[0] + else: + if len(query_order) == 0: + query_order = queries.copy() + prng.shuffle(query_order) + + qidx = query_order.pop() + logger.debug("Executing qidx: %d", qidx) + query = query_bank[qidx] + # using the average of the best two engines as approximation of brad runtime + runtime = ( + np.sum(all_query_runtime[qidx]) - np.min(all_query_runtime[qidx]) + ) / 2 + if wait_for_execute: + time.sleep(runtime) + engine = np.argmin(all_query_runtime[qidx]) + + now = datetime.now().astimezone(pytz.utc) + if args.time_scale_factor is not None: + time_unsimulated = get_time_of_the_day_unsimulated( + now, args.time_scale_factor + ) + else: + time_unsimulated = None + time_unsimulated_str = time_in_minute_to_datetime_str(time_unsimulated) + print( + "{},{},{},{},{},{}".format( + now, + (now - EXECUTE_START_TIME).total_seconds(), + time_unsimulated_str, + qidx, + runtime, + ENGINE_NAMES[engine], + ), + file=file, + flush=True, + ) + + try: + _ = stop_queue.get_nowait() + break + except queue.Empty: + pass + + def run_warmup(args, query_bank: List[str], queries: List[int]): if args.engine is not None: engine = Engine.from_str(args.engine) @@ -254,6 +423,22 @@ def main(): parser.add_argument("--seed", type=int, default=42) parser.add_argument("--num-front-ends", type=int, default=1) parser.add_argument("--run-warmup", action="store_true") + parser.add_argument( + "--run-simulation", + action="store_true", + help="Run the simulation instead of actual execution.", + ) + parser.add_argument( + "--wait-for-execute-sim", + action="store_true", + help="Waiting for execution in simulation?", + ) + parser.add_argument( + "--query-runtime-path", + type=str, + default=None, + help="path to the query runtime numpy file", + ) parser.add_argument( "--run-warmup-times", type=int, @@ -274,9 +459,33 @@ def main(): default=None, help="path to the frequency to draw each query in query bank", ) + parser.add_argument( + "--num-query-path", + type=str, + default=None, + help="Path to the distribution of number of queries for each period of a day", + ) + parser.add_argument( + "--num-client-path", + type=str, + default=None, + help="Path to the distribution of number of clients for each period of a day", + ) parser.add_argument("--num-clients", type=int, default=1) parser.add_argument("--avg-gap-s", type=float) parser.add_argument("--avg-gap-std-s", type=float, default=0.5) + parser.add_argument( + "--gap-dist-path", + type=str, + default=None, + help="Path to the distribution regarding the number of concurrent queries", + ) + parser.add_argument( + "--time-scale-factor", + type=int, + default=100, + help="trace 1s of simulation as X seconds in real-time to match the num-concurrent-query", + ) parser.add_argument("--query-indexes", type=str) parser.add_argument( "--brad-direct", @@ -297,12 +506,6 @@ def main(): "--engine", type=str, help="The engine to use, if connecting directly." 
) parser.add_argument("--run-for-s", type=int, help="If set, run for this long.") - parser.add_argument( - "--query-gap-path", - type=str, - default=None, - help="Path to the gaps to run each query", - ) args = parser.parse_args() with open(args.query_bank_file, "r", encoding="UTF-8") as file: @@ -318,6 +521,27 @@ def main(): else: query_frequency = None + if ( + args.gap_dist_path is not None + and os.path.exists(args.gap_dist_path) + and args.time_scale_factor is not None + ): + # we can only set the num_concurrent_query trace in presence of time_scale_factor + execution_gap_dist = np.load(args.gap_dist_path) + else: + execution_gap_dist = None + + if ( + args.num_client_path is not None + and os.path.exists(args.num_client_path) + and args.time_scale_factor is not None + ): + # we can only set the num_concurrent_query trace in presence of time_scale_factor + with open(args.num_client_path, "rb") as f: + num_client_trace = pickle.load(f) + else: + num_client_trace = None + if args.query_indexes is None: queries = list(range(len(query_bank))) else: @@ -332,42 +556,124 @@ def main(): return mgr = mp.Manager() - start_queue = mgr.Queue() - stop_queue = mgr.Queue() - - processes = [] - for idx in range(args.num_clients): - p = mp.Process( - target=runner, - args=( - idx, - start_queue, - stop_queue, - args, - query_bank, - queries, - query_frequency, - ), - ) - p.start() - processes.append(p) + start_queue = [mgr.Queue() for _ in range(args.num_clients)] + stop_queue = [mgr.Queue() for _ in range(args.num_clients)] + + if args.run_simulation: + assert ( + args.query_runtime_path is not None + ), "must provide query runtime to run simulation" + all_query_runtime = np.load(args.query_runtime_path) + assert all_query_runtime.shape == ( + len(query_bank), + 3, + ), "incorrect query runtime file format" + processes = [] + for idx in range(args.num_clients): + p = mp.Process( + target=simulation_runner, + args=( + all_query_runtime, + idx, + start_queue[idx], + stop_queue[idx], + args, + query_bank, + queries, + query_frequency, + execution_gap_dist, + args.wait_for_execute_sim, + ), + ) + p.start() + processes.append(p) + else: + processes = [] + for idx in range(args.num_clients): + p = mp.Process( + target=runner, + args=( + idx, + start_queue[idx], + stop_queue[idx], + args, + query_bank, + queries, + query_frequency, + execution_gap_dist, + ), + ) + p.start() + processes.append(p) print("Waiting for startup...", flush=True) - for _ in range(args.num_clients): - start_queue.get() + for i in range(args.num_clients): + start_queue[i].get() + + global EXECUTE_START_TIME + EXECUTE_START_TIME = datetime.now().astimezone(pytz.utc) + + if num_client_trace is not None: + assert args.time_scale_factor is not None, "need to set args.time_scale_factor" + print("Telling client no.{} to start.".format(0), flush=True) + stop_queue[0].put("") + num_running_client = 1 + + curr_day_start_time = datetime.now().astimezone(pytz.utc) + for time_of_day in num_client_trace: + if time_of_day == 0: + continue + # at this time_of_day start/shut-down more clients + time_in_s = time_of_day / args.time_scale_factor + now = datetime.now().astimezone(pytz.utc) + curr_time_in_s = (now - curr_day_start_time).total_seconds() + total_exec_time_in_s = (now - EXECUTE_START_TIME).total_seconds() + if args.run_for_s <= total_exec_time_in_s: + break + if args.run_for_s - total_exec_time_in_s <= (time_in_s - curr_time_in_s): + wait_time = args.run_for_s - total_exec_time_in_s + if wait_time > 0: + time.sleep(wait_time) + break + 
time.sleep(time_in_s - curr_time_in_s) + num_client_required = min(num_client_trace[time_of_day], args.num_clients) + if num_client_required > num_running_client: + # starting additional clients + for add_client in range(num_running_client, num_client_required): + print( + "Telling client no.{} to start.".format(add_client), flush=True + ) + stop_queue[add_client].put("") + num_running_client += 1 + elif num_running_client > num_client_required: + # shutting down clients + for delete_client in range(num_running_client, num_client_required, -1): + print( + "Telling client no.{} to stop.".format(delete_client - 1), + flush=True, + ) + stop_queue[delete_client - 1].put("") + num_running_client -= 1 + now = datetime.now().astimezone(pytz.utc) + total_exec_time_in_s = (now - EXECUTE_START_TIME).total_seconds() + print( + f"Finished executing one day of workload in {total_exec_time_in_s}s, will ignore the rest of " + f"pre-set execution time {args.run_for_s}s" + ) - print("Telling {} clients to start.".format(args.num_clients), flush=True) - for _ in range(args.num_clients): - stop_queue.put("") + else: + print("Telling all {} clients to start.".format(args.num_clients), flush=True) + for i in range(args.num_clients): + stop_queue[i].put("") - if args.run_for_s: + if args.run_for_s and num_client_trace is None: print( "Waiting for {} seconds...".format(args.run_for_s), flush=True, file=sys.stderr, ) time.sleep(args.run_for_s) - else: + elif num_client_trace is None: # Wait until requested to stop. print( "Repeating analytics waiting until requested to stop... (hit Ctrl-C)", @@ -384,9 +690,13 @@ def signal_handler(_signal, _frame): should_shutdown.wait() - print("Stopping clients...", flush=True, file=sys.stderr) - for _ in range(args.num_clients): - stop_queue.put("") + print("Stopping all clients...", flush=True, file=sys.stderr) + for i in range(args.num_clients): + stop_queue[i].put("") + + # stop again, just incase some client hasn't started yet + for i in range(args.num_clients): + stop_queue[i].put("") print("Waiting for the clients to complete.") for p in processes: diff --git a/workloads/cross_db_benchmark/meta_tools/analyze_exec_log.py b/workloads/cross_db_benchmark/meta_tools/analyze_exec_log.py new file mode 100644 index 00000000..c2400a61 --- /dev/null +++ b/workloads/cross_db_benchmark/meta_tools/analyze_exec_log.py @@ -0,0 +1,100 @@ +import numpy as np +import matplotlib.pyplot as plt +import os +import pandas as pd +from typing import Optional, List, Tuple +import numpy.typing as npt + + +def convert_str_to_minute(time_of_day: str) -> int: + h = int(time_of_day.split(":")[0]) + m = int(time_of_day.split(":")[-1]) + return h * 60 + m + + +def convert_minute_to_str(time_of_day: int) -> str: + hour = time_of_day // 60 + assert hour < 24 + minute = time_of_day % 60 + hour_str = str(hour) if hour >= 10 else "0" + str(hour) + minute_str = str(minute) if minute >= 10 else "0" + str(minute) + return f"{hour_str}:{minute_str}" + + +def load_all_log_csv(log_dir: str) -> pd.DataFrame: + all_df = [] + for file in os.listdir(log_dir): + if file.startswith("repeating_olap_batch_") and file.endswith(".csv"): + all_df.append(pd.read_csv(os.path.join(log_dir, file))) + df = pd.concat(all_df) + return df + + +def plot_overall_runtime_hist( + log_dir: str, nbins: int = 100, bins: Optional[npt.NDArray] = None +) -> Tuple[npt.NDArray, npt.NDArray]: + df = load_all_log_csv(log_dir) + runtime = df["run_time_s"].values + if bins is None: + count, bin_breaks, _ = plt.hist(runtime, bins=nbins) + else: + 
count, bin_breaks, _ = plt.hist(runtime, bins=bins) + plt.xlabel("Query runtime") + plt.ylabel("Number of queries") + plt.show() + return count, bin_breaks + + +def plot_num_queries_with_time( + log_dir: str, time_interval: int = 15 +) -> Tuple[List[str], List[int]]: + df = load_all_log_csv(log_dir) + time_of_day = df["time_of_day"].values + time_of_day = np.asarray([convert_str_to_minute(i) for i in time_of_day]) + time_str = [] + count = [] + start = 0 + for h in range(24): + for m in range(0, 60, time_interval): + end = h * 60 + m + if end == 0: + continue + time_str.append(convert_minute_to_str(end)) + count.append(int(np.sum((time_of_day >= start) & (time_of_day < end)))) + start = end + plt.bar(np.arange(len(count)), count) + plt.xlabel("Time of the day") + plt.ylabel("Number of queries") + plt.show() + return time_str, count + + +def plot_num_concurrent_queries_hist(log_dir: str) -> Tuple[List[int], List[int]]: + df = load_all_log_csv(log_dir) + runtime = df["run_time_s"].values + end = df["time_since_execution"].values + start = end - runtime + idx = np.argsort(start) + start = start[idx] + end = end[idx] + + res = np.zeros(len(start)) + for i in range(len(start)): + s = start[i] + e = end[i] + ne = np.searchsorted( + start[i + 1 :], e + ) # number of queries start after s and before e + ns = np.sum(end[:i] > s) # number of queries start before s and ends after s + res[i] = ne + ns + + num_concurrent_queries = [] + count = [] + for num_query in range(int(np.max(res)) + 1): + num_concurrent_queries.append(num_query) + count.append(int(np.sum(res == num_query))) + plt.bar(np.arange(len(count)), count) + plt.xlabel("Number of concurrent queries") + plt.ylabel("Count") + plt.show() + return num_concurrent_queries, count diff --git a/workloads/cross_db_benchmark/meta_tools/analyze_snowset.py b/workloads/cross_db_benchmark/meta_tools/analyze_snowset.py new file mode 100644 index 00000000..59e2b122 --- /dev/null +++ b/workloads/cross_db_benchmark/meta_tools/analyze_snowset.py @@ -0,0 +1,255 @@ +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt +from typing import List +import numpy.typing as npt + + +def get_column_names(csv_file_name: str) -> List[str]: + column_names = pd.read_csv(csv_file_name, sep=",", header=0, skiprows=0, nrows=0) + column_names = list(column_names.columns) + return column_names + + +def create_sample( + file_name: str, sample_size: float = 0.01, nrows: int = 100000 +) -> pd.DataFrame: + # create a uniform sample of the table + sample_df = [] + column_names = pd.read_csv(file_name, sep=",", header=0, skiprows=0, nrows=0) + column_names = list(column_names.columns) + skiprows = 0 + while True: + try: + df = pd.read_csv( + file_name, sep=",", header=None, skiprows=skiprows, nrows=nrows + ) + except: + break + sample_df.append(df) + skiprows += int(nrows / sample_size) + sample_df = pd.concat(sample_df) + sample_df.columns = column_names + return sample_df + + +def load_parquet(parquet_file_name: str, columns: List[str]) -> pd.DataFrame: + df = pd.read_parquet(parquet_file_name, columns=columns) + return df + + +def get_runtime_distribution( + df: pd.DataFrame, max_runtime: float +) -> (npt.NDArray, npt.NDArray): + runtime = df["durationTotal"].values / 1000 + density, bins = plt.hist(runtime[runtime < max_runtime], bins=100) + return density, bins + + +def get_num_concurrent_queries(rows: pd.DataFrame) -> npt.NDArray: + start = rows["createdTime"].values + end = rows["endTime"].values + idx = np.argsort(start) + start = start[idx] + end = 
end[idx] + + res = np.zeros(len(start)) + for i in range(len(start)): + s = start[i] + e = end[i] + ne = np.searchsorted( + start[i + 1 :], e + ) # number of queries start after s and before e + ns = np.sum(end[:i] > s) # number of queries start before s and ends after s + res[i] = ne + ns + return res + + +def get_num_queries_per_time_interval( + rows: pd.DataFrame, + dates: List[str] = None, + time_gap: int = 10, + day_type: str = "all", + aggregate: bool = True, +) -> (List, npt.NDArray, npt.NDArray): + """ + Get the number of queries per time interval of the given dates + time_gap: provide aggregated stats every {time_interval} minutes, for current implementation + please make it a number divisible by 60 + day_type: choose between "all", "weekday", "weekend" + aggregate: average across days? + """ + assert day_type in ["all", "weekday", "weekend"], f"invalid day_type: {day_type}" + if dates is None: + dates = [ + "2018-02-22", + "2018-02-23", + "2018-02-24", + "2018-02-25", + "2018-02-26", + "2018-02-27", + "2018-02-28", + "2018-03-01", + "2018-03-02", + "2018-03-03", + "2018-03-04", + "2018-03-05", + "2018-03-06", + ] + if day_type == "weekday": + dates = [ + d + for d in dates + if d not in ["2018-02-24", "2018-02-25", "2018-03-03", "2018-03-04"] + ] + elif day_type == "weekend": + dates = [ + d + for d in dates + if d in ["2018-02-24", "2018-02-25", "2018-03-03", "2018-03-04"] + ] + + time_intervals = [] + for h in range(24): + hour = str(h) if h >= 10 else f"0{h}" + for m in range(0, 60, time_gap): + minute = str(m) if m >= 10 else f"0{m}" + time_intervals.append(f"{hour}:{minute}:00") + + start_time = rows["createdTime"].values + end = rows["endTime"].values + idx = np.argsort(start_time) + start_time = start_time[idx] + end_time = end[idx] + + num_queries = [] + num_concurrent_queries = [] + all_time_interval = [] + outer_start = 0 # the start index of outer loop + for date in dates: + outer_end = np.array([f"{date}T23:59:59"], dtype="datetime64[ns]") + outer_end = np.searchsorted(start_time, outer_end[0]) + curr_time_intervals = np.array( + [f"{date}T{t}" for t in time_intervals], dtype="datetime64[ns]" + ) + if not aggregate: + all_time_interval.extend(curr_time_intervals) + if outer_end - outer_start == 0: + num_queries.append(np.zeros(len(time_intervals))) + num_concurrent_queries.append(np.zeros(len(time_intervals))) + continue + + curr_start_time = start_time[outer_start:outer_end] + curr_end_time = end_time[outer_start:outer_end] + + curr_num_queries = [] + curr_num_concurrent_queries = [] + inner_start = 0 # the start index of inner loop + for t in curr_time_intervals: + inner_end = np.searchsorted(curr_start_time, t) + if inner_end - inner_start == 0: + curr_num_queries.append(0) + curr_num_concurrent_queries.append(0) + continue + curr_num_queries.append(inner_end - inner_start) + num_concurrent_queries_cnt = 0 + for i in range(inner_start, inner_end): + s = curr_start_time[i] + e = curr_end_time[i] + if i < len(curr_start_time): + ne = np.searchsorted( + curr_start_time[i + 1 :], e + ) # number of queries start after s and before e + else: + ne = 0 + ns = np.sum( + curr_end_time[:i] > s + ) # number of queries start before s and ends after s + num_concurrent_queries_cnt += ne + ns + curr_num_concurrent_queries.append( + num_concurrent_queries_cnt / (inner_end - inner_start) + ) + inner_start = inner_end + num_queries.append(np.asarray(curr_num_queries)) + num_concurrent_queries.append(np.asarray(curr_num_concurrent_queries)) + outer_start = outer_end + + if aggregate: 
+ return ( + time_intervals, + np.mean(num_queries, axis=0), + np.mean(num_concurrent_queries, axis=0), + ) + else: + return ( + all_time_interval, + np.concatenate(num_queries), + np.concatenate(num_concurrent_queries), + ) + + +def aggregate_across_database( + parquet_file_name: str, + min_num_queries: int = 5000, + num_db_cap: int = 100, + dates: List[str] = None, + time_gap: int = 10, + day_type: str = "all", +) -> (List, npt.NDArray, npt.NDArray): + """ + Get the number of queries per time interval of the given dates (averaged across all database for all days) + min_num_queries: discard database with fewer than {min_num_queries} queries + num_db_cap: only consider the first {num_db_cap} databases + time_gap: provide aggregated stats every {time_interval} minutes, for current implementation + please make it a number divisible by 60 + day_type: choose between "all", "weekday", "weekend" + """ + df = load_parquet( + parquet_file_name, ["databaseId", "durationTotal", "createdTime", "endTime"] + ) + num_db = 0 + agg_num_queries = [] + agg_num_concurrent_queries = [] + all_time_interval = None + for db, rows in df.groupby("databaseId"): + if len(rows) > min_num_queries: + ( + all_time_interval, + num_queries, + num_concurrent_queries, + ) = get_num_queries_per_time_interval(rows, dates, time_gap, day_type) + agg_num_queries.append(num_queries) + agg_num_concurrent_queries.append(num_concurrent_queries) + num_db += 1 + if num_db >= num_db_cap: + break + return ( + all_time_interval, + np.mean(agg_num_queries, axis=0), + np.mean(agg_num_concurrent_queries, axis=0), + ) + + +def num_concurrent_queries_across_database( + parquet_file_name: str, min_num_queries: int = 5000, num_db_cap: int = 100 +) -> (List, npt.NDArray, npt.NDArray): + """ + Get the number of concurrent queries + min_num_queries: discard database with fewer than {min_num_queries} queries + num_db_cap: only consider the first {num_db_cap} databases + """ + df = load_parquet( + parquet_file_name, ["databaseId", "durationTotal", "createdTime", "endTime"] + ) + num_db = 0 + total_num_concurrent_queries = [] + avg_num_query_per_day = [] + for db, rows in df.groupby("databaseId"): + if len(rows) > min_num_queries: + avg_num_query_per_day.append(len(rows) / 13) # 13 days of data in snowset + num_concurrent_queries = get_num_concurrent_queries(rows) + total_num_concurrent_queries.append(num_concurrent_queries) + num_db += 1 + if num_db >= num_db_cap: + break + return np.concatenate(total_num_concurrent_queries), np.mean(avg_num_query_per_day) diff --git a/workloads/cross_db_benchmark/meta_tools/gen_exec_start_time_trace.py b/workloads/cross_db_benchmark/meta_tools/gen_exec_start_time_trace.py new file mode 100644 index 00000000..960fb8a3 --- /dev/null +++ b/workloads/cross_db_benchmark/meta_tools/gen_exec_start_time_trace.py @@ -0,0 +1,127 @@ +import math +import pickle +import numpy as np +from typing import Optional, List, Mapping +import numpy.typing as npt + + +def gen_num_client_dist( + time_interval: int = 60, + max_num_client: int = 10, + hourly_distribution: Optional[List[float]] = None, + seed: int = 0, + sigma: float = 0.0, + save_path: Optional[str] = None, +) -> Mapping[int, int]: + """generate the number of client for different time of the day + time_interval: the granularity we want to divide a whole day by (unit minute, + will be good to make it a number divisible by 60) + max_num_client: maximum number of client + hourly_distribution: a list of 24 integer for the average number of clients within an hour + sigma: the 
random noise added to the generation process (with random seed) + """ + np.random.seed(seed) + if hourly_distribution is None: + hourly_distribution = [ + 0.1, + 0.05, + 0.05, + 0.05, + 0.05, + 0.05, + 0.05, + 0.1, + 0.2, + 0.4, + 0.6, + 0.8, + 0.8, + 0.9, + 1.0, + 1.0, + 0.9, + 0.7, + 0.4, + 0.3, + 0.2, + 0.2, + 0.1, + 0.1, + ] + assert ( + len(hourly_distribution) == 24 + ), "invalid length for hourly_distribution, must be 24" + + num_client_by_time = dict() + for h in range(24): + avg_num_client_h = hourly_distribution[h] * max_num_client + for m in range(0, 60, time_interval): + num_client_m = np.random.normal(avg_num_client_h, sigma) + num_client_m = min(math.ceil(num_client_m), max_num_client) + time_in_s = (h * 60 + m) * 60 + num_client_by_time[time_in_s] = num_client_m + + if save_path is not None: + with open(save_path, "wb") as f: + pickle.dump(num_client_by_time, f) + return num_client_by_time + + +def gen_gap_time_dist( + time_interval: int = 60, + avg_gap_in_s: int = 4, + hourly_distribution: Optional[List[float]] = None, + seed: int = 0, + sigma: float = 0.5, + save_path: Optional[str] = None, +) -> npt.NDArray: + """generate the execution time gap for different time of the day + time_interval: the granularity we want to divide a whole day by (unit minute, + will be good to make it a number divisible by 60) + avg_wait_time_in_s: average wait gap in second + hourly_distribution: a list of 24 integer for the average number of clients within an hour + sigma: the random noise added to the generation process (with random seed) + """ + np.random.seed(seed) + if hourly_distribution is None: + hourly_distribution = [ + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 0.7, + 0.6, + 0.5, + 0.5, + 0.5, + 0.3, + 0.3, + 0.3, + 0.5, + 0.6, + 0.7, + 0.8, + 0.9, + 1.0, + 1.0, + 1.0, + ] + assert ( + len(hourly_distribution) == 24 + ), "invalid length for hourly_distribution, must be 24" + + gap_time_dist = [] + for h in range(24): + avg_gap_time_h = hourly_distribution[h] * avg_gap_in_s + sigma_gap_time_h = hourly_distribution[h] * sigma + for m in range(0, 60, time_interval): + gap_time_m = np.random.normal(avg_gap_time_h, sigma_gap_time_h) + gap_time_dist.append(np.abs(gap_time_m)) + gap_time_dist = np.asarray(gap_time_dist) + if save_path is not None: + np.save(save_path, gap_time_dist) + return gap_time_dist diff --git a/workloads/cross_db_benchmark/meta_tools/shift_distribution.py b/workloads/cross_db_benchmark/meta_tools/shift_distribution.py index 37d33725..326da862 100644 --- a/workloads/cross_db_benchmark/meta_tools/shift_distribution.py +++ b/workloads/cross_db_benchmark/meta_tools/shift_distribution.py @@ -1,10 +1,11 @@ import numpy as np import matplotlib.pyplot as plt import numpy.typing as npt +from workloads.cross_db_benchmark.meta_tools.analyze_snowset import load_parquet def shift_frequency_to_match_hist( - runtime: npt.NDArray, target_hist: (npt.NDArray, npt.NDArray), balanced: bool + runtime: npt.NDArray, target_hist: (npt.NDArray, npt.NDArray), balanced: bool = True ) -> npt.NDArray: """ change the frequency of execution for a list of queries to match the distribution of a target histogram @@ -77,3 +78,21 @@ def shift_frequency_to_match_hist( best_engines == le ) return return_freq + + +def match_runtime_hist_snowset( + runtime: npt.NDArray, parquet_file_name: str, balanced: bool = True +) -> npt.NDArray: + """ + Matching the runtime histogram of snowset + runtime: a numpy array of shape (n, 3) corresponding to the athena-aurora-redshift runtime of n queries + 
parquet_file_name: path to the parquet file of snowset + balanced: whether to balance query from the best engine. + """ + df = load_parquet( + parquet_file_name, ["databaseId", "durationTotal", "createdTime", "endTime"] + ) + target_runtime = df["durationTotal"].values / 1000 + target_hist = plt.hist(target_runtime[target_runtime < 30], bins=100) + return_freq = shift_frequency_to_match_hist(runtime, target_hist, balanced) + return return_freq diff --git a/workloads/readme.md b/workloads/readme.md index 31963e74..eaf8aa5e 100644 --- a/workloads/readme.md +++ b/workloads/readme.md @@ -9,4 +9,10 @@ pip install -r requirements.txt ```angular2html mkdir imdb && cd imdb && wget -c http://homepages.cwi.nl/~boncz/job/imdb.tgz && tar -xvzf imdb.tgz python workloads/IMDB/prepend_imdb_headers.py --csv_dir /path/to/imdb +``` + +## Execute repeating analytical queries +```angular2html +cd workloads/IMDB_extended/ + ``` \ No newline at end of file
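
The new helpers in `gen_exec_start_time_trace.py` feed directly into `run_repeating_analytics.py`: the pickled client trace is consumed via `--num-client-path` and the saved gap distribution via `--gap-dist-path`, both interpreted against `--time-scale-factor`. The following is a minimal sketch of how the two traces might be produced and wired up; the file names and parameter values are illustrative choices, not part of this change.

```python
# Sketch: generate the day-long traces consumed by run_repeating_analytics.py.
# File names and parameter values below are illustrative, not part of the PR.
from workloads.cross_db_benchmark.meta_tools.gen_exec_start_time_trace import (
    gen_gap_time_dist,
    gen_num_client_dist,
)

# Number of active clients per 60-minute slot of the simulated day,
# capped at 8 concurrent clients (pickled dict keyed by simulated second).
gen_num_client_dist(time_interval=60, max_num_client=8, save_path="num_clients.pkl")

# Average wait gap (seconds) between queries per 60-minute slot (saved as .npy).
gen_gap_time_dist(time_interval=60, avg_gap_in_s=4, save_path="gap_dist.npy")

# Illustrative invocation, using flags defined in run_repeating_analytics.py:
#   python run_repeating_analytics.py \
#       --query-bank-file queries.sql \
#       --run-simulation --query-runtime-path runtimes.npy \
#       --num-client-path num_clients.pkl --gap-dist-path gap_dist.npy \
#       --time-scale-factor 100 --num-clients 8 --run-for-s 3600
```

With `--time-scale-factor 100`, one real minute advances the simulated clock by 100 minutes, so a full 24-hour trace replays in roughly 14.4 real minutes; the driver converts each trace timestamp `time_of_day` (in simulated seconds) into `time_of_day / time_scale_factor` real seconds when deciding when to start or stop clients.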