Golang1.17 学习笔记009
包位置:runtime/runtime2.go
全局变量 g0 主协程、m0 工作线程
P 里面只是有个本地 runq,全局的 runq 存储在 sched 中
获取任务顺序,先从 m 自身 p 中的 runq 获取,没有就去全局 sched 中获取,没有再去其他 q 中拿一点
type g struct { _panic *_panic // innermost panic - offset known to liblink _defer *_defer // innermost defer m *m // current m; offset known to arm liblink goid int64 // 协程id } type m struct { g0 *g // goroutine with scheduling stack curg *g // current running goroutine // 处理器相关信息 p puintptr // attached p for executing go code (nil if not executing go code) nextp puintptr oldp puintptr // the p that was attached before executing a syscall } type p struct { status // 状态:_Pidle、_Prunning、_Psyscall、_Pgcstop、_Pdead m muintptr // back-link to associated m (nil if idle) // Queue of runnable goroutines. Accessed without lock. runqhead uint32 runqtail uint32 runq [256]guintptr runnext guintptr // Available G's (status == Gdead) gFree struct { gList n int32 } } type schedt struct { midle muintptr // idle m's waiting for work pidle puintptr // idle p's // Global runnable queue. runq gQueue runqsize int32 }
创建:runtime/proc.go
调用 newproc(),然后内部调用 newproc1()
详细流程:newproc -> newproc1 -> (如果P数目没到上限)wakep -> startm -> (可能引发)newm -> newosproc
-> (线程入口)mstart -> schedule -> execute -> goroutine运行 (参考《深入理解go》之goroutine调度)
func newproc(siz int32, fn *funcval) { argp := add(unsafe.Pointer(&fn), sys.PtrSize) gp := getg() pc := getcallerpc() systemstack(func() { newg := newproc1(fn, argp, siz, gp, pc) _p_ := getg().m.p.ptr() // 绑当前协程往可执行队列中加;如果本地队列满了,就往全局队列中加 runqput(_p_, newg, true) if mainStarted { wakep() } }) } func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g { // 获取协程 _g_ := getg() // 获取线程 acquirem() // disable preemption because it can be holding p in a local var siz := narg siz = (siz + 7) &^ 7 // 协程参数大小判断 if siz >= _StackMin-4*sys.PtrSize-sys.PtrSize { throw("newproc: function arguments too large for new goroutine") } // 获取当前协程所在线程的 P 指针 _p_ := _g_.m.p.ptr() // 获取当前空闲的协程 newg := gfget(_p_) if newg == nil { // 栈上分配一个新的协程 newg = malg(_StackMin) // 修改状态为 Gdead casgstatus(newg, _Gidle, _Gdead) // 加入到全局的 allg 中 allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack. } // 获取栈帧地址 totalSize := 4*sys.PtrSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame totalSize += -totalSize & (sys.StackAlign - 1) // align to StackAlign sp := newg.stack.hi - totalSize spArg := sp if usesLR { // caller's LR *(*uintptr)(unsafe.Pointer(sp)) = 0 prepGoExitFrame(sp) spArg += sys.MinFrameSize } // 参数拷贝到栈中 if narg > 0 { memmove(unsafe.Pointer(spArg), argp, uintptr(narg)) // This is a stack-to-stack copy. If write barriers // are enabled and the source stack is grey (the // destination is always black), then perform a // barrier copy. We do this *after* the memmove // because the destination stack may have garbage on // it. if writeBarrier.needed && !_g_.m.curg.gcscandone { f := findfunc(fn.fn) stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps)) if stkmap.nbit > 0 { // We're in the prologue, so it's always stack map index 0. bv := stackmapdata(stkmap, 0) bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata) } } } memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function newg.sched.g = guintptr(unsafe.Pointer(newg)) gostartcallfn(&newg.sched, fn) newg.gopc = callerpc // 保存父级协程 newg.ancestors = saveAncestors(callergp) newg.startpc = fn.fn if _g_.m.curg != nil { newg.labels = _g_.m.curg.labels } // 如果是系统协程,那么 sched.ngsys(系统协程数)加 1 if isSystemGoroutine(newg, false) { atomic.Xadd(&sched.ngsys, +1) } // 协程状态修改为 Grunable casgstatus(newg, _Gdead, _Grunnable) if _p_.goidcache == _p_.goidcacheend { // Sched.goidgen is the last allocated id, // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch]. // At startup sched.goidgen=0, so main goroutine receives goid=1. _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch) _p_.goidcache -= _GoidCacheBatch - 1 _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch } newg.goid = int64(_p_.goidcache) _p_.goidcache++ // 释放获取协程所在的线程 releasem(_g_.m) return newg }
P 和 M 个数
P: 可以由 runtime.GOMAXPROCES()
M: go 启动时,设置 M 最大只,默认 10000。runtime/debug.SetMaxThreds 设置 M 最大数量
一个 M 被阻塞,会创建新的 M;反之,空闲 M 也可以被回收或睡眠
P 和 M 创建
调度器策略:
workd stealing 机制:当前 M 没有 G,尝试去其他 M 绑定 P 中偷取 G
hand off 机制:当前 M 因 G 进行系统调用阻塞时,M 释放 P,把去转移到其他空闲线程执行
抢占:在coroutine中要等待一个协程主动让出CPU才执行下一个协程,在Go中,一个goroutine最多占用CPU
10ms,防止其他goroutine被饿死
全局G队列:当 M 执行 work stealing 从其他 P 偷不到 G 时,它可以从全局 G 队列获 G
参考文献:
《Go专家编程》之协程
《深入解析Go》之 goroutine 调度
《Golang 修养之路》之 Golang 的协程调度器原理及 GMP 设计思想?