diff --git a/config/planner.yml b/config/planner.yml index f1fcd9c7..b285fd9f 100644 --- a/config/planner.yml +++ b/config/planner.yml @@ -110,6 +110,7 @@ athena_load_rate_mb_per_s: 10.0 s3_usd_per_mb_per_month: 0.000023 aurora_regular_usd_per_mb_per_month: 0.00010 aurora_io_opt_usd_per_mb_per_month: 0.000225 +aurora_storage_index_multiplier: 3.0 ### ### Dataset-specific Transition Constants @@ -350,5 +351,6 @@ redshift_scaling: use_io_optimized_aurora: true use_recorded_routing_if_available: true +ensure_tables_together_on_one_engine: true aurora_initialize_load_fraction: 0.25 diff --git a/experiments/15-e2e-scenarios-v2/scale_up/COND b/experiments/15-e2e-scenarios-v2/scale_up/COND index 5721e424..0aac8306 100644 --- a/experiments/15-e2e-scenarios-v2/scale_up/COND +++ b/experiments/15-e2e-scenarios-v2/scale_up/COND @@ -14,6 +14,20 @@ run_experiment( }, ) +run_experiment( + name="brad_100g_2", + run="./run_workload_2.sh", + options={ + # TODO: Ideally, configurations are shared. Only keep AWS secrets separate. + "config-file": "config/config_large_100.yml", + "planner-config-file": "config/planner.yml", + "schema-name": "imdb_extended_100g", + "ra-query-bank-file": IMDB_100GB_REGULAR_QUERY_BANK, + "num-front-ends": 28, + "dataset-type": "100gb", + }, +) + run_command( name="brad_100g_debug", run="./run_workload_debug.sh", diff --git a/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh b/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh index 2dcc84dd..383c802e 100755 --- a/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh +++ b/experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh @@ -98,8 +98,6 @@ sleep $((20 * 60)) # 20 mins total; 62 mins cumulative log_workload_point "stopping_heavy_rana_8" kill -INT $heavy_rana_pid -sleep 5 -kill -KILL $heavy_rana_pid # To ensure we do not get stuck. wait $heavy_rana_pid log_workload_point "start_heavy_rana_10" @@ -109,8 +107,6 @@ sleep $((10 * 60)) # 10 mins total; 72 mins cumulative log_workload_point "stopping_heavy_rana_10" kill -INT $heavy_rana_pid -sleep 5 -kill -KILL $heavy_rana_pid # To ensure we do not get stuck. wait $heavy_rana_pid log_workload_point "start_heavy_rana_20" diff --git a/experiments/15-e2e-scenarios-v2/scale_up/run_workload_2.sh b/experiments/15-e2e-scenarios-v2/scale_up/run_workload_2.sh new file mode 100755 index 00000000..5156155e --- /dev/null +++ b/experiments/15-e2e-scenarios-v2/scale_up/run_workload_2.sh @@ -0,0 +1,105 @@ +#! 
/bin/bash
+
+script_loc=$(cd $(dirname $0) && pwd -P)
+cd $script_loc
+source ../common.sh
+
+# Scenario:
+# - Start from 2x t4g.medium Aurora, Redshift off
+# - 4 T clients, increasing to 24 T clients by 4 every minute
+# We expect BRAD to scale up Aurora at this point, but not start Redshift (a replica should be fine for the analytical workload)
+# - Increase the analytical load + add new "heavier" queries - expect that these go to Athena
+# - Increase frequency of queries, expect Redshift to start (go straight to 2x dc2.large to avoid classic resize for practical purposes)
+
+# TODO: This executor file should be adapted to run against the baselines too
+# (TiDB / Serverless Redshift + Aurora)
+
+initial_queries="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46"
+heavier_queries="58,61,62,64,69,73,74,51,57,60"
+
+function step_txns() {
+  local lo=$1
+  local hi=$2
+  local gap_minute=$3
+}
+
+# Arguments:
+# --config-file
+# --planner-config-file
+# --query-indexes
+extract_named_arguments $@
+
+function txn_sweep() {
+  local sweep=$1
+  local gap_minute=$2
+  local keep_last=$3
+
+  for t_clients in $sweep; do
+    start_txn_runner $t_clients  # Implicit: --dataset-type
+    txn_pid=$runner_pid
+
+    sleep $(($gap_minute * 60))
+    if [[ -z $keep_last ]] || [[ $t_clients != $keep_last ]]; then
+      kill -INT $txn_pid
+      wait $txn_pid
+    fi
+  done
+}
+
+function inner_cancel_experiment() {
+  if [ ! -z $heavy_rana_pid ]; then
+    cancel_experiment $rana_pid $txn_pid $heavy_rana_pid
+  else
+    cancel_experiment $rana_pid $txn_pid
+  fi
+}
+
+trap "inner_cancel_experiment" INT
+trap "inner_cancel_experiment" TERM
+
+start_brad $config_file $planner_config_file
+log_workload_point "brad_start_initiated"
+sleep 30
+
+# Start with 8 analytical clients.
+log_workload_point "start_rana_8"
+start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
+rana_pid=$runner_pid
+sleep 2
+
+# Start with 4 transactional clients; hold for 10 minutes to stabilize.
+log_workload_point "start_txn_8"
+start_txn_runner 4
+txn_pid=$runner_pid
+sleep $((10 * 60))
+
+# Add the heavier analytical queries and hold for 20 minutes.
+log_workload_point "start_heavy_rana_8"
+start_repeating_olap_runner 8 15 1 $heavier_queries "ra_8_heavy" 8
+heavy_rana_pid=$runner_pid
+sleep $((20 * 60))  # 20 mins total; 30 mins cumulative
+
+log_workload_point "stopping_heavy_rana_8"
+kill -INT $heavy_rana_pid
+wait $heavy_rana_pid
+
+log_workload_point "start_heavy_rana_10"
+start_repeating_olap_runner 10 5 1 $heavier_queries "ra_10_heavy" 8
+heavy_rana_pid=$runner_pid
+sleep $((10 * 60))  # 10 mins total; 40 mins cumulative
+
+log_workload_point "stopping_heavy_rana_10"
+kill -INT $heavy_rana_pid
+wait $heavy_rana_pid
+
+log_workload_point "start_heavy_rana_20"
+start_repeating_olap_runner 20 5 1 $heavier_queries "ra_20_heavy" 8
+heavy_rana_pid=$runner_pid
+sleep $((30 * 60))  # 30 mins total; 70 mins cumulative
+
+log_workload_point "experiment_workload_done"
+
+# Shut down everything now.
+>&2 echo "Experiment done. Shutting down runners..."
+graceful_shutdown $rana_pid $heavy_rana_pid $txn_pid +log_workload_point "shutdown_complete" diff --git a/src/brad/config/planner.py b/src/brad/config/planner.py index dea4a06e..7fa134cb 100644 --- a/src/brad/config/planner.py +++ b/src/brad/config/planner.py @@ -233,3 +233,6 @@ def flag(self, key: str, default: bool = False) -> bool: def aurora_initialize_load_fraction(self) -> float: return self._raw["aurora_initialize_load_fraction"] + + def aurora_storage_index_multiplier(self) -> float: + return float(self._raw["aurora_storage_index_multiplier"]) diff --git a/src/brad/planner/beam/query_based.py b/src/brad/planner/beam/query_based.py index 23eeab09..1a59c243 100644 --- a/src/brad/planner/beam/query_based.py +++ b/src/brad/planner/beam/query_based.py @@ -70,6 +70,17 @@ async def _run_replan_impl( self._providers.data_access_provider.apply_access_statistics(next_workload) self._providers.data_access_provider.apply_access_statistics(current_workload) + if self._planner_config.flag("ensure_tables_together_on_one_engine"): + # This adds a constraint to ensure all tables are present together + # on at least one engine. This ensures that arbitrary unseen join + # templates can always be immediately handled. + all_tables = ", ".join( + [table.name for table in self._current_blueprint.tables()] + ) + next_workload.add_priming_analytical_query( + f"SELECT 1 FROM {all_tables} LIMIT 1" + ) + # If requested, we record this planning pass for later debugging. if ( not self._disable_external_logging diff --git a/src/brad/planner/beam/query_based_candidate.py b/src/brad/planner/beam/query_based_candidate.py index 79c40c73..24e5d72a 100644 --- a/src/brad/planner/beam/query_based_candidate.py +++ b/src/brad/planner/beam/query_based_candidate.py @@ -254,8 +254,20 @@ def add_query( planner_config=ctx.planner_config, ) - # Table movement costs that this query imposes. + # Storage costs and table movement that this query imposes. for name, next_placement in table_diffs: + # If we added a table to Athena or Aurora, we need to take into + # account its storage costs. + if (next_placement & EngineBitmapValues[Engine.Athena]) != 0: + # We added the table to Athena. + self.storage_cost += compute_single_athena_table_cost(name, ctx) + + if (next_placement & EngineBitmapValues[Engine.Aurora]) != 0: + # Added table to Aurora. + # You only pay for 1 copy of the table on Aurora, regardless of + # how many read replicas you have. + self.storage_cost += compute_single_aurora_table_cost(name, ctx) + curr = ctx.current_blueprint.table_locations_bitmap()[name] if ((~curr) & next_placement) == 0: # This table was already present on the engine. @@ -267,18 +279,6 @@ def add_query( self.table_movement_trans_cost += result.movement_cost self.table_movement_trans_time_s += result.movement_time_s - # If we added a table to Athena or Aurora, we need to take into - # account its storage costs. - if (((~curr) & next_placement) & (EngineBitmapValues[Engine.Athena])) != 0: - # We added the table to Athena. - self.storage_cost += compute_single_athena_table_cost(name, ctx) - - if (((~curr) & next_placement) & (EngineBitmapValues[Engine.Aurora])) != 0: - # Added table to Aurora. - # You only pay for 1 copy of the table on Aurora, regardless of - # how many read replicas you have. - self.storage_cost += compute_single_aurora_table_cost(name, ctx) - # Adding a new query can affect the feasibility of the provisioning. 
self.feasibility = BlueprintFeasibility.Unchecked self.explored_provisionings = False @@ -365,6 +365,7 @@ def add_query_last_step( def add_transactional_tables(self, ctx: ScoringContext) -> None: referenced_tables = set() + newly_added = set() # Make sure that tables referenced in transactions are present on # Aurora. @@ -373,9 +374,17 @@ def add_transactional_tables(self, ctx: ScoringContext) -> None: if tbl not in self.table_placements: # This is a CTE. continue + orig = self.table_placements[tbl] self.table_placements[tbl] |= EngineBitmapValues[Engine.Aurora] referenced_tables.add(tbl) + if ((~orig) & self.table_placements[tbl]) != 0: + newly_added.add(tbl) + + # Account for storage costs (Aurora only charges for 1 copy). + for tbl in newly_added: + self.storage_cost += compute_single_aurora_table_cost(tbl, ctx) + # Update the table movement score if needed. for tbl in referenced_tables: cur = ctx.current_blueprint.table_locations_bitmap()[tbl] diff --git a/src/brad/planner/beam/table_based.py b/src/brad/planner/beam/table_based.py index 3b27cf33..d0ba2360 100644 --- a/src/brad/planner/beam/table_based.py +++ b/src/brad/planner/beam/table_based.py @@ -70,6 +70,17 @@ async def _run_replan_impl( self._providers.data_access_provider.apply_access_statistics(next_workload) self._providers.data_access_provider.apply_access_statistics(current_workload) + if self._planner_config.flag("ensure_tables_together_on_one_engine"): + # This adds a constraint to ensure all tables are present together + # on at least one engine. This ensures that arbitrary unseen join + # templates can always be immediately handled. + all_tables = ", ".join( + [table.name for table in self._current_blueprint.tables()] + ) + next_workload.add_priming_analytical_query( + f"SELECT 1 FROM {all_tables} LIMIT 1" + ) + if ( not self._disable_external_logging and BlueprintPickleDebugLogger.is_log_requested(self._config) diff --git a/src/brad/planner/beam/table_based_candidate.py b/src/brad/planner/beam/table_based_candidate.py index 6954f6a6..b9d048d6 100644 --- a/src/brad/planner/beam/table_based_candidate.py +++ b/src/brad/planner/beam/table_based_candidate.py @@ -190,6 +190,18 @@ def add_placement( changed_tables.append(table_name) changed = True + # If we added the table to Athena or Aurora, we need to take into + # account its storage costs. + if (((~cur) & nxt) & (EngineBitmapValues[Engine.Athena])) != 0: + # We added the table to Athena. + self.storage_cost += compute_single_athena_table_cost(table_name, ctx) + + if (((~cur) & nxt) & (EngineBitmapValues[Engine.Aurora])) != 0: + # Added table to Aurora. + # You only pay for 1 copy of the table on Aurora, regardless of + # how many read replicas you have. + self.storage_cost += compute_single_aurora_table_cost(table_name, ctx) + # Update movement scoring. self._update_movement_score(changed_tables, ctx) @@ -295,6 +307,7 @@ async def _route_queries_compute_scan_stats( def add_transactional_tables(self, ctx: ScoringContext) -> None: referenced_tables = set() + newly_added = set() # Make sure that tables referenced in transactions are present on # Aurora. @@ -303,9 +316,17 @@ def add_transactional_tables(self, ctx: ScoringContext) -> None: if tbl not in self.table_placements: # This is a CTE. continue + orig = self.table_placements[tbl] self.table_placements[tbl] |= EngineBitmapValues[Engine.Aurora] referenced_tables.add(tbl) + if ((~orig) & self.table_placements[tbl]) != 0: + newly_added.add(tbl) + + for tbl in newly_added: + # Aurora only charges for 1 copy of the data. 
+ self.storage_cost += compute_single_aurora_table_cost(tbl, ctx) + # If we made a change to the table placement, see if it corresponds to a # table score change. self._update_movement_score(referenced_tables, ctx) @@ -323,18 +344,6 @@ def _update_movement_score( self.table_movement_trans_cost += result.movement_cost self.table_movement_trans_time_s += result.movement_time_s - # If we added the table to Athena or Aurora, we need to take into - # account its storage costs. - if (((~cur) & nxt) & (EngineBitmapValues[Engine.Athena])) != 0: - # We added the table to Athena. - self.storage_cost += compute_single_athena_table_cost(tbl, ctx) - - if (((~cur) & nxt) & (EngineBitmapValues[Engine.Aurora])) != 0: - # Added table to Aurora. - # You only pay for 1 copy of the table on Aurora, regardless of - # how many read replicas you have. - self.storage_cost += compute_single_aurora_table_cost(tbl, ctx) - def try_to_make_feasible_if_needed(self, ctx: ScoringContext) -> None: """ Checks if this blueprint is already feasible, and if so, does nothing diff --git a/src/brad/planner/scoring/table_placement.py b/src/brad/planner/scoring/table_placement.py index e031fce1..820f10aa 100644 --- a/src/brad/planner/scoring/table_placement.py +++ b/src/brad/planner/scoring/table_placement.py @@ -43,11 +43,15 @@ def compute_single_aurora_table_cost(table_name: str, ctx: ScoringContext) -> fl if ctx.planner_config.use_io_optimized_aurora(): storage_usd_per_mb_per_month += ( - raw_extract_mb * ctx.planner_config.aurora_io_opt_usd_per_mb_per_month() + raw_extract_mb + * ctx.planner_config.aurora_io_opt_usd_per_mb_per_month() + * ctx.planner_config.aurora_storage_index_multiplier() ) else: storage_usd_per_mb_per_month += ( - raw_extract_mb * ctx.planner_config.aurora_regular_usd_per_mb_per_month() + raw_extract_mb + * ctx.planner_config.aurora_regular_usd_per_mb_per_month() + * ctx.planner_config.aurora_storage_index_multiplier() ) source_period = timedelta(days=30) diff --git a/src/brad/planner/workload/workload.py b/src/brad/planner/workload/workload.py index 7202935e..32ee770e 100644 --- a/src/brad/planner/workload/workload.py +++ b/src/brad/planner/workload/workload.py @@ -103,6 +103,33 @@ def __init__( self._table_sizes_mb: Dict[Tuple[str, Engine], int] = {} self._dataset_size_mb = 0 + def add_priming_analytical_query(self, query_str: str) -> None: + """ + Used to add queries to the workload that should be used during planning + as "constraints". This should be called after the workload/statistics + providers. 
+ """ + query = Query(query_str, arrival_count=0) + self._analytical_queries.append(query) + + if self._predicted_analytical_latencies is not None: + self._predicted_analytical_latencies = np.append( + self._predicted_analytical_latencies, np.zeros((1, 3)), axis=0 + ) + if self._predicted_aurora_pages_accessed is not None: + self._predicted_aurora_pages_accessed = np.append( + self._predicted_aurora_pages_accessed, np.zeros((1,)), axis=0 + ) + if self._predicted_athena_bytes_accessed is not None: + self._predicted_athena_bytes_accessed = np.append( + self._predicted_athena_bytes_accessed, np.zeros((1,)), axis=0 + ) + self._query_index_mapping.append(-1) + + self._analytical_query_arrival_counts = np.append( + self._analytical_query_arrival_counts, np.zeros((1,)), axis=0 + ) + def clone(self) -> "Workload": workload = Workload( self._period, @@ -218,6 +245,7 @@ def compute_latency_gains(self) -> npt.NDArray: ratios.append(preds[:, j] / preds[:, i]) combined = np.stack(ratios, axis=1) gains = np.amax(combined, axis=1) + gains[~np.isfinite(gains)] = 0.0 return gains ###
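Illustrative note on the planner changes above: the sketch below is plain, standalone Python (not the BRAD planner API). It shows (1) the priming query built when ensure_tables_together_on_one_engine is enabled, (2) the Aurora storage term scaled by the new aurora_storage_index_multiplier, using the constants from config/planner.yml in this diff, and (3) why compute_latency_gains() needs the added non-finite guard. The table names and the 10,000 MB table size are invented for the example, and the real compute_single_aurora_table_cost() may include terms not shown here.

# Illustrative sketch only; mirrors the logic added in this diff with plain
# values in place of BRAD's PlannerConfig / ScoringContext / Workload objects.
import numpy as np

# (1) Priming query: a zero-weight join over every table, which forces the
#     planner to keep at least one engine that holds all tables together.
tables = ["title", "cast_info", "movie_info"]  # hypothetical table names
priming_query = f"SELECT 1 FROM {', '.join(tables)} LIMIT 1"
# -> SELECT 1 FROM title, cast_info, movie_info LIMIT 1

# (2) Aurora storage term with the index multiplier (values from planner.yml).
AURORA_IO_OPT_USD_PER_MB_PER_MONTH = 0.000225
AURORA_STORAGE_INDEX_MULTIPLIER = 3.0

def aurora_storage_usd_per_month(raw_extract_mb: float) -> float:
    # Only the storage term shown in table_placement.py.
    return (
        raw_extract_mb
        * AURORA_IO_OPT_USD_PER_MB_PER_MONTH
        * AURORA_STORAGE_INDEX_MULTIPLIER
    )

# Example: a 10,000 MB table costs 10000 * 0.000225 * 3.0 = 6.75 USD/month.
assert abs(aurora_storage_usd_per_month(10_000.0) - 6.75) < 1e-9

# (3) The priming query is appended with all-zero predicted latencies, so its
#     cross-engine ratios are 0/0 = NaN; without the guard they would
#     propagate into the gains array.
preds = np.array(
    [
        [1.0, 2.0, 4.0],  # an ordinary analytical query
        [0.0, 0.0, 0.0],  # the priming query (zero predicted latency)
    ]
)
with np.errstate(divide="ignore", invalid="ignore"):
    ratios = np.stack(
        [preds[:, 1] / preds[:, 0], preds[:, 2] / preds[:, 0]], axis=1
    )
gains = np.amax(ratios, axis=1)
gains[~np.isfinite(gains)] = 0.0  # same guard as the diff
print(priming_query)
print(gains)  # -> [4. 0.]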