Skip to content

Commit

Permalink
Make S3 loads to Redshift more permissive, add more logs to table movement
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy committed May 3, 2024
1 parent dc45218 commit 938796d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 13 deletions.
42 changes: 29 additions & 13 deletions src/brad/daemon/transition_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async def run_prepare_then_transition(
table_awaitables.append(self._enforce_table_diff_additions(diff))
await asyncio.gather(*table_awaitables)

logger.debug("Table movement complete.")
logger.info("Table movement complete.")

# Close connections
await self._cxns.close()
Expand Down Expand Up @@ -490,6 +490,12 @@ async def _run_aurora_post_transition(
tables_to_drop.append(source_table_name(table_diff.table_name()))
tables_to_drop.append(shadow_table_name(table_diff.table_name()))

logger.info("In transition: Dropping Aurora views %s", str(views_to_drop))
logger.info(
"In transition: Dropping Aurora triggers %s", str(triggers_to_drop)
)
logger.info("In transition: Dropping Aurora tables %s", str(tables_to_drop))

ctx = self._new_execution_context()

dv = DropViews(views_to_drop, Engine.Aurora)
Expand Down Expand Up @@ -609,7 +615,7 @@ async def _run_redshift_post_transition(
for table_diff in table_diffs:
if Engine.Redshift in table_diff.removed_locations():
to_drop.append(table_diff.table_name())
logger.debug(f"Tables to drop: {to_drop}")
logger.info("In transition: Dropping Redshift tables %s", str(to_drop))
d = DropTables(to_drop, Engine.Redshift)
ctx = self._new_execution_context()
await d.execute(ctx)
Expand Down Expand Up @@ -639,6 +645,7 @@ async def _run_athena_post_transition(
for table_diff in table_diffs:
if Engine.Athena in table_diff.removed_locations():
to_drop.append(table_diff.table_name())
logger.info("In transition: Dropping Athena tables %s", str(to_drop))
d = DropTables(to_drop, Engine.Athena)
ctx = self._new_execution_context()
await d.execute(ctx)
Expand Down Expand Up @@ -677,28 +684,28 @@ async def _unload_table(self, table_name: str, s3_path: str) -> None:
if Engine.Redshift in curr_locations: # Faster to write out from Redshift
u = UnloadToS3(table_name, s3_path, engine=Engine.Redshift, delimiter=",")
ctx = self._new_execution_context()
await u.execute(ctx)
logger.debug(
f"In transition: table {table_name} written to S3 from Redshift."
logger.info(
"In transition: table %s being written to S3 from Redshift.", table_name
)
await u.execute(ctx)
elif Engine.Aurora in curr_locations:
u = UnloadToS3(table_name, s3_path, engine=Engine.Aurora, delimiter=",")
ctx = self._new_execution_context()
await u.execute(ctx)
logger.debug(
f"In transition: table {table_name} written to S3 from Aurora."
logger.info(
"In transition: table %s being written to S3 from Aurora.", table_name
)
await u.execute(ctx)
elif Engine.Athena in curr_locations:
u = UnloadToS3(table_name, s3_path, engine=Engine.Athena)
ctx = self._new_execution_context()
await u.execute(ctx)
logger.debug(
f"In transition: table {table_name} written to S3 from Athena."
logger.info(
"In transition: table %s being written to S3 from Athena.", table_name
)
await u.execute(ctx)
else:
logger.error(
f"""In transition: table {table_name} does not exist
on any engine in current blueprint."""
"In transition: table %s does not exist on any engine in current blueprint.",
table_name,
)

async def _load_table_to_engine(self, table_name: str, e: Engine) -> None:
Expand All @@ -715,6 +722,9 @@ async def _load_table_to_engine(self, table_name: str, e: Engine) -> None:

if e == Engine.Aurora:
# Load table to aurora from S3
logger.info(
"In transition: loading table %s into Aurora from S3", table_name
)
response = ctx.s3_client().list_objects_v2(
Bucket=ctx.s3_bucket(), Prefix=ctx.s3_path() + s3_path_prefix
)
Expand Down Expand Up @@ -758,12 +768,18 @@ async def _load_table_to_engine(self, table_name: str, e: Engine) -> None:
await cursor.commit()

elif e == Engine.Redshift:
logger.info(
"In transition: loading table %s into Redshift from S3", table_name
)
l = LoadFromS3(table_name, s3_path_prefix, e, delimiter=",", header_rows=1)
await l.execute(ctx)
nonsilent_assert(self._cxns is not None)
assert self._cxns is not None
self._cxns.get_connection(Engine.Redshift).cursor_sync().commit_sync()
elif e == Engine.Athena:
logger.info(
"In transition: loading table %s into Athena from S3", table_name
)
l = LoadFromS3(table_name, s3_path_prefix, e, delimiter=",", header_rows=1)
await l.execute(ctx)

Expand Down
2 changes: 2 additions & 0 deletions src/brad/data_sync/operators/load_from_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
DELIMITER '{delimiter}'
IGNOREHEADER {header_rows}
REMOVEQUOTES
BLANKASNULL
IGNOREALLERRORS
"""

_ATHENA_CREATE_LOAD_TABLE = """
Expand Down

0 comments on commit 938796d

Please sign in to comment.