@@ -61,7 +61,9 @@ impl Drop for Receiver {
61
61
62
62
pub fn channel ( ) -> ( Sender , Receiver ) {
63
63
let inner = Arc :: new ( Mutex :: new ( Inner :: new ( ) ) ) ;
64
- let sender = Sender { inner : inner. clone ( ) } ;
64
+ let sender = Sender {
65
+ inner : inner. clone ( ) ,
66
+ } ;
65
67
let receiver = Receiver { inner } ;
66
68
( sender, receiver)
67
69
}
@@ -70,8 +72,8 @@ impl AsyncRead for Receiver {
70
72
fn poll_read (
71
73
self : Pin < & mut Self > ,
72
74
cx : & mut futures:: task:: Context ,
73
- buf : & mut [ u8 ] ) -> futures :: task :: Poll < Result < usize , futures :: io :: Error > >
74
- {
75
+ buf : & mut [ u8 ] ,
76
+ ) -> futures :: task :: Poll < Result < usize , futures :: io :: Error > > {
75
77
let mut inner = self . inner . lock ( ) . unwrap ( ) ;
76
78
if inner. read_cursor == inner. write_cursor {
77
79
if inner. write_end_closed {
@@ -83,7 +85,8 @@ impl AsyncRead for Receiver {
83
85
} else {
84
86
assert ! ( inner. read_cursor < inner. write_cursor) ;
85
87
let copy_len = std:: cmp:: min ( buf. len ( ) , inner. write_cursor - inner. read_cursor ) ;
86
- buf[ 0 ..copy_len] . copy_from_slice ( & inner. buffer [ inner. read_cursor .. inner. read_cursor + copy_len] ) ;
88
+ buf[ 0 ..copy_len]
89
+ . copy_from_slice ( & inner. buffer [ inner. read_cursor ..inner. read_cursor + copy_len] ) ;
87
90
inner. read_cursor += copy_len;
88
91
if let Some ( write_waker) = inner. write_waker . take ( ) {
89
92
write_waker. wake ( ) ;
@@ -97,19 +100,22 @@ impl AsyncWrite for Sender {
97
100
fn poll_write (
98
101
self : Pin < & mut Self > ,
99
102
cx : & mut futures:: task:: Context ,
100
- buf : & [ u8 ] ) -> futures :: task :: Poll < Result < usize , futures :: io :: Error > >
101
- {
103
+ buf : & [ u8 ] ,
104
+ ) -> futures :: task :: Poll < Result < usize , futures :: io :: Error > > {
102
105
let mut inner = self . inner . lock ( ) . unwrap ( ) ;
103
106
if inner. read_end_closed {
104
- return Poll :: Ready ( Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: ConnectionAborted , "read end closed" ) ) )
107
+ return Poll :: Ready ( Err ( std:: io:: Error :: new (
108
+ std:: io:: ErrorKind :: ConnectionAborted ,
109
+ "read end closed" ,
110
+ ) ) ) ;
105
111
}
106
112
if inner. write_cursor == inner. buffer . len ( ) {
107
113
if inner. read_cursor == inner. buffer . len ( ) {
108
114
inner. write_cursor = 0 ;
109
115
inner. read_cursor = 0 ;
110
116
} else {
111
117
inner. write_waker = Some ( cx. waker ( ) . clone ( ) ) ;
112
- return Poll :: Pending
118
+ return Poll :: Pending ;
113
119
}
114
120
}
115
121
@@ -127,17 +133,15 @@ impl AsyncWrite for Sender {
127
133
128
134
fn poll_flush (
129
135
self : Pin < & mut Self > ,
130
- _cx : & mut futures:: task:: Context )
131
- -> Poll < Result < ( ) , futures:: io:: Error > >
132
- {
136
+ _cx : & mut futures:: task:: Context ,
137
+ ) -> Poll < Result < ( ) , futures:: io:: Error > > {
133
138
Poll :: Ready ( Ok ( ( ) ) )
134
139
}
135
140
136
141
fn poll_close (
137
142
self : Pin < & mut Self > ,
138
- _cx : & mut futures:: task:: Context )
139
- -> Poll < Result < ( ) , futures:: io:: Error > >
140
- {
143
+ _cx : & mut futures:: task:: Context ,
144
+ ) -> Poll < Result < ( ) , futures:: io:: Error > > {
141
145
let mut inner = self . inner . lock ( ) . unwrap ( ) ;
142
146
inner. write_end_closed = true ;
143
147
if let Some ( read_waker) = inner. read_waker . take ( ) {
@@ -149,20 +153,26 @@ impl AsyncWrite for Sender {
149
153
150
154
#[ cfg( test) ]
151
155
pub mod test {
156
+ use futures:: task:: LocalSpawnExt ;
152
157
use futures:: { AsyncReadExt , AsyncWriteExt } ;
153
- use futures:: task:: { LocalSpawnExt } ;
154
158
155
159
#[ test]
156
160
fn basic ( ) {
157
161
let ( mut sender, mut receiver) = crate :: channel ( ) ;
158
- let buf: Vec < u8 > = vec ! [ 1 , 2 , 3 , 4 , 5 ] . into_iter ( ) . cycle ( ) . take ( 20000 ) . collect ( ) ;
162
+ let buf: Vec < u8 > = vec ! [ 1 , 2 , 3 , 4 , 5 ]
163
+ . into_iter ( )
164
+ . cycle ( )
165
+ . take ( 20000 )
166
+ . collect ( ) ;
159
167
let mut pool = futures:: executor:: LocalPool :: new ( ) ;
160
168
161
169
let buf2 = buf. clone ( ) ;
162
- pool. spawner ( ) . spawn_local ( async move {
163
- sender. write_all ( & buf2) . await . unwrap ( ) ;
164
- ( )
165
- } ) . unwrap ( ) ;
170
+ pool. spawner ( )
171
+ . spawn_local ( async move {
172
+ sender. write_all ( & buf2) . await . unwrap ( ) ;
173
+ ( )
174
+ } )
175
+ . unwrap ( ) ;
166
176
167
177
let mut buf3 = vec ! [ ] ;
168
178
pool. run_until ( receiver. read_to_end ( & mut buf3) ) . unwrap ( ) ;
@@ -176,8 +186,7 @@ pub mod test {
176
186
drop ( receiver) ;
177
187
178
188
let mut pool = futures:: executor:: LocalPool :: new ( ) ;
179
- let result = pool. run_until ( sender. write_all ( & [ 0 , 1 , 2 ] ) ) ;
189
+ let result = pool. run_until ( sender. write_all ( & [ 0 , 1 , 2 ] ) ) ;
180
190
assert ! ( result. is_err( ) ) ;
181
191
}
182
192
}
183
-
0 commit comments