diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 69a332a12a1..7490b71562b 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -178,7 +178,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS } trace.Step("Snapshotting scheduler cache and node infos done") - if len(g.nodeInfoSnapshot.NodeInfoList) == 0 { + if g.nodeInfoSnapshot.NumNodes() == 0 { return result, ErrNoNodesAvailable } @@ -190,7 +190,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS trace.Step("Running prefilter plugins done") startPredicateEvalTime := time.Now() - filteredNodes, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod) + filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, state, pod) if err != nil { return result, err } @@ -205,7 +205,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS if len(filteredNodes) == 0 { return result, &FitError{ Pod: pod, - NumAllNodes: len(g.nodeInfoSnapshot.NodeInfoList), + NumAllNodes: g.nodeInfoSnapshot.NumNodes(), FilteredNodesStatuses: filteredNodesStatuses, } } @@ -294,19 +294,20 @@ func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleSt klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name) return nil, nil, nil, nil } - if len(g.nodeInfoSnapshot.NodeInfoList) == 0 { + allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() + if err != nil { + return nil, nil, nil, err + } + if len(allNodes) == 0 { return nil, nil, nil, ErrNoNodesAvailable } - potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoList, fitError) + potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError) if len(potentialNodes) == 0 { klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name) // In this case, we should clean-up any existing nominated node name of the pod. return nil, nil, []*v1.Pod{pod}, nil } - var ( - pdbs []*policy.PodDisruptionBudget - err error - ) + var pdbs []*policy.PodDisruptionBudget if g.pdbLister != nil { pdbs, err = g.pdbLister.List(labels.Everything()) if err != nil { @@ -424,98 +425,116 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i return numNodes } -// Filters the nodes to find the ones that fit based on the given predicate functions -// Each node is passed through the predicate functions to determine if it is a fit -func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) { - var filtered []*v1.Node - filteredNodesStatuses := framework.NodeToStatusMap{} - - if !g.framework.HasFilterPlugins() { - filtered = g.nodeInfoSnapshot.ListNodes() - } else { - allNodes := len(g.nodeInfoSnapshot.NodeInfoList) - numNodesToFind := g.numFeasibleNodesToFind(int32(allNodes)) - - // Create filtered list with enough space to avoid growing it - // and allow assigning. - filtered = make([]*v1.Node, numNodesToFind) - errCh := util.NewErrorChannel() - var ( - predicateResultLock sync.Mutex - filteredLen int32 - ) - - ctx, cancel := context.WithCancel(ctx) - checkNode := func(i int) { - // We check the nodes starting from where we left off in the previous scheduling cycle, - // this is to make sure all nodes have the same chance of being examined across pods. 
- nodeInfo := g.nodeInfoSnapshot.NodeInfoList[(g.nextStartNodeIndex+i)%allNodes] - fits, status, err := g.podFitsOnNode(ctx, state, pod, nodeInfo) - if err != nil { - errCh.SendErrorWithCancel(err, cancel) - return - } - if fits { - length := atomic.AddInt32(&filteredLen, 1) - if length > numNodesToFind { - cancel() - atomic.AddInt32(&filteredLen, -1) - } else { - filtered[length-1] = nodeInfo.Node() - } - } else { - predicateResultLock.Lock() - if !status.IsSuccess() { - filteredNodesStatuses[nodeInfo.Node().Name] = status - } - predicateResultLock.Unlock() - } - } - - // Stops searching for more nodes once the configured number of feasible nodes - // are found. - workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode) - processedNodes := int(filteredLen) + len(filteredNodesStatuses) - g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % allNodes - - filtered = filtered[:filteredLen] - if err := errCh.ReceiveError(); err != nil { - return []*v1.Node{}, framework.NodeToStatusMap{}, err - } +// Filters the nodes to find the ones that fit the pod based on the framework +// filter plugins and filter extenders. +func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) { + filteredNodesStatuses := make(framework.NodeToStatusMap) + filtered, err := g.findNodesThatPassFilters(ctx, state, pod, filteredNodesStatuses) + if err != nil { + return nil, nil, err } - if len(filtered) > 0 && len(g.extenders) != 0 { - for _, extender := range g.extenders { - if !extender.IsInterested(pod) { - continue - } - filteredList, failedMap, err := extender.Filter(pod, filtered) - if err != nil { - if extender.IsIgnorable() { - klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set", - extender, err) - continue - } - - return []*v1.Node{}, framework.NodeToStatusMap{}, err - } - - for failedNodeName, failedMsg := range failedMap { - if _, found := filteredNodesStatuses[failedNodeName]; !found { - filteredNodesStatuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg) - } else { - filteredNodesStatuses[failedNodeName].AppendReason(failedMsg) - } - } - filtered = filteredList - if len(filtered) == 0 { - break - } - } + filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses) + if err != nil { + return nil, nil, err } return filtered, filteredNodesStatuses, nil } +// findNodesThatPassFilters finds the nodes that fit the filter plugins. +func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { + allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() + if err != nil { + return nil, err + } + + numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes))) + + // Create filtered list with enough space to avoid growing it + // and allow assigning. 
+ filtered := make([]*v1.Node, numNodesToFind) + + if !g.framework.HasFilterPlugins() { + for i := range filtered { + filtered[i] = allNodes[i].Node() + } + g.nextStartNodeIndex = (g.nextStartNodeIndex + len(filtered)) % len(allNodes) + return filtered, nil + } + + errCh := util.NewErrorChannel() + var statusesLock sync.Mutex + var filteredLen int32 + ctx, cancel := context.WithCancel(ctx) + checkNode := func(i int) { + // We check the nodes starting from where we left off in the previous scheduling cycle, + // this is to make sure all nodes have the same chance of being examined across pods. + nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)] + fits, status, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo) + if err != nil { + errCh.SendErrorWithCancel(err, cancel) + return + } + if fits { + length := atomic.AddInt32(&filteredLen, 1) + if length > numNodesToFind { + cancel() + atomic.AddInt32(&filteredLen, -1) + } else { + filtered[length-1] = nodeInfo.Node() + } + } else { + statusesLock.Lock() + if !status.IsSuccess() { + statuses[nodeInfo.Node().Name] = status + } + statusesLock.Unlock() + } + } + + // Stops searching for more nodes once the configured number of feasible nodes + // are found. + workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode) + processedNodes := int(filteredLen) + len(statuses) + g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes) + + filtered = filtered[:filteredLen] + if err := errCh.ReceiveError(); err != nil { + return nil, err + } + return filtered, nil +} + +func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { + for _, extender := range g.extenders { + if len(filtered) == 0 { + break + } + if !extender.IsInterested(pod) { + continue + } + filteredList, failedMap, err := extender.Filter(pod, filtered) + if err != nil { + if extender.IsIgnorable() { + klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set", + extender, err) + continue + } + return nil, err + } + + for failedNodeName, failedMsg := range failedMap { + if _, found := statuses[failedNodeName]; !found { + statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg) + } else { + statuses[failedNodeName].AppendReason(failedMsg) + } + } + filtered = filteredList + } + return filtered, nil +} + // addNominatedPods adds pods with equal or greater priority which are nominated // to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState, // 3) augmented nodeInfo. @@ -545,17 +564,17 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, st return podsAdded, stateOut, nodeInfoOut, nil } -// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions. -// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached -// predicate results as possible. +// podPassesFiltersOnNode checks whether a node given by NodeInfo satisfies the +// filter plugins. // This function is called from two different places: Schedule and Preempt. -// When it is called from Schedule, we want to test whether the pod is schedulable -// on the node with all the existing pods on the node plus higher and equal priority -// pods nominated to run on the node. -// When it is called from Preempt, we should remove the victims of preemption and -// add the nominated pods. 
Removal of the victims is done by SelectVictimsOnNode(). -// It removes victims from PreFilter state and NodeInfo before calling this function. -func (g *genericScheduler) podFitsOnNode( +// When it is called from Schedule, we want to test whether the pod is +// schedulable on the node with all the existing pods on the node plus higher +// and equal priority pods nominated to run on the node. +// When it is called from Preempt, we should remove the victims of preemption +// and add the nominated pods. Removal of the victims is done by +// SelectVictimsOnNode(). Preempt removes victims from PreFilter state and +// NodeInfo before calling this function. +func (g *genericScheduler) podPassesFiltersOnNode( ctx context.Context, state *framework.CycleState, pod *v1.Pod, @@ -564,21 +583,21 @@ func (g *genericScheduler) podFitsOnNode( var status *framework.Status podsAdded := false - // We run predicates twice in some cases. If the node has greater or equal priority + // We run filters twice in some cases. If the node has greater or equal priority // nominated pods, we run them when those pods are added to PreFilter state and nodeInfo. - // If all predicates succeed in this pass, we run them again when these + // If all filters succeed in this pass, we run them again when these // nominated pods are not added. This second pass is necessary because some - // predicates such as inter-pod affinity may not pass without the nominated pods. + // filters such as inter-pod affinity may not pass without the nominated pods. // If there are no nominated pods for the node or if the first run of the - // predicates fail, we don't run the second pass. + // filters fail, we don't run the second pass. // We consider only equal or higher priority pods in the first pass, because // those are the current "pod" must yield to them and not take a space opened // for running them. It is ok if the current "pod" take resources freed for // lower priority pods. // Requiring that the new pod is schedulable in both circumstances ensures that - // we are making a conservative decision: predicates like resources and inter-pod + // we are making a conservative decision: filters like resources and inter-pod // anti-affinity are more likely to fail when the nominated pods are treated - // as running, while predicates like pod affinity are more likely to fail when + // as running, while filters like pod affinity are more likely to fail when // the nominated pods are treated as not running. We can't just assume the // nominated pods are running because they are not running right now and in fact, // they may end up getting scheduled to a different node. @@ -648,7 +667,7 @@ func (g *genericScheduler) prioritizeNodes( if len(g.extenders) != 0 && nodes != nil { var mu sync.Mutex var wg sync.WaitGroup - combinedScores := make(map[string]int64, len(g.nodeInfoSnapshot.NodeInfoList)) + combinedScores := make(map[string]int64, len(nodes)) for i := range g.extenders { if !g.extenders[i].IsInterested(pod) { continue @@ -959,7 +978,7 @@ func (g *genericScheduler) selectVictimsOnNode( // inter-pod affinity to one or more victims, but we have decided not to // support this case for performance reasons. Having affinity to lower // priority pods is not a recommended configuration anyway. 
- if fits, _, err := g.podFitsOnNode(ctx, state, pod, nodeInfo); !fits { + if fits, _, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo); !fits { if err != nil { klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) } @@ -977,7 +996,7 @@ func (g *genericScheduler) selectVictimsOnNode( if err := addPod(p); err != nil { return false, err } - fits, _, _ := g.podFitsOnNode(ctx, state, pod, nodeInfo) + fits, _, _ := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo) if !fits { if err := removePod(p); err != nil { return false, err diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 8437e8ef22c..791fc84bcf3 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -845,7 +845,7 @@ func TestFindFitAllError(t *testing.T) { st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin), ) - _, nodeToStatusMap, err := scheduler.findNodesThatFit(context.Background(), framework.NewCycleState(), &v1.Pod{}) + _, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), &v1.Pod{}) if err != nil { t.Errorf("unexpected error: %v", err) @@ -878,7 +878,7 @@ func TestFindFitSomeError(t *testing.T) { ) pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}} - _, nodeToStatusMap, err := scheduler.findNodesThatFit(context.Background(), framework.NewCycleState(), pod) + _, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), pod) if err != nil { t.Errorf("unexpected error: %v", err) @@ -957,7 +957,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) { cache.UpdateNodeInfoSnapshot(scheduler.nodeInfoSnapshot) queue.UpdateNominatedPodForNode(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("nominated")}, Spec: v1.PodSpec{Priority: &midPriority}}, "1") - _, _, err := scheduler.findNodesThatFit(context.Background(), framework.NewCycleState(), test.pod) + _, _, err := scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), test.pod) if err != nil { t.Errorf("unexpected error: %v", err) @@ -1131,7 +1131,7 @@ func TestZeroRequest(t *testing.T) { ctx := context.Background() state := framework.NewCycleState() - _, filteredNodesStatuses, err := scheduler.findNodesThatFit(ctx, state, test.pod) + _, filteredNodesStatuses, err := scheduler.findNodesThatFitPod(ctx, state, test.pod) if err != nil { t.Fatalf("error filtering nodes: %+v", err) } @@ -2427,7 +2427,7 @@ func TestFairEvaluationForNodes(t *testing.T) { // Iterating over all nodes more than twice for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ { - nodesThatFit, _, err := g.findNodesThatFit(context.Background(), framework.NewCycleState(), &v1.Pod{}) + nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), framework.NewCycleState(), &v1.Pod{}) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/scheduler/nodeinfo/snapshot/snapshot.go b/pkg/scheduler/nodeinfo/snapshot/snapshot.go index bb6eee6b676..94e2d50c897 100644 --- a/pkg/scheduler/nodeinfo/snapshot/snapshot.go +++ b/pkg/scheduler/nodeinfo/snapshot/snapshot.go @@ -132,15 +132,9 @@ func (s *Snapshot) NodeInfos() schedulerlisters.NodeInfoLister { return &nodeInfoLister{snapshot: s} } -// ListNodes returns the list of nodes in the snapshot. 
-func (s *Snapshot) ListNodes() []*v1.Node { - nodes := make([]*v1.Node, 0, len(s.NodeInfoList)) - for _, n := range s.NodeInfoList { - if n.Node() != nil { - nodes = append(nodes, n.Node()) - } - } - return nodes +// NumNodes returns the number of nodes in the snapshot. +func (s *Snapshot) NumNodes() int { + return len(s.NodeInfoList) } type podLister struct {
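
Reviewer note (not part of the diff): below is a minimal, self-contained sketch of how the two stages introduced here compose. findNodesThatPassFilters narrows the node list with the filter plugins while advancing nextStartNodeIndex so all nodes get examined fairly across cycles, and findNodesThatPassExtenders lets each extender narrow the list further while merging failure reasons into the shared status map. The types, the sequential loop, and the sample filter/extender are hypothetical stand-ins for illustration only; the real code runs the filter stage in parallel via workqueue.ParallelizeUntil and propagates errors through an error channel.

// Illustrative sketch only: simplified stand-ins for the scheduler types in
// this diff. Not the real kube-scheduler structs or plugin interfaces.
package main

import "fmt"

type node struct{ name string }

// filterFn reports whether a node fits, and a reason when it does not.
type filterFn func(node) (fits bool, reason string)

// extenderFn narrows the candidate list and reports why dropped nodes failed.
type extenderFn func([]node) (filtered []node, failed map[string]string)

type scheduler struct {
	filters            []filterFn
	extenders          []extenderFn
	nextStartNodeIndex int
}

// findNodesThatPassFilters examines nodes starting from where the previous
// cycle left off and stops once numNodesToFind feasible nodes are collected.
func (s *scheduler) findNodesThatPassFilters(all []node, numNodesToFind int, statuses map[string]string) []node {
	if len(all) == 0 {
		return nil
	}
	var filtered []node
	processed := 0
	for i := 0; i < len(all) && len(filtered) < numNodesToFind; i++ {
		n := all[(s.nextStartNodeIndex+i)%len(all)]
		processed++
		fits := true
		for _, f := range s.filters {
			if ok, reason := f(n); !ok {
				statuses[n.name] = reason
				fits = false
				break
			}
		}
		if fits {
			filtered = append(filtered, n)
		}
	}
	// Advance the start index past the nodes examined in this cycle so every
	// node gets the same chance of being looked at across pods.
	s.nextStartNodeIndex = (s.nextStartNodeIndex + processed) % len(all)
	return filtered
}

// findNodesThatPassExtenders lets each extender further narrow the list,
// merging the failure reasons it reports into the shared status map.
func (s *scheduler) findNodesThatPassExtenders(filtered []node, statuses map[string]string) []node {
	for _, ext := range s.extenders {
		if len(filtered) == 0 {
			break
		}
		var failed map[string]string
		filtered, failed = ext(filtered)
		for name, reason := range failed {
			statuses[name] = reason
		}
	}
	return filtered
}

func main() {
	s := &scheduler{
		filters: []filterFn{func(n node) (bool, string) {
			if n.name == "node-b" {
				return false, "insufficient memory"
			}
			return true, ""
		}},
		extenders: []extenderFn{func(nodes []node) ([]node, map[string]string) {
			// Hypothetical extender: keep the first candidate, reject the rest.
			failed := map[string]string{}
			for _, n := range nodes[1:] {
				failed[n.name] = "rejected by example extender"
			}
			return nodes[:1], failed
		}},
	}
	all := []node{{"node-a"}, {"node-b"}, {"node-c"}}
	statuses := map[string]string{}
	feasible := s.findNodesThatPassFilters(all, len(all), statuses)
	feasible = s.findNodesThatPassExtenders(feasible, statuses)
	// Prints roughly: [{node-a}] map[node-b:insufficient memory node-c:rejected by example extender]
	fmt.Println(feasible, statuses)
}

Keeping the two stages as separate methods, as the diff does, also means the extender handling can be exercised on its own, independently of the plugin filtering path.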