原文地址:http://locklessinc.com/articles/locks/ 翻译时与原文略有简化,并增加了一些注释方便理解
大多数的并行编程都或多或少的要用到锁,锁可以提供互斥的访问来保证数据的一致性。没有锁的话,多线程并发很可能同时修改同一个数据结构。而无锁算法往往需要仔细复杂的设计,否则程序会进入未预期的状态导致系统崩溃。而无锁算法的设计非常困难,大部分的程序都会用到锁。
如果锁保护的代码段,如更新一个数据结构很慢则可以使用 mutex 锁。当线程 block 时会将控制权交给操作系统,这样当等锁的线程睡眠时,可以让出 CPU 让其他的线程得到调度。这个过程包含上下文的切换,是一个很慢的过程。因此如果持锁的时间比较短时,使用 mutex 锁将会不够快。
Spinlock 锁
不同于切换上下文,spinlock将会空转,不断的检查锁是否已经被释放。该循环过程很快,所以(解锁–重新获取锁)这一过程很快。然而,空转的过程不会完成任何有效的工作,所以当等锁的时间变得可观时,将没有 mutex 锁能更有效的利用 CPU。
在我们描述 spinlock 锁之前,我们需要一组原子操作,幸运的是,gcc 以内置函数的方式提供了其中的一些。
#define atomic_xadd(P, V) __sync_fetch_and_add((P), (V)) /* 返回 P 值,执行后 P += V */
#define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N)) /* 比较指针与旧值,相等用新值替换 */
#define atomic_inc(P) __sync_add_and_fetch((P), 1) /* P += 1,返回 P 值 */
#define atomic_dec(P) __sync_add_and_fetch((P), -1) /* P -= 1,返回 P 值 */
#define atomic_add(P, V) __sync_add_and_fetch((P), (V)) /* P += V,返回 P 值 */
#define atomic_set_bit(P, V) __sync_or_and_fetch((P), 1<<(V)) /* 置某位为 1 */
#define atomic_clear_bit(P, V) __sync_and_and_fetch((P), ~(1<<(V))) /* 将某位清零 */
其他的一些我们需要自己实现:
/* 设置内存屏障,将当前CPU缓存的值全部写入内存 */
#define barrier() asm volatile("": : :"memory")
/* Pause instruction to prevent excess processor bus usage */
#define cpu_relax() asm volatile("pause\n": : :"memory")
/* 如果满足条件,则交换两个指针的值 */
static inline void *xchg_64(void *ptr, void *x)
{
__asm__ __volatile__("xchgq %0,%1"
:"=r" ((unsigned long long) x)
:"m" (*(volatile long long *)ptr), "0" ((unsigned long long) x)
:"memory");
return x;
}
static inline unsigned xchg_32(void *ptr, unsigned x)
{
__asm__ __volatile__("xchgl %0,%1"
:"=r" ((unsigned) x)
:"m" (*(volatile unsigned *)ptr), "0" (x)
:"memory");
return x;
}
static inline unsigned short xchg_16(void *ptr, unsigned short x)
{
__asm__ __volatile__("xchgw %0,%1"
:"=r" ((unsigned short) x)
:"m" (*(volatile unsigned short *)ptr), "0" (x)
:"memory");
return x;
}
/* Test and set a bit */
static inline char atomic_bitsetandtest(void *ptr, int x)
{
char out;
__asm__ __volatile__("lock; bts %2,%1\n"
"sbb %0,%0\n"
:"=r" (out), "=m" (*(volatile long long *)ptr)
:"Ir" (x)
:"memory");
return out;
}
使用这些原子操作,我们可以实现一个较为直观的 spinlock 锁。
#define EBUSY 1
typedef unsigned spinlock;
static void spin_lock(spinlock *lock)
{
while (1)
{ /* lock 为空,置 BUSY 返回,加锁成功 */
if (!xchg_32(lock, EBUSY)) return;
while (*lock) cpu_relax();
}
}
static void spin_unlock(spinlock *lock)
{ /* 使用 barrier 后,CPU 缓存失效,让等锁的线程尽快读取新值,而不是使用 cache 中的旧值 */
barrier();
*lock = 0;
}
static int spin_trylock(spinlock *lock)
{
return xchg_32(lock, EBUSY);
}
以上代码实现的速度如何,一个简单的评判标准就是开启一定量的线程来竞争锁,每次加锁执行定量的工作。在一定的时间,如果加解锁成功的次数在竞争线程数增加的情况仍能够保持稳定,那么这个算法对于竞争的处理就是比较好的。一个好的 spinlock 锁实现应该在任意给定的线程数目下都尽可能的快。
以上 spinlock 锁实现的表现如下:
Threads | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|
Time (s) | 5.5 | 5.6 | 5.7 | 5.7 | 5.7 |
以上结果已经很好了,但是仍可以改进。问题在于有大量的线程在竞争,当锁释放的时候,他们会同时尝试对其进行加锁。这样会导致巨大的处理器总线流量,消耗大量的性能。因此,如果我们用某种方式把锁的申请者进行排队,这样就可以知道谁是锁的下一个使用者,我们就可以极大的减少总线的流量。
一种实现以上方法的 spinlock 锁叫做 MCS 锁,其使用一个队列来维护锁的申请者的顺序。
typedef struct mcs_lock_t mcs_lock_t;
struct mcs_lock_t
{
mcs_lock_t *next;
int spin;
};
typedef struct mcs_lock_t *mcs_lock;
static void lock_mcs(mcs_lock *m, mcs_lock_t *me) /* m 指针为队列尾部的指针 */
{
mcs_lock_t *tail;
me->next = NULL;
me->spin = 0;
tail = xchg_64(m, me); /* 将自己的指针替换为 m 指针 */
/* 之前的队列为空,加锁成功 */
if (!tail) return;
/* 将自己加入原来的队列 */
tail->next = me;
/* 尽快置 tail->next = me,防止释放锁的线程等待 */
barrier();
/* Spin on my spin variable */
while (!me->spin) cpu_relax();
return;
}
static void unlock_mcs(mcs_lock *m, mcs_lock_t *me)
{
/* 没有继承者了 */
if (!me->next)
{
/* 判断自己是否是最后一个节点,是直接返回 */
if (cmpxchg(m, me, NULL) == me) return;
/* 等待申请者置 m->next */
while (!me->next) cpu_relax();
}
/* 通知继承者,释放锁 */
me->next->spin = 1;
}
static int trylock_mcs(mcs_lock *m, mcs_lock_t *me)
{
mcs_lock_t *tail;
me->next = NULL;
me->spin = 0;
/* Try to lock */
tail = cmpxchg(m, NULL, &me);
/* No one was there - can quickly return */
if (!tail) return 0;
return EBUSY;
}
这个有很不同的表现:
Threads | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|
Time (s) | 3.6 | 4.4 | 4.5 | 4.8 | >1min>1min |
当竞争的线程数超过处理的 CPU 核数时(这里以 4 核为例),MCS 锁可能会花费一段很长的时间。这是因为当解锁后,获取到锁的下一个线程可能并不处在活跃的状态,大家都需要等待操作系统决定去调度它。所有的“公平”算法都存在这个问题。因此简单的不公平的算法仍是很有效的,在你不知道线程数与绑定的 CPU 核数的情况下。
一个更大的问题在于 MCS 锁的 API,它需要额外的结构来传递当前锁的地址,另一个结构来保存锁等待者队列的指针。然而,大多数使用 spinlock 锁的已有代码并没有这个额外的信息,所以我们并不能很方便的用 MCS 锁来替换之前的 spinlock 锁,这也是一个问题。
一个 IBM 的工作组改进了 MCS 算法移除了对这个额外结构体的需要,作为替代,其使用了堆栈信息。这就是 K42 锁算法
typedef struct k42lock k42lock;
struct k42lock
{
k42lock *next;
k42lock *tail;
};
static void k42_lock(k42lock *l)
{
k42lock me; /* 此处应动态申请? */
k42lock *pred, *succ;
me.next = NULL;
barrier();
pred = xchg_64(&l->tail, &me); /* 新来的请求者替换为队列尾指针 */
if (pred)
{ /* 如果之前有请求者,当前等待 */
me.tail = (void *) 1;
barrier();
pred->next = &me; /* 将自己加入请求者队列 */
barrier();
while (me.tail) cpu_relax();
}
succ = me.next;
if (!succ) /* 没有继承者 */
{
barrier();
l->next = NULL;
if (cmpxchg(&l->tail, &me, &l->next) != &me) /* 自己是否为最后一个请求者 */
{
while (!me.next) cpu_relax();
l->next = me.next; /* 需要等待新的请求者置 l->next */
}
}
else
{
l->next = succ;
}
}
static void k42_unlock(k42lock *l)
{
k42lock *succ = l->next;
barrier();
if (!succ)
{ /* 没有继承者了 */
if (cmpxchg(&l->tail, &l->next, NULL) == (void *) &l->next) return;
while (!l->next) cpu_relax();
succ = l->next;
}
succ->tail = NULL; /* 只有 l->tail 用来标识最后一个申请者,其他节点 tail 用来判断锁是否被释放 */
}
static int k42_trylock(k42lock *l)
{ /* 不需要重新申请锁结构体,最后一个请求者的锁结构未释放,此处重用 */
if (!cmpxchg(&l->tail, NULL, &l->next)) return 0;
return EBUSY;
}
K42 锁算法的表现与 MCS 锁差不多。
Threads | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|
Time (s) | 3.7 | 4.8 | 4.5 | 4.9 | >1min>1min |
K42 锁算法的另一个问题是其被 IBM 公司注册为了专利,也不能够使用。
另一种方法是使用一种不同类型的队列。K42 算法和 MCS 算法都使用队列来保存申请者的顺序,所以找到下一个使用者比较简单,找到队列尾部比较难。如果我们反转指针的朝向,使得找到队列尾部比较简单,而找到下一个使用者难点呢。这样有了以下的算法:
该实现主要思想就是在 MCS 锁的基础上,不保存指针 me,而是通过队列尾部指针,一直往前找找到当前操作的节点指针。
typedef struct listlock_t listlock_t;
struct listlock_t
{
listlock_t *next;
int spin;
};
typedef struct listlock_t *listlock;
#define LLOCK_FLAG (void *)1
static void listlock_lock(listlock *l)
{
listlock_t me;
listlock_t *tail;
/* Fast path - no users */
if (!cmpxchg(l, NULL, LLOCK_FLAG)) return;
me.next = LLOCK_FLAG;
me.spin = 0;
/* Convert into a wait list */
tail = xchg_64(l, &me);
if (tail)
{
/* Add myself to the list of waiters */
if (tail == LLOCK_FLAG) tail = NULL;
me.next = tail;
/* Wait for being able to go */
while (!me.spin) cpu_relax();
return;
}
/* Try to convert to an exclusive lock */
if (cmpxchg(l, &me, LLOCK_FLAG) == &me) return;
/* Failed - there is now a wait list */
tail = *l;
/* Scan to find who is after me */
while (1)
{
/* Wait for them to enter their next link */
while (tail->next == LLOCK_FLAG) cpu_relax();
if (tail->next == &me)
{
/* Fix their next pointer */
tail->next = NULL;
return;
}
tail = tail->next;
}
}
static void listlock_unlock(listlock *l)
{
listlock_t *tail;
listlock_t *tp;
while (1)
{
tail = *l;
barrier();
/* Fast path */
if (tail == LLOCK_FLAG)
{
if (cmpxchg(l, LLOCK_FLAG, NULL) == LLOCK_FLAG) return;
continue;
}
tp = NULL;
/* Wait for partially added waiter */
while (tail->next == LLOCK_FLAG) cpu_relax();
/* There is a wait list */
if (tail->next) break;
/* Try to convert to a single-waiter lock */
if (cmpxchg(l, tail, LLOCK_FLAG) == tail)
{
/* Unlock */
tail->spin = 1;
return;
}
cpu_relax();
}
/* A long list */
tp = tail;
tail = tail->next;
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (tail->next == LLOCK_FLAG) cpu_relax();
if (!tail->next) break;
tp = tail;
tail = tail->next;
}
tp->next = NULL;
barrier();
/* Unlock */
tail->spin = 1;
}
static int listlock_trylock(listlock *l)
{
/* Simple part of a spin-lock */
if (!cmpxchg(l, NULL, LLOCK_FLAG)) return 0;
/* Failure! */
return EBUSY;
}
同样,该算法表现也不会太好:
Threads | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|
Time (s) | 3.6 | 5.1 | 5.8 | 6.3 | >1min>1min |
当竞争线程数少时其仍然比标准 spinlock 锁快,但是当多于两个线程开始尝试竞争锁时表现就会变差,并且越来越差。
另一个可以用的小技巧是在 spinlock 内使用 spinlock 锁。第一个锁可以非常轻量级,因为我们知道它只会被持有一小段时间。它能用于给请求真正 spinlock 锁的等待者队列进行加锁。如果处理得当,我们能够将等待者队列长度维持在一个较短的长度,来减少总线流量。
typedef struct bitlistlock_t bitlistlock_t;
struct bitlistlock_t
{
bitlistlock_t *next;
int spin;
};
typedef bitlistlock_t *bitlistlock;
#define BLL_USED ((bitlistlock_t *) -2LL)
static void bitlistlock_lock(bitlistlock *l)
{
bitlistlock_t me;
bitlistlock_t *tail;
/* Grab control of list */
while (atomic_bitsetandtest(l, 0)) cpu_relax();
/* Remove locked bit */
tail = (bitlistlock_t *) ((uintptr_t) *l & ~1LL);
/* Fast path, no waiters */
if (!tail) {
/* Set to be a flag value */
*l = BLL_USED;
return;
}
if (tail == BLL_USED) tail = NULL;
me.next = tail;
me.spin = 0;
barrier();
/* Unlock, and add myself to the wait list */
*l = &me;
/* Wait for the go-ahead */
while (!me.spin) cpu_relax();
}
static void bitlistlock_unlock(bitlistlock *l)
{
bitlistlock_t *tail;
bitlistlock_t *tp;
/* Fast path - no wait list */
if (cmpxchg(l, BLL_USED, NULL) == BLL_USED) return;
/* Grab control of list */
while (atomic_bitsetandtest(l, 0)) cpu_relax();
tp = *l;
barrier();
/* Get end of list */
tail = (bitlistlock_t *) ((uintptr_t) tp & ~1LL);
/* Actually no users? */
if (tail == BLL_USED)
{
barrier();
*l = NULL;
return;
}
/* Only one entry on wait list? */
if (!tail->next)
{
barrier();
/* Unlock bitlock */
*l = BLL_USED;
barrier();
/* Unlock lock */
tail->spin = 1;
return;
}
barrier();
/* Unlock bitlock */
*l = tail;
barrier();
/* Scan wait list for start */
do
{
tp = tail;
tail = tail->next;
}
while (tail->next);
tp->next = NULL;
barrier();
/* Unlock */
tail->spin = 1;
}
static int bitlistlock_trylock(bitlistlock *l)
{
if (!*l && (cmpxchg(l, NULL, BLL_USED) == NULL)) return 0;
return EBUSY;
}
然而,该实现比之前的队列锁算法更差,只有在没有竞争的情况下表现好一些。
Threads | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|
Time (s) | 3.6 | 5.3 | 6.3 | 6.8 | >1min>1min |
另外一种可能是将一些其他类型的锁修改为 spinlock 锁。读写锁 设计在大规模竞争的场景表现的非常好,如果我们将 read 锁部分移除,剩下的 write 锁之间的互斥竞争就表现的和 spinlock 锁一样了。实现如下:
/* Bit-lock for editing the wait block */
#define SLOCK_LOCK 1
#define SLOCK_LOCK_BIT 0
/* Has an active user */
#define SLOCK_USED 2
#define SLOCK_BITS 3
typedef struct slock slock;
struct slock
{
uintptr_t p;
};
typedef struct slock_wb slock_wb;
struct slock_wb
{
slock_wb *last; /* 指向最后一个等待者,该值只有从第一个等待者读取时才有效 */
slock_wb *next; /* 指向下一个等待着 */
int wake; /* 标记锁是否被释放 */
};
/* Wait for control of wait block */
static slock_wb *slockwb(slock *s)
{
uintptr_t p;
/* 加位锁,成功后可操作等待者队列 */
while (atomic_bitsetandtest(&s->p, SLOCK_LOCK_BIT))
{
cpu_relax();
}
p = s->p;
if (p <= SLOCK_BITS)
{
/* Oops, looks like the wait block was removed. */
atomic_dec(&s->p);
return NULL;
}
return (slock_wb *)(p - SLOCK_LOCK);
}
static void slock_lock(slock *s)
{
slock_wb swblock;
/* Fastpath - no other readers or writers */
if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return;
/* Initialize wait block */
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = 0;
while (1)
{
uintptr_t p = s->p;
cpu_relax();
/* Fastpath - no other readers or writers */
if (!p)
{
if (cmpxchg(&s->p, 0, SLOCK_USED) == 0) return;
continue;
}
if (p > SLOCK_BITS)
{
slock_wb *first_wb, *last;
/* 获取等待者队列的操作权 */
first_wb = slockwb(s);
if (!first_wb) continue;
/* 将自己加入到等待者队列尾 */
last = first_wb->last;
last->next = &swblock;
first_wb->last = &swblock;
/* 操作完成后,先释放位锁 */
barrier();
s->p &= ~SLOCK_LOCK;
break;
}
/* Try to add the first wait block */
if (cmpxchg(&s->p, p, (uintptr_t)&swblock) == p) break;
}
/* 等待锁被释放 */
while (!swblock.wake) cpu_relax();
}
static void slock_unlock(slock *s)
{
slock_wb *next;
slock_wb *wb;
uintptr_t np;
while (1)
{
uintptr_t p = s->p;
/* This is the fast path, we can simply clear the SRWLOCK_USED bit. */
if (p == SLOCK_USED)
{
if (cmpxchg(&s->p, SLOCK_USED, 0) == SLOCK_USED) return;
continue;
}
/* 获取操作等待者队列的位锁 */
wb = slockwb(s);
if (wb) break;
cpu_relax();
}
next = wb->next;
if (next)
{
/* 释放锁之后,需要将 next->last 置为有效的队列尾指针 */
np = (uintptr_t) next;
next->last = wb->last;
}
else
{
/* Convert the lock to a simple lock. */
np = SLOCK_USED;
}
barrier();
/* 设置位锁的状态 */
s->p = np;
barrier();
/* 通知继承者锁已释放 */
wb->wake = 1;
}
static int slock_trylock(slock *s)
{
/* No other readers or writers? */
if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return 0;
return EBUSY;
}
这个结果也不令人满意,与 bitlistlock 算法很类似,因为它们都采用了一个位锁来进行等待者队列的互斥操作。
Threads | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|
Time (s) | 3.7 | 5.1 | 5.8 | 6.5 | >1min>1min |
现在我们需要进行一些思考分析,以上算法的一个问题在于等待者队列的同步。一个核心的问题是我们需要用某种方式识别出队列的头部和尾部。头部被用来添加新的等待者,尾部用来决定谁是锁的下一个使用者。MCS 锁算法增加了额外的结构,我们能够很快的找到队列尾部。K42 锁算法将其保存在了第二个队列指针里面。
然而我们还可以使用另外一种技巧,如果我们能够在堆栈保存额外的信息。那我们就可能在堆栈中识别我们需要的指针,我们就可以用这个信息来知道队列的尾部。这就是堆栈锁(stack-lock)算法。
typedef struct stlock_t stlock_t;
struct stlock_t
{
stlock_t *next;
};
typedef struct stlock_t *stlock;
static __attribute__((noinline)) void stlock_lock(stlock *l)
{
stlock_t *me = NULL;
barrier();
me = xchg_64(l, &me);
/* 等待锁释放 */
while (me) cpu_relax();
}
#define MAX_STACK_SIZE (1<<12)
/* 通过堆栈来找到尾部指针 */
static __attribute__((noinline)) int on_stack(void *p)
{
int x;
uintptr_t u = (uintptr_t) &x;
return ((u - (uintptr_t)p + MAX_STACK_SIZE) < MAX_STACK_SIZE * 2);
}
static __attribute__((noinline)) void stlock_unlock(stlock *l)
{
stlock_t *tail = *l;
barrier();
/* Fast case */
if (on_stack(tail))
{
/* Try to remove the wait list */
if (cmpxchg(l, tail, NULL) == tail) return;
tail = *l;
}
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (!tail->next) cpu_relax();
if (on_stack(tail->next)) break;
tail = tail->next;
}
barrier();
/* 释放锁 */
tail->next = NULL;
}
static int stlock_trylock(stlock *l)
{
stlock_t me;
if (!cmpxchg(l, NULL, &me)) return 0;
return EBUSY;
}
这个算法将会很简单,如果你知道一个线程堆栈的具体分配方式。然而,该算法也很慢。
Threads | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|
Time (s) | 3.6 | 5.3 | 5.7 | 6.2 | >1min>1min |
以上锁算法的加锁过程看起来很高效,而解锁流程很慢很复杂。也许我们可以在锁的内部保存更多的信息,使得解锁过程能够更快。因为很大一部分时间都是用来查找下一个锁的使用者。或许当我们空转等待锁时就可以就完成该操作,如果我们将其保存在锁结构内,这样在解锁流程我们就不需要重新计算它。
typedef struct plock_t plock_t;
struct plock_t
{
plock_t *next;
};
typedef struct plock plock;
struct plock
{
plock_t *next;
plock_t *prev;
plock_t *last;
};
static void plock_lock(plock *l)
{
plock_t *me = NULL;
plock_t *prev;
barrier();
me = xchg_64(l, &me);
prev = NULL;
/* Wait until we get the lock */
while (me)
{
/* Scan wait list for my previous */
if (l->next != (plock_t *) &me)
{
plock_t *t = l->next;
while (me)
{
if (t->next == (plock_t *) &me)
{
prev = t;
while (me) cpu_relax();
goto done;
}
if (t->next) t = t->next;
cpu_relax();
}
}
cpu_relax();
}
done:
l->prev = prev;
l->last = (plock_t *) &me;
}
static void plock_unlock(plock *l)
{
plock_t *tail;
/* Do I know my previous? */
if (l->prev)
{
/* Unlock */
l->prev->next = NULL;
return;
}
tail = l->next;
barrier();
/* Fast case */
if (tail == l->last)
{
/* Try to remove the wait list */
if (cmpxchg(&l->next, tail, NULL) == tail) return;
tail = l->next;
}
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (!tail->next) cpu_relax();
if (tail->next == l->last) break;
tail = tail->next;
}
barrier();
/* Unlock */
tail->next = NULL;
}
static int plock_trylock(plock *l)
{
plock_t me;
if (!cmpxchg(&l->next, NULL, &me))
{
l->last = &me;
return 0;
}
return EBUSY;
}
以上算法开始有了一些速度上的优化,但是仍没有 K42 算法快。(然而总是比原始的 spinlock 锁算法高效,只要竞争的线程数少于处理器核数)
Threads | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|
Time (s) | 3.7 | 5.1 | 5.3 | 5.4 | >1min>1min |
在仔细分析以上算法后它仍是有优化空间的,我们实际上并不需要知道下一个继承者的指针值,而是可以用其他唯一的值来替代。不再保存指针的值,而是使用一个递增的计数器。如果等待者知道哪个计数是轮到它,那么它只要等待知道该计数出现。这就是 ticket 锁算法。
typedef union ticketlock ticketlock;
union ticketlock
{
unsigned u;
struct
{
unsigned short ticket;
unsigned short users;
} s;
};
static void ticket_lock(ticketlock *t)
{
unsigned short me = atomic_xadd(&t->s.users, 1);
/* 每个等待者都有唯一的加锁成功条件,避免了原生 spinlock 锁并发的问题 */
while (t->s.ticket != me) cpu_relax();
}
static void ticket_unlock(ticketlock *t)
{
barrier();
t->s.ticket++;
}
static int ticket_trylock(ticketlock *t)
{
unsigned short me = t->s.users;
unsigned short menew = me + 1;
unsigned cmp = ((unsigned) me << 16) + me;
unsigned cmpnew = ((unsigned) menew << 16) + me;
if (cmpxchg(&t->u, cmp, cmpnew) == cmp) return 0;
return EBUSY;
}
static int ticket_lockable(ticketlock *t)
{
ticketlock u = *t;
barrier();
return (u.s.ticket == u.s.users);
}
以上算法特别快,打败了所有以上描述的其他的公平的锁算法。
Threads | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|
Time (s) | 3.6 | 4.4 | 4.5 | 4.8 | >1min>1min |
实际上,这就是 Linux 内核使用的 spinlock 锁算法。当然为了提高速度,其是用汇编语言实现,而不是我们原生的 C 代码。需要注意以上代码依赖计算机结构的字节序。它是为小端字节序设计的,大端字节序需要做一些修改。
Ticket 锁算法证明了一个常常提到的谬论是错误的。以上很多公平的锁算法在大规模竞争的情况仍表现的不错,这是因为等待者是在不同的内存地址段空转,这减小了总线的压力并提升了性能。然而这个效果很有限,更重要的是等待者需要进行排序以便知道谁是下一个继承者。而 ticket 算法很好的实现了这一点。实际上即使大量的等待者在同一个锁的位置等待,性能也不会表现太差。