golang调度 调度流程

1. 初始化

调度器的初始化从 schedinit()函数开始,将会设置m最大个数(maxmcount)及p最大个数(GOMAXPROCS)等

func schedinit() {
    sched.maxmcount = 10000  // 设置m的最大值为10000
    mcommoninit(_g_.m) //初始化当前m
    // 确认P的个数
    // 默认等于cpu个数,可以通过GOMAXPROCS环境变量更改
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
        procs = n
    }
    // 调整P的个数,这里是新分配procs个P
    // 这个函数很重要,所有的P都是从这里分配的,以后也不用担心没有P了
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
    ...
}

procresize方法主要完成以下任务:

  1. 比较目标个数和原始p的个数,进行全局缓存的扩容或收缩
  2. 遍历p的缓存,将未初始化的p进行初始化
  3. 对于收缩的情况,将收缩的p进行回收处理
  4. 分别将空闲的p和有任务的p加入空闲链表和工作链表

下面是procresize()的源码:

//全局数据结构:
allp []*p // len(allp) == gomaxprocs; may change at safe points, otherwise immutable
sched schedt //全局调度器(综述文中有介绍)

// 所有的P都在这个函数分配,不管是最开始的初始化分配,还是后期调整
func procresize(nprocs int32) *p {
    ...
    old := gomaxprocs
	// 扩张allp数组
	if nprocs > int32(len(allp)) {
		lock(&allpLock)
		if nprocs <= int32(cap(allp)) {
			allp = allp[:nprocs]
		} else {
			// 分配nprocs个*p
			nallp := make([]*p, nprocs)
			copy(nallp, allp[:cap(allp)])
			allp = nallp
		}
		unlock(&allpLock)
	}
	// 初始化新的p
	for i := int32(0); i < nprocs; i++ {
		pp := allp[i]
		if pp == nil {
			pp = new(p)
		    ...
			// 将pp保存到allp数组里, allp[i] = pp
			atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
		}
        ...
	}
	// 释放无用的p
	for i := nprocs; i < old; i++ {
		p := allp[i]
		// 任务转移
		// 本地任务队列转换到全局队列
		for p.runqhead != p.runqtail {
			p.runqtail--
			gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
			globrunqputhead(gp)
		}
		// 优先执行的也转移到全局
		if p.runnext != 0 {
			globrunqputhead(p.runnext.ptr())
			p.runnext = 0
		}
		// 后台标记的g也转移
		if gp := p.gcBgMarkWorker.ptr(); gp != nil {
			casgstatus(gp, _Gwaiting, _Grunnable)
			globrunqput(gp)
			p.gcBgMarkWorker.set(nil)
		}
		// 做一些内存释放等操作
       ...
	}
    ...
    //将p放入队列
	var runnablePs *p
	for i := nprocs - 1; i >= 0; i-- {
		p := allp[i]
		// 如果是当前的M绑定的P,不放入P空闲链表
		// 否则更改P的状态为_Pidle,放入P空闲链表
		if _g_.m.p.ptr() == p {
			continue
		}
		p.status = _Pidle
		if runqempty(p) { 
			pidleput(p)// 将空闲p放入全局空闲链表
		} else {
           // 非空闲的通过绑定m,链起来     
			p.m.set(mget())
			p.link.set(runnablePs)
            // 最后一个空闲的不加入空闲列表 直接返回去调度使用
			runnablePs = p
		}
	}
}

新建的无任务p都会被放到空闲链表中:

func pidleput(_p_ *p) {
    if !runqempty(_p_) {
        throw("pidleput: P has non-empty run queue")
    }
    _p_.link = sched.pidle //通过p的link形成链表
    sched.pidle.set(_p_)
    // 将sched.npidle加1
    atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic
}

默认只有schedinit和startTheWorld会调用procresize()schedinit初始化p,startTheWorld会激活所有有任务的p。完成调度器初始化后,系统会引导生成 main goroutine,之前是在全局的g0上执行初始化工作

golang支持在运行间修改p数量:runtime.GOMAXPROCS(),但是代价很大,会触发STW

 lock(&sched.lock)
    ret := int(gomaxprocs)
    unlock(&sched.lock)
    if n <= 0 || n == ret {
        return ret
    }
    // 有stw和重启世界的过程
    stopTheWorld("GOMAXPROCS")
    // newprocs will be processed by startTheWorld
    newprocs = int32(n)
    startTheWorld()
    return ret

以上便是golang初始化调度器的所有步骤,具体:

  1. 调用schedinit,初始化maxmcount和gomaxprocs的数量
  2. sechdinit中调用procresize(),初始化所有的p,并放入空闲链表中
  3. schedinit结束后,引导创建main goroutine,执行main(之前是在全局的g0中执行),汇编执行引导,文中并没有描述
  4. 运行时可以调用runtime.GOMAXPROCS()函数修改p的数量,会触发STW,有带价。如果真的有需求,可以考虑启动前修改系统环境变量实现。

2. g的创建

在编写程序中,使用 go func() {}来创建一个goroutine(g),这条语句会被编译器翻译成函数 newproc()。

func newproc(siz int32, fn *funcval) {
    //用fn + PtrSize 获取第一个参数的地址,也就是argp
    //这里要了解一下go的堆栈
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    //用siz - 8 获取pc地址 (汇编实现)
    pc := getcallerpc()
    // 用g0的栈创建G对象
    systemstack(func() {
        newproc1(fn, (*uint8)(argp), siz, pc)
    })
}

了解一下保存需要执行的业务函数的funcval:funcval 是一个变长结构,第一个成员是函数指针,往后是fn的参数,个数长度可变,但是起始位置固定(在有参数的的情况下)。将*funcval的地址跳过一个指针长度(fn)便是参数的起始地址了。

// 上面的 add 是跳过这个 fn,到参数的起始位置
type funcval struct {
    fn uintptr
    // variable-size, fn-specific data here
}

newproc()获取到参数的地址和callerpc,然后调用newproc1().流程如下图:在这里插入图片描述

代码如下:

