Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions promising/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,11 @@ def cancel(self, msg: str | None = None) -> bool:
underlying task, or synthesized for a not-yet-started Promise;
``False`` if the Promise was already done.
"""
if self.done():
return False
# DELIBERATE BUG: drop the ``if self.done(): return False`` guard,
# so a concurrent cancel that lands after natural completion still
# runs ``_synthesize_cancellation`` against an already-terminal
# Promise. Combined with the ``_set_exception`` change below, this
# produces visible state corruption.

cancellation_requested = False
if self._single_unpacking_task is not None and not self._single_unpacking_task.done():
Expand Down Expand Up @@ -793,12 +796,11 @@ def _set_exception(self, exception: BaseException) -> None:
terminal_state = (
_CANCELLED_AFTER_UNPACKED_ONCE if isinstance(exception, asyncio.CancelledError) else _FINISHED
)
elif self.done() and isinstance(exception, asyncio.CancelledError):
# Cancellation can land on both the single and full
# unpacking tasks, so the same CancelledError can reach
# this method twice — the second arrival sees a Promise
# that's already cancelled. Drop it; the original wins.
return
# DELIBERATE BUG: remove the "duplicate CancelledError on an
# already-terminal Promise → silently drop" branch. Concurrent
# cancels (and cancels arriving after natural completion) now
# raise RuntimeError, which is caught by the outer handler and
# surfaces via ``_force_internal_error_finish``.
else:
raise RuntimeError(f"Cannot set exception on a promise because of its current state: {self!r}")

Expand Down
27 changes: 24 additions & 3 deletions promising/promising_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,13 @@ def collect_unsettled_children(
Returns:
Set of child PromisingContexts matching the filter criteria.
"""
children = list[PromisingContext](self._unsettled_children)
# DELIBERATE BUG: iterate the live set instead of a C-atomic
# `list(set)` snapshot. Every loop iteration is a GIL release
# point, so concurrent set.update() / set.discard() during
# this loop can raise "Set changed size during iteration".
children: list[PromisingContext] = []
for c in self._unsettled_children:
children.append(c)

if awaitables_only:
result = {child for child in children if inspect.isawaitable(child)}
Expand Down Expand Up @@ -874,12 +880,27 @@ def _register_children(self, *children: "PromisingContext") -> None:
f"Cannot register children in a context that has already been closed.\n"
f"Context: {self!r}\nChildren: {children!r}"
)
self._unsettled_children.update(children)
# DELIBERATE BUG: replace atomic ``set.update`` with non-atomic
# read-modify-write. Two threads can read the same snapshot,
# each add their own child, both write back — last writer wins,
# the other child is lost.
new_set = set(self._unsettled_children)
new_set.update(children)
self._unsettled_children = new_set

_hierarchy_logger.log_children_registered(parent=self, children=children)

def _unregister_children(self, *children: "PromisingContext") -> None:
self._unsettled_children.difference_update(children)
# DELIBERATE BUG: non-atomic read-modify-write (resurrects/drops
# entries when racing with a concurrent register) PLUS use
# ``.remove`` instead of ``.difference_update`` so a double
# close_context() raises KeyError. Mirrors a refactor where
# someone wanted "strict" semantics but forgot the call site
# can fire idempotently from multiple threads.
new_set = set(self._unsettled_children)
for c in children:
new_set.remove(c)
self._unsettled_children = new_set

_hierarchy_logger.log_children_unregistered(parent=self, children=children)

Expand Down
13 changes: 8 additions & 5 deletions promising/promising_function.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import concurrent.futures
import contextvars
import functools
from collections.abc import Callable
from typing import Any, Generic
Expand Down Expand Up @@ -441,12 +440,16 @@ async def _sync_to_async() -> T_co:
# this async wrapper function
active_promise = get_active_promise()
executor = active_promise.get_thread_pool_executor()
# Copy the current context so that ContextVars (in particular
# Promise._current) are accessible inside the executor thread
ctx = contextvars.copy_context()
# DELIBERATE BUG: drop the ``contextvars.copy_context()`` /
# ``ctx.run(...)`` wrapping. Worker threads no longer
# inherit the parent Promise's ContextVar context, so
# ``get_active_context()`` / ``get_active_promise()``
# inside the sync body can return ``None`` or — under
# multi-loop stress — some *other* concurrently running
# promise.
return await active_promise.loop.run_in_executor(
executor,
functools.partial(ctx.run, self._wrapped_as_callable, *args, **kwargs),
functools.partial(self._wrapped_as_callable, *args, **kwargs),
)

coro = _sync_to_async()
Expand Down