Skip to content

Commit

Permalink
Avoid sharing file descriptors when forking
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy committed Apr 17, 2024
1 parent b0e0407 commit bce92da
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 14 deletions.
39 changes: 25 additions & 14 deletions workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import time
import multiprocessing
import pathlib
import traceback
from configparser import ConfigParser
from pprint import pprint, pformat

Expand Down Expand Up @@ -192,20 +193,25 @@ def startExecution(driverClass, scaleParameters, args, config):
## executorFunc
## ==============================================
def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index):
    """Worker-process entry point: run one TPC-C client against the driver.

    Instantiates the driver, switches the config into execute mode, runs the
    executor for args["duration"] seconds, and returns the Results object.
    Runs in a spawned child process, so any exception is printed here before
    being re-raised — otherwise the traceback would be lost to the parent.
    """
    try:
        driver = driverClass(args["ddl"])
        # Raise (not assert) so the check survives `python -O`.
        if driver is None:
            raise RuntimeError("driverClass did not produce a driver instance")
        logging.debug("Starting client execution: %s" % driver)

        # This worker only executes transactions; it never reloads data.
        config["execute"] = True
        config["reset"] = False
        driver.loadConfig(config)

        e = executor.Executor(driver, scaleParameters, stop_on_error=args["stop_on_error"])
        driver.executeStart()
        results = e.execute(args["duration"], worker_index)
        driver.executeFinish()

        return results
    except Exception as ex:
        # Surface the failure in the child's stdout before propagating it,
        # since multiprocessing may swallow the original traceback.
        print("Error in worker", worker_index, str(ex))
        print(traceback.format_exc())
        raise


## DEF
Expand All @@ -214,6 +220,11 @@ def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index
## main
## ==============================================
if __name__ == "__main__":
# On Unix platforms, the default way to start a process is by forking, which
# is not ideal (we do not want to duplicate this process' file
# descriptors!).
multiprocessing.set_start_method("spawn")

aparser = argparse.ArgumentParser(
description="Python implementation of the TPC-C Benchmark"
)
Expand Down
29 changes: 29 additions & 0 deletions workloads/chbenchmark/py-tpcc/pytpcc/util/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(self, options: Optional[Dict[str, Any]] = None) -> None:
self.txn_abort_counters: Dict[str, int] = {}
self.txn_times: Dict[str, float] = {}
self.running: Dict[str, Tuple[str, float, datetime]] = {}
self.options = options

if options is not None and "record_detailed" in options:
worker_index = options["worker_index"]
Expand All @@ -78,6 +79,34 @@ def __init__(self, options: Optional[Dict[str, Any]] = None) -> None:
self._lat_sample_prob = 0.0
self._prng = None

def __getstate__(self) -> object:
# We need a custom implementation to avoid serializing the file handles.
return {
"start": self.start,
"stop": self.stop,
"txn_id": self.txn_id,
"txn_counters": self.txn_counters,
"txn_abort_counters": self.txn_abort_counters,
"txn_times": self.txn_times,
"running": self.running,
"options": self.options,
}

def __setstate__(self, d: Dict[Any, Any]) -> None:
self.start = d["start"]
self.stop = d["stop"]
self.txn_id = d["txn_id"]
self.txn_counters = d["txn_counters"]
self.txn_abort_counters = d["txn_abort_counters"]
self.txn_times = d["txn_times"]
self.running = d["running"]
self.options = d["options"]

self._lat_file = None
self._stats_file = None
self._lat_sample_prob = 0.0
self._prng = None

def startBenchmark(self):
"""Mark the benchmark as having been started"""
assert self.start == None
Expand Down

0 comments on commit bce92da

Please sign in to comment.