Skip to content

Commit

Permalink
Fix bug in repeating analytics runner shutdown control
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy committed Nov 13, 2023
1 parent e605ea7 commit 17f6665
Showing 1 changed file with 121 additions and 110 deletions.
231 changes: 121 additions & 110 deletions workloads/IMDB_extended/run_repeating_analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
EXECUTE_START_TIME = datetime.now().astimezone(pytz.utc)
ENGINE_NAMES = ["ATHENA", "AURORA", "REDSHIFT"]

RUNNER_EXIT = "runner_exit"


def get_time_of_the_day_unsimulated(
now: datetime, time_scale_factor: Optional[int]
Expand Down Expand Up @@ -91,129 +93,135 @@ def noop(_signal, _frame):
query_frequency = query_frequency[queries]
query_frequency = query_frequency / np.sum(query_frequency)

file = open(
out_dir / "repeating_olap_batch_{}.csv".format(runner_idx),
"w",
encoding="UTF-8",
)

try:
with open(
out_dir / "repeating_olap_batch_{}.csv".format(runner_idx),
"w",
encoding="UTF-8",
) as file:
print(
"timestamp,time_since_execution_s,time_of_day,query_idx,run_time_s,engine",
file=file,
flush=True,
)
print(
"timestamp,time_since_execution_s,time_of_day,query_idx,run_time_s,engine",
file=file,
flush=True,
)

prng = random.Random(args.seed ^ runner_idx)
rand_backoff = None
prng = random.Random(args.seed ^ runner_idx)
rand_backoff = None

logger.info(
"[Repeating Analytics Runner %d] Queries to run: %s",
runner_idx,
queries,
)
query_order_main = queries.copy()
prng.shuffle(query_order_main)
query_order = query_order_main.copy()
logger.info(
"[Repeating Analytics Runner %d] Queries to run: %s",
runner_idx,
queries,
)
query_order_main = queries.copy()
prng.shuffle(query_order_main)
query_order = query_order_main.copy()

# Signal that we're ready to start and wait for the controller.
start_queue.put_nowait("")
msg = stop_queue.get()

if msg == RUNNER_EXIT:
print(f"Runner {runner_idx} is stopping without having started.")
return

while True:
if execution_gap_dist is not None:
now = datetime.now().astimezone(pytz.utc)
time_unsimulated = get_time_of_the_day_unsimulated(
now, args.time_scale_factor
)
wait_for_s = execution_gap_dist[
int(time_unsimulated / (60 * 24) * len(execution_gap_dist))
]
time.sleep(wait_for_s)
elif args.avg_gap_s is not None:
# Wait times are normally distributed if execution_gap_dist is not provided.
wait_for_s = prng.gauss(args.avg_gap_s, args.avg_gap_std_s)
if wait_for_s < 0.0:
wait_for_s = 0.0
time.sleep(wait_for_s)

if query_frequency is not None:
qidx = prng.choices(queries, list(query_frequency))[0]
else:
if len(query_order) == 0:
query_order = query_order_main.copy()

# Signal that we're ready to start and wait for the controller.
start_queue.put_nowait("")
_ = stop_queue.get()
qidx = query_order.pop()
logger.debug("Executing qidx: %d", qidx)
query = query_bank[qidx]

while True:
if execution_gap_dist is not None:
now = datetime.now().astimezone(pytz.utc)
try:
engine = None
now = datetime.now().astimezone(pytz.utc)
if args.time_scale_factor is not None:
time_unsimulated = get_time_of_the_day_unsimulated(
now, args.time_scale_factor
)
wait_for_s = execution_gap_dist[
int(time_unsimulated / (60 * 24) * len(execution_gap_dist))
]
time.sleep(wait_for_s)
elif args.avg_gap_s is not None:
# Wait times are normally distributed if execution_gap_dist is not provided.
wait_for_s = prng.gauss(args.avg_gap_s, args.avg_gap_std_s)
if wait_for_s < 0.0:
wait_for_s = 0.0
time.sleep(wait_for_s)

if query_frequency is not None:
qidx = prng.choices(queries, list(query_frequency))[0]
time_unsimulated_str = time_in_minute_to_datetime_str(
time_unsimulated
)
else:
if len(query_order) == 0:
query_order = query_order_main.copy()

qidx = query_order.pop()
logger.debug("Executing qidx: %d", qidx)
query = query_bank[qidx]

try:
engine = None
now = datetime.now().astimezone(pytz.utc)
if args.time_scale_factor is not None:
time_unsimulated = get_time_of_the_day_unsimulated(
now, args.time_scale_factor
)
time_unsimulated_str = time_in_minute_to_datetime_str(
time_unsimulated
)
else:
time_unsimulated_str = "xxx"
time_unsimulated_str = "xxx"

start = time.time()
_, engine = database.execute_sync_with_engine(query)
end = time.time()
print(
"{},{},{},{},{},{}".format(
now,
(now - EXECUTE_START_TIME).total_seconds(),
time_unsimulated_str,
qidx,
end - start,
engine.value,
),
file=file,
flush=True,
)
rand_backoff = None

start = time.time()
_, engine = database.execute_sync_with_engine(query)
end = time.time()
except BradClientError as ex:
if ex.is_transient():
print(
"{},{},{},{},{},{}".format(
now,
(now - EXECUTE_START_TIME).total_seconds(),
time_unsimulated_str,
qidx,
end - start,
engine.value,
),
file=file,
"Transient query error:",
ex.message(),
flush=True,
file=sys.stderr,
)
rand_backoff = None

except BradClientError as ex:
if ex.is_transient():
print(
"Transient query error:",
ex.message(),
flush=True,
file=sys.stderr,
if rand_backoff is None:
rand_backoff = RandomizedExponentialBackoff(
max_retries=10,
base_delay_s=2.0,
max_delay_s=timedelta(minutes=10).total_seconds(),
)

if rand_backoff is None:
rand_backoff = RandomizedExponentialBackoff(
max_retries=10,
base_delay_s=2.0,
max_delay_s=timedelta(minutes=10).total_seconds(),
)

# Delay retrying in the case of a transient error (this
# happens during blueprint transitions).
wait_s = rand_backoff.wait_time_s()
if wait_s is None:
print("Aborting benchmark. Too many transient errors.")
break
time.sleep(wait_s)
# Delay retrying in the case of a transient error (this
# happens during blueprint transitions).
wait_s = rand_backoff.wait_time_s()
if wait_s is None:
print("Aborting benchmark. Too many transient errors.")
break
time.sleep(wait_s)

else:
print(
"Unexpected query error:",
ex.message(),
flush=True,
file=sys.stderr,
)
else:
print(
"Unexpected query error:",
ex.message(),
flush=True,
file=sys.stderr,
)

try:
_ = stop_queue.get_nowait()
break
except queue.Empty:
pass
try:
_ = stop_queue.get_nowait()
break
except queue.Empty:
pass
finally:
file.close()
database.close_sync()


Expand Down Expand Up @@ -271,7 +279,11 @@ def noop(_signal, _frame):

# Signal that we're ready to start and wait for the controller.
start_queue.put_nowait("")
_ = stop_queue.get()
msg = stop_queue.get()

if msg == RUNNER_EXIT:
print(f"Simulation runner {runner_idx} is stopping without having started.")
return

while True:
if execution_gap_dist is not None:
Expand Down Expand Up @@ -701,11 +713,10 @@ def signal_handler(_signal, _frame):

print("Stopping all clients...", flush=True, file=sys.stderr)
for i in range(args.num_clients):
stop_queue[i].put("")

# stop again, just incase some client hasn't started yet
for i in range(args.num_clients):
stop_queue[i].put("")
# N.B. This process will hang if there is unconsumed data in the
# cross-process queues. Please be careful when modifying how these
# queues are used.
stop_queue[i].put(RUNNER_EXIT)

print("Waiting for the clients to complete.")
for p in processes:
Expand Down

0 comments on commit 17f6665

Please sign in to comment.