线程池在一个C++项目中是必不可少的。去看任何一个C++开发框架,绝大部分都会实现一个线程池。而如今C++11已经成熟,借助C++标准库中的线程库std::thread
,以及标准库提供的多线程同步神器std::condition_variable
(这个已经被我封装成了Event
,详情见C++中的事件Event(基于条件变量的封装))和std::future
,互斥变量以及锁std::mutex
(这个我也封装了下,详情见C++互斥变量对象的包装(基于C++的特性)),可变参数模板,等等这些工具可以很方便实现跨平台的线程池。其中有很多C++11的细节,或许不太熟悉,但是多看看,用用,就会了。基本上是语法糖。
首先,不说跨平台,因为跨平台就是个噱头,你用无关平台的API,即标准库就可以实现跨平台(不然你就要写一堆#if - #else
)就让你实现一个线程池,你会怎么做?依据“如何把大象装进冰箱”的套路。下面是我给出的步骤
- 准备好一些线程
- 向线程池提交任务
- 线程去执行任务
从上面的三个步骤可以看出来,实现一个线程池需要容器管理线程,这里选std::vector
;需要一个容器来管理任务,这里选std::queue
,这里借鉴消息队列的原理;这里其实就是生产者消费者模型,既然是这种模型,那么在C++互斥变量对象的包装(基于C++的特性)中有提到,这种模型需要需要考虑同步与互斥(线程总会有空闲的时候,如果有任务来了,要通知线程去执行任务;同时,多个线程会同时访问唯一的任务队列,因此还要考虑互斥访问,各种锁),因此,还需要一个同步变量管理同步关系,这里选用Event
。
有个上面三个核心的东西,实现线程池的原料就齐了。接下来就是处理搞一些线程出来,让它们去任务队列中领任务做,没有任务就去休息。
到这里,线程池其实已经实现了,绝对能跑起来,只不过这个线程池太简单了,不能满足项目的需求。
现在我还想要对线程池提出一点新的需求,比如下面的:
- 线程池中的线程中的按需创建,不需要在一开始就把线程准备好
- 线程池中的线程可扩展,也就是说分为核心线程和扩展线程,核心线程一旦创建出来不会主动结束,而扩展线程在空闲一段时间后自动结束
- 提交任务支持任意数量和类型的参数
- 任务支持等待返回
现在加了四条需求,想想这四条需求该用什么方法去实现。
第一条,提交任务的时候只需要看看已创建的线程数量是否小于指定线程数就行了
第二条,在新建线程的时候给该线程一个标记(标记是否为核心线程)。在线程空闲休眠醒来之时判断下是否为核心线程,不是直接干掉自己就好了
第三条,这个用可变模板参数就好了
第四条,这个就用线程同步神器std::future / std::packaged_task
就行了。
到这一步线程池基本完成了,这样一个线程池可以满足大部分项目的需求了。但是,还有最重要的一步,你的线程池是否存在BUG,有下列BUG需要注意:
- 线程池能否正常退出,并且析构
- 加锁粒度是否合适
- 线程池是否存在死锁
线程池大部分情况是作为全局变量使用,因此在程序运行中根本不需要退出,但是如果线程池作为局部变量使用,是否会正常退出,又或者,是否会有线程池中的线程把线程池干掉,等等,代码写出来容易,实际上还是要考虑方方面面的情况。
如果你把上面的情况都考虑了,那么恭喜你,线程池已经是你的了。那么下面的代码也是水到渠成。这段代码也是经受了几个项目和Windows/Linux平台的考验,参考了几位大牛的Code然后再次优化出来的,因此可以放心使用。
class ThreadPool
{
typedef std::function<void()> Task;
typedef std::queue<Task> TaskQueue;
typedef std::shared_ptr<std::thread> ThreadPtr;
typedef std::vector<ThreadPtr> Pool;
public:
ThreadPool(size_t coreCnt = 1, bool expand = false, size_t maxCnt = std::thread::hardware_concurrency())
: coreCnt(coreCnt)
, expand(coreCnt ? expand : true)
, maxCnt(maxCnt)
, run(true)
{
}
~ThreadPool()
{
Close();
}
void Start()
{
run = true;
event.Reset();
}
void Close()
{
run = false;
event.NotifyAll();
Pool vec;
do {
MUTEXOBJECT_LOCK_GUARD(pool);
vec = pool.Data();
} while (false);
std::for_each(std::begin(vec), std::end(vec), [] (const ThreadPtr & it)
{
if (it->joinable()) {
if (std::this_thread::get_id() == it->get_id())
it->detach();
else
it->join();
}
});
}
template<typename Fun, typename... Args>
std::future< typename std::result_of<Fun(Args...)>::type > Submit(Fun && fun, Args&&... args)
{
if (!run.load())
throw std::runtime_error("ThreadPool has closed!");
typedef typename std::result_of<Fun(Args...)>::type ReturnType;
auto task = std::make_shared< std::packaged_task<ReturnType()> >
(std::bind(std::forward<Fun>(fun), std::forward<Args>(args)...));
do {
MUTEXOBJECT_LOCK_GUARD(taskQueue);
taskQueue->emplace([task] ()
{
(*task)();
});
} while (false);
event.NotifyOne();
if (NeedNewThread())
NewThread();
return task->get_future();
}
private:
bool NeedNewThread()
{
do {
MUTEXOBJECT_LOCK_GUARD(pool);
if (pool->empty())
return true;
if (pool->size() == maxCnt)
return false;
} while (false);
do {
MUTEXOBJECT_LOCK_GUARD(taskQueue);
return taskQueue->size() > 0;
} while (false);
assert(false);
}
void NewThread()
{
MUTEXOBJECT_LOCK_GUARD(pool);
if (pool->size() < coreCnt)
pool->emplace_back(new std::thread(std::bind(&ThreadPool::Dispath, this, true)));
else if (expand)
pool->emplace_back(new std::thread(std::bind(&ThreadPool::Dispath, this, false)));
}
void Dispath(bool core)
{
while (run.load()) {
if (Task task = PickOneTask())
task();
else if (!event.WaitFor(std::chrono::minutes(1)) && !core) {
KillSelf();
break;
}
}
}
void KillSelf()
{
MUTEXOBJECT_LOCK_GUARD(pool);
auto it = std::find_if(std::begin(pool.Data()), std::end(pool.Data()), [] (const ThreadPtr & it)
{
return std::this_thread::get_id() == it->get_id();
});
(*it)->detach();
pool->erase(it);
}
Task PickOneTask()
{
MUTEXOBJECT_LOCK_GUARD(taskQueue);
Task ret = nullptr;
if (!taskQueue->empty()) {
ret = std::move(taskQueue->front());
taskQueue->pop();
}
return ret;
}
private:
MutexObject<Pool> pool;
MutexObject<TaskQueue> taskQueue;
Event event;
size_t coreCnt;
bool expand;
size_t maxCnt;
std::atomic<bool> run;
};
至于《C++并发编程实战中》还有一种做法:为每条线程搞一个本地任务队列,我觉得这种做法对于实际情况没什么作用,因为Submit
这个函数一般情况下不会在线程中的线程中调用,因此本地任务队列大部时间都没有任务。
代码中的C++11的特性在这里就不多解释了,了解一下不懂的地方就自然知道为什么这么写了。有异议或者有疑问的都可以在评论区留言,大家一起前进。
使用Demo:
void fun(int n)
{
std::cout << "n : " << n << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
void work()
{
static const int LEN = 100;
ThreadPool pool(4, true, 10);
int n = 5;
pool.Submit(fun, n);
std::cout << "n : " << n << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
pool.Submit(fun, 1);
pool.Close();
pool.Start();
std::future<void> fu[LEN];
for (int i = 0; i < LEN; i++)
fu[i] = pool.Submit(fun, i);
for (int i = 0; i < LEN; fu[i++].wait()) {}
pool.Close();
}
int main()
{
work();
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}
也可以用这个程序测试一下程序是否正确,打点断点,看看线程池中的线程数量的变化。
参考: