C++11提供了thread,但是过于复杂,我们还是倾向于在项目中编写自己的Thread。
Posix Thread的使用这里不再赘述。
重点是这个函数:
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
第三个参数是一个回调函数,该函数必须返回值为void*,而且只有一个参数,类型也是void*。
POSIX的thread默认是joinable,需要手工调用pthread_join函数来回收,也可以调用pthread_detach将其变为detachable,此时不需要手工回收线程。
通过虚函数封装
下面介绍Thread的封装。
我们把Thread的声明先放在这里:
#ifndef THREAD_H_
#define THREAD_H_
#include "NonCopyable.h"
#include <pthread.h>
class Thread : NonCopyable
{
public:
Thread();
virtual ~Thread();
void start();
void join();
virtual void run() = 0;
pthread_t getThreadId() const
{ return threadId_; }
private:
//提供给pthread_create的第三个参数使用
static void *runInThread(void *arg);
pthread_t threadId_;
//pid_t tid_; //进程标示
bool isRunning_;
};
#endif //THREAD_H_
这里需要说明的是:
- 首先,为了获得最干净的语义,Thread应该是不可复制的,所以需要继承NonCopyable。
- 其次,为了调用pthread_create创建线程,我们往里面注册的不能是一个成员函数,因为成员函数含有一个隐式参数,导致函数的指针类型并不是void *(*start_routine) (void *),所以我们采用了static函数。
- static函数无法访问某一对象的成员,所以我们在调用pthread_create时,将this指针作为回调函数的参数。
这里相关代码如下:
//static
void *Thread::runInThread(void *arg)
{
Thread *pt = static_cast<Thread*>(arg);
//pt->tid_ = syscall(SYS_gettid);
pt->run();
return NULL;
}
用户将自己的逻辑注册在run中就可以了。
这个Thread不提供detach函数,因为我们在析构函数中做了如下的处理,如果Thread对象析构,线程还在运行,那么需要将Thread设置为detach状态。
Thread::~Thread()
{
if(isRunning_)
{
pthread_detach(threadId_);
}
}
大部分逻辑都是固定的,用户只需要改变run里面的代码即可,于是我们将run设置为纯虚函数,让用户继承Thread类。所以析构函数为virtual。
完整的CPP实现如下:
#include "Thread.h"
#include <assert.h>
#include <unistd.h>
#include "MutexLock.h" //TINY_CHECK
Thread::Thread()
:threadId_(0),
isRunning_(false)
{
}
Thread::~Thread()
{
if(isRunning_)
{
TINY_CHECK(!pthread_detach(threadId_));
}
}
//static
void *Thread::runInThread(void *arg)
{
Thread *pt = static_cast<Thread*>(arg);
//pt->tid_ = syscall(SYS_gettid);
pt->run();
return NULL;
}
void Thread::start()
{
TINY_CHECK(!pthread_create(&threadId_, NULL, Thread::runInThread, this));
isRunning_ = true;
}
void Thread::join()
{
assert(isRunning_);
TINY_CHECK(!pthread_join(threadId_, NULL));
isRunning_ = false;
}
测试代码如下:采用继承的方式使用这个类。
#include "Thread.h"
#include <iostream>
#include <unistd.h>
using namespace std;
class MyThread : public Thread
{
public:
void run()
{
cout << "foo" << endl;
}
};
int main(int argc, char const *argv[])
{
MyThread t;
t.start();
t.join();
return 0;
}
NonCopyable类的定义如下:
#ifndef NONCOPYABLE_H
#define NONCOPYABLE_H
class NonCopyable //禁用值语义
{
public:
NonCopyable() { }
~NonCopyable() { }
private:
NonCopyable(const NonCopyable &);
void operator=(const NonCopyable &);
};
#endif //NONCOPYABLE_H
通过函数对象借助function/bind封装
上面采用的是虚函数+继承的方式,用户通过重写Thread基类的run方法,传入自己的用户逻辑。
现在我们采用C++11的function,将函数作为Thread类的成员,用户只需要将function对象传入线程即可,所以Thread的声明中,应该含有一个function成员变量。\
类的声明如下:
#ifndef THREAD_H_
#define THREAD_H_
#include <boost/noncopyable.hpp>
#include <functional>
#include <pthread.h>
class Thread : boost::noncopyable
{
public:
typedef std::function<void ()> ThreadCallback;
Thread(ThreadCallback callback);
~Thread();
void start();
void join();
static void *runInThread(void *);
private:
pthread_t threadId_;
bool isRunning_;
ThreadCallback callback_; //回调函数
};
#endif //THREAD_H_
那么如何开启线程?思路与之前一致,写一个static函数,用户pthread_create的第三个参数,this作为最后一个参数即可。
void Thread::start()
{
pthread_create(&threadId_, NULL, runInThread, this);
isRunning_ = true;
}
回调函数
注意在这种封装方式中,我们采用了回调函数。回调函数与普通函数的区别就是,普通函数写完由我们自己直接调用,函数调用是一种不断往上堆积的方式,而回调函数通常是我们把某一个函数传入一个“盒子”,由该盒子内的机制来调用它。
在这个例子里面,我们将function传入Thread,当Thread启动的时候,由Thread去执行function对象。
在win32编程中大量用到这种机制,我们为鼠标单击、双击等事件编写相应的函数,然后将其注册给windows系统,然后系统在我们触发各种事件的时候,根据事件的类型,调用相应的构造函数。
关于回调函数,可以参考:http://www.zhihu.com/question/19801131
以后有时间,再专门总结下回调函数。
完整的cpp如下:
#include "Thread.h"
Thread::Thread(ThreadCallback callback)
: threadId_(0),
isRunning_(false),
callback_(std::move(callback))
{
}
Thread::~Thread()
{
if(isRunning_)
{
//detach
pthread_detach(threadId_);
}
}
void Thread::start()
{
pthread_create(&threadId_, NULL, runInThread, this);
isRunning_ = true;
}
void Thread::join()
{
pthread_join(threadId_, NULL);
isRunning_ = false;
}
void *Thread::runInThread(void *arg)
{
Thread *pt = static_cast<Thread*>(arg);
pt->callback_(); //调用回调函数
return NULL;
}
这个线程的使用方式有三种:
- 一是将普通函数作为回调函数
void foo()
{
while(1)
{
printf("foo\n");
sleep(1);
}
}
int main(int argc, char const *argv[])
{
Thread t(&foo);
t.start();
t.join();
return 0;
}
- 二是采用类的成员函数作为回调函数:
class Foo
{
public:
void foo(int i)
{
while(1)
{
printf("foo %d\n", i++);
sleep(1);
}
}
};
int main(int argc, char const *argv[])
{
Foo f;
int i = 34;
Thread t(bind(&Foo::foo, &f, i));
t.start();
t.join();
return 0;
}
- 最后一种是组合一个新的线程类,注意这里采用的是类的组合:
class Foo
{
public:
Foo()
: thread_(bind(&Foo::foo, this))
{
}
void start()
{
thread_.start();
thread_.join();
}
void foo()
{
while(1)
{
printf("foo\n");
sleep(1);
}
}
private:
Thread thread_;
};
int main(int argc, char const *argv[])
{
Foo f;
f.start();
return 0;
}
有些复杂的类,还需要将三种方式加以整合,例如后面要谈到的TimerThread,里面含有一个Thread和Timer,用户将逻辑注册给Timer,然后Timer的start函数注册给Thread。
这种方式的Thread,使用灵活性相对于面向对象的风格,提高了很多。
基于对象和面向对象
这里总结几点:
面向对象依靠的是虚函数+继承,用户通过重写基类的虚函数,实现自己的逻辑。
基于对象,依赖类的组合,使用function和bind实现委托机制,更加依赖于回调函数传入逻辑。
使用C++11封装线程池ThreadPool
https://www.cnblogs.com/inevermore/p/4038698.html
线程池本质上是一个生产者消费者模型,所以请熟悉这篇文章:Linux组件封装(五)一个生产者消费者问题示例。
在ThreadPool中,物品为计算任务,消费者为pool内的线程,而生产者则是调用线程池的每个函数。
- 搞清了这一点,我们很容易就需要得出,ThreadPool需要一把互斥锁和两个同步变量,实现同步与互斥。
- 存储任务,当然需要一个任务队列。
- 除此之外,我们还需要一系列的Thread,因为Thread无法复制,所以我们使用unique_ptr作为一个中间层。
所以Thread的数据变量如下:
class ThreadPool : boost::noncopyable
{
public:
typedef std::function<void ()> Task;
ThreadPool(size_t queueSize, size_t threadsNum);
ThreadPool::~ThreadPool()
{
if(isStarted_)
stop();
}
void start();
void stop();
void addTask(Task task); //C++11
Task getTask();
bool isStarted() const { return isStarted_; }
void runInThread();
private:
mutable MutexLock mutex_;
Condition empty_;
Condition full_;
size_t queueSize_;
std::queue<Task> queue_;
const size_t threadsNum_;
std::vector<std::unique_ptr<Thread> > threads_;
bool isStarted_;
};
显然,我们使用了function,作为任务队列的任务元素。
构造函数的实现较简单,不过,之前务必注意元素的声明顺序与初始化列表的顺序相一致。
ThreadPool::ThreadPool(size_t queueSize, size_t threadsNum)
: empty_(mutex_),
full_(mutex_),
queueSize_(queueSize),
threadsNum_(threadsNum),
isStarted_(false)
{
}
添加和取走任务是生产者消费者模型最核心的部分,但是套路较为固定,如下:
void ThreadPool::addTask(Task task)
{
MutexLockGuard lock(mutex_);
while(queue_.size() >= queueSize_)
empty_.wait();
queue_.push(std::move(task));
full_.notify();
}
ThreadPool::Task ThreadPool::getTask()
{
MutexLockGuard lock(mutex_);
while(queue_.empty())
full_.wait();
Task task = queue_.front();
queue_.pop();
empty_.notify();
return task;
}
注意我们的addTask使用了C++11的move语义,在传入右值时,可以提高性能。
还有一些老生常谈的问题,例如:
- wait前加锁
- 使用while循环判断wait条件(为什么?)
要想启动线程,需要给Thread提供一个回调函数,编写如下:
void ThreadPool::runInThread()
{
while(1)
{
Task task(getTask());
if(task)
task();
}
}
就是不停的取走任务,然后执行。
OK,有了线程的回调函数,那么我们可以编写start函数。
void ThreadPool::start()
{
isStarted_ = true;
//std::vector<std::unique<Thread> >
for(size_t ix = 0; ix != threadsNum_; ++ix)
{
threads_.push_back(
std::unique_ptr<Thread>(
new Thread(
std::bind(&ThreadPool::runInThread, this))));
}
for(size_t ix = 0; ix != threadsNum_; ++ix)
{
threads_[ix]->start();
}
}
这里我们采用C++11的unique_ptr,成功实现vector无法存储Thread(为什么?)的问题。
我们的第一个版本已经编写完毕了。
添加stop功能
刚才的ThreadPool只能启动,无法stop,我们从几个方面着手,利用bool变量isStarted_,实现正确退出。
改动的有以下几点:
首先是Thread的回调函数不再是一个死循环,而是:
void ThreadPool::runInThread()
{
while(isStarted_)
{
Task task(getTask());
if(task)
task();
}
}
然后addTask和getTask,在while循环判断时,加入了bool变量:
void ThreadPool::addTask(Task task)
{
MutexLockGuard lock(mutex_);
while(queue_.size() >= queueSize_ && isStarted_)
empty_.wait();
if(!isStarted_)
return;
queue_.push(std::move(task));
full_.notify();
}
ThreadPool::Task ThreadPool::getTask()
{
MutexLockGuard lock(mutex_);
while(queue_.empty() && isStarted_)
full_.wait();
if(!isStarted_) //线程池关闭
return Task(); //空任务,存量任务扔了
assert(!queue_.empty());
Task task = queue_.front();
queue_.pop();
empty_.notify();
return task;
}
这里注意,退出while循环后,需要再判断一次bool变量,因为未必是条件满足了,可能是线程池需要退出,调整了isStarted变量。
最后一个关键是我们的stop函数:
void ThreadPool::stop()
{
if(isStarted_ == false)
return;
{
MutexLockGuard lock(mutex_);
isStarted_ = false;
//清空任务
while(!queue_.empty())
queue_.pop();
}
full_.notifyAll(); //激活所有的线程
empty_.notifyAll();//getTask在empty_.notify()前已经退出
for(size_t ix = 0; ix != threadsNum_; ++ix)
{
threads_[ix]->join();
}
threads_.clear();
}
先将bool设置为false,然后调用notifyAll,激活所有等待的线程(为什么)。
最后我们总结下ThreadPool关闭的流程:
- 1.isStarted设置为false
- 2.加锁,清空队列
- 3.发信号激活所有线程
- 4.正在运行的Thread,执行到下一次循环,退出
- 5.正在等待的线程被激活,然后while判断为false,执行到下一句,检查bool值,然后退出。
- 6.主线程依次join每个线程。
- 7.退出。