diff --git a/src/brad/config/system_event.py b/src/brad/config/system_event.py
index 4c0a682d..31cdb9d7 100644
--- a/src/brad/config/system_event.py
+++ b/src/brad/config/system_event.py
@@ -25,3 +25,5 @@ class SystemEvent(enum.Enum):
     PreTransitionCompleted = "pre_transition_completed"
     PostTransitionStarted = "post_transition_started"
     PostTransitionCompleted = "post_transition_completed"
+
+    AuroraPrimaryFailover = "aurora_primary_failover"
diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py
index 0a612dfd..c3c56e90 100644
--- a/src/brad/daemon/daemon.py
+++ b/src/brad/daemon/daemon.py
@@ -392,7 +392,7 @@ async def _handle_new_blueprint(
         if self._planner is not None:
             self._planner.set_disable_triggers(disable=True)
         self._transition_orchestrator = TransitionOrchestrator(
-            self._config, self._blueprint_mgr
+            self._config, self._blueprint_mgr, self._system_event_logger
         )
         self._transition_task = asyncio.create_task(self._run_transition_part_one())

diff --git a/src/brad/daemon/transition_orchestrator.py b/src/brad/daemon/transition_orchestrator.py
index c4816912..00fc2f96 100644
--- a/src/brad/daemon/transition_orchestrator.py
+++ b/src/brad/daemon/transition_orchestrator.py
@@ -17,6 +17,8 @@
     AURORA_SEQ_COLUMN,
     AURORA_EXTRACT_PROGRESS_TABLE_NAME,
 )
+from brad.config.system_event import SystemEvent
+from brad.daemon.system_event_logger import SystemEventLogger
 from brad.data_sync.execution.context import ExecutionContext
 from brad.data_sync.operators.drop_aurora_triggers import DropAuroraTriggers
 from brad.data_sync.execution.executor import DataSyncExecutor
@@ -37,6 +39,7 @@ def __init__(
         self,
         config: ConfigFile,
         blueprint_mgr: BlueprintManager,
+        system_event_logger: Optional[SystemEventLogger] = None,
     ) -> None:
         self._config = config
         self._blueprint_mgr = blueprint_mgr
@@ -45,6 +48,7 @@ def __init__(
         self._waiting_for_front_ends = 0
         self._data_sync_executor = DataSyncExecutor(self._config, self._blueprint_mgr)
         self._cxns: Optional[EngineConnections] = None
+        self._system_event_logger = system_event_logger

         self._refresh_transition_metadata()

@@ -294,48 +298,40 @@ async def _run_aurora_pre_transition(

         # NOTE: We will need a more robust process to deal with cases where we
         # are at the replica limit (max. 15 replicas).
+        #
+        # We should also take into account the kind of transition to make the
+        # change more graceful (e.g., when switching to lower resourced
+        # provisionings).

-        if old.instance_type() != new.instance_type():
-            # Handle the primary first.
-            old_primary_instance = (
-                self._blueprint_mgr.get_directory().aurora_writer().instance_id()
-            )
-            new_primary_instance = _AURORA_PRIMARY_FORMAT.format(
-                cluster_id=self._config.aurora_cluster_id,
-                version=str(next_version).zfill(5),
-            )
-            logger.debug("Creating new Aurora replica: %s", new_primary_instance)
-            await self._rds.create_replica(
-                self._config.aurora_cluster_id,
-                new_primary_instance,
-                new,
-                wait_until_available=True,
-            )
-            logger.debug(
-                "Failing over %s to the new replica: %s",
-                self._config.aurora_cluster_id,
-                new_primary_instance,
-            )
-            await self._rds.run_primary_failover(
-                self._config.aurora_cluster_id,
-                new_primary_instance,
-                wait_until_complete=True,
-            )
-            logger.debug("Failover complete for %s", self._config.aurora_cluster_id)
-
-            logger.debug("Deleting the old primary: %s", old_primary_instance)
-            await self._rds.delete_replica(old_primary_instance)
-            logger.debug("Done deleting the old primary: %s", old_primary_instance)
-
+        # Create new replicas first.
+        new_replica_count = max(new.num_nodes() - 1, 0)
+        old_replica_count = max(old.num_nodes() - 1, 0)
+        if new_replica_count > 0 and new_replica_count > old_replica_count:
+            next_index = old_replica_count
+            while next_index < new_replica_count:
+                new_replica_id = _AURORA_REPLICA_FORMAT.format(
+                    cluster_id=self._config.aurora_cluster_id,
+                    version=str(next_version).zfill(5),
+                    index=str(next_index).zfill(2),
+                )
+                logger.debug("Creating replica %s", new_replica_id)
+                # Ideally we wait for the replicas to finish creation in
+                # parallel. Because of how we make the boto3 client async,
+                # there's a possibility of having multiple API calls in flight
+                # at the same time, which boto3 does not support. To be safe, we
+                # just run these replica creations sequentially.
+                await self._rds.create_replica(
+                    self._config.aurora_cluster_id,
+                    new_replica_id,
+                    new,
+                    wait_until_available=True,
+                )
+                next_index += 1
             await self._blueprint_mgr.refresh_directory()

-            if on_instance_identity_change is not None:
-                # The primary changed. We run the callback so that clients can
-                # update any cached state that relies on instance identities
-                # (e.g., Performance Insights metrics).
-                on_instance_identity_change()
+        if old.instance_type() != new.instance_type():
+            # Handle the replicas first (need to change their instance type).
             replicas_to_modify = min(new.num_nodes() - 1, old.num_nodes() - 1)
-
             if replicas_to_modify == 1 and old.num_nodes() - 1 == 1:
                 # Special case: The current blueprint only has one read replica
                 # and we need to modify it to transition to the next blueprint.
@@ -364,7 +360,7 @@ async def _run_aurora_pre_transition(

             else:
                 # Modify replicas one-by-one. At most one reader replica is down
-                # at any time - but we consider this acceptable.
+                # at any time, but we consider this acceptable.
                 for idx, replica in enumerate(
                     self._blueprint_mgr.get_directory().aurora_readers()
                 ):
@@ -380,30 +376,45 @@ async def _run_aurora_pre_transition(
                     replica.instance_id(), new, wait_until_available=True
                 )

-        new_replica_count = max(new.num_nodes() - 1, 0)
-        old_replica_count = max(old.num_nodes() - 1, 0)
-        if new_replica_count > 0 and new_replica_count > old_replica_count:
-            next_index = old_replica_count
-            while next_index < new_replica_count:
-                new_replica_id = _AURORA_REPLICA_FORMAT.format(
-                    cluster_id=self._config.aurora_cluster_id,
-                    version=str(next_version).zfill(5),
-                    index=str(next_index).zfill(2),
-                )
-                logger.debug("Creating replica %s", new_replica_id)
-                # Ideally we wait for the replicas to finish creation in
-                # parallel. Because of how we make the boto3 client async,
-                # there's a possibility of having multiple API calls in flight
-                # at the same time, which boto3 does not support. To be safe, we
-                # just run these replica creations sequentially.
-                await self._rds.create_replica(
-                    self._config.aurora_cluster_id,
-                    new_replica_id,
-                    new,
-                    wait_until_available=True,
-                )
-                next_index += 1
+            # Handle the primary last.
+            old_primary_instance = (
+                self._blueprint_mgr.get_directory().aurora_writer().instance_id()
+            )
+            new_primary_instance = _AURORA_PRIMARY_FORMAT.format(
+                cluster_id=self._config.aurora_cluster_id,
+                version=str(next_version).zfill(5),
+            )
+            logger.debug("Creating new Aurora replica: %s", new_primary_instance)
+            await self._rds.create_replica(
+                self._config.aurora_cluster_id,
+                new_primary_instance,
+                new,
+                wait_until_available=True,
+            )
+            logger.debug(
+                "Failing over %s to the new replica: %s",
+                self._config.aurora_cluster_id,
+                new_primary_instance,
+            )
+            await self._rds.run_primary_failover(
+                self._config.aurora_cluster_id,
+                new_primary_instance,
+                wait_until_complete=True,
+            )
+            logger.debug("Failover complete for %s", self._config.aurora_cluster_id)
+            if self._system_event_logger is not None:
+                self._system_event_logger.log(SystemEvent.AuroraPrimaryFailover)
+
+            logger.debug("Deleting the old primary: %s", old_primary_instance)
+            await self._rds.delete_replica(old_primary_instance)
+            logger.debug("Done deleting the old primary: %s", old_primary_instance)
+            await self._blueprint_mgr.refresh_directory()
+            if on_instance_identity_change is not None:
+                # The primary changed. We run the callback so that clients can
+                # update any cached state that relies on instance identities
+                # (e.g., Performance Insights metrics).
+                on_instance_identity_change()

         # Aurora's pre-transition work is complete!
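For readers skimming the patch, the sketch below restates the reordered Aurora pre-transition flow in plain Python: create any additional read replicas first, resize the replicas that survive into the new blueprint, and only then stand up a replacement primary, fail over to it, record the new AuroraPrimaryFailover event, and drop the old primary. This is an illustrative sketch only, not part of the patch: FakeRds, FakeEventLogger, aurora_pre_transition, and the cluster/instance naming are hypothetical stand-ins for BRAD's RDS provisioning manager, SystemEventLogger, and blueprint machinery, and the single-replica special case is omitted.

# Illustrative sketch (assumed names; not BRAD's actual classes or signatures).
import asyncio
import enum
from typing import Optional


class SystemEvent(enum.Enum):
    AuroraPrimaryFailover = "aurora_primary_failover"


class FakeEventLogger:
    """Stand-in for the daemon's system event logger; just prints the event."""

    def log(self, event: SystemEvent) -> None:
        print(f"system event: {event.value}")


class FakeRds:
    """Stand-in for the RDS provisioning manager; each call only prints."""

    async def create_replica(self, cluster_id: str, instance_id: str) -> None:
        print(f"create replica {instance_id} in {cluster_id}")

    async def change_instance_type(self, instance_id: str, instance_type: str) -> None:
        print(f"modify {instance_id} -> {instance_type}")

    async def run_primary_failover(self, cluster_id: str, new_primary: str) -> None:
        print(f"fail over {cluster_id} to {new_primary}")

    async def delete_replica(self, instance_id: str) -> None:
        print(f"delete {instance_id}")


async def aurora_pre_transition(
    rds: FakeRds,
    event_logger: Optional[FakeEventLogger],
    old_nodes: int,
    new_nodes: int,
    old_type: str,
    new_type: str,
    cluster_id: str = "brad-aurora",
) -> None:
    # 1) Create any additional read replicas first (sequentially, mirroring the
    #    patch's note about overlapping boto3 API calls).
    for idx in range(max(old_nodes - 1, 0), max(new_nodes - 1, 0)):
        await rds.create_replica(cluster_id, f"{cluster_id}-replica-{idx:02d}")

    if old_type != new_type:
        # 2) Resize the replicas that carry over to the new blueprint, one by one.
        for idx in range(min(new_nodes - 1, old_nodes - 1)):
            await rds.change_instance_type(f"{cluster_id}-replica-{idx:02d}", new_type)

        # 3) Handle the primary last: stand up a replacement at the new instance
        #    type, fail over to it, record the failover, then drop the old primary.
        new_primary = f"{cluster_id}-primary-{new_type}"
        await rds.create_replica(cluster_id, new_primary)
        await rds.run_primary_failover(cluster_id, new_primary)
        if event_logger is not None:
            event_logger.log(SystemEvent.AuroraPrimaryFailover)
        await rds.delete_replica(f"{cluster_id}-primary-{old_type}")


if __name__ == "__main__":
    asyncio.run(
        aurora_pre_transition(
            FakeRds(),
            FakeEventLogger(),
            old_nodes=2,
            new_nodes=3,
            old_type="db.r6g.large",
            new_type="db.r6g.xlarge",
        )
    )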