概述

在一些应用场景中, client或server需要向对面发送大量数据,这些数据非常大或者持续地在产生以至于无法放在一个RPC的附件中。比如一个分布式系统的不同节点间传递replica或snapshot。client/server之间虽然可以通过多次RPC把数据切分后传输过去,但存在如下问题:

  • 如果这些RPC是并行的,无法保证接收端有序地收到数据,拼接数据的逻辑相当复杂。
  • 如果这些RPC是串行的,每次传递都得等待一次网络RTT+处理数据的延时,特别是后者的延时可能是难以预估的。为了让大块数据以流水线的方式在client/server之间传递, 我们提供了Streaming RPC这种交互模型。Streaming RPC让用户能够在client/service之间建立用户态连接,称为Stream, 同一个TCP连接之上能同时存在多个Stream。 Stream的传输数据以消息为基本单位, 输入端可以源源不断的往Stream中写入消息, 接收端会按输入端写入顺序收到消息。

Streaming RPC保证:

  • 有消息边界。
  • 接收消息的顺序和发送消息的顺序严格一致。
  • 支持流控。
  • 提供超时提醒目前的实现还没有自动切割过大的消息,同一个tcp连接上的多个Stream之间可能有Head-of-line blocking问题,请尽量避免过大的单个消息,实现自动切割后我们会告知并更新文档。

例子见。

建立Stream

程序中我们用StreamId代表一个Stream,对Stream的读写,关闭操作都将作用在这个Id上。

接受Stream

如果client在RPC上附带了一个Stream, service在收到RPC后可以通过调用StreamAccept接受。接受后Server端对应产生的Stream存放在response_stream中,Server可通过这个Stream向Client发送数据。

  1. // [Called at the server side]
  2. // Accept the Stream. If client didn't create a Stream with the request
  3. // (cntl.has_remote_stream() returns false), this method would fail.
  4. // Return 0 on success, -1 otherwise.
  5. int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options);

读取Stream

第一次收到请求的时间

在client端,如果建立过程是一次同步RPC, 那在等待的线程被唤醒之后,on_received_message就可能会被调用到。 如果是异步RPC请求, 那等到这次请求的done->Run() 执行完毕之后, on_received_message就可能会被调用。

在server端, 当框架传入的done->Run()被调用完之后, on_received_message就可能会被调用。

写入Stream

  1. // Write |message| into |stream_id|. The remote-side handler will received the
  2. // message by the written order
  3. // Returns 0 on success, errno otherwise
  4. // - EAGAIN: |stream_id| is created with positive |max_buf_size| and buf size
  5. // which the remote side hasn't consumed yet excceeds the number.
  6. // - EINVAL: |stream_id| is invalied or has been closed
  7. int StreamWrite(StreamId stream_id, const butil::IOBuf &message);

流控

关闭Stream

  1. // Close |stream_id|, after this function is called:
  2. // - All the following |StreamWrite| would fail
  3. // - |StreamWait| wakes up immediately.
  4. // - Both sides |on_closed| would be notifed after all the pending buffers have
  5. // been received
  6. // This function could be called multiple times without side-effects