diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py index c8038fb7..b9676ad2 100644 --- a/src/brad/daemon/daemon.py +++ b/src/brad/daemon/daemon.py @@ -109,6 +109,7 @@ def __init__( ) self._monitor = Monitor(self._config, self._blueprint_mgr) self._estimator_provider = _EstimatorProvider() + self._providers: Optional[BlueprintProviders] = None self._planner: Optional[BlueprintPlanner] = None self._process_manager: Optional[mp.managers.SyncManager] = None @@ -231,21 +232,11 @@ async def _run_setup(self) -> None: athena_accessed_bytes_path=self._temp_config.athena_data_access_path(), ) - if self._temp_config.comparator_type() == "benefit_perf_ceiling": - comparator_provider: BlueprintComparatorProvider = ( - BenefitPerformanceCeilingComparatorProvider( - self._temp_config.query_latency_p90_ceiling_s(), - self._temp_config.txn_latency_p90_ceiling_s(), - self._temp_config.benefit_horizon(), - self._temp_config.penalty_threshold(), - self._temp_config.penalty_power(), - ) - ) - else: - comparator_provider = PerformanceCeilingComparatorProvider( - self._temp_config.query_latency_p90_ceiling_s(), - self._temp_config.txn_latency_p90_ceiling_s(), - ) + comparator_provider = self._get_comparator_provider( + self._temp_config.query_latency_p90_ceiling_s(), + self._temp_config.txn_latency_p90_ceiling_s(), + ) + else: logger.warning( "TempConfig not provided. The planner will not be able to run correctly." @@ -257,7 +248,7 @@ async def _run_setup(self) -> None: # Update just to get the most recent startup time. self._startup_timestamp = universal_now() - providers = BlueprintProviders( + self._providers = BlueprintProviders( workload_provider=LoggedWorkloadProvider( self._config, self._planner_config, @@ -291,7 +282,7 @@ async def _run_setup(self) -> None: schema_name=self._schema_name, current_blueprint=self._blueprint_mgr.get_blueprint(), current_blueprint_score=self._blueprint_mgr.get_active_score(), - providers=providers, + providers=self._providers, system_event_logger=self._system_event_logger, ) self._planner.register_new_blueprint_callback(self._handle_new_blueprint) @@ -735,6 +726,28 @@ async def _handle_internal_command(self, command: str) -> RowList: self._transition_task = asyncio.create_task(self._run_transition_part_one()) return [(f"Transition to {preset} in progress.",)] + elif command.startswith("BRAD_CHANGE_SLO"): + # Format: BRAD_CHANGE_SLO + parts = command.split(" ") + if self._temp_config is None: + return [("Cannot change SLOs because TempConfig is missing.",)] + if len(parts) <= 3: + return [("Need to specify query and txn p90 SLOs",)] + query_p90_s = float(parts[1]) + txn_p90_s = float(parts[2]) + # The planner asks for a new comparator from the provider on each + # run. So if we swap out the provider here, we will get a comparator + # with the updated SLOs on the next planning run. + assert self._providers is not None + self._providers.comparator_provider = self._get_comparator_provider( + query_p90_s, txn_p90_s + ) + return [ + ( + f"p90 SLOs changed to (query {query_p90_s:.3f} s), (txn {txn_p90_s:.3f} s)", + ) + ] + else: logger.warning("Received unknown internal command: %s", command) return [] @@ -874,6 +887,26 @@ async def _run_transition_part_two(self) -> None: "Transition part two failed due to an unexpected exception." ) + def _get_comparator_provider( + self, query_p90_s: float, txn_p90_s: float + ) -> BlueprintComparatorProvider: + assert self._temp_config is not None + if self._temp_config.comparator_type() == "benefit_perf_ceiling": + comparator_provider: BlueprintComparatorProvider = ( + BenefitPerformanceCeilingComparatorProvider( + query_p90_s, + txn_p90_s, + self._temp_config.benefit_horizon(), + self._temp_config.penalty_threshold(), + self._temp_config.penalty_power(), + ) + ) + else: + comparator_provider = PerformanceCeilingComparatorProvider( + query_p90_s, txn_p90_s + ) + return comparator_provider + class _NoopAnalyticsScorer(AnalyticsLatencyScorer): def apply_predicted_latencies(self, _workload: Workload) -> None: diff --git a/src/brad/front_end/front_end.py b/src/brad/front_end/front_end.py index 90b4a0ce..bd7cc8e7 100644 --- a/src/brad/front_end/front_end.py +++ b/src/brad/front_end/front_end.py @@ -534,6 +534,7 @@ async def _handle_internal_command( or command.startswith("BRAD_RUN_PLANNER") or command.startswith("BRAD_MODIFY_REDSHIFT") or command.startswith("BRAD_USE_PRESET_BP") + or command.startswith("BRAD_CHANGE_SLO") ): if self._daemon_request_mailbox.is_active(): return [