Go 语言实现——goroutine
=========================

概述
--------

goroutine 是对线程的进一步抽象,就像操作系统的进程调度会将 CPU 时间分配给一个个的进程 or 线程去运行一样,goroutine 的调度器会将操作系统分配给 Go 程序各个线程的 CPU 时间再进一步的细分给一个个的 goroutine 去运行,实现了线程的 multiplexing。

和操作系统的进程调度对 CPU 有完全的控制不一样,goroutine 的调度器调度的线程还受到操作系统的限制:

- 线程上运行的代码如果调用了系统调用,那么这个线程会被操作系统挂起直到系统调用返回。
- 线程运行的时间片到了之后,操作系统可能会暂时挂起线程一段时间直到线程下次再被调度到。

Go 是怎么解决这些问题并实现一个高性能的 goroutine 调度器的呢?

首先,为了充分利用多核 CPU,Go 程序会同时有 N(N=CPU 数)个线程在同时运行 goroutine,每个线程有自己的 goroutine 队列,还有一个全局的 goroutine 队列。线程循环的执行自己本地队列里的各个 goroutine,只有本地队列里没有 goroutine 可以执行的时候,才会从全局的 goroutine 队列里取 goroutine,避免每次调度都要加全局锁。

第二,为了解决线程调度在系统调用会被操作系统挂起的问题,Go 在物理线程之上抽象出了一个虚拟线程(类似物理内存和虚拟内存),上面说的线程更准确的说是虚拟线程,如果有 goroutine 陷入系统调用(执行的物理线程被操作系统挂起)太久,那么执行该 goroutine 的虚拟线程会挂起这个 goroutine 并创建新的物理线程来执行该线程中的其他 goroutine。

在 Go 里这三个抽象分别用下面三个结构体来表示:

    | g: goroutine
    | p: 虚拟线程(个数 = CPU 数)
    | m: 物理线程

.. image:: images/go-scheduler-g-m-p.png

最后,为了平衡线程之间的 goroutine,当线程本地队列里没有可执行的 goroutine 且全局队列为空的时候,当前线程会任意选择一个其它线程,从其线程中 *偷(work-stealing)* 一半的 goroutine 过来执行。

.. image:: images/go-scheduler-stealing.png

参考:

- Scalable Go Scheduler Design Doc: https://golang.org/s/go11sched
- Go's work-stealing scheduler: https://rakyll.org/scheduler/

创建
---------

.. code-block:: go

    go func() {
        print("hello world")
    }()

编译成汇编代码后:

.. code-block:: asm

    $ go tool compile -l -S test.go
    ...
    // func newproc(siz int32, fn *funcval)
    0x001d 00029 (test.go:8)    MOVL    $0, (SP)
    0x0024 00036 (test.go:8)    LEAQ    "".main.func1·f(SB), AX
    0x002b 00043 (test.go:8)    MOVQ    AX, 8(SP)
    0x0030 00048 (test.go:8)    PCDATA    $0, $0
    0x0030 00048 (test.go:8)    CALL    runtime.newproc(SB)
    ...

从汇编代码可以看出,go 语句会被转换成下面的函数调用:

.. code-block:: go

    runtime.newproc(0, func() {
        print("hello world")
    })

*runtime.newproc* 主要做的就是:

1. 准备好新 goroutine 的 g 结构体,将其加入到当前 goroutine 运行在的线程的本地运行队列里去。
2. 因为 goroutine 多了,看看是否还能够再召唤新的虚拟线程 p 出来执行 goroutine,已经饱和了就算了。

