非阻塞I/O

    网络drivers使用mio构建,网络资源由后备实现Evented的类型。

    本指南将重点介绍TCP类型。 其他网络资源(UDP,unix插座,管道等)遵循相同的模式。

    网络资源是由网络句柄和对为资源供电的的引用组成的类型,例如TcpListener和。 最初,在首次创建资源时,driver指针可能是None

    在这种情况下,尚未设置对driver的引用。 但是,如果使用带有Handle引用的构造函数,则driver引用将设置为给定句柄表示的driver:

    1. let listener = TcpListener::from_std(std_listener, &my_reactor_handle);

    一旦driver与资源相关联,就会将其设置为该资源的生命周期,不能改变。 相关的driver负责接收网络资源的操作系统事件并通知对该资源表示兴趣的任务。

    使用资源

    资源类型包括以poll_为前缀和在返回类型中包含Async的非阻塞函数。 这些函数与任务系统关联,应该从任务中使用,并作为[Future]实现一部分使用。 例如,TcpStream提供[poll_read]和[poll_write]。 提供[poll_accept]。

    这里有一个使用[poll_accept]]接受来自侦听器的入站套接字并通过生成新任务来处理它们的任务:

    1. struct Acceptor {
    2. listener: TcpListener,
    3. }
    4. impl Future for Acceptor {
    5. type Item = ();
    6. type Error = ();
    7. fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    8. loop {
    9. let (socket, _) = try_ready!(self.listener.poll_accept());
    10. // Spawn a task to process the socket
    11. tokio::spawn(process(socket));
    12. }
    13. }
    14. }

    资源类型还可以包括返回 future的函数。 这些是使用poll_函数提供附加功能的帮助程序。 例如,TcpStream提供了一个返回 future的[connect]函数。一旦与对等方建立了连接(或未能成功),这个 future就会完成。

    使用组合器连接TcpStream

    1. tokio::spawn({
    2. let connect_future = TcpStream::connect(&addr);
    3. connect_future
    4. .and_then(|socket| process(socket))
    5. .map_err(|_| panic!())
    6. });

    future也可以直接用于其他future的实现:

    第一次NotReady由资源返回,如果资源没有明确地使用参数分配一个driver,则资源将使用driver实例注册自身。这是通过查看与当前执行上下文关联的网络driver来完成的。

    执行上下文的默认driver使用本地线程存储,使用with_default设置,并使用[Handle :: current]访问。运行时负责确保,从闭包内传递到过程轮询任务。调用[Handle :: current]访问本地线程由with_default设置,以便将句柄返回给当前执行上下文的driver。

    Handle :: current vsHandle :: default

    Handle :: currentHandle :: default都返回一个Handle实例。然而,它们略有不同。大多数情况下,Handle :: default就是期望的行为。

    Handle :: current为当前driver 立即读取存储在driver中的线程局部变量。这意味着Handle :: current必须从设置默认driver的执行上下文中调用。 Handle :: current当句柄将被发送到不同的执行上下文使用并且用户希望使用特定的反应器(reactor)时使用(参见下面的示例)。

    另一方面,[Handle :: default]懒惰地读取线程局部变量。这允许从执行上下文之外获取Handle实例。使用资源时,句柄将访问线程局部变量,如上一节中所述。

    例如:

    1. fn main() {
    2. let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    3. let std_listener = ::std::net::TcpListener::bind(&addr).unwrap();
    4. let listener = TcpListener::from_std(std_listener, &Handle::default()).unwrap();
    5. tokio::run({
    6. listener.incoming().for_each(|socket| {
    7. tokio::spawn(process(socket));
    8. Ok(())
    9. })
    10. .map_err(|_| panic!("error"))
    11. });
    12. }

    在这个例子中,incoming()返回通过调用 poll_accept实现的一个 future。 该future产生于具有网络driver配置作为执行上下文的一部分的运行之上。 当在执行上下文中调用poll_accept时,即当读取线程本地driver与TcpListener实例相关联。

    但是,如果直接使用tokio-threadpool,那么产生threadpool executor之上的任务就会将无法访问reactor:

    1. let pool = ThreadPool::new();
    2. pool.spawn({
    3. listener.incoming().for_each(|socket| {
    4. // This will never get called due to the listener not being able to
    5. // function.
    6. unreachable!();
    7. })
    8. .map_err(|_| panic!("error"))
    9. });

    为了使上面的示例工作,必须为线程池的执行上下文设置反应器(reactor)。有关更多信息,请参阅building a runtime细节。 或者,可以使用[Handle :: current]获得的Handle

    1. let pool = ThreadPool::new();
    2. // This does not run on the pool.
    3. tokio::run(future::lazy(move || {
    4. // Get the handle
    5. let handle = Handle::current();
    6. let std_listener = std::net::TcpListener::bind(&addr).unwrap();
    7. // This eagerly links the listener with the handle for the current reactor.
    8. let listener = TcpListener::from_std(std_listener, &handle).unwrap();
    9. pool.spawn({
    10. listener.incoming().for_each(|socket| {
    11. // Do something with the socket
    12. Ok(())
    13. })
    14. .map_err(|_| panic!())
    15. });
    16. Ok(())
    17. }));

    为所有Tokio的网络类型提供动力的driver是[Reactor]Crate中的[tokio-reactor]类型。 它是使用mio实现的。 调用[Reactor :: turn]使用[mio :: Poll :: poll]获取已注册网络资源的操作系统事件。 然后它使用[task system]通知每个网络资源已注册的任务。 任务被调度为在其关联的executor上运行,然后任务将网络资源视为就绪并且调用poll_ *函数返回Async :: Ready

    将driver与资源链接

    这不是真正的实现,而是用于演示行为的简化版本。在实践中,没有Mutex,每个资源实例没有分配单元,并且reactor不使用HashMap。 真正的实现在here

    首次使用资源时,它会向driver注册:

    1. impl TcpListener {
    2. fn poll_accept(&mut self) -> Poll<TcpStream, io::Error> {
    3. // If the registration is not set, this will associate the `TcpListener`
    4. // with the current execution context's reactor.
    5. let registration = self.registration.get_or_insert_with(|| {
    6. // Access the thread-local variable that tracks the reactor.
    7. Reactor::with_current(|reactor| {
    8. // Registers the listener, which implements `mio::Evented`.
    9. // `register` returns the registration instance for the resource.
    10. reactor.register(&self.mio_listener)
    11. })
    12. });
    13. if registration.task.is_none() {
    14. Ok(socket) => {
    15. let socket = mio_socket_to_tokio(socket);
    16. return Ok(Async::Ready(socket));
    17. }
    18. Err(ref e) if e.kind() == WouldBlock => {
    19. // The resource is not ready, fall through to task registration
    20. }
    21. Err(e) => {
    22. // All other errors are returned to the caller
    23. return Err(e);
    24. }
    25. }
    26. }
    27. // The task is set even if it is already `Some`, this handles the case where
    28. // the resource is moved to a different task than the one stored in
    29. // `self.task`.
    30. registration.task = Some(task::current());
    31. Ok(Async::NotReady)
    32. }
    33. }

    请注意,每个资源只有一个task字段。其含义是资源一次只能从一个任务中使用。如果TcpListener :: poll_accept返回NotReady,注册当前任务和将监听器发送到另一个调用poll_accept的任务并视为NotReady,然后第二个任务是唯一一个在套接字准备好被接受后将接收通知的任务。资源可能会支持跟踪不同操作的不同任务。例如,TcpStream内部有两个任务字段:一个用于通知read准备好了,另一个用于通知write准备好了。这允许从不同的任务调用TcpStream :: poll_readTcpStream :: poll_write

    [mio :: Poll]作为register上面使用的函数的一部分,将事件类型注册到驱动程序的实例中。。同样,本指南使用了简化的实现与实际tokio-reactor的实现不匹配,但足以理解tokio-reactor的行为方式。

    1. impl Reactor {
    2. fn register<T: mio::Evented>(&mut self, evented: &T) -> Arc<Mutex<Registration>> {
    3. // Generate a unique identifier for this registration. This identifier
    4. // can be converted to and from a Mio Token.
    5. let id = generate_unique_identifier();
    6. // Register the I/O type with Mio
    7. self.poll.register(
    8. evented, id.into_token(),
    9. mio::Ready::all(),
    10. mio::PollOpt::edge());
    11. let registration = Arc::new(Mutex::new(Registration {
    12. id,
    13. task: None,
    14. }));
    15. self.resources.insert(id, registration.clone());
    16. registration
    17. }
    18. }

    driver需要运行才能使其相关资源正常工作。如果driver无法运行,资源永远不会准备就绪。使用时会自动处理运行driver,但了解它是如何工作的很有用。如果你对真正的实现感兴趣,那么[tokio-reactor] real-impl源码是最好的参考。

    当资源注册到driver时,它们也会注册Mio,运行driver在循环中执行以下步骤:

    1)调用[Poll :: poll]来获取操作系统事件。

    2)发送所有事件到适当的注册过的资源。

    上面的步骤是通过调用Reactor :: turn来完成的。循环部分是取决于我们。这通常在后台线程中完成或嵌入executor中作为一个实现。有关详细信息,请参阅runtime guide

    1. loop {
    2. // `None` means never timeout, blocking until we receive an operating system
    3. // event.
    4. }

    turn的实现执行以下操作: