Skip to content

Commit

Permalink
Modify prediction injection to support multiple datasets (#387)
Browse files Browse the repository at this point in the history
* Modify prediction injection to support multiple datasets

* Add support for data accessed predictions
  • Loading branch information
geoffxy authored Nov 28, 2023
1 parent efaad49 commit a0f6427
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 80 deletions.
8 changes: 7 additions & 1 deletion config/temp_config_sample.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ comparator:
penalty_threshold: 0.8 # Only used by the `benefit_perf_ceiling` comparator

# Use this instead of the individual paths below.
std_dataset_path: workloads/IMDB_20GB/regular_test/
std_datasets:
- name: regular
path: workloads/IMDB_100GB/regular_test/
- name: adhoc
path: workloads/IMDB_100GB/adhoc_test/

# The configurations below are deprecated. Do not use them in new experiment
# code (they don't need to be included either).

std_dataset_path: workloads/IMDB_20GB/regular_test/

aurora_preds_path: workloads/IMDB/OLAP_queries_new/pred_aurora_300.npy
redshift_preds_path: workloads/IMDB/OLAP_queries_new/pred_redshift_300.npy
athena_preds_path: workloads/IMDB/OLAP_queries_new/pred_athena_300.npy
Expand Down
11 changes: 8 additions & 3 deletions src/brad/config/temp_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pathlib
import yaml
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List
from datetime import timedelta


Expand Down Expand Up @@ -41,13 +41,18 @@ def benefit_horizon(self) -> timedelta:
def penalty_threshold(self) -> float:
    """
    Returns the `penalty_threshold` value stored under the `comparator`
    section of the raw config.
    """
    comparator_config = self._raw["comparator"]
    return comparator_config["penalty_threshold"]

def std_datasets(self) -> List[Dict[str, str]]:
    """
    Returns the entries listed under `std_datasets` in the raw config, or
    an empty list when the key is absent.
    """
    return self._raw.get("std_datasets", [])

# The below configs are now deprecated.

def std_dataset_path(self) -> Optional[pathlib.Path]:
    """
    Returns the `std_dataset_path` config value as a `pathlib.Path`, or
    `None` when the key is not present in the raw config.
    """
    try:
        raw_path = self._raw["std_dataset_path"]
    except KeyError:
        return None
    return pathlib.Path(raw_path)

# The below configs are now deprecated.

def aurora_preds_path(self) -> pathlib.Path:
    """
    Returns the `aurora_preds_path` config value as a `pathlib.Path`.
    Raises `KeyError` if the key is missing from the raw config.
    """
    raw_path = self._raw["aurora_preds_path"]
    return pathlib.Path(raw_path)

Expand Down
20 changes: 10 additions & 10 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import logging
import queue
import os
import pathlib
import multiprocessing as mp
import numpy as np
from typing import Optional, List, Set
from typing import Optional, List, Set, Tuple

from brad.asset_manager import AssetManager
from brad.blueprint import Blueprint
Expand Down Expand Up @@ -163,17 +164,16 @@ async def _run_setup(self) -> None:
if self._temp_config is not None:
# TODO: Actually call into the models. We avoid doing so for now to
# avoid having to implement model loading, etc.
std_dataset_path = self._temp_config.std_dataset_path()
if std_dataset_path is not None:
std_datasets = self._temp_config.std_datasets()
if len(std_datasets) > 0:
datasets: List[Tuple[str, str | pathlib.Path]] = [
(dataset["name"], dataset["path"]) for dataset in std_datasets
]
latency_scorer: AnalyticsLatencyScorer = (
PrecomputedPredictions.load_from_standard_dataset(
dataset_path=std_dataset_path,
)
PrecomputedPredictions.load_from_standard_dataset(datasets)
)
data_access_provider = (
PrecomputedDataAccessProvider.load_from_standard_dataset(
dataset_path=std_dataset_path,
)
data_access_provider: DataAccessProvider = (
PrecomputedDataAccessProvider.load_from_standard_dataset(datasets)
)
else:
latency_scorer = PrecomputedPredictions.load(
Expand Down
115 changes: 83 additions & 32 deletions src/brad/planner/scoring/data_access/precomputed_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,21 @@
import pathlib
import numpy as np
import numpy.typing as npt
from typing import Dict
from typing import Dict, Tuple, List

from .provider import DataAccessProvider
from brad.planner.workload import Workload

logger = logging.getLogger(__name__)


class PrecomputedDataAccessProvider(DataAccessProvider):
"""
Provides predictions for a fixed workload using precomputed values.
Used for debugging purposes.
"""

class QueryMap:
@classmethod
def load_from_standard_dataset(
cls,
name: str,
dataset_path: str | pathlib.Path,
):
) -> "QueryMap":
if isinstance(dataset_path, pathlib.Path):
dsp = dataset_path
else:
Expand All @@ -46,7 +42,60 @@ def load_from_standard_dataset(
assert len(aurora.shape) == 1
assert len(athena.shape) == 1

return cls(queries_map, aurora, athena)
return cls(name, queries_map, aurora, athena)

def __init__(
    self,
    name: str,
    queries_map: Dict[str, int],
    aurora_accessed_pages: npt.NDArray,
    athena_accessed_bytes: npt.NDArray,
) -> None:
    """
    Stores one named dataset of precomputed data access statistics.

    `queries_map` maps a query string to its position in the two arrays;
    `aurora_accessed_pages` and `athena_accessed_bytes` are indexed by that
    position.
    """
    # The per-query arrays.
    self.athena_accessed_bytes = athena_accessed_bytes
    self.aurora_accessed_pages = aurora_accessed_pages
    # Query string -> index into the arrays above.
    self.queries_map = queries_map
    # Label identifying the dataset these values came from.
    self.name = name

def extract_access_statistics(
    self, workload: Workload
) -> Tuple[List[int], List[int], npt.NDArray, npt.NDArray]:
    """
    Looks up each of the workload's analytical queries in this dataset.

    Returns a 4-tuple:
      - positions (within `workload.analytical_queries()`) of the queries
        that were found in this dataset,
      - the corresponding indices in this dataset,
      - the Aurora accessed pages values for those queries,
      - the Athena accessed bytes values for those queries.

    Queries that are not in this dataset are skipped silently.
    """
    matched_positions = []
    dataset_rows = []
    for position, query in enumerate(workload.analytical_queries()):
        # Keys are matched on the stripped query text without a single
        # trailing semicolon.
        normalized = query.raw_query.strip().removesuffix(";")
        try:
            row = self.queries_map[normalized]
        except KeyError:
            # Not part of this dataset.
            continue
        dataset_rows.append(row)
        matched_positions.append(position)

    aurora_values = self.aurora_accessed_pages[dataset_rows]
    athena_values = self.athena_accessed_bytes[dataset_rows]
    return (matched_positions, dataset_rows, aurora_values, athena_values)


class PrecomputedDataAccessProvider(DataAccessProvider):
"""
Provides predictions for a fixed workload using precomputed values.
Used for debugging purposes.
"""

@classmethod
def load_from_standard_dataset(
    cls,
    datasets: List[Tuple[str, str | pathlib.Path]],
) -> "PrecomputedDataAccessProvider":
    """
    Builds a provider from `(name, dataset_path)` pairs by loading one
    `QueryMap` per pair, in the order given.
    """
    query_maps = []
    for name, dataset_path in datasets:
        loaded = QueryMap.load_from_standard_dataset(name, dataset_path)
        query_maps.append(loaded)
    return cls(query_maps)

@classmethod
def load(
Expand All @@ -72,35 +121,37 @@ def load(
assert len(athena.shape) == 1
assert aurora.shape[0] == athena.shape[0]

return cls(queries_map, aurora, athena)
return cls([QueryMap("custom", queries_map, aurora, athena)])

def __init__(
    self,
    predictions: List[QueryMap],
) -> None:
    """
    Creates a provider backed by one or more datasets of precomputed data
    access statistics.

    `predictions` holds one `QueryMap` per dataset. The constructors in
    this class (`load_from_standard_dataset()` and `load()`) both pass a
    single list, so the older three-argument form (`queries_map`,
    `aurora_accessed_pages`, `athena_accessed_bytes`) is no longer
    accepted here; those values now live on each `QueryMap`.
    """
    self._predictions = predictions

def apply_access_statistics(self, workload: Workload) -> None:
    """
    Fills in the workload's predicted data access statistics using the
    precomputed datasets held by this provider.

    Each dataset contributes values for the queries it contains; when a
    query appears in more than one dataset, the dataset that comes later in
    the list wins. Queries containing the `<=>` operator (vector
    similarity) are special-cased to 0 Athena bytes.

    Raises `RuntimeError` if, after all of the above, any query still has
    no Athena value.
    """
    all_queries = workload.analytical_queries()
    num_queries = len(all_queries)
    # NaN marks "no value assigned yet".
    applied_aurora = np.full(num_queries, np.nan)
    applied_athena = np.full(num_queries, np.nan)

    for qm in self._predictions:
        workload_indices, _, aurora, athena = qm.extract_access_statistics(workload)
        applied_aurora[workload_indices] = aurora
        applied_athena[workload_indices] = athena

    # Special case: vector similarity queries.
    special_vector_queries = [
        wqi for wqi, q in enumerate(all_queries) if "<=>" in q.raw_query
    ]
    applied_athena[special_vector_queries] = 0.0

    # Check for unmatched queries.
    # NOTE: Only the Athena values are checked. A vector similarity query
    # that is in none of the datasets keeps a NaN Aurora value.
    num_unmatched_athena = int(np.isnan(applied_athena).sum())
    if num_unmatched_athena > 0:
        # The count is a NumPy integer; it must be formatted rather than
        # concatenated onto the message with `+`.
        raise RuntimeError(f"Unmatched queries: {num_unmatched_athena}")

    workload.set_predicted_data_access_statistics(
        aurora_pages=applied_aurora,
        athena_bytes=applied_athena,
    )
121 changes: 91 additions & 30 deletions src/brad/planner/scoring/performance/precomputed_predictions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import numpy as np
import numpy.typing as npt
from typing import Dict
from typing import Dict, List, Tuple

from brad.config.engine import Engine
from brad.planner.scoring.performance.analytics_latency import AnalyticsLatencyScorer
Expand All @@ -11,16 +11,13 @@
logger = logging.getLogger(__name__)


class PrecomputedPredictions(AnalyticsLatencyScorer):
"""
Provides predictions for a fixed workload using precomputed predictions.
Used for debugging purposes.
"""

class QueryMap:
@classmethod
def load_from_standard_dataset(
cls, dataset_path: str | pathlib.Path
) -> "PrecomputedPredictions":
cls,
name: str,
dataset_path: str | pathlib.Path,
) -> "QueryMap":
if isinstance(dataset_path, pathlib.Path):
dsp = dataset_path
else:
Expand Down Expand Up @@ -54,7 +51,58 @@ def load_from_standard_dataset(
predictions[np.isinf(predictions)] = timeout_value_s
predictions[np.isnan(predictions)] = timeout_value_s

return cls(queries_map, predictions)
return cls(name, queries_map, predictions)

def __init__(
    self, name: str, queries_map: Dict[str, int], predictions: npt.NDArray
) -> None:
    """
    Stores one named dataset of precomputed latency predictions.

    `queries_map` maps a query string to its row in `predictions`.
    """
    # One row per dataset query.
    self.predictions = predictions
    # Query string -> row index in `predictions`.
    self.queries_map = queries_map
    # Label identifying the dataset these predictions came from.
    self.name = name

def extract_matched_predictions(
    self, workload: Workload
) -> Tuple[List[int], List[int], npt.NDArray]:
    """
    Looks up each of the workload's analytical queries in this dataset.

    Returns a 3-tuple:
      - positions (within `workload.analytical_queries()`) of the queries
        that were found in this dataset,
      - the corresponding row indices in the precomputed predictions bank,
      - the prediction rows for those queries.

    Queries that are not in this dataset are skipped silently.
    """
    matched_positions = []
    dataset_rows = []
    for position, query in enumerate(workload.analytical_queries()):
        # Keys are matched on the stripped query text without a single
        # trailing semicolon.
        normalized = query.raw_query.strip().removesuffix(";")
        try:
            row = self.queries_map[normalized]
        except KeyError:
            # Not part of this dataset.
            continue
        dataset_rows.append(row)
        matched_positions.append(position)

    matched_predictions = self.predictions[dataset_rows, :]
    return (matched_positions, dataset_rows, matched_predictions)


class PrecomputedPredictions(AnalyticsLatencyScorer):
"""
Provides predictions for a fixed workload using precomputed predictions.
Used for debugging purposes.
"""

@classmethod
def load_from_standard_dataset(
    cls,
    datasets: List[Tuple[str, str | pathlib.Path]],
) -> "PrecomputedPredictions":
    """
    Builds a scorer from `(name, dataset_path)` pairs by loading one
    `QueryMap` per pair, in the order given.
    """
    query_maps = []
    for name, dataset_path in datasets:
        loaded = QueryMap.load_from_standard_dataset(name, dataset_path)
        query_maps.append(loaded)
    return cls(query_maps)

@classmethod
def load(
Expand Down Expand Up @@ -95,28 +143,41 @@ def load(
timeout_value_s = 210.0
predictions[np.isinf(predictions)] = timeout_value_s

return cls(queries_map, predictions)
return cls([QueryMap("custom", queries_map, predictions)])

def __init__(self, predictions: List[QueryMap]) -> None:
    """
    Creates a scorer backed by one or more datasets of precomputed latency
    predictions.

    `predictions` holds one `QueryMap` per dataset. The constructors in
    this class (`load_from_standard_dataset()` and `load()`) both pass a
    single list, so the older `(queries_map, predictions)` form is no
    longer accepted here; the query map now lives on each `QueryMap`.
    """
    self._predictions = predictions

def apply_predicted_latencies(self, workload: Workload) -> None:
    """
    Fills in the workload's predicted analytical latencies using the
    precomputed datasets held by this scorer.

    Each dataset contributes prediction rows for the queries it contains;
    when a query appears in more than one dataset, the dataset that comes
    later in the list wins. Queries containing the `<=>` operator (vector
    similarity) get a fixed Aurora prediction.

    Alongside the predictions, a debug map is passed to the workload. It
    maps each dataset name (plus "special_vector") to a list of
    `(workload query index, dataset index)` pairs; the dataset index is -1
    for the vector similarity special case.

    Raises `RuntimeError` if, after all of the above, any query still has
    no Aurora prediction.
    """
    all_queries = workload.analytical_queries()
    # NaN marks "no prediction assigned yet". One row per query and three
    # columns (presumably one per engine; see `Workload.EngineLatencyIndex`).
    applied_predictions = np.ones((len(all_queries), 3)) * np.nan
    debug_map = {}

    for qm in self._predictions:
        workload_indices, dataset_indices, preds = qm.extract_matched_predictions(
            workload
        )
        applied_predictions[workload_indices] = preds
        debug_map[qm.name] = list(zip(workload_indices, dataset_indices))

    # Special case: vector similarity queries.
    special_vector_queries = [
        wqi for wqi, q in enumerate(all_queries) if "<=>" in q.raw_query
    ]
    aurora_column = Workload.EngineLatencyIndex[Engine.Aurora]
    # Hard-coded Aurora prediction for these queries (assumed to be in the
    # same unit as the precomputed predictions).
    applied_predictions[special_vector_queries, aurora_column] = 3.6
    debug_map["special_vector"] = list(
        zip(special_vector_queries, [-1] * len(special_vector_queries))
    )

    # Check for unmatched queries.
    # NOTE: Only the Aurora column is checked.
    num_unmatched_aurora = int(np.isnan(applied_predictions[:, aurora_column]).sum())
    if num_unmatched_aurora > 0:
        # The count is a NumPy integer; it must be formatted rather than
        # concatenated onto the message with `+`.
        raise RuntimeError(f"Unmatched queries: {num_unmatched_aurora}")

    workload.set_predicted_analytical_latencies(applied_predictions, debug_map)
Loading

0 comments on commit a0f6427

Please sign in to comment.