浅析gRPC

gRPC.io官网上的一篇Blog大致讲解了gRPC是如何使用HTTP/2的,觉得讲的比较抽象,理解的不够透彻,于是自己找了一些资料。大致总结一下,个人认为:gRPC之所以高效,除了在协议层使用Protobuffer之外,底层使用HTTP/2也是一个非常重要的原因。下面先上一张图,再来看看HTTP/2的一些特征。

gRPC

HTTP/2

概念

  • 消息(Message):由一个或多个帧组合而成,例如请求和响应;
  • 流(Stream):存在于连接中的一个虚拟通道,流可以承载双向消息,每个流都有一个唯一的整数ID;
  • 帧(Frame):HTTP/2通信的最小单位,每个帧包含帧首部,至少也会标识出当前帧所属的流;
  • 连接(Connection):与 HTTP/1 相同,都是指对应的 TCP 连接;

特征

  • 多路复用、乱序收发:可以乱序收发数据报文,不用使用单步:发1->收1 或者流水线:发1->发2->收2->收1 的流程,提高效率;
  • Header压缩:不用花大量篇幅重复发送常用header,采用发送增量的方法,由客户端和服务器端共同维护一个字典;
  • stream优先级:可以在一个连接上,为不同stream设置不同优先级;
  • 服务器推送:提前发送需要的资源;

gRPC

收发消息流程

发送流程

  • 解析地址:client消息发送给gRpc,然后resolver解析域名,并获取到目标服务器地址列表;
  • 负载均衡:客户端基于负载均衡算法,从连接服务器列表中找出一个目标服务器;
  • 连接:如果到目标服务器已有连接,则使用已有连接,访问目标服务器;如果没有可用连接,则创建HTTP/2连接;
  • 编码:对请求消息使用 Protobuf做序列化,通过 HTTP/2 Stream 发送给 gRPC 服务端;

接收流程

  • 编码:接收到服务端响应之后,使用Protobuf 做反序列化;
  • 回调:回调 GrpcFuture 的 set(Response) 方法,唤醒阻塞的客户端调用线程,获取 RPC 响应。

负载均衡

简单流程

gRPC支持客户端负载均衡rr和grpclb policy流程:

  1. grpc client请求nameserver解析服务名称,返回地址里面会标识该地址是属于lb还是server;
  2. 如果返回地址是lb的,客户端将使用grpclb策略,否则使用其他配置的负载均衡策略;
  3. 其他负载均衡策略都会建立到所有server的子通道,然后按照策略发送流量;
  4. 如果是grpclb策略,将建立到lb server的连接,并通过该连接获取grpc server的地址(grpc server会发送负载情况到lb server以更新状态);

实现

基于gRPC dnsResolver可以实现基于consul的resolver,只需要实现Resolve接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
func (r *consulResolver) Resolve(target string) (naming.Watcher, error) {
config := api.DefaultConfig()
config.Address = r.address
client, err := api.NewClient(config)
if err != nil {
return nil, err
}
return &consulWatcher{
client: client,
service: r.service,
addrs: map[string]struct{}{},
}, nil
}

其返回值naming.Watcher需要实现next方法,用于刷新address列表。在使用的时候,还是需要指定对应的target,除此之外,还需要通过lb和resolver配置负载均衡。

1
2
3
4
5
6
t.gRpcConn, err = grpc.Dial(
t.GRpcURL,
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithBalancer(grpc.RoundRobin(grpclb.NewConsulResolver(consulAddr, service))),
)

这里必须要指定grpc.WithBlock(),否则不能够生效,应该是从consul获取地址需要时间,如果不指定的话会直接跑过。
但是,这也带来了坑,一旦consul上查不到对应的地址列表,会阻塞,且没有任何打印信息。

服务定义

service

在protobuf中定义的service,最终会被编译为一个类;所有的RPC调用都是这个类的方法。

1
2
3
service RouteGuide {
...
}

RPC类型

所有的RPC都是service的编译成的类的一个方法,gRPC支持四种不同的方法。其中流式RPC中,使用stream关键字特殊定义了参数类型。

  • 简单RPC
1
rpc GetFeature(Point) returns (Feature) {}
  • 服务器端流式RPC
1
rpc ListFeatures(Rectangle) returns (stream Feature) {}
  • 客户端流式RPC
1
rpc RecordRoute(stream Point) returns (RouteSummary) {}
  • 向流式RPC
1
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

编译结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type routeGuideServer struct {
...
}
...

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
...
}
...

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}

流式RPC

流式RPC是指,传入或者返回的是一个stream(类似于一个套接口)。代码可以对该stream执行send或者recv操作来写入或者读出在RPC定义时指定的数据结构,可循环操作多次,直到读完数据。

使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建gRPC连接
conn, err := grpc.Dial(*address, grpc.WithInsecure())
if err != nil {
log.Fatalf("faild to connect: %v", err)
}
defer conn.Close()

// 初始化客户端代码
c := pb.NewGreeterClient(conn)

// 调用服务器端流式RPC,请求是一个HelloRequest结构体,返回是stream
stream, err := c.SayHello1(context.Background(), &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}

// 循环读取在RPC中定义的数据结构,直到返回io.EOF
for {
reply, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("failed to recv: %v", err)
}

log.Printf("Greeting: %s", reply.Message)
}

场景

当年不知道gRPC支持流式RPC,为了从日志agent获取日志信息,使用了简单的RPC来逐条读取消息,效率极其低下。现在想来,其实如果使用gRPC的流式方法来获取日志应该也算是一个比较好的方案。比如,当用户在页面上要查看日志的时候,controller层就发起到日志微服务的gGRPC流式请求,一个日志请求发送过去,从该RPC就可以流式的返回所需查看的所有日志;而不用发送一条再请求第二条,再返回。

0%