// 根据函数参数和函数地址,创建一个新的G,然后将这个G加入队列等待运行
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
    _g_ := getg()
    if fn == nil {
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    _g_.m.locks++ // disable preemption because it can be holding p in a local var
    siz := narg
    // 从m中获取p
    _p_ := _g_.m.p.ptr()
    // 从gfree list获取g
    newg := gfget(_p_)
    // 如果没获取到g,则新建一个
    if newg == nil {
        // 分配栈为 2k 大小的G对象
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead) //将g的状态改为_Gdead
        // 添加到allg数组,防止gc扫描清除掉
        allgadd(newg) 
    }
    // 参数大小+稍微一点空间
    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize 
    totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
    // 新协程的栈顶计算,将栈顶减去参数占用的空间
    sp := newg.stack.hi - totalSize
    spArg := sp
    // 如果有参数
    if narg > 0 {
        // copy参数到栈上
        memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
        ... //一些gc相关的工作省略
    }
    // 初始化G的gobuf,保存sp,pc,任务函数等
    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    // 保存goexit的地址到sched.pc,后面会调节 goexit 作为任务函数返回后执行的地址,所以goroutine结束后会调用goexit
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    // sched.g保存当前新的G
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    // 将当前的pc压入栈,保存g的任务函数为pc
    gostartcallfn(&newg.sched, fn)
    // gopc保存newproc的pc
    newg.gopc = callerpc
    // 任务函数的地址
    newg.startpc = fn.fn
    ...
    // 更改当前g的状态为_Grunnable
    casgstatus(newg, _Gdead, _Grunnable)
    // 生成唯一的goid
    newg.goid = int64(_p_.goidcache)
    // 将当前新生成的g,放入队列
    runqput(_p_, newg, true)
    // 如果有空闲的p 且 m没有处于自旋状态 且 main goroutine已经启动,那么唤醒某个m来执行任务
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
        wakep()
    }
}

g 默认会复用,会从p的free中获取,当p free为空,从全局的schedt中的gfreeStack或者gfreeNoStack中拉取到本地freelist

//从缓存列表获取一个空闲的g
func gfget(_p_ *p) *g {
retry:
    gp := _p_.gfree
    if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) {
        //本地空闲队列为空的时候,从全局中获取,需要加锁
        lock(&sched.gflock)
        //一次转移最多32个空闲到本地p
        for _p_.gfreecnt < 32 {
            if sched.gfreeStack != nil {
                gp = sched.gfreeStack                    //获取g
                sched.gfreeStack = gp.schedlink.ptr() //链表头指向下一个g
            } else if sched.gfreeNoStack != nil {
                gp = sched.gfreeNoStack
                sched.gfreeNoStack = gp.schedlink.ptr()
            } else {
                break
            }
            _p_.gfreecnt++
            sched.ngfree--
            gp.schedlink.set(_p_.gfree)
            _p_.gfree = gp
        }
        unlock(&sched.gflock)
        goto retry
    }
    // 获取到g
    if gp != nil {
        // 调整链表头及个数
        _p_.gfree = gp.schedlink.ptr()
        _p_.gfreecnt--
        // 堆栈为空就分配
        if gp.stack.lo == 0 {
            // Stack was deallocated in gfput. Allocate a new one.
            systemstack(func() {
                gp.stack = stackalloc(_FixedStack)
            })
            gp.stackguard0 = gp.stack.lo + _StackGuard
        } else {
          ...
        }
    }
    return gp
}

当一次调度执行完g后,调度器会将g放回p或者全局队列,当空闲任务个数超过64个的时候,会调整部分到全局任务队列,直到p本地空闲队列为32个的时候停止。

func gfput(_p_ *p, gp *g) {
    // 处理堆栈
    stksize := gp.stack.hi - gp.stack.lo
    // 不是默认堆栈,直接释放(扩张后的堆栈可能会很大,留着占内存,下次重新分配就好了)
    if stksize != _FixedStack {
        stackfree(gp.stack)
        gp.stack.lo = 0
        gp.stack.hi = 0
        gp.stackguard0 = 0
    }
    // 处理p的复用链表
    gp.schedlink.set(_p_.gfree)
    _p_.gfree = gp
    _p_.gfreecnt++
    // 超过64个,放回部分到全局队列
    if _p_.gfreecnt >= 64 {
        lock(&sched.gflock)
        for _p_.gfreecnt >= 32 {
            _p_.gfreecnt--
            gp = _p_.gfree
            _p_.gfree = gp.schedlink.ptr()
            if gp.stack.lo == 0 {
                gp.schedlink.set(sched.gfreeNoStack)
                sched.gfreeNoStack = gp
            } else {
                gp.schedlink.set(sched.gfreeStack)
                sched.gfreeStack = gp
            }
            sched.ngfree++
        }
        unlock(&sched.gflock)
    }
}

malg()函数创建一个新的g,包括为该g申请栈空间(支持程序分配栈的系统)。系统中的每个g都是由该函数创建而来的。

//一般传入的堆栈大小默认为2k
func malg(stacksize int32) *g {
    newg := new(g)
    if stacksize >= 0 {
        stacksize = round2(_StackSystem + stacksize)// 对齐
        systemstack(func() {
            newg.stack = stackalloc(uint32(stacksize))// 调用 stackalloc 分配栈
        })
        newg.stackguard0 = newg.stack.lo + _StackGuard        // 设置 stackguard
        newg.stackguard1 = ^uintptr(0)
    }
    return newg
}

创建成功会被放入到 allg的全局队列中,gc回收遍历扫描会使用,也防止gc回收分配好的g

var (
    // 存储所有g的数组
    allgs []*g
    // 保护allgs的互斥锁
    allglock mutex  
    allglen uintptr
)
func allgadd(gp *g) {
    lock(&allglock)
    allgs = append(allgs, gp)
    allglen = uintptr(len(allgs))
    unlock(&allglock)
}

当获取到一个可用的g之后:

  1. 初始化g的gobuf信息(上下文信息,包括sp,pc以及函数g执行完之后的返回指令pc(goexit函数))
  2. 添加到g到p的本地队列
  3. p的本地队列满了,便添加到全局队列,顺便转移部分本地队列的数据到全局队列,供其他的p获取。
  4. 若存在有空闲的p及未自旋的m,调用wakep()方法,这里会获取一个空闲的m或新建一个m,去和空闲的p绑点,调度。后文会有对该方法的解释