.. code-block:: go

    // proc.go
    func newproc(siz int32, fn *funcval) {
        argp := add(unsafe.Pointer(&fn), sys.PtrSize)
        pc := getcallerpc()
        // 切换到当前线程的系统栈(也就是 g0 )去执行 newproc1
        systemstack(func() {
            newproc1(fn, (*uint8)(argp), siz, pc)
        })
    }

    func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
        _g_ := getg()

        _p_ := _g_.m.p.ptr()
        // 从线程本地的 goroutine 队列里取一个空闲的 g 结构体
        newg := gfget(_p_)
        if newg == nil {
            // 本地队列满了的话就新建,_StackMin = 2k
            newg = malg(_StackMin)
            casgstatus(newg, _Gidle, _Gdead)
        }

        sp := newg.stack.hi - totalSize
        spArg := sp

        if narg > 0 {
            // 将函数入参从系统中创建者的调用栈中 copy 到新创建的 goroutine 的栈中
            memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
        }

        // newg.sched 这个结构体是 goroutine 切换时用来保存寄存器,栈指针等现场信息的结构体
        // 这里伪造在 runtime.goexit 的一开始执行 fn 的现场,这样:
        //   - goroutine 调度到后恢复现场执行就是开始执行 goroutine
        //   - goroutine 执行完了会返回 goexit 中执行 goroutine 退出的逻辑(清理啊,调度啊)
        memclr(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
        newg.sched.sp = sp
        newg.stktopsp = sp
        // 这个是 runtime.goexit 中的 runtime.goexit1 指令,函数返回后执行的下一条指令
        newg.sched.pc = funcPC(goexit) + sys.PCQuantum
        newg.sched.g = guintptr(unsafe.Pointer(newg))
        gostartcallfn(&newg.sched, fn)
        newg.gopc = callerpc
        newg.startpc = fn.fn

        // 标示 newg 的状态为 runnable
        casgstatus(newg, _Gdead, _Grunnable)

        // 将 newg 放到当前线程的执行队列 p.runnext 里。
        // goroutine 的运行队列 runq 分为 3 级,p.runnext, p.runq, sched.runq,优先级从高到低。
        runqput(_p_, newg, true)

        // 如果还有空闲的逻辑线程(刚开始只有一个逻辑线程工作,新建 goroutine 才会新增工作的逻辑线程,直到饱和)
        // 并且没有物理线程在 spinning,也就是在寻找空闲逻辑线程(优先复用线程而不是新建)
        // 并且当前不是创建 runtime.main 线程(创建 runtime.main goroutine 时,不用管)
        // 那么唤醒一个空闲的逻辑线程(可能会创建新的物理线程)出来执行工作
        if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
            wakep()
        }
    }

执行
--------

在 :doc:`golang-internals-bootstrap` 中可以看到主线程最后调用了 *mstart* 函数,这个函数会调度第一个 goroutine 上来执行,也就是 *runtime.main* 。

.. code-block:: go

    func mstart() {
        mstart1(0)
    }

    func mstart1(dummy int32) {
        _g_ := getg()

        // 将 m 和 p 绑定
        if _g_.m != &m0 {
            acquirep(_g_.m.nextp.ptr())
            _g_.m.nextp = 0
        }

        // 调度一个 goroutine 过来执行
        schedule()
    }

    func schedule() {
        _g_ := getg()

        var gp *g

        if gp == nil {
            // 偶尔从全局 goroutine 队列里取 goroutine,保持一定的公平调度
            if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
                gp = globrunqget(_g_.m.p.ptr(), 1)
            }
        }
        if gp == nil {
            // 优先从线程本地队列里取 goroutine
            gp, inheritTime = runqget(_g_.m.p.ptr())
        }
        if gp == nil {
            // 本地线程队列里没有 goroutine,从其它队列偷或者从全局队列去
            gp, inheritTime = findrunnable()
        }

        // 恢复调用现场,开始执行 goroutine
        execute(gp, inheritTime)
    }

在 runtime.main 中 *mainStarted* 这个变量会设置为 true,告诉 runtime.newproc,之后再创建新 goroutine 的时候,如果虚拟线程未饱和,尝试唤醒,也就是调用 *wakep* 函数。

