From 2eefd45b2db3d132075e584088c0c872fb8c3bc6 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Thu, 26 Oct 2023 00:37:38 -0400 Subject: [PATCH] Fix transaction latency measurements (#333) * Track full transaction latency * Undo base scoring change - we need to adjust the coefficients * Fix scoring issue --- src/brad/front_end/front_end.py | 15 +++++++++++---- src/brad/front_end/session.py | 9 +++++++++ src/brad/planner/beam/query_based_candidate.py | 2 +- src/brad/planner/beam/table_based_candidate.py | 2 +- .../planner/scoring/performance/unified_aurora.py | 2 +- .../scoring/performance/unified_redshift.py | 2 +- 6 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/brad/front_end/front_end.py b/src/brad/front_end/front_end.py index 1511d83f..598ad1b1 100644 --- a/src/brad/front_end/front_end.py +++ b/src/brad/front_end/front_end.py @@ -307,6 +307,8 @@ async def _run_query_impl( connection = session.engines.get_connection(engine_to_use) cursor = connection.cursor_sync() start = datetime.now(tz=timezone.utc) + if query_rep.is_transaction_start(): + session.set_txn_start_timestamp(start) # Using execute_sync() is lower overhead than the async # interface. For transactions, we won't necessarily need the # async interface. @@ -339,7 +341,8 @@ async def _run_query_impl( if query_rep.is_transaction_start(): session.set_in_transaction(in_txn=True) - if query_rep.is_transaction_end(): + is_transaction_end = query_rep.is_transaction_end() + if is_transaction_end: session.set_in_transaction(in_txn=False) self._transaction_end_counter.bump() @@ -353,10 +356,14 @@ async def _run_query_impl( f"IsTransaction: {transactional_query}" ) run_time_s_float = run_time_s.total_seconds() - if transactional_query: - self._txn_latency_sketch.add(run_time_s_float) - else: + if not transactional_query: self._query_latency_sketch.add(run_time_s_float) + elif is_transaction_end: + # We want to record the duration of the entire transaction + # (not just one query in the transaction). + self._txn_latency_sketch.add( + (end - session.txn_start_timestamp()).total_seconds() + ) # Extract and return the results, if any. try: diff --git a/src/brad/front_end/session.py b/src/brad/front_end/session.py index bc5bac1c..bf15836f 100644 --- a/src/brad/front_end/session.py +++ b/src/brad/front_end/session.py @@ -1,5 +1,7 @@ import asyncio import logging +import pytz +from datetime import datetime from typing import Dict, Tuple, Optional from brad.config.engine import Engine @@ -23,6 +25,7 @@ def __init__(self, session_id: SessionId, engines: EngineConnections): self._engines = engines self._in_txn = False self._closed = False + self._txn_start_timestamp = datetime.now(tz=pytz.utc) @property def identifier(self) -> SessionId: @@ -43,6 +46,12 @@ def closed(self) -> bool: def set_in_transaction(self, in_txn: bool) -> None: self._in_txn = in_txn + def set_txn_start_timestamp(self, timestamp: datetime) -> None: + self._txn_start_timestamp = timestamp + + def txn_start_timestamp(self) -> datetime: + return self._txn_start_timestamp + async def close(self): self._closed = True await self._engines.close() diff --git a/src/brad/planner/beam/query_based_candidate.py b/src/brad/planner/beam/query_based_candidate.py index 78d669c7..008cbd37 100644 --- a/src/brad/planner/beam/query_based_candidate.py +++ b/src/brad/planner/beam/query_based_candidate.py @@ -432,7 +432,7 @@ def recompute_provisioning_dependent_scoring(self, ctx: ScoringContext) -> None: self.redshift_score = RedshiftProvisioningScore.compute( np.array(self.base_query_latencies[Engine.Redshift]), ctx.next_workload.get_arrival_counts_batch( - self.query_locations[Engine.Aurora] + self.query_locations[Engine.Redshift] ), ctx.current_blueprint.redshift_provisioning(), self.redshift_provisioning, diff --git a/src/brad/planner/beam/table_based_candidate.py b/src/brad/planner/beam/table_based_candidate.py index 9f936313..9ee32fbf 100644 --- a/src/brad/planner/beam/table_based_candidate.py +++ b/src/brad/planner/beam/table_based_candidate.py @@ -405,7 +405,7 @@ def recompute_provisioning_dependent_scoring(self, ctx: ScoringContext) -> None: self.query_locations[Engine.Redshift], Engine.Redshift ), ctx.next_workload.get_arrival_counts_batch( - self.query_locations[Engine.Aurora] + self.query_locations[Engine.Redshift] ), ctx.current_blueprint.redshift_provisioning(), self.redshift_provisioning, diff --git a/src/brad/planner/scoring/performance/unified_aurora.py b/src/brad/planner/scoring/performance/unified_aurora.py index 752af1e9..6e7bcbef 100644 --- a/src/brad/planner/scoring/performance/unified_aurora.py +++ b/src/brad/planner/scoring/performance/unified_aurora.py @@ -298,4 +298,4 @@ def add_debug_values(self, dest: Dict[str, int | float | str]) -> None: dest.update(self.debug_values) -_AURORA_BASE_RESOURCE_VALUE = aurora_num_cpus(Provisioning("db.r6g.xlarge", 1)) +_AURORA_BASE_RESOURCE_VALUE = aurora_num_cpus(Provisioning("db.r6g.large", 1)) diff --git a/src/brad/planner/scoring/performance/unified_redshift.py b/src/brad/planner/scoring/performance/unified_redshift.py index c0a152bb..d88561f1 100644 --- a/src/brad/planner/scoring/performance/unified_redshift.py +++ b/src/brad/planner/scoring/performance/unified_redshift.py @@ -130,4 +130,4 @@ def add_debug_values(self, dest: Dict[str, int | float | str]) -> None: dest.update(self.debug_values) -_REDSHIFT_BASE_RESOURCE_VALUE = redshift_num_cpus(Provisioning("dc2.large", 2)) +_REDSHIFT_BASE_RESOURCE_VALUE = redshift_num_cpus(Provisioning("dc2.large", 1))