消息队列:提供了一个从一个进程向另外一个进程发送一块有类型数据块的方法。
消息队列的不足:
每个消息的最大长度是有限的(MSGMAX),每个消息队列的总的字节点是有上限的(MSGMNB),系统上消息队列的总数也有一个上限(MSGMNI)。
IPC对象数据结构
内核为每个IPC对象维护一个数据结构:(System V进程间通信的结构体)
struct ipc_perm{
key_t _key;//要创建的消息队列的标识。即为ipc键
uid_t uid;
gid_t gid;
uid_t cuid;
gid_t cgid;
unsigned short mode;//权限
unsigned short _seq;//队列的number
};
消息队列的结构:
struct msqid_ds{
struct ipc_perm msg_perm;
struct msg *msg_first;//消息队列头指针
struct msg *msg_last;//尾指针
_kernel_time_t msg_stime;//最后一次插入消息队列消息的时间
_kernel_time_t msg_rtime;//最后一次接收消息(即删除队列中的一个消息的时间)
unsigned long msg_lcbytes;//发送消息等待进程队列。
unsigned long msg_lqbytes;//ditto
unsigned short msg_cbytes;//current number of bytes on queue
unsigned short msg_qunm;//消息个数
unsigned short msg_qbytes;//消息队列的字节上限
_kernel_ipc_pid_t msg_lspid;//最后一次消息发送进程的Pid;
_kernel_ipc_pid_t msg_lrpid;//最后一次接收消息进程pid;
};
消息队列的特点
1、生命周期随内核
2、双向通信
3、按块大小读写//不像管道是面向字节流的
这些接口都是System V标准
消息队列的创建:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
key代表要创建的消息队列的标识, 即为ipc键
msgflag通常是通过IPC_CREAT或者IPC_EXCL来设置的
IPC_CREAT|IPC_EXCL代表创建消息队列,如果这个消息队列已经存在,则立即出错返回
IPC_CREAT则代表创建消息队列,不存在则创建,存在则直接打开
key值的创建:使用ftok函数,在之后的信号量还有共享内存中也要用到
#include <sys/types.h>
#include <sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);//第一个参数代表路径,第二个参数代表权限
消息队列的删除:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);//用于删除一个消息队列
msqid表示要删除的消息队列的标识
cmd:cmd设置成为IPC_RMID (表示删除消息队列)。
buf:通常buf设置为NULL
发送消息使用msgsnd函数:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
msgid:由msgget函数返回的消息队列的标识码。
msgp:是一个指针,指针指向准备发送的消息。
msgsz:是msgp指向的消息长度,这个长度不含保存消息类型的long int长整型。
msgflag:控制着当前消息队列满或到达上限要发生的事情。
返回值:成功返回0,失败返回-1。
消息结构:
struct msgbuf {
long mtype; /* message type, must be > 0 */类型必须大于0
char mtext[1]; /* message data */块大小
};
需要自定义
从消息队列中接受消息msgrcv函数:
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
参数:
msgid:由msgget函数返回的消息队列的标识码。
msgp:是一个指针,指针指向准备发送的消息。
msgsz:是msgp指向的消息长度,这个长度不含保存消息类型的long int长整型。
msgtyp:可以实现接收优先级的简单形式。
msgflag:控制着当前消息队列满或到达上限要发生的事情。
msgtype>0:返回队列第一条类型等于msgtype的消息。
返回值:成功返回实际放到接收缓冲区里的字符个数,失败返回-1。
双向通信的具体实现:
comm.h:
1 #ifndef _COMM_H_
2 #define _COMM_H_
3 #include<stdio.h>
4 #include<sys/types.h>
5 #include<sys/msg.h>
6 #include<string.h>
7 #include<sys/ipc.h>
8 #define PATHNAME "."
9 #define PROJ_ID 0x6666
10 #define SERVER_TYPE 1
11 #define CLIENT_TYPE 2
12
13
14 struct Msgbuf{
15 long mtype;
16 char mtext[1024];
17 };
18
19 int commMsgqueue(int flags);
20 int createmsgqueue();
21 int getmsgqueue();
22 int destroymsgqueue(int msgid);
23 int sendmsg(int msgid,int type,char*buf);
24 int recvmsg(int msgid,int rtype,char out[]);
25 #endif
comm.c:
#include"comm.h"
2
3 int commMsgqueue(int flags)
4 {
5 key_t key=ftok(PATHNAME,PROJ_ID);
6 if(key==-1){
7 perror("ftok");
8 return -1;
9 }
10 int msgid=msgget(key,flags);
11 if(msgid<0){
12 perror("msgget");
13 return -1;
14 }
15 return msgid;
16 }
17 int createmsgqueue()
18 {
19 return commMsgqueue(IPC_CREAT|IPC_EXCL|0666);
20 }
21 int getmsgqueue()
22 {
23 return commMsgqueue(IPC_CREAT);
24 }
25 int destroymsgqueue(int msgid)
26 {
27 int ret=msgctl(msgid,IPC_RMID,NULL);
28 if(ret<0){
29 perror("msgctl");
30 return -1;
31 }
32 return 0;
33 }
34 int sendmsg(int msgid,int type,char*buf)
35 {
36 struct Msgbuf msgbuf;
37 msgbuf.mtype=type;
38 // if(size>(sizeof(msgbuf.mtext)-1)){
39 // printf("size is too large\n");
40 // return -1;
41 // }
42 strcpy(msgbuf.mtext,buf);
43 int ret=msgsnd(msgid,(void*)&msgbuf,sizeof(msgbuf.mtext),0);
44 if(ret<0){
45 perror("msgsnd");
46 return -1;
47 }
48 return 0;
49 }
50 int recvmsg(int msgid,int rtype,char out[])
51 {
52 struct Msgbuf msgbuf;
53 int ret=msgrcv(msgid,(void*)&msgbuf,sizeof(msgbuf.mtext),rtype,0);
54 if(ret<0){
55 perror("msgrcv");
56 return -1;
57 }
58 strcpy(out,msgbuf.mtext);
59 return 0;
60 }
server.c:
#include"comm.h"
2
3 int main()
4 {
5 int msgid=createmsgqueue();
6 char buf[1024];
7 while(1){
8 buf[0]=0;
9 recvmsg(msgid,CLIENT_TYPE,buf);
10 printf("client# %s\n",buf);
11
12 printf("please enter#:");
13 fflush(stdout);
14 ssize_t s=read(0,buf,sizeof(buf)-1);
15 if(s>0){
16 buf[s]=0;
17 sendmsg(msgid,SERVER_TYPE,buf);
18 printf("send done,wait recv ...\n");
19 }
20 }
21 destroymsgqueue(msgid);
22 return 0;
23 }
client.c:
#include"comm.h"
2
3 int main()
4 {
5 int msgid=getmsgqueue();
6 char buf[1024]={0};
7 while(1){
8 buf[0]=0;
9 printf("please enter# ");
10 fflush(stdout);
11 ssize_t s=read(0,buf,sizeof(buf)-1);
12 if(s>0){
13 buf[s]=0;
14 sendmsg(msgid,CLIENT_TYPE,buf);
15 printf("send done ,wait recv...\n");
16 }
17 recvmsg(msgid,SERVER_TYPE,buf);
18 printf("server #%s\n",buf);
19 }
20 return 0;
21 }
makefile:
1 .PHONY:all
2 all:client server
3 client:client.c comm.c
4 gcc -o $@ $^
5 server:server.c comm.c
6 gcc -o $@ $^
7 .PHONY:clean
8 clean:
9 rm -f client server
查看消息队列的指令 ipcs -q
删除消息队列的指令 ipcrm -q msqid