Merge pull request #86930 from alculquicondor/fix/node_info_list

Use Snapshot.NodeInfoList for listing operations
Authored by Kubernetes Prow Robot on 2020-01-08 14:46:03 -08:00; committed by GitHub
commit ca05f4c63b
3 changed files with 40 additions and 38 deletions
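
The theme of the diff below is mechanical: listing-style operations that previously ranged over Snapshot.NodeInfoMap now range over Snapshot.NodeInfoList, while the map stays around for by-name lookups. The following is a minimal, self-contained sketch of that shape, using invented types rather than the scheduler's real ones, for readers skimming the hunks:

```go
package main

import "fmt"

// Invented, simplified types; the real snapshot stores *schedulernodeinfo.NodeInfo.
type nodeInfo struct {
	name string
	pods []string
}

type snapshot struct {
	nodeInfoMap  map[string]*nodeInfo // kept for by-name lookups
	nodeInfoList []*nodeInfo          // used for listing operations
}

func newSnapshot(infos ...*nodeInfo) *snapshot {
	s := &snapshot{nodeInfoMap: map[string]*nodeInfo{}}
	for _, ni := range infos {
		s.nodeInfoMap[ni.name] = ni
		s.nodeInfoList = append(s.nodeInfoList, ni)
	}
	return s
}

// listPods ranges over the slice, mirroring the NodeInfoMap -> NodeInfoList switch.
func (s *snapshot) listPods() []string {
	var pods []string
	for _, ni := range s.nodeInfoList {
		pods = append(pods, ni.pods...)
	}
	return pods
}

func main() {
	s := newSnapshot(
		&nodeInfo{name: "node-a", pods: []string{"p1", "p2"}},
		&nodeInfo{name: "node-b", pods: []string{"p3"}},
	)
	fmt.Println(s.listPods()) // [p1 p2 p3]
}
```

Ranging the pre-built slice also gives a stable iteration order from cycle to cycle, which Go map iteration deliberately does not.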


@@ -309,10 +309,10 @@ func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleSt
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return nil, nil, nil, nil
}
- if len(g.nodeInfoSnapshot.NodeInfoMap) == 0 {
+ if len(g.nodeInfoSnapshot.NodeInfoList) == 0 {
return nil, nil, nil, ErrNoNodesAvailable
}
- potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoMap, fitError)
+ potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoList, fitError)
if len(potentialNodes) == 0 {
klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
// In this case, we should clean-up any existing nominated node name of the pod.
@@ -351,13 +351,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleSt
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
- if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
- return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
- }
- return nil, nil, nil, fmt.Errorf(
- "preemption failed: the target node %s has been deleted from scheduler cache",
- candidateNode.Name)
+ return candidateNode, nodeToVictims[candidateNode].Pods, nominatedPods, nil
}
// processPreemptionWithExtenders processes preemption with extenders
@@ -859,18 +853,14 @@ func (g *genericScheduler) selectNodesForPreemption(
ctx context.Context,
state *framework.CycleState,
pod *v1.Pod,
- potentialNodes []*v1.Node,
+ potentialNodes []*schedulernodeinfo.NodeInfo,
pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*extenderv1.Victims, error) {
nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
var resultLock sync.Mutex
checkNode := func(i int) {
- nodeName := potentialNodes[i].Name
- if g.nodeInfoSnapshot.NodeInfoMap[nodeName] == nil {
- return
- }
- nodeInfoCopy := g.nodeInfoSnapshot.NodeInfoMap[nodeName].Clone()
+ nodeInfoCopy := potentialNodes[i].Clone()
stateCopy := state.Clone()
pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
if fits {
@@ -879,7 +869,7 @@ func (g *genericScheduler) selectNodesForPreemption(
Pods: pods,
NumPDBViolations: int64(numPDBViolations),
}
- nodeToVictims[potentialNodes[i]] = &victims
+ nodeToVictims[potentialNodes[i].Node()] = &victims
resultLock.Unlock()
}
}
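
With selectNodesForPreemption now receiving []*schedulernodeinfo.NodeInfo directly, the per-node name lookup and nil check against NodeInfoMap disappear: each candidate is cloned, evaluated, and the results are collected under a mutex. Below is a stripped-down sketch of that pattern, using invented types and plain goroutines in place of the scheduler's worker-pool helper:

```go
package main

import (
	"fmt"
	"sync"
)

type nodeInfo struct {
	name string
	pods []string
}

func (n *nodeInfo) clone() *nodeInfo {
	return &nodeInfo{name: n.name, pods: append([]string(nil), n.pods...)}
}

// selectVictims stands in for selectVictimsOnNode; it may mutate its copy.
func selectVictims(n *nodeInfo) ([]string, bool) {
	if len(n.pods) == 0 {
		return nil, false
	}
	return n.pods[:1], true
}

func main() {
	candidates := []*nodeInfo{
		{name: "node-a", pods: []string{"low-prio"}},
		{name: "node-b"},
	}

	victimsByNode := map[string][]string{}
	var mu sync.Mutex
	var wg sync.WaitGroup
	for _, c := range candidates {
		wg.Add(1)
		go func(c *nodeInfo) {
			defer wg.Done()
			niCopy := c.clone() // work on a copy, like nodeInfoCopy in the hunk above
			if victims, fits := selectVictims(niCopy); fits {
				mu.Lock()
				victimsByNode[c.name] = victims
				mu.Unlock()
			}
		}(c)
	}
	wg.Wait()
	fmt.Println(victimsByNode) // map[node-a:[low-prio]]
}
```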
@@ -1033,16 +1023,17 @@ func (g *genericScheduler) selectVictimsOnNode(
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
- func nodesWherePreemptionMightHelp(nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, fitErr *FitError) []*v1.Node {
- var potentialNodes []*v1.Node
- for name, node := range nodeNameToInfo {
+ func nodesWherePreemptionMightHelp(nodes []*schedulernodeinfo.NodeInfo, fitErr *FitError) []*schedulernodeinfo.NodeInfo {
+ var potentialNodes []*schedulernodeinfo.NodeInfo
+ for _, node := range nodes {
+ name := node.Node().Name
// We rely on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
// to determine whether preemption may help or not on the node.
if fitErr.FilteredNodesStatuses[name].Code() == framework.UnschedulableAndUnresolvable {
continue
}
klog.V(3).Infof("Node %v is a potential node for preemption.", name)
- potentialNodes = append(potentialNodes, node.Node())
+ potentialNodes = append(potentialNodes, node)
}
return potentialNodes
}
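
The new nodesWherePreemptionMightHelp walks a slice instead of a map and returns NodeInfos rather than bare nodes. A self-contained sketch of the filtering logic, with simplified stand-ins for the framework status codes:

```go
package main

import "fmt"

type code int

const (
	unschedulable code = iota
	unschedulableAndUnresolvable
)

type nodeInfo struct{ name string }

func preemptionMightHelp(nodes []*nodeInfo, statuses map[string]code) []*nodeInfo {
	var potential []*nodeInfo
	for _, n := range nodes {
		// Skip nodes where no amount of preemption can make the pod schedulable.
		if statuses[n.name] == unschedulableAndUnresolvable {
			continue
		}
		potential = append(potential, n)
	}
	return potential
}

func main() {
	nodes := []*nodeInfo{{name: "a"}, {name: "b"}}
	statuses := map[string]code{"a": unschedulable, "b": unschedulableAndUnresolvable}
	for _, n := range preemptionMightHelp(nodes, statuses) {
		fmt.Println(n.name) // a
	}
}
```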


@@ -27,10 +27,6 @@ import (
"testing"
"time"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,9 +44,12 @@ import (
extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@@ -1599,18 +1598,14 @@ func TestSelectNodesForPreemption(t *testing.T) {
assignDefaultStartTime(test.pods)
- // newnode simulate a case that a new node is added to the cluster, but nodeNameToInfo
- // doesn't have it yet.
- newnode := makeNode("newnode", 1000*5, priorityutil.DefaultMemoryRequest*5)
- newnode.ObjectMeta.Labels = map[string]string{"hostname": "newnode"}
- nodes = append(nodes, newnode)
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
}
- nodeToPods, err := g.selectNodesForPreemption(context.Background(), state, test.pod, nodes, nil)
+ nodeInfos := nodesToNodeInfos(nodes, snapshot)
+ nodeToPods, err := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeInfos, nil)
if err != nil {
t.Error(err)
}
@@ -1839,8 +1834,9 @@ func TestPickOneNodeForPreemption(t *testing.T) {
}
assignDefaultStartTime(test.pods)
+ nodeInfos := nodesToNodeInfos(nodes, snapshot)
state := framework.NewCycleState()
- candidateNodes, _ := g.selectNodesForPreemption(context.Background(), state, test.pod, nodes, nil)
+ candidateNodes, _ := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeInfos, nil)
node := pickOneNodeForPreemption(candidateNodes)
found := false
for _, nodeName := range test.expected {
@@ -1964,13 +1960,20 @@ func TestNodesWherePreemptionMightHelp(t *testing.T) {
fitErr := FitError{
FilteredNodesStatuses: test.nodesStatuses,
}
- nodes := nodesWherePreemptionMightHelp(nodeinfosnapshot.CreateNodeInfoMap(nil, makeNodeList(nodeNames)), &fitErr)
+ var nodeInfos []*schedulernodeinfo.NodeInfo
+ for _, n := range makeNodeList(nodeNames) {
+ ni := schedulernodeinfo.NewNodeInfo()
+ ni.SetNode(n)
+ nodeInfos = append(nodeInfos, ni)
+ }
+ nodes := nodesWherePreemptionMightHelp(nodeInfos, &fitErr)
if len(test.expected) != len(nodes) {
t.Errorf("number of nodes is not the same as expected. expected: %d, got: %d. Nodes: %v", len(test.expected), len(nodes), nodes)
}
for _, node := range nodes {
- if _, found := test.expected[node.Name]; !found {
- t.Errorf("node %v is not expected.", node.Name)
+ name := node.Node().Name
+ if _, found := test.expected[name]; !found {
+ t.Errorf("node %v is not expected.", name)
}
}
})
@@ -2464,3 +2467,11 @@ func TestFairEvaluationForNodes(t *testing.T) {
}
}
}
+ func nodesToNodeInfos(nodes []*v1.Node, snapshot *nodeinfosnapshot.Snapshot) []*schedulernodeinfo.NodeInfo {
+ var nodeInfos []*schedulernodeinfo.NodeInfo
+ for _, n := range nodes {
+ nodeInfos = append(nodeInfos, snapshot.NodeInfoMap[n.Name])
+ }
+ return nodeInfos
+ }
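
The new nodesToNodeInfos test helper just indexes the snapshot's by-name map for each test node; a node absent from the snapshot comes back as a nil entry, which is also why the old "newnode not yet in nodeNameToInfo" case above no longer applies. A tiny stand-in version with invented types:

```go
package main

import "fmt"

type node struct{ name string }
type nodeInfo struct{ node *node }
type snapshot struct{ byName map[string]*nodeInfo }

// nodesToNodeInfos converts a node list into the snapshot's NodeInfo entries.
func nodesToNodeInfos(nodes []*node, s *snapshot) []*nodeInfo {
	var infos []*nodeInfo
	for _, n := range nodes {
		infos = append(infos, s.byName[n.name]) // nil if the node is unknown
	}
	return infos
}

func main() {
	a := &node{name: "a"}
	s := &snapshot{byName: map[string]*nodeInfo{"a": {node: a}}}
	infos := nodesToNodeInfos([]*node{a}, s)
	fmt.Println(len(infos), infos[0].node.name) // 1 a
}
```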


@@ -134,7 +134,7 @@ func (s *Snapshot) NodeInfos() schedulerlisters.NodeInfoLister {
// ListNodes returns the list of nodes in the snapshot.
func (s *Snapshot) ListNodes() []*v1.Node {
- nodes := make([]*v1.Node, 0, len(s.NodeInfoMap))
+ nodes := make([]*v1.Node, 0, len(s.NodeInfoList))
for _, n := range s.NodeInfoList {
if n.Node() != nil {
nodes = append(nodes, n.Node())
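
ListNodes now sizes its result from NodeInfoList and still skips entries whose Node() is nil. A sketch of that shape with placeholder types:

```go
package main

import "fmt"

type node struct{ name string }
type nodeInfo struct{ n *node }
type snapshot struct{ nodeInfoList []*nodeInfo }

// listNodes pre-allocates from the list length and drops entries with no node.
func (s *snapshot) listNodes() []*node {
	nodes := make([]*node, 0, len(s.nodeInfoList))
	for _, ni := range s.nodeInfoList {
		if ni.n != nil {
			nodes = append(nodes, ni.n)
		}
	}
	return nodes
}

func main() {
	s := &snapshot{nodeInfoList: []*nodeInfo{{n: &node{name: "a"}}, {n: nil}}}
	fmt.Println(len(s.listNodes())) // 1
}
```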
@@ -159,11 +159,11 @@ func (p *podLister) FilteredList(podFilter schedulerlisters.PodFilter, selector
// can avoid expensive array growth without wasting too much memory by
// pre-allocating capacity.
maxSize := 0
- for _, n := range p.snapshot.NodeInfoMap {
+ for _, n := range p.snapshot.NodeInfoList {
maxSize += len(n.Pods())
}
pods := make([]*v1.Pod, 0, maxSize)
- for _, n := range p.snapshot.NodeInfoMap {
+ for _, n := range p.snapshot.NodeInfoList {
for _, pod := range n.Pods() {
if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
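
FilteredList keeps its two-pass pre-allocation, now over NodeInfoList: one pass sums per-node pod counts to size the slice, a second pass copies the pods that pass the filter. A generic sketch of the pattern with placeholder types and a placeholder filter:

```go
package main

import (
	"fmt"
	"strings"
)

type nodeInfo struct{ pods []string }

func filteredPods(infos []*nodeInfo, keep func(string) bool) []string {
	// First pass: compute an upper bound on the result size.
	maxSize := 0
	for _, ni := range infos {
		maxSize += len(ni.pods)
	}
	pods := make([]string, 0, maxSize) // avoids repeated slice growth
	// Second pass: copy only the pods that match the filter.
	for _, ni := range infos {
		for _, p := range ni.pods {
			if keep(p) {
				pods = append(pods, p)
			}
		}
	}
	return pods
}

func main() {
	infos := []*nodeInfo{{pods: []string{"kube-proxy", "web-1"}}, {pods: []string{"web-2"}}}
	web := filteredPods(infos, func(p string) bool { return strings.HasPrefix(p, "web-") })
	fmt.Println(web) // [web-1 web-2]
}
```

Pre-allocating to the total pod count trades a little memory for skipping repeated allocations during append.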
@@ -189,7 +189,7 @@ func (n *nodeInfoLister) HavePodsWithAffinityList() ([]*schedulernodeinfo.NodeIn
// Returns the NodeInfo of the given node name.
func (n *nodeInfoLister) Get(nodeName string) (*schedulernodeinfo.NodeInfo, error) {
- if v, ok := n.snapshot.NodeInfoMap[nodeName]; ok {
+ if v, ok := n.snapshot.NodeInfoMap[nodeName]; ok && v.Node() != nil {
return v, nil
}
return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName)
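
The tightened Get treats a map hit whose Node() is nil the same as a miss, so callers never receive a NodeInfo with no actual node behind it. A simplified sketch with invented types:

```go
package main

import (
	"errors"
	"fmt"
)

type node struct{ name string }
type nodeInfo struct{ n *node }

type lister struct{ byName map[string]*nodeInfo }

// get returns an entry only when it exists and still points at a real node.
func (l *lister) get(name string) (*nodeInfo, error) {
	if v, ok := l.byName[name]; ok && v.n != nil {
		return v, nil
	}
	return nil, errors.New("nodeinfo not found for node name " + name)
}

func main() {
	l := &lister{byName: map[string]*nodeInfo{
		"a": {n: &node{name: "a"}},
		"b": {n: nil}, // entry exists, but the node itself is gone
	}}
	if _, err := l.get("b"); err != nil {
		fmt.Println(err) // nodeinfo not found for node name b
	}
}
```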