《unix环境高级编程》--- 线程控制

以分离状态创建的线程
分离状态:父进程不用再管子进程了 :)。

#include "apue.h"
#include <pthread.h>

int makethread(void *(*fn)(void *), void *arg)
{
    int err;
    pthread_t tid;
    pthread_attr_t attr;

    /*
       int pthread_attr_init(pthread_attr_t *attr);
       初始化attr,使得attr所包含的内容为操作系统实现支持的线程所有属性的默认值。
    */
    err = pthread_attr_init(&attr);
    if(err != 0)
        return (err);

    /*
       int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
       让线程以分离状态启动。如果线程处理分离状态,线程底层存储资源可以在线程终止时立即被收回。
       当线程被分离时,pthread_join调用会失败,返回EINVAL。
       detachstate:PTHREAD_CREATE_DETACHED,以分离状态启动线程;
                PTHREAD_CREATE_JOINABLE,正常启动线程,应用程序可获得线程的终止状态。
    */
    err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if(err == 0)
        err = pthread_create(&tid, &attr, fn, arg);
    /*
       int pthread_attr_destroy(pthread_attr_t *attr);
       清理attr,如果为attr分配了动态内存空间,则释放
    */
    pthread_attr_destroy(&attr);
    return (err);
}

使用递归互斥量
递归锁:同一线程占有锁后,还可再次获得该锁,不会导致死锁。

#include "apue.h"
#include <pthread.h>
#include <time.h>
#include <sys/time.h>

extern int makethread(void *(*)(void *), void *);

struct to_info
{
    void (*to_fn)(void *);   /* function */
    void *to_arg;            /* argument */
    struct timespec to_wait; /* time to wait */
};

#define SECTONSEC 1000000000 /* seconds to nanoseconds */
#define USECTONSEC 1000      /* microseconds to nanoseconds */

void *timeout_helper(void *arg)
{
    struct to_info *tip;

    tip = (struct to_info *)arg;
    nanosleep(&tip->to_wait, NULL);
    (*tip->to_fn)(tip->to_arg);
    return (0);
}

/* 如果到达或超过时间when,则创建一个线程在未来某个时刻调用func */
void timeout(const struct timespec *when, void (*func)(void *), void *arg)
{
    struct timespec now;
    struct timeval tv;
    struct to_info *tip;
    int err;

    gettimeofday(&tv, NULL);
    now.tv_sec = tv.tv_sec;
    now.tv_nsec = tv.tv_usec * USECTONSEC;  
    if((when->tv_sec > now.tv_sec) || 
       (when->tv_sec == now.tv_sec && when->tv_nsec > now.tv_nsec))
    {
        tip = malloc(sizeof(struct to_info));
        if(tip != NULL)
        {
            tip->to_fn = func;
            tip->to_arg = arg;
            tip->to_wait.tv_sec = when->tv_sec - now.tv_sec;
        }
        else
        {
            tip->to_wait.tv_sec--;
        }
        err = makethread(timeout_helper, (void *)tip);
        if(err == 0)
            return;
    }

    /*
       We get here if (a) when <= now, or (b) malloc fails, or (c) we can't
       make a thread, so we just call the function now
    */
    (*func)(arg);
}

pthread_mutexattr_t attr;
pthread_mutex_t mutex;

void retry(void *arg)
{
    pthread_mutex_lock(&mutex);   /* 再次使用mutex */
    /* perform retry steps ... */
    pthread_mutex_unlock(&mutex);
}

int main(void)
{
    int err, condition, arg;
    struct timespec when;

    if((err = pthread_mutexattr_init(&attr)) != 0)
        err_exit(err, "pthread_mutexattr_init failed");

    /*
       int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type);
       type:取值为PTHREAD_MUTEX_RECURSIVE时,
        允许同一线程在互斥量解锁之前对互斥量进行多次加锁。
        同一个递归互斥量维护锁的计数,在解锁的次数和加锁次数不同的情况下不会释放锁。
    */
    if((err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)) != 0)
        err_exit(err, "can't set recursive type");
    if((err = pthread_mutex_init(&mutex, &attr)) != 0)
        err_exit(err, "can't create recursive mutex");
    /* ... */
    pthread_mutex_lock(&mutex);  /* 第一次使用mutex */
    if(condition)   
    {
        /* calculate target time "when" 
               调用互斥量检查条件后,将retry安排为原子操作
           retry试图对同一互斥量加锁,除非互斥量时递归的,
           否则timeout直接调用retry会导致死锁
        */
        timeout(&when, retry, (void *)arg);
    }
    /* ... */
    pthread_mutex_unlock(&mutex);
    /* ... */
    exit(0);
}

getenv的非可重入版本
因为所有调用getenv的线程返回的字符串都放在同一个静态缓冲区中,所以这个版本时不可重入的。
如果两个线程同时调用这个函数,就会看到不一致的结果。

#include <limits.h>
#include <string.h>

static char envbuf[ARG_MAX];  

extern char **environ;

