Skip to content

Commit

Permalink
Add internal command to adjust comparator SLO
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy committed Apr 11, 2024
1 parent 4a8fb32 commit b95375d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 17 deletions.
67 changes: 50 additions & 17 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def __init__(
)
self._monitor = Monitor(self._config, self._blueprint_mgr)
self._estimator_provider = _EstimatorProvider()
self._providers: Optional[BlueprintProviders] = None
self._planner: Optional[BlueprintPlanner] = None

self._process_manager: Optional[mp.managers.SyncManager] = None
Expand Down Expand Up @@ -231,21 +232,11 @@ async def _run_setup(self) -> None:
athena_accessed_bytes_path=self._temp_config.athena_data_access_path(),
)

if self._temp_config.comparator_type() == "benefit_perf_ceiling":
comparator_provider: BlueprintComparatorProvider = (
BenefitPerformanceCeilingComparatorProvider(
self._temp_config.query_latency_p90_ceiling_s(),
self._temp_config.txn_latency_p90_ceiling_s(),
self._temp_config.benefit_horizon(),
self._temp_config.penalty_threshold(),
self._temp_config.penalty_power(),
)
)
else:
comparator_provider = PerformanceCeilingComparatorProvider(
self._temp_config.query_latency_p90_ceiling_s(),
self._temp_config.txn_latency_p90_ceiling_s(),
)
comparator_provider = self._get_comparator_provider(
self._temp_config.query_latency_p90_ceiling_s(),
self._temp_config.txn_latency_p90_ceiling_s(),
)

else:
logger.warning(
"TempConfig not provided. The planner will not be able to run correctly."
Expand All @@ -257,7 +248,7 @@ async def _run_setup(self) -> None:
# Update just to get the most recent startup time.
self._startup_timestamp = universal_now()

providers = BlueprintProviders(
self._providers = BlueprintProviders(
workload_provider=LoggedWorkloadProvider(
self._config,
self._planner_config,
Expand Down Expand Up @@ -291,7 +282,7 @@ async def _run_setup(self) -> None:
schema_name=self._schema_name,
current_blueprint=self._blueprint_mgr.get_blueprint(),
current_blueprint_score=self._blueprint_mgr.get_active_score(),
providers=providers,
providers=self._providers,
system_event_logger=self._system_event_logger,
)
self._planner.register_new_blueprint_callback(self._handle_new_blueprint)
Expand Down Expand Up @@ -735,6 +726,28 @@ async def _handle_internal_command(self, command: str) -> RowList:
self._transition_task = asyncio.create_task(self._run_transition_part_one())
return [(f"Transition to {preset} in progress.",)]

elif command.startswith("BRAD_CHANGE_SLO"):
# Format: BRAD_CHANGE_SLO <query p90 s> <txn p90 s>
parts = command.split(" ")
if self._temp_config is None:
return [("Cannot change SLOs because TempConfig is missing.",)]
if len(parts) <= 3:
return [("Need to specify query and txn p90 SLOs",)]
query_p90_s = float(parts[1])
txn_p90_s = float(parts[2])
# The planner asks for a new comparator from the provider on each
# run. So if we swap out the provider here, we will get a comparator
# with the updated SLOs on the next planning run.
assert self._providers is not None
self._providers.comparator_provider = self._get_comparator_provider(
query_p90_s, txn_p90_s
)
return [
(
f"p90 SLOs changed to (query {query_p90_s:.3f} s), (txn {txn_p90_s:.3f} s)",
)
]

else:
logger.warning("Received unknown internal command: %s", command)
return []
Expand Down Expand Up @@ -874,6 +887,26 @@ async def _run_transition_part_two(self) -> None:
"Transition part two failed due to an unexpected exception."
)

def _get_comparator_provider(
self, query_p90_s: float, txn_p90_s: float
) -> BlueprintComparatorProvider:
assert self._temp_config is not None
if self._temp_config.comparator_type() == "benefit_perf_ceiling":
comparator_provider: BlueprintComparatorProvider = (
BenefitPerformanceCeilingComparatorProvider(
query_p90_s,
txn_p90_s,
self._temp_config.benefit_horizon(),
self._temp_config.penalty_threshold(),
self._temp_config.penalty_power(),
)
)
else:
comparator_provider = PerformanceCeilingComparatorProvider(
query_p90_s, txn_p90_s
)
return comparator_provider


class _NoopAnalyticsScorer(AnalyticsLatencyScorer):
def apply_predicted_latencies(self, _workload: Workload) -> None:
Expand Down
1 change: 1 addition & 0 deletions src/brad/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ async def _handle_internal_command(
or command.startswith("BRAD_RUN_PLANNER")
or command.startswith("BRAD_MODIFY_REDSHIFT")
or command.startswith("BRAD_USE_PRESET_BP")
or command.startswith("BRAD_CHANGE_SLO")
):
if self._daemon_request_mailbox.is_active():
return [
Expand Down

0 comments on commit b95375d

Please sign in to comment.