Skip to content

Commit

Permalink
Add verbose logging to the transaction runner
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy committed May 12, 2024
1 parent 5aa6bd3 commit a081664
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from datetime import datetime
from pprint import pprint, pformat
from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff
from brad.utils import create_custom_logger
from typing import Optional

from .. import constants
Expand Down Expand Up @@ -103,6 +104,13 @@ def execute(
logging.info("Not recording detailed stats.")
options = {}

verbose_log_dir = out_path / "verbose_logs"
verbose_log_dir.mkdir(exist_ok=True)
verbose_logger = create_custom_logger(
"txn_runner_verbose", str(verbose_log_dir / f"runner_{worker_index}.log")
)
verbose_logger.info("[T %d] Workload starting...", worker_index)

# Compute warehouse ranges.
self.worker_index = worker_index
self.total_workers = total_workers
Expand All @@ -117,7 +125,7 @@ def execute(
logging.info(
"Worker index %d - Warehouse range: %d to %d (inclusive)",
self.worker_index,
*self.local_warehouse_range
*self.local_warehouse_range,
)

if zipfian_alpha is not None:
Expand Down Expand Up @@ -145,9 +153,19 @@ def execute(
if debug:
logging.debug("Executing '%s' transaction" % txn)
try:
verbose_logger.info("[T %d] Issuing transaction %s", worker_index, txn)
val = self.driver.executeTransaction(txn, params)
backoff = None
# if debug: logging.debug("%s\nParameters:\n%s\nResult:\n%s" % (txn, pformat(params), pformat(val)))
r.stopTransaction(txn_id)
verbose_logger.info(
"[T %d] Finished transaction %s, %d", worker_index, txn, txn_id
)

except KeyboardInterrupt:
verbose_logger.info(
"[T %d] Aborting early due to KeyboardInterrupt", worker_index
)
return -1
except (Exception, AssertionError) as ex:
if debug:
Expand All @@ -156,6 +174,7 @@ def execute(
elif random.random() < 0.01:
logging.warning("Aborted transaction: %s: %s", txn, ex)
traceback.print_exc(file=sys.stdout)
verbose_logger.exception("[T %d] Ran into error", worker_index)
if self.stop_on_error:
raise
r.abortTransaction(txn_id)
Expand All @@ -168,16 +187,16 @@ def execute(
)
wait_s = backoff.wait_time_s()
if wait_s is not None:
verbose_logger.info(
"[T %d] Backing off for %.4f seconds", worker_index, wait_s
)
time.sleep(wait_s)

continue

# if debug: logging.debug("%s\nParameters:\n%s\nResult:\n%s" % (txn, pformat(params), pformat(val)))

r.stopTransaction(txn_id)
## WHILE

verbose_logger.info("[T %d] Benchmark stopping...", worker_index)
r.stopBenchmark()
verbose_logger.info("[T %d] Benchmark done.", worker_index)
return r

## DEF
Expand Down

0 comments on commit a081664

Please sign in to comment.