diff --git a/experiments/15-e2e-scenarios-v2/common.sh b/experiments/15-e2e-scenarios-v2/common.sh index a2604b19..87869a6c 100644 --- a/experiments/15-e2e-scenarios-v2/common.sh +++ b/experiments/15-e2e-scenarios-v2/common.sh @@ -46,6 +46,19 @@ function graceful_shutdown() { wait $brad_pid } +function terminate_process_group() { + local pid=$1 + local initial_wait_s=$2 + sleep $2 + if kill -0 $pid >/dev/null 2>&1; then + pkill -KILL -P $pid + pkill -KILL $pid + echo "NOTE: Forced process $pid to stop." + else + echo "Process $pid stopped gracefully." + fi +} + function log_workload_point() { msg=$1 now=$(date --utc "+%Y-%m-%d %H:%M:%S") @@ -117,7 +130,7 @@ function start_repeating_olap_runner() { mkdir -p $results_dir log_workload_point $results_name - COND_OUT=$results_dir python3 ../../../workloads/IMDB_extended/run_repeating_analytics.py "${args[@]}" & + COND_OUT=$results_dir python3.11 ../../../workloads/IMDB_extended/run_repeating_analytics.py "${args[@]}" & # This is a special return value variable that we use. runner_pid=$! diff --git a/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh b/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh index 383c802e..1132eca5 100755 --- a/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh +++ b/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh @@ -65,6 +65,7 @@ sleep 30 log_workload_point "start_rana_8" start_repeating_olap_runner 8 15 5 $initial_queries "ra_8" rana_pid=$runner_pid +log_workload_point "started_rana_8_$rana_pid" sleep 2 # Start with 4 transactional clients; hold for 10 minutes to stabilize. @@ -94,24 +95,27 @@ sleep $((20 * 60)) # 32 mins total; 42 mins cumulative log_workload_point "start_heavy_rana_8" start_repeating_olap_runner 8 15 1 $heavier_queries "ra_8_heavy" 8 heavy_rana_pid=$runner_pid +log_workload_point "started_heavy_rana_8_$heavy_rana_pid" sleep $((20 * 60)) # 20 mins total; 62 mins cumulative log_workload_point "stopping_heavy_rana_8" kill -INT $heavy_rana_pid -wait $heavy_rana_pid +terminate_process_group $heavy_rana_pid 10 log_workload_point "start_heavy_rana_10" start_repeating_olap_runner 10 5 1 $heavier_queries "ra_10_heavy" 8 heavy_rana_pid=$runner_pid +log_workload_point "started_heavy_rana_10_$heavy_rana_pid" sleep $((10 * 60)) # 10 mins total; 72 mins cumulative log_workload_point "stopping_heavy_rana_10" kill -INT $heavy_rana_pid -wait $heavy_rana_pid +terminate_process_group $heavy_rana_pid 10 log_workload_point "start_heavy_rana_20" start_repeating_olap_runner 20 5 1 $heavier_queries "ra_20_heavy" 8 heavy_rana_pid=$runner_pid +log_workload_point "started_heavy_rana_20_$heavy_rana_pid" sleep $((30 * 60)) # 30 mins total; 102 mins cumulative log_workload_point "experiment_workload_done" diff --git a/workloads/IMDB_extended/run_repeating_analytics.py b/workloads/IMDB_extended/run_repeating_analytics.py index 627923dd..dcb21aca 100644 --- a/workloads/IMDB_extended/run_repeating_analytics.py +++ b/workloads/IMDB_extended/run_repeating_analytics.py @@ -93,6 +93,7 @@ def noop(_signal, _frame): query_frequency = query_frequency[queries] query_frequency = query_frequency / np.sum(query_frequency) + exec_count = 0 file = open( out_dir / "repeating_olap_batch_{}.csv".format(runner_idx), "w", @@ -181,8 +182,13 @@ def noop(_signal, _frame): file=file, flush=True, ) + exec_count += 1 rand_backoff = None + if exec_count % 20 == 0: + # To avoid data loss if this script crashes. + os.fsync(file.fileno()) + except BradClientError as ex: if ex.is_transient(): print( @@ -217,12 +223,16 @@ def noop(_signal, _frame): try: _ = stop_queue.get_nowait() + print(f"Runner {runner_idx} is exiting.") break except queue.Empty: pass finally: + os.fsync(file.fileno()) file.close() database.close_sync() + # Make sure the queue is drained. + _ = stop_queue.get_nowait() def simulation_runner(