diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index ffddafcc1..4be02be59 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -45,13 +45,13 @@ Tests use `pytest-asyncio` in auto mode — all async test functions are automat **Settings are frozen at creation time.** All configuration (`start_soon`, `children_start_soon`, `start_soon_default`, `thread_pool`, etc.) is fully resolved when a `Promise` or `PromisingContext` is constructed. Sentinels like `INHERIT` and `PROMISING_DEFAULT` are replaced with concrete values immediately — no deferred resolution happens at execution time. This is a core design principle: because a promise may run eagerly or be deferred, the user cannot predict *when* execution will happen, so settings must reflect the state of the world at the moment the promise was created. -**Core hierarchy flow:** `PromisingFunction` wraps an async or sync function → calling it creates a `Promise[T]` → during execution, the Promise sets itself as the current context via `ContextVar` → any Promises (and `PromisingContext` instances) created during that execution register themselves as its children via thread-safe strong-ref sets. +**Core hierarchy flow:** `PromisingFunction` wraps an async or sync function → calling it creates a `Promise[T]` → during execution, the Promise sets itself as the current context via `ContextVar` → any Promises (and `PromisingContext` instances) created during that execution register themselves as its children. ### PromisingContext (`promising/promising_context.py`) The base class for hierarchical context management. Manages parent-child relationships, namespacing (`namespace` parameter), configuration inheritance (`children_start_soon`, `start_soon_default`, `collapse_tracebacks`, `thread_pool`), child-waiting (`await_children` / `await_children_sync`), child inspection (`collect_unsettled_children`), and trace/debugging (`get_trace`, `format_trace`, `print_trace`). Also provides `get_parent_promise()` to walk up past non-Promise contexts. Uses a `ContextVar` (`PromisingContext.__active_context`) to track the currently active context. -**Lifecycle and child tracking.** Each `PromisingContext` keeps an `_unsettled_children: set[PromisingContext]` (a strong-ref set) protected by a `threading.Lock`. Children are added via `_register_children_threadsafe()` when they are constructed (unless born closed via `close_context_immediately=True`, in which case registration is skipped entirely) and removed via `_unregister_children_threadsafe()` once they are both *closed* and have no remaining unsettled descendants. A context is "closed" when its `with` block has exited (`close_context_threadsafe()` runs in `__exit__`'s `finally`); for a `Promise`, the `with self:` block lives inside `_unpack_once_from_loop`, so the context closes the moment the wrapped awaitable produces its first result (intermediate Promise or final value). Closed contexts that still have unsettled descendants stay registered until those descendants drain — this is what `collect_unsettled_children` traverses recursively. Attempting to register a child on an already-closed context raises `ContextAlreadyClosedError`; re-entering an already-closed context raises the same error. +**Lifecycle and child tracking.** Each `PromisingContext` keeps an `_unsettled_children: set[PromisingContext]` (a strong-ref set) protected by a `threading.Lock`. Children are added via `_register_children()` when they are constructed (unless born closed via `close_context_immediately=True`, in which case registration is skipped entirely) and removed via `_unregister_children()` once they are both *closed* and have no remaining unsettled descendants. A context is "closed" when its `with` block has exited (`close_context()` runs in `__exit__`'s `finally`); for a `Promise`, the `with self:` block lives inside `_unpack_once`, so the context closes the moment the wrapped awaitable produces its first result (intermediate Promise or final value). Closed contexts that still have unsettled descendants stay registered until those descendants drain — this is what `collect_unsettled_children` traverses recursively. Attempting to register a child on an already-closed context raises `ContextAlreadyClosedError`; re-entering an already-closed context raises the same error. `PromisingContext` exposes two doneness predicates: `closed()` tracks the context-manager lifecycle (`__exit__` flips it to `True`), and `done()` is what `await_children()` actually waits on. By default `done()` simply delegates to `closed()`. Subclasses can override `done()` to track a non-lifecycle condition — `Promise` does exactly this (it ties `done()` to its own result/cancellation state machine, since "fully unpacked" can come *after* the `with self:` block has already exited). @@ -61,24 +61,24 @@ This file also contains the `context` class — a context manager / decorator th ### Promise (`promising/promise.py`) -`Promise[T_co]` is a direct subclass of `PromisingContext`. It owns a small state machine — `_PENDING` → `_UNPACKED_ONCE` → `_FINISHED`, with `_CANCELLED_BEFORE_UNPACKED_ONCE` / `_CANCELLED_AFTER_UNPACKED_ONCE` as alternative terminals — exposed through `done()`, `unpacked_once()`, `unpacked_once_or_done()`, and `cancelled()`, and queried for results via `result()`, `exception()`, and `intermediate_promise()`. All of those readers are thread-safe (state can only move forward). `loop`, `thread_pool`, and the rest of the configuration are inherited from `PromisingContext`. +`Promise[T_co]` is a direct subclass of `PromisingContext`. It owns a small state machine — `_PENDING` → `_UNPACKED_ONCE` → `_FINISHED`, with `_CANCELLED_BEFORE_UNPACKED_ONCE` / `_CANCELLED_AFTER_UNPACKED_ONCE` as alternative terminals — exposed through `done()`, `unpacked_once()`, `unpacked_once_or_done()`, and `cancelled()`, and queried for results via `result()`, `exception()`, and `intermediate_promise()`. `loop`, `thread_pool`, and the rest of the configuration are inherited from `PromisingContext`. **Two-step unpacking on the loop.** Resolution is split into two cooperating tasks, both pinned to `self.loop`: -- `_unpack_once_from_loop()` — drives a single unpacking step. It enters the `with self:` block, awaits the wrapped `_awaitable`, and either records an intermediate `Promise` (via `_set_intermediate_promise_from_loop`, transition to `_UNPACKED_ONCE`) or stores the final value/exception (via `_set_result_from_loop` / `_set_exception_from_loop`, transition to `_FINISHED`). This is the task `unpack_once()` waits on. -- `_fully_unpack_from_loop()` — drives the Promise to completion. It ensures the single-unpacking task is scheduled, awaits it, then walks the chain of intermediate Promises (`while isinstance(result, Promise): result = await result`) until a non-Promise value is reached, and records that value as the final result. This is the task `__await__` (and, indirectly, `sync()`) waits on. +- `_unpack_once()` — drives a single unpacking step. It enters the `with self:` block, awaits the wrapped `_awaitable`, and either records an intermediate `Promise` (via `_set_intermediate_promise`, transition to `_UNPACKED_ONCE`) or stores the final value/exception (via `_set_result` / `_set_exception`, transition to `_FINISHED`). This is the task `unpack_once()` waits on. +- `_unpack_fully()` — drives the Promise to completion. It ensures the single-unpacking task is scheduled, awaits it, then walks the chain of intermediate Promises (`while isinstance(result, Promise): result = await result`) until a non-Promise value is reached, and records that value as the final result. This is the task `__await__` (and, indirectly, `sync()`) waits on. -Scheduling is driven by `_ensure_from_loop_single_unpacking_scheduled()` and `_ensure_from_loop_full_unpacking_scheduled()`, both of which create the underlying `loop.create_task(...)` lazily on first need. `__init__` schedules `_fully_unpack_from_loop` via `call_soon_threadsafe` when `start_soon` is `True`, so eager Promises start as soon as the loop is reachable; deferred Promises (`start_soon=False`) are scheduled the first time anyone consumes them (`__await__`, `sync()`, `unpack_once()`, `unpack_once_sync()`). +Scheduling is driven by `_ensure_single_unpacking_scheduled()` and `_ensure_full_unpacking_scheduled()`, both of which create the underlying `loop.create_task(...)` lazily on first need. `__init__` schedules `_unpack_fully` when `start_soon` is `True`, so eager Promises start as soon as the loop is reachable; deferred Promises (`start_soon=False`) are scheduled the first time anyone consumes them (`__await__`, `sync()`, `unpack_once()`, `unpack_once_sync()`). -**Sync and thread-safe consumption.** `sync()` and `unpack_once_sync()` dispatch onto the Promise's own event loop via `asyncio.run_coroutine_threadsafe` and block the calling thread on the resulting `concurrent.futures.Future`. Both refuse to run on the Promise's loop thread (`_assert_no_sync_usage_deadlock` → `SyncUsageError`). `cancel()` is similarly thread-safe: when called from outside the loop, it dispatches `_cancel_from_loop` via `call_soon_threadsafe` and blocks only long enough for the dispatched `_cancel_from_loop` call to return — it does not wait for the `CancelledError` itself to land (mirroring `asyncio.Future.cancel()` / `asyncio.Task.cancel()` semantics: the return value reports whether cancellation was *requested*). +**Sync consumption.** `sync()` and `unpack_once_sync()` dispatch onto the Promise's own event loop via `asyncio.run_coroutine_threadsafe` and block the calling thread on the resulting `concurrent.futures.Future`. Both refuse to run on the Promise's loop thread (`_assert_no_sync_usage_deadlock` → `SyncUsageError`). -**Cancellation mechanics.** `_cancel_from_loop` requests cancellation of any running unpacking task(s) via `Task.cancel(msg)`; the `CancelledError` then propagates through `_unpack_once_from_loop` / `_fully_unpack_from_loop` and is stored via `_set_exception_from_loop`, which picks the terminal state (`_CANCELLED_BEFORE_UNPACKED_ONCE` vs. `_CANCELLED_AFTER_UNPACKED_ONCE`) based on whether the first unpacking step had completed. When no task has been scheduled yet (e.g. `start_soon=False` and never awaited), `_synthesize_cancellation_from_loop` closes the context, stores a `CancelledError` directly, and closes the wrapped awaitable to silence the "coroutine was never awaited" warning. A `_unpacking_task_done_callback` covers the edge case where `Task.cancel()` lands between `create_task` and the first `__step`: asyncio resumes a cancelled task by throwing `CancelledError` into the coroutine, but on a coroutine that has never been stepped into there is no suspension point to throw at, so the exception is raised at function entry — *before* the `try` block — and propagates straight out of the task without the body's `try/except BaseException` ever seeing it. The Task ends up `cancelled()` but the Promise is still `_PENDING`, so the callback synthesizes the terminal state from the Task's recorded `CancelledError`. +**Cancellation mechanics.** `cancel()` does not wait for the `CancelledError` itself to land (mirroring `asyncio.Future.cancel()` / `asyncio.Task.cancel()` semantics: the return value reports whether cancellation was *requested*). It requests cancellation of any running unpacking task(s) via `Task.cancel(msg)`; the `CancelledError` then propagates through `_unpack_once` / `_unpack_fully` and is stored via `_set_exception`, which picks the terminal state (`_CANCELLED_BEFORE_UNPACKED_ONCE` vs. `_CANCELLED_AFTER_UNPACKED_ONCE`) based on whether the first unpacking step had completed. When no task has been scheduled yet (e.g. `start_soon=False` and never awaited), `_synthesize_cancellation` closes the context, stores a `CancelledError` directly, and closes the wrapped awaitable to silence the "coroutine was never awaited" warning. A `_unpacking_task_done_callback` covers the edge case where `Task.cancel()` lands between `create_task` and the first `__step`: asyncio resumes a cancelled task by throwing `CancelledError` into the coroutine, but on a coroutine that has never been stepped into there is no suspension point to throw at, so the exception is raised at function entry — *before* the `try` block — and propagates straight out of the task without the body's `try/except BaseException` ever seeing it. The Task ends up `cancelled()` but the Promise is still `_PENDING`, so the callback synthesizes the terminal state from the Task's recorded `CancelledError`. **Prefilled Promises.** A Promise constructed without an `awaitable` (using `prefilled_result` or `prefilled_exception`) passes `close_context_immediately=True` to `PromisingContext.__init__`, so it is born already closed and immediately set to `_FINISHED` — there is no coroutine to run inside a `with self:` block, and no parent registration happens. -**Late parent registration.** `Promise.__init__` passes `register_with_parent=False` to `PromisingContext.__init__` and only calls `_register_with_parent_thread_unsafe()` at the very end of its own constructor, after the state machine has been seeded (including the prefilled `_FINISHED` / exception path). This guarantees that a Promise whose construction raises is never visible to its parent's child set, and that the prefilled-Promise case described above falls out naturally — by the time `_register_with_parent_thread_unsafe` runs, `done()` is already `True`, so the registration is skipped. +**Late parent registration.** `Promise.__init__` passes `register_with_parent=False` to `PromisingContext.__init__` and only calls `_register_with_parent()` at the very end of its own constructor, after the state machine has been seeded (including the prefilled `_FINISHED` / exception path). This guarantees that a Promise whose construction raises is never visible to its parent's child set, and that the prefilled-Promise case described above falls out naturally — by the time `_register_with_parent` runs, `done()` is already `True`, so the registration is skipped. -**Exception breadcrumbs.** `try_to_link_exception` attaches the `PromisingContext` to an exception as `__promising_context__` and stamps `__promising_collapse_traceback__` (a boolean snapshot of the context's resolved `collapse_tracebacks` setting) alongside it — only at the deepest level (skips if `__promising_context__` is already set, so a nested context that already attributed itself is preserved; the two attributes are always stamped as a pair). Primary attribution happens in `PromisingContext.__exit__`; `_set_exception_from_loop` and `_force_internal_error_finish_from_loop` also call it as a safety net for paths that don't pass through `__exit__`. The `sys.excepthook` / `threading.excepthook` overrides in `promising/errors.py` use `__promising_context__` to walk the ancestor chain and render each `Promise`'s `frame_summary_tuple` snapshot, and read `__promising_collapse_traceback__` (a boolean) to decide whether to collapse promising-internal frames in those stacks (and in the exception's own traceback) or print them in full. +**Exception breadcrumbs.** `try_to_link_exception` attaches the `PromisingContext` to an exception as `__promising_context__` and stamps `__promising_collapse_traceback__` (a boolean snapshot of the context's resolved `collapse_tracebacks` setting) alongside it — only at the deepest level (skips if `__promising_context__` is already set, so a nested context that already attributed itself is preserved; the two attributes are always stamped as a pair). Primary attribution happens in `PromisingContext.__exit__`; `_set_exception` and `_force_internal_error_finish` also call it as a safety net for paths that don't pass through `__exit__`. The `sys.excepthook` / `threading.excepthook` overrides in `promising/errors.py` use `__promising_context__` to walk the ancestor chain and render each `Promise`'s `frame_summary_tuple` snapshot, and read `__promising_collapse_traceback__` (a boolean) to decide whether to collapse promising-internal frames in those stacks (and in the exception's own traceback) or print them in full. **Unpacking semantics.** A `PromisingFunction` always returns a `Promise`, regardless of whether the underlying function returns a concrete value or another `Promise`. `await promise` and `promise.sync()` recursively chase nested `Promise`s until a non-`Promise` value is reached. `promise.unpack_once()` and `promise.unpack_once_sync()` unpack a single level — they return either a concrete value or the intermediate `Promise`. @@ -124,7 +124,7 @@ This module also defines the private state-machine sentinels used by `Promise` ( - `SentinelUsageError` — a `Sentinel` was used in a boolean context (e.g. `if INHERIT:`) - `SyncUsageError` — raised when a sync method (`promise.sync()`, `promise.unpack_once_sync()`, `await_children_sync()`) is called from the event loop thread -**Promising-traceback rendering.** `install_promising_tracebacks()` swaps in `_promising_sys_excepthook` and `_promising_threading_excepthook` (saving the previously installed hooks in `_excepthook_state` so they can be used as a fallback if rendering itself raises). Installation is idempotent and protected by `_excepthooks_lock`. `Promise._unpack_once_from_loop` calls `install_promising_tracebacks()` the first time it runs, so users typically don't have to install the hooks by hand. +**Promising-traceback rendering.** `install_promising_tracebacks()` swaps in `_promising_sys_excepthook` and `_promising_threading_excepthook` (saving the previously installed hooks in `_excepthook_state` so they can be used as a fallback if rendering itself raises). Installation is idempotent and protected by `_excepthooks_lock`. `Promise._unpack_once` calls `install_promising_tracebacks()` the first time it runs, so users typically don't have to install the hooks by hand. `_print_exception_with_promising_context` walks the standard `__cause__` / `__context__` chain (mirroring CPython's behavior, including `__suppress_context__`) via `_print_exception_chain`, and `_print_single_exception` prints each link of that chain with a per-link "promising trace". For each `PromisingContext` returned by `get_trace(ancestors_first=True)` that exposes a `frame_summary_tuple` (i.e. each `Promise`), the snapshot captured at construction time is rendered, with promising-internal frames optionally collapsed. diff --git a/RACE_CONDITION_INVARIANTS_A.md b/RACE_CONDITION_INVARIANTS_A.md new file mode 100644 index 000000000..a7e5f2ef1 --- /dev/null +++ b/RACE_CONDITION_INVARIANTS_A.md @@ -0,0 +1,414 @@ +# Race-Condition Invariants for Promising + +A catalogue of properties that must hold under arbitrary thread/event-loop +interleavings. Each invariant is phrased as a postcondition that a test can +assert after exercising a chosen race window. The list is intended as a +checklist for building a `tests/race_conditions/` suite that hammers each +invariant with many concurrent actors, repeats, and (where useful) +`hypothesis` schedules. + +Scope notes: + +- Promising explicitly targets the GIL-backed CPython interpreter (see the + thread-safety contract on `Promise.done()`). Single-attribute reads/writes + are assumed atomic; tests should still cover the *ordering* guarantees, not + the per-attribute atomicity. +- All "from any thread" invariants must be checked from at least three + vantage points: the Promise's own event-loop thread, a thread-pool worker + belonging to the same loop, and an unrelated (foreign) thread. + +--- + +## 1. Promise state machine + +### 1.1 Monotonicity +Once `_state` advances past `_PENDING`, it never moves backwards. Allowed +transitions only: + +- `_PENDING → _UNPACKED_ONCE` +- `_PENDING → _FINISHED` +- `_PENDING → _CANCELLED_BEFORE_UNPACKED_ONCE` +- `_UNPACKED_ONCE → _FINISHED` +- `_UNPACKED_ONCE → _CANCELLED_AFTER_UNPACKED_ONCE` + +Test: spin N threads that repeatedly snapshot `_state`. Across the entire run +no thread observes a regression, and the recorded transition (sorted by wall +time) matches one of the allowed pairs. + +### 1.2 Single terminal state +A Promise reaches exactly one terminal state (`_FINISHED`, +`_CANCELLED_BEFORE_UNPACKED_ONCE`, or `_CANCELLED_AFTER_UNPACKED_ONCE`) — and +reaches it at most once. Repeated calls to `_set_result_from_loop` / +`_set_exception_from_loop` / `_set_intermediate_promise_from_loop` after a +terminal state never re-advance it. + +### 1.3 Writer/reader ordering (the contract behind `done()`) +For every state advance, the matching attribute is observable to readers +*before* the state flip: + +- `_state == _UNPACKED_ONCE` ⇒ `_intermediate_promise is not None`. +- `_state == _FINISHED` and `_exception is None` ⇒ `_result is not UNCHANGED`. +- `_state in (_FINISHED, _CANCELLED_*)` and `_exception is not None` ⇒ + `_exception` is fully populated (not a partial assignment). + +Test: thread A drives the Promise to completion; thread B busy-loops calling +`done()` and, the *instant* it sees `True`, calls `result()` / +`exception()` / `intermediate_promise()` without yielding. Those calls must +never raise `PromiseNotDoneError`, `PromiseNotUnpackedError`, nor the +`RuntimeError("Promise result is UNCHANGED…")` fallback in `result()`. + +### 1.4 Predicate consistency under concurrent reads +At any instant, the predicate triple +`(done(), unpacked_once(), unpacked_once_or_done())` is consistent with one +single underlying `_state` value. No reader ever sees a combination such as +`done()==True and unpacked_once_or_done()==False`. + +### 1.5 Result/exception caching is one-shot +For any awaitable handed to `Promise(...)`, the awaitable's `__await__` (or +equivalent) is driven exactly once across all concurrent consumers +(`await`, `sync`, `unpack_once`, `unpack_once_sync`, and any mix of them +fired in parallel). A counter inside the awaitable must read `1` after the +storm. + +### 1.6 Consumers all observe the same cached value +N consumers from M threads — `await`, `sync()`, `unpack_once_sync()` — that +finish without exception receive `is`-identical results (when the result is +a non-primitive object). When the Promise finished with an exception, every +consumer sees the *same* exception instance (`is`-identical). + +### 1.7 No "task created twice" +`_full_unpacking_task` is assigned exactly once; same for +`_single_unpacking_task`. Concurrent calls to +`_ensure_from_loop_full_unpacking_scheduled` / +`_ensure_from_loop_single_unpacking_scheduled` from rapid-fire await/sync +storms never produce two Tasks for the same role. (This invariant relies on +those methods only ever running on the Promise's own loop — that itself is +checked by invariant 4.1.) + +--- + +## 2. Parent–child hierarchy (`_unsettled_children`) + +### 2.1 No lost child +For every `Promise` / `PromisingContext` created with a parent, between +construction and the parent's eventual settling, the child appears in the +parent's `_unsettled_children` at least once. A test that spawns K children +concurrently while another thread snapshots +`parent._unsettled_children` must, in aggregate, witness every child id. + +### 2.2 No stuck child +After the entire subtree is done, every ancestor's `_unsettled_children` is +empty. In particular, +`root.collect_unsettled_children(whole_subtree=True)` returns `set()` once +all leaves have settled, regardless of the order in which threads drove +them. + +### 2.3 No double-register / double-unregister +The child appears in `parent._unsettled_children` at most once at any +moment. After unregister, it does not reappear. A concurrent stream of +`_register_children_threadsafe` / `_unregister_children_threadsafe` calls +must keep the set's element count consistent with a serial schedule +(check by counting `add`/`remove` operations against final size). + +### 2.4 Iteration safety +Iterating `collect_unsettled_children` from one thread while another thread +register/unregisters children must never raise (`RuntimeError: Set changed +size during iteration`), and the returned snapshot must be a *consistent* +subset of `add`s up to some serializable point (no torn reads). The lock +inside `collect_unsettled_children` enforces this; the test is to flood the +set and assert no exception is raised in 1e5 iterations. + +### 2.5 `closed()` after `close_context_threadsafe()` from another thread +After `close_context_threadsafe()` returns on thread A, any thread B's +subsequent call to `register_children_threadsafe` raises +`ContextAlreadyClosedError`. There must be no window in which `closed()` +returns `True` on one thread while another thread's `register` call slips +through and adds a child. + +### 2.6 Late child cannot enter a closed context +A child whose `__init__` overlaps with the parent's +`close_context_threadsafe()` either (a) registers successfully and is later +properly drained, or (b) raises `ContextAlreadyClosedError`. There is no +third outcome — in particular the child must never be silently dropped while +its constructor still considered the parent alive. + +### 2.7 Unregister ordering +`_unregister_from_parent_if_time` only unregisters when +`done() and not _unsettled_children`. Under concurrency, no parent ever sees +its child unregister while the child still has unsettled descendants. Test +by polling `parent._unsettled_children` and, for each child observed there, +asserting that either the child is not yet done, or it itself has unsettled +descendants. The set difference must always be empty. + +### 2.8 No registration on a torn parent +A child's `_parent` pointer is set in `__init__` before the registration +call. Snapshotted in any reader thread, `child._parent` is always either +the originally captured value or `None` (when explicitly created as a root) +— never an unrelated context. + +--- + +## 3. Cancellation + +### 3.1 `cancel()` is thread-safe and never deadlocks +`Promise.cancel()` invoked simultaneously from K foreign threads against +the same Promise completes in bounded time, returns a deterministic result +(at most one `True`, rest `False` once the Promise is terminal), and never +deadlocks regardless of which thread the event loop runs on. + +### 3.2 Cancellation always yields a terminal state +Every successful `cancel()` (`returned True` for at least one caller) +results in `done() == True and cancelled() == True` eventually (bounded by +a generous timeout). The Promise never lingers in a non-terminal state when +the loop is alive. This includes the corner case where +`task.cancel()` lands between `create_task` and the first `__step` — the +`_unpacking_task_done_callback` synthesize-path must drive the Promise to +terminal. + +### 3.3 Result vs cancel race +If a Promise's underlying awaitable resolves in the same time window as a +foreign-thread `cancel()`, the outcome is *one of*: + +- terminal `_FINISHED` with the produced result/exception, `cancel()` did + not flip the state; or +- terminal `_CANCELLED_BEFORE_UNPACKED_ONCE` / `_CANCELLED_AFTER_UNPACKED_ONCE`, + with a `CancelledError` stored as `_exception`. + +The result must never be: torn state, two terminal states recorded, or a +mix of "result stored" + "exception stored". + +### 3.4 Idempotent cancellation +Repeated `cancel()` calls on an already-cancelled Promise return `False` +and do not raise. A `CancelledError` arriving on an already-terminal +Promise via `_set_exception_from_loop` is silently dropped (per docstring). +A non-CancelledError arriving on a terminal Promise raises `RuntimeError` +(framework bug detector) — tests should assert this never happens under +normal user-driven races. + +### 3.5 Wake-up of waiters +After `cancel()` from a foreign thread, every consumer blocked on `await`, +`.sync()`, or `unpack_once_sync()` is unblocked within a bounded time and +sees `CancelledError` (or its `__cause__` chain) — none hang indefinitely. + +### 3.6 Cancellation propagates only as documented +Cancelling a *parent* Promise does **not** cancel a nested (returned-from) +Promise that the parent is currently awaiting (per the inline TODO in +`_fully_unpack_from_loop`). Tests should pin this current behaviour so any +future change is intentional. + +### 3.7 Awaitable cleanup on synthesize-cancel +When `cancel()` synthesizes a `CancelledError` for a never-started Promise, +the wrapped coroutine is `close()`d exactly once (no "coroutine was never +awaited" warning across many runs). + +--- + +## 4. Event-loop discipline + +### 4.1 "From loop only" methods stay on the loop +Methods documented as "can only be used from the event loop of the Promise" +(`_ensure_from_loop_*`, `_unpack_once_from_loop`, +`_fully_unpack_from_loop`, `_set_*_from_loop`, `_cancel_from_loop`, +`_synthesize_cancellation_from_loop`) must never be invoked from a foreign +thread. Test: install a thread-id assertion at the top of each (a +test-only monkeypatch is fine) and run the full race suite — the assertion +must never trigger. + +### 4.2 `SyncUsageError` is raised, not deadlock +Calling `promise.sync()`, `promise.unpack_once_sync()`, or +`await_children_sync()` from the Promise's own event-loop thread raises +`SyncUsageError` *immediately*, even when the call lands in the same +microsecond window as a foreign-thread `cancel()` or `await`. No deadlock, +no spurious success. + +### 4.3 `start_soon=True` scheduling +A Promise created with `start_soon=True` from a non-loop thread eventually +schedules its full-unpacking task on the correct loop — even when the +constructing thread immediately drops its reference. Test: construct 10k +Promises in a thread pool, all targeting the same loop, then assert all +reach a terminal state. + +### 4.4 No leaked task references on prefilled / never-awaited Promises +A prefilled Promise (`prefilled_result=…` or `prefilled_exception=…`) never +constructs `_full_unpacking_task` or `_single_unpacking_task`. A Promise +constructed with `start_soon=False` that is then cancelled before any +`await` also leaves both task attributes as `None`. Holds across concurrent +constructor/cancel races. + +### 4.5 Loop-mismatch detection is race-free +`await promise` from a different running loop than `promise.loop` raises +`EventLoopMismatchError` synchronously. Holds even when the Promise's own +loop is concurrently mutating the Promise's state. + +--- + +## 5. `await_children()` under churn + +### 5.1 Eventual quiescence +`await_children(whole_subtree=True)` returns once all descendants are +done, even if children spawn new grand-children during the wait. Tests +should fan out a tree where leaves themselves spawn new leaves on +resolution, and assert the call still returns. + +### 5.2 No surprise hang from non-awaitable contexts +A non-awaitable `PromisingContext` child does not stall its parent's +`await_children()` — they are filtered out by `awaitables_only=True`. +Tests should mix `promising.context` siblings with `Promise` siblings and +assert quiescence. + +### 5.3 Exceptions in children do not interrupt the wait +`await_children` uses `return_exceptions=True`. If half of N concurrent +children fail and half succeed, the call still completes and the parent +sees its `_unsettled_children` drained. + +### 5.4 Sync counterpart cannot deadlock +`await_children_sync()` from the event-loop thread raises +`SyncUsageError`. From a foreign thread it completes within a bounded +timeout for any subtree that would have completed under `await_children`. + +### 5.5 `unpack_promises_fully=False` lets parents return early +With `unpack_promises_fully=False`, the call returns as soon as every +direct child has reached `unpacked_once_or_done()` — even if their full +unpacking is still in flight. Subsequent reads of those children must +still show monotonic state progression (invariant 1.1) without crashing. + +--- + +## 6. `ContextVar` activation (`__active_context`) + +### 6.1 Per-task isolation +Two coroutines running on the same loop, each inside their own +`with ctx:` block, see their own context as active when they call +`get_active_context()`. The `ContextVar` must not leak across tasks. + +### 6.2 Activation is non-reentrant +Entering the same `PromisingContext` instance twice (concurrently or +sequentially) raises `ContextAlreadyActiveError`. Under concurrent +attempts, exactly one `__enter__` succeeds and the rest raise; the +successful one's `__exit__` correctly restores the previous token. + +### 6.3 Cross-thread context inheritance +A sync promising function (with `use_thread_pool=True`) launched in a +worker thread sees its parent Promise as the active context (per the +`contextvars.copy_context()` call in `PromisingFunction._call_wrapped`). +Holds even when the parent Promise's own state is concurrently changing. + +### 6.4 No active-context bleed across worker invocations +A thread-pool worker that finishes one sync promising function and is +reused for an unrelated callable observes no leftover `__active_context` +from the previous job. + +--- + +## 7. Exception attachment (`try_to_link_exception`) + +### 7.1 Deepest context wins, exactly once +`exception.__promising_context__` is set by the deepest context whose +`with` block sees the exception, and not overwritten by ancestors. Holds +under concurrent re-raise paths (e.g. multiple sibling Promises failing +simultaneously, each running on its own thread-pool worker). + +### 7.2 No torn attribute writes +A reader thread that observes `__promising_context__` on an exception +also observes a matching `__promising_collapse_traceback__` boolean. The +two attributes never appear in a half-set state. + +--- + +## 8. Settings frozen at creation + +### 8.1 Defaults snapshot +A Promise's resolved settings (`_start_soon`, `_start_soon_default`, +`_children_start_soon`, `_collapse_tracebacks`, `_thread_pool`) are +captured during `__init__` and never change afterwards. Mutating +`promising.Defaults.*` from another thread during construction either +takes effect for that specific Promise (because `__init__` read the +default before the mutation) or does not — but the Promise's snapshot is +internally consistent: every getter on it returns the same value +throughout its lifetime. + +### 8.2 No cross-promise leak via parent inheritance +When child Promise A inherits a setting from parent P at construction +time, and concurrently child Promise B is constructed from P, mutating +P's setting between the two (where API allows) does not retroactively +change A's resolved value. + +--- + +## 9. Thread-pool dispatch + +### 9.1 Correct executor used +A sync promising function with `use_thread_pool=True` runs on the +executor returned by `get_thread_pool_executor()` of its active Promise, +regardless of which thread called the wrapper. Test by tagging worker +threads per executor and asserting the function body ran on the +expected pool. + +### 9.2 No starvation deadlock from sibling sync calls +Multiple sibling sync promising functions submitted to the same +bounded-size `ThreadPoolExecutor`, each performing `.sync()` on another +*non-overlapping* sibling, all complete. (Mutual `.sync()` between two +siblings that contends on the same pool is a user-side deadlock — that +is documented behaviour, not an invariant; tests should isolate it.) + +--- + +## 10. `wrap_awaitable` / construction races + +### 10.1 Bare-coroutine wrapping is concurrency-safe +`wrap_awaitable(coro)` invoked from K threads with K different coroutines +never produces a Promise that is associated with the wrong coroutine, +nor a Promise whose `_awaitable is None` despite an awaitable being +passed. + +### 10.2 Validation runs before parent registration +`Promise.__init__` validates arguments before calling `super().__init__`. +Therefore, on validation failure, the parent's `_unsettled_children` +must not contain the would-be child. Test: feed bad arguments +concurrently with valid sibling construction; the parent's set never +contains a `Promise` instance that later raised in `__init__`. + +--- + +## 11. `Defaults` mutation under load + +### 11.1 No torn Promise from a flipping `START_SOON` +A test thread flips `Defaults.START_SOON` between `True` and `False` in a +tight loop while another thread constructs Promises. Every Promise either +has `_start_soon == True` (and was scheduled) or `_start_soon == False` +(and was not). No Promise is left in an in-between state where it was +scheduled but `_start_soon` reads `False`, or vice versa. + +--- + +## 12. Sentinel safety + +### 12.1 `Sentinel.__bool__` is unreachable from internal code +Under any of the races above, no internal code path produces a +`SentinelUsageError` (which would indicate the framework itself +truthiness-tested a sentinel). Tests should set up exception capture and +assert zero `SentinelUsageError` instances across the run. + +--- + +## Suggested test harness primitives + +Useful building blocks for the `tests/race_conditions/` suite: + +- **`spin_until(predicate, timeout)`** — a tight `while not predicate()` + loop with a generous timeout, used as the "observe the moment the flag + flips" probe. +- **`run_on_many_threads(callable, n, *args)`** — fork N threads, all + blocked on a `threading.Barrier`, then released simultaneously. +- **`assert_monotonic(samples, allowed_transitions)`** — given a list of + observed states (with timestamps), assert that the sorted sequence is + a valid walk through the documented state graph. +- **`exception_aggregator()`** — capture `SystemExit` / + `BaseException` from all threads so a thread-only failure cannot be + silently lost by the test driver. +- **stress repeats** — wrap each invariant in a loop of, say, 200 + iterations to surface low-probability windows. Combine with + `pytest-repeat` or hypothesis stateful machines for schedule fuzzing. +- **dual-loop fixture** — one event loop in the main thread, another in + a background thread, so cross-loop invariants (4.5, 6.3) get + exercised. diff --git a/RACE_CONDITION_INVARIANTS_B.md b/RACE_CONDITION_INVARIANTS_B.md new file mode 100644 index 000000000..3ef82a4c3 --- /dev/null +++ b/RACE_CONDITION_INVARIANTS_B.md @@ -0,0 +1,443 @@ +# Race-Condition Invariants for the `promising` Framework + +A catalogue of properties that must hold even when multiple threads / tasks +hit the same `Promise` or `PromisingContext` concurrently. Each entry is +shaped so a test can target it: **invariant → why it can break → suggested +stressor**. + +The framework intentionally exposes cross-thread access (sync functions in +a `ThreadPoolExecutor`, `.sync()` / `await_children_sync()` from arbitrary +threads, cross-thread cancellation, etc.), so most of these invariants are +*not* protected by the single-event-loop assumption — they need real +locking, atomic reads/writes, or careful state-machine design. + +--- + +## 1. `Promise` state machine + +The legal transitions are: + +``` +_PENDING ──► _UNPACKED_ONCE ──► _FINISHED + │ │ + │ └──► _CANCELLED_AFTER_UNPACKED_ONCE + └──► _CANCELLED_BEFORE_UNPACKED_ONCE + └──► _FINISHED (exception before unpack) +``` + +### 1.1. Terminal states are absorbing +Once `_state` is `_FINISHED`, `_CANCELLED_BEFORE_UNPACKED_ONCE`, or +`_CANCELLED_AFTER_UNPACKED_ONCE`, no further transition is allowed. +- **Break vector:** `_set_result` / `_set_exception` / + `_set_intermediate_promise` racing each other from the single-unpack + task, the full-unpack task, `cancel()`'s synthesize path, and + `_unpacking_task_done_callback`. +- **Stressor:** schedule `cancel()` from N threads at random points during + `_unpack_once`/`_unpack_fully`; assert final state is one of the legal + terminals and matches the first writer's intent. + +### 1.2. No "spontaneous" transition out of `_PENDING` +`_state` only leaves `_PENDING` via a `_set_*` call that observed +`_state is _PENDING` (or `_UNPACKED_ONCE`) under the framework's own +serialisation. There must be no path where two writers both observe +`_PENDING`, both proceed, and both mutate state. +- **Break vector:** `_set_intermediate_promise` checks `is _PENDING` then + writes `_intermediate_promise` then writes `_state`. A concurrent + `_set_exception(CancelledError)` could interleave between the check and + the writes. +- **Stressor:** monkey-patch `_set_state` to `await asyncio.sleep(0)` + between assignment and `close_context()`; trigger cancellation in that + window. + +### 1.3. `done()`, `cancelled()`, `unpacked_once()` are monotonic +`done()` may only flip `False → True`. Same for `cancelled()` once True, +and for `unpacked_once()` once True. +- **Break vector:** an in-progress `_set_*` that fails midway through (the + state-machine assertion raises) calls `_force_internal_error_finish`, + which re-writes `_state = _FINISHED` and `_exception`. If + `done()` was already True via a different path the second write must + not regress any predicate. +- **Stressor:** poll `done()` / `cancelled()` from a hot loop on another + thread while the Promise resolves; assert no observed regression. + +### 1.4. Predicate coherence +At any single read, the *combination* of `done()` / `cancelled()` / +`unpacked_once()` must be consistent with one of the five legal +`_state` values. Equivalently: `done() implies unpacked_once_or_done()`; +`cancelled() implies done()`; if `_intermediate_promise is None` then +`unpacked_once() implies (exception is not None or _state is _FINISHED)`. +- **Break vector:** `_set_state` writes `_state` then calls + `close_context()` (non-atomic with the surrounding `_result` / + `_exception` / `_intermediate_promise` write). +- **Stressor:** read all five accessors back-to-back from another thread + while resolution is in flight; check the snapshot maps to one valid + state row in a truth table. + +### 1.5. `_result` is set iff `_state is _FINISHED` and `_exception is None` +`result()` must never raise `RuntimeError("Promise result is UNCHANGED…")` +in a healthy run. The internal `_result == UNCHANGED` guard in `result()` +is a tripwire for a state/result write reordering. +- **Stressor:** call `.result()` on a hot loop while many concurrent + consumers `await` the same Promise. + +### 1.6. `_exception` and `_result` are mutually exclusive +After `done()`, exactly one of `_exception` / `_result` is the "real" +value. Specifically: `_exception is not None` ⇔ `_result is UNCHANGED` ⇔ +`result()` raises. Holds even across the `_force_internal_error_finish` +path. + +--- + +## 2. Unpacking-task lifecycle + +### 2.1. At most one `_single_unpacking_task` ever exists per Promise +`_ensure_single_unpacking_scheduled` is the only place that creates one +and gates on `_single_unpacking_task is None and not unpacked_once_or_done()`. +Two callers must never both pass that gate. +- **Break vector:** `unpack_once()` is async (runs on the loop thread, no + preemption between Python statements) — but it can also be triggered + indirectly via `unpack_once_sync()` from another thread, which uses + `run_coroutine_threadsafe` to dispatch back to the loop, and via + `_unpack_fully` calling `_ensure_single_unpacking_scheduled` after an + `await`. +- **Stressor:** N coroutines each `await promise.unpack_once()` on the + same Promise concurrently; assert exactly one `_single_unpacking_task` + object identity ever existed. + +### 2.2. At most one `_full_unpacking_task` ever exists per Promise +Symmetric to 2.1; gated by `_full_unpacking_task is None and not done()`. +- **Stressor:** N consumers `await promise` + M threads call + `.sync()` simultaneously on the same Promise; assert single full-task + identity. + +### 2.3. The two task slots, once written, are never overwritten +`_single_unpacking_task` / `_full_unpacking_task` are written exactly +once, in the `None → Task` direction. + +### 2.4. A Task scheduled by `_ensure_*_scheduled` must run its done-callback +`_unpacking_task_done_callback` is the only thing that catches the +"cancelled between `create_task` and the first `__step`" race. It must +fire even if the loop is being torn down — otherwise the Promise stays +non-terminal. + +### 2.5. `unpacked_once_or_done()` flips True before the +`_single_unpacking_task` becomes `done()` +The body of `_unpack_once` calls `_set_*` *before* returning. Any +consumer that observes `_single_unpacking_task.done()` must already see +`unpacked_once_or_done()`. + +--- + +## 3. Cancellation + +### 3.1. A single `cancel()` call drives the Promise terminal +After `cancel()` returns `True`, eventually `done()` becomes `True`. If +no underlying task is running, `cancel()` makes that transition happen +synchronously via `_synthesize_cancellation`. There must be no path +where `cancel()` returns `True` but the Promise stays `_PENDING` +forever. + +### 3.2. Concurrent `cancel()` calls don't corrupt state +N threads call `.cancel()` simultaneously. Outcome: +- exactly one stored exception (the first one wins; later + `CancelledError`s are silently dropped per `_set_exception`) +- `_state` is one of `_CANCELLED_BEFORE_UNPACKED_ONCE` / + `_CANCELLED_AFTER_UNPACKED_ONCE` +- the Promise is unregistered from its parent exactly once. + +### 3.3. `cancel()` racing with natural completion +If `cancel()` lands the same instant the body completes successfully, +the final state is *either* `_FINISHED` with a result *or* a cancelled +state — never both, never neither, and never `_FINISHED` with a stored +`CancelledError` masquerading as a real exception. +- **Break vector:** `_set_result` is called by the body, `cancel()` then + synthesises a `CancelledError` after `done()` already flipped. + `_set_exception` has an explicit branch for "already-terminal + + CancelledError → drop". Verify that branch covers every interleaving. + +### 3.4. `_synthesize_cancellation` always closes the context +The comment in `_synthesize_cancellation` is load-bearing: without +`close_context()` the child never unregisters. Invariant: every code +path that lands in `_CANCELLED_*` must have run `close_context()` +at least once. +- **Stressor:** cancel a `start_soon=False` Promise that was never + awaited; assert it unregisters from its parent immediately. + +### 3.5. Cancel of parent does NOT cancel nested ("returned") Promise +The TODO around `_unpack_fully` notes this intentional design. The +inner Promise's task keeps running independently. Test that cancelling +the outer leaves the inner's `_state` untouched. + +--- + +## 4. Parent / child hierarchy (`_unsettled_children`) + +### 4.1. A child is registered exactly once and unregistered exactly once +Independent of how many tasks/threads race through its construction and +resolution. +- **Break vector:** `_register_with_parent` runs at the end of + `__init__` (after super init). For a sync `@promising.function` + promise, that body runs on a worker thread while the parent's + `_unsettled_children.update(...)` mutates a `set` from a non-loop + thread. +- **Stressor:** spawn K sync child promises in parallel inside one + parent's body; after parent finishes assert `len(_unsettled_children) == 0` + and that the set never raised `RuntimeError: Set changed size during + iteration` during a concurrent `collect_unsettled_children`. + +### 4.2. `_unsettled_children` is never read inconsistently +`collect_unsettled_children` builds a `list[…](self._unsettled_children)` +from a `set`. On CPython this snapshot is safe only because the GIL +serializes `set.__iter__`'s C-level iteration with `set.add` / `set.discard` +*at most* — there is still a documented `RuntimeError: Set changed size +during iteration` window if the iteration is interleaved at the Python +level (i.e. across more than one bytecode boundary). +- **Stressor:** repeatedly call `collect_unsettled_children(whole_subtree=True)` + while children are being registered/unregistered from worker threads; + assert no exception escapes. + +### 4.3. `closed()` parents reject new children +`_register_children` raises `ContextAlreadyClosedError` if `closed()` is +True. Invariant: there is no interleaving in which a child gets added to +a parent *after* the parent's `close_context()` ran. +- **Break vector:** `close_context()` sets `_context_closed = True` then + unregisters. A child being constructed concurrently can read + `_context_closed == False` between those statements? It runs + before — but child registration happens *after* parent unregister + attempt, so a worker thread mid-construction could miss the flag flip. +- **Stressor:** start a parent Promise that finishes very fast, fire off + many sync grandchildren from a worker thread mid-finish, assert each + either succeeds *or* gets a clean `ContextAlreadyClosedError`. + +### 4.4. `await_children()` does terminate when all descendants settle +The outer `while children := …` loop terminates iff no descendant ever +escapes registration. Tests should provoke deeply nested cross-thread +spawns and confirm `await_children()` always returns in finite time. +- **Break vector:** a grandchild registers with its parent *after* the + parent's set was last sampled by `collect_unsettled_children`, but + *before* the parent finished `done()`. The outer loop should still + pick it up on the next iteration. Verify with adversarial scheduling. + +### 4.5. Parent's `_unregister_from_parent_if_time` is correct under races +The guard `done() and not _unsettled_children` is checked +non-atomically. If a child finishes (calls +`parent._unregister_children(self)` → `parent._unregister_from_parent_if_time`) +exactly as a new grandchild is added, the parent must not unregister +prematurely and orphan the grandchild. + +### 4.6. Re-entry protection +`PromisingContext.__enter__` checks `_previous_token is not None` and +`_context_closed`. A context must never be successfully entered twice +concurrently from different tasks/threads (the framework comment at +`promising_context.py:749` flags this as a known race). + +--- + +## 5. `__active_context` ContextVar + +### 5.1. After every balanced `with ctx:` block, the active context is restored +Even if the block ran on a worker thread (with `contextvars.copy_context()` +propagation), or if multiple async tasks race over the same +`PromisingContext` instance. + +### 5.2. No leak across thread-pool boundaries +A sync `@promising.function` body executes in a thread-pool thread with +the active-context ContextVar set via `ctx.run(...)`. The worker thread's +default contextvars must be unaffected when the next executor task uses +the same worker thread. +- **Stressor:** submit N sync functions to a 1-worker pool; between + jobs, assert `PromisingContext.get_active_context(raise_if_none=False)` + on that worker thread is `None`. + +### 5.3. Concurrent enters on distinct contexts in the same task don't lose tokens +Stacked `with` blocks must restore in LIFO order even if intermediate +asynchronous suspensions happened. + +--- + +## 6. Cross-thread sync APIs + +### 6.1. `promise.sync()` from many threads on the same Promise returns the same value +Each caller drives `run_coroutine_threadsafe(awaitable_as_coroutine(self), +self.loop)` and blocks. Invariant: every caller observes the cached +result; the underlying function executes exactly once. +- **Stressor:** N threads `.sync()` the same not-yet-started + (`start_soon=False`) Promise; assert the body ran exactly once and all + returned the same value. + +### 6.2. `.sync()` deadlock guard fires under every interleaving +`_assert_no_sync_usage_deadlock` raises `SyncUsageError` if called on +the loop thread. There must be no race where the check observes "not on +loop" but the actual `concurrent_future.result()` blocks the loop. + +### 6.3. `unpack_once_sync()` fast-path is consistent +The fast-path returns directly when `unpacked_once_or_done()`. If two +threads call `unpack_once_sync()` and one takes the slow path, both must +end up observing the same `_intermediate_promise` (or final value / +exception). + +### 6.4. `await_children_sync()` is a faithful sync mirror of `await_children()` +Driven via `run_coroutine_threadsafe` — must terminate iff the async +version would; must not deadlock when the loop is healthy. + +--- + +## 7. Eager scheduling (`start_soon=True`) + +### 7.1. `start_soon=True` schedules the full-unpack task before `__init__` returns +For `start_soon=True`, the call to `_ensure_full_unpacking_scheduled` +happens before `_register_with_parent`. Invariant: by the time the +parent sees the child in `_unsettled_children`, the child's task is +already on the loop. + +### 7.2. Eager + deferred mix doesn't drop work +A parent with `children_start_soon=False` whose body spawns 100 children +and then exits without awaiting them must still see all children in its +`_unsettled_children`, and the parent's `await_children()` (called by +`protected_run`) must execute every one exactly once. + +### 7.3. `_resolve_start_soon` snapshots are not torn +The decision tree in `_resolve_start_soon` reads +`parent_context._children_start_soon`. The parent's settings are +"frozen at creation time" per README §"Settings Are Frozen at Creation +Time". A child being constructed concurrently with the parent's +constructor finishing must still read fully-initialized parent settings. + +--- + +## 8. Settings inheritance / global `Defaults` + +### 8.1. A Promise's resolved settings are immutable after construction +After `__init__` returns, `_start_soon`, `_start_soon_default`, +`_children_start_soon`, `_collapse_tracebacks`, `_thread_pool`, `_loop`, +`_parent` never change. Tests should assert this under concurrent +mutation of `Defaults.*` and concurrent context entry. + +### 8.2. Mutating `Defaults.START_SOON` mid-flight does not retroactively change behaviour +A test flips `Defaults.START_SOON` from `True` to `False` while a tree +of promises is being constructed and resolved; the already-created +promises continue with their captured value, while new ones reflect the +new default. + +### 8.3. `Defaults.PROMISING_THREAD_POOL` swap is safe +Replacing the global pool while promises are running must not strand +any submitted callable. + +--- + +## 9. Excepthook installation + +### 9.1. `install_promising_tracebacks()` is idempotent +Called inside `_unpack_once` on every first run, often from many +promises in parallel. Must produce stable `sys.excepthook` / +`threading.excepthook` values regardless of interleaving. +- **Stressor:** create thousands of root-level promises in parallel; + capture `sys.excepthook` at the end; assert it equals the installed + promising excepthook (not the default and not double-wrapped). + +--- + +## 10. Cross-event-loop guards + +### 10.1. `_assert_awaiting_on_correct_event_loop` always sees the right loop +When a Promise's `loop` was inherited from its parent and the parent's +loop is no longer running (or a different loop is current), `await +promise` raises `EventLoopMismatchError` *deterministically* — not +"sometimes hangs, sometimes raises". + +### 10.2. A Promise created on one loop, awaited on another, raises +Even when both loops are alive simultaneously on different threads. + +--- + +## 11. Result-caching idempotence + +### 11.1. The wrapped callable runs at most once per Promise +Even under N concurrent `await` + `.sync()` + `unpack_once()` consumers. +- **Stressor:** wrap a callable that increments a counter; saturate it + with all four consumption APIs from multiple threads; assert + counter == 1. + +### 11.2. Identity invariant: `await p` and `p.sync()` and `p.result()` +return the *same* object (not just equal) +For non-Promise return values, the cached `_result` is returned by +identity. Concurrent consumers must all get `is`-equal results. + +--- + +## 12. `intermediate_promise()` visibility + +### 12.1. After `unpacked_once_or_done()` returns True, `intermediate_promise()` +either returns the stored intermediate Promise or raises the stored +exception +There must be no race where `unpacked_once_or_done()` is True but +`intermediate_promise()` raises `PromiseNotUnpackedError`. + +### 12.2. The intermediate Promise's parent linkage is set before it is reachable +The intermediate Promise is created inside the outer Promise's +`with self:` block, so its parent is the outer. Concurrent +`get_trace()` on the inner must see the outer as ancestor immediately. + +--- + +## 13. Tracing / observability + +### 13.1. `get_trace()` is consistent +A concurrent reader walking parents via `get_parent_context()` always +sees an acyclic chain ending at a root. Even during mass +register/unregister churn, no cycle ever appears, and `get_trace()` +terminates. + +### 13.2. `format_trace()` / `print_trace()` don't raise on a context being +unregistered concurrently +`__repr__` reads `self.namespace` and `id(self)` — both immutable — +so the only race surface is the parent chain walk in `get_trace`. Fuzz +test confirming. + +--- + +## 14. Memory / leak invariants + +### 14.1. After `await promise` returns, the awaitable is released +The promise holds `_awaitable`, which after consumption serves no +purpose. Verify (with `weakref`) that the awaitable's referents become +collectable promptly after settlement, including the `cancel()`-pre-start +path that calls `awaitable.close()` in `_synthesize_cancellation`. + +### 14.2. After a parent fully resolves and `await_children()` returns, +`_unsettled_children` is empty +And the parent itself is unregistered from *its* parent. Otherwise the +tree leaks across runs. + +--- + +## 15. Frame-summary capture race + +### 15.1. `frame_summary_tuple` reflects the constructor's caller +`traceback.walk_stack` runs on the constructing thread before +`super().__init__` returns. If the constructor is called from a thread +pool worker, the captured stack must be that worker's stack (not the +loop's). Test by spawning from a sync function and inspecting frames. + +--- + +## Practical guidance for tests + +- Use `asyncio.sleep(0)` injected via monkey-patch into the framework's + state-transition methods (`_set_state`, `_set_exception`, + `_register_with_parent`, `close_context`) to widen race windows + deterministically. +- Pair every "happy path" race test with an "exception path" twin: the + most likely state-machine corruption sites are the `try/except + BaseException` blocks that funnel into + `_force_internal_error_finish`. +- For thread-pool stress, prefer a custom small pool (1–2 workers) and + many submissions — that maximises contention on `_unsettled_children` + and `__active_context`. +- For cancellation stress, alternate `cancel()` callers between the loop + thread and a non-loop thread, and target the four phases (before + schedule, after schedule but before first `__step`, mid-await, after + unpack_once). +- For each invariant, assert both the *terminal* property (final state + is legal) and the *transient* property (no intermediate observation + violated a monotonicity / coherence rule). diff --git a/README.md b/README.md index 48bc080d7..bba5f86d5 100644 --- a/README.md +++ b/README.md @@ -346,9 +346,9 @@ promising.Defaults.START_SOON = True promising.Defaults.START_SOON = False ``` -## Thread-Safe Access +## Access from Non-Async Threads -`promise.sync()` (and `promise.unpack_once_sync()`) lets non-async threads block on a Promise's result. Both are thread-safe and schedule work on the Promise's event loop via `asyncio.run_coroutine_threadsafe`: +`promise.sync()` (and `promise.unpack_once_sync()`) lets non-async threads block on a Promise's result. Both schedule work on the Promise's event loop via `asyncio.run_coroutine_threadsafe`: ```python import threading @@ -357,7 +357,7 @@ async def main(): promise = fetch_data("https://example.com") def worker(): - # Blocks until the Promise resolves (thread-safe) + # Blocks until the Promise resolves result = promise.sync(timeout=5.0) print(result) @@ -367,12 +367,10 @@ async def main(): thread.join() ``` -Like `await`, calling `promise.sync()` automatically triggers the Promise's execution if it was created with `start_soon=False`. There is no need to start the Promise manually before blocking on it. +Like `await`, calling `promise.sync()` automatically triggers the Promise's execution if it was created with `start_soon=False`. `promise.sync()`, `promise.unpack_once_sync()`, and `await_children_sync()` all raise `SyncUsageError` if called from the event loop thread (which would deadlock). -`Promise.cancel()` is also thread-safe — it can be invoked from any thread and is dispatched onto the Promise's event loop when the caller is not already on it. - ## Result Unpacking A decorated function always returns a `Promise`, regardless of whether the underlying function returns a concrete value or another Promise. This means: @@ -470,7 +468,7 @@ Wrapping every async (or sync) operation in a `Promise` gives you: - **Effortless parallelism.** Call your decorated functions and they start running immediately — async on the event loop, sync in a thread pool (with `use_thread_pool=True`). Mix and match freely; the Promise abstraction papers over the difference. No manual `asyncio.gather`, no explicit executor management, no boilerplate to bridge async and threaded code. - **Multiple awaits.** A Promise caches its result. Any number of consumers can `await`, `.sync()`, `unpack_once()`, or `unpack_once_sync()` the same Promise and get the same value — the underlying function is never executed more than once. - **Automatic hierarchy.** Promises created during another Promise's execution become its children. You can wait for the entire subtree (`await_children()`), inspect what's still running (`collect_unsettled_children`), or scope configuration to a subtree — all without manual bookkeeping. -- **Thread-safe synchronous access.** Every Promise exposes `.sync()` and `.unpack_once_sync()`, so threads that can't `await` can still block on a Promise's result. Blocking automatically triggers execution of deferred (`start_soon=False`) Promises, just like `await` does. +- **Synchronous access.** Every Promise exposes `.sync()` and `.unpack_once_sync()`, so threads that can't `await` can still block on a Promise's result. Blocking automatically triggers execution of deferred (`start_soon=False`) Promises, just like `await` does. - **Consistent interface.** A decorated function always returns a `Promise` — whether the underlying function returns a concrete value or another Promise. `await` and `.sync()` recursively unpack nested Promises and return the final non-`Promise` value, so consumers get a uniform interface regardless of how deep the chain is. - **Configurable execution.** `start_soon`, `children_start_soon`, `thread_pool`, and other settings propagate through the hierarchy, letting you control eager vs. deferred execution and thread pool usage at any level. @@ -496,16 +494,16 @@ In short, a `Promise` turns a fire-and-forget coroutine into a first-class objec |---|---| | `await promise` | Wait for and return the result. Recursively unpacks nested Promises and always returns a concrete value. All consumption methods (`await`, `sync`, `unpack_once`, `unpack_once_sync`) can be called multiple times and always return the same cached result. Must be awaited on the Promise's own event loop (raises `EventLoopMismatchError` otherwise). | | `promise.unpack_once()` | Async — resolve the Promise but unpack only one level. Returns either a concrete value or another `Promise`. Must be awaited on the Promise's own event loop. | -| `promise.sync(timeout=None)` | Synchronous counterpart of `await promise` — blocks the calling thread, recursively unpacks nested Promises, and always returns a concrete value. Thread-safe; must not be called from the Promise's own event loop thread (raises `SyncUsageError`). | -| `promise.unpack_once_sync(timeout=None)` | Synchronous counterpart of `unpack_once` — blocks the calling thread and unpacks only one level. Returns either a concrete value or another `Promise`. Thread-safe; must not be called from the Promise's own event loop thread. | -| `promise.done()` | Whether the Promise is "done" — finished (with a result or exception) or cancelled. Thread-safe. Overrides `PromisingContext.done()`, which by default tracks the context-manager lifecycle. | -| `promise.cancelled()` | Whether the Promise has been cancelled. Thread-safe. | -| `promise.unpacked_once()` | Whether the Promise has been unpacked at least one level (i.e. its awaitable has produced either an intermediate Promise or a final value). Thread-safe. | -| `promise.unpacked_once_or_done()` | True if the Promise is at least one-level unpacked, fully done, or cancelled. Thread-safe. | -| `promise.result()` | The resolved value. Raises the underlying exception if the Promise finished with one, `PromiseNotDoneError` if it is not done yet, or `asyncio.CancelledError` if it was cancelled. Thread-safe. | -| `promise.exception()` | The exception that the Promise finished with, or `None`. Same readiness/cancellation rules as `result()`. Thread-safe. | -| `promise.intermediate_promise()` | The intermediate `Promise` produced by the first unpacking step, or `None` if the awaitable's result was already a non-Promise value. Raises `PromiseNotUnpackedError` if the Promise has not yet been unpacked once. Thread-safe. | -| `promise.cancel(msg=None)` | Request cancellation of the Promise. Mirrors `asyncio.Future.cancel()` / `asyncio.Task.cancel()`: the return value reports whether cancellation was *requested* — the Promise's terminal cancelled state is reached only once the `CancelledError` actually propagates through the underlying unpacking task(s). Thread-safe — when called from outside the Promise's event loop, the cancellation is dispatched onto that loop. | +| `promise.sync(timeout=None)` | Synchronous counterpart of `await promise` — blocks the calling thread, recursively unpacks nested Promises, and always returns a concrete value. Must not be called from the Promise's own event loop thread (raises `SyncUsageError`). | +| `promise.unpack_once_sync(timeout=None)` | Synchronous counterpart of `unpack_once` — blocks the calling thread and unpacks only one level. Returns either a concrete value or another `Promise`. Must not be called from the Promise's own event loop thread. | +| `promise.done()` | Whether the Promise is "done" — finished (with a result or exception) or cancelled. Overrides `PromisingContext.done()`, which by default tracks the context-manager lifecycle. | +| `promise.cancelled()` | Whether the Promise has been cancelled. | +| `promise.unpacked_once()` | Whether the Promise has been unpacked at least one level (i.e. its awaitable has produced either an intermediate Promise or a final value). | +| `promise.unpacked_once_or_done()` | True if the Promise is at least one-level unpacked, fully done, or cancelled. | +| `promise.result()` | The resolved value. Raises the underlying exception if the Promise finished with one, `PromiseNotDoneError` if it is not done yet, or `asyncio.CancelledError` if it was cancelled. | +| `promise.exception()` | The exception that the Promise finished with, or `None`. Same readiness/cancellation rules as `result()`. | +| `promise.intermediate_promise()` | The intermediate `Promise` produced by the first unpacking step, or `None` if the awaitable's result was already a non-Promise value. Raises `PromiseNotUnpackedError` if the Promise has not yet been unpacked once. | +| `promise.cancel(msg=None)` | Request cancellation of the Promise. Mirrors `asyncio.Future.cancel()` / `asyncio.Task.cancel()`: the return value reports whether cancellation was *requested* — the Promise's terminal cancelled state is reached only once the `CancelledError` actually propagates through the underlying unpacking task(s). | | `promise.loop` | The event loop this Promise is bound to (inherited from `PromisingContext`). | ### `wrap_awaitable` diff --git a/promising/errors.py b/promising/errors.py index 20515e441..bd5feddd7 100644 --- a/promising/errors.py +++ b/promising/errors.py @@ -116,7 +116,7 @@ def install_promising_tracebacks() -> bool: successful installation are captured and used as a fallback if the promising renderer itself raises. - ``Promise._unpack_once_from_loop`` calls this function automatically + ``Promise._unpack_once`` calls this function automatically the first time a Promise runs, so applications rarely need to invoke it directly. It is exposed in the public API for cases where you want to enable promising tracebacks before any Promise has executed (for diff --git a/promising/logging_utils.py b/promising/logging_utils.py index d22015fed..dbae38fdc 100644 --- a/promising/logging_utils.py +++ b/promising/logging_utils.py @@ -127,16 +127,16 @@ def __init__(self, *, logger: logging.Logger | None = None, level: int) -> None: self.level = level def log_single_unpacking_scheduling(self, *, promise: "Promise") -> None: - self._log("ATTEMPTING TO SCHEDULE UNPACK_ONCE_FROM_LOOP", promise=promise) + self._log("ATTEMPTING TO SCHEDULE UNPACK_ONCE", promise=promise) def log_single_unpacking_scheduled(self, *, promise: "Promise") -> None: - self._log("SCHEDULED UNPACK_ONCE_FROM_LOOP", promise=promise) + self._log("SCHEDULED UNPACK_ONCE", promise=promise) def log_single_unpacking_started(self, *, promise: "Promise") -> None: - self._log("STARTED UNPACK_ONCE_FROM_LOOP", promise=promise) + self._log("STARTED UNPACK_ONCE", promise=promise) def log_single_unpacking_finished(self, *, promise: "Promise") -> None: - self._log("FINISHED UNPACK_ONCE_FROM_LOOP", promise=promise) + self._log("FINISHED UNPACK_ONCE", promise=promise) def log_single_unpacking_result(self, *, promise: "Promise", result: Any) -> None: if not self.logger.isEnabledFor(self.level): @@ -147,7 +147,7 @@ def log_single_unpacking_result(self, *, promise: "Promise", result: Any) -> Non kind = "Intermediate (Promise)" if isinstance(result, Promise) else "Final (non-Promise)" log_message = "\n".join( [ - f"UNPACK_ONCE_FROM_LOOP Result: {kind}", + f"UNPACK_ONCE Result: {kind}", f" promise: {promise}", f" result type: {type(result).__name__}", ] @@ -163,7 +163,7 @@ def log_unwrap_step(self, *, promise: "Promise", depth: int, result: Any) -> Non kind = "Promise (continue unwrap)" if isinstance(result, Promise) else "Non-Promise (stop)" log_message = "\n".join( [ - f"FULLY_UNPACK_FROM_LOOP Unwrap Step depth={depth}: {kind}", + f"UNPACK_FULLY Unwrap Step depth={depth}: {kind}", f" promise: {promise}", f" result type: {type(result).__name__}", ] @@ -184,16 +184,16 @@ def log_unpacking_exception(self, *, promise: "Promise", stage: str, exc: BaseEx self.logger.log(self.level, f"\n{log_message}\n") def log_full_unpacking_scheduling(self, *, promise: "Promise") -> None: - self._log("ATTEMPTING TO SCHEDULE FULLY_UNPACK_FROM_LOOP", promise=promise) + self._log("ATTEMPTING TO SCHEDULE UNPACK_FULLY", promise=promise) def log_full_unpacking_scheduled(self, *, promise: "Promise") -> None: - self._log("SCHEDULED FULLY_UNPACK_FROM_LOOP", promise=promise) + self._log("SCHEDULED UNPACK_FULLY", promise=promise) def log_full_unpacking_started(self, *, promise: "Promise") -> None: - self._log("STARTED FULLY_UNPACK_FROM_LOOP", promise=promise) + self._log("STARTED UNPACK_FULLY", promise=promise) def log_full_unpacking_finished(self, *, promise: "Promise") -> None: - self._log("FINISHED FULLY_UNPACK_FROM_LOOP", promise=promise) + self._log("FINISHED UNPACK_FULLY", promise=promise) def _log(self, headline: str, *, promise: "Promise") -> None: if not self.logger.isEnabledFor(self.level): diff --git a/promising/promise.py b/promising/promise.py index 83d6093e3..2e7fa89f1 100644 --- a/promising/promise.py +++ b/promising/promise.py @@ -108,12 +108,11 @@ class Promise(PromisingContext, Generic[T_co]): Promise implements: - Asynchronous computation backed by an awaitable - Result/exception caching, with both async (``await``, ``unpack_once``) - and thread-safe sync (``sync``, ``unpack_once_sync``) consumption + and sync (``sync``, ``unpack_once_sync``) consumption - A two-step unpacking model: a single unpacking step that produces an intermediate Promise (if the awaitable returned one), and a full unpacking that recursively chases nested Promises down to a concrete value - - Cancellation that is safe to invoke from any thread - Construction-time stack capture (``frame_summary_tuple``) consumed by the ``sys.excepthook`` / ``threading.excepthook`` overrides installed via ``install_promising_tracebacks()`` to render @@ -278,22 +277,15 @@ def __init__( self._single_unpacking_task: Task[T_co | Promise[Any]] | None = None if self._awaitable is None: - # No outside code has any reference to this Promise yet, so we can - # set the result/exception directly, no matter which thread the - # constructor is currently running in if prefilled_result is not UNCHANGED: - self._set_result_from_loop(prefilled_result) + self._set_result(prefilled_result) else: - self._set_exception_from_loop(prefilled_exception) + self._set_exception(prefilled_exception) if self._start_soon and self._awaitable is not None: - # We don't know which thread the Promise is created in, so we - # use the event loop's `call_soon_threadsafe` to "stay on the - # safe side" - self.loop.call_soon_threadsafe(self._ensure_from_loop_full_unpacking_scheduled_wrapper) + self._ensure_full_unpacking_scheduled() - # TODO Activate the threading lock ? - self._register_with_parent_thread_unsafe() + self._register_with_parent() @classmethod def get_active_promise(cls, *, raise_if_none: bool = True) -> "Promise[Any] | None": @@ -326,7 +318,7 @@ def __await__(self) -> Generator[Any, None, T_co]: Await the Promise, fully unpacking all nested Promises. If the Promise hasn't started yet, starts execution via - ``_fully_unpack_from_loop()``. If already started via start_soon, + ``_unpack_fully()``. If already started via start_soon, waits for the existing task to complete. Once the Promise resolves, recursively awaits the result as long as it is itself a Promise, returning the final non-Promise value. @@ -337,12 +329,10 @@ def __await__(self) -> Generator[Any, None, T_co]: Returns: The fully unpacked result of the Promise (no remaining nested Promises). - - NOTE: This method can only be used from the event loop of the Promise. """ self._assert_awaiting_on_correct_event_loop() - self._ensure_from_loop_full_unpacking_scheduled() + self._ensure_full_unpacking_scheduled() if self._full_unpacking_task is not None: yield from self._full_unpacking_task @@ -371,9 +361,6 @@ def sync(self, *, timeout: float | None = None) -> T_co: SyncUsageError: If called from the same thread as the event loop, which would deadlock. TimeoutError: If timeout expires before completion. - - NOTE: This method is thread-safe, but it is unavailable from the event - loop of the Promise to avoid a deadlock. """ self._assert_no_sync_usage_deadlock() @@ -400,12 +387,10 @@ async def unpack_once(self) -> "T_co | Promise[Any]": Raises: EventLoopMismatchError: If awaited from a different event loop than the one this Promise belongs to. - - NOTE: This method can only be used from the event loop of the Promise. """ self._assert_awaiting_on_correct_event_loop() - self._ensure_from_loop_single_unpacking_scheduled() + self._ensure_single_unpacking_scheduled() if self._single_unpacking_task is not None: await self._single_unpacking_task @@ -440,9 +425,6 @@ def unpack_once_sync(self, *, timeout: float | None = None) -> "T_co | Promise[A SyncUsageError: If called from the same thread as the event loop, which would deadlock. TimeoutError: If timeout expires before completion. - - NOTE: This method is thread-safe, but it is unavailable from the event - loop of the Promise to avoid a deadlock. """ self._assert_no_sync_usage_deadlock() @@ -468,32 +450,6 @@ def done(self) -> bool: Returns: Whether this Promise is "done". - - NOTE: This method is thread-safe, including from the event loop of the - Promise. - - Thread-safety contract for ``Promise`` state-reading methods (this - method and the ones below referencing it): - - The Promise state machine is monotonic — once advanced past - ``_PENDING`` (to ``_UNPACKED_ONCE``, ``_FINISHED``, or one of the - ``_CANCELLED_XX`` states), the state never moves backwards. The - writers (``_set_intermediate_promise_from_loop`` / - ``_set_result_from_loop`` / ``_set_exception_from_loop``) write the - corresponding attribute (``_intermediate_promise``, ``_result``, - ``_exception``) *before* advancing the state via ``_set_state``, so a - reader that observes a state past ``_PENDING`` is guaranteed to also - observe the matching attribute. - - This relies on single-attribute reads and writes being atomic across - threads — which holds under CPython's reference (GIL-backed) - interpreter. Under a free-threaded CPython build the GIL no longer - provides that guarantee, and the reader/writer pair would need - explicit synchronization (e.g. a lock or memory fence) to remain - correct. Promising does not currently target free-threaded - interpreters. - # TODO Future-proof it ? - # https://github.com/teremterem/Promising/pull/102#discussion_r3197680342 """ state = self._state return state in (_FINISHED, _CANCELLED_BEFORE_UNPACKED_ONCE, _CANCELLED_AFTER_UNPACKED_ONCE) @@ -504,9 +460,6 @@ def unpacked_once(self) -> bool: either an intermediate Promise (which means a further unpacking step is still pending) or a final concrete value (in which case the Promise is also ``done()``). - - NOTE: This method is thread-safe, including from the event loop of the - Promise — see ``done()`` for the thread-safety contract. """ state = self._state return state in (_FINISHED, _UNPACKED_ONCE, _CANCELLED_AFTER_UNPACKED_ONCE) @@ -516,9 +469,6 @@ def unpacked_once_or_done(self) -> bool: Convenience predicate: True if the Promise is at least one-level unpacked, fully done, or cancelled. Used internally as the readiness check for one-level (non-recursive) consumers. - - NOTE: This method is thread-safe, including from the event loop of the - Promise — see ``done()`` for the thread-safety contract. """ state = self._state return state in (_FINISHED, _CANCELLED_BEFORE_UNPACKED_ONCE, _UNPACKED_ONCE, _CANCELLED_AFTER_UNPACKED_ONCE) @@ -527,9 +477,6 @@ def cancelled(self) -> bool: """ Whether the Promise has been cancelled (either before or after the first unpacking step). - - NOTE: This method is thread-safe, including from the event loop of the - Promise — see ``done()`` for the thread-safety contract. """ state = self._state return state in (_CANCELLED_BEFORE_UNPACKED_ONCE, _CANCELLED_AFTER_UNPACKED_ONCE) @@ -543,9 +490,6 @@ def result(self) -> T_co: asyncio.CancelledError: If the Promise was cancelled. BaseException: Re-raises whatever exception the Promise finished with (if any). - - NOTE: This method is thread-safe, including from the event loop of the - Promise — see ``done()`` for the thread-safety contract. """ self._assert_done() @@ -553,9 +497,6 @@ def result(self) -> T_co: raise self._exception if self._result is UNCHANGED: - # Should not happen: _assert_done() above guarantees a terminal - # state, and the only way to reach _FINISHED without an - # exception is via _set_result_from_loop (which sets _result). raise RuntimeError( f"Promise result is UNCHANGED even though the promise is done and there is no exception: {self!r}" ) @@ -574,9 +515,6 @@ def intermediate_promise(self) -> "Promise[Any] | None": BaseException: Re-raises the underlying exception if the first unpacking step itself failed before producing an intermediate Promise. - - NOTE: This method is thread-safe, including from the event loop of the - Promise — see ``done()`` for the thread-safety contract. """ if not self.unpacked_once_or_done(): raise PromiseNotUnpackedError(f"Promise is not unpacked even once yet: {self!r}") @@ -602,9 +540,6 @@ def exception(self) -> BaseException | None: Raises: PromiseNotDoneError: If the Promise is not done yet. asyncio.CancelledError: If the Promise was cancelled. - - NOTE: This method is thread-safe, including from the event loop of the - Promise — see ``done()`` for the thread-safety contract. """ self._assert_done() @@ -623,106 +558,70 @@ def cancel(self, msg: str | None = None) -> bool: return value reports whether cancellation was *requested* — the Promise's terminal cancelled state is reached only once the ``CancelledError`` actually propagates through the underlying - unpacking task and is stored via ``_set_exception_from_loop``. Until + unpacking task and is stored via ``_set_exception``. Until then, ``cancelled()`` may still return ``False``. For a Promise whose underlying task hasn't been scheduled yet (e.g. ``start_soon=False`` and never awaited), the cancellation is synthesized as a ``CancelledError`` stored directly via - ``_set_exception_from_loop``, with no task involvement — analogous to + ``_set_exception``, with no task involvement — analogous to ``Future.cancel()`` on a not-yet-running future. - When called from the Promise's own event loop thread the cancellation - is dispatched directly. When called from any other thread it is - scheduled onto the Promise's event loop via - ``call_soon_threadsafe`` and the call blocks only long enough for the - scheduled dispatch to finish (it does not wait for the cancellation - itself to land). - Returns: ``True`` if cancellation was requested for at least one underlying task, or synthesized for a not-yet-started Promise; ``False`` if the Promise was already done. - - NOTE: This method is thread-safe, including from the event loop of the - Promise. """ - if self.is_on_correct_running_loop(raise_thread_loop_not_running=False): - # We are on the event loop of the Promise, so we can cancel it - # directly - return self._cancel_from_loop(msg) + if self.done(): + return False - # We are on a different thread, so we need to use a thread-safe - # mechanism to cancel the Promise - self._assert_event_loop_running_for_sync() - future = concurrent.futures.Future() + cancellation_requested = False + if self._single_unpacking_task is not None and not self._single_unpacking_task.done(): + cancellation_requested |= self._single_unpacking_task.cancel(msg) + if self._full_unpacking_task is not None and not self._full_unpacking_task.done(): + cancellation_requested |= self._full_unpacking_task.cancel(msg) - def callback(): - try: - result = self._cancel_from_loop(msg) - except BaseException as exc: - future.set_exception(exc) - else: - future.set_result(result) + if cancellation_requested: + return True + + # No task is currently running cancellation through — synthesize the + # CancelledError and store it directly. Covers the + # `start_soon=False`/never-awaited case as well as the rare race + # where every task has finished but the Promise hasn't transitioned + # to a terminal state yet. + self._synthesize_cancellation(msg) - self.loop.call_soon_threadsafe(callback) - return future.result() + return self.cancelled() - def _ensure_from_loop_single_unpacking_scheduled(self) -> None: - """ - NOTE: This method can only be used from the event loop of the Promise. - """ + def _ensure_single_unpacking_scheduled(self) -> None: _unpacking_logger.log_single_unpacking_scheduling(promise=self) if self._single_unpacking_task is None and not self.unpacked_once_or_done(): self._single_unpacking_task = self.loop.create_task( - self._unpack_once_from_loop(), name=str(self) + "-SingleUnpackingTask" + self._unpack_once(), name=str(self) + "-SingleUnpackingTask" ) self._single_unpacking_task.add_done_callback(self._unpacking_task_done_callback) _unpacking_logger.log_single_unpacking_scheduled(promise=self) - def _ensure_from_loop_full_unpacking_scheduled(self) -> None: - """ - NOTE: This method can only be used from the event loop of the Promise. - """ + def _ensure_full_unpacking_scheduled(self) -> None: _unpacking_logger.log_full_unpacking_scheduling(promise=self) if self._full_unpacking_task is None and not self.done(): self._full_unpacking_task = self.loop.create_task( - self._fully_unpack_from_loop(), name=str(self) + "-FullUnpackingTask" + self._unpack_fully(), name=str(self) + "-FullUnpackingTask" ) self._full_unpacking_task.add_done_callback(self._unpacking_task_done_callback) _unpacking_logger.log_full_unpacking_scheduled(promise=self) - def _ensure_from_loop_full_unpacking_scheduled_wrapper(self) -> None: - """ - ``call_soon_threadsafe``-safe wrapper around - ``_ensure_from_loop_full_unpacking_scheduled``. - - Used by the ``start_soon=True`` path in ``__init__``, where scheduling - is deferred to the event loop via ``call_soon_threadsafe``. Any - exception raised from that callback would otherwise propagate to the - loop's default exception handler and leave the Promise stuck in a - non-terminal state. This wrapper instead routes the exception through - ``_force_internal_error_finish_from_loop`` so the Promise is settled - as an internal error. - - NOTE: This method can only be used from the event loop of the Promise. - """ - try: - self._ensure_from_loop_full_unpacking_scheduled() - except BaseException as exc: - self._force_internal_error_finish_from_loop(exc) - def _unpacking_task_done_callback(self, task: Task[Any]) -> None: """ Bridge the case where ``task.cancel()`` lands between ``create_task`` and the first ``__step``: ``CancelledError`` is thrown into a not-yet-started coroutine and propagates out without entering the ``try/except BaseException`` inside - ``_unpack_once_from_loop`` / ``_fully_unpack_from_loop``, leaving + ``_unpack_once`` / ``_unpack_fully``, leaving the Promise non-terminal even though the Task ended cancelled. """ # Early return if the task wasn't cancelled, or if the Promise (self) @@ -741,9 +640,9 @@ def _unpacking_task_done_callback(self, task: Task[Any]) -> None: if exc.args: msg = exc.args[0] - self._synthesize_cancellation_from_loop(msg) + self._synthesize_cancellation(msg) - async def _unpack_once_from_loop(self) -> None: + async def _unpack_once(self) -> None: """ Drive a single unpacking step on the event loop. @@ -751,24 +650,18 @@ async def _unpack_once_from_loop(self) -> None: promises created during this step are registered as its children), awaits the wrapped awaitable, and stores either an intermediate Promise or a final value/exception. The state machine is moved forward via - ``_set_intermediate_promise_from_loop`` / ``_set_result_from_loop`` / - ``_set_exception_from_loop``. + ``_set_intermediate_promise`` / ``_set_result`` / + ``_set_exception``. Backs ``unpack_once()`` (and the first leg of - ``_fully_unpack_from_loop``). - - NOTE: This method can only be used from the event loop of the Promise. + ``_unpack_fully``). """ try: _unpacking_logger.log_single_unpacking_started(promise=self) if self.unpacked_once_or_done(): - # Should not happen: this method is only scheduled by - # _ensure_from_loop_single_unpacking_scheduled, which guards - # on `not unpacked_once_or_done()`. raise RuntimeError( - f"An attempt was made to _unpack_once_from_loop a Promise " - f"that was already unpacked once or done: {self!r}" + f"An attempt was made to _unpack_once a Promise that was already unpacked once or done: {self!r}" ) # TODO [TRACES] Introduce some sort of `DEBUG` boolean flag (like in @@ -782,17 +675,17 @@ async def _unpack_once_from_loop(self) -> None: _unpacking_logger.log_single_unpacking_result(promise=self, result=result) except BaseException as exc: - _unpacking_logger.log_unpacking_exception(promise=self, stage="unpack_once_from_loop", exc=exc) - self._set_exception_from_loop(exc) + _unpacking_logger.log_unpacking_exception(promise=self, stage="_unpack_once", exc=exc) + self._set_exception(exc) else: if isinstance(result, Promise): - self._set_intermediate_promise_from_loop(result) + self._set_intermediate_promise(result) else: - self._set_result_from_loop(result) + self._set_result(result) _unpacking_logger.log_single_unpacking_finished(promise=self) - async def _fully_unpack_from_loop(self) -> None: + async def _unpack_fully(self) -> None: """ Drive the Promise to completion on the event loop, recursively unpacking nested Promises. @@ -801,21 +694,19 @@ async def _fully_unpack_from_loop(self) -> None: that produced an intermediate Promise, awaits it (and any further nested Promises) until a non-Promise value is reached, then stores that value as the final result. Any exception from the chain is - captured via ``_set_exception_from_loop``. + captured via ``_set_exception``. Backs ``__await__`` (and, indirectly, ``sync()``). - - NOTE: This method can only be used from the event loop of the Promise. """ try: _unpacking_logger.log_full_unpacking_started(promise=self) if self.done(): # When there are no more nested Promises to unpack, the Promise - # becomes done already after unpack_once_from_loop completes + # becomes done already after _unpack_once completes return - self._ensure_from_loop_single_unpacking_scheduled() + self._ensure_single_unpacking_scheduled() if self._single_unpacking_task is not None: await self._single_unpacking_task @@ -825,7 +716,7 @@ async def _fully_unpack_from_loop(self) -> None: result = self._intermediate_promise - # Note: cancelling this Promise does NOT propagate cancellation + # NOTE: Cancelling this Promise does NOT propagate cancellation # into the nested Promise being awaited below — asyncio's # task-cancellation lands on this task and unwinds upward; the # inner Promise's own task keeps running independently. @@ -843,27 +734,20 @@ async def _fully_unpack_from_loop(self) -> None: _unpacking_logger.log_unwrap_step(promise=self, depth=depth, result=result) except BaseException as exc: - _unpacking_logger.log_unpacking_exception(promise=self, stage="fully_unpack_from_loop", exc=exc) - self._set_exception_from_loop(exc) + _unpacking_logger.log_unpacking_exception(promise=self, stage="_unpack_fully", exc=exc) + self._set_exception(exc) else: - self._set_result_from_loop(result) + self._set_result(result) _unpacking_logger.log_full_unpacking_finished(promise=self) - def _set_intermediate_promise_from_loop(self, promise: "Promise[Any]") -> None: + def _set_intermediate_promise(self, promise: "Promise[Any]") -> None: """ Record the intermediate Promise returned by a single unpacking step. No-op if already unpacked once or done. - - NOTE: This method can only be used from the event loop of the Promise. """ try: if self._state is not _PENDING: - # Should not happen: only called from _unpack_once_from_loop - # when the awaitable resolved to a Promise. The only steps - # between the awaitable resolving and this call are the - # synchronous `with self:` exit and a logger call — - # neither yields, so state stays _PENDING. raise RuntimeError( f"Cannot set intermediate_promise on a promise because of the promise's current state: {self!r}" ) @@ -871,29 +755,23 @@ def _set_intermediate_promise_from_loop(self, promise: "Promise[Any]") -> None: self._set_state(_UNPACKED_ONCE) except BaseException as internal_error: - self._force_internal_error_finish_from_loop(internal_error) + self._force_internal_error_finish(internal_error) - def _set_result_from_loop(self, result: T_co) -> None: + def _set_result(self, result: T_co) -> None: """ Store the fully unpacked result. No-op if the Promise is already done (finished or cancelled). - - NOTE: This method can only be used from the event loop of the Promise. """ try: if self._state not in (_PENDING, _UNPACKED_ONCE): - # Should not happen: all callsites reach this with state in - # (_PENDING, _UNPACKED_ONCE) — prefill in __init__, the - # non-Promise branch of _unpack_once_from_loop, or the end - # of _fully_unpack_from_loop's unwrap chain. raise RuntimeError(f"Cannot set result on a promise because of its current state: {self!r}") self._result = result self._set_state(_FINISHED) except BaseException as internal_error: - self._force_internal_error_finish_from_loop(internal_error) + self._force_internal_error_finish(internal_error) - def _set_exception_from_loop(self, exception: BaseException) -> None: + def _set_exception(self, exception: BaseException) -> None: """ Store the exception and move the Promise into a terminal state. The cancelled state is an *effect* of storing a ``CancelledError``, not a @@ -905,8 +783,6 @@ def _set_exception_from_loop(self, exception: BaseException) -> None: A ``CancelledError`` arriving on an already-terminal Promise is silently dropped. Any other exception arriving in that state is treated as a framework bug and raises ``RuntimeError``. - - NOTE: This method can only be used from the event loop of the Promise. """ try: if self._state is _PENDING: @@ -924,15 +800,10 @@ def _set_exception_from_loop(self, exception: BaseException) -> None: # that's already cancelled. Drop it; the original wins. return else: - # Should not happen: any non-CancelledError exception - # arriving on a non-_PENDING / non-_UNPACKED_ONCE Promise - # implies the framework's state machine is broken - # (legitimate user-triggered cancellation races are - # caught by the elif above). raise RuntimeError(f"Cannot set exception on a promise because of its current state: {self!r}") # The context was probably already attached to the exception by the - # ``with self:`` block of ``_unpack_once_from_loop``, but it is + # ``with self:`` block of ``_unpack_once``, but it is # also possible that the exception occurred outside the # ``with self:`` block (e.g. a framework bug), so lets try to # attach it here too. @@ -955,9 +826,9 @@ def _set_exception_from_loop(self, exception: BaseException) -> None: # Contemplate on this GitHub issue along the way: # https://github.com/teremterem/Promising/issues/105 _logger.debug("Failed to chain original exception onto internal_error", exc_info=True) - self._force_internal_error_finish_from_loop(internal_error) + self._force_internal_error_finish(internal_error) - def _force_internal_error_finish_from_loop(self, error: BaseException) -> None: + def _force_internal_error_finish(self, error: BaseException) -> None: """ Last-resort recovery path. Force the Promise into _FINISHED with the given error, bypassing state validation. Each step is wrapped @@ -969,8 +840,6 @@ def _force_internal_error_finish_from_loop(self, error: BaseException) -> None: because parent unregistration raised. Treating such failures as bugs in the Promise class itself, this method prioritizes reaching a terminal state over surfacing further errors. - - NOTE: This method can only be used from the event loop of the Promise. """ try: _logger.debug("Force-finishing Promise %r with internal error", self, exc_info=error) @@ -990,66 +859,26 @@ def _force_internal_error_finish_from_loop(self, error: BaseException) -> None: raise def _assert_done(self) -> None: - """ - NOTE: This method is thread-safe, including from the event loop of the - Promise — see ``done()`` for the thread-safety contract. - """ if not self.done(): raise PromiseNotDoneError(f"Promise is not done: {self!r}") - def _cancel_from_loop(self, msg: str | None = None) -> bool: - """ - Request cancellation of the underlying unpacking task(s) — or, when - no task has been scheduled yet, synthesize the cancellation directly - (see ``_synthesize_cancellation_from_loop``). - - The state machine is *not* moved here. Instead, the ``CancelledError`` - propagates through ``_unpack_once_from_loop`` / - ``_fully_unpack_from_loop`` (``except BaseException`` catches it) and - is stored via ``_set_exception_from_loop``. - - NOTE: This method can only be used from the event loop of the Promise. - """ - if self.done(): - return False - - cancellation_requested = False - if self._single_unpacking_task is not None and not self._single_unpacking_task.done(): - cancellation_requested |= self._single_unpacking_task.cancel(msg) - if self._full_unpacking_task is not None and not self._full_unpacking_task.done(): - cancellation_requested |= self._full_unpacking_task.cancel(msg) - - if cancellation_requested: - return True - - # No task is currently running cancellation through — synthesize the - # CancelledError and store it directly. Covers the - # `start_soon=False`/never-awaited case as well as the rare race - # where every task has finished but the Promise hasn't transitioned - # to a terminal state yet. - self._synthesize_cancellation_from_loop(msg) - - return self.cancelled() - - def _synthesize_cancellation_from_loop(self, msg: str | None = None) -> None: + def _synthesize_cancellation(self, msg: str | None = None) -> None: """ Drive the Promise into a cancelled terminal state without relying on a running unpacking task to surface the ``CancelledError``. Mirrors ``Future.cancel()`` on a not-yet-running future. - Shared by ``_cancel_from_loop`` (synthesize path, no task ever + Shared by ``cancel`` (synthesize path, no task ever scheduled) and ``_unpacking_task_done_callback`` (task cancelled between ``create_task`` and its first ``__step``, so the body's ``except BaseException`` never saw the ``CancelledError``). - - NOTE: This method can only be used from the event loop of the Promise. """ - # `_unpack_once_from_loop` would normally close the context via + # `_unpack_once` would normally close the context via # `with self:`. Without this, `_context_closed` stays False and the # child never unregisters from its parent. - self.close_context_threadsafe() + self.close_context() - self._set_exception_from_loop(asyncio.CancelledError(msg) if msg is not None else asyncio.CancelledError()) + self._set_exception(asyncio.CancelledError(msg) if msg is not None else asyncio.CancelledError()) # Close the wrapped awaitable so a never-driven coroutine doesn't # trigger a "coroutine was never awaited" warning at GC time — @@ -1072,9 +901,9 @@ def _set_state(self, new_state: Sentinel) -> None: self._state = new_state # Force-close the context just in case (it was most likely closed by # the `with` block already, but it might also have been - # `_force_internal_error_finish_from_loop`) and unregister from parent + # `_force_internal_error_finish`) and unregister from parent # "if time": - self.close_context_threadsafe() + self.close_context() def _resolve_start_soon(self, start_soon: bool | None | Sentinel) -> bool: """ diff --git a/promising/promising_context.py b/promising/promising_context.py index ebaf081b3..7a6cc567b 100644 --- a/promising/promising_context.py +++ b/promising/promising_context.py @@ -4,7 +4,6 @@ import functools import inspect import logging -import threading from asyncio import AbstractEventLoop from contextvars import ContextVar from types import TracebackType @@ -430,23 +429,9 @@ def __init__( self._context_closed = close_context_immediately self._unsettled_children = set[PromisingContext]() - # TODO To simplify safe-guarding against race conditions, do EVERYTHING - # on the event loop, instead of using threading locks or any other - # kinds of synchronization techniques (except for the ones that are - # designed for operation within the same async event loop). For any - # public method that can be invoked both, from the event loop and from - # a different thread, just check what thread we are in and either do - # the operation directly or schedule it with - # `asyncio.run_coroutine_threadsafe` and read the concurrent future - # result instead. Do all this after you take care of the following - # issue: - # https://github.com/teremterem/Promising/issues/104 - self._unsettled_children_lock = threading.Lock() if register_with_parent: - # No other code has a reference to this PromisingContext yet, so we - # can just register it with the parent in a thread-unsafe manner - self._register_with_parent_thread_unsafe() + self._register_with_parent() @property def loop(self) -> AbstractEventLoop: @@ -479,7 +464,7 @@ def closed(self) -> bool: Whether this context is closed. A ``PromisingContext`` is "open" from the moment it is constructed - until ``close_context_threadsafe()`` runs (which happens automatically + until ``close_context()`` runs (which happens automatically when the ``with`` block exits). Closed contexts are still kept around in their parent's ``_unsettled_children`` until their own unsettled descendants drain (they do not accept new children anymore). @@ -732,8 +717,7 @@ def collect_unsettled_children( Returns: Set of child PromisingContexts matching the filter criteria. """ - with self._unsettled_children_lock: - children = list[PromisingContext](self._unsettled_children) + children = list[PromisingContext](self._unsettled_children) if awaitables_only: result = {child for child in children if inspect.isawaitable(child)} @@ -797,25 +781,24 @@ def __exit__( raise exc from exc_value finally: - self.close_context_threadsafe() + self.close_context() return False # Let's not suppress any exceptions - def close_context_threadsafe(self) -> None: + def close_context(self) -> None: """ Mark this context as closed and unregister it from its parent if - no unsettled descendants remain. Safe to call from any thread. + no unsettled descendants remain. Called automatically by ``__exit__`` (so a normal ``with`` block always closes the context). For a ``Promise``, the context is - also entered and exited from inside ``_unpack_once_from_loop`` + also entered and exited from inside ``_unpack_once`` around the awaiting of the wrapped awaitable, so the close happens in lockstep with the unpacking step that produced its first result. After this runs, any further attempt to enter the context or to register children on it raises ``ContextAlreadyClosedError``. """ - with self._unsettled_children_lock: - self._context_closed = True + self._context_closed = True self._unregister_from_parent_if_time() def try_to_link_exception(self, exception: BaseException) -> None: @@ -868,18 +851,17 @@ def __repr__(self) -> str: namespace_prefix = "" if self.namespace is None else f"{self.namespace!r} " return f"<{namespace_prefix}{self.__class__.__name__} id={id(self)}>" - def _register_with_parent_thread_unsafe(self) -> None: - # It is thread-safe for the parent but is unsafe for the child itself + def _register_with_parent(self) -> None: if self._parent is not None and not self.done(): - self._parent._register_children_threadsafe(self) + self._parent._register_children(self) def _unregister_from_parent_if_time(self) -> None: if self.done() and self._parent is not None and not self._unsettled_children: _hierarchy_logger.log_unregistering_from_parent(parent=self._parent, child=self) - self._parent._unregister_children_threadsafe(self) + self._parent._unregister_children(self) - def _register_children_threadsafe(self, *children: "PromisingContext") -> None: + def _register_children(self, *children: "PromisingContext") -> None: for child in children: if not isinstance(child, PromisingContext): raise TypeError( @@ -887,21 +869,19 @@ def _register_children_threadsafe(self, *children: "PromisingContext") -> None: f"Context: {self!r}\nChild: {child!r}" ) - with self._unsettled_children_lock: - if self.closed(): - raise ContextAlreadyClosedError( - f"Cannot register children in a context that has already been closed.\n" - f"Context: {self!r}\nChildren: {children!r}" - ) - self._unsettled_children.update(children) + if self.closed(): + raise ContextAlreadyClosedError( + f"Cannot register children in a context that has already been closed.\n" + f"Context: {self!r}\nChildren: {children!r}" + ) + self._unsettled_children.update(children) - _hierarchy_logger.log_children_registered(parent=self, children=children) + _hierarchy_logger.log_children_registered(parent=self, children=children) - def _unregister_children_threadsafe(self, *children: "PromisingContext") -> None: - with self._unsettled_children_lock: - self._unsettled_children.difference_update(children) + def _unregister_children(self, *children: "PromisingContext") -> None: + self._unsettled_children.difference_update(children) - _hierarchy_logger.log_children_unregistered(parent=self, children=children) + _hierarchy_logger.log_children_unregistered(parent=self, children=children) self._unregister_from_parent_if_time() diff --git a/tests/hierarchy/test_unregister_from_parent.py b/tests/hierarchy/test_unregister_from_parent.py index e952478dd..544188e2b 100644 --- a/tests/hierarchy/test_unregister_from_parent.py +++ b/tests/hierarchy/test_unregister_from_parent.py @@ -61,7 +61,7 @@ async def test_unregisters_from_parent_when_last_child_is_unregistered() -> None assert parent in grandparent._unsettled_children # Removing the last child triggers deferred unregistration - parent._unregister_children_threadsafe(grandchild) + parent._unregister_children(grandchild) assert grandchild not in parent._unsettled_children assert parent not in grandparent._unsettled_children @@ -81,11 +81,11 @@ async def test_does_not_unregister_while_other_children_remain() -> None: assert parent in grandparent._unsettled_children # Removing one child — parent should stay registered - parent._unregister_children_threadsafe(child_a) + parent._unregister_children(child_a) assert parent in grandparent._unsettled_children # Removing the last child triggers deferred unregistration - parent._unregister_children_threadsafe(child_b) + parent._unregister_children(child_b) assert parent not in grandparent._unsettled_children diff --git a/tests/hierarchy/test_unregister_on_cancellation.py b/tests/hierarchy/test_unregister_on_cancellation.py index fea518405..96e6922c7 100644 --- a/tests/hierarchy/test_unregister_on_cancellation.py +++ b/tests/hierarchy/test_unregister_on_cancellation.py @@ -19,8 +19,8 @@ async def test_cancel_pending_promise_unregisters_from_parent() -> None: """ Cancelling a never-started Promise (no underlying task — synthesize - path in ``_cancel_from_loop``) must close its context so that the - Promise unregisters from its parent. Without ``close_context_threadsafe()`` + path in ``cancel``) must close its context so that the + Promise unregisters from its parent. Without ``close_context()`` on that path, ``_context_closed`` stays False and the child is leaked in the parent's ``_unsettled_children``. """ @@ -43,10 +43,8 @@ async def coro() -> str: async def test_cancel_pending_promise_from_other_thread_unregisters_from_parent() -> None: """ - Synthesize path reached via the thread-safe dispatch: cancel() is - called from a non-loop thread, which schedules ``_cancel_from_loop`` - on the loop. The unregistration must still happen, just on the loop - thread. + cancel() is called from a non-loop thread. The unregistration must still + happen. """ with promising.context() as parent: @@ -65,10 +63,6 @@ def cancel_in_thread() -> None: thread = threading.Thread(target=cancel_in_thread) thread.start() - # Yield so the threadsafe callback (and the thread blocked on its - # future) can run on this loop. Don't await the promise itself - # here — that would start an unpacking task and race with the - # synthesize path we're trying to exercise. while not cancel_result: await asyncio.sleep(0.1) thread.join(timeout=2) @@ -83,7 +77,7 @@ async def test_coroutine_raising_cancelled_error_unregisters_from_parent() -> No """ When the coroutine itself raises ``CancelledError`` (no external cancel() call), the Promise still goes through the standard - ``_unpack_once_from_loop`` path whose ``with self:`` closes the + ``_unpack_once`` path whose ``with self:`` closes the context. Verify the cancelled Promise unregisters from its parent. """ with promising.context() as parent: @@ -110,18 +104,19 @@ async def test_cancel_full_unpacking_task_before_first_step_transitions_promise( ``create_task`` and its first ``__step`` throws ``CancelledError`` into a not-yet-started coroutine — Python propagates that exception out without entering the body's ``try/except BaseException``, so the - coroutine never calls ``_set_exception_from_loop`` itself. Without + coroutine never calls ``_set_exception`` itself. Without the done-callback bridge, the Task ends cancelled while the Promise stays ``_PENDING`` and leaks in its parent's ``_unsettled_children``. """ + # TODO [TESTS] This test is suspicious - we need a way to know that it + # actually tests what it's supposed to test and doesn't pass for unrelated + # reasons with promising.context() as parent: async def coro() -> str: return "unreachable" promise = Promise(coro(), start_soon=True) - # Let the threadsafe scheduling callback create the task without - # giving the task itself a chance to take its first step. await asyncio.sleep(0) full_task = promise._full_unpacking_task assert full_task is not None @@ -146,6 +141,9 @@ async def test_cancel_single_unpacking_task_before_first_step_transitions_promis ``_single_unpacking_task`` created via ``unpack_once()``. Verifies the done-callback is wired on both task creation sites. """ + # TODO [TESTS] This test is suspicious - we need a way to know that it + # actually tests what it's supposed to test and doesn't pass for unrelated + # reasons with promising.context() as parent: async def coro() -> str: diff --git a/tests/resolution/test_promise_cancellation.py b/tests/resolution/test_promise_cancellation.py index 3b217c8aa..cf4babaaa 100644 --- a/tests/resolution/test_promise_cancellation.py +++ b/tests/resolution/test_promise_cancellation.py @@ -1,6 +1,6 @@ """ Tests for Promise.cancel() — modeled on asyncio.Future / asyncio.Task -semantics. Cancellation flows through ``_set_exception_from_loop``: the +semantics. Cancellation flows through ``_set_exception``: the ``CancelledError`` is stored first, the ``_CANCELLED_*`` state transition is its effect. """ @@ -173,7 +173,7 @@ async def coro() -> str: async def test_result_raises_not_done_before_cancel_propagates() -> None: """ ``cancel()`` on a running task only *requests* cancellation; until the - CancelledError lands and is stored via ``_set_exception_from_loop``, ``done()`` + CancelledError lands and is stored via ``_set_exception``, ``done()`` stays False and ``result()`` raises ``PromiseNotDoneError``. """ coro_started = asyncio.Event() @@ -200,14 +200,13 @@ async def coro() -> str: assert promise.done() is True -# ── Thread-safe cancel() ───────────────────────────────────────── +# ── cancel() from another thread ───────────────────────────────── async def test_cancel_from_another_thread() -> None: """ cancel() called from a thread other than the event loop's thread - dispatches via call_soon_threadsafe and reports True once the - cancellation request was scheduled. + reports True once the cancellation request was scheduled. """ coro_started = asyncio.Event() coro_finished = False diff --git a/tests/resolution/test_promising_function_run.py b/tests/resolution/test_promising_function_run.py index 5e145ecff..9ea95ad26 100644 --- a/tests/resolution/test_promising_function_run.py +++ b/tests/resolution/test_promising_function_run.py @@ -56,8 +56,7 @@ def test_promising_function_run_with_child_promise( ) -> None: """ Same as above but also creates a child Promise inside the function, which - exercises _call_soon_threadsafe and verifies the event loop is correctly - resolved at runtime. + verifies the event loop is correctly resolved at runtime. Runs in a separate thread to avoid interfering with the pytest-asyncio event loop. diff --git a/uv.lock b/uv.lock index 61cbff85c..88fa94182 100644 --- a/uv.lock +++ b/uv.lock @@ -408,14 +408,14 @@ wheels = [ [[package]] name = "click" -version = "8.3.3" +version = "8.4.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "colorama", marker = "sys_platform == 'win32'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/bb/63/f9e1ea081ce35720d8b92acde70daaedace594dc93b693c869e0d5910718/click-8.3.3.tar.gz", hash = "sha256:398329ad4837b2ff7cbe1dd166a4c0f8900c3ca3a218de04466f38f6497f18a2", size = 328061, upload-time = "2026-04-22T15:11:27.506Z" } +sdist = { url = "https://files.pythonhosted.org/packages/23/e4/796662cd90cf80e3a363c99db2b88e0e394b988a575f60a17e16440cd011/click-8.4.0.tar.gz", hash = "sha256:638f1338fe1235c8f4e008e4a8a254fb5c5fbdcbb40ece3c9142ebb78e792973", size = 350843, upload-time = "2026-05-17T00:47:58.425Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/ae/44/c1221527f6a71a01ec6fbad7fa78f1d50dfa02217385cf0fa3eec7087d59/click-8.3.3-py3-none-any.whl", hash = "sha256:a2bf429bb3033c89fa4936ffb35d5cb471e3719e1f3c8a7c3fff0b8314305613", size = 110502, upload-time = "2026-04-22T15:11:25.044Z" }, + { url = "https://files.pythonhosted.org/packages/ee/ae/8e92f8058baf87f6c7d86ee7e457668690195cc77efedb8d3797a06e3940/click-8.4.0-py3-none-any.whl", hash = "sha256:40c50b7c6c6adac2823d411041ec84f3f103f1b280d5e9ce0d7f998995832f81", size = 116147, upload-time = "2026-05-17T00:47:56.842Z" }, ] [[package]] @@ -581,11 +581,11 @@ wheels = [ [[package]] name = "decorator" -version = "5.2.1" +version = "5.3.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/43/fa/6d96a0978d19e17b68d634497769987b16c8f4cd0a7a05048bec693caa6b/decorator-5.2.1.tar.gz", hash = "sha256:65f266143752f734b0a7cc83c46f4618af75b8c5911b00ccb61d0ac9b6da0360", size = 56711, upload-time = "2025-02-24T04:41:34.073Z" } +sdist = { url = "https://files.pythonhosted.org/packages/5c/50/a39dd7ab407e93978dfa07d109b7d633e37958c89f30cbcec061b77b3ebc/decorator-5.3.0.tar.gz", hash = "sha256:95fda3122972c847cf0ff7e0ce2829bf25136f2526b627b3da85b60ca5f485c0", size = 58431, upload-time = "2026-05-17T06:59:57.258Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/4e/8c/f3147f5c4b73e7550fe5f9352eaa956ae838d5c51eb58e7a25b9f3e2643b/decorator-5.2.1-py3-none-any.whl", hash = "sha256:d316bb415a2d9e2d2b3abcc4084c6502fc09240e292cd76a76afc106a1c8e04a", size = 9190, upload-time = "2025-02-24T04:41:32.565Z" }, + { url = "https://files.pythonhosted.org/packages/d5/6f/f8d0bba4dc2a69817d74f640d504650241ebf2f9f7263426f1b953b344d4/decorator-5.3.0-py3-none-any.whl", hash = "sha256:f8c2d71ede92f073144ddd7f3e9fbbc3bd0f2f29522c9d75ee648d66553834f4", size = 11104, upload-time = "2026-05-17T06:59:54.676Z" }, ] [[package]] @@ -1224,7 +1224,7 @@ wheels = [ [[package]] name = "litellm" -version = "1.84.0" +version = "1.85.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aiohttp" }, @@ -1240,9 +1240,9 @@ dependencies = [ { name = "tiktoken" }, { name = "tokenizers" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/dd/e9/8941b7e72a187000561d932c0f2f2ed2b0fd080dfc33ba6e05961d45ca7d/litellm-1.84.0.tar.gz", hash = "sha256:b8ad0cbea11a5941b18d5af973017a340abd3d3ab41cb86e5401b970626d71a6", size = 15103206, upload-time = "2026-05-14T05:45:53.017Z" } +sdist = { url = "https://files.pythonhosted.org/packages/3b/d5/3c9b560db2ffa9e498655d0dfd74f408bc5b32ede858b5731c2a5fa4c752/litellm-1.85.0.tar.gz", hash = "sha256:babdd569809af913d08a08a7eb55df1ed3e6a3960ee365c6cef4ad031c9bc72a", size = 15344387, upload-time = "2026-05-17T01:59:15.97Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/01/a6/77fa1bbf5e42eb596b06318b3f7e6af5d0f44028046d1d598c6a595d028f/litellm-1.84.0-py3-none-any.whl", hash = "sha256:2a58d6041e6aa27d1a28dc8d8828ab500fef1a00ef74ca65e60899035010c2f2", size = 16735062, upload-time = "2026-05-14T05:45:49.927Z" }, + { url = "https://files.pythonhosted.org/packages/1c/38/e6a4abb062e039d18d59538cc4e6fc370c2c10cd2bff4a2e546acb69dcb9/litellm-1.85.0-py3-none-any.whl", hash = "sha256:2bb449153610691faffd76f5b94a8c29e4b66fc5394156ebf54fd4fe92759b1a", size = 16978229, upload-time = "2026-05-17T01:59:11.902Z" }, ] [[package]]