条件变量 深入理解总结(C++ 11 中的并发库 为例)

理解条件变量

条件变量可以用来管理thread间的通信。一个线程可以等待在一个条件变量上,直到发生某个事件。

考虑一个场景,一个线程访问一个队列时,发现队列为空,他只能等待 直到其他线程将一个节点添加到队列中。这种情况就需要用到条件变量。(当然,用低效的轮询也可以,不停的去判断队列中是否有节点)

C++11 标准库提供的 condition_variable 概览

condition_variable cv{};  //默认构造函数
cv.~condition_variable();  //析构函数 

----------
cv.notify_one();   //随机通知一个等待该条件变量的线程
cv.notify_all();    //通知全部等待该条件变量的线程   
----------

cv.wait(lck);           //等待  直到  被唤醒  (存在为唤醒 )
cv.wait(lck,pred);      // 相当于 while(!pared())wait(lck)
x=cv.wait_util(lck,tp);   //等待  直到 被唤醒或者到达某个时间点
                        //返回值 x 如果超时 则 x==timeout,否者x==cv_status::no_timeou

b=cv.wait_until(lck,tp,pred) // 等价于: while(!pared())
                                 //     { if(wait_until(lck,tp)==cv_sattus::timeout);}b=pred()

x=cv.wait_for(lck,d);   //等待 直到被唤醒或者超时   @d 超时时间  
b=cv.wait_for(lck,d,pred);  //等价于 b=cv.wait_until(lck,steady_clock::now()+d,std::move(pre)); 

剖析等待操作

cv.wait(lck) 需要与一个互斥锁搭配使用
代码通常如下

//伪代码  
...
condition   cond;    //条件变量  
mutex   mu;           //互斥锁 
/*
   进入临界区
*/
mu.lock();
while(v.is_empty())
{
  cond.wait(&mu);
}
 ...do something  .. 
mu.unlock();
/*
 退出临界区
*/

在调用wait时:
首先该线程需要获取到保护v的锁 进入临界取,因为 v 应该是多个线程可以访问的。
在wait函数执行如下操作:
1. wait函数内部首先 mu.unlock() 释放锁 ;
2. 然后进入等待;
3. 如果被唤醒,则调用mu.lock() 再次获取锁。


为什么wait函数要在正真开始等待之前释放锁?
因为 :
1. 变量 v 需要其他线程 需要获取锁 然后给v添加元素,以便可以更改v.is_empty()这个条件;
2. 让其他线程也有机会进入临界区 等待相同的条件

生产者-消费者 示例代码

//sync_queue 

#include <list>
#include <thread>
#include <mutex>
#include <condition_variable>
#include  <system_error>
#include <iostream>

template<typename T>
class Sync_queue
{
public:
    Sync_queue();
    ~Sync_queue();

    void put(const T& val);
    void put(T&& val);
    void get(T& val);
private:
    std::condition_variable m_cond;
    std::mutex   m_mutex;
    std::list<T> m_q;
};

template<typename T>
Sync_queue<T>::~Sync_queue()
{

}

template<typename T>
void Sync_queue<T>::get(T& val)
{
    std::unique_lock<std::mutex> lck(m_mutex);
    while (m_q.empty())     //使用while 而不用 if  是为了防止虚假唤醒  
    {
        if (m_cond.wait_for(lck, std::chrono::milliseconds{ 2000 }) == std::cv_status::timeout)
        {
            break;
        }
    }
    if (m_q.empty())
    {
        throw "超时";
    }
    else
    {
        val = m_q.front();
        m_q.pop_front();

        std::cout << "取出一个元素\n";
    }
}


template<typename T>
Sync_queue<T>::Sync_queue()
{
}


template<typename T>
void Sync_queue<T>::put(T&& val)
{
    std::lock_guard<std::mutex> lck(m_mutex);
    m_q.push_back(std::forward<T>(val));
    m_cond.notify_one();
}

template<typename T>
void Sync_queue<T>::put(const T& val)
{
    {
        std::lock_guard<std::mutex> lck(m_mutex);
        m_q.push_back(val);
    }
    m_cond.notify_one();   //通知时可以不需要mutex的保护  
}


//main.cpp


#include "Sync_queue.h"

Sync_queue<int> g_queue;


void produce()
{
    static int i = 0;
    while (true)
    {
        g_queue.put(++i);
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}

void consumer()
{
    while (true)
    {
        try
        {
            int val;
            g_queue.get(val);
            std::cout << "获取 :" << val << std::endl;
        }
        catch (const char * e)
        {
            std::cout << e << std::endl;
        }
    }
}

int main()
{
    std::thread t_c{ consumer };
    std::thread t_p{ produce };

    t_c.join();
    t_p.join();
    return 0;
}

// 在多个消费者的情况 put  函数还可以进行一些改进 

//  if(m_q.size()>n)
//  {m_cond.notify_all();}
//  else
//  {m_cond.notify_one();}  


总结:
主要是要理解 wait 内部做了什么,以及理解给wait传进去的互斥锁;

猜你喜欢

转载自blog.csdn.net/qq_33775402/article/details/80528135
今日推荐