Skip to content

Commit 771ccd1

Browse files
committed
Do not hang in poll if reactor is destroyed
If channel is dropped, receiver may still return EOF, and if channel is alive, receiver produces an error.
1 parent 80d23ca commit 771ccd1

File tree

11 files changed

+129
-51
lines changed

11 files changed

+129
-51
lines changed

src/channel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl<T> Stream for Receiver<T> {
9999
match self.rx.get_ref().try_recv() {
100100
Ok(t) => Ok(Async::Ready(Some(t))),
101101
Err(TryRecvError::Empty) => {
102-
self.rx.need_read();
102+
try!(self.rx.need_read());
103103
Ok(Async::NotReady)
104104
}
105105
Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)),

src/net/tcp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ impl TcpListener {
109109
match self.inner.io.get_ref().accept() {
110110
Ok(pair) => Ok(Async::Ready(Some(pair))),
111111
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
112-
self.inner.io.need_read();
112+
try!(self.inner.io.need_read());
113113
Ok(Async::NotReady)
114114
}
115115
Err(e) => Err(e)
@@ -128,7 +128,7 @@ impl TcpListener {
128128
});
129129
tx.complete(res);
130130
Ok(())
131-
});
131+
}).expect("failed to spawn");
132132
rx.then(|r| r.expect("shouldn't be canceled"))
133133
}).boxed(),
134134
}

