【sduoj】RPC应用

2021SC@SDUSC

引言

在 oj 项目中,服务端并不是一个不可分割的整体,而是可以根据职责划分为成多个子服务,比如能够处理前端请求的服务、能进行判题的服务等。那么这些服务之间要如何进行通信呢?这里我们选用 RPC。相比于 Restful,RPC 更加轻量级,也更加高效,适合系统内的服务通信。我们以 gRPC 为例。

源码分析

介绍

gRPC 可以分为 4 中类型,分别为一元式 RPC、客户端流式 RPC、服务端流式 RPC、双向流式 RPC。划分的根据是某一端发送的是单个的数据、还是流。

要进行gRPC服务,我们先创建了一个person.proto文件,写入如下内容。将该文件编译,系统会给我们生成两个文件,分别是person.pb.goperson_grpc.pb.go

PersonReq是请求的格式,PersonRes是响应的格式。message内的变量声明格式有点类似于 Java,只是,在同一个message中,变量的值是唯一标识,不能重复。

service下客户端与服务端需要进行的通信,带stream表示该请求或响应是以流的形式,否则就是以值的形式。

syntax = "proto3";
package person;
option go_package = "grpc-test/pb/person;person";

message PersonReq {
    string name = 1;
    int32 age = 2;
}

message PersonRes {
    string name = 1;
    int32 age = 2;
}

service SearchService {
    rpc Search(PersonReq) returns (PersonRes);
    rpc SearchIn(stream PersonReq) returns (PersonRes);
    rpc SearchOut(PersonReq) returns (stream PersonRes);
    rpc SearchIO(stream PersonReq) returns (stream PersonRes);
}

在服务端,我们需要用grpc.NewServer取出server,然后将挂载personServer的方法,并为person注册服务,最后通过Serve创建监听。

type personServer struct {
    
    
	person.UnimplementedSearchServiceServer
}

func main() {
    
    
	l, err := net.Listen("tcp", ":8080")
	if err != nil {
    
    
		panic(err.Error())
	}

	server := grpc.NewServer()
	person.RegisterSearchServiceServer(s, &personServer{
    
    })
	err = server.Serve(l)
	if err != nil {
    
    
		panic(err.Error())
	}
}

在客户端,它先用grpc.Dial创建一个链接,然后调用person.NewSearchServiceClient新建一个客户端,之后我们便可以使用客户端的方法了。

func main() {
    
    
	l, _ := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure())
	client := person.NewSearchServiceClient(l)
	
	// 具体请求
}

一元式 RPC

首先需要说明,包括该Search在内,以及其他形式的 RPC 中,这些方法并不是凭空想出来的,而是重写了person_grpc.pb.go中的方法,所以参数和返回值只需要按照原来的写即可。

因为在person.proto中的PersonReq下有name变量,所以当我们编译 person.proto时,它会在person.PersonReq下生成一个GetName方法。我们可以调用该方法,来获取客户端发来的请求内容。然后我们创建了一个person.PersonRes对象,在里面设置响应内容,然后将它返回。

func (*personServer) Search(_ context.Context, req *person.PersonReq) (*person.PersonRes, error) {
    
    
	name := req.GetName()
	res := &person.PersonRes{
    
    Name: "我收到了" + name + "的信息"}
	return res, nil
}

客户端调用Search方法,传入personReq对象,然后就可以得到一个来自服务端返回值res。我们发现,客户端在向服务端发送请求并获取返回值的时候,代码展现的形式就好像客户端在调用自己的方法,这也是 gRPC 的一个特色。

func main() {
    
    
	...
	res, err := client.Search(context.Background(), &person.PersonReq{
    
    Name: "Hello"})
	if err != nil {
    
    
		panic(err.Error())
	}
	fmt.Println(res)
}

客户端流式 RPC

顾名思义,在客户端流式 RPC 中,客户端发送的不是单个的值,而是一个流。这时,服务端需要不断地调用server.Recv接收来自前端的请求,等到客户端传输完毕或者传输过程中出现错误,服务端就会调用SendAndClose向客户端发送一个响应并关闭连接。

func (*personServer) SearchIn(server person.SearchService_SearchInServer) error {
    
    
	for {
    
    
		req, err := server.Recv()
		fmt.Println(req)
		if err != nil {
    
    
			err = server.SendAndClose(&person.PersonRes{
    
    Name: "ok"})
			if err != nil {
    
    
				fmt.Println(err)
			}
			break
		}
	}
	return nil
}

