BlockingQueue是一个继承自Queue的接口,在Queue的队列基础上增加了阻塞操作。简单来说,就是在在BlockingQueue为空时从队头取数据将会被阻塞,因为此时还没有数据可取,一旦队列中有数据了,取数据的线程就会释放得到了数据;如果BlockingQueue有容量限制且满了,那么插入数据的线程将会阻塞,知道队列中有空闲位置可以插入数据了,才会释放。经过上面一段描述,可以发现这就是一个生产者-消费者模型。
下面进行代码示例
头文件BQueue.h
class BQueue
{
public:
BQueue(int worker_count_, int capacity_ = 8);
~BQueue();
bool push(const Message &item);
bool get(Message &item, const uint16_t &time_ms = 1000);
void getNow(Message &item);
int size() { return (tail - head + capacity) % capacity; }
void shutdown();
private:
int capacity;
int head = 0;
int tail = 0;
std::vector<Message> queue;
std::mutex mtx;
std::condition_variable notFull, notEmpty;
std::vector<Worker<BQueue>*> threads; //这个Worker类是一个线程池,稍后说明
};
头文件BQueue.cpp
BQueue::BQueue(int worker_count_, int capacity_)
: capacity(capacity_), queue(std::vector<Message>(capacity))
{
// assert(workerCount_ > 0);
///此处创建了一个线程池
try {
for (int i = 0; i < worker_count_; i++)
threads.push_back(
new Worker<BQueue>(*this));
} catch (std::exception &e) {
LOG(ERROR) << "Thread init failed!";
LOG(ERROR) << e.what();
} catch (...) {
LOG(ERROR) << "Thread init failed!";
}
}
BQueue::~BQueue()
{
shutdown();
// join();
for (auto it : threads) {
if (!it) {
delete it;
it = 0;
}
}
threads.clear();
}
bool BQueue::push(const Message &item)
{
{
std::unique_lock<std::mutex> lck(mtx);
notFull.wait(lck, [&] { return (tail + 1) % capacity != head; });
// if (!notFull.wait_for(lck, std::chrono::milliseconds(1000), [&] { return (tail + 1) % capacity != head; }))
// return false;
// while ((tail + 1) % capacity == head) //is full
// notFull.wait(lck);
queue[tail] = item;
tail = (tail + 1) % capacity;
}
//wake up get thread
notEmpty.notify_one();
return true;
}
bool BQueue::get(Message &msg, const uint16_t &time_ms)
{
{
std::unique_lock<std::mutex> lck(mtx);
if (!notEmpty.wait_for(lck, std::chrono::milliseconds(time_ms), [&] { return head != tail; }))
return false;
msg = queue[head];
head = (head + 1) % capacity;
}
notFull.notify_one();
return true;
}
void BQueue::getNow(Message& msg)
{
{
std::unique_lock<std::mutex> lck(mtx);
notEmpty.wait(lck, [&] { return head != tail; });
// while (head == tail) // is empty
// notEmpty.wait(lck);
msg = queue[head];
head = (head + 1) % capacity;
DEBUGLOG(std::cout << "get..." << std::endl);
}
//wake up push thread
notFull.notify_one();
}
void BQueue::shutdown()
{
std::lock_guard<std::mutex> lock(mtx);
for (auto it : threads) {
if (!it) {
it->shutdown();
}
}
notFull.notify_all();
notEmpty.notify_all();
}
线程池Worker类代码:
template <class T>
class Worker
{
public:
Worker(T & queue_)
: queue(queue_),status(true)
{
thread = new std::thread(&Worker::run, this);
}
~Worker()
{
if (thread) {
delete thread;
thread = nullptr;
}
}
void run()
{
Message message;
while (status) {
if (queue.get(message)) {
handle(message);///此处处理消息
}
}
}
void shutdown()
{
status = false;
join();
}
private:
void join()
{
if (thread->joinable())
thread->join();
}
T& queue;
std::thread* thread;
bool status;
};