Skip to content

Commit

Permalink
Add ability to change latency ceilings while BRAD is running (#489)
Browse files Browse the repository at this point in the history
* Add internal command to adjust comparator SLO

* Update trigger ceilings as well
  • Loading branch information
geoffxy authored Apr 11, 2024
1 parent 4a8fb32 commit 7d156d2
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 17 deletions.
82 changes: 65 additions & 17 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
)
from brad.planner.triggers.provider import ConfigDefinedTriggers
from brad.planner.triggers.trigger import Trigger
from brad.planner.triggers.query_latency_ceiling import QueryLatencyCeiling
from brad.planner.triggers.txn_latency_ceiling import TransactionLatencyCeiling
from brad.planner.workload import Workload
from brad.planner.workload.builder import WorkloadBuilder
from brad.planner.workload.provider import LoggedWorkloadProvider
Expand Down Expand Up @@ -109,6 +111,7 @@ def __init__(
)
self._monitor = Monitor(self._config, self._blueprint_mgr)
self._estimator_provider = _EstimatorProvider()
self._providers: Optional[BlueprintProviders] = None
self._planner: Optional[BlueprintPlanner] = None

self._process_manager: Optional[mp.managers.SyncManager] = None
Expand Down Expand Up @@ -231,21 +234,11 @@ async def _run_setup(self) -> None:
athena_accessed_bytes_path=self._temp_config.athena_data_access_path(),
)

if self._temp_config.comparator_type() == "benefit_perf_ceiling":
comparator_provider: BlueprintComparatorProvider = (
BenefitPerformanceCeilingComparatorProvider(
self._temp_config.query_latency_p90_ceiling_s(),
self._temp_config.txn_latency_p90_ceiling_s(),
self._temp_config.benefit_horizon(),
self._temp_config.penalty_threshold(),
self._temp_config.penalty_power(),
)
)
else:
comparator_provider = PerformanceCeilingComparatorProvider(
self._temp_config.query_latency_p90_ceiling_s(),
self._temp_config.txn_latency_p90_ceiling_s(),
)
comparator_provider = self._get_comparator_provider(
self._temp_config.query_latency_p90_ceiling_s(),
self._temp_config.txn_latency_p90_ceiling_s(),
)

