diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..1c2fda5 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/.name b/.idea/.name new file mode 100644 index 0000000..e79a331 --- /dev/null +++ b/.idea/.name @@ -0,0 +1 @@ +dag.py \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..ed2834c --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/sagerx.iml b/.idea/sagerx.iml new file mode 100644 index 0000000..2946dc0 --- /dev/null +++ b/.idea/sagerx.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/sqldialects.xml b/.idea/sqldialects.xml new file mode 100644 index 0000000..eddcc27 --- /dev/null +++ b/.idea/sqldialects.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/airflow/dags/hcpc_corrections/dag.py b/airflow/dags/hcpc_corrections/dag.py new file mode 100644 index 0000000..13da465 --- /dev/null +++ b/airflow/dags/hcpc_corrections/dag.py @@ -0,0 +1,83 @@ +import pendulum +from airflow_operator import create_dag +from airflow.providers.postgres.operators.postgres import PostgresOperator +from airflow.operators.python import PythonOperator +from common_dag_tasks import get_ds_folder, get_data_folder +from pathlib import Path +import requests, pandas as pd + +dag_id = "cms_hcpcs_2020" +url = "https://www.cms.gov/files/zip/2020-corrections-alpha-numeric-hcpcs-file.zip" +"""change this url to whichever one you want""" +dag = create_dag( + dag_id=dag_id, + schedule="0 4 * * *", + start_date=pendulum.yesterday(), + catchup=False, + concurrency=2, +) + +def download_excel(): + folder = get_data_folder(dag_id) + folder.mkdir(parents=True, exist_ok=True) + out_path = folder / "HCPC2020_Corrections_Alpha.xlsx" + response = requests.get(url) + response.raise_for_status() + + zip_path = folder / "hcpcs.zip" + with open(zip_path, "wb") as f: + f.write(response.content) + + import zipfile + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(folder) + +"""converts xlsx to csv""" +def convert_to_csv(): + folder = get_data_folder(dag_id) + excel_file = next(folder.glob("*.xlsx")) + csv_file = excel_file.with_suffix(".csv") + df = pd.read_excel(excel_file, skiprows=4) + df.to_csv(csv_file, index=False) + print(f"Saved CSV: {csv_file}") + +with dag: + download_task = PythonOperator( + task_id="download_excel", + python_callable=download_excel, + ) + + convert_task = PythonOperator( + task_id="convert_to_csv", + python_callable=convert_to_csv, + ) + + load_task = PostgresOperator( + task_id="load_hcpcs", + postgres_conn_id="postgres_default", + sql=""" + DROP TABLE IF EXISTS sagerx_lake.cms_hcpcs_2020 CASCADE; + + CREATE TABLE sagerx_lake.cms_hcpcs_2020 ( + code TEXT, + action TEXT, + eff_date TEXT, + short_desc TEXT, + long_desc TEXT, + tos TEXT, + betos TEXT, + cov TEXT, + price TEXT, + xref_code TEXT, + asc_ind TEXT, + asc_date TEXT, + comments TEXT + ); + + COPY sagerx_lake.cms_hcpcs_2020 + FROM '/opt/airflow/data/cms_hcpcs_2020/HCPC2020_Corrections_Alpha.csv' + DELIMITER ',' CSV HEADER; + """, + ) + + download_task >> convert_task >> load_task diff --git a/airflow/dags/hcpc_corrections/load_sample.sql b/airflow/dags/hcpc_corrections/load_sample.sql new file mode 100644 index 0000000..2b7c22e --- /dev/null +++ b/airflow/dags/hcpc_corrections/load_sample.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS public.julius_hcpcs_corrections_2020 CASCADE; + +CREATE TABLE public.julius_hcpcs_corrections_2020 ( + hcpcs_code TEXT, + description TEXT, + action TEXT +); diff --git a/docker-compose.yml b/docker-compose.yml index 7b00e23..3c4f4e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -111,7 +111,8 @@ services: airflow-webserver: <<: *airflow-common container_name: airflow-webserver - command: webserver + command: > + bash -c "pip install openpyxl && airflow webserver" environment: <<: *airflow-common-env AWS_ACCESS_KEY_ID: ${ACCESS_KEY} @@ -119,17 +120,16 @@ services: AWS_DEST_BUCKET: ${DEST_BUCKET} ports: - 8001:8080 - airflow-scheduler: <<: *airflow-common container_name: airflow-scheduler - command: scheduler + command: > + bash -c "pip install openpyxl && airflow scheduler" environment: <<: *airflow-common-env AWS_ACCESS_KEY_ID: ${ACCESS_KEY} AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY} AWS_DEST_BUCKET: ${DEST_BUCKET} - networks: airflow-dbt-network: - driver: bridge \ No newline at end of file + driver: bridge diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..794cc3d --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +openpyxl diff --git a/test_upload.txt b/test_upload.txt new file mode 100644 index 0000000..19a44f5 --- /dev/null +++ b/test_upload.txt @@ -0,0 +1 @@ +This is a test file from sagerx