From 53e3966bfe8f7eea4dfb523afa5e0b254bb7b1cb Mon Sep 17 00:00:00 2001 From: julius lukas Date: Thu, 22 May 2025 06:54:12 -0600 Subject: [PATCH 1/2] Added hcpc_corrections DAG --- .idea/.gitignore | 8 ++ .idea/.name | 1 + .../inspectionProfiles/profiles_settings.xml | 6 ++ .idea/modules.xml | 8 ++ .idea/sagerx.iml | 12 +++ .idea/sqldialects.xml | 6 ++ .idea/vcs.xml | 6 ++ airflow/dags/hcpc_corrections/dag.py | 83 +++++++++++++++++++ airflow/dags/hcpc_corrections/load_sample.sql | 7 ++ docker-compose.yml | 10 +-- requirements.txt | 1 + test_upload.txt | 1 + 12 files changed, 144 insertions(+), 5 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/.name create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/sagerx.iml create mode 100644 .idea/sqldialects.xml create mode 100644 .idea/vcs.xml create mode 100644 airflow/dags/hcpc_corrections/dag.py create mode 100644 airflow/dags/hcpc_corrections/load_sample.sql create mode 100644 requirements.txt create mode 100644 test_upload.txt diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/.name b/.idea/.name new file mode 100644 index 0000000..e79a331 --- /dev/null +++ b/.idea/.name @@ -0,0 +1 @@ +dag.py \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..572a5ec --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/sagerx.iml b/.idea/sagerx.iml new file mode 100644 index 0000000..8b8c395 --- /dev/null +++ b/.idea/sagerx.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/sqldialects.xml b/.idea/sqldialects.xml new file mode 100644 index 0000000..c23d77c --- /dev/null +++ b/.idea/sqldialects.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/airflow/dags/hcpc_corrections/dag.py b/airflow/dags/hcpc_corrections/dag.py new file mode 100644 index 0000000..5e9f35a --- /dev/null +++ b/airflow/dags/hcpc_corrections/dag.py @@ -0,0 +1,83 @@ +import pendulum +from airflow_operator import create_dag +from airflow.providers.postgres.operators.postgres import PostgresOperator +from airflow.operators.python import PythonOperator +from common_dag_tasks import get_ds_folder, get_data_folder +from pathlib import Path +import requests, pandas as pd + +dag_id = "cms_hcpcs_2020" +url = "https://www.cms.gov/files/zip/2020-corrections-alpha-numeric-hcpcs-file.zip" +"""change this url to whichever one you want""" +dag = create_dag( + dag_id=dag_id, + schedule="0 4 * * *", + start_date=pendulum.yesterday(), + catchup=False, + concurrency=2, +) + +def download_excel(): + folder = get_data_folder(dag_id) + folder.mkdir(parents=True, exist_ok=True) + out_path = folder / "HCPC2020_Corrections_Alpha.xlsx" + response = requests.get(url) + response.raise_for_status() + + zip_path = folder / "hcpcs.zip" + with open(zip_path, "wb") as f: + f.write(response.content) + + import zipfile + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(folder) + +"""converts xlsx to csv""" +def convert_to_csv(): + folder = get_data_folder(dag_id) + excel_file = next(folder.glob("*.xlsx")) + csv_file = excel_file.with_suffix(".csv") + df = pd.read_excel(excel_file, skiprows=4) + df.to_csv(csv_file, index=False) + print(f"Saved CSV: {csv_file}") + +with dag: + download_task = PythonOperator( + task_id="download_excel", + python_callable=download_excel, + ) + + convert_task = PythonOperator( + task_id="convert_to_csv", + python_callable=convert_to_csv, + ) + + load_task = PostgresOperator( + task_id="load_hcpcs", + postgres_conn_id="postgres_default", + sql=""" + DROP TABLE IF EXISTS sagerx_lake.cms_hcpcs_2020 CASCADE; + + CREATE TABLE sagerx_lake.cms_hcpcs_2020 ( + code TEXT, + action TEXT, + eff_date TEXT, + short_desc TEXT, + long_desc TEXT, + tos TEXT, + betos TEXT, + cov TEXT, + price TEXT, + xref_code TEXT, + asc_ind TEXT, + asc_date TEXT, + comments TEXT + ); + + COPY sagerx_lake.cms_hcpcs_2020 + FROM '/opt/airflow/data/cms_hcpcs_2020/HCPC2020_Corrections_Alpha.csv' + DELIMITER ',' CSV HEADER; + """, + ) + + download_task >> convert_task >> load_task diff --git a/airflow/dags/hcpc_corrections/load_sample.sql b/airflow/dags/hcpc_corrections/load_sample.sql new file mode 100644 index 0000000..e7e78f2 --- /dev/null +++ b/airflow/dags/hcpc_corrections/load_sample.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS public.julius_hcpcs_corrections_2020 CASCADE; + +CREATE TABLE public.julius_hcpcs_corrections_2020 ( + hcpcs_code TEXT, + description TEXT, + action TEXT +); diff --git a/docker-compose.yml b/docker-compose.yml index 7b00e23..3c4f4e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -111,7 +111,8 @@ services: airflow-webserver: <<: *airflow-common container_name: airflow-webserver - command: webserver + command: > + bash -c "pip install openpyxl && airflow webserver" environment: <<: *airflow-common-env AWS_ACCESS_KEY_ID: ${ACCESS_KEY} @@ -119,17 +120,16 @@ services: AWS_DEST_BUCKET: ${DEST_BUCKET} ports: - 8001:8080 - airflow-scheduler: <<: *airflow-common container_name: airflow-scheduler - command: scheduler + command: > + bash -c "pip install openpyxl && airflow scheduler" environment: <<: *airflow-common-env AWS_ACCESS_KEY_ID: ${ACCESS_KEY} AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY} AWS_DEST_BUCKET: ${DEST_BUCKET} - networks: airflow-dbt-network: - driver: bridge \ No newline at end of file + driver: bridge diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..794cc3d --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +openpyxl diff --git a/test_upload.txt b/test_upload.txt new file mode 100644 index 0000000..19a44f5 --- /dev/null +++ b/test_upload.txt @@ -0,0 +1 @@ +This is a test file from sagerx From 46e7d744d52625d3bb5f1e1f75a06581bf1462d8 Mon Sep 17 00:00:00 2001 From: Julius Lukas Date: Thu, 22 May 2025 07:06:21 -0600 Subject: [PATCH 2/2] Added HCPC Corrections DAG --- .idea/.gitignore | 16 +- .idea/modules.xml | 14 +- .idea/sagerx.iml | 22 +-- .idea/sqldialects.xml | 10 +- airflow/dags/hcpc_corrections/dag.py | 166 +++++++++--------- airflow/dags/hcpc_corrections/load_sample.sql | 14 +- 6 files changed, 121 insertions(+), 121 deletions(-) diff --git a/.idea/.gitignore b/.idea/.gitignore index 13566b8..1c2fda5 100644 --- a/.idea/.gitignore +++ b/.idea/.gitignore @@ -1,8 +1,8 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/modules.xml b/.idea/modules.xml index 572a5ec..ed2834c 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -1,8 +1,8 @@ - - - - - - - + + + + + + + \ No newline at end of file diff --git a/.idea/sagerx.iml b/.idea/sagerx.iml index 8b8c395..2946dc0 100644 --- a/.idea/sagerx.iml +++ b/.idea/sagerx.iml @@ -1,12 +1,12 @@ - - - - - - - - - + + + + + + + + + \ No newline at end of file diff --git a/.idea/sqldialects.xml b/.idea/sqldialects.xml index c23d77c..eddcc27 100644 --- a/.idea/sqldialects.xml +++ b/.idea/sqldialects.xml @@ -1,6 +1,6 @@ - - - - - + + + + + \ No newline at end of file diff --git a/airflow/dags/hcpc_corrections/dag.py b/airflow/dags/hcpc_corrections/dag.py index 5e9f35a..13da465 100644 --- a/airflow/dags/hcpc_corrections/dag.py +++ b/airflow/dags/hcpc_corrections/dag.py @@ -1,83 +1,83 @@ -import pendulum -from airflow_operator import create_dag -from airflow.providers.postgres.operators.postgres import PostgresOperator -from airflow.operators.python import PythonOperator -from common_dag_tasks import get_ds_folder, get_data_folder -from pathlib import Path -import requests, pandas as pd - -dag_id = "cms_hcpcs_2020" -url = "https://www.cms.gov/files/zip/2020-corrections-alpha-numeric-hcpcs-file.zip" -"""change this url to whichever one you want""" -dag = create_dag( - dag_id=dag_id, - schedule="0 4 * * *", - start_date=pendulum.yesterday(), - catchup=False, - concurrency=2, -) - -def download_excel(): - folder = get_data_folder(dag_id) - folder.mkdir(parents=True, exist_ok=True) - out_path = folder / "HCPC2020_Corrections_Alpha.xlsx" - response = requests.get(url) - response.raise_for_status() - - zip_path = folder / "hcpcs.zip" - with open(zip_path, "wb") as f: - f.write(response.content) - - import zipfile - with zipfile.ZipFile(zip_path, "r") as zip_ref: - zip_ref.extractall(folder) - -"""converts xlsx to csv""" -def convert_to_csv(): - folder = get_data_folder(dag_id) - excel_file = next(folder.glob("*.xlsx")) - csv_file = excel_file.with_suffix(".csv") - df = pd.read_excel(excel_file, skiprows=4) - df.to_csv(csv_file, index=False) - print(f"Saved CSV: {csv_file}") - -with dag: - download_task = PythonOperator( - task_id="download_excel", - python_callable=download_excel, - ) - - convert_task = PythonOperator( - task_id="convert_to_csv", - python_callable=convert_to_csv, - ) - - load_task = PostgresOperator( - task_id="load_hcpcs", - postgres_conn_id="postgres_default", - sql=""" - DROP TABLE IF EXISTS sagerx_lake.cms_hcpcs_2020 CASCADE; - - CREATE TABLE sagerx_lake.cms_hcpcs_2020 ( - code TEXT, - action TEXT, - eff_date TEXT, - short_desc TEXT, - long_desc TEXT, - tos TEXT, - betos TEXT, - cov TEXT, - price TEXT, - xref_code TEXT, - asc_ind TEXT, - asc_date TEXT, - comments TEXT - ); - - COPY sagerx_lake.cms_hcpcs_2020 - FROM '/opt/airflow/data/cms_hcpcs_2020/HCPC2020_Corrections_Alpha.csv' - DELIMITER ',' CSV HEADER; - """, - ) - - download_task >> convert_task >> load_task +import pendulum +from airflow_operator import create_dag +from airflow.providers.postgres.operators.postgres import PostgresOperator +from airflow.operators.python import PythonOperator +from common_dag_tasks import get_ds_folder, get_data_folder +from pathlib import Path +import requests, pandas as pd + +dag_id = "cms_hcpcs_2020" +url = "https://www.cms.gov/files/zip/2020-corrections-alpha-numeric-hcpcs-file.zip" +"""change this url to whichever one you want""" +dag = create_dag( + dag_id=dag_id, + schedule="0 4 * * *", + start_date=pendulum.yesterday(), + catchup=False, + concurrency=2, +) + +def download_excel(): + folder = get_data_folder(dag_id) + folder.mkdir(parents=True, exist_ok=True) + out_path = folder / "HCPC2020_Corrections_Alpha.xlsx" + response = requests.get(url) + response.raise_for_status() + + zip_path = folder / "hcpcs.zip" + with open(zip_path, "wb") as f: + f.write(response.content) + + import zipfile + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(folder) + +"""converts xlsx to csv""" +def convert_to_csv(): + folder = get_data_folder(dag_id) + excel_file = next(folder.glob("*.xlsx")) + csv_file = excel_file.with_suffix(".csv") + df = pd.read_excel(excel_file, skiprows=4) + df.to_csv(csv_file, index=False) + print(f"Saved CSV: {csv_file}") + +with dag: + download_task = PythonOperator( + task_id="download_excel", + python_callable=download_excel, + ) + + convert_task = PythonOperator( + task_id="convert_to_csv", + python_callable=convert_to_csv, + ) + + load_task = PostgresOperator( + task_id="load_hcpcs", + postgres_conn_id="postgres_default", + sql=""" + DROP TABLE IF EXISTS sagerx_lake.cms_hcpcs_2020 CASCADE; + + CREATE TABLE sagerx_lake.cms_hcpcs_2020 ( + code TEXT, + action TEXT, + eff_date TEXT, + short_desc TEXT, + long_desc TEXT, + tos TEXT, + betos TEXT, + cov TEXT, + price TEXT, + xref_code TEXT, + asc_ind TEXT, + asc_date TEXT, + comments TEXT + ); + + COPY sagerx_lake.cms_hcpcs_2020 + FROM '/opt/airflow/data/cms_hcpcs_2020/HCPC2020_Corrections_Alpha.csv' + DELIMITER ',' CSV HEADER; + """, + ) + + download_task >> convert_task >> load_task diff --git a/airflow/dags/hcpc_corrections/load_sample.sql b/airflow/dags/hcpc_corrections/load_sample.sql index e7e78f2..2b7c22e 100644 --- a/airflow/dags/hcpc_corrections/load_sample.sql +++ b/airflow/dags/hcpc_corrections/load_sample.sql @@ -1,7 +1,7 @@ -DROP TABLE IF EXISTS public.julius_hcpcs_corrections_2020 CASCADE; - -CREATE TABLE public.julius_hcpcs_corrections_2020 ( - hcpcs_code TEXT, - description TEXT, - action TEXT -); +DROP TABLE IF EXISTS public.julius_hcpcs_corrections_2020 CASCADE; + +CREATE TABLE public.julius_hcpcs_corrections_2020 ( + hcpcs_code TEXT, + description TEXT, + action TEXT +);