# Day17 gRPC进阶 上篇

  • 传统请求响应,基本的Unary RPC
  • 依托于HTTP2,流式的Streaming RPC

# 服务端流式 RPC

服务端流式RPC,客户端发出一个RPC请求,服务端与客户端之间建立一个单向的流,服务端可以向流中写入多个响应消息,最后主动关闭流;而客户端需要监听这个流,不断获取响应直到流关闭

场景:客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端

  • 定义接口proto
message Request{
    string name = 1;
}
message Response {
    string reply = 1;
}

// Server 服务接口规范
service Server {
    ......
    // 服务端流式RPC
    rpc ServerStreamHello(Request) returns (stream Response){}
}
  • 服务端proto代码生成
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/server.proto
  • 服务端实现接口
// 编写一个使用多种语言打招呼的方法,客户端发来一个用户名,服务端分多次返回打招呼的信息
func (s server) ServerStreamHello(reqeust *proto.Request, stream proto.Server_ServerStreamHelloServer) error {
	words := []string{
		"你好",
		"hello",
		"こんにちは",
		"안녕하세요",
	}
	for _, word := range words {
		data := &proto.Response{Reply: word + " " + reqeust.GetName()}
		err := stream.Send(data)
		if err != nil {
			fmt.Println("stream.Send err:", err)
			return err
		}
	}
	return nil
}

/*
1. ServerStreamHello 服务端和客户端函数名是一致的,但服务端参数为(request, stream)
2. 服务端的stream.Send只能发送特定类型即Reponse类型的流,可循环多次发送
3. 自动关闭,return nil
*/
  • 客户端同样执行proto和对应代码生成
  • 客户端处理多次接收响应
func doServerStreamClient(client pb.ServerClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// 响应结果为数据流stream,可以多次读取
	stream, err := client.ServerStreamHello(ctx, &pb.Request{Name: "linda"})
	if err != nil {
		log.Fatalf("client.ServerStreamHello err: %v\n", err)
	}

	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("stream.Recv err: %v\n", err)
		}
		fmt.Println(resp.Reply)
	}
}

/*
1. ServerStreamHello 服务端和客户端函数名是一致的,但客户端参数为(ctx, request) stream
2. 客户端stream.Recv()返回值为对应服务端 Response 实例
3. 客户端的 err == io.EOF 判断结束
*/

# 客户端流式 RPC

客户端传入多个请求对象,服务端返回一个响应结果

场景:物联网终端向服务器上报数据、大数据流式计算等

  • 定义接口proto
// Server 服务接口规范
service Server {
    ......
    // 客户端流式RPC
    rpc ClientStreamHello(stream Request) returns (Response){}
}
  • 服务端proto代码生成
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/server.proto
  • 服务端实现接口
func (s server) ClientStreamHello(stream proto.Server_ClientStreamHelloServer) error {
	// 在这个示例中,我们编写一个多次发送人名,服务端统一返回一个打招呼消息的程序
	reply := "你好,"
	for {
		// 接收客户端发来的流式数据
		res, err := stream.Recv()
		if err == io.EOF {
			// 最终统一回复
			return stream.SendAndClose(&proto.Response{
				Reply: reply,
			})
		}
		if err != nil {
			return err
		}
		reply += " " + res.GetName()
	}

	/*
		1. stream.Recv() 结束的数据未固定格式 Reqeust的实例
		2. stream.Recv() 会有结束表示 io.EOF
		3. stream.SendAndClose 发送关闭标志
	*/
}
  • 客户端实现接口
func doClientStreamClient(client pb.ServerClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	stream, err := client.ClientStreamHello(ctx)
	if err != nil {
		log.Fatalf("client.ClientStreamHello err: %v\n", err)
	}

	names := []string{"linda", "tom", "catherine"}
	for _, name := range names {
		// 发送流式数据
		err := stream.Send(&pb.Request{Name: name})
		if err != nil {
			log.Fatalf("doClientStreamClient stream.Send(%v) failed, err: %v", name, err)
		}
	}

	res, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("doClientStreamClient stream.CloseAndRecv failed: %v", err)
	}
	log.Printf("got reply: %v", res.GetReply())

	/*
		1. 客户端发送的流,数据也是固定格式 stream.Send(&pb.Request{Name: name})
		2. 客户端 stream.CloseAndRecv() 接收后就主动关闭连接
	*/
}

# 双向流式 RPC

双向流式RPC即客户端和服务端均为流式的RPC,能发送多个请求对象也能接收到多个响应对象

场景:聊天应用

  • 定义接口proto
// Server 服务接口规范
service Server {
    ......
    // 双向流式RPC
    rpc DubiStreamHello(stream Request) returns (stream Response){}
}
  • 服务端proto代码生成
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/server.proto
  • 服务端实现接口
// magic 一段价值连城的“人工智能”代码
func magic(s string) string {
	s = strings.ReplaceAll(s, "吗", "")
	s = strings.ReplaceAll(s, "吧", "")
	s = strings.ReplaceAll(s, "你", "我")
	s = strings.ReplaceAll(s, "?", "!")
	s = strings.ReplaceAll(s, "?", "!")
	return s
}

func (s server) DubiStreamHello(stream proto.Server_DubiStreamHelloServer) error {
	for {
		// 接收流式请求
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		reply := magic(in.GetName()) // 对收到的数据做些处理

		// 返回流式响应
		if err := stream.Send(&proto.Response{Reply: reply}); err != nil {
			return err
		}
	}
}
  • 客户端实现代码
