# Day17 gRPC进阶 上篇
- 传统请求响应,基本的Unary RPC
- 依托于HTTP2,流式的Streaming RPC
# 服务端流式 RPC
服务端流式RPC,客户端发出一个RPC请求,服务端与客户端之间建立一个单向的流,服务端可以向流中写入多个响应消息,最后主动关闭流;而客户端需要监听这个流,不断获取响应直到流关闭
场景:客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端
- 定义接口proto
message Request{
string name = 1;
}
message Response {
string reply = 1;
}
// Server 服务接口规范
service Server {
......
// 服务端流式RPC
rpc ServerStreamHello(Request) returns (stream Response){}
}
- 服务端proto代码生成
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/server.proto
- 服务端实现接口
// 编写一个使用多种语言打招呼的方法,客户端发来一个用户名,服务端分多次返回打招呼的信息
func (s server) ServerStreamHello(reqeust *proto.Request, stream proto.Server_ServerStreamHelloServer) error {
words := []string{
"你好",
"hello",
"こんにちは",
"안녕하세요",
}
for _, word := range words {
data := &proto.Response{Reply: word + " " + reqeust.GetName()}
err := stream.Send(data)
if err != nil {
fmt.Println("stream.Send err:", err)
return err
}
}
return nil
}
/*
1. ServerStreamHello 服务端和客户端函数名是一致的,但服务端参数为(request, stream)
2. 服务端的stream.Send只能发送特定类型即Reponse类型的流,可循环多次发送
3. 自动关闭,return nil
*/
- 客户端同样执行proto和对应代码生成
- 客户端处理多次接收响应
func doServerStreamClient(client pb.ServerClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// 响应结果为数据流stream,可以多次读取
stream, err := client.ServerStreamHello(ctx, &pb.Request{Name: "linda"})
if err != nil {
log.Fatalf("client.ServerStreamHello err: %v\n", err)
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("stream.Recv err: %v\n", err)
}
fmt.Println(resp.Reply)
}
}
/*
1. ServerStreamHello 服务端和客户端函数名是一致的,但客户端参数为(ctx, request) stream
2. 客户端stream.Recv()返回值为对应服务端 Response 实例
3. 客户端的 err == io.EOF 判断结束
*/
# 客户端流式 RPC
客户端传入多个请求对象,服务端返回一个响应结果
场景:物联网终端向服务器上报数据、大数据流式计算等
- 定义接口proto
// Server 服务接口规范
service Server {
......
// 客户端流式RPC
rpc ClientStreamHello(stream Request) returns (Response){}
}
- 服务端proto代码生成
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/server.proto
- 服务端实现接口
func (s server) ClientStreamHello(stream proto.Server_ClientStreamHelloServer) error {
// 在这个示例中,我们编写一个多次发送人名,服务端统一返回一个打招呼消息的程序
reply := "你好,"
for {
// 接收客户端发来的流式数据
res, err := stream.Recv()
if err == io.EOF {
// 最终统一回复
return stream.SendAndClose(&proto.Response{
Reply: reply,
})
}
if err != nil {
return err
}
reply += " " + res.GetName()
}
/*
1. stream.Recv() 结束的数据未固定格式 Reqeust的实例
2. stream.Recv() 会有结束表示 io.EOF
3. stream.SendAndClose 发送关闭标志
*/
}
- 客户端实现接口
func doClientStreamClient(client pb.ServerClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
stream, err := client.ClientStreamHello(ctx)
if err != nil {
log.Fatalf("client.ClientStreamHello err: %v\n", err)
}
names := []string{"linda", "tom", "catherine"}
for _, name := range names {
// 发送流式数据
err := stream.Send(&pb.Request{Name: name})
if err != nil {
log.Fatalf("doClientStreamClient stream.Send(%v) failed, err: %v", name, err)
}
}
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("doClientStreamClient stream.CloseAndRecv failed: %v", err)
}
log.Printf("got reply: %v", res.GetReply())
/*
1. 客户端发送的流,数据也是固定格式 stream.Send(&pb.Request{Name: name})
2. 客户端 stream.CloseAndRecv() 接收后就主动关闭连接
*/
}
# 双向流式 RPC
双向流式RPC即客户端和服务端均为流式的RPC,能发送多个请求对象也能接收到多个响应对象
场景:聊天应用
- 定义接口proto
// Server 服务接口规范
service Server {
......
// 双向流式RPC
rpc DubiStreamHello(stream Request) returns (stream Response){}
}
- 服务端proto代码生成
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/server.proto
- 服务端实现接口
// magic 一段价值连城的“人工智能”代码
func magic(s string) string {
s = strings.ReplaceAll(s, "吗", "")
s = strings.ReplaceAll(s, "吧", "")
s = strings.ReplaceAll(s, "你", "我")
s = strings.ReplaceAll(s, "?", "!")
s = strings.ReplaceAll(s, "?", "!")
return s
}
func (s server) DubiStreamHello(stream proto.Server_DubiStreamHelloServer) error {
for {
// 接收流式请求
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
reply := magic(in.GetName()) // 对收到的数据做些处理
// 返回流式响应
if err := stream.Send(&proto.Response{Reply: reply}); err != nil {
return err
}
}
}
- 客户端实现代码
func doDubiStreamClient(client pb.ServerClient) {
// 一边从终端获取输入的请求数据发送至服务端,一边从服务端接收流式响应
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
stream, err := client.DubiStreamHello(ctx)
if err != nil {
log.Fatalf("client.DubiStreamHello err: %v\n", err)
}
waitc := make(chan struct{})
go func() {
for {
// 接收服务端返回的响应
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("client.DubiStreamHello stream.Recv() failed, err: %v", err)
}
fmt.Printf("AI:%s\n", in.GetReply())
}
}()
// 从标准输入获取用户输入
reader := bufio.NewReader(os.Stdin) // 从标准输入生成读对象
for {
cmd, _ := reader.ReadString('\n') // 读到换行
cmd = strings.TrimSpace(cmd)
if len(cmd) == 0 {
continue
}
if strings.ToUpper(cmd) == "QUIT" {
break
}
// 将获取到的数据发送至服务端
if err := stream.Send(&pb.Request{Name: cmd}); err != nil {
log.Fatalf("client.DubiStreamHello stream.Send(%v) failed: %v", cmd, err)
}
}
stream.CloseSend()
<-waitc
}
# 元数据 metadata
元数据metadata,类似http headers中的键值对,用于处理RPC请求和响应过程中需要但又不属于具体业务的信息,如认证token、请求标识和监控标签等
- 键
string
类型,值[]string
类型,键大小写不敏感 - Go语言使用库
google.golang.org/grpc/metadata
- 源码实现
type MD map[string][]string
- 注:以下示例皆为普通RPC调用元数据使用方式
# 创建元数据
// 创建1:Pairs 以键值对字符串隔开形式创建
md := metadata.Pairs("k1", "v1", "k2", "v2")
// 创建2:New map[string]string形式创建
md := metadata.New(map[string]string{
"k3": "v3",
"k4": "v4",
})
# 客户端发送元数据
// 创建带有metadata的context
ctx := metadata.NewOutgoingContext(context.Background(), md)
// 附加metadata到context
ctx = metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")
// 发起普通RPC请求
response, err := client.SomeRPC(ctx, someRequest)
# 客户端接收元数据
客户端使用CallOption
中的Header
和Trailer
函数来获取普通RPC调用发送的header和trailer
var header, trailer metadata.MD // 声明存储header和trailer的变量
resp, err := client.SomeRPC(
ctx,
someRequest,
grpc.Header(&header), // 将会接收header
grpc.Trailer(&trailer), // 将会接收trailer
)
log.Printf("header: %v\n", header)
log.Printf("tailer: %v\n", tailer)
# 服务端接收与设置元数据
func (s *SomeServer)SomeRPC(ctx context.Context, req *pb.Request)(*pb.Response, error){
// ctx 包含请求头,如metadata相关信息
md, _ := metadata.FromIncomingContext(ctx)
log.Printf("md: %v\n", md)
// 服务端设置元数据 header set
header := metadata.New(map[string]string{"header": "HEADER"})
tailer := metadata.New(map[string]string{"tailer": "TAILER"})
grpc.SendHeader(ctx, header)
grpc.SetTrailer(ctx, tailer)
}
# 错误处理 status
类似于HTTP定义了一套响应状态码,gRPC也定义有一些状态码,codes.Code (opens new window)
google.golang.org/grpc/status
google.golang.org/grpc/codes
RPC服务的方法应该返回nil
或来自status.Status
类型的错误
# 错误的创建
// 先创建status.Status,再转换为错误
st := status.New(codes.NotFound, "some description")
err := st.Err() // 转为error类型
// 或直接创建错误
err := status.Error(codes.NotFound, "some description")
// 添加详情,结合自定义proto.Detail
status.WithDetails
// error 转换为 Status,进而获取详情
sts := status.FromError(err)
details := sts.Details()
# 拦截器(中间件)
拦截器可以进行日志记录、身份验证/授权、指标收集以及许多其他可以跨RPC共享的功能
- 客户端和服务端:普通拦截器和流拦截器类型
- 注:以下示例为普通拦截器
# 创建普通拦截器
// ClientUnaryInterceptor 客户端一元拦截器
func ClientUnaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
time.Sleep(2 * time.Second)
err := invoker(ctx, method, req, reply, cc, opts...)
end := time.Now()
fmt.Printf("RPC: %s, start time: %s, end time: %s, err: %v\n", method, start.Format("Basic"), end.Format(time.RFC3339), err)
return err
}
// ServerUnaryInterceptor 服务端一元拦截器
func ServerUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// authentication (token verification)
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "missing metadata")
}
if _, ok := md["authorization"]; !ok {
return nil, errcode.ToRPCError(errcode.Unauthorized)
}
m, err := handler(ctx, req)
if err != nil {
fmt.Printf("RPC failed with error %v\n", err)
}
return m, err
}
# 注册普通拦截器
// 客户端注册拦截器
conn, _ := grpc.Dial(":"+port,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(middleware.ClientUnaryInterceptor),
)
// 服务端注册拦截器
s := grpc.NewServer(grpc.UnaryInterceptor(middleware.ServerUnaryInterceptor))