.. code-block:: go

    func wakep() {
        startm(nil, true)
    }

    func startm(_p_ *p, spinning bool) {
        // 取一个空闲的 p,没有的话直接返回
        if _p_ == nil {
            _p_ = pidleget()
        }

        // 尝试从空闲线程列表里取一个物理线程 m
        mp := mget()
        if mp == nil {
            var fn func()
            // 创建一个新物理线程 m 来运行 p
            newm(fn, _p_)
            return
        }

        // 通知找到的空闲线程 m 来运行逻辑线程 p
        mp.nextp.set(_p_)
        notewakeup(&mp.park)
    }

    func newm(fn func(), _p_ *p) {
        // 申请一个新的 m 结构体并初始化:
        //  - 调用 mcommoninit()
        //  - 申请新线程的系统栈 m.g0.stack,默认 8k,后续会传给 clone 的 childstack 参数传给新线程
        mp := allocm(_p_, fn)
        // 设置该线程 m 执行用来执行逻辑线程 p
        mp.nextp.set(_p_)
        newm1(mp)
    }

    func newm1(mp *m) {
        newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
    }

    func newosproc(mp *m, stk unsafe.Pointer) {
        // 在 sys_linux_amd64.s 中定义
        // int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void))
        // 这个函数除了完成系统调用的 clone 功能,还会设置好新的线程的 root goroutine 的执行环境(系统栈、TLS 等)
        // 最好跳转到 mstart 函数执行
        clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))
    }

wakep 最终可能会创建新的线程出来执行 goroutine,这些新线程的入口函数就是上面主线程最后调用的 *mstart* 函数。

线程上执行的第一个 goroutine 阻塞、运行时间太长或者退出后会触发 Go 系统代码,从而调度第二个 goroutine 上来运行,如此往复,直到终结。

.. _golang-goroutine-schedule:

切换
-----------

以下条件下 Go 调度器会切换 goroutine:

- 系统调用。
- goroutine 运行的时间太长了。
- channel/network/... 阻塞。

前面两个切换和启动过程中创建的 *sysmon* 这个独立线程执行的代码有关。

.. code-block:: go

    func sysmon() {
        for {
            usleep(delay)
            now := nanotime()
            retake(now)
        }
    }

    func retake(now int64) uint32 {
        for i := 0; i < len(allp); i++ {
            _p_ := allp[i]
            pd := &_p_.sysmontick
            s := _p_.status
            if s == _Psyscall {
                // 如果 p 陷入系统调用的时间太长(1 sysmon tick,至少 20us),handoffp 也就是让其它线程来执行这个 p。
                if atomic.Cas(&_p_.status, s, _Pidle) {
                    handoffp(_p_)
                }
            } else if s == _Prunning {
                // 如果当前的 goroutine 运行的时间太长,抢占之。
                if pd.schedwhen+forcePreemptNS > now {
                    continue
                }
                // 设置当前 p 上运行的 goroutine gp.stackguard0 = stackPreempt
                // 强制 goroutine 下次函数调用的时候栈空间不够,从而进入系统代码运行以触发切换逻辑
                // 如果 goroutine 不调用函数的话,那就没有办法了。
                preemptone(_p_)
            }
        }
    }

goroutine 在系统调用前会将运行 goroutine 的线程的状态标记为 _Psyscall ,系统调用返回后,如果线程没有被 retake,goroutine 会直接恢复执行(running),如果被 retake 了,goroutine 会被加入执行队列,等待调度器下次调度执行(runnable)。这些是在系统调用前后的 *runtime·entersyscall* 和 *runtime·exitsyscall* 中处理的。

.. code-block:: asm

    // src/syscall/asm_linux_amd64.s
    TEXT    ·Syscall(SB),NOSPLIT,$0-56
        CALL    runtime·entersyscall(SB)
        ...
        SYSCALL
        ...
        CALL    runtime·exitsyscall(SB)
        RET

channel/network 在阻塞的时候会调用调度器的接口挂起对应的 goroutine,跟操作系统的 IPC 类似。

退出
--------

.. code-block:: asm

    // asm_amd64.s
    TEXT runtime·goexit(SB),NOSPLIT,$0-0
        BYTE    $0x90   // <- goroutine 会伪装成插入到这里被调用的样子
        CALL    runtime·goexit1(SB) // does not return
        // traceback from goexit1 must hit code range of goexit
        BYTE    $0x90   // NOP

