diff --git a/mesmerize_core/__init__.py b/mesmerize_core/__init__.py
index ae1f38c..02f9674 100644
--- a/mesmerize_core/__init__.py
+++ b/mesmerize_core/__init__.py
@@ -3,7 +3,6 @@
     get_parent_raw_data_path,
     load_batch,
     create_batch,
-    save_results_safely
 )
 from .caiman_extensions import *
 from pathlib import Path
@@ -17,7 +16,6 @@
     "get_parent_raw_data_path",
     "load_batch",
     "create_batch",
-    "save_results_safely",
     "CaimanDataFrameExtensions",
     "CaimanSeriesExtensions",
     "CNMFExtensions",
diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py
index a24e987..4c71174 100644
--- a/mesmerize_core/algorithms/cnmf.py
+++ b/mesmerize_core/algorithms/cnmf.py
@@ -13,10 +13,10 @@
 
 # prevent circular import
 if __name__ in ["__main__", "__mp_main__"]:  # when running in subprocess
-    from mesmerize_core import set_parent_raw_data_path, load_batch, save_results_safely
+    from mesmerize_core import set_parent_raw_data_path, load_batch
     from mesmerize_core.utils import IS_WINDOWS
 else:  # when running with local backend
-    from ..batch_utils import set_parent_raw_data_path, load_batch, save_results_safely
+    from ..batch_utils import set_parent_raw_data_path, load_batch
     from ..utils import IS_WINDOWS
 
@@ -25,7 +25,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
     set_parent_raw_data_path(data_path)
 
     df = load_batch(batch_path)
-    item = df[df["uuid"] == uuid].squeeze()
+    item = df.caiman.uloc(uuid)
 
     input_movie_path = item["input_movie_path"]
     # resolve full path
@@ -138,7 +138,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
     cm.stop_server(dview=dview)
 
     runtime = round(time.time() - algo_start, 2)
-    save_results_safely(batch_path, uuid, d, runtime)
+    df.caiman.update_item_with_results(uuid, d, runtime)
 
 
 @click.command()
 @click.option("--batch-path", type=str)
diff --git a/mesmerize_core/algorithms/cnmfe.py b/mesmerize_core/algorithms/cnmfe.py
index 3fb7a87..e053869 100644
--- a/mesmerize_core/algorithms/cnmfe.py
+++ b/mesmerize_core/algorithms/cnmfe.py
@@ -11,10 +11,10 @@
 import time
 
 if __name__ in ["__main__", "__mp_main__"]:  # when running in subprocess
-    from mesmerize_core import set_parent_raw_data_path, load_batch, save_results_safely
+    from mesmerize_core import set_parent_raw_data_path, load_batch
     from mesmerize_core.utils import IS_WINDOWS
 else:  # when running with local backend
-    from ..batch_utils import set_parent_raw_data_path, load_batch, save_results_safely
+    from ..batch_utils import set_parent_raw_data_path, load_batch
     from ..utils import IS_WINDOWS
 
@@ -23,7 +23,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
     set_parent_raw_data_path(data_path)
 
     df = load_batch(batch_path)
-    item = df[df["uuid"] == uuid].squeeze()
+    item = df.caiman.uloc(uuid)
 
     input_movie_path = item["input_movie_path"]
     # resolve full path
@@ -122,7 +122,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
     cm.stop_server(dview=dview)
 
     runtime = round(time.time() - algo_start, 2)
-    save_results_safely(batch_path, uuid, d, runtime)
+    df.caiman.update_item_with_results(uuid, d, runtime)
 
 
 @click.command()
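The `df.caiman.uloc(uuid)` calls introduced in these algorithm modules replace the inline boolean-mask lookup `df[df["uuid"] == uuid].squeeze()`. A minimal sketch of the behavior assumed of `uloc` (the `common.py` hunk further down only shows its final `squeeze()`; the single-match check here is an assumption, and `uloc_sketch` is a hypothetical stand-in, not the method body from this PR):

```python
import pandas as pd

def uloc_sketch(df: pd.DataFrame, u) -> pd.Series:
    # select the single row whose "uuid" column matches `u`
    df_u = df.loc[df["uuid"] == str(u)]
    if df_u.index.size != 1:
        # assumption: the real uloc fails loudly on missing or duplicate UUIDs
        raise KeyError(f"expected exactly one row for uuid {u}, found {df_u.index.size}")
    return df_u.squeeze()  # 1-row DataFrame -> Series, like the old inline lookup
```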
diff --git a/mesmerize_core/algorithms/mcorr.py b/mesmerize_core/algorithms/mcorr.py
index 7ae88b7..0962d0f 100644
--- a/mesmerize_core/algorithms/mcorr.py
+++ b/mesmerize_core/algorithms/mcorr.py
@@ -13,9 +13,9 @@
 
 # prevent circular import
 if __name__ in ["__main__", "__mp_main__"]:  # when running in subprocess
-    from mesmerize_core import set_parent_raw_data_path, load_batch, save_results_safely
+    from mesmerize_core import set_parent_raw_data_path, load_batch
 else:  # when running with local backend
-    from ..batch_utils import set_parent_raw_data_path, load_batch, save_results_safely
+    from ..batch_utils import set_parent_raw_data_path, load_batch
 
 
 def run_algo(batch_path, uuid, data_path: str = None):
@@ -25,7 +25,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
     batch_path = Path(batch_path)
     df = load_batch(batch_path)
-    item = df[df["uuid"] == uuid].squeeze()
+    item = df.caiman.uloc(uuid)
 
     # resolve full path
     input_movie_path = str(df.paths.resolve(item["input_movie_path"]))
@@ -146,7 +146,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
     cm.stop_server(dview=dview)
 
     runtime = round(time.time() - algo_start, 2)
-    save_results_safely(batch_path, uuid, d, runtime)
+    df.caiman.update_item_with_results(uuid, d, runtime)
 
 
 @click.command()
diff --git a/mesmerize_core/batch_utils.py b/mesmerize_core/batch_utils.py
index 7c01f55..800a288 100644
--- a/mesmerize_core/batch_utils.py
+++ b/mesmerize_core/batch_utils.py
@@ -3,7 +3,6 @@
 from pathlib import Path
 from typing import Union
 
-from filelock import SoftFileLock, Timeout
 import pandas as pd
 
 from .utils import validate_path
@@ -247,65 +246,3 @@ def get_full_raw_data_path(path: Union[Path, str]) -> Path:
         return PARENT_DATA_PATH.joinpath(path)
 
     return path
-
-
-class PreventOverwriteError(IndexError):
-    """
-    Error thrown when trying to write to an existing batch file with a potential risk of removing existing rows.
-    """
-    pass
-
-
-class BatchLock:
-    """Locks a batch file for safe writing, returning the dataframe in the target"""
-    TIMEOUT = 30  # must be consistent or else nested re-entrant locks break
-    def __init__(self, batch_path: Union[Path, str]):
-        self.lock = SoftFileLock(str(batch_path) + ".lock", is_singleton=True, timeout=self.TIMEOUT)
-        self.batch_path = batch_path
-
-    def __enter__(self) -> pd.DataFrame:
-        self.lock.__enter__()  # may throw Timeout
-        return load_batch(self.batch_path)
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.lock.__exit__(exc_type, exc_value, traceback)
-
-
-def open_batch_for_safe_writing(batch_path: Union[Path, str]) -> BatchLock:
-    """Just a more self-documenting constructor"""
-    return BatchLock(batch_path)
-
-
-def save_results_safely(batch_path: Union[Path, str], uuid, results: dict, runtime: float):
-    """
-    Try to load the given batch and save results to the given item
-    Uses a file lock to ensure that no other process is writing to the same batch using this function,
-    which gives up after lock_timeout seconds.
-    """
-    try:
-        with open_batch_for_safe_writing(batch_path) as df:
-            # Add dictionary to output column of series
-            df.loc[df["uuid"] == uuid, "outputs"] = [results]
-            # Add ran timestamp to ran_time column of series
-            df.loc[df["uuid"] == uuid, "ran_time"] = datetime.now().isoformat(timespec="seconds", sep="T")
-            df.loc[df["uuid"] == uuid, "algo_duration"] = str(runtime) + " sec"
-
-            # save dataframe to disk
-            df.caiman.save_to_disk()
-
-    except BaseException as e:
-        # Print a message with details in lieu of writing to the batch file
-        msg = f"Batch file could not be written to"
-        if isinstance(e, Timeout):
-            msg += f" (file locked for {BatchLock.TIMEOUT} seconds)"
-        elif isinstance(e, PreventOverwriteError):
-            msg += f" (items would be overwritten, even though file was locked)"
-
-        if results["success"]:
-            output_dir = Path(batch_path).parent.joinpath(str(uuid))
-            msg += f"\nRun succeeded; results are in {output_dir}."
-        else:
-            msg += f"\nRun failed.\n"
-            msg += results["traceback"]
-
-        raise RuntimeError(msg)
\ No newline at end of file
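The `BatchLock` class removed above carried the comment "must be consistent or else nested re-entrant locks break"; the new design in `common.py` leans on exactly that property of `filelock`: a lock object counts nested acquisitions, and `is_singleton=True` makes every `SoftFileLock` constructed for the same path (with the same arguments) the same object. A minimal sketch of those semantics (the `.lock` filename follows the `str(batch_path) + ".lock"` convention above; this illustrates the library behavior, it is not code from this PR):

```python
from filelock import SoftFileLock

# is_singleton=True caches one instance per lock path (arguments must match,
# hence the "must be consistent" comment), so all "safe" operations share it.
outer = SoftFileLock("batch.pickle.lock", timeout=30, is_singleton=True)
inner = SoftFileLock("batch.pickle.lock", timeout=30, is_singleton=True)
assert outer is inner

with outer:      # e.g. update_item_with_results acquires the lock...
    with inner:  # ...and save_to_disk re-acquires it without blocking itself
        assert outer.is_locked
assert not outer.is_locked  # the file lock is released at the outermost exit
```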
diff --git a/mesmerize_core/caiman_extensions/_batch_exceptions.py b/mesmerize_core/caiman_extensions/_batch_exceptions.py
index 780b09a..208f8d7 100644
--- a/mesmerize_core/caiman_extensions/_batch_exceptions.py
+++ b/mesmerize_core/caiman_extensions/_batch_exceptions.py
@@ -12,3 +12,10 @@ class WrongAlgorithmExtensionError(Exception):
 
 class DependencyError(Exception):
     pass
+
+
+class PreventOverwriteError(IndexError):
+    """
+    Error thrown when trying to write to an existing batch file with a potential risk of removing existing rows.
+    """
+    pass
\ No newline at end of file
diff --git a/mesmerize_core/caiman_extensions/_utils.py b/mesmerize_core/caiman_extensions/_utils.py
index 1f5ab05..f403bdd 100644
--- a/mesmerize_core/caiman_extensions/_utils.py
+++ b/mesmerize_core/caiman_extensions/_utils.py
@@ -3,7 +3,7 @@
 from uuid import UUID
 
 from mesmerize_core.caiman_extensions._batch_exceptions import BatchItemNotRunError, BatchItemUnsuccessfulError, \
-    WrongAlgorithmExtensionError
+    WrongAlgorithmExtensionError, PreventOverwriteError
 
 
 def validate(algo: str = None):
@@ -29,6 +29,19 @@ def wrapper(self, *args, **kwargs):
     return dec
 
 
+def _verify_and_lock_batch_file(func):
+    """Acquires lock and ensures batch file has the same items as current df before calling wrapped function"""
+    @wraps(func)
+    def wrapper(instance, *args, **kwargs):
+        with instance._batch_lock:
+            disk_df = instance.reload_from_disk()
+            # check whether all the same UUIDs are present with the same indices
+            if not instance._df["uuid"].equals(disk_df["uuid"]):
+                raise PreventOverwriteError("Items on disk do not match current DataFrame; reload to synchronize")
+            return func(instance, *args, **kwargs)
+    return wrapper
+
+
 def _index_parser(func):
     @wraps(func)
     def _parser(instance, *args, **kwargs):
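With `_verify_and_lock_batch_file` applied to a method, the call first takes the batch file's lock, re-reads the file, and compares `uuid` columns before mutating anything. A sketch of the situation it guards against, assuming the caller catches `PreventOverwriteError` and reloads (the batch path and item values are placeholders):

```python
from mesmerize_core import load_batch
from mesmerize_core.caiman_extensions._batch_exceptions import PreventOverwriteError

df = load_batch("/data/batch.pickle")  # placeholder path; this process's view
# ... meanwhile another process adds or removes an item in the same file ...

try:
    # add_item is decorated, so a stale in-memory DataFrame raises instead of
    # overwriting the other process's rows
    df.caiman.add_item(
        algo="mcorr",
        item_name="test",                    # placeholder item name
        input_movie_path="movies/test.tif",  # placeholder movie
        params={"main": {}},                 # placeholder params
    )
except PreventOverwriteError:
    df = df.caiman.reload_from_disk()  # synchronize with disk, then retry
```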
diff --git a/mesmerize_core/caiman_extensions/common.py b/mesmerize_core/caiman_extensions/common.py
index db65779..b0cc838 100644
--- a/mesmerize_core/caiman_extensions/common.py
+++ b/mesmerize_core/caiman_extensions/common.py
@@ -13,17 +13,16 @@
 
 import numpy as np
 import pandas as pd
+from filelock import SoftFileLock, Timeout
 
-from ._batch_exceptions import BatchItemNotRunError, BatchItemUnsuccessfulError, DependencyError
-from ._utils import validate, _index_parser
+from ._batch_exceptions import BatchItemNotRunError, BatchItemUnsuccessfulError, DependencyError, PreventOverwriteError
+from ._utils import validate, _index_parser, _verify_and_lock_batch_file
 from ..batch_utils import (
     COMPUTE_BACKENDS,
     COMPUTE_BACKEND_SUBPROCESS,
     COMPUTE_BACKEND_LOCAL,
     get_parent_raw_data_path,
     load_batch,
-    open_batch_for_safe_writing,
-    PreventOverwriteError
 )
 from ..utils import validate_path, IS_WINDOWS, make_runfile, warning_experimental
 from .cnmf import cnmf_cache
@@ -46,6 +45,8 @@ class CaimanDataFrameExtensions:
 
     def __init__(self, df: pd.DataFrame):
         self._df = df
+        self._batch_lock = SoftFileLock(str(df.paths.get_batch_path()) + ".lock",
+                                        timeout=30, is_singleton=True)
 
     def uloc(self, u: Union[str, UUID]) -> pd.Series:
         """
@@ -63,6 +64,7 @@ def uloc(self, u: Union[str, UUID]) -> pd.Series:
 
         return df_u.squeeze()
 
+    @_verify_and_lock_batch_file
     def add_item(self, algo: str, item_name: str, input_movie_path: Union[str, pd.Series], params: dict):
         """
         Add an item to the DataFrame to organize parameters
@@ -131,41 +133,87 @@
         self._df.loc[self._df.index.size] = s
 
         # Save DataFrame to disk
-        self.save_to_disk(max_index_diff=1)
+        self._save_to_disk_unsafe()
 
-    def save_to_disk(self, max_index_diff: int = 0):
+    @_verify_and_lock_batch_file
+    @_index_parser
+    def update_item(self, index: int, updates: Union[dict, pd.Series]):
         """
-        Saves DataFrame to disk, copies to a backup before overwriting existing file.
+        Update the item with the given index or UUID with the data in ``updates`` and write to disk.
+
+        Parameters
+        ----------
+        index: int, str or UUID
+            The index of the batch item to update as a numerical ``int`` index, ``str`` representing
+            a UUID, or a UUID object.
+
+        updates: dict or Series
+            Data to change in the selected row. Each key in updates (or index entry if it is a Series)
+            should match a column name. If not, that data will not be saved.
         """
-        path: Path = self._df.paths.get_batch_path()
+        row = self._df.iloc[index]
+        row.update(updates)
+        self._df.iloc[index] = row
+        self.save_to_disk()
+
+    def update_item_with_results(self, uuid: Union[str, UUID], results: dict, run_duration: float):
+        """Helper for algorithms to save their results to disk"""
+        updates = {
+            "outputs": results,
+            "ran_time": datetime.now().isoformat(timespec="seconds", sep="T"),
+            "algo_duration": str(run_duration) + " sec"
+        }
+        try:
+            # reload first because it should be safe since we have a UUID and we want to
+            # avoid failing to save results just because other items were added/removed
+            with self._batch_lock:
+                self._df = self.reload_from_disk()
+                self.update_item(uuid, updates)
+
+        except Exception as e:
+            # TODO maybe handle differently
+            # Print a message with details in lieu of writing to the batch file
+            msg = f"Batch file could not be written to"
+            if isinstance(e, Timeout):
+                msg += f" (file locked for {self._batch_lock.timeout} seconds)"
+            elif isinstance(e, PreventOverwriteError):
+                msg += f" (items would be overwritten, even though file was locked)"
+
+            if results["success"]:
+                output_dir = self._df.paths.get_batch_path().parent.joinpath(str(uuid))
+                msg += f"\nRun succeeded; results are in {output_dir}."
+            else:
+                msg += f"\nRun failed.\n"
+                msg += results["traceback"]
 
-        with open_batch_for_safe_writing(path) as disk_df:
-            # check that max_index_diff is not exceeded
-            if abs(disk_df.index.size - self._df.index.size) > max_index_diff:
-                raise PreventOverwriteError(
-                    f"The number of rows in the DataFrame on disk differs more "
-                    f"than has been allowed by the `max_index_diff` kwarg which "
-                    f"is set to <{max_index_diff}>. This is to prevent overwriting "
-                    f"the full DataFrame with a sub-DataFrame. If you still wish "
-                    f"to save the smaller DataFrame, use `caiman.save_to_disk()` "
-                    f"with `max_index_diff` set to the highest allowable difference "
-                    f"in row number."
-                )
+            raise RuntimeError(msg)
+
+    @_verify_and_lock_batch_file
+    def save_to_disk(self):
+        """
+        Saves DataFrame to disk, copies to a backup before overwriting existing file.
+        Raises PreventOverwriteError if the df on disk has a different set of items than
+        the one in memory (before saving).
+ """ + self._save_to_disk_unsafe() - bak = path.with_suffix(path.suffix + f"bak.{time.time()}") + def _save_to_disk_unsafe(self): + path: Path = self._df.paths.get_batch_path() + bak = path.with_suffix(path.suffix + f"bak.{time.time()}") - shutil.copyfile(path, bak) - try: - # don't save batch path because it's redundant/possibly incorrect when loading from disk - del self._df.attrs["batch_path"] + shutil.copyfile(path, bak) + try: + # don't save batch path because it's redundant/possibly incorrect when loading from disk + del self._df.attrs["batch_path"] + with self._batch_lock: # ensure we have the lock to avoid messing up other "safe" operations self._df.to_pickle(path) - os.remove(bak) - except: - shutil.copyfile(bak, path) - raise IOError(f"Could not save dataframe to disk.") - finally: - # restore batch path - self._df.paths.set_batch_path(path) + os.remove(bak) + except: + shutil.copyfile(bak, path) + raise IOError(f"Could not save dataframe to disk.") + finally: + # restore batch path + self._df.paths.set_batch_path(path) def reload_from_disk(self) -> pd.DataFrame: """ @@ -184,6 +232,7 @@ def reload_from_disk(self) -> pd.DataFrame: """ return load_batch(self._df.paths.get_batch_path()) + @_verify_and_lock_batch_file @_index_parser def remove_item(self, index: Union[int, str, UUID], remove_data: bool = True, safe_removal: bool = True): """ @@ -250,7 +299,7 @@ def remove_item(self, index: Union[int, str, UUID], remove_data: bool = True, sa # Reset indices so there are no 'jumps' self._df.reset_index(drop=True, inplace=True) # Save new df to disk - self.save_to_disk(max_index_diff=1) + self._save_to_disk_unsafe() def get_params_diffs(self, algo: str, item_name: str) -> pd.DataFrame: """