From 59b55fd5ddda01b263335de8911152804456461a Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 13 Nov 2023 19:04:55 -0500 Subject: [PATCH] Make connection initialization more robust (#366) * Add additional connection messages * Improve debugging output * Retry connections on startup, pass back connection errors * Fix oneof bug --- proto/brad.proto | 9 +- src/brad/connection/connection.py | 3 +- src/brad/front_end/front_end.py | 21 +++- src/brad/front_end/grpc.py | 10 +- src/brad/grpc_client.py | 14 ++- src/brad/proto_gen/blueprint_pb2.py | 40 +++--- src/brad/proto_gen/blueprint_pb2.pyi | 84 +++++++------ src/brad/proto_gen/brad_pb2.py | 64 +++++----- src/brad/proto_gen/brad_pb2.pyi | 118 ++++++++++-------- src/brad/routing/router.py | 2 +- .../IMDB_extended/run_repeating_analytics.py | 63 +++++++--- workloads/IMDB_extended/run_transactions.py | 16 ++- .../IMDB_extended/workload_utils/connect.py | 5 +- 13 files changed, 272 insertions(+), 177 deletions(-) diff --git a/proto/brad.proto b/proto/brad.proto index 479840de..43788664 100644 --- a/proto/brad.proto +++ b/proto/brad.proto @@ -30,7 +30,14 @@ message StartSessionRequest { } message StartSessionResponse { - SessionId id = 1; + oneof result { + SessionId id = 1; + StartSessionError error = 2; + } +} + +message StartSessionError { + string error_msg = 1; } message RunQueryRequest { diff --git a/src/brad/connection/connection.py b/src/brad/connection/connection.py index a646b0b3..dc72a131 100644 --- a/src/brad/connection/connection.py +++ b/src/brad/connection/connection.py @@ -40,5 +40,6 @@ def is_connected(self) -> bool: class ConnectionFailed(Exception): """ - Used when + Used when an existing connection fails for any reason, or we failed to + establish a connection to an underlying engine. """ diff --git a/src/brad/front_end/front_end.py b/src/brad/front_end/front_end.py index f36b06f8..af02e83c 100644 --- a/src/brad/front_end/front_end.py +++ b/src/brad/front_end/front_end.py @@ -18,6 +18,7 @@ from brad.blueprint.manager import BlueprintManager from brad.config.engine import Engine from brad.config.file import ConfigFile +from brad.connection.connection import ConnectionFailed from brad.daemon.monitor import Monitor from brad.daemon.messages import ( ShutdownFrontEnd, @@ -262,8 +263,24 @@ async def _run_teardown(self): self._estimator = None async def start_session(self) -> SessionId: - session_id, _ = await self._sessions.create_new_session() - return session_id + rand_backoff = None + while True: + try: + session_id, _ = await self._sessions.create_new_session() + return session_id + except ConnectionFailed: + if rand_backoff is None: + rand_backoff = RandomizedExponentialBackoff( + max_retries=10, base_delay_s=0.5, max_delay_s=10.0 + ) + time_to_wait = rand_backoff.wait_time_s() + if time_to_wait is None: + logger.exception( + "Failed to start a new session due to a repeated " + "connection failure (10 retries)." + ) + raise + await asyncio.sleep(time_to_wait) async def end_session(self, session_id: SessionId) -> None: await self._sessions.end_session(session_id) diff --git a/src/brad/front_end/grpc.py b/src/brad/front_end/grpc.py index 1fccf1f7..441521e5 100644 --- a/src/brad/front_end/grpc.py +++ b/src/brad/front_end/grpc.py @@ -4,6 +4,7 @@ import brad.proto_gen.brad_pb2_grpc as rpc from brad.config.engine import Engine from brad.config.session import SessionId +from brad.connection.connection import ConnectionFailed from brad.front_end.brad_interface import BradInterface from brad.front_end.errors import QueryError @@ -24,8 +25,13 @@ def __init__(self, brad: BradInterface): async def StartSession( self, _request: b.StartSessionRequest, _context ) -> b.StartSessionResponse: - new_session_id = await self._brad.start_session() - return b.StartSessionResponse(id=b.SessionId(id_value=new_session_id.value())) + try: + new_session_id = await self._brad.start_session() + return b.StartSessionResponse( + id=b.SessionId(id_value=new_session_id.value()) + ) + except ConnectionFailed as ex: + return b.StartSessionResponse(error=b.StartSessionError(error_msg=repr(ex))) async def RunQuery( self, request: b.RunQueryRequest, _context diff --git a/src/brad/grpc_client.py b/src/brad/grpc_client.py index 521ff9a0..f558201c 100644 --- a/src/brad/grpc_client.py +++ b/src/brad/grpc_client.py @@ -122,7 +122,19 @@ def close(self) -> None: def start_session(self) -> SessionId: assert self._stub is not None result = self._stub.StartSession(b.StartSessionRequest()) - return SessionId(result.id.id_value) + msg_kind = result.WhichOneof("result") + if msg_kind is None: + raise BradClientError( + message="BRAD RPC error: Unspecified start session result." + ) + elif msg_kind == "id": + return SessionId(result.id.id_value) + elif msg_kind == "error": + raise BradClientError(message=result.error.error_msg) + else: + raise BradClientError( + message="BRAD RPC error: Unknown start session result." + ) def end_session(self, session_id: SessionId) -> None: assert self._stub is not None diff --git a/src/brad/proto_gen/blueprint_pb2.py b/src/brad/proto_gen/blueprint_pb2.py index 77e1c617..b913072f 100644 --- a/src/brad/proto_gen/blueprint_pb2.py +++ b/src/brad/proto_gen/blueprint_pb2.py @@ -2,10 +2,10 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: blueprint.proto """Generated protocol buffer code.""" -from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -15,25 +15,25 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x62lueprint.proto\x12\x04\x62rad\"\xac\x01\n\tBlueprint\x12\x13\n\x0bschema_name\x18\x01 \x01(\t\x12\x1b\n\x06tables\x18\x02 \x03(\x0b\x32\x0b.brad.Table\x12\"\n\x06\x61urora\x18\x03 \x01(\x0b\x32\x12.brad.Provisioning\x12$\n\x08redshift\x18\x04 \x01(\x0b\x32\x12.brad.Provisioning\x12#\n\x06policy\x18\x05 \x01(\x0b\x32\x13.brad.RoutingPolicy\"\xab\x01\n\x05Table\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12\"\n\x07\x63olumns\x18\x02 \x03(\x0b\x32\x11.brad.TableColumn\x12\x1f\n\tlocations\x18\x03 \x03(\x0e\x32\x0c.brad.Engine\x12+\n\x0c\x64\x65pendencies\x18\x04 \x01(\x0b\x32\x15.brad.TableDependency\x12\x1c\n\x07indexes\x18\x05 \x03(\x0b\x32\x0b.brad.Index\"B\n\x0bTableColumn\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x11\n\tdata_type\x18\x02 \x01(\t\x12\x12\n\nis_primary\x18\x03 \x01(\x08\"@\n\x0fTableDependency\x12\x1a\n\x12source_table_names\x18\x02 \x03(\t\x12\x11\n\ttransform\x18\x03 \x01(\t\"8\n\x0cProvisioning\x12\x15\n\rinstance_type\x18\x01 \x01(\t\x12\x11\n\tnum_nodes\x18\x02 \x01(\r\"\x1f\n\rRoutingPolicy\x12\x0e\n\x06policy\x18\x01 \x01(\x0c\"\x1c\n\x05Index\x12\x13\n\x0b\x63olumn_name\x18\x01 \x03(\t*;\n\x06\x45ngine\x12\x0b\n\x07UNKNOWN\x10\x00\x12\n\n\x06\x41URORA\x10\x01\x12\x0c\n\x08REDSHIFT\x10\x02\x12\n\n\x06\x41THENA\x10\x03\x62\x06proto3') -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'blueprint_pb2', globals()) +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'blueprint_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _ENGINE._serialized_start=629 - _ENGINE._serialized_end=688 - _BLUEPRINT._serialized_start=26 - _BLUEPRINT._serialized_end=198 - _TABLE._serialized_start=201 - _TABLE._serialized_end=372 - _TABLECOLUMN._serialized_start=374 - _TABLECOLUMN._serialized_end=440 - _TABLEDEPENDENCY._serialized_start=442 - _TABLEDEPENDENCY._serialized_end=506 - _PROVISIONING._serialized_start=508 - _PROVISIONING._serialized_end=564 - _ROUTINGPOLICY._serialized_start=566 - _ROUTINGPOLICY._serialized_end=597 - _INDEX._serialized_start=599 - _INDEX._serialized_end=627 + _globals['_ENGINE']._serialized_start=629 + _globals['_ENGINE']._serialized_end=688 + _globals['_BLUEPRINT']._serialized_start=26 + _globals['_BLUEPRINT']._serialized_end=198 + _globals['_TABLE']._serialized_start=201 + _globals['_TABLE']._serialized_end=372 + _globals['_TABLECOLUMN']._serialized_start=374 + _globals['_TABLECOLUMN']._serialized_end=440 + _globals['_TABLEDEPENDENCY']._serialized_start=442 + _globals['_TABLEDEPENDENCY']._serialized_end=506 + _globals['_PROVISIONING']._serialized_start=508 + _globals['_PROVISIONING']._serialized_end=564 + _globals['_ROUTINGPOLICY']._serialized_start=566 + _globals['_ROUTINGPOLICY']._serialized_end=597 + _globals['_INDEX']._serialized_start=599 + _globals['_INDEX']._serialized_end=627 # @@protoc_insertion_point(module_scope) diff --git a/src/brad/proto_gen/blueprint_pb2.pyi b/src/brad/proto_gen/blueprint_pb2.pyi index 188bddfe..836dc0d3 100644 --- a/src/brad/proto_gen/blueprint_pb2.pyi +++ b/src/brad/proto_gen/blueprint_pb2.pyi @@ -4,68 +4,55 @@ from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union -ATHENA: Engine -AURORA: Engine DESCRIPTOR: _descriptor.FileDescriptor -REDSHIFT: Engine + +class Engine(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = [] # type: ignore + UNKNOWN: _ClassVar[Engine] + AURORA: _ClassVar[Engine] + REDSHIFT: _ClassVar[Engine] + ATHENA: _ClassVar[Engine] UNKNOWN: Engine +AURORA: Engine +REDSHIFT: Engine +ATHENA: Engine class Blueprint(_message.Message): - __slots__ = ["aurora", "policy", "redshift", "schema_name", "tables"] - AURORA_FIELD_NUMBER: _ClassVar[int] - POLICY_FIELD_NUMBER: _ClassVar[int] - REDSHIFT_FIELD_NUMBER: _ClassVar[int] + __slots__ = ["schema_name", "tables", "aurora", "redshift", "policy"] SCHEMA_NAME_FIELD_NUMBER: _ClassVar[int] TABLES_FIELD_NUMBER: _ClassVar[int] - aurora: Provisioning - policy: RoutingPolicy - redshift: Provisioning + AURORA_FIELD_NUMBER: _ClassVar[int] + REDSHIFT_FIELD_NUMBER: _ClassVar[int] + POLICY_FIELD_NUMBER: _ClassVar[int] schema_name: str tables: _containers.RepeatedCompositeFieldContainer[Table] + aurora: Provisioning + redshift: Provisioning + policy: RoutingPolicy def __init__(self, schema_name: _Optional[str] = ..., tables: _Optional[_Iterable[_Union[Table, _Mapping]]] = ..., aurora: _Optional[_Union[Provisioning, _Mapping]] = ..., redshift: _Optional[_Union[Provisioning, _Mapping]] = ..., policy: _Optional[_Union[RoutingPolicy, _Mapping]] = ...) -> None: ... -class Index(_message.Message): - __slots__ = ["column_name"] - COLUMN_NAME_FIELD_NUMBER: _ClassVar[int] - column_name: _containers.RepeatedScalarFieldContainer[str] - def __init__(self, column_name: _Optional[_Iterable[str]] = ...) -> None: ... - -class Provisioning(_message.Message): - __slots__ = ["instance_type", "num_nodes"] - INSTANCE_TYPE_FIELD_NUMBER: _ClassVar[int] - NUM_NODES_FIELD_NUMBER: _ClassVar[int] - instance_type: str - num_nodes: int - def __init__(self, instance_type: _Optional[str] = ..., num_nodes: _Optional[int] = ...) -> None: ... - -class RoutingPolicy(_message.Message): - __slots__ = ["policy"] - POLICY_FIELD_NUMBER: _ClassVar[int] - policy: bytes - def __init__(self, policy: _Optional[bytes] = ...) -> None: ... - class Table(_message.Message): - __slots__ = ["columns", "dependencies", "indexes", "locations", "table_name"] + __slots__ = ["table_name", "columns", "locations", "dependencies", "indexes"] + TABLE_NAME_FIELD_NUMBER: _ClassVar[int] COLUMNS_FIELD_NUMBER: _ClassVar[int] + LOCATIONS_FIELD_NUMBER: _ClassVar[int] DEPENDENCIES_FIELD_NUMBER: _ClassVar[int] INDEXES_FIELD_NUMBER: _ClassVar[int] - LOCATIONS_FIELD_NUMBER: _ClassVar[int] - TABLE_NAME_FIELD_NUMBER: _ClassVar[int] + table_name: str columns: _containers.RepeatedCompositeFieldContainer[TableColumn] + locations: _containers.RepeatedScalarFieldContainer[Engine] dependencies: TableDependency indexes: _containers.RepeatedCompositeFieldContainer[Index] - locations: _containers.RepeatedScalarFieldContainer[Engine] - table_name: str def __init__(self, table_name: _Optional[str] = ..., columns: _Optional[_Iterable[_Union[TableColumn, _Mapping]]] = ..., locations: _Optional[_Iterable[_Union[Engine, str]]] = ..., dependencies: _Optional[_Union[TableDependency, _Mapping]] = ..., indexes: _Optional[_Iterable[_Union[Index, _Mapping]]] = ...) -> None: ... class TableColumn(_message.Message): - __slots__ = ["data_type", "is_primary", "name"] + __slots__ = ["name", "data_type", "is_primary"] + NAME_FIELD_NUMBER: _ClassVar[int] DATA_TYPE_FIELD_NUMBER: _ClassVar[int] IS_PRIMARY_FIELD_NUMBER: _ClassVar[int] - NAME_FIELD_NUMBER: _ClassVar[int] + name: str data_type: str is_primary: bool - name: str def __init__(self, name: _Optional[str] = ..., data_type: _Optional[str] = ..., is_primary: bool = ...) -> None: ... class TableDependency(_message.Message): @@ -76,5 +63,22 @@ class TableDependency(_message.Message): transform: str def __init__(self, source_table_names: _Optional[_Iterable[str]] = ..., transform: _Optional[str] = ...) -> None: ... -class Engine(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): # type: ignore - __slots__ = [] # type: ignore +class Provisioning(_message.Message): + __slots__ = ["instance_type", "num_nodes"] + INSTANCE_TYPE_FIELD_NUMBER: _ClassVar[int] + NUM_NODES_FIELD_NUMBER: _ClassVar[int] + instance_type: str + num_nodes: int + def __init__(self, instance_type: _Optional[str] = ..., num_nodes: _Optional[int] = ...) -> None: ... + +class RoutingPolicy(_message.Message): + __slots__ = ["policy"] + POLICY_FIELD_NUMBER: _ClassVar[int] + policy: bytes + def __init__(self, policy: _Optional[bytes] = ...) -> None: ... + +class Index(_message.Message): + __slots__ = ["column_name"] + COLUMN_NAME_FIELD_NUMBER: _ClassVar[int] + column_name: _containers.RepeatedScalarFieldContainer[str] + def __init__(self, column_name: _Optional[_Iterable[str]] = ...) -> None: ... diff --git a/src/brad/proto_gen/brad_pb2.py b/src/brad/proto_gen/brad_pb2.py index d854e35d..314c3c1c 100644 --- a/src/brad/proto_gen/brad_pb2.py +++ b/src/brad/proto_gen/brad_pb2.py @@ -2,10 +2,10 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: brad.proto """Generated protocol buffer code.""" -from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -13,37 +13,39 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nbrad.proto\x12\x04\x62rad\"\x1d\n\tSessionId\x12\x10\n\x08id_value\x18\x01 \x01(\x04\"%\n\x13StartSessionRequest\x12\x0e\n\x06unused\x18\x64 \x01(\x04\"3\n\x14StartSessionResponse\x12\x1b\n\x02id\x18\x01 \x01(\x0b\x32\x0f.brad.SessionId\"=\n\x0fRunQueryRequest\x12\x1b\n\x02id\x18\x01 \x01(\x0b\x32\x0f.brad.SessionId\x12\r\n\x05query\x18\x02 \x01(\t\"\xa2\x01\n\x10RunQueryResponse\x12#\n\x03row\x18\x01 \x01(\x0b\x32\x14.brad.QueryResultRowH\x00\x12!\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.brad.QueryErrorH\x00\x12\'\n\x08\x65xecutor\x18\x64 \x01(\x0e\x32\x15.brad.ExecutionEngine\x12\x13\n\x0bnot_tabular\x18\x65 \x01(\x08\x42\x08\n\x06result\"o\n\x14RunQueryJsonResponse\x12*\n\x07results\x18\x01 \x01(\x0b\x32\x17.brad.QueryJsonResponseH\x00\x12!\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.brad.QueryErrorH\x00\x42\x08\n\x06result\"\"\n\x0eQueryResultRow\x12\x10\n\x08row_data\x18\x01 \x01(\x0c\"5\n\nQueryError\x12\x11\n\terror_msg\x18\x01 \x01(\t\x12\x14\n\x0cis_transient\x18\x02 \x01(\x08\"g\n\x11QueryJsonResponse\x12\x14\n\x0cresults_json\x18\x01 \x01(\t\x12\'\n\x08\x65xecutor\x18\x02 \x01(\x0e\x32\x15.brad.ExecutionEngine\x12\x13\n\x0bnot_tabular\x18\x03 \x01(\x08\"0\n\x11\x45ndSessionRequest\x12\x1b\n\x02id\x18\x01 \x01(\x0b\x32\x0f.brad.SessionId\"$\n\x12\x45ndSessionResponse\x12\x0e\n\x06unused\x18\x64 \x01(\x04*T\n\x0f\x45xecutionEngine\x12\x0f\n\x0b\x45NG_UNKNOWN\x10\x00\x12\x0e\n\nENG_AURORA\x10\x01\x12\x10\n\x0c\x45NG_REDSHIFT\x10\x02\x12\x0e\n\nENG_ATHENA\x10\x03\x32\x96\x02\n\x04\x42rad\x12G\n\x0cStartSession\x12\x19.brad.StartSessionRequest\x1a\x1a.brad.StartSessionResponse\"\x00\x12=\n\x08RunQuery\x12\x15.brad.RunQueryRequest\x1a\x16.brad.RunQueryResponse\"\x00\x30\x01\x12\x43\n\x0cRunQueryJson\x12\x15.brad.RunQueryRequest\x1a\x1a.brad.RunQueryJsonResponse\"\x00\x12\x41\n\nEndSession\x12\x17.brad.EndSessionRequest\x1a\x18.brad.EndSessionResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nbrad.proto\x12\x04\x62rad\"\x1d\n\tSessionId\x12\x10\n\x08id_value\x18\x01 \x01(\x04\"%\n\x13StartSessionRequest\x12\x0e\n\x06unused\x18\x64 \x01(\x04\"i\n\x14StartSessionResponse\x12\x1d\n\x02id\x18\x01 \x01(\x0b\x32\x0f.brad.SessionIdH\x00\x12(\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x17.brad.StartSessionErrorH\x00\x42\x08\n\x06result\"&\n\x11StartSessionError\x12\x11\n\terror_msg\x18\x01 \x01(\t\"=\n\x0fRunQueryRequest\x12\x1b\n\x02id\x18\x01 \x01(\x0b\x32\x0f.brad.SessionId\x12\r\n\x05query\x18\x02 \x01(\t\"\xa2\x01\n\x10RunQueryResponse\x12#\n\x03row\x18\x01 \x01(\x0b\x32\x14.brad.QueryResultRowH\x00\x12!\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.brad.QueryErrorH\x00\x12\'\n\x08\x65xecutor\x18\x64 \x01(\x0e\x32\x15.brad.ExecutionEngine\x12\x13\n\x0bnot_tabular\x18\x65 \x01(\x08\x42\x08\n\x06result\"o\n\x14RunQueryJsonResponse\x12*\n\x07results\x18\x01 \x01(\x0b\x32\x17.brad.QueryJsonResponseH\x00\x12!\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.brad.QueryErrorH\x00\x42\x08\n\x06result\"\"\n\x0eQueryResultRow\x12\x10\n\x08row_data\x18\x01 \x01(\x0c\"5\n\nQueryError\x12\x11\n\terror_msg\x18\x01 \x01(\t\x12\x14\n\x0cis_transient\x18\x02 \x01(\x08\"g\n\x11QueryJsonResponse\x12\x14\n\x0cresults_json\x18\x01 \x01(\t\x12\'\n\x08\x65xecutor\x18\x02 \x01(\x0e\x32\x15.brad.ExecutionEngine\x12\x13\n\x0bnot_tabular\x18\x03 \x01(\x08\"0\n\x11\x45ndSessionRequest\x12\x1b\n\x02id\x18\x01 \x01(\x0b\x32\x0f.brad.SessionId\"$\n\x12\x45ndSessionResponse\x12\x0e\n\x06unused\x18\x64 \x01(\x04*T\n\x0f\x45xecutionEngine\x12\x0f\n\x0b\x45NG_UNKNOWN\x10\x00\x12\x0e\n\nENG_AURORA\x10\x01\x12\x10\n\x0c\x45NG_REDSHIFT\x10\x02\x12\x0e\n\nENG_ATHENA\x10\x03\x32\x96\x02\n\x04\x42rad\x12G\n\x0cStartSession\x12\x19.brad.StartSessionRequest\x1a\x1a.brad.StartSessionResponse\"\x00\x12=\n\x08RunQuery\x12\x15.brad.RunQueryRequest\x1a\x16.brad.RunQueryResponse\"\x00\x30\x01\x12\x43\n\x0cRunQueryJson\x12\x15.brad.RunQueryRequest\x1a\x1a.brad.RunQueryJsonResponse\"\x00\x12\x41\n\nEndSession\x12\x17.brad.EndSessionRequest\x1a\x18.brad.EndSessionResponse\"\x00\x62\x06proto3') -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'brad_pb2', globals()) +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'brad_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _EXECUTIONENGINE._serialized_start=768 - _EXECUTIONENGINE._serialized_end=852 - _SESSIONID._serialized_start=20 - _SESSIONID._serialized_end=49 - _STARTSESSIONREQUEST._serialized_start=51 - _STARTSESSIONREQUEST._serialized_end=88 - _STARTSESSIONRESPONSE._serialized_start=90 - _STARTSESSIONRESPONSE._serialized_end=141 - _RUNQUERYREQUEST._serialized_start=143 - _RUNQUERYREQUEST._serialized_end=204 - _RUNQUERYRESPONSE._serialized_start=207 - _RUNQUERYRESPONSE._serialized_end=369 - _RUNQUERYJSONRESPONSE._serialized_start=371 - _RUNQUERYJSONRESPONSE._serialized_end=482 - _QUERYRESULTROW._serialized_start=484 - _QUERYRESULTROW._serialized_end=518 - _QUERYERROR._serialized_start=520 - _QUERYERROR._serialized_end=573 - _QUERYJSONRESPONSE._serialized_start=575 - _QUERYJSONRESPONSE._serialized_end=678 - _ENDSESSIONREQUEST._serialized_start=680 - _ENDSESSIONREQUEST._serialized_end=728 - _ENDSESSIONRESPONSE._serialized_start=730 - _ENDSESSIONRESPONSE._serialized_end=766 - _BRAD._serialized_start=855 - _BRAD._serialized_end=1133 + _globals['_EXECUTIONENGINE']._serialized_start=862 + _globals['_EXECUTIONENGINE']._serialized_end=946 + _globals['_SESSIONID']._serialized_start=20 + _globals['_SESSIONID']._serialized_end=49 + _globals['_STARTSESSIONREQUEST']._serialized_start=51 + _globals['_STARTSESSIONREQUEST']._serialized_end=88 + _globals['_STARTSESSIONRESPONSE']._serialized_start=90 + _globals['_STARTSESSIONRESPONSE']._serialized_end=195 + _globals['_STARTSESSIONERROR']._serialized_start=197 + _globals['_STARTSESSIONERROR']._serialized_end=235 + _globals['_RUNQUERYREQUEST']._serialized_start=237 + _globals['_RUNQUERYREQUEST']._serialized_end=298 + _globals['_RUNQUERYRESPONSE']._serialized_start=301 + _globals['_RUNQUERYRESPONSE']._serialized_end=463 + _globals['_RUNQUERYJSONRESPONSE']._serialized_start=465 + _globals['_RUNQUERYJSONRESPONSE']._serialized_end=576 + _globals['_QUERYRESULTROW']._serialized_start=578 + _globals['_QUERYRESULTROW']._serialized_end=612 + _globals['_QUERYERROR']._serialized_start=614 + _globals['_QUERYERROR']._serialized_end=667 + _globals['_QUERYJSONRESPONSE']._serialized_start=669 + _globals['_QUERYJSONRESPONSE']._serialized_end=772 + _globals['_ENDSESSIONREQUEST']._serialized_start=774 + _globals['_ENDSESSIONREQUEST']._serialized_end=822 + _globals['_ENDSESSIONRESPONSE']._serialized_start=824 + _globals['_ENDSESSIONRESPONSE']._serialized_end=860 + _globals['_BRAD']._serialized_start=949 + _globals['_BRAD']._serialized_end=1227 # @@protoc_insertion_point(module_scope) diff --git a/src/brad/proto_gen/brad_pb2.pyi b/src/brad/proto_gen/brad_pb2.pyi index 3f41a0f6..b48248f9 100644 --- a/src/brad/proto_gen/brad_pb2.pyi +++ b/src/brad/proto_gen/brad_pb2.pyi @@ -4,54 +4,43 @@ from google.protobuf import message as _message from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor -ENG_ATHENA: ExecutionEngine + +class ExecutionEngine(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = [] # type: ignore + ENG_UNKNOWN: _ClassVar[ExecutionEngine] + ENG_AURORA: _ClassVar[ExecutionEngine] + ENG_REDSHIFT: _ClassVar[ExecutionEngine] + ENG_ATHENA: _ClassVar[ExecutionEngine] +ENG_UNKNOWN: ExecutionEngine ENG_AURORA: ExecutionEngine ENG_REDSHIFT: ExecutionEngine -ENG_UNKNOWN: ExecutionEngine +ENG_ATHENA: ExecutionEngine -class EndSessionRequest(_message.Message): - __slots__ = ["id"] - ID_FIELD_NUMBER: _ClassVar[int] - id: SessionId - def __init__(self, id: _Optional[_Union[SessionId, _Mapping]] = ...) -> None: ... +class SessionId(_message.Message): + __slots__ = ["id_value"] + ID_VALUE_FIELD_NUMBER: _ClassVar[int] + id_value: int + def __init__(self, id_value: _Optional[int] = ...) -> None: ... -class EndSessionResponse(_message.Message): +class StartSessionRequest(_message.Message): __slots__ = ["unused"] UNUSED_FIELD_NUMBER: _ClassVar[int] unused: int def __init__(self, unused: _Optional[int] = ...) -> None: ... -class QueryError(_message.Message): - __slots__ = ["error_msg", "is_transient"] +class StartSessionResponse(_message.Message): + __slots__ = ["id", "error"] + ID_FIELD_NUMBER: _ClassVar[int] + ERROR_FIELD_NUMBER: _ClassVar[int] + id: SessionId + error: StartSessionError + def __init__(self, id: _Optional[_Union[SessionId, _Mapping]] = ..., error: _Optional[_Union[StartSessionError, _Mapping]] = ...) -> None: ... + +class StartSessionError(_message.Message): + __slots__ = ["error_msg"] ERROR_MSG_FIELD_NUMBER: _ClassVar[int] - IS_TRANSIENT_FIELD_NUMBER: _ClassVar[int] error_msg: str - is_transient: bool - def __init__(self, error_msg: _Optional[str] = ..., is_transient: bool = ...) -> None: ... - -class QueryJsonResponse(_message.Message): - __slots__ = ["executor", "not_tabular", "results_json"] - EXECUTOR_FIELD_NUMBER: _ClassVar[int] - NOT_TABULAR_FIELD_NUMBER: _ClassVar[int] - RESULTS_JSON_FIELD_NUMBER: _ClassVar[int] - executor: ExecutionEngine - not_tabular: bool - results_json: str - def __init__(self, results_json: _Optional[str] = ..., executor: _Optional[_Union[ExecutionEngine, str]] = ..., not_tabular: bool = ...) -> None: ... - -class QueryResultRow(_message.Message): - __slots__ = ["row_data"] - ROW_DATA_FIELD_NUMBER: _ClassVar[int] - row_data: bytes - def __init__(self, row_data: _Optional[bytes] = ...) -> None: ... - -class RunQueryJsonResponse(_message.Message): - __slots__ = ["error", "results"] - ERROR_FIELD_NUMBER: _ClassVar[int] - RESULTS_FIELD_NUMBER: _ClassVar[int] - error: QueryError - results: QueryJsonResponse - def __init__(self, results: _Optional[_Union[QueryJsonResponse, _Mapping]] = ..., error: _Optional[_Union[QueryError, _Mapping]] = ...) -> None: ... + def __init__(self, error_msg: _Optional[str] = ...) -> None: ... class RunQueryRequest(_message.Message): __slots__ = ["id", "query"] @@ -62,34 +51,57 @@ class RunQueryRequest(_message.Message): def __init__(self, id: _Optional[_Union[SessionId, _Mapping]] = ..., query: _Optional[str] = ...) -> None: ... class RunQueryResponse(_message.Message): - __slots__ = ["error", "executor", "not_tabular", "row"] + __slots__ = ["row", "error", "executor", "not_tabular"] + ROW_FIELD_NUMBER: _ClassVar[int] ERROR_FIELD_NUMBER: _ClassVar[int] EXECUTOR_FIELD_NUMBER: _ClassVar[int] NOT_TABULAR_FIELD_NUMBER: _ClassVar[int] - ROW_FIELD_NUMBER: _ClassVar[int] + row: QueryResultRow error: QueryError executor: ExecutionEngine not_tabular: bool - row: QueryResultRow def __init__(self, row: _Optional[_Union[QueryResultRow, _Mapping]] = ..., error: _Optional[_Union[QueryError, _Mapping]] = ..., executor: _Optional[_Union[ExecutionEngine, str]] = ..., not_tabular: bool = ...) -> None: ... -class SessionId(_message.Message): - __slots__ = ["id_value"] - ID_VALUE_FIELD_NUMBER: _ClassVar[int] - id_value: int - def __init__(self, id_value: _Optional[int] = ...) -> None: ... +class RunQueryJsonResponse(_message.Message): + __slots__ = ["results", "error"] + RESULTS_FIELD_NUMBER: _ClassVar[int] + ERROR_FIELD_NUMBER: _ClassVar[int] + results: QueryJsonResponse + error: QueryError + def __init__(self, results: _Optional[_Union[QueryJsonResponse, _Mapping]] = ..., error: _Optional[_Union[QueryError, _Mapping]] = ...) -> None: ... -class StartSessionRequest(_message.Message): - __slots__ = ["unused"] - UNUSED_FIELD_NUMBER: _ClassVar[int] - unused: int - def __init__(self, unused: _Optional[int] = ...) -> None: ... +class QueryResultRow(_message.Message): + __slots__ = ["row_data"] + ROW_DATA_FIELD_NUMBER: _ClassVar[int] + row_data: bytes + def __init__(self, row_data: _Optional[bytes] = ...) -> None: ... -class StartSessionResponse(_message.Message): +class QueryError(_message.Message): + __slots__ = ["error_msg", "is_transient"] + ERROR_MSG_FIELD_NUMBER: _ClassVar[int] + IS_TRANSIENT_FIELD_NUMBER: _ClassVar[int] + error_msg: str + is_transient: bool + def __init__(self, error_msg: _Optional[str] = ..., is_transient: bool = ...) -> None: ... + +class QueryJsonResponse(_message.Message): + __slots__ = ["results_json", "executor", "not_tabular"] + RESULTS_JSON_FIELD_NUMBER: _ClassVar[int] + EXECUTOR_FIELD_NUMBER: _ClassVar[int] + NOT_TABULAR_FIELD_NUMBER: _ClassVar[int] + results_json: str + executor: ExecutionEngine + not_tabular: bool + def __init__(self, results_json: _Optional[str] = ..., executor: _Optional[_Union[ExecutionEngine, str]] = ..., not_tabular: bool = ...) -> None: ... + +class EndSessionRequest(_message.Message): __slots__ = ["id"] ID_FIELD_NUMBER: _ClassVar[int] id: SessionId def __init__(self, id: _Optional[_Union[SessionId, _Mapping]] = ...) -> None: ... -class ExecutionEngine(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): # type: ignore - __slots__ = [] # type: ignore +class EndSessionResponse(_message.Message): + __slots__ = ["unused"] + UNUSED_FIELD_NUMBER: _ClassVar[int] + unused: int + def __init__(self, unused: _Optional[int] = ...) -> None: ... diff --git a/src/brad/routing/router.py b/src/brad/routing/router.py index f70e3687..b27f06ab 100644 --- a/src/brad/routing/router.py +++ b/src/brad/routing/router.py @@ -49,7 +49,7 @@ def log_policy(self) -> None: logger.info(" Indefinite policies:") for p in self._full_policy.indefinite_policies: logger.info(" - %s", p.name()) - logger.info(" Definite policy: %s") + logger.info(" Definite policy: %s", self._full_policy.definite_policy.name()) async def run_setup(self, estimator: Optional[Estimator] = None) -> None: """ diff --git a/workloads/IMDB_extended/run_repeating_analytics.py b/workloads/IMDB_extended/run_repeating_analytics.py index dcb21aca..a64af53e 100644 --- a/workloads/IMDB_extended/run_repeating_analytics.py +++ b/workloads/IMDB_extended/run_repeating_analytics.py @@ -27,6 +27,7 @@ ENGINE_NAMES = ["ATHENA", "AURORA", "REDSHIFT"] RUNNER_EXIT = "runner_exit" +STARTUP_FAILED = "startup_failed" def get_time_of_the_day_unsimulated( @@ -80,14 +81,20 @@ def noop(_signal, _frame): else: engine = None - database = connect_to_db( - args, - runner_idx, - direct_engine=engine, - # Ensure we disable the result cache if we are running directly on - # Redshift. - disable_direct_redshift_result_cache=True, - ) + try: + database = connect_to_db( + args, + runner_idx, + direct_engine=engine, + # Ensure we disable the result cache if we are running directly on + # Redshift. + disable_direct_redshift_result_cache=True, + ) + except BradClientError as ex: + print(f"[RA {runner_idx}] Failed to connect to BRAD:", str(ex)) + start_queue.put_nowait(STARTUP_FAILED) + _ = stop_queue.get() + return if query_frequency is not None: query_frequency = query_frequency[queries] @@ -120,6 +127,7 @@ def noop(_signal, _frame): query_order = query_order_main.copy() # Signal that we're ready to start and wait for the controller. + print(f"Runner {runner_idx} is ready to start running.") start_queue.put_nowait("") msg = stop_queue.get() @@ -182,21 +190,25 @@ def noop(_signal, _frame): file=file, flush=True, ) - exec_count += 1 - rand_backoff = None if exec_count % 20 == 0: # To avoid data loss if this script crashes. os.fsync(file.fileno()) + exec_count += 1 + if rand_backoff is not None: + print(f"[RA {runner_idx}] Continued after transient errors.") + rand_backoff = None + except BradClientError as ex: if ex.is_transient(): - print( - "Transient query error:", - ex.message(), - flush=True, - file=sys.stderr, - ) + # This is too verbose during a transition. + # print( + # "Transient query error:", + # ex.message(), + # flush=True, + # file=sys.stderr, + # ) if rand_backoff is None: rand_backoff = RandomizedExponentialBackoff( @@ -204,12 +216,15 @@ def noop(_signal, _frame): base_delay_s=2.0, max_delay_s=timedelta(minutes=10).total_seconds(), ) + print(f"[RA {runner_idx}] Backing off due to transient errors.") # Delay retrying in the case of a transient error (this # happens during blueprint transitions). wait_s = rand_backoff.wait_time_s() if wait_s is None: - print("Aborting benchmark. Too many transient errors.") + print( + f"[RA {runner_idx}] Aborting benchmark. Too many transient errors." + ) break time.sleep(wait_s) @@ -628,8 +643,20 @@ def main(): processes.append(p) print("Waiting for startup...", flush=True) + one_startup_failed = False for i in range(args.num_clients): - start_queue[i].get() + msg = start_queue[i].get() + if msg == STARTUP_FAILED: + one_startup_failed = True + + if one_startup_failed: + print("At least one runner failed to start up. Aborting the experiment.") + for i in range(args.num_clients): + stop_queue[i].put_nowait(RUNNER_EXIT) + for p in processes: + p.join() + print("Abort complete.") + return global EXECUTE_START_TIME # pylint: disable=global-statement EXECUTE_START_TIME = datetime.now().astimezone( diff --git a/workloads/IMDB_extended/run_transactions.py b/workloads/IMDB_extended/run_transactions.py index 3b8bd310..ac11c479 100644 --- a/workloads/IMDB_extended/run_transactions.py +++ b/workloads/IMDB_extended/run_transactions.py @@ -103,16 +103,19 @@ def noop_handler(_signal, _frame): else: succeeded = txn(db) - rand_backoff = None + if rand_backoff is not None: + print(f"[T {worker_idx}] Continued after transient errors.") + rand_backoff = None except BradClientError as ex: succeeded = False if ex.is_transient(): - print( - "Encountered transient error (probably engine change). Will retry...", - flush=True, - file=sys.stderr, - ) + # Too verbose during a transition. + # print( + # "Encountered transient error (probably engine change). Will retry...", + # flush=True, + # file=sys.stderr, + # ) if rand_backoff is None: rand_backoff = RandomizedExponentialBackoff( @@ -120,6 +123,7 @@ def noop_handler(_signal, _frame): base_delay_s=0.1, max_delay_s=timedelta(minutes=1).total_seconds(), ) + print(f"[T {worker_idx}] Backing off due to transient errors.") # Delay retrying in the case of a transient error (this # happens during blueprint transitions). diff --git a/workloads/IMDB_extended/workload_utils/connect.py b/workloads/IMDB_extended/workload_utils/connect.py index de4876c9..e6a992dd 100644 --- a/workloads/IMDB_extended/workload_utils/connect.py +++ b/workloads/IMDB_extended/workload_utils/connect.py @@ -49,8 +49,11 @@ def connect_to_db( else: port_offset = (worker_index + args.client_offset) % args.num_front_ends - brad = BradGrpcClient(args.brad_host, args.brad_port + port_offset) + port = args.brad_port + port_offset + print(f"[{worker_index}] Connecting to BRAD at {args.brad_host}:{port}") + brad = BradGrpcClient(args.brad_host, port) brad.connect() + print(f"[{worker_index}] Connected to BRAD.") db = BradDatabase(brad) return db