Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal enhancements #336

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions config/planner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ triggers:
ceiling_s: 30.0
sustained_epochs: 3

txn_latency_ceiling:
ceiling_s: 0.030
sustained_epochs: 3

###
### Beam planning constants.
###
Expand Down Expand Up @@ -311,19 +315,19 @@ aurora_txns:
C_2: 0.00108688

# Used for latency.
K: 1.0581694841384888
b_p50: 0.0022556402254849672
b_p95: 0.00383179122582078
K: 1.0194002389907837
b_p50: 0.005365931428968906
b_p90: 0.005891922861337662 # TODO: Update

imdb_extended_100g:
# Used for "load translation"
C_1: 0.00282164
C_2: 0.00108688

# Used for latency.
K: 1.0729172229766846
b_p50: 0.001035563531331718
b_p95: 0.00274773221462965
K: 1.0811012983322144
b_p50: 0.0008631267119199038
b_p90: 0.002251814818009734


aurora_scaling:
Expand Down
2 changes: 1 addition & 1 deletion config/temp_config_sample.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
latency_ceiling_s: 30.0
txn_latency_p50_ceiling_s: 0.020 # Currently unused.
txn_latency_p95_ceiling_s: 0.030
txn_latency_p90_ceiling_s: 0.030

# Use this instead of the individual paths below.
std_dataset_path: workloads/IMDB_20GB/regular_test/
Expand Down
34 changes: 22 additions & 12 deletions src/brad/admin/run_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def parse_metrics(kv_str: str) -> Dict[str, float]:
return metrics


def run_planner(args) -> None:
async def run_planner_impl(args) -> None:
"""
This admin action is used to manually test the blueprint planner
independently of the rest of BRAD.
Expand Down Expand Up @@ -153,17 +153,20 @@ def run_planner(args) -> None:
)
workload_dir = pathlib.Path(args.workload_dir)
table_sizer = TableSizer(engines, config)
builder = WorkloadBuilder()
workload = (
builder.add_analytical_queries_and_counts_from_file(
builder = (
WorkloadBuilder()
.add_analytical_queries_and_counts_from_file(
args.query_bank_file,
args.query_counts_file,
)
.add_transactional_queries_from_file(workload_dir / "oltp.sql")
.for_period(timedelta(hours=1))
.table_sizes_from_engines(blueprint_mgr.get_blueprint(), table_sizer)
.build()
)
workload = (
await builder.table_sizes_from_engines(
blueprint_mgr.get_blueprint(), table_sizer
)
).build()

elif args.workload_source == "workload_dir":
assert args.analytical_rate_per_s is not None
Expand All @@ -173,17 +176,20 @@ def run_planner(args) -> None:
)
table_sizer = TableSizer(engines, config)
workload_dir = pathlib.Path(args.workload_dir)
builder = WorkloadBuilder()
workload = (
builder.add_analytical_queries_from_file(workload_dir / "olap.sql")
builder = (
WorkloadBuilder()
.add_analytical_queries_from_file(workload_dir / "olap.sql")
.add_transactional_queries_from_file(workload_dir / "oltp.sql")
.uniform_per_analytical_query_rate(
args.analytical_rate_per_s, period=timedelta(seconds=1)
)
.for_period(timedelta(hours=1))
.table_sizes_from_engines(blueprint_mgr.get_blueprint(), table_sizer)
.build()
)
workload = (
await builder.table_sizes_from_engines(
blueprint_mgr.get_blueprint(), table_sizer
)
).build()

# 5. Load the pre-computed predictions.
prediction_dir = pathlib.Path(args.predictions_dir)
Expand Down Expand Up @@ -234,7 +240,7 @@ def run_planner(args) -> None:
# TODO: Make this configurable.
comparator=best_cost_under_perf_ceilings(
max_query_latency_s=args.latency_ceiling_s,
max_txn_p95_latency_s=0.020, # FIXME: Add command-line argument if needed.
max_txn_p90_latency_s=0.020, # FIXME: Add command-line argument if needed.
),
metrics_provider=metrics_provider,
data_access_provider=data_access_provider,
Expand Down Expand Up @@ -278,4 +284,8 @@ async def on_new_blueprint(blueprint: Blueprint, score: Score):
)


def run_planner(args) -> None:
asyncio.run(run_planner_impl(args))


_PICKLE_FILE_NAME = "workload.pickle"
4 changes: 2 additions & 2 deletions src/brad/config/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class FrontEndMetric(enum.Enum):
TxnEndPerSecond = "txn_end_per_s"

QueryLatencySecondP50 = "query_latency_s_p50"
QueryLatencySecondP95 = "query_latency_s_p95"
QueryLatencySecondP90 = "query_latency_s_p90"

TxnLatencySecondP50 = "txn_latency_s_p50"
TxnLatencySecondP95 = "txn_latency_s_p95"
TxnLatencySecondP90 = "txn_latency_s_p90"
4 changes: 2 additions & 2 deletions src/brad/config/temp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ def latency_ceiling_s(self) -> float:
def txn_latency_p50_ceiling_s(self) -> float:
return float(self._raw["txn_latency_p50_ceiling_s"])

def txn_latency_p95_ceiling_s(self) -> float:
return float(self._raw["txn_latency_p95_ceiling_s"])
def txn_latency_p90_ceiling_s(self) -> float:
return float(self._raw["txn_latency_p90_ceiling_s"])

def std_dataset_path(self) -> Optional[pathlib.Path]:
if "std_dataset_path" not in self._raw:
Expand Down
4 changes: 2 additions & 2 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async def _run_setup(self) -> None:
)
comparator = best_cost_under_perf_ceilings(
max_query_latency_s=self._temp_config.latency_ceiling_s(),
max_txn_p95_latency_s=self._temp_config.txn_latency_p95_ceiling_s(),
max_txn_p90_latency_s=self._temp_config.txn_latency_p90_ceiling_s(),
)
else:
logger.warning(
Expand All @@ -188,7 +188,7 @@ async def _run_setup(self) -> None:
latency_scorer = _NoopAnalyticsScorer()
data_access_provider = _NoopDataAccessProvider()
comparator = best_cost_under_perf_ceilings(
max_query_latency_s=10, max_txn_p95_latency_s=0.030
max_query_latency_s=10, max_txn_p90_latency_s=0.030
)

self._planner = BlueprintPlannerFactory.create(
Expand Down
18 changes: 9 additions & 9 deletions src/brad/daemon/front_end_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def __init__(
FrontEndMetric.TxnEndPerSecond.value,
FrontEndMetric.QueryLatencySecondP50.value,
FrontEndMetric.TxnLatencySecondP50.value,
FrontEndMetric.QueryLatencySecondP95.value,
FrontEndMetric.TxnLatencySecondP95.value,
FrontEndMetric.QueryLatencySecondP90.value,
FrontEndMetric.TxnLatencySecondP90.value,
]
self._values_df = pd.DataFrame(columns=self._ordered_metrics.copy())
self._logger = MetricsLogger.create_from_config(
Expand Down Expand Up @@ -149,26 +149,26 @@ async def fetch_latest(self) -> None:
"Missing latency sketch values for %s", metric_key
)
p50_val = 0.0
p95_val = 0.0
p90_val = 0.0
else:
p50_val_cand = merged.get_quantile_value(0.5)
p95_val_cand = merged.get_quantile_value(0.9)
p90_val_cand = merged.get_quantile_value(0.9)
p50_val = p50_val_cand if p50_val_cand is not None else 0.0
p95_val = p95_val_cand if p95_val_cand is not None else 0.0
p90_val = p90_val_cand if p90_val_cand is not None else 0.0

if metric_key == _MetricKey.QueryLatencySecond:
data_cols[FrontEndMetric.QueryLatencySecondP50.value].append(
p50_val
)
data_cols[FrontEndMetric.QueryLatencySecondP95.value].append(
p95_val
data_cols[FrontEndMetric.QueryLatencySecondP90.value].append(
p90_val
)
else:
data_cols[FrontEndMetric.TxnLatencySecondP50.value].append(
p50_val
)
data_cols[FrontEndMetric.TxnLatencySecondP95.value].append(
p95_val
data_cols[FrontEndMetric.TxnLatencySecondP90.value].append(
p90_val
)
else:
logger.warning("Unhandled front end metric: %s", metric_key)
Expand Down
2 changes: 1 addition & 1 deletion src/brad/planner/beam/query_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:
# 1. Fetch the next workload and apply predictions.
metrics, metrics_timestamp = self._metrics_provider.get_metrics()
logger.debug("Using metrics: %s", str(metrics))
current_workload, next_workload = self._workload_provider.get_workloads(
current_workload, next_workload = await self._workload_provider.get_workloads(
metrics_timestamp, window_multiplier, desired_period=timedelta(hours=1)
)
self._analytics_latency_scorer.apply_predicted_latencies(next_workload)
Expand Down
2 changes: 1 addition & 1 deletion src/brad/planner/beam/table_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:

# 1. Fetch metrics and the next workload and then apply predictions.
metrics, metrics_timestamp = self._metrics_provider.get_metrics()
current_workload, next_workload = self._workload_provider.get_workloads(
current_workload, next_workload = await self._workload_provider.get_workloads(
metrics_timestamp, window_multiplier, desired_period=timedelta(hours=1)
)
self._analytics_latency_scorer.apply_predicted_latencies(next_workload)
Expand Down
11 changes: 11 additions & 0 deletions src/brad/planner/beam/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from brad.planner.triggers.elapsed_time import ElapsedTimeTrigger
from brad.planner.triggers.query_latency_ceiling import QueryLatencyCeiling
from brad.planner.triggers.trigger import Trigger
from brad.planner.triggers.txn_latency_ceiling import TransactionLatencyCeiling
from brad.planner.triggers.variable_costs import VariableCosts


Expand Down Expand Up @@ -64,4 +65,14 @@ def get_beam_triggers(
)
)

txn_latency_ceiling = trigger_config["txn_latency_ceiling"]
if "disabled" not in txn_latency_ceiling:
trigger_list.append(
TransactionLatencyCeiling(
monitor,
txn_latency_ceiling["ceiling_s"],
txn_latency_ceiling["sustained_epochs"],
)
)

return trigger_list
18 changes: 9 additions & 9 deletions src/brad/planner/compare/cost.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,27 @@ def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> boo

def best_cost_under_perf_ceilings(
max_query_latency_s: float,
max_txn_p95_latency_s: float,
max_txn_p90_latency_s: float,
) -> BlueprintComparator:
def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> bool:
# Check transactional latency ceilings first.
left_txn_p95 = left.get_predicted_transactional_latencies()[0]
right_txn_p95 = right.get_predicted_transactional_latencies()[0]
left_txn_p90 = left.get_predicted_transactional_latencies()[0]
right_txn_p90 = right.get_predicted_transactional_latencies()[0]

# If one of these candidates have NaN predictions, we need to
# consider other factors. NaN indicates that a prediction is not
# available (e.g., due to missing metrics).
if not math.isnan(left_txn_p95) and not math.isnan(right_txn_p95):
if not math.isnan(left_txn_p90) and not math.isnan(right_txn_p90):
# Both above the ceiling, return the blueprint that does better on
# performance.
if (
left_txn_p95 > max_txn_p95_latency_s
and right_txn_p95 > max_txn_p95_latency_s
left_txn_p90 > max_txn_p90_latency_s
and right_txn_p90 > max_txn_p90_latency_s
):
return left_txn_p95 < right_txn_p95
elif left_txn_p95 > max_txn_p95_latency_s:
return left_txn_p90 < right_txn_p90
elif left_txn_p90 > max_txn_p90_latency_s:
return False
elif right_txn_p95 > max_txn_p95_latency_s:
elif right_txn_p90 > max_txn_p90_latency_s:
return True

# Query latency ceilings.
Expand Down
12 changes: 6 additions & 6 deletions src/brad/planner/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"aurora_reader_load_minute_avg",
"txn_completions_per_s",
"txn_lat_s_p50",
"txn_lat_s_p95",
"txn_lat_s_p90",
],
)

Expand Down Expand Up @@ -174,13 +174,13 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
default_value=0.0,
name="txn_lat_s_p50",
)
txn_lat_s_p95 = self._extract_most_recent_possibly_missing(
txn_lat_s_p90 = self._extract_most_recent_possibly_missing(
front_end.loc[
front_end.index <= most_recent_common,
FrontEndMetric.TxnLatencySecondP95.value,
FrontEndMetric.TxnLatencySecondP90.value,
],
default_value=0.0,
name="txn_lat_s_p95",
name="txn_lat_s_p90",
)

aurora_writer_rel = aurora_writer.loc[aurora_writer.index <= most_recent_common]
Expand Down Expand Up @@ -226,7 +226,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
aurora_reader_load_minute_avg=aurora_reader_load_minute,
txn_completions_per_s=txn_per_s,
txn_lat_s_p50=txn_lat_s_p50,
txn_lat_s_p95=txn_lat_s_p95,
txn_lat_s_p90=txn_lat_s_p90,
),
most_recent_common.to_pydatetime(),
)
Expand Down Expand Up @@ -292,5 +292,5 @@ def _recover_load_value(self, aurora_rel: pd.DataFrame, metric_name: str) -> flo
_FRONT_END_METRICS = [
FrontEndMetric.TxnEndPerSecond.value,
FrontEndMetric.TxnLatencySecondP50.value,
FrontEndMetric.TxnLatencySecondP95.value,
FrontEndMetric.TxnLatencySecondP90.value,
]
2 changes: 1 addition & 1 deletion src/brad/planner/neighborhood/neighborhood.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:
# the daemon process.
logger.info("Running a replan.")
self._log_current_metrics()
current_workload, next_workload = self._workload_provider.get_workloads(
current_workload, next_workload = await self._workload_provider.get_workloads(
datetime.now().astimezone(pytz.utc), window_multiplier
)
workload_filters = [
Expand Down
1 change: 1 addition & 0 deletions src/brad/planner/scoring/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async def simulate_current_workload_routing(self, router: Router) -> None:
maybe_eng = query.primary_execution_location()
if maybe_eng is not None:
self.current_query_locations[maybe_eng].append(qidx)
continue

# Fall back to the router if the historical routing location is not
# available.
Expand Down
6 changes: 3 additions & 3 deletions src/brad/planner/scoring/performance/unified_aurora.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,14 @@ def _scale_txn_latency(
to_prov: Provisioning,
ctx: "ScoringContext",
) -> npt.NDArray:
observed_lats = np.array([ctx.metrics.txn_lat_s_p50, ctx.metrics.txn_lat_s_p95])
observed_lats = np.array([ctx.metrics.txn_lat_s_p50, ctx.metrics.txn_lat_s_p90])

# Q(u) = a / (K - u) + b ; u is CPU utilization in [0, 1]
# --> Q(u') = (K - u) / (K - u') (Q(u) - b) + b

model = ctx.planner_config.aurora_txn_coefs(ctx.schema_name)
K = model["K"]
b = np.array([model["b_p50"], model["b_p95"]])
b = np.array([model["b_p50"], model["b_p90"]])

curr_num_cpus = aurora_num_cpus(curr_prov)
next_num_cpus = aurora_num_cpus(to_prov)
Expand Down Expand Up @@ -301,7 +301,7 @@ def add_debug_values(self, dest: Dict[str, int | float | str]) -> None:
dest["aurora_pred_txn_peak_cpu_denorm"] = self.pred_txn_peak_cpu_denorm
(
dest["aurora_pred_txn_lat_s_p50"],
dest["aurora_pred_txn_lat_s_p95"],
dest["aurora_pred_txn_lat_s_p90"],
) = self.scaled_txn_lats
dest.update(self.debug_values)

Expand Down
Loading