为了充分利用多核的优势,程序一般会开启多个进程或者线程去处理任务。进程是资源分配的基本单位,而线程是执行和调度的基本单位。因此多线可以充分利用多核,而且因为共享资源,线程间数据共享便于程序开发,而进程间通讯就没有那么方便。
但是由于多个线程共享同一进程的资源,如果其中一个线程现了段错误或者其它的致命错误,会导致整个程序退出,而多进程程序,由于进程的资源是独立的,如果某一个程序出现错误也只会影响一个进程。
多线程中由于线程间共享资源,对于公共资源的排他性,同一时刻只能有一个线程访问资源,所以为了次序访问资源,需要锁的帮助。影响服务器的四大杀手分别是:1、数据拷贝 2、环境切换 3、内存分配 4、锁竞争。所以在多线程变成中需要注意合理用锁。
多线程编程为了提高程序执行效率,但是每当有任务是就创建一个线程去处理任务,处理完成后再销毁线程,这样显然会不太方便同时也会影响效率。我们可以创建一组线程,统一管理,如同一个池子,里面有多个线程。有任务到来,就放到池子中,由池子中的线程处理而不会销毁线程,这个便是线程池。线程池其实生产者与消费者模型类似。生产者产生任务,将任务丢到任务队列中,线程池中的线程作为消费者,从任务队列中,取出任务执行。
为了减少锁竞争,可以为每一个消费者配一个任务队列,根据每个消费者(线程)的任务数,将任务投递给任务数最少的线程
下面是针对每线程每队列方式实现的线程池
NoneCopyCtorAssign.h文件
#ifndef NONE_COPY_ABLE_H_
#define NONE_COPY_ABLE_H_
class NoneCopyCtorAssign
{
protected:
NoneCopyCtorAssign() {}
~NoneCopyCtorAssign() {}
private:
NoneCopyCtorAssign(const NoneCopyCtorAssign&);
NoneCopyCtorAssign& operator=(const NoneCopyCtorAssign&);
};
#endif
Mutex.h文件
#ifndef __BASE_MUTEX_H_
#define __BASE_MUTEX_H_
#include <pthread.h>
#include "NoneCopyCtorAssign.h"
class Mutex : private NoneCopyCtorAssign
{
public:
Mutex()
{
pthread_mutex_init(&_mutex, NULL);
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
void lock()
{
pthread_mutex_lock(&_mutex);
}
void unlock()
{
pthread_mutex_unlock(&_mutex);
}
pthread_mutex_t *getMutex()
{
return &_mutex;
}
private:
pthread_mutex_t _mutex;
};
class MutexGuard : NoneCopyCtorAssign
{
public:
explicit MutexGuard(Mutex &mutex)
: _mutex(mutex)
{
_mutex.lock();
}
~MutexGuard()
{
_mutex.unlock();
}
private:
Mutex &_mutex;
};
#endif // __BASE_MUTEX_H_
Condition.h文件
#ifndef _BASE_CONDITION_H_
#define _BASE_CONDITION_H_
#include <pthread.h>
#include <time.h>
#include "Mutex.h"
#include "NoneCopyCtorAssign.h"
class Condition : NoneCopyCtorAssign
{
public:
explicit Condition(Mutex &mutex)
: _mutex(mutex)
{
::pthread_cond_init(&_cond, NULL);
}
~Condition()
{
::pthread_cond_destroy(&_cond);
}
bool wait(unsigned int millssecond = 0)
{
int ret = 0;
if (0 == millssecond)
{
ret = ::pthread_cond_wait(&_cond, _mutex.getMutex());
}
else
{// returns true if time out
struct timespec ts = {0, 0};
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += (int)(millssecond / 1000);
ts.tv_nsec += (millssecond % 1000) * 1000000;
if (ts.tv_nsec > 1000000000)
{
ts.tv_sec += 1;
ts.tv_nsec -= 1000000000;
}
ret = ::pthread_cond_timedwait(&_cond, _mutex.getMutex(), &ts);
}
return 0 == ret;
}
bool notify()
{
return (0 == ::pthread_cond_signal(&_cond));
}
bool notifyAll()
{
return (0 == ::pthread_cond_broadcast(&_cond));
}
private:
Mutex& _mutex;
pthread_cond_t _cond;
};
#endif // _BASE_CONDITION_H_
Thread.h文件
#ifndef _BASE_THREAD_H_
#define _BASE_THREAD_H_
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <string>
#include "Mutex.h"
#include "Condition.h"
class Thread : private NoneCopyCtorAssign
{
public:
explicit Thread(const std::string &name);
virtual ~Thread() {};
static void* threadFunc(void *arg);
bool start();
void join();
inline const bool isAlive() const { return _alive; }
void final() { _alive = false; }
const std::string &getThreadName() const { return threadName; }
const pthread_t getThreadId() const { return _thread; }
virtual void run() = 0;
private:
std::string threadName; /**< 线程名称 */
volatile bool _alive; /**< 线程是否在运行 */
pthread_t _thread; /**< 线程编号 */
Mutex _mutex; /**< 互斥锁 */
Condition _cond; /**< 条件变量 */
};
#endif // _BASE_THREAD_H_
Thread.cpp文件
#include <signal.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/types.h>
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include "Thread.h"
/**
* \brief 线程函数
*
* 在函数体里面会调用线程类对象实现的回调函数
*
* \param arg 传入线程的参数
* \return 返回线程结束信息
*/
void* Thread::threadFunc(void *arg)
{
Thread *t = (Thread*)arg;
t->_mutex.lock();
t->_alive = true;
t->_cond.notify();
t->_mutex.unlock();
//运行线程的主回调函数
t->run();
//运行结束
t->_mutex.lock();
t->_alive = false;
t->_cond.notify();
t->_mutex.unlock();
return 0;
}
Thread::Thread(const std::string &name)
: threadName(name),
_alive(false),
_thread(0),
_mutex(),
_cond(_mutex)
{};
bool Thread::start(){
//线程已经创建运行,直接返回
if (_alive)
{
return true;
}
//设置线程分离
pthread_attr_t attr;
::pthread_attr_init(&attr);
::pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
//开启线程
if (0 != ::pthread_create(&_thread, NULL, Thread::threadFunc, this)) {
fprintf(stderr, "pthread_create() failed, reason: %s\n", strerror(errno));
return false;
}
::pthread_attr_destroy(&attr);
_mutex.lock();
while(!_alive)
_cond.wait();
_mutex.unlock();
return true;
}
/**
* \brief 等待一个线程结束
*
*/
void Thread::join(){
if (0 != _thread)
{
_thread = 0;
_mutex.lock();
while(_alive)
_cond.wait();
_mutex.unlock();
}
}
ThreadPool.h文件
#ifndef THREAD_POOL_H_H_
#define THREAD_POOL_H_H_
#include <vector>
#include "Thread.h"
struct stTask {
unsigned int id;
void *context;
void(*handler)(void *data);
};
class taskThread;
class ThreadPool
{
public:
ThreadPool(unsigned short _maxThread = 8);
~ThreadPool();
bool init();
void pushTask(stTask* t);
void exitThreadPool();//等任务处理完再终止
private:
unsigned short threadCount;
std::vector<taskThread*> threadGroup;
};
#endif // !THREAD_POOL_H_H_
ThreadPool.cpp文件
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <iostream>
#include <sstream>
#include <assert.h>
#include <queue>
#include "ThreadPool.h"
class taskThread : public Thread
{
public:
taskThread(ThreadPool* p, const std::string &name = std::string("taskThread")) :
Thread(name), pool(p), taskCount(0), mutex(), cond(mutex)
{}
virtual void run();
void pushTask(stTask* t);
int getTaskCount() const { return taskCount; }
private:
ThreadPool* pool;
std::queue<stTask* > taskQueue;
int taskCount;
Mutex mutex; /**< 互斥锁 */
Condition cond; /**< 条件变量 */
};
void taskThread::run()
{
std::cout << "thread run " << ::pthread_self() << std::endl;
while (isAlive())
{
mutex.lock();
while (taskCount <= 0)
{
std::cout << "thread wait:" << ::pthread_self() << std::endl;
cond.wait();
}
stTask* t = taskQueue.front();
taskQueue.pop();
taskCount--;
mutex.unlock();
std::cout << "task id:"<< t->id << std::endl;
t->handler(t->context);
delete t;
}
}
void taskThread::pushTask(stTask* t)
{
std::cout << "thread push " << ::pthread_self() << "size:"<< taskCount <<std::endl;
MutexGuard lock(mutex);
taskQueue.push(t);
if (taskCount <= 0){
cond.notify();
}
taskCount++;
}
static void exitHandler(void *data)
{
std::cout << "exitHandler: " << ::pthread_self() << std::endl;
Thread *t = (Thread*)data;
t->final();
}
ThreadPool::ThreadPool(unsigned short _maxThread):threadCount(_maxThread)
{}
ThreadPool::~ThreadPool()
{
for (std::vector<taskThread*>::iterator it = threadGroup.begin(); it != threadGroup.end(); ++it){
taskThread* pThread = *it;
if (!pThread)
continue;
if (pThread->isAlive())
{
pThread->final();
pThread->join();
}
delete pThread;
}
}
bool ThreadPool::init()
{
for (size_t i = 0; i < threadCount; i++){
std::ostringstream name;
name << "taskThread[" << i << "]";
taskThread *pEventThread = new(std::nothrow) taskThread(this, name.str());
assert(NULL != pEventThread);
if (!pEventThread->start())
return false;
threadGroup.push_back(pEventThread);
}
return true;
}
void ThreadPool::pushTask(stTask* t)
{
std::cout << "pushTask: " << (long)t->context << std::endl;
taskThread * pushThread = NULL;
for (size_t i = 0; i < threadGroup.size(); i++){
if (NULL == pushThread ||
pushThread->getTaskCount() > threadGroup[i]->getTaskCount())
{
pushThread = threadGroup[i];
}
}
if (pushThread){
pushThread->pushTask(t);
}
}
void ThreadPool::exitThreadPool()
{
for (size_t i = 0; i < threadGroup.size(); i++) {
stTask* t = new stTask;
memset(t, 0, sizeof(stTask));
t->handler = exitHandler;
t->context = (void*)threadGroup[i];
threadGroup[i]->pushTask(t);
}
for (size_t i = 0; i < threadGroup.size(); i++) {
threadGroup[i]->join();
}
}
main.cpp
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include <iostream>
#include <queue>
#include "ThreadPool.h"
using namespace std;
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
static void testThreadPool(void *data)
{
cout << "testThreadPool: " << (long)data << "threadid: " << pthread_self() << endl;
usleep(500 * 1000);
}
int main()
{
ThreadPool pool;
pool.init();
for (int i = 0; i < 50; i++)
{
stTask* t = new stTask();
memset(t, 0, sizeof(stTask));
t->handler = testThreadPool;
t->context = (void *)i;
pool.pushTask(t);
}
pool.exitThreadPool();
return 0;
}
这是一个简单的线程池的实现,对于stTask任务的封装很粗糙,但是通用。在开发中,我更想,封装成一个事件,如下:
struct stEvent
{
u16 eventTyp;
stEvent(eEventType _etype) :eventTyp(_etype)
{}
virtual ~stEvent() {}
};
然后我们根据自己的需求写自己的事件例如:
enum eEventType
{
eEventType_base = 0,
eEventType_Login = 1,
eEventType_Max
};
struct stLogin : public stEvent
{
stLogin() :stEvent(eEventType_Login) {}
int passward;
int size;
char data[0];
};
然后定义函数指针
typedef void(*evenHandler)(stEvent* e);
根据需求注册一组函数保存在线程池的成员变量 evenHandler handlers[eEventType_Max]中,根据事件eEventType觉得调用那个回调函数。这就是一个简单的事件驱动的线程池实现,与上一篇文章中的reactor模式结合便是简单的网络服务器了。