Skip to content

Commit

Permalink
Additional resilience fixes (#412)
Browse files Browse the repository at this point in the history
* Make Redshift transitions more resilient to transient errors

* Print extra routing debug information

* Add routing policy check to the blueprint diff
  • Loading branch information
geoffxy authored Dec 18, 2023
1 parent de68108 commit 429b402
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 28 deletions.
7 changes: 4 additions & 3 deletions src/brad/blueprint/blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,12 @@ def __repr__(self) -> str:
)
)
routing_policy = self.get_routing_policy()
indefinite_policies = (
f"Indefinite routing policies: {len(routing_policy.indefinite_policies)}"
indef_policy_string = "\n - ".join(
[str(policy) for policy in routing_policy.indefinite_policies]
)
indefinite_policies = f"Indefinite routing policies: {indef_policy_string}"
definite_policy = (
f"Definite routing policy: {routing_policy.definite_policy.name()}"
f"Definite routing policy: {routing_policy.definite_policy}"
)
return "\n ".join(
[
Expand Down
16 changes: 14 additions & 2 deletions src/brad/blueprint/diff/blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,29 @@ def of(cls, old: Blueprint, new: Blueprint) -> Optional["BlueprintDiff"]:
old.redshift_provisioning(), new.redshift_provisioning()
)

if len(table_diffs) == 0 and aurora_diff is None and redshift_diff is None:
has_routing_diff = old.get_routing_policy() != new.get_routing_policy()

if (
len(table_diffs) == 0
and aurora_diff is None
and redshift_diff is None
and (not has_routing_diff)
):
return None

return cls(table_diffs, aurora_diff, redshift_diff)
return cls(table_diffs, aurora_diff, redshift_diff, has_routing_diff)

def __init__(
    self,
    table_diffs: List[TableDiff],
    aurora_diff: Optional[ProvisioningDiff],
    redshift_diff: Optional[ProvisioningDiff],
    has_routing_diff: bool,
) -> None:
    """
    Capture the differences between two blueprints.

    `table_diffs`: per-table placement/schema changes (empty when none).
    `aurora_diff` / `redshift_diff`: provisioning changes, or `None` when
    the engine's provisioning is unchanged.
    `has_routing_diff`: whether the routing policy differs between the two
    blueprints (compared via policy equality in `of()`).
    """
    self._table_diffs = table_diffs
    self._aurora_diff = aurora_diff
    self._redshift_diff = redshift_diff
    self._has_routing_diff = has_routing_diff

def aurora_diff(self) -> Optional[ProvisioningDiff]:
    """Return the Aurora provisioning change, or `None` if unchanged."""
    return self._aurora_diff
Expand All @@ -55,3 +64,6 @@ def redshift_diff(self) -> Optional[ProvisioningDiff]:

def table_diffs(self) -> List[TableDiff]:
    """Return the per-table differences (empty list when no tables changed)."""
    return self._table_diffs

def has_routing_diff(self) -> bool:
    """Return `True` iff the routing policy changed between the blueprints."""
    return self._has_routing_diff
17 changes: 11 additions & 6 deletions src/brad/daemon/transition_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,18 +544,23 @@ async def _run_redshift_pre_transition(
# Resizes are OK because Redshift maintains read-availability during the
# resize.
is_classic = self._redshift.must_use_classic_resize(old, new)
if is_classic:
resize_completed = False
if not is_classic:
logger.debug(
"Running Redshift classic resize. Old: %s, New: %s", str(old), str(new)
"Running Redshift elastic resize. Old: %s, New: %s", str(old), str(new)
)
await self._redshift.classic_resize(
resize_completed = await self._redshift.elastic_resize(
self._config.redshift_cluster_id, new, wait_until_available=True
)
else:

# Sometimes the elastic resize will not be supported even though it
# should be (according to the docs). This lets us fall back to a classic
# resize, which should always be supported.
if is_classic or not resize_completed:
logger.debug(
"Running Redshift elastic resize. Old: %s, New: %s", str(old), str(new)
"Running Redshift classic resize. Old: %s, New: %s", str(old), str(new)
)
await self._redshift.elastic_resize(
await self._redshift.classic_resize(
self._config.redshift_cluster_id, new, wait_until_available=True
)

Expand Down
74 changes: 58 additions & 16 deletions src/brad/provisioning/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,16 @@ def must_use_classic_resize(old: Provisioning, new: Provisioning) -> bool:

async def pause_cluster(self, cluster_id: str) -> None:
    """
    Pause the Redshift cluster identified by `cluster_id`.

    Tolerates `InvalidClusterState` errors (e.g. when the cluster is
    already paused) so that retried transitions do not fail spuriously.
    The blocking boto3 call runs in a thread-pool executor to avoid
    stalling the event loop.
    """

    def do_pause():
        try:
            self._redshift.pause_cluster(ClusterIdentifier=cluster_id)
        except Exception as ex:
            # Match on the error's string form because boto3 surfaces this
            # as a generic ClientError.
            message = repr(ex)
            # Unclear if there is a better way to check for specific errors.
            if "InvalidClusterState" in message:
                # This may happen if the cluster is already paused.
                logger.info("Proceeding past Redshift pause error: %s", message)
            else:
                raise

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, do_pause)
Expand All @@ -65,7 +74,9 @@ def do_resume():
self._redshift.resume_cluster(ClusterIdentifier=cluster_id)
except Exception as ex:
message = repr(ex)
# Unclear if there is a better way to check for specific errors.
if "InvalidClusterState" in message:
# This may happen if the cluster is already running.
logger.info("Proceeding past Redshift resume error: %s", message)
else:
raise
Expand Down Expand Up @@ -93,15 +104,31 @@ async def classic_resize(
# TODO: Classic resize is sometimes disastrously slow. It might be
# better to manually create a new cluster.
def do_classic_resize():
self._redshift.modify_cluster(
ClusterIdentifier=cluster_id,
ClusterType=cluster_type,
NodeType=provisioning.instance_type(),
NumberOfNodes=provisioning.num_nodes(),
)
try:
self._redshift.modify_cluster(
ClusterIdentifier=cluster_id,
ClusterType=cluster_type,
NodeType=provisioning.instance_type(),
NumberOfNodes=provisioning.num_nodes(),
)
return True, None
except Exception as ex:
return False, ex

loop = asyncio.get_running_loop()
await loop.run_in_executor(None, do_classic_resize)

while True:
succeeded, ex = await loop.run_in_executor(None, do_classic_resize)
if succeeded:
break
logger.warning(
"Classic resize failed with an exception. Will retry. %s", repr(ex)
)
# This is not a great idea in production code. But to make our
# experiments more resilient to transient states that Redshift may
# go through, we simply continually retry. We log the error to be
# aware of any non-transient issues for later fixing.
await asyncio.sleep(20)

if wait_until_available:
await asyncio.sleep(20)
Expand All @@ -112,21 +139,36 @@ async def elastic_resize(
cluster_id: str,
provisioning: Provisioning,
wait_until_available: bool = True,
) -> None:
) -> bool:
"""
This will return `True` iff the resize succeeded. Sometimes elastic
resizes are not available even when we believe they should be; this
method will return `False` in these cases.
"""

def do_elastic_resize():
self._redshift.resize_cluster(
ClusterIdentifier=cluster_id,
NodeType=provisioning.instance_type(),
NumberOfNodes=provisioning.num_nodes(),
Classic=False,
)
try:
self._redshift.resize_cluster(
ClusterIdentifier=cluster_id,
NodeType=provisioning.instance_type(),
NumberOfNodes=provisioning.num_nodes(),
Classic=False,
)
return True, None
except Exception as ex:
return False, ex

loop = asyncio.get_running_loop()
await loop.run_in_executor(None, do_elastic_resize)
succeeded, ex = await loop.run_in_executor(None, do_elastic_resize)

if not succeeded:
logger.warning("Elastic resize failed with exception. %s", repr(ex))
return False

if wait_until_available:
await asyncio.sleep(20)
await self.wait_until_available(cluster_id)
return True

async def wait_until_available(
self, cluster_id: str, polling_interval: float = 20.0
Expand Down
3 changes: 3 additions & 0 deletions src/brad/routing/abstract_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ class AbstractRoutingPolicy:
def name(self) -> str:
    """Return a short human-readable name for this policy (subclasses must implement)."""
    raise NotImplementedError

def __repr__(self) -> str:
    # Delegate to name() so policies print readably in logs and blueprint dumps.
    return self.name()

async def engine_for(
self, query_rep: QueryRep, ctx: RoutingContext
) -> List[Engine]:
Expand Down
20 changes: 20 additions & 0 deletions src/brad/routing/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,23 @@ def engine_for_sync(
return [self._query_map[query_rep]]
except KeyError:
return []

def __repr__(self) -> str:
    """Return a summary counting how many cached queries route to each engine."""
    # Summarize the cached routing state.
    routing_count: Dict[Engine, int] = {
        Engine.Aurora: 0,
        Engine.Redshift: 0,
        Engine.Athena: 0,
    }
    # Each value in the query map is the single engine the query routes to.
    for engine in self._query_map.values():
        routing_count[engine] += 1
    return (
        f"CachedLocations(Aurora={routing_count[Engine.Aurora]}, "
        f"Redshift={routing_count[Engine.Redshift]}, "
        f"Athena={routing_count[Engine.Athena]})"
    )

def __eq__(self, other: object) -> bool:
    """Two cached policies are equal iff their query-to-engine maps match."""
    if isinstance(other, CachedLocationPolicy):
        return self._query_map == other._query_map
    # Any non-CachedLocationPolicy compares unequal.
    return False
52 changes: 51 additions & 1 deletion tests/test_blueprint_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ def test_provisioning_change():
initial.table_locations(),
initial.aurora_provisioning(),
Provisioning(instance_type="dc2.large", num_nodes=4),
full_routing_policy=FullRoutingPolicy([], AlwaysOneRouter(Engine.Aurora)),
full_routing_policy=initial.get_routing_policy(),
)
diff = BlueprintDiff.of(initial, changed)
assert diff is not None
assert len(diff.table_diffs()) == 0
assert diff.aurora_diff() is None
assert diff.redshift_diff() is not None
assert not diff.has_routing_diff()

redshift_diff = diff.redshift_diff()
assert redshift_diff.new_instance_type() is None
Expand Down Expand Up @@ -128,6 +129,7 @@ def test_location_change():
assert diff1.aurora_diff() is None
assert diff1.redshift_diff() is None
assert len(diff1.table_diffs()) == 1
assert not diff1.has_routing_diff()

tdiff1 = diff1.table_diffs()[0]
assert tdiff1.added_locations() == [Engine.Redshift]
Expand Down Expand Up @@ -155,7 +157,55 @@ def test_location_change():
assert diff2.aurora_diff() is None
assert diff2.redshift_diff() is None
assert len(diff2.table_diffs()) == 1
assert not diff2.has_routing_diff()

tdiff2 = diff2.table_diffs()[0]
assert tdiff2.added_locations() == [Engine.Athena]
assert tdiff2.removed_locations() == [Engine.Aurora]


def test_routing_policy_change():
table_config = """
schema_name: test
tables:
- table_name: table1
columns:
- name: col1
data_type: BIGINT
primary_key: true
- table_name: table2
columns:
- name: col1
data_type: BIGINT
primary_key: true
dependencies:
- table1
- table_name: table3
columns:
- name: col1
data_type: BIGINT
primary_key: true
provisioning:
aurora:
num_nodes: 1
instance_type: db.r6g.large
redshift:
num_nodes: 1
instance_type: dc2.large
"""
user = UserProvidedBlueprint.load_from_yaml_str(table_config)
initial = bootstrap_blueprint(user)
changed = Blueprint(
initial.schema_name(),
initial.tables(),
initial.table_locations(),
initial.aurora_provisioning(),
initial.redshift_provisioning(),
full_routing_policy=FullRoutingPolicy([], AlwaysOneRouter(Engine.Athena)),
)
diff = BlueprintDiff.of(initial, changed)
assert diff is not None
assert len(diff.table_diffs()) == 0
assert diff.aurora_diff() is None
assert diff.redshift_diff() is None
assert diff.has_routing_diff()

0 comments on commit 429b402

Please sign in to comment.