From 699ed8cdf1f5871d5f66a48fde83c24528be9c06 Mon Sep 17 00:00:00 2001
From: Ferdi K
Date: Thu, 2 Nov 2023 17:17:37 -0400
Subject: [PATCH] telemetry workload runner

---
 workloads/IMDB_extended/run_telemetry.py      | 205 ++++++++++++++++++
 .../workload_utils/telemetry_worker.py        |  99 +++++++++
 2 files changed, 304 insertions(+)
 create mode 100644 workloads/IMDB_extended/run_telemetry.py
 create mode 100644 workloads/IMDB_extended/workload_utils/telemetry_worker.py

diff --git a/workloads/IMDB_extended/run_telemetry.py b/workloads/IMDB_extended/run_telemetry.py
new file mode 100644
index 00000000..122fdf26
--- /dev/null
+++ b/workloads/IMDB_extended/run_telemetry.py
@@ -0,0 +1,205 @@
+import argparse
+import pathlib
+import pyodbc
+import queue
+import random
+import signal
+import sys
+import threading
+import time
+import os
+import pytz
+import multiprocessing as mp
+from datetime import datetime
+from typing import List, Optional, Tuple
+
+from brad.grpc_client import BradGrpcClient
+from workload_utils.database import Database, PyodbcDatabase, BradDatabase
+from workload_utils.telemetry_worker import TelemetryWorker
+
+
+def runner(
+    args,
+    worker_idx: int,
+    start_queue: mp.Queue,
+    stop_queue: mp.Queue,
+) -> None:
+    """
+    Meant to be launched as a subprocess with multiprocessing.
+    """
+
+    def noop_handler(_signal, _frame):
+        pass
+
+    signal.signal(signal.SIGINT, noop_handler)
+
+    worker = TelemetryWorker(worker_idx, args.seed ^ worker_idx)
+
+    wait_s = args.wait_seconds
+    prng = random.Random(~(args.seed ^ worker_idx))
+    queries = [
+        worker.query1,
+        worker.query2,
+        worker.query3,
+    ]
+    query_weights = [0.4, 0.3, 0.3]
+    query_indexes = list(range(len(queries)))
+    latencies: List[List[Tuple[datetime, float]]] = [[] for _ in range(len(queries))]
+
+    # Start as None so the `finally` block below is safe even if connecting fails.
+    db: Optional[Database] = None
+    try:
+        # Connect.
+        if args.cstr_var is not None:
+            db = PyodbcDatabase(
+                pyodbc.connect(os.environ[args.cstr_var], autocommit=True)
+            )
+        else:
+            brad = BradGrpcClient(args.brad_host, args.brad_port)
+            brad.connect()
+            db = BradDatabase(brad)
+
+        # Signal that we are ready to start and wait for other clients.
+        start_queue.put("")
+        _ = stop_queue.get()
+
+        while True:
+            time.sleep(wait_s)
+
+            q_idx = prng.choices(query_indexes, weights=query_weights, k=1)[0]
+            query = queries[q_idx]
+
+            now = datetime.now().astimezone(pytz.utc)
+            query_start = time.time()
+            query(db)
+            query_end = time.time()
+
+            # Record metrics.
+            latencies[q_idx].append((now, query_end - query_start))
+
+            try:
+                _ = stop_queue.get_nowait()
+                break
+            except queue.Empty:
+                pass
+
+        print(
+            f"[{worker_idx}] Done running telemetry queries.",
+            flush=True,
+            file=sys.stderr,
+        )
+
+    finally:
+        # For printing out results.
+        if "COND_OUT" in os.environ:
+            # pylint: disable-next=import-error
+            import conductor.lib as cond
+
+            out_dir = cond.get_output_path()
+        else:
+            out_dir = pathlib.Path(".")
+
+        with open(
+            out_dir / "telemetry_latency_{}.csv".format(worker_idx),
+            "w",
+            encoding="UTF-8",
+        ) as file:
+            print("query_idx,timestamp,run_time_s", file=file)
+            for qidx, lat_list in enumerate(latencies):
+                for timestamp, lat in lat_list:
+                    print("{},{},{}".format(qidx, timestamp, lat), file=file)
+
+        if db is not None:
+            db.close_sync()
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Tool used to run the IMDB-extended telemetry workload "
+        "against BRAD or an ODBC database."
+    )
+    parser.add_argument(
+        "--run-for-s",
+        type=int,
+        help="How long to run the workload for, in seconds. If unset, the "
+        "experiment runs until Ctrl-C.",
+    )
+    parser.add_argument(
+        "--num-clients",
+        type=int,
+        default=1,
+        help="The number of telemetry clients.",
+    )
+    parser.add_argument(
+        "--seed", type=int, default=42, help="Random seed for reproducibility."
+    )
+    parser.add_argument(
+        "--cstr-var",
+        type=str,
+        help="Environment variable that holds an ODBC connection string. Set "
+        "this to connect directly (i.e., not through BRAD).",
+    )
+    parser.add_argument(
+        "--wait-seconds",
+        type=float,
+        default=5.0,
+        help="How many seconds each worker should wait between queries.",
+    )
+    parser.add_argument("--brad-host", type=str, default="localhost")
+    parser.add_argument("--brad-port", type=int, default=6583)
+    args = parser.parse_args()
+
+    mgr = mp.Manager()
+    start_queue = mgr.Queue()
+    stop_queue = mgr.Queue()
+
+    clients = []
+    for idx in range(args.num_clients):
+        p = mp.Process(target=runner, args=(args, idx, start_queue, stop_queue))
+        p.start()
+        clients.append(p)
+
+    print("Waiting for startup...", file=sys.stderr, flush=True)
+    for _ in range(args.num_clients):
+        start_queue.get()
+
+    print(
+        "Telling {} clients to start.".format(args.num_clients),
+        file=sys.stderr,
+        flush=True,
+    )
+    for _ in range(args.num_clients):
+        stop_queue.put("")
+
+    if args.run_for_s is not None:
+        print(
+            "Letting the experiment run for {} seconds...".format(args.run_for_s),
+            flush=True,
+            file=sys.stderr,
+        )
+        time.sleep(args.run_for_s)
+
+    else:
+        print(
+            "Waiting until requested to stop... (hit Ctrl-C)",
+            flush=True,
+            file=sys.stderr,
+        )
+        should_shutdown = threading.Event()
+
+        def signal_handler(_signal, _frame):
+            should_shutdown.set()
+
+        signal.signal(signal.SIGINT, signal_handler)
+        signal.signal(signal.SIGTERM, signal_handler)
+
+        should_shutdown.wait()
+
+    print("Stopping clients...", flush=True, file=sys.stderr)
+    for _ in range(args.num_clients):
+        stop_queue.put("")
+
+    print("Waiting for clients to terminate...", flush=True, file=sys.stderr)
+    for c in clients:
+        c.join()
+
+
+if __name__ == "__main__":
+    # On Unix platforms, the default way to start a process is by forking, which
+    # is not ideal (we do not want to duplicate this process' file
+    # descriptors!).
+    mp.set_start_method("spawn")
+    main()
diff --git a/workloads/IMDB_extended/workload_utils/telemetry_worker.py b/workloads/IMDB_extended/workload_utils/telemetry_worker.py
new file mode 100644
index 00000000..bc99a724
--- /dev/null
+++ b/workloads/IMDB_extended/workload_utils/telemetry_worker.py
@@ -0,0 +1,99 @@
+import random
+import logging
+from datetime import datetime, timedelta
+
+from workload_utils.database import Database
+
+logger = logging.getLogger(__name__)
+
+
+class TelemetryWorker:
+    def __init__(self, worker_id: int, seed: int) -> None:
+        self.worker_id = worker_id
+        self.prng = random.Random(seed)
+
+        # Parameters used to generate queries.
+        self.max_dist = 1000
+        self.max_close_cinemas = 80
+        self.min_cap = 10
+        self.max_cap = 1000
+
+    def random_timerange(self):
+        """
+        Returns a random one-hour window in 2023 as a pair of timestamp
+        strings with millisecond precision.
+        """
+        year = 2023
+        month = self.prng.randint(1, 12)
+
+        if month == 2:
+            day = self.prng.randint(1, 28)
+        elif month in [4, 6, 9, 11]:
+            day = self.prng.randint(1, 30)
+        else:
+            day = self.prng.randint(1, 31)
+
+        hour = self.prng.randint(0, 23)
+        minute = self.prng.randint(0, 59)
+        second = self.prng.randint(0, 59)
+        millisecond = self.prng.randint(0, 999)
+
+        # Compute the end of the window with datetime arithmetic so it rolls
+        # over correctly at day boundaries (naively formatting `hour + 1`
+        # yields an invalid hour of 24 whenever the start falls in hour 23).
+        start = datetime(year, month, day, hour, minute, second, millisecond * 1000)
+        end = start + timedelta(hours=1)
+        fmt = "%Y-%m-%d %H:%M:%S.%f"
+        # Trim strftime's microseconds down to milliseconds.
+        return start.strftime(fmt)[:-3], end.strftime(fmt)[:-3]
+
+    def random_gb_key(self):
+        cols = ["ip", "event_id", "movie_id"]
+        return self.prng.choice(cols)
+
+    def query1(self, db: Database) -> bool:
+        """
+        COUNT with GROUP BY.
+        Note: Grouping by ip takes over 10x longer than grouping by movie_id
+        or event_id.
+        """
+        try:
+            gb_key = self.random_gb_key()
+            query = f"SELECT {gb_key}, COUNT(*) FROM telemetry GROUP BY {gb_key};"
+            logger.debug(query)
+
+            db.execute_sync(query)
+            return True
+
+        except:  # pylint: disable=bare-except
+            return False
+
+    def query2(self, db: Database) -> bool:
+        """COUNT over a random one-hour time range."""
+        try:
+            ts1, ts2 = self.random_timerange()
+            query = f"""
+            SELECT COUNT(*)
+            FROM telemetry
+            WHERE timestamp > '{ts1}'
+            AND timestamp < '{ts2}';
+            """
+
+            db.execute_sync(query)
+            return True
+
+        except:  # pylint: disable=bare-except
+            return False
+
+    def query3(self, db: Database) -> bool:
+        """COUNT with GROUP BY over a random one-hour time range."""
+        try:
+            ts1, ts2 = self.random_timerange()
+            gb_key = self.random_gb_key()
+
+            query = f"""
+            SELECT {gb_key}, COUNT(*)
+            FROM telemetry
+            WHERE timestamp > '{ts1}'
+            AND timestamp < '{ts2}'
+            GROUP BY {gb_key};
+            """
+
+            db.execute_sync(query)
+            return True
+
+        except:  # pylint: disable=bare-except
+            return False
+
+
+if __name__ == "__main__":
+    # Quick manual check: passing None exercises the error path, so this
+    # should print False.
+    tw = TelemetryWorker(1, 20)
+    print(tw.query1(None))
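
Usage note (not part of the patch): the runner is launched from workloads/IMDB_extended, e.g. as "python run_telemetry.py --num-clients 4 --run-for-s 60". Below is a minimal sketch of how TelemetryWorker could be smoke-tested without a live BRAD or ODBC endpoint. It assumes only the execute_sync()/close_sync() methods that the workload code above actually calls; the StubDatabase name and the file name are hypothetical.

# smoke_test_telemetry_worker.py -- hypothetical helper, not part of the patch.
# Run from workloads/IMDB_extended so that workload_utils resolves.
from workload_utils.telemetry_worker import TelemetryWorker


class StubDatabase:
    """Records generated SQL instead of executing it. Implements only the
    methods the workload code calls: execute_sync() and close_sync()."""

    def __init__(self) -> None:
        self.queries = []

    def execute_sync(self, query: str) -> None:
        self.queries.append(query)

    def close_sync(self) -> None:
        pass


if __name__ == "__main__":
    db = StubDatabase()
    worker = TelemetryWorker(worker_id=0, seed=42)
    # Each query method returns True on success and False on error.
    assert worker.query1(db)
    assert worker.query2(db)
    assert worker.query3(db)
    for q in db.queries:
        print(q.strip())
    db.close_sync()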