Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,22 @@ All notable changes to this project will be documented in this file.

This project follows [Keep a Changelog](https://keepachangelog.com/).

## [0.5.2] - 2026-04-17 (Python only — interim fix)

This is a tactical patch. The proper architectural fix — versioned optimistic concurrency via the existing `version` field, symmetric across Python and TypeScript SDKs — is tracked in [#26](https://github.com/NimbleBrainInc/upjack/issues/26) and will ship as 0.6.0.

### Fixed
- `update_entity` and `delete_entity` now serialize concurrent access via an advisory `flock` on a sidecar `.lock` file. Previously, two tool calls targeting the same entity in parallel (e.g. an agent invoking `update_deal` and `move_deal_stage` on the same deal) would each read the same pre-state, compute their update, and write sequentially — silently clobbering the other's fields. Observed in production as tool responses returning wrong `previous_stage` values. The final on-disk state was usually consistent (last writer wins), but the intermediate responses lied.

### Added
- `EntityLockTimeout` (exported from `upjack.entity`) — raised if a lock cannot be acquired within 30 seconds. Guards against a stuck-but-alive writer wedging the whole tool server.
- Lock is reentrant on the same thread (thread-local tracking of held paths) so future callers that nest `update_entity` from within another locked section don't deadlock.

### Known limitations
- **Windows**: `fcntl` is unavailable, so the lock is a no-op there. Concurrent updates remain unsafe on Windows, but no worse than 0.5.1.
- **TypeScript SDK**: unchanged at 0.5.1 — the analogous race is latent (Node's sync I/O accidentally serializes today) but not fixed by design. Resolved together with Python in 0.6.0 via the versioned-CAS design in [#26](https://github.com/NimbleBrainInc/upjack/issues/26).
- **Cross-machine / networked filesystems**: flock semantics on NFS and similar are implementation-defined. 0.6.0's CAS approach does not have this caveat.

## [0.5.1] - 2026-04-16

Applies to both the Python and TypeScript libraries. The tool contract is now identical across both SDKs.
Expand Down
2 changes: 1 addition & 1 deletion lib/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "upjack"
version = "0.5.1"
version = "0.5.2"
description = "Schema-driven entity management for AI-native applications"
readme = "README.md"
license = {text = "Apache-2.0"}
Expand Down
2 changes: 1 addition & 1 deletion lib/python/src/upjack/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""NimbleBrain Upjack — schema-driven entity management for AI-native applications."""

__version__ = "0.5.1"
__version__ = "0.5.2"

from upjack.activity import ACTIVITY_ENTITY_DEF, get_activity_schema
from upjack.app import UpjackApp
Expand Down
176 changes: 147 additions & 29 deletions lib/python/src/upjack/entity.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
"""Entity CRUD operations for upjack apps."""

import json
from collections.abc import Callable
import logging
import threading
import time
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
Expand All @@ -11,6 +15,113 @@
from upjack.paths import entity_dir, entity_path
from upjack.schema import hydrate_defaults, validate_entity

logger = logging.getLogger(__name__)

try:
import fcntl

_HAS_FCNTL = True
except ImportError: # pragma: no cover — Windows fallback
_HAS_FCNTL = False
logger.warning(
"fcntl is unavailable on this platform; upjack entity updates will "
"not be serialized across concurrent writers. On Unix this is "
"handled by an fcntl.flock on a sidecar .lock file."
)

# Hard ceiling on how long an entity update may wait for another
# holder to release. Typical lock hold is single-digit milliseconds
# (read → compute → write on a small JSON file). Anything approaching
# this bound indicates a stuck-but-alive writer, and failing loudly
# is better than wedging the whole tool server forever.
_LOCK_TIMEOUT_SECONDS = 30.0
_LOCK_POLL_INTERVAL = 0.02

# Thread-local tracking of which entity paths this thread already
# holds. Guards against deadlock if a caller ever re-enters update_entity
# on the same entity from within another lock — the nested call skips
# the acquire since we already own it.
_held_paths = threading.local()


class EntityLockTimeout(RuntimeError):
"""Raised when an entity lock cannot be acquired within the timeout."""


def _already_held(path: Path) -> bool:
held: set[str] = getattr(_held_paths, "paths", set())
return str(path) in held


def _mark_held(path: Path) -> None:
held: set[str] = getattr(_held_paths, "paths", None) or set()
held.add(str(path))
_held_paths.paths = held


def _unmark_held(path: Path) -> None:
held: set[str] = getattr(_held_paths, "paths", set())
held.discard(str(path))


@contextmanager
def _entity_lock(path: Path, timeout: float = _LOCK_TIMEOUT_SECONDS) -> Iterator[None]:
"""Serialize read-modify-write operations on a single entity.

Concurrent tool calls (e.g. an agent invoking ``update_deal`` and
``move_deal_stage`` on the same deal in parallel) would otherwise
read the same pre-state, each compute a partial update, and write
sequentially — silently clobbering one another's changes. The
on-disk state usually ends up consistent (last writer wins) but the
**intermediate tool responses lie**: each call returns the state it
wrote, unaware of the other's overlap.

Acquires an exclusive advisory flock on a sidecar ``.lock`` file
next to the entity's JSON file. The lock file persists — cleaning
it up would race the lock itself.

Semantics:

- Blocks up to ``timeout`` seconds; raises :class:`EntityLockTimeout`
after that. A stuck-but-alive holder must not wedge the server
indefinitely.
- If the holder process dies (crash, SIGKILL, OOM), the OS releases
the flock automatically — waiters unblock.
- Re-entrant on the same thread: nested calls on the same entity
pass through without re-acquiring, avoiding self-deadlock.
- On Windows (no fcntl) this is a no-op; concurrent updates remain
unsafe, but no worse than before this fix.
"""
if not _HAS_FCNTL or _already_held(path):
yield
return

lock_path = path.with_suffix(path.suffix + ".lock")
lock_path.parent.mkdir(parents=True, exist_ok=True)

deadline = time.monotonic() + timeout
with open(lock_path, "w") as lock_f:
while True:
try:
fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError:
if time.monotonic() >= deadline:
raise EntityLockTimeout(
f"Could not acquire entity lock for {path} within "
f"{timeout:.1f}s — another writer appears stuck."
) from None
time.sleep(_LOCK_POLL_INTERVAL)

_mark_held(path)
try:
yield
finally:
_unmark_held(path)
# flock is also released on FD close (the `with open` exit),
# so this is belt-and-suspenders — harmless if the OS has
# already dropped it.


@dataclass
class Entity:
Expand Down Expand Up @@ -185,31 +296,34 @@ def update_entity(
if not path.exists():
raise FileNotFoundError(f"Entity not found: {entity_id}")

existing = json.loads(path.read_text())
old_relationships = existing.get("relationships", [])
# Hold the lock across read+compute+write so concurrent updates on
# the same entity don't race. See _entity_lock() for full context.
with _entity_lock(path):
existing = json.loads(path.read_text())
old_relationships = existing.get("relationships", [])

# Hydrate defaults before merge so old entities missing new fields
# get filled in — prevents validation failures on schema evolution.
if schema is not None:
existing = hydrate_defaults(existing, schema)
# Hydrate defaults before merge so old entities missing new fields
# get filled in — prevents validation failures on schema evolution.
if schema is not None:
existing = hydrate_defaults(existing, schema)

# Strip immutable fields — these cannot be changed after creation
immutable = {"id", "type", "version", "created_at", "created_by"}
safe_data = {k: v for k, v in data.items() if k not in immutable}
# Strip immutable fields — these cannot be changed after creation
immutable = {"id", "type", "version", "created_at", "created_by"}
safe_data = {k: v for k, v in data.items() if k not in immutable}

if merge:
existing.update(safe_data)
else:
preserved_keys = {"id", "type", "version", "created_at", "created_by"}
preserved = {k: existing[k] for k in preserved_keys if k in existing}
existing = {**preserved, **safe_data}
if merge:
existing.update(safe_data)
else:
preserved_keys = {"id", "type", "version", "created_at", "created_by"}
preserved = {k: existing[k] for k in preserved_keys if k in existing}
existing = {**preserved, **safe_data}

existing["updated_at"] = _now_iso()
existing["updated_at"] = _now_iso()

if schema is not None:
validate_entity(existing, schema)
if schema is not None:
validate_entity(existing, schema)

path.write_text(json.dumps(existing, indent=2) + "\n")
path.write_text(json.dumps(existing, indent=2) + "\n")

if on_relationships_changed is not None:
new_relationships = existing.get("relationships", [])
Expand Down Expand Up @@ -323,15 +437,19 @@ def delete_entity(
if not path.exists():
raise FileNotFoundError(f"Entity not found: {entity_id}")

entity = json.loads(path.read_text())
# Same lock as update_entity — a concurrent update + delete on the
# same entity would otherwise race.
with _entity_lock(path):
entity = json.loads(path.read_text())

if hard:
path.unlink()
if on_relationships_changed is not None and entity.get("relationships"):
on_relationships_changed(entity_id, entity["relationships"], [])
else:
entity["status"] = "deleted"
entity["updated_at"] = _now_iso()
path.write_text(json.dumps(entity, indent=2) + "\n")
if hard:
path.unlink()
else:
entity["status"] = "deleted"
entity["updated_at"] = _now_iso()
path.write_text(json.dumps(entity, indent=2) + "\n")

if hard and on_relationships_changed is not None and entity.get("relationships"):
on_relationships_changed(entity_id, entity["relationships"], [])

return entity
Loading
Loading