Skip to content

Commit

Permalink
Merge pull request #109 from OHDSI/katy__refactors
Browse files Browse the repository at this point in the history
Refactors to improve performance and simplify SQL code
  • Loading branch information
katy-sadowski authored Jan 11, 2025
2 parents 87f347e + 12bda01 commit e5f072f
Show file tree
Hide file tree
Showing 18 changed files with 219 additions and 221 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ logs/
data/
.Rdata
.Rhistory
*.duckdb
*.duckdb
.dbt/
profiles.yml
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ Users are welcomed, however, to utilize their own Synthea and/or OMOP vocabulary
source dbt-env/bin/activate # activate the environment for Mac and Linux OR
dbt-env\Scripts\activate # activate the environment for Windows
```
3. Set up your [profiles.yml file](https://docs.getdbt.com/docs/core/connect-data-platform/profiles.yml). You can do any one of the following:
- Create a file in the `~/.dbt/` directory named `profiles.yml` (if you've already got this directory and file, you can skip this step and add profile block(s) for this project to that file)
- Create a `profiles.yml` file in the root of the `dbt-synthea` repo folder
- Create the file wherever you wish, following the guidance [here](https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles#advanced-customizing-a-profile-directory)

### DuckDB Setup
1. In your virtual environment install requirements for duckdb (see [here for contents](./requirements/duckdb.in))
Expand All @@ -45,9 +49,7 @@ pip3 install -r requirements/duckdb.txt
pre-commit install
```

