Merge pull request #298 from proektlab/slurm
Run items using SLURM; save batches with platform-independent paths
kushalkolar authored Jul 16, 2024
2 parents 38f6ebe + b0740f2 commit 306f702
Showing 12 changed files with 184 additions and 85 deletions.
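
For context, a minimal sketch of how a batch item could be sent to the new SLURM backend after this change. It assumes the existing mesmerize-core public API (set_parent_raw_data_path, create_batch, and the caiman accessor's add_item and run methods) and that run() accepts a backend keyword; paths and parameters are illustrative, and scheduler-side options are not covered by this excerpt.

from mesmerize_core import set_parent_raw_data_path, create_batch

set_parent_raw_data_path("/data/raw")               # parent dir of the raw movies
df = create_batch("/data/batches/batch.pickle")     # new, empty batch

# hypothetical batch item; params follow the usual {"main": {...}} layout
df.caiman.add_item(
    algo="mcorr",
    item_name="movie_1",
    input_movie_path="/data/raw/movie_1.tif",
    params={"main": {"max_shifts": (24, 24)}},
)

# "slurm" matches COMPUTE_BACKEND_SLURM defined in batch_utils.py below
df.iloc[-1].caiman.run(backend="slurm")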
2 changes: 1 addition & 1 deletion .gitignore
@@ -134,4 +134,4 @@ dmypy.json
# test files
tests/tmp
tests/videos

tests/ground_truths*
1 change: 1 addition & 0 deletions environment.yml
@@ -8,3 +8,4 @@ dependencies:
- jupyterlab
- tqdm
- psutil
- filelock >= 3.15.1
1 change: 1 addition & 0 deletions environment_rtd.yml
@@ -8,3 +8,4 @@ dependencies:
- tqdm
- psutil
- pydata-sphinx-theme < 0.10.0
- filelock >= 3.15.1
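
Both environment files add filelock, which this PR uses to serialize access to the batch pickle across processes. A minimal, generic sketch of the filelock pattern (the lock-file name and timeout are illustrative; how mesmerize-core constructs its batch lock is not shown in this excerpt):

from filelock import FileLock

lock = FileLock("batch.pickle.lock", timeout=30)  # companion lock file, 30 s acquisition timeout
with lock:
    # read-modify-write the batch pickle; other processes block until the lock is released
    ...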
29 changes: 11 additions & 18 deletions mesmerize_core/algorithms/cnmf.py
@@ -5,13 +5,11 @@
from caiman.source_extraction.cnmf.params import CNMFParams
import psutil
import numpy as np
import pandas as pd
import traceback
from pathlib import Path
from pathlib import Path, PurePosixPath
from shutil import move as move_file
import os
import time
from datetime import datetime

# prevent circular import
if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess
@@ -27,7 +25,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
set_parent_raw_data_path(data_path)

df = load_batch(batch_path)
item = df[df["uuid"] == uuid].squeeze()
item = df.caiman.uloc(uuid)

input_movie_path = item["input_movie_path"]
# resolve full path
@@ -115,13 +113,14 @@ def run_algo(batch_path, uuid, data_path: str = None):
Yr._mmap.close() # accessing private attr but windows is annoying otherwise
move_file(fname_new, cnmf_memmap_path)

cnmf_hdf5_path = output_path.relative_to(output_dir.parent)
cnmf_memmap_path = cnmf_memmap_path.relative_to(output_dir.parent)
corr_img_path = corr_img_path.relative_to(output_dir.parent)
# save paths as relative path strings with forward slashes
cnmf_hdf5_path = str(PurePosixPath(output_path.relative_to(output_dir.parent)))
cnmf_memmap_path = str(PurePosixPath(cnmf_memmap_path.relative_to(output_dir.parent)))
corr_img_path = str(PurePosixPath(corr_img_path.relative_to(output_dir.parent)))
for proj_type in proj_paths.keys():
d[f"{proj_type}-projection-path"] = proj_paths[proj_type].relative_to(
d[f"{proj_type}-projection-path"] = str(PurePosixPath(proj_paths[proj_type].relative_to(
output_dir.parent
)
)))

d.update(
{
@@ -137,15 +136,9 @@ def run_algo(batch_path, uuid, data_path: str = None):
d = {"success": False, "traceback": traceback.format_exc()}

cm.stop_server(dview=dview)

# Add dictionary to output column of series
df.loc[df["uuid"] == uuid, "outputs"] = [d]
# Add ran timestamp to ran_time column of series
df.loc[df["uuid"] == uuid, "ran_time"] = datetime.now().isoformat(timespec="seconds", sep="T")
df.loc[df["uuid"] == uuid, "algo_duration"] = str(round(time.time() - algo_start, 2)) + " sec"
# save dataframe to disc
df.to_pickle(batch_path)


runtime = round(time.time() - algo_start, 2)
df.caiman.update_item_with_results(uuid, d, runtime)

@click.command()
@click.option("--batch-path", type=str)
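
The path handling in cnmf.py above is the heart of the platform-independent-paths change: every output path is stored relative to the batch directory and converted to a forward-slash string via PurePosixPath, so a batch written on one OS resolves cleanly on another. A small illustrative round trip (paths are made up):

from pathlib import Path, PurePosixPath

output_dir = Path("/batch_dir/0123-abcd")           # per-item output directory
hdf5_path = output_dir / "0123-abcd.hdf5"

# store: relative to the batch directory, normalized to forward slashes
stored = str(PurePosixPath(hdf5_path.relative_to(output_dir.parent)))
# -> "0123-abcd/0123-abcd.hdf5"

# resolve later, possibly on a different machine or mount point
batch_dir = Path("/mnt/share/batch_dir")
full_path = batch_dir / stored   # pathlib accepts forward-slash strings on every platform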
17 changes: 6 additions & 11 deletions mesmerize_core/algorithms/cnmfe.py
@@ -5,11 +5,10 @@
from caiman.source_extraction.cnmf.params import CNMFParams
import psutil
import traceback
from pathlib import Path
from pathlib import Path, PurePosixPath
from shutil import move as move_file
import os
import time
from datetime import datetime

if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess
from mesmerize_core import set_parent_raw_data_path, load_batch
@@ -24,7 +23,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
set_parent_raw_data_path(data_path)

df = load_batch(batch_path)
item = df[df["uuid"] == uuid].squeeze()
item = df.caiman.uloc(uuid)

input_movie_path = item["input_movie_path"]
# resolve full path
@@ -106,7 +105,8 @@ def run_algo(batch_path, uuid, data_path: str = None):
Yr._mmap.close() # accessing private attr but windows is annoying otherwise
move_file(fname_new, cnmf_memmap_path)

cnmfe_memmap_path = cnmf_memmap_path.relative_to(output_dir.parent)
# save path as a relative path string with forward slashes
cnmfe_memmap_path = str(PurePosixPath(cnmf_memmap_path.relative_to(output_dir.parent)))

d.update(
{
@@ -121,13 +121,8 @@ def run_algo(batch_path, uuid, data_path: str = None):

cm.stop_server(dview=dview)

# Add dictionary to output column of series
df.loc[df["uuid"] == uuid, "outputs"] = [d]
# Add ran timestamp to ran_time column of series
df.loc[df["uuid"] == uuid, "ran_time"] = datetime.now().isoformat(timespec="seconds", sep="T")
df.loc[df["uuid"] == uuid, "algo_duration"] = str(round(time.time() - algo_start, 2)) + " sec"
# save dataframe to disc
df.to_pickle(batch_path)
runtime = round(time.time() - algo_start, 2)
df.caiman.update_item_with_results(uuid, d, runtime)


@click.command()
28 changes: 10 additions & 18 deletions mesmerize_core/algorithms/mcorr.py
@@ -5,14 +5,11 @@
from caiman.motion_correction import MotionCorrect
from caiman.summary_images import local_correlations_movie_offline
import psutil
import pandas as pd
import os
from pathlib import Path
from pathlib import Path, PurePosixPath
import numpy as np
from shutil import move as move_file
import time
from datetime import datetime


# prevent circular import
if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess
@@ -28,7 +25,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
batch_path = Path(batch_path)
df = load_batch(batch_path)

item = df[df["uuid"] == uuid].squeeze()
item = df.caiman.uloc(uuid)
# resolve full path
input_movie_path = str(df.paths.resolve(item["input_movie_path"]))

@@ -123,14 +120,14 @@ def run_algo(batch_path, uuid, data_path: str = None):
shift_path = output_dir.joinpath(f"{uuid}_shifts.npy")
np.save(str(shift_path), shifts)

# relative paths
cn_path = cn_path.relative_to(output_dir.parent)
mcorr_memmap_path = mcorr_memmap_path.relative_to(output_dir.parent)
shift_path = shift_path.relative_to(output_dir.parent)
# save paths as relative path strings with forward slashes
cn_path = str(PurePosixPath(cn_path.relative_to(output_dir.parent)))
mcorr_memmap_path = str(PurePosixPath(mcorr_memmap_path.relative_to(output_dir.parent)))
shift_path = str(PurePosixPath(shift_path.relative_to(output_dir.parent)))
for proj_type in proj_paths.keys():
d[f"{proj_type}-projection-path"] = proj_paths[proj_type].relative_to(
d[f"{proj_type}-projection-path"] = str(PurePosixPath(proj_paths[proj_type].relative_to(
output_dir.parent
)
)))

d.update(
{
@@ -148,13 +145,8 @@ def run_algo(batch_path, uuid, data_path: str = None):

cm.stop_server(dview=dview)

# Add dictionary to output column of series
df.loc[df["uuid"] == uuid, "outputs"] = [d]
# Add ran timestamp to ran_time column of series
df.loc[df["uuid"] == uuid, "ran_time"] = datetime.now().isoformat(timespec="seconds", sep="T")
df.loc[df["uuid"] == uuid, "algo_duration"] = str(round(time.time() - algo_start, 2)) + " sec"
# Save DataFrame to disk
df.to_pickle(batch_path)
runtime = round(time.time() - algo_start, 2)
df.caiman.update_item_with_results(uuid, d, runtime)


@click.command()
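
As in cnmf.py and cnmfe.py, mcorr.py now delegates the final bookkeeping to a new df.caiman.update_item_with_results(uuid, d, runtime) accessor rather than writing outputs, ran_time and algo_duration inline and re-pickling the whole DataFrame. That accessor is not part of this excerpt; judging only from the code it replaces, it presumably does something along these lines (a hedged sketch, not the actual implementation):

from datetime import datetime

def update_item_with_results(df, uuid, results: dict, runtime_s: float, batch_path):
    # sketch: the real method lives on the caiman DataFrame accessor and is
    # expected to hold the batch file lock while it writes
    df.loc[df["uuid"] == uuid, "outputs"] = [results]
    df.loc[df["uuid"] == uuid, "ran_time"] = datetime.now().isoformat(timespec="seconds", sep="T")
    df.loc[df["uuid"] == uuid, "algo_duration"] = f"{runtime_s} sec"
    df.to_pickle(batch_path)  # the real accessor likely also verifies the on-disk rows first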
7 changes: 4 additions & 3 deletions mesmerize_core/batch_utils.py
@@ -1,3 +1,4 @@
from datetime import datetime
import os
from pathlib import Path
from typing import Union
@@ -10,7 +11,7 @@
PARENT_DATA_PATH: Path = None

COMPUTE_BACKEND_SUBPROCESS = "subprocess" #: subprocess backend
COMPUTE_BACKEND_SLURM = "slurm" #: SLURM backend, not yet implemented
COMPUTE_BACKEND_SLURM = "slurm" #: SLURM backend
COMPUTE_BACKEND_LOCAL = "local"

COMPUTE_BACKENDS = [COMPUTE_BACKEND_SUBPROCESS, COMPUTE_BACKEND_SLURM, COMPUTE_BACKEND_LOCAL]
@@ -232,10 +233,10 @@ def create_batch(path: Union[str, Path], remove_existing: bool = False) -> pd.Da
os.makedirs(Path(path).parent)

df = pd.DataFrame(columns=DATAFRAME_COLUMNS)
df.to_pickle(path) # save before adding platform-dependent batch path

df.paths.set_batch_path(path)

df.to_pickle(path)

return df


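
The reordering in create_batch writes the empty pickle before the absolute, platform-dependent batch path is attached to the in-memory DataFrame, so the file on disk stays machine-independent. Typical usage is unchanged (a sketch with illustrative paths):

from mesmerize_core import create_batch, load_batch

df = create_batch("/data/batches/batch.pickle")      # empty batch saved to disk, no absolute path baked in
# ... later, possibly on another machine or OS:
df = load_batch("/mnt/share/batches/batch.pickle")   # the current location is attached in memory at load time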
7 changes: 7 additions & 0 deletions mesmerize_core/caiman_extensions/_batch_exceptions.py
@@ -12,3 +12,10 @@ class WrongAlgorithmExtensionError(Exception):

class DependencyError(Exception):
pass


class PreventOverwriteError(IndexError):
"""
Error thrown when trying to write to an existing batch file with a potential risk of removing existing rows.
"""
pass
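
PreventOverwriteError subclasses IndexError and is raised by the new write guard in _utils.py (below) when the rows on disk no longer match the in-memory DataFrame. A hedged sketch of how calling code might respond, assuming that write-style accessors such as add_item are wrapped by the guard:

from mesmerize_core.caiman_extensions._batch_exceptions import PreventOverwriteError

try:
    df.caiman.add_item(algo="cnmf", item_name="m1", input_movie_path=movie_path, params=params)
except PreventOverwriteError:
    df = df.caiman.reload_from_disk()   # pick up rows another process added, then retry the write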
15 changes: 14 additions & 1 deletion mesmerize_core/caiman_extensions/_utils.py
@@ -3,7 +3,7 @@
from uuid import UUID

from mesmerize_core.caiman_extensions._batch_exceptions import BatchItemNotRunError, BatchItemUnsuccessfulError, \
WrongAlgorithmExtensionError
WrongAlgorithmExtensionError, PreventOverwriteError


def validate(algo: str = None):
@@ -29,6 +29,19 @@ def wrapper(self, *args, **kwargs):
return dec


def _verify_and_lock_batch_file(func):
"""Acquires lock and ensures batch file has the same items as current df before calling wrapped function"""
@wraps(func)
def wrapper(instance, *args, **kwargs):
with instance._batch_lock:
disk_df = instance.reload_from_disk()
# check whether all the same UUIDs are present with the same indices
if not instance._df["uuid"].equals(disk_df["uuid"]):
raise PreventOverwriteError("Items on disk do not match current DataFrame; reload to synchronize")
return func(instance, *args, **kwargs)
return wrapper


def _index_parser(func):
@wraps(func)
def _parser(instance, *args, **kwargs):
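
The _verify_and_lock_batch_file decorator above combines two safeguards: it holds instance._batch_lock (presumably a filelock.FileLock, given the new dependency) for the duration of the wrapped call, and it refuses to proceed if the UUID column on disk has diverged from the in-memory DataFrame. Applying it to a write-style accessor method would look roughly like this (a sketch; the method body and path helper are illustrative):

class CaimanDataFrameExtensions:
    ...

    @_verify_and_lock_batch_file
    def save_to_disk(self):
        # safe to overwrite: the decorator has confirmed the on-disk rows match self._df
        self._df.to_pickle(self._df.paths.get_batch_path())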