src/net/udp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl UdpSocket {
8181
match self.io.get_ref().send_to(buf, target) {
8282
Ok(Some(n)) => Ok(n),
8383
Ok(None) => {
84-
self.io.need_write();
84+
try!(self.io.need_write());
8585
Err(mio::would_block())
8686
}
8787
Err(e) => Err(e),
@@ -97,7 +97,7 @@ impl UdpSocket {
9797
match self.io.get_ref().recv_from(buf) {
9898
Ok(Some(n)) => Ok(n),
9999
Ok(None) => {
100-
self.io.need_read();
100+
try!(self.io.need_read());
101101
Err(mio::would_block())
102102
}
103103
Err(e) => Err(e),

src/reactor/channel.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,35 @@ use std::cell::Cell;
1010
use std::io;
1111
use std::marker;
1212
use std::sync::Arc;
13+
use std::sync::atomic::AtomicBool;
14+
use std::sync::atomic::Ordering;
1315

1416
use mio;
1517
use mio::channel::{ctl_pair, SenderCtl, ReceiverCtl};
1618

1719
use mpsc_queue::{Queue, PopResult};
1820

21+
struct Inner<T> {
22+
queue: Queue<T>,
23+
receiver_alive: AtomicBool,
24+
}
25+
1926
pub struct Sender<T> {
2027
ctl: SenderCtl,
21-
inner: Arc<Queue<T>>,
28+
inner: Arc<Inner<T>>,
2229
}
2330

2431
pub struct Receiver<T> {
2532
ctl: ReceiverCtl,
26-
inner: Arc<Queue<T>>,
33+
inner: Arc<Inner<T>>,
2734
_marker: marker::PhantomData<Cell<()>>, // this type is not Sync
2835
}
2936

3037
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
31-
let inner = Arc::new(Queue::new());
38+
let inner = Arc::new(Inner {
39+
queue: Queue::new(),
40+
receiver_alive: AtomicBool::new(true),
41+
});
3242
let (tx, rx) = ctl_pair();
3343

3444
let tx = Sender {
@@ -45,7 +55,10 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
4555

4656
impl<T> Sender<T> {
4757
pub fn send(&self, data: T) -> io::Result<()> {
48-
self.inner.push(data);
58+
if !self.inner.receiver_alive.load(Ordering::SeqCst) {
59+
return Err(io::Error::new(io::ErrorKind::Other, "receiver has been dropped"));
60+
}
61+
self.inner.queue.push(data);
4962
self.ctl.inc()
5063
}
5164
}
@@ -57,7 +70,7 @@ impl<T> Receiver<T> {
5770
//
5871
// We, however, are the only thread with a `Receiver<T>` because this
5972
// type is not `Sync`. and we never handed out another instance.
60-
match unsafe { self.inner.pop() } {
73+
match unsafe { self.inner.queue.pop() } {
6174
PopResult::Data(t) => {
6275
try!(self.ctl.dec());
6376
Ok(Some(t))
@@ -108,6 +121,12 @@ impl<T> mio::Evented for Receiver<T> {
108121
}
109122
}
110123

124+
impl<T> Drop for Receiver<T> {
125+
fn drop(&mut self) {
126+
self.inner.receiver_alive.store(false, Ordering::SeqCst);
127+
}
128+
}
129+
111130
impl<T> Clone for Sender<T> {
112131
fn clone(&self) -> Sender<T> {
113132
Sender {

src/reactor/io_token.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ impl IoToken {
8282
///
8383
/// This function will also panic if there is not a currently running future
8484
/// task.
85-
pub fn schedule_read(&self, handle: &Remote) {
86-
handle.send(Message::Schedule(self.token, task::park(), Direction::Read));
85+
pub fn schedule_read(&self, handle: &Remote) -> io::Result<()> {
86+
handle.send(Message::Schedule(self.token, task::park(), Direction::Read))
8787
}
8888

8989
/// Schedule the current future task to receive a notification when the
@@ -109,8 +109,8 @@ impl IoToken {
109109
///
110110
/// This function will also panic if there is not a currently running future
111111
/// task.
112-
pub fn schedule_write(&self, handle: &Remote) {
113-
handle.send(Message::Schedule(self.token, task::park(), Direction::Write));
112+
pub fn schedule_write(&self, handle: &Remote) -> io::Result<()> {
113+
handle.send(Message::Schedule(self.token, task::park(), Direction::Write))
114114
}
115115

116116
/// Unregister all information associated with a token on an event loop,
@@ -135,7 +135,7 @@ impl IoToken {
135135
/// This function will panic if the event loop this handle is associated
136136
/// with has gone away, or if there is an error communicating with the event
137137
/// loop.
138-
pub fn drop_source(&self, handle: &Remote) {
139-
handle.send(Message::DropSource(self.token));
138+
pub fn drop_source(&self, handle: &Remote) -> io::Result<()> {
139+
handle.send(Message::DropSource(self.token))
140140
}
141141
}

src/reactor/mod.rs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ const SLAB_CAPACITY: usize = 1024 * 64;
4444
pub struct Core {
4545
events: mio::Events,
4646
tx: Sender<Message>,
47-
rx: Receiver<Message>,
47+
// `rx` is `Option` here only because it is needed to be dropped explicitly
48+
// in `drop` before other things.
49+
rx: Option<Receiver<Message>>,
4850
inner: Rc<RefCell<Inner>>,
4951

5052
// Used for determining when the future passed to `run` is ready. Once the
@@ -143,7 +145,7 @@ impl Core {
143145
Ok(Core {
144146
events: mio::Events::with_capacity(1024),
145147
tx: tx,
146-
rx: rx,
148+
rx: Some(rx),
147149
_future_registration: future_pair.0,
148150
future_readiness: Arc::new(MySetReadiness(future_pair.1)),
149151

@@ -379,8 +381,9 @@ impl Core {
379381

380382
fn consume_queue(&self) {
381383
debug!("consuming notification queue");
384+
let rx = self.rx.as_ref().unwrap();
382385
// TODO: can we do better than `.unwrap()` here?
383-
while let Some(msg) = self.rx.recv().unwrap() {
386+
while let Some(msg) = rx.recv().unwrap() {
384387
self.notify(msg);
385388
}
386389
}
@@ -408,6 +411,27 @@ impl Core {
408411
}
409412
}
410413

414+
impl Drop for Core {
415+
fn drop(&mut self) {
416+
// Destroy the receiver, so all schedule operations will be rejected.
417+
// Do it explicitly before unparking, to avoid race condition.
418+
self.rx.take();
419+
420+
// Unpark all tasks.
421+
// It has no effect for tasks in this event loop,
422+
// however tasks in another executors get an error
423+
// when they do `poll` right after wakeup.
424+
for io in self.inner.borrow_mut().io_dispatch.iter_mut() {
425+
if let Some(ref mut reader) = io.reader {
426+
reader.unpark();
427+
}
428+
if let Some(ref mut writer) = io.writer {
429+
writer.unpark();
430+
}
431+
}
432+
}
433+
}
434+
411435
impl Inner {
412436
fn add_source(&mut self, source: &mio::Evented)
413437
-> io::Result<(Arc<AtomicUsize>, usize)> {
@@ -498,26 +522,20 @@ impl Inner {
498522
}
499523

500524
impl Remote {
501-
fn send(&self, msg: Message) {
525+
fn send(&self, msg: Message) -> io::Result<()> {
502526
self.with_loop(|lp| {
503527
match lp {
504528
Some(lp) => {
505529
// Need to execute all existing requests first, to ensure
506530
// that our message is processed "in order"
507531
lp.consume_queue();
508532
lp.notify(msg);
533+
Ok(())
509534
}
510535
None => {
511-
match self.tx.send(msg) {
512-
Ok(()) => {}
513-
514-
// This should only happen when there was an error
515-
// writing to the pipe to wake up the event loop,
516-
// hopefully that never happens
517-
Err(e) => {
518-
panic!("error sending message to event loop: {}", e)
519-
}
520-
}
536+
// May return an error if receiver is closed
537+
// or if there was an error writing to the pipe.
538+
self.tx.send(msg)
521539
}
522540
}
523541
})
@@ -548,15 +566,15 @@ impl Remote {
548566
///
549567
/// Note that while the closure, `F`, requires the `Send` bound as it might
550568
/// cross threads, the future `R` does not.
551-
pub fn spawn<F, R>(&self, f: F)
569+
pub fn spawn<F, R>(&self, f: F) -> io::Result<()>
552570
where F: FnOnce(&Handle) -> R + Send + 'static,
553571
R: IntoFuture<Item=(), Error=()>,
554572
R::Future: 'static,
555573
{
556574
self.send(Message::Run(Box::new(|lp: &Core| {
557575
let f = f(&lp.handle());
558576
lp.inner.borrow_mut().spawn(Box::new(f.into_future()));
559-
})));
577+
})))
560578
}
561579
}
562580

src/reactor/poll_evented.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,11 @@ impl<E> PollEvented<E> {
7171
if self.readiness.load(Ordering::SeqCst) & 1 != 0 {
7272
Async::Ready(())
7373
} else {
74-
self.token.schedule_read(&self.handle);
75-
Async::NotReady
74+
match self.token.schedule_read(&self.handle) {
75+
Ok(()) => Async::NotReady,
76+
// next read will return error
77+
Err(_) => Async::Ready(()),
78+
}
7679
}
7780
}
7881

@@ -91,8 +94,12 @@ impl<E> PollEvented<E> {
9194
if self.readiness.load(Ordering::SeqCst) & 2 != 0 {
9295
Async::Ready(())
9396
} else {
94-
self.token.schedule_write(&self.handle);
95-
Async::NotReady
97+
// ignore error, will be handled in need_write
98+
match self.token.schedule_write(&self.handle) {
99+
Ok(()) => Async::NotReady,
100+
// next read will return error
101+
Err(_) => Async::Ready(()),
102+
}
96103
}
97104
}
98105

@@ -107,7 +114,7 @@ impl<E> PollEvented<E> {
107114
/// The flag indicating that this stream is readable is unset and the
108115
/// current task is scheduled to receive a notification when the stream is
109116
/// then again readable.
110-
pub fn need_read(&self) {
117+
pub fn need_read(&self) -> io::Result<()> {
111118
self.readiness.fetch_and(!1, Ordering::SeqCst);
112119
self.token.schedule_read(&self.handle)
113120
}
@@ -123,7 +130,7 @@ impl<E> PollEvented<E> {
123130
/// The flag indicating that this stream is writable is unset and the
124131
/// current task is scheduled to receive a notification when the stream is
125132
/// then again writable.
126-
pub fn need_write(&self) {
133+
pub fn need_write(&self) -> io::Result<()> {
127134
self.readiness.fetch_and(!2, Ordering::SeqCst);
128135
self.token.schedule_write(&self.handle)
129136
}
@@ -154,7 +161,7 @@ impl<E: Read> Read for PollEvented<E> {
154161
}
155162
let r = self.get_mut().read(buf);
156163
if is_wouldblock(&r) {
157-
self.need_read();
164+
try!(self.need_read());
158165
}
159166
return r
160167
}
@@ -167,7 +174,7 @@ impl<E: Write> Write for PollEvented<E> {
167174
}
168175
let r = self.get_mut().write(buf);
169176
if is_wouldblock(&r) {
170-
self.need_write();
177+
try!(self.need_write());
171178
}
172179
return r
173180
}
@@ -178,7 +185,7 @@ impl<E: Write> Write for PollEvented<E> {
178185
}
179186
let r = self.get_mut().flush();
180187
if is_wouldblock(&r) {
181-
self.need_write();
188+
try!(self.need_write());
182189
}
183190
return r
184191
}
@@ -203,7 +210,7 @@ impl<'a, E> Read for &'a PollEvented<E>
203210
}
204211
let r = self.get_ref().read(buf);
205212
if is_wouldblock(&r) {
206-
self.need_read();
213+
try!(self.need_read());
207214
}
208215
return r
209216
}
@@ -218,7 +225,7 @@ impl<'a, E> Write for &'a PollEvented<E>
218225
}
219226
let r = self.get_ref().write(buf);
220227
if is_wouldblock(&r) {
221-
self.need_write();
228+
try!(self.need_write());
222229
}
223230
return r
224231
}
@@ -229,7 +236,7 @@ impl<'a, E> Write for &'a PollEvented<E>
229236
}
230237
let r = self.get_ref().flush();
231238
if is_wouldblock(&r) {
232-
self.need_write();
239+
try!(self.need_write());
233240
}
234241
return r
235242
}
@@ -256,6 +263,7 @@ fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
256263

257264
impl<E> Drop for PollEvented<E> {
258265
fn drop(&mut self) {
259-
self.token.drop_source(&self.handle);
266+
// Ignore error
267+
drop(self.token.drop_source(&self.handle));
260268
}
261269
}

src/reactor/timeout.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,15 @@ impl Future for Timeout {
5656
if *self.token.when() <= now {
5757
Ok(Async::Ready(()))
5858
} else {
59-
self.token.update_timeout(&self.handle);
59+
try!(self.token.update_timeout(&self.handle));
6060
Ok(Async::NotReady)
6161
}
6262
}
6363
}
6464

6565
impl Drop for Timeout {
6666
fn drop(&mut self) {
67-
self.token.cancel_timeout(&self.handle);
67+
// Ignore error
68+
drop(self.token.cancel_timeout(&self.handle));
6869
}
6970
}

0 commit comments

Comments
 (0)