From 68eb4c528f194593bb29622b8c12bb1365b8170d Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Fri, 3 Nov 2023 18:44:18 -0400 Subject: [PATCH] Add blueprint planning run logging and do a light refactor (#349) * First step at planner refactoring * Add new providers * Migrate planners to the new providers * Add infrastructure to record a blueprint planning run for later analysis * Add logging to the table-based beam planner too * Various fixes * Remove obsoleted file --- src/brad/admin/run_planner.py | 28 ++- src/brad/daemon/daemon.py | 46 ++-- src/brad/planner/abstract.py | 62 +++-- src/brad/planner/beam/query_based.py | 227 +++++++++++++----- src/brad/planner/beam/table_based.py | 224 +++++++++++------ src/brad/planner/beam/triggers.py | 97 -------- src/brad/planner/compare/provider.py | 26 ++ src/brad/planner/debug_logger.py | 8 +- src/brad/planner/factory.py | 63 ++--- src/brad/planner/neighborhood/neighborhood.py | 23 +- src/brad/planner/providers.py | 35 +++ src/brad/planner/recorded_run.py | 37 +++ .../planner/scoring/data_access/provider.py | 5 + .../scoring/performance/analytics_latency.py | 5 + src/brad/planner/triggers/provider.py | 124 ++++++++++ 15 files changed, 657 insertions(+), 353 deletions(-) delete mode 100644 src/brad/planner/beam/triggers.py create mode 100644 src/brad/planner/compare/provider.py create mode 100644 src/brad/planner/providers.py create mode 100644 src/brad/planner/recorded_run.py create mode 100644 src/brad/planner/triggers/provider.py diff --git a/src/brad/admin/run_planner.py b/src/brad/admin/run_planner.py index 4f1fdd85..e4d3c19a 100644 --- a/src/brad/admin/run_planner.py +++ b/src/brad/admin/run_planner.py @@ -11,9 +11,11 @@ from brad.config.planner import PlannerConfig from brad.daemon.monitor import Monitor from brad.data_stats.postgres_estimator import PostgresEstimator -from brad.planner.compare.cost import best_cost_under_perf_ceilings +from brad.planner.compare.provider import PerformanceCeilingComparatorProvider from 
brad.planner.estimator import EstimatorProvider, FixedEstimatorProvider from brad.planner.factory import BlueprintPlannerFactory +from brad.planner.providers import BlueprintProviders +from brad.planner.router_provider import RouterProvider from brad.planner.scoring.score import Score from brad.planner.scoring.data_access.precomputed_values import ( PrecomputedDataAccessProvider, @@ -21,6 +23,7 @@ from brad.planner.scoring.performance.precomputed_predictions import ( PrecomputedPredictions, ) +from brad.planner.triggers.provider import EmptyTriggerProvider from brad.planner.triggers.trigger import Trigger from brad.planner.metrics import ( MetricsFromMonitor, @@ -227,25 +230,28 @@ async def run_planner_impl(args) -> None: else: estimator_provider = EstimatorProvider() - planner = BlueprintPlannerFactory.create( - current_blueprint=blueprint_mgr.get_blueprint(), - current_blueprint_score=blueprint_mgr.get_active_score(), - planner_config=planner_config, - monitor=monitor, - config=config, - schema_name=args.schema_name, + providers = BlueprintProviders( # Next workload is the same as the current workload. workload_provider=FixedWorkloadProvider(workload), # Used for debugging purposes. analytics_latency_scorer=prediction_provider, - # TODO: Make this configurable. - comparator=best_cost_under_perf_ceilings( + comparator_provider=PerformanceCeilingComparatorProvider( max_query_latency_s=args.latency_ceiling_s, - max_txn_p90_latency_s=0.020, # FIXME: Add command-line argument if needed. + max_txn_p90_latency_s=0.030, # FIXME: Add command-line argument if needed. 
), metrics_provider=metrics_provider, data_access_provider=data_access_provider, estimator_provider=estimator_provider, + trigger_provider=EmptyTriggerProvider(), + router_provider=RouterProvider(args.schema_name, config, estimator_provider), + ) + planner = BlueprintPlannerFactory.create( + config=config, + planner_config=planner_config, + schema_name=args.schema_name, + current_blueprint=blueprint_mgr.get_blueprint(), + current_blueprint_score=blueprint_mgr.get_active_score(), + providers=providers, ) asyncio.run(monitor.fetch_latest()) diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py index 49ddc586..f1b1e828 100644 --- a/src/brad/daemon/daemon.py +++ b/src/brad/daemon/daemon.py @@ -34,10 +34,12 @@ from brad.data_sync.execution.executor import DataSyncExecutor from brad.front_end.start_front_end import start_front_end from brad.planner.abstract import BlueprintPlanner -from brad.planner.compare.cost import best_cost_under_perf_ceilings +from brad.planner.compare.provider import PerformanceCeilingComparatorProvider from brad.planner.estimator import EstimatorProvider from brad.planner.factory import BlueprintPlannerFactory from brad.planner.metrics import MetricsFromMonitor +from brad.planner.providers import BlueprintProviders +from brad.planner.router_provider import RouterProvider from brad.planner.scoring.score import Score from brad.planner.scoring.data_access.provider import DataAccessProvider from brad.planner.scoring.data_access.precomputed_values import ( @@ -47,8 +49,9 @@ from brad.planner.scoring.performance.precomputed_predictions import ( PrecomputedPredictions, ) -from brad.planner.triggers.trigger import Trigger +from brad.planner.triggers.provider import ConfigDefinedTriggers from brad.planner.triggers.recent_change import RecentChange +from brad.planner.triggers.trigger import Trigger from brad.planner.workload import Workload from brad.planner.workload.builder import WorkloadBuilder from brad.planner.workload.provider import 
LoggedWorkloadProvider @@ -179,9 +182,9 @@ async def _run_setup(self) -> None: aurora_accessed_pages_path=self._temp_config.aurora_data_access_path(), athena_accessed_bytes_path=self._temp_config.athena_data_access_path(), ) - comparator = best_cost_under_perf_ceilings( - max_query_latency_s=self._temp_config.latency_ceiling_s(), - max_txn_p90_latency_s=self._temp_config.txn_latency_p90_ceiling_s(), + comparator_provider = PerformanceCeilingComparatorProvider( + self._temp_config.latency_ceiling_s(), + self._temp_config.txn_latency_p90_ceiling_s(), ) else: logger.warning( @@ -189,17 +192,12 @@ async def _run_setup(self) -> None: ) latency_scorer = _NoopAnalyticsScorer() data_access_provider = _NoopDataAccessProvider() - comparator = best_cost_under_perf_ceilings( - max_query_latency_s=10, max_txn_p90_latency_s=0.030 - ) + comparator_provider = PerformanceCeilingComparatorProvider(30.0, 0.030) - self._planner = BlueprintPlannerFactory.create( - planner_config=self._planner_config, - current_blueprint=self._blueprint_mgr.get_blueprint(), - current_blueprint_score=self._blueprint_mgr.get_active_score(), - monitor=self._monitor, - config=self._config, - schema_name=self._schema_name, + router_provider = RouterProvider( + self._schema_name, self._config, self._estimator_provider + ) + providers = BlueprintProviders( workload_provider=LoggedWorkloadProvider( self._config, self._planner_config, @@ -207,10 +205,26 @@ async def _run_setup(self) -> None: self._schema_name, ), analytics_latency_scorer=latency_scorer, - comparator=comparator, + comparator_provider=comparator_provider, metrics_provider=MetricsFromMonitor(self._monitor, self._blueprint_mgr), data_access_provider=data_access_provider, estimator_provider=self._estimator_provider, + trigger_provider=ConfigDefinedTriggers( + self._config, + self._planner_config, + self._monitor, + data_access_provider, + router_provider, + ), + router_provider=router_provider, + ) + self._planner = BlueprintPlannerFactory.create( + 
config=self._config, + planner_config=self._planner_config, + schema_name=self._schema_name, + current_blueprint=self._blueprint_mgr.get_blueprint(), + current_blueprint_score=self._blueprint_mgr.get_active_score(), + providers=providers, system_event_logger=self._system_event_logger, ) self._planner.register_new_blueprint_callback(self._handle_new_blueprint) diff --git a/src/brad/planner/abstract.py b/src/brad/planner/abstract.py index 4f9e55ec..59e3130d 100644 --- a/src/brad/planner/abstract.py +++ b/src/brad/planner/abstract.py @@ -1,21 +1,15 @@ import asyncio import logging -from typing import Coroutine, Callable, List, Optional, Iterable +from typing import Coroutine, Callable, List, Optional, Iterable, Tuple from brad.blueprint import Blueprint from brad.config.file import ConfigFile from brad.config.planner import PlannerConfig from brad.config.system_event import SystemEvent -from brad.daemon.monitor import Monitor from brad.daemon.system_event_logger import SystemEventLogger -from brad.planner.compare.function import BlueprintComparator -from brad.planner.estimator import EstimatorProvider -from brad.planner.metrics import MetricsProvider -from brad.planner.scoring.data_access.provider import DataAccessProvider -from brad.planner.scoring.performance.analytics_latency import AnalyticsLatencyScorer +from brad.planner.providers import BlueprintProviders from brad.planner.scoring.score import Score from brad.planner.triggers.trigger import Trigger -from brad.planner.workload.provider import WorkloadProvider logger = logging.getLogger(__name__) @@ -33,41 +27,35 @@ class BlueprintPlanner: def __init__( self, + config: ConfigFile, planner_config: PlannerConfig, + schema_name: str, current_blueprint: Blueprint, current_blueprint_score: Optional[Score], - monitor: Monitor, - config: ConfigFile, - schema_name: str, - workload_provider: WorkloadProvider, - analytics_latency_scorer: AnalyticsLatencyScorer, - comparator: BlueprintComparator, - metrics_provider: 
MetricsProvider, - data_access_provider: DataAccessProvider, - estimator_provider: EstimatorProvider, + providers: BlueprintProviders, system_event_logger: Optional[SystemEventLogger], ) -> None: + self._config = config self._planner_config = planner_config + self._schema_name = schema_name self._current_blueprint = current_blueprint self._current_blueprint_score = current_blueprint_score self._last_suggested_blueprint: Optional[Blueprint] = None self._last_suggested_blueprint_score: Optional[Score] = None - self._monitor = monitor - self._config = config - self._schema_name = schema_name - self._workload_provider = workload_provider - self._analytics_latency_scorer = analytics_latency_scorer - self._comparator = comparator - self._metrics_provider = metrics_provider - self._data_access_provider = data_access_provider - self._estimator_provider = estimator_provider + self._providers = providers self._system_event_logger = system_event_logger self._callbacks: List[NewBlueprintCallback] = [] self._replan_in_progress = False self._disable_triggers = False + self._triggers = self._providers.trigger_provider.get_triggers() + for t in self._triggers: + t.update_blueprint(self._current_blueprint, self._current_blueprint_score) + + self._comparator = self._providers.comparator_provider.get_comparator() + async def run_forever(self) -> None: """ Called to start the planner. The planner is meant to run until its task @@ -112,7 +100,7 @@ async def run_replan( self, trigger: Optional[Trigger], window_multiplier: int = 1 ) -> None: """ - Triggers a "forced" replan. Used for debugging. + Initiates a replan. Call this directly to "force" a replan. Use `window_multiplier` to expand the window used for planning. 
""" @@ -120,13 +108,22 @@ async def run_replan( self._replan_in_progress = True for t in self.get_triggers(): t.on_replan(trigger) - await self._run_replan_impl(trigger, window_multiplier) + + result = await self._run_replan_impl(window_multiplier) + if result is None: + return + blueprint, score = result + self._last_suggested_blueprint = blueprint + self._last_suggested_blueprint_score = score + + await self._notify_new_blueprint(blueprint, score, trigger) + finally: self._replan_in_progress = False async def _run_replan_impl( - self, trigger: Optional[Trigger], window_multiplier: int = 1 - ) -> None: + self, window_multiplier: int = 1 + ) -> Optional[Tuple[Blueprint, Score]]: """ Implementers should override this method to define the blueprint optimization algorithm. @@ -135,10 +132,9 @@ async def _run_replan_impl( def get_triggers(self) -> Iterable[Trigger]: """ - Implementers should return the triggers used to trigger blueprint - replanning. + The triggers used to trigger blueprint replanning. 
""" - raise NotImplementedError + return self._triggers def set_disable_triggers(self, disable: bool) -> None: """ diff --git a/src/brad/planner/beam/query_based.py b/src/brad/planner/beam/query_based.py index 30fbb581..0d4589c0 100644 --- a/src/brad/planner/beam/query_based.py +++ b/src/brad/planner/beam/query_based.py @@ -2,62 +2,94 @@ import heapq import json import logging -from datetime import timedelta -from typing import Iterable, List, Optional +from datetime import timedelta, datetime +from typing import List, Tuple, Optional +from brad.blueprint.blueprint import Blueprint from brad.config.engine import Engine, EngineBitmapValues +from brad.config.file import ConfigFile +from brad.config.planner import PlannerConfig from brad.planner.abstract import BlueprintPlanner +from brad.planner.compare.provider import BlueprintComparatorProvider from brad.planner.beam.feasibility import BlueprintFeasibility from brad.planner.beam.query_based_candidate import BlueprintCandidate -from brad.planner.beam.triggers import get_beam_triggers -from brad.planner.router_provider import RouterProvider from brad.planner.debug_logger import ( BlueprintPlanningDebugLogger, BlueprintPickleDebugLogger, ) from brad.planner.enumeration.provisioning import ProvisioningEnumerator +from brad.planner.estimator import EstimatorProvider +from brad.planner.metrics import Metrics, FixedMetricsProvider +from brad.planner.providers import BlueprintProviders +from brad.planner.recorded_run import RecordedPlanningRun +from brad.planner.router_provider import RouterProvider from brad.planner.scoring.context import ScoringContext +from brad.planner.scoring.data_access.provider import NoopDataAccessProvider +from brad.planner.scoring.performance.analytics_latency import ( + NoopAnalyticsLatencyScorer, +) +from brad.planner.scoring.score import Score from brad.planner.scoring.table_placement import compute_single_athena_table_cost -from brad.planner.triggers.trigger import Trigger +from 
brad.planner.triggers.provider import EmptyTriggerProvider +from brad.planner.workload import Workload +from brad.planner.workload.provider import WorkloadProvider logger = logging.getLogger(__name__) class QueryBasedBeamPlanner(BlueprintPlanner): - def __init__(self, *args, **kwargs) -> None: + def __init__( + self, + *args, + disable_external_logging: bool = False, + **kwargs, + ) -> None: super().__init__(*args, **kwargs) - self._router_provider = RouterProvider( - self._schema_name, self._config, self._estimator_provider - ) - self._triggers = get_beam_triggers( - self._config, - self._planner_config, - self._monitor, - self._data_access_provider, - self._router_provider, - ) - for t in self._triggers: - t.update_blueprint(self._current_blueprint, self._current_blueprint_score) - - def get_triggers(self) -> Iterable[Trigger]: - return self._triggers + self._disable_external_logging = disable_external_logging async def _run_replan_impl( - self, trigger: Optional[Trigger], window_multiplier: int = 1 - ) -> None: - logger.info("Running a replan...") + self, window_multiplier: int = 1 + ) -> Optional[Tuple[Blueprint, Score]]: + logger.info("Running a query-based beam replan...") # 1. Fetch the next workload and apply predictions. 
- metrics, metrics_timestamp = self._metrics_provider.get_metrics() - logger.debug("Using metrics: %s", str(metrics)) - current_workload, next_workload = await self._workload_provider.get_workloads( + metrics, metrics_timestamp = self._providers.metrics_provider.get_metrics() + ( + current_workload, + next_workload, + ) = await self._providers.workload_provider.get_workloads( metrics_timestamp, window_multiplier, desired_period=timedelta(hours=1) ) - self._analytics_latency_scorer.apply_predicted_latencies(next_workload) - self._analytics_latency_scorer.apply_predicted_latencies(current_workload) - self._data_access_provider.apply_access_statistics(next_workload) - self._data_access_provider.apply_access_statistics(current_workload) + self._providers.analytics_latency_scorer.apply_predicted_latencies( + next_workload + ) + self._providers.analytics_latency_scorer.apply_predicted_latencies( + current_workload + ) + self._providers.data_access_provider.apply_access_statistics(next_workload) + self._providers.data_access_provider.apply_access_statistics(current_workload) + + # If requested, we record this planning pass for later debugging. + if ( + not self._disable_external_logging + and BlueprintPickleDebugLogger.is_log_requested(self._config) + ): + planning_run = RecordedQueryBasedPlanningRun( + self._config, + self._planner_config, + self._schema_name, + self._current_blueprint, + self._current_blueprint_score, + current_workload, + next_workload, + metrics, + metrics_timestamp, + self._providers.comparator_provider, + ) + BlueprintPickleDebugLogger.log_object_if_requested( + self._config, "query_beam_run", planning_run + ) # 2. Compute query gains and reorder queries by their gain in descending # order. @@ -70,7 +102,7 @@ async def _run_replan_impl( # workload. if len(query_indices) == 0: logger.info("No queries in the workload. 
Cannot replan.") - return + return None if len(next_workload.analytical_queries()) < 20: logger.info("[Query-Based Planner] Query arrival counts") @@ -87,7 +119,7 @@ async def _run_replan_impl( self._planner_config, ) await ctx.simulate_current_workload_routing( - await self._router_provider.get_router( + await self._providers.router_provider.get_router( self._current_blueprint.table_locations_bitmap() ) ) @@ -126,9 +158,10 @@ async def _run_replan_impl( if len(current_top_k) == 0: logger.error( - "Query-based beam blueprint planning failed. Could not generate an initial set of feasible blueprints." + "Query-based beam blueprint planning failed. " + "Could not generate an initial set of feasible blueprints." ) - return + return None # 5. Run beam search to formulate the table placements. for j, query_idx in enumerate(query_indices[1:]): @@ -201,13 +234,14 @@ async def _run_replan_impl( current_top_k = next_top_k - # Log the placement top k for debugging purposes, if needed. - placement_top_k_logger = BlueprintPlanningDebugLogger.create_if_requested( - self._config, "query_beam_placement_topk" - ) - if placement_top_k_logger is not None: - for candidate in current_top_k: - placement_top_k_logger.log_debug_values(candidate.to_debug_values()) + if not self._disable_external_logging: + # Log the placement top k for debugging purposes, if needed. + placement_top_k_logger = BlueprintPlanningDebugLogger.create_if_requested( + self._config, "query_beam_placement_topk" + ) + if placement_top_k_logger is not None: + for candidate in current_top_k: + placement_top_k_logger.log_debug_values(candidate.to_debug_values()) # 8. We generated the placements by placing queries using run time # predictions. 
Now we re-route the queries using the fixed placements @@ -217,7 +251,9 @@ async def _run_replan_impl( for candidate in current_top_k: query_indices = candidate.get_all_query_indices() candidate.reset_routing() - router = await self._router_provider.get_router(candidate.table_placements) + router = await self._providers.router_provider.get_router( + candidate.table_placements + ) for qidx in query_indices: query = analytical_queries[qidx] routing_engine = await router.engine_for(query) @@ -238,9 +274,10 @@ async def _run_replan_impl( if len(rerouted_top_k) == 0: logger.error( - "The query-based beam planner failed to find any feasible placements after re-routing the queries." + "The query-based beam planner failed to find any " + "feasible placements after re-routing the queries." ) - return + return None # 8. Run a final greedy search over provisionings in the top-k set. final_top_k: List[BlueprintCandidate] = [] @@ -289,27 +326,27 @@ async def _run_replan_impl( logger.error( "The query-based beam planner failed to find any feasible blueprints." ) - return + return None # The best blueprint will be ordered first (we have a negated # `__lt__` method to work with `heapq` to create a max heap). final_top_k.sort(reverse=True) - # For later interactive inspection in Python. - BlueprintPickleDebugLogger.log_object_if_requested( - self._config, "final_query_based_blueprints", final_top_k - ) - BlueprintPickleDebugLogger.log_object_if_requested( - self._config, "scoring_context", ctx - ) - - # Log the final top k for debugging purposes, if needed. - final_top_k_logger = BlueprintPlanningDebugLogger.create_if_requested( - self._config, "query_beam_final_topk" - ) - if final_top_k_logger is not None: - for candidate in final_top_k: - final_top_k_logger.log_debug_values(candidate.to_debug_values()) + if not self._disable_external_logging: + # For later interactive inspection in Python. 
+ BlueprintPickleDebugLogger.log_object_if_requested( + self._config, "final_query_based_blueprints", final_top_k + ) + BlueprintPickleDebugLogger.log_object_if_requested( + self._config, "scoring_context", ctx + ) + # Log the final top k for debugging purposes, if needed. + final_top_k_logger = BlueprintPlanningDebugLogger.create_if_requested( + self._config, "query_beam_final_topk" + ) + if final_top_k_logger is not None: + for candidate in final_top_k: + final_top_k_logger.log_debug_values(candidate.to_debug_values()) best_candidate = final_top_k[0] @@ -327,12 +364,9 @@ async def _run_replan_impl( # 9. Output the new blueprint. best_blueprint = best_candidate.to_blueprint() best_blueprint_score = best_candidate.to_score() - self._last_suggested_blueprint = best_blueprint - self._last_suggested_blueprint_score = best_blueprint_score logger.info("Selected blueprint:") logger.info("%s", best_blueprint) - debug_values = best_candidate.to_debug_values() logger.info( "Selected blueprint details: %s", json.dumps(debug_values, indent=2) @@ -341,4 +375,67 @@ async def _run_replan_impl( "Metrics used during planning: %s", json.dumps(metrics._asdict(), indent=2) ) - await self._notify_new_blueprint(best_blueprint, best_blueprint_score, trigger) + return best_blueprint, best_blueprint_score + + +class RecordedQueryBasedPlanningRun(RecordedPlanningRun, WorkloadProvider): + def __init__( + self, + config: ConfigFile, + planner_config: PlannerConfig, + schema_name: str, + current_blueprint: Blueprint, + current_blueprint_score: Optional[Score], + current_workload: Workload, + next_workload: Workload, + metrics: Metrics, + metrics_timestamp: datetime, + comparator_provider: BlueprintComparatorProvider, + ) -> None: + self._config = config + self._planner_config = planner_config + self._schema_name = schema_name + self._current_blueprint = current_blueprint + self._current_blueprint_score = current_blueprint_score + self._current_workload = current_workload + self._next_workload 
= next_workload + self._metrics = metrics + self._metrics_timestamp = metrics_timestamp + self._comparator_provider = comparator_provider + + def create_planner(self, estimator_provider: EstimatorProvider) -> BlueprintPlanner: + providers = BlueprintProviders( + workload_provider=self, + analytics_latency_scorer=NoopAnalyticsLatencyScorer(), + comparator_provider=self._comparator_provider, + metrics_provider=FixedMetricsProvider( + self._metrics, self._metrics_timestamp + ), + data_access_provider=NoopDataAccessProvider(), + estimator_provider=estimator_provider, + trigger_provider=EmptyTriggerProvider(), + router_provider=RouterProvider( + self._schema_name, self._config, estimator_provider + ), + ) + return QueryBasedBeamPlanner( + self._config, + self._planner_config, + self._schema_name, + self._current_blueprint, + self._current_blueprint_score, + providers, + # N.B. Purposefully set to `None`. + system_event_logger=None, + disable_external_logging=True, + ) + + # Provider methods follow. 
+ + async def get_workloads( + self, + window_end: datetime, + window_multiplier: int = 1, + desired_period: Optional[timedelta] = None, + ) -> Tuple[Workload, Workload]: + return self._current_workload, self._next_workload diff --git a/src/brad/planner/beam/table_based.py b/src/brad/planner/beam/table_based.py index 05eae9ac..c13445f7 100644 --- a/src/brad/planner/beam/table_based.py +++ b/src/brad/planner/beam/table_based.py @@ -3,66 +3,92 @@ import itertools import json import logging -from datetime import timedelta -from typing import Iterable, List, Tuple, Dict, Optional +from datetime import timedelta, datetime +from typing import List, Tuple, Dict, Optional +from brad.blueprint.blueprint import Blueprint from brad.config.engine import Engine, EngineBitmapValues +from brad.config.file import ConfigFile +from brad.config.planner import PlannerConfig from brad.planner.abstract import BlueprintPlanner from brad.planner.beam.feasibility import BlueprintFeasibility -from brad.planner.beam.triggers import get_beam_triggers -from brad.planner.router_provider import RouterProvider from brad.planner.beam.table_based_candidate import BlueprintCandidate +from brad.planner.compare.provider import BlueprintComparatorProvider from brad.planner.debug_logger import ( BlueprintPlanningDebugLogger, BlueprintPickleDebugLogger, ) from brad.planner.enumeration.provisioning import ProvisioningEnumerator +from brad.planner.estimator import EstimatorProvider +from brad.planner.metrics import Metrics, FixedMetricsProvider +from brad.planner.providers import BlueprintProviders +from brad.planner.recorded_run import RecordedPlanningRun +from brad.planner.router_provider import RouterProvider from brad.planner.scoring.context import ScoringContext +from brad.planner.scoring.data_access.provider import NoopDataAccessProvider +from brad.planner.scoring.performance.analytics_latency import ( + NoopAnalyticsLatencyScorer, +) +from brad.planner.scoring.score import Score from 
brad.planner.scoring.table_placement import compute_single_athena_table_cost -from brad.planner.triggers.trigger import Trigger +from brad.planner.triggers.provider import EmptyTriggerProvider from brad.planner.workload import Workload +from brad.planner.workload.provider import WorkloadProvider logger = logging.getLogger(__name__) class TableBasedBeamPlanner(BlueprintPlanner): - def __init__(self, *args, **kwargs) -> None: + def __init__( + self, + *args, + disable_external_logging: bool = False, + **kwargs, + ) -> None: super().__init__(*args, **kwargs) - self._router_provider = RouterProvider( - self._schema_name, self._config, self._estimator_provider - ) - self._triggers = get_beam_triggers( - self._config, - self._planner_config, - self._monitor, - self._data_access_provider, - self._router_provider, - ) - for t in self._triggers: - t.update_blueprint(self._current_blueprint, self._current_blueprint_score) - - def get_triggers(self) -> Iterable[Trigger]: - return self._triggers - - def _check_if_metrics_warrant_replanning(self) -> bool: - # See if the metrics indicate that we should trigger the planning - # process. - return False + self._disable_external_logging = disable_external_logging async def _run_replan_impl( - self, trigger: Optional[Trigger], window_multiplier: int = 1 - ) -> None: - logger.info("Running a replan...") + self, window_multiplier: int = 1 + ) -> Optional[Tuple[Blueprint, Score]]: + logger.info("Running a table-based beam replan...") # 1. Fetch metrics and the next workload and then apply predictions. 
- metrics, metrics_timestamp = self._metrics_provider.get_metrics() - current_workload, next_workload = await self._workload_provider.get_workloads( + metrics, metrics_timestamp = self._providers.metrics_provider.get_metrics() + ( + current_workload, + next_workload, + ) = await self._providers.workload_provider.get_workloads( metrics_timestamp, window_multiplier, desired_period=timedelta(hours=1) ) - self._analytics_latency_scorer.apply_predicted_latencies(next_workload) - self._analytics_latency_scorer.apply_predicted_latencies(current_workload) - self._data_access_provider.apply_access_statistics(next_workload) - self._data_access_provider.apply_access_statistics(current_workload) + self._providers.analytics_latency_scorer.apply_predicted_latencies( + next_workload + ) + self._providers.analytics_latency_scorer.apply_predicted_latencies( + current_workload + ) + self._providers.data_access_provider.apply_access_statistics(next_workload) + self._providers.data_access_provider.apply_access_statistics(current_workload) + + if ( + not self._disable_external_logging + and BlueprintPickleDebugLogger.is_log_requested(self._config) + ): + planning_run = RecordedTableBasedPlanningRun( + self._config, + self._planner_config, + self._schema_name, + self._current_blueprint, + self._current_blueprint_score, + current_workload, + next_workload, + metrics, + metrics_timestamp, + self._providers.comparator_provider, + ) + BlueprintPickleDebugLogger.log_object_if_requested( + self._config, "table_beam_run", planning_run + ) # 2. Cluster queries by tables and sort by gains (sum). clusters = self._preprocess_workload_queries(next_workload) @@ -72,7 +98,7 @@ async def _run_replan_impl( assert len(clusters) > 0 if len(clusters) == 0: logger.info("No queries in the workload. 
Cannot replan.") - return + return None if len(next_workload.analytical_queries()) < 20: logger.info("[Table-Based Planner] Query arrival counts") @@ -89,7 +115,7 @@ async def _run_replan_impl( self._planner_config, ) await ctx.simulate_current_workload_routing( - await self._router_provider.get_router( + await self._providers.router_provider.get_router( self._current_blueprint.table_locations_bitmap(), ) ) @@ -113,7 +139,7 @@ async def _run_replan_impl( tables, queries, _ = first_cluster placement_changed = candidate.add_placement(placement_bitmap, tables, ctx) await candidate.add_query_cluster( - self._router_provider, + self._providers.router_provider, queries, reroute_prev=placement_changed, ctx=ctx, @@ -127,9 +153,10 @@ async def _run_replan_impl( if len(current_top_k) == 0: logger.error( - "Table-based beam blueprint planning failed. Could not generate an initial set of feasible blueprints." + "Table-based beam blueprint planning failed. " + "Could not generate an initial set of feasible blueprints." ) - return + return None # 5. Run beam search to formulate the rest of the table placements. for j, cluster in enumerate(clusters[1:]): @@ -166,7 +193,7 @@ async def _run_replan_impl( continue await next_candidate.add_query_cluster( - self._router_provider, + self._providers.router_provider, queries, reroute_prev=placement_changed, ctx=ctx, @@ -221,12 +248,13 @@ async def _run_replan_impl( current_top_k = next_top_k # Log the placement top k for debugging purposes, if needed. 
- placement_top_k_logger = BlueprintPlanningDebugLogger.create_if_requested( - self._config, "brad_table_beam_placement_topk" - ) - if placement_top_k_logger is not None: - for candidate in current_top_k: - placement_top_k_logger.log_debug_values(candidate.to_debug_values()) + if not self._disable_external_logging: + placement_top_k_logger = BlueprintPlanningDebugLogger.create_if_requested( + self._config, "brad_table_beam_placement_topk" + ) + if placement_top_k_logger is not None: + for candidate in current_top_k: + placement_top_k_logger.log_debug_values(candidate.to_debug_values()) # 6. Run a final greedy search over provisionings in the top-k set. final_top_k: List[BlueprintCandidate] = [] @@ -279,23 +307,24 @@ async def _run_replan_impl( logger.error( "The table-based beam planner failed to find any feasible blueprints." ) - return + return None - # For later interactive inspection in Python. - BlueprintPickleDebugLogger.log_object_if_requested( - self._config, "final_table_based_blueprints", final_top_k - ) - BlueprintPickleDebugLogger.log_object_if_requested( - self._config, "scoring_context", ctx - ) + if not self._disable_external_logging: + # For later interactive inspection in Python. + BlueprintPickleDebugLogger.log_object_if_requested( + self._config, "final_table_based_blueprints", final_top_k + ) + BlueprintPickleDebugLogger.log_object_if_requested( + self._config, "scoring_context", ctx + ) - # Log the final top k for debugging purposes, if needed. - final_top_k_logger = BlueprintPlanningDebugLogger.create_if_requested( - self._config, "brad_table_beam_final_topk" - ) - if final_top_k_logger is not None: - for candidate in final_top_k: - final_top_k_logger.log_debug_values(candidate.to_debug_values()) + # Log the final top k for debugging purposes, if needed. 
+ final_top_k_logger = BlueprintPlanningDebugLogger.create_if_requested( + self._config, "brad_table_beam_final_topk" + ) + if final_top_k_logger is not None: + for candidate in final_top_k: + final_top_k_logger.log_debug_values(candidate.to_debug_values()) best_candidate = final_top_k[0] @@ -312,22 +341,18 @@ async def _run_replan_impl( # 8. Output the new blueprint. best_blueprint = best_candidate.to_blueprint() best_blueprint_score = best_candidate.to_score() - self._last_suggested_blueprint = best_blueprint - self._last_suggested_blueprint_score = best_blueprint_score logger.info("Selected blueprint:") logger.info("%s", best_blueprint) - debug_values = best_candidate.to_debug_values() logger.info( - "Selected blueprint details: %s", - json.dumps(debug_values, indent=2, default=str), + "Selected blueprint details: %s", json.dumps(debug_values, indent=2) ) logger.info( "Metrics used during planning: %s", json.dumps(metrics._asdict(), indent=2) ) - await self._notify_new_blueprint(best_blueprint, best_blueprint_score, trigger) + return best_blueprint, best_blueprint_score def _preprocess_workload_queries( self, workload: Workload @@ -377,3 +402,66 @@ def _get_table_placement_options_bitmap(self) -> List[int]: # list with the values [1, 2, ..., 7]. But this function works with # general bit masks. 
return placement_bitmaps + + +class RecordedTableBasedPlanningRun(RecordedPlanningRun, WorkloadProvider): + def __init__( + self, + config: ConfigFile, + planner_config: PlannerConfig, + schema_name: str, + current_blueprint: Blueprint, + current_blueprint_score: Optional[Score], + current_workload: Workload, + next_workload: Workload, + metrics: Metrics, + metrics_timestamp: datetime, + comparator_provider: BlueprintComparatorProvider, + ) -> None: + self._config = config + self._planner_config = planner_config + self._schema_name = schema_name + self._current_blueprint = current_blueprint + self._current_blueprint_score = current_blueprint_score + self._current_workload = current_workload + self._next_workload = next_workload + self._metrics = metrics + self._metrics_timestamp = metrics_timestamp + self._comparator_provider = comparator_provider + + def create_planner(self, estimator_provider: EstimatorProvider) -> BlueprintPlanner: + providers = BlueprintProviders( + workload_provider=self, + analytics_latency_scorer=NoopAnalyticsLatencyScorer(), + comparator_provider=self._comparator_provider, + metrics_provider=FixedMetricsProvider( + self._metrics, self._metrics_timestamp + ), + data_access_provider=NoopDataAccessProvider(), + estimator_provider=estimator_provider, + trigger_provider=EmptyTriggerProvider(), + router_provider=RouterProvider( + self._schema_name, self._config, estimator_provider + ), + ) + return TableBasedBeamPlanner( + self._config, + self._planner_config, + self._schema_name, + self._current_blueprint, + self._current_blueprint_score, + providers, + # N.B. Purposefully set to `None`. + system_event_logger=None, + disable_external_logging=True, + ) + + # Provider methods follow. 
+ + async def get_workloads( + self, + window_end: datetime, + window_multiplier: int = 1, + desired_period: Optional[timedelta] = None, + ) -> Tuple[Workload, Workload]: + return self._current_workload, self._next_workload diff --git a/src/brad/planner/beam/triggers.py b/src/brad/planner/beam/triggers.py deleted file mode 100644 index b50dd3ed..00000000 --- a/src/brad/planner/beam/triggers.py +++ /dev/null @@ -1,97 +0,0 @@ -from typing import List - -from brad.config.file import ConfigFile -from brad.config.planner import PlannerConfig -from brad.daemon.monitor import Monitor -from brad.planner.router_provider import RouterProvider -from brad.planner.scoring.data_access.provider import DataAccessProvider -from brad.planner.triggers.aurora_cpu_utilization import AuroraCpuUtilization -from brad.planner.triggers.redshift_cpu_utilization import RedshiftCpuUtilization -from brad.planner.triggers.elapsed_time import ElapsedTimeTrigger -from brad.planner.triggers.query_latency_ceiling import QueryLatencyCeiling -from brad.planner.triggers.recent_change import RecentChange -from brad.planner.triggers.trigger import Trigger -from brad.planner.triggers.txn_latency_ceiling import TransactionLatencyCeiling -from brad.planner.triggers.variable_costs import VariableCosts - - -def get_beam_triggers( - config: ConfigFile, - planner_config: PlannerConfig, - monitor: Monitor, - data_access_provider: DataAccessProvider, - router_provider: RouterProvider, -) -> List[Trigger]: - trigger_config = planner_config.trigger_configs() - if not trigger_config["enabled"]: - return [] - - planning_window = planner_config.planning_window() - trigger_list: List[Trigger] = [] - - et_config = trigger_config["elapsed_time"] - if "disabled" not in et_config: - trigger_list.append( - ElapsedTimeTrigger( - planning_window * et_config["multiplier"], - epoch_length=config.epoch_length, - ) - ) - - aurora_cpu = trigger_config["aurora_cpu"] - if "disabled" not in aurora_cpu: - trigger_list.append( - 
AuroraCpuUtilization( - monitor, epoch_length=config.epoch_length, **aurora_cpu - ) - ) - - redshift_cpu = trigger_config["redshift_cpu"] - if "disabled" not in redshift_cpu: - trigger_list.append( - RedshiftCpuUtilization( - monitor, epoch_length=config.epoch_length, **redshift_cpu - ) - ) - - var_costs = trigger_config["variable_costs"] - if "disabled" not in var_costs: - trigger_list.append( - VariableCosts( - config, - planner_config, - monitor, - data_access_provider, - router_provider, - var_costs["threshold"], - config.epoch_length, - ) - ) - - latency_ceiling = trigger_config["query_latency_ceiling"] - if "disabled" not in latency_ceiling: - trigger_list.append( - QueryLatencyCeiling( - monitor, - latency_ceiling["ceiling_s"], - latency_ceiling["sustained_epochs"], - config.epoch_length, - ) - ) - - txn_latency_ceiling = trigger_config["txn_latency_ceiling"] - if "disabled" not in txn_latency_ceiling: - trigger_list.append( - TransactionLatencyCeiling( - monitor, - latency_ceiling["ceiling_s"], - latency_ceiling["sustained_epochs"], - config.epoch_length, - ) - ) - - recent_change = trigger_config["recent_change"] - if "disabled" not in recent_change: - trigger_list.append(RecentChange(planner_config, config.epoch_length)) - - return trigger_list diff --git a/src/brad/planner/compare/provider.py b/src/brad/planner/compare/provider.py new file mode 100644 index 00000000..c33d429c --- /dev/null +++ b/src/brad/planner/compare/provider.py @@ -0,0 +1,26 @@ +from brad.planner.compare.function import BlueprintComparator +from brad.planner.compare.cost import best_cost_under_perf_ceilings + + +class BlueprintComparatorProvider: + """ + Used to customize the comparator used by the blueprint planner. This is + primarily useful for serialization/deserialization purposes since our + comparator captures state (it is a closure) and cannot be pickled. 
+ """ + + def get_comparator(self) -> BlueprintComparator: + raise NotImplementedError + + +class PerformanceCeilingComparatorProvider(BlueprintComparatorProvider): + def __init__( + self, max_query_latency_s: float, max_txn_p90_latency_s: float + ) -> None: + self._max_query_latency_s = max_query_latency_s + self._max_txn_p90_latency_s = max_txn_p90_latency_s + + def get_comparator(self) -> BlueprintComparator: + return best_cost_under_perf_ceilings( + self._max_query_latency_s, self._max_txn_p90_latency_s + ) diff --git a/src/brad/planner/debug_logger.py b/src/brad/planner/debug_logger.py index 30adf248..0ae7682a 100644 --- a/src/brad/planner/debug_logger.py +++ b/src/brad/planner/debug_logger.py @@ -50,9 +50,13 @@ def log_debug_values(self, values: Dict[str, int | float | str]) -> None: class BlueprintPickleDebugLogger: + @staticmethod + def is_log_requested(config: ConfigFile) -> bool: + return config.planner_log_path is not None + @staticmethod def log_object_if_requested( - config: ConfigFile, file_name_prefix: str, blueprints: Any + config: ConfigFile, file_name_prefix: str, py_object: Any ) -> None: log_path = config.planner_log_path if log_path is None: @@ -60,7 +64,7 @@ def log_object_if_requested( out_file_name = f"{file_name_prefix}_{_get_timestamp_str()}.pkl" with open(log_path / out_file_name, "wb") as file: - pickle.dump(blueprints, file) + pickle.dump(py_object, file) def _get_timestamp_str() -> str: diff --git a/src/brad/planner/factory.py b/src/brad/planner/factory.py index 1752657a..804dbd5b 100644 --- a/src/brad/planner/factory.py +++ b/src/brad/planner/factory.py @@ -3,37 +3,25 @@ from brad.blueprint import Blueprint from brad.config.file import ConfigFile from brad.config.planner import PlannerConfig -from brad.daemon.monitor import Monitor from brad.daemon.system_event_logger import SystemEventLogger from brad.planner.abstract import BlueprintPlanner -from brad.planner.compare.function import BlueprintComparator -from brad.planner.estimator 
import EstimatorProvider -from brad.planner.neighborhood.neighborhood import NeighborhoodSearchPlanner from brad.planner.beam.query_based import QueryBasedBeamPlanner from brad.planner.beam.table_based import TableBasedBeamPlanner -from brad.planner.metrics import MetricsProvider +from brad.planner.neighborhood.neighborhood import NeighborhoodSearchPlanner +from brad.planner.providers import BlueprintProviders from brad.planner.scoring.score import Score -from brad.planner.scoring.data_access.provider import DataAccessProvider -from brad.planner.scoring.performance.analytics_latency import AnalyticsLatencyScorer from brad.planner.strategy import PlanningStrategy -from brad.planner.workload.provider import WorkloadProvider class BlueprintPlannerFactory: @staticmethod def create( + config: ConfigFile, planner_config: PlannerConfig, + schema_name: str, current_blueprint: Blueprint, current_blueprint_score: Optional[Score], - monitor: Monitor, - config: ConfigFile, - schema_name: str, - workload_provider: WorkloadProvider, - analytics_latency_scorer: AnalyticsLatencyScorer, - comparator: BlueprintComparator, - metrics_provider: MetricsProvider, - data_access_provider: DataAccessProvider, - estimator_provider: EstimatorProvider, + providers: BlueprintProviders, system_event_logger: Optional[SystemEventLogger] = None, ) -> BlueprintPlanner: strategy = planner_config.strategy() @@ -42,52 +30,33 @@ def create( or strategy == PlanningStrategy.SampledNeighborhood ): return NeighborhoodSearchPlanner( + config=config, + planner_config=planner_config, current_blueprint=current_blueprint, current_blueprint_score=current_blueprint_score, - planner_config=planner_config, - monitor=monitor, - config=config, - schema_name=schema_name, - workload_provider=workload_provider, - analytics_latency_scorer=analytics_latency_scorer, - comparator=comparator, - metrics_provider=metrics_provider, - data_access_provider=data_access_provider, - estimator_provider=estimator_provider, + 
providers=providers, system_event_logger=system_event_logger, ) elif strategy == PlanningStrategy.QueryBasedBeam: return QueryBasedBeamPlanner( - current_blueprint=current_blueprint, - current_blueprint_score=current_blueprint_score, - planner_config=planner_config, - monitor=monitor, config=config, + planner_config=planner_config, schema_name=schema_name, - workload_provider=workload_provider, - analytics_latency_scorer=analytics_latency_scorer, - comparator=comparator, - metrics_provider=metrics_provider, - data_access_provider=data_access_provider, - estimator_provider=estimator_provider, + current_blueprint=current_blueprint, + current_blueprint_score=current_blueprint_score, + providers=providers, system_event_logger=system_event_logger, ) elif strategy == PlanningStrategy.TableBasedBeam: return TableBasedBeamPlanner( - current_blueprint=current_blueprint, - current_blueprint_score=current_blueprint_score, - planner_config=planner_config, - monitor=monitor, config=config, + planner_config=planner_config, schema_name=schema_name, - workload_provider=workload_provider, - analytics_latency_scorer=analytics_latency_scorer, - comparator=comparator, - metrics_provider=metrics_provider, - data_access_provider=data_access_provider, - estimator_provider=estimator_provider, + current_blueprint=current_blueprint, + current_blueprint_score=current_blueprint_score, + providers=providers, system_event_logger=system_event_logger, ) diff --git a/src/brad/planner/neighborhood/neighborhood.py b/src/brad/planner/neighborhood/neighborhood.py index 2489c36e..113eecc0 100644 --- a/src/brad/planner/neighborhood/neighborhood.py +++ b/src/brad/planner/neighborhood/neighborhood.py @@ -3,9 +3,10 @@ import pytz import pandas as pd from io import TextIOWrapper -from typing import Dict, Iterable, List, Optional +from typing import Dict, List, Optional, Tuple from datetime import datetime +from brad.blueprint.blueprint import Blueprint from brad.config.engine import Engine from 
brad.config.planner import PlannerConfig from brad.planner.abstract import BlueprintPlanner @@ -25,7 +26,6 @@ ) from brad.planner.scoring.score import Score from brad.planner.strategy import PlanningStrategy -from brad.planner.triggers.trigger import Trigger from brad.planner.workload import Workload from brad.provisioning.directory import Directory from brad.routing.rule_based import RuleBased @@ -84,19 +84,18 @@ async def run_forever(self) -> None: if self._metrics_out is not None: self._metrics_out.close() - def get_triggers(self) -> Iterable[Trigger]: - # TODO: Add triggers if needed. - return [] - async def _run_replan_impl( - self, trigger: Optional[Trigger], window_multiplier: int = 1 - ) -> None: + self, window_multiplier: int = 1 + ) -> Optional[Tuple[Blueprint, Score]]: # This will be long-running and will block the event loop. For our # current needs, this is fine since the planner is the main component in # the daemon process. logger.info("Running a replan.") self._log_current_metrics() - current_workload, next_workload = await self._workload_provider.get_workloads( + ( + current_workload, + next_workload, + ) = await self._providers.workload_provider.get_workloads( datetime.now().astimezone(pytz.utc), window_multiplier ) workload_filters = [ @@ -182,11 +181,7 @@ async def _run_replan_impl( selected_blueprint = self._impl.on_enumeration_complete(scoring_ctx) # TODO: Populate the score if needed. 
selected_score = Score() - self._last_suggested_blueprint = selected_blueprint - self._last_suggested_blueprint_score = selected_score - await self._notify_new_blueprint( - selected_blueprint, selected_score, trigger - ) + return selected_blueprint, selected_score finally: engines.close_sync() diff --git a/src/brad/planner/providers.py b/src/brad/planner/providers.py new file mode 100644 index 00000000..8f2e0e4f --- /dev/null +++ b/src/brad/planner/providers.py @@ -0,0 +1,35 @@ +from brad.planner.compare.provider import BlueprintComparatorProvider +from brad.planner.estimator import EstimatorProvider +from brad.planner.metrics import MetricsProvider +from brad.planner.scoring.data_access.provider import DataAccessProvider +from brad.planner.scoring.performance.analytics_latency import AnalyticsLatencyScorer +from brad.planner.triggers.provider import TriggerProvider +from brad.planner.workload.provider import WorkloadProvider +from brad.planner.router_provider import RouterProvider + + +class BlueprintProviders: + """ + This is a convenience class used to help avoid passing in a lot of arguments + into a blueprint planner. 
+ """ + + def __init__( + self, + workload_provider: WorkloadProvider, + analytics_latency_scorer: AnalyticsLatencyScorer, + comparator_provider: BlueprintComparatorProvider, + metrics_provider: MetricsProvider, + data_access_provider: DataAccessProvider, + estimator_provider: EstimatorProvider, + trigger_provider: TriggerProvider, + router_provider: RouterProvider, + ) -> None: + self.workload_provider = workload_provider + self.analytics_latency_scorer = analytics_latency_scorer + self.comparator_provider = comparator_provider + self.metrics_provider = metrics_provider + self.data_access_provider = data_access_provider + self.estimator_provider = estimator_provider + self.trigger_provider = trigger_provider + self.router_provider = router_provider diff --git a/src/brad/planner/recorded_run.py b/src/brad/planner/recorded_run.py new file mode 100644 index 00000000..0ebf99a2 --- /dev/null +++ b/src/brad/planner/recorded_run.py @@ -0,0 +1,37 @@ +import pathlib +import pickle + +from brad.planner.abstract import BlueprintPlanner +from brad.planner.estimator import EstimatorProvider + + +class RecordedPlanningRun: + """ + Represents a blueprint planning invocation. This is used for debugging + purposes (e.g., to record a blueprint planning run and to replay it to + understand why certain decisions were made). + + Concrete instances of this class must be pickle-able. + """ + + @staticmethod + def load(file_path: pathlib.Path) -> "RecordedPlanningRun": + with open(file_path, "rb") as file: + return pickle.load(file) + + def serialize(self, file_path: pathlib.Path) -> None: + with open(file_path, "wb") as file: + pickle.dump(self, file) + + def create_planner(self, estimator_provider: EstimatorProvider) -> BlueprintPlanner: + """ + Creates a `BlueprintPlanner` that will always run the recorded blueprint + planning instance when `_run_replan_impl()` is called. 
+ + Note that you must pass in an estimator provider because it relies on + external state that cannot be serialized. + + The behavior of other planning methods is undefined (i.e., this is not + meant to be a planner serializer). + """ + raise NotImplementedError diff --git a/src/brad/planner/scoring/data_access/provider.py b/src/brad/planner/scoring/data_access/provider.py index ec84cc6b..1d343849 100644 --- a/src/brad/planner/scoring/data_access/provider.py +++ b/src/brad/planner/scoring/data_access/provider.py @@ -14,3 +14,8 @@ def apply_access_statistics(self, workload: Workload) -> None: accessed bytes in Athena). """ raise NotImplementedError + + +class NoopDataAccessProvider(DataAccessProvider): + def apply_access_statistics(self, workload: Workload) -> None: + pass diff --git a/src/brad/planner/scoring/performance/analytics_latency.py b/src/brad/planner/scoring/performance/analytics_latency.py index 4acbb395..541ffb64 100644 --- a/src/brad/planner/scoring/performance/analytics_latency.py +++ b/src/brad/planner/scoring/performance/analytics_latency.py @@ -13,3 +13,8 @@ def apply_predicted_latencies(self, workload: Workload) -> None: predicted execution latencies. 
""" raise NotImplementedError + + +class NoopAnalyticsLatencyScorer(AnalyticsLatencyScorer): + def apply_predicted_latencies(self, workload: Workload) -> None: + pass diff --git a/src/brad/planner/triggers/provider.py b/src/brad/planner/triggers/provider.py new file mode 100644 index 00000000..f4d4d6cc --- /dev/null +++ b/src/brad/planner/triggers/provider.py @@ -0,0 +1,124 @@ +from typing import List + +from brad.config.file import ConfigFile +from brad.config.planner import PlannerConfig +from brad.daemon.monitor import Monitor +from brad.planner.router_provider import RouterProvider +from brad.planner.scoring.data_access.provider import DataAccessProvider +from brad.planner.triggers.aurora_cpu_utilization import AuroraCpuUtilization +from brad.planner.triggers.redshift_cpu_utilization import RedshiftCpuUtilization +from brad.planner.triggers.elapsed_time import ElapsedTimeTrigger +from brad.planner.triggers.query_latency_ceiling import QueryLatencyCeiling +from brad.planner.triggers.recent_change import RecentChange +from brad.planner.triggers.trigger import Trigger +from brad.planner.triggers.txn_latency_ceiling import TransactionLatencyCeiling +from brad.planner.triggers.variable_costs import VariableCosts + + +class TriggerProvider: + """ + Used to customize the triggers that are passed into a blueprint planner. 
+ """ + + def get_triggers(self) -> List[Trigger]: + raise NotImplementedError + + +class EmptyTriggerProvider(TriggerProvider): + def get_triggers(self) -> List[Trigger]: + return [] + + +class ConfigDefinedTriggers(TriggerProvider): + def __init__( + self, + config: ConfigFile, + planner_config: PlannerConfig, + monitor: Monitor, + data_access_provider: DataAccessProvider, + router_provider: RouterProvider, + ) -> None: + self._config = config + self._planner_config = planner_config + self._monitor = monitor + self._data_access_provider = data_access_provider + self._router_provider = router_provider + + def get_triggers(self) -> List[Trigger]: + trigger_config = self._planner_config.trigger_configs() + if not trigger_config["enabled"]: + return [] + + planning_window = self._planner_config.planning_window() + trigger_list: List[Trigger] = [] + + et_config = trigger_config["elapsed_time"] + if "disabled" not in et_config: + trigger_list.append( + ElapsedTimeTrigger( + planning_window * et_config["multiplier"], + epoch_length=self._config.epoch_length, + ) + ) + + aurora_cpu = trigger_config["aurora_cpu"] + if "disabled" not in aurora_cpu: + trigger_list.append( + AuroraCpuUtilization( + self._monitor, epoch_length=self._config.epoch_length, **aurora_cpu + ) + ) + + redshift_cpu = trigger_config["redshift_cpu"] + if "disabled" not in redshift_cpu: + trigger_list.append( + RedshiftCpuUtilization( + self._monitor, + epoch_length=self._config.epoch_length, + **redshift_cpu + ) + ) + + var_costs = trigger_config["variable_costs"] + if "disabled" not in var_costs: + trigger_list.append( + VariableCosts( + self._config, + self._planner_config, + self._monitor, + self._data_access_provider, + self._router_provider, + var_costs["threshold"], + self._config.epoch_length, + ) + ) + + latency_ceiling = trigger_config["query_latency_ceiling"] + if "disabled" not in latency_ceiling: + trigger_list.append( + QueryLatencyCeiling( + self._monitor, + latency_ceiling["ceiling_s"], 
+ latency_ceiling["sustained_epochs"], + self._config.epoch_length, + ) + ) + + txn_latency_ceiling = trigger_config["txn_latency_ceiling"] + if "disabled" not in txn_latency_ceiling: + trigger_list.append( + TransactionLatencyCeiling( + self._monitor, + txn_latency_ceiling["ceiling_s"], + txn_latency_ceiling["sustained_epochs"], + self._config.epoch_length, + ) + ) + + recent_change = trigger_config["recent_change"] + if "disabled" not in recent_change: + trigger_list.append( + RecentChange(self._planner_config, self._config.epoch_length) + ) + + return trigger_list