Skip to content

Commit

Permalink
Modify the TPC-C runner to record more detailed stats (#493)
Browse files Browse the repository at this point in the history
* Record more detailed transaction stats that we need

* Set up connections to the new recording infrastructure

* Avoid sharing file descriptors when forking

* Fix formatting
  • Loading branch information
geoffxy authored Apr 17, 2024
1 parent a071bcc commit 4b0ac46
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 29 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
"typing-extensions",
"types-tabulate",
"matplotlib",
"conductor-cli",
]

UI_REQUIRES = [
Expand Down
35 changes: 33 additions & 2 deletions workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@
import random
import traceback
import logging
import os
import pathlib
from datetime import datetime
from pprint import pprint, pformat

from .. import constants
from ..util import *

RECORD_DETAILED_STATS_VAR = "RECORD_DETAILED_STATS"


class Executor:

Expand All @@ -51,8 +55,35 @@ def __init__(self, driver, scaleParameters, stop_on_error=False):

## DEF

def execute(self, duration):
r = results.Results()
def execute(self, duration: float, worker_index: int) -> results.Results:
if RECORD_DETAILED_STATS_VAR in os.environ:
import conductor.lib as cond

try:
out_path = cond.get_output_path()
logging.info("Writing detailed stats to %s", str(out_path))
except ImportError:
logging.warning(
"Conductor not installed. Detailed stats will be saved to the current working directory."
)
out_path = pathlib.Path(".")
except RuntimeError:
logging.warning(
"Workload is not orchestrated by Conductor. Detailed stats will be saved to the current working directory."
)
out_path = pathlib.Path(".")

options = {
"record_detailed": True,
"worker_index": worker_index,
"output_prefix": out_path,
"lat_sample_prob": 0.10, # Sampled 10%
}
else:
logging.info("Not recording detailed stats.")
options = {}

r = results.Results(options)
assert r
logging.info("Executing benchmark for %d seconds" % duration)
start = r.startBenchmark()
Expand Down
42 changes: 28 additions & 14 deletions workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import time
import multiprocessing
import pathlib
import traceback
from configparser import ConfigParser
from pprint import pprint, pformat

Expand Down Expand Up @@ -164,6 +165,7 @@ def startExecution(driverClass, scaleParameters, args, config):
args,
config,
debug,
i,
),
)
worker_results.append(r)
Expand All @@ -190,21 +192,28 @@ def startExecution(driverClass, scaleParameters, args, config):
## ==============================================
## executorFunc
## ==============================================
def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index):
    """Worker-process entry point: run the TPC-C benchmark on one connection.

    The driver is constructed inside this process (drivers are not shared
    across workers). The executor runs for args["duration"] seconds and the
    collected Results object is returned to the parent for aggregation.

    worker_index identifies this worker; it is forwarded to the executor so
    per-worker detailed stats files get distinct names.
    """
    try:
        driver = driverClass(args["ddl"])
        assert driver is not None
        logging.debug("Starting client execution: %s" % driver)

        config["execute"] = True
        config["reset"] = False
        driver.loadConfig(config)

        e = executor.Executor(
            driver, scaleParameters, stop_on_error=args["stop_on_error"]
        )
        driver.executeStart()
        results = e.execute(args["duration"], worker_index)
        driver.executeFinish()

        return results
    except Exception as ex:
        # Exceptions raised in a spawned child are easy to lose; print the
        # traceback here before re-raising so failures are visible.
        print("Error in worker", worker_index, str(ex))
        print(traceback.format_exc())
        raise


## DEF
Expand All @@ -213,6 +222,11 @@ def executorFunc(driverClass, scaleParameters, args, config, debug):
## main
## ==============================================
if __name__ == "__main__":
# On Unix platforms, the default way to start a process is by forking, which
# is not ideal (we do not want to duplicate this process' file
# descriptors!).
multiprocessing.set_start_method("spawn")

aparser = argparse.ArgumentParser(
description="Python implementation of the TPC-C Benchmark"
)
Expand Down Expand Up @@ -347,7 +361,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug):
driver, scaleParameters, stop_on_error=args["stop_on_error"]
)
driver.executeStart()
results = e.execute(args["duration"])
results = e.execute(args["duration"], worker_index=0)
driver.executeFinish()
else:
results = startExecution(driverClass, scaleParameters, args, config)
Expand Down
120 changes: 107 additions & 13 deletions workloads/chbenchmark/py-tpcc/pytpcc/util/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,86 @@

import logging
import pathlib
import random
import time
from datetime import datetime, timezone
from io import TextIOWrapper
from typing import Any, Dict, Optional, Tuple

import pytz

NAME_TO_IDX = {
"DELIVERY": 0,
"NEW_ORDER": 1,
"ORDER_STATUS": 2,
"PAYMENT": 3,
"STOCK_LEVEL": 4,
}


class Results:

