1. 消息队列
消息队列也是进程间通信的一种方式。消息队列的本质上是内核中一个消息链表,存储在内核空间。当进程间使用消息队列进行通信时,可以一个进程向内核空间中写入消息(向消息队列中添加一个消息),另一个进程从内核空间中读取消息(从消息队列中删除一个消息)。
2. 打开或创建一个消息队列
msgget函数是用于创建一个消息队列,和shmget函数的用法都是一样的。
#include <sys/msg.h>
int msgget(key_t key , int flag);
返回值:成功返回ipc内核对象消息队列的标识符id,失败则返回 -1
参数key:用于指定创建的消息队列的键值
参数flag:用于指定创建的消息队列权限(IPC_CREAT , IPC_EXCL , IPC_PRIVATE)。创建消息队列必须指定key值,或者将key设置为IPC_PRIVATE ;如果是需要获取消息队列,那么只需指定消息队列对应的key值,其他参数可设置为0。
3. 发送消息——msgsnd函数
msgsnd函数的作用是将消息加入到消息队列中,也就是发送消息。
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
返回值:成功返回0,失败返回 -1
参数说明:
msqid: ipc内核对象的id(消息队列的标识符id)
msgp: 消息数据的地址,指向用户自定义的消息msgbuf
msgsz: 消息正文的长度,以字节为单位(注意:不包括消息类型)
msgflg: 可选参数。
1. 指定msgflg = IPC_NOWAIT,如果消息队列空间满了(消息总数或队列中的字节总数等于系统限制值范围),则出错返回EAGAIN。
2. 如果msgflg = 0,如果消息队列空间满了,则进程会一直阻塞,除非发生以下几种情况:
- 直到内核有空间可以容纳新的消息
- 从系统中删除了该消息队列
- 捕捉到一个信号,并从信号处理函数处返回
4. 接收消息——msgrcv函数
msgrcv函数是将消息从消息队列中取出,并将其从消息队列列表中删除掉,即接收消息。
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
返回值:成功返回消息的数据部分长度,失败返回 -1。
参数说明:
msqid: ipc对象的id(消息队列的标识符)
msgp: 用于接收消息数据的地址
msgsz:消息正文的长度,以字节为单位(注意:不包括消息类型),如果消息正文数据的大小超过msgsz,而msgflg又没有指定MSG_NOERROR的话,默认返回E2BIG错误,不会从消息队列中删除该消息。
msgtyp: 可选参数,指定你要获取哪一种类型的消息,消息的读取与发送的顺序无序一致。
1. msgtyp = 0,获取消息队列中第一条消息
2. msgtyp > 0,获取消息队列中类型为msgtyp的第一个消息,如果msgflg 指定了MSG_EXCEPT的话,那么获取除了msgtyp类型以外的第一个消息
3. msgtyp < 0,获取小于或等于|msgtyp |(取的绝对值)的类型的第一条消息
msgflg: 可选参数。
- msgflg = 0,没有消息可获取就阻塞
- msgflg = MSG_EXCEPT,获取除了msgtyp类型意外的第一个消息(只有当msgtyp > 0才生效)
- msgflg = IPC_NOWAIT,如果指定的消息在消息队列中不存在,出错返回-1,同时设置errno 为ENOMSG。
- msgflg = MSG_NOERROR,如果消息正文数据大小超过了msgsz,那么将截断为msgsz大小,超过的部分丢失。
关于msgrcv函数中的msgtyp参数小于0的情况:
上图中是消息队列中的一组消息,如果在调用msgrcv函数时是以下面这种方式调用时:
msgrcv(msqid , &msg , msgsz , -300 , 0);
那么msgrcv将会以2 ,5 , 3 ,1的顺序读取消息,后面的调用将会阻塞,因为4位置的消息的类型(400)超过了300,我们看到以这种方式读取消息时,msgrcv总是先从mtype值最小的消息开始读取。
关于MSG_EXCEPT标志:
MSG_EXCEPT标志只有在msgtyp大于0的情况下才会起作用,且这个标志是linux下特有的,不过只有在开启_GNU_SOURCE之后才有效,然后以下面这种方式调用msgrcv函数时:
msgrcv(msqid , &msg , msgsz , 100 , MSG_EXCEPT);
msgrcv将会以1 ,3 , 4的顺序读取消息,之后阻塞等待。
5. 操作消息队列——msgctl函数
msgctl函数用法和 shmctl 一样,可以用于获取消息队列的属性,设置消息队列的属性,删除消息队列属性。
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
msqid:消息队列标识符id
buf:消息队列属性指针
cmd指定消息队列的执行命令,一般有以下三种:
IPC_STAT: 获取消息队列的信息,获取此消息队列的msqid_ds结构并将其存放到buf指向的msqid_ds结构中。
IPC_SET: 设置消息队列的属性,把buf指向的shmid_ds结构体中的值(uid,gid,mode)复制到此消息队列的shmid_ds结构体中。
IPC_RMID: 删除消息队列,buf设置为NULL。
6. 消息的数据结构
msgsnd函数和msgrcv 函数用于发送和读取消息队列中的消息,这两个函数的第二个参数是一个由调用者指定的一个void *类型的msgp结构体指针,这个结构体就是用户自定义消息的数据结构。
6.1 用户自定义消息格式msgbuf
可以把msgbuf结构体看做是一个存储消息数据的消息模板,msgsnd 和msgrcv 系统调用使用的消息缓冲区。
include/linux/msg.h
struct msgbuf {
long mtype; /* 消息的类型,必须为正数 */
char mtext[1]; /* 消息正文 */
};
这个是系统的消息格式,实际上这个消息结构是可以由程序员自己定义的。
可以按照上面的消息结构来自定义消息格式:
//自定义消息格式1
struct my_msg{
long type; //消息类型,格式固定不可变
struct msg data; //可以在在自定义消息中的指定struct msg类型的消息
int pid;
};
//自定义消息格式2
struct my_msg{
long type; //消息类型,格式固定不可变,type的值必须大于0
char name[64];
int age;
};
my_msg是程序调用者自定义的新的消息格式,除了消息类型格式必须固定,其他成员可自行扩展,也就是说消息队列的方便之处在于在于可以发送任意数据类型的消息,但需要注意消息的长度是有限制的:
include/linux/msg.h
#define MSGMAX 8192 //注意:自定义的消息长度最大为8k,不包括消息类型
消息格式my_msg最大不能超过8k,且消息类型值必须为大于0的整数
6.2 消息结构msg
内核会把消息存储在以msg结构格式的消息队列中,include/linux/msg.h中定义如下:
struct msg {
struct msg *msg_next; /* 消息队列的下一条消息 */
long msg_type; /*消息类型*/
char *msg_spot; /* 消息正文的地址 */
short msg_ts; /* 消息正文的大小 */
};
msg_next成员是一个指针,指向队列中下一个消息的指针,在内核空间中形成一个消息单链表。
6.3 消息队列结构msqid_ds
通常,进程写入的msg结构的消息都指明了消息类型,存放到系统内核维护的消息队列中,每一个消息队列由一个IPC对象的标识符来进行标识,即msqid_ds结构体,该结构体具体定义如下:
struct msqid_ds {
struct ipc_perm msg_perm; /* 消息的权限 */
struct msg *msg_first; /* 队列上第一条消息,即链表头 */
struct msg *msg_last; /* 队列中的最后一条消息,即链表尾 */
time_t msg_stime; /* 发送给队列的最后一条消息的时间 */
time_t msg_rtime; /* 从消息队列接收到的最后一条消息的时间 */
time_t msg_ctime; /* 最后修改队列的时间*/
ushort msg_cbytes; /*队列上所有消息总的字节数 */
ushort msg_qnum; /*在当前队列上消息的个数 */
ushort msg_qbytes; /* 队列最大的字节数 */
ushort msg_lspid; /* 发送最后一条消息的进程的pid */
ushort msg_lrpid; /* 接收最后一条消息的进程的pid */
};
msqid_ds结构中描述了当前消息队列的一些信息。
7. 发送消息队列
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/msg.h>
typedef struct{
int start;
int end;
}Data;
typedef struct{
long type; //消息类型
Data data; //消息数据
}MSG;
//发送消息
int main(void)
{
int ret = 0;
int msg_qid = 0;
//创建消息队列
msg_qid = msgget(0x00112233 , IPC_CREAT | 0664);
//定义要发送的消息
MSG m[6] = {
{4 , 40 , 400},
{1 , 10 , 100},
{2 , 20 , 200},
{3 , 30 , 300},
{5 , 50 , 500},
{4 , 10 , 100}
};
//发送消息到消息队列
//注意,发送消息时,只发送数据部分的长度,不包括类型
int i;
for(i = 0; i < 6; i++){
ret = msgsnd(msg_qid , &m[i] , sizeof(Data) , IPC_NOWAIT);
if(ret < 0){
perror("msgsend error:");
}
}
struct msqid_ds msg_ds;
//获取消息的总数
ret = msgctl(msg_qid , IPC_STAT , &msg_ds);
if(ret < 0){
perror("msgctl error:");
}
//打印消息总数
printf("msg_qnum = %ld\n" , msg_ds.msg_qnum);
return 0;
}
程序执行结果:
8. 接收消息
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/msg.h>
#include <errno.h>
typedef struct{
int start;
int end;
}Data;
typedef struct{
long type; //消息类型
Data data; //消息数据
}MSG;
int main(int argc , char *argv[])
{
if(argc < 2){
puts("argc < 2");
exit(0);
}
//接收type
long type = atoi(argv[1]);
int ret = 0;
int msg_qid = 0;
MSG msg;
//获取消息队列
msg_qid = msgget(0x00112233 , 0);
while(1){
//以非阻塞的方式接收消息
ret = msgrcv(msg_qid, &msg, sizeof(Data), type, IPC_NOWAIT);
if(ret < 0){
//没有消息了
if(errno == ENOMSG){
puts("message no exist");
break;
}else{
perror("msgrcv error:");
}
}
//打印消息
printf("{ type = %ld, start = %d, end = %d }\n", msg.type, msg.data.start, msg.data.end);
}
return 0;
}
程序执行结果:
从上图来看,进程每读取一条消息,该消息都会从消息队列中删除掉。当调用msgrcv函数获取消息时,指定了消息类型,那么msgrcv就会从消息队列中依次取出对应类型的消息,如果没有那么返回ENOMSG错误,说明消息队列中已经没有该类型的消息了。
9. 消息队列和管道
-
管道通常应用于有血缘关系的父子进程,只能承载无格式的字节流数据,且数据一旦被读取就从管道中清除。
-
消息队列可应用于无关系的进程间,数据(消息)是有类型的。
-
消息队列同样也具备管道的一些特性,一个消息一旦被读取完毕就会从消息队列中删除掉,但是允许一个或多个进程往消息队列中写消息和读消息。