Skip to content

Commit 1731c6d

Browse files
committed
refactor: drop futures_util dependency
1 parent d66be8b commit 1731c6d

File tree

4 files changed

+45
-29
lines changed

4 files changed

+45
-29
lines changed

Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ unindent = { version = "0.2.1", optional = true }
3131
# support crate for multiple-pymethods feature
3232
inventory = { version = "0.3.0", optional = true }
3333

34-
# coroutine implementation
35-
futures-util = "0.3"
36-
3734
# crate integrations that can be added using the eponymous features
3835
anyhow = { version = "1.0", optional = true }
3936
chrono = { version = "0.4.25", default-features = false, optional = true }

src/coroutine.rs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,19 @@
11
//! Python coroutine implementation, used notably when wrapping `async fn`
22
//! with `#[pyfunction]`/`#[pymethods]`.
3+
use std::task::Waker;
34
use std::{
4-
any::Any,
55
future::Future,
66
panic,
77
pin::Pin,
88
sync::Arc,
99
task::{Context, Poll},
1010
};
1111

12-
use futures_util::FutureExt;
1312
use pyo3_macros::{pyclass, pymethods};
1413

1514
use crate::{
1615
coroutine::waker::AsyncioWaker,
1716
exceptions::{PyAttributeError, PyRuntimeError, PyStopIteration},
18-
panic::PanicException,
1917
pyclass::IterNextOutput,
2018
types::{PyIterator, PyString},
2119
IntoPy, Py, PyAny, PyErr, PyObject, PyResult, Python,
@@ -25,19 +23,18 @@ pub(crate) mod cancel;
2523
mod waker;
2624

2725
use crate::coroutine::cancel::ThrowCallback;
26+
use crate::panic::PanicException;
2827
pub use cancel::CancelHandle;
2928

3029
const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine";
3130

32-
type FutureOutput = Result<PyResult<PyObject>, Box<dyn Any + Send>>;
33-
3431
/// Python coroutine wrapping a [`Future`].
3532
#[pyclass(crate = "crate")]
3633
pub struct Coroutine {
3734
name: Option<Py<PyString>>,
3835
qualname_prefix: Option<&'static str>,
3936
throw_callback: Option<ThrowCallback>,
40-
future: Option<Pin<Box<dyn Future<Output = FutureOutput> + Send>>>,
37+
future: Option<Pin<Box<dyn Future<Output = PyResult<PyObject>> + Send>>>,
4138
waker: Option<Arc<AsyncioWaker>>,
4239
}
4340

@@ -68,7 +65,7 @@ impl Coroutine {
6865
name,
6966
qualname_prefix,
7067
throw_callback,
71-
future: Some(Box::pin(panic::AssertUnwindSafe(wrap).catch_unwind())),
68+
future: Some(Box::pin(wrap)),
7269
waker: None,
7370
}
7471
}
@@ -98,14 +95,20 @@ impl Coroutine {
9895
} else {
9996
self.waker = Some(Arc::new(AsyncioWaker::new()));
10097
}
101-
let waker = futures_util::task::waker(self.waker.clone().unwrap());
98+
let waker = Waker::from(self.waker.clone().unwrap());
10299
// poll the Rust future and forward its results if ready
103-
if let Poll::Ready(res) = future_rs.as_mut().poll(&mut Context::from_waker(&waker)) {
104-
self.close();
105-
return match res {
106-
Ok(res) => Ok(IterNextOutput::Return(res?)),
107-
Err(err) => Err(PanicException::from_panic_payload(err)),
108-
};
100+
// polling is UnwindSafe because the future is dropped in case of panic
101+
let poll = || future_rs.as_mut().poll(&mut Context::from_waker(&waker));
102+
match panic::catch_unwind(panic::AssertUnwindSafe(poll)) {
103+
Ok(Poll::Ready(res)) => {
104+
self.close();
105+
return Ok(IterNextOutput::Return(res?));
106+
}
107+
Err(err) => {
108+
self.close();
109+
return Err(PanicException::from_panic_payload(err));
110+
}
111+
_ => {}
109112
}
110113
// otherwise, initialize the waker `asyncio.Future`
111114
if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? {

src/coroutine/cancel.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
use crate::{ffi, Py, PyAny, PyObject};
2-
use futures_util::future::poll_fn;
3-
use futures_util::task::AtomicWaker;
2+
use std::future::Future;
3+
use std::pin::Pin;
44
use std::ptr;
55
use std::ptr::NonNull;
66
use std::sync::atomic::{AtomicPtr, Ordering};
7-
use std::sync::Arc;
8-
use std::task::{Context, Poll};
7+
use std::sync::{Arc, Mutex};
8+
use std::task::{Context, Poll, Waker};
99

1010
#[derive(Debug, Default)]
1111
struct Inner {
1212
exception: AtomicPtr<ffi::PyObject>,
13-
waker: AtomicWaker,
13+
waker: Mutex<Option<Waker>>,
1414
}
1515

1616
/// Helper used to wait and retrieve exception thrown in [`Coroutine`](super::Coroutine).
@@ -43,16 +43,17 @@ impl CancelHandle {
4343
if self.is_cancelled() {
4444
return Poll::Ready(take());
4545
}
46-
self.0.waker.register(cx.waker());
46+
let mut guard = self.0.waker.lock().unwrap();
4747
if self.is_cancelled() {
4848
return Poll::Ready(take());
4949
}
50+
*guard = Some(cx.waker().clone());
5051
Poll::Pending
5152
}
5253

5354
/// Retrieve the exception thrown in the associated coroutine.
5455
pub async fn cancelled(&mut self) -> PyObject {
55-
poll_fn(|cx| self.poll_cancelled(cx)).await
56+
Cancelled(self).await
5657
}
5758

5859
#[doc(hidden)]
@@ -61,6 +62,15 @@ impl CancelHandle {
6162
}
6263
}
6364

65+
struct Cancelled<'a>(&'a mut CancelHandle);
66+
67+
impl Future for Cancelled<'_> {
68+
type Output = PyObject;
69+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
70+
self.0.poll_cancelled(cx)
71+
}
72+
}
73+
6474
#[doc(hidden)]
6575
pub struct ThrowCallback(Arc<Inner>);
6676

