# Day16 Protobuf与gRPC入门

# RPC 介绍

RPC(Remote Procedure Call),即远程过程调用,像调用本地服务一样调用远程服务,C/S模式

# RPC 示例

# 基于HTTP的RPC

  • server端和client端都必须有的公共服务 service.go
package main

type Args struct {
	X, Y int
}

// ServiceA 自定义一个结构体类型
type ServiceA struct{}

// Add 为ServiceA类型增加一个可导出的Add方法
func (s *ServiceA) Add(args *Args, reply *int) error {
	*reply = args.X + args.X
	return nil
}
  • 服务端代码 server.go
package main

import (
	"log"
	"net"
	"net/http"
	"net/rpc"
)

func main() {
	service := new(ServiceA)
	rpc.Register(service) // 注册RPC服务
	rpc.HandleHTTP()      // 基于HTTP协议

	l, e := net.Listen("tcp", ":8999")
	if e != nil {
		log.Fatal("listen error:", e)
	}
	http.Serve(l, nil)
}
  • 客户端代码 client.go
package main

import (
	"fmt"
	"log"
	"net/rpc"
)

func main() {
	// 建立HTTP连接
	client, err := rpc.DialHTTP("tcp", "127.0.0.1:8999")
	if err != nil {
		log.Fatal("dialing:", err)
	}

	// 同步调用
	args := &Args{10, 20}
	var reply int
    // 入参args 和 取值replay
	err = client.Call("ServiceA.Add", args, &reply)
	if err != nil {
		log.Fatal("ServiceA.Add error:", err)
	}
	fmt.Printf("ServiceA.Add: %d+%d=%d\n", args.X, args.Y, reply)

	// 异步调用
	var reply2 int
	divCall := client.Go("ServiceA.Add", args, &reply2, nil)
	replyCall := <-divCall.Done // 接收调用结果
	fmt.Println(replyCall.Error)
	fmt.Println(reply2)
}
  • 启动服务
// server
go run server.go service.go

// client
go run client.go service.go
ServiceA.Add: 10+20=20
<nil>
20

客户端进行

  1. 拨号client, err := rpc.DialHTTP("tcp", "127.0.0.1:8999")
  2. 调用err = client.Call("ServiceA.Add", args, &reply)

# 基于TCP协议的RPC

rpc 包也支持直接使用 TCP 协议而不使用HTTP协议

  • 服务端代码 server.go
package main

import (
	"log"
	"net"
	"net/http"
	"net/rpc"
)

func main() {
	service := new(ServiceA)
	rpc.Register(service) // 注册RPC服务

	l, e := net.Listen("tcp", ":8999")
	if e != nil {
		log.Fatal("listen error:", e)
	}
	for {
		conn, _ := l.Accept()
		rpc.ServeConn(conn)
	}
}
  • 客户端代码 client.go
package main

import (
	"fmt"
	"log"
	"net/rpc"
)

func main() {
	// 建立TCP连接
	client, err := rpc.Dial("tcp", "127.0.0.1:8999")
	if err != nil {
		log.Fatal("dialing:", err)
	}

	// 同步调用
	args := &Args{10, 20}
	var reply int
	err = client.Call("ServiceA.Add", args, &reply)
	if err != nil {
		log.Fatal("ServiceA.Add error:", err)
	}
	fmt.Printf("ServiceA.Add: %d+%d=%d\n", args.X, args.Y, reply)

	// 异步调用
	var reply2 int
	divCall := client.Go("ServiceA.Add", args, &reply2, nil)
	replyCall := <-divCall.Done // 接收调用结果
	fmt.Println(replyCall.Error)
	fmt.Println(reply2)
}
  • 启动服务
// server
go run server.go service.go

// client
go run client.go service.go
ServiceA.Add: 10+20=20
<nil>
20

# 基于JSON协议的RPC

rpc 包默认使用的是 gob 协议对传输数据进行序列化/反序列化。json协议可实现跨语言

  • 服务端代码 server.go
package main

import (
	"log"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

func main() {
	service := new(ServiceA)
	rpc.Register(service) // 注册RPC服务

	l, e := net.Listen("tcp", ":8999")
	if e != nil {
		log.Fatal("listen error:", e)
	}
	for {
		conn, _ := l.Accept()
		// 使用JSON协议
		rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
	}
}
  • 客户端代码 client.go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

func main() {
	// 建立TCP连接,是net.Dial
	conn, err := net.Dial("tcp", "127.0.0.1:8999")
	if err != nil {
		log.Fatal("dialing:", err)
	}
	// 使用JSON协议
	client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))

	// 同步调用
	args := &Args{10, 20}
	var reply int
	err = client.Call("ServiceA.Add", args, &reply)
	if err != nil {
		log.Fatal("ServiceA.Add error:", err)
	}
	fmt.Printf("ServiceA.Add: %d+%d=%d\n", args.X, args.Y, reply)

	// 异步调用
	var reply2 int
	divCall := client.Go("ServiceA.Add", args, &reply2, nil)
	replyCall := <-divCall.Done // 接收调用结果
	fmt.Println(replyCall.Error)
	fmt.Println(reply2)
}
  • 启动服务
// server
go run server.go service.go

// client
go run client.go service.go
ServiceA.Add: 10+20=20
<nil>
20

# Python调用RPC

python client 远程调用上面 Go server中 serviceA的Add方法

import socket
import json

request = {
    "id": 0,
    "params": [{"X": 10, "Y": 20}, "reply"],  # 参数要对应上Args结构体
    "method": "ServiceA.Add"
}

client = socket.create_connection(("127.0.0.1", 8999), 5)
client.sendall(json.dumps(request).encode())

rsp = client.recv(1024)
rsp = json.loads(rsp.decode())
print(rsp)

# RPC 原理

使用RPC框架的目标是只需要关心第1步和最后1步,中间的其他步骤统统封装起来,让使用者无需关心

如RPC框架(grpc、thrift等)就是为了让RPC调用更方便

# Protocol Buffers

Protocol Buffers 是 Google 公司于2008年开源的一种语言无关、平台无关、可扩展的用于序列化结构化数据——类似于XML,但比XML更小、更快、更简单,它可用于(数据)通信协议、数据存储等。你只需要定义一次你想要的数据结构,然后你就可以使用特殊生成的源代码来轻松地从各种数据流和各种语言中写入和读取你的结构化数据。

目前 Protobuf 被广泛用作微服务中的通信协议

Protobuf 工作模式:大家定义好.proto源文件,然后根据源文件生成各种语言的代码

# Protocol Buffers V3 定义

syntax = "proto3";  // 声明使用v3版本的语法;

message SearchRequest { // 关键字 message
    string query = 1;           // 1 是编号,指的是消息中第1位是query
    int32 page_numer = 2;       // 分号结尾
    int32 result_per_page = 3;  // 已发布的名称,不能修改,只能新增
}

# gRPC 安装

# 安装gRPC

go get google.golang.org/grpc@latest

# 安装Protocol Buffers v3

