
Commit a2dd5c1

Load json lines as a batch (#140)
1 parent 803aabb commit a2dd5c1

File tree

exca/cachedict.py
exca/test_map.py

2 files changed: +78 -58 lines changed

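
Before the diffs, a minimal sketch of the batching idea this commit implements (a hypothetical standalone example, not the exca API): JSONL records are joined with commas and wrapped in brackets, so a whole info file parses with a single json.loads call instead of one call per line.

import json

def parse_jsonl_batch(raw: bytes) -> list[dict]:
    # keep non-empty lines; each one holds a single JSON object
    lines = [ln for ln in raw.splitlines() if ln.strip()]
    if not lines:
        return []
    # b'{"#key": "x"}\n{"#key": "y"}' becomes b'[{"#key": "x"},{"#key": "y"}]'
    return json.loads(b"[" + b",".join(lines) + b"]")

records = parse_jsonl_batch(b'{"#key": "x", "n": 1}\n{"#key": "y", "n": 2}\n')
assert [r["#key"] for r in records] == ["x", "y"]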

exca/cachedict.py

Lines changed: 74 additions & 54 deletions
@@ -118,8 +118,7 @@ def __init__(
         self._key_info: dict[str, DumpInfo] = {}
         # json info file reading
         self._folder_modified = -1.0
-        self._info_files_last: dict[str, int] = {}
-        self._jsonl_readings = 0  # for perf
+        self._jsonl_readers: dict[str, JsonlReader] = {}
         self._jsonl_reading_allowance = float("inf")
         # keep loaders live for optimized loading
         # (instances are reinstantiated for dumping though, to make sure they are unique)
@@ -160,10 +159,10 @@ def _read_info_files(self) -> None:
         """Load current info files"""
         if self.folder is None:
             return
-        if self._jsonl_reading_allowance <= self._jsonl_readings:
+        readings = max((r.readings for r in self._jsonl_readers.values()), default=0)
+        if self._jsonl_reading_allowance <= readings:
             # bypass reloading info files
             return
-        self._jsonl_readings += 1
         folder = Path(self.folder)
         # read all existing jsonl files
         find_cmd = 'find . -type f -name "*-info.jsonl"'
@@ -179,52 +178,8 @@ def _read_info_files(self) -> None:
             out = e.output  # stderr contains missing tmp files
         names = out.decode("utf8").splitlines()
         for name in names:
-            fp = folder / name
-            last = 0
-            meta = {}
-            fail = ""
-            with fp.open("rb") as f:
-                for k, line in enumerate(f):
-                    if fail:
-                        msg = f"Failed to read non-last line #{k - 1} in {fp}:\n{fail!r}"
-                        raise RuntimeError(msg)
-                    count = len(line)
-                    last = last + count
-                    line = line.strip()
-                    if not line:
-                        logger.debug("Skipping empty line #%s", k)
-                        continue
-                    strline = line.decode("utf8")
-                    if not k:
-                        if not strline.startswith(METADATA_TAG):
-                            raise RuntimeError(f"metadata missing in info file {fp}")
-                        strline = strline[len(METADATA_TAG) :]
-                    try:
-                        info = json.loads(strline)
-                    except json.JSONDecodeError:
-                        msg = "Failed to read to line #%s in %s in info file %s"
-                        logger.warning(msg, k, name, strline)
-                        # last line could be currently being written?
-                        # (let's be robust to it)
-                        fail = strline
-                        last -= count  # move back for next read
-                        continue
-                    if not k:  # metadata
-                        meta = info
-                        new_last = self._info_files_last.get(fp.name, last)
-                        if new_last > last:
-                            last = new_last
-                            msg = "Forwarding to byte %s in info file %s"
-                            logger.debug(msg, last, name)
-                            f.seek(last)
-                        continue
-                    key = info.pop("#key")
-                    dinfo = DumpInfo(
-                        jsonl=fp, byte_range=(last - count, last), **meta, content=info
-                    )
-                    self._key_info[key] = dinfo
-
-            self._info_files_last[fp.name] = last  # f.tell()
+            jreader = self._jsonl_readers.setdefault(name, JsonlReader(folder / name))
+            self._key_info.update(jreader.read())
 
     def values(self) -> tp.Iterable[X]:
         for key in self:
@@ -309,7 +264,8 @@ def frozen_cache_folder(self) -> tp.Iterator[None]:
         This is useful to speed up __contains__ statement with many missing
         items, which could trigger thousands of file rereads
         """
-        self._jsonl_reading_allowance = self._jsonl_readings + 1
+        readings = max((r.readings for r in self._jsonl_readers.values()), default=0)
+        self._jsonl_reading_allowance = readings + 1
         try:
             yield
         finally:
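
A side note on the hunk above: max() over an empty sequence raises ValueError, so the default=0 argument keeps the allowance check valid on a fresh CacheDict before any info file has been read. A one-line illustration:

assert max((n for n in []), default=0) == 0  # no readers yet -> allowance compares against 0
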
@@ -375,6 +331,7 @@ def __setitem__(self, key: str, value: X) -> None:
         if cd.folder is not None:
             if self._info_filepath is None:
                 raise RuntimeError("Cannot write out of a writer context")
+            fp = self._info_filepath
             if self._dumper is None:
                 self._dumper = DumperLoader.CLASSES[cd.cache_type](cd.folder)
                 self._exit_stack.enter_context(self._dumper.open())
@@ -387,7 +344,6 @@ def __setitem__(self, key: str, value: X) -> None:
             meta = {"cache_type": cd.cache_type}
             if self._info_handle is None:
                 # create the file only when required to avoid leaving empty files for some time
-                fp = self._info_filepath
                 self._info_handle = self._exit_stack.enter_context(fp.open("ab"))
                 if not self._info_handle.tell():
                     meta_str = METADATA_TAG + json.dumps(meta) + "\n"
@@ -397,13 +353,14 @@ def __setitem__(self, key: str, value: X) -> None:
             self._info_handle.write(b + b"\n")
             info.pop("#key")
             dinfo = DumpInfo(
-                jsonl=self._info_filepath,
+                jsonl=fp,
                 byte_range=(current, current + len(b) + 1),
                 content=info,
                 **meta,
             )
             cd._key_info[key] = dinfo
-            cd._info_files_last[self._info_filepath.name] = self._info_handle.tell()
+            last = self._info_handle.tell()
+            cd._jsonl_readers.setdefault(fp.name, JsonlReader(fp))._last = last
             # reading will reload to in-memory cache if need be
             # (since dumping may have loaded the underlying data, let's not keep it)
             if cd.permissions is not None:
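
The +1 in byte_range above accounts for the b"\n" terminator written after each record. A small illustration with hypothetical values (current stands for the handle position before the write):

b = b'{"#key": "x"}'
current = 0
byte_range = (current, current + len(b) + 1)  # end includes the trailing newline
assert byte_range == (0, 14)
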
@@ -413,3 +370,66 @@ def __setitem__(self, key: str, value: X) -> None:
                     except Exception:  # pylint: disable=broad-except
                         pass  # avoid issues in case of overlapping processes
            os.utime(cd.folder)  # make sure the modified time is updated
+
+
+class JsonlReader:
+    def __init__(self, filepath: str | Path) -> None:
+        self._fp = Path(filepath)
+        self._last = 0
+        self.readings = 0
+
+    def read(self) -> dict[str, DumpInfo]:
+        out: dict[str, DumpInfo] = {}
+        self.readings += 1
+        with self._fp.open("rb") as f:
+            # metadata
+            try:
+                first = next(f)
+            except StopIteration:
+                return out  # nothing to do
+            strline = first.decode("utf8")
+            if not strline.startswith(METADATA_TAG):
+                raise RuntimeError(f"metadata missing in info file {self._fp}")
+            meta = json.loads(strline[len(METADATA_TAG) :])
+            last = len(first)
+            if self._last > len(first):
+                msg = "Forwarding to byte %s in info file %s"
+                logger.debug(msg, self._last, self._fp.name)
+                f.seek(self._last)
+                last = self._last
+            branges = []
+            lines = []
+            for line in f.readlines():
+                if not line.startswith(b" "):  # empty
+                    lines.append(line)
+                    branges.append((last, last + len(line)))
+                last += len(line)
+            if not lines:
+                return out
+            lines[0] = b"[" + lines[0]
+            # last line may be corrupted, so check twice
+            for k in range(2):
+                lines[-1] = lines[-1] + b"]"
+                json_str = b",".join(lines).decode("utf8")
+                try:
+                    infos = json.loads(json_str)
+                except json.decoder.JSONDecodeError:
+                    if not k:
+                        lines = lines[:-1]
+                        branges = branges[:-1]
+                    else:
+                        logger.warning(
+                            "Could not read json in %s:\n%s", self._fp, json_str
+                        )
+                        raise
+                else:
+                    break
+            # metadata
+            if len(infos) != len(branges):
+                raise RuntimeError("info and ranges are no longer aligned")
+            for info, brange in zip(infos, branges):
+                key = info.pop("#key")
+                dinfo = DumpInfo(jsonl=self._fp, byte_range=brange, **meta, content=info)
+                out[key] = dinfo
+            self._last = branges[-1][-1]
+            return out
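
A hedged sketch of the "last line may be corrupted" retry that JsonlReader.read uses above (simplified, with a hypothetical helper name; the real method also tracks byte ranges and a metadata header): parse the whole batch, and if decoding fails, drop the final line once, since a concurrent writer may still be appending it.

import json

def loads_tolerant(lines: list[bytes]) -> list:
    # two attempts: the full batch first, then without the possibly
    # half-written final line
    for attempt in range(2):
        try:
            return json.loads(b"[" + b",".join(lines) + b"]")
        except json.JSONDecodeError:
            if attempt == 0 and len(lines) > 1:
                lines = lines[:-1]
            else:
                raise
    return []  # unreachable; keeps type checkers happy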

exca/test_map.py

Lines changed: 4 additions & 4 deletions
@@ -110,15 +110,15 @@ def test_map_infra_cache_dict_calls(tmp_path: Path) -> None:
     whatever = Whatever(infra={"folder": tmp_path, "cluster": "local"})  # type: ignore
     cd = whatever.infra.cache_dict
     _ = list(whatever.process([1, 2, 3, 4]))
-    assert cd._jsonl_readings == 3
+    assert max(r.readings for r in cd._jsonl_readers.values()) == 1
     whatever = Whatever(infra={"folder": tmp_path, "cluster": "local"})  # type: ignore
     cd = whatever.infra.cache_dict
     _ = list(whatever.process([1]))
-    assert cd._jsonl_readings == 1
+    assert max(r.readings for r in cd._jsonl_readers.values()) == 1
     _ = list(whatever.process([2, 3, 4]))
-    assert cd._jsonl_readings == 1
+    assert max(r.readings for r in cd._jsonl_readers.values()) == 1
     _ = list(whatever.process([5]))
-    assert cd._jsonl_readings == 4
+    assert max(r.readings for r in cd._jsonl_readers.values()) == 2
 
 
 def test_missing_yield() -> None:
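
The updated expectations reflect the new semantics: readings now counts whole-file batch reads per JsonlReader rather than a single global reload counter, so processing fresh items costs at most one read per info file. A hedged usage sketch of the frozen_cache_folder context manager from the diff above (cd and candidate_keys are placeholders):

with cd.frozen_cache_folder():
    # the allowance is set to readings + 1, so many membership probes
    # trigger at most one reread of the info files
    missing = [k for k in candidate_keys if k not in cd]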
