grpc框架分析一 : Server分析

项目地址:https://github.com/yakaa/grpcx

grpc官网使用案例参考案例结合本框架分析:

  1. 监听tcp端口
  2. 生成一个*grpc.Server对象, 
  3. 把*grpc.Server对象和 实现UserServiceServer接口的实例绑定起来
  4. 启动grpc服务器
func main() {
	lis, err := net.Listen("tcp",fmt.Sprintf(":%d",9000))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Println("UserService to listen:9000 success")
	grpcServer := grpc.NewServer()
	protoFiles_hello.RegisterUserServiceServer(grpcServer,&impl.UserService{})
	grpcServer.Serve(lis)
}

RpcServer框架分析

主要函数,  conf是一个配置文件  Endpoints是etcd的服务器地址,如果有多个集群可以使用,往后面追加地址

func Server(count int) {
	conf := &config.ServiceConf{
		EtcdAuth:      config.EtcdAuth{},
		Schema:        "www.vector.com",
		ServerName:    "knowing",
		Endpoints:     []string{"127.0.0.1:2379"},
		ServerAddress: ":20001",
	}
	demo := &RegionHandlerServer{ServerAddress: conf.ServerAddress}
	rpcServer, err := grpcx.MustNewGrpcxServer(conf, func(server *grpc.Server) {
		proto.RegisterRegionHandlerServer(server, demo)
	})
	if err != nil {
		panic(err)
	}
	log.Fatal(rpcServer.Run())
}

grpc.MustNewGrpcxsServer函数

  1. clientv3.New是生成一个etcd的client对象,
  2. conf.ServerAddress是":20001", findLocalAddress是返回客户端的内网IP,
  3. strings.Join是把address切片,用colon结合起来,就是 example: 192.168.1.2:20001  ,在赋值给conf.ServerAddress参数
  4. GrpcxServiceFunc是
    func(server *grpc.Server) {
       proto.RegisterRegionHandlerServer(server, demo)
    } 
  5. GrpcxServer的register参数
    register: register.NewRegister(
       conf.Schema,    //www.vector.com
       conf.ServerName,  //knowing
       conf.ServerAddress,  //192.168.1.2:20001
       client3,           //etcd3的client
    )
  6. register参数的配置 

    1. func NewRegister(
          schema string,
          serverName string,
          serverAddress string,
          client3 *clientv3.Client,
      ) *Register {
          return &Register{
              schema:        schema,     //www.vector.com
              serverName:    serverName,   //knowing
              serverAddress: serverAddress,   //192.168.1.2:20001
              client3:       client3,
              interval:      3 * time.Second,   
              leaseTime:     6,                      //续租间隔
              stop:          make(chan bool, 1),    
              fullAddress:   fmt.Sprintf("%s/%s/%x", schema, serverName, md5.Sum([]byte(serverAddress))),                                //www.vector.com/knowing/md5(192.168.1.2:20001)
          }
      }

func MustNewGrpcxServer(conf *config.ServiceConf, rpcServiceFunc GrpcxServiceFunc) (*GrpcxServer, error) {
	client3, err := clientv3.New(
		clientv3.Config{
			Endpoints:   conf.Endpoints,
			Username:    conf.UserName,
			Password:    conf.PassWord,
			DialTimeout: config.GrpcxDialTimeout,
		})
	if nil != err {
		return nil, err
	}
	address := strings.Split(conf.ServerAddress, colon)
	if len(address) == 1 {
		return nil, errors.New("ServerAddress must container :")
	}
	if strings.TrimSpace(address[0]) == "" {
		address[0] = FindLocalAddress()
	}
	conf.ServerAddress = strings.Join(address, colon)
	return &GrpcxServer{
		register: register.NewRegister(
			conf.Schema,
			conf.ServerName,
			conf.ServerAddress,
			client3,
		),
		rpcServiceFunc: rpcServiceFunc,
	}, nil
}

 run函数就是常规的启动grpc服务器 

  1. s.register.GetServerAddress   =   192.168.1.2:20001
  2. og4g.InfoFormat是打印出ectd的key和  name 和rpc的服务器监听端口信息

  3. s.register.Register看下面介绍

  4. gprc.NewServer ,然后执行s.rpcServiceFunc函数,s.rpcServiceFunc实际就是proto.RegisterRegionHandlerServer(server, &RegionHandlerServer{ServerAddress: conf.ServerAddress})    进行了常规的 rpc服务器启动操作

  5. s.deadNotify看下面介绍

func (s *GrpcxServer) Run(serverOptions ...grpc.ServerOption) error {
	listen, err := net.Listen("tcp", s.register.GetServerAddress())
	if nil != err {
		return err
	}
	log4g.InfoFormat(
		"serverAddress [%s] of %s Rpc server has started and full key [%s]",
		s.register.GetServerAddress(),
		s.register.GetServerName(),
		s.register.GetFullAddress(),
	)
	if err := s.register.Register(); err != nil {
		return err
	}
	server := grpc.NewServer(serverOptions...)
	s.rpcServiceFunc(server)
	s.deadNotify()
	if err := server.Serve(listen); nil != err {
		return err
	}
	return nil

}

s.register.Register()函数  (主要作用就是把rpc服务器节点信息写入到etcd服务器中)

  1. ticker 的用处是间隔r.interval (在NewRegister的时候赋值为3秒钟),执行一次for循环
  2. goroutine的函数是从etcd服务器中读取节点信息, 如果 getResp.Count = 0表示没有节点信息
  3. 就进入withAlive 函数,把节点信息加入到etcd服务器中,然后keepalived进行包活操作
func (r *Register) Register() error {
	ticker := time.NewTicker(r.interval)
	go func() {
		for {
			//这里的if实际是 err != nil 的判断,首先先从etcd中获取r.fullAddress的节点信息
			//如果获取不到就去包活
			if getResp, err := r.client3.Get(context.Background(), r.fullAddress); err != nil {
				log4g.Error(err)
			} else if getResp.Count == 0 {
				if err = r.withAlive(); err != nil {
					log4g.Error(err)
				}
			}

			//这个select是阻塞退出循环的作用 意思是3秒执行一次
			select {
			case <-ticker.C:
			case <-r.stop:
				return
			}
		}
	}()
	return nil
}

s.deadNotify()函数(主要作用就是根据系统底层传入的程序运行状态结束进程)

  1. <-ch这里会阻塞,如果程序退出就会执行log4g.InfoFormat函数, ch阻塞的之后他本身的log4g也是阻塞的,
  2. 不阻塞后,则会进行etcd的反注册流程,打印服务器信息
func (s *GrpcxServer) deadNotify() {
	ch := make(chan os.Signal, 1) //
	signal.Notify(ch, deadSignal...)
	go func() {

		log4g.InfoFormat(
			"serverAddress [%s] of %s Rpc server has existed with got signal [%v] and full key [%s]",
			s.register.GetServerAddress(),
			s.register.GetServerName(),
			<-ch,
			s.register.GetFullAddress(),
		)


		if err := s.register.UnRegister(); err != nil {
			log4g.InfoFormat(
				"serverAddress [%s] of %s Rpc server UnRegister fail and  err %+v and full key [%s]",
				s.register.GetServerAddress(),
				s.register.GetServerName(),
				s.register.GetFullAddress(),
				err,
				s.register.GetFullAddress(),
			)
		}
		os.Exit(1) //
	}()
	return
}

到这里整个server就正常运行起来了

发布了166 篇原创文章 · 获赞 26 · 访问量 15万+

猜你喜欢

转载自blog.csdn.net/qq_28710983/article/details/93417310