// 尝试将G放到P的本地队列
func runqput(_p_ *p, gp *g, next bool) {
    if next {
    retryNext:
        oldnext := _p_.runnext
        // 将G赋值给_p_.runnext
        // 最新的G优先级最高,最可能先被执行。
        // 剩下的G如果go运行时调度器发现有空闲的core,就会把任务偷走点,
        // 让别的core执行,这样才能充分利用多核,提高并发能
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        gp = oldnext.ptr()
    }
retry:
    h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    // 如果本地队列还有剩余的位置,将G插入本地队列的尾部
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    // 本地队列已满,放入全局队列
    if runqputslow(_p_, gp, h, t) {
        return
    }
    goto retry
}

// 如果本地满了以后,一次将本地的一半的G转移到全局队列
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    //首先转移一半到全局队列,省略
    ...
    // 将拿到的G,添加到全局队列末尾, 全局数据处理是需要加锁的,所以slow。
    lock(&sched.lock)
    globrunqputbatch(batch[0], batch[n], int32(n+1))
    unlock(&sched.lock)
    return true
}

放入队列时,p队列满了会分一半到全局队列,其他的p可以获取全局队列中的g执行。newproc1最后会唤醒其他m p去执行任务

到这里go fun()流程就完成了。 g不会被删除,但是会清理过大的栈空间,防止内存爆炸。gc过程中也会调用shrinkstack()将栈空间回收。这就是golang和可以创建大量g来支持并发的原因之一,g是复用的并且初始栈大小只有2k,超过2k的栈在g空闲的时候是会被回收的,这也减轻了系统内存的压力

3. 系统线程m

在golang中有三种系统线程:

  • 主线程:golang程序启动加载的时候就运行在主线程上,代码中由一个全局的m0表示
  • 运行sysmon的线程
  • 普通用户线程,用来与p绑定,运行g中的任务的线程,主线程和运行sysmon都是单实例,单独一个线程。而用户线程会有很多事例,他会根据调度器的需求新建,休眠和唤醒。

在newproc1中我们发现创建g成功后,会尝试wakep唤醒一个用户线程m执行任务,这里详细描述下这个方法:

// 尝试获取一个M来运行可运行的G
func wakep() {
    // 如果有其他的M处于自旋状态,那么就不管了,直接返回
    // 因为自旋的M回拼命找G来运行的,就不新找一个M(劳动者)来运行了。
    if !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    startm(nil, true)
}

// startm是启动一个M,先尝试获取一个空闲P,如果获取不到则返回
// 获取到P后,在尝试获取M,如果获取不到就新建一个M
func startm(_p_ *p, spinning bool) {
    lock(&sched.lock)
    // 如果P为nil,则尝试获取一个空闲P
    if _p_ == nil {
        _p_ = pidleget()
        if _p_ == nil {
            unlock(&sched.lock)
            return
        }
    }
    // 获取一个空闲的M
    mp := mget()
    unlock(&sched.lock)
    if mp == nil {
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        // 如果获取不到,则新建一个,新建完成后就立即返回
        newm(fn, _p_)
        return
    }
    // The caller incremented nmspinning, so set m.spinning in the new M.
    mp.spinning = spinning //标记该M是否在自旋
    mp.nextp.set(_p_) // 暂存P
    notewakeup(&mp.park) // 唤醒M
}

上述代码可以发现m回去调用mget()方法,获取不成功后才会选择创建,这里表明m也是支持复用的。获取不到任务的m也会被加入到空闲的m链表中,等待唤醒。

下面从新建m开始:

func newm(fn func(), _p_ *p) {
    // 根据fn和p和绑定一个m对象
    mp := allocm(_p_, fn)
    // 设置当前m的下一个p为_p_
    mp.nextp.set(_p_)
    ...
    // 真正的分配os thread
    newm1(mp)
}

// 分配一个m,且不关联任何一个os thread
func allocm(_p_ *p, fn func()) *m {
    _g_ := getg()
    _g_.m.locks++ // disable GC because it can be called from sysmon
    if _g_.m.p == 0 {
        acquirep(_p_) // 如果没有绑定p的话,申请一个p,只有p有cache,可以供m来申请内存。
    }
    ...
    mp := new(m)
    mp.mstartfn = fn
    mcommoninit(mp)   //初始化当前m
    // 给g0分配一定的堆栈
    if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" {
        mp.g0 = malg(-1)   //这些系统必须使用系统的栈
    } else {
        mp.g0 = malg(8192 * sys.StackGuardMultiplier) //go的栈是大小是8k
    }
    mp.g0.m = mp
    //绑定的p和当前m的p一样,解绑
    if _p_ == _g_.m.p.ptr() {
        releasep()
    }
    return mp
}

m初始化:检查数量,超过10000个异常停机;接受信号的g创建初始化;

func mcommoninit(mp *m) {
    _g_ := getg()
    // g0 stack won't make sense for user (and is not necessary unwindable).
    if _g_ != _g_.m.g0 {
        callers(1, mp.createstack[:])
    }
    lock(&sched.lock)
    if sched.mnext+1 < sched.mnext {
        throw("runtime: thread ID overflow")
    }
    mp.id = sched.mnext
    sched.mnext++
    // m数量检查
    checkmcount()
    ...
    // signal g创建初始化
    mpreinit(mp)
    if mp.gsignal != nil {
        mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
    }
    //加入全局m链表
    mp.alllink = allm //链表
    atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
    unlock(&sched.lock)
}
func newm1(mp *m) {
    // 对cgo的处理
    ...
    execLock.rlock() // Prevent process clone.
    // 创建一个系统线程,并且传入该 mp 绑定的 g0 的栈顶指针
    // 让系统线程执行 mstart 函数,后面的逻辑都在 mstart 函数中
    newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
    execLock.runlock()
}

每个操作系统分配系统线程的流程是不一样的,下面代码展示了在linux和windows系统下该函数的实现,其他的环境暂时不做讨论:

//linux
// 分配一个系统线程,且完成 g0 和 g0上的栈分配
// 传入 mstart 函数,让线程执行 mstart
func newosproc(mp *m, stk unsafe.Pointer) { 
    // Disable signals during clone, so that the new thread starts
    // with signals disabled. It will enable them in minit.
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
    sigprocmask(_SIG_SETMASK, &oset, nil)
   ...
}

//windows
func newosproc(mp *m, stk unsafe.Pointer) {
    const _STACK_SIZE_PARAM_IS_A_RESERVATION = 0x00010000
    // stackSize must match SizeOfStackReserve in cmd/link/internal/ld/pe.go.
    const stackSize = 0x00200000*_64bit + 0x00100000*(1-_64bit)
    thandle := stdcall6(_CreateThread, 0, stackSize,
        funcPC(tstart_stdcall), uintptr(unsafe.Pointer(mp)),
        _STACK_SIZE_PARAM_IS_A_RESERVATION, 0)
    ...
    // Close thandle to avoid leaking the thread object if it exits.
    stdcall1(_CloseHandle, thandle)
}

创建m的时候,会给m的g0分配分配栈空间。g0是该m私有的,golang中系统命令都是在g0上执行的,函数systemstack(func())(汇编实现)会将方法转到g0栈上执行,然后转回当前的g。管理命令操作执行都在g0栈上执行,隔离了业务内容和指令的执行,避免做g共享内存。

下面将描述获取一个空闲的m在startm中,m是优先去空闲队列中获取,未获取到空闲队列才选择创建

func mget() *m {
   //从idle 的m链表中搞一个
    mp := sched.midle.ptr()
    if mp != nil {
        sched.midle = mp.schedlink
        sched.nmidle--
    }
    return mp
}

被唤醒的进入工作状态的m会陷入调度循环,竭尽全力获取g执行,当找不到可执行的任务,或者任务用时过长,系统调用阻塞等原因被剥夺p,m会再次进入休眠状态。

// 停止M,使其休眠,但不会被系统回收
// 调用notesleep使M进入休眠,唤醒后就会从休眠出直接开始执行
// 线程可以处于三种状态: 等待中(Waiting)、待执行(Runnable)或执行中(Executing)。
func stopm() {
    _g_ := getg()
retry:
    lock(&sched.lock)
    mput(_g_.m)
    unlock(&sched.lock)
    // 在lock_futex.go 中
    // 休眠,等待被唤醒
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park)
   ...
    // 这里是被wakenote唤醒后的操作了
    // 绑定p
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}

// 把mp添加到midle列表
func mput(mp *m) {
    mp.schedlink = sched.midle
    sched.midle.set(mp)
    sched.nmidle++
    checkdead()
}

到这里可以看到,m也是不会主动删除释放的,支持复用。当大量的m被创建的时候,对性能是有影响的:

  • 系统线程调度上下文切换是有消耗的
  • m本身是有占资源的(自身的栈内存,寄存器等)

4. 执行goroutine

m 执行 g有两个起点:

  1. 从m的启动函数(创建m的时候绑定的)mstart()开始,触发m的调度
  2. 调度过程中调用stopm()睡眠后,通过 notewakeup(&mp.park)恢复m的执行,并从stopm()的位置开始执行,重新调度。

mstart() 流程:

func mstart() {
    _g_ := getg()
    osStack := _g_.stack.lo == 0     // 检查栈边界,为0的话是系统栈
    if osStack {
        // 处理系统栈
        size := _g_.stack.hi
        if size == 0 {
            size = 8192 * sys.StackGuardMultiplier
        }
        _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
        _g_.stack.lo = _g_.stack.hi - size + 1024
    }
    _g_.stackguard0 = _g_.stack.lo + _StackGuard
    _g_.stackguard1 = _g_.stackguard0
    mstart1(0)   //启动m
}

// dummy一直为0,给getcallersp当参数
func mstart1(dummy int32) {
    _g_ := getg()
    if _g_ != _g_.m.g0 {
        throw("bad runtime·mstart")
    }
    // 记录mstart1 函数结束后的地址pc和mstart1 函数参数到当前g的运行现场
    save(getcallerpc(), getcallersp(unsafe.Pointer(&dummy)))
    asminit()
    // 初始化m
    minit()
    // 如果当前g的m是初始m0,执行mstartm0()
    if _g_.m == &m0 {
        // 对于初始m,需要一些特殊处理
        mstartm0()
    }
    // 如果有m的起始任务函数,则执行,比如 sysmon 函数
    if fn := _g_.m.mstartfn; fn != nil {
        fn()
    }
    // GC startworld的时候,会检查闲置m是否少于并发标记需求(needaddgcproc)
    // 新建m,设置 m.helpgc=-1,加入限制队列等待唤醒
    if _g_.m.helpgc != 0 {
        _g_.m.helpgc = 0
        stopm()
    } else if _g_.m != &m0 {
        // 绑定p
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
    // 进入调度,而且不会在返回
    schedule()
}

绑定号p之后,m拥有了可分配cache和执行队列,进入核心调度循环,核心调度从schedule函数开始,调度完一次之后会引导重新执行schedule,实现循环调度。

schedule方法会主要功能:尽可能给m找到可以运行的g,这其中主要是分为以下几种:

  1. 当前m已经指定了g。该情况下会将m与p解绑,然后m睡眠,等待被绑定的g被调度然后唤醒该m执行该g
  2. gc触发STW的时候,m直接睡眠
  3. gcmark(标记)阶段,大概有1/4的g用来并行标记,这里也会检测是否调度gc标记的g(gcBlackenEnabled!=0)
  4. 调度61次后会从全局的g队列中尝试获取g
  5. 全局队列中未获取到便去绑定p的本地任务队列获取g
  6. 还未获取便调用findrunnable()去尽可能获取,取不到便会睡眠,不返回。
  7. 获取到的g有绑定的m,交出当前的p和g,与指定的m绑定,唤醒指定的m,自己睡眠,等待唤醒。
  8. 执行获取到的g

综上,在该方法中,m在以下情况会休眠:

  • 当m.lockedg != 0(m有绑定固定执行的g),m会在stoplockedm()解绑p并休眠,等待被绑定的g被其他m调度的时候来唤醒该m,直接被绑定的g
  • sched.gcwaiting != 0(gc STW)m会休眠
  • findrunnable()中想尽一切办法都没有获取到可执行的g的时候,m会休眠
  • 获取到g的时候,g绑定了其他的m(gp.lockedm != 0),当前m会解绑p,休眠,然后唤醒g绑定的m,执行该g

当m休眠被唤醒的时候,并不会从固定的位置开始执行,会直接从休眠的位置开始执行。

以下是schedule方法的g获取流程(省略了lockm和lockg的处理以及gc STW的处理):

在这里插入图片描述

另外一个展示当m绑定了g即该m只能执行特定的g(m.lockedg != 0, g.lockedm != 0)以及检测gc STW的调度:

在这里插入图片描述

具体代码:

func schedule() {
    _g_ := getg()
    if _g_.m.locks != 0 {
        throw("schedule: holding locks")
    }
    // 如果当前M锁定了某个G,那么应该交出P,进入休眠
    // 等待某个M调度拿到lockedg,然后唤醒lockedg的M
    if _g_.m.lockedg != 0 {
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }
top:
    // 如果当前GC需要停止整个世界(STW), 则调用gcstopm休眠当前的M
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    var gp *g
    var inheritTime bool
    // 如果当前GC正在标记阶段, 则查找有没有待运行的GC Worker, GC Worker也是一个G
    if gp == nil && gcBlackenEnabled != 0 {
        gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
    }
    if gp == nil {
        // 每隔61次调度,尝试从全局队列种获取G
        // ? 为何是61次? https://github.com/golang/go/issues/20168
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        // 从p的本地队列中获取
        gp, inheritTime = runqget(_g_.m.p.ptr())
    }
    if gp == nil {
        // 想尽办法找到可运行的G,找不到就不用返回了
        gp, inheritTime = findrunnable() // blocks until work is available
    }
    // M即将要执行G,如果M还是spinning,那么重置为false
    if _g_.m.spinning {
        // 重置为非自旋,并根据需要唤醒或新建一个M来运行
        resetspinning()
    }
    // 如果找到的G已经锁定M了,dolockOSThread和cgo会将G和M绑定
    // 则用startlockedm执行,将P和G都交给对方lockedm,唤醒绑定M-lockedm,自己回空闲队列。
    if gp.lockedm != 0 {
        startlockedm(gp)
        goto top
    }
    execute(gp, inheritTime)
}

excute()方法将会去执行g

// 执行goroutine的任务函数
// 如果inheritTime=true,那么当前的G继承剩余的时间片,其实就是不让schedtick累加,
// 这样的话就不会触发每61次从全局队列找G
func execute(gp *g, inheritTime bool) {
    _g_ := getg()
    // 更改gp的状态为_Grunning
    casgstatus(gp, _Grunnable, _Grunning)
    // 置等待时间为0
    gp.waitsince = 0
    // 置可抢占标志为fasle
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard
    // 如果不是inheritTime,schedtick累加
    if !inheritTime {
        _g_.m.p.ptr().schedtick++
    }
    // 当前的M的G改为gp
    _g_.m.curg = gp
    // gp的M改为当前的M
    gp.m = _g_.m
    ...
    // gogo由汇编实现, runtime/asm_amd64.s
    // 实现当前的G切换到gp,然后用JMP跳转到G的任务函数
    // 当任务函数执行完后会调用 goexit
    gogo(&gp.sched)
}

gogo由汇编实现,主要是由g0切换到g栈,然后执行函数。

// 从g0栈切换到G栈,然后JMP到任务函数代码
TEXT runtime·gogo(SB), NOSPLIT, $16-8
    MOVQ    buf+0(FP), BX       // gobuf
    MOVQ    gobuf_g(BX), DX   //G
    MOVQ    0(DX), CX       // make sure g != nil
    get_tls(CX)
    MOVQ    DX, g(CX)
    MOVQ    gobuf_sp(BX), SP    // restore SP 恢复sp寄存器值切换到g栈
    MOVQ    gobuf_ret(BX), AX
    MOVQ    gobuf_ctxt(BX), DX
    MOVQ    gobuf_bp(BX), BP
    MOVQ    $0, gobuf_sp(BX)    // clear to help garbage collector
    MOVQ    $0, gobuf_ret(BX)
    MOVQ    $0, gobuf_ctxt(BX)
    MOVQ    $0, gobuf_bp(BX)
    MOVQ    gobuf_pc(BX), BX // 获取G任务函数的地址
    JMP BX                           // 转到任务函数执行

当调用任务函数结束返回的时候,会执行到我们在创建g流程中就初始化好的指令:goexit

TEXT runtime·goexit(SB),NOSPLIT,$0-0
    BYTE    $0x90   // NOP
    CALL    runtime·goexit1(SB) // does not return 调用goexit1函数
    // traceback from goexit1 must hit code range of goexit
    BYTE    $0x90   // NOP

下面是goexit1()函数:

// 当goroutine结束后,会调用这个函数
func goexit1() {
    // 切换到g0执行goexit0
    mcall(goexit0)
}

func goexit0(gp *g) {
    _g_ := getg()
    // gp的状态置为_Gdead
    casgstatus(gp, _Grunning, _Gdead)
    // 状态重置
    gp.m = nil
    // G和M是否锁定
    locked := gp.lockedm != 0
    // G和M解除锁定
    gp.lockedm = 0
    _g_.m.lockedg = 0
    ...
    // 处理G和M的清除工作
    dropg()
    if _g_.m.lockedInt != 0 {
        print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
        throw("internal lockOSThread error")
    }
    _g_.m.lockedExt = 0
    // 将G放入P的G空闲链表
    gfput(_g_.m.p.ptr(), gp)
    // 再次进入调度
    schedule()
}

注意:无论是mcall systemstack还是gogo都不会更新g0.sched栈现场,需要切换到g0的时候,直接从“g_sched+gobuf_sp”读取地址恢复sp。在调用goexit0/schedule时,g0栈将初始化,从头开始。

至此一个m的调度流程已经清晰:在这里插入图片描述

  1. 由创建m绑定的mstart()函数或者notewakeup()唤醒的执行位置(一般都在schedule()方法中休眠,详细请看上方有介绍)
  2. 进入schedule()方法,开始获取可执行的g,获取不到就休眠,等待wakep()调度,获取到p后调用execute()启动执行
  3. execute()调用gogo执行g
  4. gogo切换到g栈,并执行fn
  5. 结束后调用goexit()->goexit1()->goexit0()->sechedule()重新调度。

无论是mcall systemstack还是gogo都不会更新g0.sched栈现场,需要切换到g0的时候,直接从“g_sched+gobuf_sp”读取地址恢复sp。在调用goexit0/schedule时,g0栈从头开始。mstart1中保存了g0的gobuf信息。

findrunable

该方法会想尽一切办法找到可以执行的任务,核心调度函数这里逻辑较为复杂,下面将以代码中的两个标签top和stop将流程分开:top label:在这里插入图片描述

stop label:在这里插入图片描述

上面是top和stop标签内的流程图,结合在一起便是findrunnable的全部流程,其中gcmark的调度部分有省略,将会在gc中详细描述。结合代码看看这个方法:

// 找到一个可以运行的G,不找到就让M休眠,然后等待唤醒,直到找到一个G返回
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()
    // 此处和handoffp中的条件必须一致:如果findrunnable将返回G运行,则handoffp必须启动M.
top:
    _p_ := _g_.m.p.ptr()
    // 如果gc正等着运行,停止M,也就是STW
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    if _p_.runSafePointFn != 0 {
        runSafePointFn()
    }
    // fing是执行finalizer的goroutine
    if fingwait && fingwake {
        if gp := wakefing(); gp != nil {
            ready(gp, 0, true)
        }
    }
    if *cgo_yield != nil {
        asmcgocall(*cgo_yield, nil)
    }
    // local runq
    // 再尝试从本地队列中获取G
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }
    // global runq
    // 尝试从全局队列中获取G
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }
    // 从网络IO轮询器中找到就绪的G,把这个G变为可运行的G
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if gp := netpoll(false); gp != nil { // non-blocking
            // netpoll returns list of goroutines linked by schedlink.
            // 如果找到的可运行的网络IO的G列表,则把相关的G插入全局队列
            injectglist(gp.schedlink.ptr())
            // 更改G的状态为_Grunnable,以便下次M能找到这些G来执行
            casgstatus(gp, _Gwaiting, _Grunnable)
            // goroutine trace事件记录-unpark
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }
    // Steal work from other P's.
    procs := uint32(gomaxprocs)
    // 如果其他P都是空闲的,就不从其他P哪里偷取G了
    if atomic.Load(&sched.npidle) == procs-1 {
        goto stop
    }
    // 如果当前的M没在自旋 且 正在自旋的M数量大于等于正在使用的P的数量,那么block
    // 当GOMAXPROCS远大于1,但程序并行度低时,防止过多的CPU消耗。
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    // 如果M为非自旋,那么设置为自旋状态
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    // 随机选一个P,尝试从这P中偷取一些G
    for i := 0; i < 4; i++ { // 尝试四次
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            // 从allp[enum.position()]偷去一半的G,并返回其中的一个
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
stop:
    // 当前的M找不到G来运行。如果此时P处于 GC mark 阶段
    // 那么此时可以安全的扫描和黑化对象,和返回 gcBgMarkWorker 来运行
    if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
        // 设置gcMarkWorkerMode 为 gcMarkWorkerIdleMode
        _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
        // 获取gcBgMarkWorker goroutine
        gp := _p_.gcBgMarkWorker.ptr()
        casgstatus(gp, _Gwaiting, _Grunnable)
        if trace.enabled {
            traceGoUnpark(gp, 0)
        }
        return gp, false
    }
    allpSnapshot := allp
    // return P and block
    lock(&sched.lock)
    if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
        unlock(&sched.lock)
        goto top
    }
    // 再次从全局队列中获取G
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    // 将当前对M和P解绑
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    // 将p放入p空闲链表
    pidleput(_p_)
    unlock(&sched.lock)
    wasSpinning := _g_.m.spinning
    // M取消自旋状态
    if _g_.m.spinning {
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }
    // check all runqueues once again
    // 再次检查所有的P,有没有可以运行的G
    for _, _p_ := range allpSnapshot {
        // 如果p的本地队列有G
        if !runqempty(_p_) {
            lock(&sched.lock)
            // 获取另外一个空闲P
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                // 如果P不是nil,将M绑定P
                acquirep(_p_)
                // 如果是自旋,设置M为自旋
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                // 返回到函数开头,从本地p获取G
                goto top
            }
            break
        }
    }
    // gcmark的goroutine,这里会控制这类g的数量
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
        lock(&sched.lock)
        _p_ = pidleget()
        if _p_ != nil && _p_.gcBgMarkWorker == 0 {
            pidleput(_p_)
            _p_ = nil
        }
        unlock(&sched.lock)
        if _p_ != nil {
            acquirep(_p_)
            if wasSpinning {
                _g_.m.spinning = true
                atomic.Xadd(&sched.nmspinning, 1)
            }
            goto stop
        }
    }
    // poll network
    // 再次检查netpoll
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
        gp := netpoll(true) // block until new work is available
        if gp != nil {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            acquirep(_p_)
            injectglist(gp.schedlink.ptr())
            casgstatus(gp, _Gwaiting, _Grunnable)
            return gp, false
        }
    }
    // 实在找不到G,那就休眠吧
    // 且此时的M一定不是自旋状态
    stopm()
    goto top
}

5. 系统调用

系统调用有用户态到内核态的切换,并且部分系统调用甚至会阻塞,当goroutine在处理系统调用的时候,如果不采取措施的话,会导致一个goroutine占用p时间过长,导致p中其他的goroutine无法及时调度。针对系统调用调度器做了一些操作,保证系统调用阻塞的同时,其他的goroutine可以被合理调度。

golang封装的系统到到最后都会调用到 Syscall() 或者 RawSyscall()这两个方法,RawSyscall是一个直接进行系统调用,而Syscall方法是做了部分处理,来配合go的调度器工作,以下代码省略了部分无关内容,展示了golang系统调用的过程:

EXT    ·Syscall(SB),NOSPLIT,$0-56
    CALL    runtime·entersyscall(SB)   //调用entersyscall进行调用前准备
    MOVQ    trap+0(FP), AX  // syscall entry
    SYSCALL                                 // linux amd64平台下的系统调用指令
    CMPQ    AX, $0xfffffffffffff001
    JLS ok
    CALL    runtime·exitsyscall(SB)     //结束系统调用
    RET
ok:
    CALL    runtime·exitsyscall(SB)
    RET
// func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2, err uintptr)
TEXT ·RawSyscall(SB),NOSPLIT,$0-56
    MOVQ    trap+0(FP), AX  // syscall entry
    SYSCALL  //linux amd64平台下的系统调用指令
    CMPQ    AX, $0xfffffffffffff001
    JLS ok1
    RET
ok1:
    RET

