理解条件变量
条件变量可以用来管理thread间的通信。一个线程可以等待在一个条件变量上,直到发生某个事件。
考虑一个场景,一个线程访问一个队列时,发现队列为空,他只能等待 直到其他线程将一个节点添加到队列中。这种情况就需要用到条件变量。(当然,用低效的
轮询也可以,不停的去判断队列中是否有节点)
C++11 标准库提供的 condition_variable 概览
condition_variable cv{}; //默认构造函数
cv.~condition_variable(); //析构函数
----------
cv.notify_one(); //随机通知一个等待该条件变量的线程
cv.notify_all(); //通知全部等待该条件变量的线程
----------
cv.wait(lck); //等待 直到 被唤醒 (存在为唤醒 )
cv.wait(lck,pred); // 相当于 while(!pared())wait(lck)
x=cv.wait_util(lck,tp); //等待 直到 被唤醒或者到达某个时间点
//返回值 x 如果超时 则 x==timeout,否者x==cv_status::no_timeou
b=cv.wait_until(lck,tp,pred) // 等价于: while(!pared())
// { if(wait_until(lck,tp)==cv_sattus::timeout);}b=pred()
x=cv.wait_for(lck,d); //等待 直到被唤醒或者超时 @d 超时时间
b=cv.wait_for(lck,d,pred); //等价于 b=cv.wait_until(lck,steady_clock::now()+d,std::move(pre));
剖析等待操作
cv.wait(lck)
需要与一个互斥锁搭配使用
代码通常如下
//伪代码 ... condition cond; //条件变量 mutex mu; //互斥锁 /* 进入临界区 */ mu.lock(); while(v.is_empty()) { cond.wait(&mu); } ...do something .. mu.unlock(); /* 退出临界区 */
在调用wait时:
首先该线程需要获取到保护v
的锁 进入临界取,因为v
应该是多个线程可以访问的。
在wait函数执行如下操作:
1. wait函数内部首先 mu.unlock() 释放锁 ;
2. 然后进入等待;
3. 如果被唤醒,则调用mu.lock() 再次获取锁。
为什么wait函数要在正真开始等待之前释放锁?
因为 :
1. 变量v
需要其他线程 需要获取锁 然后给v
添加元素,以便可以更改v.is_empty()
这个条件;
2. 让其他线程也有机会进入临界区 等待相同的条件
生产者-消费者 示例代码
//sync_queue
#include <list>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <system_error>
#include <iostream>
template<typename T>
class Sync_queue
{
public:
Sync_queue();
~Sync_queue();
void put(const T& val);
void put(T&& val);
void get(T& val);
private:
std::condition_variable m_cond;
std::mutex m_mutex;
std::list<T> m_q;
};
template<typename T>
Sync_queue<T>::~Sync_queue()
{
}
template<typename T>
void Sync_queue<T>::get(T& val)
{
std::unique_lock<std::mutex> lck(m_mutex);
while (m_q.empty()) //使用while 而不用 if 是为了防止虚假唤醒
{
if (m_cond.wait_for(lck, std::chrono::milliseconds{ 2000 }) == std::cv_status::timeout)
{
break;
}
}
if (m_q.empty())
{
throw "超时";
}
else
{
val = m_q.front();
m_q.pop_front();
std::cout << "取出一个元素\n";
}
}
template<typename T>
Sync_queue<T>::Sync_queue()
{
}
template<typename T>
void Sync_queue<T>::put(T&& val)
{
std::lock_guard<std::mutex> lck(m_mutex);
m_q.push_back(std::forward<T>(val));
m_cond.notify_one();
}
template<typename T>
void Sync_queue<T>::put(const T& val)
{
{
std::lock_guard<std::mutex> lck(m_mutex);
m_q.push_back(val);
}
m_cond.notify_one(); //通知时可以不需要mutex的保护
}
//main.cpp
#include "Sync_queue.h"
Sync_queue<int> g_queue;
void produce()
{
static int i = 0;
while (true)
{
g_queue.put(++i);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
void consumer()
{
while (true)
{
try
{
int val;
g_queue.get(val);
std::cout << "获取 :" << val << std::endl;
}
catch (const char * e)
{
std::cout << e << std::endl;
}
}
}
int main()
{
std::thread t_c{ consumer };
std::thread t_p{ produce };
t_c.join();
t_p.join();
return 0;
}
// 在多个消费者的情况 put 函数还可以进行一些改进
// if(m_q.size()>n)
// {m_cond.notify_all();}
// else
// {m_cond.notify_one();}
总结:
主要是要理解 wait 内部做了什么,以及理解给wait传进去的互斥锁;