From 8bc21be4d337f2b66480e2f1991630502b15c843 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Thu, 17 Oct 2024 19:20:44 +0200 Subject: [PATCH 1/3] sync: Update broadcast::recv to return a named future --- tokio/src/sync/broadcast.rs | 47 ++++++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 56c4cd6b92f..88708019921 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -119,10 +119,11 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{AtomicBool, AtomicUsize}; use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; -use crate::runtime::coop::cooperative; +use crate::runtime::coop::{cooperative, Coop}; use crate::util::linked_list::{self, GuardedLinkedList, LinkedList}; use crate::util::WakeList; +use pin_project_lite::pin_project; use std::fmt; use std::future::Future; use std::marker::PhantomPinned; @@ -389,8 +390,30 @@ struct RecvGuard<'a, T> { slot: RwLockReadGuard<'a, Slot>, } +pin_project! { + /// Future for the [`Receiver::recv`] method. + pub struct Recv<'a, T> + where + T: Clone, + { + #[pin] + inner: Coop>, + } +} + +impl<'a, T> Future for Recv<'a, T> +where + T: Clone, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().inner.poll(cx) + } +} + /// Receive a value future. -struct Recv<'a, T> { +struct RecvInner<'a, T> { /// Receiver being waited on. receiver: &'a mut Receiver, @@ -398,8 +421,8 @@ struct Recv<'a, T> { waiter: UnsafeCell, } -unsafe impl<'a, T: Send> Send for Recv<'a, T> {} -unsafe impl<'a, T: Send> Sync for Recv<'a, T> {} +unsafe impl<'a, T: Send> Send for RecvInner<'a, T> {} +unsafe impl<'a, T: Send> Sync for RecvInner<'a, T> {} /// Max number of receivers. Reserve space to lock. const MAX_RECEIVERS: usize = usize::MAX >> 2; @@ -1262,8 +1285,10 @@ impl Receiver { /// assert_eq!(30, rx.recv().await.unwrap()); /// } /// ``` - pub async fn recv(&mut self) -> Result { - cooperative(Recv::new(self)).await + pub fn recv(&mut self) -> Recv<'_, T> { + Recv { + inner: cooperative(RecvInner::new(self)), + } } /// Attempts to return a pending value on this receiver without awaiting. @@ -1363,9 +1388,9 @@ impl Drop for Receiver { } } -impl<'a, T> Recv<'a, T> { - fn new(receiver: &'a mut Receiver) -> Recv<'a, T> { - Recv { +impl<'a, T> RecvInner<'a, T> { + fn new(receiver: &'a mut Receiver) -> RecvInner<'a, T> { + RecvInner { receiver, waiter: UnsafeCell::new(Waiter { queued: AtomicBool::new(false), @@ -1389,7 +1414,7 @@ impl<'a, T> Recv<'a, T> { } } -impl<'a, T> Future for Recv<'a, T> +impl<'a, T> Future for RecvInner<'a, T> where T: Clone, { @@ -1411,7 +1436,7 @@ where } } -impl<'a, T> Drop for Recv<'a, T> { +impl<'a, T> Drop for RecvInner<'a, T> { fn drop(&mut self) { // Safety: `waiter.queued` is atomic. // Acquire ordering is required to synchronize with From ba5f590acda40cccbc8dd3e942ff227b3978ac26 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 25 Oct 2024 00:55:07 +0200 Subject: [PATCH 2/3] Document 'async fn' notation for boardcast::Receiver::recv --- tokio/src/sync/broadcast.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 88708019921..2e3f9689c92 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1215,6 +1215,12 @@ impl Receiver { } /// Receives the next value for this receiver. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn recv(&self) -> Result; + /// ``` + /// /// Each [`Receiver`] handle will receive a clone of all values sent /// **after** it has subscribed. /// From d533e05adab6718daf5f191aacab2d5954f988d5 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 25 Oct 2024 00:56:11 +0200 Subject: [PATCH 3/3] Update public location of broadcast Recv future Move from sync::broadcast to sync::futures. --- tokio/src/sync/broadcast.rs | 49 ++++++++++++++++++++++++------------- tokio/src/sync/mod.rs | 2 +- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 2e3f9689c92..b97c44769c5 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -119,11 +119,10 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{AtomicBool, AtomicUsize}; use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; -use crate::runtime::coop::{cooperative, Coop}; +use crate::runtime::coop::cooperative; use crate::util::linked_list::{self, GuardedLinkedList, LinkedList}; use crate::util::WakeList; -use pin_project_lite::pin_project; use std::fmt; use std::future::Future; use std::marker::PhantomPinned; @@ -390,28 +389,44 @@ struct RecvGuard<'a, T> { slot: RwLockReadGuard<'a, Slot>, } -pin_project! { - /// Future for the [`Receiver::recv`] method. - pub struct Recv<'a, T> +pub(crate) mod future { + use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + }; + + use pin_project_lite::pin_project; + + use crate::runtime::coop::Coop; + + use super::{error::RecvError, RecvInner}; + + pin_project! { + /// Future for the [`Receiver::recv`][super::Receiver::recv] method. + pub struct Recv<'a, T> + where + T: Clone, + { + #[pin] + pub(super) inner: Coop>, + } + } + + impl<'a, T> Future for Recv<'a, T> where T: Clone, { - #[pin] - inner: Coop>, - } -} - -impl<'a, T> Future for Recv<'a, T> -where - T: Clone, -{ - type Output = Result; + type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().inner.poll(cx) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().inner.poll(cx) + } } } +use self::future::Recv; + /// Receive a value future. struct RecvInner<'a, T> { /// Receiver being waited on. diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index ddf99644270..adfa7a7efb7 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -449,7 +449,7 @@ cfg_sync! { /// Named future types. pub mod futures { - pub use super::notify::Notified; + pub use super::{notify::Notified, broadcast::future::Recv}; } mod barrier;