客户端先调用SearchIn建立一个连接,为了模拟流,我们让客户端在一段时间内不断用Send来发送请求,然后调用CloseAndRecv等待服务端的响应。

func main() {
    
    
	...
	c, err := client.SearchIn(context.Background())
	if err != nil {
    
    
		panic(err.Error())
	}
	for i := 0; i < 10; i++ {
    
    
		time.Sleep(1 * time.Second)
		err = c.Send(&person.PersonReq{
    
    Name: "information"})
		if err != nil {
    
    
			panic(err.Error())
		}
	}
	res, err := c.CloseAndRecv()
	if err != nil {
    
    
		panic(err.Error())
	}
	fmt.Println(res)
}

服务端流式 RPC

与客户端流式 RPC 相反,服务端流式 RPC是让客户端发送一个数据,然后服务端响应一段流。在这里,服务端在获取到req.Name之后,就一值调用Send向客户端发送响应。

func (*personServer) SearchOut(req *person.PersonReq, server person.SearchService_SearchOutServer) error {
    
    
	name := req.Name
	for i := 0; i < 10; i++ {
    
    
		time.Sleep(time.Second)
		err := server.Send(&person.PersonRes{
    
    Name: "我拿到了" + name + "的信息"})
		if err != nil {
    
    
			panic(err.Error())
		}
	}
	return nil
}

客户端调用SearchOut建立连接,并发送一个请求,然后调用Recv不断接收服务端的响应,直到服务端传输完毕或者出现错误。

func main() {
    
    
	...
	c, err := client.SearchOut(context.Background(), &person.PersonReq{
    
    Name: "hello"})
	if err != nil {
    
    
		panic(err.Error())
	}
	
	for {
    
    
		res, err := c.Recv()
		if err != nil {
    
    
			fmt.Println(err)
			break
		}
		fmt.Println(res)
	}
}

双向流式 RPC

双向流式 RPC 是客户端与服务端都在用流传输数据,所以无论是客户端还是服务端,都需要同时进行接收和发送数据。

在服务端,我们创建了一个管道,服务端调用Recv接收的前 10 次数据,都将它通过管道,传输到发送装置,再通过Send发送给客户端。到第 11 次的时候,发送装置会接收到一个"end"字符串,终止发送。

func (*personServer) SearchIO(server person.SearchService_SearchIOServer) error {
    
    
	str := make(chan string)
	go func() {
    
    
		for i := 0; i < 10; i++ {
    
    
			req, err := server.Recv()
			if err != nil {
    
    
				break
			}
			str <- req.Name
		}
		str <- "end"
	}()

	for {
    
    
		s := <-str
		err := server.Send(&person.PersonRes{
    
    Name: s})
		if err != nil {
    
    
			fmt.Println(err)
		}
		if s == "end" {
    
    
			break
		}
	}

	return nil
}

客户端通过SearchIO建立连接,为了让发送和接收同时进行,这里启用了两个协程。第一个协程用Send发送数据,直到因为连接中断导致的err产生;第二个协程用Recv接收数据。

值得一说的是,这里用sync.WaitGroup的目的是防止程序结束。我们知道,当main函数执行结束的时候,整个程序也就结束了。而加这么一个机制的话,只有两个协程全部结束的时候,Wait才会停止阻塞。

func main() {
    
    
	...
	c, err := client.SearchIO(context.Background())
	if err != nil {
    
    
		panic(err.Error())
	}
	wg := sync.WaitGroup{
    
    }
	wg.Add(2)
	go func() {
    
    
		for {
    
    
			time.Sleep(time.Second)
			err = c.Send(&person.PersonReq{
    
    Name: "Hello"})
			if err != nil {
    
    
				fmt.Println(err)
				wg.Done()
				break
			}
		}
	}()
	go func() {
    
    
		for {
    
    
			res, err := c.Recv()
			if err != nil {
    
    
				fmt.Println(err)
				wg.Done()
				break
			}
			fmt.Println(res)
		}
	}()
	wg.Wait()
}

猜你喜欢

转载自blog.csdn.net/weixin_45922876/article/details/121088951
RPC