2. Set up your [profiles.yml file](https://docs.getdbt.com/docs/core/connect-data-platform/profiles.yml):
- Create a directory `.dbt` in your root directory if one doesn't exist already, then create a `profiles.yml` file in `.dbt`
- Add the following block to the file:
2. Add the following block to your `profiles.yml` file:
```yaml
synthea_omop_etl:
outputs:
Expand Down Expand Up @@ -101,9 +103,7 @@ pre-commit install
```
2. Set up a local Postgres database with a dedicated schema for developing this project (e.g. `dbt_synthea_dev`)

3. Set up your [profiles.yml file](https://docs.getdbt.com/docs/core/connect-data-platform/profiles.yml):
- Create a directory `.dbt` in your root directory if one doesn't exist already, then create a `profiles.yml` file in `.dbt`
- Add the following block to the file:
3. Add the following block to your `profiles.yml` file:
```yaml
synthea_omop_etl:
outputs:
Expand Down
32 changes: 29 additions & 3 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,43 @@ vars:

models:
synthea_omop_etl:
+materialized: table
intermediate:
+materialized: table
+docs:
node_color: '#FBC511'
omop:
+materialized: table
+docs:
node_color: '#EB6622'
staging:
synthea:
+materialized: view
+docs:
node_color: '#336B91'
vocabulary:
+materialized: view
+docs:
node_color: '#336B91'
map:
+materialized: view
+docs:
node_color: '#336B91'

seeds:
+quote_columns: true
synthea_omop_etl:
map:
+enabled: true
+schema: map_seeds
+docs:
node_color: '#69AED5'
vocabulary:
+enabled: true
+schema: vocab_seeds
+docs:
node_color: '#69AED5'
synthea:
+enabled: true
+schema: synthea_seeds
+quote_columns: true
+schema: synthea_seeds
+docs:
node_color: '#69AED5'
10 changes: 9 additions & 1 deletion models/intermediate/int__drug_immunisations.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
SELECT
i.patient_id
p.person_id
, i.patient_id
, i.encounter_id
, epr.provider_id
, epr.visit_occurrence_id
, epr.visit_occurrence_id + 1000000 AS visit_detail_id
, srctostdvm.target_concept_id AS drug_concept_id
, {{ dbt.cast("i.immunization_date", api.Column.translate_type("date")) }} AS drug_exposure_start_date
, i.immunization_date AS drug_exposure_start_datetime
Expand Down Expand Up @@ -31,3 +35,7 @@ INNER JOIN {{ ref ('int__source_to_source_vocab_map') }} AS srctosrcvm
ON
i.immunization_code = srctosrcvm.source_code
AND srctosrcvm.source_vocabulary_id = 'CVX'
INNER JOIN {{ ref ('int__person') }} AS p
ON i.patient_id = p.person_source_value
LEFT JOIN {{ ref ('int__encounter_provider') }} AS epr
ON i.encounter_id = epr.encounter_id
10 changes: 9 additions & 1 deletion models/intermediate/int__drug_medications.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
SELECT
m.patient_id
p.person_id
, m.patient_id
, m.encounter_id
, epr.provider_id
, epr.visit_occurrence_id
, epr.visit_occurrence_id + 1000000 AS visit_detail_id
, srctostdvm.target_concept_id AS drug_concept_id
, m.medication_start_date AS drug_exposure_start_date
, m.medication_start_datetime AS drug_exposure_start_datetime
Expand Down Expand Up @@ -35,3 +39,7 @@ INNER JOIN {{ ref ('int__source_to_source_vocab_map') }} AS srctosrcvm
ON
m.medication_code = srctosrcvm.source_code
AND srctosrcvm.source_vocabulary_id = 'RxNorm'
INNER JOIN {{ ref ('int__person') }} AS p
ON m.patient_id = p.person_source_value
LEFT JOIN {{ ref ('int__encounter_provider') }} AS epr
ON m.encounter_id = epr.encounter_id
8 changes: 7 additions & 1 deletion models/intermediate/int__encounter_provider.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
{# This bit of SQL gets reused several times in the OMOP layer #}
SELECT
e.patient_id
p.person_id
, e.patient_id
, e.encounter_id
, vid.visit_occurrence_id_new AS visit_occurrence_id
, pr.provider_id
FROM {{ ref ('stg_synthea__encounters') }} AS e
INNER JOIN {{ ref ('provider') }} AS pr
ON e.provider_id = pr.provider_source_value
INNER JOIN {{ ref ('int__person') }} AS p
ON e.patient_id = p.person_source_value
INNER JOIN {{ ref( 'int__final_visit_ids') }} AS vid
ON e.encounter_id = vid.encounter_id
58 changes: 58 additions & 0 deletions models/intermediate/int__measurement_observations.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{#
    Measurement-shaped rows built from stg_synthea__observations.

    An observation is kept only when its code has a LOINC mapping to a
    standard ('S'), non-invalidated target concept in the 'Measurement' domain
    (inner join on int__source_to_standard_vocab_map) and its patient resolves
    to a row in int__person. Unit, value and source-concept lookups, as well as
    the encounter/provider lookup, are left joins and therefore never drop rows.
#}
SELECT
    p.person_id
    , srctostdvm.target_concept_id AS measurement_concept_id
    , o.observation_date AS measurement_date
    , o.observation_datetime AS measurement_datetime
    , {{ dbt.cast("o.observation_datetime", api.Column.translate_type("time")) }} AS measurement_time
    {# NOTE(review): 32827 is a hard-coded type concept id; its meaning is not shown here, confirm against the vocabulary. #}
    , 32827 AS measurement_type_concept_id
    , 0 AS operator_concept_id
    {# Only values that look like a plain signed decimal number are cast; anything else becomes a typed null. #}
    {# NOTE(review): regexp_like is a project macro, presumably a regex match test; confirm. #}
    , CASE
        WHEN {{ regexp_like("o.observation_value", "^[-+]?[0-9]+\.?[0-9]*$") }}
            THEN {{ dbt.cast("o.observation_value", api.Column.translate_type("decimal")) }}
        ELSE {{ dbt.cast("null", api.Column.translate_type("decimal")) }}
    END AS value_as_number
    {# Unmatched value / unit lookups fall back to concept id 0. #}
    , coalesce(srcmap2.target_concept_id, 0) AS value_as_concept_id
    , coalesce(srcmap1.target_concept_id, 0) AS unit_concept_id
    , {{ dbt.cast("null", api.Column.translate_type("decimal")) }} AS range_low
    , {{ dbt.cast("null", api.Column.translate_type("decimal")) }} AS range_high
    {# Null when the observation's encounter has no row in int__encounter_provider. #}
    , epr.provider_id
    , epr.visit_occurrence_id
    {# visit_detail_id is the visit_occurrence_id shifted by a fixed offset of 1,000,000. #}
    , epr.visit_occurrence_id + 1000000 AS visit_detail_id
    , o.observation_code AS measurement_source_value
    , coalesce(srctosrcvm.source_concept_id, 0) AS measurement_source_concept_id
    , o.observation_units AS unit_source_value
    , o.observation_value AS value_source_value
    , {{ dbt.cast("null", api.Column.translate_type("integer")) }} AS unit_source_concept_id
    , {{ dbt.cast("null", api.Column.translate_type("bigint")) }} AS measurement_event_id
    , {{ dbt.cast("null", api.Column.translate_type("integer")) }} AS meas_event_field_concept_id
FROM {{ ref ('stg_synthea__observations') }} AS o
{# Required: standard Measurement concept for the observation's LOINC code. #}
INNER JOIN {{ ref ('int__source_to_standard_vocab_map') }} AS srctostdvm
    ON
        o.observation_code = srctostdvm.source_code
        AND srctostdvm.target_domain_id = 'Measurement'
        AND srctostdvm.source_vocabulary_id = 'LOINC'
        AND srctostdvm.target_standard_concept = 'S'
        AND srctostdvm.target_invalid_reason IS null
{# Optional: UCUM unit concept for the observation's units. #}
LEFT JOIN {{ ref ('int__source_to_standard_vocab_map') }} AS srcmap1
    ON
        o.observation_units = srcmap1.source_code
        AND srcmap1.target_vocabulary_id = 'UCUM'
        AND srcmap1.source_vocabulary_id = 'UCUM'
        AND srcmap1.target_standard_concept = 'S'
        AND srcmap1.target_invalid_reason IS null
{# Optional: 'Meas value' concept when the observation value is itself a coded value. #}
LEFT JOIN {{ ref ('int__source_to_standard_vocab_map') }} AS srcmap2
    ON
        o.observation_value = srcmap2.source_code
        AND srcmap2.target_domain_id = 'Meas value'
        AND srcmap2.target_standard_concept = 'S'
        AND srcmap2.target_invalid_reason IS null
{# Optional: source concept id for the LOINC code. #}
LEFT JOIN {{ ref ('int__source_to_source_vocab_map') }} AS srctosrcvm
    ON
        o.observation_code = srctosrcvm.source_code
        AND srctosrcvm.source_vocabulary_id = 'LOINC'
{# Required: resolve the Synthea patient id to a person_id. #}
INNER JOIN {{ ref ('int__person') }} AS p
    ON o.patient_id = p.person_source_value
{# Optional: provider and visit ids for the observation's encounter. #}
LEFT JOIN {{ ref ('int__encounter_provider') }} AS epr
    ON o.encounter_id = epr.encounter_id
39 changes: 39 additions & 0 deletions models/intermediate/int__measurement_procedures.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{#
    Measurement-shaped rows built from stg_synthea__procedures.

    A procedure is kept only when its code has (a) a SNOMED mapping to a
    standard ('S'), non-invalidated target concept in the 'Measurement' domain,
    (b) a SNOMED row in int__source_to_source_vocab_map, and (c) a patient that
    resolves to a row in int__person. The encounter/provider lookup is a left
    join and never drops rows.

    Value, unit and range columns are emitted as constants or typed nulls.
#}
SELECT
    p.person_id
    , srctostdvm.target_concept_id AS measurement_concept_id
    , pr.procedure_start_date AS measurement_date
    , pr.procedure_start_datetime AS measurement_datetime
    , {{ dbt.cast("pr.procedure_start_datetime", api.Column.translate_type("time")) }} AS measurement_time
    {# NOTE(review): 32827 is a hard-coded type concept id; its meaning is not shown here, confirm against the vocabulary. #}
    , 32827 AS measurement_type_concept_id
    , 0 AS operator_concept_id
    , {{ dbt.cast("null", api.Column.translate_type("decimal")) }} AS value_as_number
    , 0 AS value_as_concept_id
    , 0 AS unit_concept_id
    , {{ dbt.cast("null", api.Column.translate_type("decimal")) }} AS range_low
    , {{ dbt.cast("null", api.Column.translate_type("decimal")) }} AS range_high
    {# Null when the procedure's encounter has no row in int__encounter_provider. #}
    , epr.provider_id
    , epr.visit_occurrence_id
    {# visit_detail_id is the visit_occurrence_id shifted by a fixed offset of 1,000,000. #}
    , epr.visit_occurrence_id + 1000000 AS visit_detail_id
    , pr.procedure_code AS measurement_source_value
    , srctosrcvm.source_concept_id AS measurement_source_concept_id
    , {{ dbt.cast("null", api.Column.translate_type("varchar")) }} AS unit_source_value
    , {{ dbt.cast("null", api.Column.translate_type("varchar")) }} AS value_source_value
    , {{ dbt.cast("null", api.Column.translate_type("integer")) }} AS unit_source_concept_id
    , {{ dbt.cast("null", api.Column.translate_type("bigint")) }} AS measurement_event_id
    , {{ dbt.cast("null", api.Column.translate_type("integer")) }} AS meas_event_field_concept_id
FROM {{ ref ('stg_synthea__procedures') }} AS pr
{# Required: standard Measurement concept for the procedure's SNOMED code. #}
INNER JOIN {{ ref ('int__source_to_standard_vocab_map') }} AS srctostdvm
    ON
        pr.procedure_code = srctostdvm.source_code
        AND srctostdvm.target_domain_id = 'Measurement'
        AND srctostdvm.source_vocabulary_id = 'SNOMED'
        AND srctostdvm.target_standard_concept = 'S'
        AND srctostdvm.target_invalid_reason IS null
{# Required: source concept id for the SNOMED code (inner join, so unmapped codes are dropped). #}
INNER JOIN {{ ref ('int__source_to_source_vocab_map') }} AS srctosrcvm
    ON
        pr.procedure_code = srctosrcvm.source_code
        AND srctosrcvm.source_vocabulary_id = 'SNOMED'
{# Required: resolve the Synthea patient id to a person_id. #}
INNER JOIN {{ ref ('int__person') }} AS p
    ON pr.patient_id = p.person_source_value
{# Optional: provider and visit ids for the procedure's encounter. #}
LEFT JOIN {{ ref ('int__encounter_provider') }} AS epr
    ON pr.encounter_id = epr.encounter_id
10 changes: 9 additions & 1 deletion models/intermediate/int__observation_allergies.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
SELECT
a.patient_id
p.person_id
, a.patient_id
, a.encounter_id
, srctostdvm.target_concept_id AS observation_concept_id
, a.allergy_start_date AS observation_date
, a.allergy_start_date AS observation_datetime
, 32827 AS observation_type_concept_id
, epr.provider_id
, epr.visit_occurrence_id
, epr.visit_occurrence_id + 1000000 AS visit_detail_id
, a.allergy_code AS observation_source_value
, srctosrcvm.source_concept_id AS observation_source_concept_id
FROM {{ ref ('stg_synthea__allergies') }} AS a
Expand All @@ -20,3 +24,7 @@ INNER JOIN {{ ref ('int__source_to_source_vocab_map') }} AS srctosrcvm
a.allergy_code = srctosrcvm.source_code
AND srctosrcvm.source_vocabulary_id = 'SNOMED'
AND srctosrcvm.source_domain_id = 'Observation'
INNER JOIN {{ ref ('int__person') }} AS p
ON a.patient_id = p.person_source_value
LEFT JOIN {{ ref ('int__encounter_provider') }} AS epr
ON a.encounter_id = epr.encounter_id
10 changes: 9 additions & 1 deletion models/intermediate/int__observation_conditions.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
SELECT
c.patient_id
p.person_id
, c.patient_id
, c.encounter_id
, srctostdvm.target_concept_id AS observation_concept_id
, c.condition_start_date AS observation_date
, c.condition_start_date AS observation_datetime
, 38000280 AS observation_type_concept_id
, epr.provider_id
, epr.visit_occurrence_id
, epr.visit_occurrence_id + 1000000 AS visit_detail_id
, c.condition_code AS observation_source_value
, srctosrcvm.source_concept_id AS observation_source_concept_id
FROM {{ ref ('stg_synthea__conditions') }} AS c
Expand All @@ -20,3 +24,7 @@ INNER JOIN {{ ref ('int__source_to_source_vocab_map') }} AS srctosrcvm
c.condition_code = srctosrcvm.source_code
AND srctosrcvm.source_vocabulary_id = 'SNOMED'
AND srctosrcvm.source_domain_id = 'Observation'
INNER JOIN {{ ref ('int__person') }} AS p
ON c.patient_id = p.person_source_value
LEFT JOIN {{ ref ('int__encounter_provider') }} AS epr
ON c.encounter_id = epr.encounter_id
10 changes: 9 additions & 1 deletion models/intermediate/int__observation_observations.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
SELECT
o.patient_id
p.person_id
, o.patient_id
, o.encounter_id
, srctostdvm.target_concept_id AS observation_concept_id
, o.observation_date
, o.observation_datetime
, 38000280 AS observation_type_concept_id
, epr.provider_id
, epr.visit_occurrence_id
, epr.visit_occurrence_id + 1000000 AS visit_detail_id
, o.observation_code AS observation_source_value
, srctosrcvm.source_concept_id AS observation_source_concept_id
FROM {{ ref ('stg_synthea__observations') }} AS o
Expand All @@ -20,3 +24,7 @@ INNER JOIN {{ ref ('int__source_to_source_vocab_map') }} AS srctosrcvm
o.observation_code = srctosrcvm.source_code
AND srctosrcvm.source_vocabulary_id = 'LOINC'
AND srctosrcvm.source_domain_id = 'Observation'
INNER JOIN {{ ref ('int__person') }} AS p
ON o.patient_id = p.person_source_value
LEFT JOIN {{ ref ('int__encounter_provider') }} AS epr
ON o.encounter_id = epr.encounter_id
20 changes: 7 additions & 13 deletions models/omop/device_exposure.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
row_number() OVER (ORDER BY person_id) AS device_exposure_id
row_number() OVER (ORDER BY epr.person_id) AS device_exposure_id
, p.person_id
, srctostdvm.target_concept_id AS device_concept_id
, d.device_start_date AS device_exposure_start_date
Expand All @@ -10,9 +10,9 @@ SELECT
, d.udi AS unique_device_id
, {{ dbt.cast("null", api.Column.translate_type("varchar")) }} AS production_id
, {{ dbt.cast("null", api.Column.translate_type("integer")) }} AS quantity
, pr.provider_id
, fv.visit_occurrence_id_new AS visit_occurrence_id
, fv.visit_occurrence_id_new + 1000000 AS visit_detail_id
, epr.provider_id
, epr.visit_occurrence_id
, epr.visit_occurrence_id + 1000000 AS visit_detail_id
, d.device_code AS device_source_value
, srctosrcvm.source_concept_id AS device_source_concept_id
, {{ dbt.cast("null", api.Column.translate_type("integer")) }} AS unit_concept_id
Expand All @@ -31,13 +31,7 @@ INNER JOIN {{ ref ('int__source_to_source_vocab_map') }} AS srctosrcvm
ON
d.device_code = srctosrcvm.source_code
AND srctosrcvm.source_vocabulary_id = 'SNOMED'
LEFT JOIN {{ ref ('int__final_visit_ids') }} AS fv
ON d.encounter_id = fv.encounter_id
LEFT JOIN {{ ref('stg_synthea__encounters') }} AS e
ON
d.encounter_id = e.encounter_id
AND d.patient_id = e.patient_id
LEFT JOIN {{ ref ('provider') }} AS pr
ON e.provider_id = pr.provider_source_value
INNER JOIN {{ ref ('person') }} AS p
INNER JOIN {{ ref ('int__person') }} AS p
ON d.patient_id = p.person_source_value
LEFT JOIN {{ ref ('int__encounter_provider') }} AS epr
ON d.encounter_id = epr.encounter_id
22 changes: 6 additions & 16 deletions models/omop/drug_exposure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ WITH all_drugs AS (
)

SELECT
row_number() OVER (ORDER BY p.person_id) AS drug_exposure_id
, p.person_id
row_number() OVER (ORDER BY person_id, drug_concept_id, drug_exposure_start_datetime) AS drug_exposure_id
, person_id
, drug_concept_id
, drug_exposure_start_date
, drug_exposure_start_datetime
Expand All @@ -21,22 +21,12 @@ SELECT
, sig
, route_concept_id
, lot_number
, pr.provider_id
, fv.visit_occurrence_id_new AS visit_occurrence_id
, fv.visit_occurrence_id_new + 1000000 AS visit_detail_id
, provider_id
, visit_occurrence_id
, visit_detail_id
, drug_source_value
, drug_source_concept_id
, route_source_value
, dose_unit_source_value
FROM
all_drugs AS ad
LEFT JOIN {{ ref ('int__final_visit_ids') }} AS fv
ON ad.encounter_id = fv.encounter_id
LEFT JOIN {{ ref ('stg_synthea__encounters') }} AS e
ON
ad.encounter_id = e.encounter_id
AND ad.patient_id = e.patient_id
LEFT JOIN {{ ref ('provider') }} AS pr
ON e.provider_id = pr.provider_source_value
INNER JOIN {{ ref ('person') }} AS p
ON ad.patient_id = p.person_source_value
all_drugs
Loading

0 comments on commit e5f072f

Please sign in to comment.