Fix transaction latency measurements (#333)
* Track full transaction latency

* Undo base scoring change - we need to adjust the coefficients

* Fix scoring issue
geoffxy authored Oct 26, 2023
1 parent b508550 commit 2eefd45
Showing 6 changed files with 24 additions and 8 deletions.
15 changes: 11 additions & 4 deletions src/brad/front_end/front_end.py
@@ -307,6 +307,8 @@ async def _run_query_impl(
         connection = session.engines.get_connection(engine_to_use)
         cursor = connection.cursor_sync()
         start = datetime.now(tz=timezone.utc)
+        if query_rep.is_transaction_start():
+            session.set_txn_start_timestamp(start)
         # Using execute_sync() is lower overhead than the async
         # interface. For transactions, we won't necessarily need the
         # async interface.
@@ -339,7 +341,8 @@ async def _run_query_impl(
         if query_rep.is_transaction_start():
             session.set_in_transaction(in_txn=True)

-        if query_rep.is_transaction_end():
+        is_transaction_end = query_rep.is_transaction_end()
+        if is_transaction_end:
             session.set_in_transaction(in_txn=False)
             self._transaction_end_counter.bump()

@@ -353,10 +356,14 @@ async def _run_query_impl(
             f"IsTransaction: {transactional_query}"
         )
         run_time_s_float = run_time_s.total_seconds()
-        if transactional_query:
-            self._txn_latency_sketch.add(run_time_s_float)
-        else:
+        if not transactional_query:
             self._query_latency_sketch.add(run_time_s_float)
+        elif is_transaction_end:
+            # We want to record the duration of the entire transaction
+            # (not just one query in the transaction).
+            self._txn_latency_sketch.add(
+                (end - session.txn_start_timestamp()).total_seconds()
+            )

         # Extract and return the results, if any.
         try:
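The net effect of the change above: a non-transactional query still contributes its own run time to the query latency sketch, while a transaction now contributes a single sample spanning from its first statement to the statement that ends it. Below is a minimal, self-contained sketch of that pattern; the LatencySketch class and record_statement flow are illustrative stand-ins, not BRAD's actual interfaces.

from datetime import datetime, timezone

class LatencySketch:
    # Stand-in for BRAD's latency sketch; here it just collects samples.
    def __init__(self) -> None:
        self.samples: list[float] = []

    def add(self, value: float) -> None:
        self.samples.append(value)

txn_sketch = LatencySketch()
txn_start: datetime | None = None

def record_statement(is_txn_start: bool, is_txn_end: bool) -> None:
    global txn_start
    now = datetime.now(tz=timezone.utc)
    if is_txn_start:
        # BEGIN: remember when the transaction started.
        txn_start = now
    if is_txn_end and txn_start is not None:
        # COMMIT/ROLLBACK: record the whole transaction's duration,
        # not just this statement's run time.
        txn_sketch.add((now - txn_start).total_seconds())

record_statement(is_txn_start=True, is_txn_end=False)   # BEGIN
record_statement(is_txn_start=False, is_txn_end=False)  # statement inside the txn
record_statement(is_txn_start=False, is_txn_end=True)   # COMMIT
print(txn_sketch.samples)  # one sample: the full BEGIN-to-COMMIT latency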
9 changes: 9 additions & 0 deletions src/brad/front_end/session.py
@@ -1,5 +1,7 @@
 import asyncio
 import logging
+import pytz
+from datetime import datetime
 from typing import Dict, Tuple, Optional

 from brad.config.engine import Engine
@@ -23,6 +25,7 @@ def __init__(self, session_id: SessionId, engines: EngineConnections):
         self._engines = engines
         self._in_txn = False
         self._closed = False
+        self._txn_start_timestamp = datetime.now(tz=pytz.utc)

     @property
     def identifier(self) -> SessionId:
@@ -43,6 +46,12 @@ def closed(self) -> bool:
     def set_in_transaction(self, in_txn: bool) -> None:
         self._in_txn = in_txn

+    def set_txn_start_timestamp(self, timestamp: datetime) -> None:
+        self._txn_start_timestamp = timestamp
+
+    def txn_start_timestamp(self) -> datetime:
+        return self._txn_start_timestamp
+
     async def close(self):
         self._closed = True
         await self._engines.close()
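One small detail: the front end stamps the start with datetime.now(tz=timezone.utc), while the session's default field uses pytz.utc. Both produce timezone-aware UTC datetimes, so the subtraction in front_end.py is well defined no matter which one populated the field. A quick sanity check, not from the repository, just an illustration:

import pytz
from datetime import datetime, timezone

session_default = datetime.now(tz=pytz.utc)      # how Session initializes the field
front_end_stamp = datetime.now(tz=timezone.utc)  # how the front end stamps the txn start
elapsed = (front_end_stamp - session_default).total_seconds()
print(elapsed)  # a small non-negative float; mixing the two UTC tzinfos is fine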
2 changes: 1 addition & 1 deletion src/brad/planner/beam/query_based_candidate.py
@@ -432,7 +432,7 @@ def recompute_provisioning_dependent_scoring(self, ctx: ScoringContext) -> None:
         self.redshift_score = RedshiftProvisioningScore.compute(
             np.array(self.base_query_latencies[Engine.Redshift]),
             ctx.next_workload.get_arrival_counts_batch(
-                self.query_locations[Engine.Aurora]
+                self.query_locations[Engine.Redshift]
             ),
             ctx.current_blueprint.redshift_provisioning(),
             self.redshift_provisioning,
2 changes: 1 addition & 1 deletion src/brad/planner/beam/table_based_candidate.py
@@ -405,7 +405,7 @@ def recompute_provisioning_dependent_scoring(self, ctx: ScoringContext) -> None:
                 self.query_locations[Engine.Redshift], Engine.Redshift
             ),
             ctx.next_workload.get_arrival_counts_batch(
-                self.query_locations[Engine.Aurora]
+                self.query_locations[Engine.Redshift]
             ),
             ctx.current_blueprint.redshift_provisioning(),
             self.redshift_provisioning,
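Both candidate classes had the same slip: the Redshift provisioning score was being fed the arrival counts of the queries routed to Aurora instead of those routed to Redshift. A tiny illustration of why that matters, using hypothetical data; the real get_arrival_counts_batch lives on the workload object and its exact behavior is assumed here:

# Hypothetical illustration, not BRAD code.
arrival_counts = [10.0, 250.0, 180.0, 5.0, 90.0]  # per-query arrival counts
query_locations = {
    "aurora": [0, 3],        # query indices routed to Aurora
    "redshift": [1, 2, 4],   # query indices routed to Redshift
}

def get_arrival_counts_batch(indices):
    # Assumed behavior: return the arrival counts for the given query indices.
    return [arrival_counts[i] for i in indices]

print(get_arrival_counts_batch(query_locations["aurora"]))    # [10.0, 5.0]  (wrong set, wrong size)
print(get_arrival_counts_batch(query_locations["redshift"]))  # [250.0, 180.0, 90.0]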
2 changes: 1 addition & 1 deletion src/brad/planner/scoring/performance/unified_aurora.py
@@ -298,4 +298,4 @@ def add_debug_values(self, dest: Dict[str, int | float | str]) -> None:
         dest.update(self.debug_values)


-_AURORA_BASE_RESOURCE_VALUE = aurora_num_cpus(Provisioning("db.r6g.xlarge", 1))
+_AURORA_BASE_RESOURCE_VALUE = aurora_num_cpus(Provisioning("db.r6g.large", 1))
2 changes: 1 addition & 1 deletion src/brad/planner/scoring/performance/unified_redshift.py
@@ -130,4 +130,4 @@ def add_debug_values(self, dest: Dict[str, int | float | str]) -> None:
         dest.update(self.debug_values)


-_REDSHIFT_BASE_RESOURCE_VALUE = redshift_num_cpus(Provisioning("dc2.large", 2))
+_REDSHIFT_BASE_RESOURCE_VALUE = redshift_num_cpus(Provisioning("dc2.large", 1))
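The last two changes revert the baseline provisionings used as base resource values, matching the commit note that the scoring coefficients need adjusting before the base can change. Rough, hedged arithmetic follows, assuming the helpers return published per-node vCPU counts multiplied by node count (an assumption about their behavior):

# Published vCPU counts: db.r6g.large = 2, db.r6g.xlarge = 4, dc2.large = 2 per node.
aurora_base_attempted = 4 * 1    # db.r6g.xlarge, 1 instance (the change being undone)
aurora_base_reverted = 2 * 1     # db.r6g.large, 1 instance
redshift_base_attempted = 2 * 2  # dc2.large, 2 nodes (the change being undone)
redshift_base_reverted = 2 * 1   # dc2.large, 1 node
# If a score term has the form provisioned_resources / base_resources (an
# assumption about the scoring model), doubling the base halves that term,
# which is why the larger baselines are held back until the coefficients
# are recalibrated.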
