5 消息队列
消息队列(message queue)是一种以链表式结构组织的一组数据,存放在内核中,是由各种进程通过消息队列标识符来引用的一种数据传送方式。像其它两种IPC对象一样,也是由内核来维护。消息队列是三个IPC对象类型中最具有数据操作性的数据传送方式,在消息队列中可以随意根据特定的数据类型值来检索消息。
先来看看有用的shell命令。虽然X/Open规范并没有定义它们,但大多数Linux系统都提供了一组命令,用于从命令行上访问IPC信息以及清理游离的IPC机制。它们是ipcs和ipcrm命令,这两个命令对于开发程序非常有用。
- 信号量:ipcs -s可以查看系统中信号量的状态,显示的有一个semid列,这个值相当于信号量的ID,要删除某个信号量可以使用命令ipcrm -s <semid>。
- 共享内存:ipcs -m用于查看,删除为ipcrm -m <id>。
- 消息队列:ipcs -q用于查看,删除为ipcrm -q <id>。
5.1 消息队列的概念
消息队列(也叫做报文队列)能够克服早起UNIX通信机制的一些缺点。比如作为早起UNIX通信机制之一的信号,它所能够传送的信息量是很有限的,后来虽然POSIX 1003.1b在信号的实时性方面做了扩展,使得信号在传递信息量方面有了相当程度的改进,但信号更像是一种“即时”的通信方式,它要求接受信号的进程在某个时间范围内对信号作出反应,因此该信号最多在接受信号进程的生命周期内才有意义,信号所传递的信息是接近于随进程持续(process-persistent)的概念。管道及命名管道则是典型的随进程持续IPC,并且只能传送无格式的字节流无疑会给应用程序开发带来不便,另外,它的缓冲区大小也受到限制。
消息队列就是一个消息的链表。可以把消息看作一个记录,具有特定的格式及特定的优先级。对消息队列有写权限的进程可以向其中按照一定的规则添加新消息:对消息队列有读权限的进程则可以从消息队列中读走消息。消息队列是随内核持续(kernel-persistent)的。
说明:
- 随进程持续的定义为:IPC一直存在,直至打开IPC对象的最后一个进程关闭该对象为止,如管道和命名管道。
- 随内核持续的定义为:IPC一直持续到内核重新自举或者显示删除该对象位置。如消息队列、信号量及共享内存等。
- 随文件系统持续的定义为:IPC一直持续到显示删除该对象为止。
System V消息队列是随内核持续的,只有在内核重起或者显示删除一个消息队列时,该消息队列才会真正被删除。因此系统中记录消息队列的数据结构(struct ipc_ids msg_ids)位于内核中,系统中的所有消息队列都可以在结构msg_ids中找到访问入口。消息队列就是一个消息的链表。每个消息队列都有一个队列头,用结构struct msg_queue来描述。队列头中包含了该消息队列的大量信息,包括消息队列键值、用户ID、组ID、消息队列中消息数目等,甚至记录了最近对消息队列读写进程的ID。用户可以访问这些信息,也可以设置其中的某些信息。
结构msg_queue用来描述消息队列头,存在于系统空间,定义如下:
struct msg_queue { struct ipc_perm q_perm; time_t q_stime; /* 最后一次发送消息的时间 */ time_t q_rtime; /* 最后一次接收消息的时间 */ time_t q_ctime; /* 最后一次更改的时间 */ unsigned long q_cbytes; /* 队列当前的字节总数 */ unsigned long q_qnum; /* 队列中的消息数 */ unsigned long q_qbytes; /* 队列所能存储的最大字节数 */</span> pid_t q_lspid; /* 最后发送信息的进程ID */ pid_t q_lrpid; /* 最后接收到信息的进程ID */ struct list_head q_messages; struct list_head q_receivers; struct list_head q_senders; };结果msqid_ds用来设置或返回消息队列的信息,存在于用户空间,定义如下:
struct msqid_ds { struct ipc_perm msg_perm; struct msg *msg_first; /* 队列中的首条消息,未使用 */ struct msg *msg_last; /* 队列中的尾部消息,未使用 */ time_t msg_stime; /* 最后一次发送消息的时间 */ time_t msg_rtime; /* 最后一次接收消息的时间 */ time_t msg_ctime; /* 最后一次更改的时间 */ unsigned long msg_lcbytes; /* 32位上可再用的字段(Reuse junk fields for 32 bit) */ unsigned long msg_lqbytes; /* 同上 */ unsigned short msg_cbytes; /* 队列当前的字节总数 */ unsigned short msg_qnum; /* 队列中的消息数 */ unsigned short msg_qbytes; /* 队列所能存储的最大字节数 */ pid_t msg_lspid; /* 最后发送消息的进程ID */ pid_t msg_lrpid; /* 最后接收到消息的进程ID */ };下图说明了内核与消息队列是怎么建立联系的。其中struct ipc_ids msg_ids是内核中记录消息的全局数据结构;struct msg_queue是每个消息队列的队列头。
从上图可以看出,全局数据结构struct ipc_ids msg_ids可以访问每个消息队列头的第一个成员struct ipc_perm;而每个struct ipc_perm能够与具体的消息队列对应起来是因为在该结构中,有一个key_t类型成员key,而key则唯一确定一个消息队列。ipc_perm结构定义如下:
struct ipc_perm { /* 内核中记录消息队列的全局数据结构msg_ids能够访问到该结构 */ key_t key; /* 该键值唯一对应一个消息队列 */ uid_t uid; /* 所有者的有效用户ID */ gid_t gid; /* 所有者的有效组ID */ uid_t cuid; /* 创建者的有效用户ID */ gid_t cgid; /* 创建者的有效组ID */ mode_t mode; /* 此对象的访问权限 */ unsigned long seq; /* 对象的序号 */ };
5.2 消息队列的创建与打开
消息队列的内核持续性要求每个消息队列都在系统范围内对应唯一的键值,所以,要获得一个消息队列的引用标识符(ID)——即创建或打开消息队列,只需要提供该消息队列的键值即可。
注意,消息队列的引用标识符也可以称为此消息队列的ID号,就像进程ID一样,用来唯一标识系统中的某一进程。消息队列的ID是由在系统范围内唯一的键值生成的,而键值可以看作对应系统内的一条路径。
获得特定文件名的键值的系统调用是ftok,原型如下:
#include <sys/types.h>#include <sys/ipc.h>key_t ftok (const char *pathname, char proj_id);
若成功返回消息队列的一个键值,若失败返回-1。
ftok返回与路径pathname相对应的一个键值。该函数并不直接对消息队列操作,但在调用msgget来获得消息队列的标识符前,往往要调用该函数。
创建或打开一个消息队列的系统调用为msgget,原型如下:
#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>int msgget (key_t key, int msgflg);
若成功则返回消息队列的引用标识符(ID),若失败则返回-1。
参数key是一个键值,由ftok获得(话虽如此,但实际上这个参数实际上会跟已有的消息列对象的关键字进行比较来判断该消息列对象是否已经创建,所以也可以自由定义这个值);msgflg参数是一些标志位(9个权限位),可以取如下值:IPC_CREAT、IPC_EXCL、IPC_NOWAIT或三者的逻辑或结果。该调用返回与键值key相对应的消息队列标识符。在以下两种情况下,该调用将创建一个新的消息队列:
- 如果没有消息队列与键值key相对应,并且msgflg中包含了IPC_CREAT标志位。
- key参数为IPC_PRIVATE(用于创建私有队列,一般来说它应该只能被当前进程访问,但同信号量和共享内存的情况一样,消息队列在某些Linux系统中事实上并非私有,由于私有队列没有什么用处,所以这并不是一个很严重的问题)。
5.3 消息队列的读写
使用消息队列进行进程间的通信,就是要对消息队列进行读和写的操作。写操作即向消息队列中发送数据,而读操作则是从消息队列中接收(读走)数据。消息队列所传递的消息由两部分组成,即消息的类型和所传递的数据。消息的结构受到两方面的约束,首先,它的长度必须小于系统规定的上限;其次,它必须以一个长整型成员变量开始,接收函数将用这个成员变量来确定消息的类型。当使用消息时,最好把消息结构定义为下面这样:
struct msgbuf { long int msg_type; /* 接着定义你需要传输的数据的结构 */ };
5.3.1 向消息队列发送数据
向消息队列发送数据系统调用的函数原型如下:
#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>int msgsnd (int msqid, const void *msgp, size_t msgsz, int msgflg);
若调用成功返回0,否则返回-1。如果调用成功,消息数据的一份副本将被放到消息队列中。
msgsnd函数的作用是向一个消息队列发送一个消息,该消息被添加到队列的末尾。参数msqid代表消息队列的引用标识符(ID);参数msgp是一个void类型指针,指向要发送的消息,并且消息必须像刚才说的那样以一个长整型成员开始;参数msgsz是以字节表示的消息的数据长度,这个长度不能包括长整型消息类型成员变量的长度;参数msgflg用于指定消息队列满时的处理方法,如果msgflg中设置了IPC_NOWAIT标志,函数将立刻返回,不发送消息并且返回值为-1,如果msgflg中的IPC_NOWAIT标志被清除,则发送进程将挂起以等待队列中腾出可用空间,如果取0表示忽略标志位。
造成msgsnd等待的条件有两种:
- 当前消息的大小与当前消息队列中的字节数之和超过了消息队列的总容量。
- 当前消息队列中的消息数(单位“个”)不小于消息队列的总容量(单位“字节数”),此时,虽然消息队列中的消息数目很多,但基本上都只有一个字节。
msgsnd解除阻塞的条件有三个:
- 不满足上述两个条件,即消息队列中有容纳该消息的空间。
- msqid代表的消息队列被删除。
- 调用msgsnd的进程被某个信号中断。
5.3.2 从消息队列接收消息
从消息队列接收数据系统调用的函数原型如下:
#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>ssize_t msgrcv (int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
若成功调用则返回消息的数据长度,否则返回-1。成功调用时,函数返回放到接收缓存区中的字节数,消息被复制到由msgp指向的用户分配的缓存区中,然后删除消息队列中的对应消息。
此函数用于从指定的消息队列中读取一个消息数据。参数msqid代表消息队列的引用标识符。msgp是一个void类型指针,指向存放消息的缓冲区,消息必须向msgsnd函数中介绍的那样以一个长整型成员变量开始。msgsz是以字节表示的要接收的消息的数据长度,它不包括长整型消息类型成员变量的长度。msgtyp是一个长整型,它可以实现一种简单形式的接收优先级,如果msgtyp的消息值为0,就获取队列中的第一个可用消息,如果它的值大于0,将获取具有相同类型的第一个消息,如果它的值小于0,将获取消息类型等于或小于msgtyp的绝对值的第一个消息。msgflg用于控制当队列中没有相应类型的消息可以接收时将发生的事情,如果msgflg中的IPC_NOWAIT标志被设置,函数将会立刻返回并且返回值为-1,如果msgflg中的IPC_WAIT标志被清除,进程将会挂起以等待一条相应类型的消息到达,如果取0表示忽略标志位,其取值为表5-3-2-1中所列的常值或几个常值的逻辑或。
取值 | 含义 |
IPC_NOWAIT | 如果没有满足条件的消息,调用立即返回,此时errno=ENOMSG |
IPC_EXEPT | 与msgtyp>0配合使用,返回队列中第一个类型不为msgtyp的消息 |
MSG_NOERROR | 如果队列中满足条件的消息内容大于所请求的msgsz字节,则把该消息截断,截断部分将丢失 |
取值 | 含义 |
msgtyp=0 | 接收消息队列中的第一条消息 |
msgtyp>0 | 接收消息队列中类型只等于msgtyp的第一条消息 |
msgtyp<0 | 接收消息队列中类型值小于等于msgtyp的绝对值的所有消息中类型值最小的那一条消息 |
msgrcv解除阻塞的条件有三个:
- 消息队列中有了满足条件的消息。
- msqid代表的消息队列被删除。
- 调用msgrcv的进程被某个信号中断。
5.4 获得或设置消息队列属性
消息队列的信息基本上都保存在消息队列头中,因此,可以分配一个类似于消息队列头的结构struct msqid_ds来返回消息队列的属性;同样可以设置该数据结构。关于消息队列属性的操作的系统调用如下:
#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>int msgctl (int msqid, int cmd, struct msqid_ds *buf);
若成功则返回0,否则返回-1。
msqid_ds结构至少包含以下成员:
struct msqid_ds { uid_t msg_perm.uid; uid_t msg_perm.gid; mode_t msg_perm.mode; };该系统调用对由msqid标识的消息队列执行cmd操作,共有三种cmd操作,分别如下:
- IPC_STAT:该命名用来获取消息队列消息,返回的信息存贮在buf指向的msqid_ds结构中。
- IPC_SET:该命名用来设置消息队列的属性,要设置的属性存储在buf指向的msqid_ds结构中;可以设置属性包括msg_perm.uid、msg_perm.gid、msg_perm.mode及msqid_qbytes,同时,也影响msg_ctime成员。
- IPC_RMID:删除msqid标识的消息队列。
因为有共用的结构,所以定义了一个头文件msg.h。
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> struct msg_st { long int msg_type; char text[BUFSIZ]; };send_msg程序的源码:
#include "msg.h" int main() { int running = 1; struct msg_st data; int msgid; char buffer[BUFSIZ]; msgid = msgget((key_t)1234, 0666 | IPC_CREAT); if (msgid == -1) { fprintf(stderr, "msgget failed with error: %d\n", errno); exit(EXIT_FAILURE); } while (running) { printf("Enter some text: "); fgets(buffer, BUFSIZ, stdin); data.msg_type = 1; strcpy(data.text, buffer); if (msgsnd(msgid, (void *)&data, BUFSIZ, 0) == -1) { fprintf(stderr, "msgsnd failed\n"); exit(EXIT_FAILURE); } if (strncmp(buffer, "end", 3) == 0) running = 0; } exit(EXIT_SUCCESS); }receive_msg程序的源码:
#include "msg.h" int main() { int running = 1; int msgid; struct msg_st data; long int msg_to_receive = 0; msgid = msgget((key_t)1234, 0666 | IPC_CREAT); if (msgid == -1) { fprintf(stderr, "msgget failed with error: %d\n", errno); exit(EXIT_FAILURE); } while (running) { if (msgrcv(msgid, (void *)&data, BUFSIZ, msg_to_receive, 0) == -1) { fprintf(stderr, "msgrcv failed with error: %d\n", errno); exit(EXIT_FAILURE); } printf("You wrote: %s", data.text); if (strncmp(data.text, "end", 3) == 0) running = 0; } if (msgctl(msgid, IPC_RMID, 0) == -1) { fprintf(stderr, "msgctl(IPC_RMID) failed\n"); exit(EXIT_FAILURE); } exit(EXIT_SUCCESS); }
与管道例子不同,这里不需要由进程自己来提供同步方法,这时消息相对于管道的一个明显优势。
编译并运行:
$ ./send_msg
/* 发送信息的程序 */
Enter some text: Do you want some data?
Enter some text: I will pass you!
Enter some text: en
Enter some text: nothing
Enter some text: end
$
Enter some text: Do you want some data?
Enter some text: I will pass you!
Enter some text: en
Enter some text: nothing
Enter some text: end
$
$ ./receive_msg
/* 接收信息的程序 */
You wrote: Do you want some data?
You wrote: I will pass you!
You wrote: en
You wrote: nothing
You wrote: end
$
You wrote: Do you want some data?
You wrote: I will pass you!
You wrote: en
You wrote: nothing
You wrote: end
$
整理自 《Linux程序设计第4版》、《Linux C编程从初学到精通》。