Non-blocking I/O
This section describes the network resources and drivers provided by Tokio. This component provides one of Tokio's primary functions: non-blocking, event-driven networking backed by the appropriate operating system primitives (epoll, kqueue, IOCP, ...). It is modeled after the resource and driver pattern described in the previous section.
The network driver is built using mio, and network resources are backed by types that implement `mio::Evented`.
This guide will be focused on TCP types. The other network resources (UDP, Unix sockets, pipes, etc.) follow the same pattern.
The network resource
Network resources are types, such as `TcpListener` and `TcpStream`, that are composed of the network handle and a reference to the driver that is powering the resource. Initially, when the resource is first created, the driver pointer may be `None`:
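As a sketch of this layout (field names here are hypothetical; `MioListener` and `DriverHandle` are stand-ins for the real mio socket type and the driver reference):

```rust
// Hypothetical sketch; the real field names and types in tokio differ.
struct MioListener;
struct DriverHandle;

struct TcpListener {
    mio_listener: MioListener,
    // `None` until the resource is associated with a driver.
    driver: Option<DriverHandle>,
}

fn main() {
    // Freshly created: no driver is set yet.
    let listener = TcpListener {
        mio_listener: MioListener,
        driver: None,
    };
    assert!(listener.driver.is_none());
}
```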
In this case, the reference to the driver is not yet set. However, if a constructor that takes a `Handle` reference is used, then the driver reference will be set to the driver represented by the given handle:
```rust
# extern crate tokio;
# use tokio::net::TcpListener;
# use tokio::reactor::Handle;
# use std::net::TcpListener as StdListener;
# fn dox(std_listener: StdListener, my_reactor_handle: &Handle) {
let listener = TcpListener::from_std(std_listener, &my_reactor_handle).unwrap();
# }
```
Once a driver is associated with a resource, it is set for the lifetime of the resource and cannot be changed. The associated driver is responsible for receiving operating system events for the network resource and notifying the tasks that have expressed interest in the resource.
Resource types include non-blocking functions that are prefixed with `poll_` and that include `Async` in the return type. These are the functions that are linked with the task system; they should be used from tasks and as part of `Future` implementations. For example, `TcpStream` provides `poll_read` and `poll_write`; `TcpListener` provides `poll_accept`.
Here is a task that uses `poll_accept` to accept inbound sockets from a listener and handle them by spawning a new task:
```rust
# extern crate tokio;
# #[macro_use]
# extern crate futures;
# use tokio::net::{TcpListener, TcpStream};
# use tokio::prelude::*;
# fn process(socket: TcpStream) -> impl Future<Item = (), Error = ()> {
#     Ok(()).into_future()
# }
struct Acceptor {
    listener: TcpListener,
}

impl Future for Acceptor {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            let (socket, _) = try_ready!(
                self.listener.poll_accept().map_err(|_| ()));

            // Spawn a task to process the socket
            tokio::spawn(process(socket));
        }
    }
}
```
Resource types may also include functions that return futures. These are helpers that use the `poll_` functions to provide additional functionality. For example, `TcpStream` provides a `connect` function that returns a future. This future will complete once the `TcpStream` has established a connection with a peer (or failed attempting to do so).

Using combinators to connect a `TcpStream`:
```rust
# extern crate tokio;
# use tokio::prelude::*;
# use tokio::net::TcpStream;
# use std::io;
# fn process<T>(t: T) -> impl Future<Item = (), Error = io::Error> {
#     Ok(()).into_future()
# }
# fn dox() {
# let addr = "127.0.0.1:0".parse().unwrap();
tokio::spawn({
    let connect_future = TcpStream::connect(&addr);

    connect_future
        .and_then(|socket| process(socket))
        .map_err(|_| panic!())
});
# }
```
Registering the resource with the driver
When using `TcpListener::poll_accept` (or any `poll_` function), if the resource is ready to return immediately then it will do so. In the case of `poll_accept`, being ready means that there is a socket waiting to be accepted in the queue. If the resource is *not* ready, i.e. there is no pending socket to accept, then the resource asks the driver to notify the current task once it becomes ready.
The first time `NotReady` is returned by a resource, if the resource was not explicitly assigned a driver using a `Handle` argument, the resource will register itself with a driver instance. This is done by looking at the network driver associated with the current execution context.

The default driver for the execution context is stored using a thread-local, set using `with_default`, and accessed using `Handle::current`. It is the runtime's responsibility to ensure that the task is polled from within the closure passed to `with_default`. A call to `Handle::current` accesses the thread-local set by `with_default` in order to return the handle to the driver for the current execution context.
Both `Handle::current` and `Handle::default` return a `Handle` instance. They are, however, subtly different. Most often, `Handle::default` is the desired behavior.

`Handle::current` immediately reads the thread-local variable storing the driver for the current execution context. This means that `Handle::current` must be called from an execution context that set the default driver. `Handle::current` should be used when the handle is going to be sent to a different execution context and the user wishes that a specific reactor is used (see below for an example).

On the other hand, `Handle::default` lazily reads the thread-local variable. This allows getting a `Handle` instance from outside of an execution context. When the resource is used, the handle will access the thread-local variable as described in the previous section.
For example:
```rust
# extern crate tokio;
# use tokio::prelude::*;
# use tokio::net::TcpListener;
# use tokio::reactor::Handle;
# use std::net::SocketAddr;
# fn process<T>(t: T) -> impl Future<Item = (), Error = ()> {
#     Ok(()).into_future()
# }
fn main() {
    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let std_listener = ::std::net::TcpListener::bind(&addr).unwrap();
    let listener = TcpListener::from_std(std_listener, &Handle::default()).unwrap();

    tokio::run({
        listener.incoming().for_each(|socket| {
            Ok(())
        })
        .map_err(|_| panic!("error"))
    });
}
```
In this example, `incoming()` returns a stream of accepted sockets that is implemented by calling `poll_accept`. The `for_each` future built from the stream is run on a runtime, which has a network driver configured as part of the execution context. When `poll_accept` is called from within the execution context, that is when the thread-local is read and the driver is associated with the `TcpListener` instance.

However, if `tokio-threadpool` is used directly, then tasks spawned onto the threadpool executor will not have access to a reactor:
```rust
# extern crate tokio;
# extern crate tokio_threadpool;
# use tokio_threadpool::*;
# use tokio::prelude::*;
# use tokio::net::TcpListener;
# fn dox() {
# let addr = "127.0.0.1:0".parse().unwrap();
let pool = ThreadPool::new();
let listener = TcpListener::bind(&addr).unwrap();

pool.spawn({
    listener.incoming().for_each(|socket| {
        // This will never get called due to the listener not being able to
        // function.
        unreachable!();
#       Ok(())
    })
    .map_err(|_| panic!("error"))
});
# }
```
In order to make the above example work, a reactor must be set for the threadpool's execution context. Alternatively, a `Handle` obtained with `Handle::current` could be used:
```rust
# extern crate futures;
# extern crate tokio;
# extern crate tokio_threadpool;
# use tokio::prelude::*;
# use tokio::net::TcpListener;
# use tokio_threadpool::*;
# use tokio::reactor::Handle;
# use futures::future;
# use std::net::SocketAddr;
# fn dox() {
# let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let pool = ThreadPool::new();

// This does not run on the pool.
tokio::run(future::lazy(move || {
    // Get the handle
    let handle = Handle::current();

    let std_listener = ::std::net::TcpListener::bind(&addr).unwrap();

    // This eagerly links the listener with the handle for the current reactor.
    let listener = TcpListener::from_std(std_listener, &handle).unwrap();

    pool.spawn({
        listener.incoming().for_each(|socket| {
            // Do something with the socket
            Ok(())
        })
        .map_err(|_| panic!())
    });

    Ok(())
}));
# }
```
The network driver
The driver must track each resource that is registered with it. While the actual implementation is more complex, it can be thought of as a shared reference to a cell holding the shared state, similar to:
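A minimal sketch of this shared state (hypothetical names; `usize` stands in for `mio::Token` and `Task` for the task handle):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for the task handle (`futures::task::Task`).
struct Task;

// Per-resource cell shared between the resource and the driver.
struct Registration {
    id: usize,          // stand-in for `mio::Token`
    task: Option<Task>, // task to notify when the resource becomes ready
}

// The driver tracks every registered resource.
struct Reactor {
    resources: HashMap<usize, Arc<Mutex<Registration>>>,
}

fn main() {
    let mut reactor = Reactor { resources: HashMap::new() };

    // Registering a resource inserts a shared cell keyed by its identifier.
    let registration = Arc::new(Mutex::new(Registration { id: 0, task: None }));
    reactor.resources.insert(0, registration.clone());

    assert!(reactor.resources[&0].lock().unwrap().task.is_none());
}
```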
This is not the real implementation, but a simplified version to demonstrate the behavior. In practice, there is no `Mutex`, cells are not allocated per resource instance, and the reactor does not use a `HashMap`. The real implementation can be found in the `tokio-reactor` source.
When the resource is first used, it is registered with the driver:
```rust
impl TcpListener {
    fn poll_accept(&mut self) -> Poll<TcpStream, io::Error> {
        // If the registration is not set, this will associate the
        // `TcpListener` with the driver for the current execution context.
        let registration = self.registration.get_or_insert_with(|| {
            // Access the thread-local variable that tracks the reactor.
            Reactor::with_current(|reactor| {
                // Registers the listener, which implements `mio::Evented`.
                // `register` returns the registration instance for the resource.
                reactor.register(&self.mio_listener)
            })
        });

        if registration.task.is_none() {
            // The task is `None`, this means the resource **might** be ready.
            match self.mio_listener.accept() {
                Ok(socket) => {
                    let socket = mio_socket_to_tokio(socket);
                    return Ok(Async::Ready(socket));
                }
                Err(ref e) if e.kind() == WouldBlock => {
                    // The resource is not ready, fall through to task registration
                }
                Err(e) => {
                    // All other errors are returned to the caller
                    return Err(e);
                }
            }
        }

        // The task is set even if it is already `Some`, this handles the case
        // where the resource is moved to a different task than the one stored
        // in `self.task`.
        registration.task = Some(task::current());
        Ok(Async::NotReady)
    }
}
```
Note that there is only a single `task` field per resource. The implication is that a resource can only be used from a single task at a time. If `TcpListener::poll_accept` returns `NotReady`, registering the current task, and the listener is then sent to a different task which calls `poll_accept` and sees `NotReady`, then the second task is the only one that will receive a notification once a socket is ready to be accepted. Resources may support tracking different tasks for different operations. For example, `TcpStream` internally has two task fields: one for notifying on read ready and one for notifying on write ready. This allows `TcpStream::poll_read` and `TcpStream::poll_write` to be called from different tasks.
The evented types are registered with the driver's `mio::Poll` instance as part of the `register` function used above. Again, this guide uses a simplified implementation, which does not match the actual one in `tokio-reactor` but is sufficient for understanding how `tokio-reactor` behaves.
```rust
impl Reactor {
    fn register<T: mio::Evented>(&mut self, evented: &T) -> Arc<Mutex<Registration>> {
        // Generate a unique identifier for this registration. This identifier
        // can be converted to and from a Mio Token.
        let id = generate_unique_identifier();

        // Register the I/O type with Mio
        self.poll.register(
            evented, id.into_token(),
            mio::Ready::all(),
            mio::PollOpt::edge());

        let registration = Arc::new(Mutex::new(Registration {
            id,
            task: None,
        }));

        self.resources.insert(id, registration.clone());

        registration
    }
}
```
The driver needs to run in order for its associated resources to function. If the driver does not run, the resources will never become ready. Running the driver is handled automatically when using a runtime, but it is useful to understand how it works. If you are interested in the real implementation, the `tokio-reactor` source is the best reference.

When resources are registered with the driver, they are also registered with Mio. Running the driver performs the following steps in a loop:
1) Call `mio::Poll::poll` to get operating system events.
2) Dispatch all events to the appropriate resources via the registration.
The steps above are done by calling `Reactor::turn`. The looping part is up to us. This is typically done in a background thread or embedded in the executor as a `Park` implementation.
```rust
# extern crate tokio_reactor;
# fn dox() {
# let mut reactor = tokio_reactor::Reactor::new().unwrap();
loop {
    // `None` means never timeout, blocking until we receive an operating system
    // event.
    reactor.turn(None);
}
# }
```
The implementation of `turn` does the following: