diff --git a/CHANGELOG.md b/CHANGELOG.md index a09d682..39e50aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,22 @@ All notable changes to this project will be documented in this file. This project follows [Keep a Changelog](https://keepachangelog.com/). +## [0.5.2] - 2026-04-17 (Python only — interim fix) + +This is a tactical patch. The proper architectural fix — versioned optimistic concurrency via the existing `version` field, symmetric across Python and TypeScript SDKs — is tracked in [#26](https://github.com/NimbleBrainInc/upjack/issues/26) and will ship as 0.6.0. + +### Fixed +- `update_entity` and `delete_entity` now serialize concurrent access via an advisory `flock` on a sidecar `.lock` file. Previously, two tool calls targeting the same entity in parallel (e.g. an agent invoking `update_deal` and `move_deal_stage` on the same deal) would each read the same pre-state, compute their update, and write sequentially — silently clobbering the other's fields. Observed in production as tool responses returning wrong `previous_stage` values. The final on-disk state was usually consistent (last writer wins), but the intermediate responses lied. + +### Added +- `EntityLockTimeout` (exported from `upjack.entity`) — raised if a lock cannot be acquired within 30 seconds. Guards against a stuck-but-alive writer wedging the whole tool server. +- Lock is reentrant on the same thread (thread-local tracking of held paths) so future callers that nest `update_entity` from within another locked section don't deadlock. + +### Known limitations +- **Windows**: `fcntl` is unavailable, so the lock is a no-op there. Concurrent updates remain unsafe on Windows, but no worse than 0.5.1. +- **TypeScript SDK**: unchanged at 0.5.1 — the analogous race is latent (Node's sync I/O accidentally serializes today) but not fixed by design. Resolved together with Python in 0.6.0 via the versioned-CAS design in [#26](https://github.com/NimbleBrainInc/upjack/issues/26). +- **Cross-machine / networked filesystems**: flock semantics on NFS and similar are implementation-defined. 0.6.0's CAS approach does not have this caveat. + ## [0.5.1] - 2026-04-16 Applies to both the Python and TypeScript libraries. The tool contract is now identical across both SDKs. diff --git a/lib/python/pyproject.toml b/lib/python/pyproject.toml index b75e154..12af995 100644 --- a/lib/python/pyproject.toml +++ b/lib/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "upjack" -version = "0.5.1" +version = "0.5.2" description = "Schema-driven entity management for AI-native applications" readme = "README.md" license = {text = "Apache-2.0"} diff --git a/lib/python/src/upjack/__init__.py b/lib/python/src/upjack/__init__.py index dfe2b97..f4cb49f 100644 --- a/lib/python/src/upjack/__init__.py +++ b/lib/python/src/upjack/__init__.py @@ -1,6 +1,6 @@ """NimbleBrain Upjack — schema-driven entity management for AI-native applications.""" -__version__ = "0.5.1" +__version__ = "0.5.2" from upjack.activity import ACTIVITY_ENTITY_DEF, get_activity_schema from upjack.app import UpjackApp diff --git a/lib/python/src/upjack/entity.py b/lib/python/src/upjack/entity.py index d3cbeca..4b7ecb5 100644 --- a/lib/python/src/upjack/entity.py +++ b/lib/python/src/upjack/entity.py @@ -1,7 +1,11 @@ """Entity CRUD operations for upjack apps.""" import json -from collections.abc import Callable +import logging +import threading +import time +from collections.abc import Callable, Iterator +from contextlib import contextmanager from dataclasses import dataclass, field from datetime import UTC, datetime from pathlib import Path @@ -11,6 +15,113 @@ from upjack.paths import entity_dir, entity_path from upjack.schema import hydrate_defaults, validate_entity +logger = logging.getLogger(__name__) + +try: + import fcntl + + _HAS_FCNTL = True +except ImportError: # pragma: no cover — Windows fallback + _HAS_FCNTL = False + logger.warning( + "fcntl is unavailable on this platform; upjack entity updates will " + "not be serialized across concurrent writers. On Unix this is " + "handled by an fcntl.flock on a sidecar .lock file." + ) + +# Hard ceiling on how long an entity update may wait for another +# holder to release. Typical lock hold is single-digit milliseconds +# (read → compute → write on a small JSON file). Anything approaching +# this bound indicates a stuck-but-alive writer, and failing loudly +# is better than wedging the whole tool server forever. +_LOCK_TIMEOUT_SECONDS = 30.0 +_LOCK_POLL_INTERVAL = 0.02 + +# Thread-local tracking of which entity paths this thread already +# holds. Guards against deadlock if a caller ever re-enters update_entity +# on the same entity from within another lock — the nested call skips +# the acquire since we already own it. +_held_paths = threading.local() + + +class EntityLockTimeout(RuntimeError): + """Raised when an entity lock cannot be acquired within the timeout.""" + + +def _already_held(path: Path) -> bool: + held: set[str] = getattr(_held_paths, "paths", set()) + return str(path) in held + + +def _mark_held(path: Path) -> None: + held: set[str] = getattr(_held_paths, "paths", None) or set() + held.add(str(path)) + _held_paths.paths = held + + +def _unmark_held(path: Path) -> None: + held: set[str] = getattr(_held_paths, "paths", set()) + held.discard(str(path)) + + +@contextmanager +def _entity_lock(path: Path, timeout: float = _LOCK_TIMEOUT_SECONDS) -> Iterator[None]: + """Serialize read-modify-write operations on a single entity. + + Concurrent tool calls (e.g. an agent invoking ``update_deal`` and + ``move_deal_stage`` on the same deal in parallel) would otherwise + read the same pre-state, each compute a partial update, and write + sequentially — silently clobbering one another's changes. The + on-disk state usually ends up consistent (last writer wins) but the + **intermediate tool responses lie**: each call returns the state it + wrote, unaware of the other's overlap. + + Acquires an exclusive advisory flock on a sidecar ``.lock`` file + next to the entity's JSON file. The lock file persists — cleaning + it up would race the lock itself. + + Semantics: + + - Blocks up to ``timeout`` seconds; raises :class:`EntityLockTimeout` + after that. A stuck-but-alive holder must not wedge the server + indefinitely. + - If the holder process dies (crash, SIGKILL, OOM), the OS releases + the flock automatically — waiters unblock. + - Re-entrant on the same thread: nested calls on the same entity + pass through without re-acquiring, avoiding self-deadlock. + - On Windows (no fcntl) this is a no-op; concurrent updates remain + unsafe, but no worse than before this fix. + """ + if not _HAS_FCNTL or _already_held(path): + yield + return + + lock_path = path.with_suffix(path.suffix + ".lock") + lock_path.parent.mkdir(parents=True, exist_ok=True) + + deadline = time.monotonic() + timeout + with open(lock_path, "w") as lock_f: + while True: + try: + fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + break + except BlockingIOError: + if time.monotonic() >= deadline: + raise EntityLockTimeout( + f"Could not acquire entity lock for {path} within " + f"{timeout:.1f}s — another writer appears stuck." + ) from None + time.sleep(_LOCK_POLL_INTERVAL) + + _mark_held(path) + try: + yield + finally: + _unmark_held(path) + # flock is also released on FD close (the `with open` exit), + # so this is belt-and-suspenders — harmless if the OS has + # already dropped it. + @dataclass class Entity: @@ -185,31 +296,34 @@ def update_entity( if not path.exists(): raise FileNotFoundError(f"Entity not found: {entity_id}") - existing = json.loads(path.read_text()) - old_relationships = existing.get("relationships", []) + # Hold the lock across read+compute+write so concurrent updates on + # the same entity don't race. See _entity_lock() for full context. + with _entity_lock(path): + existing = json.loads(path.read_text()) + old_relationships = existing.get("relationships", []) - # Hydrate defaults before merge so old entities missing new fields - # get filled in — prevents validation failures on schema evolution. - if schema is not None: - existing = hydrate_defaults(existing, schema) + # Hydrate defaults before merge so old entities missing new fields + # get filled in — prevents validation failures on schema evolution. + if schema is not None: + existing = hydrate_defaults(existing, schema) - # Strip immutable fields — these cannot be changed after creation - immutable = {"id", "type", "version", "created_at", "created_by"} - safe_data = {k: v for k, v in data.items() if k not in immutable} + # Strip immutable fields — these cannot be changed after creation + immutable = {"id", "type", "version", "created_at", "created_by"} + safe_data = {k: v for k, v in data.items() if k not in immutable} - if merge: - existing.update(safe_data) - else: - preserved_keys = {"id", "type", "version", "created_at", "created_by"} - preserved = {k: existing[k] for k in preserved_keys if k in existing} - existing = {**preserved, **safe_data} + if merge: + existing.update(safe_data) + else: + preserved_keys = {"id", "type", "version", "created_at", "created_by"} + preserved = {k: existing[k] for k in preserved_keys if k in existing} + existing = {**preserved, **safe_data} - existing["updated_at"] = _now_iso() + existing["updated_at"] = _now_iso() - if schema is not None: - validate_entity(existing, schema) + if schema is not None: + validate_entity(existing, schema) - path.write_text(json.dumps(existing, indent=2) + "\n") + path.write_text(json.dumps(existing, indent=2) + "\n") if on_relationships_changed is not None: new_relationships = existing.get("relationships", []) @@ -323,15 +437,19 @@ def delete_entity( if not path.exists(): raise FileNotFoundError(f"Entity not found: {entity_id}") - entity = json.loads(path.read_text()) + # Same lock as update_entity — a concurrent update + delete on the + # same entity would otherwise race. + with _entity_lock(path): + entity = json.loads(path.read_text()) - if hard: - path.unlink() - if on_relationships_changed is not None and entity.get("relationships"): - on_relationships_changed(entity_id, entity["relationships"], []) - else: - entity["status"] = "deleted" - entity["updated_at"] = _now_iso() - path.write_text(json.dumps(entity, indent=2) + "\n") + if hard: + path.unlink() + else: + entity["status"] = "deleted" + entity["updated_at"] = _now_iso() + path.write_text(json.dumps(entity, indent=2) + "\n") + + if hard and on_relationships_changed is not None and entity.get("relationships"): + on_relationships_changed(entity_id, entity["relationships"], []) return entity diff --git a/lib/python/tests/test_entity.py b/lib/python/tests/test_entity.py index ec43d61..0f942b1 100644 --- a/lib/python/tests/test_entity.py +++ b/lib/python/tests/test_entity.py @@ -7,6 +7,8 @@ from upjack.entity import ( Entity, + EntityLockTimeout, + _entity_lock, create_entity, delete_entity, get_entity, @@ -1111,3 +1113,148 @@ def test_soft_delete_no_callback(self, tmp_workspace): on_relationships_changed=lambda eid, old, new: calls.append((eid, old, new)), ) assert len(calls) == 0 + + +class TestConcurrentUpdates: + """Regression: parallel updates to the same entity must serialize. + + Observed in production when an agent parallelized two tool calls on + the same deal (``move_deal_stage`` + ``update_deal``). Both read the + same pre-state, each wrote a partial update, and the second writer + silently clobbered the first's fields — the on-disk state was + self-consistent but the reported tool responses lied about the prior + state. The fix serializes read-modify-write via a sidecar flock. + """ + + def test_parallel_updates_do_not_clobber_distinct_fields(self, tmp_workspace): + """Two threads update different fields on the same entity. Both + updates must land: the final state shows both writer's changes. + Without the lock, one thread's read-before-write sees the + pre-state, computes its update, and writes — overwriting the + other thread's committed change. + """ + import threading + + created = create_entity( + root=tmp_workspace, + namespace=NAMESPACE, + entity_type="deal", + plural="deals", + prefix="dl", + data={"title": "Race", "stage": "proposal", "value": 10000}, + ) + deal_id = created["id"] + + barrier = threading.Barrier(2) + + def update_stage(): + barrier.wait() + update_entity( + root=tmp_workspace, + namespace=NAMESPACE, + plural="deals", + entity_id=deal_id, + data={"stage": "negotiation"}, + ) + + def update_value(): + barrier.wait() + update_entity( + root=tmp_workspace, + namespace=NAMESPACE, + plural="deals", + entity_id=deal_id, + data={"value": 15000}, + ) + + # Run both updates concurrently. The barrier releases them at + # the same instant to maximize the race window. + for _ in range(20): # amplify — single runs might not race + # Reset to known state each iteration + update_entity( + root=tmp_workspace, + namespace=NAMESPACE, + plural="deals", + entity_id=deal_id, + data={"stage": "proposal", "value": 10000}, + ) + t1 = threading.Thread(target=update_stage) + t2 = threading.Thread(target=update_value) + barrier.reset() + t1.start() + t2.start() + t1.join() + t2.join() + + final = get_entity( + root=tmp_workspace, + namespace=NAMESPACE, + plural="deals", + entity_id=deal_id, + ) + # Both updates must be present in the final state — whichever + # order they ran, neither should have been silently lost. + assert final["stage"] == "negotiation", f"stage write lost: {final}" + assert final["value"] == 15000, f"value write lost: {final}" + + def test_lock_times_out_if_never_released(self, tmp_workspace): + """A stuck holder must not wedge the server indefinitely. The + lock raises EntityLockTimeout after the bound, so the tool call + fails cleanly instead of hanging forever. + """ + import threading + + created = create_entity( + root=tmp_workspace, + namespace=NAMESPACE, + entity_type="deal", + plural="deals", + prefix="dl", + data={"title": "Stuck"}, + ) + path = tmp_workspace / NAMESPACE / "data" / "deals" / f"{created['id']}.json" + + holder_entered = threading.Event() + holder_release = threading.Event() + + def hold_forever(): + with _entity_lock(path): + holder_entered.set() + holder_release.wait(timeout=5.0) + + holder = threading.Thread(target=hold_forever) + holder.start() + holder_entered.wait(timeout=2.0) + + try: + # Aggressive short timeout so the test is fast. In prod + # we use 30s; the constant is overridable via the function + # signature. + with pytest.raises(EntityLockTimeout): + with _entity_lock(path, timeout=0.2): + pass + finally: + holder_release.set() + holder.join(timeout=5.0) + + def test_lock_is_reentrant_on_same_thread(self, tmp_workspace): + """Re-entering the same entity's lock on the same thread must + not deadlock. Without the thread-local guard, a caller that + nests update_entity inside another lock would hang forever. + """ + created = create_entity( + root=tmp_workspace, + namespace=NAMESPACE, + entity_type="deal", + plural="deals", + prefix="dl", + data={"title": "Reentrant"}, + ) + path = tmp_workspace / NAMESPACE / "data" / "deals" / f"{created['id']}.json" + + # Nested — outer lock held, inner lock acquired for the same + # entity. Must return immediately without blocking. + with _entity_lock(path): + with _entity_lock(path, timeout=0.5): + # If this deadlocked, the pytest-level timeout would fire. + pass diff --git a/lib/python/uv.lock b/lib/python/uv.lock index 76d060c..199c082 100644 --- a/lib/python/uv.lock +++ b/lib/python/uv.lock @@ -1179,7 +1179,7 @@ wheels = [ [[package]] name = "upjack" -version = "0.5.1" +version = "0.5.2" source = { editable = "." } dependencies = [ { name = "jsonschema" },