From 938796d7bd12cf0cc313faa11fdb91afc9cf6410 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Fri, 3 May 2024 13:24:07 -0400 Subject: [PATCH] Make S3 loads to Redshift more permissive, add more logs to table movement --- src/brad/daemon/transition_orchestrator.py | 42 ++++++++++++++------ src/brad/data_sync/operators/load_from_s3.py | 2 + 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/src/brad/daemon/transition_orchestrator.py b/src/brad/daemon/transition_orchestrator.py index 4b2d02cc..9c065721 100644 --- a/src/brad/daemon/transition_orchestrator.py +++ b/src/brad/daemon/transition_orchestrator.py @@ -191,7 +191,7 @@ async def run_prepare_then_transition( table_awaitables.append(self._enforce_table_diff_additions(diff)) await asyncio.gather(*table_awaitables) - logger.debug("Table movement complete.") + logger.info("Table movement complete.") # Close connections await self._cxns.close() @@ -490,6 +490,12 @@ async def _run_aurora_post_transition( tables_to_drop.append(source_table_name(table_diff.table_name())) tables_to_drop.append(shadow_table_name(table_diff.table_name())) + logger.info("In transition: Dropping Aurora views %s", str(views_to_drop)) + logger.info( + "In transition: Dropping Aurora triggers %s", str(triggers_to_drop) + ) + logger.info("In transition: Dropping Aurora tables %s", str(tables_to_drop)) + ctx = self._new_execution_context() dv = DropViews(views_to_drop, Engine.Aurora) @@ -609,7 +615,7 @@ async def _run_redshift_post_transition( for table_diff in table_diffs: if Engine.Redshift in table_diff.removed_locations(): to_drop.append(table_diff.table_name()) - logger.debug(f"Tables to drop: {to_drop}") + logger.info("In transition: Dropping Redshift tables %s", str(to_drop)) d = DropTables(to_drop, Engine.Redshift) ctx = self._new_execution_context() await d.execute(ctx) @@ -639,6 +645,7 @@ async def _run_athena_post_transition( for table_diff in table_diffs: if Engine.Athena in table_diff.removed_locations(): to_drop.append(table_diff.table_name()) + logger.info("In transition: Dropping Athena tables %s", str(to_drop)) d = DropTables(to_drop, Engine.Athena) ctx = self._new_execution_context() await d.execute(ctx) @@ -677,28 +684,28 @@ async def _unload_table(self, table_name: str, s3_path: str) -> None: if Engine.Redshift in curr_locations: # Faster to write out from Redshift u = UnloadToS3(table_name, s3_path, engine=Engine.Redshift, delimiter=",") ctx = self._new_execution_context() - await u.execute(ctx) - logger.debug( - f"In transition: table {table_name} written to S3 from Redshift." + logger.info( + "In transition: table %s being written to S3 from Redshift.", table_name ) + await u.execute(ctx) elif Engine.Aurora in curr_locations: u = UnloadToS3(table_name, s3_path, engine=Engine.Aurora, delimiter=",") ctx = self._new_execution_context() - await u.execute(ctx) - logger.debug( - f"In transition: table {table_name} written to S3 from Aurora." + logger.info( + "In transition: table %s being written to S3 from Aurora.", table_name ) + await u.execute(ctx) elif Engine.Athena in curr_locations: u = UnloadToS3(table_name, s3_path, engine=Engine.Athena) ctx = self._new_execution_context() - await u.execute(ctx) - logger.debug( - f"In transition: table {table_name} written to S3 from Athena." + logger.info( + "In transition: table %s being written to S3 from Athena.", table_name ) + await u.execute(ctx) else: logger.error( - f"""In transition: table {table_name} does not exist - on any engine in current blueprint.""" + "In transition: table %s does not exist on any engine in current blueprint.", + table_name, ) async def _load_table_to_engine(self, table_name: str, e: Engine) -> None: @@ -715,6 +722,9 @@ async def _load_table_to_engine(self, table_name: str, e: Engine) -> None: if e == Engine.Aurora: # Load table to aurora from S3 + logger.info( + "In transition: loading table %s into Aurora from S3", table_name + ) response = ctx.s3_client().list_objects_v2( Bucket=ctx.s3_bucket(), Prefix=ctx.s3_path() + s3_path_prefix ) @@ -758,12 +768,18 @@ async def _load_table_to_engine(self, table_name: str, e: Engine) -> None: await cursor.commit() elif e == Engine.Redshift: + logger.info( + "In transition: loading table %s into Redshift from S3", table_name + ) l = LoadFromS3(table_name, s3_path_prefix, e, delimiter=",", header_rows=1) await l.execute(ctx) nonsilent_assert(self._cxns is not None) assert self._cxns is not None self._cxns.get_connection(Engine.Redshift).cursor_sync().commit_sync() elif e == Engine.Athena: + logger.info( + "In transition: loading table %s into Athena from S3", table_name + ) l = LoadFromS3(table_name, s3_path_prefix, e, delimiter=",", header_rows=1) await l.execute(ctx) diff --git a/src/brad/data_sync/operators/load_from_s3.py b/src/brad/data_sync/operators/load_from_s3.py index ff984efd..248209f6 100644 --- a/src/brad/data_sync/operators/load_from_s3.py +++ b/src/brad/data_sync/operators/load_from_s3.py @@ -29,6 +29,8 @@ DELIMITER '{delimiter}' IGNOREHEADER {header_rows} REMOVEQUOTES + BLANKASNULL + IGNOREALLERRORS """ _ATHENA_CREATE_LOAD_TABLE = """