* 安装用于生产gRPC服务代码的协议编译器 [https://github.com/protocolbuffers/protobuf/releases](https://github.com/protocolbuffers/protobuf/releases)
* Mac M2系统 [protoc-3.20.1-osx-aarch_64.zip](https://objects.githubusercontent.com/github-production-release-asset-2e65be/23357588/f4d3f0f2-79f3-43f2-8c52-2db7bed10003?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20221215%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20221215T045734Z&X-Amz-Expires=300&X-Amz-Signature=78dab429497e235f8f7ac35b6d9de64b17e15c705b24dbc8b7583ea14b5f02d9&X-Amz-SignedHeaders=host&actor_id=22052946&key_id=0&repo_id=23357588&response-content-disposition=attachment%3B%20filename%3Dprotoc-3.20.1-osx-aarch_64.zip&response-content-type=application%2Foctet-stream)
* 设置环境变量,查看安装版本
vim /etc/profile
export PATH=/Users/nining/go/install/protoc-3.20.1-osx-aarch_64/bin:$PATH
source /etc/profile

protoc --version
> libprotoc 3.20.1

  1. bin 目录下的 protoc 是可执行文件;
  2. include 目录下的是 google 定义的.proto文件,import "google/protobuf/timestamp.proto"就是从此处导入;

# 安装插件 protoc-gen-go

该插件会根据.proto文件生成一个后缀为.pb.go的文件,包含所有.proto文件中定义的类型及其序列化方法

go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28

protoc-gen-go --version
> protoc-gen-go v1.28.1

# 安装插件 protoc-gen-go-grpc

该插件会生成一个后缀为_grpc.pb.go的文件,其中包含:

  1. 一种接口类型(或存根) ,供客户端调用的服务方法
  2. 服务器要实现的接口类型
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2

protoc-gen-go-grpc --version
> protoc-gen-go-grpc 1.2.0

# gRPC 示例

# 编写proto代码

// key = value;
syntax = "proto3";  // 版本声明,使用Protocol Buffers v3版本

// 服务端和客户端 共用,共同定义请求参数、响应参数和具体服务方法名称(实现的功能)
option go_package = "grpc_server/proto"; // 指定生成的Go代码在项目中的导入路径
// option go_package = "gpbc_client/proto";  // 指定生成的Go代码在项目中的导入路径

package proto;  // 包名

// import "google/protobuf/timestamp.proto";

// Op 枚举 操作类型
enum Op {
    _ = 0;
    Add = 1;
    Sub = 2;
    Mul = 3;
    Div = 4;
}

// Req 请求服务参数
message Req{
    int64 x = 1;
    int64 y = 2;
    Op op = 3;
}

// Resp 响应结果参数
message Resp {
    int64 result = 1;
}

message OrderReq{
    int64 order_id = 1;
}

message OrderResp {
    int64 order_id = 1;
    string name = 2;
    int64 price = 3;
    int64 count = 4;
}

// Server 服务接口规范
service Server {
    rpc Operate(Req) returns (Resp){}
    rpc GetOrderInfo(OrderReq) returns (OrderResp){}
}
  • 服务端和客户端 各自生产代码
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/server.proto

# 编写Server端Go代码

package main

import (
	"context"
	"grpc_server/proto"
	"log"
	"net"

	"google.golang.org/grpc"
)

// 实现 prodo 描述文件中定义的服务,然后把实现的服务注册到 gRPC 服务上
type server struct {
	proto.UnimplementedServerServer
}

func (s server) Operate(ctx context.Context, in *proto.Req) (*proto.Resp, error) {
	var result int64
	switch in.Op {
	case proto.Op_Add:
		result = in.X + in.Y
	case proto.Op_Sub:
		result = in.X - in.Y
	case proto.Op_Mul:
		result = in.X * in.Y
	case proto.Op_Div:
		result = in.X / in.Y
	default:
		result = 0
	}
	return &proto.Resp{Result: result}, nil
}

func (s server) GetOrderInfo(ctx context.Context, in *proto.OrderReq) (*proto.OrderResp, error) {
	return &proto.OrderResp{OrderId: in.OrderId, Name: "linda_order", Price: 100, Count: 2}, nil
}

func main() {
	// 1. 启 TCP 服务
	lis, err := net.Listen("tcp", ":8973")
	if err != nil {
		log.Fatalf("net.Liste err: %v\n", err)
	}

	// 2. 注册 RPC 服务
	s := grpc.NewServer()
	proto.RegisterServerServer(s, &server{})

	// 3. 启动 RPC 服务
	err = s.Serve(lis)
	if err != nil {
		log.Fatalf("s.Serve err: %v\n", err)
	}
}

# 编写Client端Go代码

package main

import (
	"context"
	"fmt"
	pb "grpc_client/proto"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// 通过 RPC 调用其他程序中服务

	// 1. 建立连接
	// 不使用加密认证
	conn, err := grpc.Dial(":8973", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("ngrpc.Dial err: %v\n", err)
	}
	defer conn.Close()

	// 2. 发起调用
	client := pb.NewServerClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	respAdd, err := client.Operate(ctx, &pb.Req{X: 200, Y: 100, Op: pb.Op_Add})
	if err != nil {
		log.Printf("client.Operate err: %v\n", err)
	}
	fmt.Println("respAdd:", respAdd.Result)

	respOrder, err := client.GetOrderInfo(ctx, &pb.OrderReq{OrderId: 100})
	if err != nil {
		log.Printf("client.GetOrderInfo err: %v\n", err)
	}
	fmt.Println("OrderId:", respOrder.OrderId)
	fmt.Println("OrderName:", respOrder.Name)
}

服务端编译启动,客户端请求即可

./grpc_server

./grpc_client
respAdd: 300
OrderId: 100
OrderName: linda_order

# 编写Python版RPC客户端

https://grpc.io (opens new window)

  • 安装依赖
pip install grpcio
pip install grpcio-tools
  • 拷贝 server.proto
syntax = "proto3";

option go_package = "grpc_client/proto";

package proto;

// Op 枚举 操作类型
enum Op {
    _ = 0;
    Add = 1;
    Sub = 2;
    Mul = 3;
    Div = 4;
}

// Req 请求服务参数
message Req{
    int64 x = 1;
    int64 y = 2;
    Op op = 3;
}

// Resp 响应结果参数
message Resp {
    int64 result = 1;
}

message OrderReq{
    int64 order_id = 1;
}

message OrderResp {
    int64 order_id = 1;
    string name = 2;
    int64 price = 3;
    int64 count = 4;
}

// Server 服务接口规范
service Server {
    rpc Operate(Req) returns (Resp){}
    rpc GetOrderInfo(OrderReq) returns (OrderResp){}
}
  • 生成代码
python -m grpc_tools.protoc -I ./proto --python_out=. --pyi_out=. --grpc_python_out=. ./proto/server.proto
  • client.py
from __future__ import print_function

import logging

import grpc
import server_pb2
import server_pb2_grpc


def run():
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    with grpc.insecure_channel('localhost:8974') as channel:
        stub = server_pb2_grpc.ServerStub(channel)
        response = stub.GetOrderInfo(server_pb2.OrderReq(order_id=100))
        print(response.name)


if __name__ == '__main__':
    logging.basicConfig()
    run()
上次更新: 12/18/2022, 5:17:21 PM