一、基本原理
远程过程调用(Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。 该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。
具体原理参考:
引用其中一图,本文就以此架构图对官方net/rpc库分析:
golang官方的net/rpc库使用encoding/gob
进行编解码,gob编码方式仅被Go所支持,因此net/rpc库只能用于Go内部的rpc调用,无法进行跨语言调用, 但可以使用基于json编码的net/rpc/jsonrpc包进行这项工作。net/rpc同时支持tcp
或http
数据传输方式。
本文将首先展示使用net/rpc包进行rpc调用的主要方法,随后对于net/rpc包的结构和流程进行具体分析。
二、使用方法:
1、在一个独立的包rpc_objects中新建类型及方法
新定义一个类型 Args
及其方法 Multiply,
该方法将Args类型的参数中的两个整形成员相乘后,赋值给引用类型的参数Reply。
此处,net/rpc包要求要注册的函数必需以error类型作为返回值,因此函数运行结果反映在按引用传递的参数中。
package rpc_objects
type Args struct{
N,M int
}
//要对外提供的远程调用方法Multiply
func (a *Args)Multiply(args *Args,reply *int)error{
*reply=args.M*args.N
return nil
}
2、定义服务提供方,即rpc协议下的服务器端rpc_server.go
依次进行了服务方法注册、http请求处理函数注册、http端口的监听和对外服务。
package main
import (
"../rpc_objects"
"log"
"net"
"net/http"
"net/rpc"
"time"
)
func main() {
calc := new(rpc_objects.Args) //新建Args类型实例
rpc.Register(calc) //调用rpc.Register()函数进行注册
rpc.HandleHTTP() //调用rpc.HandleHTTP()
listener, e := net.Listen("tcp", "localhost:1234") //使用net包监听端口
if e != nil {
log.Fatal("Starting RPC-server -listen error:", e)
}
go http.Serve(listener, nil) //新建协程提供对外服务
time.Sleep(1000e9)
}
3、新建rpc协议客户端rpc_client.go
向服务端进行客户端连接后,调用call()函数,传递服务函数名(类型名.函数名)和调用函数参数,完成服务调用。
package main
import(
"../rpc_objects"
"fmt"
"log"
"net/rpc"
)
const serverAddress="localhost"
func main(){
client,err:=rpc.DialHTTP("tcp",serverAddress+":1234") //通过http协议连接服务器
if err!=nil{
log.Fatal(err)
}
args:=&rpc_objects.Args{13,4}
var reply int
err=client.Call("Args.Multiply",args,&reply) //进行服务调用
if err!=nil{
log.Fatal("Args error",err)
}
fmt.Println(reply)
}
三、源码学习
1、网络传输接口
不难看出,在上述rpc包的使用中,明显使用了http协议建立底层连接(server端使用http.serve函数),net/rpc便相当于在http协议基础上设计的高层应用协议,对Go的net/http包进行了使用和不完全的的封装,因此,可以先同上述函数调用入手,查看rpc使用http进行网络传输的入口。
使用net/http包建立服务器程序的核心为以下两个步骤:
1)http请求处理函数的注册
方法1:http.Handle("/myrpc",http.HandlerFunc(customFunc))
方法2:http.HandleFunc("/myrpc",customFunc)
其中,方法2接收一个类型为func(w http.ResponseWriter,r *http.Request)类型的处理函数,而方法一需要接收一个实现下述接口的实例:
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
总而言之,对于http请求的处理,服务器端将通过处理函数注册最终落实到一个类型为func(ResponseWriter, *Request)的方法上。
2)监听端口并对外开放服务
分别调用net.Listen()和net.Serve()方法进行端口监听和服务提供
listener, e := net.Listen("tcp", "localhost:1234")
if e != nil {
log.Fatal("Starting RPC-server -listen error:", e)
}
go http.Serve(listener, nil)
time.Sleep(1000e9)
可以看出,在建立rpc服务器时明显已经直接使用http包进行了端口监听和服务提供,而http处理函数注册部分将由函数rpc.HandleHTTP()完成,主要流程为:
1)新建RPC Server类型实例,该实例唯一的包含了一个RPC服务器的主要信息,该实例的具体成员功能暂不探究
type Server struct {
serviceMap sync.Map // map[string]*service对外服务字典
reqLock sync.Mutex // protects freeReq 请求队列保护锁
freeReq *Request //请求队列(链表
respLock sync.Mutex // protects freeResp 响应队列保护锁
freeResp *Response //响应队列(链表
}
2)调用Server实例的http方法进行http处理方法注册,使用了一个默认的rpcPath,同时还有一个对于debugPath的debug方法
const (
// Defaults used by HandleHTTP
DefaultRPCPath = "/_goRPC_"
DefaultDebugPath = "/debug/rpc"
)
func (server *Server) HandleHTTP(rpcPath, debugPath string) {
http.Handle(rpcPath, server)
http.Handle(debugPath, debugHTTP{server})
}
可以看到在该函数中完成了http协议的处理方法注册过程,注册参数为实现了Handler接口的Server实例,对于host:port/rpcPath的http请求都将交由Server实例的ServeHTTP函数进行处理:
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" { //报错以非CONNECT方法发出的连接
...
}
conn, _, err := w.(http.Hijacker).Hijack() //利用Hijack()函数将HTTP的TCP连接取出
...
io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n") //打印响应头
server.ServeConn(conn)
}
可以看到,该方法经过预处理后,将本次TCP连接交由server.ServeConn()函数进一步处理。
值得学习的就是conn,_,err:=w.(http.Hijacker).Hijack()进行tcp连接的提取
至此,RPC服务端完成了对于HTTP网络传输的对接。
不难推测,在Client端在完成上层RPC请求消息的序列化、编码等封装后,通过访问host:port/rpcPath对RPC服务器端发出请求
rpc.DialHTTP=>DialHTTPPath
调用DialHTTPPath(network, address, DefaultRPCPath) //DefaultRPCPath = "/_goRPC_"
func DialHTTPPath(network, address, path string) (*Client, error) {
var err error
conn, err := net.Dial(network, address) //向目标服务器建立TCP连接
if err != nil {
return nil, err
}
io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") //向目标服务器建立HTTP连接
//在转向RPC高层处理前,以CONNECT方式尝试进行成功连接
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
if err == nil && resp.Status == connected {
return NewClient(conn), nil //使用已建立的conn连接创建新Clint实例
}
... //error处理内容
}
综合上述,可以看到:
- net/rpc包在底层的HTTP连接阶段通过统一的HTTP访问路径和统一的method(CONNECT)完成了RPC握手。
- 在建立连接和完成握手之后,Client和Server端均通过操作TCP连接实例conn进行通信。
过程示意图如下:
2、服务注册
下面将目光转向服务暴露或者说服务注册上,在图示的RPC架构中,需要一个服务中心的角色集中服务端所注册的服务并向客户端发布,但在目前学习到的net/rpc包中并不存在这样一个角色,目前可以考虑在服务端注册一个服务列表功能,并在客户端包中做调用该功能的相关封装。
所谓服务注册过程,即将自定义化要对外提供的的方法(服务)添加到服务器对外服务列表中,正如上述代码所展示的,需要向rpc.register函数传递一个带有相应方法的实例指针,根据源码注释的内容,方法必需遵循一系列格式要求:
- the method's type is exported. 方法所属类型必须是导出类型(首字母大写) - the method is exported. 方法本身也必须是被导出的(首字母大写) - the method has two arguments, both exported (or builtin) types. 方法接收两个被导出类型或内置类型的参数 - the method's second argument is a pointer. 方法的第二个参数必需是指针 - the method has return type error. 方法的返回值必须是一个error类型的量
可以说,这些约束是必要的:
- 一方面,用于对外提供服务的方法遵循统一的格式有利于服务发现的过程;
- 另一方面,net/rpc包需要大量的使用reflect包提供的反射技术来解析和存储服务方法,同样需要统一的方法格式;
- 最后因为参数值和返回值在类型和个数上必定各有不同,将两者都约束为一个结构,并通过引用参数进行结果反馈是一件很明智的事。
具体看服务注册过程,可以看到其主要的工作为:将输入结构通过reflect技术进行解析,建立一个与之对应的Service的实例,包含了以该实例为接收者的所有方法,最后添加到Server实例所存储的service字典中,具体如下:
rpc.Register()调用将最终调用到server.register方法用于根据传入值信息注册一个新服务。
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
func (server *Server) Register(rcvr interface{}) error { //接收一个空接口
return server.register(rcvr, "", false)
}
在server.register方法中,执行了建立服务实例、加入方法集字典、错误检查,最后添加入server的服务集的工作。
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
s := new(service) //新建服务实例
s.typ = reflect.TypeOf(rcvr) //服务结构类型
s.rcvr = reflect.ValueOf(rcvr) //服务结构reflect.Value值
sname := reflect.Indirect(s.rcvr).Type().Name()
......//对于服务名称是否自定义做判断的部分处理
s.name = sname
// 对传入量所包含的方法进行reflect处理,生成map[string]*methodType型的注册方法字典
s.method = suitableMethods(s.typ, true)
if len(s.method) == 0 {
//对传入方法集为空时的错误处理,如尝试是否含有指针方法以提醒用户
//method := suitableMethods(reflect.PtrTo(s.typ), false)
......
}
//将新建立的服务实例存入Server实例的服务字典中(serviceMap sync.Map)
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
}
return nil
}
其中关键的service和method结构体内容如下,分别代表了一个对外服务和服务中包含的一个方法。
type service struct {
name string // name of service服务名称
rcvr reflect.Value // receiver of methods for the service服务方法的接收者
typ reflect.Type // type of the receiver接收者类型
method map[string]*methodType // registered methods服务中注册的方法
}
type methodType struct {
sync.Mutex // protects counter方法互斥锁
method reflect.Method//方法结构
ArgType reflect.Type//方法参数类型
ReplyType reflect.Type//方法返回值类型
numCalls uint //被调用次数
}
在这一部分,其核心是使用reflect包的反射技术解析出注册服务传入的结构,包括服务信息和服务包含的方法集信息,关键代码如下:
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
s := new(service) //新建服务实例
s.typ = reflect.TypeOf(rcvr) //服务结构类型
s.rcvr = reflect.ValueOf(rcvr) //服务结构值信息
sname := reflect.Indirect(s.rcvr).Type().Name()
......
}
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
if method.PkgPath != "" //方法必需是被导出的
......
if mtype.NumIn() != 3 //方法必需包含三个输入
......
argType := mtype.In(1) //第一个参数必须是被导出的或内置类型
if !isExportedOrBuiltinType(argType)
......
replyType := mtype.In(2)
if replyType.Kind() != reflect.Ptr //第二个参数必需是一个指针型
......
if !isExportedOrBuiltinType(replyType) //第二个参数必需是被导出的
......
if mtype.NumOut() != 1 //必需有一个输出
......
if returnType := mtype.Out(0); returnType != typeOfError //输出类型为error
......
//将方法存储在方法集中
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
可以看到,在注册部分使用了大量的反射技术来获取服务和方法信息,最终也是以reflect包中定义的结构进行存储,因此需要掌握reflect包的使用方法才能理解这部分内容,使用方法归纳在下面的文章里:
https://blog.csdn.net/qq_38093301/article/details/104226908
这一阶段的一个结构和流程大致如下: