
feat: changes for the unified pipeline run in gentropy part #107

Merged
merged 12 commits into dev from allow-multiple-dataproc-clusters on Feb 19, 2025

Conversation

project-defiant
Collaborator

@project-defiant project-defiant commented Feb 18, 2025

Context

  1. Allow creating multiple clusters in the gentropy part of the unified pipeline
  2. Unified pipeline configuration update to reflect changes to genetics_etl.yaml
  3. Addition of the l2g_training and pis_gold_standard steps for training the L2G model.

Notes about the coloc steps

Running the colocalisation (coloc + ecaviar) steps within the gentropy part of the unified_pipeline requires changing the cluster configuration to allow for an arbitrary number of primary workers. This is because the cluster with the ogt_etl autoscaling policy scales down its secondary workers after the overlaps step, resulting in cache misses when the actual coloc methods run.

To make sure the cache is only stored on the primary workers, we need to use a different autoscaling policy and enable EFM mode. The easiest solution to this problem with the current code base is to create an additional cluster only for the coloc steps and delete it afterwards.
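
For illustration only, a dedicated coloc cluster entry could look roughly like the sketch below; everything other than cluster_name (the key names, the autoscaling policy name, the worker count, and the EFM Spark property) is an assumption, not a value taken from this PR or its configuration.

    # Hypothetical entry in gentropy_dataproc_cluster_settings for a coloc-only cluster.
    # All keys except cluster_name are assumed; adapt them to whatever create_cluster accepts.
    coloc_cluster_settings = {
        "cluster_name": "gentropy-coloc",
        # Enhanced Flexibility Mode keeps shuffle data on primary workers, so the
        # cached overlaps survive even if secondary workers are scaled down.
        "properties": {"dataproc:efm.spark.shuffle": "primary-worker"},
        # An autoscaling policy (assumed name) that only scales secondary workers,
        # leaving a fixed pool of primary workers to hold the cache.
        "autoscaling_policy": "coloc_etl",
        "num_workers": 8,  # assumed primary worker count
        "idle_delete_ttl": 90 * 60,  # tear the cluster down once the coloc steps finish
    }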

Note

There is a possibility to tweak the cluster autoscaling policy to decrease the scale-down factor for the secondary workers, and/or to update the number of workers (EFM mode is not feasible with either solution, since it has to be requested upon cluster creation). Both solutions (although affecting a single cluster) would have to be experimented with to find the most optimal setup for running the coloc steps. They would also require a way to place a DataprocUpdateClusterOperator in the correct locations (before and after the coloc steps) to update the cluster, as sketched below.
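
As a rough illustration of that placement, the sketch below uses the Airflow Google provider's DataprocUpdateClusterOperator; the region, task names, worker counts, and the exact position in the DAG are assumptions and would need to be validated against the real pipeline. GCP_PROJECT_PLATFORM and gentropy_cluster_name are names taken from the DAG code in this PR.

    from airflow.providers.google.cloud.operators.dataproc import DataprocUpdateClusterOperator

    # Hypothetical: grow the primary worker pool just before the coloc steps so the
    # cached overlaps end up on workers that the autoscaler will not remove.
    resize_before_coloc = DataprocUpdateClusterOperator(
        task_id="resize_cluster_before_coloc",
        project_id=GCP_PROJECT_PLATFORM,
        region="europe-west1",  # assumed region
        cluster_name=gentropy_cluster_name,
        cluster={"config": {"worker_config": {"num_instances": 16}}},  # assumed size
        update_mask={"paths": ["config.worker_config.num_instances"]},
        graceful_decommission_timeout={"seconds": 600},
    )

    # Hypothetical: shrink the primary pool again once the coloc and ecaviar steps finish.
    resize_after_coloc = DataprocUpdateClusterOperator(
        task_id="resize_cluster_after_coloc",
        project_id=GCP_PROJECT_PLATFORM,
        region="europe-west1",
        cluster_name=gentropy_cluster_name,
        cluster={"config": {"worker_config": {"num_instances": 4}}},
        update_mask={"paths": ["config.worker_config.num_instances"]},
        graceful_decommission_timeout={"seconds": 600},
    )

    # Placement in the DAG (pseudo-chain):
    # overlaps >> resize_before_coloc >> coloc >> ecaviar >> resize_after_coloc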

@project-defiant force-pushed the allow-multiple-dataproc-clusters branch from 8c5d6e9 to e1900ba on February 18, 2025 15:38
@project-defiant changed the title from "feat: allow for multiple clusters in gentropy part of unified pipeline" to "feat: changes for the unified pipeline run in gentropy part" on Feb 18, 2025
Comment on lines 265 to 297
    gentropy_cluster_name = create_cluster_name("gentropy")
    clusterless_steps = []

    c = create_cluster(
        cluster_name=gentropy_cluster_name,
        project_id=GCP_PROJECT_PLATFORM,
        **config.gentropy_dataproc_cluster_settings,
        idle_delete_ttl=90 * 60,
    )
    clusters = {}
    for cluster_settings in config.gentropy_dataproc_cluster_settings:
        name = cluster_settings["cluster_name"]
        clean_name = create_cluster_name(name)
        # The name of the cluster must be adjusted to match the clean name
        cluster_settings["cluster_name"] = clean_name
        create_cluster_task_id = f"create_{name}_cluster"
        delete_cluster_task_id = f"delete_{name}_cluster"
        # Collect all cluster_create tasks by the original cluster names, so they can be
        # referenced by the step `cluster_name` config.
        labels = StepLabels(
            "gentropy",
            step_name="create_cluster",
            is_ppp=config.is_ppp,
        )
        clusters[name] = {}
        clusters[name]["create"] = create_cluster(
            task_id=create_cluster_task_id,
            project_id=GCP_PROJECT_PLATFORM,
            labels=labels,
            **cluster_settings,
        )
        clusters[name]["delete"] = delete_cluster(
            task_id=delete_cluster_task_id,
            project_id=GCP_PROJECT_PLATFORM,
            cluster_name=cluster_settings["cluster_name"],
        )
        clusters[name]["create_id"] = create_cluster_task_id
        clusters[name]["delete_id"] = delete_cluster_task_id
Member

This is why I keep pushing for cluster management operators. You are extracting a big chunk of their responsibilities here, which will make the orchestrator much more confusing.

What about something like this? (This changes the whole gentropy part, lines 228:333).

    if len(config.gentropy_step_list):
        clusters = {}

        def _get_create_cluster(cluster_name: str):
            # Create the cluster-creation task lazily, once per cluster name.
            if "create" not in clusters.setdefault(cluster_name, {}):
                clusters[cluster_name]["create"] = create_cluster(
                    cluster_name=create_cluster_name(cluster_name),
                    project_id=GCP_PROJECT_PLATFORM,
                    **config.gentropy_dataproc_cluster_settings[cluster_name],
                    idle_delete_ttl=90 * 60,
                )
            return clusters[cluster_name]["create"]

        def _get_delete_cluster(cluster_name: str):
            # Same lazy pattern for the matching cluster-deletion task.
            if "delete" not in clusters.setdefault(cluster_name, {}):
                clusters[cluster_name]["delete"] = delete_cluster(
                    cluster_name,
                    project_id=GCP_PROJECT_PLATFORM,
                )
            return clusters[cluster_name]["delete"]

        @task_group(group_id="gentropy_stage")
        def gentropy_stage() -> None:
            for step_name in config.gentropy_step_list:
                step_config = config.gentropy_step(step_name)
                labels = StepLabels("gentropy", step_name, config.is_ppp)

                match step_name:
                    case "gentropy_variant_annotation":
                        r = VepAnnotateOperator(
                            job_name=create_name("variant_annotation"),
                            task_id=f"run_{step_name}",
                            project_id=GCP_PROJECT_PLATFORM,
                            **step_config["params"],
                            google_batch=step_config["google_batch"],
                            labels=labels,
                        )
                        chain(r)
                    case _:
                        r = submit_gentropy_step(
                            cluster_name=config.gentropy_dataproc_cluster_settings.get(step_name, "default"),
                            step_name=step_name,
                            project_id=GCP_PROJECT_PLATFORM,
                            params=step_config["params"],
                            labels=labels,
                        )
                        chain(_get_create_cluster(step_name), r, _get_delete_cluster(step_name))

        r = gentropy_stage()

At least this way we reduce the complexity into those two functions. You should test it, I haven't run the code and it might require some changes.

Collaborator Author

I have changed the logic (hopefully it is more manageable now)

@javfg
Member

javfg commented Feb 19, 2025

I like it, let's merge quickly and check it out. There are a couple of typos in the docstrings but I'll fix them in my subsequent PR.

@project-defiant project-defiant merged commit e0e31ce into dev Feb 19, 2025
2 checks passed