-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add async_local backend and allow using an existing dview for local and async_local backends #311
base: master
Are you sure you want to change the base?
Conversation
# in fname new load in memmap order C | ||
cm.stop_server(dview=dview) | ||
c, dview, n_processes = cm.cluster.setup_cluster( | ||
backend="local", n_processes=None, single_thread=False | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure what this part is for; it doesn't really work with the changes here, so I just deleted it, but we can try to do something else if it's important.
Thanks! I haven't used the |
Sure, no worries! I also haven't used that module before today. Thus I'm not 100% sure there won't be issues with it, but so far it seems pretty straightforward. If it helps, I used this toy script to help convince myself that it's doing the right thing: from concurrent.futures import ThreadPoolExecutor
from typing import Iterable
from multiprocessing.pool import Pool
import time
from caiman.cluster import setup_cluster, stop_server
def calc_square(x: int) -> int:
return x ** 2
def calc_squares(xs: Iterable[int], pool: Pool):
time.sleep(10)
return pool.map_async(calc_square, xs).get()
if __name__ == '__main__':
_, pool, n_procs = setup_cluster(backend="multiprocessing", n_processes=4)
t0 = time.time()
with ThreadPoolExecutor(max_workers=2) as executor:
future0 = executor.submit(calc_squares, range(2), pool)
future1 = executor.submit(calc_squares, range(2, 4), pool)
print(future0.result())
print(future1.result())
t1 = time.time()
print(f'Executed in {t1-t0:.2f} seconds')
stop_server(dview=pool) |
This makes me think whether we can use it to easily run blocking calculations in the background and visualizing the results in real-time as they come in with fastplotlib 🤔 |
This is a feature I added for myself, and I think it may be useful for others so I'm offering it here.
Problem: mesmerize is currently inflexible with regard to how the parallel processing is set up. Running an item always opens a new multiprocessing pool; you can control the number of processes through the MESMERIZE_N_PROCESSES environment variable, but that's it. I wanted to have the ability to a) pass in an existing cluster to multiple runs, to save overhead, and/or b) use a different type of cluster (e.g. an ipyparallel cluster spanning multiple nodes, or even a load-balanced view).
Solution: Passing a pool or dview into the run function clearly won't work with a subprocess, so the subprocess and slurm backends are out. The local backend calls the function directly, but it has the disadvantage that it blocks, so only one gridsearch run can be done at a time. However, we can get around that by spawning a thread (again, not a subprocess; the cluster objects can't be pickled).
I added a new backend called "local_async," which just launches a local run in a thread using the
concurrent.futures
module from the standard library. I also made it possible to pass a dview into both the local and local_async backends. I factored out some boilerplate from the 3 algorithm files into a_utils.py
file that launches a new multiprocessing pool if no dview was passed in (and closes it when finished), and otherwise just forwards what was passed in.Finally, I added a test that compares results from the 3 non-SLURM backends.
The diff is not really as big as Github is saying; there are a lot of whitespace changes, since I added a context manager in the 3 algorithms files, but checking the box to hide whitespace changes shows something more reasonable.