Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify prediction injection to support multiple datasets #387

Merged
merged 2 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion config/temp_config_sample.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ comparator:
penalty_threshold: 0.8 # Only used by the `benefit_perf_ceiling` comparator

# Use this instead of the individual paths below.
std_dataset_path: workloads/IMDB_20GB/regular_test/
std_datasets:
- name: regular
path: workloads/IMDB_100GB/regular_test/
- name: adhoc
path: workloads/IMDB_100GB/adhoc_test/

# The configurations below are deprecated. Do not use them in new experiment
# code (they don't need to be included either).

std_dataset_path: workloads/IMDB_20GB/regular_test/

aurora_preds_path: workloads/IMDB/OLAP_queries_new/pred_aurora_300.npy
redshift_preds_path: workloads/IMDB/OLAP_queries_new/pred_redshift_300.npy
athena_preds_path: workloads/IMDB/OLAP_queries_new/pred_athena_300.npy
Expand Down
11 changes: 8 additions & 3 deletions src/brad/config/temp_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pathlib
import yaml
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List
from datetime import timedelta


Expand Down Expand Up @@ -41,13 +41,18 @@ def benefit_horizon(self) -> timedelta:
def penalty_threshold(self) -> float:
return self._raw["comparator"]["penalty_threshold"]

def std_datasets(self) -> List[Dict[str, str]]:
if "std_datasets" not in self._raw:
return []
return self._raw["std_datasets"]

# The below configs are now deprecated.

def std_dataset_path(self) -> Optional[pathlib.Path]:
if "std_dataset_path" not in self._raw:
return None
return pathlib.Path(self._raw["std_dataset_path"])

# The below configs are now deprecated.

def aurora_preds_path(self) -> pathlib.Path:
return pathlib.Path(self._raw["aurora_preds_path"])

Expand Down
20 changes: 10 additions & 10 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import logging
import queue
import os
import pathlib
import multiprocessing as mp
import numpy as np
from typing import Optional, List, Set
from typing import Optional, List, Set, Tuple

