128 changes: 74 additions & 54 deletions exca/cachedict.py
@@ -118,8 +118,7 @@ def __init__(
self._key_info: dict[str, DumpInfo] = {}
# json info file reading
self._folder_modified = -1.0
self._info_files_last: dict[str, int] = {}
self._jsonl_readings = 0 # for perf
self._jsonl_readers: dict[str, JsonlReader] = {}
self._jsonl_reading_allowance = float("inf")
# keep loaders live for optimized loading
# (instances are reinstantiated for dumping though, to make sure they are unique)
@@ -160,10 +159,10 @@ def _read_info_files(self) -> None:
"""Load current info files"""
if self.folder is None:
return
if self._jsonl_reading_allowance <= self._jsonl_readings:
readings = max((r.readings for r in self._jsonl_readers.values()), default=0)
if self._jsonl_reading_allowance <= readings:
# bypass reloading info files
return
self._jsonl_readings += 1
folder = Path(self.folder)
# read all existing jsonl files
find_cmd = 'find . -type f -name "*-info.jsonl"'
@@ -179,52 +178,8 @@ def _read_info_files(self) -> None:
out = e.output # stderr contains missing tmp files
names = out.decode("utf8").splitlines()
for name in names:
fp = folder / name
last = 0
meta = {}
fail = ""
with fp.open("rb") as f:
for k, line in enumerate(f):
if fail:
msg = f"Failed to read non-last line #{k - 1} in {fp}:\n{fail!r}"
raise RuntimeError(msg)
count = len(line)
last = last + count
line = line.strip()
if not line:
logger.debug("Skipping empty line #%s", k)
continue
strline = line.decode("utf8")
if not k:
if not strline.startswith(METADATA_TAG):
raise RuntimeError(f"metadata missing in info file {fp}")
strline = strline[len(METADATA_TAG) :]
try:
info = json.loads(strline)
except json.JSONDecodeError:
msg = "Failed to read to line #%s in %s in info file %s"
logger.warning(msg, k, name, strline)
# last line could be currently being written?
# (let's be robust to it)
fail = strline
last -= count # move back for next read
continue
if not k: # metadata
meta = info
new_last = self._info_files_last.get(fp.name, last)
if new_last > last:
last = new_last
msg = "Forwarding to byte %s in info file %s"
logger.debug(msg, last, name)
f.seek(last)
continue
key = info.pop("#key")
dinfo = DumpInfo(
jsonl=fp, byte_range=(last - count, last), **meta, content=info
)
self._key_info[key] = dinfo

self._info_files_last[fp.name] = last # f.tell()
jreader = self._jsonl_readers.setdefault(name, JsonlReader(folder / name))
self._key_info.update(jreader.read())

def values(self) -> tp.Iterable[X]:
for key in self:
@@ -309,7 +264,8 @@ def frozen_cache_folder(self) -> tp.Iterator[None]:
This is useful to speed up __contains__ checks when many items are missing,
which could otherwise trigger thousands of info-file rereads
"""
self._jsonl_reading_allowance = self._jsonl_readings + 1
readings = max((r.readings for r in self._jsonl_readers.values()), default=0)
self._jsonl_reading_allowance = readings + 1
try:
yield
finally:
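
For context, a minimal usage sketch of the frozen_cache_folder context manager (not part of the diff): the CacheDict constructor arguments and the "Pickle" cache_type below are assumptions for illustration only, check CacheDict.__init__ for the exact signature.

from exca.cachedict import CacheDict

# hypothetical construction for illustration
cache = CacheDict(folder="/tmp/demo-cache", cache_type="Pickle")
with cache.frozen_cache_folder():
    # within this block the info files are reread at most once more, so a large
    # batch of membership checks does not trigger one reread per missing key
    missing = [key for key in (f"item-{i}" for i in range(10_000)) if key not in cache]
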
@@ -375,6 +331,7 @@ def __setitem__(self, key: str, value: X) -> None:
if cd.folder is not None:
if self._info_filepath is None:
raise RuntimeError("Cannot write out of a writer context")
fp = self._info_filepath
if self._dumper is None:
self._dumper = DumperLoader.CLASSES[cd.cache_type](cd.folder)
self._exit_stack.enter_context(self._dumper.open())
@@ -387,7 +344,6 @@ def __setitem__(self, key: str, value: X) -> None:
meta = {"cache_type": cd.cache_type}
if self._info_handle is None:
# create the file only when required to avoid leaving empty files for some time
fp = self._info_filepath
self._info_handle = self._exit_stack.enter_context(fp.open("ab"))
if not self._info_handle.tell():
meta_str = METADATA_TAG + json.dumps(meta) + "\n"
@@ -397,13 +353,14 @@ def __setitem__(self, key: str, value: X) -> None:
self._info_handle.write(b + b"\n")
info.pop("#key")
dinfo = DumpInfo(
jsonl=self._info_filepath,
jsonl=fp,
byte_range=(current, current + len(b) + 1),
content=info,
**meta,
)
cd._key_info[key] = dinfo
cd._info_files_last[self._info_filepath.name] = self._info_handle.tell()
last = self._info_handle.tell()
cd._jsonl_readers.setdefault(fp.name, JsonlReader(fp))._last = last
# reading will reload to in-memory cache if need be
# (since dumping may have loaded the underlying data, let's not keep it)
if cd.permissions is not None:
@@ -413,3 +370,66 @@ def __setitem__(self, key: str, value: X) -> None:
except Exception: # pylint: disable=broad-except
pass # avoid issues in case of overlapping processes
os.utime(cd.folder) # make sure the modified time is updated


class JsonlReader:
def __init__(self, filepath: str | Path) -> None:
self._fp = Path(filepath)
self._last = 0
self.readings = 0

def read(self) -> dict[str, DumpInfo]:
out: dict[str, DumpInfo] = {}
self.readings += 1
with self._fp.open("rb") as f:
# metadata
try:
first = next(f)
except StopIteration:
return out # nothing to do
strline = first.decode("utf8")
if not strline.startswith(METADATA_TAG):
raise RuntimeError(f"metadata missing in info file {self._fp}")
meta = json.loads(strline[len(METADATA_TAG) :])
last = len(first)
if self._last > len(first):
msg = "Forwarding to byte %s in info file %s"
logger.debug(msg, self._last, self._fp.name)
f.seek(self._last)
last = self._last
branges = []
lines = []
for line in f.readlines():
if line.startswith(b" "):  # empty/blanked line: skip it, but keep advancing the offset so ranges stay aligned
last += len(line)
continue
lines.append(line)
branges.append((last, last + len(line)))
last += len(line)
if not lines:
return out
lines[0] = b"[" + lines[0]
# the last line may be corrupted (possibly still being written), so try twice
for k in range(2):
lines[-1] = lines[-1] + b"]"
json_str = b",".join(lines).decode("utf8")
try:
infos = json.loads(json_str)
except json.decoder.JSONDecodeError:
if not k:
lines = lines[:-1]
branges = branges[:-1]
if not lines:  # the only data line was corrupted (likely still being written)
return out
else:
logger.warning(
"Could not read json in %s:\n%s", self._fp, json_str
)
raise
else:
break
# sanity check: each parsed info must line up with its recorded byte range
if len(infos) != len(branges):
raise RuntimeError("infos and byte ranges are no longer aligned")
for info, brange in zip(infos, branges):
key = info.pop("#key")
dinfo = DumpInfo(jsonl=self._fp, byte_range=brange, **meta, content=info)
out[key] = dinfo
self._last = branges[-1][-1]
return out
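
For readers unfamiliar with the on-disk layout, here is a small sketch of the info-file format that the new JsonlReader consumes, mirroring the write path in __setitem__ above: a first line tagged with METADATA_TAG carrying shared metadata, then one JSON object per stored key with the key under "#key". The file path, key names and per-key content below are made up for the example.

import json
from pathlib import Path

from exca.cachedict import METADATA_TAG, JsonlReader

fp = Path("/tmp/demo-info.jsonl")
with fp.open("wb") as f:
    # first line: shared metadata, tagged with METADATA_TAG
    f.write((METADATA_TAG + json.dumps({"cache_type": "Pickle"}) + "\n").encode("utf8"))
    # then one JSON object per stored key
    for key in ("a", "b"):
        f.write((json.dumps({"#key": key, "filename": f"{key}.pkl"}) + "\n").encode("utf8"))

reader = JsonlReader(fp)
infos = reader.read()   # {"a": DumpInfo(...), "b": DumpInfo(...)}
assert reader.readings == 1
infos = reader.read()   # a second read only parses bytes appended since the first one
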
8 changes: 4 additions & 4 deletions exca/test_map.py
@@ -110,15 +110,15 @@ def test_map_infra_cache_dict_calls(tmp_path: Path) -> None:
whatever = Whatever(infra={"folder": tmp_path, "cluster": "local"}) # type: ignore
cd = whatever.infra.cache_dict
_ = list(whatever.process([1, 2, 3, 4]))
assert cd._jsonl_readings == 3
assert max(r.readings for r in cd._jsonl_readers.values()) == 1
whatever = Whatever(infra={"folder": tmp_path, "cluster": "local"}) # type: ignore
cd = whatever.infra.cache_dict
_ = list(whatever.process([1]))
assert cd._jsonl_readings == 1
assert max(r.readings for r in cd._jsonl_readers.values()) == 1
_ = list(whatever.process([2, 3, 4]))
assert cd._jsonl_readings == 1
assert max(r.readings for r in cd._jsonl_readers.values()) == 1
_ = list(whatever.process([5]))
assert cd._jsonl_readings == 4
assert max(r.readings for r in cd._jsonl_readers.values()) == 2


def test_missing_yield() -> None: