一、背景
在C/S编程模式中,经常需要进行进程间消息传递,常用的模式是“请求-应答”方式,客户端通过发起请求,服务端进行处理再进行回复,如果使用socket去实现,难免还要实现消息的分包、连接状态的维护的功能。
ZMQ(ZeroMQ、0MQ)则是一种高性能的异步消息库,接口风格类似于套接字实现,但又将sockfd的相关细节隐藏起来。支持线程间、进程间通信、一对多、多对一节点处理,外部的调用者只要着力设计业务流程、模块结构即可。
二、相关知识
1、基础API接口
- 创建和销毁套接字:zmq_socket(), zmq_close()
- 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
- 为套接字建立连接:zmq_bind(), zmq_connect()
- 发送和接收消息:zmq_send(), zmq_recv()
- 使用多种协议,inproc(进程内)、ipc(进程间)、tcp、pgm(广播)、epgm;
- 当客户端使用zmq_connect()时连接就已经建立了,并不要求该端点已有某个服务使用zmq_bind()进行了绑定;
- 连接是异步的,并由一组消息队列做缓冲;
- 连接会表现出某种消息模式,这是由创建连接的套接字类型决定的;
- 一个套接字可以有多个输入和输出连接;
- ZMQ没有提供类似zmq_accept()的函数,因为当套接字绑定至端点时它就自动开始接受连接了;
- 应用程序无法直接和这些连接打交道,因为它们是被封装在ZMQ底层的。
3、ZMQ核心消息模型
- 请求-应答模式 将一组服务端和一组客户端相连,用于远程过程调用或任务分发。
- 发布-订阅模式 将一组发布者和一组订阅者相连,用于数据分发。
- 管道模式 使用扇入或扇出的形式组装多个节点,可以产生多个步骤或循环,用于构建并行处理架构。
4、套接字组合
- PUB - SUB
- REQ - REP
- REQ - ROUTER
- DEALER - REP
- DEALER - ROUTER
- DEALER - DEALER
- ROUTER - ROUTER
- PUSH - PULL
- PAIR - PAIR
(ps:更详细的学习资料可以在参考文章[1][2]看哦)
三、实践
以下开始将非常愉快地实现一个同步的REQ-REP服务模型
服务端代码,主要就是:
1、zmq_socket 创建一个ZMQ_REP类型的套接字;
2、zmq_bind 绑定一个ipc类型地址(可以理解为unix socket地址);
3、使用zmq_poll实现灵活的读取(带超时,类似poll/epoll/select的方法);
4、收到请求后,sleep1秒模拟处理,再进行回复;
static void *server_master(void *ctx)
{
int ret = 0;
char id[16] = {0};
char request[1024];
char respone[1024];
void *server = zmq_socket(ctx, ZMQ_REP);
s_set_id_ex(server, id, sizeof(id));
zmq_bind(server, "ipc://server.ipc");
zmq_pollitem_t items[] = {
{ server, 0, ZMQ_POLLIN, 0 },
};
LOGN("Server %s start\n", id);
while (1) {
ret = zmq_poll(items, 1 /* size */, 1000 /* ms */);
assert(ret >= 0);
if (items[0].revents & ZMQ_POLLIN) {
s_recv(server, request);
LOGN("Server %s recv: %s\n", id, request);
//TODO something handle
sleep(1);
snprintf(respone, sizeof(respone), "%s-World", request);
s_send(server, respone);
LOGN("Server %s send: %s\n", id, respone);
}
}
LOGN("Server %s Finish\n", id);
zmq_close(server);
}
int main(int argc, char *argv[])
{
void *ctx = zmq_ctx_new();
server_master(ctx);
zmq_ctx_destroy(ctx);
exit(EXIT_SUCCESS);
}
然后是客户端的代码:
1、zmq_socket创建ZMQ_REQ类型的套接字;
2、zmq_connect去连接服务器地址;
3、循环TEST_TIMES次:发送请求、等待响应;
void client_task(void *ctx)
{
int ix;
int roll = randof(1000);
char request[1024];
char respone[1024];
char id[16] = {0};
void *client = zmq_socket(ctx, ZMQ_REQ);
s_set_id_ex(client, id, sizeof(id));
zmq_connect(client, "ipc://server.ipc");
LOGN("Client %s start\n", id);
for (ix = 0; ix < TEST_TIMES; ix++) {
snprintf(request, sizeof(request), "Hello-%03d-%03d", roll, ix);
s_send(client, request);
LOGN("Client %s send: %s\n", id, request);
s_recv(client, respone);
LOGN("Client %s recv: %s\n", id, respone);
}
LOGN("Client %s finish\n", id);
zmq_close(client);
}
int main(int argc, char *argv[])
{
void *ctx = zmq_ctx_new();
client_task(ctx);
zmq_ctx_destroy(ctx);
exit(EXIT_SUCCESS);
}
代码中的一些 randomof、s_set_id、s_send、s_recv、LOGN集成在头文件 zhelpers.h,是为了代码更简洁易懂,做了简单的封装:
#define s_recv(sock, buf) do { \
int len = zmq_recv(sock, buf, sizeof(buf) - 1, 0); \
assert(len >= 0); \
buf[len] = '\0'; \
} while(0)
static int
s_send (void *socket, const char *string) {
int size = zmq_send (socket, string, strlen (string), 0);
return size;
}
#define LOGN(fmt, ...) do { \
struct timeb tb; \
ftime(&tb); \
printf("[ %u.%03hu ]: " fmt, (unsigned int)tb.time, tb.millitm, ##__VA_ARGS__); \
} while (0);
运行如下图所示:
1、先启动的client程序、过了5秒左右再启动的server端程序;
2、client别名为00CC、server别名为00C8,单多个客户端服务存在可以通过别名区分;
3、client端在zmq_connect后立刻就能发送请求成功,等待server端进行响应,本地是有一个缓存队列的功能的;
4、server端是启动后就立刻收到第一个请求报文了,下来进行1秒的处理等待、响应回复;
其他注意项:
1、编译的时候需要链接库 -lzmq -pthread
2、不要忘记所有程序的地方都有zmq_ctx_new来创建上下文的地方;
3、错误地使用zmq接口都会有报错,可以通过系统errno、errorstr进行错误信息查看;
四、总结
本次先从简单的例子学习起,感受一下ZMQ的设计模型。总的来说使用ZMQ开发变得简单,我们可以更专注于业务模块的组合设计。
参考文章:
[1] http://zguide.zeromq.org
[2] https://github.com/anjuke/zguide-cn