2121
2222import SimpleITK as sitk
2323from prefect import flow , task
24- from pytools . HedwigZarrImages import HedwigZarrImage , HedwigZarrImages
24+ from pytools import HedwigZarrImage , HedwigZarrImages
2525
2626from em_workflows .file_path import FilePath
2727from em_workflows .utils import utils
@@ -78,8 +78,11 @@ def copy_zarr_to_assets_dir(file_path: FilePath) -> None:
7878
7979
8080@task
81- def generate_imageset (file_path : FilePath ) -> List [Dict ]:
81+ def generate_imageset (file_path : FilePath ,
82+ use_default_dask = False ) -> List [Dict ]:
8283 """
84+ :param: use_default_dask: If True, reuses the Prefect Dask Scheduler for the ZARR and Dask array operations.
85+
8386 | ImageSet consists of all the assets for a particular zarr sub-image and label images
8487 | Macro image is ignored
8588 | Label image is added as an thumbnail asset
@@ -91,16 +94,17 @@ def generate_imageset(file_path: FilePath) -> List[Dict]:
9194 """
9295 zarr_fp = f"{ file_path .assets_dir } /{ file_path .base } .zarr"
9396 image_set = list ()
94- zarr_images = HedwigZarrImages (Path (zarr_fp ))
97+
98+ if use_default_dask :
99+ compute_args = {}
100+ else :
101+ # This task is used in a sub-flow where it's the only task running.
102+ # use all the cores in a thread pool.
103+ compute_args = {"scheduler" : "threads" }
104+ zarr_images = HedwigZarrImages (Path (zarr_fp ), compute_args = compute_args )
95105 # for image_name, image in zarr_images.series():
96106 for k_idx , image_name in enumerate (zarr_images .get_series_keys ()):
97- # The relative path of the zarr group from the root zarr
98- # this assumes a valid zarr group with OME directory inside
99- ome_index_to_zarr_group = zarr_images .zarr_root ["OME" ].attrs ["series" ]
100- zarr_idx = ome_index_to_zarr_group [k_idx ]
101- image = HedwigZarrImage (
102- zarr_images .zarr_root [zarr_idx ], zarr_images .ome_info , k_idx
103- )
107+ image = zarr_images [k_idx ]
104108 # single image element
105109 image_elt = dict ()
106110 image_elt ["imageMetadata" ] = None
@@ -119,7 +123,7 @@ def generate_imageset(file_path: FilePath) -> List[Dict]:
119123
120124 else :
121125 ng_asset = file_path .gen_asset (
122- asset_type = "neuroglancerZarr" , asset_fp = Path ( zarr_fp ) / zarr_idx
126+ asset_type = "neuroglancerZarr" , asset_fp = image . path
123127 )
124128 # note - dims should be image.dims, but GUI does not want XYC
125129 # hardcoding in XY for now.
@@ -156,7 +160,9 @@ async def generate_czi_imageset(file_path: FilePath) -> List[Dict]:
156160 copy_to_assets = copy_zarr_to_assets_dir .submit (
157161 file_path , wait_for = [rechunk_result ]
158162 )
159- return generate_imageset .submit (file_path , wait_for = [copy_to_assets ])
163+ return generate_imageset .submit (file_path ,
164+ use_default_dask = True ,
165+ wait_for = [copy_to_assets ])
160166
161167
162168@task
0 commit comments