背景:
公司内部各服务(java,pyhton,go)想调取中台数据中心数据,中台有两种服务搭建选择:
1.REST http请求
2.RPC 远程过程调用
实现及遇到的坑
使用Go将HTTP/JSON转编码到gRPC
这个国外总结的已经能解决绝大部分编译proto过程中的问题了,希望对读这样有用,我亲测OK!
部分示例
1.device.proto编写
syntax = "proto3";
package dbclient;
import "google/api/annotations.proto";
option java_package = "com.yingzi.iot.business.edge.grpc.dbclient.device";
option go_package = "./;dbclient";
// device 相关接口服务
service Device {
// 获取所有设备列表
rpc get_device_list (GetDeviceListReq) returns (GetDeviceListRes) {
option (google.api.http) = {
get: "/grpc/devices"
};
}
// 获取指定设备uuid的设备详情
rpc get_device (GetDeviceReq) returns (GetDeviceRes) {
option (google.api.http) = {
get: "/grpc/devices/{device_uuid}"
};
}
// 根据过滤条件获取设备详情
rpc get_device_by_filter (GetDeviceByFilterReq) returns (GetDeviceByFilterRes) {
option (google.api.http) = {
get: "/grpc/devicesFilter"
};
}
// 更新设备通用消息
rpc update_device (UpdateDeviceReq) returns (UpdateDeviceRes) {
option (google.api.http) = {
put: "/grpc/devices/{device_uuid}"
body: "*"
};
}
}
// public: 设备信息结构
message DeviceData {
string device_uuid = 1; // 设备uuid
string device_type = 2; // 设备类型
bool is_gateway = 3; // 是否网关
bool is_enabled = 4; // 设备是否启用
bool is_online = 5; // 设备是否在线
bool is_direct_device = 6; // 设备是否为直连设备
bool is_iot_registered = 7; // 设备是否IoT注册
string iot_registered_at = 8; // 设备注册IoT成功时间
string last_sync_at = 9; // 最后同步时间
string parent_device_uuid = 10; // 父设备唯一标识
string alias = 11; // 设备别名
string description = 12; // 设备描述
string business_type = 13; // 业务类型
string device_mac = 14; // 辅助唯一标识:设备mac
string device_id = 15; // 辅助唯一标识:设备id
int64 device_code = 16; // 设备状态码
string protocol_version = 17; // 协议版本
string protocol_type = 18; // 协议类型, 如"http"、"mqtt"
bytes versions = 19; // 设备固件版本
bytes device_properties = 20; // 设备端属性扩展
bytes shared_properties = 21; // 设备共享属性
bytes shadow_desired = 22; // 设备影子-下发
bytes shadow_reported = 23; // 设备影子-上报
bytes device_info = 24; // 设备信息
string created_time = 25; // 创建时间
string updated_time = 26; // 更新时间
}
// 请求,获取设备列表
message GetDeviceListReq {
oneof is_gateway_oneof {
bool is_gateway = 1; // 可选,是否为网关
}
oneof is_direct_device_oneof {
bool is_direct_device = 2; // 可选,是否为直连设备
}
oneof is_online_oneof {
bool is_online = 3; // 可选,是否在线
}
oneof is_enabled_oneof {
bool is_enabled = 4; // 可选,是否启用
}
oneof is_iot_registered_oneof {
bool is_iot_registered = 5; // 可选,是否注册云端
}
string device_type = 6; // 可选,设备型号,精确查询,查询多个以逗号(,)隔开
string device_uuid = 7; // 可选,设备uuid,精确查询,查询多个以逗号(,)隔开
string parent_device_uuid = 8; // 可选,父设备uuid,精确查询,查询多个以逗号(,)隔开
string business_type = 9; // 可选,业务类型,精确查询,查询多个以逗号(,)隔开
map<string, string> device_properties_like = 10; // 可选,设备属性扩展,map格式,如 {
"rk_version": "v1.2.1", "device_last_job": "12:30:12"}
map<string, string> shared_properties_like = 11; // 可选,设备共享属性,map格式,如 {
"rk_version": "v1.2.1", "device_last_job": "12:30:12"}
string start_time = 12; // 可选,设备注册起始时间,如'2021-04-13 09:00:00'
string end_time = 13; // 可选,设备注册结束时间,如'2021-04-13 16:00:00'
sint32 pos = 14; // 可选,分页页码,如果 pos=-1 则表示查询所有(高风险操作), 默认从 pos=1 第一页开始
uint32 step = 15; // 可选,分页每页大小,范围在 1~1000,默认50
string sort_key = 16; // 可选,自定义排序的字段(同接口参数定义),默认 "created_time",其他包括数据模型定义的所有字段,如:["device_uuid", "device_type", "alias", "is_gateway", "is_enabled", "is_online", "business_type", "created_time", "updated_time"]
string sort_by = 17; // 可选,升降排序规则,"asc"/"desc"
}
// 响应 获取设备列表
message GetDeviceListRes {
string error = 1; // 错误信息
uint32 amount = 2; // 数量
sint32 pos = 3; // 分页页码
uint32 step = 4; // 分页大小
repeated DeviceData data = 5; // 设备列表数据
}
// 请求 获取设备详情
message GetDeviceReq {
string device_uuid = 1; // 必填,设备uuid
}
// 响应 获取设备详情
message GetDeviceRes {
string error = 1; // 错误信息
DeviceData data = 2; // 设备详情数据
}
// 请求 根据过滤条件获取设备
message GetDeviceByFilterReq{
string device_mac = 1; // 可选,设备mac地址
string device_id = 2; // 可选,设备 device_id
oneof is_gateway_oneof {
bool is_gateway = 3; // 可选,是否是网联
}
oneof is_direct_device_oneof {
bool is_direct_device = 4; // 可选,是否是直连设备
}
}
// 响应 根据过滤条件获取设备
message GetDeviceByFilterRes {
string error = 1; // 错误信息
DeviceData data = 2; // 设备详情数据
}
// 请求,更新设备通用消息
message UpdateDeviceReq {
string device_uuid = 1; // 必选,设备uuid
oneof is_enabled_oneof {
bool is_enabled = 2; // 可选,true启用,false禁用
}
string alias = 3; // 可选,设备别名
string description = 4; // 可选,设备描述
string business_type = 5; // 可选,业务类型
bytes versions = 6; // 可选,固件版本信息,json格式,如{
"rk_version": "v1.2.1"}
bytes device_info = 7; // 可选,设备信息,json格式,如{
"ips":{
"eth0": {
"ip": "192.168.2.1","mac":"xxx"}}}
}
// 响应, 更新设备通用消息
message UpdateDeviceRes {
string error = 1; // 错误信息
string data = 2; // 更新结果
}
2.从google/apis下载两个编译所需文件到项目目录
3.编译proto的脚本(根据脚本位置和项目proto文件位置相应调整)
#!/bin/bash
# 类似如此方式,添加指令时修改成实际的目录名称及 proto 文件名称
# -I 或者 --proto_path:用于指定所编译的源码,就是我们所导入的proto文件,多个路径使用冒号分隔,默认为当前路径,按照顺序搜索
# --go_out: 用于设置自动生成的*.pb.go(序列化和反序列化相关)文件的输出目录,同样的也有其他语言的,例如--java_out、--python_out
# --go_opt: paths=source_relative 指定--go_out生成文件是基于相对路径的,会使proto文件中定义的option go_package定义的路径失效
# --go-grpc_out: 用于设置自动生成的*_grpc.pb.go(rpc接口相关)文件的输出目录
# --go-grpc_opt:
# paths=source_relative 指定--go_grpc_out生成文件是基于相对路径的
# require_unimplemented_servers=false 默认是true,会在server类多生成一个接口
# --grpc-gateway_out: 使用 protoc-gen-grpc-gateway 插件,用于设置生成pb.gw.go文件的输出目录
# --grpc-gateway_opt:
# logtostderr=true 记录log
# paths=source_relative 指定--grpc-gateway_out生成文件是基于相对路径的
# generate_unbound_methods=true 如果proto文件没有写api接口信息,也会默认生成
# --openapiv2_out: 使用 protoc-gen-openapiv2 插件,用于生成swagger.json 文件(本项目未使用到)
# 注意:新添加 proto 进行编译时,请先注释旧有脚本代码。使用 grpc 工具生成的序列化及反序列化模块
echo "Start compiling the proto file!"
protoc -I $(dirname "$PWD")/coreservice/grpc_service/ \
--go_out=$(dirname "$PWD")/coreservice/grpc_service/dbclient/ \
--go_opt=paths=source_relative \
--go-grpc_out=$(dirname "$PWD")/coreservice/grpc_service/dbclient/ \
--go-grpc_opt=paths=source_relative \
--grpc-gateway_out=$(dirname "$PWD")/coreservice/grpc_service/dbclient/ \
--grpc-gateway_opt=logtostderr=true \
--proto_path=$(dirname "$PWD")/coreservice/grpc_service/dbclient/: device.proto
echo "compile device.proto success!"
echo "Finish compiling the proto file!"
grpc服务启动函数示例
package grpcservice
import (
"edge-core-service/configs"
pb_dbclient "edge-core-service/coreservice/grpc_service/dbclient"
grpcserver "edge-core-service/coreservice/grpc_service/service"
"edge-core-service/edgedb"
"edge-core-service/pkg/logging"
"fmt"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc/keepalive"
"net"
"os/signal"
"syscall"
"time"
"os"
"go.uber.org/zap"
"google.golang.org/grpc"
)
func New(cfg *configs.Config) (*server, error) {
logger := logging.NewGrpcLogger(cfg)
return &server{
cfg: cfg,
logger: logger,
}, nil
}
type server struct {
cfg *configs.Config
logger *zap.Logger
}
func (s *server) Run() {
cfg := s.cfg
logger := s.logger
finishChan := make(chan os.Signal, 1)
signal.Notify(finishChan, syscall.SIGINT, syscall.SIGTERM)
logger.Info("Start gRPC-service", zap.Any("config", cfg))
lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", cfg.GRPC.Port))
if err != nil {
logger.Fatal("failed to listen", zap.Error(err))
os.Exit(1)
}
var opts []grpc.ServerOption
var kaPolicy = keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
var kaParams = keepalive.ServerParameters{
MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
}
opts = append(opts, grpc.UnaryInterceptor(grpcserver.Interceptor)) // 增加一元拦截器,验证secret-key
opts = append(opts, grpc.KeepaliveEnforcementPolicy(kaPolicy), grpc.KeepaliveParams(kaParams)) // keepalive 设置
gRPCServer := grpc.NewServer(opts...)
defer func() {
gRPCServer.GracefulStop()
}()
if _, err := edgedb.SetupConn(cfg); err != nil {
logger.Fatal("failed to setup conn to DB", zap.Error(err))
os.Exit(1)
}
// Register gRPC grpc
pb_dbclient.RegisterDeviceServer(gRPCServer, &grpcserver.Device{
Logger: logger})
//pb_dbclient.RegisterCodeServer(gRPCServer, &grpcserver.Code{Logger: logger})
//pb_dbclient.RegisterBusinessServer(gRPCServer, &grpcserver.Business{Logger: logger})
//pb_dbclient.RegisterConfigServer(gRPCServer, &grpcserver.Config{Logger: logger})
logger.Info(fmt.Sprintf("Serving gRPC on 0.0.0.0:%d", cfg.GRPC.Port))
go func() {
if err := gRPCServer.Serve(lis); err != nil {
logger.Fatal("failed to serve gRPC server", zap.Error(err))
os.Exit(1)
}
}()
maxMsgSize := 1024 * 1024 * 20
// Create a client connection to the gRPC server we just started
// This is where the gRPC-Gateway proxies the requests
endpoint := fmt.Sprintf("0.0.0.0:%d", cfg.GRPC.Port)
newOpts := make([]grpc.DialOption, 0)
newOpts = append(newOpts, grpc.WithBlock())
newOpts = append(newOpts, grpc.WithInsecure())
newOpts = append(newOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize), grpc.MaxCallSendMsgSize(maxMsgSize)))
serverMubOpts := make([]runtime.ServeMuxOption, 0)
// 取消当gRPC返回字段为该字段类型的默认值时为了节省带宽不会传输该字段的这一限制
serverMubOpts = append(serverMubOpts, runtime.WithMarshalerOption(runtime.MIMEWildcard,
&runtime.JSONPb{
OrigName: true, EmitDefaults: true}))
gwMux := runtime.NewServeMux(serverMubOpts...)
// Register User Service
err = pb_dbclient.RegisterDeviceHandlerFromEndpoint(context.Background(), gwMux, endpoint, newOpts)
if err != nil {
logger.Fatal("Failed to register gateway", zap.Error(err))
}
gwServer := &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", cfg.GRPC.GatewayPort),
Handler: gwMux,
}
logger.Info(fmt.Sprintf("Serving gRPC-Gateway on 0.0.0.0:%d", cfg.GRPC.GatewayPort))
go func() {
if err := gwServer.ListenAndServe(); err != nil {
logger.Fatal("Failed to serve gRPC-Gateway", zap.Error(err))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if err := gwServer.Shutdown(ctx); err != nil {
logger.Error("server shutdown err", zap.Error(err))
}
}
}()
<-finishChan
}