diff --git a/experiments/17-chbenchmark/common.sh b/experiments/17-chbenchmark/common.sh
new file mode 100644
index 00000000..04ad6a0b
--- /dev/null
+++ b/experiments/17-chbenchmark/common.sh
@@ -0,0 +1,96 @@
+# Launch the BRAD daemon in the background; records its PID in `brad_pid`.
+function start_brad() {
+  system_config_file=$1
+  physical_config_file=$2
+
+  pushd ../../../
+  brad daemon \
+    --physical-config-file "$physical_config_file" \
+    --system-config-file "$system_config_file" \
+    --schema-name "$schema_name" \
+    &
+  brad_pid=$!
+  popd
+}
+
+# Launch the TPC-C workload against BRAD in the background; records its PID in
+# `tpcc_pid`. Relies on variables set by `extract_named_arguments`.
+function run_tpcc() {
+  pushd ../../../workloads/chbenchmark/py-tpcc/
+  RECORD_DETAILED_STATS=1 python3 -m pytpcc.tpcc brad \
+    --no-load \
+    --config "$abs_txn_config_file" \
+    --warehouses "$txn_warehouses" \
+    --duration "$run_for_s" \
+    --clients "$t_clients" \
+    --scalefactor "$txn_scale_factor" &
+  tpcc_pid=$!
+  popd
+}
+
+function extract_named_arguments() {
+  # Evaluates any environment variables in this script's arguments. This script
+  # should only be run on trusted input.
+  orig_args=("$@")
+  for val in "${orig_args[@]}"; do
+    phys_arg=$(eval "echo $val")
+
+    if [[ $phys_arg =~ --ra-clients=.+ ]]; then
+      ra_clients=${phys_arg:13}
+    fi
+
+    if [[ $phys_arg =~ --t-clients=.+ ]]; then
+      t_clients=${phys_arg:12}
+    fi
+
+    if [[ $phys_arg =~ --ra-query-indexes=.+ ]]; then
+      ra_query_indexes=${phys_arg:19}
+    fi
+
+    if [[ $phys_arg =~ --ra-query-bank-file=.+ ]]; then
+      ra_query_bank_file=${phys_arg:21}
+    fi
+
+    if [[ $phys_arg =~ --ra-gap-s=.+ ]]; then
+      ra_gap_s=${phys_arg:11}
+    fi
+
+    if [[ $phys_arg =~ --ra-gap-std-s=.+ ]]; then
+      ra_gap_std_s=${phys_arg:15}
+    fi
+
+    if [[ $phys_arg =~ --num-front-ends=.+ ]]; then
+      num_front_ends=${phys_arg:17}
+    fi
+
+    if [[ $phys_arg =~ --run-for-s=.+ ]]; then
+      run_for_s=${phys_arg:12}
+    fi
+
+    if [[ $phys_arg =~ --physical-config-file=.+ ]]; then
+      physical_config_file=${phys_arg:23}
+    fi
+
+    if [[ $phys_arg =~ --system-config-file=.+ ]]; then
+      system_config_file=${phys_arg:21}
+    fi
+
+    if [[ $phys_arg =~ --schema-name=.+ ]]; then
+      schema_name=${phys_arg:14}
+    fi
+
+    if [[ $phys_arg =~ --query-sequence-file=.+ ]]; then
+      query_sequence_file=${phys_arg:22}
+    fi
+
+    if [[ $phys_arg =~ --txn-scale-factor=.+ ]]; then
+      txn_scale_factor=${phys_arg:19}
+    fi
+
+    if [[ $phys_arg =~ --txn-warehouses=.+ ]]; then
+      txn_warehouses=${phys_arg:17}
+    fi
+
+    if [[ $phys_arg =~ --txn-config-file=.+ ]]; then
+      txn_config_file=${phys_arg:18}
+    fi
+  done
+}
diff --git a/experiments/17-chbenchmark/debug/.gitignore b/experiments/17-chbenchmark/debug/.gitignore
new file mode 100644
index 00000000..0949a3cb
--- /dev/null
+++ b/experiments/17-chbenchmark/debug/.gitignore
@@ -0,0 +1 @@
+aurora.config
diff --git a/experiments/17-chbenchmark/debug/COND b/experiments/17-chbenchmark/debug/COND
new file mode 100644
index 00000000..f00864cf
--- /dev/null
+++ b/experiments/17-chbenchmark/debug/COND
@@ -0,0 +1,26 @@
+run_command(
+    name="txn_lat",
+    run="./run_tpcc.sh",
+    options={
+        "physical-config-file": "../../../config/physical_config_chbench.yml",
+        "system-config-file": "debug_config.yml",  # Relative to one level up.
+ "txn-config-file": "brad.config", + "schema-name": "chbenchmark", + "txn-warehouses": 1740, + "txn-scale-factor": 1, # TBD + "t-clients": 1, # TBD + "run-for-s": 180, + }, +) + +run_command( + name="aurora_direct", + run="./run_aurora_direct.sh", + options={ + "txn-config-file": "aurora.config", + "txn-warehouses": 1740, + "txn-scale-factor": 1, # TBD + "t-clients": 1, # TBD + "run-for-s": 180, + }, +) diff --git a/experiments/17-chbenchmark/debug/brad.config b/experiments/17-chbenchmark/debug/brad.config new file mode 100644 index 00000000..c71fe1e5 --- /dev/null +++ b/experiments/17-chbenchmark/debug/brad.config @@ -0,0 +1,6 @@ +# BradDriver Configuration File +[brad] +host = localhost +port = 6583 +isolation_level = REPEATABLE READ +use_worker_offset = true diff --git a/experiments/17-chbenchmark/debug/debug_config.yml b/experiments/17-chbenchmark/debug/debug_config.yml new file mode 100644 index 00000000..c279878d --- /dev/null +++ b/experiments/17-chbenchmark/debug/debug_config.yml @@ -0,0 +1,167 @@ +# This file contains configurations that are used by BRAD. These are default +# values and should be customized for specific situations. + +# BRAD's front end servers will listen for client connections on this interface +# and port. If `num_front_ends` is greater than one, subsequent front ends will +# listen on successive ports (e.g., 6584, 6585, etc.). +front_end_interface: "0.0.0.0" +front_end_port: 6583 +num_front_ends: 1 + +# If installed and enabled, BRAD will serve its UI from a webserver that listens +# for connections on this network interface and port. +ui_interface: "0.0.0.0" +ui_port: 7583 + +# Logging paths. If the value is in ALL_CAPS (with underscores), it is +# interpreted as an environment variable (BRAD will log to the path stored in +# the environment variable). + +# Where BRAD's daemon process will write its logs. +daemon_log_file: COND_OUT + +# Where BRAD's front end processes will write their logs. 
+front_end_log_path: COND_OUT
+
+# Where BRAD's blueprint planner will write debug logs.
+planner_log_path: COND_OUT
+
+# Where BRAD's metrics loggers will write their logs.
+metrics_log_path: COND_OUT
+
+# Probability that each transactional query will be logged.
+txn_log_prob: 0.10
+
+# Set to a non-zero value to enable automatic data syncing. When this is set to
+# 0, automatic syncing is disabled.
+data_sync_period_seconds: 0
+
+# BRAD's front end servers will report their metrics at regular intervals.
+front_end_metrics_reporting_period_seconds: 30
+front_end_query_latency_buffer_size: 100
+
+# `default` means to use the policy encoded in the blueprint. Other values will
+# override the blueprint.
+routing_policy: always_aurora
+
+# Whether to disable table movement for benchmark purposes (i.e., keep all
+# tables on all engines.)
+disable_table_movement: true
+
+# Epoch length for metrics and forecasting. This is the granularity at which
+# metrics/forecasting will be performed.
+epoch_length:
+  weeks: 0
+  days: 0
+  hours: 0
+  minutes: 1
+
+# Blueprint planning strategy.
+strategy: fp_query_based_beam
+
+# Used to specify the period of time over which to use data for planning.
+# Currently, this is a "look behind" window for the workload.
+planning_window:
+  weeks: 0
+  days: 0
+  hours: 1
+  minutes: 0
+
+# Used to aggregate metrics collected in the planning window.
+metrics_agg:
+  method: ewm  # 'mean' is another option
+  alpha: 0.86466472  # 1 - 1 / e^2
+
+# Used during planning.
+reinterpret_second_as: 1
+
+# The query distribution must change by at least this much for a new blueprint
+# to be accepted.
+query_dist_change_frac: 0.1
+
+# The search bound for the provisioning.
+max_provisioning_multiplier: 2.5
+
+# Flag options for blueprint planning.
+use_io_optimized_aurora: true
+use_recorded_routing_if_available: true
+ensure_tables_together_on_one_engine: true
+
+# Loads used to prime the system when no information is available.
+aurora_initialize_load_fraction: 0.25
+redshift_initialize_load_fraction: 0.25
+
+# BRAD will not reduce predicted load lower than these values. Raise these
+# values to be more conservative against mispredictions.
+aurora_min_load_removal_fraction: 0.8
+redshift_min_load_removal_fraction: 0.8
+
+# Blueprint planning performance ceilings.
+query_latency_p90_ceiling_s: 30.0
+txn_latency_p90_ceiling_s: 0.030
+
+# If set to true, BRAD will attempt to use the specified preset Redshift
+# clusters instead of resizing the main Redshift cluster.
+use_preset_redshift_clusters: false
+
+# Used for ordering blueprints during planning.
+comparator:
+  type: benefit_perf_ceiling  # or `perf_ceiling`
+
+  benefit_horizon:  # Only used by the `benefit_perf_ceiling` comparator
+    weeks: 0
+    days: 0
+    hours: 1
+    minutes: 0
+
+  penalty_threshold: 0.8  # Only used by the `benefit_perf_ceiling` comparator
+  penalty_power: 8  # Only used by the `benefit_perf_ceiling` comparator
+
+# Used for precomputed predictions.
+std_datasets:
+  - name: regular
+    path: workloads/IMDB_100GB/regular_test/
+  - name: adhoc
+    path: workloads/IMDB_100GB/adhoc_test/
+
+# Blueprint planning trigger configs.
+
+triggers:
+  enabled: false
+  check_period_s: 90  # Triggers are checked every X seconds.
+  check_period_offset_s: 360  # Wait 6 mins before starting.
+
+  # Triggers will not fire for at least this many minutes after a new blueprint
+  # takes effect. Usually this should be greater than zero to give BRAD
+  # sufficient time to observe the effect of the blueprint on the workload. BRAD
+  # may wait longer to ensure metrics are also available for this many minutes.
+  observe_new_blueprint_mins: 3
+
+  elapsed_time:
+    disabled: true
+    multiplier: 60  # Multiplier over `planning_window`.
+
+  redshift_cpu:
+    lo: 15
+    hi: 85
+    sustained_epochs: 3
+
+  aurora_cpu:
+    lo: 15
+    hi: 85
+    sustained_epochs: 3
+
+  variable_costs:
+    disabled: true
+    threshold: 1.0
+
+  query_latency_ceiling:
+    ceiling_s: 30.0
+    sustained_epochs: 3
+
+  txn_latency_ceiling:
+    ceiling_s: 0.030
+    sustained_epochs: 3
+
+  recent_change:
+    delay_epochs: 5
diff --git a/experiments/17-chbenchmark/debug/run_aurora_direct.sh b/experiments/17-chbenchmark/debug/run_aurora_direct.sh
new file mode 100755
index 00000000..df6b232a
--- /dev/null
+++ b/experiments/17-chbenchmark/debug/run_aurora_direct.sh
@@ -0,0 +1,18 @@
+#! /bin/bash
+
+script_loc=$(cd "$(dirname "$0")" && pwd -P)
+cd "$script_loc"
+source ../common.sh
+extract_named_arguments "$@"
+
+# Resolve paths into absolute paths
+abs_txn_config_file=$(realpath "$txn_config_file")
+
+cd ../../../workloads/chbenchmark/py-tpcc/
+RECORD_DETAILED_STATS=1 python3 -m pytpcc.tpcc aurora \
+  --no-load \
+  --config "$abs_txn_config_file" \
+  --warehouses "$txn_warehouses" \
+  --duration "$run_for_s" \
+  --clients "$t_clients" \
+  --scalefactor "$txn_scale_factor"
diff --git a/experiments/17-chbenchmark/debug/run_tpcc.sh b/experiments/17-chbenchmark/debug/run_tpcc.sh
new file mode 100755
index 00000000..658f3087
--- /dev/null
+++ b/experiments/17-chbenchmark/debug/run_tpcc.sh
@@ -0,0 +1,27 @@
+#! /bin/bash
+
+script_loc=$(cd "$(dirname "$0")" && pwd -P)
+cd "$script_loc"
+source ../common.sh
+extract_named_arguments "$@"
+
+# Resolve paths into absolute paths
+abs_txn_config_file=$(realpath "$txn_config_file")
+abs_system_config_file=$(realpath "$system_config_file")
+abs_physical_config_file=$(realpath "$physical_config_file")
+
+export BRAD_IGNORE_BLUEPRINT=1
+start_brad "$abs_system_config_file" "$abs_physical_config_file"
+
+# Wait for BRAD to start up.
+sleep 30
+
+# Start the TPC-C workload.
+run_tpcc
+
+sleep "$run_for_s"
+sleep 10
+
+kill "$tpcc_pid"
+kill "$brad_pid"
+wait "$brad_pid"
diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py
new file mode 100644
index 00000000..f1f68b5f
--- /dev/null
+++ b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/auroradriver.py
@@ -0,0 +1,564 @@
+import logging
+import traceback
+import decimal
+from typing import Dict, Tuple, Any, Optional, List
+
+from .abstractdriver import *
+from .. import constants
+
+from brad.connection.psycopg_connection import PsycopgConnection
+from brad.connection.psycopg_cursor import PsycopgCursor
+
+Config = Dict[str, Tuple[str, Any]]
+
+logger = logging.getLogger(__name__)
+
+
+TXN_QUERIES = {
+    "DELIVERY": {
+        "getNewOrder": "SELECT no_o_id FROM new_order WHERE no_d_id = {} AND no_w_id = {} AND no_o_id > -1 LIMIT 1",  # d_id, w_id
+        "deleteNewOrder": "DELETE FROM new_order WHERE no_d_id = {} AND no_w_id = {} AND no_o_id = {}",  # d_id, w_id, no_o_id
+        "getCId": "SELECT o_c_id FROM orders WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}",  # no_o_id, d_id, w_id
+        "updateOrders": "UPDATE orders SET o_carrier_id = {} WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}",  # o_carrier_id, no_o_id, d_id, w_id
+        "updateOrderLine": "UPDATE order_line SET ol_delivery_d = '{}' WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}",  # o_entry_d, no_o_id, d_id, w_id
+        "sumOLAmount": "SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}",  # no_o_id, d_id, w_id
+        "updateCustomer": "UPDATE customer SET c_balance = c_balance + {} WHERE c_id = {} AND c_d_id = {} AND c_w_id = {}",  # ol_total, c_id, d_id, w_id
+    },
+    "NEW_ORDER": {
+        "getWarehouseTaxRate": "SELECT w_tax FROM warehouse WHERE w_id = {}",  # w_id
+        "getDistrict": "SELECT d_tax, d_next_o_id FROM district WHERE d_id = {} AND d_w_id = {}",  # d_id, w_id
+        "incrementNextOrderId": "UPDATE district SET d_next_o_id = {} WHERE d_id = {} AND d_w_id = {}",  # d_next_o_id, d_id, w_id
+        "getCustomer": "SELECT c_discount, c_last, c_credit FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}",  # w_id, d_id, c_id
+        "createOrder": "INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d, o_carrier_id, o_ol_cnt, o_all_local) VALUES ({}, {}, {}, {}, '{}', {}, {}, {})",  # d_next_o_id, d_id, w_id, c_id, o_entry_d, o_carrier_id, o_ol_cnt, o_all_local
+        "createNewOrder": "INSERT INTO new_order (no_o_id, no_d_id, no_w_id) VALUES ({}, {}, {})",  # o_id, d_id, w_id
+        "getItemInfo": "SELECT i_price, i_name, i_data FROM item WHERE i_id = {}",  # ol_i_id
+        "getStockInfo": "SELECT s_quantity, s_data, s_ytd, s_order_cnt, s_remote_cnt, s_dist_{:02d} FROM stock WHERE s_i_id = {} AND s_w_id = {}",  # d_id, ol_i_id, ol_supply_w_id
+        "updateStock": "UPDATE stock SET s_quantity = {}, s_ytd = {}, s_order_cnt = {}, s_remote_cnt = {} WHERE s_i_id = {} AND s_w_id = {}",  # s_quantity, s_ytd, s_order_cnt, s_remote_cnt, ol_i_id, ol_supply_w_id
+        "createOrderLine": "INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES ({}, {}, {}, {}, {}, {}, '{}', {}, {}, '{}')",  # o_id, d_id, w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info
+    },
+    "ORDER_STATUS": {
+        "getCustomerByCustomerId": "SELECT c_id, c_first, c_middle, c_last, c_balance FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}",  # w_id, d_id, c_id
+        "getCustomersByLastName": "SELECT c_id, c_first, c_middle, c_last, c_balance FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_last = '{}' ORDER BY c_first",  # w_id, d_id, c_last
+        "getLastOrder": "SELECT o_id, o_carrier_id, o_entry_d FROM orders WHERE o_w_id = {} AND o_d_id = {} AND o_c_id = {} ORDER BY o_id DESC LIMIT 1",  # w_id, d_id, c_id
+        "getOrderLines": "SELECT ol_supply_w_id, ol_i_id, ol_quantity, ol_amount, ol_delivery_d FROM order_line WHERE ol_w_id = {} AND ol_d_id = {} AND ol_o_id = {}",  # w_id, d_id, o_id
+    },
+    "PAYMENT": {
+        "getWarehouse": "SELECT w_name, w_street_1, w_street_2, w_city, w_state, w_zip FROM warehouse WHERE w_id = {}",  # w_id
+        "updateWarehouseBalance": "UPDATE warehouse SET w_ytd = w_ytd + {} WHERE w_id = {}",  # h_amount, w_id
+        "getDistrict": "SELECT d_name, d_street_1, d_street_2, d_city, d_state, d_zip FROM district WHERE d_w_id = {} AND d_id = {}",  # w_id, d_id
+        "updateDistrictBalance": "UPDATE district SET d_ytd = d_ytd + {} WHERE d_w_id = {} AND d_id = {}",  # h_amount, d_w_id, d_id
+        "getCustomerByCustomerId": "SELECT c_id, c_first, c_middle, c_last, c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_since, c_credit, c_credit_lim, c_discount, c_balance, c_ytd_payment, c_payment_cnt, c_data FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}",  # w_id, d_id, c_id
+        "getCustomersByLastName": "SELECT c_id, c_first, c_middle, c_last, c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_since, c_credit, c_credit_lim, c_discount, c_balance, c_ytd_payment, c_payment_cnt, c_data FROM customer WHERE c_w_id = {} AND c_d_id = {} AND c_last = '{}' ORDER BY c_first",  # w_id, d_id, c_last
+        "updateBCCustomer": "UPDATE customer SET c_balance = {}, c_ytd_payment = {}, c_payment_cnt = {}, c_data = '{}' WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}",  # c_balance, c_ytd_payment, c_payment_cnt, c_data, c_w_id, c_d_id, c_id
+        "updateGCCustomer": "UPDATE customer SET c_balance = {}, c_ytd_payment = {}, c_payment_cnt = {} WHERE c_w_id = {} AND c_d_id = {} AND c_id = {}",  # c_balance, c_ytd_payment, c_payment_cnt, c_w_id, c_d_id, c_id
+        "insertHistory": "INSERT INTO history (h_c_id, h_c_d_id, h_c_w_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES ({}, {}, {}, {}, {}, '{}', {}, '{}')",
+    },
+    "STOCK_LEVEL": {
+        "getOId": "SELECT d_next_o_id FROM district WHERE d_w_id = {} AND d_id = {}",
+        "getStockCount": """
+            SELECT COUNT(DISTINCT(ol_i_id)) FROM order_line, stock
+            WHERE ol_w_id = {}
+              AND ol_d_id = {}
+              AND ol_o_id < {}
+              AND ol_o_id >= {}
+              AND s_w_id = {}
+              AND s_i_id = ol_i_id
+              AND s_quantity < {}
+        """,
+    },
+}
+
+
+class AuroraDriver(AbstractDriver):
+    DEFAULT_CONFIG = {
+        "host": ("Host running the database.", "localhost"),
+        "port": ("Port on which the database is listening.", 5432),
+        "user": ("Username", "postgres"),
+        "password": ("Password", ""),
+        "database": ("Database", "chbenchmark"),
+        "isolation_level": ("The isolation level to use.", "REPEATABLE READ"),
+    }
+
+    def __init__(self, ddl: str) -> None:
+        super().__init__("brad", ddl)
+        self._connection: Optional[PsycopgConnection] = None
+        self._cursor: Optional[PsycopgCursor] = None
+        self._config: Dict[str, Any] = {}
+
+    def makeDefaultConfig(self) -> Config:
+        return AuroraDriver.DEFAULT_CONFIG
+
+    def loadConfig(self, config: Config) -> None:
+        self._config = config
+        address = self._config["host"]
+        port = int(self._config["port"])
+        user = self._config["user"]
+        password = self._config["password"]
+        database = self._config["database"]
+        cstr = f"host={address} port={port} user={user} password={password} dbname={database}"
+        self._connection = PsycopgConnection.connect_sync(cstr, autocommit=True)
+        self._cursor = self._connection.cursor_sync()
+
+    def loadTuples(self, tableName: str, tuples) -> None:
+        # We don't support data loading directly here.
+        pass
+
+    def executeStart(self):
+        assert self._cursor is not None
+        # We use this callback to set the isolation level.
+        logger.info("Setting isolation level to %s", self._config["isolation_level"])
+        self._cursor.execute_sync(
+            f"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL {self._config['isolation_level']}"
+        )
+        return None
+
+    def doDelivery(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]:
+        try:
+            assert self._cursor is not None
+
+            q = TXN_QUERIES["DELIVERY"]
+            w_id = params["w_id"]
+            o_carrier_id = params["o_carrier_id"]
+            ol_delivery_d = params["ol_delivery_d"]
+
+            result: List[Tuple[Any, ...]] = []
+            self._cursor.execute_sync("BEGIN")
+            for d_id in range(1, constants.DISTRICTS_PER_WAREHOUSE + 1):
+                self._cursor.execute_sync(q["getNewOrder"].format(d_id, w_id))
+                r = self._cursor.fetchall_sync()
+                if len(r) == 0:
+                    ## No orders for this district: skip it. Note: This must be reported if > 1%
+                    continue
+                no_o_id = r[0][0]
+
+                self._cursor.execute_sync(q["getCId"].format(no_o_id, d_id, w_id))
+                r = self._cursor.fetchall_sync()
+                c_id = r[0][0]
+
+                self._cursor.execute_sync(q["sumOLAmount"].format(no_o_id, d_id, w_id))
+                r = self._cursor.fetchall_sync()
+                ol_total = decimal.Decimal(r[0][0])
+
+                self._cursor.execute_sync(
+                    q["deleteNewOrder"].format(d_id, w_id, no_o_id)
+                )
+                updateOrders = q["updateOrders"].format(
+                    o_carrier_id, no_o_id, d_id, w_id
+                )
+                self._cursor.execute_sync(updateOrders)
+                updateOrderLine = q["updateOrderLine"].format(
+                    ol_delivery_d.strftime("%Y-%m-%d %H:%M:%S"), no_o_id, d_id, w_id
+                )
+                self._cursor.execute_sync(updateOrderLine)
+
+                # These must be logged in the "result file" according to TPC-C 2.7.2.2 (page 39)
+                # We remove the queued time, completed time, w_id, and o_carrier_id: the client can figure
+                # them out
+                # If there are no order lines, SUM returns null. There should always be order lines.
+                assert (
+                    ol_total != None
+                ), "ol_total is NULL: there are no order lines. This should not happen"
+                assert ol_total > 0.0
+
+                self._cursor.execute_sync(
+                    q["updateCustomer"].format(
+                        ol_total.quantize(decimal.Decimal("1.00")), c_id, d_id, w_id
+                    )
+                )
+
+                result.append((d_id, no_o_id))
+
+            self._cursor.execute_sync("COMMIT")
+            return result
+
+        except Exception as ex:
+            print("Error in DELIVERY", str(ex))
+            print(traceback.format_exc())
+            raise
+
+    def doNewOrder(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]:
+        try:
+            assert self._cursor is not None
+
+            q = TXN_QUERIES["NEW_ORDER"]
+            w_id = params["w_id"]
+            d_id = params["d_id"]
+            c_id = params["c_id"]
+            o_entry_d = params["o_entry_d"]
+            i_ids = params["i_ids"]
+            i_w_ids = params["i_w_ids"]
+            i_qtys = params["i_qtys"]
+
+            assert len(i_ids) > 0
+            assert len(i_ids) == len(i_w_ids)
+            assert len(i_ids) == len(i_qtys)
+
+            self._cursor.execute_sync("BEGIN")
+            all_local = True
+            items = []
+            for i in range(len(i_ids)):
+                ## Determine if this is an all local order or not
+                all_local = all_local and i_w_ids[i] == w_id
+                self._cursor.execute_sync(q["getItemInfo"].format(i_ids[i]))
+                r = self._cursor.fetchall_sync()
+                items.append(r[0] if len(r) > 0 else ())
+            assert len(items) == len(i_ids)
+
+            ## TPCC defines 1% of neworder gives a wrong itemid, causing rollback.
+            ## Note that this will happen with 1% of transactions on purpose.
+            for item in items:
+                if len(item) == 0:
+                    self._cursor.execute_sync("ROLLBACK")
+                    return []
+            ## FOR
+
+            ## ----------------
+            ## Collect Information from WAREHOUSE, DISTRICT, and CUSTOMER
+            ## ----------------
+            self._cursor.execute_sync(q["getWarehouseTaxRate"].format(w_id))
+            r = self._cursor.fetchall_sync()
+            w_tax = r[0][0]
+
+            self._cursor.execute_sync(q["getDistrict"].format(d_id, w_id))
+            r = self._cursor.fetchall_sync()
+            district_info = r[0]
+            d_tax = district_info[0]
+            d_next_o_id = district_info[1]
+
+            self._cursor.execute_sync(q["getCustomer"].format(w_id, d_id, c_id))
+            r = self._cursor.fetchall_sync()
+            customer_info = r[0]
+            c_discount = customer_info[0]
+
+            ## ----------------
+            ## Insert Order Information
+            ## ----------------
+            ol_cnt = len(i_ids)
+            o_carrier_id = constants.NULL_CARRIER_ID
+
+            self._cursor.execute_sync(
+                q["incrementNextOrderId"].format(d_next_o_id + 1, d_id, w_id)
+            )
+            createOrder = q["createOrder"].format(
+                d_next_o_id,
+                d_id,
+                w_id,
+                c_id,
+                o_entry_d.strftime("%Y-%m-%d %H:%M:%S"),
+                o_carrier_id,
+                ol_cnt,
+                1 if all_local else 0,
+            )
+            self._cursor.execute_sync(createOrder)
+            self._cursor.execute_sync(
+                q["createNewOrder"].format(d_next_o_id, d_id, w_id)
+            )
+
+            ## ----------------
+            ## Insert Order Item Information
+            ## ----------------
+            item_data = []
+            total = 0
+            for i in range(len(i_ids)):
+                ol_number = i + 1
+                ol_supply_w_id = i_w_ids[i]
+                ol_i_id = i_ids[i]
+                ol_quantity = i_qtys[i]
+
+                itemInfo = items[i]
+                i_name = itemInfo[1]
+                i_data = itemInfo[2]
+                i_price = decimal.Decimal(itemInfo[0])
+
+                self._cursor.execute_sync(
+                    q["getStockInfo"].format(d_id, ol_i_id, ol_supply_w_id)
+                )
+                r = self._cursor.fetchall_sync()
+                if len(r) == 0:
+                    logger.warning(
+                        "No STOCK record for (ol_i_id=%d, ol_supply_w_id=%d)",
+                        ol_i_id,
+                        ol_supply_w_id,
+                    )
+                    continue
+                stockInfo = r[0]
+                s_quantity = stockInfo[0]
+                s_ytd = decimal.Decimal(stockInfo[2])
+                s_order_cnt = int(stockInfo[3])
+                s_remote_cnt = int(stockInfo[4])
+                s_data = stockInfo[1]
+                s_dist_xx = stockInfo[5]  # Fetches data from the s_dist_[d_id] column
+
+                ## Update stock
+                s_ytd += ol_quantity
+                if s_quantity >= ol_quantity + 10:
+                    s_quantity = s_quantity - ol_quantity
+                else:
+                    s_quantity = s_quantity + 91 - ol_quantity
+                s_order_cnt += 1
+
+                if ol_supply_w_id != w_id:
+                    s_remote_cnt += 1
+
+                self._cursor.execute_sync(
+                    q["updateStock"].format(
+                        s_quantity,
+                        s_ytd.quantize(decimal.Decimal("1.00")),
+                        s_order_cnt,
+                        s_remote_cnt,
+                        ol_i_id,
+                        ol_supply_w_id,
+                    ),
+                )
+
+                if (
+                    i_data.find(constants.ORIGINAL_STRING) != -1
+                    and s_data.find(constants.ORIGINAL_STRING) != -1
+                ):
+                    brand_generic = "B"
+                else:
+                    brand_generic = "G"
+
+                ## Transaction profile states to use "ol_quantity * i_price"
+                ol_amount = ol_quantity * i_price
+                total += ol_amount
+
+                createOrderLine = q["createOrderLine"].format(
+                    d_next_o_id,
+                    d_id,
+                    w_id,
+                    ol_number,
+                    ol_i_id,
+                    ol_supply_w_id,
+                    o_entry_d.strftime("%Y-%m-%d %H:%M:%S"),
+                    ol_quantity,
+                    ol_amount,
+                    s_dist_xx,
+                )
+                self._cursor.execute_sync(createOrderLine)
+
+                ## Add the info to be returned
+                item_data.append(
+                    (i_name, s_quantity, brand_generic, i_price, ol_amount)
+                )
+            ## FOR
+
+            ## Commit!
+            self._cursor.execute_sync("COMMIT")
+
+            ## Adjust the total for the discount
+            # print "c_discount:", c_discount, type(c_discount)
+            # print "w_tax:", w_tax, type(w_tax)
+            # print "d_tax:", d_tax, type(d_tax)
+            total = int(
+                total
+                * (1 - decimal.Decimal(c_discount))
+                * (1 + decimal.Decimal(w_tax) + decimal.Decimal(d_tax))
+            )
+
+            ## Pack up values the client is missing (see TPC-C 2.4.3.5)
+            misc = [(w_tax, d_tax, d_next_o_id, total)]
+
+            return [customer_info, misc, item_data]
+
+        except Exception as ex:
+            print("Error in NEWORDER", str(ex))
+            print(traceback.format_exc())
+            raise
+
+    def doOrderStatus(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]:
+        try:
+            assert self._cursor is not None
+
+            q = TXN_QUERIES["ORDER_STATUS"]
+            w_id = params["w_id"]
+            d_id = params["d_id"]
+            c_id = params["c_id"]
+            c_last = params["c_last"]
+
+            self._cursor.execute_sync("BEGIN")
+            if c_id != None:
+                self._cursor.execute_sync(
+                    q["getCustomerByCustomerId"].format(w_id, d_id, c_id)
+                )
+                r = self._cursor.fetchall_sync()
+                customer = r[0]
+            else:
+                # Get the midpoint customer's id
+                self._cursor.execute_sync(
+                    q["getCustomersByLastName"].format(w_id, d_id, c_last)
+                )
+                r = self._cursor.fetchall_sync()
+                all_customers = r
+                assert len(all_customers) > 0
+                namecnt = len(all_customers)
+                index = (namecnt - 1) // 2
+                customer = all_customers[index]
+                c_id = customer[0]
+            assert len(customer) > 0
+            assert c_id != None
+
+            getLastOrder = q["getLastOrder"].format(w_id, d_id, c_id)
+            self._cursor.execute_sync(getLastOrder)
+            r = self._cursor.fetchall_sync()
+            order = r[0]
+            if order:
+                self._cursor.execute_sync(
+                    q["getOrderLines"].format(w_id, d_id, order[0])
+                )
+                r = self._cursor.fetchall_sync()
+                orderLines = r
+            else:
+                orderLines = []
+
+            self._cursor.execute_sync("COMMIT")
+            return [customer, order, orderLines]
+
+        except Exception as ex:
+            print("Error in ORDER_STATUS", str(ex))
+            print(traceback.format_exc())
+            raise
+
+    def doPayment(self, params: Dict[str, Any]) -> List[Tuple[Any, ...]]:
+        try:
+            assert self._cursor is not None
+
+            q = TXN_QUERIES["PAYMENT"]
+            w_id = params["w_id"]
+            d_id = params["d_id"]
+            h_amount = decimal.Decimal(params["h_amount"])
+            c_w_id = params["c_w_id"]
+            c_d_id = params["c_d_id"]
+            c_id = params["c_id"]
+            c_last = params["c_last"]
+            h_date = params["h_date"]  # Python datetime
+
+            self._cursor.execute_sync("BEGIN")
+            if c_id != None:
+                self._cursor.execute_sync(
+                    q["getCustomerByCustomerId"].format(w_id, d_id, c_id)
+                )
+                r = self._cursor.fetchall_sync()
+                customer = r[0]
+            else:
+                # Get the midpoint customer's id
+                self._cursor.execute_sync(
+                    q["getCustomersByLastName"].format(w_id, d_id, c_last)
+                )
+                r = self._cursor.fetchall_sync()
+                all_customers = r
+                assert len(all_customers) > 0
+                namecnt = len(all_customers)
+                index = (namecnt - 1) // 2
+                customer = all_customers[index]
+                c_id = customer[0]
+            assert len(customer) > 0
+            c_balance = decimal.Decimal(customer[14]) - h_amount
+            c_ytd_payment = decimal.Decimal(customer[15]) + h_amount
+            c_payment_cnt = int(customer[16]) + 1
+            c_data = customer[17]
+
+            self._cursor.execute_sync(q["getWarehouse"].format(w_id))
+            r = self._cursor.fetchall_sync()
+            warehouse = r[0]
+
+            self._cursor.execute_sync(q["getDistrict"].format(w_id, d_id))
+            r = self._cursor.fetchall_sync()
+            district = r[0]
+
+            self._cursor.execute_sync(
+                q["updateWarehouseBalance"].format(h_amount, w_id)
+            )
+            self._cursor.execute_sync(
+                q["updateDistrictBalance"].format(h_amount, w_id, d_id)
+            )
+
+            # Customer Credit Information
+            if customer[11] == constants.BAD_CREDIT:
+                newData = " ".join(
+                    map(str, [c_id, c_d_id, c_w_id, d_id, w_id, h_amount])
+                )
+                c_data = newData + "|" + c_data
+                if len(c_data) > constants.MAX_C_DATA:
+                    c_data = c_data[: constants.MAX_C_DATA]
+                updateCustomer = q["updateBCCustomer"].format(
+                    c_balance,
+                    c_ytd_payment,
+                    c_payment_cnt,
+                    c_data,
+                    c_w_id,
+                    c_d_id,
+                    c_id,
+                )
+                self._cursor.execute_sync(updateCustomer)
+            else:
+                c_data = ""
+                self._cursor.execute_sync(
+                    q["updateGCCustomer"].format(
+                        c_balance, c_ytd_payment, c_payment_cnt, c_w_id, c_d_id, c_id
+                    ),
+                )
+
+            # Concatenate w_name, four spaces, d_name
+            h_data = "%s    %s" % (warehouse[0], district[0])
+            # Create the history record
+            insertHistory = q["insertHistory"].format(
+                c_id,
+                c_d_id,
+                c_w_id,
+                d_id,
+                w_id,
+                h_date.strftime("%Y-%m-%d %H:%M:%S"),
+                h_amount.quantize(decimal.Decimal("1.00")),
+                h_data,
+            )
+            self._cursor.execute_sync(insertHistory)
+
+            self._cursor.execute_sync("COMMIT")
+
+            # TPC-C 2.5.3.3: Must display the following fields:
+            # W_ID, D_ID, C_ID, C_D_ID, C_W_ID, W_STREET_1, W_STREET_2, W_CITY, W_STATE, W_ZIP,
+            # D_STREET_1, D_STREET_2, D_CITY, D_STATE, D_ZIP, C_FIRST, C_MIDDLE, C_LAST, C_STREET_1,
+            # C_STREET_2, C_CITY, C_STATE, C_ZIP, C_PHONE, C_SINCE, C_CREDIT, C_CREDIT_LIM,
+            # C_DISCOUNT, C_BALANCE, the first 200 characters of C_DATA (only if C_CREDIT = "BC"),
+            # H_AMOUNT, and H_DATE.
+
+            # Hand back all the warehouse, district, and customer data
+            return [warehouse, district, customer]
+
+        except Exception as ex:
+            print("Error in PAYMENT", str(ex))
+            print(traceback.format_exc())
+            raise
+
+    def doStockLevel(self, params: Dict[str, Any]) -> int:
+        try:
+            assert self._cursor is not None
+
+            q = TXN_QUERIES["STOCK_LEVEL"]
+            w_id = params["w_id"]
+            d_id = params["d_id"]
+            threshold = params["threshold"]
+
+            self._cursor.execute_sync("BEGIN")
+            self._cursor.execute_sync(q["getOId"].format(w_id, d_id))
+            r = self._cursor.fetchall_sync()
+            result = r[0]
+            assert result
+            o_id = result[0]
+
+            self._cursor.execute_sync(
+                q["getStockCount"].format(
+                    w_id, d_id, o_id, (o_id - 20), w_id, threshold
+                )
+            )
+            r = self._cursor.fetchall_sync()
+            result = r[0]
+
+            self._cursor.execute_sync("COMMIT")
+            return int(result[0])
+
+        except Exception as ex:
+            print("Error in STOCK_LEVEL", str(ex))
+            print(traceback.format_exc())
+            raise
diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py
index 9041b289..99e0cc53 100644
--- a/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py
+++ b/workloads/chbenchmark/py-tpcc/pytpcc/drivers/braddriver.py
@@ -73,6 +73,10 @@ class BradDriver(AbstractDriver):
         "host": ("Host running the BRAD front end.", "localhost"),
         "port": ("Port on which the BRAD front end is listening.", 6583),
         "isolation_level": ("The isolation level to use.", "REPEATABLE READ"),
+        "use_worker_offset": (
+            "If true, add the worker index to the port (to have 1 client per front end)",
+            True,
+        ),
     }
 
     def __init__(self, ddl: str) -> None:
@@ -85,7 +89,10 @@ def makeDefaultConfig(self) -> Config:
 
     def loadConfig(self, config: Config) -> None:
         self._config = config
-        self._client = BradGrpcClient(host=config["host"], port=config["port"])
+        port = int(self._config["port"])
+        if self._config["use_worker_offset"]:
+            port += self._config["worker_index"]
+        self._client = BradGrpcClient(host=config["host"], port=port)
         self._client.connect()
 
     def loadTuples(self, tableName: str, tuples) -> None:
diff --git a/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py b/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py
index 388a095d..25f43f89 100755
--- a/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py
+++ b/workloads/chbenchmark/py-tpcc/pytpcc/tpcc.py
@@ -42,6 +42,7 @@
 from .util import *
 from .runtime import *
 
+from .drivers.auroradriver import AuroraDriver
 from .drivers.braddriver import BradDriver
 
 logging.basicConfig(
@@ -56,9 +57,12 @@
 ## createDriverClass
 ## ==============================================
 def createDriverClass(name):
-    if name != "brad":
+    if name == "brad":
+        return BradDriver
+    elif name == "aurora":
+        return AuroraDriver
+    else:
         raise NotImplementedError
-    return BradDriver
 
 
 ## DEF
@@ -200,6 +204,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index
 
     config["execute"] = True
     config["reset"] = False
+    config["worker_index"] = worker_index
     driver.loadConfig(config)
 
     e = executor.Executor(
@@ -318,6 +323,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug, worker_index
     config["reset"] = args["reset"]
     config["load"] = False
     config["execute"] = False
+    config["worker_index"] = 0
     if config["reset"]:
         logging.info("Reseting database")
         driver.loadConfig(config)