11.12 理解事件驱动的IO

    事件驱动I/O本质上来讲就是将基本I/O操作(比如读和写)转化为你程序需要处理的事件。例如,当数据在某个socket上被接受后,它会转换成一个 事件,然后被你定义的回调方法或函数来处理。作为一个可能的起始点,一个事件驱动的框架可能会以一个实现了一系列基本事件处理器方法的基类开始:

    这个类的实例作为插件被放入类似下面这样的事件循环中:

    1. import select
    2.  
    3. def event_loop(handlers):
    4. while True:
    5. wants_recv = [h for h in handlers if h.wants_to_receive()]
    6. wants_send = [h for h in handlers if h.wants_to_send()]
    7. can_recv, can_send, _ = select.select(wants_recv, wants_send, [])
    8. for h in can_recv:
    9. h.handle_receive()
    10. for h in can_send:
    11. h.handle_send()

    事件循环的关键部分是 select() 调用,它会不断轮询文件描述符从而激活它。在调用 select() 之前,时间循环会询问所有的处理器来决定哪一个想接受或发生。然后它将结果列表提供给 select() 。然后 select() 返回准备接受或发送的对象组成的列表。然后相应的 或 handle_send() 方法被触发。

    编写应用程序的时候,EventHandler 的实例会被创建。例如,下面是两个简单的基于UDP网络服务的处理器例子:

    测试这段代码,试着从另外一个Python解释器连接它:

    1. >>> from socket import *
    2. >>> s = socket(AF_INET, SOCK_DGRAM)
    3. >>> s.sendto(b'',('localhost',14000))
    4. 0
    5. >>> s.recvfrom(128)
    6. (b'Tue Sep 18 14:29:23 2012', ('127.0.0.1', 14000))
    7. >>> s.sendto(b'Hello',('localhost',15000))
    8. 5
    9. >>> s.recvfrom(128)
    10. (b'Hello', ('127.0.0.1', 15000))

    TCP例子的关键点是从处理器中列表增加和删除客户端的操作。对每一个连接,一个新的处理器被创建并加到列表中。当连接被关闭后,每个客户端负责将其从列表中删除。如果你运行程序并试着用Telnet或类似工具连接,它会将你发送的消息回显给你。并且它能很轻松的处理多客户端连接。

    实际上所有的事件驱动框架原理跟上面的例子相差无几。实际的实现细节和软件架构可能不一样,但是在最核心的部分,都会有一个轮询的循环来检查活动socket,并执行响应操作。

    事件驱动I/O的一个可能好处是它能处理非常大的并发连接,而不需要使用多线程或多进程。也就是说,select() 调用(或其他等效的)能监听大量的socket并响应它们中任何一个产生事件的。在循环中一次处理一个事件,并不需要其他的并发机制。

    事件驱动I/O的缺点是没有真正的同步机制。如果任何事件处理器方法阻塞或执行一个耗时计算,它会阻塞所有的处理进程。调用那些并不是事件驱动风格的库函数也会有问题,同样要是某些库函数调用会阻塞,那么也会导致整个事件循环停止。

    对于阻塞或耗时计算的问题可以通过将事件发送个其他单独的现场或进程来处理。不过,在事件循环中引入多线程和多进程是比较棘手的,下面的例子演示了如何使用 concurrent.futures 模块来实现:

    1. from concurrent.futures import ThreadPoolExecutor
    2. import os
    3.  
    4. class ThreadPoolHandler(EventHandler):
    5. def __init__(self, nworkers):
    6. if os.name == 'posix':
    7. self.signal_done_sock, self.done_sock = socket.socketpair()
    8. else:
    9. server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    10. server.bind(('127.0.0.1', 0))
    11. server.listen(1)
    12. self.signal_done_sock = socket.socket(socket.AF_INET,
    13. socket.SOCK_STREAM)
    14. self.signal_done_sock.connect(server.getsockname())
    15. self.done_sock, _ = server.accept()
    16. server.close()
    17.  
    18. self.pending = []
    19. self.pool = ThreadPoolExecutor(nworkers)
    20.  
    21. def fileno(self):
    22. return self.done_sock.fileno()
    23.  
    24. # Callback that executes when the thread is done
    25.  
    26. self.pending.append((callback, r.result()))
    27. self.signal_done_sock.send(b'x')
    28.  
    29. # Run a function in a thread pool
    30. def run(self, func, args=(), kwargs={},*,callback):
    31. r = self.pool.submit(func, *args, **kwargs)
    32. r.add_done_callback(lambda r: self._complete(callback, r))
    33.  
    34. def wants_to_receive(self):
    35. return True
    36.  
    37. # Run callback functions of completed work
    38. def handle_receive(self):
    39. # Invoke all pending callback functions
    40. for callback, result in self.pending:
    41. callback(result)
    42. self.done_sock.recv(1)
    43. self.pending = []

    运行这个服务器,然后试着用其它Python程序来测试它:

    1. from socket import *
    2. sock = socket(AF_INET, SOCK_DGRAM)
    3. for x in range(40):
    4. sock.sendto(str(x).encode('ascii'), ('localhost', 16000))
    5. resp = sock.recvfrom(8192)

    你应该能在不同窗口中重复的执行这个程序,并且不会影响到其他程序,尽管当数字便越来越大时候它会变得越来越慢。

    已经阅读完了这一小节,那么你应该使用这里的代码吗?也许不会。你应该选择一个可以完成同样任务的高级框架。不过,如果你理解了基本原理,你就能理解这些框架所使用的核心技术。作为对回调函数编程的替代,事件驱动编码有时候会使用到协程,参考12.12小节的一个例子。

    原文: