[Go Source Code Analysis] Go Scheduler Source Code Analysis
Author: 孫偉
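1. Basic concepts: processes, threads, and coroutines
- A process can contain multiple threads. Each thread typically gets a fixed-size block of memory (commonly about 2 MB) as its stack, which holds the local variables of the functions that are currently being called or suspended. The unit the operating system switches between when scheduling is the thread: if the next thread belongs to the current process, only a thread switch is needed, which completes quickly; if it belongs to a different process, a process switch is required, which takes more time.
- Threads are divided into kernel-level threads and user-level threads. A user-level thread must be bound to a kernel-level thread. The kernel is unaware of user-level threads; it only knows that it is running one thread, and that thread is a kernel-level thread.
- User-level threads are also known as coroutines. To keep the two apart, this article uses "coroutine" for user-level threads and "thread" for kernel-level threads.
- Coroutines differ from threads: threads are scheduled preemptively by the OS kernel, whereas coroutines are scheduled cooperatively in user space, so the next coroutine runs only after the current one yields the CPU.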
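Coroutines can be bound to threads in three ways:
- N:1: N coroutines are bound to 1 thread. The advantage is that coroutine switches happen entirely in user space without trapping into the kernel, so they are very lightweight and fast. The drawbacks are serious, though. Because all coroutines of a process are bound to one thread, the program cannot use the multiple cores of the hardware, and once one coroutine blocks, the thread blocks and none of the other coroutines in the process can run, so there is no real concurrency.
- 1:1: 1 coroutine is bound to 1 thread. This is the easiest model to implement. Scheduling is done entirely by the OS kernel, so the drawbacks of N:1 do not apply, but creating, destroying, and switching coroutines all go through the kernel, which is somewhat expensive.
- M:N: M coroutines are bound to N threads. This combines N:1 and 1:1 and overcomes the drawbacks of both, but it is the most complex to implement.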
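Go follows the M:N model. As a minimal sketch of what that looks like from user code, the program below starts many goroutines while limiting the number of threads that may run Go code at once. The helper name runMany and the figures 10000 and 2 are choices made for this illustration only.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// runMany starts n goroutines while at most procs threads execute Go code
// simultaneously, and returns how many goroutines ran.
func runMany(n, procs int) int64 {
	prev := runtime.GOMAXPROCS(procs) // limit the number of Ps
	defer runtime.GOMAXPROCS(prev)    // restore the previous setting

	var done int64
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt64(&done, 1)
		}()
	}
	wg.Wait()
	return done
}

func main() {
	// Many goroutines (M) are multiplexed onto a small number of threads (N).
	fmt.Println(runMany(10000, 2))
}
```

Creating the same number of kernel threads would be far more expensive, which is the motivation for the M:N design described above.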
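2. Introduction to Golang
2.1 The goroutine concept
Because a thread switch involves a large context and consumes a lot of CPU time, Go's unit of concurrency is not a traditional thread but the much lighter goroutine, a form of coroutine. This greatly increases the achievable degree of concurrency, which is why Go has been called "the most parallel language".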
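2.2 Comparison with other concurrency models
- Interpreted languages such as Python often use a multi-process concurrency model. A process has the largest context, so switching is very expensive. In addition, processes can communicate only through mechanisms such as sockets or explicitly configured shared memory, which makes programming considerably more cumbersome;
- Languages such as C++ usually use a multi-threaded concurrency model. A thread's context is much smaller than a process's, and threads share memory by default, so programming is considerably easier. However, starting, destroying, and switching threads still consumes a lot of CPU time. Thread pools were introduced to address this: a fixed number of threads is kept alive to avoid the cost of repeatedly starting and stopping them. This basic technique has its own problems, though. For example, if a thread stays blocked on I/O, it keeps occupying its slot, so later tasks queue up and cannot get a thread to run on;
- Go's concurrency model is more sophisticated. Go replaces the thread with a lighter data structure that has its own stack and switches faster. The actual execution still happens on threads: the Go scheduler places goroutines onto threads and creates and releases threads as needed. When a running goroutine blocks (typically while waiting for I/O), the scheduler detaches it from the thread it occupies and runs other ready goroutines on that thread. Through this fairly elaborate scheduling, the system achieves a very high degree of parallelism without consuming large amounts of CPU.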
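2.3 Characteristics of goroutines
- Non-blocking. Goroutines were introduced to make highly concurrent programs easier to write. When a goroutine performs a blocking operation (such as a system call), the other goroutines on the current thread are handed over to other threads and keep running, so the program as a whole does not block.
- Scheduler. Golang has garbage collection (GC), and running GC requires goroutines to be stopped. Because Go implements its own scheduler, this is easy to arrange. Writing concurrent programs with multiple goroutines offers the advantages of asynchronous I/O together with the convenience of programming with multiple threads or processes.
- Manages its own stack. Introducing goroutines also introduces considerable complexity. A goroutine must carry both the code to run and the stack, PC, and SP needed to run it (on some architectures, such as ARM, the PC value reads as the current execution position + 8). The stack pointers must stay consistent so that the program remains correct in every execution mode.
Since every goroutine has its own stack, creating a goroutine means creating its stack as well. While a goroutine runs, its stack keeps growing. Stacks normally grow contiguously, and because all threads of a process share one virtual address space, each thread's stack must be given a different starting address. This requires estimating each thread's stack size before allocating it. With a very large number of threads, stack overflow becomes likely.
Split stacks were introduced to solve this problem: only a small block of memory is allocated when the stack is created, and if a function call runs out of stack space, a new block is allocated elsewhere. The new block need not be contiguous with the old one. The call's arguments are copied to the new block, and the function then runs there. Golang's stack management is similar, but for better efficiency it uses contiguous stacks: a fixed-size stack is allocated first, and when it runs out, a larger stack is allocated and the entire old stack is copied into it. This avoids the frequent allocation and deallocation that split stacks can cause.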
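The effect of growable stacks can be observed from ordinary Go code. The following sketch recurses far deeper than a 2 KB initial stack could hold; it completes because the runtime grows the goroutine's stack on demand. The function names depth and deepInGoroutine and the depth of 100000 are chosen for this example only.

```go
package main

import "fmt"

// depth recurses n levels and returns n. Every level needs its own frame,
// so a large n requires far more stack than a goroutine starts with.
func depth(n int) int {
	if n == 0 {
		return 0
	}
	return 1 + depth(n-1)
}

// deepInGoroutine runs depth(n) on a fresh goroutine, whose stack starts
// small and is grown by the runtime as the recursion deepens.
func deepInGoroutine(n int) int {
	result := make(chan int)
	go func() { result <- depth(n) }()
	return <-result
}

func main() {
	fmt.Println(deepInGoroutine(100000))
}
```

No stack size has to be estimated up front, which is exactly the problem the paragraph above describes for fixed-size thread stacks.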
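A running goroutine can be preempted. If a goroutine keeps occupying the CPU for a long time without being rescheduled, the runtime preempts it and gives the CPU time to other goroutines. This can be demonstrated by debugging a blocked goroutine.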
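2.4 Data structures
- M: a worker thread in Go; the unit that actually executes code;
- P: a context for scheduling goroutines. Goroutines depend on a P to be scheduled, and P is the real unit of parallelism;
- G: a goroutine, a piece of Go code (in the form of a function); the smallest unit of concurrent execution;
A P must be bound to an M to run, and an M must hold a P to execute Go code. Normally there are at most GOMAXPROCS Ps (usually equal to the number of CPUs), while there may be many Ms. Only Ps bound to an M actually run, which is why P is the real unit of parallelism.
Each P has its own queue of runnable Gs, from which it takes a G to run, and there is also a global queue of runnable Gs. A G runs on an M by way of a P. The reason for not relying on the global queue alone is that distributed queues shrink the critical section: if multiple threads requested runnable Gs at the same time and there were only a global resource, many of them would sit waiting on its global lock.
If a running G blocks, typically in a system call waiting for I/O, the G and its M wait there together, while the context P is handed to another available M, so the block does not reduce the program's parallelism.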
The g struct
type g struct {
	// Stack parameters.
	// stack describes the actual stack memory: [stack.lo, stack.hi).
	// stackguard0 is the stack pointer compared in the Go stack growth prologue.
	// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
	// stackguard1 is the stack pointer compared in the C stack growth prologue.
	// It is stack.lo+StackGuard on g0 and gsignal stacks.
	// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
	stack stack // offset known to runtime/cgo; describes the actual stack memory, including its lower and upper bounds
	stackguard0 uintptr // offset known to liblink
	stackguard1 uintptr // offset known to liblink

	_panic *_panic // innermost panic - offset known to liblink
	_defer *_defer // innermost defer
	m *m // current m; offset known to arm liblink
	sched gobuf // saves the g's context when goroutines are switched
	syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
	syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
	stktopsp uintptr // expected sp at top of stack, to check in traceback
	param unsafe.Pointer // passed parameter on wakeup; another goroutine can set param while this one sleeps, and this one reads it when woken
	atomicstatus uint32
	stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
	goid int64 // ID of the goroutine
	waitsince int64 // approx time when the g become blocked
	waitreason string // if status==Gwaiting
	schedlink guintptr
	preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
	paniconfault bool // panic (instead of crash) on unexpected fault address
	preemptscan bool // preempted g does scan for gc
	gcscandone bool // g has scanned stack; protected by _Gscan bit in status
	gcscanvalid bool // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
	throwsplit bool // must not split stack
	raceignore int8 // ignore race detection events
	sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine
	sysexitticks int64 // cputicks when syscall has returned (for tracing)
	traceseq uint64 // trace event sequencer
	tracelastp puintptr // last P emitted an event for this goroutine
	lockedm muintptr // the G is locked and may run only on this M
	sig uint32
	writebuf []byte
	sigcode0 uintptr
	sigcode1 uintptr
	sigpc uintptr
	gopc uintptr // pc of go statement that created this goroutine
	startpc uintptr // pc of goroutine function
	racectx uintptr
	waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
	cgoCtxt []uintptr // cgo traceback context
	labels unsafe.Pointer // profiler labels
	timer *timer // cached timer for time.Sleep
	selectDone uint32 // are we participating in a select and did someone win the race?

	// Per-G GC state

	// gcAssistBytes is this G's GC assist credit in terms of
	// bytes allocated. If this is positive, then the G has credit
	// to allocate gcAssistBytes bytes without assisting. If this
	// is negative, then the G must correct this by performing
	// scan work. We track this in bytes to make it fast to update
	// and check for debt in the malloc hot path. The assist ratio
	// determines how this corresponds to scan work debt.
	gcAssistBytes int64
}
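The gobuf struct
type gobuf struct {
	sp uintptr
	pc uintptr
	g guintptr
	ctxt unsafe.Pointer
	ret sys.Uintreg
	lr uintptr
	bp uintptr // for GOEXPERIMENT=framepointer
}
The most important field of g is sched, which stores the goroutine's context. Unlike a thread switch, where the OS takes care of this data, a goroutine switch saves it in a gobuf object, which keeps the switch lightweight. The gobuf structure is the one shown above.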
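The m struct
type m struct {
	g0 *g // goroutine with the scheduling stack
	gsignal *g // goroutine that handles signals
	tls [6]uintptr // thread-local storage
	mstartfn func()
	curg *g // currently running goroutine
	caughtsig guintptr
	p puintptr // associated p for executing Go code
	nextp puintptr
	id int32
	mallocing int32 // status
	spinning bool // whether m is out of work
	blocked bool // whether m is blocked
	inwb bool // whether m is executing a write barrier
	printlock int8
	incgo bool // whether m is executing a cgo call
	fastrand uint32
	ncgocall uint64 // total number of cgo calls
	ncgo int32 // number of cgo calls currently in progress
	park note
	alllink *m // links into allm
	schedlink muintptr
	mcache *mcache // memory cache of the current m
	lockedg *g // g locked to run on this m; it is never switched to another m
	createstack [32]uintptr // stack that created this thread
}
Two Gs in the m struct deserve attention:
- One is curg, the G currently bound to the M.
- The other is g0, the goroutine that carries the scheduling stack. It is a special goroutine: an ordinary goroutine's stack is a growable stack allocated on the heap, whereas g0's stack is the stack of the M's thread. All scheduling code switches to g0's stack before it runs. In other words, the runtime also represents the thread's own stack as a g.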
The p struct
type p struct {
	lock mutex

	id int32
	status uint32 // status; one of pidle/prunning/...
	link puintptr
	schedtick uint32 // incremented on every scheduling round
	syscalltick uint32 // incremented on every system call
	sysmontick sysmontick
	m muintptr // back-link to the associated m
	mcache *mcache
	racectx uintptr

	goidcache uint64 // cache of goroutine IDs
	goidcacheend uint64

	// queue of runnable goroutines
	runqhead uint32
	runqtail uint32
	runq [256]guintptr
	runnext guintptr // next g to run

	sudogcache []*sudog
	sudogbuf [128]*sudog

	palloc persistentAlloc // per-P to avoid mutex

	pad [sys.CacheLineSize]byte
}
A P's status is one of _Pidle, _Prunning, _Psyscall, _Pgcstop, or _Pdead. Its local queue runq (indexed by runqhead and runqtail) holds runnable goroutines, and a P takes Gs from this local queue first, which improves efficiency.
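To make the role of runqhead, runqtail, and the fixed 256-slot array more concrete, here is a simplified, single-threaded model of such a ring buffer. It is a sketch only: the type localQueue and its methods are hypothetical, goroutine IDs stand in for guintptr values, and the atomic operations the real runtime needs are deliberately left out.

```go
package main

import "fmt"

const runqSize = 256 // same capacity as p.runq in the excerpt above

// localQueue models a P's local run queue as a ring buffer.
// Assumption: single-threaded use, so no atomics are required here.
type localQueue struct {
	head, tail uint32
	buf        [runqSize]int // goroutine IDs stand in for guintptr
}

// put appends id and reports false if the queue is full.
func (q *localQueue) put(id int) bool {
	if q.tail-q.head >= runqSize {
		return false
	}
	q.buf[q.tail%runqSize] = id
	q.tail++
	return true
}

// get removes the oldest id and reports false if the queue is empty.
func (q *localQueue) get() (int, bool) {
	if q.tail == q.head {
		return 0, false
	}
	id := q.buf[q.head%runqSize]
	q.head++
	return id, true
}

func main() {
	var q localQueue
	for i := 1; i <= 3; i++ {
		q.put(i)
	}
	id, _ := q.get()
	fmt.Println(id, q.tail-q.head) // oldest entry, then remaining length
}
```

Because head and tail only ever increase and the slot is chosen with a modulo, the queue length is always tail minus head, even after the counters wrap around.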
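The schedt struct
type schedt struct {
	goidgen uint64
	lastpoll uint64

	lock mutex

	midle muintptr // idle m's
	nmidle int32 // number of idle m's
	nmidlelocked int32 // number of m's in the locked state
	mcount int32 // total number of m's created
	maxmcount int32 // maximum number of m's allowed

	ngsys uint32 // number of system goroutines; updated atomically

	pidle puintptr // idle p's
	npidle uint32
	nmspinning uint32

	// global queue of runnable g's
	runqhead guintptr
	runqtail guintptr
	runqsize int32

	// global cache of dead G's
	gflock mutex
	gfreeStack *g
	gfreeNoStack *g
	ngfree int32

	// central cache of sudog structs
	sudoglock mutex
	sudogcache *sudog
}
Most of the required information already lives in the m, g, and p structs; schedt is little more than a shell. It contains the idle list of Ms, the idle list of Ps, and a global queue of runnable Gs. The lock field in schedt is essential: when an M or P performs a non-local operation, it generally has to lock the scheduler first.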
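2.5 Key functions
The goroutine scheduler code lives in /src/runtime/proc.go. The key functions are analyzed below.
2.5.1 The schedule function
The schedule function runs whenever the runtime needs to schedule. It finds a runnable G for the current P and executes it, searching in this order:
- 1) Call runqget to take a runnable G from the P's own runnable G queue;
- 2) If 1) fails, call findrunnable to look for a runnable G elsewhere;
- 3) findrunnable blocks until work is available, so schedule always ends by handing the G it found to execute; it never returns.
- 4) Note: once in a while the global run queue is checked first, before step 1), to ensure fairness. Otherwise two goroutines could monopolize the local run queue by constantly respawning each other. This is implemented with the check schedtick%61 == 0.
The code is as follows: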
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("schedule: holding locks")
	}

	if _g_.m.lockedg != 0 {
		stoplockedm()
		execute(_g_.m.lockedg.ptr(), false) // Never returns.
	}

	// We should not schedule away from a g that is executing a cgo call,
	// since the cgo call is using the m's g0 stack.
	if _g_.m.incgo {
		throw("schedule: in cgo")
	}

top:
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if _g_.m.p.ptr().runSafePointFn != 0 {
		runSafePointFn()
	}

	var gp *g
	var inheritTime bool
	if trace.enabled || trace.shutdown {
		gp = traceReader()
		if gp != nil {
			casgstatus(gp, _Gwaiting, _Grunnable)
			traceGoUnpark(gp, 0)
		}
	}
	if gp == nil && gcBlackenEnabled != 0 {
		gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
	}
	if gp == nil {
		// Check the global runnable queue once in a while to ensure fairness.
		// Otherwise two goroutines can completely occupy the local runqueue
		// by constantly respawning each other.
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr())
		if gp != nil && _g_.m.spinning {
			throw("schedule: spinning with local work")
		}
	}
	if gp == nil {
		gp, inheritTime = findrunnable() // blocks until work is available
	}

	// This thread is going to run a goroutine and is not spinning anymore,
	// so if it was marked as spinning we need to reset it now and potentially
	// start a new spinning M.
	if _g_.m.spinning {
		resetspinning()
	}

	if gp.lockedm != 0 {
		// Hands off own p to the locked m,
		// then blocks waiting for a new p.
		startlockedm(gp)
		goto top
	}

	execute(gp, inheritTime)
}
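The selection order in schedule can be modelled in a few lines. The sketch below is not runtime code: the types sched and proc, the helper pop, and the function next are hypothetical, plain slices replace the runtime's queues, and findrunnable is reduced to a single look at the global queue.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for schedt and p.
type sched struct{ global []int }

type proc struct {
	schedtick uint32
	local     []int
}

// pop removes and returns the first element of *q, or (0, false) if empty.
func pop(q *[]int) (int, bool) {
	if len(*q) == 0 {
		return 0, false
	}
	id := (*q)[0]
	*q = (*q)[1:]
	return id, true
}

// next mirrors the order used by schedule: on every 61st tick the global
// queue is consulted first, otherwise the local queue wins.
func next(p *proc, s *sched) (int, string, bool) {
	defer func() { p.schedtick++ }() // one tick per scheduling round
	if p.schedtick%61 == 0 {
		if id, ok := pop(&s.global); ok {
			return id, "global", true
		}
	}
	if id, ok := pop(&p.local); ok {
		return id, "local", true
	}
	// Stand-in for findrunnable, reduced here to the global queue only.
	if id, ok := pop(&s.global); ok {
		return id, "global", true
	}
	return 0, "", false
}

func main() {
	s := &sched{global: []int{100, 101}}
	p := &proc{schedtick: 61, local: []int{1, 2}}
	for i := 0; i < 4; i++ {
		id, from, _ := next(p, s)
		fmt.Println(id, from)
	}
}
```

Starting at tick 61 makes the fairness check fire on the first round, so a G from the global queue runs even though the local queue is not empty.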
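2.5.2 The findrunnable function
findrunnable finds a runnable G for a P. It searches in this order:
- 1) Call runqget to take a runnable G from the P's own runnable G queue;
- 2) If 1) fails, call globrunqget to take a runnable G from the global runnable G queue;
- 3) If 2) fails, call netpoll (non-blocking) to fetch a G whose asynchronous I/O has completed;
- 4) If 3) fails, try to steal half of the Gs from another P;
- 5) If 4) fails, call globrunqget again to take a runnable G from the global runnable G queue;
- 6) If 5) fails, call netpoll (blocking) to fetch a G whose asynchronous I/O has completed;
- 7) If 6) still yields no G, call stopm to stop this M; once the M is woken up, the search starts over.
The code is as follows: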
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.

top:
	_p_ := _g_.m.p.ptr()
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if _p_.runSafePointFn != 0 {
		runSafePointFn()
	}
	if fingwait && fingwake {
		if gp := wakefing(); gp != nil {
			ready(gp, 0, true)
		}
	}
	if *cgo_yield != nil {
		asmcgocall(*cgo_yield, nil)
	}

	// local runq
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// global runq
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}

	// Poll network.
	// This netpoll is only an optimization before we resort to stealing.
	// We can safely skip it if there are no waiters or a thread is blocked
	// in netpoll already. If there is any kind of logical race with that
	// blocked thread (e.g. it has already returned from netpoll, but does
	// not set lastpoll yet), this thread will do blocking netpoll below
	// anyway.
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		if gp := netpoll(false); gp != nil { // non-blocking
			// netpoll returns list of goroutines linked by schedlink.
			injectglist(gp.schedlink.ptr())
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false
		}
	}

	// Steal work from other P's.
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
		// New work can appear from returning syscall/cgocall, network or timers.
		// Neither of that submits to local run queues, so no point in stealing.
		goto stop
	}
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}

stop:

	// We have nothing to do. If we're in the GC mark phase, can
	// safely scan and blacken objects, and have work to do, run
	// idle-time marking rather than give up the P.
	if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
		_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
		gp := _p_.gcBgMarkWorker.ptr()
		casgstatus(gp, _Gwaiting, _Grunnable)
		if trace.enabled {
			traceGoUnpark(gp, 0)
		}
		return gp, false
	}

	// Before we drop our P, make a snapshot of the allp slice,
	// which can change underfoot once we no longer block
	// safe-points. We don't need to snapshot the contents because
	// everything up to cap(allp) is immutable.
	allpSnapshot := allp

	// return P and block
	lock(&sched.lock)
	if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	pidleput(_p_)
	unlock(&sched.lock)

	// Delicate dance: thread transitions from spinning to non-spinning state,
	// potentially concurrently with submission of new goroutines. We must
	// drop nmspinning first and then check all per-P queues again (with
	// #StoreLoad memory barrier in between). If we do it the other way around,
	// another thread can submit a goroutine after we've checked all run queues
	// but before we drop nmspinning; as the result nobody will unpark a thread
	// to run the goroutine.
	// If we discover new work below, we need to restore m.spinning as a signal
	// for resetspinning to unpark a new worker thread (because there can be more
	// than one starving goroutine). However, if after discovering new work
	// we also observe no idle Ps, it is OK to just park the current thread:
	// the system is fully loaded so no spinning threads are required.
	// Also see "Worker thread parking/unparking" comment at the top of the file.
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}

	// check all runqueues once again
	for _, _p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				if wasSpinning {
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				goto top
			}
			break
		}
	}

	// Check for idle-priority GC work again.
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
		lock(&sched.lock)
		_p_ = pidleget()
		if _p_ != nil && _p_.gcBgMarkWorker == 0 {
			pidleput(_p_)
			_p_ = nil
		}
		unlock(&sched.lock)
		if _p_ != nil {
			acquirep(_p_)
			if wasSpinning {
				_g_.m.spinning = true
				atomic.Xadd(&sched.nmspinning, 1)
			}
			// Go back to idle GC check.
			goto stop
		}
	}

	// poll network
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
		if _g_.m.p != 0 {
			throw("findrunnable: netpoll with p")
		}
		if _g_.m.spinning {
			throw("findrunnable: netpoll with spinning")
		}
		gp := netpoll(true) // block until new work is available
		atomic.Store64(&sched.lastpoll, uint64(nanotime()))
		if gp != nil {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				injectglist(gp.schedlink.ptr())
				casgstatus(gp, _Gwaiting, _Grunnable)
				if trace.enabled {
					traceGoUnpark(gp, 0)
				}
				return gp, false
			}
			injectglist(gp)
		}
	}
	stopm()
	goto top
}
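The work-stealing step can be illustrated on its own. The following sketch moves half of a victim's queue to the thief. It is an assumption-laden simplification: stealHalf is a hypothetical helper, queues are plain slices of goroutine IDs, the half is rounded up by choice of this example, and none of the runtime's lock-free synchronisation is modelled.

```go
package main

import "fmt"

// stealHalf moves half of victim's queue (rounded up in this sketch),
// starting with the oldest entries, onto thief and returns how many
// goroutine IDs were moved.
func stealHalf(thief, victim *[]int) int {
	n := len(*victim) - len(*victim)/2
	*thief = append(*thief, (*victim)[:n]...)
	*victim = (*victim)[n:]
	return n
}

func main() {
	victim := []int{1, 2, 3, 4, 5}
	var thief []int
	n := stealHalf(&thief, &victim)
	fmt.Println(n, thief, victim)
}
```

Taking half rather than a single G means the thief is unlikely to run dry again immediately, while the victim still keeps work of its own.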
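2.5.3 The newproc function
newproc creates a runnable G and puts it on the current P's runnable G queue. It is the call the compiler actually emits for a statement such as "go func() { … }"; the core logic is in newproc1. The function proceeds as follows:
- 1) Get the P of the current G, then take a G from the free G list;
- 2) If 1) yields a G, configure it with the call's arguments; otherwise create a new G;
- 3) Add the G to the P's runnable G queue.
The code is as follows: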
// In Go 1.10.8 the default initial stack size is 2 KB.
_StackMin = 2048

// Creates a g object and puts it on the g queue,
// where it waits to be run.
// Create a new g running fn with narg bytes of arguments starting
// at argp. callerpc is the address of the go statement that created
// this. The new g is put on the queue of g's waiting to run.
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
	_g_ := getg()

	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	_g_.m.locks++ // disable preemption because it can be holding p in a local var
	siz := narg
	siz = (siz + 7) &^ 7

	// We could allocate a larger initial stack if necessary.
	// Not worth it: this is almost always an error.
	// 4*sizeof(uintreg): extra space added below
	// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
	if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
		throw("newproc: function arguments too large for new goroutine")
	}

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	if narg > 0 {
		memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
		// This is a stack-to-stack copy. If write barriers
		// are enabled and the source stack is grey (the
		// destination is always black), then perform a
		// barrier copy. We do this *after* the memmove
		// because the destination stack may have garbage on
		// it.
		if writeBarrier.needed && !_g_.m.curg.gcscandone {
			f := findfunc(fn.fn)
			stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
			// We're in the prologue, so it's always stack map index 0.
			bv := stackmapdata(stkmap, 0)
			bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
		}
	}

	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	if isSystemGoroutine(newg) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	newg.gcscanvalid = false
	casgstatus(newg, _Gdead, _Grunnable)

	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	runqput(_p_, newg, true)

	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
		wakep()
	}
	_g_.m.locks--
	if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}
}
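The "reuse a free G, otherwise allocate one" pattern at the start of newproc1 can be sketched independently of the runtime. Everything below is hypothetical illustration code: gobj stands in for g, freeList for the free G list, and get and put loosely play the roles of gfget and gfput.

```go
package main

import "fmt"

const stackMin = 2048 // mirrors _StackMin in the excerpt above

// gobj is a hypothetical stand-in for the runtime's g.
type gobj struct {
	id    int
	stack []byte
}

// freeList plays the role of the free G list, reduced to a slice.
type freeList struct {
	free      []*gobj
	allocated int // how many objects were newly allocated
}

// get reuses a dead G when one is available and allocates one otherwise.
func (f *freeList) get() *gobj {
	if n := len(f.free); n > 0 {
		g := f.free[n-1]
		f.free = f.free[:n-1]
		return g
	}
	f.allocated++
	return &gobj{stack: make([]byte, stackMin)}
}

// put resets a finished G and keeps it for later reuse.
func (f *freeList) put(g *gobj) {
	g.id = 0
	f.free = append(f.free, g)
}

func main() {
	var fl freeList
	a := fl.get()
	fl.put(a)
	b := fl.get()
	fmt.Println(a == b, fl.allocated) // same object reused, one allocation
}
```

Reusing the object together with its stack is what keeps starting a goroutine cheap when goroutines are created and finished at a high rate.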
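2.5.4 The goexit0 function
goexit0 is called when a G exits. It resets the G's fields, puts the G on the free G list for later reuse, and then calls schedule to run the scheduler again.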
// goexit continuation on g0.
func goexit0(gp *g) {
	_g_ := getg()

	// change the g's status from _Grunning to _Gdead
	casgstatus(gp, _Grunning, _Gdead)
	if isSystemGoroutine(gp) {
		atomic.Xadd(&sched.ngsys, -1)
	}
	// release the g's fields by resetting them, mostly to nil or 0
	gp.m = nil
	locked := gp.lockedm != 0
	gp.lockedm = 0
	_g_.m.lockedg = 0
	gp.paniconfault = false
	gp._defer = nil // should be true already but just in case.
	gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
	gp.writebuf = nil
	gp.waitreason = ""
	gp.param = nil
	gp.labels = nil
	gp.timer = nil

	if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
		// Flush assist credit to the global pool. This gives
		// better information to pacing if the application is
		// rapidly creating an exiting goroutines.
		scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
		atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
		gp.gcAssistBytes = 0
	}

	// Note that gp's stack scan is now "valid" because it has no
	// stack.
	gp.gcscanvalid = true
	dropg()

	if _g_.m.lockedInt != 0 {
		print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
		throw("internal lockOSThread error")
	}
	_g_.m.lockedExt = 0
	// push this g onto the free G list
	gfput(_g_.m.p.ptr(), gp)
	if locked {
		// The goroutine may have locked this thread because
		// it put it in an unusual kernel state. Kill it
		// rather than returning it to the thread pool.

		// Return to mstart, which will release the P and exit
		// the thread.
		if GOOS != "plan9" { // See golang.org/issue/22227.
			gogo(&_g_.m.g0.sched)
		}
	}
	schedule()
}
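User code normally reaches this exit path simply by returning from the goroutine's function, but it can also end a goroutine explicitly with runtime.Goexit from the standard library. The sketch below shows the behaviour that is visible from the outside: deferred calls still run, and the statements after Goexit do not. The helper exitEarly is a name made up for this example, and the link to goexit0 is the runtime's internal detail rather than something this code can observe.

```go
package main

import (
	"fmt"
	"runtime"
)

// exitEarly runs a goroutine that terminates itself with runtime.Goexit
// and reports which steps were reached.
func exitEarly() []string {
	var steps []string
	done := make(chan struct{})
	go func() {
		defer close(done) // runs last, signals the caller
		defer func() { steps = append(steps, "deferred") }()
		steps = append(steps, "before")
		runtime.Goexit()
		steps = append(steps, "after") // never reached
	}()
	<-done
	return steps
}

func main() {
	fmt.Println(exitEarly())
}
```

Only the calling goroutine ends; the rest of the program keeps running, which matches the description above of the G being recycled and the scheduler picking the next G.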
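2.5.5 The handoffp function
handoffp hands a P off from an M that is in a system call or is locked. If the P still has runnable Gs in its queue (or the global queue is non-empty), handoffp calls startm to start an M for it, and that M does not spin.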
// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
	// handoffp must start an M in any situation where
	// findrunnable would return a G to run on _p_.

	// if this P's local queue or the global queue is non-empty, call startm without spinning
	if !runqempty(_p_) || sched.runqsize != 0 {
		startm(_p_, false)
		return
	}
	// if there is GC work to do, do the same
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
		startm(_p_, false)
		return
	}
	// no local work: check that there are no spinning/idle M's,
	// otherwise our help is not required
	if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
		startm(_p_, true)
		return
	}
	// lock the scheduler and take this P away
	lock(&sched.lock)
	if sched.gcwaiting != 0 {
		_p_.status = _Pgcstop
		sched.stopwait--
		if sched.stopwait == 0 {
			notewakeup(&sched.stopnote)
		}
		unlock(&sched.lock)
		return
	}
	if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
		sched.safePointFn(_p_)
		sched.safePointWait--
		if sched.safePointWait == 0 {
			notewakeup(&sched.safePointNote)
		}
	}
	if sched.runqsize != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	pidleput(_p_)
	unlock(&sched.lock)
}
2.5.6 The startm function
startm schedules an M to run the given P, creating a new M if necessary.
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
	// take the scheduler lock
	lock(&sched.lock)
	if _p_ == nil {
		_p_ = pidleget()
		if _p_ == nil {
			unlock(&sched.lock)
			if spinning {
				// The caller incremented nmspinning, but there are no idle Ps,
				// so it's okay to just undo the increment and give up.
				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			return
		}
	}
	mp := mget()
	unlock(&sched.lock)
	if mp == nil {
		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		newm(fn, _p_)
		return
	}
	if mp.spinning {
		throw("startm: m is spinning")
	}
	if mp.nextp != 0 {
		throw("startm: m has p")
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	mp.spinning = spinning
	mp.nextp.set(_p_)
	notewakeup(&mp.park)
}
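2.5.7 The sysmon function
sysmon is started when the Go runtime starts. It monitors the state of all goroutines, decides whether a GC needs to be forced, runs netpoll, and so on. sysmon calls retake to perform preemptive scheduling.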
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
	lock(&sched.lock)
	sched.nmsys++
	checkdead()
	unlock(&sched.lock)

	// If a heap span goes unused for 5 minutes after a garbage collection,
	// we hand it back to the operating system.
	scavengelimit := int64(5 * 60 * 1e9)

	if debug.scavenge > 0 {
		// Scavenge-a-lot for testing.
		forcegcperiod = 10 * 1e6
		scavengelimit = 20 * 1e6
	}

	lastscavenge := nanotime()
	nscavenge := 0

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not wokeup somebody
	delay := uint32(0)
	for {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				atomic.Store(&sched.sysmonwait, 1)
				unlock(&sched.lock)
				// Make wake-up period small enough
				// for the sampling to be correct.
				maxsleep := forcegcperiod / 2
				if scavengelimit < forcegcperiod {
					maxsleep = scavengelimit / 2
				}
				shouldRelax := true
				if osRelaxMinNS > 0 {
					next := timeSleepUntil()
					now := nanotime()
					if next-now < osRelaxMinNS {
						shouldRelax = false
					}
				}
				if shouldRelax {
					osRelax(true)
				}
				notetsleep(&sched.sysmonnote, maxsleep)
				if shouldRelax {
					osRelax(false)
				}
				lock(&sched.lock)
				atomic.Store(&sched.sysmonwait, 0)
				noteclear(&sched.sysmonnote)
				idle = 0
				delay = 20
			}
			unlock(&sched.lock)
		}
		// trigger libc interceptors if needed
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		now := nanotime()
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			gp := netpoll(false) // non-blocking - returns list of goroutines
			if gp != nil {
				// Need to decrement number of idle locked M's
				// (pretending that one more is running) before injectglist.
				// Otherwise it can lead to the following situation:
				// injectglist grabs all P's but before it starts M's to run the P's,
				// another M returns from syscall, finishes running its G,
				// observes that there is no work to do and no other running M's
				// and reports deadlock.
				incidlelocked(-1)
				injectglist(gp)
				incidlelocked(1)
			}
		}
		// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
		// check if we need to force a GC
		if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
			lock(&forcegc.lock)
			forcegc.idle = 0
			forcegc.g.schedlink = 0
			injectglist(forcegc.g)
			unlock(&forcegc.lock)
		}
		// scavenge heap once in a while
		if lastscavenge+scavengelimit/2 < now {
			mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
			lastscavenge = now
			nscavenge++
		}
		if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
			lasttrace = now
			schedtrace(debug.scheddetail > 0)
		}
	}
}
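The sleep back-off at the top of the sysmon loop is easy to isolate. The function below reproduces just that rule from the code above: start at 20 microseconds, keep that value for the first 50 idle rounds, then double on every further idle round, capped at 10 milliseconds. The name nextDelay is hypothetical; only the arithmetic is taken from the excerpt.

```go
package main

import "fmt"

// nextDelay returns the next sleep in microseconds, given how many rounds
// in a row sysmon found nothing to do and the previous delay.
func nextDelay(idle int, delay uint32) uint32 {
	if idle == 0 { // start with a 20us sleep
		delay = 20
	} else if idle > 50 { // start doubling after 50 idle rounds (about 1ms)
		delay *= 2
	}
	if delay > 10*1000 { // never sleep longer than 10ms
		delay = 10 * 1000
	}
	return delay
}

func main() {
	delay := uint32(0)
	for idle := 0; idle <= 60; idle++ {
		delay = nextDelay(idle, delay)
		if idle%10 == 0 || idle > 50 {
			fmt.Println(idle, delay)
		}
	}
}
```

This is where the "20 us to 10 ms" range for one sysmon round comes from: a busy system is checked every 20 microseconds, a quiet one at most every 10 milliseconds.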
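2.5.8 The retake function
retake iterates over all Ps. If a P is in a system call (_Psyscall) and at least one sysmon round (20 us to 10 ms) has passed, the P is retaken: handoffp is called to detach it from its M. If a P is running (_Prunning), at least one sysmon round has passed, and its G has been running for longer than forcePreemptNS (10 ms), the G running on that P is preempted
by setting g.preempt = true and g.stackguard0 = stackPreempt.
Why does setting stackguard0 make preemption possible?
Because this value is used to check whether the current stack has enough space: the prologue of a Go function compares the stack pointer against it to decide whether the stack needs to grow.
When newstack sees that g.stackguard0 equals stackPreempt, it knows the call was triggered by a preemption request and checks once more whether to preempt. Since the check lives in function prologues, the goroutine is only preempted when it next calls a function that performs this check.
The preemption mechanism keeps a single long-running G from starving the other Gs.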
func retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence, we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		} else if s == _Prunning {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
				continue
			}
			if pd.schedwhen+forcePreemptNS > now {
				continue
			}
			preemptone(_p_)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
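The stackguard0 trick can be modelled without touching the runtime. In the sketch below, the type g is a toy with only the two fields mentioned in the text, requestPreempt and prologue are hypothetical helpers, and stackPreempt is an illustrative sentinel (the largest uintptr) rather than the runtime's actual constant.

```go
package main

import "fmt"

// stackPreempt is an illustrative sentinel that is larger than any real
// stack pointer; the runtime's actual value is not reproduced here.
const stackPreempt = ^uintptr(0)

// g is a toy model holding only the fields discussed in the text.
type g struct {
	stackguard0 uintptr
	preempt     bool
}

// requestPreempt does what the text says is arranged for a long-running G.
func requestPreempt(gp *g) {
	gp.preempt = true
	gp.stackguard0 = stackPreempt
}

// prologue models the check at the start of a Go function. If the stack
// pointer is above the guard, the function body runs. Otherwise the slow
// path is taken, which tells a preemption request from real stack growth.
func prologue(gp *g, sp uintptr) string {
	if sp > gp.stackguard0 {
		return "run"
	}
	if gp.stackguard0 == stackPreempt {
		return "preempt"
	}
	return "grow"
}

func main() {
	gp := &g{stackguard0: 0x1000}
	fmt.Println(prologue(gp, 0x2000)) // enough stack
	fmt.Println(prologue(gp, 0x0800)) // below the guard
	requestPreempt(gp)
	fmt.Println(prologue(gp, 0x2000)) // the guard now always fails
}
```

Because the sentinel is larger than every possible stack pointer, the ordinary stack check fails at the next function call, so no extra instruction is needed in the fast path to support preemption.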
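3. Scheduler summary
3.1 Two main ideas of the scheduler
- Thread reuse: goroutines run on top of a set of threads, so threads need not be created and destroyed frequently; they are reused. Thread reuse shows up in two more ways in the scheduler: 1) work stealing: when a thread has no runnable G, it tries to steal Gs from a P bound to another thread instead of being destroyed; 2) hand off: when a thread blocks because its G makes a system call, the thread releases its P, which is handed to another idle thread to run.
- Exploiting parallelism: GOMAXPROCS sets the number of Ps. When GOMAXPROCS is greater than 1, up to GOMAXPROCS threads can be running at once, possibly on different CPU cores, so concurrency turns into parallelism. GOMAXPROCS also caps the degree of parallelism: for example, with GOMAXPROCS = number of cores / 2, at most half of the CPU cores are used in parallel.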
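The number of Ps can be inspected and changed from user code through the standard runtime package. The sketch below uses runtime.GOMAXPROCS, which returns the previous setting and only queries when called with 0, and runtime.NumCPU. The wrapper setProcs is a name made up for this example.

```go
package main

import (
	"fmt"
	"runtime"
)

// setProcs sets the number of Ps to n and returns the previous value
// together with the value that is in effect afterwards.
func setProcs(n int) (prev, now int) {
	prev = runtime.GOMAXPROCS(n)
	now = runtime.GOMAXPROCS(0) // an argument of 0 queries without changing
	return prev, now
}

func main() {
	fmt.Println("CPUs:", runtime.NumCPU())
	prev, now := setProcs(2)
	fmt.Println("GOMAXPROCS before:", prev, "after:", now)
	runtime.GOMAXPROCS(prev) // restore the original setting
}
```

Setting the value below the number of cores is how the "half of the CPU cores" scenario mentioned above would be configured.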
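3.2 Two minor strategies of the scheduler
- Preemption: with coroutines, the next coroutine runs only after the current one voluntarily yields the CPU. In Go, a goroutine that has been running for 10 ms is asked to give up the CPU so that other goroutines do not starve. This is one way in which a goroutine differs from a coroutine.
- Global G queue: the new scheduler still has a global G queue, but its role has been weakened. When an M finds nothing to steal from other Ps, it can take a G from the global G queue.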
4. References
- Golang code repository: https://github.com/golang/go
- "Scalable Go Scheduler": https://docs.google.com/docum...
- "Go Preemptive Scheduler": https://docs.google.com/docum...
- Online articles:
https://studygolang.com/artic...
https://studygolang.com/artic...
https://studygolang.com/artic...
https://studygolang.com/artic...
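https://studygolang.com/artic... (analysis of scheduling examples)
https://www.cnblogs.com/sunsk... (preemption)
https://blog.csdn.net/u010853... (a walkthrough of schedule; the analysis is thorough and the diagrams are very clear, so it is worth reading carefully several times)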