Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libs/openant-core/core/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from core import tracking
from core.checkpoint import StepCheckpoint
from core.progress import ProgressReporter
from utilities.atomic_io import atomic_write_json

# Import existing analysis machinery
from utilities.llm_client import AnthropicClient, get_global_tracker
Expand Down Expand Up @@ -513,8 +514,7 @@ def _summary_callback(finding, usage=None):
"code_by_route": code_by_route,
}

with open(results_path, "w") as f:
json.dump(experiment_result, f, indent=2)
atomic_write_json(results_path, experiment_result)

print(f"\n[Analyze] Results written to {results_path}", file=sys.stderr)

Expand Down
4 changes: 2 additions & 2 deletions libs/openant-core/core/enhancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from core.schemas import EnhanceResult, UsageInfo
from core import tracking
from core.progress import ProgressReporter
from utilities.atomic_io import atomic_write_json
from utilities.rate_limiter import configure_rate_limiter


Expand Down Expand Up @@ -138,8 +139,7 @@ def _on_restored(count: int):

# Write enhanced dataset
os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
with open(output_path, "w") as f:
json.dump(enhanced, f, indent=2)
atomic_write_json(output_path, enhanced)

print(f"[Enhance] Enhanced dataset: {output_path}", file=sys.stderr)
print(f"[Enhance] Classifications: {classifications}", file=sys.stderr)
Expand Down
4 changes: 2 additions & 2 deletions libs/openant-core/core/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from core.checkpoint import StepCheckpoint
from core.progress import ProgressReporter

from utilities.atomic_io import atomic_write_json
from utilities.llm_client import TokenTracker, get_global_tracker
from utilities.finding_verifier import FindingVerifier
from utilities.agentic_enhancer.repository_index import load_index_from_file
Expand Down Expand Up @@ -268,8 +269,7 @@ def _write_verified_results(

output["metrics"] = {"total": len(merged_results), **counts}

with open(path, "w") as f:
json.dump(output, f, indent=2, ensure_ascii=False)
atomic_write_json(path, output, ensure_ascii=False)


def _build_code_by_route(results: list) -> dict:
Expand Down
184 changes: 184 additions & 0 deletions libs/openant-core/tests/test_atomic_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
"""Tests for ``utilities.atomic_io.atomic_write_json``.

These exercise the three properties that justify the helper's existence:

1. Round-trip: a written dict reads back identically.
2. Atomicity: a mid-write failure leaves the *previous* file untouched —
no truncated/empty target on disk.
3. Same-directory temp file: required for ``os.replace`` to be atomic on
Windows (cross-volume rename falls back to copy+delete and loses the
atomicity guarantee).
"""

from __future__ import annotations

import json
import os
import tempfile
from pathlib import Path

import pytest

from utilities.atomic_io import atomic_write_json


def test_roundtrip_writes_and_reads(tmp_path: Path):
    """A dict written via atomic_write_json reads back unchanged."""
    payload = {
        "dataset": "geospatial_vuln12",
        "metrics": {"total": 3, "vulnerable": 1},
        "results": [{"id": "u1", "verdict": "VULNERABLE"}],
    }
    out_file = tmp_path / "results.json"

    atomic_write_json(str(out_file), payload)

    assert out_file.exists()
    loaded = json.loads(out_file.read_text(encoding="utf-8"))
    assert loaded == payload


def test_unicode_roundtrip_with_ensure_ascii_false(tmp_path: Path):
    """ensure_ascii=False (used by verifier) preserves non-ASCII chars."""
    out_file = tmp_path / "results_verified.json"
    payload = {"note": "résumé — naïve café ☃"}

    atomic_write_json(str(out_file), payload, ensure_ascii=False)

    raw = out_file.read_text(encoding="utf-8")
    # Raw UTF-8 on disk, not \u-escaped:
    assert "résumé" in raw
    assert json.loads(raw) == payload


def test_failure_mid_write_preserves_existing_file(tmp_path: Path, monkeypatch):
    """If json.dump raises mid-write, the previous target file is intact."""
    target = tmp_path / "results.json"
    before = {"version": 1, "stable": True}
    atomic_write_json(str(target), before)

    # Sanity check: the original content is on disk before the failed write.
    assert json.loads(target.read_text(encoding="utf-8")) == before

    # Force json.dump to blow up partway through. Because atomic_write_json
    # writes to a temp file in the same directory and only os.replaces on
    # success, the existing target must stay untouched.
    saved_dump = json.dump

    def exploding_dump(obj, fp, **kwargs):
        # Emit a few bytes first to prove that *temp* file partial writes
        # never reach the target — then blow up.
        fp.write('{"corru')
        raise RuntimeError("simulated mid-write crash")

    monkeypatch.setattr("utilities.atomic_io.json.dump", exploding_dump)

    with pytest.raises(RuntimeError, match="simulated mid-write crash"):
        atomic_write_json(str(target), {"version": 2, "broken": True})

    # Put the real dump back so later assertions/teardown are unaffected.
    monkeypatch.setattr("utilities.atomic_io.json.dump", saved_dump)

    # Target is unchanged: still the original content, still valid JSON.
    assert target.exists()
    assert json.loads(target.read_text(encoding="utf-8")) == before


def test_failure_cleans_up_temp_file(tmp_path: Path, monkeypatch):
    """Failed writes must not leave stray ``.tmp-`` files behind."""
    target = tmp_path / "results.json"

    def broken_dump(*args, **kwargs):
        raise RuntimeError("boom")

    monkeypatch.setattr("utilities.atomic_io.json.dump", broken_dump)

    with pytest.raises(RuntimeError):
        atomic_write_json(str(target), {"x": 1})

    stray = [entry.name for entry in tmp_path.iterdir()]
    assert stray == [], f"expected empty dir, found: {stray}"


def test_failure_with_no_existing_target_does_not_create_target(
    tmp_path: Path, monkeypatch,
):
    """If no target exists and the write fails, target must not appear."""
    target = tmp_path / "results.json"
    assert not target.exists()

    def broken_dump(*args, **kwargs):
        raise RuntimeError("boom")

    monkeypatch.setattr("utilities.atomic_io.json.dump", broken_dump)

    with pytest.raises(RuntimeError):
        atomic_write_json(str(target), {"x": 1})

    # The failed write must not have materialized the target at all.
    assert not target.exists()


def test_temp_file_is_in_same_directory_as_target(tmp_path: Path, monkeypatch):
    """Temp file must live in the same directory as the target.

    Cross-volume renames are not atomic on Windows; ``os.replace`` falls back
    to copy+delete, so this is load-bearing for the atomicity guarantee.
    """
    target_dir = tmp_path / "outputs"
    target_dir.mkdir()
    target = target_dir / "results.json"

    captured: dict[str, str] = {}
    # Proper module reference instead of the `__import__("tempfile")` hack;
    # bind the real mkstemp BEFORE patching so the spy can delegate to it.
    real_mkstemp = tempfile.mkstemp

    def spying_mkstemp(*args, **kwargs):
        # Record both where the temp file actually landed and what dir the
        # helper explicitly requested.
        fd, path = real_mkstemp(*args, **kwargs)
        captured["path"] = path
        captured["dir_kwarg"] = kwargs.get("dir", "")
        return fd, path

    monkeypatch.setattr("utilities.atomic_io.tempfile.mkstemp", spying_mkstemp)

    atomic_write_json(str(target), {"ok": True})

    assert "path" in captured, "tempfile.mkstemp was not called"
    tmp_dir = os.path.dirname(captured["path"])
    expected_dir = os.path.abspath(str(target_dir))
    assert os.path.abspath(tmp_dir) == expected_dir, (
        f"temp file in {tmp_dir!r}, expected {expected_dir!r}"
    )
    # And the helper passed the same dir explicitly to mkstemp.
    assert os.path.abspath(captured["dir_kwarg"]) == expected_dir


def test_overwrites_existing_file(tmp_path: Path):
    """A successful second write replaces the target's content."""
    target = tmp_path / "results.json"
    first = {"version": 1}
    second = {"version": 2, "extra": [1, 2, 3]}

    atomic_write_json(str(target), first)
    atomic_write_json(str(target), second)

    assert json.loads(target.read_text(encoding="utf-8")) == second


@pytest.mark.skipif(os.name != "posix", reason="POSIX permission semantics")
def test_posix_permissions_match_umask(tmp_path: Path):
    """The written file honours the process umask, not mkstemp's 0600 default.

    A regression here is silent: pipeline outputs that other users / tools
    were previously able to read (under a typical 0022 umask -> 0644 file)
    would suddenly become owner-only. We don't want atomic writes to tighten
    permissions as a side effect.
    """
    target = tmp_path / "results.json"

    previous_umask = os.umask(0o022)
    try:
        atomic_write_json(str(target), {"ok": True})
    finally:
        os.umask(previous_umask)

    actual_mode = os.stat(target).st_mode & 0o777
    # 0666 & ~0022 == 0644
    assert actual_mode == 0o644, f"expected 0644, got {oct(actual_mode)}"
84 changes: 84 additions & 0 deletions libs/openant-core/utilities/atomic_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Atomic JSON file writes.

Pipeline output files (``results.json``, ``enhanced_dataset.json``,
``results_verified.json``) represent the final artefact of an expensive
multi-stage LLM pipeline. A crash, power loss, or interrupted process during
``json.dump`` can leave the target file truncated or corrupt, destroying the
output of a long-running run that may have cost real money.

``atomic_write_json`` writes the JSON to a temporary file in the **same
directory** as the target, ``fsync``s it, then ``os.replace``s it onto the
target path. ``os.replace`` is atomic on POSIX and on Windows (when both paths
sit on the same volume), so concurrent readers either see the previous file
intact or the fully-written new file — never a partial write.

The same-directory requirement matters: cross-device renames fall back to
copy+delete on most platforms (and fail outright on Windows), losing
atomicity. Callers should never pass a temp dir on a different volume.
"""

from __future__ import annotations

import json
import os
import tempfile
from typing import Any


def atomic_write_json(path: str, data: Any, *, indent: int | None = 2,
                      ensure_ascii: bool = True) -> None:
    """Atomically write ``data`` as JSON to ``path``.

    Writes to a temporary file in the same directory as ``path`` (same-dir
    placement is required for ``os.replace`` to stay atomic — cross-volume
    renames are not), fsyncs, closes, then ``os.replace``s it onto the
    target. If any step fails the temp file is removed and ``path`` is left
    untouched.

    Args:
        path: Destination path. Parent directory must already exist.
        data: JSON-serialisable object.
        indent: Indentation passed through to ``json.dump`` (default 2).
        ensure_ascii: Passed through to ``json.dump`` (default True).

    Raises:
        TypeError: If ``data`` is not JSON-serialisable.
        OSError: On filesystem errors (fsync failures are best-effort and
            deliberately swallowed; see below).
    """
    # abspath guarantees a non-empty dirname, so no "or '.'" fallback needed.
    directory = os.path.dirname(os.path.abspath(path))

    # mkstemp (rather than NamedTemporaryFile) so we control the
    # close-then-rename ordering ourselves; we clean the file up manually
    # on error.
    fd, tmp_path = tempfile.mkstemp(
        prefix=".tmp-" + os.path.basename(path) + "-",
        suffix=".json",
        dir=directory,
    )
    try:
        # tempfile.mkstemp creates files with mode 0600 (owner-only). A plain
        # open(path, "w") honours the process umask (typically yielding 0644).
        # Restore that behaviour so atomic writes don't silently tighten
        # permissions on pipeline outputs that downstream tools/users read.
        # On Windows os.chmod's effect is limited to the readonly bit, so
        # this is effectively a no-op there.
        if os.name == "posix":
            try:
                umask = os.umask(0)
                os.umask(umask)
                os.chmod(tmp_path, 0o666 & ~umask)
            except OSError:
                pass

        # If fdopen itself fails it never took ownership of fd, so close it
        # here or the descriptor leaks (the unlink below only removes the
        # path, not the open fd).
        try:
            f = os.fdopen(fd, "w", encoding="utf-8")
        except Exception:
            os.close(fd)
            raise

        with f:
            json.dump(data, f, indent=indent, ensure_ascii=ensure_ascii)
            f.flush()
            try:
                os.fsync(f.fileno())
            except OSError:
                # fsync can fail on some filesystems (e.g. certain network
                # mounts). The replace below is still atomic at the VFS
                # layer; durability across power loss is best-effort.
                pass
        # File is closed before the rename — required on Windows, harmless
        # elsewhere. os.replace is atomic on POSIX and same-volume Windows.
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temp file on any failure (including KeyboardInterrupt
        # mid-write) so we don't leave stray .tmp- files in the output dir.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
Loading