明显SysCall比RawSyscall多调用了两个方法,entersyscall和exitsyscall,增加这两个函数的调用,让调度器有机会去对即将要进入系统调用的goroutine进行调整,方便调度。

entersyscall():

// 系统调用的时候调用该函数
// 进入系统调用,G将会进入_Gsyscall状态,也就是会被暂时挂起,直到系统调用结束。
// 此时M进入系统调用,那么P也会放弃该M。但是,此时M还指向P,在M从系统调用返回后还能找到P
func entersyscall(dummy int32) {
    reentersyscall(getcallerpc(), getcallersp(unsafe.Pointer(&dummy)))
}

func reentersyscall(pc, sp uintptr) {
    _g_ := getg()
    _g_.m.locks++
    // 让G进入_Gsyscall状态,此时G已经被挂起了,直到系统调用结束,才会让G重新写进入running
    casgstatus(_g_, _Grunning, _Gsyscall)
    //唤醒 sysmon m,这个监控长时间执行的g
    if atomic.Load(&sched.sysmonwait) != 0 {
        systemstack(entersyscall_sysmon)
        save(pc, sp)
    }
    // 这里很关键:P的M已经陷入系统调用,于是P忍痛放弃该M
    // 但是请注意:此时M还指向P,在M从系统调用返回后还能找到P
    _g_.m.mcache = nil
    _g_.m.p.ptr().m = 0
    // P的状态变为Psyscall
    atomic.Store(&_g_.m.p.ptr().status, _Psyscall)

}

该方法主要是为系统调用前做了准备工作:

  • 修改g的状态为_Gsyscall
  • 检查sysmon线程是否在执行,睡眠需要唤醒
  • p放弃m,但是m依旧持有p的指针,结束调用后优先选择p
  • 修改p的状态为_Psyscal

做好这些准备工作便可以真正的执行系统调用了。当该线程m长时间阻塞在系统调用的时候,一直在运行的sysmon线程会检测到该p的状态,并将其剥离,驱动其他的m(新建或获取)来调度执行该p上的任务,这其中主要是在retake方法中实现的,该方法还处理了goroutine抢占调度,这里省略,后面介绍抢占调度在介绍:

