From a0f642725285018c6d1643e5c63b8c321c900116 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Tue, 28 Nov 2023 15:41:46 -0500 Subject: [PATCH] Modify prediction injection to support multiple datasets (#387) * Modify prediction injection to support multiple datasets * Add support for data accessed predictions --- config/temp_config_sample.yml | 8 +- src/brad/config/temp_config.py | 11 +- src/brad/daemon/daemon.py | 20 +-- .../scoring/data_access/precomputed_values.py | 115 ++++++++++++----- .../performance/precomputed_predictions.py | 121 +++++++++++++----- src/brad/planner/workload/workload.py | 9 +- 6 files changed, 204 insertions(+), 80 deletions(-) diff --git a/config/temp_config_sample.yml b/config/temp_config_sample.yml index 4dab3636..0905b31d 100644 --- a/config/temp_config_sample.yml +++ b/config/temp_config_sample.yml @@ -14,11 +14,17 @@ comparator: penalty_threshold: 0.8 # Only used by the `benefit_perf_ceiling` comparator # Use this instead of the individual paths below. -std_dataset_path: workloads/IMDB_20GB/regular_test/ +std_datasets: + - name: regular + path: workloads/IMDB_100GB/regular_test/ + - name: adhoc + path: workloads/IMDB_100GB/adhoc_test/ # The configurations below are deprecated. Do not use them in new experiment # code (they don't need to be included either). +std_dataset_path: workloads/IMDB_20GB/regular_test/ + aurora_preds_path: workloads/IMDB/OLAP_queries_new/pred_aurora_300.npy redshift_preds_path: workloads/IMDB/OLAP_queries_new/pred_redshift_300.npy athena_preds_path: workloads/IMDB/OLAP_queries_new/pred_athena_300.npy diff --git a/src/brad/config/temp_config.py b/src/brad/config/temp_config.py index a5ebe82e..528bff59 100644 --- a/src/brad/config/temp_config.py +++ b/src/brad/config/temp_config.py @@ -1,6 +1,6 @@ import pathlib import yaml -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, List from datetime import timedelta @@ -41,13 +41,18 @@ def benefit_horizon(self) -> timedelta: def penalty_threshold(self) -> float: return self._raw["comparator"]["penalty_threshold"] + def std_datasets(self) -> List[Dict[str, str]]: + if "std_datasets" not in self._raw: + return [] + return self._raw["std_datasets"] + + # The below configs are now deprecated. + def std_dataset_path(self) -> Optional[pathlib.Path]: if "std_dataset_path" not in self._raw: return None return pathlib.Path(self._raw["std_dataset_path"]) - # The below configs are now deprecated. - def aurora_preds_path(self) -> pathlib.Path: return pathlib.Path(self._raw["aurora_preds_path"]) diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py index a1603b9c..a0afac63 100644 --- a/src/brad/daemon/daemon.py +++ b/src/brad/daemon/daemon.py @@ -2,9 +2,10 @@ import logging import queue import os +import pathlib import multiprocessing as mp import numpy as np -from typing import Optional, List, Set +from typing import Optional, List, Set, Tuple from brad.asset_manager import AssetManager from brad.blueprint import Blueprint @@ -163,17 +164,16 @@ async def _run_setup(self) -> None: if self._temp_config is not None: # TODO: Actually call into the models. We avoid doing so for now to # avoid having to implement model loading, etc. 
- std_dataset_path = self._temp_config.std_dataset_path() - if std_dataset_path is not None: + std_datasets = self._temp_config.std_datasets() + if len(std_datasets) > 0: + datasets: List[Tuple[str, str | pathlib.Path]] = [ + (dataset["name"], dataset["path"]) for dataset in std_datasets + ] latency_scorer: AnalyticsLatencyScorer = ( - PrecomputedPredictions.load_from_standard_dataset( - dataset_path=std_dataset_path, - ) + PrecomputedPredictions.load_from_standard_dataset(datasets) ) - data_access_provider = ( - PrecomputedDataAccessProvider.load_from_standard_dataset( - dataset_path=std_dataset_path, - ) + data_access_provider: DataAccessProvider = ( + PrecomputedDataAccessProvider.load_from_standard_dataset(datasets) ) else: latency_scorer = PrecomputedPredictions.load( diff --git a/src/brad/planner/scoring/data_access/precomputed_values.py b/src/brad/planner/scoring/data_access/precomputed_values.py index c8fc6819..19e3306e 100644 --- a/src/brad/planner/scoring/data_access/precomputed_values.py +++ b/src/brad/planner/scoring/data_access/precomputed_values.py @@ -2,7 +2,7 @@ import pathlib import numpy as np import numpy.typing as npt -from typing import Dict +from typing import Dict, Tuple, List from .provider import DataAccessProvider from brad.planner.workload import Workload @@ -10,17 +10,13 @@ logger = logging.getLogger(__name__) -class PrecomputedDataAccessProvider(DataAccessProvider): - """ - Provides predictions for a fixed workload using precomputed values. - Used for debugging purposes. - """ - +class QueryMap: @classmethod def load_from_standard_dataset( cls, + name: str, dataset_path: str | pathlib.Path, - ): + ) -> "QueryMap": if isinstance(dataset_path, pathlib.Path): dsp = dataset_path else: @@ -46,7 +42,60 @@ def load_from_standard_dataset( assert len(aurora.shape) == 1 assert len(athena.shape) == 1 - return cls(queries_map, aurora, athena) + return cls(name, queries_map, aurora, athena) + + def __init__( + self, + name: str, + queries_map: Dict[str, int], + aurora_accessed_pages: npt.NDArray, + athena_accessed_bytes: npt.NDArray, + ) -> None: + self.name = name + self.queries_map = queries_map + self.aurora_accessed_pages = aurora_accessed_pages + self.athena_accessed_bytes = athena_accessed_bytes + + def extract_access_statistics( + self, workload: Workload + ) -> Tuple[List[int], List[int], npt.NDArray, npt.NDArray]: + workload_query_index = [] + indices_in_dataset = [] + for wqi, query in enumerate(workload.analytical_queries()): + try: + query_str = query.raw_query.strip() + if query_str.endswith(";"): + query_str = query_str[:-1] + indices_in_dataset.append(self.queries_map[query_str]) + workload_query_index.append(wqi) + except KeyError: + continue + + return ( + workload_query_index, + indices_in_dataset, + self.aurora_accessed_pages[indices_in_dataset], + self.athena_accessed_bytes[indices_in_dataset], + ) + + +class PrecomputedDataAccessProvider(DataAccessProvider): + """ + Provides predictions for a fixed workload using precomputed values. + Used for debugging purposes. 
+    """
+
+    @classmethod
+    def load_from_standard_dataset(
+        cls,
+        datasets: List[Tuple[str, str | pathlib.Path]],
+    ) -> "PrecomputedDataAccessProvider":
+        return cls(
+            [
+                QueryMap.load_from_standard_dataset(name, dataset_path)
+                for name, dataset_path in datasets
+            ]
+        )
 
     @classmethod
     def load(
@@ -72,35 +121,37 @@ def load(
         assert len(athena.shape) == 1
         assert aurora.shape[0] == athena.shape[0]
 
-        return cls(queries_map, aurora, athena)
+        return cls([QueryMap("custom", queries_map, aurora, athena)])
 
     def __init__(
         self,
-        queries_map: Dict[str, int],
-        aurora_accessed_pages: npt.NDArray,
-        athena_accessed_bytes: npt.NDArray,
+        predictions: List[QueryMap],
     ) -> None:
-        self._queries_map = queries_map
-        self._aurora_accessed_pages = aurora_accessed_pages
-        self._athena_accessed_bytes = athena_accessed_bytes
+        self._predictions = predictions
 
     def apply_access_statistics(self, workload: Workload) -> None:
-        query_indices = []
-        has_unmatched = False
-        for query in workload.analytical_queries():
-            try:
-                query_str = query.raw_query.strip()
-                if query_str.endswith(";"):
-                    query_str = query_str[:-1]
-                query_indices.append(self._queries_map[query_str])
-            except KeyError:
-                logger.warning("Cannot match query:\n%s", query.raw_query.strip())
-                query_indices.append(-1)
-                has_unmatched = True
+        all_queries = workload.analytical_queries()
+        applied_aurora = np.ones(len(all_queries)) * np.nan
+        applied_athena = np.ones(len(all_queries)) * np.nan
+
+        for qm in self._predictions:
+            workload_indices, _, aurora, athena = qm.extract_access_statistics(workload)
+            applied_aurora[workload_indices] = aurora
+            applied_athena[workload_indices] = athena
+
+        # Special case: vector similarity queries.
+        special_vector_queries = []
+        for wqi, q in enumerate(all_queries):
+            if "<=>" in q.raw_query:
+                special_vector_queries.append(wqi)
+        applied_athena[special_vector_queries] = 0.0
+
+        # Check for unmatched queries.
+        num_unmatched_athena = np.isnan(applied_athena).sum()
+        if num_unmatched_athena > 0:
+            raise RuntimeError(f"Unmatched queries: {num_unmatched_athena}")
 
-        if has_unmatched:
-            raise RuntimeError("Workload contains unmatched queries.")
         workload.set_predicted_data_access_statistics(
-            aurora_pages=self._aurora_accessed_pages[query_indices],
-            athena_bytes=self._athena_accessed_bytes[query_indices],
+            aurora_pages=applied_aurora,
+            athena_bytes=applied_athena,
         )
diff --git a/src/brad/planner/scoring/performance/precomputed_predictions.py b/src/brad/planner/scoring/performance/precomputed_predictions.py
index bb7f4f77..4c7dd86f 100644
--- a/src/brad/planner/scoring/performance/precomputed_predictions.py
+++ b/src/brad/planner/scoring/performance/precomputed_predictions.py
@@ -2,7 +2,7 @@
 import logging
 import numpy as np
 import numpy.typing as npt
-from typing import Dict
+from typing import Dict, List, Tuple
 
 from brad.config.engine import Engine
 from brad.planner.scoring.performance.analytics_latency import AnalyticsLatencyScorer
@@ -11,16 +11,13 @@
 
 logger = logging.getLogger(__name__)
 
 
-class PrecomputedPredictions(AnalyticsLatencyScorer):
-    """
-    Provides predictions for a fixed workload using precomputed predictions.
-    Used for debugging purposes.
- """ - +class QueryMap: @classmethod def load_from_standard_dataset( - cls, dataset_path: str | pathlib.Path - ) -> "PrecomputedPredictions": + cls, + name: str, + dataset_path: str | pathlib.Path, + ) -> "QueryMap": if isinstance(dataset_path, pathlib.Path): dsp = dataset_path else: @@ -54,7 +51,58 @@ def load_from_standard_dataset( predictions[np.isinf(predictions)] = timeout_value_s predictions[np.isnan(predictions)] = timeout_value_s - return cls(queries_map, predictions) + return cls(name, queries_map, predictions) + + def __init__( + self, name: str, queries_map: Dict[str, int], predictions: npt.NDArray + ) -> None: + self.name = name + self.queries_map = queries_map + self.predictions = predictions + + def extract_matched_predictions( + self, workload: Workload + ) -> Tuple[List[int], List[int], npt.NDArray]: + """ + Returns the matched indices and the predicted values. + """ + # The index of the query in the input workload. + workload_query_index = [] + # The index of the query in the precomputed predictions bank. + indices_in_dataset = [] + for wqi, query in enumerate(workload.analytical_queries()): + try: + query_str = query.raw_query.strip() + if query_str.endswith(";"): + query_str = query_str[:-1] + indices_in_dataset.append(self.queries_map[query_str]) + workload_query_index.append(wqi) + except KeyError: + continue + return ( + workload_query_index, + indices_in_dataset, + self.predictions[indices_in_dataset, :], + ) + + +class PrecomputedPredictions(AnalyticsLatencyScorer): + """ + Provides predictions for a fixed workload using precomputed predictions. + Used for debugging purposes. + """ + + @classmethod + def load_from_standard_dataset( + cls, + datasets: List[Tuple[str, str | pathlib.Path]], + ) -> "PrecomputedPredictions": + return cls( + [ + QueryMap.load_from_standard_dataset(name, dataset_path) + for name, dataset_path in datasets + ] + ) @classmethod def load( @@ -95,28 +143,41 @@ def load( timeout_value_s = 210.0 predictions[np.isinf(predictions)] = timeout_value_s - return cls(queries_map, predictions) + return cls([QueryMap("custom", queries_map, predictions)]) - def __init__(self, queries_map: Dict[str, int], predictions: npt.NDArray) -> None: - self._queries_map = queries_map + def __init__(self, predictions: List[QueryMap]) -> None: self._predictions = predictions def apply_predicted_latencies(self, workload: Workload) -> None: - query_indices = [] - has_unmatched = False - for query in workload.analytical_queries(): - try: - query_str = query.raw_query.strip() - if query_str.endswith(";"): - query_str = query_str[:-1] - query_indices.append(self._queries_map[query_str]) - except KeyError: - logger.warning("Cannot match query:\n%s", query.raw_query.strip()) - query_indices.append(-1) - has_unmatched = True - - if has_unmatched: - raise RuntimeError("Workload contains unmatched queries.") - workload.set_predicted_analytical_latencies( - self._predictions[query_indices, :], query_indices + all_queries = workload.analytical_queries() + applied_predictions = np.ones((len(all_queries), 3)) * np.nan + debug_map = {} + + for qm in self._predictions: + workload_indices, dataset_indices, preds = qm.extract_matched_predictions( + workload + ) + applied_predictions[workload_indices] = preds + debug_map[qm.name] = list(zip(workload_indices, dataset_indices)) + + # Special case: vector similarity queries. 
+        special_vector_queries = []
+        for wqi, q in enumerate(all_queries):
+            if "<=>" in q.raw_query:
+                special_vector_queries.append(wqi)
+
+        applied_predictions[
+            special_vector_queries, Workload.EngineLatencyIndex[Engine.Aurora]
+        ] = 3.6
+        debug_map["special_vector"] = list(
+            zip(special_vector_queries, [-1] * len(special_vector_queries))
         )
+
+        # Check for unmatched queries.
+        num_unmatched_aurora = np.isnan(
+            applied_predictions[:, Workload.EngineLatencyIndex[Engine.Aurora]]
+        ).sum()
+        if num_unmatched_aurora > 0:
+            raise RuntimeError(f"Unmatched queries: {num_unmatched_aurora}")
+
+        workload.set_predicted_analytical_latencies(applied_predictions, debug_map)
diff --git a/src/brad/planner/workload/workload.py b/src/brad/planner/workload/workload.py
index 32ee770e..a824cf5f 100644
--- a/src/brad/planner/workload/workload.py
+++ b/src/brad/planner/workload/workload.py
@@ -85,7 +85,7 @@ def __init__(
         # relative to the query bank used to run a workload against BRAD.) This
         # is used to recover the predicted run times for plotting or analysis
         # purposes later on.
-        self._query_index_mapping: List[int] = []
+        self._query_debug_map: Dict[str, List[Tuple[int, int]]] = {}
 
         # Used for reweighing queries.
         # NOTE: Using these weights directly assumes a static routing decision
@@ -124,7 +124,6 @@ def add_priming_analytical_query(self, query_str: str) -> None:
         self._predicted_athena_bytes_accessed = np.append(
             self._predicted_athena_bytes_accessed, np.zeros((1,)), axis=0
         )
-        self._query_index_mapping.append(-1)
 
         self._analytical_query_arrival_counts = np.append(
             self._analytical_query_arrival_counts, np.zeros((1,)), axis=0
@@ -182,10 +181,12 @@ def table_num_rows(self, table_name: str) -> int:
     ###
 
     def set_predicted_analytical_latencies(
-        self, predicted_latency: npt.NDArray, query_indices: List[int]
+        self,
+        predicted_latency: npt.NDArray,
+        debug_map: Dict[str, List[Tuple[int, int]]],
     ) -> None:
         self._predicted_analytical_latencies = predicted_latency
-        self._query_index_mapping = query_indices
+        self._query_debug_map = debug_map
 
     def get_predicted_analytical_latency(self, query_idx: int, engine: Engine) -> float:
         assert self._predicted_analytical_latencies is not None
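--
Usage sketch: the scorer entry points now take a list of (name, path) pairs
instead of a single dataset path. The snippet below mirrors the new
`std_datasets` handling in daemon.py; the dataset names and paths are the
sample values from temp_config_sample.yml, and `workload` is assumed to be a
planner-built Workload, so treat this as illustrative rather than shipped code.

    import pathlib
    from typing import List, Tuple

    from brad.planner.scoring.data_access.precomputed_values import (
        PrecomputedDataAccessProvider,
    )
    from brad.planner.scoring.performance.precomputed_predictions import (
        PrecomputedPredictions,
    )

    # (name, path) pairs, as parsed from the `std_datasets` config section.
    datasets: List[Tuple[str, str | pathlib.Path]] = [
        ("regular", "workloads/IMDB_100GB/regular_test/"),
        ("adhoc", "workloads/IMDB_100GB/adhoc_test/"),
    ]

    latency_scorer = PrecomputedPredictions.load_from_standard_dataset(datasets)
    data_access = PrecomputedDataAccessProvider.load_from_standard_dataset(datasets)

    # Each scorer merges matches from every dataset into the workload. A query
    # matched by several datasets takes the values from the dataset listed
    # last; a query matched by none raises a RuntimeError.
    # latency_scorer.apply_predicted_latencies(workload)
    # data_access.apply_access_statistics(workload)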
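Both apply_predicted_latencies() and apply_access_statistics() merge datasets
the same way: start from an all-NaN array, overlay the rows each dataset
matched (in list order, so a later dataset wins on overlap), and treat any row
still NaN as unmatched. A self-contained sketch of that pattern with toy
numbers, independent of BRAD's types:

    import numpy as np

    num_queries, num_engines = 4, 3
    applied = np.full((num_queries, num_engines), np.nan)

    # Each "dataset" contributes (matched workload indices, prediction rows).
    per_dataset = [
        ([0, 1], np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])),
        ([1, 2, 3], np.array([[7.0, 8.0, 9.0]] * 3)),
    ]
    for workload_indices, preds in per_dataset:
        # Later datasets overwrite earlier ones on overlapping indices.
        applied[workload_indices] = preds

    # Any row left NaN was matched by no dataset.
    num_unmatched = int(np.isnan(applied).any(axis=1).sum())
    if num_unmatched > 0:
        raise RuntimeError(f"Unmatched queries: {num_unmatched}")

    assert applied[1, 0] == 7.0  # the later dataset won the overlap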