如题
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdbool.h>
#include<unistd.h>
#include<string.h>
#include<strings.h>
#include<errno.h>
#include<sys/stat.h>
#include<sys/types.h>
#include<fcntl.h>
#include<dirent.h>
#include<time.h>
#define PTHREAD_MAX 20 //最多有20个线程
#define TASK_FULL_NUM 100 //最多等待100个任务
//任务链表结构体
struct task{
void *(*task_func)(char* from_adr,char* to_adr);//任务函数
// int task_arg;//任务函数的参数
char src[300];
char dst[300];
struct task *next;
};
//线程池
struct pthread_pool{
pthread_t tid[PTHREAD_MAX];//线程号数组
struct task *head;//任务链表的头节点
unsigned int active_pthread_num;//正在运行的线程个数
unsigned int wait_task_num;//正在等待的任务个数
pthread_mutex_t m;//线程池互斥锁
pthread_cond_t v;//线程池条件变量
bool shutdown;//线程销毁标志位 flase:不销毁 true:销毁
};
int is_dir( char* file_name);
int endwith(char* s,char c);
//void cp_file( char *source_path , char *destination_path);
void copy_folder(char* source_path,char *destination_path,struct pthread_pool *pool);
int is_dir( char* file_name);
void *f(char* from_adr , char* to_adr);
void cleanup(void *arg);
void *func(void *arg);
struct pthread_pool* pthread_pool_init();
int pthread_pool_add(struct pthread_pool *pool,int addnum);
int pool_add_task(struct pthread_pool *pool,void *(*f)(char* src ,char* dst),char* src,char* dst );
int pool_delete(struct pthread_pool *pool);
// void cp_file( char *source_path , char *destination_path)
// {
// }
int endwith(char* s,char c){//用于判断字符串结尾是否为“.”
printf("end : %c\n",s[strlen(s)-1]);
if(s[strlen(s)-1]==c){
return 1;
}
else{
return 0;
}
}
void copy_folder(char* source_path,char *destination_path,struct pthread_pool *pool)
{
//printf(" your dir is correct !\n");// remind user input dir is correct
DIR *dst_dp = opendir(destination_path);
if(dst_dp == NULL )
{
printf(" your dest dir is not existed \n");
printf(" system will mkdir for U \n");
if(mkdir(destination_path,0777) == -1)
{
printf(" error occur during mkdir");
exit(-1);
}
}
DIR *src_dp = opendir(source_path);
struct dirent *ep_src = readdir(src_dp);
char address[300] = {0};
char toaddress[300] = {0};
while(1)//this is a recursion,break until ( all files in source_path have been copied , or error occured)
{
sprintf(address,"%s%s",source_path,"/");//ep_src->d_name can't be passed to function as argu
sprintf(address,"%s%s",address,ep_src->d_name);//so we should use sprintf() to catch address
//printf(" source_path : %s\n",address);
sprintf(toaddress,"%s%s",destination_path,"/");
sprintf(toaddress,"%s%s",toaddress,ep_src->d_name);
//printf(" dest_path : %s\n",toaddress);
//printf(" got %d\n",is_dir(address));
//printf(" %d\n",endwith(address,'.'));
if(endwith(address,'.') == 1)//if the file is . or .. pass
{
//printf(" program src jump the %s\n",address);
// ep_src = readdir(src_dp);
}
else if( ( is_dir(address) != 1) )//if the file is not dir just copy file
{
// printf(" in else if");
//cp_file(address,toaddress);
// ep_src = readdir(src_dp);
pool_add_task(pool,f,address,toaddress);
}
else
{
// printf(" i am copying your folder\n %s to %s\n",address,toaddress);
copy_folder(address,toaddress,pool);// when test the file is a dir , call copy_folder function again
}
if((ep_src = readdir(src_dp)) == NULL )// if all files in address have been copied , break;
break;
memset(address,300,1);
memset(toaddress,300,1);
}
closedir(dst_dp);
closedir(src_dp);
}
int is_dir( char* file_name)
{
struct stat info;
stat(file_name,&info);
if(S_ISDIR(info.st_mode))
return 1;
else
return 0;
}
//任务函数,需要重写
void *f(char* source_path , char* destination_path)
{
FILE* fp_src = NULL;
FILE* fp_dst = NULL;
// int in,out;
if((fp_src = fopen(source_path,"r"))==NULL){//打开源文件的文件流
printf("源文件打开失败!\n");
perror("failed!\n");
printf(" address : %s\n",source_path);
exit(1);
}
if((fp_dst=fopen(destination_path,"w"))==NULL){//打开目标文件的文件流
printf("目标文件创建失败!\n");
exit(1);
}
printf("fopen %d\n",fp_src->_fileno);
printf("fopen %d\n",fp_dst->_fileno);
int c, total = 0;
while(1)
{
c = fgetc( fp_src);
if( c == EOF && feof(fp_src))
{
break;
}
else if( ferror(fp_src))
{
perror("fget()");
break;
}
fputc(c,fp_dst);
}
printf("fclose %d\n",fp_src->_fileno);
printf("fclose %d\n",fp_dst->_fileno);
fclose(fp_src);
fclose(fp_dst);
}
//死锁清理函数
void cleanup(void *arg)
{
printf("%#x cleanup \n",(unsigned int)pthread_self());
struct pthread_pool *pool = (struct pthread_pool *)arg;
pthread_mutex_unlock(&pool->m);
}
void *func(void *arg)//线程执行的函数
{
struct pthread_pool *pool = (struct pthread_pool *)arg;
struct task *p =NULL;
while(1)
{
//=======注册死锁退出函数====
pthread_cleanup_push(cleanup,(void *)pool);
//=======加上互斥锁========
pthread_mutex_lock(&pool->m);
//由于没有正在等待的任务,释放互斥锁,开始等待条件变量
if(pool->wait_task_num == 0 && pool->shutdown == false)
pthread_cond_wait(&pool->v,&pool->m);
//唤醒条件变量可能是任务链表增加,也可能是因为销毁了线程
//如果是销毁线程引起的,那么就结束线程
if(pool->shutdown == true)
{
pthread_exit(NULL);//调用死锁注册函数
}
//从任务链表中拿取任务执行,因为任务链表不为NULL,那么至少有一个任务
p = pool->head->next;
pool->head->next = p->next;
//======释放互斥锁========
pthread_mutex_unlock(&pool->m);
//======解除死锁注册函数===
pthread_cleanup_pop(0);
//*******在处理任务过程中禁止响应线程取消*******
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
//将正在等待的任务个数-1
pool->wait_task_num--;
//执行任务函数
p->task_func(p->src,p->dst);
//释放任务节点
free(p);
//*******在释放掉所有资源以后,可以取消线程*******
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
}
pthread_exit(NULL);
}
//初始化线程池
struct pthread_pool* pthread_pool_init()
{
struct pthread_pool *pool = malloc(sizeof(struct pthread_pool));
if(pool == NULL)
{
perror("pool malloc failed");
exit(-1);
}
//任务链表头节点的初始化---整个链表为NULL
pool->head = malloc(sizeof(struct task));
if(pool->head == NULL)
{
perror("pool->head malloc failed");
exit(-1);
}
pool->head->next = NULL;
//初始化完以后,一个正在运行线程都没有
pool->active_pthread_num = 0;
//初始化完以后,一个任务都没有
pool->wait_task_num = 0;
//初始化互斥锁
pthread_mutex_init(&pool->m,NULL);
//初始化条件变量
pthread_cond_init(&pool->v,NULL);
//将线程池销毁标志位置为flase---不销毁
pool->shutdown = false;
return pool;
}
//创建线程-->向线程池添加线程
int pthread_pool_add(struct pthread_pool *pool,int addnum)
{
//添加0个线程
if(addnum == 0)
return 0;//只是结束当前线程
int i=0;
int added=0;
for(i = pool->active_pthread_num;
i<PTHREAD_MAX && i< pool->active_pthread_num + addnum;
i++)
{
//创建线程
if( pthread_create( &pool->tid[i],NULL, func,(void *)pool) != 0)
{
perror("pthread_create faield");
break;
}
//成功添加的线程个数
added++;
}
//更新正在运行的线程个数
pool->active_pthread_num += added;
return added;
}
//任务添加函数
//任务添加函数的参数需要更改为两个文件地址
int pool_add_task(struct pthread_pool *pool,void *(*f)(char* src ,char* dst),char* src,char* dst )
{
//任务链表中已经等待的任务个数已经达到最大
if(pool->wait_task_num == TASK_FULL_NUM)
{
printf("task list full\n");
return -1;
}
//创建任务节点
struct task *node = malloc(sizeof(struct task));
if(node == NULL)
{
perror("task node malloc failed");
exit(-1);
}
//将需要添加到任务链表中的函数和参数进行赋值
node->task_func = f;
strcpy(node->src,src);
strcpy(node->dst,dst);
printf("src : %s\n",src);
printf("node->src : %s\n",node->src);
printf("dst : %s\n",dst);
printf("node->dst : %s\n",node->dst);
// node->src = src;
// node->dst = dst;
node->next = NULL;
//==========加互斥锁============
pthread_mutex_lock(&pool->m);
//把新任务加入到链表的末尾
struct task *p = pool->head;
while(p->next != NULL)
p=p->next;
p->next = node;
//==========释放互斥锁==========
pthread_mutex_unlock(&pool->m);
//每当有任务加入到链表中,让正在等待的任务链表个数数值++
pool->wait_task_num++;
//向一个线程发送信号,让它开始执行
pthread_cond_signal(&pool->v);
return 0;
}
//销毁线程池
int pool_delete(struct pthread_pool *pool)
{
//将线程池销毁标志位置为true---销毁
pool->shutdown = true;
//向所有线程发起唤醒条件变量
pthread_cond_broadcast(&pool->v);
//如果需要立即销毁所有线程,那么向所有线程发起取消信号
//并且等待线程结束
//---->强制取消线程可能会引起死锁q
//---->万一任务执行函数有malloc或者其他申请资源,需要让他自己释放
int i;
for(i=0; i<pool->active_pthread_num; i++ )
{
pthread_cancel(pool->tid[i]);
printf("tid[%d]=%#x\n",i,(unsigned int)pool->tid[i]);
pthread_join(pool->tid[i],NULL);
}
//销毁任务链表
// struct task *p = pool->head->next;
// while(p != NULL)
// {
// pool->head->next = p->next;
// free(p);
// p = pool->head->next;
// }
free(pool->head);
//销毁线程池
free(pool);
printf("pthrad_pool end\n");
}
//int pool_empty();
int main( int argc , char* argv[])
{
struct pthread_pool *pool = pthread_pool_init();
//线程池添加2个线程
pthread_pool_add(pool,20);
sleep(2);
copy_folder(argv[1],argv[2],pool);
while(1)
{
if( pool->wait_task_num == 0)
{
pool_delete(pool);
printf(" copy finished\n ");
break;
}
printf("pool->active_pthread_num : %d\n ",pool->active_pthread_num);
printf("pool->wait_task_num : %d\n ",pool->wait_task_num);
sleep(1);
}
//销毁线程池
//pool_delete(pool);
//暂停
//pause();
return 0;
}
我这份代码是将前两篇文章的代码综合起来编写的
注意事项:
1:结构体中存储文件名的数组长度需要足够大,因为你不知道你要复制的文件有多少层目录
2:你使用了多少次open,就得对应的使用多少次close
3:我犯的一个小问题,strcpy是将后面的复制给前面
代码讲解部分
我这里还是使用递归的思想,每读到一个文件便增加一个task,task是干嘛的呢,线程池里的线程从task链表中拿task,task里面存储着单一文件copy所需要的两个地址(源文件地址和目标文件地址),而线程执行的函数,f则是我上上篇文章中的cp_file(有所改进,增加了几个close),