grpc分析

介绍

中文文档
英文文档
gRPC 最初由 Google 开发,是一款语言中立、平台中立、开源的远程过程调用(RPC)系统。

Example

定义proto文件

syntax = "proto3";

option go_package = "./proto/";  // 生成的代码放入指定的package中,新版本需要指定。
package user;

message UserInfo{
        string name = 1;
        int32   age = 2;
}

message UserInfoReq{
        UserInfo        Users = 1;
}

message UserInfoResp{
        int32   code = 1;
        string  msg = 2;
}

service User{
        rpc GetUserInfo(UserInfoReq) returns (UserInfoResp){}
}

根据文档中的插件使用命令生成两个文件 user_grpc.pb.go、 user.pb.go
使用插件版本:protoc libprotoc 3.21.12、protoc-gen-go v1.28.1、protoc-gen-go-grpc 1.2.0。

protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/*.proto

其中的 --go_opt=paths=source_relative:paths 参数有两个选项,import和 source_relative。
默认为 import ,代表按照生成的 go 代码的包的全路径去创建目录层级,source_relative 代表按照 proto 源文件的目录层级去创建 go 代码的目录层级,如果目录已存在则不用创建。

user_grpc.pb.go内容如下:

// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc             v3.21.12
// source: proto/user.proto

package proto

import (
        context "context"
        grpc "google.golang.org/grpc"
        codes "google.golang.org/grpc/codes"
        status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7

// UserClient is the client API for User service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type UserClient interface {
        GetUserInfo(ctx context.Context, in *UserInfoReq, opts ...grpc.CallOption) (*UserInfoResp, error)
}

type userClient struct {
        cc grpc.ClientConnInterface
}

func NewUserClient(cc grpc.ClientConnInterface) UserClient {
        return &userClient{cc}
}

func (c *userClient) GetUserInfo(ctx context.Context, in *UserInfoReq, opts ...grpc.CallOption) (*UserInfoResp, error) {
        out := new(UserInfoResp)
        err := c.cc.Invoke(ctx, "/user.User/GetUserInfo", in, out, opts...)
        if err != nil {
                return nil, err
        }
        return out, nil
}

// UserServer is the server API for User service.
// All implementations must embed UnimplementedUserServer
// for forward compatibility
type UserServer interface {
        GetUserInfo(context.Context, *UserInfoReq) (*UserInfoResp, error)
        mustEmbedUnimplementedUserServer()
}

// UnimplementedUserServer must be embedded to have forward compatible implementations.
type UnimplementedUserServer struct {
}

func (UnimplementedUserServer) GetUserInfo(context.Context, *UserInfoReq) (*UserInfoResp, error) {
        return nil, status.Errorf(codes.Unimplemented, "method GetUserInfo not implemented")
}
func (UnimplementedUserServer) mustEmbedUnimplementedUserServer() {}

// UnsafeUserServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to UserServer will
// result in compilation errors.
type UnsafeUserServer interface {
        mustEmbedUnimplementedUserServer()
}

func RegisterUserServer(s grpc.ServiceRegistrar, srv UserServer) {
        s.RegisterService(&User_ServiceDesc, srv)
}

func _User_GetUserInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
        in := new(UserInfoReq)
        if err := dec(in); err != nil {
                return nil, err
        }
        if interceptor == nil {
                return srv.(UserServer).GetUserInfo(ctx, in)
        }
        info := &grpc.UnaryServerInfo{
                Server:     srv,
                FullMethod: "/user.User/GetUserInfo",
        }
        handler := func(ctx context.Context, req interface{}) (interface{}, error) {
                return srv.(UserServer).GetUserInfo(ctx, req.(*UserInfoReq))
        }
        return interceptor(ctx, in, info, handler)
}

// User_ServiceDesc is the grpc.ServiceDesc for User service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var User_ServiceDesc = grpc.ServiceDesc{
        ServiceName: "user.User",
        HandlerType: (*UserServer)(nil),
        Methods: []grpc.MethodDesc{
                {
                        MethodName: "GetUserInfo",
                        Handler:    _User_GetUserInfo_Handler,
                },
        },
        Streams:  []grpc.StreamDesc{},
        Metadata: "proto/user.proto",
}

user.pb.go 内容如下:

// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
//      protoc-gen-go v1.28.1
//      protoc        v3.21.12
// source: proto/user.proto

package proto

import (
        protoreflect "google.golang.org/protobuf/reflect/protoreflect"
        protoimpl "google.golang.org/protobuf/runtime/protoimpl"
        reflect "reflect"
        sync "sync"
)

const (
        // Verify that this generated code is sufficiently up-to-date.
        _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
        // Verify that runtime/protoimpl is sufficiently up-to-date.
        _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

type UserInfo struct {
        state         protoimpl.MessageState
        sizeCache     protoimpl.SizeCache
        unknownFields protoimpl.UnknownFields

        Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
        Age  int32  `protobuf:"varint,2,opt,name=age,proto3" json:"age,omitempty"`
}

func (x *UserInfo) Reset() {
        *x = UserInfo{}
        if protoimpl.UnsafeEnabled {
                mi := &file_proto_user_proto_msgTypes[0]
                ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
                ms.StoreMessageInfo(mi)
        }
}

func (x *UserInfo) String() string {
        return protoimpl.X.MessageStringOf(x)
}

func (*UserInfo) ProtoMessage() {}

func (x *UserInfo) ProtoReflect() protoreflect.Message {
        mi := &file_proto_user_proto_msgTypes[0]
        if protoimpl.UnsafeEnabled && x != nil {
                ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
                if ms.LoadMessageInfo() == nil {
                        ms.StoreMessageInfo(mi)
                }
                return ms
        }
        return mi.MessageOf(x)
}

// Deprecated: Use UserInfo.ProtoReflect.Descriptor instead.
func (*UserInfo) Descriptor() ([]byte, []int) {
        return file_proto_user_proto_rawDescGZIP(), []int{0}
}

func (x *UserInfo) GetName() string {
        if x != nil {
                return x.Name
        }
        return ""
}

func (x *UserInfo) GetAge() int32 {
        if x != nil {
                return x.Age
        }
        return 0
}

type UserInfoReq struct {
        state         protoimpl.MessageState
        sizeCache     protoimpl.SizeCache
        unknownFields protoimpl.UnknownFields

        Users *UserInfo `protobuf:"bytes,1,opt,name=Users,proto3" json:"Users,omitempty"`
}

func (x *UserInfoReq) Reset() {
        *x = UserInfoReq{}
        if protoimpl.UnsafeEnabled {
                mi := &file_proto_user_proto_msgTypes[1]
                ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
                ms.StoreMessageInfo(mi)
        }
}

func (x *UserInfoReq) String() string {
        return protoimpl.X.MessageStringOf(x)
}

func (*UserInfoReq) ProtoMessage() {}

func (x *UserInfoReq) ProtoReflect() protoreflect.Message {
        mi := &file_proto_user_proto_msgTypes[1]
        if protoimpl.UnsafeEnabled && x != nil {
                ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
                if ms.LoadMessageInfo() == nil {
                        ms.StoreMessageInfo(mi)
                }
                return ms
        }
        return mi.MessageOf(x)
}

// Deprecated: Use UserInfoReq.ProtoReflect.Descriptor instead.
func (*UserInfoReq) Descriptor() ([]byte, []int) {
        return file_proto_user_proto_rawDescGZIP(), []int{1}
}

func (x *UserInfoReq) GetUsers() *UserInfo {
        if x != nil {
                return x.Users
        }
        return nil
}

type UserInfoResp struct {
        state         protoimpl.MessageState
        sizeCache     protoimpl.SizeCache
        unknownFields protoimpl.UnknownFields

        Code int32  `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
        Msg  string `protobuf:"bytes,2,opt,name=msg,proto3" json:"msg,omitempty"`
}

func (x *UserInfoResp) Reset() {
        *x = UserInfoResp{}
        if protoimpl.UnsafeEnabled {
                mi := &file_proto_user_proto_msgTypes[2]
                ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
                ms.StoreMessageInfo(mi)
        }
}

func (x *UserInfoResp) String() string {
        return protoimpl.X.MessageStringOf(x)
}

func (*UserInfoResp) ProtoMessage() {}

func (x *UserInfoResp) ProtoReflect() protoreflect.Message {
        mi := &file_proto_user_proto_msgTypes[2]
        if protoimpl.UnsafeEnabled && x != nil {
                ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
                if ms.LoadMessageInfo() == nil {
                        ms.StoreMessageInfo(mi)
                }
                return ms
        }
        return mi.MessageOf(x)
}

// Deprecated: Use UserInfoResp.ProtoReflect.Descriptor instead.
func (*UserInfoResp) Descriptor() ([]byte, []int) {
        return file_proto_user_proto_rawDescGZIP(), []int{2}
}

func (x *UserInfoResp) GetCode() int32 {
        if x != nil {
                return x.Code
        }
        return 0
}

func (x *UserInfoResp) GetMsg() string {
        if x != nil {
                return x.Msg
        }
        return ""
}

var File_proto_user_proto protoreflect.FileDescriptor

var file_proto_user_proto_rawDesc = []byte{
        0x0a, 0x10, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x75, 0x73, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f,
        0x74, 0x6f, 0x12, 0x04, 0x75, 0x73, 0x65, 0x72, 0x22, 0x30, 0x0a, 0x08, 0x55, 0x73, 0x65, 0x72,
        0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01,
        0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x67, 0x65, 0x18,
        0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x61, 0x67, 0x65, 0x22, 0x33, 0x0a, 0x0b, 0x55, 0x73,
        0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x12, 0x24, 0x0a, 0x05, 0x55, 0x73, 0x65,
        0x72, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x75, 0x73, 0x65, 0x72, 0x2e,
        0x55, 0x73, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x05, 0x55, 0x73, 0x65, 0x72, 0x73, 0x22,
        0x34, 0x0a, 0x0c, 0x55, 0x73, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x12,
        0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x63,
        0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
        0x52, 0x03, 0x6d, 0x73, 0x67, 0x32, 0x3e, 0x0a, 0x04, 0x55, 0x73, 0x65, 0x72, 0x12, 0x36, 0x0a,
        0x0b, 0x47, 0x65, 0x74, 0x55, 0x73, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x11, 0x2e, 0x75,
        0x73, 0x65, 0x72, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x1a,
        0x12, 0x2e, 0x75, 0x73, 0x65, 0x72, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52,
        0x65, 0x73, 0x70, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
        0x2f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}

var (
        file_proto_user_proto_rawDescOnce sync.Once
        file_proto_user_proto_rawDescData = file_proto_user_proto_rawDesc
)

func file_proto_user_proto_rawDescGZIP() []byte {
        file_proto_user_proto_rawDescOnce.Do(func() {
                file_proto_user_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_user_proto_rawDescData)
        })
        return file_proto_user_proto_rawDescData
}

var file_proto_user_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_proto_user_proto_goTypes = []interface{}{
        (*UserInfo)(nil),     // 0: user.UserInfo
        (*UserInfoReq)(nil),  // 1: user.UserInfoReq
        (*UserInfoResp)(nil), // 2: user.UserInfoResp
}
var file_proto_user_proto_depIdxs = []int32{
        0, // 0: user.UserInfoReq.Users:type_name -> user.UserInfo
        1, // 1: user.User.GetUserInfo:input_type -> user.UserInfoReq
        2, // 2: user.User.GetUserInfo:output_type -> user.UserInfoResp
        2, // [2:3] is the sub-list for method output_type
        1, // [1:2] is the sub-list for method input_type
        1, // [1:1] is the sub-list for extension type_name
        1, // [1:1] is the sub-list for extension extendee
        0, // [0:1] is the sub-list for field type_name
}

func init() { file_proto_user_proto_init() }
func file_proto_user_proto_init() {
        if File_proto_user_proto != nil {
                return
        }
        if !protoimpl.UnsafeEnabled {
                file_proto_user_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
                        switch v := v.(*UserInfo); i {
                        case 0:
                                return &v.state
                        case 1:
                                return &v.sizeCache
                        case 2:
                                return &v.unknownFields
                        default:
                                return nil
                        }
                }
                file_proto_user_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
                        switch v := v.(*UserInfoReq); i {
                        case 0:
                                return &v.state
                        case 1:
                                return &v.sizeCache
                        case 2:
                                return &v.unknownFields
                        default:
                                return nil
                        }
                }
                file_proto_user_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
                        switch v := v.(*UserInfoResp); i {
                        case 0:
                                return &v.state
                        case 1:
                                return &v.sizeCache
                        case 2:
                                return &v.unknownFields
                        default:
                                return nil
                        }
                }
        }
        type x struct{}
        out := protoimpl.TypeBuilder{
                File: protoimpl.DescBuilder{
                        GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
                        RawDescriptor: file_proto_user_proto_rawDesc,
                        NumEnums:      0,
                        NumMessages:   3,
                        NumExtensions: 0,
                        NumServices:   1,
                },
                GoTypes:           file_proto_user_proto_goTypes,
                DependencyIndexes: file_proto_user_proto_depIdxs,
                MessageInfos:      file_proto_user_proto_msgTypes,
        }.Build()
        File_proto_user_proto = out.File
        file_proto_user_proto_rawDesc = nil
        file_proto_user_proto_goTypes = nil
        file_proto_user_proto_depIdxs = nil
}

使用

demo

package main

import(
        pb "grpc/proto"
        "fmt"
        "log"
        "net"
        "context"
        "time"
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
)

var(
        port = ":9090"
)

type user struct{
        pb.UnimplementedUserServer //必须嵌入才能具有向前兼容的实现
}

func (u *user) GetUserInfo(context.Context, *pb.UserInfoReq)(*pb.UserInfoResp, error) {
        fmt.Println("----",111)


        return &pb.UserInfoResp{Code: 200, Msg: "success"}, nil

}

func init(){
        log.SetFlags(log.Lshortfile)
}

func main() {
        go server()

        time.Sleep(time.Second)

        client()
}


func server(){
        lis, err := net.Listen("tcp", port)
        if err != nil {
                log.Fatalf("failed to listen: %v", err)

                return
        }

        gsvr := grpc.NewServer()

        pb.RegisterUserServer(gsvr, &user{})

        if err := gsvr.Serve(lis); err != nil {
                log.Fatalf("failed to serve: %v", err)
        }
}

func client(){
        conn, err := grpc.Dial(port, grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
                log.Fatalf("did not connect: %v", err)

                return
        }

        defer conn.Close()

        client := pb.NewUserClient(conn)

        u := &pb.UserInfoReq{
                Users: &pb.UserInfo{
                        Name:   "jn",
                        Age: 18,
                },
        }

        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()


        resp, err := client.GetUserInfo(ctx, u)
        if err != nil {
                log.Println(err)
        }

        fmt.Println(resp)
}

问题

新版protoc-gen-go不支持grpc服务生成,需要通过protoc-gen-go-grpc生成grpc服务接口。
但是生成的Server端接口中会出现一个mustEmbedUnimplemented***方法。
生成的grpc代码中有这样的注释:

// UserServer is the server API for User service.
// All implementations must embed UnimplementedUserServer
// for forward compatibility
type UserServer interface {
	GetUserInfo(context.Context, *UserInfoReq) (*UserInfoResp, error)
	mustEmbedUnimplementedUserServer()
}

所有实现都必须嵌入UnimplementedUserServer,以保证向前兼容。
UnimplementedUserServer是一个为所有RPC方法都提供了默认实现的结构体。嵌入它之后,即使以后在proto文件中添加更多的RPC方法,已有的服务实现也不需要立刻补全这些新方法就能继续通过编译,这就是所谓的向前兼容。

源码解析

server

type Server struct {
	opts serverOptions

	mu  sync.Mutex // guards following
	lis map[net.Listener]bool
	// conns contains all active server transports. It is a map keyed on a
	// listener address with the value being the set of active transports
	// belonging to that listener.
	conns    map[string]map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   trace.EventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID *channelz.Identifier
	czData     *channelzData

	serverWorkerChannels []chan *serverWorkerData
}

server端:
先进行注册,RegisterService注册proto生成的ServiceDesc信息。

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	if ss != nil {
		ht := reflect.TypeOf(sd.HandlerType).Elem()
		st := reflect.TypeOf(ss)
		if !st.Implements(ht) {
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
		}
	}
	s.register(sd, ss)
}

先校验传入的实现是否满足ServiceDesc中HandlerType指定的接口,再走注册方法。

grpc server的真正启动的接口为: Serve

*******
var tempDelay time.Duration // how long to sleep on accept failure
for {
	rawConn, err := lis.Accept()
	if err != nil {
		if ne, ok := err.(interface {
			Temporary() bool
		}); ok && ne.Temporary() {
			if tempDelay == 0 {
				tempDelay = 5 * time.Millisecond
			} else {
				tempDelay *= 2
			}
			if max := 1 * time.Second; tempDelay > max {
				tempDelay = max
			}
			s.mu.Lock()
			s.printf("Accept error: %v; retrying in %v", err, tempDelay)
			s.mu.Unlock()
			timer := time.NewTimer(tempDelay)
			select {
			case <-timer.C:
			case <-s.quit.Done():
				timer.Stop()
				return nil
			}
			continue
		}
		s.mu.Lock()
		s.printf("done serving; Accept = %v", err)
		s.mu.Unlock()

		if s.quit.HasFired() {
			return nil
		}
		return err
	}
	tempDelay = 0
	// Start a new goroutine to deal with rawConn so we don't stall this Accept
	// loop goroutine.
	//
	// Make sure we account for the goroutine so GracefulStop doesn't nil out
	// s.conns before this conn can be added.
	s.serveWG.Add(1)
	go func() {
		s.handleRawConn(lis.Addr().String(), rawConn)
		s.serveWG.Done()
	}()
}

可以看到大致逻辑是在一个for循环中监听注册的tcp端口,等待连接到来。如果接受tcp连接时发生了临时性错误(Temporary()返回true),则休眠一段时间后重试(刚开始休眠5ms,之后每次翻倍,最多休眠1s);如果是其他错误,则结束循环并返回。如果接受成功,则在一个新的协程中处理该连接,同时使用waitgroup进行计数(这个跟GracefulStop优雅停止有关),所以核心处理逻辑是最后的handleRawConn方法。

之后开启了一个http2连接st,newHTTP2Transport就是建立一个http2的Transport:

st := s.newHTTP2Transport(rawConn)

最关键的代码是新开启的协程里的s.serveStreams(st)。处理部分是

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {}

grpc请求的路径是/service/method,因此可以看到代码前面是找到/的位置,然后分割得出service和method名称,再通过grpc server结构体找到对应的service和method。
再调用s.processUnaryRPC(t, stream, srv, md, trInfo)或者s.processStreamingRPC(t, stream, srv, sd, trInfo)进行调用,只看比较简单的unary rpc处理接口,可以看到方法中终于有对handler进行的处理。

server总结:server端整体概括,就是监听端口,然后将收到的请求根据method name转发到对应的handler进行处理。

client

初始化了ClientConn结构体

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    cc := &ClientConn{
        target:            target, // ip/porrt
        csMgr:             &connectivityStateManager{},
        conns:             make(map[*addrConn]struct{}),
        dopts:             defaultDialOptions(),
        blockingpicker:    newPickerWrapper(),
        czData:            new(channelzData),
        firstResolveEvent: grpcsync.NewEvent(),
    }
}

NewUserClient是pb生成的go文件中的方法,其实就是封装了conn而已。
RPC调用通过调用cc的Invoke方法进行处理,这个cc就是client封装的conn。

rpc调用:

func (c *userClient) GetUserInfo(ctx context.Context, in *UserInfoReq, opts ...grpc.CallOption) (*UserInfoResp, error) {
	out := new(UserInfoResp)
	err := c.cc.Invoke(ctx, "/user.User/GetUserInfo", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
	// allow interceptor to see all applicable call options, which means those
	// configured as defaults from dial option as well as per-call options
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.unaryInt != nil {
		return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
	}
	return invoke(ctx, method, args, reply, cc, opts...)
}

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
	cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
	if err := cs.SendMsg(req); err != nil {
		return err
	}
	return cs.RecvMsg(reply)
}

一路往下有个SendMsg和RecvMsg


func (cs *clientStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client.  (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {
		cs.sentLast = true
	}

	// load hdr, payload, data
	hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
	if err != nil {
		return err
	}

	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	op := func(a *csAttempt) error {
		return a.sendMsg(m, hdr, payload, data)
	}
	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
	if len(cs.binlogs) != 0 && err == nil {
		cm := &binarylog.ClientMessage{
			OnClientSide: true,
			Message:      data,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cm)
		}
	}
	return err
}

func (cs *clientStream) RecvMsg(m interface{}) error {
	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
		// Call Header() to binary log header if it's not already logged.
		cs.Header()
	}
	var recvInfo *payloadInfo
	if len(cs.binlogs) != 0 {
		recvInfo = &payloadInfo{}
	}
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	if len(cs.binlogs) != 0 && err == nil {
		sm := &binarylog.ServerMessage{
			OnClientSide: true,
			Message:      recvInfo.uncompressedBytes,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(sm)
		}
	}
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)

		if len(cs.binlogs) != 0 {
			// finish will not log Trailer. Log Trailer here.
			logEntry := &binarylog.ServerTrailer{
				OnClientSide: true,
				Trailer:      cs.Trailer(),
				Err:          err,
			}
			if logEntry.Err == io.EOF {
				logEntry.Err = nil
			}
			if peer, ok := peer.FromContext(cs.Context()); ok {
				logEntry.PeerAddr = peer.Addr
			}
			for _, binlog := range cs.binlogs {
				binlog.Log(logEntry)
		}
	}
	return err
}

处理封装,通过ClientTransport进行发送

猜你喜欢

转载自blog.csdn.net/weixin_56766616/article/details/129956002