版权声明:guojawee https://blog.csdn.net/weixin_36750623/article/details/84315341
BlockingQueue.h 无界队列
代码功能:使用条件变量+互斥锁,实现无界队列(即生产者消费者模型),保证对临界资源(队列)的访问是安全的
BlockingQueue类的源代码
template<typename T>
class BlockingQueue : noncopyable
{
private:
mutable MutexLock mutex_; //互斥锁,保护共享的队列queue_
Condition notEmpty_ GUARDED_BY(mutex_); //条件变量
std::deque<T> queue_ GUARDED_BY(mutex_); //无界缓冲区队列queue_
public:
BlockingQueue(): mutex_(),notEmpty_(mutex_),queue_() {}
void put(const T& x)
{
MutexLockGuard lock(mutex_); //修改队列前,先加锁
queue_.push_back(x); //向队列中添加元素
notEmpty_.notify(); //通知当前队列不为空,take()方法不阻塞
}
void put(T&& x)
{
MutexLockGuard lock(mutex_);
queue_.push_back(std::move(x));
notEmpty_.notify();
}
T take() //取出对头元素
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty()) //如果队列是空
{
notEmpty_.wait(); //则一直等待,直到队列不为空
}
assert(!queue_.empty()); //再次判断队列不为空
T front(std::move(queue_.front())); //取出队列中的头元素,赋值给front变量
queue_.pop_front(); //弹出front
return std::move(front); //返回值为取出的对头元素front
}
size_t size() const //返回队列的大小
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
};
示例代码:BlockingQueue_test.cc
#include <muduo/base/BlockingQueue.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/Thread.h>
#include <memory>
#include <string>
#include <vector>
#include <stdio.h>
#include <unistd.h>
class Test
{
public:
Test(int numThreads): latch_(numThreads)
{
for (int i = 0; i < numThreads; ++i)
{
char name[32];
snprintf(name, sizeof name, "work thread %d", i);
//创建线程,绑定threadFunc线程函数
threads_.emplace_back(new muduo::Thread(std::bind(&Test::threadFunc, this), muduo::string(name)));
}
for (auto& thr : threads_)
{
thr->start(); //让线程函数threadFunc跑起来
}
}
void run(int times) //只有主线程生产产品,主线程是生产者
{
printf("waiting for count down latch\n");
//1.主线程将一直阻塞在count_>0条件上,直到count_!>0
//2.每个子线程启动后,都会调用countDown()函数将count_--
latch_.wait();
//3.当count减为0时,wait被唤醒,继续执行下面的代码
printf("all threads started\n");
for (int i = 0; i < times; ++i)
{
char buf[32];
snprintf(buf, sizeof buf, "hello %d", i);
queue_.put(buf); //主线程向queue_中放元素
printf("tid=%d, put data = %s, size = %zd\n", muduo::CurrentThread::tid(), buf, queue_.size());
}
}
void joinAll()
{
for (size_t i = 0; i < threads_.size(); ++i)
{
queue_.put("stop");
}
for (auto& thr : threads_)
{
thr->join();
}
}
private:
void threadFunc()
{
printf("tid=%d, %s started\n",
muduo::CurrentThread::tid(),
muduo::CurrentThread::name());
latch_.countDown(); //每个线程都对latch_.count_--
bool running = true;
while (running)
{
//queue_.take():使用条件变量控制队列,当队列为空时,一直阻塞
std::string d(queue_.take());
printf("tid=%d, get data = %s, size = %zd\n", muduo::CurrentThread::tid(), d.c_str(), queue_.size());
running = (d != "stop");
}
printf("tid=%d, %s stopped\n",
muduo::CurrentThread::tid(),
muduo::CurrentThread::name());
}
muduo::BlockingQueue<std::string> queue_;
muduo::CountDownLatch latch_;
std::vector<std::unique_ptr<muduo::Thread>> threads_; // 线程容器
};
void testMove()
{
muduo::BlockingQueue<std::unique_ptr<int>> queue;
queue.put(std::unique_ptr<int>(new int(42)));
std::unique_ptr<int> x = queue.take();
printf("took %d\n", *x);
*x = 123;
queue.put(std::move(x));
std::unique_ptr<int> y = queue.take();
printf("took %d\n", *y);
}
int main()
{
Test t(5);
t.run(100);
t.joinAll();
testMove();
printf("number of created threads %d\n", muduo::Thread::numCreated());
}
BoundedBlockingQueue 有界环形队列
有界缓冲区:与无界缓冲区相比,多了一个条件变量notFull成员,并且使用boost库的环形缓冲区。
template<typename T>
class BoundedBlockingQueue : noncopyable
{
private:
mutable MutexLock mutex_;
Condition notEmpty_ ;
Condition notFull_ ;
boost::circular_buffer<T> queue_ ; // 环形队列
public:
explicit BoundedBlockingQueue(int maxSize) //maxSize环形队列容量
: mutex_(),notEmpty_(mutex_),notFull_(mutex_),queue_(maxSize)
{}
void put(const T& x)
{
MutexLockGuard lock(mutex_);
while (queue_.full())
{
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(x);
notEmpty_.notify();
}
T take()
{
MutexLockGuard lock(mutex_);
while (queue_.empty())
{
notEmpty_.wait();
}
assert(!queue_.empty());
T front(queue_.front());
queue_.pop_front();
notFull_.notify();
return front;
}
bool empty() const
{
MutexLockGuard lock(mutex_);
return queue_.empty();
}
bool full() const
{
MutexLockGuard lock(mutex_);
return queue_.full();
}
size_t size() const //有效元素个数
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
size_t capacity() const //队列容量
{
MutexLockGuard lock(mutex_);
return queue_.capacity();
}
};