diff --git a/docs/checkpointing.md b/docs/checkpointing.md
index 9c7f14e7..a2857e3c 100644
--- a/docs/checkpointing.md
+++ b/docs/checkpointing.md
@@ -79,10 +79,17 @@ submitted to a slurm job, which will be checkpointed and requeued at most `slurm
 (and any number of time if preempted):
 ```python
 import submitit
+import clusterscope
 from .network import NetworkTraining  # must be defined in an importable module!
 executor = submitit.AutoExecutor(folder="logs_training", slurm_max_num_timeout=3)
-executor.update_parameters(timeout_min=30, slurm_partition="your_partition",
-                           gpus_per_node=1, cpus_per_task=2)
+slurm_partition = "your_partition"
+# clusterscope assigns the proportionate amount of resources based on gpus/cpus being requested.
+resources = clusterscope.job_gen_task_slurm(partition=slurm_partition, gpus_per_task=1, tasks_per_node=1)
+executor.update_parameters(timeout_min=30,
+                           slurm_partition=slurm_partition,
+                           gpus_per_node=resources["gpus_per_task"] * resources["tasks_per_node"],
+                           cpus_per_task=resources["cpus_per_task"],
+                           mem_gb=resources["mem_gb"])
 training_callable = NetworkTraining()
 job = executor.submit(training_callable, "some/path/for/checkpointing/your/network")
 ```
diff --git a/docs/examples/torch_distributed.py b/docs/examples/torch_distributed.py
index 91f62b4a..9acad67a 100755
--- a/docs/examples/torch_distributed.py
+++ b/docs/examples/torch_distributed.py
@@ -10,6 +10,7 @@
 import sys
 import time
 
+import clusterscope
 import torch
 
 import submitit
@@ -69,11 +70,17 @@ def checkpoint(self):
 
 def main():
     executor = submitit.AutoExecutor(folder=LOGS_DIR)
+    gpus_per_node = NUM_TASKS_PER_NODE
+    tasks_per_node = NUM_TASKS_PER_NODE
+    gpus_per_task = gpus_per_node // tasks_per_node
+    # clusterscope assigns the proportionate amount of resources based on gpus/cpus being requested.
+    resources = clusterscope.job_gen_task_slurm(partition=PARTITION, gpus_per_task=gpus_per_task)
     executor.update_parameters(
         nodes=NUM_NODES,
-        gpus_per_node=NUM_TASKS_PER_NODE,
-        tasks_per_node=NUM_TASKS_PER_NODE,
-        cpus_per_task=NUM_CPUS_PER_TASK,
+        gpus_per_node=gpus_per_node,
+        tasks_per_node=tasks_per_node,
+        cpus_per_task=resources["cpus_per_task"],
+        mem_gb=resources["mem_gb"],
         slurm_partition=PARTITION,
     )
     task = Task()
diff --git a/docs/mnist.py b/docs/mnist.py
index 68db253f..d9b54df6 100644
--- a/docs/mnist.py
+++ b/docs/mnist.py
@@ -10,6 +10,7 @@
 import time
 from pathlib import Path
 
+import clusterscope
 import numpy as np
 from sklearn.datasets import fetch_openml
 from sklearn.linear_model import LogisticRegression
@@ -125,7 +126,14 @@ def main():
     # Specify the job requirements.
     # Reserving only as much resource as you need ensure the cluster resource are
     # efficiently allocated.
-    ex.update_parameters(mem_gb=1, cpus_per_task=4, timeout_min=5)
+    resources = {
+        "cpus_per_task": 4,
+        "mem_gb": 1,
+    }
+    if ex.slurm_partition:
+        # clusterscope assigns the proportionate amount of resources based on gpus/cpus being requested.
+        resources = clusterscope.job_gen_task_slurm(partition=ex.slurm_partition, cpus_per_task=4)
+    ex.update_parameters(mem_gb=resources["mem_gb"], cpus_per_task=resources["cpus_per_task"], timeout_min=5)
 
     job = ex.submit(trainer, 5000, model_path=model_path)
     print(f"Scheduled {job}.")
diff --git a/integration/preemption.py b/integration/preemption.py
index 288e11cc..fff0eb34 100644
--- a/integration/preemption.py
+++ b/integration/preemption.py
@@ -13,6 +13,7 @@
 from datetime import datetime
 from pathlib import Path
 
+import clusterscope
 import submitit
 from submitit import AutoExecutor, Job
 from submitit.core import test_core
@@ -45,10 +46,12 @@ def clock(partition: str, duration: int):
 def pascal_job(partition: str, timeout_min: int, node: str = "") -> Job:
     """Submit a job with specific constraint that we can preempt deterministically."""
     ex = submitit.AutoExecutor(folder=LOGS, slurm_max_num_timeout=1)
+    # clusterscope assigns the proportionate amount of resources based on gpus/cpus being requested.
+    resources = clusterscope.job_gen_task_slurm(partition=partition, cpus_per_task=50)
     ex.update_parameters(
         name=f"submitit_preemption_{partition}",
         timeout_min=timeout_min,
-        mem_gb=7,
+        mem_gb=resources["mem_gb"],
         slurm_constraint="pascal",
         slurm_comment="submitit integration test",
         slurm_partition=partition,
@@ -56,7 +59,7 @@ def pascal_job(partition: str, timeout_min: int, node: str = "") -> Job:
         slurm_mail_user=f"{getpass.getuser()}+slurm@meta.com",
         # pascal nodes have 80 cpus.
         # By requesting 50 we now that their can be only one such job with this property.
-        cpus_per_task=50,
+        cpus_per_task=resources["cpus_per_task"],
         slurm_additional_parameters={},
     )
     if node:
diff --git a/pyproject.toml b/pyproject.toml
index a5fd6f84..d8f3ff8d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,6 +11,7 @@ dynamic = ["version", "description"]
 
 dependencies = [
     "cloudpickle>=1.2.1",
+    "clusterscope",
     "typing_extensions>=3.7.4.2"
 ]
 # zip_safe = false