Skip to content

Commit db60da6

Browse files
authored
Specify dask worker memory limits (#843)
* Enable mem specification * Fixed docstring typo * Updated changelog * PEP8
1 parent ee7a870 commit db60da6

File tree

6 files changed

+8
-3
lines changed

6 files changed

+8
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88

99
#### Added
1010

11-
11+
- V2: Enable specification of Dask worker memory limits [#843](https://github.com/askap-vast/vast-pipeline/pull/843)
1212
- V2: Migrate pipeline to used a Dask.distributed.LocalCluster throughout [#816](https://github.com/askap-vast/vast-pipeline/pull/816)
1313
- V2: Add Dask.distributed support
1414
- V2: Use `django-postgres-copy` for database uploads [#803](https://github.com/askap-vast/vast-pipeline/pull/803)
@@ -58,6 +58,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5858

5959
#### List of PRs
6060

61+
- [#843](https://github.com/askap-vast/vast-pipeline/pull/843): feat: V2: Enable specification of Dask worker memory limits
6162
- [#833](https://github.com/askap-vast/vast-pipeline/pull/833): feat: V2: Limit associations upload to using num_io_workers
6263
- [#829](https://github.com/askap-vast/vast-pipeline/pull/829): feat: V2: Allow user specification of dask dashboard paramters and add some further logging to dask setup
6364
- [#817](https://github.com/askap-vast/vast-pipeline/pull/817): fix: V2: Updates to pairs calculation to make it work with Dask `LocalCluster`.

vast_pipeline/daskmanager/manager.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ def _start_cluster():
1414
logger.info('Starting local Dask Cluster...')
1515
logger.info(f"n_workers: {s.DASK_NUM_WORKERS}")
1616
logger.info(f"threads_per_worker: {s.DASK_THREADS_PER_WORKER}")
17+
logger.info(f"memory per worker: {s.DASK_MEM_PER_WORKER}")
1718
logger.info(f"scheduler_host: {s.DASK_SCHEDULER_HOST}")
1819
logger.info(f"scheduler_port: {s.DASK_SCHEDULER_PORT}")
1920
logger.info(f"dashboard_host: {s.DASK_DASHBOARD_HOST}")
@@ -24,6 +25,7 @@ def _start_cluster():
2425
threads_per_worker=s.DASK_THREADS_PER_WORKER,
2526
host=s.DASK_SCHEDULER_HOST,
2627
scheduler_port=int(s.DASK_SCHEDULER_PORT),
28+
memory_limit=s.DASK_MEM_PER_WORKER,
2729
dashboard_address=f"{s.DASK_DASHBOARD_HOST}:{s.DASK_DASHBOARD_PORT}",
2830
)
2931
client = Client(cluster)

vast_pipeline/image/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from astropy.wcs.utils import proj_plane_pixel_scales
1515
from typing import Dict
1616

17-
from .utils import calc_condon_flux_errors, open_fits
17+
from .utils import calc_condon_flux_errors
1818

1919
from vast_pipeline import models
2020
from vast_pipeline.survey.translators import tr_selavy

vast_pipeline/management/commands/runlocalcluster.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
class Command(BaseCommand):
1212
"""
1313
This script will run a Dask LocalCluster on the IP and port
14-
sepcified in the settings. Use --help for usage.
14+
specified in the settings. Use --help for usage.
1515
"""
1616
help = 'Run a Dask LocalCluster'
1717

webinterface/.env.template

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ SOCIAL_AUTH_GITHUB_ADMIN_TEAM=fillMeUp
2929
# DASK_DASHBOARD_PORT=fillMeUp
3030
DASK_NUM_WORKERS=14
3131
DASK_THREADS_PER_WORKER=1
32+
DASK_MEM_PER_WORKER='1GB'
3233

3334
# Pipeline
3435
PIPELINE_WORKING_DIR=pipeline-runs

webinterface/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@
253253
DASK_DASHBOARD_PORT = env('DASK_DASHBOARD_PORT', cast=str, default='8787')
254254
DASK_NUM_WORKERS = env('DASK_NUM_WORKERS', cast=int, default=14)
255255
DASK_THREADS_PER_WORKER = env('DASK_THREADS_PER_WORKER', cast=int, default=1)
256+
DASK_MEM_PER_WORKER = env('DASK_MEM_PER_WORKER', cast=str, default='1GB')
256257

257258
# Logging
258259
LOGGING = {

0 commit comments

Comments
 (0)