Skip to content

Commit

Permalink
Add blueprint planning run logging and do a light refactor (#349)
Browse files Browse the repository at this point in the history
* First step at planner refactoring

* Add new providers

* Migrate planners to the new providers

* Add infrastructure to record a blueprint planning run for later analysis

* Add logging to the table-based beam planner too

* Various fixes

* Remove obsoleted file
  • Loading branch information
geoffxy authored Nov 3, 2023
1 parent cd323c4 commit 68eb4c5
Show file tree
Hide file tree
Showing 15 changed files with 657 additions and 353 deletions.
28 changes: 17 additions & 11 deletions src/brad/admin/run_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@
from brad.config.planner import PlannerConfig
from brad.daemon.monitor import Monitor
from brad.data_stats.postgres_estimator import PostgresEstimator
from brad.planner.compare.cost import best_cost_under_perf_ceilings
from brad.planner.compare.provider import PerformanceCeilingComparatorProvider
from brad.planner.estimator import EstimatorProvider, FixedEstimatorProvider
from brad.planner.factory import BlueprintPlannerFactory
from brad.planner.providers import BlueprintProviders
from brad.planner.router_provider import RouterProvider
from brad.planner.scoring.score import Score
from brad.planner.scoring.data_access.precomputed_values import (
PrecomputedDataAccessProvider,
)
from brad.planner.scoring.performance.precomputed_predictions import (
PrecomputedPredictions,
)
from brad.planner.triggers.provider import EmptyTriggerProvider
from brad.planner.triggers.trigger import Trigger
from brad.planner.metrics import (
MetricsFromMonitor,
Expand Down Expand Up @@ -227,25 +230,28 @@ async def run_planner_impl(args) -> None:
else:
estimator_provider = EstimatorProvider()

planner = BlueprintPlannerFactory.create(
current_blueprint=blueprint_mgr.get_blueprint(),
current_blueprint_score=blueprint_mgr.get_active_score(),
planner_config=planner_config,
monitor=monitor,
config=config,
schema_name=args.schema_name,
providers = BlueprintProviders(
# Next workload is the same as the current workload.
workload_provider=FixedWorkloadProvider(workload),
# Used for debugging purposes.
analytics_latency_scorer=prediction_provider,
# TODO: Make this configurable.
comparator=best_cost_under_perf_ceilings(
comparator_provider=PerformanceCeilingComparatorProvider(
max_query_latency_s=args.latency_ceiling_s,
max_txn_p90_latency_s=0.020, # FIXME: Add command-line argument if needed.
max_txn_p90_latency_s=0.030, # FIXME: Add command-line argument if needed.
),
metrics_provider=metrics_provider,
data_access_provider=data_access_provider,
estimator_provider=estimator_provider,
trigger_provider=EmptyTriggerProvider(),
router_provider=RouterProvider(args.schema_name, config, estimator_provider),
)
planner = BlueprintPlannerFactory.create(
config=config,
planner_config=planner_config,
schema_name=args.schema_name,
current_blueprint=blueprint_mgr.get_blueprint(),
current_blueprint_score=blueprint_mgr.get_active_score(),
providers=providers,
)
asyncio.run(monitor.fetch_latest())

Expand Down
46 changes: 30 additions & 16 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
from brad.data_sync.execution.executor import DataSyncExecutor
from brad.front_end.start_front_end import start_front_end
from brad.planner.abstract import BlueprintPlanner
from brad.planner.compare.cost import best_cost_under_perf_ceilings
from brad.planner.compare.provider import PerformanceCeilingComparatorProvider
from brad.planner.estimator import EstimatorProvider
from brad.planner.factory import BlueprintPlannerFactory
from brad.planner.metrics import MetricsFromMonitor
from brad.planner.providers import BlueprintProviders
from brad.planner.router_provider import RouterProvider
from brad.planner.scoring.score import Score
from brad.planner.scoring.data_access.provider import DataAccessProvider
from brad.planner.scoring.data_access.precomputed_values import (
Expand All @@ -47,8 +49,9 @@
from brad.planner.scoring.performance.precomputed_predictions import (
PrecomputedPredictions,
)
from brad.planner.triggers.trigger import Trigger
from brad.planner.triggers.provider import ConfigDefinedTriggers
from brad.planner.triggers.recent_change import RecentChange
from brad.planner.triggers.trigger import Trigger
from brad.planner.workload import Workload
from brad.planner.workload.builder import WorkloadBuilder
from brad.planner.workload.provider import LoggedWorkloadProvider
Expand Down Expand Up @@ -179,38 +182,49 @@ async def _run_setup(self) -> None:
aurora_accessed_pages_path=self._temp_config.aurora_data_access_path(),
athena_accessed_bytes_path=self._temp_config.athena_data_access_path(),
)
comparator = best_cost_under_perf_ceilings(
max_query_latency_s=self._temp_config.latency_ceiling_s(),
max_txn_p90_latency_s=self._temp_config.txn_latency_p90_ceiling_s(),
comparator_provider = PerformanceCeilingComparatorProvider(
self._temp_config.latency_ceiling_s(),
self._temp_config.txn_latency_p90_ceiling_s(),
)
else:
logger.warning(
"TempConfig not provided. The planner will not be able to run correctly."
)
latency_scorer = _NoopAnalyticsScorer()
data_access_provider = _NoopDataAccessProvider()
comparator = best_cost_under_perf_ceilings(
max_query_latency_s=10, max_txn_p90_latency_s=0.030
)
comparator_provider = PerformanceCeilingComparatorProvider(30.0, 0.030)

self._planner = BlueprintPlannerFactory.create(
planner_config=self._planner_config,
current_blueprint=self._blueprint_mgr.get_blueprint(),
current_blueprint_score=self._blueprint_mgr.get_active_score(),
monitor=self._monitor,
config=self._config,
schema_name=self._schema_name,
router_provider = RouterProvider(
self._schema_name, self._config, self._estimator_provider
)
providers = BlueprintProviders(
workload_provider=LoggedWorkloadProvider(
self._config,
self._planner_config,
self._blueprint_mgr,
self._schema_name,
),
analytics_latency_scorer=latency_scorer,
comparator=comparator,
comparator_provider=comparator_provider,
metrics_provider=MetricsFromMonitor(self._monitor, self._blueprint_mgr),
data_access_provider=data_access_provider,
estimator_provider=self._estimator_provider,
trigger_provider=ConfigDefinedTriggers(
self._config,
self._planner_config,
self._monitor,
data_access_provider,
router_provider,
),
router_provider=router_provider,
)
self._planner = BlueprintPlannerFactory.create(
config=self._config,
planner_config=self._planner_config,
schema_name=self._schema_name,
current_blueprint=self._blueprint_mgr.get_blueprint(),
current_blueprint_score=self._blueprint_mgr.get_active_score(),
providers=providers,
system_event_logger=self._system_event_logger,
)
self._planner.register_new_blueprint_callback(self._handle_new_blueprint)
Expand Down
62 changes: 29 additions & 33 deletions src/brad/planner/abstract.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
import asyncio
import logging
from typing import Coroutine, Callable, List, Optional, Iterable
from typing import Coroutine, Callable, List, Optional, Iterable, Tuple

from brad.blueprint import Blueprint
from brad.config.file import ConfigFile
from brad.config.planner import PlannerConfig
from brad.config.system_event import SystemEvent
from brad.daemon.monitor import Monitor
from brad.daemon.system_event_logger import SystemEventLogger
from brad.planner.compare.function import BlueprintComparator
from brad.planner.estimator import EstimatorProvider
from brad.planner.metrics import MetricsProvider
from brad.planner.scoring.data_access.provider import DataAccessProvider
from brad.planner.scoring.performance.analytics_latency import AnalyticsLatencyScorer
from brad.planner.providers import BlueprintProviders
from brad.planner.scoring.score import Score
from brad.planner.triggers.trigger import Trigger
from brad.planner.workload.provider import WorkloadProvider

logger = logging.getLogger(__name__)

Expand All @@ -33,41 +27,35 @@ class BlueprintPlanner:

def __init__(
self,
config: ConfigFile,
planner_config: PlannerConfig,
schema_name: str,
current_blueprint: Blueprint,
current_blueprint_score: Optional[Score],
monitor: Monitor,
config: ConfigFile,
schema_name: str,
workload_provider: WorkloadProvider,
analytics_latency_scorer: AnalyticsLatencyScorer,
comparator: BlueprintComparator,
metrics_provider: MetricsProvider,
data_access_provider: DataAccessProvider,
estimator_provider: EstimatorProvider,
providers: BlueprintProviders,
system_event_logger: Optional[SystemEventLogger],
) -> None:
self._config = config
self._planner_config = planner_config
self._schema_name = schema_name
self._current_blueprint = current_blueprint
self._current_blueprint_score = current_blueprint_score
self._last_suggested_blueprint: Optional[Blueprint] = None
self._last_suggested_blueprint_score: Optional[Score] = None
self._monitor = monitor
self._config = config
self._schema_name = schema_name

self._workload_provider = workload_provider
self._analytics_latency_scorer = analytics_latency_scorer
self._comparator = comparator
self._metrics_provider = metrics_provider
self._data_access_provider = data_access_provider
self._estimator_provider = estimator_provider
self._providers = providers
self._system_event_logger = system_event_logger

self._callbacks: List[NewBlueprintCallback] = []
self._replan_in_progress = False
self._disable_triggers = False

self._triggers = self._providers.trigger_provider.get_triggers()
for t in self._triggers:
t.update_blueprint(self._current_blueprint, self._current_blueprint_score)

self._comparator = self._providers.comparator_provider.get_comparator()

async def run_forever(self) -> None:
"""
Called to start the planner. The planner is meant to run until its task
Expand Down Expand Up @@ -112,21 +100,30 @@ async def run_replan(
self, trigger: Optional[Trigger], window_multiplier: int = 1
) -> None:
"""
Triggers a "forced" replan. Used for debugging.
Initiates a replan. Call this directly to "force" a replan.
Use `window_multiplier` to expand the window used for planning.
"""
try:
self._replan_in_progress = True
for t in self.get_triggers():
t.on_replan(trigger)
await self._run_replan_impl(trigger, window_multiplier)

result = await self._run_replan_impl(window_multiplier)
if result is None:
return
blueprint, score = result
self._last_suggested_blueprint = blueprint
self._last_suggested_blueprint_score = score

await self._notify_new_blueprint(blueprint, score, trigger)

finally:
self._replan_in_progress = False

async def _run_replan_impl(
self, trigger: Optional[Trigger], window_multiplier: int = 1
) -> None:
self, window_multiplier: int = 1
) -> Optional[Tuple[Blueprint, Score]]:
"""
Implementers should override this method to define the blueprint
optimization algorithm.
Expand All @@ -135,10 +132,9 @@ async def _run_replan_impl(

def get_triggers(self) -> Iterable[Trigger]:
"""
Implementers should return the triggers used to trigger blueprint
replanning.
The triggers used to trigger blueprint replanning.
"""
raise NotImplementedError
return self._triggers

def set_disable_triggers(self, disable: bool) -> None:
"""
Expand Down
Loading

0 comments on commit 68eb4c5

Please sign in to comment.