9 | 9 | # See the License for the specific language governing permissions and |
10 | 10 | # limitations under the License. |
11 | 11 |
12 | | -from typing import Callable, Iterable, Optional |
| 12 | +import math |
| 13 | +from typing import Any, Callable, Dict, Iterable, Optional, Sequence, Union |
13 | 14 |
14 | 15 | from torch.utils.data import IterableDataset as _TorchIterableDataset |
| 16 | +from torch.utils.data import get_worker_info |
15 | 17 |
| 18 | +from monai.data.utils import convert_tables_to_dicts |
16 | 19 | from monai.transforms import apply_transform |
| 20 | +from monai.utils import ensure_tuple, optional_import |
| 21 | + |
| 22 | +pd, _ = optional_import("pandas") |
17 | 23 |
18 | 24 |
19 | 25 | class IterableDataset(_TorchIterableDataset): |
@@ -43,3 +49,94 @@ def __iter__(self): |
43 | 49 | if self.transform is not None: |
44 | 50 | data = apply_transform(self.transform, data) |
45 | 51 | yield data |
| 52 | + |
| 53 | + |
| 54 | +class CSVIterableDataset(IterableDataset): |
| 55 | + """ |
| 56 | + Iterable dataset to load CSV files and generate dictionary data. |
| 57 | + It can be helpful when loading extremely big CSV files that can't be read into memory directly. |
| 58 | + To accelerate the loading process, it supports multi-processing based on PyTorch DataLoader workers, |
| 59 | + where every process executes transforms on its part of every loaded chunk. |
| 60 | + Note: the order of output data may not match the data source in multi-processing mode. |
| 61 | + |
| 62 | + It can load data from multiple CSV files and join the tables with the additional `kwargs` arg. |
| 63 | + It supports loading only specific columns. |
| 64 | + It can also group several loaded columns to generate a new column, for example, |
| 65 | + set `col_groups={"meta": ["meta_0", "meta_1", "meta_2"]}`, output can be:: |
| 66 | + |
| 67 | + [ |
| 68 | + {"image": "./image0.nii", "meta_0": 11, "meta_1": 12, "meta_2": 13, "meta": [11, 12, 13]}, |
| 69 | + {"image": "./image1.nii", "meta_0": 21, "meta_1": 22, "meta_2": 23, "meta": [21, 22, 23]}, |
| 70 | + ] |
| 71 | + |
| 72 | + Args: |
| 73 | + filename: the filename of the expected CSV file to load. if providing a list |
| 74 | + of filenames, it will load all the files and join the tables. |
| 75 | + chunksize: rows of a chunk when loading iterable data from CSV files, default to 1000. more details: |
| 76 | + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html. |
| 77 | + col_names: names of the expected columns to load. if None, load all the columns. |
| 78 | + col_types: `type` and `default value` to convert the loaded columns, if None, use original data. |
| 79 | + it should be a dictionary, every item maps to an expected column, the `key` is the column |
| 80 | + name and the `value` is None or a dictionary to define the default value and data type. |
| 81 | + the supported keys in dictionary are: ["type", "default"]. for example:: |
| 82 | + |
| 83 | + col_types = { |
| 84 | + "subject_id": {"type": str}, |
| 85 | + "label": {"type": int, "default": 0}, |
| 86 | + "ehr_0": {"type": float, "default": 0.0}, |
| 87 | + "ehr_1": {"type": float, "default": 0.0}, |
| 88 | + "image": {"type": str, "default": None}, |
| 89 | + } |
| 90 | + |
| 91 | + col_groups: args to group the loaded columns to generate a new column, |
| 92 | + it should be a dictionary, every item maps to a group, the `key` will |
| 93 | + be the new column name, the `value` is the names of columns to combine. for example: |
| 94 | + `col_groups={"ehr": [f"ehr_{i}" for i in range(10)], "meta": ["meta_1", "meta_2"]}` |
| 95 | + transform: transform to apply on the loaded items of a dictionary data. |
| 96 | + kwargs: additional arguments for `pandas.merge()` API to join tables. |
| 97 | + |
| 98 | + """ |
| 99 | + |
| 100 | + def __init__( |
| 101 | + self, |
| 102 | + filename: Union[str, Sequence[str]], |
| 103 | + chunksize: int = 1000, |
| 104 | + col_names: Optional[Sequence[str]] = None, |
| 105 | + col_types: Optional[Dict[str, Optional[Dict[str, Any]]]] = None, |
| 106 | + col_groups: Optional[Dict[str, Sequence[str]]] = None, |
| 107 | + transform: Optional[Callable] = None, |
| 108 | + **kwargs, |
| 109 | + ): |
| 110 | + self.files = ensure_tuple(filename) |
| 111 | + self.chunksize = chunksize |
| 112 | + self.iters = self.reset() |
| 113 | + self.col_names = col_names |
| 114 | + self.col_types = col_types |
| 115 | + self.col_groups = col_groups |
| 116 | + self.kwargs = kwargs |
| 117 | + super().__init__(data=None, transform=transform) # type: ignore |
| 118 | + |
| 119 | + def reset(self, filename: Optional[Union[str, Sequence[str]]] = None): |
| 120 | + if filename is not None: |
| 121 | + # update files if necessary |
| 122 | + self.files = ensure_tuple(filename) |
| 123 | + self.iters = [pd.read_csv(f, chunksize=self.chunksize) for f in self.files] |
| 124 | + return self.iters |
| 125 | + |
| 126 | + def __iter__(self): |
| 127 | + for chunks in zip(*self.iters): |
| 128 | + self.data = convert_tables_to_dicts( |
| 129 | + dfs=chunks, |
| 130 | + col_names=self.col_names, |
| 131 | + col_types=self.col_types, |
| 132 | + col_groups=self.col_groups, |
| 133 | + **self.kwargs, |
| 134 | + ) |
| 135 | + info = get_worker_info() |
| 136 | + if info is not None: |
| 137 | + length = len(self.data) |
| 138 | + per_worker = int(math.ceil(length / float(info.num_workers))) |
| 139 | + start = info.id * per_worker |
| 140 | + self.data = self.data[start : min(start + per_worker, length)] |
| 141 | + |
| 142 | + return super().__iter__() |
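To make the new API concrete, here is a minimal usage sketch (not part of the diff above). The CSV paths, column names, and worker count are hypothetical placeholders, and the `from monai.data import CSVIterableDataset` import path assumes the class is re-exported from the package; adjust it to the module shown in the diff if needed.

```python
from torch.utils.data import DataLoader

from monai.data import CSVIterableDataset  # assumed re-export of the class added in this diff

# hypothetical CSV files sharing a join key; tables are merged via pandas.merge(**kwargs)
dataset = CSVIterableDataset(
    filename=["subjects.csv", "metadata.csv"],
    chunksize=1000,  # rows streamed per pandas.read_csv chunk
    col_names=["subject_id", "image", "label", "meta_0", "meta_1", "meta_2"],
    col_types={"label": {"type": int, "default": 0}},
    col_groups={"meta": ["meta_0", "meta_1", "meta_2"]},
)

# every DataLoader worker parses each chunk and yields only its own slice of the rows
for item in DataLoader(dataset, batch_size=2, num_workers=2):
    print(item["subject_id"], item["label"])
```

Because the files are read chunk by chunk, only about `chunksize` rows per file are resident in memory at any time, which is what makes the class usable on tables too large to load at once.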
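The per-worker split in `__iter__` may be easier to follow in isolation. The helper below is a hypothetical standalone restatement of that arithmetic (its name and signature are not part of MONAI): each worker parses the same chunk, then keeps only its own contiguous slice.

```python
import math
from typing import Dict, List


def worker_slice(chunk: List[Dict], worker_id: int, num_workers: int) -> List[Dict]:
    """Return the rows of a parsed chunk that a given DataLoader worker should yield."""
    per_worker = int(math.ceil(len(chunk) / float(num_workers)))
    start = worker_id * per_worker
    return chunk[start : min(start + per_worker, len(chunk))]


# e.g. a chunk of 5 rows split across 2 workers -> 3 rows and 2 rows
chunk = [{"i": i} for i in range(5)]
print(worker_slice(chunk, 0, 2))  # [{'i': 0}, {'i': 1}, {'i': 2}]
print(worker_slice(chunk, 1, 2))  # [{'i': 3}, {'i': 4}]
```

Since each worker yields its slice independently, rows from different workers interleave downstream, which is why the docstring notes that the output order may not match the data source in multi-processing mode.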