else:
logger.warning(
"TempConfig not provided. The planner will not be able to run correctly."
Expand All @@ -257,7 +250,7 @@ async def _run_setup(self) -> None:
# Update just to get the most recent startup time.
self._startup_timestamp = universal_now()

providers = BlueprintProviders(
self._providers = BlueprintProviders(
workload_provider=LoggedWorkloadProvider(
self._config,
self._planner_config,
Expand Down Expand Up @@ -291,7 +284,7 @@ async def _run_setup(self) -> None:
schema_name=self._schema_name,
current_blueprint=self._blueprint_mgr.get_blueprint(),
current_blueprint_score=self._blueprint_mgr.get_active_score(),
providers=providers,
providers=self._providers,
system_event_logger=self._system_event_logger,
)
self._planner.register_new_blueprint_callback(self._handle_new_blueprint)
Expand Down Expand Up @@ -735,6 +728,41 @@ async def _handle_internal_command(self, command: str) -> RowList:
self._transition_task = asyncio.create_task(self._run_transition_part_one())
return [(f"Transition to {preset} in progress.",)]

elif command.startswith("BRAD_CHANGE_SLO"):
# Format: BRAD_CHANGE_SLO <query p90 s> <txn p90 s>
parts = command.split(" ")
if self._temp_config is None:
return [("Cannot change SLOs because TempConfig is missing.",)]
if len(parts) <= 3:
return [("Need to specify query and txn p90 SLOs",)]

query_p90_s = float(parts[1])
txn_p90_s = float(parts[2])

# The planner asks for a new comparator from the provider on each
# run. So if we swap out the provider here, we will get a comparator
# with the updated SLOs on the next planning run.
assert self._providers is not None
self._providers.comparator_provider = self._get_comparator_provider(
query_p90_s, txn_p90_s
)

# Adjust triggers if applicable. This works because `get_triggers()`
# returns references to the actual triggers (i.e., it is not a
# read-only copy of the triggers).
assert self._planner is not None
for t in self._planner.get_triggers():
if isinstance(t, QueryLatencyCeiling):
t.set_latency_ceiling(query_p90_s)
elif isinstance(t, TransactionLatencyCeiling):
t.set_latency_ceiling(txn_p90_s)

return [
(
f"p90 SLOs changed to (query {query_p90_s:.3f} s), (txn {txn_p90_s:.3f} s)",
)
]

else:
logger.warning("Received unknown internal command: %s", command)
return []
Expand Down Expand Up @@ -874,6 +902,26 @@ async def _run_transition_part_two(self) -> None:
"Transition part two failed due to an unexpected exception."
)

def _get_comparator_provider(
    self, query_p90_s: float, txn_p90_s: float
) -> BlueprintComparatorProvider:
    """
    Builds a comparator provider for the given query and transaction p90
    latency ceilings.

    The provider type is chosen by the `TempConfig`'s `comparator_type()`:
    "benefit_perf_ceiling" yields a
    `BenefitPerformanceCeilingComparatorProvider` (which also takes the
    benefit horizon and penalty settings from the `TempConfig`); any other
    value yields a `PerformanceCeilingComparatorProvider`.

    A `TempConfig` must be present when this method is called.
    """
    temp_config = self._temp_config
    assert temp_config is not None

    # Anything other than the benefit-based comparator falls back to the
    # plain performance ceiling comparator.
    if temp_config.comparator_type() != "benefit_perf_ceiling":
        return PerformanceCeilingComparatorProvider(query_p90_s, txn_p90_s)

    return BenefitPerformanceCeilingComparatorProvider(
        query_p90_s,
        txn_p90_s,
        temp_config.benefit_horizon(),
        temp_config.penalty_threshold(),
        temp_config.penalty_power(),
    )


class _NoopAnalyticsScorer(AnalyticsLatencyScorer):
def apply_predicted_latencies(self, _workload: Workload) -> None:
Expand Down
1 change: 1 addition & 0 deletions src/brad/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ async def _handle_internal_command(
or command.startswith("BRAD_RUN_PLANNER")
or command.startswith("BRAD_MODIFY_REDSHIFT")
or command.startswith("BRAD_USE_PRESET_BP")
or command.startswith("BRAD_CHANGE_SLO")
):
if self._daemon_request_mailbox.is_active():
return [
Expand Down
3 changes: 3 additions & 0 deletions src/brad/planner/triggers/query_latency_ceiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def __init__(
self._sustained_epochs = sustained_epochs
self._lookahead_epochs = lookahead_epochs

def set_latency_ceiling(self, ceiling_s: float) -> None:
    """
    Replaces the query latency ceiling stored on this trigger with
    `ceiling_s` (presumably in seconds, per the `_s` suffix).
    """
    self._latency_ceiling_s = ceiling_s

async def should_replan(self) -> bool:
if not self._passed_delays_since_cutoff():
logger.debug(
Expand Down
3 changes: 3 additions & 0 deletions src/brad/planner/triggers/txn_latency_ceiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def __init__(
self._sustained_epochs = sustained_epochs
self._lookahead_epochs = lookahead_epochs

def set_latency_ceiling(self, ceiling_s: float) -> None:
    """
    Replaces the transaction latency ceiling stored on this trigger with
    `ceiling_s` (presumably in seconds, per the `_s` suffix).
    """
    self._latency_ceiling_s = ceiling_s

async def should_replan(self) -> bool:
if not self._passed_delays_since_cutoff():
logger.debug(
Expand Down

0 comments on commit 7d156d2

Please sign in to comment.