Go并发编程 信号量Semaphore源码分析
Go 源码文件 go/src/runtime/sema.go 实现了信号量 Semaphore,提供了 sleep 和 wakeup 同步原语。
Go 内部使用信号量 Semaphore 来控制 goroutine 的阻塞和唤醒,它跟 Windows 的 Event、Linux Futex 目标是一致的,只不过 Go Semaphore 的语义更简单一些。
1. 信号量的数据结构
runtime 内部定义了一个大小为 251 的全局变量 semtable 数组,来管理所有的 Semaphore。
semtable 数组定义在 go/src/runtime/sema.go 文件中。semtable 数组元素的类型是一个匿名结构体,这个结构体为了避免伪共享问题做了一下内存填充。
semtable 数组定义如下:
// Prime to not correlate with any user patterns. const semTabSize = 251 var semtable [semTabSize] struct { root semaRoot // 平衡树的根 pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte // 填充位 }
这个大小固定的 semtable 数组,怎么在程序执行阶段管理 Semaphore 呢?
首先,semtable 存储的元素是一个匿名的结构体,结构体中 root 字段是 251 棵平衡树的根。
平衡树的根就是结构体中 root 字段,它的类型为 semaRoot。
semaRoot 结构定义如下:
type semaRoot struct { lock mutex treap *sudog // 平衡树的根节点 nwait uint32 // 挂起等待的 goroutin 的数量 }
treap 就是平衡树的根节点,而且平衡树中每个节点都是一个 sudog 类型的对象。
当使用一个信号量的时候,需要提供一个记录信号量数值的变量,系统根据变量的地址进行计算,映射到 semtable 中的一棵平衡树上。在 semtable 找到对应的节点,就能找到了该信号量的等待队列。
sudog 结构定义如下:
type sudog struct { g *g next *sudog prev *sudog elem unsafe.Pointer // 数据元素 acquiretime int64 releasetime int64 ticket uint32 isSelect bool success bool parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
sudog 是从一个特殊的 pool 中进行分配,使用 acquireSudog 和 releaseSudog 来分配和释放他们。
sudog 的 next、prev、parent 字段构成了平衡树,waitlink 和 waittail 是链表结构,都指向该信号量的等待队列。
例如,我们常用的 sync.Mutex,定义如下:
type Mutex struct { state int32 sema uint32 }
sync.Mutex 有一个 sema 字段,用于记录信号量的数值。如果有协程需要等待这个 Mutex,就会根据 sema 字段的地址计算映射到 semtable 中的某棵树上,找到对应的节点,进而找到了这个 Mutex 的等待队列。所以 sync.Mutex 就是通过信号量来实现排队的。
2. 信号量的操作
如果读过 sync.Mutex 源码就会知道,sync.Mutex 主要依赖 runtime_SemacquireMutex 和 runtime_Semrelease,它们都定义在 go/src/sync/runtime.go中,运行时分别对应于 go/src/runtime/sema.go 文件中的 sync_runtime_SemacquireMutex 和 sync_runtime_Semrelease。
在 go/src/sync/runtime.go 中,定义了以下方法,:
func runtime_Semacquire(s *uint32) func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int) func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
- runtime_Semacquire 等待直到 *s >0,然后以原子的方法将其递减。它旨在作为一个简单的睡眠原语供同步库使用,但不要直接使用它。
- runtime_SemacquireMutex 类似于 Semacquire,但是用于分析竞争的互斥对象。如果 lifo 为true,表示等待队列采用先进先出的模式,将等待者排在队列头部,反之排在队尾。skipframes 表示从 runtime_SemacquireMutex 的调用者开始计算跟踪期间要忽略的帧数。
- runtime_Semrelease 自动递增 *s 并通知等待在 Semacquire上 的 goroutine。它实现一个简单的唤醒语义供同步库使用,也不要直接使用它。如果 handoff 为 true,则将计数直接传递给第一个等待者。skipframes 表示从 runtime_Semrelease 的调用者开始计算跟踪期间要忽略的帧数。
go/src/sync/runtime.go 的这些方法只有声明,实际的代码实现部分,利用 //go:linkname编译器指令转移到了 sema.go 文件中,主要有如下方法:
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire func sync_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile, 0) } //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { semrelease1(addr, handoff, skipframes) } //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) } //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire func poll_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile, 0) } //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease func poll_runtime_Semrelease(addr *uint32) { semrelease(addr) }
sync_xxx主要用于支持并发包的实现,poll_xxx我目前看到的主要是用于管理fd的生命周期并且顺序访问Read、Write、Close操作。
操作方法
runtime_SemacquireMutex
sync_runtime_SemacquireMutex主要为sync.Mutex服务,实际调用semacquire1方法,实际sync_runtime_Semacquire、poll_runtime_Semacquire也都是调用semacquire1来实现。
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) { gp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") } // 检查信号量大于0且CAS成功则直接返回 if cansemacquire(addr) { return } // Harder case: // 增加等待者计数 // 再次尝试cansemacquire 如果成功则返回 // 将自己作为等待者入队 // 休眠 // (waiter descriptor is dequeued by signaler) s := acquireSudog() //获取一个 sudog 对象 root := semroot(addr) //根据信号量地址 hash 到 semtable 中 t0 := int64(0) s.releasetime = 0 s.acquiretime = 0 s.ticket = 0 ... ... for { lockWithRank(&root.lock, lockRankRoot) // 将自己添加到nwait中来禁止semrelease中的easy case atomic.Xadd(&root.nwait, 1) // 检查cansemacquire 避免错过唤醒 if cansemacquire(addr) { atomic.Xadd(&root.nwait, -1) unlock(&root.lock) break } // cansemacquire之后的所有semrelease都知道我们正在等待 // (我们上面已经设置了nwait),所以进入休眠 root.queue(addr, s, lifo) goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } } if s.releasetime > 0 { blockevent(s.releasetime-t0, 3+skipframes) } releaseSudog(s) }
大致逻辑:
- 获取当前的 g,并判断是否跟 m 上实际运行的 g 一致。
- 循环判断信号量的值,若等于 0,则直接返回 false,进入harder case;否则,原子操作 *addr -= 1,如果成功,就是拿到信号量直接返回。否则继续循环。
cansemacquire 源码:
func cansemacquire(addr *uint32) bool { for { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } } }
- 从当前 P 中获取 sudog,用于保存需要阻塞的 g 的相关信息。
//go:nosplit func acquireSudog() *sudog { // 设置禁止抢占 mp := acquirem() pp := mp.p.ptr() //当前本地sudog缓存没有了,则去全局缓存中拉取一批 if len(pp.sudogcache) == 0 { lock(&sched.sudoglock) // 首先尝试从全局缓存中获取sudog,直到本地容量达到50% for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil { s := sched.sudogcache sched.sudogcache = s.next s.next = nil pp.sudogcache = append(pp.sudogcache, s) } unlock(&sched.sudoglock) // 如果全局缓存为空,则分配创建一个新的sudog if len(pp.sudogcache) == 0 { pp.sudogcache = append(pp.sudogcache, new(sudog)) } } n := len(pp.sudogcache) s := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if s.elem != nil { throw("acquireSudog: found s.elem != nil in cache") } //解除抢占限制 releasem(mp) return s }
关于这里sudog获取使用了二级缓存,即P本地sudog缓存和全局的sched全局的sudog缓存。当本地的sudog缓存不足,则从全局缓存中获取;如果全局缓存也没有,则重新分配一个新的sudog。
- 递增nwait进而避免semrelease中的快速路径
- 再次检查cansemacquire避免错过wakeup,如果成功则nwait-1并返回
- 将当前g封装进sudog并放入等待队列
// queue adds s to the blocked goroutines in semaRoot. func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) { s.g = getg() s.elem = unsafe.Pointer(addr) s.next = nil s.prev = nil var last *sudog pt := &root.treap for t := *pt; t != nil; t = *pt { //说明存在相同地址的节点 if t.elem == unsafe.Pointer(addr) { // Already have addr in list. if lifo {//先进先出的话 将新节点放到链表的第一位 // 用s将t替换掉 *pt = s s.ticket = t.ticket s.acquiretime = t.acquiretime s.parent = t.parent s.prev = t.prev s.next = t.next if s.prev != nil { s.prev.parent = s } if s.next != nil { s.next.parent = s } // 将t放入到s的等待链表的第一位 s.waitlink = t s.waittail = t.waittail if s.waittail == nil { s.waittail = t } t.parent = nil t.prev = nil t.next = nil t.waittail = nil } else { // 将s放到等待列表的末尾 if t.waittail == nil { t.waitlink = s } else { t.waittail.waitlink = s } t.waittail = s s.waitlink = nil } return } last = t // 根据地址大小来进行查找 if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) { pt = &t.prev } else { pt = &t.next } } // 将s作为一个新的叶子节点加入到唯一地址树中 // 平衡树是一个treap树,使用ticket作为随机堆优先级 // 也就是说,它是根据elem地址排序的二叉树 // 但是在代表这些地址的可能的二叉树空间中,是通过ticket满足s.ticket均 <=s.prev.ticket 和 s.next.ticket来维护堆 // 的顺序,从而平均得保持平衡。 // https://en.wikipedia.org/wiki/Treap // https://faculty.washington.edu/aragon/pubs/rst89.pdf // s.ticket在几个地方与零比较,因此设置了最低位 // 这不会明显影响treap的质量 s.ticket = fastrand() | 1 s.parent = last *pt = s // 根据ticket翻转树 for s.parent != nil && s.parent.ticket > s.ticket { if s.parent.prev == s { root.rotateRight(s.parent) } else { if s.parent.next != s { panic("semaRoot queue") } root.rotateLeft(s.parent) } } }
这里入队的树结构是一个treap,故名思义,treap=tree+heap,即拥有tree的特性,又有heap的特性。主要思想是在二叉搜索树的基础上,给每个节点一个随机权重(这里是一个随机值ticket),然后通过旋转在不破坏二叉搜索树性质的前提下将所有节点根据权重重新组织,使其满足堆的性质。由于权重的随机性,所以可以认为treap能在增删操作下相对平衡,不会退化为链表。
这个treap是根据elem地址排序的二叉树,又根据随机值ticket作为权重值,来维护其平衡(ticket满足s.ticket均 <=s.prev.ticket 和 s.next.ticket),即:
当当前节点的权重值小于根节点的权重值则旋转
- 如果当前节点是根的左孩子则右旋
- 如果当前节点是根的右孩子则左旋
所以从elem的角度,这个treap是个二叉搜索树,从ticket来看是个小顶堆。
其实最早并不是treap结构而是linked list,可以看看(https://github.com/golang/go/issues/17953)
- 调用gopark挂起当前g
- 当前g被唤醒后释放sudog
//go:nosplit func releaseSudog(s *sudog) { ... ... gp := getg() if gp.param != nil { throw("runtime: releaseSudog with non-nil gp.param") } mp := acquirem() // 设置P禁止抢占 pp := mp.p.ptr() if len(pp.sudogcache) == cap(pp.sudogcache) { // 将本地一半的sudog缓存放回全局缓存 var first, last *sudog for len(pp.sudogcache) > cap(pp.sudogcache)/2 { n := len(pp.sudogcache) p := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if first == nil { first = p } else { last.next = p } last = p } lock(&sched.sudoglock) last.next = sched.sudogcache sched.sudogcache = first unlock(&sched.sudoglock) } pp.sudogcache = append(pp.sudogcache, s) releasem(mp) }
道理很简单,为了保证sudog的复用,当goroutine被唤醒,当前的sudog需要回收到缓存中以备后续使用。刚刚提到这里涉及到P和sched的二级缓存。所以归还sudog时,如果本地sudog已经满了,会将本地的一半缓存交还回全局缓存。
runtime_Semrelease
runtime_Semrelease实际调用semrelease1完成了wakeup的语义。
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { semrelease1(addr, handoff, skipframes) } func semrelease1(addr *uint32, handoff bool, skipframes int) { root := semroot(addr) atomic.Xadd(addr, 1) // 快速路径:没有等待者? // 检查必须发生在xadd之后,避免错过wakeup // (详见semacquire中的循环). if atomic.Load(&root.nwait) == 0 { return } //查找一个等待着并唤醒它 lockWithRank(&root.lock, lockRankRoot) if atomic.Load(&root.nwait) == 0 { //计数已经被其他goroutine消费,所以不需要唤醒其他goroutine unlock(&root.lock) return } s, t0 := root.dequeue(addr)//查找第一个出现的addr if s != nil { atomic.Xadd(&root.nwait, -1) } unlock(&root.lock) if s != nil { // 可能比较慢 甚至被挂起所以先unlock acquiretime := s.acquiretime if acquiretime != 0 { mutexevent(t0-acquiretime, 3+skipframes) } if s.ticket != 0 { throw("corrupted semaphore ticket") } if handoff && cansemacquire(addr) { s.ticket = 1 } readyWithTime(s, 5+skipframes) //goready(s.g,5)标记runnable 等待被重新调度 if s.ticket == 1 && getg().m.locks == 0 { // 直接切换G // readyWithTime已经将等待的G作为runnext放到当前的P // 我们现在调用调度器可以立即执行等待的G // 注意waiter继承了我们的时间片:这是希望避免在P上无限得进行激烈的信号量竞争 // goyield类似于Gosched,但是它是发送“被强占”的跟踪事件,更重要的是,将当前G放在本地runq // 而不是全局队列。 // 我们仅在饥饿状态下执行此操作(handoff=true),因为非饥饿状态下,当我们yielding/scheduling时, // 其他waiter可能会获得信号量,这将是浪费的。我们等待进入饥饿状体,然后开始进行ticket和P的手递手交接 // See issue 33747 for discussion. goyield() } } }
大致逻辑:
- 根据信号量地址偏移取模&semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root拿到semaRoot
- 信号量原子递增+1,这样semacquire1阻塞的goroutine就可能通过cansemacquire操作
- 通过原子判断root.nwait的值是否为0,为0表示当前不存在阻塞的goroutine。这里的检查必须发生在semacquire1中的atomic.Xadd(&root.nwait, 1),防止错过唤醒操作。
- 加锁再次检查root.nwait的值,没有阻塞的goroutine 则直接返回。
否则,从treap中出队当前信号量上的sudog。
// 如果semacquire1中设置了对sudog进行概要分析,dequeue计算到现在为止唤醒goroutine的时间作为now返回,否则now值为0 func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) { ps := &root.treap s := *ps for ; s != nil; s = *ps { if s.elem == unsafe.Pointer(addr) {//查找到指定信号量地址上的sudog goto Found } if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) { ps = &s.prev } else { ps = &s.next } } return nil, 0 Found: now = int64(0) if s.acquiretime != 0 { now = cputicks() } if t := s.waitlink; t != nil { // 用t替换唯一addrs的根树中的s *ps = t t.ticket = s.ticket t.parent = s.parent t.prev = s.prev if t.prev != nil { t.prev.parent = t } t.next = s.next if t.next != nil { t.next.parent = t } if t.waitlink != nil { t.waittail = s.waittail } else { t.waittail = nil } t.acquiretime = now s.waitlink = nil s.waittail = nil } else {//该信号量地址上 只有一个sudog时 // 将s旋转为树的叶子节点方便移除,同时注意权重 for s.next != nil || s.prev != nil { if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket { root.rotateRight(s) } else { root.rotateLeft(s) } } // s当前为叶子节点,移除s if s.parent != nil { if s.parent.prev == s {//为根节点的左孩子 s.parent.prev = nil } else {//为根节点的右孩子 s.parent.next = nil } } else {//当前treap只有s一个节点 root.treap = nil } } s.parent = nil s.elem = nil s.next = nil s.prev = nil s.ticket = 0 return s, now }
查找semaRoot中阻塞在指定信号量addr上的第一个goroutine。熟悉了treap结构及queue的逻辑后这里dequeue就比较简单:
- 查找treap中指定addr的sudog节点
- 若链表长度大于1,则将头节点弹出,返回弹出的sudog
- 若链表长度等于1,即需要移除treap树的节点,这时候需要通过循环旋转将节点根据权重保持平衡,将目标节点旋转为叶子节点,然后删除
- 如果未找到 则返回nil,0
- 如果找到,判断节点的等待链表
- 如果出队的sudog不为空,将root.nwait原子-1,并释放锁(),让其他goroutine可以继续执行
- readyWithTime将sudog中的g唤醒,并放到当前P本地队列的下一个执行位置
func readyWithTime(s *sudog, traceskip int) { if s.releasetime != 0 { s.releasetime = cputicks() } goready(s.g, traceskip) } func goready(gp *g, traceskip int) { systemstack(func() { //切换到系统堆栈 ready(gp, traceskip, true) }) } // 标记 gp 准备 run func ready(gp *g, traceskip int, next bool) { if trace.enabled { traceGoUnpark(gp, traceskip) } status := readgstatus(gp) // Mark runnable. _g_ := getg() mp := acquirem() // 设置禁止P抢占 if status&^_Gscan != _Gwaiting { dumpgstatus(gp) throw("bad g->status in ready") } // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq casgstatus(gp, _Gwaiting, _Grunnable) // 将g放到P的本地队列,注意这里next=true即放到本地队列的下一个执行位置 // 否则放到对尾 runqput(_g_.m.p.ptr(), gp, next) wakep() releasem(mp)//解除抢占 }
- 饥饿状态下,调用 goyield() 让出当前时间片,由等待的 g 继承时间片,避免无限的争夺信号量。因为 readyWithTime 已经将等待的G放到P本地队列下一个位置,所以调度器会立即执行 s.g
func goyield() { checkTimeouts() mcall(goyield_m) } func goyield_m(gp *g) { if trace.enabled { traceGoPreempt() } pp := gp.m.p.ptr() casgstatus(gp, _Grunning, _Grunnable)//让出时间片 dropg() runqput(pp, gp, false)//将当前g放到P本地队列尾部 schedule() //触发调度 }
这也是 sync.Mutex 饥饿模式下,等待 goroutine 能优先获得锁的原因。
3. 总结
semacquire 和 semrelease 成对出现,实现了简单的 sleep 和 wakeup 原语。主要解决并发场景的资源争用问题,显然他们一定是在两个不同的 m 上执行的场景发生。我们不妨假设 m1 和 m2。
- 当 m1 上的 g1 执行到 semacquire1 时,如果快速路径 cansemacquire 成功,则说明 g1 抢到锁,能够继续执行。但一旦失败且在Harder Case下依然抢不到锁,则会进入goparkunlock,将当前 g1 放到等待队列中,进而让 m1 切换并执行其他的g。
- 当 m2 上的 g2 开始调用 semrelease1 时,将等待的 g1 放回 P 的本地调度队列中,若当前为饥饿模式(handoff=ture)则让当前等待继承时间片立刻执行,如果成功则 semacquire1 中会归还 sudog。
下一章:Go同步原语 互斥锁Mutex源码分析
Mutex,即互斥锁,是由 Mutual Exclusion 的前缀组成。它是 Go 语言在 sync 包下提供的最基本的同步原语之一,位于 src/sync/mutex.go 文件中。1. Mute ...