Skip to content

Commit b932280

Browse files
committed
add daskmanager
1 parent ea6e9ff commit b932280

File tree

2 files changed

+36
-0
lines changed

2 files changed

+36
-0
lines changed

pipeline/daskmanager/__init__.py

Whitespace-only changes.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# code from https://github.com/MoonVision/django-dask-demo
2+
3+
import logging
4+
5+
from dask.distributed import Client, LocalCluster
6+
from django.conf import settings as s
7+
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class Singleton(type):
13+
_instances = {}
14+
15+
def __call__(cls, *args, **kwargs):
16+
if cls not in cls._instances:
17+
cls._instances[cls] = (
18+
super(Singleton, cls).__call__(*args, **kwargs)
19+
)
20+
return cls._instances[cls]
21+
22+
23+
class DaskManager(metaclass=Singleton):
24+
def __init__(self):
25+
if not s.DASK_SCHEDULER_HOST and not s.DASK_SCHEDULER_PORT:
26+
# assume a local cluster
27+
logger.info('Starting local Dask Cluster')
28+
self.cluster = LocalCluster()
29+
self.client = Client()
30+
logger.info('Connected to local Dask Cluster')
31+
else:
32+
self.client = Client(
33+
f'{s.DASK_SCHEDULER_HOST}:{s.DASK_SCHEDULER_PORT}'
34+
)
35+
self.cluster = self.client.cluster
36+
logger.info('Connected to Dask Cluster')

0 commit comments

Comments
 (0)