Skip to content

Commit a7f1a1e

Browse files
committed
Update doc for IO safety API changes
Adds an unsafe helper for wrapping an `AsRawFd` implementer in `AsFd`. Which is needed with things like zmq until they update to use IO safety traits: erickt/rust-zmq#361.
1 parent f2e1562 commit a7f1a1e

8 files changed

+81
-61
lines changed

doc/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ edition = "2018"
55
publish = false
66

77
[dependencies]
8+
io-lifetimes = "1.0.3"
89
calloop = { path = "..", features = ["futures-io", "executor"] }
910
anyhow = "1.0.56"
1011
futures = "0.3.21"
11-
zmq = { version = "0.9.2", features = ["vendored"] }
12+
zmq = { version = "0.10.0" }
1213

1314
# Here we create bin targets so each chapter's code may be tested.
1415

doc/src/ch02-01-generic.md

+6-10
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,21 @@ The `Generic` event source wraps a file descriptor ("fd") and fires its callback
33

44
For example on Linux, fd-based interfaces are available for GPIO, I2C, USB, UART, network interfaces, timers and many other systems. Integrating these into calloop starts with obtaining the appropriate fd, creating a `Generic` event source from it, and building up a more useful, abstracted event source around that. A detailed example of this is [given for ZeroMQ](ch04-00-a-full-example-zeromq.md).
55