func doDubiStreamClient(client pb.ServerClient) {
	// 一边从终端获取输入的请求数据发送至服务端,一边从服务端接收流式响应
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
	defer cancel()

	stream, err := client.DubiStreamHello(ctx)
	if err != nil {
		log.Fatalf("client.DubiStreamHello err: %v\n", err)
	}
	waitc := make(chan struct{})
	go func() {
		for {
			// 接收服务端返回的响应
			in, err := stream.Recv()
			if err == io.EOF {
				// read done.
				close(waitc)
				return
			}
			if err != nil {
				log.Fatalf("client.DubiStreamHello stream.Recv() failed, err: %v", err)
			}
			fmt.Printf("AI:%s\n", in.GetReply())
		}
	}()

	// 从标准输入获取用户输入
	reader := bufio.NewReader(os.Stdin) // 从标准输入生成读对象
	for {
		cmd, _ := reader.ReadString('\n') // 读到换行
		cmd = strings.TrimSpace(cmd)
		if len(cmd) == 0 {
			continue
		}
		if strings.ToUpper(cmd) == "QUIT" {
			break
		}
		// 将获取到的数据发送至服务端
		if err := stream.Send(&pb.Request{Name: cmd}); err != nil {
			log.Fatalf("client.DubiStreamHello stream.Send(%v) failed: %v", cmd, err)
		}
	}
	stream.CloseSend()
	<-waitc
}

# 元数据 metadata

元数据metadata,类似http headers中的键值对,用于处理RPC请求和响应过程中需要但又不属于具体业务的信息,如认证token、请求标识和监控标签等

  • string类型,值[]string类型,键大小写不敏感
  • Go语言使用库 google.golang.org/grpc/metadata
  • 源码实现 type MD map[string][]string
  • 注:以下示例皆为普通RPC调用元数据使用方式

# 创建元数据

// 创建1:Pairs 以键值对字符串隔开形式创建
md := metadata.Pairs("k1", "v1", "k2", "v2")

// 创建2:New map[string]string形式创建
md := metadata.New(map[string]string{
	"k3": "v3",
	"k4": "v4",
})

# 客户端发送元数据

// 创建带有metadata的context
ctx := metadata.NewOutgoingContext(context.Background(), md)

// 附加metadata到context
ctx = metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")

// 发起普通RPC请求
response, err := client.SomeRPC(ctx, someRequest)

# 客户端接收元数据

客户端使用CallOption中的HeaderTrailer函数来获取普通RPC调用发送的header和trailer

var header, trailer metadata.MD // 声明存储header和trailer的变量
resp, err := client.SomeRPC(
    ctx,
    someRequest,
    grpc.Header(&header),    // 将会接收header
    grpc.Trailer(&trailer),  // 将会接收trailer
)

log.Printf("header: %v\n", header)
log.Printf("tailer: %v\n", tailer)

# 服务端接收与设置元数据

func (s *SomeServer)SomeRPC(ctx context.Context, req *pb.Request)(*pb.Response, error){
	// ctx 包含请求头,如metadata相关信息
	md, _ := metadata.FromIncomingContext(ctx)
	log.Printf("md: %v\n", md)

	// 服务端设置元数据 header set
	header := metadata.New(map[string]string{"header": "HEADER"})
	tailer := metadata.New(map[string]string{"tailer": "TAILER"})
	grpc.SendHeader(ctx, header)
	grpc.SetTrailer(ctx, tailer)
}

# 错误处理 status

类似于HTTP定义了一套响应状态码,gRPC也定义有一些状态码,codes.Code (opens new window)

  • google.golang.org/grpc/status
  • google.golang.org/grpc/codes

RPC服务的方法应该返回nil或来自status.Status类型的错误

# 错误的创建

// 先创建status.Status,再转换为错误
st := status.New(codes.NotFound, "some description")
err := st.Err()  // 转为error类型

// 或直接创建错误
err := status.Error(codes.NotFound, "some description")

// 添加详情,结合自定义proto.Detail
status.WithDetails

// error 转换为 Status,进而获取详情
sts := status.FromError(err)
details := sts.Details()

# 拦截器(中间件)

拦截器可以进行日志记录、身份验证/授权、指标收集以及许多其他可以跨RPC共享的功能

  • 客户端和服务端:普通拦截器和流拦截器类型
  • 注:以下示例为普通拦截器

# 创建普通拦截器

// ClientUnaryInterceptor 客户端一元拦截器
func ClientUnaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	start := time.Now()
	time.Sleep(2 * time.Second)

	err := invoker(ctx, method, req, reply, cc, opts...)

	end := time.Now()
	fmt.Printf("RPC: %s, start time: %s, end time: %s, err: %v\n", method, start.Format("Basic"), end.Format(time.RFC3339), err)
	return err
}

// ServerUnaryInterceptor 服务端一元拦截器
func ServerUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	// authentication (token verification)
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, status.Errorf(codes.InvalidArgument, "missing metadata")
	}

	if _, ok := md["authorization"]; !ok {
		return nil, errcode.ToRPCError(errcode.Unauthorized)
	}

	m, err := handler(ctx, req)
	if err != nil {
		fmt.Printf("RPC failed with error %v\n", err)
	}
	return m, err
}

# 注册普通拦截器

// 客户端注册拦截器
conn, _ := grpc.Dial(":"+port,
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	grpc.WithUnaryInterceptor(middleware.ClientUnaryInterceptor),
)

// 服务端注册拦截器
s := grpc.NewServer(grpc.UnaryInterceptor(middleware.ServerUnaryInterceptor))

# 开源中间件

go-grpc-middleware (opens new window)

上次更新: 12/22/2022, 6:53:26 PM