fix: don't load NDJSON data into memory all at once
This commit changes how --load-ndjson-dir works: instead of loading all the
NDJSON data into memory at once and then handing it to DuckDB, we now tell
DuckDB to read the data from the files on disk itself.

This allows querying larger-than-memory data sets.

But the SQL itself can still be a memory bottleneck if it requires loading too
much data into memory.
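
Roughly speaking, this is the difference between materializing every row in a
pyarrow Table and handing DuckDB a pyarrow Dataset that streams the NDJSON
files from disk as queries run. A minimal sketch of the idea (not the code from
this commit; the file path is made up):

import duckdb
import pyarrow.dataset

# Hypothetical NDJSON file; any line-delimited JSON works here.
files = ["output/patient/patient.000.ndjson"]

# A Dataset is just a description of the files plus a schema; nothing is read yet.
patients = pyarrow.dataset.dataset(files, format="json")

# DuckDB can query a registered pyarrow Dataset directly, scanning the files
# lazily instead of holding the whole table in memory.
conn = duckdb.connect()
conn.register("patient", patients)
print(conn.execute("SELECT count(*) FROM patient").fetchall())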
mikix committed Jan 30, 2025
1 parent 8bd9789 commit b7ecea0
Showing 7 changed files with 71 additions and 25 deletions.
2 changes: 1 addition & 1 deletion cumulus_library/databases/athena.py
@@ -70,7 +70,7 @@ def execute_as_pandas(
def parser(self) -> base.DatabaseParser:
return AthenaParser()

def operational_errors(self) -> tuple[Exception]:
def operational_errors(self) -> tuple[type[Exception], ...]:
return (pyathena.OperationalError,)

def col_parquet_types_from_pandas(self, field_types: list) -> list:
2 changes: 1 addition & 1 deletion cumulus_library/databases/base.py
@@ -146,7 +146,7 @@ def execute_as_pandas(
def parser(self) -> DatabaseParser:
"""Returns parser object for interrogating DB schemas"""

def operational_errors(self) -> tuple[Exception]:
def operational_errors(self) -> tuple[type[Exception], ...]:
"""Returns a tuple of operational exception classes
An operational error is something that went wrong while performing a database
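
As an aside (my gloss, not part of the diff): the old annotation tuple[Exception]
describes a one-element tuple of Exception instances, while the method actually
returns a variable-length tuple of exception classes, which is what
tuple[type[Exception], ...] says:

# tuple[Exception] would mean: exactly one item, an Exception *instance*.
# tuple[type[Exception], ...] means: any number of Exception *classes*,
# the form you can pass to `except errors:` or `isinstance(err, errors)`.
errors: tuple[type[Exception], ...] = (ValueError, KeyError)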
5 changes: 3 additions & 2 deletions cumulus_library/databases/duckdb.py
@@ -14,6 +14,7 @@
import duckdb
import pandas
import pyarrow
import pyarrow.dataset

from cumulus_library import errors
from cumulus_library.databases import base
@@ -90,7 +91,7 @@ def connect(self):
duckdb.typing.VARCHAR,
)

def insert_tables(self, tables: dict[str, pyarrow.Table]) -> None:
def insert_tables(self, tables: dict[str, pyarrow.dataset.Dataset]) -> None:
"""Ingests all ndjson data from a folder tree.
This is often the output folder of Cumulus ETL"""
@@ -197,7 +198,7 @@ def col_pyarrow_types_from_sql(self, columns: list[tuple]) -> list:
def parser(self) -> base.DatabaseParser:
return DuckDbParser()

def operational_errors(self) -> tuple[Exception]:
def operational_errors(self) -> tuple[type[Exception], ...]:
return (
duckdb.OperationalError,
duckdb.BinderException,
82 changes: 63 additions & 19 deletions cumulus_library/databases/utils.py
@@ -1,27 +1,60 @@
import pathlib
import sys
from collections.abc import Iterable

import cumulus_fhir_support
import pyarrow
import pyarrow.dataset

from cumulus_library import db_config, errors
from cumulus_library import base_utils, db_config, errors
from cumulus_library.databases import athena, base, duckdb


def _read_rows_for_resource(path: pathlib.Path, resource: str) -> list[dict]:
rows = []
def _list_files_for_resource(path: pathlib.Path, resource: str) -> list[str]:
files = []

# Support any ndjson files from the target folder directly
rows += list(cumulus_fhir_support.read_multiline_json_from_dir(path, resource))
files += list(cumulus_fhir_support.list_multiline_json_in_dir(path, resource))

# Also support being given an ETL output folder, and look in the table subdir
subdir = path / resource.lower()
rows += list(cumulus_fhir_support.read_multiline_json_from_dir(subdir, resource))
files += list(cumulus_fhir_support.list_multiline_json_in_dir(subdir, resource))

return rows
return files


def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
def _rows_from_files(files: list[str]) -> Iterable[dict]:
for file in files:
yield from cumulus_fhir_support.read_multiline_json(file)


def _load_custom_etl_table(path: str) -> pyarrow.dataset.Dataset | None:
"""Loads a non-FHIR ETL table from disk (tables like etl__completion)."""
files = list(cumulus_fhir_support.list_multiline_json_in_dir(path))
if not files:
return None

# This is a custom Cumulus ETL table, with no easy way to get a schema definition.
# We **could** put a hard-coded shared schema in cumulus-fhir-support, but since these
# tables are so simple, we mostly don't yet need to have that level of pre-coordination.
#
# Instead, we let PyArrow infer the types, with one hiccup: its JSON parser will interpret
# datetime strings as a TIMESTAMP type, which isn't insane, but not what we want - we want to
# keep those as strings. So we let PyArrow infer from Python objects, where it works like we
# want.
#
# Thus, we sip on the data a bit to infer a schema from the first row as a Python object.
first_row = next(iter(_rows_from_files(files)), None)
if not first_row:
return None
schema = pyarrow.Table.from_pylist([first_row]).schema

# Now let PyArrow load the rest of the data from disk on demand, rather than loading it all
# into memory, but using the detected schema rather than inferring it with timestamps.
return pyarrow.dataset.dataset(files, schema=schema, format="json")


def read_ndjson_dir(path: str) -> dict[str, pyarrow.dataset.Dataset]:
"""Loads a directory tree of raw ndjson into schema-ful tables.
:param path: a directory path
@@ -49,32 +82,46 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
]
for resource in resources:
table_name = resource.lower()
rows = _read_rows_for_resource(pathlib.Path(path), resource)
files = _list_files_for_resource(pathlib.Path(path), resource)

# Make a pyarrow table with full schema from the data
schema = cumulus_fhir_support.pyarrow_schema_from_rows(resource, rows)
all_tables[table_name] = pyarrow.Table.from_pylist(rows, schema)
schema = cumulus_fhir_support.pyarrow_schema_from_rows(resource, _rows_from_files(files))
# Use a PyArrow Dataset (vs a Table) to avoid loading all the files in memory.
all_tables[table_name] = pyarrow.dataset.dataset(files, schema=schema, format="json")

# And now some special support for a few ETL tables.
metadata_tables = [
"etl__completion",
"etl__completion_encounters",
]
for metadata_table in metadata_tables:
rows = list(cumulus_fhir_support.read_multiline_json_from_dir(f"{path}/{metadata_table}"))
if rows:
# Auto-detecting the schema works for these simple tables
all_tables[metadata_table] = pyarrow.Table.from_pylist(rows)
if dataset := _load_custom_etl_table(f"{path}/{metadata_table}"):
all_tables[metadata_table] = dataset

return all_tables


def _handle_load_ndjson_dir(args: dict[str, str], backend: base.DatabaseBackend) -> None:
load_ndjson_dir = args.get("load_ndjson_dir")
if not load_ndjson_dir:
return # nothing to do

if db_config.db_type != "duckdb":
sys.exit("Loading an NDJSON dir is only supported with --db-type=duckdb.")

if backend.connection is None:
return # connect() was never run, we don't have a live DB connection

with base_utils.get_progress_bar() as progress:
progress.add_task("Detecting JSON schemas...", total=None)
backend.insert_tables(read_ndjson_dir(load_ndjson_dir))


def create_db_backend(args: dict[str, str]) -> (base.DatabaseBackend, str):
"""Retrieves a database backend and target schema from CLI args"""
db_config.db_type = args["db_type"]

if db_config.db_type == "duckdb":
load_ndjson_dir = args.get("load_ndjson_dir")
# TODO: reevaluate as DuckDB's local schema support evolves.
# https://duckdb.org/docs/sql/statements/set.html#syntax
if not (args.get("schema_name") is None or args["schema_name"] == "main"):
@@ -104,14 +151,11 @@ def create_db_backend(args: dict[str, str]) -> (base.DatabaseBackend, str):
args["profile"],
schema_name,
)
if args.get("load_ndjson_dir"):
sys.exit("Loading an ndjson dir is not supported with --db-type=athena.")
else:
raise errors.CumulusLibraryError(f"'{db_config.db_type}' is not a supported database.")
if "prepare" not in args.keys():
backend.connect()
elif not args["prepare"]:
backend.connect()
if backend.connection is not None and db_config.db_type == "duckdb" and load_ndjson_dir:
backend.insert_tables(read_ndjson_dir(load_ndjson_dir))
_handle_load_ndjson_dir(args, backend)
return (backend, schema_name)
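
The schema trick in _load_custom_etl_table above can be seen with a toy row
(assumed example data, not from the commit): inferring the schema from a Python
object keeps an ISO-datetime-looking value typed as a string, and that detected
schema is then handed to the dataset so the JSON reader never gets to guess
TIMESTAMP.

import pyarrow
import pyarrow.dataset

# Assumed example row, shaped loosely like an etl__completion record.
row = {"table_name": "patient", "export_time": "2024-06-01T00:00:00+00:00"}

# Inferring from a Python object keeps the datetime-looking value as a plain string.
schema = pyarrow.Table.from_pylist([row]).schema
print(schema.field("export_time").type)  # string

# The detected schema then overrides inference when the files are read on demand:
# dataset = pyarrow.dataset.dataset(files, schema=schema, format="json")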
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -2,7 +2,7 @@
name = "cumulus-library"
requires-python = ">= 3.11"
dependencies = [
"cumulus-fhir-support >= 1.2",
"cumulus-fhir-support >= 1.3.1", # 1.3.1 fixes a "load all rows into memory" bug
"duckdb >= 1.1.3",
"Jinja2 > 3",
"pandas <3, >=2.1.3",
2 changes: 1 addition & 1 deletion tests/test_duckdb.py
@@ -75,7 +75,7 @@ def test_duckdb_load_ndjson_dir(tmp_path):
for index, (filename, valid) in enumerate(filenames.items()):
with open(f"{tmp_path}/{filename}", "w", encoding="utf8") as f:
row_id = f"Good{index}" if valid else f"Bad{index}"
f.write(f'{{"id":"{row_id}", "resourceType": "Patient"}}')
f.write(f'{{"id":"{row_id}", "resourceType": "Patient"}}\n')

db, _ = databases.create_db_backend(
{
1 change: 1 addition & 0 deletions tests/testbed_utils.py
@@ -33,6 +33,7 @@ def add(self, table: str, obj: dict) -> None:

with open(table_dir / f"{index}.ndjson", "w", encoding="utf8") as f:
json.dump(obj, f)
f.write("\n")

# ** Now a bunch of resource-specific "add" functions.
# They each take some kwargs that should:
