本文需要关注如下两个问题:
-
如何唤醒睡眠中的工作线程?
-
如何创建新的工作线程?
上篇文章聊到ready通过将需唤醒的goroutine放入运行队列来唤醒它,本文接着分析。
为充分利用CPU,ready在唤醒goroutine之后会验证是否需要创建新的工作线程来工作,验证规则就是,如果当前有空闲的p而没有工作线程正在尝试从各个工作线程本地运行队列偷取goroutine(没有spinning状态的工作线程)的话,就需将空闲的p叫起来工作,来看runtime/proc.go文件639行代码分析ready:
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
......
// Mark runnable.
_g_ := getg()
......
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next) //放入运行队列
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
//有空闲的p而且没有正在偷取goroutine的工作线程,则需要唤醒p出来工作
wakep()
}
......
}
唤醒空闲p是由wakep完成,来看runtime/proc.go文件2051行代码:
// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
func wakep() {
// be conservative about spinning threads
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
wakep先通过cas操作再次确认是否有其它工作线程处于spinning状态,之所以要使用cas操作再次确认,就是因为当前工作线程是通过【atomic.Load(&sched.npidle)!= 0 && atomic.Load(&sched.nmspinning) == 0】这个条件来判断启动工作线程之后到真正启动工作线程之前这段时间内是否有工作线程进入spinning状态,正在四处寻找需要运行的goroutine,有的话,就不需创建了。
如cas操作成功,则继续调用startm创建一个新的或是唤醒一个处于睡眠状态的工作线程出来工作。
来看runtime/proc.go文件1947行代码分析startm:
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil { //没有指定p的话需要从p的空闲队列中获取一个p
_p_ = pidleget() //从p的空闲队列中获取空闲p
if _p_ == nil {
unlock(&sched.lock)
if spinning {
// The caller incremented nmspinning, but there are no idle Ps,
// so it's okay to just undo the increment and give up.
//spinning为true表示进入这个函数之前已经对sched.nmspinning加了1,需要还原
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return //没有空闲的p,直接返回
}
}
mp := mget() //从m空闲队列中获取正处于睡眠之中的工作线程,所有处于睡眠状态的m都在此队列中
unlock(&sched.lock)
if mp == nil {
//没有处于睡眠状态的工作线程
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_) //创建新的工作线程
return
}
if mp.spinning {
throw("startm: m is spinning")
}
if mp.nextp != 0 {
throw("startm: m has p")
}
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
// The caller incremented nmspinning, so set m.spinning in the new M.
mp.spinning = spinning
mp.nextp.set(_p_)
//唤醒处于休眠状态的工作线程
notewakeup(&mp.park)
}
startm先验证是否有空闲p结构体对象,没有直接返回,有的话创建或是唤醒一个工作线程出来与之绑定,可以看到,唤醒p本质就是将空闲p利用起来。
确保有可绑定的p之后,startm先尝试从m空闲中查找处于休眠状态的工作线程,找到则通过notewakeup唤醒它,反之通过newm创建新的工作线程。
来看runtime/lock_futex.go文件130行代码分析notewakeup:
func notewakeup(n *note) {
//设置n.key = 1, 被唤醒的线程通过查看该值是否等于1来确定是被其它线程唤醒还是意外从睡眠中苏醒
old := atomic.Xchg(key32(&n.key), 1)
if old != 0 {
print("notewakeup - double wakeup (", old, ")\n")
throw("notewakeup - double wakeup")
}
//调用futexwakeup唤醒
futexwakeup(key32(&n.key), 1)
}
notewakeup先通过atomic.Xchg设置note.key值为1,此操作是为了使被唤醒的线程可通过查看该值是否等于1来确认是否被其它线程唤醒还是意外从睡眠状态中醒了过来,如该值为1,则表示是被唤醒的,可继续工作了,如该值为0,则表示是意外苏醒,需再次进入睡眠,设置note.key值为1后,notewakeup继续调用futexwakeup。
来看runtime/os_linux.go文件66行代码分析futexwakeup:
// If any procs are sleeping on addr, wake up at most cnt.
//go:nosplit
func futexwakeup(addr *uint32, cnt uint32) {
//调用futex函数唤醒工作线程
ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
if ret >= 0 {
return
}
// I don't know that futex wakeup can return
// EAGAIN or EINTR, but if it does, it would be
// safe to loop and call futex again.
systemstack(func() {
print("futexwakeup addr=", addr, " returned ", ret, "\n")
})
*(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}
对Linux来说,工作线程通过note休眠实质是通过futex系统调用睡眠在内核之中,所以唤醒处于睡眠状态的线程也需要futex系统调用进入内核来唤醒,所以这里futexwakeup又调用包装futex系统调用的futex函数来唤醒睡眠在内核中的线程。
来看runtime/sys_linux_amd64.s文件525行代码:
// int64 futex(int32 *uaddr, int32 op, int32 val,
//struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
MOVQ addr+0(FP), DI #这6条指令在为futex系统调用准备参数
MOVL op+8(FP), SI
MOVL val+12(FP), DX
MOVQ ts+16(FP), R10
MOVQ addr2+24(FP), R8
MOVL val3+32(FP), R9
MOVL $SYS_futex, AX #futex系统调用编号放入AX寄存器
SYSCALL #系统调用,进入内核
MOVL AX, ret+40(FP) #系统调用通过AX寄存器返回返回值,这里把返回值保存到内存之中
RET
futex函数由汇编代码编写,前几条指令都在为futex系统调用准备参数,完成之后通过SYSCALL进入操作系统内核完成线程的唤醒工作,内核在完成唤醒功能之后,当前工作线程则从内核返回到futex函数继续执行SYSCALL之后的代码并按函数调用链路原路返回,继续执行其它代码,被唤醒的线程由内核负责在适当的时侯调度到CPU上执行。
接下来看线程的创建,startm没有找到处于休眠状态的工作线程,就需调用newm创建一个新的工作线程,来看runtime/proc.go文件1807行代码分析newm:
// Create a new m. It will start off with a call to fn, or else the scheduler.
// fn needs to be static and not a heap allocated closure.
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrierrec
func newm(fn func(), _p_ *p) {
mp := allocm(_p_, fn)
mp.nextp.set(_p_)
......
newm1(mp)
}
newm先调用allocm从堆上分配一个m结构体对象,之后调用newm1。
来看runtime/proc.go文件1843行代码分析newm1:
func newm1(mp *m) {
//省略cgo相关代码.......
execLock.rlock() // Prevent process clone.
newosproc(mp)
execLock.runlock()
}
newm1继续调用newosproc,newosproc主要是调用clone创建一个新的系统线程,而这个新创建的系统线程将从mstart开始运行。
来看runtime/os_linux.go文件143行代码分析newosproc:
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrier
func newosproc(mp *m) {
stk := unsafe.Pointer(mp.g0.stack.hi)
......
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
......
}
//clone系统调用的Flags选项
cloneFlags = _CLONE_VM | /* share memory */ //指定父子线程共享进程地址空间
_CLONE_FS | /* share cwd, etc */
_CLONE_FILES | /* share fd table */
_CLONE_SIGHAND | /* share sig handler table */
_CLONE_SYSVSEM | /* share SysV semaphore undo lists (see issue #20763) */
_CLONE_THREAD /* revisit - okay for now */ //创建子线程而不是子进程
还有runtime/sys_linux_amd64.s文件539行代码分析clone:
// int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void));
TEXT runtime·clone(SB),NOSPLIT,$0
MOVL flags+0(FP), DI //系统调用的第一个参数
MOVQ stk+8(FP), SI //系统调用的第二个参数
MOVQ $0, DX //第三个参数
MOVQ $0, R10 //第四个参数
// Copy mp, gp, fn off parent stack for use by child.
// Careful: Linux system call clobbers CX and R11.
MOVQ mp+16(FP), R8
MOVQ gp+24(FP), R9
MOVQ fn+32(FP), R12
MOVL $SYS_clone, AX
SYSCALL
clone首先用4条指令为clone系统调用准备参数,clone需要的四个参数根据Linux系统调用约定,须分别放入rdi、rsi、rdx、r10四个寄存器中,前两个参数分别用来指定内核创建线程时的选项以及新线程应该使用的栈。
因为即将被创建的线程与当前工作线程共用一个进程地址空间,所以必须为子进程指定其使用的栈,否则父子线程会共享一个栈而造成混乱,从上述newosproc可看出,新线程使用的栈为m.g0.stack.lo~m.g0.stack.hi这段内存,该内存是newm创建m结构体对象时从进程的堆上分配过来的。
准备好系统调用参数之后,需将clone其它参数(mp、gp、线程入口函数)保存到寄存器之中,在系统调用之前保存该参数就是因为这几个参数目前还保存在父进程的栈中,一旦通过系统调用将子进程创建出来之后,子线程会使用clone系统调用给其指定的栈,所以需将这几个参数先保存到寄存器中,等子线程从系统调用返回过来之后,直接在寄存器中获取这几个参数。
虽然上述几个参数保存在了父线程的寄存器中,但创建子线程时,操作系统内核会将父线程所有寄存器复制一份给子线程,所以当子线程开始运行时就可以拿到父线程保存在寄存器的值,这些准备工作完成后通过syscall进入内核,由内核创建线程。
clone系统调用完成后,实际上就多了一个系统线程,新创建的子线程和当前线程都得从系统调用返回然后继续执行后面的代码。
通过clone系统调用返回值可验证那个是父线程,那个是子线程,简化后C代码如下:
if (clone(...) == 0) { //子线程
子线程代码
} else { //父线程
父线程代码
}
虽然这里只调用了一次clone,但clone却返回了2次,一次返回到父线程,一次返回到子线程,之后两个线程各自执行自己的代码。
来看runtime/sys_linux_amd64.s文件555行代码:
// In parent, return.
CMPQ AX, $0 #判断clone系统调用的返回值
JEQ 3(PC) / #跳转到子线程部分
MOVL AX, ret+40(FP) #父线程需要执行的指令
RET #父线程需要执行的指令
上述代码第一条指令就在验证clone系统调用的返回值,如是子线程就跳到后面的代码继续执行,反之将返回值保存在栈上,之后执行ret返回到newosproc。
再来看runtime/sys_linux_amd64.s文件561行代码来分析子线程要继续执行的逻辑:
# In child, on new stack.
#子线程需要继续执行的指令
MOVQ SI, SP #设置CPU栈顶寄存器指向子线程的栈顶,这条指令看起来是多余的?内核应该已经把SP设置好了
# If g or m are nil, skip Go-related setup.
CMPQ R8, $0 # m,新创建的m结构体对象的地址,由父线程保存在R8寄存器中的值被复制到了子线程
JEQ nog
CMPQ R9, $0 # g,m.g0的地址,由父线程保存在R9寄存器中的值被复制到了子线程
JEQ nog
# Initialize m->procid to Linux tid
MOVL $SYS_gettid, AX #通过gettid系统调用获取线程ID(tid)
SYSCALL
MOVQ AX, m_procid(R8) #m.procid = tid
#Set FS to point at m->tls.
#新线程刚刚创建出来,还未设置线程本地存储,即m结构体对象还未与工作线程关联起来,
#下面的指令负责设置新线程的TLS,把m对象和工作线程关联起来
LEAQ m_tls(R8), DI #取m.tls字段的地址
CALL runtime·settls(SB)
#In child, set up new stack
get_tls(CX)
MOVQ R8, g_m(R9) # g.m = m
MOVQ R9, g(CX) # tls.g = &m.g0
CALL runtime·stackcheck(SB)
nog:
# Call fn
CALL R12 #这里调用mstart函数
......
上述代码第一条指令将CPU寄存器的栈顶指针设置为新线程的栈顶,看起来像是多余的,因为clone系统调用时已经将栈信息告诉操作系统了,操作系统在将新线程调度起来运行时已将CPU的rsp寄存器的值设置好了,接下来4条指令验证m和g是否为nil,为空去执行fn,不为空继续对m进行初始化。
对新创建出来的线程进行初始化过程是从第6条指令开始,先通过系统调用获取子线程的线程ID,并赋值给m.procid,之后通过settls设置本地线程存储并通过将m.g0的地址放入本地线程存储中,实现了m结构体对象与工作线程之间的关联。
新线程创建完成后,就开始从mstart执行,mstart首先会设置m.g0的stackguard成员,之后调用mstart1将工作线程的g0的调度信息保存在m.g0.sched成员中,最后就通过schedule函数进入调度循环。
至此,本文对读写channel阻塞产生的goroutine被动调度分析就结束了,还有其它如读写网络连接阻塞、加锁阻塞、select阻塞等都会发生goroutine被动调度,感兴趣的各位大佬可自行阅读源码分析一下。
以上仅为个人观点,不一定准确,能帮到各位那是最好的。
好啦,到这里本文就结束了,喜欢的话就来个三连击吧。
以上均为个人认知,如有侵权,请联系删除。