def __init__(self):
def __init__(self, options: Optional[Dict[str, Any]] = None) -> None:
self.start = None
self.stop = None
self.txn_id = 0

self.txn_counters = {}
self.txn_times = {}
self.running = {}
self.txn_counters: Dict[str, int] = {}
self.txn_abort_counters: Dict[str, int] = {}
self.txn_times: Dict[str, float] = {}
self.running: Dict[str, Tuple[str, float, datetime]] = {}
self.options = options

if options is not None and "record_detailed" in options:
worker_index = options["worker_index"]
output_prefix = pathlib.Path(options["output_prefix"])
self._lat_file: Optional[TextIOWrapper] = open(
output_prefix / "oltp_latency_{}.csv".format(worker_index),
"w",
encoding="UTF-8",
)
self._stats_file: Optional[TextIOWrapper] = open(
output_prefix / "oltp_stats_{}.csv".format(worker_index),
"w",
encoding="UTF-8",
)
self._lat_sample_prob = options["lat_sample_prob"]
self._prng: Optional[random.Random] = random.Random(
worker_index
) # Deterministic pseudorandom.
print("txn_idx,timestamp,run_time_s", file=self._lat_file, flush=True)
print("stat,value", file=self._stats_file, flush=True)
else:
self._lat_file = None
self._stats_file = None
self._lat_sample_prob = 0.0
self._prng = None

def __getstate__(self) -> object:
# We need a custom implementation to avoid serializing the file handles.
return {
"start": self.start,
"stop": self.stop,
"txn_id": self.txn_id,
"txn_counters": self.txn_counters,
"txn_abort_counters": self.txn_abort_counters,
"txn_times": self.txn_times,
"running": self.running,
"options": self.options,
}

def __setstate__(self, d: Dict[Any, Any]) -> None:
self.start = d["start"]
self.stop = d["stop"]
self.txn_id = d["txn_id"]
self.txn_counters = d["txn_counters"]
self.txn_abort_counters = d["txn_abort_counters"]
self.txn_times = d["txn_times"]
self.running = d["running"]
self.options = d["options"]

self._lat_file = None
self._stats_file = None
self._lat_sample_prob = 0.0
self._prng = None

def startBenchmark(self):
"""Mark the benchmark as having been started"""
Expand All @@ -53,22 +121,40 @@ def stopBenchmark(self):
logging.debug("Stopping benchmark statistics collection")
self.stop = time.time()

def startTransaction(self, txn):
if self._lat_file is not None:
self._lat_file.close()
self._lat_file = None

if self._stats_file is not None:
for txn_name in NAME_TO_IDX.keys():
commits = self.txn_counters.get(txn_name, 0)
aborts = self.txn_abort_counters.get(txn_name, 0)
print(f"{txn_name.lower()}_commits,{commits}", file=self._stats_file)
print(f"{txn_name.lower()}_aborts,{aborts}", file=self._stats_file)
self._stats_file.close()
self._stats_file = None

def startTransaction(self, txn: str) -> int:
self.txn_id += 1
id = self.txn_id
self.running[id] = (txn, time.time())
self.running[id] = (txn, time.time(), datetime.now(tz=pytz.utc))
return id

def abortTransaction(self, id):
def abortTransaction(self, id: int) -> None:
"""Abort a transaction and discard its times"""
assert id in self.running
txn_name, txn_start = self.running[id]
txn_name, _, _ = self.running[id]
del self.running[id]

def stopTransaction(self, id):
if txn_name not in self.txn_abort_counters:
self.txn_abort_counters[txn_name] = 1
else:
self.txn_abort_counters[txn_name] += 1

def stopTransaction(self, id: int) -> None:
"""Record that the benchmark completed an invocation of the given transaction"""
assert id in self.running
txn_name, txn_start = self.running[id]
txn_name, txn_start, start_ts = self.running[id]
del self.running[id]

duration = time.time() - txn_start
Expand All @@ -78,7 +164,15 @@ def stopTransaction(self, id):
total_cnt = self.txn_counters.get(txn_name, 0)
self.txn_counters[txn_name] = total_cnt + 1

def append(self, r):
if self._prng is not None and self._lat_file is not None:
if self._prng.random() < self._lat_sample_prob:
print(
f"{NAME_TO_IDX[txn_name]},{start_ts},{duration}",
file=self._lat_file,
flush=True,
)

def append(self, r: "Results") -> None:
for txn_name in r.txn_counters.keys():
orig_cnt = self.txn_counters.get(txn_name, 0)
orig_time = self.txn_times.get(txn_name, 0)
Expand All @@ -90,7 +184,7 @@ def append(self, r):
self.start = r.start
self.stop = r.stop

def __str__(self):
def __str__(self) -> str:
return self.show()

def show(self, load_time=None):
Expand Down

0 comments on commit 4b0ac46

Please sign in to comment.