skynet所有的消息最终会被lua层捕获,socket消息也不例外。上篇讲到了skynet的网络模型及epoll,所有的消息都是在epoll中发出的。那么网络消息又是如何转换为skynet服务间的消息的呢?
skynet将不同的网络消息分类,例如连接消息,socket读数据消息等等。socket消息转换为skynet服务间的消息在skynet_socket_poll函数中体现:
int
skynet_socket_poll() {
struct socket_server *ss = SOCKET_SERVER;
assert(ss);
struct socket_message result;
int more = 1;
int type = socket_server_poll(ss, &result, &more);
switch (type) {
case SOCKET_EXIT:
return 0;
case SOCKET_DATA:
forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
break;
case SOCKET_CLOSE:
forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
break;
case SOCKET_OPEN:
forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
break;
case SOCKET_ERR:
forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
break;
case SOCKET_ACCEPT:
forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
break;
case SOCKET_UDP:
forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
break;
case SOCKET_WARNING:
forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);
break;
default:
skynet_error(NULL, "Unknown socket message type %d.",type);
return -1;
}
if (more) {
return -1;
}
return 1;
}
根据消息是否带数据可以分为两类。所谓带数据就是指socket有读的数据,不带数据就是那些socket的动作消息,例如socket打开,错误,连接等等。skynet消息有几个要素:source,session,data,sz。由于socket消息的关联性不是很强,所有他的source和session都设为0,但是data和sz肯定是有的。
data是skynet_socket_message结构体的指针。sz字段是32位大小,其中前8位为消息的类型PTYPE_SOCKET,后24为数据的大小。消息的类型不同,数据的大小也不同。
对于带数据socket消息,其sz为结构体skynet_socket_message的大小。真正的数据消息也包含在这里面:
数据的大小在ud字段里,这个是在forward_message_tcp里填充的。
对于不带数据的socket消息,例如accept消息,其大小sz除了包括skynet_socket_message之外(即使此时没有用到数据buffer,为NULL。但是此时ud字段代表了新的连接fd id),还包含额外的数据的result.data,例如客户端的ip地址和端口。此时数据sz的总大小为skynet_socket_message结构体的大小16加上额外数据的大小。在forward_message函数中可以看到:
static void
forward_message(int type, bool padding, struct socket_message * result) {
struct skynet_socket_message *sm;
size_t sz = sizeof(*sm);
if (padding) { //应该是消息没有数据,需要填充的意思
if (result->data) {
size_t msg_sz = strlen(result->data);
if (msg_sz > 128) {
msg_sz = 128;
}
sz += msg_sz;
} else {
result->data = "";
}
}
sm = (struct skynet_socket_message *)skynet_malloc(sz);
sm->type = type;
sm->id = result->id;
sm->ud = result->ud;
if (padding) {
sm->buffer = NULL;
memcpy(sm+1, result->data, sz - sizeof(*sm));
} else {
sm->buffer = result->data;
}
struct skynet_message message;
message.source = 0;
message.session = 0;
message.data = sm;
message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);
if (skynet_context_push((uint32_t)result->opaque, &message)) {
// todo: report somewhere to close socket
// don't call skynet_socket_close here (It will block mainloop)
skynet_free(sm->buffer);
skynet_free(sm);
}
}
上面的result.opaque是指服务的id,他是在lua发送指令的时候获取发出的。例如lua调用listen的时候:
int
socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) {
int fd = do_listen(addr, port, backlog);
if (fd < 0) {
return -1;
}
struct request_package request;
int id = reserve_id(ss);
if (id < 0) {
close(fd);
return id;
}
request.u.listen.opaque = opaque;
request.u.listen.id = id;
request.u.listen.fd = fd;
send_request(ss, &request, 'L', sizeof(request.u.listen));
return id;
}
int
skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog) {
uint32_t source = skynet_context_handle(ctx);
return socket_server_listen(SOCKET_SERVER, source, host, port, backlog);
}
static int
llisten(lua_State *L) {
const char * host = luaL_checkstring(L,1);
int port = luaL_checkinteger(L,2);
int backlog = luaL_optinteger(L,3,BACKLOG);
struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
int id = skynet_socket_listen(ctx, host,port,backlog);
if (id < 0) {
return luaL_error(L, "Listen error");
}
lua_pushinteger(L,id);
return 1;
}
对于不同的消息类型,解析的时候也会有所不一样,PTYPE_SOCKET类型消息的解包函数为driver.unpack,请看:
skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = driver.unpack,
dispatch = function (_, _, t, ...)
socket_message[t](...)
end
}
unpack函数在lua-socket.c中:
static int
lunpack(lua_State *L) {
struct skynet_socket_message *message = lua_touserdata(L,1);
int size = luaL_checkinteger(L,2);
lua_pushinteger(L, message->type);
lua_pushinteger(L, message->id);
lua_pushinteger(L, message->ud);
if (message->buffer == NULL) {
lua_pushlstring(L, (char *)(message+1),size - sizeof(*message));
} else {
lua_pushlightuserdata(L, message->buffer);
}
if (message->type == SKYNET_SOCKET_TYPE_UDP) {
int addrsz = 0;
const char * addrstring = skynet_socket_udp_address(message, &addrsz);
if (addrstring) {
lua_pushlstring(L, addrstring, addrsz);
return 5;
}
}
return 4;
}
我们看到在lua层调用的dispatch有三个参数,type(socket消息类型),id(socket标识),ud(新的socket id或数据大小),buffer(socket数据或额外数据)。lua层收到消息后将会有不同的响应函数: