Skip to content

Commit f22f93a

Browse files
committed
Remove traces of local_storage
1 parent a132b3b commit f22f93a

File tree

6 files changed

+38
-72
lines changed

6 files changed

+38
-72
lines changed

em_workflows/brt/flow.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
from pathlib import Path
4545
from natsort import os_sorted
4646
from prefect import task, flow, unmapped
47-
from prefect.states import Completed, Failed
4847
from pytools.HedwigZarrImages import HedwigZarrImages
4948

5049
from em_workflows.utils import utils
@@ -649,7 +648,6 @@ def brt_flow(
649648
# Ref: https://github.com/PrefectHQ/prefect/blob/98d33187ecce032defb8ec7a263de32564e7f7f6/src/prefect/futures.py#L43
650649
callback_result = list()
651650
failed = 0
652-
total = len(prim_fps)
653651
for idx, (fp, cb) in enumerate(zip(fps.result(), callback_with_tilt_mov.result())):
654652
try:
655653
# In Prefect v3, we can directly get the result without checking state
@@ -659,12 +657,16 @@ def brt_flow(
659657
callback_result.append(fp.gen_prim_fp_elt(f"Error: {str(e)}"))
660658
failed += 1
661659

662-
utils.send_callback_body.submit(
660+
send_callback_task = utils.send_callback_body.submit(
663661
x_no_api=x_no_api,
664662
token=token,
665663
callback_url=callback_url,
666664
files_elts=callback_result,
667665
)
668666

667+
utils.final_cleanup_task.submit(
668+
fps, x_keep_workdir, wait_for=[utils.allow_failure(send_callback_task)]
669+
)
670+
669671
# In Prefect v3, flows should return data, not state objects
670672
return callback_result

em_workflows/config.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,6 @@ class Config:
107107
user = os.environ["USER"]
108108
tmp_dir = f"/data/scratch/{user}"
109109

110-
local_storage = LocalFileSystem(basepath=PREFECT_HOME.value() / "local-storage")
111-
pickle_serializer = PickleSerializer(picklelib="pickle")
112-
113110
@staticmethod
114111
def _mount_point(share_name: str) -> str:
115112
share = NFS_MOUNT.get(share_name)

em_workflows/czi/flow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ async def czi_flow(
273273
)
274274

275275
# Prefect v3: wait_for is a valid keyword for .submit()
276-
cleanup_task = utils.final_cleanup_task.submit(
276+
utils.final_cleanup_task.submit(
277277
fps, x_keep_workdir, wait_for=[utils.allow_failure(send_callback_task)]
278278
)
279279

em_workflows/lrg_2d_rgb/flow.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -207,22 +207,24 @@ def lrg_2d_flow(
207207
)
208208

209209
callback_result = list()
210-
211-
for idx, (fp, cb) in enumerate(zip(fps.result(), callback_with_pyramids)):
212-
state = cb.wait()
213-
if state.is_completed():
214-
callback_result.append(cb.result())
215-
else:
216-
path = f"{state.state_details.flow_run_id}__{idx}"
217-
try:
218-
message = LRG2DConfig.local_storage.read_path(path)
219-
callback_result.append(fp.gen_prim_fp_elt(message.decode()))
220-
except ValueError:
221-
callback_result.append(fp.gen_prim_fp_elt("Something went wrong!"))
222-
223-
utils.send_callback_body.submit(
210+
failed = 0
211+
for idx, (fp, cb) in enumerate(zip(fps.result(), callback_with_pyramids.result())):
212+
try:
213+
callback_result.append(cb)
214+
except Exception as e:
215+
# If there's an error getting the result, use fallback
216+
callback_result.append(fp.gen_prim_fp_elt(f"Error: {str(e)}"))
217+
failed += 1
218+
219+
send_callback_task = utils.send_callback_body.submit(
224220
x_no_api=x_no_api,
225221
token=token,
226222
callback_url=callback_url,
227223
files_elts=callback_result,
228224
)
225+
226+
utils.final_cleanup_task.submit(
227+
fps, x_keep_workdir, wait_for=[utils.allow_failure(send_callback_task)]
228+
)
229+
230+
return callback_result

em_workflows/sem_tomo/flow.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -420,21 +420,24 @@ def sem_tomo_flow(
420420
)
421421

422422
callback_result = list()
423-
for idx, (fp, cb) in enumerate(zip(fps.result(), callback_with_corr_movies)):
424-
state = cb.wait()
425-
if state.is_completed():
426-
callback_result.append(cb.result())
427-
else:
428-
path = f"{state.state_details.flow_run_id}__{idx}"
429-
try:
430-
message = SEMConfig.local_storage.read_path(path)
431-
callback_result.append(fp.gen_prim_fp_elt(message.decode()))
432-
except ValueError:
433-
callback_result.append(fp.gen_prim_fp_elt("Something went wrong!"))
434-
435-
utils.send_callback_body.submit(
423+
failed = 0
424+
for idx, (fp, cb) in enumerate(zip(fps.result(), callback_with_corr_movies.result())):
425+
try:
426+
callback_result.append(cb)
427+
except Exception as e:
428+
# If there's an error getting the result, use fallback
429+
callback_result.append(fp.gen_prim_fp_elt(f"Error: {str(e)}"))
430+
failed += 1
431+
432+
send_callback_task = utils.send_callback_body.submit(
436433
x_no_api=x_no_api,
437434
token=token,
438435
callback_url=callback_url,
439436
files_elts=callback_result,
440437
)
438+
439+
utils.final_cleanup_task.submit(
440+
fps, x_keep_workdir, wait_for=[utils.allow_failure(send_callback_task)]
441+
)
442+
443+
return callback_result

em_workflows/utils/utils.py

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -63,28 +63,8 @@ def lookup_dims(fp: Path) -> Header:
6363
return xyz_cleaned
6464

6565

66-
def collect_exception_task_hook(task: Task, task_run: TaskRun, state: State):
67-
"""
68-
This task hook should be used with tasks where you intend to know which step of the flow run broke.
69-
Since most of our tasks are mapped by default using filepaths, it takes map index into account as well
70-
So that we can notify the user, 'this step of this file broke'.
71-
The message is written to a file using prefect's local storage.
72-
In order to retrieve it, the flow needs to have a logic at the end,
73-
to lookup for this file with exception message if the task run has failed.
74-
"""
75-
message = f"Failure in pipeline step: {task.name}"
76-
map_idx = task_run.name.split("-")[-1]
77-
flow_run_id = state.state_details.flow_run_id
78-
path = f"{flow_run_id}__{map_idx}"
79-
try:
80-
Config.local_storage.read_path(path)
81-
except ValueError: # ValueError path not found
82-
Config.local_storage.write_path(path, message.encode())
83-
84-
8566
@task(
8667
name="mrc to movie generation",
87-
on_failure=[collect_exception_task_hook],
8868
)
8969
def mrc_to_movie(file_path: FilePath, root: str, asset_type: str, **kwargs):
9070
"""
@@ -357,7 +337,6 @@ def copy_template(working_dir: Path, template_name: str) -> Path:
357337
name="Batchruntomo conversion",
358338
tags=["brt"],
359339
# timeout_seconds=600,
360-
on_failure=[collect_exception_task_hook],
361340
)
362341
def run_brt(
363342
file_path: FilePath,
@@ -568,9 +547,6 @@ def notify_api_running(
568547
# return ns
569548

570549

571-
import asyncio
572-
573-
574550
async def notify_api_completion(flow: Flow, flow_run: FlowRun, state: State) -> None:
575551
"""
576552
https://docs.prefect.io/core/concepts/states.html#overview.
@@ -722,20 +698,6 @@ def send_callback_body(
722698
)
723699

724700

725-
def copy_workdirs_and_cleanup_hook(flow, flow_run, state):
726-
stored_result = Config.local_storage.read_path(f"{flow_run.id}__gen_fps")
727-
fps: List[FilePath] = Config.pickle_serializer.loads(
728-
json.loads(stored_result)["data"].encode()
729-
)
730-
parameters = flow_run.parameters
731-
x_keep_workdir = parameters.get("x_keep_workdir", False)
732-
733-
for fp in fps:
734-
copy_workdir_logs.fn(file_path=fp)
735-
736-
cleanup_workdir.fn(fps, x_keep_workdir)
737-
738-
739701
def callback_with_cleanup(
740702
fps: List[FilePath],
741703
callback_result: List,

0 commit comments

Comments
 (0)