//实现go调度系统的抢占
func retake(now int64) uint32 {
    n := 0
    lock(&allpLock)
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        //p在系统调用中
        if s == _Psyscall {
            t := int64(_p_.syscalltick)
            if int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            //没有可以调度的任务且时间阻塞时间未到阀值,直接跳过
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // 这里出发了系统调用长时间阻塞的调度
            unlock(&allpLock)
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                n++
                _p_.syscalltick++
                //关键方法,将对长时间阻塞的p进行重新调度
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        } else if s == _Prunning {
          //暂时省略
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

当系统调用时间过长的时候,会调用handoffp()方法:

// p的切换,系统调用或者绑定M时使用
func handoffp(_p_ *p) {
    //当前p有任务或者全局任务队列有任务,触发一次调度
    //startm()上文有描述,会获取一个m来调度当前p的任务,当前p为nil时,会调度其他p任务队列
    if !runqempty(_p_) || sched.runqsize != 0 {
        startm(_p_, false)
        return
    }
    //gc标记阶段且当前p有标记任务,触发调度
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
        startm(_p_, false)
        return
    }
    //有自旋m或空闲p,触发调度
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
  ...
    //全局队列不为空
    if sched.runqsize != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    //实在没任务,放入空闲队列
    pidleput(_p_)
    unlock(&sched.lock)
}

可以看到,通过handoffp方法,阻塞在系统调用的p会被重新调度,不会阻塞其他任务的执行。没有空闲m的时候,这里有可能会创建出新的m来进行调度。

回到Syscall的执行流程中,当系统Syscall返回的时,会调用exitsyscall方法恢复调度:

// goroutine g退出系统调用。安排它再次在cpu上运行。
// 这个函数只能从go syscall库中调用,而不是从运行时使用的低级系统调用中调用
func exitsyscall(dummy int32) {
    _g_ := getg()
    // 重新获取p
    if exitsyscallfast() {
        casgstatus(_g_, _Gsyscall, _Grunning)
        return
    }
    // 没有获取到p,只能解绑当前g,重新调度该m了
    mcall(exitsyscall0)
}

exitsyscall会尝试重新绑定p,优先选择之前m绑定的p(进入系统的调用的时候,p只是单方面解绑了和m的关系,通过m依旧可以找到p):

func exitsyscallfast() bool {
    _g_ := getg()
    //stw,直接解绑p,然后退出
    if sched.stopwait == freezeStopWait {
        _g_.m.mcache = nil
        _g_.m.p = 0
        return false
    }
    // 如果之前附属的P尚未被其他M,尝试绑定该P
    if _g_.m.p != 0 && _g_.m.p.ptr().status == _Psyscall && atomic.Cas(&_g_.m.p.ptr().status, _Psyscall, _Prunning) {
        exitsyscallfast_reacquired()
        return true
    }
    // 否则从空闲P列表中取出一个来
    oldp := _g_.m.p.ptr()
    _g_.m.mcache = nil
    _g_.m.p = 0
    if sched.pidle != 0 {
        var ok bool
        systemstack(func() {
            ok = exitsyscallfast_pidle()
        })
        if ok {
            return true
        }
    }
    return false
}

当获取p失败的时候,只能选择重新调度:

func exitsyscall0(gp *g) {
    _g_ := getg()
    //修改g状态为 _Grunable
    casgstatus(gp, _Gsyscall, _Grunnable)
    dropg() //解绑
    lock(&sched.lock)
    //尝试获取p
    _p_ := pidleget()
    if _p_ == nil {
        //未获取到p,g进入全局队列等待调度
        globrunqput(gp)
    } else if atomic.Load(&sched.sysmonwait) != 0 {
        atomic.Store(&sched.sysmonwait, 0)
        notewakeup(&sched.sysmonnote)
    }
    unlock(&sched.lock)
    //获取到p,绑定,然后执行
    if _p_ != nil {
        acquirep(_p_)
        execute(gp, false) // Never returns.
    }
    // m有绑定的g,解绑p然后绑定的g来唤醒,执行
    if _g_.m.lockedg != 0 {
        stoplockedm()
        execute(gp, false) // Never returns.
    }
    //关联p失败了,休眠,等待唤醒,在进行调度。
    stopm()
    schedule() // Never returns.
}

上述便是golang系统调用的整个流程,大致如下:

  1. 业务调用封装好的系统调用函数,编译器翻译到Syscall
  2. 执行entersyscall()方法,修改g,p的状态,p单方面解绑m,并检查唤醒sysmon线程,检测系统调用。
  3. 当sysmon线程检测到系统调用阻塞时间过长的时候,调用retake,重新调度该p,让p上可执行的得以执行,不浪费资源
  4. 系统调用返回,进入exitsyscall方法,优先获取之前的p,如果该p已经被占有,重新获取空闲的p,绑定,然后继续执行该g。当获取不到p的时候,调用exitsyscall0,解绑g,休眠,等待下次唤醒调度。

6. 抢占调度

golang调度高效秘诀之一是它的抢占式调度。当任务函数执行的时间超过了一定的时间,sysmon方法会不断的检测所有p上任务的执行情况,当有超过预定执行时间的g时,会发起抢占。这一切也是在retake函数中实现的,上文描述了该函数在系统调用中的功能,这里讲下该函数如何执行抢占。

// retake()函数会遍历所有的P,如果一个P处于执行状态,
// 且已经连续执行了较长时间,就会被抢占。
// retake()调用preemptone()将P的stackguard0设为
// stackPreempt(关于stackguard的详细内容,可以参考 Split Stacks),
// 这将导致该P中正在执行的G进行下一次函数调用时,
// 导致栈空间检查失败。进而触发morestack()(汇编代码,位于asm_XXX.s中)
// 然后进行一连串的函数调用,主要的调用过程如下:
// morestack()(汇编代码)-> newstack() -> gopreempt_m() -> goschedImpl() -> schedule()
// http://ga0.github.io/golang/2015/09/20/golang-runtime-scheduler.html
func retake(now int64) uint32 {
    n := 0
    lock(&allpLock)
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall {
        //系统调用部分可看系统调用的分析
       ...
        } else if s == _Prunning {
            // 超时抢占
            if pd.schedwhen+forcePreemptNS > now {
                continue
            }
            preemptone(_p_)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

当检测到某个p上的任务执行超过一定时间后,调用preemptone对当前g进行抢占:

func preemptone(_p_ *p) bool {
    // 标记可抢占
    gp.preempt = true
    // gorotuine 中的每个调用都会通过将当前堆栈指针与 gp->stackguard0 进行比较来检查堆栈溢出。
    // 将 gp->stackguard0 设置为 stackPreempt 会将抢占折叠为正常的堆栈溢出检查。
    gp.stackguard0 = stackPreempt
    return true
}

可以看到只是设置了两个参数,并没有执行实际的抢占工作,事实上这个过程是异步的,将在其他的地方执行真正的抢占操作。

stackguard0本身是用来检测goroutine的栈是否需要扩充的,当设置为stackPreempt时,在执行函数的时候,便会触发栈扩充,调用morestack()方法,morestack会调用newstack,该方法会扩充g的栈空间,也兼职了goroutine的抢占功能。preempt 为抢占的备用手段,在stackguard0设置stackPreempt且在newstack中未能被抢占时,该标记也会在其他地方设置stackguard0的值为stackPreempt,再次触发抢占。

func newstack() {
    thisg := getg()
    gp := thisg.m.curg
    // 注意:如果另一个线程即将尝试抢占gp,则stackguard0可能会在发生变化。
    // 所以现在读一次,判断是否被抢占。
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
  
    if preempt {
        //以下情况不会被抢占
        if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
            // Let the goroutine keep running for now.
            // gp->preempt is set, so it will be preempted next time.
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return
        }
    }
  
    if preempt {
        casgstatus(gp, _Grunning, _Gwaiting)
        //gc扫描抢占
        if gp.preemptscan {
            for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
            }
            if !gp.gcscandone {
                //扫描当前gp栈
                gcw := &gp.m.p.ptr().gcw
                scanstack(gp, gcw)
                if gcBlackenPromptly {
                    gcw.dispose()
                }
                gp.gcscandone = true
            }
            gp.preemptscan = false
            gp.preempt = false
            casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
            // This clears gcscanvalid.
            casgstatus(gp, _Gwaiting, _Grunning)
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) //  恢复后继续执行
        }
        //转换状态为 _Gwaiting
        casgstatus(gp, _Gwaiting, _Grunning)
        gopreempt_m(gp) // never return
    }
  ...
}

以上关于gc的抢占可以先忽略,关注一下普通抢占:

func gopreempt_m(gp *g) {
    goschedImpl(gp)
}

// 将当前的G入全剧队列,然后调用调度器
func goschedImpl(gp *g) {
    status := readgstatus(gp)
    // 将gp的状态改为_Grunnable
    casgstatus(gp, _Grunning, _Grunnable)
    // 解除与当前M的关联
    dropg()
    lock(&sched.lock)
    // 入全局队列
    globrunqput(gp)
    unlock(&sched.lock)
    // 启动调度
    xx()
}

这里最终会取消m和g的绑定,并将g放入全局队列中,然后开始调度m执行新的任务

以上是golang抢占调度的基本内容,总结如下:

  1. 正常goroutine的抢占都时由监控线程的sysmon发起的,超时执行的goroutine会被打上可抢占的标志。(gc scan阶段也会发生抢占,主要是为了扫描正在运行的g的栈空间)
  2. 在任务的每个函数中,编译器会加上栈空间检测代码,有需要栈空间扩容或者抢占便会进入morestack,然后调用newstack方法
  3. newstack中会检测是否抢占和抢占类型。gc扫描触发的抢占回扫描当前g栈上的内容,然后继续执行当前g。而普通抢占则会解绑当前g,将g放入全局队列,然后继续调度。

sysmon

上文中的系统调用和抢占调度都离不开这个函数。现在简单介绍下,在以后分析完内存,gc后会做详细的介绍

sysmon独立的运行在一个特殊的m上,它定期执行一次,每次会做以下事情:

  • 2分钟没有gc则触发一次gc
  • 系统调用和抢占调度的实现
  • 处理长时间未返回结果的

下一章:Go channel源码详解

channel 可以让一个 goroutine 发送特定值到另一个 gouroutine 的通信机制。也就是说,gouroutine 之间能够通过 channel 进行通信。 1. chan 使用范例 ...