diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 6649584386f..79b2c6e4028 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -309,10 +309,10 @@ func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleSt
 		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
 		return nil, nil, nil, nil
 	}
-	if len(g.nodeInfoSnapshot.NodeInfoMap) == 0 {
+	if len(g.nodeInfoSnapshot.NodeInfoList) == 0 {
 		return nil, nil, nil, ErrNoNodesAvailable
 	}
-	potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoMap, fitError)
+	potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoList, fitError)
 	if len(potentialNodes) == 0 {
 		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
 		// In this case, we should clean-up any existing nominated node name of the pod.
@@ -351,13 +351,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleSt
 	// nomination updates these pods and moves them to the active queue. It
 	// lets scheduler find another place for them.
 	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
-	if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
-		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
-	}
-
-	return nil, nil, nil, fmt.Errorf(
-		"preemption failed: the target node %s has been deleted from scheduler cache",
-		candidateNode.Name)
+	return candidateNode, nodeToVictims[candidateNode].Pods, nominatedPods, nil
 }
 
 // processPreemptionWithExtenders processes preemption with extenders
@@ -859,18 +853,14 @@ func (g *genericScheduler) selectNodesForPreemption(
 	ctx context.Context,
 	state *framework.CycleState,
 	pod *v1.Pod,
-	potentialNodes []*v1.Node,
+	potentialNodes []*schedulernodeinfo.NodeInfo,
 	pdbs []*policy.PodDisruptionBudget,
 ) (map[*v1.Node]*extenderv1.Victims, error) {
 	nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
 	var resultLock sync.Mutex
 
 	checkNode := func(i int) {
-		nodeName := potentialNodes[i].Name
-		if g.nodeInfoSnapshot.NodeInfoMap[nodeName] == nil {
-			return
-		}
-		nodeInfoCopy := g.nodeInfoSnapshot.NodeInfoMap[nodeName].Clone()
+		nodeInfoCopy := potentialNodes[i].Clone()
 		stateCopy := state.Clone()
 		pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
 		if fits {
@@ -879,7 +869,7 @@ func (g *genericScheduler) selectNodesForPreemption(
 				Pods:             pods,
 				NumPDBViolations: int64(numPDBViolations),
 			}
-			nodeToVictims[potentialNodes[i]] = &victims
+			nodeToVictims[potentialNodes[i].Node()] = &victims
 			resultLock.Unlock()
 		}
 	}
@@ -1033,16 +1023,17 @@ func (g *genericScheduler) selectVictimsOnNode(
 
 // nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
 // that may be satisfied by removing pods from the node.
-func nodesWherePreemptionMightHelp(nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, fitErr *FitError) []*v1.Node {
-	var potentialNodes []*v1.Node
-	for name, node := range nodeNameToInfo {
+func nodesWherePreemptionMightHelp(nodes []*schedulernodeinfo.NodeInfo, fitErr *FitError) []*schedulernodeinfo.NodeInfo {
+	var potentialNodes []*schedulernodeinfo.NodeInfo
+	for _, node := range nodes {
+		name := node.Node().Name
 		// We reply on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
 		// to determine whether preemption may help or not on the node.
 		if fitErr.FilteredNodesStatuses[name].Code() == framework.UnschedulableAndUnresolvable {
 			continue
 		}
 		klog.V(3).Infof("Node %v is a potential node for preemption.", name)
-		potentialNodes = append(potentialNodes, node.Node())
+		potentialNodes = append(potentialNodes, node)
 	}
 	return potentialNodes
 }
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index 1c66f90db19..1135d49e7fa 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -27,10 +27,6 @@ import (
 	"testing"
 	"time"
 
-	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
-	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
-	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
-
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,9 +44,12 @@ import (
 	extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@@ -1599,18 +1598,14 @@ func TestSelectNodesForPreemption(t *testing.T) {
 
 			assignDefaultStartTime(test.pods)
 
-			// newnode simulate a case that a new node is added to the cluster, but nodeNameToInfo
-			// doesn't have it yet.
-			newnode := makeNode("newnode", 1000*5, priorityutil.DefaultMemoryRequest*5)
-			newnode.ObjectMeta.Labels = map[string]string{"hostname": "newnode"}
-			nodes = append(nodes, newnode)
 			state := framework.NewCycleState()
 			// Some tests rely on PreFilter plugin to compute its CycleState.
 			preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod)
 			if !preFilterStatus.IsSuccess() {
 				t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
 			}
-			nodeToPods, err := g.selectNodesForPreemption(context.Background(), state, test.pod, nodes, nil)
+			nodeInfos := nodesToNodeInfos(nodes, snapshot)
+			nodeToPods, err := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeInfos, nil)
 			if err != nil {
 				t.Error(err)
 			}
@@ -1839,8 +1834,9 @@ func TestPickOneNodeForPreemption(t *testing.T) {
 			}
 			assignDefaultStartTime(test.pods)
 
+			nodeInfos := nodesToNodeInfos(nodes, snapshot)
 			state := framework.NewCycleState()
-			candidateNodes, _ := g.selectNodesForPreemption(context.Background(), state, test.pod, nodes, nil)
+			candidateNodes, _ := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeInfos, nil)
 			node := pickOneNodeForPreemption(candidateNodes)
 			found := false
 			for _, nodeName := range test.expected {
@@ -1964,13 +1960,20 @@ func TestNodesWherePreemptionMightHelp(t *testing.T) {
 			fitErr := FitError{
 				FilteredNodesStatuses: test.nodesStatuses,
 			}
-			nodes := nodesWherePreemptionMightHelp(nodeinfosnapshot.CreateNodeInfoMap(nil, makeNodeList(nodeNames)), &fitErr)
+			var nodeInfos []*schedulernodeinfo.NodeInfo
+			for _, n := range makeNodeList(nodeNames) {
+				ni := schedulernodeinfo.NewNodeInfo()
+				ni.SetNode(n)
+				nodeInfos = append(nodeInfos, ni)
+			}
+			nodes := nodesWherePreemptionMightHelp(nodeInfos, &fitErr)
 			if len(test.expected) != len(nodes) {
 				t.Errorf("number of nodes is not the same as expected. exptectd: %d, got: %d. Nodes: %v", len(test.expected), len(nodes), nodes)
 			}
 			for _, node := range nodes {
-				if _, found := test.expected[node.Name]; !found {
-					t.Errorf("node %v is not expected.", node.Name)
+				name := node.Node().Name
+				if _, found := test.expected[name]; !found {
+					t.Errorf("node %v is not expected.", name)
 				}
 			}
 		})
@@ -2464,3 +2467,11 @@ func TestFairEvaluationForNodes(t *testing.T) {
 		}
 	}
 }
+
+func nodesToNodeInfos(nodes []*v1.Node, snapshot *nodeinfosnapshot.Snapshot) []*schedulernodeinfo.NodeInfo {
+	var nodeInfos []*schedulernodeinfo.NodeInfo
+	for _, n := range nodes {
+		nodeInfos = append(nodeInfos, snapshot.NodeInfoMap[n.Name])
+	}
+	return nodeInfos
+}
diff --git a/pkg/scheduler/nodeinfo/snapshot/snapshot.go b/pkg/scheduler/nodeinfo/snapshot/snapshot.go
index 46134a76f32..bb6eee6b676 100644
--- a/pkg/scheduler/nodeinfo/snapshot/snapshot.go
+++ b/pkg/scheduler/nodeinfo/snapshot/snapshot.go
@@ -134,7 +134,7 @@ func (s *Snapshot) NodeInfos() schedulerlisters.NodeInfoLister {
 
 // ListNodes returns the list of nodes in the snapshot.
 func (s *Snapshot) ListNodes() []*v1.Node {
-	nodes := make([]*v1.Node, 0, len(s.NodeInfoMap))
+	nodes := make([]*v1.Node, 0, len(s.NodeInfoList))
 	for _, n := range s.NodeInfoList {
 		if n.Node() != nil {
 			nodes = append(nodes, n.Node())
@@ -159,11 +159,11 @@ func (p *podLister) FilteredList(podFilter schedulerlisters.PodFilter, selector
 	// can avoid expensive array growth without wasting too much memory by
 	// pre-allocating capacity.
 	maxSize := 0
-	for _, n := range p.snapshot.NodeInfoMap {
+	for _, n := range p.snapshot.NodeInfoList {
 		maxSize += len(n.Pods())
 	}
 	pods := make([]*v1.Pod, 0, maxSize)
-	for _, n := range p.snapshot.NodeInfoMap {
+	for _, n := range p.snapshot.NodeInfoList {
 		for _, pod := range n.Pods() {
 			if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
 				pods = append(pods, pod)
@@ -189,7 +189,7 @@ func (n *nodeInfoLister) HavePodsWithAffinityList() ([]*schedulernodeinfo.NodeIn
 
 // Returns the NodeInfo of the given node name.
 func (n *nodeInfoLister) Get(nodeName string) (*schedulernodeinfo.NodeInfo, error) {
-	if v, ok := n.snapshot.NodeInfoMap[nodeName]; ok {
+	if v, ok := n.snapshot.NodeInfoMap[nodeName]; ok && v.Node() != nil {
 		return v, nil
 	}
 	return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName)
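The sketch below is not part of the patch; it only illustrates the NodeInfo-wrapping pattern the refactored helpers now consume (a []*schedulernodeinfo.NodeInfo instead of the old NodeInfoMap / []*v1.Node), mirroring the updated TestNodesWherePreemptionMightHelp. The node names and the main() wrapper are made up for the example; the import path is the pkg/scheduler/nodeinfo package used elsewhere in this diff.

// Illustrative sketch only, not part of the patch.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

func main() {
	// Hypothetical nodes; in the scheduler these come from the snapshot.
	nodes := []*v1.Node{
		{ObjectMeta: metav1.ObjectMeta{Name: "node-a"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "node-b"}},
	}

	// Wrap each *v1.Node in a NodeInfo, the same way the updated
	// TestNodesWherePreemptionMightHelp builds its input slice.
	var nodeInfos []*schedulernodeinfo.NodeInfo
	for _, n := range nodes {
		ni := schedulernodeinfo.NewNodeInfo()
		if err := ni.SetNode(n); err != nil {
			panic(err)
		}
		nodeInfos = append(nodeInfos, ni)
	}

	// Downstream code such as nodesWherePreemptionMightHelp reads the node
	// back through NodeInfo.Node().
	for _, ni := range nodeInfos {
		fmt.Println(ni.Node().Name)
	}
}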