Nginx 源码阅读笔记2 原子变量与互斥锁

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/scnu20142005027/article/details/69939111

原子变量

原子变量主要看了 x86 的实现,位于ngx_gcc_automic_x86.h

CAS

#if (NGX_SMP)   // 多核 SMP 架构下需要加锁
#define NGX_SMP_LOCK  "lock;"
#else
#define NGX_SMP_LOCK
#endif

static ngx_inline ngx_atomic_uint_t
ngx_atomic_cmp_set(ngx_atomic_t *lock, ngx_atomic_uint_t old,
    ngx_atomic_uint_t set)
{
    u_char  res;

    __asm__ volatile (

         NGX_SMP_LOCK
    "    cmpxchgl  %3, %1;   "
    "    sete      %0;       "

    : "=a" (res) : "m" (*lock), "a" (old), "r" (set) : "cc", "memory");

    return res;
}

这个函数通过内嵌汇编实现,先看最后的部分,=表示只写,a表示 eax 寄存器,=a表示输出到 eax 寄存器,m表示内存,r表示通用寄存器,cc表示会修改 flags 寄存器,memory表示会修改内存
接下来看指令部分,如果是多核 SMP 架构下需要加锁,然后是cmpxchgl指令,根据代码中的注释

cmpxchgl  r, [m]:

if (eax == [m]) {
    zf = 1;
    [m] = r;
} else {
    zf = 0;
    eax = [m];
}

将 eax 寄存器的值与地址 m 中的值比较,若相等则将 r 的值写入 m,否则将地址 m 中的值写入 eax 寄存器,此外还会设置 flags 寄存器的 zf 标志位
最后是sete %0指令,表示根据 zf 标志位设置 %0,在这里就是 eax 寄存器,并作为函数的返回值

FAA

static ngx_inline ngx_atomic_int_t
ngx_atomic_fetch_add(ngx_atomic_t *value, ngx_atomic_int_t add)
{
    __asm__ volatile (

         NGX_SMP_LOCK
    "    xaddl  %0, %1;   "

    : "+r" (add) : "m" (*value) : "cc", "memory");

    return add;
}

这里+表示读写,然后是xaddl指令,依旧是看代码中的注释

xaddl  r, [m]:

temp = [m];
[m] += r;
r = temp;

这个指令比较容易理解,其他部分与之前的函数类似

memory barrier

#define ngx_memory_barrier()    __asm__ volatile ("" ::: "memory")

作用是强制了 memory barrier 前后内存访问指令的顺序,避免了编译器优化带来的内存乱序访问问题

pause

在自旋等待循环时,改善 CPU 性能,详情可以看 http://x86.renejeschke.de/html/file_module_x86_id_232.html

#define ngx_cpu_pause()         __asm__ (".byte 0xf3, 0x90")

0xf3 0x90就是 intel 的pause操作

互斥锁

互斥锁有两种实现方式,一种是利用原子变量,另外一种是利用文件锁,这里只看第一种

结构体定义

typedef struct {
    ngx_atomic_t   lock;        // 原子变量
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_t   wait;        // 等待信号量的进程数量
#endif
} ngx_shmtx_sh_t;

typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)
    ngx_atomic_t  *lock;        // 原子变量锁
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_t  *wait;        // 等待信号量的进程数量
    ngx_uint_t     semaphore;   // 是否使用信号量
    sem_t          sem;         // 信号量
#endif
#else                           // 使用文件锁
    ngx_fd_t       fd;          // 文件描述符
    u_char        *name;        // 文件路径
#endif
    ngx_uint_t     spin;        // 自旋次数
} ngx_shmtx_t;

lock为 0 时代表锁没有被占用,否则为被占用
这里spin比较特殊,除了用来控制自旋次数,还表示是否使用信号量,从创建函数可以看出,如果值为 -1 则表示不使用信号量,否则值会被设置为 2048

创建互斥锁

ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    mtx->lock = &addr->lock;

    if (mtx->spin == (ngx_uint_t) -1) { // 不使用信号量
        return NGX_OK;
    }

    mtx->spin = 2048;

#if (NGX_HAVE_POSIX_SEM)

    mtx->wait = &addr->wait;

    // 第二个参数为 0 表示线程间共享 非 0 表示进程间共享
    if (sem_init(&mtx->sem, 1, 0) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_init() failed");
    } else {
        mtx->semaphore = 1;
    }

#endif

    return NGX_OK;
}

