简述:自我感觉对C指针了解的还是比较透彻,能够运用自如,一方面练习一下多线程编程,另一方面练习一下数据结构链表。
但是完成的过程中,各种问题还是扑面而来,共花了2个小时完成了此线程池来实现TCP并发连接,创建固定数目的线程
时间仓促,可能有某些指针未释放,或者还没完善的地方,希望能共同讨论。
双向链表提高扫描速度,信号等待用while保护,防止惊群,在任务提取和加入过程动态维护链表,防止链表过长和冗余。
创建线程,初始化属性为DETEACH,避免人为对子线程的回收。
#include "unp.h" #include <assert.h> typedef struct demytask { /*任务*/ //void *(*func)(void *arg); /*套接字处理函数指针*/ void *data; /*连接套接字*/ struct demytask *next; /*后向索引*/ struct demytask *prev; /*后向索引*/ }task; typedef struct pool { pthread_mutex_t mutex; /*锁和条件*/ pthread_cond_t cond; int nthreads; /*线程池中线程数*/ int totaltasks; /*线程池中总的任务数*/ int runtasks; /*正在服务的任务数*/ int shut; /*线程池关闭状态*/ int waittasks; /*等待空闲线程的任务数*/ task *head; /*任务头指针*/ task *tail; /*任务尾指针*/ /*pthread_t *threads*/ /*线程ID数组,暂无用*/ }thread_pool; thread_pool *pool; /*线程池指针*/ void *thread_func(void *arg); /*线程函数*/ void inital_thread_poll(int num); /*线程池初始化*/ void insert_task_2_poll(/*void *(*func)(void *),*/ void *data);/*增加任务*/ void del_task_2_poll(); /*删除任务*/ void serve_machine(int connfd) ; /*为客户端服务函数*/ void clear_pool(int signo) ; /*线程池资源释放*/ int inital_thread_poll(int num) { pthread_attr_t attr; pthread_t pid; int i, err; pool = (thread_pool *)malloc(sizeof(thread_pool)); /*在堆上分配*/ if(pool == NULL) { #ifdef PRT PST("Malloc error"); #endif return -1; } assert(pool != NULL); pthread_mutex_init( &(pool->mutex), NULL); /*锁*/ pthread_cond_init( &(pool->cond), NULL); /*条件*/ pool->nthreads = num; pool->totaltasks = pool->runtasks = 0; /*当前总接受的任务数和运行任务数*/ pool->shut = 0; /*关闭标志*/ pool->head = pool->tail = NULL; pool->head = pool->tail = malloc(sizeof(task)); /*初始化头尾指针*/ pthread_attr_init( &attr); /*属性*/ pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED); for( i =0; i < num; i++) { if((err = pthread_create( &pid, &attr, thread_func, NULL)) != 0) { /*创建线程*/ return err; } } pthread_attr_destroy( &attr); return 0; } /* *给线程池中加入任务 */ void insert_task_2_poll(/*void *(*func)(void *),*/ void *data) { /*添加任务*/ task *mytask; mytask = (task *)malloc(sizeof(task)); /*在堆上分配*/ if(mytask == NULL) { #ifdef PRT PST("Malloc task error"); #endif return; } /* mytask->func = func;*/ mytask->data = data; pthread_mutex_lock(&(pool->mutex)); pool->tail->next = mytask; mytask->prev = pool->tail; pool->tail = mytask; pool->tail->next = NULL; pool->totaltasks ++; /*任务数加1*/ pool->waittasks ++; /*等待处理的任务数加1*/ #ifdef PRT printf("Add a task ,the total tasks is %d, waittask is %d\n",pool->totaltasks,pool->waittasks); #endif pthread_cond_signal(&(pool->cond));/*唤醒等待任务的线程*/ pthread_mutex_unlock(&(pool->mutex)); return; } /* *删除线程池中尾任务 */ void del_task_2_poll() { /*删除任务*/ /* pthread_mutex_lock(&(pool->mutex));*/ free(pool->tail->data); pool->tail->data = NULL; pool->tail = pool->tail->prev; free(pool->tail->next); pool->tail->next = NULL; pool->totaltasks --; #ifdef PRT printf("Delete a task ,the total tasks is %d, waittask is %d\n",pool->totaltasks,pool->waittasks); #endif /* pthread_mutex_lock(&(pool->mutex));*/ } /* *线程函数 */ void *thread_func(void *arg) { /*等待任务,无则阻塞*/ /*void *(*functemp)(void *);*/ int connfd; for( ; ;) { pthread_mutex_lock(&(pool->mutex)); while(pool->waittasks == 0) pthread_cond_wait(&(pool->cond), &(pool->mutex)); /* functemp = pool->tail->func;*/ connfd = *((int *)(pool->tail->data)); pool->runtasks ++; /*处理过的任务数*/ pool->waittasks --; /*等待处理的任务数*/ del_task_2_poll(); /*释放这个任务*/ pthread_mutex_unlock(&(pool->mutex)); /*解锁,使其他线程可以等待其他任务*/ serve_machine(&connfd); /*任务开始执行*/ } } /* *释放线程池资源 */ void clear_pool(int signo) { /*清除线程池*/ task *temp; #ifdef PRT printf("\n服务器正在关闭...\n"); #endif pthread_mutex_lock(&(pool->mutex)); temp = pool->head->next; for(; temp != NULL; temp = pool->head->next) { free(pool->head->data); pool->head->data = NULL; if(pool->head->prev != NULL){ free(pool->head->prev); pool->head->prev = NULL; } free(pool->head); pool->head = NULL; pool->head = temp; } /*free(temp);*/ pthread_mutex_unlock(&(pool->mutex)); pthread_mutex_destroy(&(pool->mutex)); pthread_cond_destroy(&(pool->cond)); free(pool); pool = NULL; exit(0); } int main(int argc, char **argv) { int listenfd, *connfd, num; socklen_t clilen; struct sockaddr_in cliaddr, servaddr; const int on = 1; if( argc != 2) err_quit("argc"); listenfd = Socket(AF_INET, SOCK_STREAM, 0); num = atoi(argv[1]); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT); setsockopt(listenfd,SOL_SOCKET, SO_REUSEADDR, &on, sizeof(int)); Bind(listenfd, (SA *) &servaddr, sizeof(servaddr)); inital_thread_poll(num); /*初始化线程池*/ signal(SIGINT,clear_pool); /*中断时,释放资源*/ Listen(listenfd, LISTENQ); for ( ; ; ) { clilen = sizeof(struct sockaddr); connfd = (int * )malloc(sizeof(int)); /*每个连接一个*/ *connfd = Accept(listenfd, (SA *) &cliaddr, &clilen); insert_task_2_poll(connfd); /*增加任务,投入到线程池*/ } clear_pool(SIGINT); exit(0); }