Skip to content

Commit

Permalink
Add support for bare tables in the bulk load tool
Browse files: browse the repository at this point in the history
  • Loading branch information
geoffxy committed Oct 23, 2024
1 parent db7048d commit b38406c
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions src/brad/admin/bulk_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ def register_admin_action(subparser) -> None:
action="store_true",
help="If set, truncate the tables in Aurora and reload them.",
)
parser.add_argument(
"--bare-aurora-tables",
action="store_true",
help="If set, load to bare Aurora tables instead of the ones "
"augmented with extraction metadata.",
)
parser.set_defaults(admin_action=bulk_load)


Expand Down Expand Up @@ -154,11 +160,13 @@ async def _load_aurora(
table_name: str,
table_options,
aurora_connection: Connection,
bare_aurora_tables: bool = False,
) -> Engine:
logger.info("Loading %s on Aurora...", table_name)
table = ctx.blueprint.get_table(table_name)
load_table_name = source_table_name(table) if not bare_aurora_tables else table.name
load_query = _AURORA_LOAD_TEMPLATE.format(
table_name=source_table_name(table),
table_name=load_table_name,
columns=comma_separated_column_names(table.columns),
options=(
"({})".format(table_options["aurora_options"])
Expand All @@ -178,15 +186,15 @@ async def _load_aurora(
for column in table.columns:
if column.data_type != "SERIAL" and column.data_type != "BIGSERIAL":
continue
q = "SELECT MAX({}) FROM {}".format(column.name, source_table_name(table))
q = "SELECT MAX({}) FROM {}".format(column.name, load_table_name)
logger.debug("Running on Aurora: %s", q)
await cursor.execute(q)
row = await cursor.fetchone()
if row is None:
continue
max_serial_val = row[0]
q = "ALTER SEQUENCE {}_{}_seq RESTART WITH {}".format(
source_table_name(table), column.name, str(max_serial_val + 1)
load_table_name, column.name, str(max_serial_val + 1)
)
logger.debug("Running on Aurora: %s", q)
await cursor.execute(q)
Expand Down Expand Up @@ -355,6 +363,9 @@ async def bulk_load_impl(args, manifest: Dict[str, Any]) -> None:
directory = Directory(config)
await directory.refresh()

if args.bare_aurora_tables:
logger.info("Loading bare Aurora tables.")

# Check for specific engines.
if args.only_engines is not None and len(args.only_engines) > 0:
engines_filter = {Engine.from_str(val) for val in args.only_engines}
Expand Down Expand Up @@ -407,6 +418,7 @@ def load_tasks_for_engine(
table_name,
table_options,
engines.get_connection(Engine.Aurora),
args.bare_aurora_tables,
)
elif (
engine == Engine.Redshift
Expand Down

0 comments on commit b38406c

Please sign in to comment.