diff --git a/setup.py b/setup.py index 16ee6e04..9b5261ee 100644 --- a/setup.py +++ b/setup.py @@ -62,6 +62,7 @@ "typing-extensions", "types-tabulate", "matplotlib", + "conductor-cli", ] UI_REQUIRES = [ diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py index 0f3bcee0..75e049ac 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py @@ -35,12 +35,16 @@ import random import traceback import logging +import os +import pathlib from datetime import datetime from pprint import pprint, pformat from .. import constants from ..util import * +RECORD_DETAILED_STATS_VAR = "RECORD_DETAILED_STATS" + class Executor: @@ -51,8 +55,35 @@ def __init__(self, driver, scaleParameters, stop_on_error=False): ## DEF - def execute(self, duration): - r = results.Results() + def execute(self, duration: float, worker_index: int) -> results.Results: + if RECORD_DETAILED_STATS_VAR in os.environ: + import conductor.lib as cond + + try: + out_path = cond.get_output_path() + logging.info("Writing detailed stats to %s", str(out_path)) + except ImportError: + logging.warning( + "Conductor not installed. Detailed stats will be saved to the current working directory." + ) + out_path = pathlib.Path(".") + except RuntimeError: + logging.warning( + "Workload is not orchestrated by Conductor. Detailed stats will be saved to the current working directory." + ) + out_path = pathlib.Path(".") + + options = { + "record_detailed": True, + "worker_index": worker_index, + "output_prefix": out_path, + "lat_sample_prob": 0.10, # Sampled 10% + } + else: + logging.info("Not recording detailed stats.") + options = {} + + r = results.Results(options) assert r logging.info("Executing benchmark for %d seconds" % duration) start = r.startBenchmark() diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py b/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py index d9684e7f..388a095d 100755 --- a/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py @@ -36,6 +36,7 @@ import time import multiprocessing import pathlib +import traceback from configparser import ConfigParser from pprint import pprint, pformat @@ -164,6 +165,7 @@ def startExecution(driverClass, scaleParameters, args, config): args, config, debug, + i, ), ) worker_results.append(r) @@ -190,21 +192,28 @@ def startExecution(driverClass, scaleParameters, args, config): ## ============================================== ## executorFunc ## ============================================== -def executorFunc(driverClass, scaleParameters, args, config, debug): - driver = driverClass(args["ddl"]) - assert driver != None - logging.debug("Starting client execution: %s" % driver) - - config["execute"] = True - config["reset"] = False - driver.loadConfig(config) +def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index): + try: + driver = driverClass(args["ddl"]) + assert driver != None + logging.debug("Starting client execution: %s" % driver) - e = executor.Executor(driver, scaleParameters, stop_on_error=args["stop_on_error"]) - driver.executeStart() - results = e.execute(args["duration"]) - driver.executeFinish() + config["execute"] = True + config["reset"] = False + driver.loadConfig(config) - return results + e = executor.Executor( + driver, scaleParameters, stop_on_error=args["stop_on_error"] + ) + driver.executeStart() + results = e.execute(args["duration"], worker_index) + driver.executeFinish() + + return results + except Exception as ex: + print("Error in worker", worker_index, str(ex)) + print(traceback.format_exc()) + raise ## DEF @@ -213,6 +222,11 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): ## main ## ============================================== if __name__ == "__main__": + # On Unix platforms, the default way to start a process is by forking, which + # is not ideal (we do not want to duplicate this process' file + # descriptors!). + multiprocessing.set_start_method("spawn") + aparser = argparse.ArgumentParser( description="Python implementation of the TPC-C Benchmark" ) @@ -347,7 +361,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): driver, scaleParameters, stop_on_error=args["stop_on_error"] ) driver.executeStart() - results = e.execute(args["duration"]) + results = e.execute(args["duration"], worker_index=0) driver.executeFinish() else: results = startExecution(driverClass, scaleParameters, args, config) diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/util/results.py b/workloads/chbenchmark/py-tpcc/pytpcc/util/results.py index 0add6883..b55cdb6c 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/util/results.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/util/results.py @@ -26,18 +26,86 @@ import logging import time +import pathlib +import random +import pytz +from datetime import datetime +from typing import Dict, Any, Optional, Tuple +from io import TextIOWrapper + +NAME_TO_IDX = { + "DELIVERY": 0, + "NEW_ORDER": 1, + "ORDER_STATUS": 2, + "PAYMENT": 3, + "STOCK_LEVEL": 4, +} class Results: - - def __init__(self): + def __init__(self, options: Optional[Dict[str, Any]] = None) -> None: self.start = None self.stop = None self.txn_id = 0 - self.txn_counters = {} - self.txn_times = {} - self.running = {} + self.txn_counters: Dict[str, int] = {} + self.txn_abort_counters: Dict[str, int] = {} + self.txn_times: Dict[str, float] = {} + self.running: Dict[str, Tuple[str, float, datetime]] = {} + self.options = options + + if options is not None and "record_detailed" in options: + worker_index = options["worker_index"] + output_prefix = pathlib.Path(options["output_prefix"]) + self._lat_file: Optional[TextIOWrapper] = open( + output_prefix / "oltp_latency_{}.csv".format(worker_index), + "w", + encoding="UTF-8", + ) + self._stats_file: Optional[TextIOWrapper] = open( + output_prefix / "oltp_stats_{}.csv".format(worker_index), + "w", + encoding="UTF-8", + ) + self._lat_sample_prob = options["lat_sample_prob"] + self._prng: Optional[random.Random] = random.Random( + worker_index + ) # Deterministic pseudorandom. + print("txn_idx,timestamp,run_time_s", file=self._lat_file, flush=True) + print("stat,value", file=self._stats_file, flush=True) + else: + self._lat_file = None + self._stats_file = None + self._lat_sample_prob = 0.0 + self._prng = None + + def __getstate__(self) -> object: + # We need a custom implementation to avoid serializing the file handles. + return { + "start": self.start, + "stop": self.stop, + "txn_id": self.txn_id, + "txn_counters": self.txn_counters, + "txn_abort_counters": self.txn_abort_counters, + "txn_times": self.txn_times, + "running": self.running, + "options": self.options, + } + + def __setstate__(self, d: Dict[Any, Any]) -> None: + self.start = d["start"] + self.stop = d["stop"] + self.txn_id = d["txn_id"] + self.txn_counters = d["txn_counters"] + self.txn_abort_counters = d["txn_abort_counters"] + self.txn_times = d["txn_times"] + self.running = d["running"] + self.options = d["options"] + + self._lat_file = None + self._stats_file = None + self._lat_sample_prob = 0.0 + self._prng = None def startBenchmark(self): """Mark the benchmark as having been started""" @@ -53,22 +121,40 @@ def stopBenchmark(self): logging.debug("Stopping benchmark statistics collection") self.stop = time.time() - def startTransaction(self, txn): + if self._lat_file is not None: + self._lat_file.close() + self._lat_file = None + + if self._stats_file is not None: + for txn_name in NAME_TO_IDX.keys(): + commits = self.txn_counters.get(txn_name, 0) + aborts = self.txn_abort_counters.get(txn_name, 0) + print(f"{txn_name.lower()}_commits,{commits}", file=self._stats_file) + print(f"{txn_name.lower()}_aborts,{aborts}", file=self._stats_file) + self._stats_file.close() + self._stats_file = None + + def startTransaction(self, txn: str) -> int: self.txn_id += 1 id = self.txn_id - self.running[id] = (txn, time.time()) + self.running[id] = (txn, time.time(), datetime.now(tz=pytz.utc)) return id - def abortTransaction(self, id): + def abortTransaction(self, id: int) -> None: """Abort a transaction and discard its times""" assert id in self.running - txn_name, txn_start = self.running[id] + txn_name, _, _ = self.running[id] del self.running[id] - def stopTransaction(self, id): + if txn_name not in self.txn_abort_counters: + self.txn_abort_counters[txn_name] = 1 + else: + self.txn_abort_counters[txn_name] += 1 + + def stopTransaction(self, id: int) -> None: """Record that the benchmark completed an invocation of the given transaction""" assert id in self.running - txn_name, txn_start = self.running[id] + txn_name, txn_start, start_ts = self.running[id] del self.running[id] duration = time.time() - txn_start @@ -78,7 +164,15 @@ def stopTransaction(self, id): total_cnt = self.txn_counters.get(txn_name, 0) self.txn_counters[txn_name] = total_cnt + 1 - def append(self, r): + if self._prng is not None and self._lat_file is not None: + if self._prng.random() < self._lat_sample_prob: + print( + f"{NAME_TO_IDX[txn_name]},{start_ts},{duration}", + file=self._lat_file, + flush=True, + ) + + def append(self, r: "Results") -> None: for txn_name in r.txn_counters.keys(): orig_cnt = self.txn_counters.get(txn_name, 0) orig_time = self.txn_times.get(txn_name, 0) @@ -90,7 +184,7 @@ def append(self, r): self.start = r.start self.stop = r.stop - def __str__(self): + def __str__(self) -> str: return self.show() def show(self, load_time=None):