golang调度 初始化
1. 初始化
调度器的初始化从 schedinit()函数开始,将会设置m最大个数(maxmcount)及p最大个数(GOMAXPROCS)等
func schedinit() { sched.maxmcount = 10000 // 设置m的最大值为10000 mcommoninit(_g_.m) //初始化当前m // 确认P的个数 // 默认等于cpu个数,可以通过GOMAXPROCS环境变量更改 procs := ncpu if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { procs = n } // 调整P的个数,这里是新分配procs个P // 这个函数很重要,所有的P都是从这里分配的,以后也不用担心没有P了 if procresize(procs) != nil { throw("unknown runnable goroutine during bootstrap") } ... }
procresize方法主要完成以下任务:
- 比较目标个数和原始p的个数,进行全局缓存的扩容或收缩
- 遍历p的缓存,将未初始化的p进行初始化
- 对于收缩的情况,将收缩的p进行回收处理
- 分别将空闲的p和有任务的p加入空闲链表和工作链表
下面是procresize()的源码:
//全局数据结构: allp []*p // len(allp) == gomaxprocs; may change at safe points, otherwise immutable sched schedt //全局调度器(综述文中有介绍) // 所有的P都在这个函数分配,不管是最开始的初始化分配,还是后期调整 func procresize(nprocs int32) *p { ... old := gomaxprocs // 扩张allp数组 if nprocs > int32(len(allp)) { lock(&allpLock) if nprocs <= int32(cap(allp)) { allp = allp[:nprocs] } else { // 分配nprocs个*p nallp := make([]*p, nprocs) copy(nallp, allp[:cap(allp)]) allp = nallp } unlock(&allpLock) } // 初始化新的p for i := int32(0); i < nprocs; i++ { pp := allp[i] if pp == nil { pp = new(p) ... // 将pp保存到allp数组里, allp[i] = pp atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) } ... } // 释放无用的p for i := nprocs; i < old; i++ { p := allp[i] // 任务转移 // 本地任务队列转换到全局队列 for p.runqhead != p.runqtail { p.runqtail-- gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr() globrunqputhead(gp) } // 优先执行的也转移到全局 if p.runnext != 0 { globrunqputhead(p.runnext.ptr()) p.runnext = 0 } // 后台标记的g也转移 if gp := p.gcBgMarkWorker.ptr(); gp != nil { casgstatus(gp, _Gwaiting, _Grunnable) globrunqput(gp) p.gcBgMarkWorker.set(nil) } // 做一些内存释放等操作 ... } ... //将p放入队列 var runnablePs *p for i := nprocs - 1; i >= 0; i-- { p := allp[i] // 如果是当前的M绑定的P,不放入P空闲链表 // 否则更改P的状态为_Pidle,放入P空闲链表 if _g_.m.p.ptr() == p { continue } p.status = _Pidle if runqempty(p) { pidleput(p)// 将空闲p放入全局空闲链表 } else { // 非空闲的通过绑定m,链起来 p.m.set(mget()) p.link.set(runnablePs) // 最后一个空闲的不加入空闲列表 直接返回去调度使用 runnablePs = p } } }
新建的无任务p都会被放到空闲链表中:
func pidleput(_p_ *p) { if !runqempty(_p_) { throw("pidleput: P has non-empty run queue") } _p_.link = sched.pidle //通过p的link形成链表 sched.pidle.set(_p_) // 将sched.npidle加1 atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic }
默认只有schedinit和startTheWorld会调用procresize()schedinit初始化p,startTheWorld会激活所有有任务的p。完成调度器初始化后,系统会引导生成 main goroutine,之前是在全局的g0上执行初始化工作
golang支持在运行间修改p数量:runtime.GOMAXPROCS(),但是带价很大,会触发STW
lock(&sched.lock) ret := int(gomaxprocs) unlock(&sched.lock) if n <= 0 || n == ret { return ret } // 有stw和重启世界的过程 stopTheWorld("GOMAXPROCS") // newprocs will be processed by startTheWorld newprocs = int32(n) startTheWorld() return ret
以上便是golang初始化调度器的所有步骤,具体:
- 调用schedinit,初始化maxmcount和gomaxprocs的数量
- sechdinit中调用procresize(),初始化所有的p,并放入空闲链表中
- schedinit结束后,引导创建main goroutine,执行main(之前是在全局的g0中执行),汇编执行引导,文中并没有描述
- 运行时可以调用runtime.GOMAXPROCS()函数修改p的数量,会触发STW,有带价。如果真的有需求,可以考虑启动前修改系统环境变量实现。
2. g的创建
在编写程序中,使用 go func() {}来创建一个goroutine(g),这条语句会被编译器翻译成函数 newproc()。
func newproc(siz int32, fn *funcval) { //用fn + PtrSize 获取第一个参数的地址,也就是argp //这里要了解一下go的堆栈 argp := add(unsafe.Pointer(&fn), sys.PtrSize) //用siz - 8 获取pc地址 (汇编实现) pc := getcallerpc() // 用g0的栈创建G对象 systemstack(func() { newproc1(fn, (*uint8)(argp), siz, pc) }) }
了解一下funcval:
// funcval 是一个变长结构,第一个成员是函数指针 // 所以上面的 add 是跳过这个 fn type funcval struct { fn uintptr // variable-size, fn-specific data here }
newproc()获取到参数的地址和callerpc,然后调用newproc1().流程如下图:
代码如下:
// 根据函数参数和函数地址,创建一个新的G,然后将这个G加入队列等待运行 func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) { _g_ := getg() if fn == nil { _g_.m.throwing = -1 // do not dump full stacks throw("go of nil func value") } _g_.m.locks++ // disable preemption because it can be holding p in a local var siz := narg // 从m中获取p _p_ := _g_.m.p.ptr() // 从gfree list获取g newg := gfget(_p_) // 如果没获取到g,则新建一个 if newg == nil { // 分配栈为 2k 大小的G对象 newg = malg(_StackMin) casgstatus(newg, _Gidle, _Gdead) //将g的状态改为_Gdead // 添加到allg数组,防止gc扫描清除掉 allgadd(newg) } // 参数大小+稍微一点空间 totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign // 新协程的栈顶计算,将栈顶减去参数占用的空间 sp := newg.stack.hi - totalSize spArg := sp // 如果有参数 if narg > 0 { // copy参数到栈上 memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg)) ... //一些gc相关的工作省略 } // 初始化G的gobuf,保存sp,pc,任务函数等 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp // 保存goexit的地址到sched.pc,后面会调节 goexit 作为任务函数返回后执行的地址,所以goroutine结束后会调用goexit newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function // sched.g保存当前新的G newg.sched.g = guintptr(unsafe.Pointer(newg)) // 将当前的pc压入栈,保存g的任务函数为pc gostartcallfn(&newg.sched, fn) // gopc保存newproc的pc newg.gopc = callerpc // 任务函数的地址 newg.startpc = fn.fn ... // 更改当前g的状态为_Grunnable casgstatus(newg, _Gdead, _Grunnable) // 生成唯一的goid newg.goid = int64(_p_.goidcache) // 将当前新生成的g,放入队列 runqput(_p_, newg, true) // 如果有空闲的p 且 m没有处于自旋状态 且 main goroutine已经启动,那么唤醒某个m来执行任务 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted { wakep() } }
g 默认会复用,会从p的free中获取,当p free为空,从全局的schedt中的gfreeStack或者gfreeNoStack中拉取到本地freelist
//从缓存列表获取一个空闲的g func gfget(_p_ *p) *g { retry: gp := _p_.gfree if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) { //本地空闲队列为空的时候,从全局中获取,需要加锁 lock(&sched.gflock) //一次转移最多32个空闲到本地p for _p_.gfreecnt < 32 { if sched.gfreeStack != nil { gp = sched.gfreeStack //获取g sched.gfreeStack = gp.schedlink.ptr() //链表头指向下一个g } else if sched.gfreeNoStack != nil { gp = sched.gfreeNoStack sched.gfreeNoStack = gp.schedlink.ptr() } else { break } _p_.gfreecnt++ sched.ngfree-- gp.schedlink.set(_p_.gfree) _p_.gfree = gp } unlock(&sched.gflock) goto retry } // 获取到g if gp != nil { // 调整链表头及个数 _p_.gfree = gp.schedlink.ptr() _p_.gfreecnt-- // 堆栈为空就分配 if gp.stack.lo == 0 { // Stack was deallocated in gfput. Allocate a new one. systemstack(func() { gp.stack = stackalloc(_FixedStack) }) gp.stackguard0 = gp.stack.lo + _StackGuard } else { ... } } return gp }
当一次调度执行完g后,调度器会将g放回p或者全局队列,当空闲任务个数超过64个的时候,会调整部分到全局任务队列,直到p本地空闲队列为32个的时候停止。
func gfput(_p_ *p, gp *g) { // 处理堆栈 stksize := gp.stack.hi - gp.stack.lo // 不是默认堆栈,直接释放(扩张后的堆栈可能会很大,留着占内存,下次重新分配就好了) if stksize != _FixedStack { stackfree(gp.stack) gp.stack.lo = 0 gp.stack.hi = 0 gp.stackguard0 = 0 } // 处理p的复用链表 gp.schedlink.set(_p_.gfree) _p_.gfree = gp _p_.gfreecnt++ // 超过64个,放回部分到全局队列 if _p_.gfreecnt >= 64 { lock(&sched.gflock) for _p_.gfreecnt >= 32 { _p_.gfreecnt-- gp = _p_.gfree _p_.gfree = gp.schedlink.ptr() if gp.stack.lo == 0 { gp.schedlink.set(sched.gfreeNoStack) sched.gfreeNoStack = gp } else { gp.schedlink.set(sched.gfreeStack) sched.gfreeStack = gp } sched.ngfree++ } unlock(&sched.gflock) } }
malg()函数创建一个新的g,包括为该g申请栈空间(支持程序分配栈的系统)。系统中的每个g都是由该函数创建而来的。
//一般传入的堆栈大小默认为2k func malg(stacksize int32) *g { newg := new(g) if stacksize >= 0 { stacksize = round2(_StackSystem + stacksize)// 对齐 systemstack(func() { newg.stack = stackalloc(uint32(stacksize))// 调用 stackalloc 分配栈 }) newg.stackguard0 = newg.stack.lo + _StackGuard // 设置 stackguard newg.stackguard1 = ^uintptr(0) } return newg }
创建成功会被放入到 allg的全局队列中,gc回收遍历扫描会使用,也防止gc回收分配好的g
var ( // 存储所有g的数组 allgs []*g // 保护allgs的互斥锁 allglock mutex allglen uintptr ) func allgadd(gp *g) { lock(&allglock) allgs = append(allgs, gp) allglen = uintptr(len(allgs)) unlock(&allglock) }
当获取到一个可用的g之后:
- 初始化g的gobuf信息(上下文信息,包括sp,pc以及函数g执行完之后的返回指令pc(goexit函数))
- 添加到g到p的本地队列
- p的本地队列满了,便添加到全局队列,顺便转移部分本地队列的数据到全局队列,供其他的p获取。
- 若存在有空闲的p及未自旋的m,调用wakem()方法,这里会获取一个空闲的m或新建一个m,去和空闲的p绑点,调度。后文会有对该方法的解释
// 尝试将G放到P的本地队列 func runqput(_p_ *p, gp *g, next bool) { if next { retryNext: oldnext := _p_.runnext // 将G赋值给_p_.runnext // 最新的G优先级最高,最可能先被执行。 // 剩下的G如果go运行时调度器发现有空闲的core,就会把任务偷走点, // 让别的core执行,这样才能充分利用多核,提高并发能 if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) { goto retryNext } gp = oldnext.ptr() } retry: h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers t := _p_.runqtail // 如果本地队列还有剩余的位置,将G插入本地队列的尾部 if t-h < uint32(len(_p_.runq)) { _p_.runq[t%uint32(len(_p_.runq))].set(gp) atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption return } // 本地队列已满,放入全局队列 if runqputslow(_p_, gp, h, t) { return } goto retry } // 如果本地满了以后,一次将本地的一半的G转移到全局队列 func runqputslow(_p_ *p, gp *g, h, t uint32) bool { //首先转移一半到全局队列,省略 ... // 将拿到的G,添加到全局队列末尾, 全局数据处理是需要加锁的,所以slow。 lock(&sched.lock) globrunqputbatch(batch[0], batch[n], int32(n+1)) unlock(&sched.lock) return true }
放入队列时,p队列满了会分一半到全局队列,其他的p可以获取全局队列中的g执行。newproc1最后会唤醒其他m p去执行任务
到这里go fun()流程就完成了。 g不会被删除,但是会清理过大的栈空间,防止内存爆炸。gc过程中也会调用shrinkstack()将栈空间回收。这就是golang和可以创建大量g来支持并发的原因之一,g是复用的并且初始栈大小只有2k,超过2k的栈在g空闲的时候是会被回收的,这也减轻了系统内存的压力。
3. 系统线程m
在golang中有三种系统线程:
- 主线程:golang程序启动加载的时候就运行在主线程上,代码中由一个全局的m0表示
- 运行sysmon的线程
- 普通用户线程,用来与p绑定,运行g中的任务的线程,主线程和运行sysmon都是单实例,单独一个线程。而用户线程会有很多事例,他会根据调度器的需求新建,休眠和唤醒。
在newproc1中我们发现创建g成功后,会尝试wakep唤醒一个用户线程m执行任务,这里详细描述下这个方法:
// 尝试获取一个M来运行可运行的G func wakep() { // 如果有其他的M处于自旋状态,那么就不管了,直接返回 // 因为自旋的M回拼命找G来运行的,就不新找一个M(劳动者)来运行了。 if !atomic.Cas(&sched.nmspinning, 0, 1) { return } startm(nil, true) } // startm是启动一个M,先尝试获取一个空闲P,如果获取不到则返回 // 获取到P后,在尝试获取M,如果获取不到就新建一个M func startm(_p_ *p, spinning bool) { lock(&sched.lock) // 如果P为nil,则尝试获取一个空闲P if _p_ == nil { _p_ = pidleget() if _p_ == nil { unlock(&sched.lock) return } } // 获取一个空闲的M mp := mget() unlock(&sched.lock) if mp == nil { var fn func() if spinning { // The caller incremented nmspinning, so set m.spinning in the new M. fn = mspinning } // 如果获取不到,则新建一个,新建完成后就立即返回 newm(fn, _p_) return } // The caller incremented nmspinning, so set m.spinning in the new M. mp.spinning = spinning //标记该M是否在自旋 mp.nextp.set(_p_) // 暂存P notewakeup(&mp.park) // 唤醒M }
上述代码可以发现m回去调用mget()方法,获取不成功后才会选择创建,这里表明m也是支持复用的。获取不到任务的m也会被加入到空闲的m链表中,等待唤醒。
下面从新建m开始:
func newm(fn func(), _p_ *p) { // 根据fn和p和绑定一个m对象 mp := allocm(_p_, fn) // 设置当前m的下一个p为_p_ mp.nextp.set(_p_) ... // 真正的分配os thread newm1(mp) } // 分配一个m,且不关联任何一个os thread func allocm(_p_ *p, fn func()) *m { _g_ := getg() _g_.m.locks++ // disable GC because it can be called from sysmon if _g_.m.p == 0 { acquirep(_p_) // 如果没有绑定p的话,申请一个p,只有p有cache,可以供m来申请内存。 } ... mp := new(m) mp.mstartfn = fn mcommoninit(mp) //初始化当前m // 给g0分配一定的堆栈 if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" { mp.g0 = malg(-1) //这些系统必须使用系统的栈 } else { mp.g0 = malg(8192 * sys.StackGuardMultiplier) //go的栈是大小是8k } mp.g0.m = mp //绑定的p和当前m的p一样,解绑 if _p_ == _g_.m.p.ptr() { releasep() } return mp }
m初始化:检查数量,超过10000个异常停机;接受信号的g创建初始化;
func mcommoninit(mp *m) { _g_ := getg() // g0 stack won't make sense for user (and is not necessary unwindable). if _g_ != _g_.m.g0 { callers(1, mp.createstack[:]) } lock(&sched.lock) if sched.mnext+1 < sched.mnext { throw("runtime: thread ID overflow") } mp.id = sched.mnext sched.mnext++ // m数量检查 checkmcount() ... // signal g创建初始化 mpreinit(mp) if mp.gsignal != nil { mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard } //加入全局m链表 mp.alllink = allm //链表 atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp)) unlock(&sched.lock) }
func newm1(mp *m) { // 对cgo的处理 ... execLock.rlock() // Prevent process clone. // 创建一个系统线程,并且传入该 mp 绑定的 g0 的栈顶指针 // 让系统线程执行 mstart 函数,后面的逻辑都在 mstart 函数中 newosproc(mp, unsafe.Pointer(mp.g0.stack.hi)) execLock.runlock() }
每个操作系统分配系统线程的流程是不一样的,下面代码展示了在linux和windows系统下该函数的实现,其他的环境暂时不做讨论:
//linux // 分配一个系统线程,且完成 g0 和 g0上的栈分配 // 传入 mstart 函数,让线程执行 mstart func newosproc(mp *m, stk unsafe.Pointer) { // Disable signals during clone, so that the new thread starts // with signals disabled. It will enable them in minit. var oset sigset sigprocmask(_SIG_SETMASK, &sigset_all, &oset) ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) sigprocmask(_SIG_SETMASK, &oset, nil) if ret < 0 { print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n") if ret == -_EAGAIN { println("runtime: may need to increase max user processes (ulimit -u)") } throw("newosproc") } } //windows func newosproc(mp *m, stk unsafe.Pointer) { const _STACK_SIZE_PARAM_IS_A_RESERVATION = 0x00010000 // stackSize must match SizeOfStackReserve in cmd/link/internal/ld/pe.go. const stackSize = 0x00200000*_64bit + 0x00100000*(1-_64bit) thandle := stdcall6(_CreateThread, 0, stackSize, funcPC(tstart_stdcall), uintptr(unsafe.Pointer(mp)), _STACK_SIZE_PARAM_IS_A_RESERVATION, 0) if thandle == 0 { if atomic.Load(&exiting) != 0 { // CreateThread may fail if called // concurrently with ExitProcess. If this // happens, just freeze this thread and let // the process exit. See issue #18253. lock(&deadlock) lock(&deadlock) } print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", getlasterror(), ")\n") throw("runtime.newosproc") } // Close thandle to avoid leaking the thread object if it exits. stdcall1(_CloseHandle, thandle) }
创建m的时候,会给m的g0分配分配栈空间。g0是该m私有的,golang中系统命令都是在g0上执行的,函数systemstack(func())(汇编实现)会将方法转到g0栈上执行,然后转回当前的g。管理命令操作执行都在g0栈上执行,隔离了业务内容和指令的执行,避免做g共享内存。
下面将描述获取一个空闲的m在startm中,m是优先去空闲队列中获取,未获取到空闲队列才选择创建
type schedt struct { // idle状态的m midle muintptr // idle m's waiting for work // idle状态的m个数 nmidle int32 // number of idle m's waiting for work // m允许的最大个数 maxmcount int32 // maximum number of m's allowed (or die) } func mget() *m { //从idle 的m链表中搞一个 mp := sched.midle.ptr() if mp != nil { sched.midle = mp.schedlink sched.nmidle-- } return mp }
被唤醒的进入工作状态的m会陷入调度循环,竭尽全力获取g执行,当找不到可执行的任务,或者任务用时过长,系统调用阻塞等原因被剥夺p,m会再次进入休眠状态。
// 停止M,使其休眠,但不会被系统回收 // 调用notesleep使M进入休眠,唤醒后就会从休眠出直接开始执行 // 线程可以处于三种状态: 等待中(Waiting)、待执行(Runnable)或执行中(Executing)。 func stopm() { _g_ := getg() retry: lock(&sched.lock) mput(_g_.m) unlock(&sched.lock) // 在lock_futex.go 中 // 休眠,等待被唤醒 notesleep(&_g_.m.park) noteclear(&_g_.m.park) ... // 绑定p acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } // 把mp添加到midle列表 func mput(mp *m) { mp.schedlink = sched.midle sched.midle.set(mp) sched.nmidle++ checkdead() }
到这里可以看到,m也是不会主动删除释放的,支持复用。当大量的m被创建的时候,对性能是有影响的:
- 系统线程调度上下文切换是有消耗的
- m本身是有占资源的(g0分配了8k的堆栈)
下一章:golang调度 调度流程
1. 初始化调度器的初始化从 schedinit()函数开始,将会设置m最大个数(maxmcount)及p最大个数(GOMAXPROCS)等func schedinit() { sched.maxmcount ...