@@ -69,6 +79,8 @@ impl ThrowCallback {
6979
let ptr = self.0.exception.swap(exc.into_ptr(), Ordering::Relaxed);
7080
// SAFETY: non-null pointers set in `self.0.exceptions` are valid owned pointers
7181
drop(unsafe { PyObject::from_owned_ptr_or_opt(exc.py(), ptr) });
72-
self.0.waker.wake();
82+
if let Some(waker) = self.0.waker.lock().unwrap().take() {
83+
waker.wake();
84+
}
7385
}
7486
}

src/coroutine/waker.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::sync::GILOnceCell;
22
use crate::types::PyCFunction;
33
use crate::{intern, wrap_pyfunction, Py, PyAny, PyObject, PyResult, Python};
4-
use futures_util::task::ArcWake;
54
use pyo3_macros::pyfunction;
65
use std::sync::Arc;
6+
use std::task::Wake;
77

88
/// Lazy `asyncio.Future` wrapper, implementing [`ArcWake`] by calling `Future.set_result`.
99
///
@@ -31,10 +31,14 @@ impl AsyncioWaker {
3131
}
3232
}
3333

34-
impl ArcWake for AsyncioWaker {
35-
fn wake_by_ref(arc_self: &Arc<Self>) {
34+
impl Wake for AsyncioWaker {
35+
fn wake(self: Arc<Self>) {
36+
self.wake_by_ref()
37+
}
38+
39+
fn wake_by_ref(self: &Arc<Self>) {
3640
Python::with_gil(|gil| {
37-
if let Some(loop_and_future) = arc_self.0.get_or_init(gil, || None) {
41+
if let Some(loop_and_future) = self.0.get_or_init(gil, || None) {
3842
loop_and_future
3943
.set_result(gil)
4044
.expect("unexpected error in coroutine waker");

0 commit comments

Comments
 (0)