fix: don't load NDJSON data into memory all at once #344
base: main
@@ -1,27 +1,60 @@
 import pathlib
 import sys
+from collections.abc import Iterable

 import cumulus_fhir_support
 import pyarrow
+import pyarrow.dataset

-from cumulus_library import db_config, errors
+from cumulus_library import base_utils, db_config, errors
 from cumulus_library.databases import athena, base, duckdb


-def _read_rows_for_resource(path: pathlib.Path, resource: str) -> list[dict]:
-    rows = []
+def _list_files_for_resource(path: pathlib.Path, resource: str) -> list[str]:
+    files = []

     # Support any ndjson files from the target folder directly
-    rows += list(cumulus_fhir_support.read_multiline_json_from_dir(path, resource))
+    files += list(cumulus_fhir_support.list_multiline_json_in_dir(path, resource))

     # Also support being given an ETL output folder, and look in the table subdir
     subdir = path / resource.lower()
-    rows += list(cumulus_fhir_support.read_multiline_json_from_dir(subdir, resource))
+    files += list(cumulus_fhir_support.list_multiline_json_in_dir(subdir, resource))

-    return rows
+    return files


-def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
+def _rows_from_files(files: list[str]) -> Iterable[dict]:
+    for file in files:
+        yield from cumulus_fhir_support.read_multiline_json(file)
+
+
+def _load_custom_etl_table(path: str) -> pyarrow.dataset.Dataset | None:
+    """Loads a non-FHIR ETL table from disk (tables like etl__completion)."""
+    files = list(cumulus_fhir_support.list_multiline_json_in_dir(path))
+    if not files:
+        return None
+
+    # This is a custom Cumulus ETL table, with no easy way to get a schema definition.
+    # We **could** put a hard-coded shared schema in cumulus-fhir-support, but since these
+    # tables are so simple, we mostly don't yet need to have that level of pre-coordination.
+    #
+    # Instead, we let PyArrow infer the types, with one hiccup: its JSON parser will interpret
+    # datetime strings as a TIMESTAMP type, which isn't insane, but not what we want - we want to
+    # keep those as strings. So we let PyArrow infer from Python objects, where it works like we
+    # want.
+    #
+    # Thus, we sip on the data a bit to infer a schema from the first row as a Python object.
+    first_row = next(iter(_rows_from_files(files)), None)
+    if not first_row:
+        return None
+    schema = pyarrow.Table.from_pylist([first_row]).schema
+
+    # Now let PyArrow load the rest of the data from disk on demand, rather than loading it all
+    # into memory, but using the detected schema rather than inferring it with timestamps.
+    return pyarrow.dataset.dataset(files, schema=schema, format="json")
+
+
+def read_ndjson_dir(path: str) -> dict[str, pyarrow.dataset.Dataset]:
     """Loads a directory tree of raw ndjson into schema-ful tables.

     :param path: a directory path
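(A side note on the timestamp hiccup called out in `_load_custom_etl_table` above: a minimal sketch of the two inference behaviors, not code from this PR.)

```python
import io

import pyarrow
import pyarrow.json

line = b'{"recorded": "2021-10-16T12:00:00"}\n'

# PyArrow's JSON parser typically infers ISO 8601-looking strings
# as a TIMESTAMP column...
parsed = pyarrow.json.read_json(io.BytesIO(line))
print(parsed.schema)  # e.g. recorded: timestamp[s]

# ...while inference from Python objects keeps them as plain strings.
from_pylist = pyarrow.Table.from_pylist([{"recorded": "2021-10-16T12:00:00"}])
print(from_pylist.schema)  # recorded: string
```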
@@ -49,32 +82,46 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
     ]
     for resource in resources:
         table_name = resource.lower()
-        rows = _read_rows_for_resource(pathlib.Path(path), resource)
+        files = _list_files_for_resource(pathlib.Path(path), resource)

         # Make a pyarrow table with full schema from the data
-        schema = cumulus_fhir_support.pyarrow_schema_from_rows(resource, rows)
-        all_tables[table_name] = pyarrow.Table.from_pylist(rows, schema)
+        schema = cumulus_fhir_support.pyarrow_schema_from_rows(resource, _rows_from_files(files))
+        # Use a PyArrow Dataset (vs a Table) to avoid loading all the files in memory.
+        all_tables[table_name] = pyarrow.dataset.dataset(files, schema=schema, format="json")

     # And now some special support for a few ETL tables.
     metadata_tables = [
         "etl__completion",
         "etl__completion_encounters",
     ]
     for metadata_table in metadata_tables:
-        rows = list(cumulus_fhir_support.read_multiline_json_from_dir(f"{path}/{metadata_table}"))
-        if rows:
-            # Auto-detecting the schema works for these simple tables
-            all_tables[metadata_table] = pyarrow.Table.from_pylist(rows)
+        if dataset := _load_custom_etl_table(f"{path}/{metadata_table}"):
+            all_tables[metadata_table] = dataset

     return all_tables


+def _handle_load_ndjson_dir(args: dict[str, str], backend: base.DatabaseBackend) -> None:

[Comment] This method is mostly a refactor; the only new functionality is a progress bar for scanning the NDJSON for its schema. (Which in my 26G test folder takes four minutes.) I was feeling like the …
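(For context, a rough sketch of what an indeterminate progress task looks like, under the assumption that `base_utils.get_progress_bar()` wraps `rich.progress.Progress`; that assumption is mine, not something this diff shows.)

```python
import time

from rich.progress import Progress

# total=None renders an indeterminate (pulsing) bar, which fits here because
# we can't cheaply know how much NDJSON the schema scan will touch.
with Progress() as progress:
    progress.add_task("Detecting JSON schemas...", total=None)
    time.sleep(2)  # stand-in for the actual schema scan
```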
+    load_ndjson_dir = args.get("load_ndjson_dir")
+    if not load_ndjson_dir:
+        return  # nothing to do
+
+    if db_config.db_type != "duckdb":
+        sys.exit("Loading an NDJSON dir is only supported with --db-type=duckdb.")
+
+    if backend.connection is None:
+        return  # connect() was never run, we don't have a live DB connection
+
+    with base_utils.get_progress_bar() as progress:
+        progress.add_task("Detecting JSON schemas...", total=None)
+        backend.insert_tables(read_ndjson_dir(load_ndjson_dir))
+
+
 def create_db_backend(args: dict[str, str]) -> (base.DatabaseBackend, str):
     """Retrieves a database backend and target schema from CLI args"""
     db_config.db_type = args["db_type"]

     if db_config.db_type == "duckdb":
-        load_ndjson_dir = args.get("load_ndjson_dir")
         # TODO: reevaluate as DuckDB's local schema support evolves.
         # https://duckdb.org/docs/sql/statements/set.html#syntax
         if not (args.get("schema_name") is None or args["schema_name"] == "main"):

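(To make the memory win concrete: a minimal sketch, with hypothetical file names, of how a PyArrow Dataset defers reading until scan time, where `Table.from_pylist` would materialize every row up front.)

```python
import pyarrow.dataset

# Hypothetical NDJSON files on disk.
files = ["patient.000.ndjson", "patient.001.ndjson"]

# Constructing the dataset mostly just records the file list and format;
# rows are only read once the dataset is scanned.
ds = pyarrow.dataset.dataset(files, format="json")

# Consumers can stream record batches instead of holding everything in memory.
for batch in ds.to_batches():
    print(batch.num_rows)
```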
@@ -104,14 +151,11 @@ def create_db_backend(args: dict[str, str]) -> (base.DatabaseBackend, str):
             args["profile"],
             schema_name,
         )
-        if args.get("load_ndjson_dir"):
-            sys.exit("Loading an ndjson dir is not supported with --db-type=athena.")
     else:
         raise errors.CumulusLibraryError(f"'{db_config.db_type}' is not a supported database.")
     if "prepare" not in args.keys():
         backend.connect()
     elif not args["prepare"]:
         backend.connect()
-    if backend.connection is not None and db_config.db_type == "duckdb" and load_ndjson_dir:
-        backend.insert_tables(read_ndjson_dir(load_ndjson_dir))
+    _handle_load_ndjson_dir(args, backend)
     return (backend, schema_name)
[Comment] Unrelated, but I noticed that this method had incorrect typing (I think my fault).
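(Assuming the typing in question is the `-> (base.DatabaseBackend, str)` return annotation, which is my reading of the diff rather than something stated here, the usual fix looks like the sketch below, with a stand-in class for `base.DatabaseBackend`.)

```python
class DatabaseBackend: ...  # stand-in for base.DatabaseBackend

# A bare (DatabaseBackend, str) annotation evaluates to a tuple *object*, not
# a type, so checkers like mypy reject it. The conventional spelling is tuple[...]:
def create_db_backend(args: dict[str, str]) -> tuple[DatabaseBackend, str]:
    backend = DatabaseBackend()
    return (backend, "main")
```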