From 3af26bae2c368e0b5a78802412231f3d1ecb51ff Mon Sep 17 00:00:00 2001 From: Dave Chen Date: Fri, 30 Jul 2021 18:23:15 +0800 Subject: [PATCH] Refactor defaultpreemption for out-of-tree plugins Signed-off-by: Dave Chen --- .../plugins/defaultpreemption/candidate.go | 45 -- .../defaultpreemption/default_preemption.go | 607 ++---------------- .../default_preemption_test.go | 316 ++++----- .../framework/preemption/preemption.go | 574 +++++++++++++++++ .../framework/preemption/preemption_test.go | 334 ++++++++++ 5 files changed, 1083 insertions(+), 793 deletions(-) delete mode 100644 pkg/scheduler/framework/plugins/defaultpreemption/candidate.go create mode 100644 pkg/scheduler/framework/preemption/preemption.go create mode 100644 pkg/scheduler/framework/preemption/preemption_test.go diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/candidate.go b/pkg/scheduler/framework/plugins/defaultpreemption/candidate.go deleted file mode 100644 index 6bd86cf22e3..00000000000 --- a/pkg/scheduler/framework/plugins/defaultpreemption/candidate.go +++ /dev/null @@ -1,45 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package defaultpreemption - -import ( - extenderv1 "k8s.io/kube-scheduler/extender/v1" -) - -// Candidate represents a nominated node on which the preemptor can be scheduled, -// along with the list of victims that should be evicted for the preemptor to fit the node. -type Candidate interface { - // Victims wraps a list of to-be-preempted Pods and the number of PDB violation. - Victims() *extenderv1.Victims - // Name returns the target node name where the preemptor gets nominated to run. - Name() string -} - -type candidate struct { - victims *extenderv1.Victims - name string -} - -// Victims returns s.victims. -func (s *candidate) Victims() *extenderv1.Victims { - return s.victims -} - -// Name returns s.name. 
-func (s *candidate) Name() string { - return s.name -} diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go index 61815121c20..f96b0d1acc4 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go @@ -18,15 +18,9 @@ package defaultpreemption import ( "context" - "errors" "fmt" - "math" "math/rand" "sort" - "sync" - "sync/atomic" - - "k8s.io/klog/v2" v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" @@ -34,16 +28,17 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" policylisters "k8s.io/client-go/listers/policy/v1" corev1helpers "k8s.io/component-helpers/scheduling/corev1" + "k8s.io/klog/v2" extenderv1 "k8s.io/kube-scheduler/extender/v1" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + "k8s.io/kubernetes/pkg/scheduler/framework/preemption" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -92,85 +87,15 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy metrics.PreemptionAttempts.Inc() }() - nnn, status := pl.preempt(ctx, state, pod, m) - if !status.IsSuccess() { - return nil, status + pe := preemption.Evaluator{ + PluginName: names.DefaultPreemption, + Handler: pl.fh, + PodLister: pl.podLister, + PdbLister: pl.pdbLister, + State: state, + Interface: pl, } - // This happens when the pod is not eligible for preemption or extenders filtered all candidates. - if nnn == "" { - return nil, framework.NewStatus(framework.Unschedulable) - } - return &framework.PostFilterResult{NominatedNodeName: nnn}, framework.NewStatus(framework.Success) -} - -// preempt finds nodes with pods that can be preempted to make room for "pod" to -// schedule. It chooses one of the nodes and preempts the pods on the node and -// returns 1) the node name which is picked up for preemption, 2) any possible error. -// preempt does not update its snapshot. It uses the same snapshot used in the -// scheduling cycle. This is to avoid a scenario where preempt finds feasible -// nodes without preempting any pod. When there are many pending pods in the -// scheduling queue a nominated pod will go back to the queue and behind -// other pods with the same priority. The nominated pod prevents other pods from -// using the nominated resources and the nominated pod could take a long time -// before it is retried after many other pending pods. -func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, *framework.Status) { - cs := pl.fh.ClientSet() - nodeLister := pl.fh.SnapshotSharedLister().NodeInfos() - - // 0) Fetch the latest version of . - // It's safe to directly fetch pod here. Because the informer cache has already been - // initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc(). - // However, tests may need to manually initialize the shared pod informer. 
- podNamespace, podName := pod.Namespace, pod.Name - pod, err := pl.podLister.Pods(pod.Namespace).Get(pod.Name) - if err != nil { - klog.ErrorS(err, "getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName)) - return "", framework.AsStatus(err) - } - - // 1) Ensure the preemptor is eligible to preempt other pods. - if !PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName]) { - klog.V(5).InfoS("Pod is not eligible for more preemption", "pod", klog.KObj(pod)) - return "", nil - } - - // 2) Find all preemption candidates. - candidates, nodeToStatusMap, status := pl.FindCandidates(ctx, state, pod, m) - if !status.IsSuccess() { - return "", status - } - - // Return a FitError only when there are no candidates that fit the pod. - if len(candidates) == 0 { - fitError := &framework.FitError{ - Pod: pod, - NumAllNodes: len(nodeToStatusMap), - Diagnosis: framework.Diagnosis{ - NodeToStatusMap: nodeToStatusMap, - // Leave FailedPlugins as nil as it won't be used on moving Pods. - }, - } - return "", framework.NewStatus(framework.Unschedulable, fitError.Error()) - } - - // 3) Interact with registered Extenders to filter out some candidates if needed. - candidates, status = CallExtenders(pl.fh.Extenders(), pod, nodeLister, candidates) - if !status.IsSuccess() { - return "", status - } - - // 4) Find the best candidate. - bestCandidate := SelectCandidate(candidates) - if bestCandidate == nil || len(bestCandidate.Name()) == 0 { - return "", nil - } - - // 5) Perform preparation work before nominating the selected candidate. - if status := PrepareCandidate(bestCandidate, pl.fh, cs, pod, pl.Name()); !status.IsSuccess() { - return "", status - } - - return bestCandidate.Name(), nil + return pe.Preempt(ctx, pod, m) } // calculateNumCandidates returns the number of candidates the FindCandidates @@ -188,248 +113,15 @@ func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 { return n } -// getOffsetAndNumCandidates chooses a random offset and calculates the number +// GetOffsetAndNumCandidates chooses a random offset and calculates the number // of candidates that should be shortlisted for dry running preemption. -func (pl *DefaultPreemption) getOffsetAndNumCandidates(numNodes int32) (int32, int32) { +func (pl *DefaultPreemption) GetOffsetAndNumCandidates(numNodes int32) (int32, int32) { return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes) } -// FindCandidates calculates a slice of preemption candidates. -// Each candidate is executable to make the given schedulable. -func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, *framework.Status) { - allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List() - if err != nil { - return nil, nil, framework.AsStatus(err) - } - if len(allNodes) == 0 { - return nil, nil, framework.NewStatus(framework.Error, "no nodes available") - } - potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m) - if len(potentialNodes) == 0 { - klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod)) - // In this case, we should clean-up any existing nominated node name of the pod. - if err := util.ClearNominatedNodeName(pl.fh.ClientSet(), pod); err != nil { - klog.ErrorS(err, "cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod)) - // We do not return as this error is not critical. 
- } - return nil, unschedulableNodeStatus, nil - } - - pdbs, err := getPodDisruptionBudgets(pl.pdbLister) - if err != nil { - return nil, nil, framework.AsStatus(err) - } - - offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(potentialNodes))) - if klog.V(5).Enabled() { - var sample []string - for i := offset; i < offset+10 && i < int32(len(potentialNodes)); i++ { - sample = append(sample, potentialNodes[i].Node().Name) - } - klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates) - } - candidates, nodeStatuses := dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates) - for node, status := range unschedulableNodeStatus { - nodeStatuses[node] = status - } - return candidates, nodeStatuses, nil -} - -// PodEligibleToPreemptOthers determines whether this pod should be considered -// for preempting other pods or not. If this pod has already preempted other -// pods and those are in their graceful termination period, it shouldn't be -// considered for preemption. -// We look at the node that is nominated for this pod and as long as there are -// terminating pods on the node, we don't consider this for preempting more pods. -func PodEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister, nominatedNodeStatus *framework.Status) bool { - if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever { - klog.V(5).InfoS("Pod is not eligible for preemption because it has a preemptionPolicy of Never", "pod", klog.KObj(pod)) - return false - } - nomNodeName := pod.Status.NominatedNodeName - if len(nomNodeName) > 0 { - // If the pod's nominated node is considered as UnschedulableAndUnresolvable by the filters, - // then the pod should be considered for preempting again. - if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable { - return true - } - - if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil { - podPriority := corev1helpers.PodPriority(pod) - for _, p := range nodeInfo.Pods { - if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority { - // There is a terminating pod on the nominated node. - return false - } - } - } - } - return true -} - -// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates -// that may be satisfied by removing pods from the node. -func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) ([]*framework.NodeInfo, framework.NodeToStatusMap) { - var potentialNodes []*framework.NodeInfo - nodeStatuses := make(framework.NodeToStatusMap) - for _, node := range nodes { - name := node.Node().Name - // We rely on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable' - // to determine whether preemption may help or not on the node. - if m[name].Code() == framework.UnschedulableAndUnresolvable { - nodeStatuses[node.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "Preemption is not helpful for scheduling") - continue - } - potentialNodes = append(potentialNodes, node) - } - return potentialNodes, nodeStatuses -} - -type candidateList struct { - idx int32 - items []Candidate -} - -func newCandidateList(size int32) *candidateList { - return &candidateList{idx: -1, items: make([]Candidate, size)} -} - -// add adds a new candidate to the internal array atomically. 
-func (cl *candidateList) add(c *candidate) { - if idx := atomic.AddInt32(&cl.idx, 1); idx < int32(len(cl.items)) { - cl.items[idx] = c - } -} - -// size returns the number of candidate stored. Note that some add() operations -// might still be executing when this is called, so care must be taken to -// ensure that all add() operations complete before accessing the elements of -// the list. -func (cl *candidateList) size() int32 { - n := atomic.LoadInt32(&cl.idx) + 1 - if n >= int32(len(cl.items)) { - n = int32(len(cl.items)) - } - return n -} - -// get returns the internal candidate array. This function is NOT atomic and -// assumes that all add() operations have been completed. -func (cl *candidateList) get() []Candidate { - return cl.items[:cl.size()] -} - -// dryRunPreemption simulates Preemption logic on in parallel, -// returns preemption candidates and a map indicating filtered nodes statuses. -// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of -// candidates, ones that do not violate PDB are preferred over ones that do. -func dryRunPreemption(ctx context.Context, fh framework.Handle, - state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo, - pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap) { - nonViolatingCandidates := newCandidateList(numCandidates) - violatingCandidates := newCandidateList(numCandidates) - parallelCtx, cancel := context.WithCancel(ctx) - nodeStatuses := make(framework.NodeToStatusMap) - var statusesLock sync.Mutex - checkNode := func(i int) { - nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone() - stateCopy := state.Clone() - pods, numPDBViolations, status := selectVictimsOnNode(ctx, fh, stateCopy, pod, nodeInfoCopy, pdbs) - if status.IsSuccess() && len(pods) != 0 { - victims := extenderv1.Victims{ - Pods: pods, - NumPDBViolations: int64(numPDBViolations), - } - c := &candidate{ - victims: &victims, - name: nodeInfoCopy.Node().Name, - } - if numPDBViolations == 0 { - nonViolatingCandidates.add(c) - } else { - violatingCandidates.add(c) - } - nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size() - if nvcSize > 0 && nvcSize+vcSize >= numCandidates { - cancel() - } - return - } - if status.IsSuccess() && len(pods) == 0 { - status = framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name)) - } - statusesLock.Lock() - nodeStatuses[nodeInfoCopy.Node().Name] = status - statusesLock.Unlock() - } - fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode) - return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses -} - -// CallExtenders calls given to select the list of feasible candidates. -// We will only check with extenders that support preemption. -// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated -// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles. -func CallExtenders(extenders []framework.Extender, pod *v1.Pod, nodeLister framework.NodeInfoLister, - candidates []Candidate) ([]Candidate, *framework.Status) { - if len(extenders) == 0 { - return candidates, nil - } - - // Migrate candidate slice to victimsMap to adapt to the Extender interface. - // It's only applicable for candidate slice that have unique nominated node name. 
- victimsMap := candidatesToVictimsMap(candidates) - if len(victimsMap) == 0 { - return candidates, nil - } - for _, extender := range extenders { - if !extender.SupportsPreemption() || !extender.IsInterested(pod) { - continue - } - nodeNameToVictims, err := extender.ProcessPreemption(pod, victimsMap, nodeLister) - if err != nil { - if extender.IsIgnorable() { - klog.InfoS("Skipping extender as it returned error and has ignorable flag set", - "extender", extender, "err", err) - continue - } - return nil, framework.AsStatus(err) - } - // Check if the returned victims are valid. - for nodeName, victims := range nodeNameToVictims { - if victims == nil || len(victims.Pods) == 0 { - if extender.IsIgnorable() { - delete(nodeNameToVictims, nodeName) - klog.InfoS("Ignoring node without victims", "node", nodeName) - continue - } - return nil, framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeName)) - } - } - - // Replace victimsMap with new result after preemption. So the - // rest of extenders can continue use it as parameter. - victimsMap = nodeNameToVictims - - // If node list becomes empty, no preemption can happen regardless of other extenders. - if len(victimsMap) == 0 { - break - } - } - - var newCandidates []Candidate - for nodeName := range victimsMap { - newCandidates = append(newCandidates, &candidate{ - victims: victimsMap[nodeName], - name: nodeName, - }) - } - return newCandidates, nil -} - // This function is not applicable for out-of-tree preemption plugins that exercise // different preemption candidates on the same nominated node. -func candidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims { +func (pl *DefaultPreemption) CandidatesToVictimsMap(candidates []preemption.Candidate) map[string]*extenderv1.Victims { m := make(map[string]*extenderv1.Victims) for _, c := range candidates { m[c.Name()] = c.Victims() @@ -437,193 +129,20 @@ func candidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victi return m } -// SelectCandidate chooses the best-fit candidate from given and return it. -func SelectCandidate(candidates []Candidate) Candidate { - if len(candidates) == 0 { - return nil - } - if len(candidates) == 1 { - return candidates[0] - } - - victimsMap := candidatesToVictimsMap(candidates) - candidateNode := pickOneNodeForPreemption(victimsMap) - - // Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree - // preemption plugins that exercise different candidates on the same nominated node. - if victims := victimsMap[candidateNode]; victims != nil { - return &candidate{ - victims: victims, - name: candidateNode, - } - } - - // We shouldn't reach here. - klog.ErrorS(errors.New("no candidate selected"), "should not reach here", "candidates", candidates) - // To not break the whole flow, return the first candidate. - return candidates[0] -} - -// pickOneNodeForPreemption chooses one node among the given nodes. It assumes -// pods in each map entry are ordered by decreasing priority. -// It picks a node based on the following criteria: -// 1. A node with minimum number of PDB violations. -// 2. A node with minimum highest priority victim is picked. -// 3. Ties are broken by sum of priorities of all victims. -// 4. If there are still ties, node with the minimum number of victims is picked. -// 5. If there are still ties, node with the latest start time of all highest priority victims is picked. -// 6. If there are still ties, the first such node is picked (sort of randomly). 
-// The 'minNodes1' and 'minNodes2' are being reused here to save the memory -// allocation and garbage collection time. -func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string { - if len(nodesToVictims) == 0 { - return "" - } - minNumPDBViolatingPods := int64(math.MaxInt32) - var minNodes1 []string - lenNodes1 := 0 - for node, victims := range nodesToVictims { - numPDBViolatingPods := victims.NumPDBViolations - if numPDBViolatingPods < minNumPDBViolatingPods { - minNumPDBViolatingPods = numPDBViolatingPods - minNodes1 = nil - lenNodes1 = 0 - } - if numPDBViolatingPods == minNumPDBViolatingPods { - minNodes1 = append(minNodes1, node) - lenNodes1++ - } - } - if lenNodes1 == 1 { - return minNodes1[0] - } - - // There are more than one node with minimum number PDB violating pods. Find - // the one with minimum highest priority victim. - minHighestPriority := int32(math.MaxInt32) - var minNodes2 = make([]string, lenNodes1) - lenNodes2 := 0 - for i := 0; i < lenNodes1; i++ { - node := minNodes1[i] - victims := nodesToVictims[node] - // highestPodPriority is the highest priority among the victims on this node. - highestPodPriority := corev1helpers.PodPriority(victims.Pods[0]) - if highestPodPriority < minHighestPriority { - minHighestPriority = highestPodPriority - lenNodes2 = 0 - } - if highestPodPriority == minHighestPriority { - minNodes2[lenNodes2] = node - lenNodes2++ - } - } - if lenNodes2 == 1 { - return minNodes2[0] - } - - // There are a few nodes with minimum highest priority victim. Find the - // smallest sum of priorities. - minSumPriorities := int64(math.MaxInt64) - lenNodes1 = 0 - for i := 0; i < lenNodes2; i++ { - var sumPriorities int64 - node := minNodes2[i] - for _, pod := range nodesToVictims[node].Pods { - // We add MaxInt32+1 to all priorities to make all of them >= 0. This is - // needed so that a node with a few pods with negative priority is not - // picked over a node with a smaller number of pods with the same negative - // priority (and similar scenarios). - sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1) - } - if sumPriorities < minSumPriorities { - minSumPriorities = sumPriorities - lenNodes1 = 0 - } - if sumPriorities == minSumPriorities { - minNodes1[lenNodes1] = node - lenNodes1++ - } - } - if lenNodes1 == 1 { - return minNodes1[0] - } - - // There are a few nodes with minimum highest priority victim and sum of priorities. - // Find one with the minimum number of pods. - minNumPods := math.MaxInt32 - lenNodes2 = 0 - for i := 0; i < lenNodes1; i++ { - node := minNodes1[i] - numPods := len(nodesToVictims[node].Pods) - if numPods < minNumPods { - minNumPods = numPods - lenNodes2 = 0 - } - if numPods == minNumPods { - minNodes2[lenNodes2] = node - lenNodes2++ - } - } - if lenNodes2 == 1 { - return minNodes2[0] - } - - // There are a few nodes with same number of pods. - // Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node)) - latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]]) - if latestStartTime == nil { - // If the earliest start time of all pods on the 1st node is nil, just return it, - // which is not expected to happen. - klog.ErrorS(errors.New("earliestStartTime is nil for node"), "should not reach here", "node", minNodes2[0]) - return minNodes2[0] - } - nodeToReturn := minNodes2[0] - for i := 1; i < lenNodes2; i++ { - node := minNodes2[i] - // Get earliest start time of all pods on the current node. 
- earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node]) - if earliestStartTimeOnNode == nil { - klog.ErrorS(errors.New("earliestStartTime is nil for node"), "should not reach here", "node", node) - continue - } - if earliestStartTimeOnNode.After(latestStartTime.Time) { - latestStartTime = earliestStartTimeOnNode - nodeToReturn = node - } - } - - return nodeToReturn -} - -// selectVictimsOnNode finds minimum set of pods on the given node that should -// be preempted in order to make enough room for "pod" to be scheduled. The -// minimum set selected is subject to the constraint that a higher-priority pod -// is never preempted when a lower-priority pod could be (higher/lower relative -// to one another, not relative to the preemptor "pod"). -// The algorithm first checks if the pod can be scheduled on the node when all the -// lower priority pods are gone. If so, it sorts all the lower priority pods by -// their priority and then puts them into two groups of those whose PodDisruptionBudget -// will be violated if preempted and other non-violating pods. Both groups are -// sorted by priority. It first tries to reprieve as many PDB violating pods as -// possible and then does them same for non-PDB-violating pods while checking -// that the "pod" can still fit on the node. -// NOTE: This function assumes that it is never called if "pod" cannot be scheduled -// due to pod affinity, node affinity, or node anti-affinity reasons. None of -// these predicates can be satisfied by removing more pods from the node. -func selectVictimsOnNode( +// SelectVictimsOnNode finds minimum set of pods on the given node that should be preempted in order to make enough room +// for "pod" to be scheduled. +func (pl *DefaultPreemption) SelectVictimsOnNode( ctx context.Context, - fh framework.Handle, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo, - pdbs []*policy.PodDisruptionBudget, -) ([]*v1.Pod, int, *framework.Status) { + pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) { var potentialVictims []*framework.PodInfo removePod := func(rpi *framework.PodInfo) error { if err := nodeInfo.RemovePod(rpi.Pod); err != nil { return err } - status := fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo) + status := pl.fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo) if !status.IsSuccess() { return status.AsError() } @@ -631,7 +150,7 @@ func selectVictimsOnNode( } addPod := func(api *framework.PodInfo) error { nodeInfo.AddPodInfo(api) - status := fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo) + status := pl.fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo) if !status.IsSuccess() { return status.AsError() } @@ -661,7 +180,7 @@ func selectVictimsOnNode( // inter-pod affinity to one or more victims, but we have decided not to // support this case for performance reasons. Having affinity to lower // priority pods is not a recommended configuration anyway. 
- if status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() { + if status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() { return nil, 0, status } var victims []*v1.Pod @@ -675,7 +194,7 @@ func selectVictimsOnNode( if err := addPod(pi); err != nil { return false, err } - status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo) + status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo) fits := status.IsSuccess() if !fits { if err := removePod(pi); err != nil { @@ -703,60 +222,37 @@ func selectVictimsOnNode( return victims, numViolatingVictim, framework.NewStatus(framework.Success) } -// PrepareCandidate does some preparation work before nominating the selected candidate: -// - Evict the victim pods -// - Reject the victim pods if they are in waitingPod map -// - Clear the low-priority pods' nominatedNodeName status if needed -func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod, pluginName string) *framework.Status { - for _, victim := range c.Victims().Pods { - // If the victim is a WaitingPod, send a reject message to the PermitPlugin. - // Otherwise we should delete the victim. - if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil { - waitingPod.Reject(pluginName, "preempted") - } else if err := util.DeletePod(cs, victim); err != nil { - klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) - return framework.AsStatus(err) +// PodEligibleToPreemptOthers determines whether this pod should be considered +// for preempting other pods or not. If this pod has already preempted other +// pods and those are in their graceful termination period, it shouldn't be +// considered for preemption. +// We look at the node that is nominated for this pod and as long as there are +// terminating pods on the node, we don't consider this for preempting more pods. +func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) bool { + if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever { + klog.V(5).InfoS("Pod is not eligible for preemption because it has a preemptionPolicy of Never", "pod", klog.KObj(pod)) + return false + } + nodeInfos := pl.fh.SnapshotSharedLister().NodeInfos() + nomNodeName := pod.Status.NominatedNodeName + if len(nomNodeName) > 0 { + // If the pod's nominated node is considered as UnschedulableAndUnresolvable by the filters, + // then the pod should be considered for preempting again. + if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable { + return true } - fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", - pod.Namespace, pod.Name, c.Name()) - } - metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods))) - // Lower priority pods nominated to run on this node, may no longer fit on - // this node. So, we should remove their nomination. Removing their - // nomination updates these pods and moves them to the active queue. It - // lets scheduler find another place for them. - nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name()) - if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil { - klog.ErrorS(err, "cannot clear 'NominatedNodeName' field") - // We do not return as this error is not critical. 
- } - - return nil -} - -// getLowerPriorityNominatedPods returns pods whose priority is smaller than the -// priority of the given "pod" and are nominated to run on the given node. -// Note: We could possibly check if the nominated lower priority pods still fit -// and return those that no longer fit, but that would require lots of -// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be -// worth the complexity, especially because we generally expect to have a very -// small number of nominated pods per node. -func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod { - podInfos := pn.NominatedPodsForNode(nodeName) - - if len(podInfos) == 0 { - return nil - } - - var lowerPriorityPods []*v1.Pod - podPriority := corev1helpers.PodPriority(pod) - for _, pi := range podInfos { - if corev1helpers.PodPriority(pi.Pod) < podPriority { - lowerPriorityPods = append(lowerPriorityPods, pi.Pod) + if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil { + podPriority := corev1helpers.PodPriority(pod) + for _, p := range nodeInfo.Pods { + if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority { + // There is a terminating pod on the nominated node. + return false + } + } } } - return lowerPriorityPods + return true } // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods" @@ -817,10 +313,3 @@ func getPDBLister(informerFactory informers.SharedInformerFactory, enablePodDisr } return nil } - -func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) { - if pdbLister != nil { - return pdbLister.List(labels.Everything()) - } - return nil, nil -} diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 14c2919d55e..95b13ed21ca 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -18,7 +18,6 @@ package defaultpreemption import ( "context" - "fmt" "math/rand" "sort" "strings" @@ -29,8 +28,8 @@ import ( v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" - apiruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" @@ -38,23 +37,18 @@ import ( "k8s.io/client-go/tools/events" kubeschedulerconfigv1beta1 "k8s.io/kube-scheduler/config/v1beta1" extenderv1 "k8s.io/kube-scheduler/extender/v1" - volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling" "k8s.io/kubernetes/pkg/scheduler/apis/config" configv1beta1 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta1" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable" 
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone" + "k8s.io/kubernetes/pkg/scheduler/framework/preemption" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" @@ -99,7 +93,7 @@ func getDefaultDefaultPreemptionArgs() *config.DefaultPreemptionArgs { return dpa } -var nodeResourcesFitFunc = func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) { +var nodeResourcesFitFunc = func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { return noderesources.NewFit(plArgs, fh, feature.Features{}) } @@ -317,8 +311,11 @@ func TestPostFilter(t *testing.T) { } } -// TestSelectNodesForPreemption tests dryRunPreemption. This test assumes -// that podsFitsOnNode works correctly and is tested separately. +type candidate struct { + victims *extenderv1.Victims + name string +} + func TestDryRunPreemption(t *testing.T) { tests := []struct { name string @@ -330,7 +327,7 @@ func TestDryRunPreemption(t *testing.T) { pdbs []*policy.PodDisruptionBudget fakeFilterRC framework.Code // return code for fake filter plugin disableParallelism bool - expected [][]Candidate + expected [][]candidate expectedNumFilterCalled []int32 }{ { @@ -346,7 +343,7 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(), st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(), }, - expected: [][]Candidate{{}}, + expected: [][]candidate{{}}, expectedNumFilterCalled: []int32{2}, }, { @@ -362,7 +359,7 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(), st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(), }, - expected: [][]Candidate{{}}, + expected: [][]candidate{{}}, fakeFilterRC: framework.Unschedulable, expectedNumFilterCalled: []int32{2}, }, @@ -380,7 +377,7 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(), st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(), }, - expected: [][]Candidate{{}}, + expected: [][]candidate{{}}, fakeFilterRC: framework.Unschedulable, expectedNumFilterCalled: []int32{2}, }, @@ -397,15 +394,15 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj(), st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(), }, - expected: [][]Candidate{ + expected: [][]candidate{ { - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj()}, }, name: "node1", }, - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj()}, }, @@ -428,7 +425,7 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj(), st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(), }, - expected: 
[][]Candidate{{}}, + expected: [][]candidate{{}}, expectedNumFilterCalled: []int32{0}, }, { @@ -445,15 +442,15 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Priority(midPriority).Req(largeRes).Obj(), st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(), }, - expected: [][]Candidate{ + expected: [][]candidate{ { - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Priority(midPriority).Req(largeRes).Obj()}, }, name: "node1", }, - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj()}, }, @@ -479,9 +476,9 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p1.4").UID("p1.4").Node("node1").Priority(highPriority).Req(smallRes).Obj(), st.MakePod().Name("p2").UID("p2").Node("node2").Priority(highPriority).Req(largeRes).Obj(), }, - expected: [][]Candidate{ + expected: [][]candidate{ { - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{ st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Priority(lowPriority).Req(smallRes).Obj(), @@ -510,9 +507,9 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p1.4").UID("p1.4").Node("node1").Priority(highPriority).Req(smallRes).StartTime(epochTime2).Obj(), st.MakePod().Name("p2").UID("p2").Node("node2").Priority(highPriority).Req(largeRes).StartTime(epochTime1).Obj(), }, - expected: [][]Candidate{ + expected: [][]candidate{ { - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{ st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Priority(lowPriority).Req(smallRes).StartTime(epochTime5).Obj(), @@ -544,9 +541,9 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p1.3").UID("p1.3").Node("node1").Priority(highPriority).Req(smallRes).Obj(), st.MakePod().Name("p2").UID("p2").Node("node2").Priority(highPriority).Req(smallRes).Obj(), }, - expected: [][]Candidate{ + expected: [][]candidate{ { - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{ st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("foo", "").Priority(lowPriority).Req(smallRes). 
@@ -578,15 +575,15 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("pod-x1").UID("pod-x1").Node("node-x").Label("foo", "").Priority(highPriority).Obj(), st.MakePod().Name("pod-x2").UID("pod-x2").Node("node-x").Label("foo", "").Priority(highPriority).Obj(), }, - expected: [][]Candidate{ + expected: [][]candidate{ { - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("pod-a2").UID("pod-a2").Node("node-a").Label("foo", "").Priority(lowPriority).Obj()}, }, name: "node-a", }, - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("pod-b1").UID("pod-b1").Node("node-b").Label("foo", "").Priority(lowPriority).Obj()}, }, @@ -596,6 +593,7 @@ func TestDryRunPreemption(t *testing.T) { }, expectedNumFilterCalled: []int32{5}, // node-a (3), node-b (2), node-x (0) }, + { name: "get Unschedulable in the preemption phase when the filter plugins filtering the nodes", registerPlugins: []st.RegisterPluginFunc{ @@ -610,7 +608,7 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(), }, fakeFilterRC: framework.Unschedulable, - expected: [][]Candidate{{}}, + expected: [][]candidate{{}}, expectedNumFilterCalled: []int32{2}, }, { @@ -632,9 +630,9 @@ func TestDryRunPreemption(t *testing.T) { Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1}, }, }, - expected: [][]Candidate{ + expected: [][]candidate{ { - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{ st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(), @@ -667,9 +665,9 @@ func TestDryRunPreemption(t *testing.T) { Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1, DisruptedPods: map[string]metav1.Time{"p2": {Time: time.Now()}}}, }, }, - expected: [][]Candidate{ + expected: [][]candidate{ { - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{ st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(), @@ -702,9 +700,9 @@ func TestDryRunPreemption(t *testing.T) { Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1, DisruptedPods: map[string]metav1.Time{"p1.2": {Time: time.Now()}}}, }, }, - expected: [][]Candidate{ + expected: [][]candidate{ { - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{ st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(), @@ -738,9 +736,9 @@ func TestDryRunPreemption(t *testing.T) { Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1, DisruptedPods: map[string]metav1.Time{"p1.3": {Time: time.Now()}}}, }, }, - expected: [][]Candidate{ + expected: [][]candidate{ { - &candidate{ + candidate{ victims: &extenderv1.Victims{ Pods: []*v1.Pod{ st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(), @@ -773,16 +771,16 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p5").UID("p5").Node("node5").Priority(midPriority).Req(largeRes).Obj(), }, disableParallelism: true, - expected: [][]Candidate{ + expected: [][]candidate{ { // cycle=0 => offset=4 => node5 (yes), node1 (yes) - &candidate{ + candidate{ name: "node1", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj()}, }, }, - &candidate{ + candidate{ name: "node5", victims: 
&extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p5").UID("p5").Node("node5").Priority(midPriority).Req(largeRes).Obj()}, @@ -810,16 +808,16 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p5").UID("p5").Node("node5").Priority(veryHighPriority).Req(largeRes).Obj(), }, disableParallelism: true, - expected: [][]Candidate{ + expected: [][]candidate{ { // cycle=0 => offset=4 => node5 (no), node1 (yes), node2 (no), node3 (yes) - &candidate{ + candidate{ name: "node1", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj()}, }, }, - &candidate{ + candidate{ name: "node3", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p3").UID("p3").Node("node3").Priority(midPriority).Req(largeRes).Obj()}, @@ -849,16 +847,16 @@ func TestDryRunPreemption(t *testing.T) { st.MakePod().Name("p5").UID("p5").Node("node5").Priority(midPriority).Req(largeRes).Obj(), }, disableParallelism: true, - expected: [][]Candidate{ + expected: [][]candidate{ { // cycle=0 => offset=4 => node5 (yes), node1 (yes) - &candidate{ + candidate{ name: "node1", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj()}, }, }, - &candidate{ + candidate{ name: "node5", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p5").UID("p5").Node("node5").Priority(midPriority).Req(largeRes).Obj()}, @@ -867,13 +865,13 @@ func TestDryRunPreemption(t *testing.T) { }, { // cycle=1 => offset=1 => node2 (yes), node3 (yes) - &candidate{ + candidate{ name: "node2", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj()}, }, }, - &candidate{ + candidate{ name: "node3", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p3").UID("p3").Node("node3").Priority(midPriority).Req(largeRes).Obj()}, @@ -882,13 +880,13 @@ func TestDryRunPreemption(t *testing.T) { }, { // cycle=2 => offset=3 => node4 (yes), node5 (yes) - &candidate{ + candidate{ name: "node4", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p4").UID("p4").Node("node4").Priority(midPriority).Req(largeRes).Obj()}, }, }, - &candidate{ + candidate{ name: "node5", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p5").UID("p5").Node("node5").Priority(midPriority).Req(largeRes).Obj()}, @@ -922,7 +920,7 @@ func TestDryRunPreemption(t *testing.T) { }, }, disableParallelism: true, - expected: [][]Candidate{ + expected: [][]candidate{ { // Even though the DefaultPreemptionArgs constraints suggest that the // minimum number of candidates is 2, we get three candidates here @@ -931,20 +929,20 @@ func TestDryRunPreemption(t *testing.T) { // number of additional candidates returned will be at most // approximately equal to the parallelism in dryRunPreemption). 
// cycle=0 => offset=4 => node5 (yes, pdb), node1 (yes, pdb), node2 (no, pdb), node3 (yes) - &candidate{ + candidate{ name: "node1", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Label("app", "foo").Priority(midPriority).Req(largeRes).Obj()}, NumPDBViolations: 1, }, }, - &candidate{ + candidate{ name: "node3", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p3").UID("p3").Node("node3").Priority(midPriority).Req(largeRes).Obj()}, }, }, - &candidate{ + candidate{ name: "node5", victims: &extenderv1.Victims{ Pods: []*v1.Pod{st.MakePod().Name("p5").UID("p5").Node("node5").Label("app", "foo").Priority(midPriority).Req(largeRes).Obj()}, @@ -1033,7 +1031,12 @@ func TestDryRunPreemption(t *testing.T) { if tt.args == nil { tt.args = getDefaultDefaultPreemptionArgs() } - pl := &DefaultPreemption{args: *tt.args} + pl := &DefaultPreemption{ + fh: fwk, + podLister: informerFactory.Core().V1().Pods().Lister(), + pdbLister: getPDBLister(informerFactory, true), + args: *tt.args, + } // Using 4 as a seed source to test getOffsetAndNumCandidates() deterministically. // However, we need to do it after informerFactory.WaitforCacheSync() which might @@ -1046,8 +1049,16 @@ func TestDryRunPreemption(t *testing.T) { if status := fwk.RunPreFilterPlugins(context.Background(), state, pod); !status.IsSuccess() { t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status) } - offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos))) - got, _ := dryRunPreemption(context.Background(), fwk, state, pod, nodeInfos, tt.pdbs, offset, numCandidates) + pe := preemption.Evaluator{ + PluginName: names.DefaultPreemption, + Handler: pl.fh, + PodLister: pl.podLister, + PdbLister: pl.pdbLister, + State: state, + Interface: pl, + } + offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos))) + got, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, tt.pdbs, offset, numCandidates) // Sort the values (inner victims) and the candidate itself (by its NominatedNodeName). 
for i := range got { victims := got[i].Victims().Pods @@ -1058,11 +1069,15 @@ func TestDryRunPreemption(t *testing.T) { sort.Slice(got, func(i, j int) bool { return got[i].Name() < got[j].Name() }) + candidates := []candidate{} + for i := range got { + candidates = append(candidates, candidate{victims: got[i].Victims(), name: got[i].Name()}) + } if fakePlugin.NumFilterCalled-prevNumFilterCalled != tt.expectedNumFilterCalled[cycle] { t.Errorf("cycle %d: got NumFilterCalled=%d, want %d", cycle, fakePlugin.NumFilterCalled-prevNumFilterCalled, tt.expectedNumFilterCalled[cycle]) } prevNumFilterCalled = fakePlugin.NumFilterCalled - if diff := cmp.Diff(tt.expected[cycle], got, cmp.AllowUnexported(candidate{})); diff != "" { + if diff := cmp.Diff(tt.expected[cycle], candidates, cmp.AllowUnexported(candidate{})); diff != "" { t.Errorf("cycle %d: unexpected candidates (-want, +got): %s", cycle, diff) } } @@ -1264,10 +1279,23 @@ func TestSelectBestCandidate(t *testing.T) { t.Fatal(err) } - pl := &DefaultPreemption{args: *getDefaultDefaultPreemptionArgs()} - offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos))) - candidates, _ := dryRunPreemption(context.Background(), fwk, state, tt.pod, nodeInfos, nil, offset, numCandidates) - s := SelectCandidate(candidates) + pl := &DefaultPreemption{ + fh: fwk, + podLister: informerFactory.Core().V1().Pods().Lister(), + pdbLister: getPDBLister(informerFactory, true), + args: *getDefaultDefaultPreemptionArgs(), + } + pe := preemption.Evaluator{ + PluginName: names.DefaultPreemption, + Handler: pl.fh, + PodLister: pl.podLister, + PdbLister: pl.pdbLister, + State: state, + Interface: pl, + } + offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos))) + candidates, _ := pe.DryRunPreemption(context.Background(), tt.pod, nodeInfos, nil, offset, numCandidates) + s := pe.SelectCandidate(candidates) if s == nil || len(s.Name()) == 0 { return } @@ -1334,124 +1362,23 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { for _, n := range test.nodes { nodes = append(nodes, st.MakeNode().Name(n).Obj()) } - snapshot := internalcache.NewSnapshot(test.pods, nodes) - if got := PodEligibleToPreemptOthers(test.pod, snapshot.NodeInfos(), test.nominatedNodeStatus); got != test.expected { + registeredPlugins := []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + } + f, err := st.NewFramework(registeredPlugins, "", + frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), + ) + if err != nil { + t.Fatal(err) + } + pl := DefaultPreemption{fh: f} + if got := pl.PodEligibleToPreemptOthers(test.pod, test.nominatedNodeStatus); got != test.expected { t.Errorf("expected %t, got %t for pod: %s", test.expected, got, test.pod.Name) } }) } } - -func TestNodesWherePreemptionMightHelp(t *testing.T) { - // Prepare 4 nodes names. - nodeNames := []string{"node1", "node2", "node3", "node4"} - tests := []struct { - name string - nodesStatuses framework.NodeToStatusMap - expected sets.String // set of expected node names. 
- }{ - { - name: "No node should be attempted", - nodesStatuses: framework.NodeToStatusMap{ - "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeaffinity.ErrReasonPod), - "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason), - "node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, tainttoleration.ErrReasonNotMatch), - "node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodelabel.ErrReasonPresenceViolated), - }, - expected: sets.NewString(), - }, - { - name: "ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity", - nodesStatuses: framework.NodeToStatusMap{ - "node1": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch), - "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason), - "node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnschedulable), - }, - expected: sets.NewString("node1", "node4"), - }, - { - name: "ErrReasonAffinityRulesNotMatch should not be tried as it indicates that the pod is unschedulable due to inter-pod affinity, but ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity", - nodesStatuses: framework.NodeToStatusMap{ - "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, interpodaffinity.ErrReasonAffinityRulesNotMatch), - "node2": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch), - }, - expected: sets.NewString("node2", "node3", "node4"), - }, - { - name: "Mix of failed predicates works fine", - nodesStatuses: framework.NodeToStatusMap{ - "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumerestrictions.ErrReasonDiskConflict), - "node2": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Insufficient %v", v1.ResourceMemory)), - }, - expected: sets.NewString("node2", "node3", "node4"), - }, - { - name: "Node condition errors should be considered unresolvable", - nodesStatuses: framework.NodeToStatusMap{ - "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnknownCondition), - }, - expected: sets.NewString("node2", "node3", "node4"), - }, - { - name: "ErrVolume... 
errors should not be tried as it indicates that the pod is unschedulable due to no matching volumes for pod on node", - nodesStatuses: framework.NodeToStatusMap{ - "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumezone.ErrReasonConflict), - "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumescheduling.ErrReasonNodeConflict)), - "node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumescheduling.ErrReasonBindConflict)), - }, - expected: sets.NewString("node4"), - }, - { - name: "ErrReasonConstraintsNotMatch should be tried as it indicates that the pod is unschedulable due to topology spread constraints", - nodesStatuses: framework.NodeToStatusMap{ - "node1": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch), - "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason), - "node3": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch), - }, - expected: sets.NewString("node1", "node3", "node4"), - }, - { - name: "UnschedulableAndUnresolvable status should be skipped but Unschedulable should be tried", - nodesStatuses: framework.NodeToStatusMap{ - "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""), - "node3": framework.NewStatus(framework.Unschedulable, ""), - "node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""), - }, - expected: sets.NewString("node1", "node3"), - }, - { - name: "ErrReasonNodeLabelNotMatch should not be tried as it indicates that the pod is unschedulable due to node doesn't have the required label", - nodesStatuses: framework.NodeToStatusMap{ - "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, podtopologyspread.ErrReasonNodeLabelNotMatch), - "node3": framework.NewStatus(framework.Unschedulable, ""), - "node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""), - }, - expected: sets.NewString("node1", "node3"), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var nodeInfos []*framework.NodeInfo - for _, name := range nodeNames { - ni := framework.NewNodeInfo() - ni.SetNode(st.MakeNode().Name(name).Obj()) - nodeInfos = append(nodeInfos, ni) - } - nodes, _ := nodesWherePreemptionMightHelp(nodeInfos, tt.nodesStatuses) - if len(tt.expected) != len(nodes) { - t.Errorf("number of nodes is not the same as expected. exptectd: %d, got: %d. 
Nodes: %v", len(tt.expected), len(nodes), nodes) - } - for _, node := range nodes { - name := node.Node().Name - if _, found := tt.expected[name]; !found { - t.Errorf("node %v is not expected.", name) - } - } - }) - } -} - func TestPreempt(t *testing.T) { tests := []struct { name string @@ -1673,14 +1600,23 @@ func TestPreempt(t *testing.T) { pdbLister: getPDBLister(informerFactory, true), args: *getDefaultDefaultPreemptionArgs(), } - node, status := pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap)) - if !status.IsSuccess() { + + pe := preemption.Evaluator{ + PluginName: names.DefaultPreemption, + Handler: pl.fh, + PodLister: pl.podLister, + PdbLister: pl.pdbLister, + State: state, + Interface: &pl, + } + res, status := pe.Preempt(context.Background(), test.pod, make(framework.NodeToStatusMap)) + if !status.IsSuccess() && !status.IsUnschedulable() { t.Errorf("unexpected error in preemption: %v", status.AsError()) } - if len(node) != 0 && node != test.expectedNode { - t.Errorf("expected node: %v, got: %v", test.expectedNode, node) + if res != nil && len(res.NominatedNodeName) != 0 && res.NominatedNodeName != test.expectedNode { + t.Errorf("expected node: %v, got: %v", test.expectedNode, res.NominatedNodeName) } - if len(node) == 0 && len(test.expectedNode) != 0 { + if res != nil && len(res.NominatedNodeName) == 0 && len(test.expectedNode) != 0 { t.Errorf("expected node: %v, got: nothing", test.expectedNode) } if len(deletedPodNames) != len(test.expectedPods) { @@ -1698,7 +1634,9 @@ func TestPreempt(t *testing.T) { t.Errorf("pod %v is not expected to be a victim.", victimName) } } - test.pod.Status.NominatedNodeName = node + if res != nil { + test.pod.Status.NominatedNodeName = res.NominatedNodeName + } // Manually set the deleted Pods' deletionTimestamp to non-nil. for _, pod := range test.pods { @@ -1710,12 +1648,12 @@ func TestPreempt(t *testing.T) { } // Call preempt again and make sure it doesn't preempt any more pods. - node, status = pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap)) - if !status.IsSuccess() { + res, status = pe.Preempt(context.Background(), test.pod, make(framework.NodeToStatusMap)) + if !status.IsSuccess() && !status.IsUnschedulable() { t.Errorf("unexpected error in preemption: %v", status.AsError()) } - if len(node) != 0 && len(deletedPodNames) > 0 { - t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", node) + if res != nil && len(deletedPodNames) > 0 { + t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", res.NominatedNodeName) } }) } diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go new file mode 100644 index 00000000000..ba4967ebad7 --- /dev/null +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -0,0 +1,574 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package preemption
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math"
+	"sync"
+	"sync/atomic"
+
+	v1 "k8s.io/api/core/v1"
+	policy "k8s.io/api/policy/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	corelisters "k8s.io/client-go/listers/core/v1"
+	policylisters "k8s.io/client-go/listers/policy/v1"
+	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
+	"k8s.io/klog/v2"
+	extenderv1 "k8s.io/kube-scheduler/extender/v1"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/metrics"
+	"k8s.io/kubernetes/pkg/scheduler/util"
+)
+
+// Candidate represents a nominated node on which the preemptor can be scheduled,
+// along with the list of victims that should be evicted for the preemptor to fit the node.
+type Candidate interface {
+	// Victims wraps a list of to-be-preempted Pods and the number of PDB violations.
+	Victims() *extenderv1.Victims
+	// Name returns the target node name where the preemptor gets nominated to run.
+	Name() string
+}
+
+type candidate struct {
+	victims *extenderv1.Victims
+	name    string
+}
+
+// Victims returns s.victims.
+func (s *candidate) Victims() *extenderv1.Victims {
+	return s.victims
+}
+
+// Name returns s.name.
+func (s *candidate) Name() string {
+	return s.name
+}
+
+type candidateList struct {
+	idx   int32
+	items []Candidate
+}
+
+func newCandidateList(size int32) *candidateList {
+	return &candidateList{idx: -1, items: make([]Candidate, size)}
+}
+
+// add adds a new candidate to the internal array atomically.
+func (cl *candidateList) add(c *candidate) {
+	if idx := atomic.AddInt32(&cl.idx, 1); idx < int32(len(cl.items)) {
+		cl.items[idx] = c
+	}
+}
+
+// size returns the number of candidates stored. Note that some add() operations
+// might still be executing when this is called, so care must be taken to
+// ensure that all add() operations complete before accessing the elements of
+// the list.
+func (cl *candidateList) size() int32 {
+	n := atomic.LoadInt32(&cl.idx) + 1
+	if n >= int32(len(cl.items)) {
+		n = int32(len(cl.items))
+	}
+	return n
+}
+
+// get returns the internal candidate array. This function is NOT atomic and
+// assumes that all add() operations have been completed.
+func (cl *candidateList) get() []Candidate {
+	return cl.items[:cl.size()]
+}
+
+// Interface is expected to be implemented by different preemption plugins, as those member
+// methods may behave differently from the default preemption.
+type Interface interface {
+	// GetOffsetAndNumCandidates chooses a random offset and calculates the number of candidates that should be
+	// shortlisted for dry running preemption.
+	GetOffsetAndNumCandidates(nodes int32) (int32, int32)
+	// CandidatesToVictimsMap builds a map from the target node to a list of to-be-preempted Pods and the number of PDB violations.
+	CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims
+	// PodEligibleToPreemptOthers determines whether this pod should be considered
+	// for preempting other pods or not.
+	PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) bool
+	// SelectVictimsOnNode finds a minimum set of pods on the given node that should be preempted in order to make enough room
+	// for "pod" to be scheduled.
+	// Note that both `state` and `nodeInfo` are deep copied.
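+	// The returned victims should be ordered by decreasing priority, as
+	// pickOneNodeForPreemption assumes Pods[0] of each Victims entry is the
+	// highest-priority victim on that node when breaking ties.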
+ SelectVictimsOnNode(ctx context.Context, state *framework.CycleState, + pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) +} + +type Evaluator struct { + PluginName string + Handler framework.Handle + PodLister corelisters.PodLister + PdbLister policylisters.PodDisruptionBudgetLister + State *framework.CycleState + Interface +} + +func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { + + // 0) Fetch the latest version of . + // It's safe to directly fetch pod here. Because the informer cache has already been + // initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc(). + // However, tests may need to manually initialize the shared pod informer. + podNamespace, podName := pod.Namespace, pod.Name + pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name) + if err != nil { + klog.ErrorS(err, "getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName)) + return nil, framework.AsStatus(err) + } + + // 1) Ensure the preemptor is eligible to preempt other pods. + if !ev.PodEligibleToPreemptOthers(pod, m[pod.Status.NominatedNodeName]) { + klog.V(5).InfoS("Pod is not eligible for more preemption", "pod", klog.KObj(pod)) + return nil, framework.NewStatus(framework.Unschedulable) + } + + // 2) Find all preemption candidates. + candidates, nodeToStatusMap, status := ev.findCandidates(ctx, pod, m) + if !status.IsSuccess() { + return nil, status + } + + // Return a FitError only when there are no candidates that fit the pod. + if len(candidates) == 0 { + fitError := &framework.FitError{ + Pod: pod, + NumAllNodes: len(nodeToStatusMap), + Diagnosis: framework.Diagnosis{ + NodeToStatusMap: nodeToStatusMap, + // Leave FailedPlugins as nil as it won't be used on moving Pods. + }, + } + return nil, framework.NewStatus(framework.Unschedulable, fitError.Error()) + } + + // 3) Interact with registered Extenders to filter out some candidates if needed. + candidates, status = ev.callExtenders(pod, candidates) + if !status.IsSuccess() { + return nil, status + } + + // 4) Find the best candidate. + bestCandidate := ev.SelectCandidate(candidates) + if bestCandidate == nil || len(bestCandidate.Name()) == 0 { + return nil, framework.NewStatus(framework.Unschedulable) + } + + // 5) Perform preparation work before nominating the selected candidate. + if status := ev.prepareCandidate(bestCandidate, pod, ev.PluginName); !status.IsSuccess() { + return nil, status + } + + return &framework.PostFilterResult{NominatedNodeName: bestCandidate.Name()}, framework.NewStatus(framework.Success) +} + +// FindCandidates calculates a slice of preemption candidates. +// Each candidate is executable to make the given schedulable. +func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, *framework.Status) { + allNodes, err := ev.Handler.SnapshotSharedLister().NodeInfos().List() + if err != nil { + return nil, nil, framework.AsStatus(err) + } + if len(allNodes) == 0 { + return nil, nil, framework.NewStatus(framework.Error, "no nodes available") + } + potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m) + if len(potentialNodes) == 0 { + klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod)) + // In this case, we should clean-up any existing nominated node name of the pod. 
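+		// Clearing the stale nomination stops the scheduler from continuing to reserve
+		// resources on that node for a pod that can no longer benefit from preemption.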
+ if err := util.ClearNominatedNodeName(ev.Handler.ClientSet(), pod); err != nil { + klog.ErrorS(err, "cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod)) + // We do not return as this error is not critical. + } + return nil, unschedulableNodeStatus, nil + } + + pdbs, err := getPodDisruptionBudgets(ev.PdbLister) + if err != nil { + return nil, nil, framework.AsStatus(err) + } + + offset, numCandidates := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes))) + if klog.V(5).Enabled() { + var sample []string + for i := offset; i < offset+10 && i < int32(len(potentialNodes)); i++ { + sample = append(sample, potentialNodes[i].Node().Name) + } + klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates) + } + candidates, nodeStatuses := ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, numCandidates) + for node, status := range unschedulableNodeStatus { + nodeStatuses[node] = status + } + return candidates, nodeStatuses, nil +} + +// callExtenders calls given to select the list of feasible candidates. +// We will only check with extenders that support preemption. +// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated +// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles. +func (ev *Evaluator) callExtenders(pod *v1.Pod, candidates []Candidate) ([]Candidate, *framework.Status) { + extenders := ev.Handler.Extenders() + nodeLister := ev.Handler.SnapshotSharedLister().NodeInfos() + if len(extenders) == 0 { + return candidates, nil + } + + // Migrate candidate slice to victimsMap to adapt to the Extender interface. + // It's only applicable for candidate slice that have unique nominated node name. + victimsMap := ev.CandidatesToVictimsMap(candidates) + if len(victimsMap) == 0 { + return candidates, nil + } + for _, extender := range extenders { + if !extender.SupportsPreemption() || !extender.IsInterested(pod) { + continue + } + nodeNameToVictims, err := extender.ProcessPreemption(pod, victimsMap, nodeLister) + if err != nil { + if extender.IsIgnorable() { + klog.InfoS("Skipping extender as it returned error and has ignorable flag set", + "extender", extender, "err", err) + continue + } + return nil, framework.AsStatus(err) + } + // Check if the returned victims are valid. + for nodeName, victims := range nodeNameToVictims { + if victims == nil || len(victims.Pods) == 0 { + if extender.IsIgnorable() { + delete(nodeNameToVictims, nodeName) + klog.InfoS("Ignoring node without victims", "node", nodeName) + continue + } + return nil, framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeName)) + } + } + + // Replace victimsMap with new result after preemption. So the + // rest of extenders can continue use it as parameter. + victimsMap = nodeNameToVictims + + // If node list becomes empty, no preemption can happen regardless of other extenders. + if len(victimsMap) == 0 { + break + } + } + + var newCandidates []Candidate + for nodeName := range victimsMap { + newCandidates = append(newCandidates, &candidate{ + victims: victimsMap[nodeName], + name: nodeName, + }) + } + return newCandidates, nil +} + +// SelectCandidate chooses the best-fit candidate from given and return it. +// NOTE: This method is exported for easier testing in default preemption. 
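+// Ties are resolved by pickOneNodeForPreemption: fewest PDB violations first, then the
+// lowest highest-victim priority, the smallest sum of victim priorities, the fewest
+// victims, and finally the latest start time among the highest-priority victims.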
+func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate { + if len(candidates) == 0 { + return nil + } + if len(candidates) == 1 { + return candidates[0] + } + + victimsMap := ev.CandidatesToVictimsMap(candidates) + candidateNode := pickOneNodeForPreemption(victimsMap) + + // Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree + // preemption plugins that exercise different candidates on the same nominated node. + if victims := victimsMap[candidateNode]; victims != nil { + return &candidate{ + victims: victims, + name: candidateNode, + } + } + + // We shouldn't reach here. + klog.ErrorS(errors.New("no candidate selected"), "should not reach here", "candidates", candidates) + // To not break the whole flow, return the first candidate. + return candidates[0] +} + +// prepareCandidate does some preparation work before nominating the selected candidate: +// - Evict the victim pods +// - Reject the victim pods if they are in waitingPod map +// - Clear the low-priority pods' nominatedNodeName status if needed +func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName string) *framework.Status { + fh := ev.Handler + cs := ev.Handler.ClientSet() + for _, victim := range c.Victims().Pods { + // If the victim is a WaitingPod, send a reject message to the PermitPlugin. + // Otherwise we should delete the victim. + if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil { + waitingPod.Reject(pluginName, "preempted") + } else if err := util.DeletePod(cs, victim); err != nil { + klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) + return framework.AsStatus(err) + } + fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", + pod.Namespace, pod.Name, c.Name()) + } + metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods))) + + // Lower priority pods nominated to run on this node, may no longer fit on + // this node. So, we should remove their nomination. Removing their + // nomination updates these pods and moves them to the active queue. It + // lets scheduler find another place for them. + nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name()) + if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil { + klog.ErrorS(err, "cannot clear 'NominatedNodeName' field") + // We do not return as this error is not critical. + } + + return nil +} + +// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates +// that may be satisfied by removing pods from the node. +func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) ([]*framework.NodeInfo, framework.NodeToStatusMap) { + var potentialNodes []*framework.NodeInfo + nodeStatuses := make(framework.NodeToStatusMap) + for _, node := range nodes { + name := node.Node().Name + // We rely on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable' + // to determine whether preemption may help or not on the node. 
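+		// Nodes without an entry in the map carry a nil status whose code is Success,
+		// so they are kept as potential candidates; only nodes marked
+		// UnschedulableAndUnresolvable are filtered out here.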
+ if m[name].Code() == framework.UnschedulableAndUnresolvable { + nodeStatuses[node.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "Preemption is not helpful for scheduling") + continue + } + potentialNodes = append(potentialNodes, node) + } + return potentialNodes, nodeStatuses +} + +func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) { + if pdbLister != nil { + return pdbLister.List(labels.Everything()) + } + return nil, nil +} + +// pickOneNodeForPreemption chooses one node among the given nodes. It assumes +// pods in each map entry are ordered by decreasing priority. +// It picks a node based on the following criteria: +// 1. A node with minimum number of PDB violations. +// 2. A node with minimum highest priority victim is picked. +// 3. Ties are broken by sum of priorities of all victims. +// 4. If there are still ties, node with the minimum number of victims is picked. +// 5. If there are still ties, node with the latest start time of all highest priority victims is picked. +// 6. If there are still ties, the first such node is picked (sort of randomly). +// The 'minNodes1' and 'minNodes2' are being reused here to save the memory +// allocation and garbage collection time. +func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string { + if len(nodesToVictims) == 0 { + return "" + } + minNumPDBViolatingPods := int64(math.MaxInt32) + var minNodes1 []string + lenNodes1 := 0 + for node, victims := range nodesToVictims { + numPDBViolatingPods := victims.NumPDBViolations + if numPDBViolatingPods < minNumPDBViolatingPods { + minNumPDBViolatingPods = numPDBViolatingPods + minNodes1 = nil + lenNodes1 = 0 + } + if numPDBViolatingPods == minNumPDBViolatingPods { + minNodes1 = append(minNodes1, node) + lenNodes1++ + } + } + if lenNodes1 == 1 { + return minNodes1[0] + } + + // There are more than one node with minimum number PDB violating pods. Find + // the one with minimum highest priority victim. + minHighestPriority := int32(math.MaxInt32) + var minNodes2 = make([]string, lenNodes1) + lenNodes2 := 0 + for i := 0; i < lenNodes1; i++ { + node := minNodes1[i] + victims := nodesToVictims[node] + // highestPodPriority is the highest priority among the victims on this node. + highestPodPriority := corev1helpers.PodPriority(victims.Pods[0]) + if highestPodPriority < minHighestPriority { + minHighestPriority = highestPodPriority + lenNodes2 = 0 + } + if highestPodPriority == minHighestPriority { + minNodes2[lenNodes2] = node + lenNodes2++ + } + } + if lenNodes2 == 1 { + return minNodes2[0] + } + + // There are a few nodes with minimum highest priority victim. Find the + // smallest sum of priorities. + minSumPriorities := int64(math.MaxInt64) + lenNodes1 = 0 + for i := 0; i < lenNodes2; i++ { + var sumPriorities int64 + node := minNodes2[i] + for _, pod := range nodesToVictims[node].Pods { + // We add MaxInt32+1 to all priorities to make all of them >= 0. This is + // needed so that a node with a few pods with negative priority is not + // picked over a node with a smaller number of pods with the same negative + // priority (and similar scenarios). 
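+			// For example, without the shift two victims at priority -5 would sum to -10
+			// and beat a single victim at -5 (sum -5), favoring the node with more victims.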
+ sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1) + } + if sumPriorities < minSumPriorities { + minSumPriorities = sumPriorities + lenNodes1 = 0 + } + if sumPriorities == minSumPriorities { + minNodes1[lenNodes1] = node + lenNodes1++ + } + } + if lenNodes1 == 1 { + return minNodes1[0] + } + + // There are a few nodes with minimum highest priority victim and sum of priorities. + // Find one with the minimum number of pods. + minNumPods := math.MaxInt32 + lenNodes2 = 0 + for i := 0; i < lenNodes1; i++ { + node := minNodes1[i] + numPods := len(nodesToVictims[node].Pods) + if numPods < minNumPods { + minNumPods = numPods + lenNodes2 = 0 + } + if numPods == minNumPods { + minNodes2[lenNodes2] = node + lenNodes2++ + } + } + if lenNodes2 == 1 { + return minNodes2[0] + } + + // There are a few nodes with same number of pods. + // Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node)) + latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]]) + if latestStartTime == nil { + // If the earliest start time of all pods on the 1st node is nil, just return it, + // which is not expected to happen. + klog.ErrorS(errors.New("earliestStartTime is nil for node"), "should not reach here", "node", minNodes2[0]) + return minNodes2[0] + } + nodeToReturn := minNodes2[0] + for i := 1; i < lenNodes2; i++ { + node := minNodes2[i] + // Get earliest start time of all pods on the current node. + earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node]) + if earliestStartTimeOnNode == nil { + klog.ErrorS(errors.New("earliestStartTime is nil for node"), "should not reach here", "node", node) + continue + } + if earliestStartTimeOnNode.After(latestStartTime.Time) { + latestStartTime = earliestStartTimeOnNode + nodeToReturn = node + } + } + + return nodeToReturn +} + +// getLowerPriorityNominatedPods returns pods whose priority is smaller than the +// priority of the given "pod" and are nominated to run on the given node. +// Note: We could possibly check if the nominated lower priority pods still fit +// and return those that no longer fit, but that would require lots of +// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be +// worth the complexity, especially because we generally expect to have a very +// small number of nominated pods per node. +func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod { + podInfos := pn.NominatedPodsForNode(nodeName) + + if len(podInfos) == 0 { + return nil + } + + var lowerPriorityPods []*v1.Pod + podPriority := corev1helpers.PodPriority(pod) + for _, pi := range podInfos { + if corev1helpers.PodPriority(pi.Pod) < podPriority { + lowerPriorityPods = append(lowerPriorityPods, pi.Pod) + } + } + return lowerPriorityPods +} + +// DryRunPreemption simulates Preemption logic on in parallel, +// returns preemption candidates and a map indicating filtered nodes statuses. +// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of +// candidates, ones that do not violate PDB are preferred over ones that do. +// NOTE: This method is exported for easier testing in default preemption. 
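+// Evaluation starts at the given offset and wraps around potentialNodes; the dry run is
+// cancelled early once at least one non-violating candidate exists and the total number
+// of collected candidates reaches numCandidates.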
+func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo, + pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap) { + fh := ev.Handler + nonViolatingCandidates := newCandidateList(numCandidates) + violatingCandidates := newCandidateList(numCandidates) + parallelCtx, cancel := context.WithCancel(ctx) + nodeStatuses := make(framework.NodeToStatusMap) + var statusesLock sync.Mutex + checkNode := func(i int) { + nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone() + stateCopy := ev.State.Clone() + pods, numPDBViolations, status := ev.SelectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs) + if status.IsSuccess() && len(pods) != 0 { + victims := extenderv1.Victims{ + Pods: pods, + NumPDBViolations: int64(numPDBViolations), + } + c := &candidate{ + victims: &victims, + name: nodeInfoCopy.Node().Name, + } + if numPDBViolations == 0 { + nonViolatingCandidates.add(c) + } else { + violatingCandidates.add(c) + } + nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size() + if nvcSize > 0 && nvcSize+vcSize >= numCandidates { + cancel() + } + return + } + if status.IsSuccess() && len(pods) == 0 { + status = framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name)) + } + statusesLock.Lock() + nodeStatuses[nodeInfoCopy.Node().Name] = status + statusesLock.Unlock() + } + fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode) + return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses +} diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go new file mode 100644 index 00000000000..e3edfa6033a --- /dev/null +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -0,0 +1,334 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package preemption
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	v1 "k8s.io/api/core/v1"
+	policy "k8s.io/api/policy/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/informers"
+	clientsetfake "k8s.io/client-go/kubernetes/fake"
+	extenderv1 "k8s.io/kube-scheduler/extender/v1"
+	volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
+	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
+	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
+	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
+	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
+	st "k8s.io/kubernetes/pkg/scheduler/testing"
+)
+
+var (
+	midPriority, highPriority = int32(100), int32(1000)
+
+	veryLargeRes = map[v1.ResourceName]string{
+		v1.ResourceCPU:    "500m",
+		v1.ResourceMemory: "500",
+	}
+)
+
+type FakePostFilterPlugin struct {
+	numViolatingVictim int
+}
+
+func (pl *FakePostFilterPlugin) SelectVictimsOnNode(
+	ctx context.Context, state *framework.CycleState, pod *v1.Pod,
+	nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) (victims []*v1.Pod, numViolatingVictim int, status *framework.Status) {
+	return append(victims, nodeInfo.Pods[0].Pod), pl.numViolatingVictim, nil
+}
+
+func (pl *FakePostFilterPlugin) GetOffsetAndNumCandidates(nodes int32) (int32, int32) {
+	return 0, nodes
+}
+
+func (pl *FakePostFilterPlugin) CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
+	return nil
+}
+
+func (pl *FakePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) bool {
+	return true
+}
+
+func TestNodesWherePreemptionMightHelp(t *testing.T) {
+	// Prepare 4 node names.
+	nodeNames := []string{"node1", "node2", "node3", "node4"}
+	tests := []struct {
+		name          string
+		nodesStatuses framework.NodeToStatusMap
+		expected      sets.String // set of expected node names.
+ }{ + { + name: "No node should be attempted", + nodesStatuses: framework.NodeToStatusMap{ + "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeaffinity.ErrReasonPod), + "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason), + "node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, tainttoleration.ErrReasonNotMatch), + "node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodelabel.ErrReasonPresenceViolated), + }, + expected: sets.NewString(), + }, + { + name: "ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity", + nodesStatuses: framework.NodeToStatusMap{ + "node1": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch), + "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason), + "node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnschedulable), + }, + expected: sets.NewString("node1", "node4"), + }, + { + name: "ErrReasonAffinityRulesNotMatch should not be tried as it indicates that the pod is unschedulable due to inter-pod affinity, but ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity", + nodesStatuses: framework.NodeToStatusMap{ + "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, interpodaffinity.ErrReasonAffinityRulesNotMatch), + "node2": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch), + }, + expected: sets.NewString("node2", "node3", "node4"), + }, + { + name: "Mix of failed predicates works fine", + nodesStatuses: framework.NodeToStatusMap{ + "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumerestrictions.ErrReasonDiskConflict), + "node2": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Insufficient %v", v1.ResourceMemory)), + }, + expected: sets.NewString("node2", "node3", "node4"), + }, + { + name: "Node condition errors should be considered unresolvable", + nodesStatuses: framework.NodeToStatusMap{ + "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnknownCondition), + }, + expected: sets.NewString("node2", "node3", "node4"), + }, + { + name: "ErrVolume... 
errors should not be tried as it indicates that the pod is unschedulable due to no matching volumes for pod on node",
+			nodesStatuses: framework.NodeToStatusMap{
+				"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumezone.ErrReasonConflict),
+				"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumescheduling.ErrReasonNodeConflict)),
+				"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumescheduling.ErrReasonBindConflict)),
+			},
+			expected: sets.NewString("node4"),
+		},
+		{
+			name: "ErrReasonConstraintsNotMatch should be tried as it indicates that the pod is unschedulable due to topology spread constraints",
+			nodesStatuses: framework.NodeToStatusMap{
+				"node1": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch),
+				"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
+				"node3": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch),
+			},
+			expected: sets.NewString("node1", "node3", "node4"),
+		},
+		{
+			name: "UnschedulableAndUnresolvable status should be skipped but Unschedulable should be tried",
+			nodesStatuses: framework.NodeToStatusMap{
+				"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
+				"node3": framework.NewStatus(framework.Unschedulable, ""),
+				"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
+			},
+			expected: sets.NewString("node1", "node3"),
+		},
+		{
+			name: "ErrReasonNodeLabelNotMatch should not be tried as it indicates that the pod is unschedulable because the node doesn't have the required label",
+			nodesStatuses: framework.NodeToStatusMap{
+				"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, podtopologyspread.ErrReasonNodeLabelNotMatch),
+				"node3": framework.NewStatus(framework.Unschedulable, ""),
+				"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
+			},
+			expected: sets.NewString("node1", "node3"),
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			var nodeInfos []*framework.NodeInfo
+			for _, name := range nodeNames {
+				ni := framework.NewNodeInfo()
+				ni.SetNode(st.MakeNode().Name(name).Obj())
+				nodeInfos = append(nodeInfos, ni)
+			}
+			nodes, _ := nodesWherePreemptionMightHelp(nodeInfos, tt.nodesStatuses)
+			if len(tt.expected) != len(nodes) {
+				t.Errorf("number of nodes is not the same as expected. expected: %d, got: %d. 
Nodes: %v", len(tt.expected), len(nodes), nodes) + } + for _, node := range nodes { + name := node.Node().Name + if _, found := tt.expected[name]; !found { + t.Errorf("node %v is not expected.", name) + } + } + }) + } +} + +func TestDryRunPreemption(t *testing.T) { + tests := []struct { + name string + nodes []*v1.Node + testPods []*v1.Pod + initPods []*v1.Pod + numViolatingVictim int + expected [][]Candidate + }{ + { + name: "no pdb violation", + nodes: []*v1.Node{ + st.MakeNode().Name("node1").Capacity(veryLargeRes).Obj(), + st.MakeNode().Name("node2").Capacity(veryLargeRes).Obj(), + }, + testPods: []*v1.Pod{ + st.MakePod().Name("p").UID("p").Priority(highPriority).Obj(), + }, + initPods: []*v1.Pod{ + st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(), + st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(), + }, + expected: [][]Candidate{ + { + &candidate{ + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj()}, + }, + name: "node1", + }, + &candidate{ + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj()}, + }, + name: "node2", + }, + }, + }, + }, + { + name: "pdb violation on each node", + nodes: []*v1.Node{ + st.MakeNode().Name("node1").Capacity(veryLargeRes).Obj(), + st.MakeNode().Name("node2").Capacity(veryLargeRes).Obj(), + }, + testPods: []*v1.Pod{ + st.MakePod().Name("p").UID("p").Priority(highPriority).Obj(), + }, + initPods: []*v1.Pod{ + st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(), + st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(), + }, + numViolatingVictim: 1, + expected: [][]Candidate{ + { + &candidate{ + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj()}, + NumPDBViolations: 1, + }, + name: "node1", + }, + &candidate{ + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj()}, + NumPDBViolations: 1, + }, + name: "node2", + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + registeredPlugins := append([]st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)}, + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ) + var objs []runtime.Object + for _, p := range append(tt.testPods, tt.initPods...) 
{ + objs = append(objs, p) + } + for _, n := range tt.nodes { + objs = append(objs, n) + } + informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0) + parallelism := parallelize.DefaultParallelism + fwk, err := st.NewFramework( + registeredPlugins, "", + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithParallelism(parallelism), + frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)), + ) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + snapshot := internalcache.NewSnapshot(tt.initPods, tt.nodes) + nodeInfos, err := snapshot.NodeInfos().List() + if err != nil { + t.Fatal(err) + } + sort.Slice(nodeInfos, func(i, j int) bool { + return nodeInfos[i].Node().Name < nodeInfos[j].Node().Name + }) + + fakePostPlugin := &FakePostFilterPlugin{numViolatingVictim: tt.numViolatingVictim} + + for cycle, pod := range tt.testPods { + state := framework.NewCycleState() + pe := Evaluator{ + PluginName: "FakePostFilter", + Handler: fwk, + Interface: fakePostPlugin, + State: state, + } + got, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, nil, 0, int32(len(nodeInfos))) + // Sort the values (inner victims) and the candidate itself (by its NominatedNodeName). + for i := range got { + victims := got[i].Victims().Pods + sort.Slice(victims, func(i, j int) bool { + return victims[i].Name < victims[j].Name + }) + } + sort.Slice(got, func(i, j int) bool { + return got[i].Name() < got[j].Name() + }) + if diff := cmp.Diff(tt.expected[cycle], got, cmp.AllowUnexported(candidate{})); diff != "" { + t.Errorf("cycle %d: unexpected candidates (-want, +got): %s", cycle, diff) + } + } + }) + } +}
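To make the dry-run control flow easier to follow for out-of-tree plugin authors, the snippet below is a minimal, standalone sketch (standard library only) of the three moving parts that TestDryRunPreemption exercises only indirectly: wrap-around indexing from a start offset, the lock-free candidate collection used by candidateList.add, and the early stop once enough candidates have been gathered. The fakeNode type and the names picked, stop, offset and numCandidates are simplified stand-ins invented for this illustration; they are not the scheduler's real types.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fakeNode stands in for *framework.NodeInfo in this sketch; it only records a
// name and whether evicting its victims would violate a PodDisruptionBudget.
type fakeNode struct {
	name        string
	violatesPDB bool
}

func main() {
	nodes := []fakeNode{{"node1", false}, {"node2", true}, {"node3", false}, {"node4", false}}

	// In the real Evaluator these two values come from Interface.GetOffsetAndNumCandidates.
	offset, numCandidates := int32(2), int32(2)

	// Lock-free collection, mirroring candidateList.add: a shared index is bumped
	// atomically and each goroutine writes into its own slot.
	idx := int32(-1)
	picked := make([]string, len(nodes))
	var stop atomic.Bool // stands in for the context cancellation in DryRunPreemption

	var wg sync.WaitGroup
	for i := 0; i < len(nodes); i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if stop.Load() {
				return
			}
			// Wrap-around indexing: start at the offset and walk the slice circularly.
			n := nodes[(int(offset)+i)%len(nodes)]
			if n.violatesPDB {
				return // the real code keeps these in a second, lower-preference list
			}
			if slot := atomic.AddInt32(&idx, 1); slot < int32(len(picked)) {
				picked[slot] = n.name
			}
			if atomic.LoadInt32(&idx)+1 >= numCandidates {
				stop.Store(true) // enough candidates collected; remaining work is skipped
			}
		}(i)
	}
	wg.Wait()

	n := atomic.LoadInt32(&idx) + 1
	if n > int32(len(picked)) {
		n = int32(len(picked))
	}
	// Order and exact count may vary across runs because workers race with the stop flag,
	// just as the real dry run may collect slightly more than numCandidates candidates.
	fmt.Println("candidates:", picked[:n])
}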