diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 663072f763..2c516e3138 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,7 +43,7 @@ jobs: matrix: rust: # This is the minimum Rust version supported by futures-core, futures-io, futures-sink, futures-task, futures-channel. - # When updating this, the reminder to update the minimum required version of `async-await` feature in README.md. + # When updating this, the reminder to update the minimum required version in .clippy.toml. - 1.36.0 runs-on: ubuntu-latest steps: @@ -74,8 +74,8 @@ jobs: matrix: rust: # This is the minimum Rust version supported by futures, futures-util, futures-macro, futures-executor, futures-test. - # When updating this, the reminder to update the minimum required version of `async-await` feature in README.md. - - 1.37.0 + # When updating this, the reminder to update the minimum required version in README.md. + - 1.41.0 runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -84,6 +84,8 @@ jobs: - run: cargo +stable install cargo-hack # remove dev-dependencies to avoid https://github.com/rust-lang/cargo/issues/4866 - run: cargo hack --remove-dev-deps --workspace + # Check default features + - run: cargo hack build --workspace --ignore-private # Check no-default-features - run: cargo hack build --workspace --exclude futures-test --ignore-private --no-default-features # Check alloc feature @@ -95,22 +97,6 @@ jobs: # Check thread-pool feature (futures, futures-executor) - run: cargo hack build -p futures -p futures-executor --no-default-features --features std,thread-pool - async-await-msrv: - name: cargo +${{ matrix.rust }} build - strategy: - matrix: - rust: - # This is the minimum Rust version supported by `async-await` feature. - # When updating this, the reminder to update the minimum required version of `async-await` feature in README.md. - - 1.39.0 - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - name: Install Rust - run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - - run: cargo +stable install cargo-hack - - run: cargo hack build --workspace --no-dev-deps - build: name: cargo +${{ matrix.rust }} build strategy: @@ -141,59 +127,41 @@ jobs: - run: cargo update -Z minimal-versions - run: cargo build --workspace --all-features - thumbv6m: - name: cargo build --target thumbv6m-none-eabi + no-std: + name: cargo build --target ${{ matrix.target }} + strategy: + matrix: + target: + - thumbv6m-none-eabi + - thumbv7m-none-eabi runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - name: Install Rust run: rustup update nightly && rustup default nightly - - run: rustup target add thumbv6m-none-eabi + - run: rustup target add ${{ matrix.target }} - run: cargo install cargo-hack # remove dev-dependencies to avoid https://github.com/rust-lang/cargo/issues/4866 - run: cargo hack --remove-dev-deps --workspace - run: | - cargo build --manifest-path futures/Cargo.toml \ - --target thumbv6m-none-eabi \ - --no-default-features \ - --features unstable,cfg-target-has-atomic - - run: | - cargo build --manifest-path futures/Cargo.toml \ - --target thumbv6m-none-eabi \ - --no-default-features \ - --features alloc,unstable,cfg-target-has-atomic - - run: | - cargo build --manifest-path futures/Cargo.toml \ - --target thumbv6m-none-eabi \ - --no-default-features \ - --features async-await,unstable,cfg-target-has-atomic - - thumbv7m: - name: cargo build --target thumbv7m-none-eabi - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - name: Install Rust - run: rustup update nightly && rustup default nightly - - run: rustup target add thumbv7m-none-eabi - - run: cargo install cargo-hack - # remove dev-dependencies to avoid https://github.com/rust-lang/cargo/issues/4866 - - run: cargo hack --remove-dev-deps --workspace + cargo hack build --manifest-path futures/tests/no-std/Cargo.toml \ + --each-feature --optional-deps \ + --target ${{ matrix.target }} - run: | - cargo build --manifest-path futures/Cargo.toml \ - --target thumbv7m-none-eabi \ + cargo hack build --workspace --ignore-private \ + --exclude futures-test --exclude futures-macro \ --no-default-features \ - --features unstable,cfg-target-has-atomic + --target ${{ matrix.target }} - run: | - cargo build --manifest-path futures/Cargo.toml \ - --target thumbv7m-none-eabi \ - --no-default-features \ - --features alloc + cargo hack build --workspace --ignore-private \ + --exclude futures-test --exclude futures-macro \ + --no-default-features --features alloc --ignore-unknown-features \ + --target ${{ matrix.target }} - run: | - cargo build --manifest-path futures/Cargo.toml \ - --target thumbv7m-none-eabi \ - --no-default-features \ - --features async-await + cargo hack build --workspace --ignore-private \ + --exclude futures-test --exclude futures-macro \ + --no-default-features --features async-await,alloc --ignore-unknown-features \ + --target ${{ matrix.target }} bench: name: cargo bench @@ -226,6 +194,19 @@ jobs: --workspace --exclude futures-test \ --features unstable --ignore-unknown-features + # When this job failed, run ci/no_atomic_cas.sh and commit result changes. + # TODO(taiki-e): Ideally, this should be automated using a bot that creates + # PR when failed, but there is no bandwidth to implement it + # right now... + codegen: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Install Rust + run: rustup update nightly && rustup default nightly + - run: ci/no_atomic_cas.sh + - run: git diff --exit-code + san: name: cargo test -Z sanitizer=${{ matrix.sanitizer }} strategy: @@ -247,18 +228,28 @@ jobs: # `--cfg futures_sanitizer`. RUSTFLAGS: -D warnings -Z sanitizer=${{ matrix.sanitizer }} --cfg futures_sanitizer - clippy: - name: cargo clippy + # This branch no longer actively developed. Most commits to this + # branch are backporting and should not be blocked by clippy. + # clippy: + # name: cargo clippy + # runs-on: ubuntu-latest + # steps: + # - uses: actions/checkout@v2 + # - name: Install Rust and Clippy + # run: | + # toolchain=nightly-$(curl -sSf https://rust-lang.github.io/rustup-components-history/x86_64-unknown-linux-gnu/clippy) + # rustup set profile minimal + # rustup default "$toolchain" + # rustup component add clippy + # - run: cargo clippy --workspace --all-features --all-targets + + fmt: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - name: Install Rust and Clippy - run: | - toolchain=nightly-$(curl -sSf https://rust-lang.github.io/rustup-components-history/x86_64-unknown-linux-gnu/clippy) - rustup set profile minimal - rustup default "$toolchain" - rustup component add clippy - - run: cargo clippy --workspace --all-features --all-targets + - name: Install Rust + run: rustup update stable && rustup default stable + - run: tools/fmt.sh docs: name: cargo doc diff --git a/.rustfmt.toml b/.rustfmt.toml index 2a35f0230c..2a79d9274a 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -1 +1,2 @@ use_small_heuristics = "Max" +edition = "2018" diff --git a/Cargo.toml b/Cargo.toml index f972a73d2c..d27a9f2885 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "futures/tests/macro-tests", "futures/tests/macro-reexport", + "futures/tests/no-std", "examples/functional", "examples/imperative", diff --git a/README.md b/README.md index 8c8df06b67..b97693fb83 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,8 @@ Crates.io - - Rustc Version + + Rustc Version

