对比大佬写的,我的线程池创建出来都是使用同一套线程响应函数,而大佬的把线程响应函数放到了队列里面,这就很高明了,还是太年轻。而且我的线程池对响应函数要求比较严格。------>https://blog.csdn.net/qq_41985711/article/details/82494738这是我自己写的线程池。
源码奉上(源码有点多,请耐心看,个人都是纯手打,无复制粘贴,照着抄也会出错。啊啊啊(O(∩_∩)O))
一、头文件
#ifndef __PTHPOOL_H__
#define __PTHPOOL_H__
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pthread.h>
// 线程池最多可以创建线程数
#define MAX_WAITING_TASKS 1000
// 同时活跃存在的最大线程数
#define MAX_ACTIVE_THREADS 20
struct task // 任务节点,响应函数设置
{
void *(* task)(void *arg);
void *arg;
struct task *next;
};
typedef struct thread_pool // 线程池属性
{
pthread_mutex_t lock; // 互斥锁
pthread_cond_t cond; // 条件变量
struct task *task_list; // 线程响应队列
pthread_t *tids; // 创建线程
unsigned waiting_tasks; // 处于睡眠的线程数量
unsigned active_threads; // 活跃线程数量
bool shutdown; // 销毁线程
}thread_pool;
// 初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number);
// 投放任务
bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg);
// 增加线程
int add_thread(thread_pool *pool, unsigned int additional_threads_number);
// 删除线程
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);
// 销毁线程
bool destroy_pool(thread_pool *pool);
// 线程例程
void *routine(void *arg);
#endif
二、程序源码
#include "pthpool.h"
void handler(void *arg)
{
// 响应取消请求之后自动处理的例程:防止死锁
pthread_mutex_unlock((pthread_mutex_t *)arg);
}
// 线程例程
void *routine(void *arg)
{
thread_pool *pool = (thread_pool *)arg;
struct task *p;
while (1)
{
// 访问任务队列前加锁,为1防止取消后死锁,处理处理例程 hander
pthread_cleanup_push(handler, (void *)&pool->lock);
pthread_mutex_lock(&pool->lock);
// 若当前没有任务,且线程池未关闭,则进入条件变量等待队列睡眠
while (pool->waiting_tasks == 0 && !pool->shutdown)
{
pthread_cond_wait(&pool->cond, &pool->lock);
}
// 若当前没有任务,且线程关闭标识符为真,则立即释放互斥锁并退出
if (pool->waiting_tasks == 0 && pool->shutdown == true)
{
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
// 若当前有任务,则消费任务队列中的任务
p = pool->task_list->next;
pool->task_list->next = p->next;
pool->waiting_tasks--;
// 释放互斥锁,并弹栈 handler (不执行handler)
pthread_mutex_unlock(&pool->lock);
pthread_cleanup_pop(0);
// 执行任务,并且在此期间禁止响应取消请求
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
(p->task)(p->arg); // 这里是函数调用
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
free (p);
}
pthread_exit(NULL);
}
/**************************************/
/***************函数说明***************/
/**********函数功能:初始化线程池属性**/
/**********函数名:init_pool***********/
/**********参数:pool--线程池属性******/
/****************number--活跃线程条数**/
/**********返回值:true--成功**********/
/**************************************/
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
// 初始化互斥锁和条件变量
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->cond, NULL);
// 关闭线程池销毁标识
pool->shutdown = false;
// 任务队列头节点
pool->task_list = malloc(sizeof (struct task));
pool->tids = malloc(sizeof (pthread_t) * MAX_ACTIVE_THREADS);
if (pool->task_list == NULL || pool->tids == NULL)
{
perror("allocate memory error");
return false;
}
pool->task_list->next = NULL;
pool->waiting_tasks = 0;
pool->active_threads = threads_number;
// 创建线程
for (int i = 0; i<pool->active_threads; i++)
{
if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0)
{
perror("create threads error");
return false;
}
}
return true;
}
// 任务投放
bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg)
{
// 新任务节点
struct task *new_task = malloc(sizeof (struct task));
if (new_task == NULL)
{
perror("mallcate memory error");
return false;
}
new_task->task = task;
new_task->arg = arg;
new_task->next = NULL;
// 访问任务队列前获取互斥锁,此处无须注册取消处理函数
pthread_mutex_lock(&pool->lock);
if (pool->waiting_tasks >= MAX_WAITING_TASKS)
{
pthread_mutex_unlock(&pool->lock);
fprintf(stderr, "too many tasks.\n");
free(new_task);
return false;
}
// 到达任务队列尾部
struct task *tmp = pool->task_list;
while (tmp->next != NULL)
tmp = tmp->next;
// 加入新节点
tmp->next = new_task;
pool->waiting_tasks++;
// 开锁,并唤醒一个线程
pthread_mutex_unlock(&pool->lock);
pthread_cond_signal(&pool->cond);
return true;
}
// 添加线程
int add_thread(thread_pool *pool, unsigned int additional_threads)
{
if (additional_threads == 0)
return 0;
unsigned total_threads = pool->active_threads + additional_threads;
int actual_increment = 0;
for (int i = pool->active_threads; i < total_threads &&
i < MAX_ACTIVE_THREADS; i++)
{
if (pthread_create(&((pool->tids)[i]), NULL, routine,
(void *)pool) != 0)
{
perror("add threads error");
if (actual_increment == 0)
return -1;
break;
}
actual_increment++;
}
pool->active_threads += actual_increment;
return actual_increment;
}
// 删除线程
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
if (removing_threads == 0)
return pool->active_threads;
int remain_threads = pool->active_threads - removing_threads;
remain_threads = remain_threads > 0 ? remain_threads : 1;
// 循环删除指定数目的线程
int i = 0;
for (i = pool->active_threads-1; i > remain_threads-1; i--)
{
errno = pthread_cancel(pool->tids[i]);
if (errno != 0)
break;
}
if (i == pool->active_threads-1)
return -1;
else
{
pool->active_threads = i+1;
return i+1;
}
}
// 销毁线程池
bool destroy_pool(thread_pool *pool)
{
// 准备
pool->shutdown = true;
// 广播唤醒线程池里的所有线程
pthread_cond_broadcast(&pool->cond);
for (int i = 0; i < pool->active_threads; i++)
{
errno = pthread_join(pool->tids[i], NULL);
if (errno != 0)
{
printf("join tids[%d] error: %s\n", i, strerror(errno));
}
else
printf("[%u] is joined\n", (unsigned)pool->tids[i]);
}
free(pool->task_list);
free(pool->tids);
free(pool);
return true;
}
三、测试代码
// 测试代码
#include "pthpool.h"
// 任务线程
void *mytask(void *arg)
{
int n = (int)arg;
printf("[%u][%s] ==> job will be done in %d sec...\n",
(unsigned)pthread_self(), __FUNCTION__, n);
sleep(n);
printf("[%u][%s] ==> job dome!\n",
(unsigned)pthread_self(), __FUNCTION__);
return NULL;
}
// 计时线程
void *count_time(void *arg)
{
int i = 0;
while (1)
{
sleep(1);
printf("sec: %d\n", ++i);
}
}
int main(void)
{
// 创建计时线程
pthread_t a;
pthread_create(&a, NULL, count_time, NULL);
// 一、初始化一个带有 2 条线程的线程池
thread_pool *pool = malloc(sizeof (thread_pool));
init_pool(pool, 2);
// 二、投入 3 个任务
printf("throwing 3 tasks...\n");
add_task(pool, mytask, (void *)(rand()%10));
add_task(pool, mytask, (void *)(rand()%10));
add_task(pool, mytask, (void *)(rand()%10));
// 三、显示当前有多少条线程
printf("current thread number: %d\n", remove_thread(pool, 0));
sleep(9);
// 四、再投入两个任务
printf("throwing another 2 tasks...\n");
add_task(pool, mytask, (void *)(rand()%10));
add_task(pool, mytask, (void *)(rand()%10));
// 五、增加两条线程
add_thread(pool, 2);
sleep(5);
// 删除3条线程
printf("remove 3 threads from the pool, current thread number: %d\n",
remove_thread(pool, 3));
// 销毁线程池
destroy_pool(pool);
return 0;
}
因为抄完代码又出现了错误,被搞得很惨,所以暂时不写其他内容了。