diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 59a4d40179e..d11ee4f915a 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -141,7 +141,7 @@ type genericScheduler struct {
 	extenders                []algorithm.SchedulerExtender
 	lastNodeIndex            uint64
 	alwaysCheckAllPredicates bool
-	cachedNodeInfoMap        map[string]*schedulernodeinfo.NodeInfo
+	nodeInfoSnapshot         schedulerinternalcache.NodeInfoSnapshot
 	volumeBinder             *volumebinder.VolumeBinder
 	pvcLister                corelisters.PersistentVolumeClaimLister
 	pdbLister                algorithm.PDBLister
@@ -153,7 +153,7 @@ type genericScheduler struct {
 // functions.
 func (g *genericScheduler) snapshot() error {
 	// Used for all fit and priority funcs.
-	return g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
+	return g.cache.UpdateNodeInfoSnapshot(&g.nodeInfoSnapshot)
 }
 
 // Schedule tries to schedule the given pod to one of the nodes in the node list.
@@ -210,8 +210,8 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 		}, nil
 	}
 
-	metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
-	priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
+	metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
+	priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
 	if err != nil {
 		return result, err
 	}
@@ -290,7 +290,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
 	if !ok || fitError == nil {
 		return nil, nil, nil, nil
 	}
-	if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
+	if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap) {
 		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
 		return nil, nil, nil, nil
 	}
@@ -311,7 +311,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
 	if err != nil {
 		return nil, nil, nil, err
 	}
-	nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
+	nodeToVictims, err := selectNodesForPreemption(pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
 		g.predicateMetaProducer, g.schedulingQueue, pdbs)
 	if err != nil {
 		return nil, nil, nil, err
 	}
@@ -335,7 +335,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
 	// nomination updates these pods and moves them to the active queue. It
 	// lets scheduler find another place for them.
 	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
-	if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
+	if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
 		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
 	}
 
@@ -355,7 +355,7 @@ func (g *genericScheduler) processPreemptionWithExtenders(
 		newNodeToVictims, err := extender.ProcessPreemption(
 			pod,
 			nodeToVictims,
-			g.cachedNodeInfoMap,
+			g.nodeInfoSnapshot.NodeInfoMap,
 		)
 		if err != nil {
 			if extender.IsIgnorable() {
@@ -452,14 +452,14 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
 		ctx, cancel := context.WithCancel(context.Background())
 
 		// We can use the same metadata producer for all nodes.
-		meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
+		meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
 		checkNode := func(i int) {
 			nodeName := g.cache.NodeTree().Next()
 			fits, failedPredicates, err := podFitsOnNode(
 				pod,
 				meta,
-				g.cachedNodeInfoMap[nodeName],
+				g.nodeInfoSnapshot.NodeInfoMap[nodeName],
 				g.predicates,
 				g.schedulingQueue,
 				g.alwaysCheckAllPredicates,
@@ -476,7 +476,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
 					cancel()
 					atomic.AddInt32(&filteredLen, -1)
 				} else {
-					filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
+					filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
 				}
 			} else {
 				predicateResultLock.Lock()
@@ -500,7 +500,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
 			if !extender.IsInterested(pod) {
 				continue
 			}
-			filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
+			filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
 			if err != nil {
 				if extender.IsIgnorable() {
 					klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
@@ -1193,7 +1193,7 @@ func NewGenericScheduler(
 		priorityMetaProducer: priorityMetaProducer,
 		pluginSet:            pluginSet,
 		extenders:            extenders,
-		cachedNodeInfoMap:    make(map[string]*schedulernodeinfo.NodeInfo),
+		nodeInfoSnapshot:     schedulerinternalcache.NewNodeInfoSnapshot(),
 		volumeBinder:         volumeBinder,
 		pvcLister:            pvcLister,
 		pdbLister:            pdbLister,
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index 73e3ce75fdc..2efdfe2edde 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -513,7 +513,7 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes
 		emptyPluginSet,
 		nil, nil, nil, nil, false, false,
 		schedulerapi.DefaultPercentageOfNodesToScore)
-	cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap)
+	cache.UpdateNodeInfoSnapshot(&s.(*genericScheduler).nodeInfoSnapshot)
 	return s.(*genericScheduler)
 }
 
diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go
index 3245378869a..790a3bcc6e3 100644
--- a/pkg/scheduler/internal/cache/cache.go
+++ b/pkg/scheduler/internal/cache/cache.go
@@ -47,6 +47,15 @@ func New(ttl time.Duration, stop <-chan struct{}) Cache {
 	return cache
 }
 
+// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly
+// linked list. When a NodeInfo is updated, it goes to the head of the list.
+// The items closer to the head are the most recently updated items.
+type nodeInfoListItem struct {
+	info *schedulernodeinfo.NodeInfo
+	next *nodeInfoListItem
+	prev *nodeInfoListItem
+}
+
 type schedulerCache struct {
 	stop   <-chan struct{}
 	ttl    time.Duration
@@ -59,8 +68,11 @@ type schedulerCache struct {
 	assumedPods map[string]bool
 	// a map from pod key to podState.
 	podStates map[string]*podState
-	nodes     map[string]*schedulernodeinfo.NodeInfo
-	nodeTree  *NodeTree
+	nodes map[string]*nodeInfoListItem
+	// headNode points to the most recently updated NodeInfo in "nodes". It is the
+	// head of the linked list.
+	headNode *nodeInfoListItem
+	nodeTree *NodeTree
 	// A map from image name to its imageState.
 	imageStates map[string]*imageState
 }
 
@@ -94,7 +106,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
 		period: period,
 		stop:   stop,
 
-		nodes:       make(map[string]*schedulernodeinfo.NodeInfo),
+		nodes:       make(map[string]*nodeInfoListItem),
 		nodeTree:    newNodeTree(nil),
 		assumedPods: make(map[string]bool),
 		podStates:   make(map[string]*podState),
@@ -102,15 +114,82 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
 	}
 }
 
-// Snapshot takes a snapshot of the current schedulerinternalcache. The method has performance impact,
-// and should be only used in non-critical path.
+// newNodeInfoListItem initializes a new nodeInfoListItem.
+func newNodeInfoListItem(ni *schedulernodeinfo.NodeInfo) *nodeInfoListItem {
+	return &nodeInfoListItem{
+		info: ni,
+	}
+}
+
+// NewNodeInfoSnapshot initializes a NodeInfoSnapshot struct and returns it.
+func NewNodeInfoSnapshot() NodeInfoSnapshot {
+	return NodeInfoSnapshot{
+		NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo),
+	}
+}
+
+// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly
+// linked list. The head is the most recently updated NodeInfo.
+// We assume cache lock is already acquired.
+func (cache *schedulerCache) moveNodeInfoToHead(name string) {
+	ni, ok := cache.nodes[name]
+	if !ok {
+		klog.Errorf("No NodeInfo with name %v found in the cache", name)
+		return
+	}
+	// if the node info list item is already at the head, we are done.
+	if ni == cache.headNode {
+		return
+	}
+
+	if ni.prev != nil {
+		ni.prev.next = ni.next
+	}
+	if ni.next != nil {
+		ni.next.prev = ni.prev
+	}
+	if cache.headNode != nil {
+		cache.headNode.prev = ni
+	}
+	ni.next = cache.headNode
+	ni.prev = nil
+	cache.headNode = ni
+}
+
+// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
+// linked list.
+// We assume cache lock is already acquired.
+func (cache *schedulerCache) removeNodeInfoFromList(name string) {
+	ni, ok := cache.nodes[name]
+	if !ok {
+		klog.Errorf("No NodeInfo with name %v found in the cache", name)
+		return
+	}
+
+	if ni.prev != nil {
+		ni.prev.next = ni.next
+	}
+	if ni.next != nil {
+		ni.next.prev = ni.prev
+	}
+	// if the removed item was at the head, we must update the head.
+	if ni == cache.headNode {
+		cache.headNode = ni.next
+	}
+	delete(cache.nodes, name)
+}
+
+// Snapshot takes a snapshot of the current scheduler cache. This is used for
+// debugging purposes only and shouldn't be confused with the UpdateNodeInfoSnapshot
+// function.
+// This method is expensive, and should be only used in non-critical path.
 func (cache *schedulerCache) Snapshot() *Snapshot {
 	cache.mu.RLock()
 	defer cache.mu.RUnlock()
 
 	nodes := make(map[string]*schedulernodeinfo.NodeInfo, len(cache.nodes))
 	for k, v := range cache.nodes {
-		nodes[k] = v.Clone()
+		nodes[k] = v.info.Clone()
 	}
 
 	assumedPods := make(map[string]bool, len(cache.assumedPods))
@@ -124,22 +203,43 @@ func (cache *schedulerCache) Snapshot() *Snapshot {
 	}
 }
 
-func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) error {
+// UpdateNodeInfoSnapshot takes a snapshot of cached NodeInfo map. This is called at
+// the beginning of every scheduling cycle.
+// This function tracks the generation number of NodeInfo and updates only the
+// entries of an existing snapshot that have changed after the snapshot was taken.
+func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
+	balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
 
-	for name, info := range cache.nodes {
-		if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil {
-			// Transient scheduler info is reset here.
-			info.TransientInfo.ResetTransientSchedulerInfo()
+	// Get the last generation of the snapshot.
+	snapshotGeneration := nodeSnapshot.Generation
+
+	// Start from the head of the NodeInfo doubly linked list and update snapshot
+	// of NodeInfos updated after the last snapshot.
+	for node := cache.headNode; node != nil; node = node.next {
+		if node.info.GetGeneration() <= snapshotGeneration {
+			// All the nodes are updated before the existing snapshot. We are done.
+			break
 		}
-		if current, ok := nodeNameToInfo[name]; !ok || current.GetGeneration() != info.GetGeneration() {
-			nodeNameToInfo[name] = info.Clone()
+		if balancedVolumesEnabled && node.info.TransientInfo != nil {
+			// Transient scheduler info is reset here.
+			node.info.TransientInfo.ResetTransientSchedulerInfo()
+		}
+		if np := node.info.Node(); np != nil {
+			nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
 		}
 	}
-	for name := range nodeNameToInfo {
-		if _, ok := cache.nodes[name]; !ok {
-			delete(nodeNameToInfo, name)
+	// Update the snapshot generation with the latest NodeInfo generation.
+	if cache.headNode != nil {
+		nodeSnapshot.Generation = cache.headNode.info.GetGeneration()
+	}
+
+	if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
+		for name := range nodeSnapshot.NodeInfoMap {
+			if _, ok := cache.nodes[name]; !ok {
+				delete(nodeSnapshot.NodeInfoMap, name)
+			}
 		}
 	}
 	return nil
@@ -157,12 +257,12 @@ func (cache *schedulerCache) FilteredList(podFilter algorithm.PodFilter, selecto
 	// can avoid expensive array growth without wasting too much memory by
 	// pre-allocating capacity.
 	maxSize := 0
-	for _, info := range cache.nodes {
-		maxSize += len(info.Pods())
+	for _, n := range cache.nodes {
+		maxSize += len(n.info.Pods())
 	}
 	pods := make([]*v1.Pod, 0, maxSize)
-	for _, info := range cache.nodes {
-		for _, pod := range info.Pods() {
+	for _, n := range cache.nodes {
+		for _, pod := range n.info.Pods() {
 			if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
 				pods = append(pods, pod)
 			}
@@ -249,10 +349,11 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
 func (cache *schedulerCache) addPod(pod *v1.Pod) {
 	n, ok := cache.nodes[pod.Spec.NodeName]
 	if !ok {
-		n = schedulernodeinfo.NewNodeInfo()
+		n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
 		cache.nodes[pod.Spec.NodeName] = n
 	}
-	n.AddPod(pod)
+	n.info.AddPod(pod)
+	cache.moveNodeInfoToHead(pod.Spec.NodeName)
 }
 
 // Assumes that lock is already acquired.
@@ -266,12 +367,17 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
 // Assumes that lock is already acquired.
 func (cache *schedulerCache) removePod(pod *v1.Pod) error {
-	n := cache.nodes[pod.Spec.NodeName]
-	if err := n.RemovePod(pod); err != nil {
+	n, ok := cache.nodes[pod.Spec.NodeName]
+	if !ok {
+		return fmt.Errorf("node %v is not found", pod.Spec.NodeName)
+	}
+	if err := n.info.RemovePod(pod); err != nil {
 		return err
 	}
-	if len(n.Pods()) == 0 && n.Node() == nil {
-		delete(cache.nodes, pod.Spec.NodeName)
+	if len(n.info.Pods()) == 0 && n.info.Node() == nil {
+		cache.removeNodeInfoFromList(pod.Spec.NodeName)
+	} else {
+		cache.moveNodeInfoToHead(pod.Spec.NodeName)
 	}
 	return nil
 }
@@ -407,15 +513,16 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
 	n, ok := cache.nodes[node.Name]
 	if !ok {
-		n = schedulernodeinfo.NewNodeInfo()
+		n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
 		cache.nodes[node.Name] = n
 	} else {
-		cache.removeNodeImageStates(n.Node())
+		cache.removeNodeImageStates(n.info.Node())
 	}
+	cache.moveNodeInfoToHead(node.Name)
 
 	cache.nodeTree.AddNode(node)
-	cache.addNodeImageStates(node, n)
-	return n.SetNode(node)
+	cache.addNodeImageStates(node, n.info)
+	return n.info.SetNode(node)
 }
 
 func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
@@ -424,31 +531,37 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
 	n, ok := cache.nodes[newNode.Name]
 	if !ok {
-		n = schedulernodeinfo.NewNodeInfo()
+		n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
 		cache.nodes[newNode.Name] = n
 	} else {
-		cache.removeNodeImageStates(n.Node())
+		cache.removeNodeImageStates(n.info.Node())
 	}
+	cache.moveNodeInfoToHead(newNode.Name)
 
 	cache.nodeTree.UpdateNode(oldNode, newNode)
-	cache.addNodeImageStates(newNode, n)
-	return n.SetNode(newNode)
+	cache.addNodeImageStates(newNode, n.info)
+	return n.info.SetNode(newNode)
 }
 
 func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 
-	n := cache.nodes[node.Name]
-	if err := n.RemoveNode(node); err != nil {
+	n, ok := cache.nodes[node.Name]
+	if !ok {
+		return fmt.Errorf("node %v is not found", node.Name)
+	}
+	if err := n.info.RemoveNode(node); err != nil {
 		return err
 	}
 
 	// We remove NodeInfo for this node only if there aren't any pods on this node.
 	// We can't do it unconditionally, because notifications about pods are delivered
 	// in a different watch, and thus can potentially be observed later, even though
 	// they happened before node removal.
-	if len(n.Pods()) == 0 && n.Node() == nil {
-		delete(cache.nodes, node.Name)
+	if len(n.info.Pods()) == 0 && n.info.Node() == nil {
+		cache.removeNodeInfoFromList(node.Name)
+	} else {
+		cache.moveNodeInfoToHead(node.Name)
 	}
 	cache.nodeTree.RemoveNode(node)
diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go
index e7f15291c65..e3eb74163e2 100644
--- a/pkg/scheduler/internal/cache/cache_test.go
+++ b/pkg/scheduler/internal/cache/cache_test.go
@@ -1132,7 +1132,7 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
 	b.ResetTimer()
 	for n := 0; n < b.N; n++ {
-		cachedNodes := map[string]*schedulernodeinfo.NodeInfo{}
-		cache.UpdateNodeNameToInfoMap(cachedNodes)
+		cachedNodes := NewNodeInfoSnapshot()
+		cache.UpdateNodeInfoSnapshot(&cachedNodes)
 	}
 }
diff --git a/pkg/scheduler/internal/cache/fake/BUILD b/pkg/scheduler/internal/cache/fake/BUILD
index 06c1d501a8a..caed79bb710 100644
--- a/pkg/scheduler/internal/cache/fake/BUILD
+++ b/pkg/scheduler/internal/cache/fake/BUILD
@@ -8,7 +8,6 @@ go_library(
     deps = [
         "//pkg/scheduler/algorithm:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
-        "//pkg/scheduler/nodeinfo:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
     ],
diff --git a/pkg/scheduler/internal/cache/fake/fake_cache.go b/pkg/scheduler/internal/cache/fake/fake_cache.go
index 05873d35cb7..fe814d8766e 100644
--- a/pkg/scheduler/internal/cache/fake/fake_cache.go
+++ b/pkg/scheduler/internal/cache/fake/fake_cache.go
@@ -21,7 +21,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm"
 	schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
-	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 )
 
 // Cache is used for testing
@@ -75,8 +74,8 @@ func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) error { return nil }
 
 // RemoveNode is a fake method for testing.
 func (c *Cache) RemoveNode(node *v1.Node) error { return nil }
 
-// UpdateNodeNameToInfoMap is a fake method for testing.
-func (c *Cache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulernodeinfo.NodeInfo) error {
+// UpdateNodeInfoSnapshot is a fake method for testing.
+func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulerinternalcache.NodeInfoSnapshot) error {
 	return nil
 }
diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go
index 204659d8625..699818b1e6e 100644
--- a/pkg/scheduler/internal/cache/interface.go
+++ b/pkg/scheduler/internal/cache/interface.go
@@ -95,10 +95,10 @@ type Cache interface {
 	// RemoveNode removes overall information about node.
 	RemoveNode(node *v1.Node) error
 
-	// UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
+	// UpdateNodeInfoSnapshot updates the passed infoSnapshot to the current contents of Cache.
 	// The node info contains aggregated information of pods scheduled (including assumed to be)
 	// on this node.
-	UpdateNodeNameToInfoMap(infoMap map[string]*schedulernodeinfo.NodeInfo) error
+	UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error
 
 	// List lists all cached pods (including assumed ones).
 	List(labels.Selector) ([]*v1.Pod, error)
@@ -118,3 +118,11 @@ type Snapshot struct {
 	AssumedPods map[string]bool
 	Nodes       map[string]*schedulernodeinfo.NodeInfo
 }
+
+// NodeInfoSnapshot is a snapshot of cache NodeInfo. The scheduler takes a
+// snapshot at the beginning of each scheduling cycle and uses it for its
+// operations in that cycle.
+type NodeInfoSnapshot struct {
+	NodeInfoMap map[string]*schedulernodeinfo.NodeInfo
+	Generation  int64
+}
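
The heart of the patch is that the values of the `nodes` map also form a doubly linked list ordered from most to least recently updated, maintained by `moveNodeInfoToHead` and `removeNodeInfoFromList`. Below is a minimal, self-contained sketch of that bookkeeping in isolation; it is not part of the patch, and `item`, `store`, and `touch` are invented stand-ins for `nodeInfoListItem`, `schedulerCache`, and the add/update paths.

```go
package main

import "fmt"

// item mirrors nodeInfoListItem: a map entry that also sits in a doubly
// linked list ordered by most recent update.
type item struct {
	name string
	prev *item
	next *item
}

// store mirrors the relevant part of schedulerCache: a name->item map plus
// the head of the list (the most recently updated item).
type store struct {
	items map[string]*item
	head  *item
}

func newStore() *store {
	return &store{items: map[string]*item{}}
}

// touch adds the item if needed and moves it to the head of the list, the
// same way addPod/AddNode call moveNodeInfoToHead after every update.
func (s *store) touch(name string) {
	it, ok := s.items[name]
	if !ok {
		it = &item{name: name}
		s.items[name] = it
	}
	if it == s.head {
		return
	}
	// Unlink from the current position (a no-op for a brand new item).
	if it.prev != nil {
		it.prev.next = it.next
	}
	if it.next != nil {
		it.next.prev = it.prev
	}
	// Link in at the head.
	if s.head != nil {
		s.head.prev = it
	}
	it.next = s.head
	it.prev = nil
	s.head = it
}

// remove unlinks the item and drops it from the map, like removeNodeInfoFromList.
func (s *store) remove(name string) {
	it, ok := s.items[name]
	if !ok {
		return
	}
	if it.prev != nil {
		it.prev.next = it.next
	}
	if it.next != nil {
		it.next.prev = it.prev
	}
	if it == s.head {
		s.head = it.next
	}
	delete(s.items, name)
}

func main() {
	s := newStore()
	s.touch("node-a")
	s.touch("node-b")
	s.touch("node-a") // node-a is updated again, so it moves back to the head
	s.remove("node-b")
	for it := s.head; it != nil; it = it.next {
		fmt.Println(it.name) // prints: node-a
	}
}
```

Because NodeInfo generations only grow and every mutation moves the touched item to the head, head-to-tail order is also newest-to-oldest generation, which is exactly what the snapshot walk relies on.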
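The other half of the change is the incremental snapshot: NodeInfoSnapshot remembers the largest generation it has seen, so UpdateNodeInfoSnapshot can walk from the head of the list and stop at the first item that is not newer than the snapshot. The sketch below is illustrative only, using simplified stand-in types (entry, snapshot, update) rather than the real NodeInfo and NodeInfoSnapshot, but it follows the same early-exit walk and the same cleanup of deleted nodes.

```go
package main

import "fmt"

// entry stands in for a NodeInfo: some payload plus the generation at which
// it was last updated.
type entry struct {
	name       string
	pods       int
	generation int64
	next       *entry // next item in the most-recently-updated list
}

// snapshot stands in for NodeInfoSnapshot.
type snapshot struct {
	entries    map[string]entry
	generation int64
}

// update copies into snap every entry that changed after the snapshot was
// last taken, walking from the most recently updated entry and stopping at
// the first one already covered by snap.generation.
func update(head *entry, live map[string]*entry, snap *snapshot) {
	for e := head; e != nil; e = e.next {
		if e.generation <= snap.generation {
			break // everything further down the list is older than the snapshot
		}
		snap.entries[e.name] = *e
	}
	if head != nil {
		snap.generation = head.generation
	}
	// Drop snapshot entries whose node no longer exists in the cache.
	if len(snap.entries) > len(live) {
		for name := range snap.entries {
			if _, ok := live[name]; !ok {
				delete(snap.entries, name)
			}
		}
	}
}

func main() {
	a := &entry{name: "node-a", pods: 3, generation: 7}
	b := &entry{name: "node-b", pods: 1, generation: 5}
	a.next = b // node-a was updated more recently than node-b
	live := map[string]*entry{"node-a": a, "node-b": b}

	snap := &snapshot{entries: map[string]entry{}}
	update(a, live, snap) // first cycle copies both entries
	fmt.Println(len(snap.entries), snap.generation) // 2 7

	a.pods = 4
	a.generation = 9
	update(a, live, snap) // second cycle only re-copies node-a
	fmt.Println(snap.entries["node-a"].pods, snap.generation) // 4 9
}
```

The length comparison mirrors the patch: node removals are the only way the snapshot map can become larger than the cache, so the full cleanup scan runs only in that case.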