Futures已经在并发一节中简单讨论过。它是调异步处理的中心机制,渗透在我们代码库中,也是Finagle的核心。Futures允许组合并发事件,简化了高并发操作。也是JVM上异步并发的一种高效的实现。

    Twitter的future是异步的,所以基本上任何操作(阻塞操作)——基本上任何可以suspend它的线程的执行;网络IO和磁盘IO是就是例子——必须由系统处理,它为结果提供future。Finagle为网络IO提供了这样一种系统。

    Futures清晰简单:它们持有一个尚未完成运算结果的 promise 。它们是一个简单的容器——一个占位符。一次计算当然可能会失败,这种状况必须被编码:一个Future可以是三种状态之一: pending, failed, completed。

    让我们重新审视我们所说的组合:将简单的组件合成一个更复杂的。函数组合的一个权威的例子:给定函数 f 和 g,组合函数 (g∘f)(x) = g(f(x)) ——结果先对 x使用f函数,然后在使用g函数——用Scala来写:

    复合函数h,是个新的函数,由之前定义的f和g函数合成。

    Futures是一种集合类型——它是个包含0或1个元素的容器——你可以发现他们有标准的集合方法(eg:map, filter, foreach)。因为Future的值是延迟的,结果应用这些方法中的任何一种必然也延迟;在

    1. val resultStr: Future[String] = result map { i => i.toString }

    函数 { i => i.toString } 不会被调用,直到int值可用;转换集合的resultStr在可用之前也一直是待定状态。

    List可以被扁平化(flattened):

    1. val listOfList: List[List[Int]] = ..
    2. val list: List[Int] = listOfList.flatten

    这对future也是有意义的:

    1. val futureOfFuture: Future[Future[Int]] = ..
    2. val future: Future[Int] = futureOfFuture.flatten

    因为future是延迟的,flatten的实现——立即返回——不得不返回一个等待外部future (Future[Future[Int]]) 完成的future (Future[Future[Int]]).如果外部future失败,内部flattened future也将失败。

    Future (类似List) 也定义了flatMap;Future[A] 定义方法flatMap的签名

    1. flatMap[B](f: A => Future[B]): Future[B]

    如同组合 map 和 flatten,我们可以这样实现:

    1. def flatMap[B](f: A => Future[B]): Future[B] = {
    2. val mapped: Future[Future[B]] = this map f
    3. val flattened: Future[B] = mapped.flatten
    4. flattened
    5. }

    这是一种有威力的组合!使用flatMap我们可以定义一个 Future 作为两个Future序列的结果。第二个future 的计算基于第一个的结果。想象我们需要2次RPC调用来验证一个用户身份,我们可以用下面的方式组合操作:

    1. def getUser(id: Int): Future[User]
    2. def authenticate(user: User): Future[Boolean]
    3. def isIdAuthed(id: Int): Future[Boolean] =
    4. getUser(id) flatMap { user => authenticate(user) }

    这种组合类型的一个额外的好处是错误处理是内置的:如果getUser(..)或authenticate(..)失败,future 从 isAuthred(..)返回时将会失败。这里我们没有额外的错误处理的代码。

    风格

    Future回调方法(respond, onSuccess, onFailure, ensure) 返回一个新的Future,并链接到调用者。这个Future被保证只有在它调用者完成后才完成,使用模式如下:

    1. acquireResource()
    2. future onSuccess { value =>
    3. computeSomething(value)
    4. } ensure {
    5. freeResource()
    6. }

    freeResource() 被保证只有在 computeSomething之后才执行,这样就模拟了try-finally 模式。

    永远避免直接创建Promise实例: 几乎每一个任务都可以通过使用预定义的组合子完成。这些组合子确保错误和取消是可传播的, 通常鼓励的数据流风格的编程,不再需要同步和volatility声明。

    用尾递归风格编写的代码不再导致堆栈空间泄漏,并使得以数据流风格高效的实现循环成为可能:

    Future定义很多有用的方法: 使用 Future.value() 和 Future.exception() 来创建未满意(pre-satisfied) 的future。Future.collect(), Future.join() 和 Future.select() 提供了组合子将多个future合成一个(例如:scatter-gather操作的gather部分)。

    Cancellation

    Future实现了一种弱形式的取消。调用Future#cancel 不会直接终止运算,而是发送某个级别的可被任何处理查询的触发信号,最终满足这个future。Cancellation信号流向相反的方向:一个由消费者设置的cancellation信号,会传播到它的生产者。生产者使用 Promise的onCancellation来监听信号并执行相应的动作。

    这意味这cancellation语意上依赖生产者,没有默认的实现。cancellation只是一个提示。

    Local

    Util的提供了一个位于特定的future派发树(dispatch tree)的引用单元(cell)。设定一个local的值,使这个值可以用于被同一个线程的Future 延迟的任何计算。有一些类似于thread locals(注:Java中的线程机制),不同的是它们的范围不是一个Java线程,而是一个 future 线程树。在

    1. trait User {
    2. def name: String
    3. def incrCost(points: Int)
    4. }
    5. val user = new Local[User]
    6. ...
    7. user() = currentUser
    8. rpc() ensure {
    9. user().incrCost(10)

    在 ensure块中的 user() 将在回调被添加的时候引用 user local的值。

    就thread locals来说,我们的Locals非常的方便,但要尽量避免使用:除非确信通过显式传递数据时问题不能被充分的解决,哪怕解决起来有些繁重。

    Locals有效的被核心库使用在非常常见的问题上——线程通过RPC跟踪,传播监视器,为future的回调创建stack traces——任何其他解决方法都使得用户负担过度。Locals在几乎任何其他情况下都不适合。

    Offer/Broker

    并发系统由于需要协调访问数据和资源而变得复杂。Actor提出一种简化的策略:每一个actor是一个顺序的进程(process),保持自己的状态和资源,数据通过消息的方式与其它actor共享。 共享数据需要actor之间通信。

    Offer/Broker 建立于Actor之上,以这三种重要的方式表现:1,通信通道(Brokers)是first class——即发送消息需要通过Brokers,而非直接到actor。2, Offer/Broker 是一种同步机制:通信会话是同步的。 这意味我们可以用 Broker做为协调机制:当进程a发送一条信息给进程b;a和b都要对系统状态达成一致。3, 最后,通信可以选择性地执行:一个进程可以提出几个不同的通信,其中的一个将被获取。

    为了以一种通用的方式支持选择性通信(以及其他组合),我们需要将通信的描述和执行解耦。这正是Offer做的——它是一个持久数据用于描述一次通信;为了执行这个通信(offer执行),我们通过它的sync()方法同步

    1. trait Offer[T] {
    2. def sync(): Future[T]
    3. }

    返回 Future[T] 当通信被获取的时候生成交换值。

    Broker通过offer协调值的交换——它是通信的通道:

    1. trait Broker[T] {
    2. def send(msg: T): Offer[Unit]
    3. val recv: Offer[T]
    4. }

    所以,当创建两个offer

    1. val b: Broker[Int]
    2. val sendOf = b.send(1)
    3. val recvOf = b.recv

    sendOf和recvOf都同步

    1. // In process 1:
    2. sendOf.sync()
    3. // In process 2:
    4. recvOf.sync()

    通过将多个offer和Offer.choose绑定来执行可选择通信。

    1. def choose[T](ofs: Offer[T]*): Offer[T]

    上面的代码生成一个新的offer,当同步时获取一个特定的ofs——第一个可用的。当多个都立即可用时,随机获取一个。

    Offer对象有些一次性的Offers用于与来自Broker的Offer构建。

    1. Offer.timeout(duration): Offer[Unit]

    offer在给定时间后激活。Offer.never将用于不会有效,Offer.const(value)在给定值后立即有效。这些操作由选择性通信来组合是非常有用的。例如,在一个send操作中使用超时:

    人们可能会比较 Offer/Broker 与,他们有细微但非常重要的区别。Offer可以被组合,而queue不能。例如,考虑一组queues,描述为 Brokers:

    1. val q0 = new Broker[Int]
    2. val q1 = new Broker[Int]
    3. val q2 = new Broker[Int]

    现在让我们为读取创建一个合并的queue

    1. val anyq: Offer[Int] = Offer.choose(q0.recv, q1.recv, q2.recv)

    anyq是一个将从第一个可用的queue中读取的offer。注意 anyq 仍是同步的——我们仍然拥有底层队列的语义。这类组合是不可能用queue实现的。

    例子:一个简单的连接池

    连接池在网络应用中很常见,并且它们的实现常常需要技巧——例如,在从池中获取一个连接的时候,通常需要超时机制,因为不同的客户端有不同的延迟需求。池的简单原则:维护一个连接队列,满足那些进入的等待者。使用传统的同步原语,这通常需要两个队列(queues):一个用于等待者(当没有连接可用时),一个用于连接(当没有等待者时)。

    使用 Offer/Brokers ,可以表达得非常自然:

    1. class Pool(conns: Seq[Conn]) {
    2. private[this] val waiters = new Broker[Conn]
    3. private[this] val returnConn = new Broker[Conn]
    4. val get: Offer[Conn] = waiters.recv
    5. def put(c: Conn) { returnConn ! c }
    6. private[this] def loop(connq: Queue[Conn]) {
    7. Offer.choose(
    8. if (connq.isEmpty) Offer.never else {
    9. val (head, rest) = connq.dequeue
    10. waiters.send(head) { _ => loop(rest) }
    11. returnConn.recv { c => loop(connq enqueue c) }
    12. }
    13. loop(Queue.empty ++ conns)
    14. }

    loop总是提供一个归还的连接,但只有queue非空的时候才会send。 使用持久化队列(persistent queue)更进一步简化逻辑。与连接池的接口也是通过Offer实现,所以调用者如果愿意设置timeout,他们可以通过利用组合子(combinators)来做:

    1. val conn: Future[Option[Conn]] = Offer.choose(
    2. pool.get { conn => Some(conn) },
    3. Offer.timeout(1.second) { _ => None }
    4. ).sync()

    实现timeout不需要额外的记账(bookkeeping);这是因为Offer的语义:如果Offer.timeout被选择,不会再有offer从池中获得——连接池和它的调用者在各自waiter的broker上不必同时同意接受和发送。

    埃拉托色尼筛子(Sieve of Eratosthenes 译注:一种用于筛选素数的算法)

    把并发程序构造为一组顺序的同步通信进程,通常很有用——有时程序被大大地简化了。Offer和Broker提供了一组工具来让它简单并一致。确实,它们的应用超越了我们可能认为是经典并发性问题——并发编程(有Offer/Broker的辅助)是一种有用的构建工具,正如子例程(subroutines),类,和模块都是——来自CSP(译注:Communicating sequential processes的缩写,即通信顺序进程)的重要思想。

    这里有一个埃拉托色尼筛子可以构造为一个针对一个整数流(stream of integers)的连续的应用过滤器 。首先,我们需要一个整数的源(source of integers):

    1. def integers(from: Int): Offer[Int] = {
    2. val b = new Broker[Int]
    3. def gen(n: Int): Unit = b.send(n).sync() ensure gen(n + 1)
    4. gen(from)
    5. b.recv
    6. }

    integers(n) 方法简单地提供了从n开始的所有连续的整数。然后我们需要一个过滤器:

    1. def filter(in: Offer[Int], prime: Int): Offer[Int] = {
    2. val b = new Broker[Int]
    3. def loop() {
    4. in.sync() onSuccess { i =>
    5. if (i % prime != 0)
    6. b.send(i).sync() ensure loop()
    7. else
    8. loop()
    9. }
    10. }
    11. loop()
    12. b.recv
    13. }

    filter(in, p) 方法返回的offer删除了in中的所有质数(prime)的倍数。最终我们定义了我们的筛子(sieve):

    1. def sieve = {
    2. val b = new Broker[Int]
    3. def loop(of: Offer[Int]) {
    4. for (prime <- of.sync(); _ <- b.send(prime).sync())
    5. loop(filter(of, prime))
    6. }
    7. loop(integers(2))
    8. }

    loop() 工作很简单:从of中读取下一个质数,然后对of应用过滤器排除这个质数。loop不断的递归,持续的质数被过滤,于是我们得到了筛选结果。我们现在打印前10000个质数: