From 17f666517851470f381e6f608072a851da1d42ad Mon Sep 17 00:00:00 2001
From: Geoffrey Yu
Date: Mon, 13 Nov 2023 11:13:07 -0500
Subject: [PATCH] Fix bug in repeating analytics runner shutdown control

---
 .../IMDB_extended/run_repeating_analytics.py | 231 +++++++++---------
 1 file changed, 121 insertions(+), 110 deletions(-)

diff --git a/workloads/IMDB_extended/run_repeating_analytics.py b/workloads/IMDB_extended/run_repeating_analytics.py
index ba857224..627923dd 100644
--- a/workloads/IMDB_extended/run_repeating_analytics.py
+++ b/workloads/IMDB_extended/run_repeating_analytics.py
@@ -26,6 +26,8 @@
 EXECUTE_START_TIME = datetime.now().astimezone(pytz.utc)
 ENGINE_NAMES = ["ATHENA", "AURORA", "REDSHIFT"]
 
+RUNNER_EXIT = "runner_exit"
+
 
 def get_time_of_the_day_unsimulated(
     now: datetime, time_scale_factor: Optional[int]
@@ -91,129 +93,135 @@ def noop(_signal, _frame):
         query_frequency = query_frequency[queries]
         query_frequency = query_frequency / np.sum(query_frequency)
 
+    file = open(
+        out_dir / "repeating_olap_batch_{}.csv".format(runner_idx),
+        "w",
+        encoding="UTF-8",
+    )
+
     try:
-        with open(
-            out_dir / "repeating_olap_batch_{}.csv".format(runner_idx),
-            "w",
-            encoding="UTF-8",
-        ) as file:
-            print(
-                "timestamp,time_since_execution_s,time_of_day,query_idx,run_time_s,engine",
-                file=file,
-                flush=True,
-            )
+        print(
+            "timestamp,time_since_execution_s,time_of_day,query_idx,run_time_s,engine",
+            file=file,
+            flush=True,
+        )
 
-            prng = random.Random(args.seed ^ runner_idx)
-            rand_backoff = None
+        prng = random.Random(args.seed ^ runner_idx)
+        rand_backoff = None
 
-            logger.info(
-                "[Repeating Analytics Runner %d] Queries to run: %s",
-                runner_idx,
-                queries,
-            )
-            query_order_main = queries.copy()
-            prng.shuffle(query_order_main)
-            query_order = query_order_main.copy()
+        logger.info(
+            "[Repeating Analytics Runner %d] Queries to run: %s",
+            runner_idx,
+            queries,
+        )
+        query_order_main = queries.copy()
+        prng.shuffle(query_order_main)
+        query_order = query_order_main.copy()
+
+        # Signal that we're ready to start and wait for the controller.
+        start_queue.put_nowait("")
+        msg = stop_queue.get()
+
+        if msg == RUNNER_EXIT:
+            print(f"Runner {runner_idx} is stopping without having started.")
+            return
+
+        while True:
+            if execution_gap_dist is not None:
+                now = datetime.now().astimezone(pytz.utc)
+                time_unsimulated = get_time_of_the_day_unsimulated(
+                    now, args.time_scale_factor
+                )
+                wait_for_s = execution_gap_dist[
+                    int(time_unsimulated / (60 * 24) * len(execution_gap_dist))
+                ]
+                time.sleep(wait_for_s)
+            elif args.avg_gap_s is not None:
+                # Wait times are normally distributed if execution_gap_dist is not provided.
+                wait_for_s = prng.gauss(args.avg_gap_s, args.avg_gap_std_s)
+                if wait_for_s < 0.0:
+                    wait_for_s = 0.0
+                time.sleep(wait_for_s)
+
+            if query_frequency is not None:
+                qidx = prng.choices(queries, list(query_frequency))[0]
+            else:
+                if len(query_order) == 0:
+                    query_order = query_order_main.copy()
 
-            # Signal that we're ready to start and wait for the controller.
-            start_queue.put_nowait("")
-            _ = stop_queue.get()
-
-            while True:
-                if execution_gap_dist is not None:
-                    now = datetime.now().astimezone(pytz.utc)
+                qidx = query_order.pop()
+
+            logger.debug("Executing qidx: %d", qidx)
+            query = query_bank[qidx]
+
+            try:
+                engine = None
+                now = datetime.now().astimezone(pytz.utc)
+                if args.time_scale_factor is not None:
                     time_unsimulated = get_time_of_the_day_unsimulated(
                         now, args.time_scale_factor
                     )
-                    wait_for_s = execution_gap_dist[
-                        int(time_unsimulated / (60 * 24) * len(execution_gap_dist))
-                    ]
-                    time.sleep(wait_for_s)
-                elif args.avg_gap_s is not None:
-                    # Wait times are normally distributed if execution_gap_dist is not provided.
-                    wait_for_s = prng.gauss(args.avg_gap_s, args.avg_gap_std_s)
-                    if wait_for_s < 0.0:
-                        wait_for_s = 0.0
-                    time.sleep(wait_for_s)
-
-                if query_frequency is not None:
-                    qidx = prng.choices(queries, list(query_frequency))[0]
+                    time_unsimulated_str = time_in_minute_to_datetime_str(
+                        time_unsimulated
+                    )
                 else:
-                    if len(query_order) == 0:
-                        query_order = query_order_main.copy()
-
-                    qidx = query_order.pop()
-                logger.debug("Executing qidx: %d", qidx)
-                query = query_bank[qidx]
-
-                try:
-                    engine = None
-                    now = datetime.now().astimezone(pytz.utc)
-                    if args.time_scale_factor is not None:
-                        time_unsimulated = get_time_of_the_day_unsimulated(
-                            now, args.time_scale_factor
-                        )
-                        time_unsimulated_str = time_in_minute_to_datetime_str(
-                            time_unsimulated
-                        )
-                    else:
-                        time_unsimulated_str = "xxx"
+                    time_unsimulated_str = "xxx"
+
+                start = time.time()
+                _, engine = database.execute_sync_with_engine(query)
+                end = time.time()
+                print(
+                    "{},{},{},{},{},{}".format(
+                        now,
+                        (now - EXECUTE_START_TIME).total_seconds(),
+                        time_unsimulated_str,
+                        qidx,
+                        end - start,
+                        engine.value,
+                    ),
+                    file=file,
+                    flush=True,
+                )
+                rand_backoff = None
 
-                    start = time.time()
-                    _, engine = database.execute_sync_with_engine(query)
-                    end = time.time()
+            except BradClientError as ex:
+                if ex.is_transient():
                     print(
-                        "{},{},{},{},{},{}".format(
-                            now,
-                            (now - EXECUTE_START_TIME).total_seconds(),
-                            time_unsimulated_str,
-                            qidx,
-                            end - start,
-                            engine.value,
-                        ),
-                        file=file,
+                        "Transient query error:",
+                        ex.message(),
                         flush=True,
+                        file=sys.stderr,
                     )
-                    rand_backoff = None
-
-                except BradClientError as ex:
-                    if ex.is_transient():
-                        print(
-                            "Transient query error:",
-                            ex.message(),
-                            flush=True,
-                            file=sys.stderr,
+                    if rand_backoff is None:
+                        rand_backoff = RandomizedExponentialBackoff(
+                            max_retries=10,
+                            base_delay_s=2.0,
+                            max_delay_s=timedelta(minutes=10).total_seconds(),
                         )
-                        if rand_backoff is None:
-                            rand_backoff = RandomizedExponentialBackoff(
-                                max_retries=10,
-                                base_delay_s=2.0,
-                                max_delay_s=timedelta(minutes=10).total_seconds(),
-                            )
-
-                        # Delay retrying in the case of a transient error (this
-                        # happens during blueprint transitions).
-                        wait_s = rand_backoff.wait_time_s()
-                        if wait_s is None:
-                            print("Aborting benchmark. Too many transient errors.")
-                            break
-                        time.sleep(wait_s)
+                    # Delay retrying in the case of a transient error (this
+                    # happens during blueprint transitions).
+                    wait_s = rand_backoff.wait_time_s()
+                    if wait_s is None:
+                        print("Aborting benchmark. Too many transient errors.")
+                        break
+                    time.sleep(wait_s)
 
-                    else:
-                        print(
-                            "Unexpected query error:",
-                            ex.message(),
-                            flush=True,
-                            file=sys.stderr,
-                        )
+                else:
+                    print(
+                        "Unexpected query error:",
+                        ex.message(),
+                        flush=True,
+                        file=sys.stderr,
+                    )
 
-                try:
-                    _ = stop_queue.get_nowait()
-                    break
-                except queue.Empty:
-                    pass
+            try:
+                _ = stop_queue.get_nowait()
+                break
+            except queue.Empty:
+                pass
     finally:
+        file.close()
         database.close_sync()
@@ -271,7 +279,11 @@ def noop(_signal, _frame):
 
     # Signal that we're ready to start and wait for the controller.
     start_queue.put_nowait("")
-    _ = stop_queue.get()
+    msg = stop_queue.get()
+
+    if msg == RUNNER_EXIT:
+        print(f"Simulation runner {runner_idx} is stopping without having started.")
+        return
 
     while True:
         if execution_gap_dist is not None:
@@ -701,11 +713,10 @@ def signal_handler(_signal, _frame):
     print("Stopping all clients...", flush=True, file=sys.stderr)
 
     for i in range(args.num_clients):
-        stop_queue[i].put("")
-
-    # stop again, just incase some client hasn't started yet
-    for i in range(args.num_clients):
-        stop_queue[i].put("")
+        # N.B. This process will hang if there is unconsumed data in the
+        # cross-process queues. Please be careful when modifying how these
+        # queues are used.
+        stop_queue[i].put(RUNNER_EXIT)
 
     print("Waiting for the clients to complete.")
     for p in processes:
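
Note for reviewers: the start/stop handshake this patch settles on is easier
to see in isolation. Below is a minimal, self-contained sketch of the
pattern; the standalone module layout, the two-client setup, and the 0.1 s
stand-in workload are illustrative assumptions, not code from this
repository. The key property is that each runner consumes exactly one
message from its stop queue before its main loop begins (either the
empty-string start signal or the RUNNER_EXIT sentinel), so the controller no
longer needs the old "stop again, just in case" second put, which could
strand an unconsumed item in a cross-process queue and hang shutdown.

    # shutdown_handshake_sketch.py -- illustrative only, not part of the patch.
    import multiprocessing as mp
    import queue
    import time

    RUNNER_EXIT = "runner_exit"


    def runner(runner_idx, start_queue, stop_queue):
        # Signal that we're ready to start and wait for the controller.
        start_queue.put_nowait("")
        msg = stop_queue.get()
        if msg == RUNNER_EXIT:
            # Aborted before starting; we exit having consumed the one sentinel.
            print(f"Runner {runner_idx} is stopping without having started.")
            return

        while True:
            time.sleep(0.1)  # Stand-in for executing one analytics query.

            # Non-blocking check: any message after the start signal means stop.
            try:
                _ = stop_queue.get_nowait()
                break
            except queue.Empty:
                pass


    if __name__ == "__main__":
        num_clients = 2
        start_queues = [mp.Queue() for _ in range(num_clients)]
        stop_queues = [mp.Queue() for _ in range(num_clients)]
        processes = [
            mp.Process(target=runner, args=(i, start_queues[i], stop_queues[i]))
            for i in range(num_clients)
        ]
        for p in processes:
            p.start()
        for q in start_queues:
            q.get()  # Wait until every runner reports ready.

        # Shut down before starting: exactly one RUNNER_EXIT per runner works
        # whether or not that runner has entered its main loop. A second put
        # would leave data in the queue and can hang process teardown.
        for q in stop_queues:
            q.put(RUNNER_EXIT)
        for p in processes:
            p.join()
        print("All runners exited cleanly.")

The same single-message design covers both shutdown paths: a runner blocked
in its initial stop_queue.get() returns immediately on the sentinel, while a
runner already in its loop picks the sentinel up at the next get_nowait()
poll and breaks.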