《k8s-1.13版本原始碼分析》-排程預選
本系列文章已經開源到github: https://github.com/farmer-hutao/k8s-source-code-analysis
預選過程
1. 預選流程
predicate過程從 pkg/scheduler/core/generic_scheduler.go:389 findNodesThatFit()
方法就算正式開始了,這個方法根據給定的 predicate functions 過濾所有的nodes來尋找一堆可以跑pod的node集。老規矩,我們來看主幹程式碼:
pkg/scheduler/core/generic_scheduler.go:389
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) { checkNode := func(i int) { fits, failedPredicates, err := podFitsOnNode( //…… ) if fits { length := atomic.AddInt32(&filteredLen, 1) filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node() } } workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode) if len(filtered) > 0 && len(g.extenders) != 0 { for _, extender := range g.extenders { // Logic of extenders } } return filtered, failedPredicateMap, nil }
如上,刪的有點多,大家也可以看一下原函式然後對比一下,看看我為什麼只保留這一點。從上面程式碼中我們可以發現,最重要的是一個子函式呼叫過程 fits, failedPredicates, err := podFitsOnNode()
,這個函式的引數我沒有貼出來,下面會詳細講;下半部分是一個extender過程,extender不影響對predicate過程的理解,我們後面專門當作一個主題講。所以這裡的關注點是 podFitsOnNode()
函式。
2. predicate的併發
進入 podFitsOnNode()
函式邏輯之前,我們先看一下呼叫到 podFitsOnNode()
函式的匿名函式變數 checkNode 是怎麼被呼叫的:
pkg/scheduler/core/generic_scheduler.go:458
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
ParallelizeUntil()
函式是用於並行執行N個獨立的工作過程的,這個邏輯寫的挺有意思,我們看一下完整的程式碼(這段的分析思路寫到註釋裡哦):
vendor/k8s.io/client-go/util/workqueue/parallelizer.go:38
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) { // 從形參列表看,需要關注的有workers和pieces兩個數字型別的引數,doworkPiece這個函式型別的引數 // DoWorkPieceFunc型別也就是func(piece int)型別 // 注意到上面呼叫的時候workers的實參是16,pieces是allNodes,也就是node數量 var stop <-chan struct{} if ctx != nil { stop = ctx.Done() } // 這裡定義toProcess的容量和pieces相等,也就是和node數量相等 toProcess := make(chan int, pieces) for i := 0; i < pieces; i++ { // 假設有100個node,那麼這裡就寫了100個數到toProcess裡 toProcess <- i } // 關閉了一個有快取的channel close(toProcess) // 如果pieces數量比較少,也就是說假設node只有10個,那麼workers就賦值為10個 // 到這裡差不多可以猜到worker是併發工作數,當node大於16時併發是16,當node小於16時並法數就是node數 if pieces < workers { workers = pieces } wg := sync.WaitGroup{} wg.Add(workers) // 要批量開goroutine了 for i := 0; i < workers; i++ { // 如果100個node,這裡時16;如果是10個node,這裡是10 go func() { defer utilruntime.HandleCrash() defer wg.Done() for piece := range toProcess { // 從toProcess中拿一個數,舉個例子,假如現在併發是10,那麼toProcess裡面存的資料其實 // 也是10個,也就是1個goroutine拿到1個數,開始了一個下面的default邏輯; // 假設併發數是16,node數是100,這時候toProcess裡面也就是100個數, // 這時候就是16個“消費者”在消耗100個數。當然每拿到一個數需要執行到一次下面的default select { case <-stop: return default: // 第piece個節點被doWorkPiece了; // 對應呼叫過程也就是checkNode函式傳入了一個整型引數piece doWorkPiece(piece) } } }() } wg.Wait() }
回想一下前面的 checkNode := func(i int){……}
,上面的 doWorkPiece(piece)
也就是呼叫到了這裡的這個匿名函式 func(i int){……}
;到這裡就清楚如何實現併發執行多個node的predicate過程了。
3. 一個node的predicate
checkNode的主要邏輯就是上面介紹的併發加上下面這個 podFitsOnNode()
函式邏輯:
pkg/scheduler/core/generic_scheduler.go:425
fits, failedPredicates, err := podFitsOnNode( pod, meta, g.cachedNodeInfoMap[nodeName], g.predicates, nodeCache, g.schedulingQueue, g.alwaysCheckAllPredicates, equivClass, )
我們從 podFitsOnNode()
的函式定義入手:
pkg/scheduler/core/generic_scheduler.go:537
func podFitsOnNode( pod *v1.Pod, meta algorithm.PredicateMetadata, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, nodeCache *equivalence.NodeCache, queue internalqueue.SchedulingQueue, alwaysCheckAllPredicates bool, equivClass *equivalence.Class, ) (bool, []algorithm.PredicateFailureReason, error)
關於這個函式的邏輯,註釋裡的描述翻譯過來大概是這個意思:
podFitsOnNode()函式檢查一個通過NodeInfo形式給定的node是否滿足指定的predicate functions. 對於給定的一個Pod,podFitsOnNode()函式會檢查是否有某個“等價的pod”存在,然後重用那個等價pod快取的predicate結果。 這個函式的呼叫入口有2處: Schedule and Preempt.
- 當從Schedule進入時:這個函式想要測試node上所有已經存在的pod外加被指定將要排程到這個node上的其他所有高優先順序(優先順序不比自己低,也就是>=)的pod後,當前pod是否可以被排程到這個node上。
- 當從Preempt進入時:後面講preempt時再詳細分析。
podFitsOnNode()函式的引數有點多,每個跟進去就是一堆知識點。這裡建議大家從字面先過一邊,然後跟進去看一下型別定義,型別的註釋等,瞭解一下功能,先不深究。整體看完一邊排程器程式碼後回過頭深入細節。
我們一起看一下其中這個引數: predicateFuncs map[string]algorithm.FitPredicate
;這裡的 predicateFuncs 是一個map,表示所有的predicate函式。這個map的key是個字串,也就是某種形式的name了;value型別跟進去看一下:
pkg/scheduler/algorithm/types.go:36
// FitPredicate is a function that indicates if a pod fits into an existing node. // The failure information is given by the error. type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)
FitPredicate是一個函式型別,3個引數,pod和node都很好理解,meta跟進去簡單看一下可以發現定義的是一些和predicate相關的一些元資料,這些資料是根據pod和node資訊獲取到的,類似pod的埠有哪些,pod親和的pod列表等。返回值是一個表示是否fit的bool值,predicate失敗的原因列表,一個錯誤型別。
也就是說,FitPredicate這個函式型別也就是前面一直說的predicate functions的真面目了。下面看podFitsOnNode()函式的具體邏輯吧:
pkg/scheduler/core/generic_scheduler.go:537
func podFitsOnNode( pod *v1.Pod, meta algorithm.PredicateMetadata, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, nodeCache *equivalence.NodeCache, queue internalqueue.SchedulingQueue, alwaysCheckAllPredicates bool, equivClass *equivalence.Class, ) (bool, []algorithm.PredicateFailureReason, error) { podsAdded := false for i := 0; i < 2; i++ { metaToUse := meta nodeInfoToUse := info if i == 0 { podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue) } else if !podsAdded || len(failedPredicates) != 0 { break } eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded // 這裡省略一個for迴圈,下面會單獨講 } return len(failedPredicates) == 0, failedPredicates, nil }
這裡的邏輯是從一個for迴圈開始的,關於這個2次迴圈的含義程式碼裡有很長的一段註釋,我們先看一下注釋裡怎麼說的(這裡可以多看幾遍體會一下):
- 出於某些原因考慮我們需要執行兩次predicate. 如果node上有更高或者相同優先順序的“指定pods”(這裡的“指定pods”指的是通過schedule計算後指定要跑在一個node上但是還未真正執行到那個node上的pods),我們將這些pods加入到meta和nodeInfo後執行一次計算過程。
- 如果這個過程所有的predicates都成功了,我們再假設這些“指定pods”不會跑到node上再執行一次。第二次計算是必須的,因為有一些predicates比如pod親和性,也許在“指定pods”沒有成功跑到node的情況下會不滿足。
- 如果沒有“指定pods”或者第一次計算過程失敗了,那麼第二次計算不會進行。
- 我們在第一次排程的時候只考慮相等或者更高優先順序的pods,因為這些pod是當前pod必須“臣服”的,也就是說不能夠從這些pod中搶到資源,這些pod不會被當前pod“搶佔”;這樣當前pod也就能夠安心從低優先順序的pod手裡搶資源了。
- 新pod在上述2種情況下都可排程基於一個保守的假設:資源和pod反親和性等的predicate在“指定pods”被處理為Running時更容易失敗;pod親和性在“指定pods”被處理為Not Running時更加容易失敗。
- 我們不能假設“指定pods”是Running的因為它們當前還沒有執行,而且事實上,它們確實有可能最終又被排程到其他node上了。
看了這個註釋後,上面程式碼裡的前幾行就很好理解了,在第一次進入迴圈體和第二次進入時做了不同的處理,具體怎麼做的處理我們暫時不關注。下面看省略的這個for迴圈做了啥:
pkg/scheduler/core/generic_scheduler.go:583
// predicates.Ordering()得到的是一個[]string,predicate名字集合 for predicateID, predicateKey := range predicates.Ordering() { var ( fitbool reasons []algorithm.PredicateFailureReason errerror ) // 如果predicateFuncs有這個key,則呼叫這個predicate;也就是說predicateFuncs如果定義了一堆亂七八遭的名字,會被忽略調,因為predicateKey是內建的。 if predicate, exist := predicateFuncs[predicateKey]; exist { // 降低難度,先不看快取情況。 if eCacheAvailable { fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass) } else { // 真正呼叫predicate函數了!!!!!!!!! fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse) } if err != nil { return false, []algorithm.PredicateFailureReason{}, err } if !fit { // …… } } }
如上,我們看一下2個地方:
- predicates.Ordering()
- fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
分兩個小節吧~
3.1. predicates的順序
pkg/scheduler/algorithm/predicates/predicates.go:130
var ( predicatesOrdering = []string{ CheckNodeConditionPred, CheckNodeUnschedulablePred, GeneralPred, HostNamePred, PodFitsHostPortsPred, MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred, PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred, CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred, MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred, CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred} )
如上,這裡定義了一個次序,前面的for迴圈遍歷的是這個[]string,這樣也就實現了不管 predicateFuncs
裡定義了怎樣的順序,影響不了predicate的實際呼叫順序。官網對於這個順序有這樣一個 表格 解釋:
PositionPredicatecomments (note, justification...)1 CheckNodeConditionPredicate
we really don’t want to check predicates against unschedulable nodes.2 PodFitsHost
we check the pod.spec.nodeName.3 PodFitsHostPorts
we check ports asked on the spec.4 PodMatchNodeSelector
check node label after narrowing search.5 PodFitsResources
this one comes here since it’s not restrictive enough as we do not try to match values but ranges.6 NoDiskConflict
Following the resource predicate, we check disk7 PodToleratesNodeTaints
check toleration here, as node might have toleration8 PodToleratesNodeNoExecuteTaints
check toleration here, as node might have toleration9 CheckNodeLabelPresence
labels are easy to check, so this one goes before10 checkServiceAffinity
-11 MaxPDVolumeCountPredicate
-12 VolumeNodePredicate
-13 VolumeZonePredicate
-14 CheckNodeMemoryPressurePredicate
doesn’t happen often15 CheckNodeDiskPressurePredicate
doesn’t happen often16 InterPodAffinityMatches
Most expensive predicate to compute
這個表格大家對著字面意思體會一下吧,基本還是可以聯想到意義的。
當然這個順序是可以被配置檔案覆蓋的,使用者可以使用類似這樣的配置:
{ "kind" : "Policy", "apiVersion" : "v1", "predicates" : [ {"name" : "PodFitsHostPorts", "order": 2}, {"name" : "PodFitsResources", "order": 3}, {"name" : "NoDiskConflict", "order": 5}, {"name" : "PodToleratesNodeTaints", "order": 4}, {"name" : "MatchNodeSelector", "order": 6}, {"name" : "PodFitsHost", "order": 1} ], "priorities" : [ {"name" : "LeastRequestedPriority", "weight" : 1}, {"name" : "BalancedResourceAllocation", "weight" : 1}, {"name" : "ServiceSpreadingPriority", "weight" : 1}, {"name" : "EqualPriority", "weight" : 1} ], "hardPodAffinitySymmetricWeight" : 10 }
整體過完原始碼後我們再實際嘗試一下這些特性,這一邊先知道有這回事吧,ok,繼續~
3.2. 單個predicate執行過程
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
這行程式碼其實沒有啥複雜邏輯,不過我們還是重複講一下,清晰理解這一行很有必要。這裡的 predicate()
來自前幾行的if語句 predicate, exist := predicateFuncs[predicateKey]
,往前跟也就是 FitPredicate 型別,我們前面提過,型別定義在 pkg/scheduler/algorithm/types.go:36
,這個߇#x7C7B;型表示的是一個具體的predicate函式,這裡使用 predicate()
也就是一個函式呼叫的語法,很和諧了。
3.3. 具體的predicate函式
一直在講predicate,那麼predicate函式到底長什麼樣子呢,我們從具體的實現函式找一個看一下。開始講design的時候提到過predicate的實現在 pkg/scheduler/algorithm/predicates/predicates.go
檔案中,先看一眼Structure吧:
這個檔案中predicate函式有點多,這樣看眼花,我們具體點開一個觀察一下:
pkg/scheduler/algorithm/predicates/predicates.go:277
func NoDiskConflict(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { for _, v := range pod.Spec.Volumes { for _, ev := range nodeInfo.Pods() { if isVolumeConflict(v, ev) { return false, []algorithm.PredicateFailureReason{ErrDiskConflict}, nil } } } return true, nil, nil }
我們知道predicate函式的特點,這樣就很好在這個一千六百多行go檔案中尋找predicate函數了。像上面這個 NoDiskConflict()
函式,引數是pod、meta和nodeinfo,很明顯是 FitPredicate 型別的,標準的predicate函式。
這個函式的實現也特別簡單,遍歷pod的Volumes,然後對於pod的每一個Volume,遍歷node上的每個pod,看是否和當前podVolume衝突。如果不fit就返回false加原因;如果fit就返回true,很清晰。