12.12 使用生成器代替线程

    要使用生成器实现自己的并发,你首先要对生成器函数和 语句有深刻理解。yield 语句会让一个生成器挂起它的执行,这样就可以编写一个调度器,将生成器当做某种“任务”并使用任务协作切换来替换它们的执行。要演示这种思想,考虑下面两个使用简单的 yield 语句的生成器函数:

    这些函数在内部使用yield语句,下面是一个实现了简单任务调度器的代码:

    1. from collections import deque
    2.  
    3. class TaskScheduler:
    4. def __init__(self):
    5. self._task_queue = deque()
    6.  
    7. def new_task(self, task):
    8. '''
    9. Admit a newly started task to the scheduler
    10.  
    11. '''
    12. self._task_queue.append(task)
    13.  
    14. def run(self):
    15. '''
    16. Run until there are no more tasks
    17. '''
    18. while self._task_queue:
    19. task = self._task_queue.popleft()
    20. try:
    21. # Run until the next yield statement
    22. next(task)
    23. self._task_queue.append(task)
    24. except StopIteration:
    25. # Generator is no longer executing
    26. pass
    27.  
    28. # Example use
    29. sched = TaskScheduler()
    30. sched.new_task(countdown(5))
    31. sched.new_task(countup(15))
    32. sched.run()

    TaskScheduler 类在一个循环中运行生成器集合——每个都运行到碰到yield语句为止。运行这个例子,输出如下:

    到此为止,我们实际上已经实现了一个“操作系统”的最小核心部分。生成器函数就是认为,而yield语句是任务挂起的信号。调度器循环检查任务列表直到没有任务要执行为止。

    实际上,你可能想要使用生成器来实现简单的并发。那么,在实现actor或网络服务器的时候你可以使用生成器来替代线程的使用。

    1. from collections import deque
    2.  
    3. class ActorScheduler:
    4. def __init__(self):
    5. self._actors = { } # Mapping of names to actors
    6. self._msg_queue = deque() # Message queue
    7.  
    8. def new_actor(self, name, actor):
    9. '''
    10. Admit a newly started actor to the scheduler and give it a name
    11. '''
    12. self._msg_queue.append((actor,None))
    13. self._actors[name] = actor
    14.  
    15. def send(self, name, msg):
    16. '''
    17. Send a message to a named actor
    18. '''
    19. actor = self._actors.get(name)
    20. if actor:
    21. self._msg_queue.append((actor,msg))
    22.  
    23. def run(self):
    24. '''
    25. Run as long as there are pending messages.
    26. '''
    27. while self._msg_queue:
    28. actor, msg = self._msg_queue.popleft()
    29. try:
    30. except StopIteration:
    31. pass
    32.  
    33. # Example use
    34. if __name__ == '__main__':
    35. def printer():
    36. while True:
    37. msg = yield
    38. print('Got:', msg)
    39.  
    40. def counter(sched):
    41. while True:
    42. # Receive the current count
    43. n = yield
    44. if n == 0:
    45. break
    46. # Send to the printer task
    47. sched.send('printer', n)
    48. # Send the next count to the counter task (recursive)
    49.  
    50. sched.send('counter', n-1)
    51.  
    52. sched = ActorScheduler()
    53. # Create the initial actors
    54. sched.new_actor('printer', printer())
    55. sched.new_actor('counter', counter(sched))
    56.  
    57. # Send an initial message to the counter to initiate
    58. sched.send('counter', 10000)
    59. sched.run()

    完全弄懂这段代码需要更深入的学习,但是关键点在于收集消息的队列。本质上,调度器在有需要发送的消息时会一直运行着。计数生成器会给自己发送消息并在一个递归循环中结束。

    下面是一个更加高级的例子,演示了使用生成器来实现一个并发网络应用程序:

    这段代码有点复杂。不过,它实现了一个小型的操作系统。有一个就绪的任务队列,并且还有因I/O休眠的任务等待区域。还有很多调度器负责在就绪队列和I/O等待区域之间移动任务。

    在构建基于生成器的并发框架时,通常会使用更常见的yield形式:

    1. def some_generator():
    2. ...
    3. ...

    使用这种形式的yield语句的函数通常被称为“协程”。通过调度器,yield语句在一个循环中被处理,如下:

    除了发送值外,还可以在一个生成器上面执行一个 close() 方法。它会导致在执行yield语句时抛出一个 GeneratorExit 异常,从而终止执行。如果进一步设计,一个生成器可以捕获这个异常并执行清理操作。同样还可以使用生成器的 方法在yield语句执行时生成一个任意的执行指令。一个任务调度器可利用它来在运行的生成器中处理错误。

    最后一个例子中使用的 yield from 语句被用来实现协程,可以被其它生成器作为子程序或过程来调用。本质上就是将控制权透明的传输给新的函数。不像普通的生成器,一个使用 yield from 被调用的函数可以返回一个作为 yield from 语句结果的值。关于 的更多信息可以在 中找到。

    最后,如果使用生成器编程,要提醒你的是它还是有很多缺点的。特别是,你得不到任何线程可以提供的好处。例如,如果你执行CPU依赖或I/O阻塞程序,它会将整个任务挂起知道操作完成。为了解决这个问题,你只能选择将操作委派给另外一个可以独立运行的线程或进程。另外一个限制是大部分Python库并不能很好的兼容基于生成器的线程。如果你选择这个方案,你会发现你需要自己改写很多标准库函数。作为本节提到的协程和相关技术的一个基础背景,可以查看 PEP 342

    PEP 3156 同样有一个关于使用协程的异步I/O模型。特别的,你不可能自己去实现一个底层的协程调度器。不过,关于协程的思想是很多流行库的基础,包括 gevent,,Stackless Python 以及其他类似工程。

    原文: