diff --git a/cumulus_library/databases/athena.py b/cumulus_library/databases/athena.py
index 2469e6c..a3cd4e3 100644
--- a/cumulus_library/databases/athena.py
+++ b/cumulus_library/databases/athena.py
@@ -70,7 +70,7 @@ def execute_as_pandas(
     def parser(self) -> base.DatabaseParser:
         return AthenaParser()
 
-    def operational_errors(self) -> tuple[Exception]:
+    def operational_errors(self) -> tuple[type[Exception], ...]:
         return (pyathena.OperationalError,)
 
     def col_parquet_types_from_pandas(self, field_types: list) -> list:
diff --git a/cumulus_library/databases/base.py b/cumulus_library/databases/base.py
index 4ab679d..edbe937 100644
--- a/cumulus_library/databases/base.py
+++ b/cumulus_library/databases/base.py
@@ -146,7 +146,7 @@ def execute_as_pandas(
     def parser(self) -> DatabaseParser:
         """Returns parser object for interrogating DB schemas"""
 
-    def operational_errors(self) -> tuple[Exception]:
+    def operational_errors(self) -> tuple[type[Exception], ...]:
         """Returns a tuple of operational exception classes
 
         An operational error is something that went wrong while performing a database
diff --git a/cumulus_library/databases/duckdb.py b/cumulus_library/databases/duckdb.py
index aada2ce..118da31 100644
--- a/cumulus_library/databases/duckdb.py
+++ b/cumulus_library/databases/duckdb.py
@@ -14,6 +14,7 @@
 import duckdb
 import pandas
 import pyarrow
+import pyarrow.dataset
 
 from cumulus_library import errors
 from cumulus_library.databases import base
@@ -90,7 +91,7 @@ def connect(self):
             duckdb.typing.VARCHAR,
         )
 
-    def insert_tables(self, tables: dict[str, pyarrow.Table]) -> None:
+    def insert_tables(self, tables: dict[str, pyarrow.dataset.Dataset]) -> None:
         """Ingests all ndjson data from a folder tree.
 
         This is often the output folder of Cumulus ETL"""
@@ -197,7 +198,7 @@ def col_pyarrow_types_from_sql(self, columns: list[tuple]) -> list:
     def parser(self) -> base.DatabaseParser:
         return DuckDbParser()
 
-    def operational_errors(self) -> tuple[Exception]:
+    def operational_errors(self) -> tuple[type[Exception], ...]:
         return (
             duckdb.OperationalError,
             duckdb.BinderException,
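A note on the recurring `tuple[Exception]` to `tuple[type[Exception], ...]` change above: the old annotation describes a one-element tuple of exception *instances*, while these methods actually return a variable-length tuple of exception *classes* that callers hand to `except`. A minimal sketch of that call pattern, not taken from this PR (the `backend`, `cursor`, and `run_statement` names are illustrative):

```python
def run_statement(backend, cursor, sql: str) -> None:
    """Illustrative only: shows why the return type is a tuple of exception classes."""
    try:
        cursor.execute(sql)
    except backend.operational_errors() as exc:
        # `except` accepts a tuple of exception *types*, which is exactly what
        # tuple[type[Exception], ...] promises: any length, classes rather than instances.
        print(f"Recoverable database error: {exc}")
```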
diff --git a/cumulus_library/databases/utils.py b/cumulus_library/databases/utils.py
index 4807aa1..5f9727c 100644
--- a/cumulus_library/databases/utils.py
+++ b/cumulus_library/databases/utils.py
@@ -1,27 +1,60 @@
 import pathlib
 import sys
+from collections.abc import Iterable
 
 import cumulus_fhir_support
 import pyarrow
+import pyarrow.dataset
 
-from cumulus_library import db_config, errors
+from cumulus_library import base_utils, db_config, errors
 from cumulus_library.databases import athena, base, duckdb
 
 
-def _read_rows_for_resource(path: pathlib.Path, resource: str) -> list[dict]:
-    rows = []
+def _list_files_for_resource(path: pathlib.Path, resource: str) -> list[str]:
+    files = []
 
     # Support any ndjson files from the target folder directly
-    rows += list(cumulus_fhir_support.read_multiline_json_from_dir(path, resource))
+    files += list(cumulus_fhir_support.list_multiline_json_in_dir(path, resource))
 
     # Also support being given an ETL output folder, and look in the table subdir
     subdir = path / resource.lower()
-    rows += list(cumulus_fhir_support.read_multiline_json_from_dir(subdir, resource))
+    files += list(cumulus_fhir_support.list_multiline_json_in_dir(subdir, resource))
 
-    return rows
+    return files
 
 
-def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
+def _rows_from_files(files: list[str]) -> Iterable[dict]:
+    for file in files:
+        yield from cumulus_fhir_support.read_multiline_json(file)
+
+
+def _load_custom_etl_table(path: str) -> pyarrow.dataset.Dataset | None:
+    """Loads a non-FHIR ETL table from disk (tables like etl__completion)."""
+    files = list(cumulus_fhir_support.list_multiline_json_in_dir(path))
+    if not files:
+        return None
+
+    # This is a custom Cumulus ETL table, with no easy way to get a schema definition.
+    # We **could** put a hard-coded shared schema in cumulus-fhir-support, but since these
+    # tables are so simple, we mostly don't yet need to have that level of pre-coordination.
+    #
+    # Instead, we let PyArrow infer the types, with one hiccup: its JSON parser will interpret
+    # datetime strings as a TIMESTAMP type, which isn't insane, but not what we want - we want to
+    # keep those as strings. So we let PyArrow infer from Python objects, where it works like we
+    # want.
+    #
+    # Thus, we sip on the data a bit to infer a schema from the first row as a Python object.
+    first_row = next(iter(_rows_from_files(files)), None)
+    if not first_row:
+        return None
+    schema = pyarrow.Table.from_pylist([first_row]).schema
+
+    # Now let PyArrow load the rest of the data from disk on demand, rather than loading it all
+    # into memory, but using the detected schema rather than inferring it with timestamps.
+    return pyarrow.dataset.dataset(files, schema=schema, format="json")
+
+
+def read_ndjson_dir(path: str) -> dict[str, pyarrow.dataset.Dataset]:
     """Loads a directory tree of raw ndjson into schema-ful tables.
 
     :param path: a directory path
@@ -49,11 +82,12 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
     ]
     for resource in resources:
         table_name = resource.lower()
-        rows = _read_rows_for_resource(pathlib.Path(path), resource)
+        files = _list_files_for_resource(pathlib.Path(path), resource)
 
         # Make a pyarrow table with full schema from the data
-        schema = cumulus_fhir_support.pyarrow_schema_from_rows(resource, rows)
-        all_tables[table_name] = pyarrow.Table.from_pylist(rows, schema)
+        schema = cumulus_fhir_support.pyarrow_schema_from_rows(resource, _rows_from_files(files))
+        # Use a PyArrow Dataset (vs a Table) to avoid loading all the files in memory.
+        all_tables[table_name] = pyarrow.dataset.dataset(files, schema=schema, format="json")
 
     # And now some special support for a few ETL tables.
     metadata_tables = [
@@ -61,20 +95,33 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
         "etl__completion_encounters",
     ]
     for metadata_table in metadata_tables:
-        rows = list(cumulus_fhir_support.read_multiline_json_from_dir(f"{path}/{metadata_table}"))
-        if rows:
-            # Auto-detecting the schema works for these simple tables
-            all_tables[metadata_table] = pyarrow.Table.from_pylist(rows)
+        if dataset := _load_custom_etl_table(f"{path}/{metadata_table}"):
+            all_tables[metadata_table] = dataset
 
     return all_tables
 
 
+def _handle_load_ndjson_dir(args: dict[str, str], backend: base.DatabaseBackend) -> None:
+    load_ndjson_dir = args.get("load_ndjson_dir")
+    if not load_ndjson_dir:
+        return  # nothing to do
+
+    if db_config.db_type != "duckdb":
+        sys.exit("Loading an NDJSON dir is only supported with --db-type=duckdb.")
+
+    if backend.connection is None:
+        return  # connect() was never run, we don't have a live DB connection
+
+    with base_utils.get_progress_bar() as progress:
+        progress.add_task("Detecting JSON schemas...", total=None)
+        backend.insert_tables(read_ndjson_dir(load_ndjson_dir))
+
+
 def create_db_backend(args: dict[str, str]) -> (base.DatabaseBackend, str):
     """Retrieves a database backend and target schema from CLI args"""
     db_config.db_type = args["db_type"]
 
     if db_config.db_type == "duckdb":
-        load_ndjson_dir = args.get("load_ndjson_dir")
         # TODO: reevaluate as DuckDB's local schema support evolves.
         # https://duckdb.org/docs/sql/statements/set.html#syntax
         if not (args.get("schema_name") is None or args["schema_name"] == "main"):
@@ -104,14 +151,11 @@ def create_db_backend(args: dict[str, str]) -> (base.DatabaseBackend, str):
             args["profile"],
             schema_name,
         )
-        if args.get("load_ndjson_dir"):
-            sys.exit("Loading an ndjson dir is not supported with --db-type=athena.")
     else:
         raise errors.CumulusLibraryError(f"'{db_config.db_type}' is not a supported database.")
     if "prepare" not in args.keys():
         backend.connect()
     elif not args["prepare"]:
         backend.connect()
-    if backend.connection is not None and db_config.db_type == "duckdb" and load_ndjson_dir:
-        backend.insert_tables(read_ndjson_dir(load_ndjson_dir))
+    _handle_load_ndjson_dir(args, backend)
     return (backend, schema_name)
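To illustrate the schema trick that `_load_custom_etl_table()` relies on: PyArrow's JSON reader would type an ISO datetime string as a timestamp, while inferring from a Python object keeps it a plain string, and that inferred schema can then drive a lazy JSON dataset instead of an in-memory table. A standalone sketch under those assumptions (the rows and file name are made up; it needs a PyArrow version with the JSON dataset format, which the patch itself also depends on):

```python
import json
import pathlib
import tempfile

import pyarrow
import pyarrow.dataset

# A couple of made-up rows, shaped roughly like an etl__completion record.
rows = [
    {"table_name": "patient", "export_time": "2024-01-01T00:00:00Z"},
    {"table_name": "condition", "export_time": "2024-02-01T00:00:00Z"},
]

# Write them out as NDJSON so the dataset has something to scan.
tmp = pathlib.Path(tempfile.mkdtemp()) / "etl__completion.ndjson"
tmp.write_text("\n".join(json.dumps(row) for row in rows) + "\n")

# Inferring a schema from a Python object keeps export_time as a plain string.
schema = pyarrow.Table.from_pylist(rows[:1]).schema
print(schema.field("export_time").type)  # string

# Handing that schema to a lazy JSON dataset avoids both the timestamp
# inference and loading every file into memory up front.
ds = pyarrow.dataset.dataset([str(tmp)], schema=schema, format="json")
print(ds.to_table().num_rows)  # 2
```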
diff --git a/pyproject.toml b/pyproject.toml
index 76b0b6c..2c349b2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -2,7 +2,7 @@
 name = "cumulus-library"
 requires-python = ">= 3.11"
 dependencies = [
-    "cumulus-fhir-support >= 1.2",
+    "cumulus-fhir-support >= 1.3.1",  # 1.3.1 fixes a "load all rows into memory" bug
     "duckdb >= 1.1.3",
     "Jinja2 > 3",
     "pandas <3, >=2.1.3",
diff --git a/tests/test_duckdb.py b/tests/test_duckdb.py
index ad4a4a4..6ad1b63 100644
--- a/tests/test_duckdb.py
+++ b/tests/test_duckdb.py
@@ -75,7 +75,7 @@ def test_duckdb_load_ndjson_dir(tmp_path):
     for index, (filename, valid) in enumerate(filenames.items()):
         with open(f"{tmp_path}/{filename}", "w", encoding="utf8") as f:
             row_id = f"Good{index}" if valid else f"Bad{index}"
-            f.write(f'{{"id":"{row_id}", "resourceType": "Patient"}}')
+            f.write(f'{{"id":"{row_id}", "resourceType": "Patient"}}\n')
 
     db, _ = databases.create_db_backend(
         {
diff --git a/tests/testbed_utils.py b/tests/testbed_utils.py
index 7bd6b86..f793330 100644
--- a/tests/testbed_utils.py
+++ b/tests/testbed_utils.py
@@ -33,6 +33,7 @@ def add(self, table: str, obj: dict) -> None:
         with open(table_dir / f"{index}.ndjson", "w", encoding="utf8") as f:
             json.dump(obj, f)
+            f.write("\n")
 
     # ** Now a bunch of resource-specific "add" functions.
     # They each take some kwargs that should:
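Beyond the patch itself, a sketch of what the Table-to-Dataset switch buys downstream. This is not necessarily how `DuckDatabaseBackend.insert_tables()` is implemented; it only shows that DuckDB's Python API can consume the lazily-scanned datasets that `read_ndjson_dir()` now returns (the folder path is a placeholder):

```python
import duckdb

from cumulus_library.databases.utils import read_ndjson_dir

# Illustrative path; point this at a real Cumulus ETL output folder.
tables = read_ndjson_dir("/path/to/etl-output")

con = duckdb.connect()
for name, dataset in tables.items():
    # DuckDB can scan a registered PyArrow dataset lazily, so nothing is
    # materialized until a query actually touches it.
    con.register(name, dataset)

if "patient" in tables:
    print(con.execute("SELECT count(*) FROM patient").fetchone())
```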