一、消息队列简介
1、消息队列是内核中的一个链表。
2、用户进程将数据传输到内核后,内核重新添加一些用户ID、组ID、读写进程的ID和优先级相关信息后打包成一个数据包成为消息。
3、可以多个进程往消息队列中写消息和读消息,但一个消息只能被一个进程读取,读完后自动删除。
4、消息队列有FIFO的先进先出的特性。
二、消息队列相关的API
1、打开或者创建消息队列
int msgget(key_t key,int flag);
成功:返回内核中消息队列的标识ID。 错误:返回-1 。
key:创建消息队列的键值
flag: IPC_CREAT、IPC_EXCL等组合
2、消息队列的控制
int msgctl(int msgid,int cmd,struct msqid_ds *buf);
成功:返回0; 错误:返回-1 。
msgid: 消息队列ID。
cmd: IPC_STAT:获取消息队列属性,数据返回在struct msqid_ds *的结构体中。
IPC_SET: 设置属性
IPC_RMID: 删除队列
buf: struct msqid_ds 的定义如下:
3、消息发送
int msgsnd(int msgqid,const void * ptr,size_t nbytes,int flag);
成功: 返回0, 出错: 返回-1。
ptr的定义如下:
struct msg{
long type; //前sizeof(long) 个byte,固定为消息类型,后面开始为用户自定义的消息
char buffer[512];
};
nbytes:消息的大小,不包括type的大小。nbytes = sizeof(struct msg) -sizeof(long);
flag: 0: 阻塞
IPC_NOWAIT:非阻塞.
4、消息接收
size_t msg_rcv(int msgqid,void *ptr,size_t nbytes,long type,int flag);
type:消息类型
type==0 : 获取消息队列中的第一个消息
type>0 :获取消息队列中类型为type的第一个消息
type<0:获取消息队列中小于或等于type绝对值的消息(类型最小的)
flag: 0或者IPC_NOWAIT.
三、代码实践
创建两个进程,一个进程向队列中发送消息,另外一个进程从消息队列中读取消息。
向消息队列中发送数据进程代码:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
/*创建消息队列,并向消息队列中发送消息数据
*
* */
struct msgBuf
{
long mtype; /* message type, must be > 0 */
int start;
int end;
};
int main(int argc,char **argv)
{
int msgid = -1;
if(argc < 2)
{
printf("usage: %s key\n",argv[0]);
exit(-1);
}
//创建消息队列
printf("create msg queue\n");
key_t key = atoi(argv[1]);
printf("key:%d\n",key);
msgid = msgget( key,IPC_CREAT|IPC_EXCL|0777);
if(msgid < 0)
{
printf("msg create fail,:%d\n",msgid);
exit(-1);
}
printf("msg queue create success,msgid:%d\n",msgid);
//向消息队列中发送数据
struct msgBuf buf[5] = {
{1,100,1000},
{2,200,2000},
{3,300,3000},
{4,400,4000},
{5,500,5000}
};
if( msgsnd( msgid,&buf[0],sizeof(struct msgBuf)-sizeof(long), IPC_NOWAIT) < 0)
{
printf("1msg send error\n");
exit(-1);
}
if( msgsnd( msgid,( const void *)&buf[1],sizeof(struct msgBuf)-sizeof(long), IPC_NOWAIT) < 0)
{
printf("msg send error\n");
exit(-1);
}
if( msgsnd( msgid,( const void *)&buf[2],sizeof(struct msgBuf)-sizeof(long), IPC_NOWAIT) < 0)
{
printf("msg send error\n");
exit(-1);
}
if( msgsnd( msgid,( const void *)&buf[3],sizeof(struct msgBuf)-sizeof(long), IPC_NOWAIT) < 0)
{
printf("msg send error\n");
exit(-1);
}
if( msgsnd( msgid,( const void *)&buf[4],sizeof(struct msgBuf)-sizeof(long), IPC_NOWAIT) < 0)
{
printf("msg send error\n");
exit(-1);
}
//发送后获取当前消息队列中消息的总数
struct msqid_ds dds;
msgctl(msgid, IPC_STAT, &dds);
printf("msg total cnt:%d\n",dds.msg_qnum);
exit(0);
}
从消息队列中读取数据进程代码:
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
/*打开消息队列,并从消息队列中接收数据*/
struct msgBuf
{
long mtype; /* message type, must be > 0 */
int start;
int end;
};
int main(int argc,char*argv[])
{
if(argc < 3)
{
printf("usage:%s key type\n",argv[0]);
return -1;
}
key_t key = atoi(argv[1]);
long type = atoi(argv[2]);
struct msgBuf msg;
int msgid = msgget(key,0777);
if(msgid < 0)
{
printf("msgget error:%d\n",msgid);
return -1;
}
if(msgrcv(msgid, &msg, sizeof(struct msgBuf) - sizeof(long), type,IPC_NOWAIT) < 0)
{
printf("msgrcv error\n");
}
else
{
printf("rcv msg: msgid:%d,type:%ld,start:%d,end:%d\n",msgid,msg.mtype,msg.start,msg.end);
}
exit(0);
}