概述:
消息队列可认为是一个消息链表。有足够写权限的线程可往队列中放置消息,有足够读取权限的线程在队列中取走消息。每个消息都是一个记录,它由发送者赋予一个优先级。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。这跟管道和FIFO是相反的,对后者来说,除非读出者已存在,否则先有写入者是没有意义的。
一个进程可以往队列写入一些消息,然后终止,再让另外一个进程在以后某个时刻读出消息。我们说过消息队列具有随内核的持续性。这和管道和FIFO不一样。当一个管道或FIFO的最后一次关闭发生时,仍在该管道或FIFO上的数据将被丢弃。
Posix消息队列和System V消息队列的区别:
对Posix消息队列的总是返回最高优先级的最早消息,对System V消息队列的读则可以返回任意指定优先级的消息。
当往一个空队列放置一个消息时,Posix消息队列允许产生一个信号或者启动一个线程,System V消息队列则不提供类似的机制。
相关函数:
mq_open函数:
mqd_t mq_open(const char *name, int oflag); mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
oflag参数是O_RDONLY、O_WRONLY或O_RDWR之一,可以按位或上O_CREAT、O_EXCL或O_NONBLOCK。
attr参数用于给新队列指定某些属性。如果它为空指针,那就使用默认属性。
mq_opne的返回值称为消息队列描述符。
int main(void) { mqd_t mqid; mqid = mq_open("/abc", O_CREAT | O_RDWR, 0666, NULL); if(mqid == (mqd_t)-1) ERR_EXIT("mq_open"); printf("mq_open success\n"); return 0; }
mq_close函数:
int mq_close(mqd_t mqdes);
int main(void) { mqd_t mqid; mqid = mq_open("/abc", O_CREAT | O_RDWR, 0666, NULL); if(mqid == (mqd_t)-1) ERR_EXIT("mq_open"); printf("mq_open success\n"); mq_close(mqid); return 0; }
mq_unlink函数:
int mq_unlink(const char *name);
从系统中删除用作mq_open第一个参数的某个name,必须调用mq_unlink
mq_unlink("/abc");
mq_gettattr和mq_settattrr函数:
int mq_getattr(mqd_t mqdes, struct mq_attr *attr); int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
每个消息队列有四个属性,mq_getatrr返回所有这些属性,mq_setatrr则设置其中某个属性。
int main(void) { mqd_t mqid; struct mq_attr attr; mqid = mq_open("/abc", O_RDONLY); if(mqid == (mqd_t)-1) ERR_EXIT("mq_open"); printf("mq_open success\n"); mq_getattr(mqid,&attr); printf("mq_maxmsg is %ld,mq_msgsize is %ld,mq_curmsgs is %ld\n",attr.mq_maxmsg,attr.mq_msgsize,attr.mq_curmsgs); return 0; }
mq_send和mq_receive函数:
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
这两个函数分别用于往一个队列中放置一个消息和从一个队列中取走一个消息。每个消息都有一个优先级,它是一个小于MQ_PRIO_MAX的无符号整数。
mq_receive总是返回所指定队列中最高优先级的最早消息,而且该优先级能随该消息的内容及其长度一同返回。
发送:
typedef struct stu { char name[32]; int age; }STU; int main(int argc,char *argv[]) { if(argc != 2) { fprintf(stderr,"Usage: %s <prio>\n",argv[1]); exit(EXIT_FAILURE); } mqd_t mqid; mqid = mq_open("/abc", O_WRONLY); if(mqid == (mqd_t)-1) ERR_EXIT("mq_open"); STU s; strcpy(s.name, "test"); s.age = 20; int prio = atoi(argv[1]); mq_send(mqid, (const char*)&s, sizeof(s), prio); return 0; }
接收:
#define ERR_EXIT(m) \ do \ { \ perror(m); \ exit(EXIT_FAILURE); \ }while(0); \ typedef struct stu { char name[32]; int age; }STU; int main(int argc,char *argv[]) { mqd_t mqid; mqid = mq_open("/abc", O_RDONLY); if(mqid == (mqd_t)-1) ERR_EXIT("mq_open"); STU s; memset(&s, 0, sizeof(s)); unsigned int prio; struct mq_attr attr; mq_getattr(mqid,&attr); size_t size = attr.mq_msgsize; if( mq_receive(mqid, (char*)&s, size, &prio) == (ssize_t)-1 ) ERR_EXIT("mq_recvive"); printf("name = %s,age = %d,prio = %d\n",s.name,s.age,prio); return 0; }
mq_notify函数:
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
POSIX允许异步事件通知,已告知何时有一个消息放置到了某个空消息队列中。这种通知有两种方式可供选择:
产生一个信号;
创建一个线程来执行一个指定的函数。
一个简单的信号通知:
#define ERR_EXIT(m) \ do \ { \ perror(m); \ exit(EXIT_FAILURE); \ }while(0); \ typedef struct stu { char name[32]; int age; }STU; mqd_t mqid; void handle_sigusr1(int sig) { STU s; memset(&s, 0, sizeof(s)); unsigned int prio; struct mq_attr attr; mq_getattr(mqid,&attr); size_t size = attr.mq_msgsize; if( mq_receive(mqid, (char*)&s, size, &prio) == (ssize_t)-1 ) ERR_EXIT("mq_recvive"); printf("name = %s,age = %d,prio = %d\n",s.name,s.age,prio); } int main(int argc,char *argv[]) { mqid = mq_open("/abc", O_RDONLY); if(mqid == (mqd_t)-1) ERR_EXIT("mq_open"); signal(SIGUSR1, handle_sigusr1); struct sigevent sigev; sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; mq_notify(mqid, &sigev); while(1) { pause(); } return 0; }
注:参考UNIX网络编程卷2:进程间通信
ipcs指令不能查看创建的消息队列,需要挂载查看:
# mkdir /dev/mqueue
# mount -t mqueue none /dev/mqueue
mq_receive的len参数不能小于所指定队列中的消息最大大小(该队列mq_attr结构的mq_msgsize成员)。要是小于该值,mq_receive就立即返回EMSGSIZE错误。