6-
You do not have to use a low-level fd either: any type that implements [`AsRawFd`](https://doc.rust-lang.org/beta/std/os/unix/io/trait.AsRawFd.html) can be provided. This means that you can use a wrapper type that handles allocation and disposal itself, and implement `AsRawFd` on it so that `Generic` can manage it in the event loop.
6+
You do not have to use a low-level fd either: any type that implements [`AsFd`](https://doc.rust-lang.org/stable/std/os/fd/trait.AsFd.html) can be provided. This means that you can use a wrapper type that handles allocation and disposal itself, and implement `AsRawFd` on it so that `Generic` can manage it in the event loop.
77

88
## Creating a Generic source
99

1010
Creating a `Generic` event source requires three things:
11-
- The fd itself, of course, possibly in a wrapper type that implements `AsRawFd`
11+
- An `OwnedFd` or a wrapper type that implements `AsFd`
1212
- The ["interest"](api/calloop/struct.Interest.html) you have in this descriptor — this means, whether you want to generate events when the fd becomes readable, writeable, or both
1313
- The ["mode"](api/calloop/enum.Mode.html) of event generation - level triggered or edge triggered
1414

1515
The easiest constructor to use is the [`new()`](api/calloop/generic/struct.Generic.html#method.new) method, but if you need control over the [associated error type](ch02-06-errors.md) there is also [`new_with_error()`](api/calloop/generic/struct.Generic.html#method.new_with_error).
1616

17-
## Ownership and AsRawFd wrappers
17+
## Ownership and AsFd wrappers
1818

19-
It's important to remember that file descriptors by themselves have no concept of ownership attached to them in Rust — they are simply bare integers. Dropping them does not close the resource they refer to, and copying them does not carry information about how many there are.
19+
Rust 1.63 introduced a concept of file descriptor ownership and borrowing through the `OwnedFd` and `BorrowedFd` types, which are also available on older Rust versions through the `io-lifetimes` crate. The `AsFd` trait provides a way to get a `BorrowedFd` corresponding to a file, socket, etc. while guaranteeing the fd will be valid for the lifetime of the `BorrowedFd`.
2020

21-
Typically (eg. in the standard library) they would be an underlying implementation detail of another type that *did* encode ownership somehow. This how you can manage them in any of your own integration code - use drop handlers, reference counting (if necessary) and general RAII principles in a wrapper type, and then implement `AsRawFd` to allow `Generic` to use it. The `Generic` source will take ownership of it, so it will be dropped when the `Generic` is.
21+
Not all third party crates use `AsFd` yet, and may instead provide types implementing `AsRawFd`. ['AsFdWrapper'](api/calloop/generic/struct.AsFdWrapper.html) provides a way to adapt these types. To use this safely, ensure the `AsRawFd` implementation of the type it wraps returns a valid fd as long as the type exists. And to avoid an fd leak, it should ultimately be `close`d properly.
2222

23-
This means you need to do at least two things:
24-
- follow the rules of the API you obtain the fd from
25-
- wrap them in a type that manages ownership appropriately
26-
27-
For example, on Unix-like systems the [`UdpSocket`](https://doc.rust-lang.org/beta/std/net/struct.UdpSocket.html) contains a fd, and upon being dropped, `libc::close(fd)` is called on it. If you create a `Generic<UdpSocket>` then, it will be closed upon dropping it.
23+
Safe types like `OwnedFd` and `BorrowedFd` should be preferred over `RawFd`s, and the use of `RawFd`s outside of implementing FFI shouldn't be necessary as libraries move to using the IO safe types and traits.

doc/src/ch03-02-async-io-types.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
> This section is about adapting blocking IO types for use with `async` Rust code, and powering that `async` code with Calloop. If you just want to add blocking IO types to your event loop and use Calloop's callback/composition-based design, you only need to wrap your blocking IO type in a [generic event source](api/calloop/generic/struct.Generic.html).
44
5-
You may find that you need to write ordinary Rust `async` code around blocking IO types. Calloop provides the ability to wrap blocking types — anything that implements the [`AsRawFd`](https://doc.rust-lang.org/stable/std/os/unix/io/trait.AsRawFd.html) trait — in its own async type. This can be polled in any executor you may have chosen for your async code, but if you're using Calloop you'll probably be using [Calloop's executor](api/calloop/futures/fn.executor.html).
5+
You may find that you need to write ordinary Rust `async` code around blocking IO types. Calloop provides the ability to wrap blocking types — anything that implements the [`AsFd`](https://doc.rust-lang.org/stable/std/os/unix/io/trait.AsFd.html) trait — in its own async type. This can be polled in any executor you may have chosen for your async code, but if you're using Calloop you'll probably be using [Calloop's executor](api/calloop/futures/fn.executor.html).
66

77
> ## Enable the `futures-io` feature!
88
>
@@ -65,4 +65,4 @@ Starting event loop. Use Ctrl-C to exit.
6565
Async block ended with: Sent data...
6666
Async block ended with: Hello, world
6767
^C
68-
```
68+
```

doc/src/ch04-02-creating-our-source-part-1-our-types.md

+2-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ So at a minimum, our type needs to contain these:
1212
pub struct ZeroMQSource
1313
{
1414
// Calloop components.
15-
socket_source: calloop::generic::Generic<std::os::unix::io::RawFd>,
15+
socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
1616
mpsc_receiver: calloop::channel::Channel<?>,
1717
wake_ping_receiver: calloop::ping::PingSource,
1818
}
@@ -26,15 +26,12 @@ What else do we need? If the `PingSource` is there to wake up the loop manually,
2626
pub struct ZeroMQSource
2727
{
2828
// Calloop components.
29-
socket_source: calloop::generic::Generic<std::os::unix::io::RawFd>,
29+
socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
3030
mpsc_receiver: calloop::channel::Channel<?>,
3131
wake_ping_receiver: calloop::ping::PingSource,
3232
3333
/// Sending end of the ping source.
3434
wake_ping_sender: calloop::ping::Ping,
35-
36-
/// The underlying ZeroMQ socket.
37-
socket: zmq::Socket,
3835
}
3936
```
4037

doc/src/ch04-04-creating-our-source-part-3-processing-events-almost.md

+6-4
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ self.mpsc_receiver
104104
.process_events(readiness, token, |evt, _| {
105105
if let calloop::channel::Event::Msg(msg) = evt {
106106
self.socket
107+
.file
107108
.send_multipart(msg, 0)
108109
.context("Failed to send message")?;
109110
}
@@ -123,16 +124,13 @@ where
123124
T::Item: Into<zmq::Message>,
124125
{
125126
// Calloop components.
126-
socket_source: calloop::generic::Generic<std::os::unix::io::RawFd>,
127+
socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
127128
mpsc_receiver: calloop::channel::Channel<T>,
128129
wake_ping_receiver: calloop::ping::PingSource,
129130
130131
/// Sending end of the ping source.
131132
wake_ping_sender: calloop::ping::Ping,
132133
133-
/// The underlying ZeroMQ socket.
134-
socket: zmq::Socket,
135-
136134
/// FIFO queue for the messages to be published.
137135
outbox: std::collections::VecDeque<T>,
138136
}
@@ -155,15 +153,18 @@ And our "zsocket is writeable" code becomes:
155153

156154
```rust,noplayground
157155
self.socket
156+
.file
158157
.process_events(readiness, token, |_, _| {
159158
let events = self
160159
.socket
160+
.file
161161
.get_events()
162162
.context("Failed to read ZeroMQ events")?;
163163
164164
if events.contains(zmq::POLLOUT) {
165165
if let Some(parts) = self.outbox.pop_front() {
166166
self.socket
167+
.file
167168
.send_multipart(parts, 0)
168169
.context("Failed to send message")?;
169170
}
@@ -172,6 +173,7 @@ self.socket
172173
if events.contains(zmq::POLLIN) {
173174
let messages =
174175
self.socket
176+
.file
175177
.recv_multipart(0)
176178
.context("Failed to receive message")?;
177179
callback(messages, &mut ())

doc/src/zmqsource.rs

+18-18
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! A Calloop event source implementation for ZeroMQ sockets.
22
3-
use std::{collections, io, os::unix::io::RawFd};
3+
use std::{collections, io};
44

55
use anyhow::Context;
66

@@ -48,7 +48,7 @@ where
4848
{
4949
// Calloop components.
5050
/// Event source for ZeroMQ socket.
51-
socket_source: calloop::generic::Generic<RawFd>,
51+
socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
5252

5353
/// Event source for channel.
5454
mpsc_receiver: calloop::channel::Channel<T>,
@@ -62,9 +62,6 @@ where
6262
wake_ping_sender: calloop::ping::Ping,
6363

6464
// ZeroMQ socket.
65-
/// The underlying ZeroMQ socket that we're proxying things to.
66-
socket: zmq::Socket,
67-
6865
/// FIFO queue for the messages to be published.
6966
outbox: collections::VecDeque<T>,
7067
}
@@ -80,15 +77,15 @@ where
8077
let (mpsc_sender, mpsc_receiver) = calloop::channel::channel();
8178
let (wake_ping_sender, wake_ping_receiver) = calloop::ping::make_ping()?;
8279

83-
let fd = socket.get_fd()?;
84-
85-
let socket_source =
86-
calloop::generic::Generic::new(fd, calloop::Interest::READ, calloop::Mode::Edge);
80+
let socket = calloop::generic::Generic::new(
81+
unsafe { calloop::generic::FdWrapper::new(socket) },
82+
calloop::Interest::READ,
83+
calloop::Mode::Edge,
84+
);
8785

8886
Ok((
8987
Self {
9088
socket,
91-
socket_source,
9289
mpsc_receiver,
9390
wake_ping_receiver,
9491
wake_ping_sender,
@@ -164,6 +161,7 @@ where
164161
// on the socket that warrants reading the events again.
165162
let events = self
166163
.socket
164+
.file
167165
.get_events()
168166
.context("Failed to read ZeroMQ events")?;
169167

@@ -172,6 +170,7 @@ where
172170
if events.contains(zmq::POLLOUT) {
173171
if let Some(parts) = self.outbox.pop_front() {
174172
self.socket
173+
.file
175174
.send_multipart(parts, 0)
176175
.context("Failed to send message")?;
177176
used_socket = true;
@@ -183,6 +182,7 @@ where
183182
// sending, which includes all parts of a multipart message.
184183
let messages = self
185184
.socket
185+
.file
186186
.recv_multipart(0)
187187
.context("Failed to receive message")?;
188188
used_socket = true;
@@ -205,7 +205,7 @@ where
205205
poll: &mut calloop::Poll,
206206
token_factory: &mut calloop::TokenFactory,
207207
) -> calloop::Result<()> {
208-
self.socket_source.register(poll, token_factory)?;
208+
self.socket.register(poll, token_factory)?;
209209
self.mpsc_receiver.register(poll, token_factory)?;
210210
self.wake_ping_receiver.register(poll, token_factory)?;
211211

@@ -219,7 +219,7 @@ where
219219
poll: &mut calloop::Poll,
220220
token_factory: &mut calloop::TokenFactory,
221221
) -> calloop::Result<()> {
222-
self.socket_source.reregister(poll, token_factory)?;
222+
self.socket.reregister(poll, token_factory)?;
223223
self.mpsc_receiver.reregister(poll, token_factory)?;
224224
self.wake_ping_receiver.reregister(poll, token_factory)?;
225225

@@ -229,7 +229,7 @@ where
229229
}
230230

231231
fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
232-
self.socket_source.unregister(poll)?;
232+
self.socket.unregister(poll)?;
233233
self.mpsc_receiver.unregister(poll)?;
234234
self.wake_ping_receiver.unregister(poll)?;
235235
Ok(())
@@ -247,14 +247,14 @@ where
247247
//
248248
// - https://stackoverflow.com/a/38338578/188535
249249
// - http://api.zeromq.org/4-0:zmq-ctx-term
250-
self.socket.set_linger(0).ok();
251-
self.socket.set_rcvtimeo(0).ok();
252-
self.socket.set_sndtimeo(0).ok();
250+
self.socket.file.set_linger(0).ok();
251+
self.socket.file.set_rcvtimeo(0).ok();
252+
self.socket.file.set_sndtimeo(0).ok();
253253

254254
// Double result because (a) possible failure on call and (b) possible
255255
// failure decoding.
256-
if let Ok(Ok(last_endpoint)) = self.socket.get_last_endpoint() {
257-
self.socket.disconnect(&last_endpoint).ok();
256+
if let Ok(Ok(last_endpoint)) = self.socket.file.get_last_endpoint() {
257+
self.socket.file.disconnect(&last_endpoint).ok();
258258
}
259259
}
260260
}

src/sources/generic.rs

+37-2
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,46 @@
3737
//! these `Generic<_>` as fields of your event source, and delegate the
3838
//! [`EventSource`](crate::EventSource) implementation to them.
3939
40-
use io_lifetimes::AsFd;
41-
use std::marker::PhantomData;
40+
use io_lifetimes::{AsFd, BorrowedFd};
41+
use std::{marker::PhantomData, ops, os::unix::io::AsRawFd};
4242

4343
use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};
4444

45+
/// Wrapper to use a type implementing `AsRawFd` but not `AsFd` with `Generic`
46+
#[derive(Debug)]
47+
pub struct FdWrapper<T: AsRawFd>(T);
48+
49+
impl<T: AsRawFd> FdWrapper<T> {
50+
/// Wrap `inner` with an `AsFd` implementation.
51+
///
52+
/// This is safe if the `AsRawFd` implementation of `inner` always returns
53+
/// a valid fd. This should usually be true for types implementing
54+
/// `AsRawFd`, but not for `RawFd` itself.
55+
pub unsafe fn new(inner: T) -> Self {
56+
Self(inner)
57+
}
58+
}
59+
60+
impl<T: AsRawFd> ops::Deref for FdWrapper<T> {
61+
type Target = T;
62+
63+
fn deref(&self) -> &Self::Target {
64+
&self.0
65+
}
66+
}
67+
68+
impl<T: AsRawFd> ops::DerefMut for FdWrapper<T> {
69+
fn deref_mut(&mut self) -> &mut Self::Target {
70+
&mut self.0
71+
}
72+
}
73+
74+
impl<T: AsRawFd> AsFd for FdWrapper<T> {
75+
fn as_fd(&self) -> BorrowedFd {
76+
unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) }
77+
}
78+
}
79+
4580
/// A generic event source wrapping a FD-backed type
4681
#[derive(Debug)]
4782
pub struct Generic<F: AsFd, E = std::io::Error> {

src/sources/signals.rs

+8-19
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,14 @@
1111
//! they'll inherit their parent signal mask.
1212
1313
use std::convert::TryFrom;
14-
use std::os::{raw::c_int, unix::io::AsRawFd};
14+
use std::os::raw::c_int;
1515

16-
use io_lifetimes::{AsFd, BorrowedFd};
1716
use nix::sys::signal::SigSet;
1817
pub use nix::sys::signal::Signal;
1918
pub use nix::sys::signalfd::siginfo;
2019
use nix::sys::signalfd::{SfdFlags, SignalFd};
2120

22-
use super::generic::Generic;
21+
use super::generic::{FdWrapper, Generic};
2322
use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};
2423

2524
/// An event generated by the signal event source
@@ -40,20 +39,10 @@ impl Event {
4039
}
4140
}
4241

43-
// Wrap `nix` SignalFd type with `AsFd` impl
44-
#[derive(Debug)]
45-
struct SignalFdWrapper(SignalFd);
46-
47-
impl AsFd for SignalFdWrapper {
48-
fn as_fd(&self) -> BorrowedFd<'_> {
49-
unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) }
50-
}
51-
}
52-
5342
/// An event source for receiving Unix signals
5443
#[derive(Debug)]
5544
pub struct Signals {
56-
sfd: Generic<SignalFdWrapper>,
45+
sfd: Generic<FdWrapper<SignalFd>>,
5746
mask: SigSet,
5847
}
5948

@@ -71,7 +60,7 @@ impl Signals {
7160
let sfd = SignalFd::with_flags(&mask, SfdFlags::SFD_NONBLOCK | SfdFlags::SFD_CLOEXEC)?;
7261

7362
Ok(Signals {
74-
sfd: Generic::new(SignalFdWrapper(sfd), Interest::READ, Mode::Level),
63+
sfd: Generic::new(unsafe { FdWrapper::new(sfd) }, Interest::READ, Mode::Level),
7564
mask,
7665
})
7766
}
@@ -85,7 +74,7 @@ impl Signals {
8574
self.mask.add(s);
8675
}
8776
self.mask.thread_block()?;
88-
self.sfd.file.0.set_mask(&self.mask)?;
77+
self.sfd.file.set_mask(&self.mask)?;
8978
Ok(())
9079
}
9180

@@ -100,7 +89,7 @@ impl Signals {
10089
removed.add(s);
10190
}
10291
removed.thread_unblock()?;
103-
self.sfd.file.0.set_mask(&self.mask)?;
92+
self.sfd.file.set_mask(&self.mask)?;
10493
Ok(())
10594
}
10695

@@ -116,7 +105,7 @@ impl Signals {
116105

117106
self.mask.thread_unblock()?;
118107
new_mask.thread_block()?;
119-
self.sfd.file.0.set_mask(&new_mask)?;
108+
self.sfd.file.set_mask(&new_mask)?;
120109
self.mask = new_mask;
121110

122111
Ok(())
@@ -150,7 +139,7 @@ impl EventSource for Signals {
150139
self.sfd
151140
.process_events(readiness, token, |_, sfd| {
152141
loop {
153-
match sfd.0.read_signal() {
142+
match sfd.read_signal() {
154143
Ok(Some(info)) => callback(Event { info }, &mut ()),
155144
Ok(None) => break,
156145
Err(e) => {

0 commit comments

Comments
 (0)