@@ -48,7 +48,7 @@ Now, you can use futures-rs: use futures::future::Future; ``` -The current futures-rs requires Rust 1.39 or later. +The current futures-rs requires Rust 1.41 or later. ### Feature `std` diff --git a/ci/no_atomic_cas.sh b/ci/no_atomic_cas.sh new file mode 100755 index 0000000000..24faa70487 --- /dev/null +++ b/ci/no_atomic_cas.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +set -euo pipefail +IFS=$'\n\t' + +cd "$(cd "$(dirname "$0")" && pwd)"/.. + +file="no_atomic_cas.rs" + +{ + echo "// This file is @generated by $(basename "$0")." + echo "// It is not intended for manual editing." + echo "" + echo "const NO_ATOMIC_CAS_TARGETS: &[&str] = &[" +} >"$file" + +for target in $(rustc --print target-list); do + res=$(rustc --print target-spec-json -Z unstable-options --target "$target" \ + | jq -r "select(.\"atomic-cas\" == false)") + [[ -z "$res" ]] || echo " \"$target\"," >>"$file" +done + +echo "];" >>"$file" diff --git a/futures-channel/Cargo.toml b/futures-channel/Cargo.toml index d7843e7d5e..6d32d56cd2 100644 --- a/futures-channel/Cargo.toml +++ b/futures-channel/Cargo.toml @@ -17,11 +17,10 @@ std = ["alloc", "futures-core/std"] alloc = ["futures-core/alloc"] sink = ["futures-sink"] -# Unstable features -# These features are outside of the normal semver guarantees and require the -# `unstable` feature as an explicit opt-in to unstable API. -unstable = ["futures-core/unstable"] -cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"] +# These features are no longer used. +# TODO: remove in the next major version. +unstable = [] +cfg-target-has-atomic = [] [dependencies] futures-core = { path = "../futures-core", version = "0.3.14", default-features = false } diff --git a/futures-channel/build.rs b/futures-channel/build.rs new file mode 100644 index 0000000000..c4f341d480 --- /dev/null +++ b/futures-channel/build.rs @@ -0,0 +1,42 @@ +#![warn(rust_2018_idioms, single_use_lifetimes)] + +use std::env; + +include!("no_atomic_cas.rs"); + +// The rustc-cfg listed below are considered public API, but it is *unstable* +// and outside of the normal semver guarantees: +// +// - `futures_no_atomic_cas` +// Assume the target does not have atomic CAS (compare-and-swap). +// This is usually detected automatically by the build script, but you may +// need to enable it manually when building for custom targets or using +// non-cargo build systems that don't run the build script. +// +// With the exceptions mentioned above, the rustc-cfg strings below are +// *not* public API. Please let us know by opening a GitHub issue if your build +// environment requires some way to enable these cfgs other than by executing +// our build script. +fn main() { + let target = match env::var("TARGET") { + Ok(target) => target, + Err(e) => { + println!( + "cargo:warning={}: unable to get TARGET environment variable: {}", + env!("CARGO_PKG_NAME"), + e + ); + return; + } + }; + + // Note that this is `no_*`, not `has_*`. This allows treating + // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't + // run. This is needed for compatibility with non-cargo build systems that + // don't run the build script. + if NO_ATOMIC_CAS_TARGETS.contains(&&*target) { + println!("cargo:rustc-cfg=futures_no_atomic_cas"); + } + + println!("cargo:rerun-if-changed=no_atomic_cas.rs"); +} diff --git a/futures-channel/no_atomic_cas.rs b/futures-channel/no_atomic_cas.rs new file mode 120000 index 0000000000..3d7380fadd --- /dev/null +++ b/futures-channel/no_atomic_cas.rs @@ -0,0 +1 @@ +../no_atomic_cas.rs \ No newline at end of file diff --git a/futures-channel/src/lib.rs b/futures-channel/src/lib.rs index 41a4a19af7..9377a3e2c2 100644 --- a/futures-channel/src/lib.rs +++ b/futures-channel/src/lib.rs @@ -11,7 +11,6 @@ //! All items are only available when the `std` or `alloc` feature of this //! library is activated, and it is activated by default. -#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] #![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] // It cannot be included in the published code because this lints have false positives in the minimum required version. @@ -19,12 +18,9 @@ #![warn(clippy::all)] #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] -#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] -compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - macro_rules! cfg_target_has_atomic { ($($item:item)*) => {$( - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] $item )*}; } diff --git a/futures-channel/src/lock.rs b/futures-channel/src/lock.rs index 5eecdd9aa2..b328d0f7dd 100644 --- a/futures-channel/src/lock.rs +++ b/futures-channel/src/lock.rs @@ -6,8 +6,8 @@ use core::cell::UnsafeCell; use core::ops::{Deref, DerefMut}; -use core::sync::atomic::Ordering::SeqCst; use core::sync::atomic::AtomicBool; +use core::sync::atomic::Ordering::SeqCst; /// A "mutex" around a value, similar to `std::sync::Mutex`. /// @@ -37,10 +37,7 @@ unsafe impl Sync for Lock {} impl Lock { /// Creates a new lock around the given value. pub(crate) fn new(t: T) -> Self { - Self { - locked: AtomicBool::new(false), - data: UnsafeCell::new(t), - } + Self { locked: AtomicBool::new(false), data: UnsafeCell::new(t) } } /// Attempts to acquire this lock, returning whether the lock was acquired or diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index c32ad4b683..28612da84d 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -79,13 +79,13 @@ // by the queue structure. use futures_core::stream::{FusedStream, Stream}; -use futures_core::task::{Context, Poll, Waker}; use futures_core::task::__internal::AtomicWaker; +use futures_core::task::{Context, Poll, Waker}; use std::fmt; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex}; use std::thread; use crate::mpsc::queue::Queue; @@ -209,9 +209,7 @@ impl SendError { impl fmt::Debug for TrySendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TrySendError") - .field("kind", &self.err.kind) - .finish() + f.debug_struct("TrySendError").field("kind", &self.err.kind).finish() } } @@ -251,8 +249,7 @@ impl TrySendError { impl fmt::Debug for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("TryRecvError") - .finish() + f.debug_tuple("TryRecvError").finish() } } @@ -335,10 +332,7 @@ struct SenderTask { impl SenderTask { fn new() -> Self { - Self { - task: None, - is_parked: false, - } + Self { task: None, is_parked: false } } fn notify(&mut self) { @@ -381,9 +375,7 @@ pub fn channel(buffer: usize) -> (Sender, Receiver) { maybe_parked: false, }; - let rx = Receiver { - inner: Some(inner), - }; + let rx = Receiver { inner: Some(inner) }; (Sender(Some(tx)), rx) } @@ -399,7 +391,6 @@ pub fn channel(buffer: usize) -> (Sender, Receiver) { /// the channel. Using an `unbounded` channel has the ability of causing the /// process to run out of memory. In this case, the process will be aborted. pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { - let inner = Arc::new(UnboundedInner { state: AtomicUsize::new(INIT_STATE), message_queue: Queue::new(), @@ -407,13 +398,9 @@ pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { recv_task: AtomicWaker::new(), }); - let tx = UnboundedSenderInner { - inner: inner.clone(), - }; + let tx = UnboundedSenderInner { inner: inner.clone() }; - let rx = UnboundedReceiver { - inner: Some(inner), - }; + let rx = UnboundedReceiver { inner: Some(inner) }; (UnboundedSender(Some(tx)), rx) } @@ -430,13 +417,10 @@ impl UnboundedSenderInner { if state.is_open { Poll::Ready(Ok(())) } else { - Poll::Ready(Err(SendError { - kind: SendErrorKind::Disconnected, - })) + Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) } } - // Push message to the queue and signal to the receiver fn queue_push_and_signal(&self, msg: T) { // Push the message onto the message queue @@ -462,16 +446,17 @@ impl UnboundedSenderInner { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!(state.num_messages < MAX_CAPACITY, "buffer space \ - exhausted; sending this messages would overflow the state"); + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => { - return Some(state.num_messages) - } + Ok(_) => return Some(state.num_messages), Err(actual) => curr = actual, } } @@ -516,12 +501,7 @@ impl BoundedSenderInner { fn try_send(&mut self, msg: T) -> Result<(), TrySendError> { // If the sender is currently blocked, reject the message if !self.poll_unparked(None).is_ready() { - return Err(TrySendError { - err: SendError { - kind: SendErrorKind::Full, - }, - val: msg, - }); + return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg }); } // The channel has capacity to accept the message, so send it @@ -531,9 +511,7 @@ impl BoundedSenderInner { // Do the send without failing. // Can be called only by bounded sender. #[allow(clippy::debug_assert_with_mut_call)] - fn do_send_b(&mut self, msg: T) - -> Result<(), TrySendError> - { + fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError> { // Anyone callig do_send *should* make sure there is room first, // but assert here for tests as a sanity check. debug_assert!(self.poll_unparked(None).is_ready()); @@ -551,12 +529,12 @@ impl BoundedSenderInner { // the configured buffer size num_messages > self.inner.buffer } - None => return Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }), + None => { + return Err(TrySendError { + err: SendError { kind: SendErrorKind::Disconnected }, + val: msg, + }) + } }; // If the channel has reached capacity, then the sender task needs to @@ -600,16 +578,17 @@ impl BoundedSenderInner { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!(state.num_messages < MAX_CAPACITY, "buffer space \ - exhausted; sending this messages would overflow the state"); + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => { - return Some(state.num_messages) - } + Ok(_) => return Some(state.num_messages), Err(actual) => curr = actual, } } @@ -644,15 +623,10 @@ impl BoundedSenderInner { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let state = decode_state(self.inner.state.load(SeqCst)); if !state.is_open { - return Poll::Ready(Err(SendError { - kind: SendErrorKind::Disconnected, - })); + return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })); } self.poll_unparked(Some(cx)).map(Ok) @@ -699,7 +673,7 @@ impl BoundedSenderInner { if !task.is_parked { self.maybe_parked = false; - return Poll::Ready(()) + return Poll::Ready(()); } // At this point, an unpark request is pending, so there will be an @@ -724,12 +698,7 @@ impl Sender { if let Some(inner) = &mut self.0 { inner.try_send(msg) } else { - Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }) + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) } } @@ -739,8 +708,7 @@ impl Sender { /// [`poll_ready`](Sender::poll_ready) has reported that the channel is /// ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.try_send(msg) - .map_err(|e| e.err) + self.try_send(msg).map_err(|e| e.err) } /// Polls the channel to determine if there is guaranteed capacity to send @@ -755,13 +723,8 @@ impl Sender { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - pub fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { - let inner = self.0.as_mut().ok_or(SendError { - kind: SendErrorKind::Disconnected, - })?; + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; inner.poll_ready(cx) } @@ -799,7 +762,10 @@ impl Sender { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver(&self, hasher: &mut H) where H: std::hash::Hasher { + pub fn hash_receiver(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -809,13 +775,8 @@ impl Sender { impl UnboundedSender { /// Check if the channel is ready to receive a message. - pub fn poll_ready( - &self, - _: &mut Context<'_>, - ) -> Poll> { - let inner = self.0.as_ref().ok_or(SendError { - kind: SendErrorKind::Disconnected, - })?; + pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { + let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; inner.poll_ready_nb() } @@ -845,12 +806,7 @@ impl UnboundedSender { } } - Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }) + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) } /// Send a message on the channel. @@ -858,8 +814,7 @@ impl UnboundedSender { /// This method should only be called after `poll_ready` has been used to /// verify that the channel is ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.do_send_nb(msg) - .map_err(|e| e.err) + self.do_send_nb(msg).map_err(|e| e.err) } /// Sends a message along this channel. @@ -888,7 +843,10 @@ impl UnboundedSender { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver(&self, hasher: &mut H) where H: std::hash::Hasher { + pub fn hash_receiver(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -928,9 +886,7 @@ impl Clone for UnboundedSenderInner { Ok(_) => { // The ABA problem doesn't matter here. We only care that the // number of senders never exceeds the maximum. - return Self { - inner: self.inner.clone(), - }; + return Self { inner: self.inner.clone() }; } Err(actual) => curr = actual, } @@ -1027,9 +983,7 @@ impl Receiver { /// * `Err(e)` when there are no messages available, but channel is not yet closed pub fn try_next(&mut self) -> Result, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => { - Ok(msg) - }, + Poll::Ready(msg) => Ok(msg), Poll::Pending => Err(TryRecvError { _priv: () }), } } @@ -1103,18 +1057,15 @@ impl FusedStream for Receiver { impl Stream for Receiver { type Item = T; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - // Try to read a message off of the message queue. + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { if msg.is_none() { self.inner = None; } Poll::Ready(msg) - }, + } Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1180,9 +1131,7 @@ impl UnboundedReceiver { /// * `Err(e)` when there are no messages available, but channel is not yet closed pub fn try_next(&mut self) -> Result, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => { - Ok(msg) - }, + Poll::Ready(msg) => Ok(msg), Poll::Pending => Err(TryRecvError { _priv: () }), } } @@ -1240,10 +1189,7 @@ impl FusedStream for UnboundedReceiver { impl Stream for UnboundedReceiver { type Item = T; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { @@ -1251,7 +1197,7 @@ impl Stream for UnboundedReceiver { self.inner = None; } Poll::Ready(msg) - }, + } Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1349,10 +1295,7 @@ impl State { */ fn decode_state(num: usize) -> State { - State { - is_open: num & OPEN_MASK == OPEN_MASK, - num_messages: num & MAX_CAPACITY, - } + State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } } fn encode_state(state: &State) -> usize { diff --git a/futures-channel/src/mpsc/queue.rs b/futures-channel/src/mpsc/queue.rs index b00e1b1755..57dc7f5654 100644 --- a/futures-channel/src/mpsc/queue.rs +++ b/futures-channel/src/mpsc/queue.rs @@ -43,10 +43,10 @@ pub(super) use self::PopResult::*; -use std::thread; use std::cell::UnsafeCell; use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; +use std::thread; /// A result of the `pop` function. pub(super) enum PopResult { @@ -76,15 +76,12 @@ pub(super) struct Queue { tail: UnsafeCell<*mut Node>, } -unsafe impl Send for Queue { } -unsafe impl Sync for Queue { } +unsafe impl Send for Queue {} +unsafe impl Sync for Queue {} impl Node { unsafe fn new(v: Option) -> *mut Self { - Box::into_raw(Box::new(Self { - next: AtomicPtr::new(ptr::null_mut()), - value: v, - })) + Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v })) } } @@ -93,10 +90,7 @@ impl Queue { /// one consumer. pub(super) fn new() -> Self { let stub = unsafe { Node::new(None) }; - Self { - head: AtomicPtr::new(stub), - tail: UnsafeCell::new(stub), - } + Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } } /// Pushes a new value onto this queue. @@ -133,7 +127,11 @@ impl Queue { return Data(ret); } - if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} + if self.head.load(Ordering::Acquire) == tail { + Empty + } else { + Inconsistent + } } /// Pop an element similarly to `pop` function, but spin-wait on inconsistent diff --git a/futures-channel/src/mpsc/sink_impl.rs b/futures-channel/src/mpsc/sink_impl.rs index 4ce66b4e59..1be20162c2 100644 --- a/futures-channel/src/mpsc/sink_impl.rs +++ b/futures-channel/src/mpsc/sink_impl.rs @@ -6,24 +6,15 @@ use std::pin::Pin; impl Sink for Sender { type Error = SendError; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { (*self).poll_ready(cx) } - fn start_send( - mut self: Pin<&mut Self>, - msg: T, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { (*self).start_send(msg) } - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match (*self).poll_ready(cx) { Poll::Ready(Err(ref e)) if e.is_disconnected() => { // If the receiver disconnected, we consider the sink to be flushed. @@ -33,10 +24,7 @@ impl Sink for Sender { } } - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { self.disconnect(); Poll::Ready(Ok(())) } @@ -45,31 +33,19 @@ impl Sink for Sender { impl Sink for UnboundedSender { type Error = SendError; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Self::poll_ready(&*self, cx) } - fn start_send( - mut self: Pin<&mut Self>, - msg: T, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { Self::start_send(&mut *self, msg) } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { self.disconnect(); Poll::Ready(Ok(())) } @@ -78,29 +54,19 @@ impl Sink for UnboundedSender { impl Sink for &UnboundedSender { type Error = SendError; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { UnboundedSender::poll_ready(*self, cx) } fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.unbounded_send(msg) - .map_err(TrySendError::into_send_error) + self.unbounded_send(msg).map_err(TrySendError::into_send_error) } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { self.close_channel(); Poll::Ready(Ok(())) } diff --git a/futures-channel/src/oneshot.rs b/futures-channel/src/oneshot.rs index dbbce8112b..5af651b913 100644 --- a/futures-channel/src/oneshot.rs +++ b/futures-channel/src/oneshot.rs @@ -7,7 +7,7 @@ use core::fmt; use core::pin::Pin; use core::sync::atomic::AtomicBool; use core::sync::atomic::Ordering::SeqCst; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll, Waker}; use crate::lock::Lock; @@ -16,7 +16,6 @@ use crate::lock::Lock; /// /// This is created by the [`channel`](channel) function. #[must_use = "futures do nothing unless you `.await` or poll them"] -#[derive(Debug)] pub struct Receiver { inner: Arc>, } @@ -24,7 +23,6 @@ pub struct Receiver { /// A means of transmitting a single value to another task. /// /// This is created by the [`channel`](channel) function. -#[derive(Debug)] pub struct Sender { inner: Arc>, } @@ -35,7 +33,6 @@ impl Unpin for Sender {} /// Internal state of the `Receiver`/`Sender` pair above. This is all used as /// the internal synchronization between the two for send/recv operations. -#[derive(Debug)] struct Inner { /// Indicates whether this oneshot is complete yet. This is filled in both /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it @@ -106,12 +103,8 @@ struct Inner { /// ``` pub fn channel() -> (Sender, Receiver) { let inner = Arc::new(Inner::new()); - let receiver = Receiver { - inner: inner.clone(), - }; - let sender = Sender { - inner, - }; + let receiver = Receiver { inner: inner.clone() }; + let sender = Sender { inner }; (sender, receiver) } @@ -127,7 +120,7 @@ impl Inner { fn send(&self, t: T) -> Result<(), T> { if self.complete.load(SeqCst) { - return Err(t) + return Err(t); } // Note that this lock acquisition may fail if the receiver @@ -164,7 +157,7 @@ impl Inner { // destructor, but our destructor hasn't run yet so if it's set then the // oneshot is gone. if self.complete.load(SeqCst) { - return Poll::Ready(()) + return Poll::Ready(()); } // If our other half is not gone then we need to park our current task @@ -273,7 +266,10 @@ impl Inner { } else { let task = cx.waker().clone(); match self.rx_task.try_lock() { - Some(mut slot) => { *slot = Some(task); false }, + Some(mut slot) => { + *slot = Some(task); + false + } None => true, } }; @@ -394,6 +390,12 @@ impl Drop for Sender { } } +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Sender").field("complete", &self.inner.complete).finish() + } +} + /// A future that resolves when the receiving end of a channel has hung up. /// /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled). @@ -453,10 +455,7 @@ impl Receiver { impl Future for Receiver { type Output = Result; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner.recv(cx) } } @@ -481,3 +480,9 @@ impl Drop for Receiver { self.inner.drop_rx() } } + +impl fmt::Debug for Receiver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Receiver").field("complete", &self.inner.complete).finish() + } +} diff --git a/futures-core/Cargo.toml b/futures-core/Cargo.toml index e4c42f717e..bcc0b60f91 100644 --- a/futures-core/Cargo.toml +++ b/futures-core/Cargo.toml @@ -16,9 +16,8 @@ default = ["std"] std = ["alloc"] alloc = [] -# Unstable features -# These features are outside of the normal semver guarantees and require the -# `unstable` feature as an explicit opt-in to unstable API. +# These features are no longer used. +# TODO: remove in the next major version. unstable = [] cfg-target-has-atomic = [] diff --git a/futures-core/build.rs b/futures-core/build.rs new file mode 100644 index 0000000000..c4f341d480 --- /dev/null +++ b/futures-core/build.rs @@ -0,0 +1,42 @@ +#![warn(rust_2018_idioms, single_use_lifetimes)] + +use std::env; + +include!("no_atomic_cas.rs"); + +// The rustc-cfg listed below are considered public API, but it is *unstable* +// and outside of the normal semver guarantees: +// +// - `futures_no_atomic_cas` +// Assume the target does not have atomic CAS (compare-and-swap). +// This is usually detected automatically by the build script, but you may +// need to enable it manually when building for custom targets or using +// non-cargo build systems that don't run the build script. +// +// With the exceptions mentioned above, the rustc-cfg strings below are +// *not* public API. Please let us know by opening a GitHub issue if your build +// environment requires some way to enable these cfgs other than by executing +// our build script. +fn main() { + let target = match env::var("TARGET") { + Ok(target) => target, + Err(e) => { + println!( + "cargo:warning={}: unable to get TARGET environment variable: {}", + env!("CARGO_PKG_NAME"), + e + ); + return; + } + }; + + // Note that this is `no_*`, not `has_*`. This allows treating + // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't + // run. This is needed for compatibility with non-cargo build systems that + // don't run the build script. + if NO_ATOMIC_CAS_TARGETS.contains(&&*target) { + println!("cargo:rustc-cfg=futures_no_atomic_cas"); + } + + println!("cargo:rerun-if-changed=no_atomic_cas.rs"); +} diff --git a/futures-core/no_atomic_cas.rs b/futures-core/no_atomic_cas.rs new file mode 120000 index 0000000000..3d7380fadd --- /dev/null +++ b/futures-core/no_atomic_cas.rs @@ -0,0 +1 @@ +../no_atomic_cas.rs \ No newline at end of file diff --git a/futures-core/src/lib.rs b/futures-core/src/lib.rs index f3bd9ab928..e363ff777d 100644 --- a/futures-core/src/lib.rs +++ b/futures-core/src/lib.rs @@ -1,6 +1,5 @@ //! Core traits and types for asynchronous operations in Rust. -#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] #![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] // It cannot be included in the published code because this lints have false positives in the minimum required version. @@ -8,9 +7,6 @@ #![warn(clippy::all)] #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] -#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] -compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[cfg(feature = "alloc")] extern crate alloc; diff --git a/futures-core/src/task/__internal/mod.rs b/futures-core/src/task/__internal/mod.rs index 77e3678075..c902eb4bfb 100644 --- a/futures-core/src/task/__internal/mod.rs +++ b/futures-core/src/task/__internal/mod.rs @@ -1,4 +1,4 @@ -#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] +#[cfg(not(futures_no_atomic_cas))] mod atomic_waker; -#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] +#[cfg(not(futures_no_atomic_cas))] pub use self::atomic_waker::AtomicWaker; diff --git a/futures-macro/Cargo.toml b/futures-macro/Cargo.toml index 4ab6860bb3..9002a9f391 100644 --- a/futures-macro/Cargo.toml +++ b/futures-macro/Cargo.toml @@ -16,6 +16,9 @@ proc-macro = true [features] +[build-dependencies] +autocfg = "1" + [dependencies] proc-macro2 = "1.0" proc-macro-hack = "0.5.19" diff --git a/futures-macro/build.rs b/futures-macro/build.rs new file mode 100644 index 0000000000..ff8630ce69 --- /dev/null +++ b/futures-macro/build.rs @@ -0,0 +1,28 @@ +#![warn(rust_2018_idioms, single_use_lifetimes)] + +use autocfg::AutoCfg; + +// The rustc-cfg strings below are *not* public API. Please let us know by +// opening a GitHub issue if your build environment requires some way to enable +// these cfgs other than by executing our build script. +fn main() { + let cfg = match AutoCfg::new() { + Ok(cfg) => cfg, + Err(e) => { + println!( + "cargo:warning={}: unable to determine rustc version: {}", + env!("CARGO_PKG_NAME"), + e + ); + return; + } + }; + + // Function like procedural macros in expressions patterns statements stabilized in Rust 1.45: + // https://blog.rust-lang.org/2020/07/16/Rust-1.45.0.html#stabilizing-function-like-procedural-macros-in-expressions-patterns-and-statements + if cfg.probe_rustc_version(1, 45) { + println!("cargo:rustc-cfg=fn_like_proc_macro"); + } + + println!("cargo:rerun-if-changed=build.rs"); +} diff --git a/futures-macro/src/lib.rs b/futures-macro/src/lib.rs index 5f0c47ca89..98408ebfe6 100644 --- a/futures-macro/src/lib.rs +++ b/futures-macro/src/lib.rs @@ -13,31 +13,34 @@ extern crate proc_macro; use proc_macro::TokenStream; -use proc_macro_hack::proc_macro_hack; mod join; mod select; /// The `join!` macro. -#[proc_macro_hack] +#[cfg_attr(fn_like_proc_macro, proc_macro)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack)] pub fn join_internal(input: TokenStream) -> TokenStream { crate::join::join(input) } /// The `try_join!` macro. -#[proc_macro_hack] +#[cfg_attr(fn_like_proc_macro, proc_macro)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack)] pub fn try_join_internal(input: TokenStream) -> TokenStream { crate::join::try_join(input) } /// The `select!` macro. -#[proc_macro_hack] +#[cfg_attr(fn_like_proc_macro, proc_macro)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack)] pub fn select_internal(input: TokenStream) -> TokenStream { crate::select::select(input) } /// The `select_biased!` macro. -#[proc_macro_hack] +#[cfg_attr(fn_like_proc_macro, proc_macro)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack)] pub fn select_biased_internal(input: TokenStream) -> TokenStream { crate::select::select_biased(input) } diff --git a/futures-task/Cargo.toml b/futures-task/Cargo.toml index 68abccd6a9..721c1ee4c4 100644 --- a/futures-task/Cargo.toml +++ b/futures-task/Cargo.toml @@ -16,9 +16,8 @@ default = ["std"] std = ["alloc"] alloc = [] -# Unstable features -# These features are outside of the normal semver guarantees and require the -# `unstable` feature as an explicit opt-in to unstable API. +# These features are no longer used. +# TODO: remove in the next major version. unstable = [] cfg-target-has-atomic = [] diff --git a/futures-task/build.rs b/futures-task/build.rs new file mode 100644 index 0000000000..c4f341d480 --- /dev/null +++ b/futures-task/build.rs @@ -0,0 +1,42 @@ +#![warn(rust_2018_idioms, single_use_lifetimes)] + +use std::env; + +include!("no_atomic_cas.rs"); + +// The rustc-cfg listed below are considered public API, but it is *unstable* +// and outside of the normal semver guarantees: +// +// - `futures_no_atomic_cas` +// Assume the target does not have atomic CAS (compare-and-swap). +// This is usually detected automatically by the build script, but you may +// need to enable it manually when building for custom targets or using +// non-cargo build systems that don't run the build script. +// +// With the exceptions mentioned above, the rustc-cfg strings below are +// *not* public API. Please let us know by opening a GitHub issue if your build +// environment requires some way to enable these cfgs other than by executing +// our build script. +fn main() { + let target = match env::var("TARGET") { + Ok(target) => target, + Err(e) => { + println!( + "cargo:warning={}: unable to get TARGET environment variable: {}", + env!("CARGO_PKG_NAME"), + e + ); + return; + } + }; + + // Note that this is `no_*`, not `has_*`. This allows treating + // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't + // run. This is needed for compatibility with non-cargo build systems that + // don't run the build script. + if NO_ATOMIC_CAS_TARGETS.contains(&&*target) { + println!("cargo:rustc-cfg=futures_no_atomic_cas"); + } + + println!("cargo:rerun-if-changed=no_atomic_cas.rs"); +} diff --git a/futures-task/no_atomic_cas.rs b/futures-task/no_atomic_cas.rs new file mode 120000 index 0000000000..3d7380fadd --- /dev/null +++ b/futures-task/no_atomic_cas.rs @@ -0,0 +1 @@ +../no_atomic_cas.rs \ No newline at end of file diff --git a/futures-task/src/lib.rs b/futures-task/src/lib.rs index 6693c53af0..439af135af 100644 --- a/futures-task/src/lib.rs +++ b/futures-task/src/lib.rs @@ -1,6 +1,5 @@ //! Tools for working with tasks. -#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] #![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] // It cannot be included in the published code because this lints have false positives in the minimum required version. @@ -8,15 +7,12 @@ #![warn(clippy::all)] #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] -#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] -compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[cfg(feature = "alloc")] extern crate alloc; macro_rules! cfg_target_has_atomic { ($($item:item)*) => {$( - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] $item )*}; } diff --git a/futures-task/src/waker.rs b/futures-task/src/waker.rs index 265a445d91..a7310a07af 100644 --- a/futures-task/src/waker.rs +++ b/futures-task/src/waker.rs @@ -1,7 +1,7 @@ use super::arc_wake::ArcWake; -use core::mem; -use core::task::{Waker, RawWaker, RawWakerVTable}; use alloc::sync::Arc; +use core::mem; +use core::task::{RawWaker, RawWakerVTable, Waker}; pub(super) fn waker_vtable() -> &'static RawWakerVTable { &RawWakerVTable::new( @@ -22,9 +22,7 @@ where { let ptr = Arc::into_raw(wake) as *const (); - unsafe { - Waker::from_raw(RawWaker::new(ptr, waker_vtable::())) - } + unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::())) } } // FIXME: panics on Arc::clone / refcount changes could wreak havoc on the diff --git a/futures-task/src/waker_ref.rs b/futures-task/src/waker_ref.rs index 76d849ac1b..791c690120 100644 --- a/futures-task/src/waker_ref.rs +++ b/futures-task/src/waker_ref.rs @@ -1,10 +1,10 @@ -use super::arc_wake::{ArcWake}; +use super::arc_wake::ArcWake; use super::waker::waker_vtable; use alloc::sync::Arc; -use core::mem::ManuallyDrop; use core::marker::PhantomData; +use core::mem::ManuallyDrop; use core::ops::Deref; -use core::task::{Waker, RawWaker}; +use core::task::{RawWaker, Waker}; /// A [`Waker`] that is only valid for a given lifetime. /// @@ -22,10 +22,7 @@ impl<'a> WakerRef<'a> { // copy the underlying (raw) waker without calling a clone, // as we won't call Waker::drop either. let waker = ManuallyDrop::new(unsafe { core::ptr::read(waker) }); - Self { - waker, - _marker: PhantomData, - } + Self { waker, _marker: PhantomData } } /// Create a new [`WakerRef`] from a [`Waker`] that must not be dropped. @@ -35,10 +32,7 @@ impl<'a> WakerRef<'a> { /// by the caller), and the [`Waker`] doesn't need to or must not be /// destroyed. pub fn new_unowned(waker: ManuallyDrop) -> Self { - Self { - waker, - _marker: PhantomData, - } + Self { waker, _marker: PhantomData } } } @@ -57,14 +51,13 @@ impl Deref for WakerRef<'_> { #[inline] pub fn waker_ref(wake: &Arc) -> WakerRef<'_> where - W: ArcWake + W: ArcWake, { // simply copy the pointer instead of using Arc::into_raw, // as we don't actually keep a refcount by using ManuallyDrop.< let ptr = (&**wake as *const W) as *const (); - let waker = ManuallyDrop::new(unsafe { - Waker::from_raw(RawWaker::new(ptr, waker_vtable::())) - }); + let waker = + ManuallyDrop::new(unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::())) }); WakerRef::new_unowned(waker) } diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 424e372630..57e3f77884 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -27,11 +27,17 @@ channel = ["std", "futures-channel"] # These features are outside of the normal semver guarantees and require the # `unstable` feature as an explicit opt-in to unstable API. unstable = ["futures-core/unstable", "futures-task/unstable"] -cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic"] bilock = [] read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"] write-all-vectored = ["io"] +# These features are no longer used. +# TODO: remove in the next major version. +cfg-target-has-atomic = [] + +[build-dependencies] +autocfg = "1" + [dependencies] futures-core = { path = "../futures-core", version = "0.3.14", default-features = false } futures-task = { path = "../futures-task", version = "0.3.14", default-features = false } diff --git a/futures-util/benches_disabled/bilock.rs b/futures-util/benches_disabled/bilock.rs index 48afe3c551..417f75d31e 100644 --- a/futures-util/benches_disabled/bilock.rs +++ b/futures-util/benches_disabled/bilock.rs @@ -2,125 +2,121 @@ #[cfg(feature = "bilock")] mod bench { -use futures::task::{Context, Waker}; -use futures::executor::LocalPool; -use futures_util::lock::BiLock; -use futures_util::lock::BiLockAcquire; -use futures_util::lock::BiLockAcquired; -use futures_util::task::ArcWake; + use futures::executor::LocalPool; + use futures::task::{Context, Waker}; + use futures_util::lock::BiLock; + use futures_util::lock::BiLockAcquire; + use futures_util::lock::BiLockAcquired; + use futures_util::task::ArcWake; -use std::sync::Arc; -use test::Bencher; + use std::sync::Arc; + use test::Bencher; -fn notify_noop() -> Waker { - struct Noop; + fn notify_noop() -> Waker { + struct Noop; - impl ArcWake for Noop { - fn wake(_: &Arc) {} - } - - ArcWake::into_waker(Arc::new(Noop)) -} - - -/// Pseudo-stream which simply calls `lock.poll()` on `poll` -struct LockStream { - lock: BiLockAcquire, -} - -impl LockStream { - fn new(lock: BiLock) -> Self { - Self { - lock: lock.lock() + impl ArcWake for Noop { + fn wake(_: &Arc) {} } - } - /// Release a lock after it was acquired in `poll`, - /// so `poll` could be called again. - fn release_lock(&mut self, guard: BiLockAcquired) { - self.lock = guard.unlock().lock() + ArcWake::into_waker(Arc::new(Noop)) } -} - -impl Stream for LockStream { - type Item = BiLockAcquired; - type Error = (); - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error> { - self.lock.poll(cx).map(|a| a.map(Some)) + /// Pseudo-stream which simply calls `lock.poll()` on `poll` + struct LockStream { + lock: BiLockAcquire, } -} - - -#[bench] -fn contended(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - // Try poll second lock while first lock still holds the lock - match y.poll_next(&mut waker) { - Ok(Poll::Pending) => (), - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); + impl LockStream { + fn new(lock: BiLock) -> Self { + Self { lock: lock.lock() } } - (x, y) - }); -} - -#[bench] -fn lock_unlock(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); + /// Release a lock after it was acquired in `poll`, + /// so `poll` could be called again. + fn release_lock(&mut self, guard: BiLockAcquired) { + self.lock = guard.unlock().lock() + } + } - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; + impl Stream for LockStream { + type Item = BiLockAcquired; + type Error = (); - x.release_lock(x_guard); + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error> { + self.lock.poll(cx).map(|a| a.map(Some)) + } + } - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; + #[bench] + fn contended(b: &mut Bencher) { + let pool = LocalPool::new(); + let mut exec = pool.executor(); + let waker = notify_noop(); + let mut map = task::LocalMap::new(); + let mut waker = task::Context::new(&mut map, &waker, &mut exec); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + let mut x = LockStream::new(x); + let mut y = LockStream::new(y); + + for _ in 0..1000 { + let x_guard = match x.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + // Try poll second lock while first lock still holds the lock + match y.poll_next(&mut waker) { + Ok(Poll::Pending) => (), + _ => panic!(), + }; + + x.release_lock(x_guard); + + let y_guard = match y.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + y.release_lock(y_guard); + } + (x, y) + }); + } - y.release_lock(y_guard); - } - (x, y) - }) -} + #[bench] + fn lock_unlock(b: &mut Bencher) { + let pool = LocalPool::new(); + let mut exec = pool.executor(); + let waker = notify_noop(); + let mut map = task::LocalMap::new(); + let mut waker = task::Context::new(&mut map, &waker, &mut exec); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + let mut x = LockStream::new(x); + let mut y = LockStream::new(y); + + for _ in 0..1000 { + let x_guard = match x.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + x.release_lock(x_guard); + + let y_guard = match y.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + y.release_lock(y_guard); + } + (x, y) + }) + } } diff --git a/futures-util/build.rs b/futures-util/build.rs new file mode 100644 index 0000000000..ffe9711414 --- /dev/null +++ b/futures-util/build.rs @@ -0,0 +1,61 @@ +#![warn(rust_2018_idioms, single_use_lifetimes)] + +use autocfg::AutoCfg; +use std::env; + +include!("no_atomic_cas.rs"); + +// The rustc-cfg listed below are considered public API, but it is *unstable* +// and outside of the normal semver guarantees: +// +// - `futures_no_atomic_cas` +// Assume the target does not have atomic CAS (compare-and-swap). +// This is usually detected automatically by the build script, but you may +// need to enable it manually when building for custom targets or using +// non-cargo build systems that don't run the build script. +// +// With the exceptions mentioned above, the rustc-cfg strings below are +// *not* public API. Please let us know by opening a GitHub issue if your build +// environment requires some way to enable these cfgs other than by executing +// our build script. +fn main() { + let target = match env::var("TARGET") { + Ok(target) => target, + Err(e) => { + println!( + "cargo:warning={}: unable to get TARGET environment variable: {}", + env!("CARGO_PKG_NAME"), + e + ); + return; + } + }; + + // Note that this is `no_*`, not `has_*`. This allows treating + // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't + // run. This is needed for compatibility with non-cargo build systems that + // don't run the build script. + if NO_ATOMIC_CAS_TARGETS.contains(&&*target) { + println!("cargo:rustc-cfg=futures_no_atomic_cas"); + } + + let cfg = match AutoCfg::new() { + Ok(cfg) => cfg, + Err(e) => { + println!( + "cargo:warning={}: unable to determine rustc version: {}", + env!("CARGO_PKG_NAME"), + e + ); + return; + } + }; + + // Function like procedural macros in expressions patterns statements stabilized in Rust 1.45: + // https://blog.rust-lang.org/2020/07/16/Rust-1.45.0.html#stabilizing-function-like-procedural-macros-in-expressions-patterns-and-statements + if cfg.probe_rustc_version(1, 45) { + println!("cargo:rustc-cfg=fn_like_proc_macro"); + } + + println!("cargo:rerun-if-changed=no_atomic_cas.rs"); +} diff --git a/futures-util/no_atomic_cas.rs b/futures-util/no_atomic_cas.rs new file mode 120000 index 0000000000..3d7380fadd --- /dev/null +++ b/futures-util/no_atomic_cas.rs @@ -0,0 +1 @@ +../no_atomic_cas.rs \ No newline at end of file diff --git a/futures-util/src/async_await/join_mod.rs b/futures-util/src/async_await/join_mod.rs index 965d9fb236..c5cdd9babc 100644 --- a/futures-util/src/async_await/join_mod.rs +++ b/futures-util/src/async_await/join_mod.rs @@ -1,7 +1,5 @@ //! The `join` macro. -use proc_macro_hack::proc_macro_hack; - macro_rules! document_join_macro { ($join:item $try_join:item) => { /// Polls multiple futures simultaneously, returning a tuple @@ -81,12 +79,14 @@ macro_rules! document_join_macro { } } +#[allow(unreachable_pub)] #[doc(hidden)] -#[proc_macro_hack(support_nested, only_hack_old_rustc)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::join_internal; +#[allow(unreachable_pub)] #[doc(hidden)] -#[proc_macro_hack(support_nested, only_hack_old_rustc)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::try_join_internal; document_join_macro! { diff --git a/futures-util/src/async_await/select_mod.rs b/futures-util/src/async_await/select_mod.rs index 59bca0840a..37e938da55 100644 --- a/futures-util/src/async_await/select_mod.rs +++ b/futures-util/src/async_await/select_mod.rs @@ -1,7 +1,5 @@ //! The `select` macro. -use proc_macro_hack::proc_macro_hack; - macro_rules! document_select_macro { // This branch is required for `futures 0.3.1`, from before select_biased was introduced ($select:item) => { @@ -309,12 +307,14 @@ macro_rules! document_select_macro { } #[cfg(feature = "std")] +#[allow(unreachable_pub)] #[doc(hidden)] -#[proc_macro_hack(support_nested, only_hack_old_rustc)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::select_internal; +#[allow(unreachable_pub)] #[doc(hidden)] -#[proc_macro_hack(support_nested, only_hack_old_rustc)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::select_biased_internal; document_select_macro! { diff --git a/futures-util/src/future/abortable.rs b/futures-util/src/future/abortable.rs index 3f2e5a064d..198cc8e668 100644 --- a/futures-util/src/future/abortable.rs +++ b/futures-util/src/future/abortable.rs @@ -1,11 +1,11 @@ use super::assert_future; use crate::task::AtomicWaker; -use futures_core::future::Future; -use futures_core::task::{Context, Poll}; +use alloc::sync::Arc; use core::fmt; use core::pin::Pin; use core::sync::atomic::{AtomicBool, Ordering}; -use alloc::sync::Arc; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; pin_project! { @@ -19,7 +19,10 @@ pin_project! { } } -impl Abortable where Fut: Future { +impl Abortable +where + Fut: Future, +{ /// Creates a new `Abortable` future using an existing `AbortRegistration`. /// `AbortRegistration`s can be acquired through `AbortHandle::new`. /// @@ -40,10 +43,7 @@ impl Abortable where Fut: Future { /// # }); /// ``` pub fn new(future: Fut, reg: AbortRegistration) -> Self { - assert_future::, _>(Self { - future, - inner: reg.inner, - }) + assert_future::, _>(Self { future, inner: reg.inner }) } } @@ -80,19 +80,10 @@ impl AbortHandle { /// # }); /// ``` pub fn new_pair() -> (Self, AbortRegistration) { - let inner = Arc::new(AbortInner { - waker: AtomicWaker::new(), - cancel: AtomicBool::new(false), - }); - - ( - Self { - inner: inner.clone(), - }, - AbortRegistration { - inner, - }, - ) + let inner = + Arc::new(AbortInner { waker: AtomicWaker::new(), cancel: AtomicBool::new(false) }); + + (Self { inner: inner.clone() }, AbortRegistration { inner }) } } @@ -112,13 +103,11 @@ struct AbortInner { /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. pub fn abortable(future: Fut) -> (Abortable, AbortHandle) - where Fut: Future +where + Fut: Future, { let (handle, reg) = AbortHandle::new_pair(); - ( - Abortable::new(future, reg), - handle, - ) + (Abortable::new(future, reg), handle) } /// Indicator that the `Abortable` future was aborted. @@ -134,18 +123,21 @@ impl fmt::Display for Aborted { #[cfg(feature = "std")] impl std::error::Error for Aborted {} -impl Future for Abortable where Fut: Future { +impl Future for Abortable +where + Fut: Future, +{ type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Check if the future has been aborted if self.inner.cancel.load(Ordering::Relaxed) { - return Poll::Ready(Err(Aborted)) + return Poll::Ready(Err(Aborted)); } // attempt to complete the future if let Poll::Ready(x) = self.as_mut().project().future.poll(cx) { - return Poll::Ready(Ok(x)) + return Poll::Ready(Ok(x)); } // Register to receive a wakeup if the future is aborted in the... future @@ -156,7 +148,7 @@ impl Future for Abortable where Fut: Future { // Checking with `Relaxed` is sufficient because `register` introduces an // `AcqRel` barrier. if self.inner.cancel.load(Ordering::Relaxed) { - return Poll::Ready(Err(Aborted)) + return Poll::Ready(Err(Aborted)); } Poll::Pending diff --git a/futures-util/src/future/either.rs b/futures-util/src/future/either.rs index 5f5b614763..35650daa99 100644 --- a/futures-util/src/future/either.rs +++ b/futures-util/src/future/either.rs @@ -5,8 +5,25 @@ use futures_core::stream::{FusedStream, Stream}; #[cfg(feature = "sink")] use futures_sink::Sink; -/// Combines two different futures, streams, or sinks having the same associated types into a single -/// type. +/// Combines two different futures, streams, or sinks having the same associated types into a single type. +/// +/// This is useful when conditionally choosing between two distinct future types: +/// +/// ```rust +/// use futures::future::Either; +/// +/// # futures::executor::block_on(async { +/// let cond = true; +/// +/// let fut = if cond { +/// Either::Left(async move { 12 }) +/// } else { +/// Either::Right(async move { 44 }) +/// }; +/// +/// assert_eq!(fut.await, 12); +/// # }) +/// ``` #[derive(Debug, Clone)] pub enum Either { /// First branch of the type diff --git a/futures-util/src/future/future/remote_handle.rs b/futures-util/src/future/future/remote_handle.rs index 861e4c1cd8..1358902cab 100644 --- a/futures-util/src/future/future/remote_handle.rs +++ b/futures-util/src/future/future/remote_handle.rs @@ -36,7 +36,7 @@ use { /// must be careful with regard to unwind safety because the thread in which the future /// is polled will keep running after the panic and the thread running the [RemoteHandle] /// will unwind. -#[must_use = "futures do nothing unless you `.await` or poll them"] +#[must_use = "dropping a remote handle cancels the underlying future"] #[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] pub struct RemoteHandle { diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 6a22cf144d..76d6ca7666 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -1,7 +1,6 @@ //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s, //! and the `AsyncRead` and `AsyncWrite` traits. -#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] #![cfg_attr(feature = "read-initializer", feature(read_initializer))] #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] #![cfg_attr(not(feature = "std"), no_std)] @@ -12,9 +11,6 @@ #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] #![cfg_attr(docsrs, feature(doc_cfg))] -#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] -compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[cfg(all(feature = "bilock", not(feature = "unstable")))] compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); @@ -53,7 +49,7 @@ pub mod __private { macro_rules! cfg_target_has_atomic { ($($item:item)*) => {$( - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] $item )*}; } diff --git a/futures-util/src/lock/bilock.rs b/futures-util/src/lock/bilock.rs index 600e16e421..2f51ae7c98 100644 --- a/futures-util/src/lock/bilock.rs +++ b/futures-util/src/lock/bilock.rs @@ -1,16 +1,16 @@ //! Futures-powered synchronization primitives. -#[cfg(feature = "bilock")] -use futures_core::future::Future; -use futures_core::task::{Context, Poll, Waker}; +use alloc::boxed::Box; +use alloc::sync::Arc; use core::cell::UnsafeCell; use core::fmt; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::sync::atomic::AtomicUsize; use core::sync::atomic::Ordering::SeqCst; -use alloc::boxed::Box; -use alloc::sync::Arc; +#[cfg(feature = "bilock")] +use futures_core::future::Future; +use futures_core::task::{Context, Poll, Waker}; /// A type of futures-powered synchronization primitive which is a mutex between /// two possible owners. @@ -61,10 +61,7 @@ impl BiLock { /// Similarly, reuniting the lock and extracting the inner value is only /// possible when `T` is `Unpin`. pub fn new(t: T) -> (Self, Self) { - let arc = Arc::new(Inner { - state: AtomicUsize::new(0), - value: Some(UnsafeCell::new(t)), - }); + let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) }); (Self { arc: arc.clone() }, Self { arc }) } @@ -103,11 +100,11 @@ impl BiLock { let mut prev = Box::from_raw(n as *mut Waker); *prev = cx.waker().clone(); waker = Some(prev); - } + }, } // type ascription for safety's sake! - let me: Box = waker.take().unwrap_or_else(||Box::new(cx.waker().clone())); + let me: Box = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone())); let me = Box::into_raw(me) as usize; match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) { @@ -145,9 +142,7 @@ impl BiLock { #[cfg(feature = "bilock")] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub fn lock(&self) -> BiLockAcquire<'_, T> { - BiLockAcquire { - bilock: self, - } + BiLockAcquire { bilock: self } } /// Attempts to put the two "halves" of a `BiLock` back together and @@ -181,7 +176,7 @@ impl BiLock { // up as its now their turn. n => unsafe { Box::from_raw(n as *mut Waker).wake(); - } + }, } } } @@ -205,9 +200,7 @@ pub struct ReuniteError(pub BiLock, pub BiLock); impl fmt::Debug for ReuniteError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ReuniteError") - .field(&"...") - .finish() + f.debug_tuple("ReuniteError").field(&"...").finish() } } diff --git a/futures-util/src/lock/mutex.rs b/futures-util/src/lock/mutex.rs index a78de6283c..a849aeeb38 100644 --- a/futures-util/src/lock/mutex.rs +++ b/futures-util/src/lock/mutex.rs @@ -1,13 +1,13 @@ use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll, Waker}; use slab::Slab; -use std::{fmt, mem}; use std::cell::UnsafeCell; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::pin::Pin; -use std::sync::Mutex as StdMutex; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Mutex as StdMutex; +use std::{fmt, mem}; /// A futures-aware mutex. /// @@ -53,7 +53,7 @@ enum Waiter { impl Waiter { fn register(&mut self, waker: &Waker) { match self { - Self::Waiting(w) if waker.will_wake(w) => {}, + Self::Waiting(w) if waker.will_wake(w) => {} _ => *self = Self::Waiting(waker.clone()), } } @@ -61,7 +61,7 @@ impl Waiter { fn wake(&mut self) { match mem::replace(self, Self::Woken) { Self::Waiting(waker) => waker.wake(), - Self::Woken => {}, + Self::Woken => {} } } } @@ -113,10 +113,7 @@ impl Mutex { /// This method returns a future that will resolve once the lock has been /// successfully acquired. pub fn lock(&self) -> MutexLockFuture<'_, T> { - MutexLockFuture { - mutex: Some(self), - wait_key: WAIT_KEY_NONE, - } + MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } } /// Returns a mutable reference to the underlying data. @@ -145,7 +142,7 @@ impl Mutex { if wait_key != WAIT_KEY_NONE { let mut waiters = self.waiters.lock().unwrap(); match waiters.remove(wait_key) { - Waiter::Waiting(_) => {}, + Waiter::Waiting(_) => {} Waiter::Woken => { // We were awoken, but then dropped before we could // wake up to acquire the lock. Wake up another @@ -191,13 +188,10 @@ impl fmt::Debug for MutexLockFuture<'_, T> { f.debug_struct("MutexLockFuture") .field("was_acquired", &self.mutex.is_none()) .field("mutex", &self.mutex) - .field("wait_key", &( - if self.wait_key == WAIT_KEY_NONE { - None - } else { - Some(self.wait_key) - } - )) + .field( + "wait_key", + &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }), + ) .finish() } } @@ -295,10 +289,7 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> { impl fmt::Debug for MutexGuard<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MutexGuard") - .field("value", &&**self) - .field("mutex", &self.mutex) - .finish() + f.debug_struct("MutexGuard").field("value", &&**self).field("mutex", &self.mutex).finish() } } diff --git a/futures-util/src/sink/mod.rs b/futures-util/src/sink/mod.rs index e5b515b64a..147e9adc93 100644 --- a/futures-util/src/sink/mod.rs +++ b/futures-util/src/sink/mod.rs @@ -243,7 +243,8 @@ pub trait SinkExt: Sink { /// This future will drive the stream to keep producing items until it is /// exhausted, sending each item to the sink. It will complete once both the /// stream is exhausted, the sink has received all items, and the sink has - /// been flushed. Note that the sink is **not** closed. + /// been flushed. Note that the sink is **not** closed. If the stream produces + /// an error, that error will be returned by this future without flushing the sink. /// /// Doing `sink.send_all(stream)` is roughly equivalent to /// `stream.forward(sink)`. The returned future will exhaust all items from diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index eda3b27038..f596b3b0e3 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -52,10 +52,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let index = self.index; - self.project().data.poll(cx).map(|output| OrderWrapper { - data: output, - index, - }) + self.project().data.poll(cx).map(|output| OrderWrapper { data: output, index }) } } @@ -139,10 +136,7 @@ impl FuturesOrdered { /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. pub fn push(&mut self, future: Fut) { - let wrapped = OrderWrapper { - data: future, - index: self.next_incoming_index, - }; + let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; self.in_progress_queue.push(wrapped); } diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index ef7b15aed8..17cde4fac4 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -1,5 +1,5 @@ -use super::FuturesUnordered; use super::task::Task; +use super::FuturesUnordered; use core::marker::PhantomData; use core::pin::Pin; use core::sync::atomic::Ordering::Relaxed; @@ -9,12 +9,12 @@ use core::sync::atomic::Ordering::Relaxed; pub struct IterPinMut<'a, Fut> { pub(super) task: *const Task, pub(super) len: usize, - pub(super) _marker: PhantomData<&'a mut FuturesUnordered> + pub(super) _marker: PhantomData<&'a mut FuturesUnordered>, } #[derive(Debug)] /// Mutable iterator over all futures in the unordered set. -pub struct IterMut<'a, Fut: Unpin> (pub(super) IterPinMut<'a, Fut>); +pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); #[derive(Debug)] /// Immutable iterator over all futures in the unordered set. @@ -22,12 +22,12 @@ pub struct IterPinRef<'a, Fut> { pub(super) task: *const Task, pub(super) len: usize, pub(super) pending_next_all: *mut Task, - pub(super) _marker: PhantomData<&'a FuturesUnordered> + pub(super) _marker: PhantomData<&'a FuturesUnordered>, } #[derive(Debug)] /// Immutable iterator over all the futures in the unordered set. -pub struct Iter<'a, Fut: Unpin> (pub(super) IterPinRef<'a, Fut>); +pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { type Item = Pin<&'a mut Fut>; @@ -85,10 +85,7 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { // `head_all` was initially read for this iterator implies acquire // ordering for all previously inserted nodes (and we don't need to // read `len_all` again for any other nodes). - let next = (*self.task).spin_next_all( - self.pending_next_all, - Relaxed, - ); + let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed); self.task = next; self.len -= 1; Some(Pin::new_unchecked(future)) @@ -115,3 +112,11 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { } impl ExactSizeIterator for Iter<'_, Fut> {} + +// SAFETY: we do nothing thread-local and there is no interior mutability, +// so the usual structural `Send`/`Sync` apply. +unsafe impl Send for IterPinRef<'_, Fut> {} +unsafe impl Sync for IterPinRef<'_, Fut> {} + +unsafe impl Send for IterPinMut<'_, Fut> {} +unsafe impl Sync for IterPinMut<'_, Fut> {} diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 8dcc551e9f..d1377ff327 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -3,11 +3,8 @@ //! This module is only available when the `std` or `alloc` feature of this //! library is activated, and it is activated by default. -use futures_core::future::Future; -use futures_core::stream::{FusedStream, Stream}; -use futures_core::task::{Context, Poll}; -use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError}; use crate::task::AtomicWaker; +use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; use core::fmt::{self, Debug}; use core::iter::FromIterator; @@ -16,8 +13,11 @@ use core::mem; use core::pin::Pin; use core::ptr; use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; -use core::sync::atomic::{AtomicPtr, AtomicBool}; -use alloc::sync::{Arc, Weak}; +use core::sync::atomic::{AtomicBool, AtomicPtr}; +use futures_core::future::Future; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; @@ -28,8 +28,7 @@ mod task; use self::task::Task; mod ready_to_run_queue; -use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue}; - +use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue}; /// A set of futures which may complete in any order. /// @@ -63,18 +62,14 @@ unsafe impl Sync for FuturesUnordered {} impl Unpin for FuturesUnordered {} impl Spawn for FuturesUnordered> { - fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) - -> Result<(), SpawnError> - { + fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) -> Result<(), SpawnError> { self.push(future_obj); Ok(()) } } impl LocalSpawn for FuturesUnordered> { - fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) - -> Result<(), SpawnError> - { + fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { self.push(future_obj); Ok(()) } @@ -191,7 +186,10 @@ impl FuturesUnordered { } /// Returns an iterator that allows inspecting each future in the set. - pub fn iter(&self) -> Iter<'_, Fut> where Fut: Unpin { + pub fn iter(&self) -> Iter<'_, Fut> + where + Fut: Unpin, + { Iter(Pin::new(self).iter_pin_ref()) } @@ -199,16 +197,14 @@ impl FuturesUnordered { fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { let (task, len) = self.atomic_load_head_and_len_all(); - IterPinRef { - task, - len, - pending_next_all: self.pending_next_all(), - _marker: PhantomData, - } + IterPinRef { task, len, pending_next_all: self.pending_next_all(), _marker: PhantomData } } /// Returns an iterator that allows modifying each future in the set. - pub fn iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin { + pub fn iter_mut(&mut self) -> IterMut<'_, Fut> + where + Fut: Unpin, + { IterMut(Pin::new(self).iter_pin_mut()) } @@ -217,19 +213,9 @@ impl FuturesUnordered { // `head_all` can be accessed directly and we don't need to spin on // `Task::next_all` since we have exclusive access to the set. let task = *self.head_all.get_mut(); - let len = if task.is_null() { - 0 - } else { - unsafe { - *(*task).len_all.get() - } - }; + let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } }; - IterPinMut { - task, - len, - _marker: PhantomData - } + IterPinMut { task, len, _marker: PhantomData } } /// Returns the current head node and number of futures in the list of all @@ -395,9 +381,7 @@ impl FuturesUnordered { impl Stream for FuturesUnordered { type Item = Fut::Output; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll> - { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Variable to determine how many times it is allowed to poll underlying // futures without yielding. // @@ -469,14 +453,11 @@ impl Stream for FuturesUnordered { // Double check that the call to `release_task` really // happened. Calling it required the task to be unlinked. - debug_assert_eq!( - task.next_all.load(Relaxed), - self.pending_next_all() - ); + debug_assert_eq!(task.next_all.load(Relaxed), self.pending_next_all()); unsafe { debug_assert!((*task.prev_all.get()).is_null()); } - continue + continue; } }; @@ -516,10 +497,7 @@ impl Stream for FuturesUnordered { } } - let mut bomb = Bomb { - task: Some(task), - queue: &mut *self, - }; + let mut bomb = Bomb { task: Some(task), queue: &mut *self }; // Poll the underlying future with the appropriate waker // implementation. This is where a large bit of the unsafety @@ -555,11 +533,9 @@ impl Stream for FuturesUnordered { cx.waker().wake_by_ref(); return Poll::Pending; } - continue - } - Poll::Ready(output) => { - return Poll::Ready(Some(output)) + continue; } + Poll::Ready(output) => return Poll::Ready(Some(output)), } } } @@ -611,7 +587,10 @@ impl FromIterator for FuturesUnordered { I: IntoIterator, { let acc = Self::new(); - iter.into_iter().fold(acc, |acc, item| { acc.push(item); acc }) + iter.into_iter().fold(acc, |acc, item| { + acc.push(item); + acc + }) } } diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 210519585e..3b34dc6e27 100644 --- a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -1,9 +1,9 @@ use crate::task::AtomicWaker; +use alloc::sync::Arc; use core::cell::UnsafeCell; use core::ptr; use core::sync::atomic::AtomicPtr; -use core::sync::atomic::Ordering::{Relaxed, Acquire, Release, AcqRel}; -use alloc::sync::Arc; +use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use super::abort::abort; use super::task::Task; diff --git a/futures-util/src/stream/futures_unordered/task.rs b/futures-util/src/stream/futures_unordered/task.rs index 261408f01c..da2cd67d97 100644 --- a/futures-util/src/stream/futures_unordered/task.rs +++ b/futures-util/src/stream/futures_unordered/task.rs @@ -1,11 +1,11 @@ +use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; -use core::sync::atomic::{AtomicPtr, AtomicBool}; use core::sync::atomic::Ordering::{self, SeqCst}; -use alloc::sync::{Arc, Weak}; +use core::sync::atomic::{AtomicBool, AtomicPtr}; -use crate::task::{ArcWake, WakerRef, waker_ref}; -use super::ReadyToRunQueue; use super::abort::abort; +use super::ReadyToRunQueue; +use crate::task::{waker_ref, ArcWake, WakerRef}; pub(super) struct Task { // The future diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index f9c2e78fc8..ec6c68bbf3 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -36,11 +36,11 @@ pub use self::stream::ReadyChunks; #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub use self::stream::Forward; -#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] +#[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent}; -#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] +#[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg(feature = "alloc")] @@ -58,7 +58,7 @@ pub use self::try_stream::{ #[cfg(feature = "std")] pub use self::try_stream::IntoAsyncRead; -#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] +#[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent}; diff --git a/futures-util/src/stream/select_all.rs b/futures-util/src/stream/select_all.rs index c0b92faabe..6b17bad125 100644 --- a/futures-util/src/stream/select_all.rs +++ b/futures-util/src/stream/select_all.rs @@ -5,11 +5,11 @@ use core::iter::FromIterator; use core::pin::Pin; use futures_core::ready; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use super::assert_stream; -use crate::stream::{StreamExt, StreamFuture, FuturesUnordered}; +use crate::stream::{FuturesUnordered, StreamExt, StreamFuture}; /// An unbounded set of streams /// @@ -75,10 +75,7 @@ impl Default for SelectAll { impl Stream for SelectAll { type Item = St::Item; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { match ready!(self.inner.poll_next_unpin(cx)) { Some((Some(item), remaining)) => { @@ -116,8 +113,9 @@ impl FusedStream for SelectAll { /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. pub fn select_all(streams: I) -> SelectAll - where I: IntoIterator, - I::Item: Stream + Unpin +where + I: IntoIterator, + I::Item: Stream + Unpin, { let mut set = SelectAll::new(); diff --git a/futures-util/src/stream/stream/buffer_unordered.rs b/futures-util/src/stream/stream/buffer_unordered.rs index de42cfd330..d64c142b41 100644 --- a/futures-util/src/stream/stream/buffer_unordered.rs +++ b/futures-util/src/stream/stream/buffer_unordered.rs @@ -1,12 +1,12 @@ use crate::stream::{Fuse, FuturesUnordered, StreamExt}; +use core::fmt; +use core::pin::Pin; use futures_core::future::Future; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::fmt; -use core::pin::Pin; pin_project! { /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) @@ -63,10 +63,7 @@ where { type Item = ::Output; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up diff --git a/futures-util/src/stream/stream/buffered.rs b/futures-util/src/stream/stream/buffered.rs index 1af9f492b9..6052a737ba 100644 --- a/futures-util/src/stream/stream/buffered.rs +++ b/futures-util/src/stream/stream/buffered.rs @@ -1,4 +1,6 @@ use crate::stream::{Fuse, FuturesOrdered, StreamExt}; +use core::fmt; +use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; use futures_core::stream::Stream; @@ -6,8 +8,6 @@ use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::fmt; -use core::pin::Pin; pin_project! { /// Stream for the [`buffered`](super::StreamExt::buffered) method. @@ -44,11 +44,7 @@ where St::Item: Future, { pub(super) fn new(stream: St, n: usize) -> Self { - Self { - stream: super::Fuse::new(stream), - in_progress_queue: FuturesOrdered::new(), - max: n, - } + Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesOrdered::new(), max: n } } delegate_access_inner!(stream, St, (.)); @@ -61,10 +57,7 @@ where { type Item = ::Output; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up @@ -79,7 +72,7 @@ where // Attempt to pull the next value from the in_progress_queue let res = this.in_progress_queue.poll_next_unpin(cx); if let Some(val) = ready!(res) { - return Poll::Ready(Some(val)) + return Poll::Ready(Some(val)); } // If more values are still coming from the stream, we're not done yet diff --git a/futures-util/src/stream/stream/for_each_concurrent.rs b/futures-util/src/stream/stream/for_each_concurrent.rs index cee0ba197e..6c18753eb9 100644 --- a/futures-util/src/stream/stream/for_each_concurrent.rs +++ b/futures-util/src/stream/stream/for_each_concurrent.rs @@ -1,7 +1,7 @@ use crate::stream::{FuturesUnordered, StreamExt}; use core::fmt; -use core::pin::Pin; use core::num::NonZeroUsize; +use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; @@ -35,9 +35,10 @@ where } impl ForEachConcurrent -where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { pub(super) fn new(stream: St, limit: Option, f: F) -> Self { Self { @@ -51,9 +52,10 @@ where St: Stream, } impl FusedFuture for ForEachConcurrent - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { fn is_terminated(&self) -> bool { self.stream.is_none() && self.futures.is_empty() @@ -61,9 +63,10 @@ impl FusedFuture for ForEachConcurrent } impl Future for ForEachConcurrent - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { type Output = (); @@ -80,7 +83,7 @@ impl Future for ForEachConcurrent Poll::Ready(Some(elem)) => { made_progress_this_iter = true; Some(elem) - }, + } Poll::Ready(None) => { stream_completed = true; None @@ -102,9 +105,9 @@ impl Future for ForEachConcurrent Poll::Ready(Some(())) => made_progress_this_iter = true, Poll::Ready(None) => { if this.stream.is_none() { - return Poll::Ready(()) + return Poll::Ready(()); } - }, + } Poll::Pending => {} } diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index fd3a103491..9089e6e3df 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -919,7 +919,7 @@ pub trait StreamExt: Stream { /// fut.await; /// # }) /// ``` - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn for_each_concurrent( self, @@ -1142,7 +1142,7 @@ pub trait StreamExt: Stream { /// /// This method is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn buffered(self, n: usize) -> Buffered where @@ -1187,7 +1187,7 @@ pub trait StreamExt: Stream { /// assert_eq!(buffered.next().await, None); /// # Ok::<(), i32>(()) }).unwrap(); /// ``` - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn buffer_unordered(self, n: usize) -> BufferUnordered where @@ -1329,7 +1329,8 @@ pub trait StreamExt: Stream { /// the sink is closed. Note that neither the original stream nor provided /// sink will be output by this future. Pass the sink by `Pin<&mut S>` /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in - /// order to preserve access to the `Sink`. + /// order to preserve access to the `Sink`. If the stream produces an error, + /// that error will be returned by this future without flushing/closing the sink. #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] fn forward(self, sink: S) -> Forward @@ -1354,7 +1355,7 @@ pub trait StreamExt: Stream { /// library is activated, and it is activated by default. #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn split(self) -> (SplitSink, SplitStream) where diff --git a/futures-util/src/stream/stream/split.rs b/futures-util/src/stream/stream/split.rs index 997b9747a2..3a72fee30b 100644 --- a/futures-util/src/stream/stream/split.rs +++ b/futures-util/src/stream/stream/split.rs @@ -1,9 +1,9 @@ +use core::fmt; +use core::pin::Pin; use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use core::fmt; -use core::pin::Pin; use crate::lock::BiLock; @@ -20,7 +20,8 @@ impl SplitStream { /// together. Succeeds only if the `SplitStream` and `SplitSink` are /// a matching pair originating from the same call to `StreamExt::split`. pub fn reunite(self, other: SplitSink) -> Result> - where S: Sink, + where + S: Sink, { other.reunite(self) } @@ -36,10 +37,7 @@ impl Stream for SplitStream { #[allow(bad_style)] fn SplitSink, Item>(lock: BiLock) -> SplitSink { - SplitSink { - lock, - slot: None, - } + SplitSink { lock, slot: None } } /// A `Sink` part of the split pair @@ -58,14 +56,16 @@ impl + Unpin, Item> SplitSink { /// together. Succeeds only if the `SplitStream` and `SplitSink` are /// a matching pair originating from the same call to `StreamExt::split`. pub fn reunite(self, other: SplitStream) -> Result> { - self.lock.reunite(other.0).map_err(|err| { - ReuniteError(SplitSink(err.0), SplitStream(err.1)) - }) + self.lock.reunite(other.0).map_err(|err| ReuniteError(SplitSink(err.0), SplitStream(err.1))) } } impl, Item> SplitSink { - fn poll_flush_slot(mut inner: Pin<&mut S>, slot: &mut Option, cx: &mut Context<'_>) -> Poll> { + fn poll_flush_slot( + mut inner: Pin<&mut S>, + slot: &mut Option, + cx: &mut Context<'_>, + ) -> Poll> { if slot.is_some() { ready!(inner.as_mut().poll_ready(cx))?; Poll::Ready(inner.start_send(slot.take().unwrap())) @@ -74,7 +74,10 @@ impl, Item> SplitSink { } } - fn poll_lock_and_flush_slot(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_lock_and_flush_slot( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx) @@ -127,9 +130,7 @@ pub struct ReuniteError(pub SplitSink, pub SplitStream); impl fmt::Debug for ReuniteError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ReuniteError") - .field(&"...") - .finish() + f.debug_tuple("ReuniteError").field(&"...").finish() } } diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 12f2de0d31..11cd9c0d31 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -516,7 +516,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(Err(oneshot::Canceled), fut.await); /// # }) /// ``` - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn try_for_each_concurrent( self, @@ -837,7 +837,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered where @@ -913,7 +913,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn try_buffered(self, n: usize) -> TryBuffered where diff --git a/futures-util/src/stream/try_stream/try_buffer_unordered.rs b/futures-util/src/stream/try_stream/try_buffer_unordered.rs index 71c6fc7e26..9a899d4ea6 100644 --- a/futures-util/src/stream/try_stream/try_buffer_unordered.rs +++ b/futures-util/src/stream/try_stream/try_buffer_unordered.rs @@ -1,12 +1,12 @@ -use crate::stream::{Fuse, FuturesUnordered, StreamExt, IntoStream}; use crate::future::{IntoFuture, TryFutureExt}; +use crate::stream::{Fuse, FuturesUnordered, IntoStream, StreamExt}; +use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::pin::Pin; pin_project! { /// Stream for the @@ -24,8 +24,9 @@ pin_project! { } impl TryBufferUnordered - where St: TryStream, - St::Ok: TryFuture, +where + St: TryStream, + St::Ok: TryFuture, { pub(super) fn new(stream: St, n: usize) -> Self { Self { @@ -39,15 +40,13 @@ impl TryBufferUnordered } impl Stream for TryBufferUnordered - where St: TryStream, - St::Ok: TryFuture, +where + St: TryStream, + St::Ok: TryFuture, { type Item = Result<::Ok, St::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up @@ -77,8 +76,9 @@ impl Stream for TryBufferUnordered // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for TryBufferUnordered - where S: TryStream + Sink, - S::Ok: TryFuture, +where + S: TryStream + Sink, + S::Ok: TryFuture, { type Error = E; diff --git a/futures-util/src/stream/try_stream/try_buffered.rs b/futures-util/src/stream/try_stream/try_buffered.rs index ff7e8447f9..45bd3f8c7a 100644 --- a/futures-util/src/stream/try_stream/try_buffered.rs +++ b/futures-util/src/stream/try_stream/try_buffered.rs @@ -1,12 +1,12 @@ -use crate::stream::{Fuse, FuturesOrdered, StreamExt, IntoStream}; use crate::future::{IntoFuture, TryFutureExt}; +use crate::stream::{Fuse, FuturesOrdered, IntoStream, StreamExt}; +use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::pin::Pin; pin_project! { /// Stream for the [`try_buffered`](super::TryStreamExt::try_buffered) method. @@ -47,10 +47,7 @@ where { type Item = Result<::Ok, St::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up diff --git a/futures-util/src/stream/try_stream/try_for_each_concurrent.rs b/futures-util/src/stream/try_stream/try_for_each_concurrent.rs index d2f4b0fed2..62734c746b 100644 --- a/futures-util/src/stream/try_stream/try_for_each_concurrent.rs +++ b/futures-util/src/stream/try_stream/try_for_each_concurrent.rs @@ -1,8 +1,8 @@ use crate::stream::{FuturesUnordered, StreamExt}; use core::fmt; use core::mem; -use core::pin::Pin; use core::num::NonZeroUsize; +use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; @@ -37,9 +37,10 @@ where } impl FusedFuture for TryForEachConcurrent - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future>, { fn is_terminated(&self) -> bool { self.stream.is_none() && self.futures.is_empty() @@ -47,9 +48,10 @@ impl FusedFuture for TryForEachConcurrent } impl TryForEachConcurrent -where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future>, { pub(super) fn new(stream: St, limit: Option, f: F) -> Self { Self { @@ -63,9 +65,10 @@ where St: TryStream, } impl Future for TryForEachConcurrent - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future>, { type Output = Result<(), St::Error>; @@ -85,7 +88,7 @@ impl Future for TryForEachConcurrent Poll::Ready(Some(Ok(elem))) => { made_progress_this_iter = true; Some(elem) - }, + } Poll::Ready(None) => { this.stream.set(None); None @@ -109,9 +112,9 @@ impl Future for TryForEachConcurrent Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true, Poll::Ready(None) => { if this.stream.is_none() { - return Poll::Ready(Ok(())) + return Poll::Ready(Ok(())); } - }, + } Poll::Pending => {} Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know diff --git a/futures/Cargo.toml b/futures/Cargo.toml index a0a64bfece..51c710d6e6 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -47,11 +47,14 @@ thread-pool = ["executor", "futures-executor/thread-pool"] # These features are outside of the normal semver guarantees and require the # `unstable` feature as an explicit opt-in to unstable API. unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"] -cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"] bilock = ["futures-util/bilock"] read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"] write-all-vectored = ["futures-util/write-all-vectored"] +# These features are no longer used. +# TODO: remove in the next major version. +cfg-target-has-atomic = [] + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/futures/src/lib.rs b/futures/src/lib.rs index e40ad16b7f..d15c16c377 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -78,7 +78,6 @@ //! The majority of examples and code snippets in this crate assume that they are //! inside an async block as written above. -#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] #![cfg_attr(feature = "read-initializer", feature(read_initializer))] #![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] @@ -88,9 +87,6 @@ #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] #![cfg_attr(docsrs, feature(doc_cfg))] -#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] -compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[cfg(all(feature = "bilock", not(feature = "unstable")))] compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 111fdf6388..e934595c2a 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1383,6 +1383,26 @@ pub mod stream { assert_impl!(Next<'_, ()>: Unpin); assert_not_impl!(Next<'_, PhantomPinned>: Unpin); + assert_impl!(NextIf<'_, SendStream<()>, ()>: Send); + assert_not_impl!(NextIf<'_, SendStream<()>, *const ()>: Send); + assert_not_impl!(NextIf<'_, SendStream, ()>: Send); + assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIf<'_, SyncStream<()>, ()>: Sync); + assert_not_impl!(NextIf<'_, SyncStream<()>, *const ()>: Sync); + assert_not_impl!(NextIf<'_, SyncStream, ()>: Sync); + assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIf<'_, PinnedStream, PhantomPinned>: Unpin); + + assert_impl!(NextIfEq<'_, SendStream<()>, ()>: Send); + assert_not_impl!(NextIfEq<'_, SendStream<()>, *const ()>: Send); + assert_not_impl!(NextIfEq<'_, SendStream, ()>: Send); + assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIfEq<'_, SyncStream<()>, ()>: Sync); + assert_not_impl!(NextIfEq<'_, SyncStream<()>, *const ()>: Sync); + assert_not_impl!(NextIfEq<'_, SyncStream, ()>: Sync); + assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIfEq<'_, PinnedStream, PhantomPinned>: Unpin); + assert_impl!(Once<()>: Send); assert_not_impl!(Once<*const ()>: Send); assert_impl!(Once<()>: Sync); @@ -1780,24 +1800,32 @@ pub mod stream { assert_not_impl!(Zip: Unpin); assert_not_impl!(Zip: Unpin); - assert_not_impl!(futures_unordered::Iter<()>: Send); - assert_not_impl!(futures_unordered::Iter<()>: Sync); + assert_impl!(futures_unordered::Iter<()>: Send); + assert_not_impl!(futures_unordered::Iter<*const ()>: Send); + assert_impl!(futures_unordered::Iter<()>: Sync); + assert_not_impl!(futures_unordered::Iter<*const ()>: Sync); assert_impl!(futures_unordered::Iter<()>: Unpin); - // futures_unordered::Iter requires `Fut: Unpin` + // The definition of futures_unordered::Iter has `Fut: Unpin` bounds. // assert_not_impl!(futures_unordered::Iter: Unpin); - assert_not_impl!(futures_unordered::IterMut<()>: Send); - assert_not_impl!(futures_unordered::IterMut<()>: Sync); + assert_impl!(futures_unordered::IterMut<()>: Send); + assert_not_impl!(futures_unordered::IterMut<*const ()>: Send); + assert_impl!(futures_unordered::IterMut<()>: Sync); + assert_not_impl!(futures_unordered::IterMut<*const ()>: Sync); assert_impl!(futures_unordered::IterMut<()>: Unpin); - // futures_unordered::IterMut requires `Fut: Unpin` + // The definition of futures_unordered::IterMut has `Fut: Unpin` bounds. // assert_not_impl!(futures_unordered::IterMut: Unpin); - assert_not_impl!(futures_unordered::IterPinMut<()>: Send); - assert_not_impl!(futures_unordered::IterPinMut<()>: Sync); + assert_impl!(futures_unordered::IterPinMut<()>: Send); + assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Send); + assert_impl!(futures_unordered::IterPinMut<()>: Sync); + assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync); assert_impl!(futures_unordered::IterPinMut: Unpin); - assert_not_impl!(futures_unordered::IterPinRef<()>: Send); - assert_not_impl!(futures_unordered::IterPinRef<()>: Sync); + assert_impl!(futures_unordered::IterPinRef<()>: Send); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send); + assert_impl!(futures_unordered::IterPinRef<()>: Sync); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync); assert_impl!(futures_unordered::IterPinRef: Unpin); } diff --git a/futures/tests/no-std/Cargo.toml b/futures/tests/no-std/Cargo.toml new file mode 100644 index 0000000000..9526732e34 --- /dev/null +++ b/futures/tests/no-std/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "no-std" +version = "0.1.0" +authors = ["Taiki Endo "] +edition = "2018" +publish = false + +[features] +futures-core-alloc = ["futures-core/alloc"] +futures-task-alloc = ["futures-task/alloc"] +futures-channel-alloc = ["futures-channel/alloc"] +futures-util-alloc = ["futures-util/alloc"] +futures-util-async-await = ["futures-util/async-await"] +futures-alloc = ["futures/alloc"] +futures-async-await = ["futures/async-await"] + +[dependencies] +futures-core = { path = "../../../futures-core", optional = true, default-features = false } +futures-task = { path = "../../../futures-task", optional = true, default-features = false } +futures-channel = { path = "../../../futures-channel", optional = true, default-features = false } +futures-util = { path = "../../../futures-util", optional = true, default-features = false } +futures = { path = "../..", optional = true, default-features = false } diff --git a/futures/tests/no-std/build.rs b/futures/tests/no-std/build.rs new file mode 100644 index 0000000000..a96a68274b --- /dev/null +++ b/futures/tests/no-std/build.rs @@ -0,0 +1,14 @@ +use std::{env, process::Command}; + +fn main() { + if is_nightly() { + println!("cargo:rustc-cfg=nightly"); + } +} + +fn is_nightly() -> bool { + env::var_os("RUSTC") + .and_then(|rustc| Command::new(rustc).arg("--version").output().ok()) + .and_then(|output| String::from_utf8(output.stdout).ok()) + .map_or(false, |version| version.contains("nightly") || version.contains("dev")) +} diff --git a/futures/tests/no-std/src/lib.rs b/futures/tests/no-std/src/lib.rs new file mode 100644 index 0000000000..308218d6b7 --- /dev/null +++ b/futures/tests/no-std/src/lib.rs @@ -0,0 +1,31 @@ +#![cfg(nightly)] +#![no_std] +#![feature(cfg_target_has_atomic)] + +#[cfg(feature = "futures-core-alloc")] +#[cfg(target_has_atomic = "ptr")] +pub use futures_core::task::__internal::AtomicWaker as _; + +#[cfg(feature = "futures-task-alloc")] +#[cfg(target_has_atomic = "ptr")] +pub use futures_task::ArcWake as _; + +#[cfg(feature = "futures-channel-alloc")] +#[cfg(target_has_atomic = "ptr")] +pub use futures_channel::oneshot as _; + +#[cfg(any(feature = "futures", feature = "futures-alloc"))] +#[cfg(target_has_atomic = "ptr")] +pub use futures::task::AtomicWaker as _; + +#[cfg(feature = "futures-alloc")] +#[cfg(target_has_atomic = "ptr")] +pub use futures::stream::FuturesOrdered as _; + +#[cfg(any(feature = "futures-util", feature = "futures-util-alloc"))] +#[cfg(target_has_atomic = "ptr")] +pub use futures_util::task::AtomicWaker as _; + +#[cfg(feature = "futures-util-alloc")] +#[cfg(target_has_atomic = "ptr")] +pub use futures_util::stream::FuturesOrdered as _; diff --git a/futures/tests/oneshot.rs b/futures/tests/oneshot.rs index 58951ec581..34b78a33fb 100644 --- a/futures/tests/oneshot.rs +++ b/futures/tests/oneshot.rs @@ -64,3 +64,15 @@ fn oneshot_drop_rx() { drop(rx); assert_eq!(Err(2), tx.send(2)); } + +#[test] +fn oneshot_debug() { + let (tx, rx) = oneshot::channel::(); + assert_eq!(format!("{:?}", tx), "Sender { complete: false }"); + assert_eq!(format!("{:?}", rx), "Receiver { complete: false }"); + drop(rx); + assert_eq!(format!("{:?}", tx), "Sender { complete: true }"); + let (tx, rx) = oneshot::channel::(); + drop(tx); + assert_eq!(format!("{:?}", rx), "Receiver { complete: true }"); +} diff --git a/futures/tests_disabled/all.rs b/futures/tests_disabled/all.rs index 6c7e11cf7b..a7a571040a 100644 --- a/futures/tests_disabled/all.rs +++ b/futures/tests_disabled/all.rs @@ -1,27 +1,27 @@ -use futures::future; -use futures::executor::block_on; use futures::channel::oneshot::{self, Canceled}; +use futures::executor::block_on; +use futures::future; use std::sync::mpsc::{channel, TryRecvError}; -mod support; -use support::*; +// mod support; +// use support::*; fn unselect(r: Result, Either<(E, B), (E, A)>>) -> Result { match r { - Ok(Either::Left((t, _))) | - Ok(Either::Right((t, _))) => Ok(t), - Err(Either::Left((e, _))) | - Err(Either::Right((e, _))) => Err(e), + Ok(Either::Left((t, _))) | Ok(Either::Right((t, _))) => Ok(t), + Err(Either::Left((e, _))) | Err(Either::Right((e, _))) => Err(e), } } #[test] fn result_smoke() { fn is_future_v(_: C) - where A: Send + 'static, - B: Send + 'static, - C: Future - {} + where + A: Send + 'static, + B: Send + 'static, + C: Future, + { + } is_future_v::(f_ok(1).map(|a| a + 1)); is_future_v::(f_ok(1).map_err(|a| a + 1)); @@ -64,7 +64,9 @@ fn result_smoke() { #[test] fn test_empty() { - fn empty() -> Empty { future::empty() } + fn empty() -> Empty { + future::empty() + } assert_empty(|| empty()); assert_empty(|| empty().select(empty())); @@ -105,16 +107,22 @@ fn flatten() { #[test] fn smoke_oneshot() { - assert_done(|| { - let (c, p) = oneshot::channel(); - c.send(1).unwrap(); - p - }, Ok(1)); - assert_done(|| { - let (c, p) = oneshot::channel::(); - drop(c); - p - }, Err(Canceled)); + assert_done( + || { + let (c, p) = oneshot::channel(); + c.send(1).unwrap(); + p + }, + Ok(1), + ); + assert_done( + || { + let (c, p) = oneshot::channel::(); + drop(c); + p + }, + Err(Canceled), + ); let mut completes = Vec::new(); assert_empty(|| { let (a, b) = oneshot::channel::(); @@ -129,9 +137,7 @@ fn smoke_oneshot() { let (c, p) = oneshot::channel::(); drop(c); let (tx, rx) = channel(); - p.then(move |_| { - tx.send(()) - }).forget(); + p.then(move |_| tx.send(())).forget(); rx.recv().unwrap(); } @@ -139,8 +145,14 @@ fn smoke_oneshot() { fn select_cancels() { let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.select(d).then(unselect); // assert!(f.poll(&mut Task::new()).is_pending()); @@ -156,8 +168,14 @@ fn select_cancels() { let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.select(d).then(unselect); assert!(f.poll(lw).ok().unwrap().is_pending()); @@ -173,8 +191,14 @@ fn select_cancels() { fn join_cancels() { let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.join(d); drop(a); @@ -185,8 +209,14 @@ fn join_cancels() { let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let (tx, rx) = channel(); let f = b.join(d); @@ -194,7 +224,8 @@ fn join_cancels() { tx.send(()).unwrap(); let res: Result<(), ()> = Ok(()); res - }).forget(); + }) + .forget(); assert!(rx.try_recv().is_err()); drop(a); rx.recv().unwrap(); @@ -243,7 +274,6 @@ fn join_incomplete() { }) } - #[test] fn select2() { assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2)); @@ -251,14 +281,15 @@ fn select2() { assert_done(|| f_err(2).select(empty()).then(unselect), Err(2)); assert_done(|| empty().select(f_err(2)).then(unselect), Err(2)); - assert_done(|| { - f_ok(1).select(f_ok(2)) - .map_err(|_| 0) - .and_then(|either_tup| { - let (a, b) = either_tup.into_inner(); - b.map(move |b| a + b) - }) - }, Ok(3)); + assert_done( + || { + f_ok(1).select(f_ok(2)).map_err(|_| 0).and_then(|either_tup| { + let (a, b) = either_tup.into_inner(); + b.map(move |b| a + b) + }) + }, + Ok(3), + ); // Finish one half of a select and then fail the second, ensuring that we // get the notification of the second one. @@ -297,8 +328,14 @@ fn select2() { { let ((_a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let f = b.select(d); drop(f); assert!(drx.recv().is_err()); @@ -309,8 +346,14 @@ fn select2() { { let ((_a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let mut f = b.select(d); let _res = noop_waker_lw(|lw| f.poll(lw)); drop(f); @@ -322,8 +365,14 @@ fn select2() { { let ((a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let (tx, rx) = channel(); b.select(d).map(move |_| tx.send(()).unwrap()).forget(); drop(a); diff --git a/futures/tests_disabled/bilock.rs b/futures/tests_disabled/bilock.rs index c1bc33f507..0166ca48ba 100644 --- a/futures/tests_disabled/bilock.rs +++ b/futures/tests_disabled/bilock.rs @@ -1,11 +1,11 @@ -use futures::task; -use futures::stream; use futures::future; +use futures::stream; +use futures::task; use futures_util::lock::BiLock; use std::thread; -mod support; -use support::*; +// mod support; +// use support::*; #[test] fn smoke() { @@ -41,9 +41,9 @@ fn smoke() { }); assert!(task::spawn(future) - .poll_future_notify(¬ify_noop(), 0) - .expect("failure in poll") - .is_ready()); + .poll_future_notify(¬ify_noop(), 0) + .expect("failure in poll") + .is_ready()); } #[test] @@ -51,10 +51,7 @@ fn concurrent() { const N: usize = 10000; let (a, b) = BiLock::new(0); - let a = Increment { - a: Some(a), - remaining: N, - }; + let a = Increment { a: Some(a), remaining: N }; let b = stream::iter_ok(0..N).fold(b, |b, _n| { b.lock().map(|mut b| { *b += 1; @@ -89,7 +86,7 @@ fn concurrent() { fn poll(&mut self) -> Poll, ()> { loop { if self.remaining == 0 { - return Ok(self.a.take().unwrap().into()) + return Ok(self.a.take().unwrap().into()); } let a = self.a.as_ref().unwrap(); diff --git a/futures/tests_disabled/stream.rs b/futures/tests_disabled/stream.rs index ef0676db35..854dbad829 100644 --- a/futures/tests_disabled/stream.rs +++ b/futures/tests_disabled/stream.rs @@ -1,26 +1,26 @@ +use futures::channel::mpsc; +use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{err, ok}; use futures::stream::{empty, iter_ok, poll_fn, Peekable}; -use futures::channel::oneshot; -use futures::channel::mpsc; -mod support; -use support::*; +// mod support; +// use support::*; pub struct Iter { iter: I, } pub fn iter(i: J) -> Iter - where J: IntoIterator>, +where + J: IntoIterator>, { - Iter { - iter: i.into_iter(), - } + Iter { iter: i.into_iter() } } impl Stream for Iter - where I: Iterator>, +where + I: Iterator>, { type Item = T; type Error = E; @@ -34,21 +34,15 @@ impl Stream for Iter } } -fn list() -> Box + Send> { +fn list() -> Box + Send> { let (tx, rx) = mpsc::channel(1); - tx.send(Ok(1)) - .and_then(|tx| tx.send(Ok(2))) - .and_then(|tx| tx.send(Ok(3))) - .forget(); + tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Ok(3))).forget(); Box::new(rx.then(|r| r.unwrap())) } -fn err_list() -> Box + Send> { +fn err_list() -> Box + Send> { let (tx, rx) = mpsc::channel(1); - tx.send(Ok(1)) - .and_then(|tx| tx.send(Ok(2))) - .and_then(|tx| tx.send(Err(3))) - .forget(); + tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Err(3))).forget(); Box::new(rx.then(|r| r.unwrap())) } @@ -89,40 +83,31 @@ fn filter() { #[test] fn filter_map() { - assert_done(|| list().filter_map(|x| { - ok(if x % 2 == 0 { - Some(x + 10) - } else { - None - }) - }).collect(), Ok(vec![12])); + assert_done( + || list().filter_map(|x| ok(if x % 2 == 0 { Some(x + 10) } else { None })).collect(), + Ok(vec![12]), + ); } #[test] fn and_then() { assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4])); - assert_done(|| list().and_then(|a| err::(a as u32)).collect::>(), - Err(1)); + assert_done(|| list().and_then(|a| err::(a as u32)).collect::>(), Err(1)); } #[test] fn then() { assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4])); - } #[test] fn or_else() { - assert_done(|| err_list().or_else(|a| { - ok::(a as i32) - }).collect(), Ok(vec![1, 2, 3])); + assert_done(|| err_list().or_else(|a| ok::(a as i32)).collect(), Ok(vec![1, 2, 3])); } #[test] fn flatten() { - assert_done(|| list().map(|_| list()).flatten().collect(), - Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); - + assert_done(|| list().map(|_| list()).flatten().collect(), Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); } #[test] @@ -132,9 +117,7 @@ fn skip() { #[test] fn skip_passes_errors_through() { - let mut s = block_on_stream( - iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1) - ); + let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1)); assert_eq!(s.next(), Some(Err(1))); assert_eq!(s.next(), Some(Err(2))); assert_eq!(s.next(), Some(Ok(4))); @@ -144,8 +127,7 @@ fn skip_passes_errors_through() { #[test] fn skip_while() { - assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), - Ok(vec![2, 3])); + assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), Ok(vec![2, 3])); } #[test] fn take() { @@ -154,8 +136,7 @@ fn take() { #[test] fn take_while() { - assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), - Ok(vec![1, 2])); + assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), Ok(vec![1, 2])); } #[test] @@ -193,9 +174,9 @@ fn buffered() { let (a, b) = oneshot::channel::(); let (c, d) = oneshot::channel::(); - tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) - .forget(); + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); let mut rx = rx.buffered(2); sassert_empty(&mut rx); @@ -211,9 +192,9 @@ fn buffered() { let (a, b) = oneshot::channel::(); let (c, d) = oneshot::channel::(); - tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) - .forget(); + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); let mut rx = rx.buffered(1); sassert_empty(&mut rx); @@ -233,8 +214,8 @@ fn unordered() { let (c, d) = oneshot::channel::(); tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) - .forget(); + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); let mut rx = rx.buffer_unordered(2); sassert_empty(&mut rx); @@ -250,8 +231,8 @@ fn unordered() { let (c, d) = oneshot::channel::(); tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) - .forget(); + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); // We don't even get to see `c` until `a` completes. let mut rx = rx.buffer_unordered(1); @@ -267,21 +248,17 @@ fn unordered() { #[test] fn zip() { - assert_done(|| list().zip(list()).collect(), - Ok(vec![(1, 1), (2, 2), (3, 3)])); - assert_done(|| list().zip(list().take(2)).collect(), - Ok(vec![(1, 1), (2, 2)])); - assert_done(|| list().take(2).zip(list()).collect(), - Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().zip(list()).collect(), Ok(vec![(1, 1), (2, 2), (3, 3)])); + assert_done(|| list().zip(list().take(2)).collect(), Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().take(2).zip(list()).collect(), Ok(vec![(1, 1), (2, 2)])); assert_done(|| err_list().zip(list()).collect::>(), Err(3)); - assert_done(|| list().zip(list().map(|x| x + 1)).collect(), - Ok(vec![(1, 2), (2, 3), (3, 4)])); + assert_done(|| list().zip(list().map(|x| x + 1)).collect(), Ok(vec![(1, 2), (2, 3), (3, 4)])); } #[test] fn peek() { struct Peek { - inner: Peekable + Send>> + inner: Peekable + Send>>, } impl Future for Peek { @@ -299,15 +276,12 @@ fn peek() { } } - block_on(Peek { - inner: list().peekable(), - }).unwrap() + block_on(Peek { inner: list().peekable() }).unwrap() } #[test] fn wait() { - assert_eq!(block_on_stream(list()).collect::, _>>(), - Ok(vec![1, 2, 3])); + assert_eq!(block_on_stream(list()).collect::, _>>(), Ok(vec![1, 2, 3])); } #[test] @@ -337,8 +311,10 @@ fn forward() { let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1; assert_eq!(v, vec![0, 1, 2, 3]); - assert_done(move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), - Ok(vec![0, 1, 2, 3, 4, 5])); + assert_done( + move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), + Ok(vec![0, 1, 2, 3, 4, 5]), + ); } #[test] diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs new file mode 100644 index 0000000000..0819af1a45 --- /dev/null +++ b/no_atomic_cas.rs @@ -0,0 +1,11 @@ +// This file is @generated by no_atomic_cas.sh. +// It is not intended for manual editing. + +const NO_ATOMIC_CAS_TARGETS: &[&str] = &[ + "avr-unknown-gnu-atmega328", + "msp430-none-elf", + "riscv32i-unknown-none-elf", + "riscv32imc-unknown-none-elf", + "thumbv4t-none-eabi", + "thumbv6m-none-eabi", +]; diff --git a/tools/fmt.sh b/tools/fmt.sh new file mode 100755 index 0000000000..5438030aee --- /dev/null +++ b/tools/fmt.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +# Format all rust code. +# +# Usage: +# ./tools/fmt.sh +# +# This script is needed because `cargo fmt` cannot recognize modules defined inside macros. +# Refs: https://github.com/rust-lang/rustfmt/issues/4078 + +set -euo pipefail +IFS=$'\n\t' + +cd "$(cd "$(dirname "${0}")" && pwd)"/.. + +# shellcheck disable=SC2046 +if [[ -z "${CI:-}" ]]; then + ( + # `cargo fmt` cannot recognize modules defined inside macros so run rustfmt directly. + rustfmt $(git ls-files "*.rs") + ) +else + ( + rustfmt --check $(git ls-files "*.rs") + ) +fi