Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blueprint planning run logging and do a light refactor #349

Merged
merged 7 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions src/brad/admin/run_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@
from brad.config.planner import PlannerConfig
from brad.daemon.monitor import Monitor
from brad.data_stats.postgres_estimator import PostgresEstimator
from brad.planner.compare.cost import best_cost_under_perf_ceilings
from brad.planner.compare.provider import PerformanceCeilingComparatorProvider
from brad.planner.estimator import EstimatorProvider, FixedEstimatorProvider
from brad.planner.factory import BlueprintPlannerFactory
from brad.planner.providers import BlueprintProviders
from brad.planner.router_provider import RouterProvider
from brad.planner.scoring.score import Score
from brad.planner.scoring.data_access.precomputed_values import (
PrecomputedDataAccessProvider,
)
from brad.planner.scoring.performance.precomputed_predictions import (
PrecomputedPredictions,
)
from brad.planner.triggers.provider import EmptyTriggerProvider
from brad.planner.triggers.trigger import Trigger
from brad.planner.metrics import (
MetricsFromMonitor,
Expand Down Expand Up @@ -227,25 +230,28 @@ async def run_planner_impl(args) -> None:
else:
estimator_provider = EstimatorProvider()

planner = BlueprintPlannerFactory.create(
current_blueprint=blueprint_mgr.get_blueprint(),
current_blueprint_score=blueprint_mgr.get_active_score(),
planner_config=planner_config,
monitor=monitor,
config=config,
schema_name=args.schema_name,
providers = BlueprintProviders(
# Next workload is the same as the current workload.
workload_provider=FixedWorkloadProvider(workload),
# Used for debugging purposes.
analytics_latency_scorer=prediction_provider,
# TODO: Make this configurable.
comparator=best_cost_under_perf_ceilings(
comparator_provider=PerformanceCeilingComparatorProvider(
max_query_latency_s=args.latency_ceiling_s,
max_txn_p90_latency_s=0.020, # FIXME: Add command-line argument if needed.
max_txn_p90_latency_s=0.030, # FIXME: Add command-line argument if needed.
),
metrics_provider=metrics_provider,
data_access_provider=data_access_provider,
estimator_provider=estimator_provider,
trigger_provider=EmptyTriggerProvider(),
router_provider=RouterProvider(args.schema_name, config, estimator_provider),
)
planner = BlueprintPlannerFactory.create(
config=config,
planner_config=planner_config,
schema_name=args.schema_name,
current_blueprint=blueprint_mgr.get_blueprint(),
current_blueprint_score=blueprint_mgr.get_active_score(),
providers=providers,
)
asyncio.run(monitor.fetch_latest())

Expand Down
46 changes: 30 additions & 16 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
from brad.data_sync.execution.executor import DataSyncExecutor
from brad.front_end.start_front_end import start_front_end
from brad.planner.abstract import BlueprintPlanner
from brad.planner.compare.cost import best_cost_under_perf_ceilings
from brad.planner.compare.provider import PerformanceCeilingComparatorProvider
from brad.planner.estimator import EstimatorProvider
from brad.planner.factory import BlueprintPlannerFactory
from brad.planner.metrics import MetricsFromMonitor
from brad.planner.providers import BlueprintProviders
from brad.planner.router_provider import RouterProvider
from brad.planner.scoring.score import Score
from brad.planner.scoring.data_access.provider import DataAccessProvider
from brad.planner.scoring.data_access.precomputed_values import (
Expand All @@ -47,8 +49,9 @@
from brad.planner.scoring.performance.precomputed_predictions import (
PrecomputedPredictions,
)
from brad.planner.triggers.trigger import Trigger
from brad.planner.triggers.provider import ConfigDefinedTriggers
from brad.planner.triggers.recent_change import RecentChange
from brad.planner.triggers.trigger import Trigger
from brad.planner.workload import Workload
from brad.planner.workload.builder import WorkloadBuilder
from brad.planner.workload.provider import LoggedWorkloadProvider
Expand Down Expand Up @@ -179,38 +182,49 @@ async def _run_setup(self) -> None:
aurora_accessed_pages_path=self._temp_config.aurora_data_access_path(),
athena_accessed_bytes_path=self._temp_config.athena_data_access_path(),
)
comparator = best_cost_under_perf_ceilings(
max_query_latency_s=self._temp_config.latency_ceiling_s(),
max_txn_p90_latency_s=self._temp_config.txn_latency_p90_ceiling_s(),
comparator_provider = PerformanceCeilingComparatorProvider(
self._temp_config.latency_ceiling_s(),
self._temp_config.txn_latency_p90_ceiling_s(),
)
else:
logger.warning(
"TempConfig not provided. The planner will not be able to run correctly."
)
latency_scorer = _NoopAnalyticsScorer()
data_access_provider = _NoopDataAccessProvider()
comparator = best_cost_under_perf_ceilings(
max_query_latency_s=10, max_txn_p90_latency_s=0.030
)
comparator_provider = PerformanceCeilingComparatorProvider(30.0, 0.030)

self._planner = BlueprintPlannerFactory.create(
planner_config=self._planner_config,
current_blueprint=self._blueprint_mgr.get_blueprint(),
current_blueprint_score=self._blueprint_mgr.get_active_score(),
monitor=self._monitor,
config=self._config,
schema_name=self._schema_name,
router_provider = RouterProvider(
self._schema_name, self._config, self._estimator_provider
)
providers = BlueprintProviders(
workload_provider=LoggedWorkloadProvider(
self._config,
self._planner_config,
self._blueprint_mgr,
self._schema_name,
),
analytics_latency_scorer=latency_scorer,
comparator=comparator,
comparator_provider=comparator_provider,
metrics_provider=MetricsFromMonitor(self._monitor, self._blueprint_mgr),
data_access_provider=data_access_provider,
estimator_provider=self._estimator_provider,
trigger_provider=ConfigDefinedTriggers(
self._config,
self._planner_config,
self._monitor,
data_access_provider,
router_provider,
),
router_provider=router_provider,
)
self._planner = BlueprintPlannerFactory.create(
config=self._config,
planner_config=self._planner_config,
schema_name=self._schema_name,
current_blueprint=self._blueprint_mgr.get_blueprint(),
current_blueprint_score=self._blueprint_mgr.get_active_score(),
providers=providers,
system_event_logger=self._system_event_logger,
)
self._planner.register_new_blueprint_callback(self._handle_new_blueprint)
Expand Down
62 changes: 29 additions & 33 deletions src/brad/planner/abstract.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
import asyncio
import logging
from typing import Coroutine, Callable, List, Optional, Iterable
from typing import Coroutine, Callable, List, Optional, Iterable, Tuple

from brad.blueprint import Blueprint
from brad.config.file import ConfigFile
from brad.config.planner import PlannerConfig
from brad.config.system_event import SystemEvent
from brad.daemon.monitor import Monitor
from brad.daemon.system_event_logger import SystemEventLogger
from brad.planner.compare.function import BlueprintComparator
from brad.planner.estimator import EstimatorProvider
from brad.planner.metrics import MetricsProvider
from brad.planner.scoring.data_access.provider import DataAccessProvider
from brad.planner.scoring.performance.analytics_latency import AnalyticsLatencyScorer
from brad.planner.providers import BlueprintProviders
from brad.planner.scoring.score import Score
from brad.planner.triggers.trigger import Trigger
from brad.planner.workload.provider import WorkloadProvider

logger = logging.getLogger(__name__)

Expand All @@ -33,41 +27,35 @@ class BlueprintPlanner:

def __init__(
self,
config: ConfigFile,
planner_config: PlannerConfig,
schema_name: str,
current_blueprint: Blueprint,
current_blueprint_score: Optional[Score],
monitor: Monitor,
config: ConfigFile,
schema_name: str,
workload_provider: WorkloadProvider,
analytics_latency_scorer: AnalyticsLatencyScorer,
comparator: BlueprintComparator,
metrics_provider: MetricsProvider,
data_access_provider: DataAccessProvider,
estimator_provider: EstimatorProvider,
providers: BlueprintProviders,
system_event_logger: Optional[SystemEventLogger],
) -> None:
self._config = config
self._planner_config = planner_config
self._schema_name = schema_name
self._current_blueprint = current_blueprint
self._current_blueprint_score = current_blueprint_score
self._last_suggested_blueprint: Optional[Blueprint] = None
self._last_suggested_blueprint_score: Optional[Score] = None
self._monitor = monitor
self._config = config
self._schema_name = schema_name

self._workload_provider = workload_provider
self._analytics_latency_scorer = analytics_latency_scorer
self._comparator = comparator
self._metrics_provider = metrics_provider
self._data_access_provider = data_access_provider
self._estimator_provider = estimator_provider
self._providers = providers
self._system_event_logger = system_event_logger

self._callbacks: List[NewBlueprintCallback] = []
self._replan_in_progress = False
self._disable_triggers = False

self._triggers = self._providers.trigger_provider.get_triggers()
for t in self._triggers:
t.update_blueprint(self._current_blueprint, self._current_blueprint_score)

self._comparator = self._providers.comparator_provider.get_comparator()

async def run_forever(self) -> None:
"""
Called to start the planner. The planner is meant to run until its task
Expand Down Expand Up @@ -112,21 +100,30 @@ async def run_replan(
self, trigger: Optional[Trigger], window_multiplier: int = 1
) -> None:
"""
Triggers a "forced" replan. Used for debugging.
Initiates a replan. Call this directly to "force" a replan.

Use `window_multiplier` to expand the window used for planning.
"""
try:
self._replan_in_progress = True
for t in self.get_triggers():
t.on_replan(trigger)
await self._run_replan_impl(trigger, window_multiplier)

result = await self._run_replan_impl(window_multiplier)
if result is None:
return
blueprint, score = result
self._last_suggested_blueprint = blueprint
self._last_suggested_blueprint_score = score

await self._notify_new_blueprint(blueprint, score, trigger)

finally:
self._replan_in_progress = False

async def _run_replan_impl(
self, trigger: Optional[Trigger], window_multiplier: int = 1
) -> None:
self, window_multiplier: int = 1
) -> Optional[Tuple[Blueprint, Score]]:
"""
Implementers should override this method to define the blueprint
optimization algorithm.
Expand All @@ -135,10 +132,9 @@ async def _run_replan_impl(

def get_triggers(self) -> Iterable[Trigger]:
"""
Implementers should return the triggers used to trigger blueprint
replanning.
The triggers used to trigger blueprint replanning.
"""
raise NotImplementedError
return self._triggers

def set_disable_triggers(self, disable: bool) -> None:
"""
Expand Down
Loading
Loading