深入应用C++11 笔记---异步操作 (九)

深入应用C++11 笔记—异步操作 (九)

异步操作

C++11 提供了异步操作相关的类:
* std::future 作为异步结果的传输通道,用于获取线程函数的的返回值;
* std::promise用于包装一个值,将数据和future绑定起来,方便线程赋值;
* std::package_task将函数和future绑定起来,以便异步调用。

1.1 获取线程函数返回值的类std::future

​ thread库提供了future用来访问异步操作的结果,因为一个异步操作的结果不能马上获取,只能在未来某个时候从某个地方获取,这个异步操作的结果是一个未来的期待值,所以被称为future,future提供了获取异步操作结果的通道。可以以同步等待的方式获取结果,可以通过查询future的状态(future_status)来获取异步操作的结果。future_status有如下3种状态:

  • Deferred:异步操作还没开始
  • Ready:异步操作已经完成
  • Timeout:异步操作超时

我们可以查询future状态,通过它内部的状态可以知道异步任务的执行情况:

//查询future的状态
std::future_status status;
do{
    status=future.wait_for(std::chrono::seconds(1));
    if(status==std::future_status::deferred){}
    else if(status==std::future_status::timeout){}
    else if(status==std::future_status::ready){}
}while(status!=std::future_status::ready);

获取future结果有三种方式:

  • get: 等待异步操作结束并返回结果
  • wait:只是等待异步操作完成,没有返回值
  • wait_for:是超时等待返回结果

1.2 协助线程赋值的类 std::promise

​ std::promise将数据和future绑定起来,在线程函数中为外面传进来的promise赋值,在线程函数执行完之后就可以通过promis的future获取该值了。取值是间接地通过promise内部提供的future来获取的。

std::promise<int> pr;
std::thread t([](std::promise<int> &p){p.set_value_at_thread_exit(9);},std::ref(pr));
std::future<int> f=pr.get_future();
auto f=f.get();

1.3 可调用对象的包装类std::packaged_task

​ std::packaged_task包装了一个可调用对象的包装类(如function、lambda expression、bind expression和another function object),将函数和future绑定起来,以便异步调用,它和std::promise在某种程度上有点像,promise保存了一个共享状态的值,而packaged_task保存的是一个函数。

std::packaged_task<int()> task([](){return 7;});
std::thread t1(std::ref(task));
std::future<int> f1=task.get_future();
auto r1=f1.get();

1.4 std::promise、std::packaged_task和std::future三者之间的关系

​ std::future提供了一个访问异步操作结果的机制,它和线程是一个级别的,属于低层次的对象。std::promise和std::packaged_task,它们内部都有future以便访问异步操作结果,std::packaged_task包装的是一个异步操作,而std::promise包装的是一个值,都是为了方便异步操作的返回值。

  • std::promise:需要获取线程中的某个值

  • std::packaged_task: 需要获取一个异步操作的返回值

    ​ future被promise和packaged_task用来作为异步操作或者异步结果的连接通道,用std::future和std::shared_future来获取异步调用的结果。future是不可拷贝的,只能移动,shared_future是可以拷贝的,当需要将future放到容器中则需要用shared_future。

  //packaged_task::get_future
  #include <iostream>
  #include <utility>
  #include <future>
  #include <thread>
  #include <vector>
  #include <algorithm>
  #include <cassert>
  #include <random>

  namespace parallel
  {
    template <class InputIt, class T>
    InputIt find(InputIt first, InputIt last, const T& value)
    {
        /*
        * 计算合适的线程数
        * std::thread::hardware_concurrency()用于返回当前系统支持的并发数
        */
        auto count = std::distance(first, last);
        auto avaThreadNums = std::thread::hardware_concurrency();
        auto perThreadMinNums = 20;
        auto maxThreadNums = ((count + (perThreadMinNums - 1)) & (~(perThreadMinNums - 1))) / perThreadMinNums;
        auto threadNums =
            avaThreadNums == 0 ?
            maxThreadNums :
            std::min(static_cast<int>(maxThreadNums), static_cast<int>(avaThreadNums));
        auto blockSize = count / threadNums;

        /* 主线程创建std::promise实例,模板参数是返回值类型 */
        std::promise<InputIt> result;
        /* 因为不同线程会并发查找,当一个线程找到后其他线程就可以停止查找了,原子变量done用于标记是否找到 */
        std::atomic<bool> done(false);
        {
            std::vector<std::thread> threads;
            auto front = first;
            for (int i = 0; i < threadNums; ++i)
            {
                auto back = front;
                if (i != threadNums - 1)
                    std::advance(back, blockSize);
                else
                    back = last;
                threads.emplace_back(
                    [front, back, &value, &result, &done]
                {
                    /* 当一个线程找到后所有线程都会退出,通过done标记管理 */
                    for (auto it = front; !done && it != back; ++it)
                    {
                        if (*it == value)
                        {
                            done.store(true);
                            /* 如果找到,记录找到的值 */
                            result.set_value(it);
                            return;
                        }
                    }
                }
                );
            }
            /* 回收线程资源 */
            for (auto &th : threads)
                th.join();
        }
        /* 通过std::promise::get_future获得std::future对象,然后调用get获取结果 */
        return done ? result.get_future().get() : last;
    }
  }

  int main()
  {
    std::vector<int> v(100000000);
    int n = 0;
    std::generate(v.begin(), v.end(), [&n] { return ++n; });
    auto value = std::random_device()() % 65536;
    auto it1 = parallel::find(v.begin(), v.end(), value);
    auto it2 = std::find(v.begin(), v.end(), value);
    assert(it1 == it2);
    return 0;
  }

声明:以上主要来源于深入应用C++11 这本书,强烈推荐

猜你喜欢

转载自blog.csdn.net/zt_xcyk/article/details/80931416