From b38406c7df234356253f56ce2c6f7b5a7d9d480e Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Wed, 23 Oct 2024 10:28:04 -0400 Subject: [PATCH] Add support for bare tables in the bulk load tool --- src/brad/admin/bulk_load.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/brad/admin/bulk_load.py b/src/brad/admin/bulk_load.py index 8cb07fa4..d616576d 100644 --- a/src/brad/admin/bulk_load.py +++ b/src/brad/admin/bulk_load.py @@ -108,6 +108,12 @@ def register_admin_action(subparser) -> None: action="store_true", help="If set, truncate the tables in Aurora and reload them.", ) + parser.add_argument( + "--bare-aurora-tables", + action="store_true", + help="If set, load to bare Aurora tables instead of the ones " + "augmented with extraction metadata.", + ) parser.set_defaults(admin_action=bulk_load) @@ -154,11 +160,13 @@ async def _load_aurora( table_name: str, table_options, aurora_connection: Connection, + bare_aurora_tables: bool = False, ) -> Engine: logger.info("Loading %s on Aurora...", table_name) table = ctx.blueprint.get_table(table_name) + load_table_name = source_table_name(table) if not bare_aurora_tables else table.name load_query = _AURORA_LOAD_TEMPLATE.format( - table_name=source_table_name(table), + table_name=load_table_name, columns=comma_separated_column_names(table.columns), options=( "({})".format(table_options["aurora_options"]) @@ -178,7 +186,7 @@ async def _load_aurora( for column in table.columns: if column.data_type != "SERIAL" and column.data_type != "BIGSERIAL": continue - q = "SELECT MAX({}) FROM {}".format(column.name, source_table_name(table)) + q = "SELECT MAX({}) FROM {}".format(column.name, load_table_name) logger.debug("Running on Aurora: %s", q) await cursor.execute(q) row = await cursor.fetchone() @@ -186,7 +194,7 @@ async def _load_aurora( continue max_serial_val = row[0] q = "ALTER SEQUENCE {}_{}_seq RESTART WITH {}".format( - source_table_name(table), column.name, str(max_serial_val + 1) + load_table_name, column.name, str(max_serial_val + 1) ) logger.debug("Running on Aurora: %s", q) await cursor.execute(q) @@ -355,6 +363,9 @@ async def bulk_load_impl(args, manifest: Dict[str, Any]) -> None: directory = Directory(config) await directory.refresh() + if args.bare_aurora_tables: + logger.info("Loading bare Aurora tables.") + # Check for specific engines. if args.only_engines is not None and len(args.only_engines) > 0: engines_filter = {Engine.from_str(val) for val in args.only_engines} @@ -407,6 +418,7 @@ def load_tasks_for_engine( table_name, table_options, engines.get_connection(Engine.Aurora), + args.bare_aurora_tables, ) elif ( engine == Engine.Redshift