Refactor the structure nodeScoreHeap

This commit is contained in:
Xiao Yang 2021-07-28 14:48:12 +08:00
parent 1e2bf2548c
commit 144f25afb5

View File

@ -17,7 +17,9 @@ limitations under the License.
package scheduler package scheduler
import ( import (
"container/heap"
"context" "context"
"fmt" "fmt"
"math/rand" "math/rand"
"sort" "sort"
@ -51,11 +53,14 @@ const (
// to ensure that a certain minimum of nodes are checked for feasibility. // to ensure that a certain minimum of nodes are checked for feasibility.
// This in turn helps ensure a minimum level of spreading. // This in turn helps ensure a minimum level of spreading.
minFeasibleNodesPercentageToFind = 5 minFeasibleNodesPercentageToFind = 5
// minNodeScoresToDump is the number of nodes and plugins score to dump // minNodeScoresToDump is the number of nodes' score to dump when the logging
// when the logging level ranged in [4, 6) // level ranged in [4, 6)
minNodeScoresToDump = 3 minNodeScoresToDump = 3
// minNodeScoresToDump is the number of nodes score to dump when the // minPluginScoresToDump is the number of plugins' score to dump when the
// logging level ranged in [6, 10), and all plugins' score would be dumped // logging level ranged in [4, 6)
minPluginScoresToDump = 3
// moreNodeScoresToDump is the number of nodes score to dump when the
// logging level ranged in [6, 10), and all plugins' score will be dumped
moreNodeScoresToDump = 10 moreNodeScoresToDump = 10
) )
@ -97,14 +102,6 @@ type pluginScoreList []pluginScore
// NodeToPluginScores declares a map from node name to its PluginScoreList. // NodeToPluginScores declares a map from node name to its PluginScoreList.
type nodeToPluginScores map[string]pluginScoreList type nodeToPluginScores map[string]pluginScoreList
type nshp []framework.NodeScore
func (h nshp) Len() int { return len(h) }
func (h nshp) Less(i, j int) bool { return h[i].Score < h[j].Score }
func (h nshp) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *nshp) Push(v interface{}) { *h = append(*h, v.(framework.NodeScore)) }
func (h *nshp) Pop() interface{} { a := *h; v := a[len(a)-1]; *h = a[:len(a)-1]; return v }
// snapshot snapshots scheduler cache and node infos for all fit and priority // snapshot snapshots scheduler cache and node infos for all fit and priority
// functions. // functions.
func (g *genericScheduler) snapshot() error { func (g *genericScheduler) snapshot() error {
@ -428,6 +425,7 @@ func prioritizeNodes(
pod *v1.Pod, pod *v1.Pod,
nodes []*v1.Node, nodes []*v1.Node,
) (framework.NodeScoreList, error) { ) (framework.NodeScoreList, error) {
// If no priority configs are provided, then all nodes will have a score of one. // If no priority configs are provided, then all nodes will have a score of one.
// This is required to generate the priority list in the required format // This is required to generate the priority list in the required format
if len(extenders) == 0 && !fwk.HasScorePlugins() { if len(extenders) == 0 && !fwk.HasScorePlugins() {
@ -463,7 +461,9 @@ func prioritizeNodes(
} }
} }
if klog.V(4).Enabled() {
logNodeAndPluginScores(result, scoresMap, pod) logNodeAndPluginScores(result, scoresMap, pod)
}
if len(extenders) != 0 && nodes != nil { if len(extenders) != 0 && nodes != nil {
var mu sync.Mutex var mu sync.Mutex
@ -525,12 +525,12 @@ func NewGenericScheduler(
} }
} }
// To expose more information about the scheduling process but not has a big drop on performance,
// we use a dynamic strategy to dump depending on the logging level:
// * for logging level ranged in [4, 6), dump the scores for topM nodes/plugins. And make it as performant as possible.
// * for logging level ranged in [6, 10), dump the scores for topN nodes, and probably show all plugins
// * for logging level >= 10, dump the scores for all nodes/plugins.
func logNodeAndPluginScores(nodeScores framework.NodeScoreList, scoresMap framework.PluginToNodeScores, pod *v1.Pod) { func logNodeAndPluginScores(nodeScores framework.NodeScoreList, scoresMap framework.PluginToNodeScores, pod *v1.Pod) {
if !klog.V(4).Enabled() {
// No log dump if logging level < 4
return
}
if klog.V(10).Enabled() { if klog.V(10).Enabled() {
for plugin, nodeScoreList := range scoresMap { for plugin, nodeScoreList := range scoresMap {
for _, nodeScore := range nodeScoreList { for _, nodeScore := range nodeScoreList {
@ -540,58 +540,89 @@ func logNodeAndPluginScores(nodeScores framework.NodeScoreList, scoresMap framew
return return
} }
var topM int8 // Log the top M node scores for the pod
topMNodes := minNodeScoresToDump
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
topM = moreNodeScoresToDump topMNodes = moreNodeScoresToDump
} else {
//if klog.V(4).Enabled()
topM = minNodeScoresToDump
} }
hp := make(nshp, topM) // Use the min-heap to maintain a top M node score list
for _, nodeScore := range nodeScores { nsh := make(nodeScoreHeap, 0, topMNodes)
if len(hp) < int(topM) || nodeScore.Score < hp[0].Score { for i, nodeScore := range nodeScores {
if len(hp) == int(topM) { if len(nsh) < int(topMNodes) || nodeScore.Score > nsh[0].Score {
hp.Pop() if len(nsh) == int(topMNodes) {
heap.Pop(&nsh)
} }
hp.Push(nodeScore) heap.Push(&nsh, &nodeScores[i])
} }
} }
// Build a map of Nodes->PluginScores on that node // Build a map of Nodes->PluginScores on that node
nodeToPluginScores := make(nodeToPluginScores, topM) nodeScoresMap := make(nodeToPluginScores, topMNodes)
for _, nodeScore := range hp { for _, nodeScore := range nsh {
nodeToPluginScores[nodeScore.Name] = make(pluginScoreList, 0) nodeScoresMap[nodeScore.Name] = make(pluginScoreList, 0)
} }
// Convert the scoresMap (which contains Plugins->NodeScores) to the Nodes->PluginScores map // Convert the scoresMap (which contains Plugins->NodeScores) to the Nodes->PluginScores map
for plugin, nodeScoreList := range scoresMap { for plugin, nodeScoreList := range scoresMap {
for _, nodeScore := range nodeScoreList { for _, nodeScore := range nodeScoreList {
// Get the top M nodes' plugin scores // Get the top M nodes' plugin scores
if _, ok := nodeToPluginScores[nodeScore.Name]; ok { if _, ok := nodeScoresMap[nodeScore.Name]; ok {
nodeToPluginScores[nodeScore.Name] = append(nodeToPluginScores[nodeScore.Name], pluginScore{Name: plugin, Score: nodeScore.Score}) nodeScoresMap[nodeScore.Name] = append(nodeScoresMap[nodeScore.Name], pluginScore{Name: plugin, Score: nodeScore.Score})
} }
} }
} }
var topNPlugins int8 = -1 // Log the top N plugin scores for the top M nodes
if !klog.V(6).Enabled() { topNPlugins := minPluginScoresToDump
topNPlugins = minNodeScoresToDump if klog.V(6).Enabled() {
// Log all plugins' socre if -v6
topNPlugins = -1
} }
for name, pluginScores := range nodeToPluginScores {
//Get the PluginScore for logging level 4 for name, pluginScores := range nodeScoresMap {
sort.Slice(pluginScores, func(i int, j int) bool { sort.Slice(pluginScores, func(i int, j int) bool {
return pluginScores[i].Score > pluginScores[j].Score return pluginScores[i].Score > pluginScores[j].Score
}) })
// Get the top N plugin socres only if logging level ranged in [4, 6)
if topNPlugins > 0 && len(pluginScores) > int(topNPlugins) { if topNPlugins > 0 && len(pluginScores) > int(topNPlugins) {
nodeToPluginScores[name] = pluginScores[:topNPlugins] nodeScoresMap[name] = pluginScores[:topNPlugins]
} }
} }
for len(hp) > 0 { scoresMessage := fmt.Sprintf("Top %d plugins for pod on node", topMNodes)
nodeScore := hp.Pop().(framework.NodeScore) for len(nsh) > 0 {
pluginScores := nodeToPluginScores[nodeScore.Name] nodeScore := heap.Pop(&nsh).(*framework.NodeScore)
klog.InfoS("Plugins scored node for pod", "pod", klog.KObj(pod), "node", nodeScore.Name, "score", nodeScore.Score, "plugins", pluginScores) pluginScores := nodeScoresMap[nodeScore.Name]
klog.InfoS(scoresMessage, "pod", klog.KObj(pod), "node", nodeScore.Name, "score", nodeScore.Score, "plugins", pluginScores)
}
} }
// nodeScoreHeap is a min-heap ordered by the score of nodes. The
// scheduler uses this to find the top-N scoring nodes when the
// logging verbose level is ranged in [4, 10)
type nodeScoreHeap []*framework.NodeScore
// var _ heap.Interface = &nodeScoreHeap{}
func (nsh nodeScoreHeap) Len() int {
return len(nsh)
}
func (nsh nodeScoreHeap) Less(i, j int) bool {
return nsh[i].Score < nsh[j].Score
}
func (nsh nodeScoreHeap) Swap(i, j int) {
nsh[i], nsh[j] = nsh[j], nsh[i]
}
func (nsh *nodeScoreHeap) Push(v interface{}) {
*nsh = append(*nsh, v.(*framework.NodeScore))
}
func (nsh *nodeScoreHeap) Pop() interface{} {
ns := (*nsh)[nsh.Len()-1]
*nsh = (*nsh)[:nsh.Len()-1]
return ns
} }