直接进入正题,此文将实现一个简易线程池,仅为理解线程池概念,暂不考虑效率问题,如有建议欢迎留言讨论。
首先是类设计,简易线程池嘛,既有简易两个字,设计也尽量简单,主要下面几个类:
一:CallableObject类,封装具体函数,此类可以直接加入任务队列或供WorkThread调用;
二:ThreadPool类,负责所有线程的初始化、调度、回收;
三:WorkThread类,实际工作的线程,它的工作方式就是调用CallableObject类;
四:TaskList类,向线程池指派任务的时候,可能会没有空闲的线程,这时候就需要一个任务队列,并且当一个线程处理完一个工作的时候,应当主动检查任务队列里是否有待完成的任务。
下面是具体实现,涉及到的有c++、stl
define.h:具体的函数可能有参数,也可能没有,所以这里给出了两个版本。为什么需要这么个结构体呢?上面说了,任务并不是一定指派了立即就会被执行,而加入任务队列的时候就需要保存执行函数和参数了。
#include <functional>
//添加任务时需要传递给ThreadPool的参数(带参数版本)
template<typename T>
struct ThreadPoolParamater
{
//工作线程要执行的函数
std::function<void(T)> func;
//传递给该函数的参数
T paramater;
};
//添加任务时需要传递给ThreadPool的参数(不带参数版本)
struct ThreadPoolParamaterWithoutParamater
{
std::function<void()> func;
};
CallableObj.h:函数包装器,FixedFreeLinkList.hpp稍后再说。
#include "defines.h"
#include "FixedFreeLinkList.hpp"
#include <cassert>
#include <functional>
#include <memory>
class WorkThread;
//接口
class CallableObjBase
{
public:
virtual void operator()(void) = 0;
//Node是FixedFreeLinkList链表节点,节点成员为WorkThread,用以和工作线程交互
Node<WorkThread>* node{ nullptr };
};
//带参数版本
template<typename ParamaterStruct>
class CallableObj : public CallableObjBase
{
public:
//规定了函数格式(返回值、参数类型)
CallableObj(std::function<void(std::function<void(ParamaterStruct)>&&, ParamaterStruct, void *)>&& func)
:_func(std::move(func))
{
}
//执行
virtual void operator()(void) override
{
assert(node);
_func(std::move(_tpp.func), _tpp.paramater, node);
}
//传递参数
void passParamater(ThreadPoolParamater<ParamaterStruct>&& tpp) noexcept
{
_tpp = std::move(tpp);
}
//获取参数
ThreadPoolParamater<ParamaterStruct>& getParamaterRefenrence()
{
return _tpp;
}
private:
std::function<void(std::function<void(ParamaterStruct)>&&, ParamaterStruct, void*)> _func;
ThreadPoolParamater<ParamaterStruct> _tpp;
};
//无参数版本
class CallableObjNoParamater : public CallableObjBase
{
public:
CallableObjNoParamater(std::function<void(std::function<void()>&&, void*)>&& func)
:_func(std::move(func))
{
}
virtual void operator()(void) override
{
assert(node);
_func(std::move(_tpp.func), node);
}
void passParamater(ThreadPoolParamaterWithoutParamater&& tpp) noexcept
{
_tpp = std::move(tpp);
}
ThreadPoolParamaterWithoutParamater& getParamaterRefenrence()
{
return _tpp;
}
private:
std::function<void(std::function<void()>&&, void*)> _func;
ThreadPoolParamaterWithoutParamater _tpp;
};
FixedFreeLinkList.hpp:类似自由链表,但是没有增长,所以叫fixed,用来做WorkThread的容器。使用它的原因:我期望线程池内线程数固定(简易,不考虑增减),并且需要快速获取、回收线程,能满足这些的当然第一个想到链表。
#include <new>
#include <mutex>
//链表节点
template<typename T>
struct Node
{
~Node() = default;
Node<T>* next{ nullptr };
T ele;
};
//(固定长度的)自由链表,没有考虑内存对齐
template<typename ElementType>
class FixedFreeLinklist
{
public:
//构造时传入固定节点数
FixedFreeLinklist(size_t fixedSize)
:_fixedSize(fixedSize)
{
buildChunk();
}
//析构,此链表之负责管理节点空间,销毁内存的时候并不会自动调用节点的析构,所以得手动调用一下
~FixedFreeLinklist()
{
for (size_t i = 0; i < _fixedSize; ++i)
{
((Node<ElementType>*)(_memory + sizeof(Node<ElementType>) * i))->~Node<ElementType>();
}
delete[] _memory;
}
//获取空闲节点,如果没有返回nullptr
Node<ElementType>* getFreeNode() noexcept
{
_mutex.lock();
Node<ElementType>* node{ nullptr };
if (_freeHead)
{
node = _freeHead;
_freeHead = _freeHead->next;
}
--_count;
_mutex.unlock();
return node;
}
//判断是否有空闲节点
bool empty() const noexcept
{
return !_freeHead;
}
//回收节点(有借有还
void ReleaseFreeNode(Node<ElementType>* node) noexcept
{
_mutex.lock();
node->next = _freeHead;
_freeHead = node;
++_count;
_mutex.unlock();
}
//获取空闲节点数
size_t size() const noexcept
{
return _count;
}
//禁用拷贝
FixedFreeLinklist(const FixedFreeLinklist& other) = delete;
FixedFreeLinklist(FixedFreeLinklist&& other) = delete;
FixedFreeLinklist& operator=(const FixedFreeLinklist& other) = delete;
FixedFreeLinklist& operator=(FixedFreeLinklist&& other) = delete;
private:
//创建内存
void buildChunk() noexcept
{
size_t sz = sizeof(Node<ElementType>) * _fixedSize;
_memory = new(std::nothrow) char[sz];
if (!_memory)
{
return;
}
_freeHead = new(_memory) Node<ElementType>();
auto node = _freeHead;
for (size_t i = 1; i < _fixedSize; ++i)
{
auto pnew = new(_memory + i * sizeof(Node<ElementType>)) Node<ElementType>();
node->next = pnew;
node = pnew;
}
_count = _fixedSize;
}
private:
//指向空闲链表头的指针,为null代表当前没有空闲节点
Node<ElementType>* _freeHead{ nullptr };
//链表占用的内存段
char* _memory{ nullptr };
//节点数量
size_t _count;
//固定节点数
const size_t _fixedSize;
//线程锁
std::mutex _mutex;
};
WorkThread.h:工作线程
#include "CallableObj.h"
#include <thread>
#include <atomic>
class CallableObjBase;
class WorkThread
{
//线程分离管理类
class ThreadDetachGuard
{
public:
ThreadDetachGuard(std::thread&& t);
~ThreadDetachGuard();
private:
std::thread _t;
};
public:
WorkThread();
~WorkThread();
//指派工作
bool addWork(std::shared_ptr<CallableObjBase> cab);
//判断是否空闲(false为空闲)
bool isWorking() const noexcept;
private:
//设置工作状态,工作中或空闲中
void setWorkStatus(bool isWorking);
//干活
void work();
private:
//是否在工作,为false表示空闲
std::atomic_bool _isWorking = false;
//停止flag
std::atomic_bool _stop;
//工作时调用的函数包装器
std::shared_ptr<CallableObjBase> _cab;
friend class ThreadPool;
};
WorkThread.cpp:
#include "WorkThread.h"
#include <thread>
#include <iostream>
//构造时初始化字段并开启线程
WorkThread::WorkThread()
{
_stop.store(false);
ThreadDetachGuard tg(std::thread{ [this]() {
work();
} });
}
//析构时要停止线程,由于使用链表管理,析构的原因只可能是程序结束(主动退出或被动中断,假如中断能有幸调用析构的话),如果此时不终止线程,可能会导致程序无法正常结束
WorkThread::~WorkThread()
{
_stop.store(true);
}
//指派工作
bool WorkThread::addWork(std::shared_ptr<CallableObjBase> cab)
{
if (_isWorking)
{
return false;
}
_cab = cab;
//_lock.unlock();
return true;
}
//是否在工作中
bool WorkThread::isWorking() const noexcept
{
return _isWorking.load();
}
//设置工作状态
void WorkThread::setWorkStatus(bool isWorking)
{
_isWorking.store(isWorking);
}
//工作函数,没工作时sleep 10毫秒
void WorkThread::work()
{
while (true)
{
if (_cab)
{
setWorkStatus(true);
auto cab = _cab;
_cab.reset();
cab->operator()();
}
while (true)
{
if (_stop.load())
{
return;
}
if (!_cab)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
else
{
break;
}
}
}
}
WorkThread::ThreadDetachGuard::ThreadDetachGuard(std::thread&& t)
:_t(std::move(t))
{
}
WorkThread::ThreadDetachGuard::~ThreadDetachGuard()
{
if (_t.joinable())
{
_t.detach();
}
else
{
printf("no thread\n");
}
}
TaskList.h:任务队列,用自由链表保存任务
#include "CallableObj.h"
#include <memory>
#include <mutex>
//链表节点
struct TaskListNode
{
~TaskListNode() = default;
TaskListNode* next{ nullptr };
std::shared_ptr<CallableObjBase> callableObj;
};
class TaskList
{
//自由链表的桶
struct Chunk
{
//每个桶放1024个节点
enum
{
storage_sz = sizeof(TaskListNode) * 1024
};
//指向下一个桶
Chunk* next{ nullptr };
//内存段
char mem[storage_sz];
};
public:
~TaskList();
//获取队列第一个任务
TaskListNode* getFirstTask() noexcept;
//回收节点
void releaseTask(TaskListNode* node) noexcept;
//向队列加入任务
bool addTask(std::shared_ptr<CallableObjBase> cab) noexcept;
//队列是否为空
bool empty() noexcept;
private:
//增长
bool grow() noexcept;
private:
//队列头部
TaskListNode* _used_head{ nullptr };
//队列尾部
TaskListNode* _used_last{ nullptr };
//空闲节点头部
TaskListNode* _free_head{ nullptr };
//桶头部
Chunk* _chunk_head{ nullptr };
//线程锁
std::mutex _lock;
};
TaskList.cpp:
#include "TaskList.h"
#include <new>
//自由链表只管理内存,所以得手动调用一下节点元素的析构
TaskList::~TaskList()
{
{
auto current = _free_head;
while (current)
{
current->~TaskListNode();
current = current->next;
}
}
{
auto current = _used_head;
while (current)
{
current->~TaskListNode();
current = current->next;
}
}
//回收内存
{
auto current = _chunk_head;
while (current)
{
auto tmp = current;
current = current->next;
delete tmp;
}
}
}
//增长,一个桶只能放1024个节点,当任务超过这个数量时就需要加桶了
bool TaskList::grow() noexcept
{
auto ptr = new(std::nothrow) Chunk;
if (!ptr)
{
return false;
}
size_t sz = Chunk::storage_sz / sizeof(TaskListNode);
_free_head = reinterpret_cast<TaskListNode*>(ptr->mem);
auto current = _free_head;
for (size_t i = 0; i < sz; ++i)
{
auto node = new(ptr->mem + i * sizeof(TaskListNode)) TaskListNode;
current->next = node;
current = node;
}
ptr->next = _chunk_head;
_chunk_head = ptr;
return true;
}
//获取队列第一个任务,返回nullptr表示当前没有任务
TaskListNode* TaskList::getFirstTask() noexcept
{
_lock.lock();
auto node = _used_head;
if (_used_head)
{
_used_head = _used_head->next;
}
else
{
_used_last = nullptr;
}
_lock.unlock();
return node;
}
//向任务队列加入任务,添加位置为队列尾部
bool TaskList::addTask(std::shared_ptr<CallableObjBase> cab) noexcept
{
_lock.lock();
if (!_free_head)
{
if (!grow())
{
_lock.unlock();
return false;
}
}
auto node = _free_head;
_free_head = _free_head->next;
node->next = nullptr;
if (!_used_head)
{
_used_head = _used_last = node;
}
else
{
_used_last->next = node;
_used_last = node;
}
node->callableObj = cab;
_lock.unlock();
return true;
}
//判断队列是否有任务
bool TaskList::empty() noexcept
{
return !_used_head;
}
//回收节点
void TaskList::releaseTask(TaskListNode* node) noexcept
{
_lock.lock();
node->next = _free_head;
_free_head = node;
_lock.unlock();
}
ThreadPool.h:
#include "FixedFreeLinkList.hpp"
#include "WorkThread.h"
#include "TaskList.h"
#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
class ThreadPool
{
public:
ThreadPool();
~ThreadPool()
{
}
//添加任务带参数版本
template<typename ParamaterStruct>
void addWork(std::function<void(ParamaterStruct)>&& func, ParamaterStruct ps)
{
_lock.lock();
auto cab = std::make_shared<CallableObj<ParamaterStruct>>(std::bind(&ThreadPool::work_func<ParamaterStruct>, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
ThreadPoolParamater<ParamaterStruct> tpp;
tpp.paramater = ps;
tpp.func = std::move(func);
//传递参数
cab->passParamater(std::move(tpp));
//如果有空闲线程,则直接开始任务,否则将任务加入任务队列
if (!_workThreads.empty())
{
startWork(cab);
}
else
{
_taskList.addTask(cab);
}
_lock.unlock();
}
//添加任务,无参数版本
void addWork(std::function<void()>&& func);
//开始一个任务
void startWork(std::shared_ptr<CallableObjBase> cab);
//获取使用中的线程数量
size_t thread_used() const noexcept;
private:
//要包装给WorkThread的工作函数,带参数版本
template<typename ParamaterStruct>
void work_func(std::function<void(ParamaterStruct)>&& func, ParamaterStruct ps, void* node)
{
//执行工作函数
func(ps);
auto nwt = (Node<WorkThread>*)node;
nwt->ele.setWorkStatus(false);
//用完了回收
_workThreads.ReleaseFreeNode(nwt);
//检查任务队列是否有任务,如果有则接着执行下一个任务
checkTaskList();
}
//要包装给WorkThread的工作函数,无参数版本
void work_func(std::function<void()>&& func, void* node);
//检查任务队列是否有任务
void checkTaskList() noexcept;
private:
//线程锁
std::mutex _lock;
//线程数量
const size_t _threadLimit = 16;
//保存WorkThread的链表
FixedFreeLinklist<WorkThread> _workThreads;
//任务队列
TaskList _taskList;
};
ThreadPool.cpp:
#include "defines.h"
#include "ThreadPool.h"
#include "CallableObj.h"
ThreadPool::ThreadPool()
:_workThreads{16}
{
}
//添加任务,无参数版本
void ThreadPool::addWork(std::function<void()>&& func)
{
_lock.lock();
auto cab = std::make_shared<CallableObjNoParamater>(std::move(std::bind(&ThreadPool::work_func, this, std::placeholders::_1, std::placeholders::_2)));
ThreadPoolParamaterWithoutParamater tpp;
tpp.func = std::move(func);
//传递参数
cab->passParamater(std::move(tpp));
//如果有空闲线程则直接开始任务,否则加入任务队列
if (!_workThreads.empty())
{
startWork(cab);
}
else
{
_taskList.addTask(cab);
}
_lock.unlock();
}
//要包装给WorkThread的工作函数
void ThreadPool::work_func(std::function<void()>&& func, void* node)
{
func();
auto nwt = (Node<WorkThread>*)node;
nwt->ele.setWorkStatus(false);
//用完回收
_workThreads.ReleaseFreeNode(nwt);
checkTaskList();
}
//检查任务队列是否有任务,有则开始执行
void ThreadPool::checkTaskList() noexcept
{
_lock.lock();
if (!_taskList.empty() && !_workThreads.empty())
{
auto node = _taskList.getFirstTask();
startWork(node->callableObj);
//用完回收
_taskList.releaseTask(node);
}
else
{
printf("no task\n");
}
_lock.unlock();
}
//开始任务,应确保调用它之前有空闲线程可用
void ThreadPool::startWork(std::shared_ptr<CallableObjBase> cab)
{
auto node = _workThreads.getFreeNode();
cab->node = node;
node->ele.addWork(cab);
}
//获取当前已使用的线程数
size_t ThreadPool::thread_used() const noexcept
{
return _threadLimit - _workThreads.size();
}
上测试代码:main.cpp,任务很简单,输出hello world,测试线程池在快速添加任务时的工作状况
#include "ThreadPool.h"
#include <iostream>
#include <memory>
#include <functional>
#include <string>
std::mutex lock;
//计数
size_t i = 0;
void helloWorld()
{
lock.lock();
std::cout << i++ << "\thello world" << std::endl;
lock.unlock();
}
int main()
{
ThreadPool tp;
//添加10万个任务
for (int i = 0; i < 100000; ++i)
{
tp.addWork(std::move(helloWorld));
}
std::cin.get();
return 0;
}
运行结果:
nice,欢迎讨论