测试程序
github 地址: https://github.com/fananchong/test_go-micro_qps
测试结果
见: https://github.com/fananchong/test_go-micro_qps#回显测试
- 1 个服务器程序, 20 个测试客户端程序
- CPU 16 核 ; 主频 2600 机器
- 整机 CPU 占 80%
- 服务器程序 CPU 占 30.7% ( 491.4 / 1600 )
- 3.5w qps
用的插件是:
- server/client: rpc
- transport: grpc
对比测试
github 地址: https://github.com/fananchong/gotcp
测试结果: https://github.com/fananchong/gotcp#基准测试
- 1 个服务器程序, 30 个测试客户端程序
- CPU 16 核 ; 主频 2600 机器
- 整机 CPU 70%
- 服务器程序 CPU 占 25.5% ( 407.9 / 1600 )
- 23w qps
原因分析
go-micro 默认插件 rpc 实现 (包括其他默认实现,代码分析见最后):
- 服务器端都是 1 个包 1 个包的发送
- 每次发送阻塞接收协程
而通常线上的 TCP 网络库,均为:
- 不阻塞接收协程,并有发送协程
- 将多个包合包 1 次性发送,来抵消 RTT (网络往返时间)
Issue
本人给官方提了个 issue : https://github.com/micro/go-micro/issues/602
Rpc server has performance issues #602
I see that the rpc server sends a message in receive data coroutine ServeConn , which has two problems:
- The coroutine will be blocked when sending a message
- Cannot merge packets to send, each package will have RTT
已经被标记为 enhancement ,即官方亦认同存在这个问题
go-micro 代码分析
这里以默认 rpc server 插件为例
1. 接受连接
摘至: https://github.com/micro/go-micro/blob/master/server/rpc_server.go
func (s *rpcServer) Start() error {
// 无关代码略
go func() {
for {
// listen for connections
err := ts.Accept(s.ServeConn)
// 无关代码略
}
}()
}
启动后,会监听, 每个连接处理在 s.ServeConn 内
2. 接收消息处理
摘至: https://github.com/micro/go-micro/blob/master/server/rpc_server.go
// ServeConn serves a single connection
func (s *rpcServer) ServeConn(sock transport.Socket) {
// 无关代码略
for {
var msg transport.Message
if err := sock.Recv(&msg); err != nil {
return
}
// 无关代码略
// serve the actual request using the request router
if err := r.ServeRequest(ctx, request, response); err != nil {
// write an error response
err = rcodec.Write(&codec.Message{
Header: msg.Header,
Error: err.Error(),
Type: codec.Error,
}, nil)
// 无关代码略
return
}
// 无关代码略
}
}
有很多分支处理(已经注释为无关代码略),最终都会到 rcodec.Write (如 r.ServeRequest 内最终也是 rcodec.Write )
3. 消息处理
摘自:https://github.com/micro/go-micro/blob/master/server/rpc_router.go
func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response) error {
// 无关代码略
return service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
}
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error {
// 无关代码略
if !mtype.stream {
// 无关代码略
// send response
return router.sendResponse(sending, req, replyv.Interface(), cc, true)
}
// 无关代码略
// client.Stream request
r.stream = true
// execute handler
return fn(ctx, r, rawStream)
}
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, last bool) error {
// 无关代码略
err := cc.Write(resp.msg, reply)
// 无关代码略
return err
}
- 最后走到调用 rcodec.Write
- 需要注意的是这里有 stream 分支,最终也会走到 rcodec.Write ,需要参加
func (r *rpcStream) Send(msg interface{}) error
函数
4. 发送数据
摘自:https://github.com/micro/go-micro/blob/master/server/rpc_codec.go
func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
// 无关代码略
// send on the socket
return c.socket.Send(&transport.Message{
Header: m.Header,
Body: body,
})
}
摘自:https://github.com/micro/go-micro/blob/master/transport/grpc/socket.go
func (g *grpcTransportSocket) Send(m *transport.Message) error {
// 无关代码略
return g.stream.Send(&pb.Message{
Header: m.Header,
Body: m.Body,
})
}
rpcCodec.Write 最终调用 grpc 的 Send 方法,发送 transport.Message, 1 个包