diff --git a/libs/openant-core/core/analyzer.py b/libs/openant-core/core/analyzer.py index 7fb5966..8fa9a68 100644 --- a/libs/openant-core/core/analyzer.py +++ b/libs/openant-core/core/analyzer.py @@ -24,6 +24,7 @@ from core import tracking from core.checkpoint import StepCheckpoint from core.progress import ProgressReporter +from utilities.atomic_io import atomic_write_json # Import existing analysis machinery from utilities.llm_client import AnthropicClient, get_global_tracker @@ -513,8 +514,7 @@ def _summary_callback(finding, usage=None): "code_by_route": code_by_route, } - with open(results_path, "w") as f: - json.dump(experiment_result, f, indent=2) + atomic_write_json(results_path, experiment_result) print(f"\n[Analyze] Results written to {results_path}", file=sys.stderr) diff --git a/libs/openant-core/core/enhancer.py b/libs/openant-core/core/enhancer.py index fef1453..80e8760 100644 --- a/libs/openant-core/core/enhancer.py +++ b/libs/openant-core/core/enhancer.py @@ -16,6 +16,7 @@ from core.schemas import EnhanceResult, UsageInfo from core import tracking from core.progress import ProgressReporter +from utilities.atomic_io import atomic_write_json from utilities.rate_limiter import configure_rate_limiter @@ -138,8 +139,7 @@ def _on_restored(count: int): # Write enhanced dataset os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) - with open(output_path, "w") as f: - json.dump(enhanced, f, indent=2) + atomic_write_json(output_path, enhanced) print(f"[Enhance] Enhanced dataset: {output_path}", file=sys.stderr) print(f"[Enhance] Classifications: {classifications}", file=sys.stderr) diff --git a/libs/openant-core/core/verifier.py b/libs/openant-core/core/verifier.py index fa7a43f..edfa7dc 100644 --- a/libs/openant-core/core/verifier.py +++ b/libs/openant-core/core/verifier.py @@ -19,6 +19,7 @@ from core.checkpoint import StepCheckpoint from core.progress import ProgressReporter +from utilities.atomic_io import atomic_write_json from 
utilities.llm_client import TokenTracker, get_global_tracker from utilities.finding_verifier import FindingVerifier from utilities.agentic_enhancer.repository_index import load_index_from_file @@ -268,8 +269,7 @@ def _write_verified_results( output["metrics"] = {"total": len(merged_results), **counts} - with open(path, "w") as f: - json.dump(output, f, indent=2, ensure_ascii=False) + atomic_write_json(path, output, ensure_ascii=False) def _build_code_by_route(results: list) -> dict: diff --git a/libs/openant-core/tests/test_atomic_io.py b/libs/openant-core/tests/test_atomic_io.py new file mode 100644 index 0000000..ed83e28 --- /dev/null +++ b/libs/openant-core/tests/test_atomic_io.py @@ -0,0 +1,184 @@ +"""Tests for ``utilities.atomic_io.atomic_write_json``. + +These exercise the three properties that justify the helper's existence: + +1. Round-trip: a written dict reads back identically. +2. Atomicity: a mid-write failure leaves the *previous* file untouched — + no truncated/empty target on disk. +3. Same-directory temp file: ``os.replace`` is only atomic when source and + target sit on the same filesystem (a cross-volume replace raises + ``OSError`` rather than silently losing atomicity). 
+""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from utilities.atomic_io import atomic_write_json + + +def test_roundtrip_writes_and_reads(tmp_path: Path): + """Writing a dict and reading it back yields the same content.""" + target = tmp_path / "results.json" + data = { + "dataset": "geospatial_vuln12", + "metrics": {"total": 3, "vulnerable": 1}, + "results": [{"id": "u1", "verdict": "VULNERABLE"}], + } + + atomic_write_json(str(target), data) + + assert target.exists() + with open(target, encoding="utf-8") as f: + assert json.load(f) == data + + +def test_unicode_roundtrip_with_ensure_ascii_false(tmp_path: Path): + """ensure_ascii=False (used by verifier) preserves non-ASCII chars.""" + target = tmp_path / "results_verified.json" + data = {"note": "résumé — naïve café ☃"} + + atomic_write_json(str(target), data, ensure_ascii=False) + + text = target.read_text(encoding="utf-8") + # Raw UTF-8, not \u-escaped: + assert "résumé" in text + assert json.loads(text) == data + + +def test_failure_mid_write_preserves_existing_file(tmp_path: Path, monkeypatch): + """If json.dump raises mid-write, the previous target file is intact.""" + target = tmp_path / "results.json" + original = {"version": 1, "stable": True} + atomic_write_json(str(target), original) + + # Sanity check: original content is on disk before the failed write. + assert json.loads(target.read_text(encoding="utf-8")) == original + + # Force json.dump to blow up partway through. Because atomic_write_json + # writes to a temp file in the same directory and only os.replaces on + # success, the existing target must be untouched. + real_dump = json.dump + + def exploding_dump(*args, **kwargs): + # Write a few bytes first to prove that *temp* file partial writes + # don't reach the target — then raise. 
+ f = args[1] + f.write('{"corru') + raise RuntimeError("simulated mid-write crash") + + monkeypatch.setattr("utilities.atomic_io.json.dump", exploding_dump) + + with pytest.raises(RuntimeError, match="simulated mid-write crash"): + atomic_write_json(str(target), {"version": 2, "broken": True}) + + # Restore so subsequent assertions/teardown aren't affected. + monkeypatch.setattr("utilities.atomic_io.json.dump", real_dump) + + # Target is unchanged: still the original content, still valid JSON. + assert target.exists() + assert json.loads(target.read_text(encoding="utf-8")) == original + + +def test_failure_cleans_up_temp_file(tmp_path: Path, monkeypatch): + """Failed writes must not leave stray ``.tmp-`` files behind.""" + target = tmp_path / "results.json" + + def exploding_dump(*args, **kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr("utilities.atomic_io.json.dump", exploding_dump) + + with pytest.raises(RuntimeError): + atomic_write_json(str(target), {"x": 1}) + + leftovers = [p.name for p in tmp_path.iterdir()] + assert leftovers == [], f"expected empty dir, found: {leftovers}" + + +def test_failure_with_no_existing_target_does_not_create_target( + tmp_path: Path, monkeypatch, +): + """If no target exists and the write fails, target must not appear.""" + target = tmp_path / "results.json" + assert not target.exists() + + def exploding_dump(*args, **kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr("utilities.atomic_io.json.dump", exploding_dump) + + with pytest.raises(RuntimeError): + atomic_write_json(str(target), {"x": 1}) + + assert not target.exists() + + +def test_temp_file_is_in_same_directory_as_target(tmp_path: Path, monkeypatch): + """Temp file must live in the same directory as the target. + + A cross-volume ``os.replace`` raises ``OSError`` instead of renaming + atomically, so same-directory placement is load-bearing for atomicity. 
+ """ + target_dir = tmp_path / "outputs" + target_dir.mkdir() + target = target_dir / "results.json" + + captured: dict[str, str] = {} + real_mkstemp = __import__("tempfile").mkstemp + + def spying_mkstemp(*args, **kwargs): + fd, path = real_mkstemp(*args, **kwargs) + captured["path"] = path + captured["dir_kwarg"] = kwargs.get("dir", "") + return fd, path + + monkeypatch.setattr("utilities.atomic_io.tempfile.mkstemp", spying_mkstemp) + + atomic_write_json(str(target), {"ok": True}) + + assert "path" in captured, "tempfile.mkstemp was not called" + tmp_dir = os.path.dirname(captured["path"]) + expected_dir = os.path.abspath(str(target_dir)) + assert os.path.abspath(tmp_dir) == expected_dir, ( + f"temp file in {tmp_dir!r}, expected {expected_dir!r}" + ) + # And the helper passed the same dir explicitly to mkstemp. + assert os.path.abspath(captured["dir_kwarg"]) == expected_dir + + +def test_overwrites_existing_file(tmp_path: Path): + """Successful second write replaces the target's content.""" + target = tmp_path / "results.json" + atomic_write_json(str(target), {"version": 1}) + atomic_write_json(str(target), {"version": 2, "extra": [1, 2, 3]}) + + assert json.loads(target.read_text(encoding="utf-8")) == { + "version": 2, "extra": [1, 2, 3], + } + + +@pytest.mark.skipif(os.name != "posix", reason="POSIX permission semantics") +def test_posix_permissions_match_umask(tmp_path: Path): + """The written file honours the process umask, not mkstemp's 0600 default. + + A regression here is silent: pipeline outputs that other users / tools + were previously able to read (under a typical 0022 umask -> 0644 file) + would suddenly become owner-only. We don't want atomic writes to tighten + permissions as a side effect. 
+ """ + target = tmp_path / "results.json" + + old_umask = os.umask(0o022) + try: + atomic_write_json(str(target), {"ok": True}) + finally: + os.umask(old_umask) + + mode = os.stat(target).st_mode & 0o777 + # 0666 & ~0022 == 0644 + assert mode == 0o644, f"expected 0644, got {oct(mode)}" diff --git a/libs/openant-core/utilities/atomic_io.py b/libs/openant-core/utilities/atomic_io.py new file mode 100644 index 0000000..5b4a946 --- /dev/null +++ b/libs/openant-core/utilities/atomic_io.py @@ -0,0 +1,84 @@ +"""Atomic JSON file writes. + +Pipeline output files (``results.json``, ``enhanced_dataset.json``, +``results_verified.json``) represent the final artefact of an expensive +multi-stage LLM pipeline. A crash, power loss, or interrupted process during +``json.dump`` can leave the target file truncated or corrupt, destroying the +output of a long-running run that may have cost real money. + +``atomic_write_json`` writes the JSON to a temporary file in the **same +directory** as the target, ``fsync``s it, then ``os.replace``s it onto the +target path. ``os.replace`` is atomic on POSIX and on Windows (when both paths +sit on the same volume), so concurrent readers either see the previous file +intact or the fully-written new file — never a partial write. + +The same-directory requirement matters: cross-device renames fall back to +copy+delete on most platforms (and fail outright on Windows), losing +atomicity. Callers should never pass a temp dir on a different volume. +""" + +from __future__ import annotations + +import json +import os +import tempfile +from typing import Any + + +def atomic_write_json(path: str, data: Any, *, indent: int | None = 2, + ensure_ascii: bool = True) -> None: + """Atomically write ``data`` as JSON to ``path``. + + Writes to a temporary file in the same directory as ``path``, fsyncs, + then ``os.replace``s it onto the target. If any step fails the temp + file is removed and ``path`` is left untouched. + + Args: + path: Destination path. 
Parent directory must already exist. + data: JSON-serialisable object. + indent: Indentation passed through to ``json.dump`` (default 2). + ensure_ascii: Passed through to ``json.dump`` (default True). + """ + directory = os.path.dirname(os.path.abspath(path)) or "." + + # mkstemp hands back a raw fd (there is no delete-on-close here); we + # wrap it below and unlink manually on error. Same-dir keeps os.replace atomic. + fd, tmp_path = tempfile.mkstemp( + prefix=".tmp-" + os.path.basename(path) + "-", + suffix=".json", + dir=directory, + ) + try: + # tempfile.mkstemp creates files with mode 0600 (owner-only). A plain + # open(path, "w") honours the process umask (typically yielding 0644). + # Restore that behaviour so atomic writes don't silently tighten + # permissions on pipeline outputs that downstream tools/users read. + # On Windows os.chmod's effect is limited to the readonly bit, so + # this is effectively a no-op there. + if os.name == "posix": + try: + umask = os.umask(0) + os.umask(umask) + os.chmod(tmp_path, 0o666 & ~umask) + except OSError: + pass + + with os.fdopen(fd, "w", encoding="utf-8") as f: + json.dump(data, f, indent=indent, ensure_ascii=ensure_ascii) + f.flush() + try: + os.fsync(f.fileno()) + except OSError: + # fsync can fail on some filesystems (e.g. certain network + # mounts). The replace below is still atomic at the VFS + # layer; durability across power loss is best-effort. + pass + os.replace(tmp_path, path) + except BaseException: + # Clean up the temp file on any failure (including KeyboardInterrupt + # mid-write) so we don't leave stray .tmp- files in the output dir. + try: + os.unlink(tmp_path) + except OSError: + pass + raise