问题描述:
生产者-消费者问题,也称作有界缓冲区问题,两个进程共享一个公共的固定大小的缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此时进行阻塞,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。同样地,当缓冲区已经空了,而消费者还想去取消息,此时也可以让消费者进行阻塞,等待生产者放入一个或者多个数据时再唤醒它。
实现代码
#include "shm_fifo.h" #include <assert.h> shmfifo_t* shmfifo_init(int key,int blksize,int blocks) { //申请一个块空间 shmfifo_t *fifo = (shmfifo_t *)malloc(sizeof(shmfifo_t)); //是否申请成功 assert( fifo != NULL); memset(fifo, 0, sizeof(shmfifo_t)); int shmid; //判断共享内存是否存在 shmid = shmget(key, 0 , 0); //得到要创建的共享内存的大小 int size = sizeof(shmhead_t) + blksize*blocks; //如果不存在 if (shmid == -1) { //创建一个共享内存 fifo->shmid = shmget(key, size, IPC_CREAT | 0666); if(fifo->shmid == -1) ERR_EXIT("shmget"); //头部指针指向共享内存 fifo->p_shm = (shmhead_t*)shmat(fifo->shmid, NULL , 0); if( fifo->p_shm == (shmhead_t*)-1 ) ERR_EXIT("shmat"); fifo->p_payload = (char*)(fifo->p_shm + 1); //设置信号量的key值 fifo->sem_mutex = sem_create(key); fifo->sem_full = sem_create(key+1); fifo->sem_empty = sem_create(key+2); //信号量初始化 sem_setval(fifo->sem_mutex, 1); sem_setval(fifo->sem_full, blocks); sem_setval(fifo->sem_empty, 0); //头部初始化 fifo->p_shm->blksize = blksize; fifo->p_shm->blocks = blocks; fifo->p_shm->rd_index = 0; fifo->p_shm->wr_index = 0; } //如果存在 else { fifo->shmid = shmid; //头部指针指向共享内存 fifo->p_shm = (shmhead_t*)shmat(fifo->shmid, NULL , 0); if( fifo->p_shm == (shmhead_t*)-1 ) ERR_EXIT("shmat"); fifo->p_payload = (char*)(fifo->p_shm + 1); fifo->sem_mutex = sem_open(key); fifo->sem_full = sem_open(key+1); fifo->sem_empty = sem_open(key+2); //读写索引初始化 fifo->p_shm->rd_index = 0; fifo->p_shm->wr_index = 0; } return fifo; } //生产者 void shmfifo_put(shmfifo_t *fifo,const void* buf) { sem_p(fifo->sem_full); sem_p(fifo->sem_mutex); memcpy(fifo->p_payload+fifo->p_shm->blksize*fifo->p_shm->wr_index, buf, fifo->p_shm->blksize); fifo->p_shm->wr_index = (fifo->p_shm->wr_index + 1) % fifo->p_shm->blocks; sem_v(fifo->sem_mutex); sem_v(fifo->sem_empty); } //消费者 void shmfifo_get(shmfifo_t *fifo,void* buf) { sem_p(fifo->sem_empty); sem_p(fifo->sem_mutex); memcpy(buf, fifo->p_payload+fifo->p_shm->blksize*fifo->p_shm->rd_index, fifo->p_shm->blksize); fifo->p_shm->rd_index = (fifo->p_shm->rd_index + 1) % fifo->p_shm->blocks; sem_v(fifo->sem_mutex); sem_v(fifo->sem_full); } void shmfifo_destory(shmfifo_t *fifo) { sem_delete(fifo->sem_mutex); sem_delete(fifo->sem_full); sem_delete(fifo->sem_empty); int shmid = fifo->shmid; shmctl(fifo->shmid, IPC_RMID, 0); free(fifo); }
#ifndef _SHM_FIFO_H #define _SHM_FIFO_H_ #include "ipc.h" typedef struct shmfifo shmfifo_t; typedef struct shmhead shmhead_t; struct shmhead { unsigned int blksize; //块大小 unsigned int blocks; //总块数 unsigned int rd_index; //读索引 unsigned int wr_index; //写索引 }; struct shmfifo { shmhead_t *p_shm; //共享内存头部指针 char *p_payload; //有效负载的起始地址 int shmid; //共享内存IO int sem_mutex; //用来互斥用的信号量 int sem_full; //用来控制共享内存是否满的信号量 int sem_empty; //用来控制共享内存是否空的信号量 }; shmfifo_t* shmfifo_init(int key,int blksize,int blocks); void shmfifo_put(shmfifo_t *fifo,const void* buf); void shmfifo_get(shmfifo_t *fifo,void *buf); void shmfifo_destory(shmfifo_t *fifo); #endif/* _SHM_FIFO_H */
测试程序:
生产者:
#include "shm_fifo.h" typedef struct stu { char name[32]; int age; }STU; int main(void) { shmfifo_t *fifo = shmfifo_init(1234,sizeof(STU),3); int i; STU s; memset(&s, 0, sizeof(STU)); s.name[0] = 'A'; for(i=0; i<5; i++) { s.age = 20 + i; shmfifo_put(fifo,&s); s.name[0] = s.name[0] + 1; printf("send ok\n"); } return 0; }
消费者:
#include "shm_fifo.h" typedef struct stu { char name[32]; int age; }STU; int main(void) { shmfifo_t *fifo = shmfifo_init(1234,sizeof(STU),3); int i; STU s; memset(&s, 0, sizeof(STU)); for(i=0; i<5; i++) { shmfifo_get(fifo,&s); printf("recv:name = %s,age=%d\n",s.name,s.age); } return 0; }