from brad.asset_manager import AssetManager
from brad.blueprint import Blueprint
Expand Down Expand Up @@ -163,17 +164,16 @@ async def _run_setup(self) -> None:
if self._temp_config is not None:
# TODO: Actually call into the models. We avoid doing so for now to
# avoid having to implement model loading, etc.
std_dataset_path = self._temp_config.std_dataset_path()
if std_dataset_path is not None:
std_datasets = self._temp_config.std_datasets()
if len(std_datasets) > 0:
datasets: List[Tuple[str, str | pathlib.Path]] = [
(dataset["name"], dataset["path"]) for dataset in std_datasets
]
latency_scorer: AnalyticsLatencyScorer = (
PrecomputedPredictions.load_from_standard_dataset(
dataset_path=std_dataset_path,
)
PrecomputedPredictions.load_from_standard_dataset(datasets)
)
data_access_provider = (
PrecomputedDataAccessProvider.load_from_standard_dataset(
dataset_path=std_dataset_path,
)
data_access_provider: DataAccessProvider = (
PrecomputedDataAccessProvider.load_from_standard_dataset(datasets)
)
else:
latency_scorer = PrecomputedPredictions.load(
Expand Down
115 changes: 83 additions & 32 deletions src/brad/planner/scoring/data_access/precomputed_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,21 @@
import pathlib
import numpy as np
import numpy.typing as npt
from typing import Dict
from typing import Dict, Tuple, List

from .provider import DataAccessProvider
from brad.planner.workload import Workload

logger = logging.getLogger(__name__)


class PrecomputedDataAccessProvider(DataAccessProvider):
"""
Provides predictions for a fixed workload using precomputed values.
Used for debugging purposes.
"""

class QueryMap:
@classmethod
def load_from_standard_dataset(
cls,
name: str,
dataset_path: str | pathlib.Path,
):
) -> "QueryMap":
if isinstance(dataset_path, pathlib.Path):
dsp = dataset_path
else:
Expand All @@ -46,7 +42,60 @@ def load_from_standard_dataset(
assert len(aurora.shape) == 1
assert len(athena.shape) == 1

return cls(queries_map, aurora, athena)
return cls(name, queries_map, aurora, athena)

def __init__(
self,
name: str,
queries_map: Dict[str, int],
aurora_accessed_pages: npt.NDArray,
athena_accessed_bytes: npt.NDArray,
) -> None:
self.name = name
self.queries_map = queries_map
self.aurora_accessed_pages = aurora_accessed_pages
self.athena_accessed_bytes = athena_accessed_bytes

def extract_access_statistics(
self, workload: Workload
) -> Tuple[List[int], List[int], npt.NDArray, npt.NDArray]:
workload_query_index = []
indices_in_dataset = []
for wqi, query in enumerate(workload.analytical_queries()):
try:
query_str = query.raw_query.strip()
if query_str.endswith(";"):
query_str = query_str[:-1]
indices_in_dataset.append(self.queries_map[query_str])
workload_query_index.append(wqi)
except KeyError:
continue

return (
workload_query_index,
indices_in_dataset,
self.aurora_accessed_pages[indices_in_dataset],
self.athena_accessed_bytes[indices_in_dataset],
)


class PrecomputedDataAccessProvider(DataAccessProvider):
"""
Provides predictions for a fixed workload using precomputed values.
Used for debugging purposes.
"""

@classmethod
def load_from_standard_dataset(
cls,
datasets: List[Tuple[str, str | pathlib.Path]],
) -> "PrecomputedDataAccessProvider":
return cls(
[
QueryMap.load_from_standard_dataset(name, dataset_path)
for name, dataset_path in datasets
]
)

@classmethod
def load(
Expand All @@ -72,35 +121,37 @@ def load(
assert len(athena.shape) == 1
assert aurora.shape[0] == athena.shape[0]

return cls(queries_map, aurora, athena)
return cls([QueryMap("custom", queries_map, aurora, athena)])

def __init__(
self,
queries_map: Dict[str, int],
aurora_accessed_pages: npt.NDArray,
athena_accessed_bytes: npt.NDArray,
predictions: List[QueryMap],
) -> None:
self._queries_map = queries_map
self._aurora_accessed_pages = aurora_accessed_pages
self._athena_accessed_bytes = athena_accessed_bytes
self._predictions = predictions

def apply_access_statistics(self, workload: Workload) -> None:
query_indices = []
has_unmatched = False
for query in workload.analytical_queries():
try:
query_str = query.raw_query.strip()
if query_str.endswith(";"):
query_str = query_str[:-1]
query_indices.append(self._queries_map[query_str])
except KeyError:
logger.warning("Cannot match query:\n%s", query.raw_query.strip())
query_indices.append(-1)
has_unmatched = True
all_queries = workload.analytical_queries()
applied_aurora = np.ones(len(all_queries)) * np.nan
applied_athena = np.ones(len(all_queries)) * np.nan

for qm in self._predictions:
workload_indices, _, aurora, athena = qm.extract_access_statistics(workload)
applied_aurora[workload_indices] = aurora
applied_athena[workload_indices] = athena

# Special case: vector similarity queries.
special_vector_queries = []
for wqi, q in enumerate(all_queries):
if "<=>" in q.raw_query:
special_vector_queries.append(wqi)
applied_athena[special_vector_queries] = 0.0

# Check for unmatched queries.
num_unmatched_athena = np.isnan(applied_athena).sum()
if num_unmatched_athena > 0:
raise RuntimeError("Unmatched queries: " + num_unmatched_athena)

if has_unmatched:
raise RuntimeError("Workload contains unmatched queries.")
workload.set_predicted_data_access_statistics(
aurora_pages=self._aurora_accessed_pages[query_indices],
athena_bytes=self._athena_accessed_bytes[query_indices],
aurora_pages=applied_aurora,
athena_bytes=applied_athena,
)
121 changes: 91 additions & 30 deletions src/brad/planner/scoring/performance/precomputed_predictions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import numpy as np
import numpy.typing as npt
from typing import Dict
from typing import Dict, List, Tuple

from brad.config.engine import Engine
from brad.planner.scoring.performance.analytics_latency import AnalyticsLatencyScorer
Expand All @@ -11,16 +11,13 @@
logger = logging.getLogger(__name__)


class PrecomputedPredictions(AnalyticsLatencyScorer):
"""
Provides predictions for a fixed workload using precomputed predictions.
Used for debugging purposes.
"""

class QueryMap:
@classmethod
def load_from_standard_dataset(
cls, dataset_path: str | pathlib.Path
) -> "PrecomputedPredictions":
cls,
name: str,
dataset_path: str | pathlib.Path,
) -> "QueryMap":
if isinstance(dataset_path, pathlib.Path):
dsp = dataset_path
else:
Expand Down Expand Up @@ -54,7 +51,58 @@ def load_from_standard_dataset(
predictions[np.isinf(predictions)] = timeout_value_s
predictions[np.isnan(predictions)] = timeout_value_s

return cls(queries_map, predictions)
return cls(name, queries_map, predictions)

def __init__(
self, name: str, queries_map: Dict[str, int], predictions: npt.NDArray
) -> None:
self.name = name
self.queries_map = queries_map
self.predictions = predictions

def extract_matched_predictions(
self, workload: Workload
) -> Tuple[List[int], List[int], npt.NDArray]:
"""
Returns the matched indices and the predicted values.
"""
# The index of the query in the input workload.
workload_query_index = []
# The index of the query in the precomputed predictions bank.
indices_in_dataset = []
for wqi, query in enumerate(workload.analytical_queries()):
try:
query_str = query.raw_query.strip()
if query_str.endswith(";"):
query_str = query_str[:-1]
indices_in_dataset.append(self.queries_map[query_str])
workload_query_index.append(wqi)
except KeyError:
continue
return (
workload_query_index,
indices_in_dataset,
self.predictions[indices_in_dataset, :],
)


class PrecomputedPredictions(AnalyticsLatencyScorer):
"""
Provides predictions for a fixed workload using precomputed predictions.
Used for debugging purposes.
"""

@classmethod
def load_from_standard_dataset(
cls,
datasets: List[Tuple[str, str | pathlib.Path]],
) -> "PrecomputedPredictions":
return cls(
[
QueryMap.load_from_standard_dataset(name, dataset_path)
for name, dataset_path in datasets
]
)

@classmethod
def load(
Expand Down Expand Up @@ -95,28 +143,41 @@ def load(
timeout_value_s = 210.0
predictions[np.isinf(predictions)] = timeout_value_s

return cls(queries_map, predictions)
return cls([QueryMap("custom", queries_map, predictions)])

def __init__(self, queries_map: Dict[str, int], predictions: npt.NDArray) -> None:
self._queries_map = queries_map
def __init__(self, predictions: List[QueryMap]) -> None:
self._predictions = predictions

def apply_predicted_latencies(self, workload: Workload) -> None:
query_indices = []
has_unmatched = False
for query in workload.analytical_queries():
try:
query_str = query.raw_query.strip()
if query_str.endswith(";"):
query_str = query_str[:-1]
query_indices.append(self._queries_map[query_str])
except KeyError:
logger.warning("Cannot match query:\n%s", query.raw_query.strip())
query_indices.append(-1)
has_unmatched = True

if has_unmatched:
raise RuntimeError("Workload contains unmatched queries.")
workload.set_predicted_analytical_latencies(
self._predictions[query_indices, :], query_indices
all_queries = workload.analytical_queries()
applied_predictions = np.ones((len(all_queries), 3)) * np.nan
debug_map = {}

for qm in self._predictions:
workload_indices, dataset_indices, preds = qm.extract_matched_predictions(
workload
)
applied_predictions[workload_indices] = preds
debug_map[qm.name] = list(zip(workload_indices, dataset_indices))

# Special case: vector similarity queries.
special_vector_queries = []
for wqi, q in enumerate(all_queries):
if "<=>" in q.raw_query:
special_vector_queries.append(wqi)

applied_predictions[
special_vector_queries, Workload.EngineLatencyIndex[Engine.Aurora]
] = 3.6
debug_map["special_vector"] = list(
zip(special_vector_queries, [-1] * len(special_vector_queries))
)

# Check for unmatched queries.
num_unmatched_aurora = np.isnan(
applied_predictions[:, Workload.EngineLatencyIndex[Engine.Aurora]]
).sum()
if num_unmatched_aurora > 0:
raise RuntimeError("Unmatched queries: " + num_unmatched_aurora)

workload.set_predicted_analytical_latencies(applied_predictions, debug_map)
Loading