feat: changes for the unified pipeline run in gentropy part #107

Merged · 12 commits · Feb 19, 2025
74 changes: 74 additions & 0 deletions src/ot_orchestration/dags/config/cluster_registry.py
@@ -0,0 +1,74 @@
"""Module defining common logic to build multiple cluster definitions for a single part of the unified pipeline."""

from __future__ import annotations

from dataclasses import dataclass

from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
)
from ot_orchestration.utils.common import GCP_PROJECT_PLATFORM
from ot_orchestration.utils.dataproc import create_cluster, delete_cluster
from ot_orchestration.utils.utils import create_cluster_name


@dataclass
class Cluster:
    """Box class to store the tasks that build the Dataproc cluster and a reference to its name."""

    create: DataprocCreateClusterOperator
    delete: DataprocDeleteClusterOperator
    name: str


class ClusterRegistry:
    """Cluster registry object.

    This registry builds a dictionary of Dataproc cluster tasks:
    - `DataprocCreateClusterOperator`
    - `DataprocDeleteClusterOperator`
    """

    def __init__(self):
        self.clusters: dict[str, Cluster] = {}

    def _add_cluster(self, cluster_settings: dict):
        """Add cluster tasks to the cluster registry.

        The original name of the cluster - `cluster_name` - is used as the key for the registry;
        the actual `cluster_name` is derived at runtime by the `create_cluster_name` function.
        """
        cluster_name = cluster_settings["cluster_name"]
        if not self.clusters.get(cluster_name):
            clean_name = create_cluster_name(cluster_name)
            cluster_settings.update({"cluster_name": clean_name})
            c = create_cluster(
                task_id=f"create_{cluster_name}",
                project_id=GCP_PROJECT_PLATFORM,
                **cluster_settings,
            )
            d = delete_cluster(
                task_id=f"delete_{cluster_name}",
                cluster_name=clean_name,
                project_id=GCP_PROJECT_PLATFORM,
            )
            self.clusters[cluster_name] = Cluster(create=c, delete=d, name=clean_name)
        return self

    @classmethod
    def from_dataproc_cluster_settings(cls, dataproc_cluster_settings: list[dict]) -> ClusterRegistry:
        """Build the cluster registry directly from the unified pipeline configuration.

        Args:
            dataproc_cluster_settings (list[dict]): reference to the unified pipeline configuration.

        Returns:
            ClusterRegistry: the registry with clusters defined in the dataproc_cluster_settings.
        """
        registry = cls()
        for cluster_settings in dataproc_cluster_settings:
            registry._add_cluster(cluster_settings)
        return registry
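
For context, a minimal usage sketch of the registry (not part of this PR). The settings entry mirrors the shape of the dataproc_cluster_settings list in gentropy.yaml below; the step task and the `>>` wiring are hypothetical placeholders:

from ot_orchestration.dags.config.cluster_registry import ClusterRegistry

# One entry shaped like the dataproc_cluster_settings list in gentropy.yaml.
settings = [{"cluster_name": "otg-etl", "autoscaling_policy": "otg-etl", "num_workers": 2}]

registry = ClusterRegistry.from_dataproc_cluster_settings(settings)
cluster = registry.clusters["otg-etl"]  # keyed by the original, pre-cleaning name

# Each gentropy step then runs between its cluster's create and delete tasks:
# cluster.create >> gentropy_step_task >> cluster.delete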
82 changes: 61 additions & 21 deletions src/ot_orchestration/dags/config/gentropy.yaml
@@ -1,17 +1,25 @@
---

python_main_module: gs://genetics_etl_python_playground/initialisation/cli.py

dataproc_cluster_settings:
-  cluster_metadata:
-    GENTROPY_REF: '{gentropy_version}'
-  cluster_init_script: gs://genetics_etl_python_playground/initialisation/install_dependencies_on_cluster.sh
-  autoscaling_policy: otg-etl
-  allow_efm: false
-  num_workers: 2
+  - cluster_name: otg-etl
+    cluster_metadata:
+      GENTROPY_REF: 'v{gentropy_version}'
+    autoscaling_policy: otg-etl
+    num_workers: 2
+    idle_delete_ttl: 3600 # in seconds
+
+  - cluster_name: otg-etl-coloc
+    cluster_metadata:
+      GENTROPY_REF: 'v{gentropy_version}'
+    autoscaling_policy: otg-efm
+    worker_machine_type: n1-highmem-32
+    num_workers: 20
+    allow_efm: true

steps:
  biosample:
+    cluster_name: otg-etl
    params:
      step: biosample_index
      step.session.write_mode: ignore
@@ -24,6 +32,7 @@ steps:
      step.biosample_index_path: '{release_uri}/output/biosample'

  study:
+    cluster_name: otg-etl
    params:
      step: study_validation
      step.session.write_mode: ignore
@@ -45,7 +54,7 @@ steps:
        - gs://gwas_catalog_sumstats_pics/study_index
        - gs://eqtl_catalogue_data/study_index
        - gs://ukb_ppp_eur_data/study_index
-        - gs://finngen_data/r11/study_index
+        - gs://finngen_data/r12/study_index
      step.target_index_path: '{release_uri}/output/target'
      step.disease_index_path: '{release_uri}/output/disease'
      step.biosample_index_path: '{release_uri}/output/biosample'
@@ -54,6 +63,7 @@ steps:
      step.invalid_study_index_path: '{release_uri}/log/study_failed'

  credible_set:
+    cluster_name: otg-etl
    params:
      step: credible_set_validation
      step.session.write_mode: ignore
@@ -75,20 +85,23 @@ steps:
        - INVALID_VARIANT_IDENTIFIER
        - INVALID_CHROMOSOME
        - TOP_HIT_AND_SUMMARY_STATS
+      step.trans_qtl_threshold: 5_000_000
      # INPUTS
      step.study_locus_path:
        - gs://gwas_catalog_top_hits/credible_sets/
        - gs://gwas_catalog_sumstats_pics/credible_sets/
        - gs://gwas_catalog_sumstats_susie/credible_set_clean/20250204/
-        - gs://eqtl_catalogue_data/credible_set_datasets/eqtl_catalogue_susie_patched/
+        - gs://eqtl_catalogue_data/credible_set_datasets/eqtl_catalogue_susie_patched_v2/
        - gs://ukb_ppp_eur_data/credible_set_clean/20250129/
-        - gs://finngen_data/r11/credible_set_datasets/susie/
+        - gs://finngen_data/r12/credible_set_datasets/susie/
      step.study_index_path: '{release_uri}/output/study'
      step.target_index_path: '{release_uri}/output/target'
      # OUTPUTS
      step.valid_study_locus_path: '{release_uri}/output/credible_set'
      step.invalid_study_locus_path: '{release_uri}/log/credible_set_failed'

  variant_partition:
+    cluster_name: otg-etl
    params:
      step: variant_to_vcf
      step.partition_size: 2_000 # approximate num of variants per partition!
@@ -118,7 +131,7 @@
    max_run_duration: '2h'
    policy_specs:
      machine_type: n1-standard-4
-      image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:v{vep_version}
+      image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:{vep_version}
    entrypoint: /bin/sh
    params:
      # INPUTS
@@ -128,6 +141,7 @@ steps:
      vep_output_path: '{release_uri}/intermediate/variant_annotation'

  variant:
+    cluster_name: otg-etl
    params:
      step: variant_index
      step.session.write_mode: ignore
@@ -136,39 +150,41 @@ steps:
      step.vep_output_json_path: '{release_uri}/intermediate/variant_annotation'
      step.variant_annotations_path:
        - gs://gnomad_data_2/v4.1/variant_index
        - gs://otar013-ppp/OTAR2075_lof_curation/lof_curation_variant_annotations
-      # TODO: Temporary disabled until gnetropy is updated
-      # step.amino_acid_change_annotations:
-      #   - gs://genetics_etl_python_playground/static_assets/foldx_variant_annotation
-      #   - gs://otar013-ppp/OTAR2075_lof_curation/lof_curation_variant_annotations # PPP ONLY!
+      step.amino_acid_change_annotations:
+        - gs://otar013-ppp/OTAR2081_foldx/foldx_variant_annotation
      # OUTPUTS
      step.variant_index_path: '{release_uri}/output/variant'

  colocalisation_coloc:
+    cluster_name: otg-etl-coloc
    params:
      step: colocalisation
      step.session.write_mode: ignore
      step.session.output_partitions: 25
      step.colocalisation_method: Coloc
+      step.colocalisation_method_params: '{priorc1: 1e-4, priorc2: 1e-4, priorc12: 1e-5}'
-      step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '4000', spark.executor.memory: '25g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '4'}"
+      step.session.extended_spark_conf: "{spark.executor.memory: '25g', spark.executor.memoryOverhead: '1g', spark.executor.cores: '4'}"
      # INPUTS
      step.credible_set_path: '{release_uri}/output/credible_set'
      # OUTPUTS
      step.coloc_path: '{release_uri}/output/colocalisation'

  colocalisation_ecaviar:
+    cluster_name: otg-etl-coloc
    params:
      step: colocalisation
      step.session.write_mode: ignore
      step.session.output_partitions: 25
      step.colocalisation_method: ECaviar
-      step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '4000', spark.executor.memory: '25g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '4'}"
+      step.session.extended_spark_conf: "{spark.executor.memory: '25g', spark.executor.memoryOverhead: '1g', spark.executor.cores: '4'}"
      # INPUTS
      step.credible_set_path: '{release_uri}/output/credible_set'
      # OUTPUTS
      step.coloc_path: '{release_uri}/output/colocalisation'

  l2g_feature_matrix:
+    cluster_name: otg-etl
    params:
      step: locus_to_gene_feature_matrix
      step.session.write_mode: ignore
@@ -182,30 +198,54 @@ steps:
      # OUTPUTS
      step.feature_matrix_path: '{release_uri}/output/l2g_feature_matrix'

+  l2g_training:
+    cluster_name: otg-etl
+    params:
+      step: locus_to_gene
+      step.session.write_mode: ignore
+      step.session.extended_spark_conf: "{spark.kryoserializer.buffer.max:500m, spark.sql.autoBroadcastJoinThreshold:'-1'}"
+      step.run_mode: train
+      step.hyperparameters.n_estimators: 100
+      step.hyperparameters.max_depth: 5
+      step.hyperparameters.loss: log_loss
+      step.wandb_run_name: '{l2g_training}'
+      step.hf_hub_repo_id: opentargets/locus_to_gene
+      step.hf_model_commit_message: 'chore: update model base model for {l2g_training} run'
+      # INPUTS
+      step.credible_set_path: '{release_uri}/output/credible_set'
+      step.variant_index_path: '{release_uri}/output/variant'
+      step.feature_matrix_path: '{release_uri}/output/l2g_feature_matrix'
+      step.gold_standard_curation_path: '{release_uri}/inputs/l2g/gold_standard.json'
+      step.gene_interactions_path: '{release_uri}/output/interaction'
+      # OUTPUTS
+      step.model_path: '{release_uri}/etc/model/locus_to_gene_model/classifier.skops'

  l2g_prediction:
+    cluster_name: otg-etl
    params:
      step: locus_to_gene
      step.session.write_mode: ignore
      step.session.output_partitions: 1
      step.run_mode: predict
      step.l2g_threshold: 0.05
      # INPUTS
      step.download_from_hub: false
      step.hf_hub_repo_id: opentargets/locus_to_gene
      # INPUTS
      step.feature_matrix_path: '{release_uri}/output/l2g_feature_matrix'
      step.credible_set_path: '{release_uri}/output/credible_set'
-      step.model_path: '{release_uri}/input/l2g_prediction/classifier.skops'
+      step.model_path: '{release_uri}/etc/model/locus_to_gene_model/classifier.skops'
      # OUTPUTS
      step.predictions_path: '{release_uri}/output/l2g_prediction'

  l2g_evidence:
+    cluster_name: otg-etl
    params:
      step: locus_to_gene_evidence
      step.session.write_mode: ignore
      step.locus_to_gene_threshold: 0.05
      # INPUTS
      step.credible_set_path: '{release_uri}/output/credible_set'
      step.locus_to_gene_predictions_path: '{release_uri}/output/l2g_prediction'
-      step.study_index_path: '{release_uri}/output/genetics/parquet/study_index'
+      step.study_index_path: '{release_uri}/output/study_index'
      # OUTPUTS
      step.evidence_output_path: '{release_uri}/intermediate/evidence/gentropy'
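
Each step's params block appears to be flattened into key=value arguments for the python_main_module (cli.py); the exact mechanism lives in the orchestration helpers, so the following rendering is an assumption for illustration only:

# Hypothetical: how one step's params could translate into job arguments,
# after sentinel placeholders such as {release_uri} have been resolved.
params = {
    "step": "biosample_index",
    "step.session.write_mode": "ignore",
    "step.biosample_index_path": "{release_uri}/output/biosample",
}
args = [f"{key}={value}" for key, value in params.items()]
# ['step=biosample_index', 'step.session.write_mode=ignore', ...]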
10 changes: 5 additions & 5 deletions src/ot_orchestration/dags/config/pis.yaml
@@ -211,6 +211,11 @@ steps:
      source: http://purl.obolibrary.org/obo/go.obo
      destination: go/go.obo

+  l2g_gold_standard:
+    - name: download L2G gold standard
+      source: gs://genetics-portal-dev-analysis/xg1/L2G_intervals/training_set_v5_1_efo.json
+      destination: l2g/gold_standard.json

  interaction:
    - name: download ensembl interactions grch38
      source: https://ftp.ensembl.org/pub/release-${ensembl_version}/gtf/homo_sapiens/Homo_sapiens.GRCh38.${ensembl_version}.chr.gtf.gz
@@ -228,11 +233,6 @@ steps:
      source: gs://otar001-core/stringInteractions
      destination: interaction/string-interactions.txt.gz

-  l2g_prediction:
-    - name: download model
-      source: gs://ot_orchestration/benchmarks/l2g/fm0/v5.1_best_cv/locus_to_gene_model/classifier.skops
-      destination: l2g_prediction/classifier.skops

  literature:
    - name: download literature
      source: https://ftp.ebi.ac.uk/pub/databases/pmc/DOI/PMID_PMCID_DOI.csv.gz
3 changes: 2 additions & 1 deletion src/ot_orchestration/dags/config/unified_pipeline.py
@@ -85,11 +85,11 @@ def __init__(self) -> None:

        # GENTROPY-specific settings.
        self.gentropy_version = settings["gentropy_version"]
+        self.l2g_training = settings["l2g_training"]
        self.vep_version = settings["vep_version"]
        self.gentropy_config = self.init_gentropy_settings()
        self.gentropy_dataproc_cluster_settings = self.gentropy_config["dataproc_cluster_settings"]
        self.gentropy_step_list = [s for s in settings["steps"].keys() if s.startswith("gentropy_")]
        self.gentropy_python_main_module = self.gentropy_config["python_main_module"]

    def pis_config_uri(self, step_name: str) -> str:
        """Return the google cloud url of the PIS configuration file for a step."""
@@ -156,6 +156,7 @@ def init_gentropy_settings(self) -> dict[str, Any]:
        return read_yaml_config(
            self.gentropy_config_local_path,
            sentinels={
+                "l2g_training": self.l2g_training,
                "release_uri": self.release_uri,
                "gentropy_version": self.gentropy_version,
                "vep_version": self.vep_version,
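
The sentinels mapping resolves placeholders such as '{release_uri}' and '{l2g_training}' inside gentropy.yaml. A minimal sketch of the substitution idea, assuming plain string replacement before YAML parsing (the real read_yaml_config helper in ot_orchestration.utils may behave differently):

import yaml

def render_config(raw_text: str, sentinels: dict[str, str]) -> dict:
    # Replace each '{key}' placeholder with its sentinel value, then parse.
    # Plain replace (rather than str.format) avoids tripping over literal
    # braces, e.g. the extended_spark_conf strings in gentropy.yaml.
    for key, value in sentinels.items():
        raw_text = raw_text.replace("{" + key + "}", str(value))
    return yaml.safe_load(raw_text)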
12 changes: 11 additions & 1 deletion src/ot_orchestration/dags/config/unified_pipeline.yaml
@@ -10,6 +10,7 @@ vep_version: '2.0.1'
chembl_version: '35'
efo_version: '3.71.0'
ensembl_version: '113'
+l2g_training: '2025-02'

is_ppp: false

@@ -42,6 +43,7 @@ steps:
    ppp_only: true
  pis_expression:
  pis_go:
+  pis_l2g_gold_standard:
  pis_interaction:
  pis_literature:
  pis_openfda:
@@ -197,10 +199,18 @@ steps:
      - gentropy_colocalisation_ecaviar
      - gentropy_variant

-  gentropy_l2g_prediction:
+  gentropy_l2g_training:
    depends_on:
      - gentropy_credible_set
      - gentropy_variant
      - gentropy_l2g_feature_matrix
+      - pis_l2g_gold_standard
+      - etl_interaction
+
+  gentropy_l2g_prediction:
+    depends_on:
+      - gentropy_l2g_feature_matrix
+      - gentropy_l2g_training

  gentropy_l2g_evidence:
    depends_on:
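
The depends_on lists above are the DAG edges between unified-pipeline steps. A hypothetical sketch of how such a mapping can be wired into task dependencies (the actual wiring code is not shown in this diff):

# Hypothetical: derive (upstream, downstream) edges from the steps mapping.
steps_config = {
    "gentropy_l2g_training": {"depends_on": ["gentropy_l2g_feature_matrix", "pis_l2g_gold_standard"]},
    "gentropy_l2g_prediction": {"depends_on": ["gentropy_l2g_feature_matrix", "gentropy_l2g_training"]},
}

edges = [
    (upstream, step)
    for step, conf in steps_config.items()
    for upstream in (conf or {}).get("depends_on", [])
]
print(edges)
# In Airflow terms, each edge becomes: tasks[upstream] >> tasks[step]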