feat: changes for the unified pipeline run in gentropy part #107

Merged
12 commits merged on Feb 19, 2025
74 changes: 56 additions & 18 deletions src/ot_orchestration/dags/config/gentropy.yaml
@@ -1,17 +1,25 @@
---

python_main_module: gs://genetics_etl_python_playground/initialisation/cli.py

dataproc_cluster_settings:
cluster_metadata:
GENTROPY_REF: '{gentropy_version}'
cluster_init_script: gs://genetics_etl_python_playground/initialisation/install_dependencies_on_cluster.sh
autoscaling_policy: otg-etl
allow_efm: false
num_workers: 2

- cluster_name: otg-etl
cluster_metadata:
GENTROPY_REF: 'v{gentropy_version}'
autoscaling_policy: otg-etl
num_workers: 2
idle_delete_ttl: 3600 # in seconds

- cluster_name: otg-etl-coloc
cluster_metadata:
GENTROPY_REF: 'v{gentropy_version}'
autoscaling_policy: otg-efm
worker_machine_type: n1-highmem-32
num_workers: 20
allow_efm: true

steps:
biosample:
cluster_name: otg-etl
params:
step: biosample_index
step.session.write_mode: ignore
@@ -24,6 +32,7 @@ steps:
step.biosample_index_path: '{release_uri}/output/biosample'

study:
cluster_name: otg-etl
params:
step: study_validation
step.session.write_mode: ignore
@@ -54,6 +63,7 @@ steps:
step.invalid_study_index_path: '{release_uri}/log/study_failed'

credible_set:
cluster_name: otg-etl
params:
step: credible_set_validation
step.session.write_mode: ignore
@@ -89,6 +99,7 @@ steps:
step.invalid_study_locus_path: '{release_uri}/log/credible_set_failed'

variant_partition:
cluster_name: otg-etl
params:
step: variant_to_vcf
step.partition_size: 2_000 # approximate num of variants per partition!
@@ -118,7 +129,7 @@ steps:
max_run_duration: '2h'
policy_specs:
machine_type: n1-standard-4
image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:v{vep_version}
image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:{vep_version}
entrypoint: /bin/sh
params:
# INPUTS
@@ -128,6 +139,7 @@
vep_output_path: '{release_uri}/intermediate/variant_annotation'

variant:
cluster_name: otg-etl
params:
step: variant_index
step.session.write_mode: ignore
@@ -136,39 +148,41 @@
step.vep_output_json_path: '{release_uri}/intermediate/variant_annotation'
step.variant_annotations_path:
- gs://gnomad_data_2/v4.1/variant_index
- gs://otar013-ppp/OTAR2075_lof_curation/lof_curation_variant_annotations
# TODO: Temporarily disabled until gentropy is updated
# step.amino_acid_change_annotations:
# - gs://genetics_etl_python_playground/static_assets/foldx_variant_annotation
# - gs://otar013-ppp/OTAR2075_lof_curation/lof_curation_variant_annotations # PPP ONLY!
step.amino_acid_change_annotations:
- gs://otar013-ppp/OTAR2081_foldx/foldx_variant_annotation
# OUTPUTS
step.variant_index_path: '{release_uri}/output/variant'

colocalisation_coloc:
cluster_name: otg-etl-coloc
params:
step: colocalisation
step.session.write_mode: ignore
step.session.output_partitions: 25
step.colocalisation_method: Coloc
+step.colocalisation_method_params: '{priorc1: 1e-4, priorc2: 1e-4, priorc12: 1e-5}'
+step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '4000', spark.executor.memory: '25g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '4'}"
+step.session.extended_spark_conf: "{spark.executor.memory: '25g', spark.executor.memoryOverhead: '1g', spark.executor.cores: '4'}"
# INPUTS
step.credible_set_path: '{release_uri}/output/credible_set'
# OUTPUTS
step.coloc_path: '{release_uri}/output/colocalisation'

colocalisation_ecaviar:
cluster_name: otg-etl-coloc
params:
step: colocalisation
step.session.write_mode: ignore
step.session.output_partitions: 25
step.colocalisation_method: ECaviar
+step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '4000', spark.executor.memory: '25g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '4'}"
+step.session.extended_spark_conf: "{spark.executor.memory: '25g', spark.executor.memoryOverhead: '1g', spark.executor.cores: '4'}"
# INPUTS
step.credible_set_path: '{release_uri}/output/credible_set'
# OUTPUTS
step.coloc_path: '{release_uri}/output/colocalisation'

l2g_feature_matrix:
cluster_name: otg-etl
params:
step: locus_to_gene_feature_matrix
step.session.write_mode: ignore
@@ -182,30 +196,54 @@ steps:
# OUTPUTS
step.feature_matrix_path: '{release_uri}/output/l2g_feature_matrix'

l2g_training:
cluster_name: otg-etl
params:
step: locus_to_gene
step.session.write_mode: ignore
+step.session.extended_spark_conf: "{spark.kryoserializer.buffer.max:500m, spark.sql.autoBroadcastJoinThreshold:'-1'}"
step.run_mode: train
step.hyperparameters.n_estimators: 100
step.hyperparameters.max_depth: 5
step.hyperparameters.loss: log_loss
step.wandb_run_name: '{l2g_training}'
step.hf_hub_repo_id: opentargets/locus_to_gene
step.hf_model_commit_message: 'chore: update model base model for {l2g_training} run'
# INPUTS
step.credible_set_path: '{release_dir}/output/credible_set'
step.variant_index_path: '{release_dir}/output/variant'
step.feature_matrix_path: '{release_dir}/output/l2g_feature_matrix'
step.gold_standard_curation_path: '{release_dir}/inputs/l2g/gold_standard.json'
step.gene_interactions_path: '{release_dir}/output/interaction'
# OUTPUTS
step.model_path: '{release_dir}/etc/models/locus_to_gene_model/classifier.skops'

l2g_prediction:
cluster_name: otg-etl
params:
step: locus_to_gene
step.session.write_mode: ignore
step.session.output_partitions: 1
step.run_mode: predict
step.l2g_threshold: 0.05
# INPUTS
step.download_from_hub: false
step.hf_hub_repo_id: opentargets/locus_to_gene
# INPUTS
step.feature_matrix_path: '{release_uri}/output/l2g_feature_matrix'
step.credible_set_path: '{release_uri}/output/credible_set'
step.model_path: '{release_uri}/input/l2g_prediction/classifier.skops'
step.model_path: '{release_dir}/etc/models/locus_to_gene_model/classifier.skops'
# OUTPUTS
step.predictions_path: '{release_uri}/output/l2g_prediction'

l2g_evidence:
cluster_name: otg-etl
params:
step: locus_to_gene_evidence
step.session.write_mode: ignore
step.locus_to_gene_threshold: 0.05
# INPUTS
step.credible_set_path: '{release_uri}/output/credible_set'
step.locus_to_gene_predictions_path: '{release_uri}/output/l2g_prediction'
step.study_index_path: '{release_uri}/output/genetics/parquet/study_index'
step.study_index_path: '{release_uri}/output/study_index'
# OUTPUTS
step.evidence_output_path: '{release_uri}/intermediate/evidence/gentropy'
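Note on the new layout: each step now names the Dataproc cluster it should run on via `cluster_name`, and the orchestrator resolves that name against the `dataproc_cluster_settings` list at the top of the file. A minimal sketch of that lookup, assuming the YAML is loaded as-is and placeholders such as `{release_uri}` are still unresolved (illustrative only, not the orchestrator's actual parsing code):

    # Group gentropy steps by the cluster they request (sketch, not repo code).
    from collections import defaultdict
    import yaml

    with open("gentropy.yaml") as fh:
        config = yaml.safe_load(fh)

    # Index the declared clusters by name.
    clusters = {c["cluster_name"]: c for c in config["dataproc_cluster_settings"]}

    # Map each cluster to the steps that want to run on it.
    steps_per_cluster = defaultdict(list)
    for step_name, step_config in config["steps"].items():
        cluster = step_config.get("cluster_name")
        if cluster is None:
            continue  # steps such as variant_annotation run on Google Batch instead
        steps_per_cluster[cluster].append(step_name)

    for name, step_names in steps_per_cluster.items():
        print(name, clusters[name].get("autoscaling_policy"), step_names)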
10 changes: 5 additions & 5 deletions src/ot_orchestration/dags/config/pis.yaml
@@ -211,6 +211,11 @@ steps:
source: http://purl.obolibrary.org/obo/go.obo
destination: go/go.obo

gold_standard:
- name: download L2G gold standard
source: gs://genetics-portal-dev-analysis/xg1/L2G_intervals/training_set_v5_1_efo.json
destination: l2g/gold_standard.json

interaction:
- name: download ensembl interactions grch38
source: https://ftp.ensembl.org/pub/release-${ensembl_version}/gtf/homo_sapiens/Homo_sapiens.GRCh38.${ensembl_version}.chr.gtf.gz
@@ -228,11 +233,6 @@ steps:
source: gs://otar001-core/stringInteractions
destination: interaction/string-interactions.txt.gz

l2g_prediction:
- name: download model
source: gs://ot_orchestration/benchmarks/l2g/fm0/v5.1_best_cv/locus_to_gene_model/classifier.skops
destination: l2g_prediction/classifier.skops

literature:
- name: download literature
source: https://ftp.ebi.ac.uk/pub/databases/pmc/DOI/PMID_PMCID_DOI.csv.gz
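Each PIS step entry is a plain download record (name, source URI, destination relative to the release inputs prefix). For illustration only, a hypothetical stand-in showing the shape of the new gold_standard entry; the real PIS service does its own parsing and validation:

    from dataclasses import dataclass

    @dataclass(frozen=True)
    class DownloadTask:
        name: str
        source: str       # http(s):// or gs:// URI
        destination: str  # path under the release inputs prefix

    gold_standard = DownloadTask(
        name="download L2G gold standard",
        source="gs://genetics-portal-dev-analysis/xg1/L2G_intervals/training_set_v5_1_efo.json",
        destination="l2g/gold_standard.json",
    )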
3 changes: 2 additions & 1 deletion src/ot_orchestration/dags/config/unified_pipeline.py
@@ -85,11 +85,11 @@ def __init__(self) -> None:

# GENTROPY-specific settings.
self.gentropy_version = settings["gentropy_version"]
self.l2g_training = settings["l2g_training"]
self.vep_version = settings["vep_version"]
self.gentropy_config = self.init_gentropy_settings()
self.gentropy_dataproc_cluster_settings = self.gentropy_config["dataproc_cluster_settings"]
self.gentropy_step_list = [s for s in settings["steps"].keys() if s.startswith("gentropy_")]
self.gentropy_python_main_module = self.gentropy_config["python_main_module"]

def pis_config_uri(self, step_name: str) -> str:
"""Return the google cloud url of the PIS configuration file for a step."""
@@ -156,6 +156,7 @@ def init_gentropy_settings(self) -> dict[str, Any]:
return read_yaml_config(
self.gentropy_config_local_path,
sentinels={
"l2g_training": self.l2g_training,
"release_uri": self.release_uri,
"gentropy_version": self.gentropy_version,
"vep_version": self.vep_version,
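The new `l2g_training` value is passed to `read_yaml_config` as a sentinel, so placeholders such as `{l2g_training}` in gentropy.yaml (for example `step.wandb_run_name`) are resolved at DAG-parse time. A rough sketch of what that substitution could look like; the actual helper may differ, for instance to avoid clashing with literal braces in the Spark conf strings:

    # Hypothetical sketch of sentinel substitution; not the real read_yaml_config.
    from typing import Any

    def substitute_sentinels(node: Any, sentinels: dict[str, str]) -> Any:
        """Recursively format string values with the provided sentinels."""
        if isinstance(node, str):
            return node.format(**sentinels)
        if isinstance(node, dict):
            return {k: substitute_sentinels(v, sentinels) for k, v in node.items()}
        if isinstance(node, list):
            return [substitute_sentinels(v, sentinels) for v in node]
        return node

    params = {"step.wandb_run_name": "{l2g_training}"}
    print(substitute_sentinels(params, {"l2g_training": "2025-02"}))
    # -> {'step.wandb_run_name': '2025-02'}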
11 changes: 10 additions & 1 deletion src/ot_orchestration/dags/config/unified_pipeline.yaml
@@ -10,6 +10,7 @@ vep_version: '2.0.1'
chembl_version: '35'
efo_version: '3.71.0'
ensembl_version: '113'
l2g_training: '2025-02'

is_ppp: false

@@ -42,6 +43,7 @@ steps:
ppp_only: true
pis_expression:
pis_go:
pis_gold_standard:
pis_interaction:
pis_literature:
pis_openfda:
@@ -197,9 +199,16 @@ steps:
- gentropy_colocalisation_ecaviar
- gentropy_variant

gentropy_l2g_prediction:
gentropy_l2g_training:
depends_on:
- gentropy_credible_set
- gentropy_variant
- gentropy_l2g_feature_matrix
- pis_gold_standard
- etl_interaction

gentropy_l2g_prediction:
depends_on:
- gentropy_l2g_feature_matrix

gentropy_l2g_evidence:
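The new gentropy_l2g_training entry and the trimmed gentropy_l2g_prediction dependencies use the same depends_on mechanism as the rest of the file. A sketch of how such a map could be wired into Airflow, assuming each step has already been materialised as a task (the real wiring lives in unified_pipeline.py):

    # Illustrative sketch, not the pipeline's actual wiring code.
    from airflow.models.baseoperator import chain

    depends_on = {
        "gentropy_l2g_training": [
            "gentropy_credible_set",
            "gentropy_variant",
            "gentropy_l2g_feature_matrix",
            "pis_gold_standard",
            "etl_interaction",
        ],
        "gentropy_l2g_prediction": ["gentropy_l2g_feature_matrix"],
    }

    def wire_dependencies(tasks: dict, dependencies: dict[str, list[str]]) -> None:
        """Set upstream >> downstream edges for every declared dependency."""
        for step, upstreams in dependencies.items():
            for upstream in upstreams:
                chain(tasks[upstream], tasks[step])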
52 changes: 37 additions & 15 deletions src/ot_orchestration/dags/unified_pipeline.py
@@ -250,6 +250,10 @@ def etl_stage() -> None:
# ==============================================================================================
# Gentropy stage of the DAG.
#
# The process parses the list of dataproc_cluster_settings found in gentropy.yaml
# to obtain all clusters required by the steps, then, based on the `cluster_name`
# defined in each step, assigns each gentropy step to the correct cluster.
#
# c. Prepare the Gentropy Dataproc cluster.
# r. The Gentropy steps are run in parallel, as their prerequisites are met.
# There are different types of Gentropy steps. We match special cases by
@@ -262,20 +266,37 @@ def etl_stage() -> None:
# management functions into operators.
# ==============================================================================================
if len(config.gentropy_step_list):
gentropy_cluster_name = create_cluster_name("gentropy")
clusterless_steps = []

c = create_cluster(
cluster_name=gentropy_cluster_name,
project_id=GCP_PROJECT_PLATFORM,
**config.gentropy_dataproc_cluster_settings,
idle_delete_ttl=90 * 60,
)
clusters = {}
for cluster_settings in config.gentropy_dataproc_cluster_settings:
name = cluster_settings["cluster_name"]
clean_name = create_cluster_name(name)
# The name of the cluster must be adjusted to match the clean name
cluster_settings["cluster_name"] = clean_name
create_cluster_task_id = f"create_{name}_cluster"
delete_cluster_task_id = f"delete_{name}_cluster"
# Collect the create/delete tasks for each cluster, keyed by the original cluster
# name, so they can be referenced via each step's `cluster_name` config.
labels = StepLabels(
"gentropy",
step_name="create_cluster",
is_ppp=config.is_ppp,
)
clusters[name] = {}
clusters[name]["create"] = create_cluster(
task_id=create_cluster_task_id,
project_id=GCP_PROJECT_PLATFORM,
labels=labels,
**cluster_settings,
)
clusters[name]["delete"] = delete_cluster(
task_id=delete_cluster_task_id,
project_id=GCP_PROJECT_PLATFORM,
cluster_name=cluster_settings["cluster_name"],
)
clusters[name]["create_id"] = create_cluster_task_id
clusters[name]["delete_id"] = delete_cluster_task_id
Review comment (Member):

This is why I keep pushing for cluster management operators. You are extracting a big chunk of their responsibilities here, which will make the orchestrator much more confusing.

What about something like this? (This changes the whole gentropy part, lines 228:333).

    if len(config.gentropy_step_list):
        clusters = {}

        def _get_create_cluster(cluster_name: str):
            if clusters.setdefault(cluster_name, {}).get("create") is None:
                clusters[cluster_name]["create"] = create_cluster(
                    cluster_name=create_cluster_name(cluster_name),
                    project_id=GCP_PROJECT_PLATFORM,
                    **config.gentropy_dataproc_cluster_settings[cluster_name],
                    idle_delete_ttl=90 * 60,
                )
            return clusters[cluster_name]["create"]

        def _get_delete_cluster(cluster_name: str):
            if clusters.setdefault(cluster_name, {}).get("delete") is None:
                clusters[cluster_name]["delete"] = delete_cluster(
                    cluster_name,
                    project_id=GCP_PROJECT_PLATFORM,
                )
            return clusters[cluster_name]["delete"]

        @task_group(group_id="gentropy_stage")
        def gentropy_stage() -> None:
            for step_name in config.gentropy_step_list:
                step_config = config.gentropy_step(step_name)
                labels = StepLabels("gentropy", step_name, config.is_ppp)

                match step_name:
                    case "gentropy_variant_annotation":
                        r = VepAnnotateOperator(
                            job_name=create_name("variant_annotation"),
                            task_id=f"run_{step_name}",
                            project_id=GCP_PROJECT_PLATFORM,
                            **step_config["params"],
                            google_batch=step_config["google_batch"],
                            labels=labels,
                        )
                        chain(r)
                    case _:
                        r = submit_gentropy_step(
                            cluster_name=config.gentropy_dataproc_cluster_settings.get(step_name, "default"),
                            step_name=step_name,
                            project_id=GCP_PROJECT_PLATFORM,
                            params=step_config["params"],
                            labels=labels,
                        )
                        chain(_get_create_cluster(step_name), r, _get_delete_cluster(step_name))

        r = gentropy_stage()

At least this way we confine the complexity to those two functions. You should test it; I haven't run the code and it might require some changes.

Reply (Collaborator, Author):

I have changed the logic (hopefully it is more manageable now).


d = delete_cluster(
gentropy_cluster_name,
project_id=GCP_PROJECT_PLATFORM,
)
clusterless_steps = []

@task_group(group_id="gentropy_stage")
def gentropy_stage() -> None:
@@ -296,16 +317,17 @@ def gentropy_stage() -> None:
clusterless_steps.append(r)
case _:
r = submit_gentropy_step(
cluster_name=gentropy_cluster_name,
cluster_name=clusters[step_config["cluster_name"]],
step_name=step_name,
project_id=GCP_PROJECT_PLATFORM,
python_main_module=config.gentropy_python_main_module,
params=step_config["params"],
labels=labels,
)

steps[step_name] = r
if r not in clusterless_steps:
c = clusters[step_config["cluster_name"]]["create"]
d = clusters[step_config["cluster_name"]]["delete"]
chain(c, r, d)

r = gentropy_stage()
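Because `chain(c, r, d)` is called once per step, every step that declares the same `cluster_name` ends up between that cluster's single create task and single delete task, so the cluster is only torn down once all of its steps have finished (with Airflow's default all_success trigger rule). A self-contained sketch of the resulting shape using EmptyOperator stand-ins, assuming Airflow 2.x; the real DAG uses the cluster and step tasks built above:

    # Illustrative sketch of the per-cluster lifecycle wiring; not repo code.
    import pendulum
    from airflow import DAG
    from airflow.models.baseoperator import chain
    from airflow.operators.empty import EmptyOperator

    with DAG(
        dag_id="cluster_lifecycle_sketch",
        start_date=pendulum.datetime(2025, 1, 1),
        schedule=None,
    ):
        create = EmptyOperator(task_id="create_otg-etl_cluster")
        delete = EmptyOperator(task_id="delete_otg-etl_cluster")
        # Stand-ins for two submit_gentropy_step tasks that both request otg-etl.
        for step in ("biosample", "study"):
            run = EmptyOperator(task_id=f"run_gentropy_{step}")
            chain(create, run, delete)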
9 changes: 6 additions & 3 deletions src/ot_orchestration/utils/dataproc.py
Expand Up @@ -48,6 +48,7 @@ def create_cluster(
allow_efm: bool = False,
idle_delete_ttl: int = 30 * 60,
labels: Labels | None = None,
task_id: str = "create_cluster",
**kwargs: Any,
) -> DataprocCreateClusterOperator:
"""Generate an Airflow task to create a Dataproc cluster. Common parameters are reused, and varying parameters can be specified as needed.
@@ -67,6 +68,7 @@ def create_cluster(
allow_efm (bool): Whether to enable Enhanced Flexibility Mode on the Spark cluster so that shuffle partitions are stored on the primary workers only.
idle_delete_ttl (int): Time in seconds to wait before deleting the cluster after it becomes idle. Defaults to 30 minutes.
labels (Labels): Optional labels to add to the cluster.
task_id (str): Task ID for the Dataproc cluster creation task. Defaults to 'create_cluster'.
**kwargs (Any): Other parameters to the ClusterGenerator.

NOTE: When `allow_efm` is enabled, the autoscaling policy cannot use graceful decommissioning for primary workers!
@@ -127,10 +129,9 @@ def create_cluster(
cluster_config[worker_section].setdefault("disk_config", {})
# Specify the number of local SSDs.
cluster_config[worker_section]["disk_config"]["num_local_ssds"] = num_local_ssds

# Return the cluster creation operator.
return DataprocCreateClusterOperator(
task_id="create_cluster",
task_id=task_id,
project_id=project_id,
cluster_config=cluster_config,
region=GCP_REGION,
@@ -286,19 +287,21 @@ def submit_job(

def delete_cluster(
cluster_name: str,
task_id: str = "delete_cluster",
project_id: str = GCP_PROJECT_GENETICS,
) -> DataprocDeleteClusterOperator:
"""Generate an Airflow task to delete a Dataproc cluster.

Args:
cluster_name (str): Name of the cluster.
task_id (str): Task ID for the Dataproc cluster deletion task. Defaults to 'delete_cluster'.
project_id (str): Project ID. Defaults to GCP_PROJECT_GENETICS.

Returns:
DataprocDeleteClusterOperator: Airflow task to delete a Dataproc cluster.
"""
return DataprocDeleteClusterOperator(
task_id="delete_cluster",
task_id=task_id,
project_id=project_id,
cluster_name=cluster_name,
region=GCP_REGION,
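With `task_id` now parameterised, a DAG can create several cluster lifecycle tasks without task-id collisions. A usage sketch based on the signatures above; the project id and the extra cluster parameters are illustrative, not taken from the repository:

    # Hypothetical usage of the new task_id parameters (values are illustrative).
    from ot_orchestration.utils.dataproc import create_cluster, delete_cluster

    create = create_cluster(
        task_id="create_otg-etl_cluster",    # previously hard-coded to "create_cluster"
        cluster_name="otg-etl",
        project_id="open-targets-platform",  # hypothetical project id
        autoscaling_policy="otg-etl",
        num_workers=2,
        idle_delete_ttl=3600,
    )
    delete = delete_cluster(
        cluster_name="otg-etl",
        task_id="delete_otg-etl_cluster",    # previously hard-coded to "delete_cluster"
        project_id="open-targets-platform",
    )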