Futures: In Depth
Futures, hinted at earlier in the guide, are the building blocks used to manage asynchronous logic. They are the underlying asynchronous abstraction used by Tokio.
The future implementation is provided by the `futures` crate. However, for convenience, Tokio re-exports a number of the types.
What Are Futures?
A future is a value that represents the completion of an asynchronous computation. Usually, the future completes due to an event that happens elsewhere in the system. While we’ve been looking at things from the perspective of basic I/O, you can use a future to represent a wide range of events, e.g.:
- A database query that’s executing in a thread pool. When the query finishes, the future is completed, and its value is the result of the query.
- An RPC invocation to a server. When the server replies, the future is completed, and its value is the server’s response.
- A timeout. When time is up, the future is completed, and its value is `()`.
- A long-running CPU-intensive task, running on a thread pool. When the task finishes, the future is completed, and its value is the return value of the task.
- Reading bytes from a socket. When the bytes are ready, the future is completed, and depending on the buffering strategy, the bytes might be returned directly, or written as a side effect into some existing buffer.
The entire point of the future abstraction is to allow asynchronous functions, i.e., functions that cannot immediately return a value, to be able to return something.
For example, an asynchronous HTTP client could provide a `get` function that returns a future of the response instead of the response itself.
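A minimal sketch of what such a function might look like. Everything here is an assumption made for illustration: `HttpClient`, `Response`, `ResponseFuture`, and `deliver` are hypothetical names, not part of any real HTTP library, and `Async` and `Future` are local stand-ins that mirror the shape of the futures 0.1 API so the sketch compiles on its own.

```rust
// Local stand-ins mirroring the futures 0.1 API (assumption: the real
// definitions come from the `futures` crate).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}

// Hypothetical types; none of these come from a real HTTP library.
pub struct Response {
    pub status: u16,
}

pub struct ResponseFuture {
    // `None` until the (simulated) server has replied.
    reply: Option<Response>,
}

pub struct HttpClient;

impl HttpClient {
    // Returns immediately with a future; no response exists yet.
    pub fn get(&self, _uri: &str) -> ResponseFuture {
        ResponseFuture { reply: None }
    }
}

impl ResponseFuture {
    // Simulates the reply arriving from elsewhere in the system.
    pub fn deliver(&mut self, response: Response) {
        self.reply = Some(response);
    }
}

impl Future for ResponseFuture {
    type Item = Response;
    type Error = ();

    fn poll(&mut self) -> Result<Async<Response>, ()> {
        match self.reply.take() {
            Some(response) => Ok(Async::Ready(response)),
            None => Ok(Async::NotReady),
        }
    }
}
```

The point of the sketch is only that `get` returns right away: the caller receives a `ResponseFuture` before any response exists, and polling it reports `NotReady` until the reply has been delivered.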
Then, the user of the library would use the function like so:
let response_future = client.get("https://www.example.com");
let response_is_ok = response_future
    .map(|response| {
        response.status().is_ok()
    });
track_response_success(response_is_ok);
All of those actions taken with the future don’t immediately perform any work. They cannot, because they don’t have the actual HTTP response. Instead, they define the work to be done when the response future completes.
Both the `futures` crate and Tokio come with a collection of combinator functions that can be used to work with futures.
Implementing Future
Implementing the `Future` trait is pretty common when using Tokio, so it is important to be comfortable with it.
As discussed in the previous section, Rust futures are poll based. This is a unique aspect of the Rust future library. Most future libraries for other programming languages use a push-based model, where callbacks are supplied to the future and the computation invokes the callback immediately with the computation result.
Using a poll-based model offers many advantages, including being a zero-cost abstraction, i.e., using Rust futures has no added overhead compared to writing the asynchronous code by hand.
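The `Future` trait has two associated types, `Item` and `Error`, and a single required method, `poll`, which returns `Result<Async<Self::Item>, Self::Error>`.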
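A sketch of the trait's shape, written as local stand-in definitions that mirror futures 0.1 (the API the snippets in this section use). The real `Future` and `Async` live in the `futures` crate; `Immediate` is a hypothetical type added only to show a trivial implementation.

```rust
// Stand-in for `futures::Async` (assumption: mirrors futures 0.1).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    // The computation finished and produced a value.
    Ready(T),
    // The value is not available yet; the future must be polled again later.
    NotReady,
}

// Stand-in for the futures 0.1 `Future` trait.
pub trait Future {
    // The type of the value produced on completion.
    type Item;
    // The type of error the computation may fail with.
    type Error;
    // Called repeatedly to check whether the future has completed.
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}

// Hypothetical example type: a future that is ready as soon as it is polled.
pub struct Immediate(pub Option<usize>);

impl Future for Immediate {
    type Item = usize;
    type Error = &'static str;

    fn poll(&mut self) -> Result<Async<usize>, &'static str> {
        // Hand the value out once; in this sketch, polling again after
        // completion is reported as an error.
        self.0.take().map(Async::Ready).ok_or("polled after completion")
    }
}
```

Note that `poll` never blocks: it either produces the value, reports `NotReady`, or fails with the future's `Error` type.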
Usually, when you implement a `Future`, you will be defining a computation that is a composition of sub (or inner) futures. In this case, the future implementation tries to call the inner future(s) and returns `NotReady` if the inner futures are not ready.
The following example is a future that is composed of another future that returns a `usize` and will double that value:
#![deny(deprecated)]
extern crate futures;
use futures::*;
pub struct Doubler<T> {
    inner: T,
}
pub fn double<T>(inner: T) -> Doubler<T> {
    Doubler { inner }
}
impl<T> Future for Doubler<T>
where T: Future<Item = usize>
{
    type Item = usize;
    type Error = T::Error;
    fn poll(&mut self) -> Result<Async<usize>, T::Error> {
        // Propagate errors with `?`, then double a ready value or pass `NotReady` through.
        match self.inner.poll()? {
            Async::Ready(v) => Ok(Async::Ready(v * 2)),
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}
pub fn main() {}
When the `Doubler` future is polled, it polls its inner future. If the inner future is not ready, the `Doubler` future returns `NotReady`. If the inner future is ready, then the `Doubler` future doubles the return value and returns `Ready`.
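To make that behaviour concrete, here is a rough sketch that drives a `Doubler` by hand. It assumes local stand-ins for `Async` and `Future` (mirroring futures 0.1) so that it compiles without external crates, and it introduces a hypothetical inner future, `Delayed`, that reports `NotReady` a fixed number of times. Polling in a loop like this ignores task notification entirely, which a real executor would not do.

```rust
// Local stand-ins mirroring the futures 0.1 API (assumption).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}

pub struct Doubler<T> {
    inner: T,
}

pub fn double<T>(inner: T) -> Doubler<T> {
    Doubler { inner }
}

impl<T> Future for Doubler<T>
where
    T: Future<Item = usize>,
{
    type Item = usize;
    type Error = T::Error;

    fn poll(&mut self) -> Result<Async<usize>, T::Error> {
        match self.inner.poll()? {
            Async::Ready(v) => Ok(Async::Ready(v * 2)),
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}

// Hypothetical inner future: reports `NotReady` `delay` times, then yields `value`.
pub struct Delayed {
    pub delay: u32,
    pub value: usize,
}

impl Future for Delayed {
    type Item = usize;
    type Error = ();

    fn poll(&mut self) -> Result<Async<usize>, ()> {
        if self.delay == 0 {
            Ok(Async::Ready(self.value))
        } else {
            self.delay -= 1;
            Ok(Async::NotReady)
        }
    }
}
```

Polling `double(Delayed { delay: 1, value: 21 })` twice yields `Ok(Async::NotReady)` and then `Ok(Async::Ready(42))`: the outer future is not ready exactly as long as the inner one is not.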
Because the matching pattern above is common, the `futures` crate provides a macro: `try_ready!`. It is similar to `try!` or `?`, but it also returns on `NotReady`. The above `poll` function can be rewritten using `try_ready!` as follows:
#![deny(deprecated)]
#[macro_use]
extern crate futures;
use futures::*;
pub struct Doubler<T> {
    inner: T,
}
impl<T> Future for Doubler<T>
where T: Future<Item = usize>
{
    type Item = usize;
    type Error = T::Error;
    fn poll(&mut self) -> Result<Async<usize>, T::Error> {
        // Returns early from `poll` on `NotReady` or on an error.
        let v = try_ready!(self.inner.poll());
        Ok(Async::Ready(v * 2))
    }
}
pub fn main() {}
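To see what the macro saves, here is a sketch of a local macro modelled on `try_ready!`. It is an approximation written against a stand-in `Async` enum, not the definition shipped in the `futures` crate, and `double_poll` is a hypothetical helper that takes an already obtained poll result so the example stays small.

```rust
// Stand-in for `futures::Async` (assumption: mirrors futures 0.1).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

// Local macro modelled on `try_ready!`: unwrap a ready value, otherwise
// return early from the enclosing function.
macro_rules! try_ready {
    ($e:expr) => {
        match $e {
            Ok(Async::Ready(v)) => v,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(e) => return Err(From::from(e)),
        }
    };
}

// Hypothetical helper: doubles the value carried by a single poll result.
pub fn double_poll(polled: Result<Async<usize>, String>) -> Result<Async<usize>, String> {
    let v = try_ready!(polled);
    Ok(Async::Ready(v * 2))
}
```

A ready value is doubled, while `NotReady` and errors leave the function unchanged through the early `return`, which is the same three-way split as the hand-written `match`.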
Returning NotReady
The last section handwaved a bit and said that once a future transitions to the ready state, the executor is notified. This enables the executor to be efficient in scheduling tasks.
For most future implementations, this is done transitively. When a future implementation is a combination of sub futures, the outer future only returns `NotReady` when at least one inner future returned `NotReady`. Thus, the outer future will transition to a ready state once the inner future transitions to a ready state. In this case, the `NotReady` contract is already satisfied, as the inner future will notify the executor when it becomes ready.
Innermost futures, sometimes called “resources”, are the ones responsible for notifying the executor. This is done by calling `notify` on the task returned by `task::current()`.
We will be exploring implementing resources and the task system in more depth in a later section. The key takeaway here is: do not return `NotReady` unless you got `NotReady` from an inner future.
A More Complicated Future
Let’s look at a slightly more complicated future implementation. In this case, we will implement a future that takes a host name, does DNS resolution, then establishes a connection to the remote host. We assume a `resolve` function exists that looks like this:
pub fn resolve(host: &str) -> ResolveFuture;
where `ResolveFuture` is a future returning a `SocketAddr`.
The steps to implement the future are:
- Call `resolve` to get a `ResolveFuture` instance.
- Call `ResolveFuture::poll` until it returns a `SocketAddr`.
- Pass the `SocketAddr` to `TcpStream::connect`.
- Call `ConnectFuture::poll` until it returns the `TcpStream`.
- Complete the outer future with the `TcpStream`.
We will use an `enum` to track the state of the future as it advances through these steps.
extern crate tokio;
use tokio::net::tcp::ConnectFuture;
pub struct ResolveFuture;
enum State {
    // Currently resolving the host name
    Resolving(ResolveFuture),
    // Establishing a TCP connection to the remote host
    Connecting(ConnectFuture),
}
pub fn main() {}
And the `ResolveAndConnect` future is defined as:
pub struct ResolveAndConnect {
    state: State,
}
Now, the implementation:
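One way the implementation might look, sketched so that it compiles without external crates. The assumptions are substantial: `Async` and `Future` are local stand-ins mirroring futures 0.1, `Stream` is a hypothetical placeholder for `TcpStream`, and `ResolveFuture` and `ConnectFuture` are fake futures that never touch the network (the resolver always yields `127.0.0.1:8080` after one `NotReady`). The function name `resolve_and_connect` is also an assumption. Only the shape of `ResolveAndConnect::poll` is the point.

```rust
use std::io;
use std::net::SocketAddr;

// Local stand-ins mirroring the futures 0.1 API (assumption).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}

// Hypothetical placeholders for the resolver and for Tokio's TCP types.
pub struct Stream {
    pub peer: SocketAddr,
}
pub struct ResolveFuture {
    polls_left: u32,
}
pub struct ConnectFuture {
    addr: SocketAddr,
}

pub fn resolve(_host: &str) -> ResolveFuture {
    ResolveFuture { polls_left: 1 }
}

impl Future for ResolveFuture {
    type Item = SocketAddr;
    type Error = io::Error;
    fn poll(&mut self) -> Result<Async<SocketAddr>, io::Error> {
        if self.polls_left > 0 {
            self.polls_left -= 1;
            return Ok(Async::NotReady);
        }
        // Pretend every host resolves to this address.
        Ok(Async::Ready(SocketAddr::from(([127, 0, 0, 1], 8080))))
    }
}

impl Future for ConnectFuture {
    type Item = Stream;
    type Error = io::Error;
    fn poll(&mut self) -> Result<Async<Stream>, io::Error> {
        // The fake connection completes on the first poll.
        Ok(Async::Ready(Stream { peer: self.addr }))
    }
}

enum State {
    // Currently resolving the host name
    Resolving(ResolveFuture),
    // Establishing a TCP connection to the remote host
    Connecting(ConnectFuture),
}

pub struct ResolveAndConnect {
    state: State,
}

pub fn resolve_and_connect(host: &str) -> ResolveAndConnect {
    ResolveAndConnect { state: State::Resolving(resolve(host)) }
}

impl Future for ResolveAndConnect {
    type Item = Stream;
    type Error = io::Error;

    fn poll(&mut self) -> Result<Async<Stream>, io::Error> {
        loop {
            let addr = match self.state {
                // Still resolving: stop here unless the address is ready.
                State::Resolving(ref mut fut) => match fut.poll()? {
                    Async::Ready(addr) => addr,
                    Async::NotReady => return Ok(Async::NotReady),
                },
                // Connecting: the outer result is whatever the connect future says.
                State::Connecting(ref mut fut) => return fut.poll(),
            };
            // Resolution finished; move to the next state and loop to poll it.
            self.state = State::Connecting(ConnectFuture { addr });
        }
    }
}
```

The `loop` matters: after switching to `Connecting`, the new inner future is polled right away, so the outer future only returns `NotReady` when an inner future returned `NotReady`.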
This illustrates how `Future` implementations are state machines. This future can be in either of two states:
- Resolving
- Connecting
Each time `poll` is called, we try to advance the state machine to the next state.
Now, the future is basically a re-implementation of the combinator `AndThen`, so we would probably just use that combinator.
#![deny(deprecated)]
extern crate futures;
extern crate tokio;
use tokio::net::tcp::TcpStream;
use futures::prelude::*;
use std::io;
pub struct ResolveFuture;
fn resolve(host: &str) -> ResolveFuture { unimplemented!() }
impl Future for ResolveFuture {
    type Item = ::std::net::SocketAddr;
    type Error = io::Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        unimplemented!();
    }
}
pub fn dox(my_host: &str) {
    // Resolve the host name, then connect to the resulting address.
    let _ = resolve(my_host)
        .and_then(|addr| TcpStream::connect(&addr));
}
pub fn main() {}
This is much shorter and does the same thing.