「连载三」gRPC Streaming, Client and Server

    • Server-side streaming RPC:服务器端流式 RPC
    • Client-side streaming RPC:客户端流式 RPC
    • Bidirectional streaming RPC:双向流式 RPC

    任何技术,因为有痛点,所以才有了存在的必要性。如果您想要了解 gRPC 的流式调用,请继续

    gRPC Streaming 是基于 HTTP/2 的,后续章节再进行详细讲解

    为什么不用 Simple RPC

    流式为什么要存在呢,是 Simple RPC 有什么问题吗?通过模拟业务场景,可得知在使用 Simple RPC 时,有如下问题:

    • 数据包过大造成的瞬时压力
    • 接收数据包时,需要所有数据包都接受成功且正确后,才能够回调响应,进行业务处理(无法客户端边发送,服务端边处理)

    为什么用 Streaming RPC

    • 大规模数据包
    • 实时场景

    模拟场景

    每天早上 6 点,都有一批百万级别的数据集要同从 A 同步到 B,在同步的时候,会做一系列操作(归档、数据分析、画像、日志等)。这一次性涉及的数据量确实大

    在同步完成后,也有人马上会去查阅数据,为了新的一天筹备。也符合实时性。

    两者相较下,这个场景下更适合使用 Streaming RPC

    在讲解具体的 gRPC 流式代码时,会着重在第一节讲解,因为三种模式其实是不同的组合。希望你能够注重理解,举一反三,其实都是一样的知识点 👍

    增加 stream_server、stream_client 存放服务端和客户端文件,proto/stream.proto 用于编写 IDL

    IDL

    在 proto 文件夹下的 stream.proto 文件中,写入如下内容:

    1. package proto;
    2. service StreamService {
    3. rpc List(StreamRequest) returns (stream StreamResponse) {};
    4. rpc Record(stream StreamRequest) returns (StreamResponse) {};
    5. rpc Route(stream StreamRequest) returns (stream StreamResponse) {};
    6. }
    7. message StreamPoint {
    8. string name = 1;
    9. int32 value = 2;
    10. }
    11. message StreamRequest {
    12. StreamPoint pt = 1;
    13. }
    14. message StreamResponse {
    15. StreamPoint pt = 1;
    16. }

    注意关键字 stream,声明其为一个流方法。这里共涉及三个方法,对应关系为

    • List:服务器端流式 RPC
    • Record:客户端流式 RPC
    • Route:双向流式 RPC

    基础模板 + 空定义

    Server

    1. package main
    2. import (
    3. "log"
    4. "net"
    5. "google.golang.org/grpc"
    6. pb "github.com/EDDYCJY/go-grpc-example/proto"
    7. )
    8. type StreamService struct{}
    9. const (
    10. PORT = "9002"
    11. )
    12. func main() {
    13. server := grpc.NewServer()
    14. pb.RegisterStreamServiceServer(server, &StreamService{})
    15. lis, err := net.Listen("tcp", ":"+PORT)
    16. if err != nil {
    17. log.Fatalf("net.Listen err: %v", err)
    18. }
    19. server.Serve(lis)
    20. }
    21. func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
    22. return nil
    23. }
    24. func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
    25. return nil
    26. }
    27. func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
    28. return nil
    29. }

    写代码前,建议先将 gRPC Server 的基础模板和接口给空定义出来。若有不清楚可参见上一章节的知识点

    Client

    1. package main
    2. import (
    3. "log"
    4. "google.golang.org/grpc"
    5. pb "github.com/EDDYCJY/go-grpc-example/proto"
    6. )
    7. const (
    8. PORT = "9002"
    9. )
    10. func main() {
    11. conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
    12. if err != nil {
    13. log.Fatalf("grpc.Dial err: %v", err)
    14. }
    15. defer conn.Close()
    16. client := pb.NewStreamServiceClient(conn)
    17. err = printLists(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: List", Value: 2018}})
    18. if err != nil {
    19. log.Fatalf("printLists.err: %v", err)
    20. }
    21. if err != nil {
    22. log.Fatalf("printRecord.err: %v", err)
    23. }
    24. err = printRoute(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Route", Value: 2018}})
    25. if err != nil {
    26. log.Fatalf("printRoute.err: %v", err)
    27. }
    28. }
    29. func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    30. return nil
    31. }
    32. func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    33. return nil
    34. }
    35. func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    36. return nil
    37. }

    简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。大致如图:

    image

    Server

    1. func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
    2. for n := 0; n <= 6; n++ {
    3. err := stream.Send(&pb.StreamResponse{
    4. Pt: &pb.StreamPoint{
    5. Name: r.Pt.Name,
    6. Value: r.Pt.Value + int32(n),
    7. },
    8. })
    9. if err != nil {
    10. return err
    11. }
    12. }
    13. return nil
    14. }

    在 Server,主要留意 stream.Send 方法。它看上去能发送 N 次?有没有大小限制?

    1. type StreamService_ListServer interface {
    2. Send(*StreamResponse) error
    3. grpc.ServerStream
    4. }
    5. func (x *streamServiceListServer) Send(m *StreamResponse) error {
    6. return x.ServerStream.SendMsg(m)
    7. }

    通过阅读源码,可得知是 protoc 在生成时,根据定义生成了各式各样符合标准的接口方法。最终再统一调度内部的 SendMsg 方法,该方法涉及以下过程:

    • 消息体(对象)序列化
    • 压缩序列化后的消息体
    • 对正在传输的消息体增加 5 个字节的 header
    • 判断压缩+序列化后的消息体总字节长度是否大于预设的 maxSendMessageSize(预设值为 math.MaxInt32),若超出则提示错误
    • 写入给流的数据集

    Client

    在 Client,主要留意 stream.Recv() 方法。什么情况下 io.EOF ?什么情况下存在错误信息呢?

    1. type StreamService_ListClient interface {
    2. Recv() (*StreamResponse, error)
    3. grpc.ClientStream
    4. }
    5. func (x *streamServiceListClient) Recv() (*StreamResponse, error) {
    6. m := new(StreamResponse)
    7. if err := x.ClientStream.RecvMsg(m); err != nil {
    8. return nil, err
    9. }
    10. return m, nil
    11. }

    RecvMsg 会从流中读取完整的 gRPC 消息体,另外通过阅读源码可得知:

    (1)RecvMsg 是阻塞等待的

    (2)RecvMsg 当流成功/结束(调用了 Close)时,会返回 io.EOF

    (3)RecvMsg 当流出现任何错误时,流会被中止,错误信息会包含 RPC 错误码。而在 RecvMsg 中可能出现如下错误:

    • io.EOF
    • io.ErrUnexpectedEOF
    • transport.ConnectionError
    • google.golang.org/grpc/codes

    同时需要注意,默认的 MaxReceiveMessageSize 值为 1024 _ 1024 _ 4,建议不要超出

    验证

    运行 stream_server/server.go:

    1. $ go run server.go

    运行 stream_client/client.go:

    1. $ go run client.go
    2. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2018
    3. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2019
    4. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2020
    5. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2021
    6. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2022
    7. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2023
    8. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2024

    二、Client-side streaming RPC:客户端流式 RPC

    Server

    1. func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
    2. for {
    3. r, err := stream.Recv()
    4. if err == io.EOF {
    5. return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: "gRPC Stream Server: Record", Value: 1}})
    6. }
    7. return err
    8. }
    9. log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
    10. }
    11. return nil
    12. }

    多了一个从未见过的方法 stream.SendAndClose,它是做什么用的呢?

    在这段程序中,我们对每一个 Recv 都进行了处理,当发现 io.EOF (流关闭) 后,需要将最终的响应结果发送给客户端,同时关闭正在另外一侧等待的 Recv

    Client

    1. func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    2. stream, err := client.Record(context.Background())
    3. if err != nil {
    4. return err
    5. }
    6. err := stream.Send(r)
    7. if err != nil {
    8. return err
    9. }
    10. }
    11. resp, err := stream.CloseAndRecv()
    12. if err != nil {
    13. return err
    14. }
    15. log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
    16. return nil
    17. }

    stream.CloseAndRecvstream.SendAndClose 是配套使用的流方法,相信聪明的你已经秒懂它的作用了

    验证

    重启 stream_server/server.go,再次运行 stream_client/client.go:

    stream_client:
    stream_server:
    1. $ go run server.go
    2. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
    3. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
    4. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
    5. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
    6. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
    7. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018

    三、Bidirectional streaming RPC:双向流式 RPC

    双向流式 RPC,顾名思义是双向流。由客户端以流式的方式发起请求,服务端同样以流式的方式响应请求

    首个请求一定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)根据程序编写的方式来确定(可以结合协程)

    假设该双向流是按顺序发送的话,大致如图:

    image

    还是要强调,双向流变化很大,因程序编写的不同而不同。双向流图示无法适用不同的场景

    Server

    1. func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
    2. n := 0
    3. for {
    4. err := stream.Send(&pb.StreamResponse{
    5. Pt: &pb.StreamPoint{
    6. Name: "gPRC Stream Client: Route",
    7. Value: int32(n),
    8. },
    9. })
    10. if err != nil {
    11. return err
    12. }
    13. r, err := stream.Recv()
    14. if err == io.EOF {
    15. return nil
    16. }
    17. if err != nil {
    18. return err
    19. }
    20. n++
    21. log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
    22. }
    23. return nil
    24. }

    Client

    1. func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    2. stream, err := client.Route(context.Background())
    3. if err != nil {
    4. return err
    5. }
    6. for n := 0; n <= 6; n++ {
    7. err = stream.Send(r)
    8. if err != nil {
    9. return err
    10. }
    11. resp, err := stream.Recv()
    12. if err == io.EOF {
    13. break
    14. }
    15. if err != nil {
    16. return err
    17. }
    18. log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
    19. }
    20. stream.CloseSend()
    21. return nil
    22. }

    验证

    重启 stream_server/server.go,再次运行 stream_client/client.go:

    stream_server
    1. $ go run server.go
    2. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
    3. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
    4. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
    5. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
    6. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
    7. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
    stream_client
    1. $ go run client.go
    2. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 0
    3. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 1
    4. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 2
    5. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 3
    6. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 4
    7. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 5
    8. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 6

    在本文共介绍了三类流的交互方式,可以根据实际的业务场景去选择合适的方式。会事半功倍哦 🎑