@@ -61,7 +61,9 @@ impl Drop for Receiver {
6161
6262pub fn channel ( ) -> ( Sender , Receiver ) {
6363 let inner = Arc :: new ( Mutex :: new ( Inner :: new ( ) ) ) ;
64- let sender = Sender { inner : inner. clone ( ) } ;
64+ let sender = Sender {
65+ inner : inner. clone ( ) ,
66+ } ;
6567 let receiver = Receiver { inner } ;
6668 ( sender, receiver)
6769}
@@ -70,8 +72,8 @@ impl AsyncRead for Receiver {
7072 fn poll_read (
7173 self : Pin < & mut Self > ,
7274 cx : & mut futures:: task:: Context ,
73- buf : & mut [ u8 ] ) -> futures :: task :: Poll < Result < usize , futures :: io :: Error > >
74- {
75+ buf : & mut [ u8 ] ,
76+ ) -> futures :: task :: Poll < Result < usize , futures :: io :: Error > > {
7577 let mut inner = self . inner . lock ( ) . unwrap ( ) ;
7678 if inner. read_cursor == inner. write_cursor {
7779 if inner. write_end_closed {
@@ -83,7 +85,8 @@ impl AsyncRead for Receiver {
8385 } else {
8486 assert ! ( inner. read_cursor < inner. write_cursor) ;
8587 let copy_len = std:: cmp:: min ( buf. len ( ) , inner. write_cursor - inner. read_cursor ) ;
86- buf[ 0 ..copy_len] . copy_from_slice ( & inner. buffer [ inner. read_cursor .. inner. read_cursor + copy_len] ) ;
88+ buf[ 0 ..copy_len]
89+ . copy_from_slice ( & inner. buffer [ inner. read_cursor ..inner. read_cursor + copy_len] ) ;
8790 inner. read_cursor += copy_len;
8891 if let Some ( write_waker) = inner. write_waker . take ( ) {
8992 write_waker. wake ( ) ;
@@ -97,19 +100,22 @@ impl AsyncWrite for Sender {
97100 fn poll_write (
98101 self : Pin < & mut Self > ,
99102 cx : & mut futures:: task:: Context ,
100- buf : & [ u8 ] ) -> futures :: task :: Poll < Result < usize , futures :: io :: Error > >
101- {
103+ buf : & [ u8 ] ,
104+ ) -> futures :: task :: Poll < Result < usize , futures :: io :: Error > > {
102105 let mut inner = self . inner . lock ( ) . unwrap ( ) ;
103106 if inner. read_end_closed {
104- return Poll :: Ready ( Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: ConnectionAborted , "read end closed" ) ) )
107+ return Poll :: Ready ( Err ( std:: io:: Error :: new (
108+ std:: io:: ErrorKind :: ConnectionAborted ,
109+ "read end closed" ,
110+ ) ) ) ;
105111 }
106112 if inner. write_cursor == inner. buffer . len ( ) {
107113 if inner. read_cursor == inner. buffer . len ( ) {
108114 inner. write_cursor = 0 ;
109115 inner. read_cursor = 0 ;
110116 } else {
111117 inner. write_waker = Some ( cx. waker ( ) . clone ( ) ) ;
112- return Poll :: Pending
118+ return Poll :: Pending ;
113119 }
114120 }
115121
@@ -127,17 +133,15 @@ impl AsyncWrite for Sender {
127133
128134 fn poll_flush (
129135 self : Pin < & mut Self > ,
130- _cx : & mut futures:: task:: Context )
131- -> Poll < Result < ( ) , futures:: io:: Error > >
132- {
136+ _cx : & mut futures:: task:: Context ,
137+ ) -> Poll < Result < ( ) , futures:: io:: Error > > {
133138 Poll :: Ready ( Ok ( ( ) ) )
134139 }
135140
136141 fn poll_close (
137142 self : Pin < & mut Self > ,
138- _cx : & mut futures:: task:: Context )
139- -> Poll < Result < ( ) , futures:: io:: Error > >
140- {
143+ _cx : & mut futures:: task:: Context ,
144+ ) -> Poll < Result < ( ) , futures:: io:: Error > > {
141145 let mut inner = self . inner . lock ( ) . unwrap ( ) ;
142146 inner. write_end_closed = true ;
143147 if let Some ( read_waker) = inner. read_waker . take ( ) {
@@ -149,20 +153,26 @@ impl AsyncWrite for Sender {
149153
150154#[ cfg( test) ]
151155pub mod test {
156+ use futures:: task:: LocalSpawnExt ;
152157 use futures:: { AsyncReadExt , AsyncWriteExt } ;
153- use futures:: task:: { LocalSpawnExt } ;
154158
155159 #[ test]
156160 fn basic ( ) {
157161 let ( mut sender, mut receiver) = crate :: channel ( ) ;
158- let buf: Vec < u8 > = vec ! [ 1 , 2 , 3 , 4 , 5 ] . into_iter ( ) . cycle ( ) . take ( 20000 ) . collect ( ) ;
162+ let buf: Vec < u8 > = vec ! [ 1 , 2 , 3 , 4 , 5 ]
163+ . into_iter ( )
164+ . cycle ( )
165+ . take ( 20000 )
166+ . collect ( ) ;
159167 let mut pool = futures:: executor:: LocalPool :: new ( ) ;
160168
161169 let buf2 = buf. clone ( ) ;
162- pool. spawner ( ) . spawn_local ( async move {
163- sender. write_all ( & buf2) . await . unwrap ( ) ;
164- ( )
165- } ) . unwrap ( ) ;
170+ pool. spawner ( )
171+ . spawn_local ( async move {
172+ sender. write_all ( & buf2) . await . unwrap ( ) ;
173+ ( )
174+ } )
175+ . unwrap ( ) ;
166176
167177 let mut buf3 = vec ! [ ] ;
168178 pool. run_until ( receiver. read_to_end ( & mut buf3) ) . unwrap ( ) ;
@@ -176,8 +186,7 @@ pub mod test {
176186 drop ( receiver) ;
177187
178188 let mut pool = futures:: executor:: LocalPool :: new ( ) ;
179- let result = pool. run_until ( sender. write_all ( & [ 0 , 1 , 2 ] ) ) ;
189+ let result = pool. run_until ( sender. write_all ( & [ 0 , 1 , 2 ] ) ) ;
180190 assert ! ( result. is_err( ) ) ;
181191 }
182192}
183-
0 commit comments