Refactor to put lock, item updating functionality in df extension
ethanbb committed Jun 16, 2024
1 parent d753113 commit c6a47a4
Showing 8 changed files with 115 additions and 111 deletions.
2 changes: 0 additions & 2 deletions mesmerize_core/__init__.py

@@ -3,7 +3,6 @@
     get_parent_raw_data_path,
     load_batch,
     create_batch,
-    save_results_safely
 )
 from .caiman_extensions import *
 from pathlib import Path
@@ -17,7 +16,6 @@
     "get_parent_raw_data_path",
     "load_batch",
     "create_batch",
-    "save_results_safely",
    "CaimanDataFrameExtensions",
    "CaimanSeriesExtensions",
    "CNMFExtensions",
8 changes: 4 additions & 4 deletions mesmerize_core/algorithms/cnmf.py

@@ -13,10 +13,10 @@
 
 # prevent circular import
 if __name__ in ["__main__", "__mp_main__"]:  # when running in subprocess
-    from mesmerize_core import set_parent_raw_data_path, load_batch, save_results_safely
+    from mesmerize_core import set_parent_raw_data_path, load_batch
     from mesmerize_core.utils import IS_WINDOWS
 else:  # when running with local backend
-    from ..batch_utils import set_parent_raw_data_path, load_batch, save_results_safely
+    from ..batch_utils import set_parent_raw_data_path, load_batch
     from ..utils import IS_WINDOWS
 
 
@@ -25,7 +25,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
     set_parent_raw_data_path(data_path)
 
     df = load_batch(batch_path)
-    item = df[df["uuid"] == uuid].squeeze()
+    item = df.caiman.uloc(uuid)
 
     input_movie_path = item["input_movie_path"]
     # resolve full path
@@ -138,7 +138,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
         cm.stop_server(dview=dview)
 
     runtime = round(time.time() - algo_start, 2)
-    save_results_safely(batch_path, uuid, d, runtime)
+    df.caiman.update_item_with_results(uuid, d, runtime)
 
 @click.command()
 @click.option("--batch-path", type=str)
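
Note: df.caiman.uloc(uuid), defined in the common.py changes below, is the accessor form of the lookup it replaces here and in the other algorithm scripts; it presumably also validates the result, which the inlined version did not. A rough sketch of the correspondence:

    # old, inlined in each algorithm script
    item = df[df["uuid"] == uuid].squeeze()

    # new, via the caiman DataFrame accessor
    item = df.caiman.uloc(uuid)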
8 changes: 4 additions & 4 deletions mesmerize_core/algorithms/cnmfe.py

@@ -11,10 +11,10 @@
 import time
 
 if __name__ in ["__main__", "__mp_main__"]:  # when running in subprocess
-    from mesmerize_core import set_parent_raw_data_path, load_batch, save_results_safely
+    from mesmerize_core import set_parent_raw_data_path, load_batch
     from mesmerize_core.utils import IS_WINDOWS
 else:  # when running with local backend
-    from ..batch_utils import set_parent_raw_data_path, load_batch, save_results_safely
+    from ..batch_utils import set_parent_raw_data_path, load_batch
     from ..utils import IS_WINDOWS
 
 
@@ -23,7 +23,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
     set_parent_raw_data_path(data_path)
 
     df = load_batch(batch_path)
-    item = df[df["uuid"] == uuid].squeeze()
+    item = df.caiman.uloc(uuid)
 
     input_movie_path = item["input_movie_path"]
     # resolve full path
@@ -122,7 +122,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
         cm.stop_server(dview=dview)
 
     runtime = round(time.time() - algo_start, 2)
-    save_results_safely(batch_path, uuid, d, runtime)
+    df.caiman.update_item_with_results(uuid, d, runtime)
 
 
 @click.command()
8 changes: 4 additions & 4 deletions mesmerize_core/algorithms/mcorr.py

@@ -13,9 +13,9 @@
 
 # prevent circular import
 if __name__ in ["__main__", "__mp_main__"]:  # when running in subprocess
-    from mesmerize_core import set_parent_raw_data_path, load_batch, save_results_safely
+    from mesmerize_core import set_parent_raw_data_path, load_batch
 else:  # when running with local backend
-    from ..batch_utils import set_parent_raw_data_path, load_batch, save_results_safely
+    from ..batch_utils import set_parent_raw_data_path, load_batch
 
 
 def run_algo(batch_path, uuid, data_path: str = None):
@@ -25,7 +25,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
     batch_path = Path(batch_path)
     df = load_batch(batch_path)
 
-    item = df[df["uuid"] == uuid].squeeze()
+    item = df.caiman.uloc(uuid)
     # resolve full path
     input_movie_path = str(df.paths.resolve(item["input_movie_path"]))
 
@@ -146,7 +146,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
         cm.stop_server(dview=dview)
 
     runtime = round(time.time() - algo_start, 2)
-    save_results_safely(batch_path, uuid, d, runtime)
+    df.caiman.update_item_with_results(uuid, d, runtime)
 
 
 @click.command()
63 changes: 0 additions & 63 deletions mesmerize_core/batch_utils.py

@@ -3,7 +3,6 @@
 from pathlib import Path
 from typing import Union
 
-from filelock import SoftFileLock, Timeout
 import pandas as pd
 
 from .utils import validate_path
@@ -247,65 +246,3 @@ def get_full_raw_data_path(path: Union[Path, str]) -> Path:
         return PARENT_DATA_PATH.joinpath(path)
 
     return path
-
-
-class PreventOverwriteError(IndexError):
-    """
-    Error thrown when trying to write to an existing batch file with a potential risk of removing existing rows.
-    """
-    pass
-
-
-class BatchLock:
-    """Locks a batch file for safe writing, returning the dataframe in the target"""
-    TIMEOUT = 30  # must be consistent or else nested re-entrant locks break
-    def __init__(self, batch_path: Union[Path, str]):
-        self.lock = SoftFileLock(str(batch_path) + ".lock", is_singleton=True, timeout=self.TIMEOUT)
-        self.batch_path = batch_path
-
-    def __enter__(self) -> pd.DataFrame:
-        self.lock.__enter__()  # may throw Timeout
-        return load_batch(self.batch_path)
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.lock.__exit__(exc_type, exc_value, traceback)
-
-
-def open_batch_for_safe_writing(batch_path: Union[Path, str]) -> BatchLock:
-    """Just a more self-documenting constructor"""
-    return BatchLock(batch_path)
-
-
-def save_results_safely(batch_path: Union[Path, str], uuid, results: dict, runtime: float):
-    """
-    Try to load the given batch and save results to the given item.
-    Uses a file lock to ensure that no other process is writing to the same batch using this function,
-    which gives up after lock_timeout seconds.
-    """
-    try:
-        with open_batch_for_safe_writing(batch_path) as df:
-            # Add dictionary to output column of series
-            df.loc[df["uuid"] == uuid, "outputs"] = [results]
-            # Add ran timestamp to ran_time column of series
-            df.loc[df["uuid"] == uuid, "ran_time"] = datetime.now().isoformat(timespec="seconds", sep="T")
-            df.loc[df["uuid"] == uuid, "algo_duration"] = str(runtime) + " sec"
-
-            # save dataframe to disk
-            df.caiman.save_to_disk()
-
-    except BaseException as e:
-        # Print a message with details in lieu of writing to the batch file
-        msg = f"Batch file could not be written to"
-        if isinstance(e, Timeout):
-            msg += f" (file locked for {BatchLock.TIMEOUT} seconds)"
-        elif isinstance(e, PreventOverwriteError):
-            msg += f" (items would be overwritten, even though file was locked)"
-
-        if results["success"]:
-            output_dir = Path(batch_path).parent.joinpath(str(uuid))
-            msg += f"\nRun succeeded; results are in {output_dir}."
-        else:
-            msg += f"\nRun failed.\n"
-            msg += results["traceback"]
-
-        raise RuntimeError(msg)
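
Note on the lock that replaces the removed BatchLock: the deleted comment ("must be consistent or else nested re-entrant locks break") refers to filelock's singleton behavior. With is_singleton=True, filelock hands back one shared instance per lock-file path, which makes the lock re-entrant; constructing it again with different settings (e.g. another timeout) fails, hence the fixed 30-second timeout. A minimal sketch, assuming a filelock version that supports is_singleton (the path is hypothetical):

    from filelock import SoftFileLock

    lock_path = "batch.pickle.lock"  # hypothetical batch lock file

    outer = SoftFileLock(lock_path, timeout=30, is_singleton=True)
    inner = SoftFileLock(lock_path, timeout=30, is_singleton=True)
    assert inner is outer  # same object, so nested acquisition re-enters

    with outer:      # creates the .lock file (or times out after 30 s)
        with inner:  # re-entrant: does not block on itself
            pass     # read/modify/write the batch file here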
7 changes: 7 additions & 0 deletions mesmerize_core/caiman_extensions/_batch_exceptions.py

@@ -12,3 +12,10 @@ class WrongAlgorithmExtensionError(Exception):
 
 class DependencyError(Exception):
     pass
+
+
+class PreventOverwriteError(IndexError):
+    """
+    Error thrown when trying to write to an existing batch file with a potential risk of removing existing rows.
+    """
+    pass
15 changes: 14 additions & 1 deletion mesmerize_core/caiman_extensions/_utils.py

@@ -3,7 +3,7 @@
 from uuid import UUID
 
 from mesmerize_core.caiman_extensions._batch_exceptions import BatchItemNotRunError, BatchItemUnsuccessfulError, \
-    WrongAlgorithmExtensionError
+    WrongAlgorithmExtensionError, PreventOverwriteError
 
 
 def validate(algo: str = None):
@@ -29,6 +29,19 @@ def wrapper(self, *args, **kwargs):
     return dec
 
 
+def _verify_and_lock_batch_file(func):
+    """Acquires lock and ensures batch file has the same items as current df before calling wrapped function"""
+    @wraps(func)
+    def wrapper(instance, *args, **kwargs):
+        with instance._batch_lock:
+            disk_df = instance.reload_from_disk()
+            # check whether all the same UUIDs are present with the same indices
+            if not instance._df["uuid"].equals(disk_df["uuid"]):
+                raise PreventOverwriteError("Items on disk do not match current DataFrame; reload to synchronize")
+            return func(instance, *args, **kwargs)
+    return wrapper
+
+
 def _index_parser(func):
     @wraps(func)
     def _parser(instance, *args, **kwargs):
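
The guard in _verify_and_lock_batch_file compares the "uuid" column of the in-memory DataFrame against the copy just reloaded from disk; pandas' Series.equals returns False when the values or their order differ, so a concurrent add or remove by another process is caught before anything is written. The check in isolation:

    import pandas as pd

    in_memory = pd.Series(["a1", "b2", "c3"])  # uuids held in memory
    on_disk = pd.Series(["a1", "b2"])          # another process removed an item

    print(in_memory.equals(on_disk))  # False -> PreventOverwriteError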
115 changes: 82 additions & 33 deletions mesmerize_core/caiman_extensions/common.py

@@ -13,17 +13,16 @@
 
 import numpy as np
 import pandas as pd
+from filelock import SoftFileLock, Timeout
 
-from ._batch_exceptions import BatchItemNotRunError, BatchItemUnsuccessfulError, DependencyError
-from ._utils import validate, _index_parser
+from ._batch_exceptions import BatchItemNotRunError, BatchItemUnsuccessfulError, DependencyError, PreventOverwriteError
+from ._utils import validate, _index_parser, _verify_and_lock_batch_file
 from ..batch_utils import (
     COMPUTE_BACKENDS,
     COMPUTE_BACKEND_SUBPROCESS,
     COMPUTE_BACKEND_LOCAL,
     get_parent_raw_data_path,
     load_batch,
-    open_batch_for_safe_writing,
-    PreventOverwriteError
 )
 from ..utils import validate_path, IS_WINDOWS, make_runfile, warning_experimental
 from .cnmf import cnmf_cache
@@ -46,6 +45,8 @@ class CaimanDataFrameExtensions:
 
     def __init__(self, df: pd.DataFrame):
         self._df = df
+        self._batch_lock = SoftFileLock(str(df.paths.get_batch_path()) + ".lock",
+                                        timeout=30, is_singleton=True)
 
     def uloc(self, u: Union[str, UUID]) -> pd.Series:
         """
@@ -63,6 +64,7 @@ def uloc(self, u: Union[str, UUID]) -> pd.Series:
 
         return df_u.squeeze()
 
+    @_verify_and_lock_batch_file
     def add_item(self, algo: str, item_name: str, input_movie_path: Union[str, pd.Series], params: dict):
         """
         Add an item to the DataFrame to organize parameters
@@ -131,41 +133,87 @@ def add_item(self, algo: str, item_name: str, input_movie_path: Union[str, pd.Series], params: dict):
         self._df.loc[self._df.index.size] = s
 
         # Save DataFrame to disk
-        self.save_to_disk(max_index_diff=1)
+        self._save_to_disk_unsafe()
 
-    def save_to_disk(self, max_index_diff: int = 0):
-        """
-        Saves DataFrame to disk, copies to a backup before overwriting existing file.
-        """
-        path: Path = self._df.paths.get_batch_path()
-
-        with open_batch_for_safe_writing(path) as disk_df:
-            # check that max_index_diff is not exceeded
-            if abs(disk_df.index.size - self._df.index.size) > max_index_diff:
-                raise PreventOverwriteError(
-                    f"The number of rows in the DataFrame on disk differs more "
-                    f"than has been allowed by the `max_index_diff` kwarg which "
-                    f"is set to <{max_index_diff}>. This is to prevent overwriting "
-                    f"the full DataFrame with a sub-DataFrame. If you still wish "
-                    f"to save the smaller DataFrame, use `caiman.save_to_disk()` "
-                    f"with `max_index_diff` set to the highest allowable difference "
-                    f"in row number."
-                )
-
-        bak = path.with_suffix(path.suffix + f"bak.{time.time()}")
-
-        shutil.copyfile(path, bak)
-        try:
-            # don't save batch path because it's redundant/possibly incorrect when loading from disk
-            del self._df.attrs["batch_path"]
-            self._df.to_pickle(path)
-            os.remove(bak)
-        except:
-            shutil.copyfile(bak, path)
-            raise IOError(f"Could not save dataframe to disk.")
-        finally:
-            # restore batch path
-            self._df.paths.set_batch_path(path)
+    @_verify_and_lock_batch_file
+    @_index_parser
+    def update_item(self, index: int, updates: Union[dict, pd.Series]):
+        """
+        Update the item at the given index with the data in `updates` and write to disk.
+
+        Parameters
+        ----------
+        index: int, str or UUID
+            The index of the batch item to update as a numerical ``int`` index, ``str`` representing
+            a UUID, or a UUID object.
+        updates: dict or Series
+            Data to change in the selected row. Each key in updates (or index entry if it is a Series)
+            should match a column name. If not, that data will not be saved.
+        """
+        row = self._df.iloc[index]
+        row.update(updates)
+        self._df.iloc[index] = row
+        self.save_to_disk()
+
+    def update_item_with_results(self, uuid: Union[str, UUID], results: dict, run_duration: float):
+        """Helper for algorithms to save their results to disk"""
+        updates = {
+            "outputs": results,
+            "ran_time": datetime.now().isoformat(timespec="seconds", sep="T"),
+            "algo_duration": str(run_duration) + " sec"
+        }
+        try:
+            # reload first because it should be safe since we have a UUID and we want to
+            # avoid failing to save results just because other items were added/removed
+            with self._batch_lock:
+                self._df = self.reload_from_disk()
+                self.update_item(uuid, updates)
+
+        except Exception as e:
+            # TODO maybe handle differently
+            # Print a message with details in lieu of writing to the batch file
+            msg = f"Batch file could not be written to"
+            if isinstance(e, Timeout):
+                msg += f" (file locked for {self._batch_lock.timeout} seconds)"
+            elif isinstance(e, PreventOverwriteError):
+                msg += f" (items would be overwritten, even though file was locked)"
+
+            if results["success"]:
+                output_dir = self._df.paths.get_batch_path().parent.joinpath(str(uuid))
+                msg += f"\nRun succeeded; results are in {output_dir}."
+            else:
+                msg += f"\nRun failed.\n"
+                msg += results["traceback"]
+
+            raise RuntimeError(msg)
+
+    @_verify_and_lock_batch_file
+    def save_to_disk(self):
+        """
+        Saves DataFrame to disk, copies to a backup before overwriting existing file.
+        Raises PreventOverwriteError if the df on disk has a different set of items than
+        the one in memory (before saving).
+        """
+        self._save_to_disk_unsafe()
+
+    def _save_to_disk_unsafe(self):
+        path: Path = self._df.paths.get_batch_path()
+        bak = path.with_suffix(path.suffix + f"bak.{time.time()}")
+
+        shutil.copyfile(path, bak)
+        try:
+            # don't save batch path because it's redundant/possibly incorrect when loading from disk
+            del self._df.attrs["batch_path"]
+            with self._batch_lock:  # ensure we have the lock to avoid messing up other "safe" operations
+                self._df.to_pickle(path)
+            os.remove(bak)
+        except:
+            shutil.copyfile(bak, path)
+            raise IOError(f"Could not save dataframe to disk.")
+        finally:
+            # restore batch path
+            self._df.paths.set_batch_path(path)
 
     def reload_from_disk(self) -> pd.DataFrame:
         """
@@ -184,6 +232,7 @@ def reload_from_disk(self) -> pd.DataFrame:
         """
         return load_batch(self._df.paths.get_batch_path())
 
+    @_verify_and_lock_batch_file
     @_index_parser
     def remove_item(self, index: Union[int, str, UUID], remove_data: bool = True, safe_removal: bool = True):
         """
@@ -250,7 +299,7 @@ def remove_item(self, index: Union[int, str, UUID], remove_data: bool = True, safe_removal: bool = True):
         # Reset indices so there are no 'jumps'
         self._df.reset_index(drop=True, inplace=True)
         # Save new df to disk
-        self.save_to_disk(max_index_diff=1)
+        self._save_to_disk_unsafe()
 
     def get_params_diffs(self, algo: str, item_name: str) -> pd.DataFrame:
         """
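
Taken together, the algorithm scripts no longer touch the lock directly: they load the batch, look up their item with uloc, and hand results back through the accessor, which reloads, locks, and writes. A sketch of the resulting call pattern (paths and result values are hypothetical):

    from mesmerize_core import set_parent_raw_data_path, load_batch

    set_parent_raw_data_path("/data/raw")        # hypothetical raw-data parent
    df = load_batch("/data/batch/batch.pickle")  # hypothetical batch file

    uuid = df.iloc[0]["uuid"]        # uuid of the first batch item
    item = df.caiman.uloc(uuid)      # fetch that item's row

    # what each algorithm script now calls at the end of a run:
    # reloads under the file lock and writes outputs + timing for this uuid
    results = {"success": True, "traceback": None}  # hypothetical results dict
    df.caiman.update_item_with_results(uuid, results, 12.3)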
