Add table creation from files (#230)
* Add table creation from files

* remove mock bsvs

* pr feedback

* meant to adjust the other test, whoops

* test cleanup

* corrected upload path
dogversioning authored May 1, 2024
1 parent ca83935 commit 0aacf20
Showing 28 changed files with 504 additions and 183 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
@@ -80,8 +80,8 @@ jobs:
          WG: cumulus
          DB: cumulus_library_regression_db
        run: |
          cumulus-library build -t core --profile $PROFILE --workgroup $WG --database $DB
          cumulus-library export -t core ./tests/regression/data_export/ --profile $PROFILE --workgroup $WG --database $DB
          cumulus-library build -t vocab -t core --profile $PROFILE --workgroup $WG --database $DB
          cumulus-library export -t vocab -t core ./tests/regression/data_export/ --profile $PROFILE --workgroup $WG --database $DB
      - name: Compare vs known data
        run: python ./tests/regression/run_regression.py

4 changes: 4 additions & 0 deletions .sqlfluffignore
@@ -12,3 +12,7 @@ encounter.sql.jinja
# Ignoring for now - could be addressed with an in-folder .sqlfluff config
# or by a refactor of variable names
count.sql.jinja

# The following files adapt syntax to the target database and so are not
# reliably checkable via static sqlfluff variables
ctas_from_parquet.sql.jinja
2 changes: 2 additions & 0 deletions cumulus_library/.sqlfluff
@@ -89,12 +89,14 @@ join_cols_by_table =
}
}
join_id = subject_ref
local_location = /var/study/data/
neg_source_table = neg_source_table
output_table_name = 'created_table'
parent_field = 'parent'
prefix = Test
primary_ref = encounter_ref
pos_source_table = pos_source_table
remote_location = s3://bucket/study/data/
schema_name = test_schema
schema =
{
52 changes: 51 additions & 1 deletion cumulus_library/databases.py
@@ -12,10 +12,12 @@
import datetime
import json
import os
import pathlib
import sys
from pathlib import Path
from typing import Any, Protocol

import boto3
import cumulus_fhir_support
import duckdb
import pandas
@@ -24,7 +26,7 @@
from pyathena.common import BaseCursor as AthenaCursor
from pyathena.pandas.cursor import PandasCursor as AthenaPandasCursor

from cumulus_library import db_config
from cumulus_library import db_config, errors


class DatabaseCursor(Protocol):
@@ -497,3 +499,51 @@ def create_db_backend(args: dict[str, str]) -> DatabaseBackend:
        raise ValueError(f"Unexpected --db-type value '{db_type}'")

    return backend


def upload_file(
    *,
    cursor: DatabaseCursor,
    file: pathlib.Path,
    study: str,
    topic: str,
    remote_filename: str | None = None,
) -> str | None:
    if db_config.db_type == "athena":
        # We'll investigate the cursor to get the relevant params for S3 upload
        # TODO: this could be retrieved from a config object passed to builders
        wg_conf = cursor.connection._client.get_work_group(
            WorkGroup=cursor.connection.work_group
        )["WorkGroup"]["Configuration"]["ResultConfiguration"]
        s3_path = wg_conf["OutputLocation"]
        bucket = "/".join(s3_path.split("/")[2:3])
        key_prefix = "/".join(s3_path.split("/")[3:])
        encryption_type = wg_conf.get("EncryptionConfiguration", {}).get(
            "EncryptionOption", {}
        )
        if encryption_type != "SSE_KMS":
            raise errors.AWSError(
                f"Bucket {bucket} has unexpected encryption type {encryption_type}. "
                "AWS KMS encryption is expected for Cumulus buckets."
            )
        kms_arn = wg_conf.get("EncryptionConfiguration", {}).get("KmsKey", None)
        profile = cursor.connection.profile_name
        s3_key = (
            f"{key_prefix}cumulus_user_uploads/{cursor._schema_name}/"
            f"{study}/{topic}"
        )
        if not remote_filename:
            remote_filename = file
        session = boto3.Session(profile_name=profile)
        s3_client = session.client("s3")
        with open(file, "rb") as b_file:
            s3_client.put_object(
                Bucket=bucket,
                Key=f"{s3_key}/{remote_filename}",
                Body=b_file,
                ServerSideEncryption="aws:kms",
                SSEKMSKeyId=kms_arn,
            )
        return f"s3://{bucket}/{s3_key}"
    # For DBs not requiring a remote upload
    return None
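
For orientation, here is a minimal sketch of how a table builder might call the new helper; the stage_parquet wrapper, cursor object, and file path are illustrative placeholders rather than code from this commit:

import pathlib

from cumulus_library import databases


def stage_parquet(cursor, parquet_path: pathlib.Path) -> str | None:
    # On an Athena backend this returns the s3:// prefix the file was uploaded
    # under; for databases that read local files directly (e.g. duckdb) it
    # returns None and the local path can be used as-is.
    return databases.upload_file(
        cursor=cursor,
        file=parquet_path,
        study="vocab",
        topic="icd",
        remote_filename=parquet_path.name,
    )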
4 changes: 4 additions & 0 deletions cumulus_library/errors.py
@@ -23,5 +23,9 @@ class StudyManifestQueryError(Exception):
    """Errors related to data queries from StudyManifestParser"""


class AWSError(Exception):
    """Errors from interacting with AWS"""


class ApiError(Exception):
    """Errors from external API calls"""
1 change: 1 addition & 0 deletions cumulus_library/studies/vocab/.gitignore
@@ -0,0 +1 @@
icd/*.parquet
6 changes: 6 additions & 0 deletions cumulus_library/studies/vocab/manifest.toml
@@ -9,3 +9,9 @@ file_names = [
file_names = [
"icd_legend.sql",
]

# This export is only intended for use as part of CI regression testing
[export_config]
export_list = [
"vocab__icd"
]
18 changes: 18 additions & 0 deletions cumulus_library/studies/vocab/reference_sql/vocab_icd_builder.sql
@@ -0,0 +1,18 @@
-- noqa: disable=all
-- This sql was autogenerated as a reference example using the library
-- CLI. Its format is tied to the specific database it was run against,
-- and it may not be correct for all databases. Use the CLI's build
-- option to derive the best SQL for your dataset.

-- ###########################################################

CREATE EXTERNAL TABLE IF NOT EXISTS `cumulus_mhg_dev_db`.`vocab__icd` (
    CUI STRING,
    TTY STRING,
    CODE STRING,
    SAB STRING,
    STR STRING
)
STORED AS PARQUET
LOCATION 's3://cumulus-athena-933137588087-us-east-1/results/cumulus_user_uploads/cumulus_mhg_dev_db/vocab/icd'
tblproperties ("parquet.compression"="SNAPPY");
75 changes: 29 additions & 46 deletions cumulus_library/studies/vocab/vocab_icd_builder.py
@@ -1,9 +1,10 @@
""" Module for directly loading ICD bsvs into athena tables """

import csv
import pathlib

from cumulus_library import base_table_builder
import pandas

from cumulus_library import base_table_builder, databases
from cumulus_library.template_sql import base_templates


@@ -30,49 +31,31 @@ def prepare_queries(self, cursor: object, schema: str, *args, **kwargs):
"""

table_name = "vocab__icd"
icd_files = ["ICD10CM_2023AA", "ICD10PCS_2023AA", "ICD9CM_2023AA"]
path = pathlib.Path(__file__).parent

icd_files = path.glob("icd/*.bsv")
headers = ["CUI", "TTY", "CODE", "SAB", "STR"]
rows_processed = 0
dataset = []
created = False
for filename in icd_files:
with open(f"{path}/{filename}.bsv") as file:
# For the first row in the dataset, we want to coerce types from
# varchar(len(item)) athena default to to an unrestricted varchar, so
# we'll create a table with one row - this make the recast faster, and
# lets us set the partition_size a little higher by limiting the
# character bloat to keep queries under athena's limit of 262144.
reader = csv.reader(file, delimiter="|")
if not created:
row = self.clean_row(next(reader), filename)
self.queries.append(
base_templates.get_ctas_query(
schema_name=schema,
table_name=table_name,
dataset=[row],
table_cols=headers,
)
)
created = True
for row in reader:
row = self.clean_row(row, filename)
dataset.append(row)
rows_processed += 1
if rows_processed == self.partition_size:
self.queries.append(
base_templates.get_insert_into_query(
table_name=table_name,
table_cols=headers,
dataset=dataset,
)
)
dataset = []
rows_processed = 0
if rows_processed > 0:
self.queries.append(
base_templates.get_insert_into_query(
table_name=table_name, table_cols=headers, dataset=dataset
)
)
header_types = ["STRING", "STRING", "STRING", "STRING", "STRING"]
for file in icd_files:
parquet_path = path / f"icd/{file.stem}.parquet"
if not parquet_path.is_file():
df = pandas.read_csv(file, delimiter="|", names=headers)
df.to_parquet(parquet_path)
remote_path = databases.upload_file(
cursor=cursor,
file=parquet_path,
study="vocab",
topic="icd",
remote_filename=f"{file.stem}.parquet",
)
# Since we are building one table from these three files, it's fine to just
# use the last value of remote location
self.queries.append(
base_templates.get_ctas_from_parquet_query(
schema_name=schema,
table_name=table_name,
local_location=path / "icd",
remote_location=remote_path,
table_cols=headers,
remote_table_cols_types=header_types,
)
)
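
As a standalone illustration of the bsv-to-parquet conversion step above, assuming a pipe-delimited, headerless source file (the input path is hypothetical):

import pathlib

import pandas

headers = ["CUI", "TTY", "CODE", "SAB", "STR"]
bsv_file = pathlib.Path("icd/ICD10CM_2023AA.bsv")  # hypothetical input file
parquet_path = bsv_file.with_suffix(".parquet")

if not parquet_path.is_file():
    # The bsv has no header row, so column names are supplied explicitly
    df = pandas.read_csv(bsv_file, delimiter="|", names=headers)
    df.to_parquet(parquet_path)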
35 changes: 35 additions & 0 deletions cumulus_library/template_sql/base_templates.py
@@ -156,6 +156,41 @@ def get_create_view_query(
)


def get_ctas_from_parquet_query(
    schema_name: str,
    table_name: str,
    local_location: str,
    remote_location: str,
    table_cols: list[str],
    remote_table_cols_types: list[str],
) -> str:
    """Generates a create table as query using a directory of parquet files as a source
    This function will generate an appropriate query for the underlying DB.
    Not all params are used by each type of database.
    :param schema_name: (athena) The schema to create the table in
    :param table_name: (all) The name of the athena table to create
    :param local_location: (duckdb) A directory containing parquet files to group
        into a table
    :param remote_location: (athena) An S3 URL to a directory containing parquet
        files to group into a table
    :param table_cols: (all) names of fields in your parquet to use as SQL columns
    :param remote_table_cols_types: (athena) The types to assign to the columns
        created in athena. Note that these should not be SQL types, but instead
        should be parquet types.
    """
    return get_base_template(
        "ctas_from_parquet",
        schema_name=schema_name,
        table_name=table_name,
        local_location=local_location,
        remote_location=remote_location,
        table_cols=table_cols,
        remote_table_cols_types=remote_table_cols_types,
    )
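
As a usage sketch, the new helper can be called like this; the schema and locations mirror the test values from the .sqlfluff config above and are placeholders, not real resources:

from cumulus_library.template_sql import base_templates

query = base_templates.get_ctas_from_parquet_query(
    schema_name="test_schema",
    table_name="vocab__icd",
    local_location="/var/study/data/icd",  # used by the duckdb branch
    remote_location="s3://bucket/study/data/icd",  # used by the athena branch
    table_cols=["CUI", "TTY", "CODE", "SAB", "STR"],
    remote_table_cols_types=["STRING", "STRING", "STRING", "STRING", "STRING"],
)
# The rendered SQL depends on db_config.db_type; see ctas_from_parquet.sql.jinja below.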


def get_ctas_query(
    schema_name: str, table_name: str, dataset: list[list[str]], table_cols: list[str]
) -> str:
24 changes: 24 additions & 0 deletions cumulus_library/template_sql/ctas_from_parquet.sql.jinja
@@ -0,0 +1,24 @@
{%- import 'syntax.sql.jinja' as syntax -%}
{%- if db_type == 'athena' -%}
CREATE EXTERNAL TABLE IF NOT EXISTS `{{ schema_name }}`.`{{ table_name }}` (
{%- elif db_type == 'duckdb' -%}
CREATE TABLE IF NOT EXISTS {{ table_name }} AS SELECT
{%- endif %}
{%- for col in table_cols %}
{{ col }}{% if db_type == 'athena' %} {{ remote_table_cols_types[loop.index0] }}{%- endif -%}
{{- syntax.comma_delineate(loop) }}
{%- endfor %}
{%- if db_type == 'athena' %}
)
{#- TODO: we may want to consider an optional partition parameter for
large tables, though we would need to also run a MSCK REPAIR TABLE query
after this table is created to make the data available.
See https://docs.aws.amazon.com/athena/latest/ug/parquet-serde.html
for more info #}
STORED AS PARQUET
LOCATION '{{ remote_location }}'
tblproperties ("parquet.compression"="SNAPPY");
{%- elif db_type == 'duckdb' %}
FROM read_parquet('{{ local_location }}/*.parquet')
{%- endif %}
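
For the duckdb branch, the rendered statement amounts to reading the parquet glob in place; a minimal sketch of the equivalent direct call (the table name and directory are illustrative):

import duckdb

conn = duckdb.connect()
conn.execute(
    "CREATE TABLE IF NOT EXISTS vocab__icd AS SELECT CUI, TTY, CODE, SAB, STR "
    "FROM read_parquet('icd/*.parquet')"
)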
1 change: 1 addition & 0 deletions tests/conftest.py
@@ -34,6 +34,7 @@
    "observation": [["id"], ["encounter", "reference"]],
    "patient": [["id"]],
}
VOCAB_ICD_ROW_COUNT = 403230

# Utility functions

6 changes: 6 additions & 0 deletions tests/core/test_core_meds.py
@@ -1,6 +1,8 @@
"""Tests for core__medicationrequest"""

import json
import os
from unittest import mock

import pytest

@@ -79,6 +81,10 @@ def test_core_medreq_only_inline(tmp_path):
    assert [x[0] for x in ids] == ["Inline"]


@mock.patch.dict(
    os.environ,
    clear=True,
)
def test_core_med_all_types(tmp_path):
    """Verify that we handle all types of medications"""
    testbed = testbed_utils.LocalTestbed(tmp_path)
Empty file added tests/regression/__init__.py
Empty file.