From b7c0a0b4d3c24d0e1a35f89097a9235beec622dd Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Thu, 25 Apr 2024 11:26:08 -0400 Subject: [PATCH 1/8] Add TPC-C calibration experiment configs --- .../calibration/transactions/chbenchmark/COND | 44 +++++ .../chbenchmark/retrieve_metrics.py | 117 ++++++++++++ .../transactions/chbenchmark/run_instance.sh | 84 +++++++++ .../chbenchmark/system_config_chbench.yml | 167 ++++++++++++++++++ .../py-tpcc/pytpcc/runtime/executor.py | 6 +- workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py | 14 +- 6 files changed, 428 insertions(+), 4 deletions(-) create mode 100644 tools/calibration/transactions/chbenchmark/COND create mode 100644 tools/calibration/transactions/chbenchmark/retrieve_metrics.py create mode 100755 tools/calibration/transactions/chbenchmark/run_instance.sh create mode 100644 tools/calibration/transactions/chbenchmark/system_config_chbench.yml diff --git a/tools/calibration/transactions/chbenchmark/COND b/tools/calibration/transactions/chbenchmark/COND new file mode 100644 index 00000000..7f7a63cf --- /dev/null +++ b/tools/calibration/transactions/chbenchmark/COND @@ -0,0 +1,44 @@ +RUN_FOR_S = 3 * 60 + 30 # 3 minutes 30 seconds +NUM_CLIENTS = { + "db.t4g.medium": [1, 2, 4, 8, 16, 32], + "db.r6g.large": [1, 2, 4, 8, 16, 32], + "db.r6g.xlarge": [1, 2, 4, 8, 16, 32, 64], + "db.r6g.2xlarge": [1, 2, 4, 8, 16, 32, 64, 128], + "db.r6g.4xlarge": [1, 2, 4, 8, 16, 32, 64, 128, 256], +} + +INSTANCES = list(NUM_CLIENTS.keys()) + +COND_INSTANCES = { + instance: instance.replace(".", "_").replace("db.", "") for instance in INSTANCES +} + +combine( + name="all", + deps=[ + ":{}".format(instance) + for instance in INSTANCES + ], +) + +for instance in INSTANCES: + run_experiment_group( + name=instance, + run="./run_instance.sh", + experiments=[ + ExperimentInstance( + name="{}-{}".format(COND_INSTANCES[instance], clients), + options={ + "t-clients": clients, + "run-for-s": RUN_FOR_S, + "system-config-file": "system_config_chbench.yml", + "physical-config-file": "../../../../config/physical_config_chbench.yml", + "txn-warehouses": 1740, + "txn-config-file": "aurora.config", + "schema-name": "chbenchmark", + "instance": instance, + }, + ) + for clients in NUM_CLIENTS[instance] + ], + ) diff --git a/tools/calibration/transactions/chbenchmark/retrieve_metrics.py b/tools/calibration/transactions/chbenchmark/retrieve_metrics.py new file mode 100644 index 00000000..11d52998 --- /dev/null +++ b/tools/calibration/transactions/chbenchmark/retrieve_metrics.py @@ -0,0 +1,117 @@ +import argparse +import asyncio +from datetime import timedelta +from typing import List + +from brad.config.file import ConfigFile +from brad.provisioning.directory import Directory +from brad.daemon.metrics_def import MetricDef +from brad.daemon.perf_insights import PerfInsightsClient + + +BASE_METRICS = [ + "os.loadAverageMinute.one", + "os.loadAverageMinute.five", + "os.loadAverageMinute.fifteen", + "os.cpuUtilization.system", + "os.cpuUtilization.total", + "os.cpuUtilization.user", + "os.diskIO.avgQueueLen", + "os.diskIO.tps", + "os.diskIO.util", + "os.diskIO.readIOsPS", + "os.diskIO.readKbPS", + "os.diskIO.writeIOsPS", + "os.diskIO.writeKbPS", + "os.network.rx", + "os.network.tx", + "os.memory.active", + "os.memory.dirty", + "os.memory.free", + "os.memory.writeback", + "os.memory.total", + "os.tasks.blocked", + "os.tasks.running", + "os.tasks.sleeping", + "os.tasks.stopped", + "os.tasks.total", + "db.SQL.queries", + "db.SQL.total_query_time", + "db.SQL.tup_deleted", + "db.SQL.tup_fetched", + 
"db.SQL.tup_inserted", + "db.SQL.tup_returned", + "db.SQL.tup_updated", + "db.Transactions.active_transactions", + "db.Transactions.blocked_transactions", + "db.Transactions.duration_commits", + "db.Transactions.xact_commit", + "db.Transactions.xact_rollback", + # NOTE: Aurora has specific storage metrics (probably because they use a custom storage engine) + # https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/USER_PerfInsights_Counters.html#USER_PerfInsights_Counters.Aurora_PostgreSQL + "os.diskIO.auroraStorage.auroraStorageBytesRx", + "os.diskIO.auroraStorage.auroraStorageBytesTx", + "os.diskIO.auroraStorage.diskQueueDepth", + "os.diskIO.auroraStorage.readThroughput", + "os.diskIO.auroraStorage.writeThroughput", + "os.diskIO.auroraStorage.readLatency", + "os.diskIO.auroraStorage.writeLatency", + "os.diskIO.auroraStorage.readIOsPS", + "os.diskIO.auroraStorage.writeIOsPS", +] + +ALL_METRICS: List[MetricDef] = [] +for m in BASE_METRICS: + # N.B. The metrics are reported no more than once a minute. So + # average/max/min will all report the same number. + ALL_METRICS.append((m, "avg")) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--instance-id", + type=str, + help="The Aurora instance's identifier.", + ) + parser.add_argument( + "--physical-config-file", + type=str, + help="Used to specify the Aurora instance instead of by an ID.", + ) + parser.add_argument( + "--out-file", + type=str, + required=True, + help="The path where the results should be saved.", + ) + parser.add_argument( + "--num-prev-points", + type=int, + default=60, + help="The number of metric data points to retrieve.", + ) + args = parser.parse_args() + + if args.instance_id is not None: + client = PerfInsightsClient.from_instance_identifier( + instance_identifier=args.instance_id + ) + elif args.config_file is not None: + config = ConfigFile.load_from_physical_config(args.physical_config_file) + directory = Directory(config) + asyncio.run(directory.refresh()) + client = PerfInsightsClient(resource_id=directory.aurora_writer().resource_id()) + else: + raise RuntimeError() + + metrics = client.fetch_metrics( + ALL_METRICS, + period=timedelta(minutes=1), + num_prev_points=args.num_prev_points, + ) + metrics.to_csv(args.out_file) + + +if __name__ == "__main__": + main() diff --git a/tools/calibration/transactions/chbenchmark/run_instance.sh b/tools/calibration/transactions/chbenchmark/run_instance.sh new file mode 100755 index 00000000..a9ebd60e --- /dev/null +++ b/tools/calibration/transactions/chbenchmark/run_instance.sh @@ -0,0 +1,84 @@ +#! /bin/bash + +function extract_named_arguments() { + # Evaluates any environment variables in this script's arguments. This script + # should only be run on trusted input. 
+ orig_args=($@) + for val in "${orig_args[@]}"; do + phys_arg=$(eval "echo $val") + + if [[ $phys_arg =~ --t-clients=.+ ]]; then + t_clients=${phys_arg:12} + fi + + if [[ $phys_arg =~ --run-for-s=.+ ]]; then + run_for_s=${phys_arg:12} + fi + + if [[ $phys_arg =~ --system-config-file=.+ ]]; then + system_config_file=${phys_arg:21} + fi + + if [[ $phys_arg =~ --physical-config-file=.+ ]]; then + physical_config_file=${phys_arg:23} + fi + + if [[ $phys_arg =~ --txn-warehouses=.+ ]]; then + txn_warehouses=${phys_arg:17} + fi + + if [[ $phys_arg =~ --txn-config-file=.+ ]]; then + txn_config_file=${phys_arg:18} + fi + + if [[ $phys_arg =~ --schema-name=.+ ]]; then + schema_name=${phys_arg:14} + fi + + if [[ $phys_arg =~ --instance=.+ ]]; then + instance=${phys_arg:11} + fi + done +} + +script_loc=$(cd $(dirname $0) && pwd -P) +cd $script_loc +extract_named_arguments $@ + +abs_txn_config_file=$(realpath $txn_config_file) +abs_physical_config_file=$(realpath $physical_config_file) +abs_system_config_file=$(realpath $system_config_file) + +>&2 echo "Adjusting blueprint" +brad admin --debug modify_blueprint \ + --schema-name $schema_name \ + --physical-config-file $abs_physical_config_file \ + --system-config-file $abs_system_config_file \ + --aurora-instance-type $instance \ + --aurora-num-nodes 1 + +>&2 echo "Waiting 30 seconds before retrieving pre-metrics..." +sleep 30 + +>&2 echo "Retrieving pre-metrics..." +python3 retrieve_metrics.py --out-file $COND_OUT/pi_metrics_before.csv --physical-config-file $physical_config_file + +>&2 echo "Running the transactional workload..." + +# We run against Aurora directly. +pushd ../../../../workloads/chbenchmark/py-tpcc/ +RECORD_DETAILED_STATS=1 python3 -m pytpcc.tpcc aurora \ + --no-load \ + --config $abs_txn_config_file \ + --warehouses $txn_warehouses \ + --duration $run_for_s \ + --clients $t_clients \ + --scalefactor 1 \ + --lat-sample-prob 0.25 +popd + +>&2 echo "Waiting 10 seconds before retrieving metrics..." +sleep 10 + +>&2 echo "Retrieving metrics..." +python3 retrieve_metrics.py --out-file $COND_OUT/pi_metrics.csv --physical-config-file $physical_config_file diff --git a/tools/calibration/transactions/chbenchmark/system_config_chbench.yml b/tools/calibration/transactions/chbenchmark/system_config_chbench.yml new file mode 100644 index 00000000..c279878d --- /dev/null +++ b/tools/calibration/transactions/chbenchmark/system_config_chbench.yml @@ -0,0 +1,167 @@ +# This file contains configurations that are used by BRAD. These are default +# values and should be customized for specific situations. + +# BRAD's front end servers will listen for client connections on this interface +# and port. If `num_front_ends` is greater than one, subsequent front ends will +# listen on successive ports (e.g., 6584, 6585, etc.). +front_end_interface: "0.0.0.0" +front_end_port: 6583 +num_front_ends: 1 + +# If installed and enabled, BRAD will serve its UI from a webserver that listens +# for connections on this network interface and port. +ui_interface: "0.0.0.0" +ui_port: 7583 + +# Logging paths. If the value is in ALL_CAPS (with underscores), it is +# interpreted as an environment variable (BRAD will log to the path stored in +# the environment variable). + +# Where BRAD's daemon process will write its logs. +daemon_log_file: COND_OUT + +# Where BRAD's front end processes will write their logs. +front_end_log_path: COND_OUT + +# Where BRAD's blueprint planner will write debug logs. 
+planner_log_path: COND_OUT + +# Where BRAD's metrics loggers will write their logs. +metrics_log_path: COND_OUT + +# Probability that each transactional query will be logged. +txn_log_prob: 0.10 + +# Set to a non-zero value enable automatic data syncing. When this is set to 0, +# automatic syncing is disabled. +data_sync_period_seconds: 0 + +# BRAD's front end servers will report their metrics at regular intervals. +front_end_metrics_reporting_period_seconds: 30 +front_end_query_latency_buffer_size: 100 + +# `default` means to use the policy encoded in the blueprint. Other values will +# override the blueprint. +routing_policy: always_aurora + +# Whether to disable table movement for benchmark purposes (i.e., keep all +# tables on all engines.) +disable_table_movement: true + +# Epoch length for metrics and forecasting. This is the granularity at which +# metrics/forecasting will be performed. +epoch_length: + weeks: 0 + days: 0 + hours: 0 + minutes: 1 + +# Blueprint planning strategy. +strategy: fp_query_based_beam + +# Used to specify the period of time over which to use data for planning. +# Currrently, this is a "look behind" window for the workload. +planning_window: + weeks: 0 + days: 0 + hours: 1 + minutes: 0 + +# Used to aggregate metrics collected in the planning window. +metrics_agg: + method: ewm # 'mean' is another option + alpha: 0.86466472 # 1 - 1 / e^2 + +# Used during planning. +reinterpret_second_as: 1 + +# The query distribution must change by at least this much for a new blueprint +# to be accepted. +query_dist_change_frac: 0.1 + +# The search bound for the provisioning. +max_provisioning_multiplier: 2.5 + +# Flag options for blueprint planning. +use_io_optimized_aurora: true +use_recorded_routing_if_available: true +ensure_tables_together_on_one_engine: true + +# Loads used to prime the system when no information is available. +aurora_initialize_load_fraction: 0.25 +redshift_initialize_load_fraction: 0.25 + +# BRAD will not reduce predicted load lower than these values. Raise these +# values to be more conservative against mispredictions. +aurora_min_load_removal_fraction: 0.8 +redshift_min_load_removal_fraction: 0.8 + +# Blueprint planning performance ceilings. +query_latency_p90_ceiling_s: 30.0 +txn_latency_p90_ceiling_s: 0.030 + +# If set to true, BRAD will attempt to use the specified preset Redshift +# clusters instead of resizing the main Redshift cluster. +use_preset_redshift_clusters: false + +# Used for ordering blueprints during planning. +comparator: + type: benefit_perf_ceiling # or `perf_ceiling` + + benefit_horizon: # Only used by the `benefit_perf_ceiling` comparator + weeks: 0 + days: 0 + hours: 1 + minutes: 0 + + penalty_threshold: 0.8 # Only used by the `benefit_perf_ceiling` comparator + penalty_power: 8 # Only used by the `benefit_perf_ceiling` comparator + +# Used for precomputed predictions. +std_datasets: + - name: regular + path: workloads/IMDB_100GB/regular_test/ + - name: adhoc + path: workloads/IMDB_100GB/adhoc_test/ + +# Blueprint planning trigger configs. + +triggers: + enabled: false + check_period_s: 90 # Triggers are checked every X seconds. + check_period_offset_s: 360 # Wait 6 mins before starting. + + # Triggers will not fire for at least this many minutes after a new blueprint + # takes effect. Usually this should be greater than zero to give BRAD + # sufficient time to observe the effect of the blueprint on the workload. BRAD + # may wait longer to ensure metrics are also available for this many minutes. 
+ observe_new_blueprint_mins: 3 + + elapsed_time: + disabled: true + multiplier: 60 # Multiplier over `planning_window`. + + redshift_cpu: + lo: 15 + hi: 85 + sustained_epochs: 3 + + aurora_cpu: + lo: 15 + hi: 85 + sustained_epochs: 3 + + variable_costs: + disabled: true + threshold: 1.0 + + query_latency_ceiling: + ceiling_s: 30.0 + sustained_epochs: 3 + + txn_latency_ceiling: + ceiling_s: 0.030 + sustained_epochs: 3 + + recent_change: + delay_epochs: 5 diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py index 5f3baa0c..196de8b2 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py @@ -54,7 +54,9 @@ def __init__(self, driver, scaleParameters, stop_on_error=False): ## DEF - def execute(self, duration: float, worker_index: int) -> results.Results: + def execute( + self, duration: float, worker_index: int, lat_sample_prob: float + ) -> results.Results: if RECORD_DETAILED_STATS_VAR in os.environ: import conductor.lib as cond @@ -76,7 +78,7 @@ def execute(self, duration: float, worker_index: int) -> results.Results: "record_detailed": True, "worker_index": worker_index, "output_prefix": out_path, - "lat_sample_prob": 0.10, # Sampled 10% + "lat_sample_prob": lat_sample_prob, } else: logging.info("Not recording detailed stats.") diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py b/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py index 25f43f89..bcf2abce 100755 --- a/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py @@ -211,7 +211,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index driver, scaleParameters, stop_on_error=args["stop_on_error"] ) driver.executeStart() - results = e.execute(args["duration"], worker_index) + results = e.execute(args["duration"], worker_index, args["lat_sample_prob"]) driver.executeFinish() return results @@ -290,6 +290,12 @@ def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index aparser.add_argument( "--debug", action="store_true", help="Enable debug log messages" ) + aparser.add_argument( + "--lat-sample-prob", + type=float, + default=0.1, + help="The fraction of the transaction latencies to record.", + ) args = vars(aparser.parse_args()) if args["debug"]: @@ -367,7 +373,11 @@ def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index driver, scaleParameters, stop_on_error=args["stop_on_error"] ) driver.executeStart() - results = e.execute(args["duration"], worker_index=0) + results = e.execute( + args["duration"], + worker_index=0, + lat_sample_prob=args["lat_sample_prob"], + ) driver.executeFinish() else: results = startExecution(driverClass, scaleParameters, args, config) From c148313bf9d6c5b2d7fa900e78882e3196fd1ba0 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Thu, 25 Apr 2024 15:35:18 +0000 Subject: [PATCH 2/8] Fixes --- tools/calibration/transactions/chbenchmark/retrieve_metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/calibration/transactions/chbenchmark/retrieve_metrics.py b/tools/calibration/transactions/chbenchmark/retrieve_metrics.py index 11d52998..862541b3 100644 --- a/tools/calibration/transactions/chbenchmark/retrieve_metrics.py +++ b/tools/calibration/transactions/chbenchmark/retrieve_metrics.py @@ -97,7 +97,7 @@ def main(): client = PerfInsightsClient.from_instance_identifier( instance_identifier=args.instance_id ) - elif 
args.config_file is not None: + elif args.physical_config_file is not None: config = ConfigFile.load_from_physical_config(args.physical_config_file) directory = Directory(config) asyncio.run(directory.refresh()) From f9b17d26daa055d4b6dd7f072613a6e6ad789c40 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Thu, 25 Apr 2024 15:36:06 +0000 Subject: [PATCH 3/8] Adjust order --- tools/calibration/transactions/chbenchmark/COND | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/calibration/transactions/chbenchmark/COND b/tools/calibration/transactions/chbenchmark/COND index 7f7a63cf..6ba76914 100644 --- a/tools/calibration/transactions/chbenchmark/COND +++ b/tools/calibration/transactions/chbenchmark/COND @@ -1,10 +1,10 @@ RUN_FOR_S = 3 * 60 + 30 # 3 minutes 30 seconds NUM_CLIENTS = { - "db.t4g.medium": [1, 2, 4, 8, 16, 32], - "db.r6g.large": [1, 2, 4, 8, 16, 32], "db.r6g.xlarge": [1, 2, 4, 8, 16, 32, 64], "db.r6g.2xlarge": [1, 2, 4, 8, 16, 32, 64, 128], "db.r6g.4xlarge": [1, 2, 4, 8, 16, 32, 64, 128, 256], + "db.r6g.large": [1, 2, 4, 8, 16, 32], + "db.t4g.medium": [1, 2, 4, 8, 16, 32], } INSTANCES = list(NUM_CLIENTS.keys()) From 2399baba753f3feaf17fe060c1e17f773b0248b0 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Thu, 25 Apr 2024 15:36:44 +0000 Subject: [PATCH 4/8] Conductor file fixes --- tools/calibration/transactions/chbenchmark/COND | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/calibration/transactions/chbenchmark/COND b/tools/calibration/transactions/chbenchmark/COND index 6ba76914..f59e559a 100644 --- a/tools/calibration/transactions/chbenchmark/COND +++ b/tools/calibration/transactions/chbenchmark/COND @@ -16,14 +16,14 @@ COND_INSTANCES = { combine( name="all", deps=[ - ":{}".format(instance) + ":{}".format(COND_INSTANCES[instance]) for instance in INSTANCES ], ) for instance in INSTANCES: run_experiment_group( - name=instance, + name=COND_INSTANCES[instance], run="./run_instance.sh", experiments=[ ExperimentInstance( From db9aa0f2309bd2a509d7109e5b3c1608ada9e4af Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Thu, 25 Apr 2024 19:52:17 +0000 Subject: [PATCH 5/8] Fix paths, reduce verbosity --- tools/calibration/transactions/chbenchmark/run_instance.sh | 4 ++-- workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/calibration/transactions/chbenchmark/run_instance.sh b/tools/calibration/transactions/chbenchmark/run_instance.sh index a9ebd60e..dfe0c6b5 100755 --- a/tools/calibration/transactions/chbenchmark/run_instance.sh +++ b/tools/calibration/transactions/chbenchmark/run_instance.sh @@ -61,7 +61,7 @@ brad admin --debug modify_blueprint \ sleep 30 >&2 echo "Retrieving pre-metrics..." -python3 retrieve_metrics.py --out-file $COND_OUT/pi_metrics_before.csv --physical-config-file $physical_config_file +python3 retrieve_metrics.py --out-file $COND_OUT/pi_metrics_before.csv --physical-config-file $abs_physical_config_file >&2 echo "Running the transactional workload..." @@ -81,4 +81,4 @@ popd sleep 10 >&2 echo "Retrieving metrics..." 
-python3 retrieve_metrics.py --out-file $COND_OUT/pi_metrics.csv --physical-config-file $physical_config_file +python3 retrieve_metrics.py --out-file $COND_OUT/pi_metrics.csv --physical-config-file $abs_physical_config_file diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py index 196de8b2..574d85ef 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py @@ -101,8 +101,8 @@ def execute( except KeyboardInterrupt: return -1 except (Exception, AssertionError) as ex: - logging.warn("Failed to execute Transaction '%s': %s" % (txn, ex)) if debug: + logging.warn("Failed to execute Transaction '%s': %s" % (txn, ex)) traceback.print_exc(file=sys.stdout) if self.stop_on_error: raise From 9a4600653bcddd75687fd5cabfd69cbd367cb2b0 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Thu, 25 Apr 2024 20:38:46 +0000 Subject: [PATCH 6/8] Suppress noisy abort errors --- .../chbenchmark/py-tpcc/pytpcc/constants.py | 2 ++ .../py-tpcc/pytpcc/drivers/auroradriver.py | 27 ++++++++++++------- .../py-tpcc/pytpcc/drivers/braddriver.py | 27 ++++++++++++------- 3 files changed, 36 insertions(+), 20 deletions(-) diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/constants.py b/workloads/chbenchmark/py-tpcc/pytpcc/constants.py index 52afd4dd..e72e8c66 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/constants.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/constants.py @@ -166,3 +166,5 @@ def enum(*sequential, **named): "PAYMENT", "STOCK_LEVEL", ) + +NONSILENT_ERRORS_VAR = "NONSILENT_ERRORS" diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py index f1f68b5f..fe0b0267 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py @@ -1,6 +1,7 @@ import logging import traceback import decimal +import os from typing import Dict, Tuple, Any, Optional, List from .abstractdriver import * @@ -84,6 +85,7 @@ def __init__(self, ddl: str) -> None: self._connection: Optional[PsycopgConnection] = None self._cursor: Optional[PsycopgCursor] = None self._config: Dict[str, Any] = {} + self._nonsilent_errs = constants.NONSILENT_ERRORS_VAR in os.environ def makeDefaultConfig(self) -> Config: return AuroraDriver.DEFAULT_CONFIG @@ -172,8 +174,9 @@ def doDelivery(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: return result except Exception as ex: - print("Error in DELIVERY", str(ex)) - print(traceback.format_exc()) + if self._nonsilent_errs: + print("Error in DELIVERY", str(ex)) + print(traceback.format_exc()) raise def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: @@ -362,8 +365,9 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: return [customer_info, misc, item_data] except Exception as ex: - print("Error in NEWORDER", str(ex)) - print(traceback.format_exc()) + if self._nonsilent_errs: + print("Error in NEWORDER", str(ex)) + print(traceback.format_exc()) raise def doOrderStatus(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: @@ -415,8 +419,9 @@ def doOrderStatus(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: return [customer, order, orderLines] except Exception as ex: - print("Error in ORDER_STATUS", str(ex)) - print(traceback.format_exc()) + if self._nonsilent_errs: + print("Error in ORDER_STATUS", str(ex)) + print(traceback.format_exc()) raise def 
doPayment(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: @@ -527,8 +532,9 @@ def doPayment(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: return [warehouse, district, customer] except Exception as ex: - print("Error in PAYMENT", str(ex)) - print(traceback.format_exc()) + if self._nonsilent_errs: + print("Error in PAYMENT", str(ex)) + print(traceback.format_exc()) raise def doStockLevel(self, params: Dict[str, Any]) -> int: @@ -559,6 +565,7 @@ def doStockLevel(self, params: Dict[str, Any]) -> int: return int(result[0]) except Exception as ex: - print("Error in STOCK_LEVEL", str(ex)) - print(traceback.format_exc()) + if self._nonsilent_errs: + print("Error in STOCK_LEVEL", str(ex)) + print(traceback.format_exc()) raise diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py index 99e0cc53..0eb6307a 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py @@ -1,6 +1,7 @@ import logging import traceback import decimal +import os from typing import Dict, Tuple, Any, Optional, List from .abstractdriver import * @@ -83,6 +84,7 @@ def __init__(self, ddl: str) -> None: super().__init__("brad", ddl) self._client: Optional[BradGrpcClient] = None self._config: Dict[str, Any] = {} + self._nonsilent_errs = constants.NONSILENT_ERRORS_VAR in os.environ def makeDefaultConfig(self) -> Config: return BradDriver.DEFAULT_CONFIG @@ -168,8 +170,9 @@ def doDelivery(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: return result except Exception as ex: - print("Error in DELIVERY", str(ex)) - print(traceback.format_exc()) + if self._nonsilent_errs: + print("Error in DELIVERY", str(ex)) + print(traceback.format_exc()) raise def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: @@ -355,8 +358,9 @@ def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: return [customer_info, misc, item_data] except Exception as ex: - print("Error in NEWORDER", str(ex)) - print(traceback.format_exc()) + if self._nonsilent_errs: + print("Error in NEWORDER", str(ex)) + print(traceback.format_exc()) raise def doOrderStatus(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: @@ -404,8 +408,9 @@ def doOrderStatus(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: return [customer, order, orderLines] except Exception as ex: - print("Error in ORDER_STATUS", str(ex)) - print(traceback.format_exc()) + if self._nonsilent_errs: + print("Error in ORDER_STATUS", str(ex)) + print(traceback.format_exc()) raise def doPayment(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: @@ -512,8 +517,9 @@ def doPayment(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]: return [warehouse, district, customer] except Exception as ex: - print("Error in PAYMENT", str(ex)) - print(traceback.format_exc()) + if self._nonsilent_errs: + print("Error in PAYMENT", str(ex)) + print(traceback.format_exc()) raise def doStockLevel(self, params: Dict[str, Any]) -> int: @@ -542,6 +548,7 @@ def doStockLevel(self, params: Dict[str, Any]) -> int: return int(result[0]) except Exception as ex: - print("Error in STOCK_LEVEL", str(ex)) - print(traceback.format_exc()) + if self._nonsilent_errs: + print("Error in STOCK_LEVEL", str(ex)) + print(traceback.format_exc()) raise From e938afc7dd5980c4fd64903cebe0750f7bcf2def Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Thu, 25 Apr 2024 21:59:54 +0000 Subject: [PATCH 7/8] Fix warehouse ID generation --- 
.../py-tpcc/pytpcc/runtime/executor.py | 52 ++++++++++++++++--- workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py | 10 +++- 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py index 574d85ef..3e2c43de 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py @@ -47,15 +47,27 @@ class Executor: - def __init__(self, driver, scaleParameters, stop_on_error=False): + def __init__(self, driver, scaleParameters, stop_on_error=False, pct_remote=0.1): self.driver = driver self.scaleParameters = scaleParameters self.stop_on_error = stop_on_error + self.pct_remote = pct_remote + + self.local_warehouse_range = ( + self.scaleParameters.starting_warehouse, + self.scaleParameters.ending_warehouse, + ) + self.total_workers = 1 + self.worker_index = 0 ## DEF def execute( - self, duration: float, worker_index: int, lat_sample_prob: float + self, + duration: float, + worker_index: int, + total_workers: int, + lat_sample_prob: float, ) -> results.Results: if RECORD_DETAILED_STATS_VAR in os.environ: import conductor.lib as cond @@ -84,6 +96,23 @@ def execute( logging.info("Not recording detailed stats.") options = {} + # Compute warehouse ranges. + self.worker_index = worker_index + self.total_workers = total_workers + warehouses_per_worker = self.scaleParameters.warehouses // total_workers + min_warehouse = worker_index * warehouses_per_worker + # N.B. Warehouse IDs are 1-based and this range is supposed to be + # inclusive. + self.local_warehouse_range = ( + min_warehouse + 1, + min_warehouse + warehouses_per_worker, + ) + logging.info( + "Worker index %d - Warehouse range: %d to %d (inclusive)", + self.worker_index, + *self.local_warehouse_range + ) + r = results.Results(options) assert r logging.info("Executing benchmark for %d seconds" % duration) @@ -310,10 +339,21 @@ def generateStockLevelParams(self): ## DEF def makeWarehouseId(self): - w_id = rand.number( - self.scaleParameters.starting_warehouse, - self.scaleParameters.ending_warehouse, - ) + if random.random() < self.pct_remote: + # Generate remote. 
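+            # Rejection-sample warehouse IDs until one falls outside this worker's
+            # local range (any ID is acceptable when there is only one worker).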
+ while True: + w_id = rand.number( + self.scaleParameters.starting_warehouse, + self.scaleParameters.ending_warehouse, + ) + if self.total_workers == 1 or not ( + w_id >= self.local_warehouse_range[0] + and w_id <= self.local_warehouse_range[1] + ): + break + else: + w_id = rand.number(*self.local_warehouse_range) + assert w_id >= self.scaleParameters.starting_warehouse, ( "Invalid W_ID: %d" % w_id ) diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py b/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py index bcf2abce..2273933f 100755 --- a/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py @@ -170,6 +170,7 @@ def startExecution(driverClass, scaleParameters, args, config): config, debug, i, + args["clients"], ), ) worker_results.append(r) @@ -196,7 +197,9 @@ def startExecution(driverClass, scaleParameters, args, config): ## ============================================== ## executorFunc ## ============================================== -def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index): +def executorFunc( + driverClass, scaleParameters, args, config, debug, worker_index, total_workers +): try: driver = driverClass(args["ddl"]) assert driver != None @@ -211,7 +214,9 @@ def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index driver, scaleParameters, stop_on_error=args["stop_on_error"] ) driver.executeStart() - results = e.execute(args["duration"], worker_index, args["lat_sample_prob"]) + results = e.execute( + args["duration"], worker_index, total_workers, args["lat_sample_prob"] + ) driver.executeFinish() return results @@ -376,6 +381,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index results = e.execute( args["duration"], worker_index=0, + total_workers=1, lat_sample_prob=args["lat_sample_prob"], ) driver.executeFinish() From 79191292106665149b0bd306d1bbb63cd5c248b0 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Thu, 25 Apr 2024 23:04:13 +0000 Subject: [PATCH 8/8] Ensure rollback, add backoff --- .../py-tpcc/pytpcc/drivers/abstractdriver.py | 6 ++++++ .../py-tpcc/pytpcc/drivers/auroradriver.py | 6 ++++++ .../py-tpcc/pytpcc/drivers/braddriver.py | 6 ++++++ .../py-tpcc/pytpcc/runtime/executor.py | 18 ++++++++++++++++++ 4 files changed, 36 insertions(+) diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/abstractdriver.py b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/abstractdriver.py index 5de514e5..301d4d89 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/abstractdriver.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/abstractdriver.py @@ -185,5 +185,11 @@ def doStockLevel(self, params): "%s does not implement doStockLevel" % (self.driver_name) ) + def ensureRollback(self) -> None: + """ + Makes sure the transaction has rolled back. + """ + # Default: no-op. + ## CLASS diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py index fe0b0267..79e65ebc 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py @@ -569,3 +569,9 @@ def doStockLevel(self, params: Dict[str, Any]) -> int: print("Error in STOCK_LEVEL", str(ex)) print(traceback.format_exc()) raise + + def ensureRollback(self) -> None: + """ + Makes sure the transaction has rolled back. 
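+        Called by the executor after an aborted transaction so the connection is
+        left in a clean state before the next transaction is issued.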
+ """ + self._cursor.execute_sync("ROLLBACK") diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py index 0eb6307a..9458a0c1 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py @@ -552,3 +552,9 @@ def doStockLevel(self, params: Dict[str, Any]) -> int: print("Error in STOCK_LEVEL", str(ex)) print(traceback.format_exc()) raise + + def ensureRollback(self) -> None: + """ + Makes sure the transaction has rolled back. + """ + self._client.run_query_ignore_results("ROLLBACK") diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py index 3e2c43de..c25bce1c 100644 --- a/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py +++ b/workloads/chbenchmark/py-tpcc/pytpcc/runtime/executor.py @@ -39,6 +39,8 @@ import pathlib from datetime import datetime from pprint import pprint, pformat +from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff +from typing import Optional from .. import constants from ..util import * @@ -118,6 +120,7 @@ def execute( logging.info("Executing benchmark for %d seconds" % duration) start = r.startBenchmark() debug = logging.getLogger().isEnabledFor(logging.DEBUG) + backoff: Optional[RandomizedExponentialBackoff] = None while (time.time() - start) <= duration: txn, params = self.doOne() @@ -127,15 +130,30 @@ def execute( logging.debug("Executing '%s' transaction" % txn) try: val = self.driver.executeTransaction(txn, params) + backoff = None except KeyboardInterrupt: return -1 except (Exception, AssertionError) as ex: if debug: logging.warn("Failed to execute Transaction '%s': %s" % (txn, ex)) traceback.print_exc(file=sys.stdout) + elif random.random() < 0.01: + logging.warning("Aborted transaction: %s: %s", txn, ex) + traceback.print_exc(file=sys.stdout) if self.stop_on_error: raise r.abortTransaction(txn_id) + self.driver.ensureRollback() + + # Back off slightly. + if backoff is None: + backoff = RandomizedExponentialBackoff( + max_retries=10, base_delay_s=0.001, max_delay_s=1.0 + ) + wait_s = backoff.wait_time_s() + if wait_s is not None: + time.sleep(wait_s) + continue # if debug: logging.debug("%s\nParameters:\n%s\nResult:\n%s" % (txn, pformat(params), pformat(val)))