数据结构
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
w:互斥锁,多个写 goroutine之间竞争
writerSem: 加写锁的信号量
readerSem: 加读锁的信号量
readerCount: 需要加读锁的数(已经获取到的+待获取的)
readerWait: 已经获取到读锁数
写锁Lock
func (rw *RWMutex) Lock() {
...
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}
...
}
rw 是一个mutex,主要是用来多个协程之间互斥;
readerCount 执行AddInt32之后会是一个特别大的负值
readerWait就等于加写锁一刻的readerCount 数
如果atomic.AddInt32(&rw.readerWait, r) != 0 就说明有已经获取到读锁的协程,等待rw.writerSem 信号量释放
(通过后面的读锁释放可以看到,writerSem 信号量释放的时候,readerWait == 0)
写锁 Unlock
func (rw *RWMutex) Unlock() {
...
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// Allow other writers to proceed.
rw.w.Unlock()
...
}
加写锁的时候,将readerCount 减去一个特别大的数rwmutexMaxReaders,所以释放写锁,先还原readerCount
因为加写锁成功的情况只有readerWait == 0
如果r大于0,说明有需要 r 个需要加读锁的协程,就需要 readerSem 信号量release r次
最后释放w mutex的锁
读锁RLock
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem)
}
}
只要readerCount 小于0 ,就认为有协程加完写锁,此时等待readerSem 信号量
读锁 RUnlock
func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}
...
}
如果readerCount 小于0,就说明有写锁在等待
将readerWait 数量减1 当readerWait == 0 的时候,此时没有获取到读锁的协程了,释放writerSem 信号量
最大读锁数 rwmutexMaxReaders
readerCount 应当小于 rwmutexMaxReaders,否则加写锁时 atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 直接就>=0了,那读锁就认为没有需要加写锁的goroutine
读写锁的顺序
从上面代码可以看出,当一个写锁释放之后,如果同时有需要加读锁和写锁的goroutine,优先加读锁,因为先释放的readerSem,后释放的w(mutex),当w unlock之后,之后的需要加读锁的goroutine又会pending。
不要递归加读锁
假如出现递归加读锁的情况,从上面代码分析中可以看出来,当有一个goroutine 需要加写锁之后,后面的加读锁操作都会pending,程序会block
在递归调用中,新的读锁会pending,之前加完读锁的不能释放,而写锁也会pending。
func f(n int) int {
if n < 1 {
return 0
}
fmt.Println("RLock")
m.RLock()
defer func() {
fmt.Println("RUnlock")
m.RUnlock()
}()
time.Sleep(100 * time.Millisecond)
return f(n-1) + n
}
func main() {
done := make(chan int)
go func() {
time.Sleep(200 * time.Millisecond)
fmt.Println("Lock")
m.Lock()
fmt.Println("Unlock")
m.Unlock()
done <- 1
}()
f(4)
<-done
}