前一篇skynet watchdog和gate讲解了watchdog和gate的流程图,这篇就代码剖析一下。
watchdog和gate旨在简化网络模块的设计,在实际的项目中我们通常会都会用到这种模式 ,提高生产效率。前面讲的集群模块中也用到了这种模式。
skynet中gate.lua文件实际上引用了gateserver.lua,他们共同构成了一个服务,消息的主体回调处理在gateserver中。他接收消息的类型为'lua'。
要想启用网络服务,首先得监听网络端口号。在gateserver中监听端口号的部分是:
function CMD.open( source, conf )
assert(not socket)
local address = conf.address or "0.0.0.0"
local port = assert(conf.port)
maxclient = conf.maxclient or 1024
nodelay = conf.nodelay
socket = socketdriver.listen(address, port)
socketdriver.start(socket)
if handler.open then
return handler.open(source, conf)
end
end
所以给gate服务发送open命令即可。监听这部分代码前面网络部分已经讲过,理解起来很简单。监听成功后会调用gate部分的handler.open。
重点是网络数据接收这一块。前面网络部分已经讲过,网络数据有自己的格式:
struct skynet_socket_message {
int type;
int id;
int ud;
char * buffer;
};
其中type是socket数据的类型,例如打开SOCKET_OPEN,接收SOCKET_ACCEPT,数据SOCKET_DATA,关闭SOCKET_CLOSE等等,在socket.lua中就是根据这个type来调用不同的处理函数的。在gate服务中却用自己的解包数据算法:
skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = function ( msg, sz )
return netpack.filter( queue, msg, sz)
end,
dispatch = function (_, _, q, type, ...)
queue = q
if type then
MSG[type](...)
end
end
}
当有客户服务(假设为watchdog)连接上gate服务端口之后,gate服务会通知通知到watchdog,仅仅发送lua协议,字符串命令为'socket',子命令为'open'的数据而已,所以watchdog要设置lua协议的回调函数。有数据时也是如此,gate服务发送watchdog命令'socket',只不过子命令为'data'。
function handler.connect(fd, addr)
local c = {
fd = fd,
ip = addr,
}
connection[fd] = c
skynet.send(watchdog, "lua", "socket", "open", fd, addr)
end
function handler.message(fd, msg, sz)
-- recv a package, forward it
local c = connection[fd]
local agent = c.agent
if agent then
skynet.redirect(agent, c.client, "client", 0, msg, sz)
else
skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))
end
end
由于在连接建立时(open),gate服务保存了watchdog的地址,gate服务就能够给watchdog发送消息了,协议用的是lua协议。
我们注意到,handler.message()有个agent分支。他的目的是有消息时不再发送给watchdog,而是发送给agent。何为agent,也就是代理服务。当gate服务有连接时,我们可以为每个连接生成一个新的服务,即agent。这样做的目的既为了代码模块化,也可以提高并发效率。如何生成新的服务,再怎么样转发数据给新的服务呢?
很显然,新服务agent需要知道socket的fd,因为发送数据时要用到。然后gate服务也需要知道新服务的地址,不然怎么转发数据给他。所以在打开新的连接生成新的服务agent时,传入了fd以及gate服务地址,这样agent给gate服务发送消息("forward")时就可以把其地址带上,同时打开fd。
网络数据始终是在gate服务监听的,即上面的handler.message函数,由于有agent,所以会调用skyent.redirect将数据转给agent,用到的协议是'client',所以agent服务中要注册'client'类型的回调函数,来接收网络数据。由于转发的数据已经是去掉长度的网络数据,所以解包时不要做什么处理,只在后面发送了其长度而已,这样接收数据时就可以知道数据包的长度。代码如下:
skynet.register_protocol {
name = "client",
id = skynet.PTYPE_CLIENT,
unpack = function(msg,sz)
return skynet.tostring(msg,sz), sz
end,
dispatch = function (_, _, smsg, sz)
local ok, result = pcall(request, smsg, sz)
if ok then
if result then
send_package(client_fd, result)
end
else
skynet.error(result)
end
end
}
最后也要做一些关闭,错误的处理。gate收到socket关闭的消息后,告知watchdog服务,watchdog又会告知agent和gate。为什么不直接让gate告诉agent,非得绕两圈。原因是各个层都有完整的消息链,每一层触发时完成的逻辑是不一样的。例如watchdog收到关闭的消息可能需要对他管理的agent进行处理,而gate可能需要做连接上的最后一步管理等等。
最后总结一下,watchdog,gate服务模式不难,搞懂gate服务处理网络消息数据是关键。他必须按照一定的格式发送网络数据。