goroutine 执行完 return 会回到 *runtime·goexit* 函数中,从 *CALL runtime.goexit1(SB)* 处继续开始执行,goexit1 中会调用 *schedule* 函数调度下一个 goroutine 过来执行。

.. code-block:: go

    func goexit1() {
        mcall(goexit0)
    }

    // goexit continuation on g0.
    func goexit0(gp *g) {
        _g_ := getg()

        casgstatus(gp, _Grunning, _Gdead)

        // ... 清理退出的 goroutine 的 G 结构体,将其和其它的 M 结构体等脱钩

        // 将 G 结构体放回空闲列表复用
        gfput(_g_.m.p.ptr(), gp)

        // 调度下一个 goroutine 来执行
        schedule()
    }

goroutine 栈和系统栈
---------------------

每个 goroutine 都会有自己的代码执行栈 *g.stack* ,这个栈开始的时候只有 2k ,如果栈空间不够了,Go 会申请一段更大的栈空间,然后将现在的栈的内容拷贝过去,如果堆上有指针指向旧栈上的变量,那么修改这个指针指向新栈,然后继续在新栈中执行,realloc stack 的操作叫做 split stack。

Go 函数一开始一般都会有一段检测是否要 split stack 的代码,如果需要的话会先 split stack 然后再执行函数。

.. code-block:: go

    "".main t=1 size=78 args=0x0 locals=0x18
        0x0000 00000 (test.go:3)    TEXT    "".main(SB), $24-0
        0x0000 00000 (test.go:3)    MOVQ    (TLS), CX
        // if SP < g.stackguard0 跳转到 0071 去执行
        0x0009 00009 (test.go:3)    CMPQ    SP, 16(CX)
        0x000d 00013 (test.go:3)    JLS    71
        ...
        0x0047 00071 (test.go:5)    NOP
        0x0047 00071 (test.go:3)    PCDATA    $0, $-1
        // 调用 runtime.morestack_noctxt split stack
        0x0047 00071 (test.go:3)    CALL    runtime.morestack_noctxt(SB)
        // 返回函数开头继续执行
        0x004c 00076 (test.go:3)    JMP    0

split stack 这个操作本身也需要 Go 代码去完成,这个时候 goroutine 的栈上已经没有栈空间去执行这个函数了,为了解决这个问题,split stack 的代码会切换到线程的系统栈去执行。也就是 *g.g0.stack* 。同样的为了防止调度之类的系统代码执行的时候 split stack,这些系统代码也是切换到系统栈去执行的。

- `Contiguous Stacks <https://docs.google.com/document/d/1wAaf1rYoM4S4gtnPh0zOlGzWtrZFQ5suE8qr2sD8uWQ/pub>`_
- https://groups.google.com/forum/#!msg/golang-nuts/JCKWH8fap9o/MBrs2FCnAgAJ

runtime.Gosched()
------------------------

`runtime.Gosched() <https://golang.org/src/runtime/proc.go?s=8739:8753#L248>`_ 做的就是主动放弃 goroutine 本次的运行机会,将自己放到队列后面去,等待下次再被调度到。

.. code-block:: go

    // proc.go
    func Gosched() {
        // mcall 会将当前函数的 pc(mcall 的返回地址), sp 指针保存到 g.sched 中
        // 然后执行 gosched_m 函数
        // 这样 goroutine 被重新调度执行就等于从 mcall 函数中返回。
        mcall(gosched_m)
    }

    func gosched_m(gp *g) {
        goschedImpl(gp)
    }

    func goschedImpl(gp *g) {
        // 将当前 goroutine 设置为 runnable 状态
        casgstatus(gp, _Grunning, _Grunnable)
        // 将 goroutine 和 m 脱钩
        dropg()
        // 将 goroutine 放到全局的运行队列中
        globrunqput(gp)

        // 调度下一个 goroutine 上来执行
        schedule()
    }