Full rewrite of the parallelization backend
asistradition committed Jun 9, 2022
1 parent 554a229 commit 8739a30
Showing 28 changed files with 624 additions and 834 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/python-package.yml
@@ -30,5 +30,6 @@ jobs:
- name: Test with pytest & coverage
run: |
python -m coverage run -m pytest
python -m coverage xml
- name: Upload Coverage to Codecov
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v2
2 changes: 1 addition & 1 deletion Tutorial.md
@@ -15,7 +15,7 @@ Install required python libraries:
```
python -m pip install -r requirements.txt
```
Install required libraries for parallelization (running on a single machine requires only `python -m pip install pathos`):
Install required libraries for parallelization (running on a single machine requires only `python -m pip install joblib`):
```
python -m pip install -r requirements-multiprocessing.txt
```
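For orientation, a minimal single-machine setup after this change might look like the sketch below. It assumes the refactored backend exposes the local joblib engine through the existing `MPControl` interface under the name `"joblib"`; that name, and the explicit `connect()` call, are inferences from this commit rather than quotes from the tutorial.

```python
# Sketch only: single-machine parallelization via the refactored backend.
# The engine name "joblib" and the connect() call are assumptions drawn
# from this commit, not verbatim tutorial text.
from inferelator import MPControl

MPControl.set_multiprocess_engine("joblib")  # joblib replaces pathos for local runs
MPControl.set_processes(4)                   # number of local worker processes
MPControl.connect()                          # may be a no-op for engines needing no setup
```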
9 changes: 9 additions & 0 deletions docs/changelog.rst
@@ -1,6 +1,15 @@
Change Log
==========

Inferelator v0.6.0 `June 9, 2022`
---------------------------------------

Code Refactoring:

- Refactored parallelization around joblib & dask
- Removed pathos


Inferelator v0.5.8 `February 23, 2022`
---------------------------------------

4 changes: 2 additions & 2 deletions docs/conf.py
@@ -19,11 +19,11 @@
# -- Project information -----------------------------------------------------

project = 'inferelator'
copyright = '2019, Flatiron Institute'
copyright = '2022, Flatiron Institute'
author = 'Chris Jackson'

# The full version, including alpha/beta/rc tags
release = 'v0.5.8'
release = 'v0.6.0'


# -- General configuration ---------------------------------------------------
1 change: 1 addition & 0 deletions inferelator/__init__.py
@@ -11,3 +11,4 @@
from inferelator.utils import inferelator_verbose_level
from inferelator.distributed.inferelator_mp import MPControl

from inferelator.workflows import amusr_workflow, single_cell_workflow, tfa_workflow, velocity_workflow
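With the workflow modules now imported at package level, a user script can combine them with the parallelization controller. The sketch below is illustrative only: the regression and workflow names are example choices, the joblib engine name is the same assumption made above, and input-file configuration is omitted.

```python
# Illustrative usage of the package-level imports added above.
# "bbsr" and "tfa" are example choices; data file paths are omitted.
from inferelator import inferelator_workflow, MPControl

MPControl.set_multiprocess_engine("joblib")  # assumed engine name (inferred from this commit)

worker = inferelator_workflow(regression="bbsr", workflow="tfa")
worker.set_run_parameters(num_bootstraps=5, random_seed=42)
# worker.set_file_paths(...) and other input settings would go here
worker.run()
```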
8 changes: 8 additions & 0 deletions inferelator/distributed/__init__.py
@@ -14,6 +14,14 @@ class AbstractController:
_controller_name = None
_controller_dask = False

# Does this method require setup
# Or can it be done on the fly
_require_initialization=False

# Does this method require a clean shutdown
# Or can we just abandon it to the GC
_require_shutdown=False

@classmethod
def name(cls):
"""
40 changes: 23 additions & 17 deletions inferelator/distributed/dask_cluster_controller.py
@@ -1,4 +1,3 @@
from __future__ import absolute_import, division, print_function
import time
import os
import math
@@ -11,8 +10,8 @@

from inferelator import utils
from inferelator.utils import Validator as check
from inferelator.distributed import AbstractController
from inferelator.distributed.dask_functions import dask_map
from inferelator.distributed.dask_local_controller import DaskAbstract


_DEFAULT_NUM_JOBS = 1
_DEFAULT_THREADS_PER_WORKER = 1
@@ -99,7 +98,7 @@ def __init__(self, *args, **kwargs):
self._command_template = memory_limit_0(self._command_template)


class DaskHPCClusterController(AbstractController):
class DaskHPCClusterController(DaskAbstract):
"""
The DaskHPCClusterController launches a HPC cluster and connects as a client. By default it uses the SLURM workload
manager, but other workload managers may be used.
@@ -111,6 +110,7 @@ class DaskHPCClusterController(AbstractController):
"""
_controller_name = "dask-cluster"
_controller_dask = True
_require_initialization = True

client = None

@@ -123,7 +123,7 @@ class DaskHPCClusterController(AbstractController):

# Should any local workers be started on this node
_num_local_workers = 0
_runaway_protection = 3
_runaway_protection = 5
_local_worker_command = _DEFAULT_LOCAL_WORKER_CMD

# SLURM specific variables
Expand Down Expand Up @@ -161,14 +161,18 @@ def connect(cls, *args, **kwargs):
local_directory=cls._local_directory,
memory=cls._job_mem,
job_extra=cls._job_slurm_commands,
job_cls=SLURMJobNoMemLimit)
job_cls=SLURMJobNoMemLimit,
**kwargs)

cls.client = distributed.Client(cls._local_cluster, direct_to_workers=True)

cls._add_local_node_workers(cls._num_local_workers)
cls._tracker = WorkerTracker()

utils.Debug.vprint("Dask dashboard: {cl}".format(cl = cls.client.dashboard_link), level=0)
utils.Debug.vprint(
f"Dask dashboard active: {cls.client.dashboard_link}",
level=0
)

return True

Expand All @@ -177,9 +181,8 @@ def shutdown(cls):
cls.client.close()
cls._local_cluster.close()

@classmethod
def map(cls, func, *args, **kwargs):
return dask_map(func, *args, **kwargs)
cls.client = None
cls._local_cluster = None

@classmethod
def use_default_configuration(cls, known_config, n_jobs=1):
Expand All @@ -199,8 +202,9 @@ def use_default_configuration(cls, known_config, n_jobs=1):
setattr(cls, k, v)
cls._job_n = n_jobs
else:
msg = "Configuration {k} is unknown".format(k=known_config)
raise ValueError(msg)
raise ValueError(
f"Configuration {known_config} is unknown"
)

utils.Debug.vprint(cls._config_str(), level=1)

Expand Down Expand Up @@ -273,11 +277,13 @@ def set_processes(cls, process_count):
check.argument_integer(process_count, low=1)
cls._job_n = math.ceil(process_count / cls._job_n_workers)

utils.Debug.vprint("Using `set_processes` is not advised for the DASK CLUSTER configuration", level=0)
utils.Debug.vprint("Using `set_job_size_params` is highly preferred", level=0)
utils.Debug.vprint("Configured {n} jobs with {w} workers per job".format(n=cls._job_n, w=cls._job_n_workers),
level=0)

utils.Debug.vprint(
"Using `set_processes` is not advised for the DASK CLUSTER configuration, "
"Using `set_job_size_params` is highly preferred. "
f"Configured {cls._job_n} jobs with {cls._job_n_workers} workers per job.",
level=0
)

@classmethod
def add_worker_env_line(cls, line):
"""