char *getenv(const char *name)
{
    int i, len;

    len = strlen(name);
    for(i=0; environ[i] != NULL; i++)
    {
        if((strncmp(name, environ[i], len) == 0) && 
            (environ[i][len] == '='))
        {
            strcpy(envbuf, &environ[i][len+1]);
            return (envbuf);
        }
    }
    return (NULL);
}

getenv的可重入(线程安全)版本
调用者必须自己提供缓冲区,这样每个线程可以使用各自的缓冲区从而避免其他线程的干扰。
同时,用互斥量保证在搜索字符串时环境不被修改。
为了对信号处理程序可重入,使用递归互斥量,既可阻止其他线程改变环境,还可避免死锁。

#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>

extern char **environ;

pthread_mutex_t env_mutex;

static pthread_once_t init_done = PTHREAD_ONCE_INIT;  

static void thread_init(void)
{
    pthread_mutexattr_t attr;

    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&env_mutex, &attr);
    pthread_mutexattr_destroy(&attr);
}


int getenv_r(const char *name, char *buf, int buflen)
{
    int i, len, olen;

    /* 
       确保每个进程只调用一次thread_init
       pthread_once_t initflag = PTHREAD_ONCE_INIT;
       int pthread_once(pthread_once_t *initflag, void (*initfn)(void));
       initflag:必须是全局变量或静态变量
       如果每个线程都调用pthread_once时,系统保证initfn只被调用一次,
       即系统首次调用pthread_once时。
    */
    pthread_once(&init_done, thread_init);
    len = strlen(name);
    pthread_mutex_lock(&env_mutex);
    for(i=0; enviorn[i] != NULL; i++)
    {
        if((strncmp(name, environ[i], len) == 0) &&
            (environ[i][len] == '='))
        {
            olen = strlen(&environ[i][len+1]);
            if(olen >= buflen)
            {
                pthread_mutex_unlock(&env_mutex);
                return (ENOSPC);
            }
            strcpy(buf, &environ[i][len+1]);
            pthread_mutex_unlock(&env_mutex);
            return (0);
        }
    }
    pthread_mutex_unlock(&env_mutex);
    return (ENOENT);
}

线程安全的getenv的兼容版本
如果无法修改应用程序以直接使用新的接口,即调用者无法将自己的缓冲区作为参数传入,
可使用线程私有数据来维护每个线程的数据缓冲区的副本,存放各自的返回字符串。

#include <limits.h>
#include <string.h>
#include <pthread.h>
#include <stdlib.h>

static pthread_key_t key;
static pthread_once_t init_done = PTHREAD_ONCEC_INIT;
pthread_mutex_t env_mutex = PTHREAD_MUTEX_INITIALIZER;

extern char **environ;

static void thread_init(void)
{
    /* 
       int pthread_key_create(pthread_key_t *keyp, void (*destructor)(void *)); 
       创建的键存放在keyp指向的内存单元,这个键可被进程中的所有线程使用,
       但每个线程把这个键与不同的线程私有数据地址进行关联。
       当线程调用pthread_exit或返回,正常退出时,会调用destructor,
       但如果线程调用exit、_exit、_Exit、abort或其他非正常退出时,就不会调用析构函数。
       线程可以为线程私有数据分配多个键,每个键都可以由一个析构函数与它关联
    */
    pthread_key_create(&key, free);
}

char *getenv(const char *name)
{
    int i, len;
    char *envbuf;

    pthread_once(&init_done, thread_init);
    pthread_mutex_lock(&env_mutex);

    /*
       void *pthread_getspecific(pthread_key_t key);
       获得线程私有数据的地址
    */
    envbuf = (char *)pthread_getspecific(key);
    if(envbuf == NULL)
    {
        envbuf = malloc(ARG_MAX);   
        if(envbuf == NULL)
        {
            pthread_mutex_unlock(&env_mutex);
            return (NULL);
        }
        /*
           int pthread_setspecific(pthread_key_t key, const void *value);
           把键和线程私有数据关联起来
        */
        pthread_setspecific(key, envbuf);
    }
    len = strlen(name);
    fo(i=0; environ[i] != NULL; i++)
    {
        if((strncmp(name, environ[i][len+1], len) == 0) &&
            (environ[i][len] == '='))
        {
            strcpy(envbuf, &environ[i][len+1]);
            pthread_mutex_unlock(&env_mutex);
            return (envbuf);
        }
    }
    pthread_mutex_unlock(&env_mutex);
    return (NULL);
}

同步信号处理
等待信号处理程序设置标志,该标志表明主程序应该退出。
唯一可运行的控制线程就是主线程和信号处理程序,所以阻塞信号可以避免错过标志修改。

不让信号处理程序中断主线程,而是由专门的独立控制线程进行信号处理。
改动quitflag是在互斥量的保护下进行的,这样主线程不会在调用pthread_cond_signal时错失唤醒调用。
主控线程中使用相同的互斥量检查标志的值,并且原子地释放互斥量,等待条件发生。
主线程开始时阻塞SIGINT和SIGQUIT,新线程会继承信号屏蔽字。
因为sigwait会解除信号的阻塞状态,所以只有一个线程可以接收到信号,这样主线程就不会被这些信号中断。

