原子变量
原子变量主要看了 x86 的实现,位于ngx_gcc_automic_x86.h
CAS
#if (NGX_SMP) // 多核 SMP 架构下需要加锁
#define NGX_SMP_LOCK "lock;"
#else
#define NGX_SMP_LOCK
#endif
static ngx_inline ngx_atomic_uint_t
ngx_atomic_cmp_set(ngx_atomic_t *lock, ngx_atomic_uint_t old,
ngx_atomic_uint_t set)
{
u_char res;
__asm__ volatile (
NGX_SMP_LOCK
" cmpxchgl %3, %1; "
" sete %0; "
: "=a" (res) : "m" (*lock), "a" (old), "r" (set) : "cc", "memory");
return res;
}
这个函数通过内嵌汇编实现,先看最后的部分,=
表示只写,a
表示 eax 寄存器,=a
表示输出到 eax 寄存器,m
表示内存,r
表示通用寄存器,cc
表示会修改 flags 寄存器,memory
表示会修改内存
接下来看指令部分,如果是多核 SMP 架构下需要加锁,然后是cmpxchgl
指令,根据代码中的注释
cmpxchgl r, [m]:
if (eax == [m]) {
zf = 1;
[m] = r;
} else {
zf = 0;
eax = [m];
}
将 eax 寄存器的值与地址 m 中的值比较,若相等则将 r 的值写入 m,否则将地址 m 中的值写入 eax 寄存器,此外还会设置 flags 寄存器的 zf 标志位
最后是sete %0
指令,表示根据 zf 标志位设置 %0
,在这里就是 eax 寄存器,并作为函数的返回值
FAA
static ngx_inline ngx_atomic_int_t
ngx_atomic_fetch_add(ngx_atomic_t *value, ngx_atomic_int_t add)
{
__asm__ volatile (
NGX_SMP_LOCK
" xaddl %0, %1; "
: "+r" (add) : "m" (*value) : "cc", "memory");
return add;
}
这里+
表示读写,然后是xaddl
指令,依旧是看代码中的注释
xaddl r, [m]:
temp = [m];
[m] += r;
r = temp;
这个指令比较容易理解,其他部分与之前的函数类似
memory barrier
#define ngx_memory_barrier() __asm__ volatile ("" ::: "memory")
作用是强制了 memory barrier 前后内存访问指令的顺序,避免了编译器优化带来的内存乱序访问问题
pause
在自旋等待循环时,改善 CPU 性能,详情可以看 http://x86.renejeschke.de/html/file_module_x86_id_232.html
#define ngx_cpu_pause() __asm__ (".byte 0xf3, 0x90")
而0xf3 0x90
就是 intel 的pause
操作
互斥锁
互斥锁有两种实现方式,一种是利用原子变量,另外一种是利用文件锁,这里只看第一种
结构体定义
typedef struct {
ngx_atomic_t lock; // 原子变量
#if (NGX_HAVE_POSIX_SEM)
ngx_atomic_t wait; // 等待信号量的进程数量
#endif
} ngx_shmtx_sh_t;
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)
ngx_atomic_t *lock; // 原子变量锁
#if (NGX_HAVE_POSIX_SEM)
ngx_atomic_t *wait; // 等待信号量的进程数量
ngx_uint_t semaphore; // 是否使用信号量
sem_t sem; // 信号量
#endif
#else // 使用文件锁
ngx_fd_t fd; // 文件描述符
u_char *name; // 文件路径
#endif
ngx_uint_t spin; // 自旋次数
} ngx_shmtx_t;
当lock
为 0 时代表锁没有被占用,否则为被占用
这里spin
比较特殊,除了用来控制自旋次数,还表示是否使用信号量,从创建函数可以看出,如果值为 -1 则表示不使用信号量,否则值会被设置为 2048
创建互斥锁
ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
mtx->lock = &addr->lock;
if (mtx->spin == (ngx_uint_t) -1) { // 不使用信号量
return NGX_OK;
}
mtx->spin = 2048;
#if (NGX_HAVE_POSIX_SEM)
mtx->wait = &addr->wait;
// 第二个参数为 0 表示线程间共享 非 0 表示进程间共享
if (sem_init(&mtx->sem, 1, 0) == -1) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_init() failed");
} else {
mtx->semaphore = 1;
}
#endif
return NGX_OK;
}
这个函数的其中一个参数类型为ngx_shmtx_sh_t
,里面存放了描述锁的两个原子变量,mtx
的lock
与wait
分别指向它们
如果不想使用原子变量(从后面的函数可以知道,使用原子变量会使进程阻塞),则再传入mtx
之前将spin
的值设置为 -1 即可
销毁互斥锁
void
ngx_shmtx_destroy(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)
if (mtx->semaphore) {
if (sem_destroy(&mtx->sem) == -1) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
"sem_destroy() failed");
}
}
#endif
}
只有使用信号量的情况下才需要进行处理,也就是简单地调用sem_destroy
非阻塞获取锁
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}
这里使用了前面实现的CAS
操作,这个地方相当于判断了两次lock == 0
,应该是为了效率,因为后面的函数可能会锁总线,可以看到,获取锁就是简单地将一个非零值赋予lock
,当然,从释放锁的函数中知道,这里选择pid
是有意义的
阻塞获取锁
void
ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
ngx_uint_t i, n;
for ( ;; ) {
if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
return;
}
if (ngx_ncpu > 1) {
for (n = 1; n < mtx->spin; n <<= 1) { // 自旋
for (i = 0; i < n; i++) {
ngx_cpu_pause();
}
if (*mtx->lock == 0
&& ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
{
return;
}
}
}
#if (NGX_HAVE_POSIX_SEM)
if (mtx->semaphore) {
(void) ngx_atomic_fetch_add(mtx->wait, 1);
if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
(void) ngx_atomic_fetch_add(mtx->wait, -1);
return;
}
while (sem_wait(&mtx->sem) == -1) {
ngx_err_t err;
err = ngx_errno;
if (err != NGX_EINTR) {
break;
}
}
continue;
}
#endif
ngx_sched_yield(); // 让出 CPU
}
}
首先尝试获取锁,若成功则返回,否则继续执行
如果是多核的情况,则开始自旋,其中执行了pause
操作,之前说过可以改善性能,那么为什么可以改善性能呢?想象一下,如果是一个空循环,则会不断地执行比较操作,这会导致 CPU 流水线中充满着 LOAD 指令,也就是读取lock
的值,此时如果持有锁的那个进程释放了锁,需要将lock
的值设置为零,产生一条 STORE 指令,这里存在一个内存顺序的问题,也就是 STORE 指令必须在 LOAD 指令前执行,不然会出现数据不一致的现象,然而 CPU 为了提升性能是乱序执行指令的,由于 STORE 的出现可能导致数据不一致,所以不得不重排指令,也就导致了一定的性能问题
如果自旋结束依旧没获取到锁,若使用信号量则递增wait
的值,然后调用sem_wait
阻塞进程,若不使用信号量则调用ngx_sched_yield
让出 CPU,这两者是有区别的,前者会使进程处于TASK_INTERRUPTIBLE
状态,只能等待信号或其他进程唤醒,而后者执行后进程依旧是TASK_RUNNING
状态,是可以被 CPU 调度的
话说如果sem_wait
是被信号唤醒的,那wait
的值是不是不太对
释放锁
void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {
ngx_shmtx_wakeup(mtx);
}
}
ngx_uint_t
ngx_shmtx_force_unlock(ngx_shmtx_t *mtx, ngx_pid_t pid)
{
if (ngx_atomic_cmp_set(mtx->lock, pid, 0)) {
ngx_shmtx_wakeup(mtx);
return 1;
}
return 0;
}
释放锁就是将lock
的值设置为零,如果使用了信号量的话,还需要唤醒信号量,此外,还提供了强制释放锁的函数,这个函数可以释放其他进程持有的锁,但是需要提供该进程的pid
,那么哪里会使用这个强制的函数呢?例如worker
进程意外退出,但是持有负载均衡锁,此时需要由master
进程来强制释放
static void
ngx_shmtx_wakeup(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)
ngx_atomic_uint_t wait;
if (!mtx->semaphore) {
return;
}
for ( ;; ) {
wait = *mtx->wait;
if ((ngx_atomic_int_t) wait <= 0) {
return;
}
if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) {
break;
}
}
if (sem_post(&mtx->sem) == -1) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
"sem_post() failed while wake shmtx");
}
#endif
}
利用wait
的值来判断是否需要唤醒信号量,这里再次使用了CAS
操作,这是必要的,因为可能在读取wait
的之后,其他进程改变了wait
的值