这个函数的其中一个参数类型为ngx_shmtx_sh_t,里面存放了描述锁的两个原子变量,mtxlockwait分别指向它们
如果不想使用原子变量(从后面的函数可以知道,使用原子变量会使进程阻塞),则再传入mtx之前将spin的值设置为 -1 即可

扫描二维码关注公众号,回复: 2961192 查看本文章

销毁互斥锁

void
ngx_shmtx_destroy(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)

    if (mtx->semaphore) {
        if (sem_destroy(&mtx->sem) == -1) {
            ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                          "sem_destroy() failed");
        }
    }

#endif
}

只有使用信号量的情况下才需要进行处理,也就是简单地调用sem_destroy

非阻塞获取锁

ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
    return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}

这里使用了前面实现的CAS操作,这个地方相当于判断了两次lock == 0,应该是为了效率,因为后面的函数可能会锁总线,可以看到,获取锁就是简单地将一个非零值赋予lock,当然,从释放锁的函数中知道,这里选择pid是有意义的

阻塞获取锁

void
ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
    ngx_uint_t         i, n;

    for ( ;; ) {

        if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
            return;
        }

        if (ngx_ncpu > 1) {

            for (n = 1; n < mtx->spin; n <<= 1) {   // 自旋

                for (i = 0; i < n; i++) {
                    ngx_cpu_pause();
                }

                if (*mtx->lock == 0
                    && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
                {
                    return;
                }
            }
        }

#if (NGX_HAVE_POSIX_SEM)

        if (mtx->semaphore) {
            (void) ngx_atomic_fetch_add(mtx->wait, 1);

            if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
                (void) ngx_atomic_fetch_add(mtx->wait, -1);
                return;
            }

            while (sem_wait(&mtx->sem) == -1) {
                ngx_err_t  err;

                err = ngx_errno;

                if (err != NGX_EINTR) {
                    break;
                }
            }

            continue;
        }

#endif

        ngx_sched_yield(); // 让出 CPU
    }
}

首先尝试获取锁,若成功则返回,否则继续执行
如果是多核的情况,则开始自旋,其中执行了pause操作,之前说过可以改善性能,那么为什么可以改善性能呢?想象一下,如果是一个空循环,则会不断地执行比较操作,这会导致 CPU 流水线中充满着 LOAD 指令,也就是读取lock的值,此时如果持有锁的那个进程释放了锁,需要将lock的值设置为零,产生一条 STORE 指令,这里存在一个内存顺序的问题,也就是 STORE 指令必须在 LOAD 指令前执行,不然会出现数据不一致的现象,然而 CPU 为了提升性能是乱序执行指令的,由于 STORE 的出现可能导致数据不一致,所以不得不重排指令,也就导致了一定的性能问题
如果自旋结束依旧没获取到锁,若使用信号量则递增wait的值,然后调用sem_wait阻塞进程,若不使用信号量则调用ngx_sched_yield让出 CPU,这两者是有区别的,前者会使进程处于TASK_INTERRUPTIBLE状态,只能等待信号或其他进程唤醒,而后者执行后进程依旧是TASK_RUNNING状态,是可以被 CPU 调度的
话说如果sem_wait是被信号唤醒的,那wait的值是不是不太对

释放锁

void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
    if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {
        ngx_shmtx_wakeup(mtx);
    }
}

ngx_uint_t
ngx_shmtx_force_unlock(ngx_shmtx_t *mtx, ngx_pid_t pid)
{
    if (ngx_atomic_cmp_set(mtx->lock, pid, 0)) {
        ngx_shmtx_wakeup(mtx);
        return 1;
    }

    return 0;
}

释放锁就是将lock的值设置为零,如果使用了信号量的话,还需要唤醒信号量,此外,还提供了强制释放锁的函数,这个函数可以释放其他进程持有的锁,但是需要提供该进程的pid,那么哪里会使用这个强制的函数呢?例如worker进程意外退出,但是持有负载均衡锁,此时需要由master进程来强制释放

static void
ngx_shmtx_wakeup(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_uint_t  wait;

    if (!mtx->semaphore) {
        return;
    }

    for ( ;; ) {

        wait = *mtx->wait;

        if ((ngx_atomic_int_t) wait <= 0) {
            return;
        }

        if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) {
            break;
        }
    }

    if (sem_post(&mtx->sem) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                      "sem_post() failed while wake shmtx");
    }

#endif
}

利用wait的值来判断是否需要唤醒信号量,这里再次使用了CAS操作,这是必要的,因为可能在读取wait的之后,其他进程改变了wait的值

猜你喜欢

转载自blog.csdn.net/scnu20142005027/article/details/69939111