Go 语言实现——同步原语¶
Mutex¶
正常模式¶
先看 Go1.8 的 Mutex 实现,这个版本的 Mutex 实现还比较简单,1.9 开始 Mutex 加入了一个饥饿模式的优化,这个后面再说。
Mutex 定义如下:
type Mutex struct {
state int32
sema uint32
}
其中 state
为当前 Mutex 的状态,sema
是解锁信号量。
state
的状态位定义如下:
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
从最低位开始:
第一个比特位表示“当前 Mutex 是不是已经上锁”。
第二个比特位表示“当前有 goroutine 处于自旋或者收到解锁信号目前处于运行状态”。
剩余的位用作计数器,存当前有多少 goroutine 正在等待解锁的信号量。
Mutex 上锁的逻辑如下:
首先使用 CAS 尝试上锁,如果成功直接返回。
如果失败,自旋几次等待解锁并重新尝试上锁。
自旋次数太多后,将 goroutine 睡眠,等待 Unlock 发信号唤醒。
func (m *Mutex) Lock() {
// CAS 尝试上锁,成功直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
awoke := false
iter := 0
for {
old := m.state
new := old | mutexLocked
// 如果锁已经被其它 goroutine 持有了
if old&mutexLocked != 0 {
// 检查当前 goroutine 能不能够自旋
if runtime_canSpin(iter) {
// 设置 woken 标示位,告诉 Unlock 不用唤醒 goroutine
// 有 goroutine 处在运行状态
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 一次自旋等待解锁
runtime_doSpin()
iter++
continue
}
// 不能自旋,计数器 + 1
new = old + 1<<mutexWaiterShift
}
// 如果 mutexWoken 被置位的话,清除标志位
if awoke {
new &^= mutexWoken
}
// 尝试加锁或者睡眠等待解锁信号
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 加锁成功,返回
if old&mutexLocked == 0 {
break
}
// 睡眠 💤 等待解锁信号
runtime_SemacquireMutex(&m.sema)
// 收到解锁信号,回循环开始重新开始尝试加锁
awoke = true
iter = 0
}
}
}
要进入自旋也是有条件限制的:
自旋的次数小于 4(别一直自旋,浪费 CPU)。
运行在多核机器上(单核机器自旋没有意义)。
其它逻辑线程 P 至少得有一个处在工作状态,而不是都在无所事事(加上下面的条件 4,如果都在无所事事,那么自旋等也等来不来解锁的,除非有新的 goroutine 进来,所以这种情况下还是睡眠等信号吧)。
当前 goroutine 归属的逻辑线程 P 上只有当前 goroutine 一个,没有其它了。
func sync_runtime_canSpin(i int) bool {
if i >= 4 || ncpu <= 1 || int32(sched.npidle+sched.nmspinning)+1 >= gomaxprocs {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
自旋就是执行 30 次 PAUSE 指令。
func sync_runtime_doSpin() {
procyield(30)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
而解锁的逻辑就是:
首先解锁。如果等待锁的 goroutine 有在运行状态的,直接返回就行。
如果没有,那么使用信号量给等待的 goroutine 发送个信号。
func (m *Mutex) Unlock() {
// 解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
old := new
for {
// old>>mutexWaiterShift 是当前等待解锁信号量的 goroutine 计数器
// 如果没有等待解锁信号量的 goroutine,或者刚解的锁已经被其它 goroutine 重新上锁
// 或者有在自旋等待锁的 goroutine,直接返回。
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// 等待解锁信号量的 goroutine 数减 1 并且设置“已经有 goroutine 唤醒”标志位。
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)
return
}
old = m.state
}
}
https://github.com/golang/go/blob/release-branch.go1.8/src/sync/mutex.go
饥饿模式¶
上面正常模式运行下的 Mutex 有一个问题,就是通过 runtime_Semrelease
信号通知的等待锁的 goroutine 相比于自旋正在等待锁或者运行中的等待锁的 goroutine 有点弱势,因为收到信号后 goroutine 并不是立刻运行,只是被加入了可运行队列,抢锁就很容易抢不过正在 CPU 上运行着的 goroutine,因此 Go1.9 之后 Go 新加了一个饥饿模式,在这个模式下,通过 runtime_SemacquireMutex
唤醒的 goroutine 会检查自己等待锁的时间是不是太长了,如果是那么就将锁的争抢模式改为“饥饿模式”,禁止自旋,所有等待锁的进程都排排坐吃果果。另外,runtime_SemacquireMutex
的时候将自己直接加到等待队列的前面,优先获取锁。
详细参见:
RWMutex¶
RWMutex 结构体如下:
type RWMutex struct {
w Mutex // 写锁
writerSem uint32 // 读锁释放信号量
readerSem uint32 // 写锁释放信号量
readerCount int32 // 已上的读锁计数器,为负的时候表示已加写锁
readerWait int32 // 加写锁时读锁计数器的值,写锁加完后需
// 要等待之前这些读锁全部释放整个写锁加锁过程才算完成
// 写锁加上后后续上读锁操作会阻塞,直接等待写锁释放信号
}
上读锁比较简单,就是给读锁计数器加一,如果计数器的值为负数,说明 RWMutex 当前已上写锁,此时需要等写锁释放,才算上锁完成。
释放读锁的时候计数器减一,同时需要判断当前是不是有写锁,如果有写锁的话,判断写锁等待的那些读锁是不是全部释放完毕,是的话通知写锁“所有读锁已经释放,可以返回进行写操作了”。
func (rw *RWMutex) RLock() {
// 更新读操作计数器,如果值为负,说明有写锁
// 直接等待写锁释放信号,否则返回上读锁成功。
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_Semacquire(&rw.readerSem)
}
}
func (rw *RWMutex) RUnlock() {
// 更新读操作计数器,如果计数器为负,说明有写锁
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// 更新 readerWait 计数器,当计数器为 0,说明加写锁时正在执行的读操作已经完成
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 通知读锁全部释放了
runtime_Semrelease(&rw.writerSem, false)
}
}
}
写锁就是 RWMutex 中的 w 这个 Mutex 排他锁,加写锁的第一件事就是上这个排他锁,保证只能有一个写操作在执行,然后修改 readerCount 为负数,这个主要是通知读锁操作,我这边已经加了写锁了,后续上读锁直接等我 写锁释放信号 就行。将此时的 readerCount 快照下来,等待这些读锁释放后,就可以返回进行写操作了。
const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Lock() {
// 加写锁
rw.w.Lock()
// 将 readerCount 减去一个常数变成负数,作为加了写锁的标示
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 记录当前正在执行的读操作数量
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 等待上面记录的这些读操作完成的信号
runtime_Semacquire(&rw.writerSem)
}
}
func (rw *RWMutex) Unlock() {
// 将 readerCount 恢复原样
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// 通知这些读操作写锁已经释放了
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// 释放写锁
rw.w.Unlock()
}
If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock.
因为加写锁:
需要等待 加锁前所有的读锁 释放才算整个加锁过程完成。
加锁后 后续的读锁操作 直接阻塞并等待写锁释放信号才能上锁。
就造成了上面引用的官方文档里说的现象。
这个用 https://www.zenlife.tk/go-read-mutex-block.md 中的一段代码来说明。
package main
import (
"fmt"
"time"
"sync"
)
// g1
func main() {
var mu sync.RWMutex
// g2
go func() {
mu.RLock()
time.Sleep(10*time.Second)
mu.RUnlock()
}()
time.Sleep(100*time.Millisecond)
// g3
go func() {
mu.Lock()
mu.Unlock()
}()
time.Sleep(100*time.Millisecond)
start := time.Now()
fmt.Println("before get read block")
// 会被 g2 阻塞
mu.RLock()
fmt.Println("inside read load no block", time.Since(start))
mu.RUnlock()
}
我们以 g1, g2, g3 分别标示上面代码中的几个 goroutine,g1 最后的 RLock 会被 g2 的 RLock 阻塞。整个逻辑链条如下:
g2 中给 RWMutex 上了一个读锁。
g3 中尝试给 RWMutex 上写锁,写锁需要等待 g2 中加的读锁完成才算加锁完成。
g1 中上读锁时,发现有写锁,直接阻塞等待写锁释放的信号。
10 秒之后,g2 中的读锁释放,写锁等到释放信号,完成加锁,然后写锁释放,g1 等到释放信号,继续上读锁操作。