互斥量:
可以使用Pthread的互斥接口来保护数据,确保同一时间只有一个线程访问数据。互斥量从本质上说是一把锁,在访问共享资源前对互斥量进行设置(加锁),在访问完成后释放(解锁)互斥量。对互斥量进行加锁以后,任何其他试图再次对互斥量进行加锁的线程都会被阻塞直到当前线程释放该互斥锁。如果释放一个互斥量时有一个以上的线程阻塞,那么所有该锁上的阻塞线程都被变成可运行状态,第一个变为运行的线程就可以对互斥量进行加锁,其它线程线程就会看到互斥量依然是锁着的,只能回去再次等待它重新变为可用。在这种方式下,每次只用一个线程可以向前执行。
信号量:
int sem_init(sem_t *sem,int pshared,unsigned int value);
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);
sem_destroy(sem_t *sem);
sem_init用于对指定信号初始化,pshared为0,表示信号在当前进程的多个线程之间共享,value表示初始化信号的值。
sem_wait可以用来阻塞当前线程,直到信号量的值大于0,解除阻塞。解除阻塞后,sem的值-1,表示公共资源被执行减少了。
sem_post用于增加信号量的值+1,当有线程阻塞在这个信号量上时,调用这个函数会使其中的一个线程不在阻塞,选择机制由线程的调度策略决定。
sem_destroy用于销毁指定信号量。
注:参考UNIX环境高级编程(第三版)
实现代码:
#include <unistd.h> #include <sys/types.h> #include <pthread.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #include <string.h> #include <semaphore.h> #define ERR_EXIT(m) \ do \ { \ perror(m); \ exit(EXIT_FAILURE); \ }while(0); \ #define CONSUMERS_COUNT 1 //消费者 #define PRODUCERS_COUNT 1 //生产者 #define BUFFSIZE 10 //缓冲区大小 int g_buffsize[BUFFSIZE]; //缓冲区 unsigned short in = 0; //存放的位置 unsigned short out = 0; //消费的位置 unsigned short produce_id = 0; //当前正在生产的ID unsigned short consume_id = 0; //当前正在消费的ID sem_t g_sem_full; //满信号量 sem_t g_sem_empty; //空信号量 pthread_mutex_t g_mutex; //互斥锁 pthread_t g_thread[CONSUMERS_COUNT+PRODUCERS_COUNT];//线程ID void* consume(void *arg) { int i; int num = (int)arg; while (1) { //打印等待线程 printf(" %d consume wait \n",num); sem_wait(&g_sem_empty); //等待空的信号量,如果不空则消费产品 pthread_mutex_lock(&g_mutex); //打印仓库当前的状态 for (i=0; i<BUFFSIZE; i++) { printf("%02d",i); if (g_buffsize[i] == -1) //无产品打印空 printf("%s","null"); else printf("%d",g_buffsize[i]); //打印产品 if (i == out) printf("\t<--consume"); //消费产品的位置 printf("\n"); } //获取要消费的位置 consume_id = g_buffsize[out]; //开始消费产品 printf("begin consume product %d\n",consume_id); g_buffsize[out] = -1; out = (out+1)%BUFFSIZE; //消费的位置加1 printf("end consume product %d\n",consume_id); pthread_mutex_unlock(&g_mutex); sem_post(&g_sem_full); sleep(5); } return NULL; } void* produce(void *arg) { int i; int num = (int)arg; while (1) { //打印等待线程 printf(" %d produce wait \n",num); sem_wait(&g_sem_full);//等待满的信号量,如果不满则生产产品 pthread_mutex_lock(&g_mutex); //打印仓库当前的状态 for (i=0; i<BUFFSIZE; i++) { printf("%02d",i); if (g_buffsize[i] == -1) //无产品打印空 printf("%s","null"); else printf("%d",g_buffsize[i]); //打印产品 if (i == in) printf("\t<--produce"); //生产产品的位置 printf("\n"); } //开始生产产品 printf("begin produce product %d\n",produce_id); g_buffsize[in] = produce_id; in = (in+1)%BUFFSIZE; printf("end produce product %d\n",produce_id); produce_id++; pthread_mutex_unlock(&g_mutex); sem_post(&g_sem_empty); sleep(1); } return NULL; } int main(void) { int i; for (i=0; i<BUFFSIZE; i++) { g_buffsize[i] = -1; //-1表示无产品 } sem_init(&g_sem_full, 0, BUFFSIZE);//0表示只用于当前进程中的线程通信 sem_init(&g_sem_empty, 0, 0); pthread_mutex_init(&g_mutex, NULL);//初始化互斥锁 //创建消费者线程 for (i=0; i<CONSUMERS_COUNT; i++) { pthread_create(&g_thread[i], NULL, consume, (void*)i); } //创建生产者线程 for (i=0; i<PRODUCERS_COUNT; i++) { pthread_create(&g_thread[i+CONSUMERS_COUNT], NULL, produce, (void*)i); } //等待线程结束 for (i=0; i<CONSUMERS_COUNT+PRODUCERS_COUNT; i++) { pthread_join(g_thread[i], NULL); } sem_destroy(&g_sem_full); sem_destroy(&g_sem_empty); pthread_mutex_destroy(&g_mutex); return 0; }
运行截图: