本文字数:5688 字
精读时间:13 分钟
也可在 6 分钟内完成速读
Go 1.13 版本中有几个比较大的修改,其中之一是sync.Pool修改了部分实现,减小某些极端情况下的性能开销。文中内容来源于笔者读完 sync.Pool 源代码的思考和总结,内容以 Go 1.13 中的实现为准,少量内容涉及到 Go 1.13 之前,如有误区请读者多多指教。
概念
在本文内容开始之前需要理解几个在 Go runtime 中的概念,以便于更好的理解sync.Pool中一些实现。
goroutine 抢占
Go 中调度器是 GMP 模型,简单理解 G 就是 goroutine;M 可以类比内核线程,是执行 G 的地方;P 是调度 G 以及为 G 的执行准备所需资源。一般情况下,P 的数量 CPU 的可用核心数,也可由runtime.GOMAXPROCS指定。本文的重点并非 goroutine 调度器,在此不做详细解释,感兴趣可以翻阅延伸阅读的文章。
Go 有这样的调度规则:某个 G 不能一直占用 M,在某个时刻的时候,runtime(参见sysmon)会判断当前 M 是否可以被抢占,即 M 上正在执行的 G 让出。P 在合理的时刻将 G 调度到合理的 M 上执行,在 runtime 里面,每个 P 维护一个本地存放待执行 G 的队列 localq,同时还存在一个全局的待执行 G 的队列 globalq;调度就是 P 从 localq 或 globalq 中取出 G 到对应的 M 上执行,所谓抢占,runtime 将 G 抢占移出运行状态,拷贝 G 的执行栈放入待执行队列中,可能是某个 P 的 localq,也可能是 globalq,等待下一次调度,因此当被抢占的 G 重回待执行队列时有可能此时的 P 与前一次运行的 P 并非同一个。
所谓禁止抢占,即当前执行 G 不允许被抢占调度,直到禁止抢占标记解除。Go runtime 实现了 G 的禁止抢占与解除禁止抢占。
func runtime_procPin() int
禁止抢占,标记当前 G 在 M 上不会被抢占,并返回当前所在 P 的 ID。
func runtime_procUnpin()
解除 G 的禁止抢占状态,之后 G 可被抢占。
数据结构
poolDequeue type poolDequeue struct { headTail uint64 vals []eface } type eface struct { typ, val unsafe.Pointer }
poolDequeue被实现为单生产者多消费者的固定大小的无锁 Ring 式队列。生产者可以从 head 插入 head 删除,而消费者仅可从 tail 删除。headTail指向了队列的头和尾,通过位运算将 head 和 tail 位置存入headTail变量中。
func (d *poolDequeue) pushHead(val interface{}) bool { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // Ring式队列,头尾相等则队列已满 if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { return false } slot := &d.vals[head&uint32(len(d.vals)-1)] // 原子操作拿到slot.typ typ := atomic.LoadPointer(&slot.typ) if typ != nil { return false } if val == nil { val = dequeueNil(nil) } // slot占位,将val存入vals中 *(*interface{})(unsafe.Pointer(slot)) = val // 更改队列指向头 atomic.AddUint64(&d.headTail, 1<<dequeueBits) return true }
1.在头入时,先判断队列是否已满,判断条件:head == tail
2.从队列中取到 head 位置的slot,根据slot.typ判断当前 slot 是否已被存放数据,注意这里使用了atomic.LoadPointer取代锁操作。
3.将val赋值给slot,这里实现的比较巧妙,slot是eface类型,将slot转为interface{}类型,这样val能以interface{}赋值给slot让slot.typ和slot.val指向其内存块,这样slot.typ和slot.val均不为空,这也就是第 2 点条件判断的来由。插入成功后 head 加 1,指向队头的前一个空位,由于插入删除都涉及到对headTail的修改,此处使用原子操作取代锁。
func (d *poolDequeue) popHead() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 判断队列是否为空 if tail == head { return nil, false } // head位置是队头的前一个位置,所以此处要先退一位 head-- ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[head&uint32(len(d.vals)-1)] break } } // 取出val val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // 重置slot,typ和val均为nil *slot = eface{} return val, true }
popHead的代码比较简单,流程上无非就是判断队列是否空,拿出slot转换为interface{}然后重置slot。popHead与pushHead有一点需要注意,在popHead中,是先修改了headTail,然后再取slot,而在pushHead中是先插入数据,然后再修改headTail,至于为什么这里先留一个疑问,后面将会详细解释。
func (d *poolDequeue) popTail() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 判断队列是否空 if tail == head { return nil, false } ptrs2 := d.pack(head, tail+1) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[tail&uint32(len(d.vals)-1)] break } } // 取出val val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // 重置slot.typ slot.val = nil atomic.StorePointer(&slot.typ, nil) return val, true }
popTail的代码类似popHead,只是删除对象从队首变成队尾,注意结尾部分代码,使用了atomic.StorePointer取代锁操作。
poolChain
poolDequeue被实现为 Ring 式队列,而poolChain则是基于poolDequeue实现为双向链表。
type poolChain struct { head *poolChainElt tail *poolChainElt } type poolChainElt struct { poolDequeue next, prev *poolChainElt }
同理,poolChain也实现了pushHead,popHead和popTail。
func (c *poolChain) pushHead(val interface{}) { d := c.head if d == nil { // poolDequeue初始长度为8 const initSize = 8 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } if d.pushHead(val) { return } // 前一个poolDequeue长度的2倍 newSize := len(d.vals) * 2 if newSize >= dequeueLimit { newSize = dequeueLimit } // 首尾相连,构成链表 d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) d2.pushHead(val) }
到这里大概就明白了,poolDequeue是在poolChain的pushHead中创建的,且每次创建的长度都是前一个poolDequeue长度的 2 倍,初始长度为 8。
func (c *poolChain) popHead() (interface{}, bool) { d := c.head for d != nil { if val, ok := d.popHead(); ok { return val, ok } d = loadPoolChainElt(&d.prev) } return nil, false }
popHead以队首向队尾为方向遍历链表,对每个poolDequeue执行popHead尝试取出存放的对象。
func (c *poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) if d == nil { return nil, false } for { d2 := loadPoolChainElt(&d.next) if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { return nil, false } if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { storePoolChainElt(&d2.prev, nil) } d = d2 } }
popTail以队尾向队队首为方向遍历链表,对每个poolDequeue执行popTail尝试从尾部取出存放的对象。
宏观上来看,poolChain的结构如下图:
Pool 的源代码实现
poolChain为 sync.Pool 的底层数据结构,接下来一览 Pool 的实现。
type Pool struct { // ... // 每个P的本地队列,实际类型为[P]poolLocal local unsafe.Pointer // [P]poolLocal的大小,<= P localSize uintptr victim unsafe.Pointer victimSize uintptr // 自定义的对象创建回调,当pool中无可用对象时会调用此函数 New func() interface{} } type poolLocalInternal struct { // 每个P的私有共享,使用时无需加锁 private interface{} // 对象列表,本地P可以pushHead/popHead,其他P仅能popTail shared poolChain } type poolLocal struct { poolLocalInternal // 伪共享,仅占位用,防止在cache line上分配多个poolLocalInternal pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte }
接下来看看 Pool 的具体实现。
Pool.Get func (p *Pool) pin() (*poolLocal, int) { pid := runtime_procPin() s := atomic.LoadUintptr(&p.localSize) l := p.local if uintptr(pid) < s { return indexLocal(l, pid), pid } return p.pinSlow() }
pin首先标记了当前 G 禁止抢占,在runtime_procUnpin之前,当前 G 和 P 不会被抢占。此处之所以标记禁止抢占是因为下文中有使用到 P ID,如果被抢占了,有可能接下里使用的 P ID 与所绑定的 P 并非同一个。
在获得 P ID 之后,当 P ID 小于p.local数组长度时在p.local数组里找到 P 对应的poolLocal对象,否则进入pinSlow函数创建新的poolLocal。
func (p *Pool) pinSlow() (*poolLocal, int) { runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() s := p.localSize l := p.local if uintptr(pid) < s { return indexLocal(l, pid), pid } if p.local == nil { allPools = append(allPools, p) } // 当前P的数量 size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) atomic.StoreUintptr(&p.localSize, uintptr(size)) return &local[pid], pid }
pinSlow的实现比较简单,即当前 P ID 在 Pool 中没有对应的poolLocal对象时,则创建一个新的poolLocal对象,旧的poolLocal将会进入 GC。仔细观察pinSlow函数发现先执行了runtime_procUnpin,随后有执行了runtime_procPin,后者的目的在于获取最新的 P ID,这里的意图有点难以理解,在仔细阅读和理解之后,发现目的在于尽量减少[]poolLocal的创建次数,因为pinSlow之前和pinSlow里面可能会因为解除禁止抢占而导致绑定的 P 不一致,万一新绑定的 P 里存在可用的poolLocal呢。
func (p *Pool) Get() interface{} { // ... l, pid := p.pin() x := l.private l.private = nil if x == nil { x, _ = l.shared.popHead() if x == nil { x = p.getSlow(pid) } } // 解除禁止抢占 runtime_procUnpin() // ... if x == nil && p.New != nil { x = p.New() } return x }
1.根据当前 P 取得对应的poolLocal和 P ID。
2.若当前poolLocal.private不为空,则表示可复用此对象;若为空,则在poolLocal.shared队列中获取对象。
3.若poolLocal.shared无可用对象,则进入getSlow获取对象。
4.若未能从其他 P 成功窃取对象,则调用自定义的对象创建函数,如果该函数不为nil。
func (p *Pool) getSlow(pid int) interface{} { size := atomic.LoadUintptr(&p.localSize) locals := p.local // 从其他P中窃取对象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // 尝试从victim cache中取对象 size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) if x := l.private; x != nil { l.private = nil return x } for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // 清空victim cache atomic.StoreUintptr(&p.victimSize, 0) return nil }
进入getSlow的前提是当前 P 本地无可用对象,于是转头去其他 P 的里窃取对象,getSlow就是做这件事情。
5.遍历Pool.local中的其他 P,确认其他 P 的shared里是否有可用对象,如果有,则从链尾取出。
6.如果其他 P 中也没有可用对象,则尝试从 victim cache 中取可用对象,至于 victim cache 本文下半部分将会做详细解释。
纵观整个 Get 过程会发现,从当前 P 的poolLocal中取对象时使用的时popHead,而从其他 P 的poolLocal中窃取对象时使用的时popTail,再回到上文中对poolChain的定义,可以知道,当前 P 对本地 poolLocal 是生产者,对其他 P 的 poolLocal 而言是消费者。
再次回到poolDequeue和poolChain上。我们知道某一时刻 P 只会调度一个 G,那么对于生产者而言,调用pushHead和popHead并不需要加锁,因为当前 P 操作的是本地poolLocal;当消费者是其他 P,在进行popTail操作时,则会对pushHead以及popHead形成竞争关系,对这种问题,poolDequeue的实现直指要害。
首先注意eface这个结构,若插入成功eface下的两个字段会指向要缓存对象的内存地址,在pushHead中使用了原子操作判断typ字段是否为nil,存在这样一种可能性:pushHead所取到的slot正在popTail里准备重置,这种情况下pushHead会直接返回失败。
回到竞争问题上,pushHead的流程可以简化为先取slot,再判断是否可插入最后修改headTail,而popTail的流程可以简化为先修改headTail再取slot然后重置slot,pushHead修改 head 位置,popTail修改 tail 位置,所以对于headTail字段使用原子操作避免即可读写冲突。
疑问是为何popTail中需要先修改headTail呢,因为存在其他 P 都会到当前 P 上窃取对象,当多个 P 都调用本地 P 的popTail时,竞争现象就会更加明显,所以此时应尽早修改headTail,一旦某个 P 窃取到了其他 P 就无法再窃取此对象。
Pool.Put func (p *Pool) Put(x interface{}) { if x == nil { return } // ... l, _ := p.pin() if l.private == nil { l.private = x x = nil } if x != nil { l.shared.pushHead(x) } runtime_procUnpin() // ... }
Put的实现比较简单,优先将对象存入private,若private已存在则放入 shared 链表中,pin中会标记禁止抢占,因此需要在pin结束以及 Put 逻辑结束后取消禁止抢占。
Victim Cache
Victim Cache 本是计算机架构里面的一个概念,是 CPU 硬件处理缓存的一种技术,sync.Pool引入的意图在于降低 GC 压力同时提高命中率,本文并不需要详解 Victim Cache 的原理,分析sync.Pool即可明白其意图。对于 Pool 而言有一点需要明白,这个 Pool 并非是无限制扩展的,否则会引起内存溢出。几乎所有的池技术中,都会在某个时刻清空或清除部分缓存对象,那么在 Go 中何时清理未使用的对象呢?
在 Pool.Get 函数中,取不到对象时会尝试从p.victim中取,用完后放回当前 P 的本地队列,而p.vcitim是什么被创建的呢?是在poolCleanup函数中,该函数会在 GC 时被调用到,在init函数里注册。
func poolCleanup() { // 1 for _, p := range oldPools { p.victim = nil p.victimSize = 0 } // 2 for _, p := range allPools { p.victim = p.local p.victimSize = p.localSize p.local = nil p.localSize = 0 } oldPools, allPools = allPools, nil }
poolCleanup会在 STW 阶段被调用,函数实现虽然看起来简单,但其意图较为复杂,那么该如何解释呢?
尝试模拟一下实际情况:
1.初始状态下,oldPools和allPools均为nil
2.第 1 次调用 Get,由于p.local为nil,将会在pinSlow中创建p.local,然后将p放入allPools,此时allPools长度为 1,oldPools为nil
3.对象使用完毕,第 1 次调用 Put 放回对象
4.第 1 次 GC STW 阶段,allPools中所有p.local将值赋值给victim并置为nil,最后allPools为nil,oldPools长度为 1
5.第 2 次调用 Get,由于p.local为nil,此时会从p.victim里面尝试取对象
6.对象使用完毕,第 2 次调用 Put 放回对象,但由于p.local为nil,重新创建p.local,并将对象放回,此时allPools长度为 1,oldPools长度为 1
7.第 2 次 GC STW 阶段,oldPools中所有p.victim置nil,前一次的 cache 在本次 GC 时被回收,allPools所有p.local将值赋值给victim并置为nil,最后allPools为 nil,oldPools长度为 1
再来看看 Go 1.13 以前poolCleanup的实现:
func poolCleanup() { for i, p := range allPools { allPools[i] = nil for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local, i) l.private = nil for j := range l.shared { l.shared[j] = nil } l.shared = nil } p.local = nil p.localSize = 0 } allPools = []*Pool{} }
Go 1.13 以前poolCleanup的实现简单粗暴,每次 GC STW 阶段遍历allPools,清空p.local、poolLocal.shared。通过两者的对比发现,新版的实现相比 Go 1.13 之前,GC 的粒度拉大了,由于实际回收的时间线拉长,单位时间内 GC 的开销减小。
由此基本明白p.victim的作用,它的定位是次级缓存,GC 时将对象放入其中,下一次 GC 来临之前如果有 Get 调用则会从p.victim中取,直到再一次 GC 来临做回收,同时由于从p.victim中取出对象使用完毕之后并未放回p.victim中,在一定程度也减小了下一次 GC 的开销。原来 1 次 GC 的开销被拉长到 2 次且会有一定程度的开销减小,这就是p.victim引入的意图。
关于 Victim Cache 更多的信息可以在延伸阅读中找到。
sync.Pool 的设计理念
无锁
无锁编程是很多编程语言里逃离不了的话题。sync.Pool的无锁是在poolDequeue和poolChain层面实现的。
操作对象隔离
纵观整个 sync.Pool的实现,明确了生产者(本地 P)访问 head,消费者(其他 P)访问 tail,从 P 的角度切入操作方向,实现了目标操作对象层面的 “解耦”,大部分时候两者的操作互不影响。图文示意如下:
原子操作代替锁
poolDequeue对一些关键变量采用了 CAS 操作,比如poolDequeue.headTail,既可完整保证并发又能降低相比锁而言的开销。
行为隔离——链表
这点与 “操作对象隔离” 是相辅相成的,一旦设计目标为尽量减少对同一对象的操作锁,就需要对行为进行隔离,链表能很好的满足这个设计目标:特定的 P 访问特定的位置。从整个过程来看,链表是减少锁的高效数据结构。
Victim Cache 降低 GC 开销
GC 的开销已经足够足够小了,但仍不可避免。对于sync.Pool而言,避免极端情况 GC 的开销也是重点之一,所以 Go 1.13 的sync.Pool引入了 Victim Cache 机制,有效拉长真正回收的时间线,从而减小单次 GC 的开销。
延伸阅读
High Performance Cache Architecture Using Victim Cache
Go: Understand the Design of Sync.Pool
也谈 Goroutine 调度器
Goroutine 调度实例简要分析