74
74
// happens-before semantics required for the acquire / release semantics used
75
75
// by the queue structure.
76
76
77
+ use std:: mem:: Pin ;
78
+ use std:: marker:: Unpin ;
77
79
use std:: fmt;
78
80
use std:: error:: Error ;
79
81
use std:: any:: Any ;
@@ -84,8 +86,7 @@ use std::thread;
84
86
use std:: usize;
85
87
86
88
use futures_core:: task:: { self , Waker } ;
87
- use futures_core:: { Async , Poll , Stream } ;
88
- use futures_core:: never:: Never ;
89
+ use futures_core:: { Poll , Stream } ;
89
90
90
91
use mpsc:: queue:: { Queue , PopResult } ;
91
92
@@ -109,6 +110,9 @@ pub struct Sender<T> {
109
110
maybe_parked : bool ,
110
111
}
111
112
113
+ // Safe because we treat the `T` opaquely
114
+ unsafe impl < T > Unpin for Sender < T > { }
115
+
112
116
/// The transmission end of an unbounded mpsc channel.
113
117
///
114
118
/// This value is created by the [`unbounded`](unbounded) function.
@@ -126,12 +130,18 @@ pub struct Receiver<T> {
126
130
inner : Arc < Inner < T > > ,
127
131
}
128
132
133
+ // Safe because we treat the `T` opaquely
134
+ unsafe impl < T > Unpin for Receiver < T > { }
135
+
129
136
/// The receiving end of an unbounded mpsc channel.
130
137
///
131
138
/// This value is created by the [`unbounded`](unbounded) function.
132
139
#[ derive( Debug ) ]
133
140
pub struct UnboundedReceiver < T > ( Receiver < T > ) ;
134
141
142
+ // Safe because we treat the `T` opaquely
143
+ unsafe impl < T > Unpin for UnboundedReceiver < T > { }
144
+
135
145
/// The error type for [`Sender`s](Sender) used as `Sink`s.
136
146
#[ derive( Clone , Debug , PartialEq , Eq ) ]
137
147
pub struct SendError {
@@ -511,14 +521,14 @@ impl<T> Sender<T> {
511
521
Ok ( ( ) )
512
522
}
513
523
514
- fn poll_ready_nb ( & self ) -> Poll < ( ) , SendError > {
524
+ fn poll_ready_nb ( & self ) -> Poll < Result < ( ) , SendError > > {
515
525
let state = decode_state ( self . inner . state . load ( SeqCst ) ) ;
516
526
if state. is_open {
517
- Ok ( Async :: Ready ( ( ) ) )
527
+ Poll :: Ready ( Ok ( ( ) ) )
518
528
} else {
519
- Err ( SendError {
529
+ Poll :: Ready ( Err ( SendError {
520
530
kind : SendErrorKind :: Full ,
521
- } )
531
+ } ) )
522
532
}
523
533
}
524
534
@@ -637,15 +647,15 @@ impl<T> Sender<T> {
637
647
/// - `Ok(Async::Pending)` if the channel may not have
638
648
/// capacity, in which case the current task is queued to be notified once capacity is available;
639
649
/// - `Err(SendError)` if the receiver has been dropped.
640
- pub fn poll_ready ( & mut self , cx : & mut task:: Context ) -> Poll < ( ) , SendError > {
650
+ pub fn poll_ready ( & mut self , cx : & mut task:: Context ) -> Poll < Result < ( ) , SendError > > {
641
651
let state = decode_state ( self . inner . state . load ( SeqCst ) ) ;
642
652
if !state. is_open {
643
- return Err ( SendError {
653
+ return Poll :: Ready ( Err ( SendError {
644
654
kind : SendErrorKind :: Disconnected ,
645
- } ) ;
655
+ } ) ) ;
646
656
}
647
657
648
- Ok ( self . poll_unparked ( Some ( cx) ) )
658
+ self . poll_unparked ( Some ( cx) ) . map ( Ok )
649
659
}
650
660
651
661
/// Returns whether this channel is closed without needing a context.
@@ -662,7 +672,7 @@ impl<T> Sender<T> {
662
672
let _ = self . do_send_nb ( None ) ;
663
673
}
664
674
665
- fn poll_unparked ( & mut self , cx : Option < & mut task:: Context > ) -> Async < ( ) > {
675
+ fn poll_unparked ( & mut self , cx : Option < & mut task:: Context > ) -> Poll < ( ) > {
666
676
// First check the `maybe_parked` variable. This avoids acquiring the
667
677
// lock in most cases
668
678
if self . maybe_parked {
@@ -671,7 +681,7 @@ impl<T> Sender<T> {
671
681
672
682
if !task. is_parked {
673
683
self . maybe_parked = false ;
674
- return Async :: Ready ( ( ) )
684
+ return Poll :: Ready ( ( ) )
675
685
}
676
686
677
687
// At this point, an unpark request is pending, so there will be an
@@ -682,16 +692,16 @@ impl<T> Sender<T> {
682
692
// task
683
693
task. task = cx. map ( |cx| cx. waker ( ) . clone ( ) ) ;
684
694
685
- Async :: Pending
695
+ Poll :: Pending
686
696
} else {
687
- Async :: Ready ( ( ) )
697
+ Poll :: Ready ( ( ) )
688
698
}
689
699
}
690
700
}
691
701
692
702
impl < T > UnboundedSender < T > {
693
703
/// Check if the channel is ready to receive a message.
694
- pub fn poll_ready ( & self , _: & mut task:: Context ) -> Poll < ( ) , SendError > {
704
+ pub fn poll_ready ( & self , _: & mut task:: Context ) -> Poll < Result < ( ) , SendError > > {
695
705
self . 0 . poll_ready_nb ( )
696
706
}
697
707
@@ -832,14 +842,14 @@ impl<T> Receiver<T> {
832
842
/// no longer empty.
833
843
pub fn try_next ( & mut self ) -> Result < Option < T > , TryRecvError > {
834
844
match self . next_message ( ) {
835
- Async :: Ready ( msg) => {
845
+ Poll :: Ready ( msg) => {
836
846
Ok ( msg)
837
847
} ,
838
- Async :: Pending => Err ( TryRecvError { _inner : ( ) } ) ,
848
+ Poll :: Pending => Err ( TryRecvError { _inner : ( ) } ) ,
839
849
}
840
850
}
841
851
842
- fn next_message ( & mut self ) -> Async < Option < T > > {
852
+ fn next_message ( & mut self ) -> Poll < Option < T > > {
843
853
// Pop off a message
844
854
loop {
845
855
match unsafe { self . inner . message_queue . pop ( ) } {
@@ -851,11 +861,11 @@ impl<T> Receiver<T> {
851
861
// Decrement number of messages
852
862
self . dec_num_messages ( ) ;
853
863
854
- return Async :: Ready ( msg) ;
864
+ return Poll :: Ready ( msg) ;
855
865
}
856
866
PopResult :: Empty => {
857
867
// The queue is empty, return Pending
858
- return Async :: Pending ;
868
+ return Poll :: Pending ;
859
869
}
860
870
PopResult :: Inconsistent => {
861
871
// Inconsistent means that there will be a message to pop
@@ -937,27 +947,26 @@ impl<T> Receiver<T> {
937
947
938
948
impl < T > Stream for Receiver < T > {
939
949
type Item = T ;
940
- type Error = Never ;
941
950
942
- fn poll_next ( & mut self , cx : & mut task:: Context ) -> Poll < Option < Self :: Item > , Self :: Error > {
951
+ fn poll_next ( mut self : Pin < Self > , cx : & mut task:: Context ) -> Poll < Option < T > > {
943
952
loop {
944
953
// Try to read a message off of the message queue.
945
954
let msg = match self . next_message ( ) {
946
- Async :: Ready ( msg) => msg,
947
- Async :: Pending => {
955
+ Poll :: Ready ( msg) => msg,
956
+ Poll :: Pending => {
948
957
// There are no messages to read, in this case, attempt to
949
958
// park. The act of parking will verify that the channel is
950
959
// still empty after the park operation has completed.
951
960
match self . try_park ( cx) {
952
961
TryPark :: Parked => {
953
962
// The task was parked, and the channel is still
954
963
// empty, return Pending.
955
- return Ok ( Async :: Pending ) ;
964
+ return Poll :: Pending ;
956
965
}
957
966
TryPark :: Closed => {
958
967
// The channel is closed, there will be no further
959
968
// messages.
960
- return Ok ( Async :: Ready ( None ) ) ;
969
+ return Poll :: Ready ( None ) ;
961
970
}
962
971
TryPark :: NotEmpty => {
963
972
// A message has been sent while attempting to
@@ -969,7 +978,7 @@ impl<T> Stream for Receiver<T> {
969
978
}
970
979
} ;
971
980
// Return the message
972
- return Ok ( Async :: Ready ( msg) ) ;
981
+ return Poll :: Ready ( msg) ;
973
982
}
974
983
}
975
984
}
@@ -1005,10 +1014,9 @@ impl<T> UnboundedReceiver<T> {
1005
1014
1006
1015
impl < T > Stream for UnboundedReceiver < T > {
1007
1016
type Item = T ;
1008
- type Error = Never ;
1009
1017
1010
- fn poll_next ( & mut self , cx : & mut task:: Context ) -> Poll < Option < Self :: Item > , Self :: Error > {
1011
- self . 0 . poll_next ( cx)
1018
+ fn poll_next ( mut self : Pin < Self > , cx : & mut task:: Context ) -> Poll < Option < T > > {
1019
+ Pin :: new ( & mut self . 0 ) . poll_next ( cx)
1012
1020
}
1013
1021
}
1014
1022
0 commit comments