#include "apue.h"
#include <pthread.h>

int quitflag;   /* set nonzero by thread */
sigset_t mask;

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t wait = PTHREAD_COND_INITIALIZER;

void *thr_fn(void *arg)
{
    int err, signo;

    for(;;)
    {
        /*
           int sigwait(const sigset_t *restrict set, int *restrict signop);
           等待一个或多个信号发生
           set:信号要等待的信号集
           如果信号集中的某个信号在sigwait调用时处于未决状态,sigwait将无阻塞地返回,
           在返回前,移除处于未决状态的信号。
           为避免错误动作发送,线程在调用sigwait前,必须阻塞它正在等待的信号。
           sigwant会自动取消信号集的阻塞状态,直到有新的信号被递送。
           在返回之前,将恢复线程的信号屏蔽字。
           如果信号在sigwait调用时没有被阻塞,在完成对sigwait调用之前会出现一个时间窗,
           在这个窗口期,某个信号可能在线程完成sigwait调用前就被递送了。
        */
        err = sigwait(&mask, &signo);
        if(err != 0)
            err_exit(err, "sigwait failed");
        switch(signo)
        {
        case SIGINT:
            printf("\ninterrupt\n");
            break;
        case SIGQUIT:
            pthread_mutex_lock(&lock);
            quitflag = 1;
            pthread_mutex_unlock(&lock);
            pthread_cond_signal(&wait);
            return (0);
        default:
            printf("unexpected signal %d\n", signo);
            exit (1);
        }
    }
}

int main(void)
{
    int err;
    sigset_t oldmask;
    pthread_t tid;

    sigemptyset(&mask);
    sigaddset(&mask, SIGINT);
    sigaddset(&mask, SIGQUIT);

    /*
        int pthread_sigmask(int how, const sigset_t *restrict set,
                sigset_t *restrict oset);
        与sigprocmask基本相同
    */
    if((err = pthread_sigmask(SIG_BLOCK, &mask, &oldmask)) != 0)
        err_exit(err, "SIG_BLOCK error");

    err = pthread_create(&tid, NULL, thr_fn, 0);
    if(err != 0)
        err_exit(err, "can't create thread");

    pthread_mutex_lock(&lock);
    while(quitflag == 0)
        pthread_cond_wait(&wait, &lock);
    pthread_mutex_unlock(&lock);

    /* SIGQUIT has been caught and is now blocked; do whatever */
    quitflag = 0;

    /* reset signal mask which unbocks SIGQUIT */
    if(sigprocmask(SIG_SETMASK, &oldmask, NULL) < 0)
        err_sys("SIG_SETMASK error");
    exit(0);
}

这里写图片描述

pthread_atfork实例
子进程会继承父进程的锁,如果父进程包含多个线程,子进程在fork返回后,如果不马上调用exec,就需要清理锁。
清理锁,可通过pthread_atfork建立fork处理程序。

#include "apue.h"
#include <pthread.h>

pthread_mutex_t lock1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock2 = PTHREAD_MUTEX_INITIALIZER;


void prepare(void)
{
    printf("preparing locks...\n");
    pthread_mutex_lock(&lock1);
    pthread_mutex_lock(&lock2);
}

void parent(void)
{
    printf("parent unlocking locks...\n");
    pthread_mutex_unlock(&lock1);
    pthread_mutex_unlock(&lock2);
}

void child(void)
{
    printf("child unlocking locks...\n");
    pthread_mutex_unlock(&lock1);
    pthread_mutex_unlock(&lock2);
}

void *thr_fn(void *arg)
{
    printf("thread started...\n");
    pause();
    return (0);
}

int main(void)
{
    int err;
    pid_t pid;
    pthread_t tid;

#if defined(BSD) || defined(MACOS)
    printf("pthread_atfork is unsupported\n");
#else
    /*
       int pthread_atfork(void (*prepare)(void), void (*parent)(void), 
                void (*child)(void));
       最多安装三个清理锁的函数
       prepare:父进程在fork创建子进程前调用,获取父进程定义的所有锁。
       parent:在fork创建了子进程后,但在fork返回之前在父进程环境中调用,
               释放prepare获得的所有锁。
       child:在fork返回之前在子进程环境中调用,释放prepare获得的所有锁。
    */
    if((err = pthread_atfork(prepare, parent, child)) != 0)
        err_exit(err, "can't install fork handlers");

    err = pthread_create(&tid, NULL, thr_fn, 0);
    if(err != 0)
        err_exit(err, "can't create thread");

    sleep(2);

    printf("parent about to fork...\n");
    if((pid = fork()) < 0)
        err_quit("fork failed");
    else if(pid == 0)   /* child */
        printf("child returned from fork\n");
    else    /* parent */
        printf("parent returned from fork\n");
#endif
    exit(0);
}

可见,prepare在调用fork后运行,child在fork调用返回到子进程前运行,parent在fork调用返回给父进程前运行。
这里写图片描述

猜你喜欢

转载自blog.csdn.net/u012319493/article/details/80468368