From 4a4467049d78637a99b6a46815fdb1b15ade2545 Mon Sep 17 00:00:00 2001
From: Michael Terry
Date: Wed, 29 Jan 2025 12:43:36 -0500
Subject: [PATCH] fix: don't load NDJSON data into memory all at once

This commit changes how --load-ndjson-dir works, telling DuckDB to load
the data from files on disk itself, rather than us loading that data into
memory all at once and then handing it to DuckDB.

This allows querying larger-than-memory data sets. But the SQL itself can
still be a memory bottleneck if it requires loading too much data into
memory.
---
 cumulus_library/databases/athena.py |  2 +-
 cumulus_library/databases/base.py   |  2 +-
 cumulus_library/databases/duckdb.py |  5 +-
 cumulus_library/databases/utils.py  | 85 ++++++++++++++++++++++-------
 pyproject.toml                      |  2 +-
 tests/test_duckdb.py                |  2 +-
 tests/testbed_utils.py              |  1 +
 7 files changed, 74 insertions(+), 25 deletions(-)

diff --git a/cumulus_library/databases/athena.py b/cumulus_library/databases/athena.py
index 2469e6c..a3cd4e3 100644
--- a/cumulus_library/databases/athena.py
+++ b/cumulus_library/databases/athena.py
@@ -70,7 +70,7 @@ def execute_as_pandas(
     def parser(self) -> base.DatabaseParser:
         return AthenaParser()
 
-    def operational_errors(self) -> tuple[Exception]:
+    def operational_errors(self) -> tuple[type[Exception], ...]:
         return (pyathena.OperationalError,)
 
     def col_parquet_types_from_pandas(self, field_types: list) -> list:
diff --git a/cumulus_library/databases/base.py b/cumulus_library/databases/base.py
index 4ab679d..edbe937 100644
--- a/cumulus_library/databases/base.py
+++ b/cumulus_library/databases/base.py
@@ -146,7 +146,7 @@ def execute_as_pandas(
     def parser(self) -> DatabaseParser:
         """Returns parser object for interrogating DB schemas"""
 
-    def operational_errors(self) -> tuple[Exception]:
+    def operational_errors(self) -> tuple[type[Exception], ...]:
         """Returns a tuple of operational exception classes
 
         An operational error is something that went wrong while performing a database
diff --git a/cumulus_library/databases/duckdb.py b/cumulus_library/databases/duckdb.py
index aada2ce..118da31 100644
--- a/cumulus_library/databases/duckdb.py
+++ b/cumulus_library/databases/duckdb.py
@@ -14,6 +14,7 @@
 import duckdb
 import pandas
 import pyarrow
+import pyarrow.dataset
 
 from cumulus_library import errors
 from cumulus_library.databases import base
@@ -90,7 +91,7 @@ def connect(self):
             duckdb.typing.VARCHAR,
         )
 
-    def insert_tables(self, tables: dict[str, pyarrow.Table]) -> None:
+    def insert_tables(self, tables: dict[str, pyarrow.dataset.Dataset]) -> None:
         """Ingests all ndjson data from a folder tree.
 
         This is often the output folder of Cumulus ETL"""
@@ -197,7 +198,7 @@ def col_pyarrow_types_from_sql(self, columns: list[tuple]) -> list:
     def parser(self) -> base.DatabaseParser:
         return DuckDbParser()
 
-    def operational_errors(self) -> tuple[Exception]:
+    def operational_errors(self) -> tuple[type[Exception], ...]:
         return (
             duckdb.OperationalError,
             duckdb.BinderException,
diff --git a/cumulus_library/databases/utils.py b/cumulus_library/databases/utils.py
index 4807aa1..f970e11 100644
--- a/cumulus_library/databases/utils.py
+++ b/cumulus_library/databases/utils.py
@@ -1,27 +1,60 @@
 import pathlib
 import sys
+from collections.abc import Iterable
 
 import cumulus_fhir_support
 import pyarrow
+import pyarrow.dataset
 
-from cumulus_library import db_config, errors
+from cumulus_library import base_utils, db_config, errors
 from cumulus_library.databases import athena, base, duckdb
 
 
-def _read_rows_for_resource(path: pathlib.Path, resource: str) -> list[dict]:
-    rows = []
+def _list_files_for_resource(path: pathlib.Path, resource: str) -> list[str]:
+    files = []
 
     # Support any ndjson files from the target folder directly
-    rows += list(cumulus_fhir_support.read_multiline_json_from_dir(path, resource))
+    files += list(cumulus_fhir_support.list_multiline_json_in_dir(path, resource))
 
     # Also support being given an ETL output folder, and look in the table subdir
     subdir = path / resource.lower()
-    rows += list(cumulus_fhir_support.read_multiline_json_from_dir(subdir, resource))
+    files += list(cumulus_fhir_support.list_multiline_json_in_dir(subdir, resource))
 
-    return rows
+    return files
 
 
-def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
+def _rows_from_files(files: list[str]) -> Iterable[dict]:
+    for file in files:
+        yield from cumulus_fhir_support.read_multiline_json(file)
+
+
+def _load_custom_etl_table(path: str) -> pyarrow.dataset.Dataset | None:
+    """Loads a non-FHIR ETL table from disk (tables like etl__completion)."""
+    files = list(cumulus_fhir_support.list_multiline_json_in_dir(path))
+    if not files:
+        return None
+
+    # This is a custom Cumulus ETL table, with no easy way to get a schema definition.
+    # We **could** put a hard-coded shared schema in cumulus-fhir-support, but since these
+    # tables are so simple, we mostly don't yet need to have that level of pre-coordination.
+    #
+    # Instead, we let PyArrow infer the types, with one hiccup: its JSON parser will interpret
+    # datetime strings as a TIMESTAMP type, which isn't insane, but not what we want - we want to
+    # keep those as strings. So we let PyArrow infer from Python objects, where it works like we
+    # want.
+    #
+    # Thus, we sip on the data a bit to infer a schema from the first row as a Python object.
+    first_row = next(iter(_rows_from_files(files)), None)
+    if not first_row:
+        return None
+    schema = pyarrow.Table.from_pylist([first_row]).schema
+
+    # Now let PyArrow load the rest of the data from disk on demand, rather than loading it all
+    # into memory, but using the detected schema rather than inferring it with timestamps.
+    return pyarrow.dataset.dataset(files, schema=schema, format="json")
+
+
+def read_ndjson_dir(path: str) -> dict[str, pyarrow.dataset.Dataset]:
     """Loads a directory tree of raw ndjson into schema-ful tables.
 
     :param path: a directory path
@@ -49,11 +82,12 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
     ]
     for resource in resources:
         table_name = resource.lower()
-        rows = _read_rows_for_resource(pathlib.Path(path), resource)
+        files = _list_files_for_resource(pathlib.Path(path), resource)
 
         # Make a pyarrow table with full schema from the data
-        schema = cumulus_fhir_support.pyarrow_schema_from_rows(resource, rows)
-        all_tables[table_name] = pyarrow.Table.from_pylist(rows, schema)
+        schema = cumulus_fhir_support.pyarrow_schema_from_rows(resource, _rows_from_files(files))
+        # Use a PyArrow Dataset (vs a Table) to avoid loading all the files in memory.
+        all_tables[table_name] = pyarrow.dataset.dataset(files, schema=schema, format="json")
 
     # And now some special support for a few ETL tables.
     metadata_tables = [
@@ -61,20 +95,36 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
         "etl__completion_encounters",
     ]
     for metadata_table in metadata_tables:
-        rows = list(cumulus_fhir_support.read_multiline_json_from_dir(f"{path}/{metadata_table}"))
-        if rows:
-            # Auto-detecting the schema works for these simple tables
-            all_tables[metadata_table] = pyarrow.Table.from_pylist(rows)
+        if dataset := _load_custom_etl_table(f"{path}/{metadata_table}"):
+            all_tables[metadata_table] = dataset
 
     return all_tables
 
 
+def _handle_load_ndjson_dir(args: dict[str, str], backend: base.DatabaseBackend) -> None:
+    load_ndjson_dir = args.get("load_ndjson_dir")
+
+    if db_config.db_type != "duckdb":
+        if load_ndjson_dir:
+            sys.exit("Loading an NDJSON dir is only supported with --db-type=duckdb.")
+        return
+
+    if not load_ndjson_dir:
+        return  # nothing to do
+
+    if backend.connection is None:
+        return  # connect() was never run, we don't have a live DB connection
+
+    with base_utils.get_progress_bar() as progress:
+        progress.add_task("Detecting JSON schemas...", total=None)
+        backend.insert_tables(read_ndjson_dir(load_ndjson_dir))
+
+
 def create_db_backend(args: dict[str, str]) -> (base.DatabaseBackend, str):
     """Retrieves a database backend and target schema from CLI args"""
     db_config.db_type = args["db_type"]
 
     if db_config.db_type == "duckdb":
-        load_ndjson_dir = args.get("load_ndjson_dir")
         # TODO: reevaluate as DuckDB's local schema support evolves.
         # https://duckdb.org/docs/sql/statements/set.html#syntax
         if not (args.get("schema_name") is None or args["schema_name"] == "main"):
@@ -104,14 +154,11 @@ def create_db_backend(args: dict[str, str]) -> (base.DatabaseBackend, str):
             args["profile"],
             schema_name,
         )
-        if args.get("load_ndjson_dir"):
-            sys.exit("Loading an ndjson dir is not supported with --db-type=athena.")
     else:
         raise errors.CumulusLibraryError(f"'{db_config.db_type}' is not a supported database.")
     if "prepare" not in args.keys():
         backend.connect()
     elif not args["prepare"]:
         backend.connect()
-    if backend.connection is not None and db_config.db_type == "duckdb" and load_ndjson_dir:
-        backend.insert_tables(read_ndjson_dir(load_ndjson_dir))
+    _handle_load_ndjson_dir(args, backend)
     return (backend, schema_name)
diff --git a/pyproject.toml b/pyproject.toml
index 76b0b6c..2c349b2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -2,7 +2,7 @@
 name = "cumulus-library"
 requires-python = ">= 3.11"
 dependencies = [
-    "cumulus-fhir-support >= 1.2",
+    "cumulus-fhir-support >= 1.3.1",  # 1.3.1 fixes a "load all rows into memory" bug
     "duckdb >= 1.1.3",
     "Jinja2 > 3",
     "pandas <3, >=2.1.3",
diff --git a/tests/test_duckdb.py b/tests/test_duckdb.py
index ad4a4a4..6ad1b63 100644
--- a/tests/test_duckdb.py
+++ b/tests/test_duckdb.py
@@ -75,7 +75,7 @@ def test_duckdb_load_ndjson_dir(tmp_path):
     for index, (filename, valid) in enumerate(filenames.items()):
         with open(f"{tmp_path}/{filename}", "w", encoding="utf8") as f:
             row_id = f"Good{index}" if valid else f"Bad{index}"
-            f.write(f'{{"id":"{row_id}", "resourceType": "Patient"}}')
+            f.write(f'{{"id":"{row_id}", "resourceType": "Patient"}}\n')
 
     db, _ = databases.create_db_backend(
         {
diff --git a/tests/testbed_utils.py b/tests/testbed_utils.py
index 7bd6b86..f793330 100644
--- a/tests/testbed_utils.py
+++ b/tests/testbed_utils.py
@@ -33,6 +33,7 @@ def add(self, table: str, obj: dict) -> None:
         with open(table_dir / f"{index}.ndjson", "w", encoding="utf8") as f:
             json.dump(obj, f)
+            f.write("\n")
 
     # ** Now a bunch of resource-specific "add" functions.
     # They each take some kwargs that should:
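
A rough sketch of the streaming technique this patch relies on (not part of the patch itself; the file path and table name below are hypothetical stand-ins): a PyArrow Dataset registered with DuckDB is scanned lazily at query time, so NDJSON rows stream from disk instead of being materialized in memory first.

import duckdb
import pyarrow.dataset

# Point a Dataset at NDJSON files on disk; nothing is read yet.
# The path is a made-up stand-in for a Cumulus ETL output file.
patients = pyarrow.dataset.dataset(
    ["output/patient/patient.000.ndjson"],
    format="json",
)

conn = duckdb.connect()
# Registering the dataset lets DuckDB read it in batches during query
# execution, rather than needing the whole table loaded up front.
conn.register("patient", patients)
print(conn.execute("SELECT COUNT(*) FROM patient").fetchone())

Queries whose plans build large intermediate results (big sorts or joins, say) can still spike memory, which is the caveat the commit message calls out.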