2021SC@SDUSC
引言
在 oj 项目中,服务端并不是一个不可分割的整体,而是可以根据职责划分为成多个子服务,比如能够处理前端请求的服务、能进行判题的服务等。那么这些服务之间要如何进行通信呢?这里我们选用 RPC。相比于 Restful,RPC 更加轻量级,也更加高效,适合系统内的服务通信。我们以 gRPC 为例。
源码分析
介绍
gRPC 可以分为 4 中类型,分别为一元式 RPC、客户端流式 RPC、服务端流式 RPC、双向流式 RPC。划分的根据是某一端发送的是单个的数据、还是流。
要进行gRPC服务,我们先创建了一个person.proto
文件,写入如下内容。将该文件编译,系统会给我们生成两个文件,分别是person.pb.go
和person_grpc.pb.go
。
PersonReq
是请求的格式,PersonRes
是响应的格式。message
内的变量声明格式有点类似于 Java,只是,在同一个message
中,变量的值是唯一标识,不能重复。
service
下客户端与服务端需要进行的通信,带stream
表示该请求或响应是以流的形式,否则就是以值的形式。
syntax = "proto3";
package person;
option go_package = "grpc-test/pb/person;person";
message PersonReq {
string name = 1;
int32 age = 2;
}
message PersonRes {
string name = 1;
int32 age = 2;
}
service SearchService {
rpc Search(PersonReq) returns (PersonRes);
rpc SearchIn(stream PersonReq) returns (PersonRes);
rpc SearchOut(PersonReq) returns (stream PersonRes);
rpc SearchIO(stream PersonReq) returns (stream PersonRes);
}
在服务端,我们需要用grpc.NewServer
取出server
,然后将挂载personServer
的方法,并为person
注册服务,最后通过Serve
创建监听。
type personServer struct {
person.UnimplementedSearchServiceServer
}
func main() {
l, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err.Error())
}
server := grpc.NewServer()
person.RegisterSearchServiceServer(s, &personServer{
})
err = server.Serve(l)
if err != nil {
panic(err.Error())
}
}
在客户端,它先用grpc.Dial
创建一个链接,然后调用person.NewSearchServiceClient
新建一个客户端,之后我们便可以使用客户端的方法了。
func main() {
l, _ := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure())
client := person.NewSearchServiceClient(l)
// 具体请求
}
一元式 RPC
首先需要说明,包括该Search
在内,以及其他形式的 RPC 中,这些方法并不是凭空想出来的,而是重写了person_grpc.pb.go
中的方法,所以参数和返回值只需要按照原来的写即可。
因为在person.proto
中的PersonReq
下有name
变量,所以当我们编译 person.proto
时,它会在person.PersonReq
下生成一个GetName
方法。我们可以调用该方法,来获取客户端发来的请求内容。然后我们创建了一个person.PersonRes
对象,在里面设置响应内容,然后将它返回。
func (*personServer) Search(_ context.Context, req *person.PersonReq) (*person.PersonRes, error) {
name := req.GetName()
res := &person.PersonRes{
Name: "我收到了" + name + "的信息"}
return res, nil
}
客户端调用Search
方法,传入personReq
对象,然后就可以得到一个来自服务端返回值res
。我们发现,客户端在向服务端发送请求并获取返回值的时候,代码展现的形式就好像客户端在调用自己的方法,这也是 gRPC 的一个特色。
func main() {
...
res, err := client.Search(context.Background(), &person.PersonReq{
Name: "Hello"})
if err != nil {
panic(err.Error())
}
fmt.Println(res)
}
客户端流式 RPC
顾名思义,在客户端流式 RPC 中,客户端发送的不是单个的值,而是一个流。这时,服务端需要不断地调用server.Recv
接收来自前端的请求,等到客户端传输完毕或者传输过程中出现错误,服务端就会调用SendAndClose
向客户端发送一个响应并关闭连接。
func (*personServer) SearchIn(server person.SearchService_SearchInServer) error {
for {
req, err := server.Recv()
fmt.Println(req)
if err != nil {
err = server.SendAndClose(&person.PersonRes{
Name: "ok"})
if err != nil {
fmt.Println(err)
}
break
}
}
return nil
}
客户端先调用SearchIn
建立一个连接,为了模拟流,我们让客户端在一段时间内不断用Send
来发送请求,然后调用CloseAndRecv
等待服务端的响应。
func main() {
...
c, err := client.SearchIn(context.Background())
if err != nil {
panic(err.Error())
}
for i := 0; i < 10; i++ {
time.Sleep(1 * time.Second)
err = c.Send(&person.PersonReq{
Name: "information"})
if err != nil {
panic(err.Error())
}
}
res, err := c.CloseAndRecv()
if err != nil {
panic(err.Error())
}
fmt.Println(res)
}
服务端流式 RPC
与客户端流式 RPC 相反,服务端流式 RPC是让客户端发送一个数据,然后服务端响应一段流。在这里,服务端在获取到req.Name
之后,就一值调用Send
向客户端发送响应。
func (*personServer) SearchOut(req *person.PersonReq, server person.SearchService_SearchOutServer) error {
name := req.Name
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
err := server.Send(&person.PersonRes{
Name: "我拿到了" + name + "的信息"})
if err != nil {
panic(err.Error())
}
}
return nil
}
客户端调用SearchOut
建立连接,并发送一个请求,然后调用Recv
不断接收服务端的响应,直到服务端传输完毕或者出现错误。
func main() {
...
c, err := client.SearchOut(context.Background(), &person.PersonReq{
Name: "hello"})
if err != nil {
panic(err.Error())
}
for {
res, err := c.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(res)
}
}
双向流式 RPC
双向流式 RPC 是客户端与服务端都在用流传输数据,所以无论是客户端还是服务端,都需要同时进行接收和发送数据。
在服务端,我们创建了一个管道,服务端调用Recv
接收的前 10 次数据,都将它通过管道,传输到发送装置,再通过Send
发送给客户端。到第 11 次的时候,发送装置会接收到一个"end"
字符串,终止发送。
func (*personServer) SearchIO(server person.SearchService_SearchIOServer) error {
str := make(chan string)
go func() {
for i := 0; i < 10; i++ {
req, err := server.Recv()
if err != nil {
break
}
str <- req.Name
}
str <- "end"
}()
for {
s := <-str
err := server.Send(&person.PersonRes{
Name: s})
if err != nil {
fmt.Println(err)
}
if s == "end" {
break
}
}
return nil
}
客户端通过SearchIO
建立连接,为了让发送和接收同时进行,这里启用了两个协程。第一个协程用Send
发送数据,直到因为连接中断导致的err
产生;第二个协程用Recv
接收数据。
值得一说的是,这里用sync.WaitGroup
的目的是防止程序结束。我们知道,当main
函数执行结束的时候,整个程序也就结束了。而加这么一个机制的话,只有两个协程全部结束的时候,Wait
才会停止阻塞。
func main() {
...
c, err := client.SearchIO(context.Background())
if err != nil {
panic(err.Error())
}
wg := sync.WaitGroup{
}
wg.Add(2)
go func() {
for {
time.Sleep(time.Second)
err = c.Send(&person.PersonReq{
Name: "Hello"})
if err != nil {
fmt.Println(err)
wg.Done()
break
}
}
}()
go func() {
for {
res, err := c.Recv()
if err != nil {
fmt.Println(err)
wg.Done()
break
}
fmt.Println(res)
}
}()
wg.Wait()
}