From a081664249cc08931447dac706a983d953f907af Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Sun, 12 May 2024 16:22:03 -0400 Subject: [PATCH] Add verbose logging to the transaction runner --- .../py-tpcc/pytpcc/runtime/executor.py | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py index f10f111f..3ea3eec9 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py @@ -41,6 +41,7 @@ from datetime import datetime from pprint import pprint, pformat from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff +from brad.utils import create_custom_logger from typing import Optional from .. import constants @@ -103,6 +104,13 @@ def execute( logging.info("Not recording detailed stats.") options = {} + verbose_log_dir = out_path / "verbose_logs" + verbose_log_dir.mkdir(exist_ok=True) + verbose_logger = create_custom_logger( + "txn_runner_verbose", str(verbose_log_dir / f"runner_{worker_index}.log") + ) + verbose_logger.info("[T %d] Workload starting...", worker_index) + # Compute warehouse ranges. self.worker_index = worker_index self.total_workers = total_workers @@ -117,7 +125,7 @@ def execute( logging.info( "Worker index %d - Warehouse range: %d to %d (inclusive)", self.worker_index, - *self.local_warehouse_range + *self.local_warehouse_range, ) if zipfian_alpha is not None: @@ -145,9 +153,19 @@ def execute( if debug: logging.debug("Executing '%s' transaction" % txn) try: + verbose_logger.info("[T %d] Issuing transaction %s", worker_index, txn) val = self.driver.executeTransaction(txn, params) backoff = None + # if debug: logging.debug("%s\nParameters:\n%s\nResult:\n%s" % (txn, pformat(params), pformat(val))) + r.stopTransaction(txn_id) + verbose_logger.info( + "[T %d] Finished transaction %s, %d", worker_index, txn, txn_id + ) + except KeyboardInterrupt: + verbose_logger.info( + "[T %d] Aborting early due to KeyboardInterrupt", worker_index + ) return -1 except (Exception, AssertionError) as ex: if debug: @@ -156,6 +174,7 @@ def execute( elif random.random() < 0.01: logging.warning("Aborted transaction: %s: %s", txn, ex) traceback.print_exc(file=sys.stdout) + verbose_logger.exception("[T %d] Ran into error", worker_index) if self.stop_on_error: raise r.abortTransaction(txn_id) @@ -168,16 +187,16 @@ def execute( ) wait_s = backoff.wait_time_s() if wait_s is not None: + verbose_logger.info( + "[T %d] Backing off for %.4f seconds", worker_index, wait_s + ) time.sleep(wait_s) - continue - - # if debug: logging.debug("%s\nParameters:\n%s\nResult:\n%s" % (txn, pformat(params), pformat(val))) - - r.stopTransaction(txn_id) ## WHILE + verbose_logger.info("[T %d] Benchmark stopping...", worker_index) r.stopBenchmark() + verbose_logger.info("[T %d] Benchmark done.", worker_index) return r ## DEF