Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-22 19:31:44 +00:00)

Commit 27b02a3e37: Merge pull request #104030 from chendave/refactoring_new

Refactor defaultpreemption for out-of-tree plugins
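The point of this refactor is that candidate search, extender filtering, and victim preparation now live behind a reusable preemption.Evaluator, so an out-of-tree PostFilter plugin can drive the same flow and customize only the policy hooks. The following is a minimal sketch, not part of this commit: the plugin name CustomPreemption and its fields are hypothetical, while the Evaluator fields, the Preempt call, and the hook methods (GetOffsetAndNumCandidates, CandidatesToVictimsMap, SelectVictimsOnNode, PodEligibleToPreemptOthers) are the ones that appear in the diff below.

package custompreemption

import (
    "context"

    v1 "k8s.io/api/core/v1"
    corelisters "k8s.io/client-go/listers/core/v1"
    policylisters "k8s.io/client-go/listers/policy/v1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    "k8s.io/kubernetes/pkg/scheduler/framework/preemption"
)

// CustomPreemption is a hypothetical out-of-tree PostFilter plugin. Besides
// PostFilter, it is assumed to implement the preemption.Interface hooks shown
// later in this diff (GetOffsetAndNumCandidates, CandidatesToVictimsMap,
// SelectVictimsOnNode, PodEligibleToPreemptOthers).
type CustomPreemption struct {
    fh        framework.Handle
    podLister corelisters.PodLister
    pdbLister policylisters.PodDisruptionBudgetLister
}

func (pl *CustomPreemption) Name() string { return "CustomPreemption" }

// PostFilter delegates the shared preemption flow to preemption.Evaluator,
// mirroring what DefaultPreemption does after this refactor.
func (pl *CustomPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
    pe := preemption.Evaluator{
        PluginName: pl.Name(),
        Handler:    pl.fh,
        PodLister:  pl.podLister,
        PdbLister:  pl.pdbLister,
        State:      state,
        Interface:  pl, // the plugin supplies the policy hooks
    }
    return pe.Preempt(ctx, pod, m)
}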
@@ -1,45 +0,0 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package defaultpreemption

import (
    extenderv1 "k8s.io/kube-scheduler/extender/v1"
)

// Candidate represents a nominated node on which the preemptor can be scheduled,
// along with the list of victims that should be evicted for the preemptor to fit the node.
type Candidate interface {
    // Victims wraps a list of to-be-preempted Pods and the number of PDB violation.
    Victims() *extenderv1.Victims
    // Name returns the target node name where the preemptor gets nominated to run.
    Name() string
}

type candidate struct {
    victims *extenderv1.Victims
    name    string
}

// Victims returns s.victims.
func (s *candidate) Victims() *extenderv1.Victims {
    return s.victims
}

// Name returns s.name.
func (s *candidate) Name() string {
    return s.name
}
@@ -18,15 +18,9 @@ package defaultpreemption

import (
    "context"
    "errors"
    "fmt"
    "math"
    "math/rand"
    "sort"
    "sync"
    "sync/atomic"

    "k8s.io/klog/v2"

    v1 "k8s.io/api/core/v1"
    policy "k8s.io/api/policy/v1"
@@ -34,16 +28,17 @@ import (
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    corelisters "k8s.io/client-go/listers/core/v1"
    policylisters "k8s.io/client-go/listers/policy/v1"
    corev1helpers "k8s.io/component-helpers/scheduling/corev1"
    "k8s.io/klog/v2"
    extenderv1 "k8s.io/kube-scheduler/extender/v1"
    "k8s.io/kubernetes/pkg/scheduler/apis/config"
    "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    "k8s.io/kubernetes/pkg/scheduler/framework/preemption"
    "k8s.io/kubernetes/pkg/scheduler/metrics"
    "k8s.io/kubernetes/pkg/scheduler/util"
)
@@ -92,85 +87,15 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
        metrics.PreemptionAttempts.Inc()
    }()

    nnn, status := pl.preempt(ctx, state, pod, m)
    if !status.IsSuccess() {
        return nil, status
    pe := preemption.Evaluator{
        PluginName: names.DefaultPreemption,
        Handler:    pl.fh,
        PodLister:  pl.podLister,
        PdbLister:  pl.pdbLister,
        State:      state,
        Interface:  pl,
    }
    // This happens when the pod is not eligible for preemption or extenders filtered all candidates.
    if nnn == "" {
        return nil, framework.NewStatus(framework.Unschedulable)
    }
    return &framework.PostFilterResult{NominatedNodeName: nnn}, framework.NewStatus(framework.Success)
}

// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns 1) the node name which is picked up for preemption, 2) any possible error.
// preempt does not update its snapshot. It uses the same snapshot used in the
// scheduling cycle. This is to avoid a scenario where preempt finds feasible
// nodes without preempting any pod. When there are many pending pods in the
// scheduling queue a nominated pod will go back to the queue and behind
// other pods with the same priority. The nominated pod prevents other pods from
// using the nominated resources and the nominated pod could take a long time
// before it is retried after many other pending pods.
func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, *framework.Status) {
    cs := pl.fh.ClientSet()
    nodeLister := pl.fh.SnapshotSharedLister().NodeInfos()

    // 0) Fetch the latest version of <pod>.
    // It's safe to directly fetch pod here. Because the informer cache has already been
    // initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc().
    // However, tests may need to manually initialize the shared pod informer.
    podNamespace, podName := pod.Namespace, pod.Name
    pod, err := pl.podLister.Pods(pod.Namespace).Get(pod.Name)
    if err != nil {
        klog.ErrorS(err, "getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
        return "", framework.AsStatus(err)
    }

    // 1) Ensure the preemptor is eligible to preempt other pods.
    if !PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName]) {
        klog.V(5).InfoS("Pod is not eligible for more preemption", "pod", klog.KObj(pod))
        return "", nil
    }

    // 2) Find all preemption candidates.
    candidates, nodeToStatusMap, status := pl.FindCandidates(ctx, state, pod, m)
    if !status.IsSuccess() {
        return "", status
    }

    // Return a FitError only when there are no candidates that fit the pod.
    if len(candidates) == 0 {
        fitError := &framework.FitError{
            Pod:         pod,
            NumAllNodes: len(nodeToStatusMap),
            Diagnosis: framework.Diagnosis{
                NodeToStatusMap: nodeToStatusMap,
                // Leave FailedPlugins as nil as it won't be used on moving Pods.
            },
        }
        return "", framework.NewStatus(framework.Unschedulable, fitError.Error())
    }

    // 3) Interact with registered Extenders to filter out some candidates if needed.
    candidates, status = CallExtenders(pl.fh.Extenders(), pod, nodeLister, candidates)
    if !status.IsSuccess() {
        return "", status
    }

    // 4) Find the best candidate.
    bestCandidate := SelectCandidate(candidates)
    if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
        return "", nil
    }

    // 5) Perform preparation work before nominating the selected candidate.
    if status := PrepareCandidate(bestCandidate, pl.fh, cs, pod, pl.Name()); !status.IsSuccess() {
        return "", status
    }

    return bestCandidate.Name(), nil
    return pe.Preempt(ctx, pod, m)
}

// calculateNumCandidates returns the number of candidates the FindCandidates
@@ -188,248 +113,15 @@ func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
    return n
}

// getOffsetAndNumCandidates chooses a random offset and calculates the number
// GetOffsetAndNumCandidates chooses a random offset and calculates the number
// of candidates that should be shortlisted for dry running preemption.
func (pl *DefaultPreemption) getOffsetAndNumCandidates(numNodes int32) (int32, int32) {
func (pl *DefaultPreemption) GetOffsetAndNumCandidates(numNodes int32) (int32, int32) {
    return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes)
}

// FindCandidates calculates a slice of preemption candidates.
// Each candidate is executable to make the given <pod> schedulable.
func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, *framework.Status) {
    allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List()
    if err != nil {
        return nil, nil, framework.AsStatus(err)
    }
    if len(allNodes) == 0 {
        return nil, nil, framework.NewStatus(framework.Error, "no nodes available")
    }
    potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m)
    if len(potentialNodes) == 0 {
        klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
        // In this case, we should clean-up any existing nominated node name of the pod.
        if err := util.ClearNominatedNodeName(pl.fh.ClientSet(), pod); err != nil {
            klog.ErrorS(err, "cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod))
            // We do not return as this error is not critical.
        }
        return nil, unschedulableNodeStatus, nil
    }

    pdbs, err := getPodDisruptionBudgets(pl.pdbLister)
    if err != nil {
        return nil, nil, framework.AsStatus(err)
    }

    offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(potentialNodes)))
    if klog.V(5).Enabled() {
        var sample []string
        for i := offset; i < offset+10 && i < int32(len(potentialNodes)); i++ {
            sample = append(sample, potentialNodes[i].Node().Name)
        }
        klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates)
    }
    candidates, nodeStatuses := dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates)
    for node, status := range unschedulableNodeStatus {
        nodeStatuses[node] = status
    }
    return candidates, nodeStatuses, nil
}

// PodEligibleToPreemptOthers determines whether this pod should be considered
// for preempting other pods or not. If this pod has already preempted other
// pods and those are in their graceful termination period, it shouldn't be
// considered for preemption.
// We look at the node that is nominated for this pod and as long as there are
// terminating pods on the node, we don't consider this for preempting more pods.
func PodEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister, nominatedNodeStatus *framework.Status) bool {
    if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
        klog.V(5).InfoS("Pod is not eligible for preemption because it has a preemptionPolicy of Never", "pod", klog.KObj(pod))
        return false
    }
    nomNodeName := pod.Status.NominatedNodeName
    if len(nomNodeName) > 0 {
        // If the pod's nominated node is considered as UnschedulableAndUnresolvable by the filters,
        // then the pod should be considered for preempting again.
        if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
            return true
        }

        if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
            podPriority := corev1helpers.PodPriority(pod)
            for _, p := range nodeInfo.Pods {
                if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority {
                    // There is a terminating pod on the nominated node.
                    return false
                }
            }
        }
    }
    return true
}

// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) ([]*framework.NodeInfo, framework.NodeToStatusMap) {
    var potentialNodes []*framework.NodeInfo
    nodeStatuses := make(framework.NodeToStatusMap)
    for _, node := range nodes {
        name := node.Node().Name
        // We rely on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
        // to determine whether preemption may help or not on the node.
        if m[name].Code() == framework.UnschedulableAndUnresolvable {
            nodeStatuses[node.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "Preemption is not helpful for scheduling")
            continue
        }
        potentialNodes = append(potentialNodes, node)
    }
    return potentialNodes, nodeStatuses
}

type candidateList struct {
    idx   int32
    items []Candidate
}

func newCandidateList(size int32) *candidateList {
    return &candidateList{idx: -1, items: make([]Candidate, size)}
}

// add adds a new candidate to the internal array atomically.
func (cl *candidateList) add(c *candidate) {
    if idx := atomic.AddInt32(&cl.idx, 1); idx < int32(len(cl.items)) {
        cl.items[idx] = c
    }
}

// size returns the number of candidate stored. Note that some add() operations
// might still be executing when this is called, so care must be taken to
// ensure that all add() operations complete before accessing the elements of
// the list.
func (cl *candidateList) size() int32 {
    n := atomic.LoadInt32(&cl.idx) + 1
    if n >= int32(len(cl.items)) {
        n = int32(len(cl.items))
    }
    return n
}

// get returns the internal candidate array. This function is NOT atomic and
// assumes that all add() operations have been completed.
func (cl *candidateList) get() []Candidate {
    return cl.items[:cl.size()]
}

// dryRunPreemption simulates Preemption logic on <potentialNodes> in parallel,
// returns preemption candidates and a map indicating filtered nodes statuses.
// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of
// candidates, ones that do not violate PDB are preferred over ones that do.
func dryRunPreemption(ctx context.Context, fh framework.Handle,
    state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
    pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap) {
    nonViolatingCandidates := newCandidateList(numCandidates)
    violatingCandidates := newCandidateList(numCandidates)
    parallelCtx, cancel := context.WithCancel(ctx)
    nodeStatuses := make(framework.NodeToStatusMap)
    var statusesLock sync.Mutex
    checkNode := func(i int) {
        nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
        stateCopy := state.Clone()
        pods, numPDBViolations, status := selectVictimsOnNode(ctx, fh, stateCopy, pod, nodeInfoCopy, pdbs)
        if status.IsSuccess() && len(pods) != 0 {
            victims := extenderv1.Victims{
                Pods:             pods,
                NumPDBViolations: int64(numPDBViolations),
            }
            c := &candidate{
                victims: &victims,
                name:    nodeInfoCopy.Node().Name,
            }
            if numPDBViolations == 0 {
                nonViolatingCandidates.add(c)
            } else {
                violatingCandidates.add(c)
            }
            nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size()
            if nvcSize > 0 && nvcSize+vcSize >= numCandidates {
                cancel()
            }
            return
        }
        if status.IsSuccess() && len(pods) == 0 {
            status = framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name))
        }
        statusesLock.Lock()
        nodeStatuses[nodeInfoCopy.Node().Name] = status
        statusesLock.Unlock()
    }
    fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode)
    return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses
}

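Aside (not part of the commit): the candidateList above is a lock-free, capacity-bounded collector. Parallel workers claim a slot with an atomic increment, results beyond the capacity are silently dropped, and the caller reads the slice only after every worker has finished. A standalone Go sketch of the same pattern, with the scheduler types replaced by strings, looks like this:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// boundedList mirrors candidateList: writers atomically claim an index and
// items past the capacity are dropped.
type boundedList struct {
    idx   int32
    items []string
}

func newBoundedList(size int32) *boundedList {
    return &boundedList{idx: -1, items: make([]string, size)}
}

func (l *boundedList) add(s string) {
    if i := atomic.AddInt32(&l.idx, 1); i < int32(len(l.items)) {
        l.items[i] = s
    }
}

func (l *boundedList) get() []string {
    n := atomic.LoadInt32(&l.idx) + 1
    if n > int32(len(l.items)) {
        n = int32(len(l.items))
    }
    return l.items[:n]
}

func main() {
    l := newBoundedList(3)
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) { defer wg.Done(); l.add(fmt.Sprintf("node%d", i)) }(i)
    }
    wg.Wait() // all add() calls must finish before get(), as the comments above require
    fmt.Println(l.get())
}
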
// CallExtenders calls given <extenders> to select the list of feasible candidates.
|
||||
// We will only check <candidates> with extenders that support preemption.
|
||||
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
|
||||
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
|
||||
func CallExtenders(extenders []framework.Extender, pod *v1.Pod, nodeLister framework.NodeInfoLister,
|
||||
candidates []Candidate) ([]Candidate, *framework.Status) {
|
||||
if len(extenders) == 0 {
|
||||
return candidates, nil
|
||||
}
|
||||
|
||||
// Migrate candidate slice to victimsMap to adapt to the Extender interface.
|
||||
// It's only applicable for candidate slice that have unique nominated node name.
|
||||
victimsMap := candidatesToVictimsMap(candidates)
|
||||
if len(victimsMap) == 0 {
|
||||
return candidates, nil
|
||||
}
|
||||
for _, extender := range extenders {
|
||||
if !extender.SupportsPreemption() || !extender.IsInterested(pod) {
|
||||
continue
|
||||
}
|
||||
nodeNameToVictims, err := extender.ProcessPreemption(pod, victimsMap, nodeLister)
|
||||
if err != nil {
|
||||
if extender.IsIgnorable() {
|
||||
klog.InfoS("Skipping extender as it returned error and has ignorable flag set",
|
||||
"extender", extender, "err", err)
|
||||
continue
|
||||
}
|
||||
return nil, framework.AsStatus(err)
|
||||
}
|
||||
// Check if the returned victims are valid.
|
||||
for nodeName, victims := range nodeNameToVictims {
|
||||
if victims == nil || len(victims.Pods) == 0 {
|
||||
if extender.IsIgnorable() {
|
||||
delete(nodeNameToVictims, nodeName)
|
||||
klog.InfoS("Ignoring node without victims", "node", nodeName)
|
||||
continue
|
||||
}
|
||||
return nil, framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeName))
|
||||
}
|
||||
}
|
||||
|
||||
// Replace victimsMap with new result after preemption. So the
|
||||
// rest of extenders can continue use it as parameter.
|
||||
victimsMap = nodeNameToVictims
|
||||
|
||||
// If node list becomes empty, no preemption can happen regardless of other extenders.
|
||||
if len(victimsMap) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
var newCandidates []Candidate
|
||||
for nodeName := range victimsMap {
|
||||
newCandidates = append(newCandidates, &candidate{
|
||||
victims: victimsMap[nodeName],
|
||||
name: nodeName,
|
||||
})
|
||||
}
|
||||
return newCandidates, nil
|
||||
}
|
||||
|
||||
// This function is not applicable for out-of-tree preemption plugins that exercise
|
||||
// different preemption candidates on the same nominated node.
|
||||
func candidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
|
||||
func (pl *DefaultPreemption) CandidatesToVictimsMap(candidates []preemption.Candidate) map[string]*extenderv1.Victims {
|
||||
m := make(map[string]*extenderv1.Victims)
|
||||
for _, c := range candidates {
|
||||
m[c.Name()] = c.Victims()
|
||||
@ -437,193 +129,20 @@ func candidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victi
|
||||
return m
|
||||
}
|
||||
|
||||
// SelectCandidate chooses the best-fit candidate from given <candidates> and return it.
|
||||
func SelectCandidate(candidates []Candidate) Candidate {
|
||||
if len(candidates) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(candidates) == 1 {
|
||||
return candidates[0]
|
||||
}
|
||||
|
||||
victimsMap := candidatesToVictimsMap(candidates)
|
||||
candidateNode := pickOneNodeForPreemption(victimsMap)
|
||||
|
||||
// Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
|
||||
// preemption plugins that exercise different candidates on the same nominated node.
|
||||
if victims := victimsMap[candidateNode]; victims != nil {
|
||||
return &candidate{
|
||||
victims: victims,
|
||||
name: candidateNode,
|
||||
}
|
||||
}
|
||||
|
||||
// We shouldn't reach here.
|
||||
klog.ErrorS(errors.New("no candidate selected"), "should not reach here", "candidates", candidates)
|
||||
// To not break the whole flow, return the first candidate.
|
||||
return candidates[0]
|
||||
}
|
||||
|
||||
// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
|
||||
// pods in each map entry are ordered by decreasing priority.
|
||||
// It picks a node based on the following criteria:
|
||||
// 1. A node with minimum number of PDB violations.
|
||||
// 2. A node with minimum highest priority victim is picked.
|
||||
// 3. Ties are broken by sum of priorities of all victims.
|
||||
// 4. If there are still ties, node with the minimum number of victims is picked.
|
||||
// 5. If there are still ties, node with the latest start time of all highest priority victims is picked.
|
||||
// 6. If there are still ties, the first such node is picked (sort of randomly).
|
||||
// The 'minNodes1' and 'minNodes2' are being reused here to save the memory
|
||||
// allocation and garbage collection time.
|
||||
func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string {
|
||||
if len(nodesToVictims) == 0 {
|
||||
return ""
|
||||
}
|
||||
minNumPDBViolatingPods := int64(math.MaxInt32)
|
||||
var minNodes1 []string
|
||||
lenNodes1 := 0
|
||||
for node, victims := range nodesToVictims {
|
||||
numPDBViolatingPods := victims.NumPDBViolations
|
||||
if numPDBViolatingPods < minNumPDBViolatingPods {
|
||||
minNumPDBViolatingPods = numPDBViolatingPods
|
||||
minNodes1 = nil
|
||||
lenNodes1 = 0
|
||||
}
|
||||
if numPDBViolatingPods == minNumPDBViolatingPods {
|
||||
minNodes1 = append(minNodes1, node)
|
||||
lenNodes1++
|
||||
}
|
||||
}
|
||||
if lenNodes1 == 1 {
|
||||
return minNodes1[0]
|
||||
}
|
||||
|
||||
// There are more than one node with minimum number PDB violating pods. Find
|
||||
// the one with minimum highest priority victim.
|
||||
minHighestPriority := int32(math.MaxInt32)
|
||||
var minNodes2 = make([]string, lenNodes1)
|
||||
lenNodes2 := 0
|
||||
for i := 0; i < lenNodes1; i++ {
|
||||
node := minNodes1[i]
|
||||
victims := nodesToVictims[node]
|
||||
// highestPodPriority is the highest priority among the victims on this node.
|
||||
highestPodPriority := corev1helpers.PodPriority(victims.Pods[0])
|
||||
if highestPodPriority < minHighestPriority {
|
||||
minHighestPriority = highestPodPriority
|
||||
lenNodes2 = 0
|
||||
}
|
||||
if highestPodPriority == minHighestPriority {
|
||||
minNodes2[lenNodes2] = node
|
||||
lenNodes2++
|
||||
}
|
||||
}
|
||||
if lenNodes2 == 1 {
|
||||
return minNodes2[0]
|
||||
}
|
||||
|
||||
// There are a few nodes with minimum highest priority victim. Find the
|
||||
// smallest sum of priorities.
|
||||
minSumPriorities := int64(math.MaxInt64)
|
||||
lenNodes1 = 0
|
||||
for i := 0; i < lenNodes2; i++ {
|
||||
var sumPriorities int64
|
||||
node := minNodes2[i]
|
||||
for _, pod := range nodesToVictims[node].Pods {
|
||||
// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
|
||||
// needed so that a node with a few pods with negative priority is not
|
||||
// picked over a node with a smaller number of pods with the same negative
|
||||
// priority (and similar scenarios).
|
||||
sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1)
|
||||
}
|
||||
if sumPriorities < minSumPriorities {
|
||||
minSumPriorities = sumPriorities
|
||||
lenNodes1 = 0
|
||||
}
|
||||
if sumPriorities == minSumPriorities {
|
||||
minNodes1[lenNodes1] = node
|
||||
lenNodes1++
|
||||
}
|
||||
}
|
||||
if lenNodes1 == 1 {
|
||||
return minNodes1[0]
|
||||
}
|
||||
|
||||
// There are a few nodes with minimum highest priority victim and sum of priorities.
|
||||
// Find one with the minimum number of pods.
|
||||
minNumPods := math.MaxInt32
|
||||
lenNodes2 = 0
|
||||
for i := 0; i < lenNodes1; i++ {
|
||||
node := minNodes1[i]
|
||||
numPods := len(nodesToVictims[node].Pods)
|
||||
if numPods < minNumPods {
|
||||
minNumPods = numPods
|
||||
lenNodes2 = 0
|
||||
}
|
||||
if numPods == minNumPods {
|
||||
minNodes2[lenNodes2] = node
|
||||
lenNodes2++
|
||||
}
|
||||
}
|
||||
if lenNodes2 == 1 {
|
||||
return minNodes2[0]
|
||||
}
|
||||
|
||||
// There are a few nodes with same number of pods.
|
||||
// Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node))
|
||||
latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
|
||||
if latestStartTime == nil {
|
||||
// If the earliest start time of all pods on the 1st node is nil, just return it,
|
||||
// which is not expected to happen.
|
||||
klog.ErrorS(errors.New("earliestStartTime is nil for node"), "should not reach here", "node", minNodes2[0])
|
||||
return minNodes2[0]
|
||||
}
|
||||
nodeToReturn := minNodes2[0]
|
||||
for i := 1; i < lenNodes2; i++ {
|
||||
node := minNodes2[i]
|
||||
// Get earliest start time of all pods on the current node.
|
||||
earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
|
||||
if earliestStartTimeOnNode == nil {
|
||||
klog.ErrorS(errors.New("earliestStartTime is nil for node"), "should not reach here", "node", node)
|
||||
continue
|
||||
}
|
||||
if earliestStartTimeOnNode.After(latestStartTime.Time) {
|
||||
latestStartTime = earliestStartTimeOnNode
|
||||
nodeToReturn = node
|
||||
}
|
||||
}
|
||||
|
||||
return nodeToReturn
|
||||
}
|
||||
|
||||
// selectVictimsOnNode finds minimum set of pods on the given node that should
|
||||
// be preempted in order to make enough room for "pod" to be scheduled. The
|
||||
// minimum set selected is subject to the constraint that a higher-priority pod
|
||||
// is never preempted when a lower-priority pod could be (higher/lower relative
|
||||
// to one another, not relative to the preemptor "pod").
|
||||
// The algorithm first checks if the pod can be scheduled on the node when all the
|
||||
// lower priority pods are gone. If so, it sorts all the lower priority pods by
|
||||
// their priority and then puts them into two groups of those whose PodDisruptionBudget
|
||||
// will be violated if preempted and other non-violating pods. Both groups are
|
||||
// sorted by priority. It first tries to reprieve as many PDB violating pods as
|
||||
// possible and then does them same for non-PDB-violating pods while checking
|
||||
// that the "pod" can still fit on the node.
|
||||
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
|
||||
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
|
||||
// these predicates can be satisfied by removing more pods from the node.
|
||||
func selectVictimsOnNode(
|
||||
// SelectVictimsOnNode finds minimum set of pods on the given node that should be preempted in order to make enough room
|
||||
// for "pod" to be scheduled.
|
||||
func (pl *DefaultPreemption) SelectVictimsOnNode(
|
||||
ctx context.Context,
|
||||
fh framework.Handle,
|
||||
state *framework.CycleState,
|
||||
pod *v1.Pod,
|
||||
nodeInfo *framework.NodeInfo,
|
||||
pdbs []*policy.PodDisruptionBudget,
|
||||
) ([]*v1.Pod, int, *framework.Status) {
|
||||
pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) {
|
||||
var potentialVictims []*framework.PodInfo
|
||||
removePod := func(rpi *framework.PodInfo) error {
|
||||
if err := nodeInfo.RemovePod(rpi.Pod); err != nil {
|
||||
return err
|
||||
}
|
||||
status := fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
|
||||
status := pl.fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
|
||||
if !status.IsSuccess() {
|
||||
return status.AsError()
|
||||
}
|
||||
@ -631,7 +150,7 @@ func selectVictimsOnNode(
|
||||
}
|
||||
addPod := func(api *framework.PodInfo) error {
|
||||
nodeInfo.AddPodInfo(api)
|
||||
status := fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
|
||||
status := pl.fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
|
||||
if !status.IsSuccess() {
|
||||
return status.AsError()
|
||||
}
|
||||
@ -661,7 +180,7 @@ func selectVictimsOnNode(
|
||||
// inter-pod affinity to one or more victims, but we have decided not to
|
||||
// support this case for performance reasons. Having affinity to lower
|
||||
// priority pods is not a recommended configuration anyway.
|
||||
if status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() {
|
||||
if status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() {
|
||||
return nil, 0, status
|
||||
}
|
||||
var victims []*v1.Pod
|
||||
@ -675,7 +194,7 @@ func selectVictimsOnNode(
|
||||
if err := addPod(pi); err != nil {
|
||||
return false, err
|
||||
}
|
||||
status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
|
||||
status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
|
||||
fits := status.IsSuccess()
|
||||
if !fits {
|
||||
if err := removePod(pi); err != nil {
|
||||
@ -703,60 +222,37 @@ func selectVictimsOnNode(
|
||||
return victims, numViolatingVictim, framework.NewStatus(framework.Success)
|
||||
}
|
||||
|
||||
// PrepareCandidate does some preparation work before nominating the selected candidate:
|
||||
// - Evict the victim pods
|
||||
// - Reject the victim pods if they are in waitingPod map
|
||||
// - Clear the low-priority pods' nominatedNodeName status if needed
|
||||
func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod, pluginName string) *framework.Status {
|
||||
for _, victim := range c.Victims().Pods {
|
||||
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
|
||||
// Otherwise we should delete the victim.
|
||||
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
|
||||
waitingPod.Reject(pluginName, "preempted")
|
||||
} else if err := util.DeletePod(cs, victim); err != nil {
|
||||
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
|
||||
return framework.AsStatus(err)
|
||||
// PodEligibleToPreemptOthers determines whether this pod should be considered
|
||||
// for preempting other pods or not. If this pod has already preempted other
|
||||
// pods and those are in their graceful termination period, it shouldn't be
|
||||
// considered for preemption.
|
||||
// We look at the node that is nominated for this pod and as long as there are
|
||||
// terminating pods on the node, we don't consider this for preempting more pods.
|
||||
func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) bool {
|
||||
if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
|
||||
klog.V(5).InfoS("Pod is not eligible for preemption because it has a preemptionPolicy of Never", "pod", klog.KObj(pod))
|
||||
return false
|
||||
}
|
||||
nodeInfos := pl.fh.SnapshotSharedLister().NodeInfos()
|
||||
nomNodeName := pod.Status.NominatedNodeName
|
||||
if len(nomNodeName) > 0 {
|
||||
// If the pod's nominated node is considered as UnschedulableAndUnresolvable by the filters,
|
||||
// then the pod should be considered for preempting again.
|
||||
if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
|
||||
return true
|
||||
}
|
||||
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
|
||||
pod.Namespace, pod.Name, c.Name())
|
||||
}
|
||||
metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
|
||||
|
||||
// Lower priority pods nominated to run on this node, may no longer fit on
|
||||
// this node. So, we should remove their nomination. Removing their
|
||||
// nomination updates these pods and moves them to the active queue. It
|
||||
// lets scheduler find another place for them.
|
||||
nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
|
||||
if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
|
||||
klog.ErrorS(err, "cannot clear 'NominatedNodeName' field")
|
||||
// We do not return as this error is not critical.
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getLowerPriorityNominatedPods returns pods whose priority is smaller than the
|
||||
// priority of the given "pod" and are nominated to run on the given node.
|
||||
// Note: We could possibly check if the nominated lower priority pods still fit
|
||||
// and return those that no longer fit, but that would require lots of
|
||||
// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
|
||||
// worth the complexity, especially because we generally expect to have a very
|
||||
// small number of nominated pods per node.
|
||||
func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
|
||||
podInfos := pn.NominatedPodsForNode(nodeName)
|
||||
|
||||
if len(podInfos) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var lowerPriorityPods []*v1.Pod
|
||||
podPriority := corev1helpers.PodPriority(pod)
|
||||
for _, pi := range podInfos {
|
||||
if corev1helpers.PodPriority(pi.Pod) < podPriority {
|
||||
lowerPriorityPods = append(lowerPriorityPods, pi.Pod)
|
||||
if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
|
||||
podPriority := corev1helpers.PodPriority(pod)
|
||||
for _, p := range nodeInfo.Pods {
|
||||
if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority {
|
||||
// There is a terminating pod on the nominated node.
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return lowerPriorityPods
|
||||
return true
|
||||
}
|
||||
|
||||
// filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
|
||||
@ -817,10 +313,3 @@ func getPDBLister(informerFactory informers.SharedInformerFactory, enablePodDisr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) {
|
||||
if pdbLister != nil {
|
||||
return pdbLister.List(labels.Everything())
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ package defaultpreemption
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"strings"
|
||||
@ -29,8 +28,8 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
policy "k8s.io/api/policy/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
apiruntime "k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/informers"
|
||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||
@ -38,23 +37,18 @@ import (
|
||||
"k8s.io/client-go/tools/events"
|
||||
kubeschedulerconfigv1beta1 "k8s.io/kube-scheduler/config/v1beta1"
|
||||
extenderv1 "k8s.io/kube-scheduler/extender/v1"
|
||||
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
configv1beta1 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta1"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/preemption"
|
||||
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
|
||||
@ -99,7 +93,7 @@ func getDefaultDefaultPreemptionArgs() *config.DefaultPreemptionArgs {
|
||||
return dpa
|
||||
}
|
||||
|
||||
var nodeResourcesFitFunc = func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
var nodeResourcesFitFunc = func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
return noderesources.NewFit(plArgs, fh, feature.Features{})
|
||||
}
|
||||
|
||||
@ -317,8 +311,11 @@ func TestPostFilter(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestSelectNodesForPreemption tests dryRunPreemption. This test assumes
|
||||
// that podsFitsOnNode works correctly and is tested separately.
|
||||
type candidate struct {
|
||||
victims *extenderv1.Victims
|
||||
name string
|
||||
}
|
||||
|
||||
func TestDryRunPreemption(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
@ -330,7 +327,7 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
pdbs []*policy.PodDisruptionBudget
|
||||
fakeFilterRC framework.Code // return code for fake filter plugin
|
||||
disableParallelism bool
|
||||
expected [][]Candidate
|
||||
expected [][]candidate
|
||||
expectedNumFilterCalled []int32
|
||||
}{
|
||||
{
|
||||
@ -346,7 +343,7 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(),
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(),
|
||||
},
|
||||
expected: [][]Candidate{{}},
|
||||
expected: [][]candidate{{}},
|
||||
expectedNumFilterCalled: []int32{2},
|
||||
},
|
||||
{
|
||||
@ -362,7 +359,7 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(),
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(),
|
||||
},
|
||||
expected: [][]Candidate{{}},
|
||||
expected: [][]candidate{{}},
|
||||
fakeFilterRC: framework.Unschedulable,
|
||||
expectedNumFilterCalled: []int32{2},
|
||||
},
|
||||
@ -380,7 +377,7 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(),
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(),
|
||||
},
|
||||
expected: [][]Candidate{{}},
|
||||
expected: [][]candidate{{}},
|
||||
fakeFilterRC: framework.Unschedulable,
|
||||
expectedNumFilterCalled: []int32{2},
|
||||
},
|
||||
@ -397,15 +394,15 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj(),
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(),
|
||||
},
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj()},
|
||||
},
|
||||
name: "node1",
|
||||
},
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj()},
|
||||
},
|
||||
@ -428,7 +425,7 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj(),
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(),
|
||||
},
|
||||
expected: [][]Candidate{{}},
|
||||
expected: [][]candidate{{}},
|
||||
expectedNumFilterCalled: []int32{0},
|
||||
},
|
||||
{
|
||||
@ -445,15 +442,15 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Priority(midPriority).Req(largeRes).Obj(),
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(),
|
||||
},
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Priority(midPriority).Req(largeRes).Obj()},
|
||||
},
|
||||
name: "node1",
|
||||
},
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj()},
|
||||
},
|
||||
@ -479,9 +476,9 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p1.4").UID("p1.4").Node("node1").Priority(highPriority).Req(smallRes).Obj(),
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(highPriority).Req(largeRes).Obj(),
|
||||
},
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{
|
||||
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Priority(lowPriority).Req(smallRes).Obj(),
|
||||
@ -510,9 +507,9 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p1.4").UID("p1.4").Node("node1").Priority(highPriority).Req(smallRes).StartTime(epochTime2).Obj(),
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(highPriority).Req(largeRes).StartTime(epochTime1).Obj(),
|
||||
},
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{
|
||||
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Priority(lowPriority).Req(smallRes).StartTime(epochTime5).Obj(),
|
||||
@ -544,9 +541,9 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p1.3").UID("p1.3").Node("node1").Priority(highPriority).Req(smallRes).Obj(),
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(highPriority).Req(smallRes).Obj(),
|
||||
},
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{
|
||||
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("foo", "").Priority(lowPriority).Req(smallRes).
|
||||
@ -578,15 +575,15 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("pod-x1").UID("pod-x1").Node("node-x").Label("foo", "").Priority(highPriority).Obj(),
|
||||
st.MakePod().Name("pod-x2").UID("pod-x2").Node("node-x").Label("foo", "").Priority(highPriority).Obj(),
|
||||
},
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("pod-a2").UID("pod-a2").Node("node-a").Label("foo", "").Priority(lowPriority).Obj()},
|
||||
},
|
||||
name: "node-a",
|
||||
},
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("pod-b1").UID("pod-b1").Node("node-b").Label("foo", "").Priority(lowPriority).Obj()},
|
||||
},
|
||||
@ -596,6 +593,7 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
},
|
||||
expectedNumFilterCalled: []int32{5}, // node-a (3), node-b (2), node-x (0)
|
||||
},
|
||||
|
||||
{
|
||||
name: "get Unschedulable in the preemption phase when the filter plugins filtering the nodes",
|
||||
registerPlugins: []st.RegisterPluginFunc{
|
||||
@ -610,7 +608,7 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(),
|
||||
},
|
||||
fakeFilterRC: framework.Unschedulable,
|
||||
expected: [][]Candidate{{}},
|
||||
expected: [][]candidate{{}},
|
||||
expectedNumFilterCalled: []int32{2},
|
||||
},
|
||||
{
|
||||
@ -632,9 +630,9 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1},
|
||||
},
|
||||
},
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{
|
||||
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
|
||||
@ -667,9 +665,9 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1, DisruptedPods: map[string]metav1.Time{"p2": {Time: time.Now()}}},
|
||||
},
|
||||
},
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{
|
||||
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
|
||||
@ -702,9 +700,9 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1, DisruptedPods: map[string]metav1.Time{"p1.2": {Time: time.Now()}}},
|
||||
},
|
||||
},
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{
|
||||
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
|
||||
@ -738,9 +736,9 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1, DisruptedPods: map[string]metav1.Time{"p1.3": {Time: time.Now()}}},
|
||||
},
|
||||
},
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
&candidate{
|
||||
candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{
|
||||
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
|
||||
@ -773,16 +771,16 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p5").UID("p5").Node("node5").Priority(midPriority).Req(largeRes).Obj(),
|
||||
},
|
||||
disableParallelism: true,
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
// cycle=0 => offset=4 => node5 (yes), node1 (yes)
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node1",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj()},
|
||||
},
|
||||
},
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node5",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p5").UID("p5").Node("node5").Priority(midPriority).Req(largeRes).Obj()},
|
||||
@ -810,16 +808,16 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p5").UID("p5").Node("node5").Priority(veryHighPriority).Req(largeRes).Obj(),
|
||||
},
|
||||
disableParallelism: true,
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
// cycle=0 => offset=4 => node5 (no), node1 (yes), node2 (no), node3 (yes)
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node1",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj()},
|
||||
},
|
||||
},
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node3",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p3").UID("p3").Node("node3").Priority(midPriority).Req(largeRes).Obj()},
|
||||
@ -849,16 +847,16 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
st.MakePod().Name("p5").UID("p5").Node("node5").Priority(midPriority).Req(largeRes).Obj(),
|
||||
},
|
||||
disableParallelism: true,
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
// cycle=0 => offset=4 => node5 (yes), node1 (yes)
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node1",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj()},
|
||||
},
|
||||
},
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node5",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p5").UID("p5").Node("node5").Priority(midPriority).Req(largeRes).Obj()},
|
||||
@ -867,13 +865,13 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
},
|
||||
{
|
||||
// cycle=1 => offset=1 => node2 (yes), node3 (yes)
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node2",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj()},
|
||||
},
|
||||
},
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node3",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p3").UID("p3").Node("node3").Priority(midPriority).Req(largeRes).Obj()},
|
||||
@ -882,13 +880,13 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
},
|
||||
{
|
||||
// cycle=2 => offset=3 => node4 (yes), node5 (yes)
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node4",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p4").UID("p4").Node("node4").Priority(midPriority).Req(largeRes).Obj()},
|
||||
},
|
||||
},
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node5",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p5").UID("p5").Node("node5").Priority(midPriority).Req(largeRes).Obj()},
|
||||
@ -922,7 +920,7 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
},
|
||||
},
|
||||
disableParallelism: true,
|
||||
expected: [][]Candidate{
|
||||
expected: [][]candidate{
|
||||
{
|
||||
// Even though the DefaultPreemptionArgs constraints suggest that the
|
||||
// minimum number of candidates is 2, we get three candidates here
|
||||
@ -931,20 +929,20 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
// number of additional candidates returned will be at most
|
||||
// approximately equal to the parallelism in dryRunPreemption).
|
||||
// cycle=0 => offset=4 => node5 (yes, pdb), node1 (yes, pdb), node2 (no, pdb), node3 (yes)
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node1",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Label("app", "foo").Priority(midPriority).Req(largeRes).Obj()},
|
||||
NumPDBViolations: 1,
|
||||
},
|
||||
},
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node3",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p3").UID("p3").Node("node3").Priority(midPriority).Req(largeRes).Obj()},
|
||||
},
|
||||
},
|
||||
&candidate{
|
||||
candidate{
|
||||
name: "node5",
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p5").UID("p5").Node("node5").Label("app", "foo").Priority(midPriority).Req(largeRes).Obj()},
|
||||
@ -1033,7 +1031,12 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
if tt.args == nil {
|
||||
tt.args = getDefaultDefaultPreemptionArgs()
|
||||
}
|
||||
pl := &DefaultPreemption{args: *tt.args}
|
||||
pl := &DefaultPreemption{
|
||||
fh: fwk,
|
||||
podLister: informerFactory.Core().V1().Pods().Lister(),
|
||||
pdbLister: getPDBLister(informerFactory, true),
|
||||
args: *tt.args,
|
||||
}
|
||||
|
||||
// Using 4 as a seed source to test getOffsetAndNumCandidates() deterministically.
|
||||
// However, we need to do it after informerFactory.WaitforCacheSync() which might
|
||||
@ -1046,8 +1049,16 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
if status := fwk.RunPreFilterPlugins(context.Background(), state, pod); !status.IsSuccess() {
|
||||
t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status)
|
||||
}
|
||||
offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos)))
|
||||
got, _ := dryRunPreemption(context.Background(), fwk, state, pod, nodeInfos, tt.pdbs, offset, numCandidates)
|
||||
pe := preemption.Evaluator{
|
||||
PluginName: names.DefaultPreemption,
|
||||
Handler: pl.fh,
|
||||
PodLister: pl.podLister,
|
||||
PdbLister: pl.pdbLister,
|
||||
State: state,
|
||||
Interface: pl,
|
||||
}
|
||||
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
|
||||
got, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, tt.pdbs, offset, numCandidates)
|
||||
// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
|
||||
for i := range got {
|
||||
victims := got[i].Victims().Pods
|
||||
@ -1058,11 +1069,15 @@ func TestDryRunPreemption(t *testing.T) {
|
||||
sort.Slice(got, func(i, j int) bool {
|
||||
return got[i].Name() < got[j].Name()
|
||||
})
|
||||
candidates := []candidate{}
|
||||
for i := range got {
|
||||
candidates = append(candidates, candidate{victims: got[i].Victims(), name: got[i].Name()})
|
||||
}
|
||||
if fakePlugin.NumFilterCalled-prevNumFilterCalled != tt.expectedNumFilterCalled[cycle] {
|
||||
t.Errorf("cycle %d: got NumFilterCalled=%d, want %d", cycle, fakePlugin.NumFilterCalled-prevNumFilterCalled, tt.expectedNumFilterCalled[cycle])
|
||||
}
|
||||
prevNumFilterCalled = fakePlugin.NumFilterCalled
|
||||
if diff := cmp.Diff(tt.expected[cycle], got, cmp.AllowUnexported(candidate{})); diff != "" {
|
||||
if diff := cmp.Diff(tt.expected[cycle], candidates, cmp.AllowUnexported(candidate{})); diff != "" {
|
||||
t.Errorf("cycle %d: unexpected candidates (-want, +got): %s", cycle, diff)
|
||||
}
|
||||
}
|
||||
@ -1264,10 +1279,23 @@ func TestSelectBestCandidate(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pl := &DefaultPreemption{args: *getDefaultDefaultPreemptionArgs()}
|
||||
offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos)))
|
||||
candidates, _ := dryRunPreemption(context.Background(), fwk, state, tt.pod, nodeInfos, nil, offset, numCandidates)
|
||||
s := SelectCandidate(candidates)
|
||||
pl := &DefaultPreemption{
|
||||
fh: fwk,
|
||||
podLister: informerFactory.Core().V1().Pods().Lister(),
|
||||
pdbLister: getPDBLister(informerFactory, true),
|
||||
args: *getDefaultDefaultPreemptionArgs(),
|
||||
}
|
||||
pe := preemption.Evaluator{
|
||||
PluginName: names.DefaultPreemption,
|
||||
Handler: pl.fh,
|
||||
PodLister: pl.podLister,
|
||||
PdbLister: pl.pdbLister,
|
||||
State: state,
|
||||
Interface: pl,
|
||||
}
|
||||
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
|
||||
candidates, _ := pe.DryRunPreemption(context.Background(), tt.pod, nodeInfos, nil, offset, numCandidates)
|
||||
s := pe.SelectCandidate(candidates)
|
||||
if s == nil || len(s.Name()) == 0 {
|
||||
return
|
||||
}
|
||||
@ -1334,124 +1362,23 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
|
||||
for _, n := range test.nodes {
|
||||
nodes = append(nodes, st.MakeNode().Name(n).Obj())
|
||||
}
|
||||
snapshot := internalcache.NewSnapshot(test.pods, nodes)
|
||||
if got := PodEligibleToPreemptOthers(test.pod, snapshot.NodeInfos(), test.nominatedNodeStatus); got != test.expected {
|
||||
registeredPlugins := []st.RegisterPluginFunc{
|
||||
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||
}
|
||||
f, err := st.NewFramework(registeredPlugins, "",
|
||||
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
pl := DefaultPreemption{fh: f}
|
||||
if got := pl.PodEligibleToPreemptOthers(test.pod, test.nominatedNodeStatus); got != test.expected {
|
||||
t.Errorf("expected %t, got %t for pod: %s", test.expected, got, test.pod.Name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodesWherePreemptionMightHelp(t *testing.T) {
|
||||
// Prepare 4 nodes names.
|
||||
nodeNames := []string{"node1", "node2", "node3", "node4"}
|
||||
tests := []struct {
|
||||
name string
|
||||
nodesStatuses framework.NodeToStatusMap
|
||||
expected sets.String // set of expected node names.
|
||||
}{
|
||||
{
|
||||
name: "No node should be attempted",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeaffinity.ErrReasonPod),
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
|
||||
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, tainttoleration.ErrReasonNotMatch),
|
||||
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodelabel.ErrReasonPresenceViolated),
|
||||
},
|
||||
expected: sets.NewString(),
|
||||
},
|
||||
{
|
||||
name: "ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch),
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
|
||||
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnschedulable),
|
||||
},
|
||||
expected: sets.NewString("node1", "node4"),
|
||||
},
|
||||
{
|
||||
name: "ErrReasonAffinityRulesNotMatch should not be tried as it indicates that the pod is unschedulable due to inter-pod affinity, but ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, interpodaffinity.ErrReasonAffinityRulesNotMatch),
|
||||
"node2": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch),
|
||||
},
|
||||
expected: sets.NewString("node2", "node3", "node4"),
|
||||
},
|
||||
{
|
||||
name: "Mix of failed predicates works fine",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumerestrictions.ErrReasonDiskConflict),
|
||||
"node2": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Insufficient %v", v1.ResourceMemory)),
|
||||
},
|
||||
expected: sets.NewString("node2", "node3", "node4"),
|
||||
},
|
||||
{
|
||||
name: "Node condition errors should be considered unresolvable",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnknownCondition),
|
||||
},
|
||||
expected: sets.NewString("node2", "node3", "node4"),
|
||||
},
|
||||
{
|
||||
name: "ErrVolume... errors should not be tried as it indicates that the pod is unschedulable due to no matching volumes for pod on node",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumezone.ErrReasonConflict),
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumescheduling.ErrReasonNodeConflict)),
|
||||
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumescheduling.ErrReasonBindConflict)),
|
||||
},
|
||||
expected: sets.NewString("node4"),
|
||||
},
|
||||
{
|
||||
name: "ErrReasonConstraintsNotMatch should be tried as it indicates that the pod is unschedulable due to topology spread constraints",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch),
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
|
||||
"node3": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch),
|
||||
},
|
||||
expected: sets.NewString("node1", "node3", "node4"),
|
||||
},
|
||||
{
|
||||
name: "UnschedulableAndUnresolvable status should be skipped but Unschedulable should be tried",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
|
||||
"node3": framework.NewStatus(framework.Unschedulable, ""),
|
||||
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
|
||||
},
|
||||
expected: sets.NewString("node1", "node3"),
|
||||
},
|
||||
{
|
||||
name: "ErrReasonNodeLabelNotMatch should not be tried as it indicates that the pod is unschedulable due to node doesn't have the required label",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, podtopologyspread.ErrReasonNodeLabelNotMatch),
|
||||
"node3": framework.NewStatus(framework.Unschedulable, ""),
|
||||
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
|
||||
},
|
||||
expected: sets.NewString("node1", "node3"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
var nodeInfos []*framework.NodeInfo
|
||||
for _, name := range nodeNames {
|
||||
ni := framework.NewNodeInfo()
|
||||
ni.SetNode(st.MakeNode().Name(name).Obj())
|
||||
nodeInfos = append(nodeInfos, ni)
|
||||
}
|
||||
nodes, _ := nodesWherePreemptionMightHelp(nodeInfos, tt.nodesStatuses)
|
||||
if len(tt.expected) != len(nodes) {
|
||||
t.Errorf("number of nodes is not the same as expected. exptectd: %d, got: %d. Nodes: %v", len(tt.expected), len(nodes), nodes)
|
||||
}
|
||||
for _, node := range nodes {
|
||||
name := node.Node().Name
|
||||
if _, found := tt.expected[name]; !found {
|
||||
t.Errorf("node %v is not expected.", name)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPreempt(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
@ -1673,14 +1600,23 @@ func TestPreempt(t *testing.T) {
|
||||
pdbLister: getPDBLister(informerFactory, true),
|
||||
args: *getDefaultDefaultPreemptionArgs(),
|
||||
}
|
||||
node, status := pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap))
|
||||
if !status.IsSuccess() {
|
||||
|
||||
pe := preemption.Evaluator{
|
||||
PluginName: names.DefaultPreemption,
|
||||
Handler: pl.fh,
|
||||
PodLister: pl.podLister,
|
||||
PdbLister: pl.pdbLister,
|
||||
State: state,
|
||||
Interface: &pl,
|
||||
}
|
||||
res, status := pe.Preempt(context.Background(), test.pod, make(framework.NodeToStatusMap))
|
||||
if !status.IsSuccess() && !status.IsUnschedulable() {
|
||||
t.Errorf("unexpected error in preemption: %v", status.AsError())
|
||||
}
|
||||
if len(node) != 0 && node != test.expectedNode {
|
||||
t.Errorf("expected node: %v, got: %v", test.expectedNode, node)
|
||||
if res != nil && len(res.NominatedNodeName) != 0 && res.NominatedNodeName != test.expectedNode {
|
||||
t.Errorf("expected node: %v, got: %v", test.expectedNode, res.NominatedNodeName)
|
||||
}
|
||||
if len(node) == 0 && len(test.expectedNode) != 0 {
|
||||
if res != nil && len(res.NominatedNodeName) == 0 && len(test.expectedNode) != 0 {
|
||||
t.Errorf("expected node: %v, got: nothing", test.expectedNode)
|
||||
}
|
||||
if len(deletedPodNames) != len(test.expectedPods) {
|
||||
@ -1698,7 +1634,9 @@ func TestPreempt(t *testing.T) {
|
||||
t.Errorf("pod %v is not expected to be a victim.", victimName)
|
||||
}
|
||||
}
|
||||
test.pod.Status.NominatedNodeName = node
|
||||
if res != nil {
|
||||
test.pod.Status.NominatedNodeName = res.NominatedNodeName
|
||||
}
|
||||
|
||||
// Manually set the deleted Pods' deletionTimestamp to non-nil.
|
||||
for _, pod := range test.pods {
|
||||
@ -1710,12 +1648,12 @@ func TestPreempt(t *testing.T) {
|
||||
}
|
||||
|
||||
// Call preempt again and make sure it doesn't preempt any more pods.
|
||||
node, status = pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap))
|
||||
if !status.IsSuccess() {
|
||||
res, status = pe.Preempt(context.Background(), test.pod, make(framework.NodeToStatusMap))
|
||||
if !status.IsSuccess() && !status.IsUnschedulable() {
|
||||
t.Errorf("unexpected error in preemption: %v", status.AsError())
|
||||
}
|
||||
if len(node) != 0 && len(deletedPodNames) > 0 {
|
||||
t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", node)
|
||||
if res != nil && len(deletedPodNames) > 0 {
|
||||
t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", res.NominatedNodeName)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
574
pkg/scheduler/framework/preemption/preemption.go
Normal file
@ -0,0 +1,574 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package preemption
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
policy "k8s.io/api/policy/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
policylisters "k8s.io/client-go/listers/policy/v1"
|
||||
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
|
||||
"k8s.io/klog/v2"
|
||||
extenderv1 "k8s.io/kube-scheduler/extender/v1"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
// Candidate represents a nominated node on which the preemptor can be scheduled,
|
||||
// along with the list of victims that should be evicted for the preemptor to fit the node.
|
||||
type Candidate interface {
|
||||
// Victims wraps a list of to-be-preempted Pods and the number of PDB violations.
|
||||
Victims() *extenderv1.Victims
|
||||
// Name returns the target node name where the preemptor gets nominated to run.
|
||||
Name() string
|
||||
}
|
||||
|
||||
type candidate struct {
|
||||
victims *extenderv1.Victims
|
||||
name string
|
||||
}
|
||||
|
||||
// Victims returns s.victims.
|
||||
func (s *candidate) Victims() *extenderv1.Victims {
|
||||
return s.victims
|
||||
}
|
||||
|
||||
// Name returns s.name.
|
||||
func (s *candidate) Name() string {
|
||||
return s.name
|
||||
}
|
||||
|
||||
type candidateList struct {
|
||||
idx int32
|
||||
items []Candidate
|
||||
}
|
||||
|
||||
func newCandidateList(size int32) *candidateList {
|
||||
return &candidateList{idx: -1, items: make([]Candidate, size)}
|
||||
}
|
||||
|
||||
// add adds a new candidate to the internal array atomically.
|
||||
func (cl *candidateList) add(c *candidate) {
|
||||
if idx := atomic.AddInt32(&cl.idx, 1); idx < int32(len(cl.items)) {
|
||||
cl.items[idx] = c
|
||||
}
|
||||
}
|
||||
|
||||
// size returns the number of candidates stored. Note that some add() operations
|
||||
// might still be executing when this is called, so care must be taken to
|
||||
// ensure that all add() operations complete before accessing the elements of
|
||||
// the list.
|
||||
func (cl *candidateList) size() int32 {
|
||||
n := atomic.LoadInt32(&cl.idx) + 1
|
||||
if n >= int32(len(cl.items)) {
|
||||
n = int32(len(cl.items))
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// get returns the internal candidate array. This function is NOT atomic and
|
||||
// assumes that all add() operations have been completed.
|
||||
func (cl *candidateList) get() []Candidate {
|
||||
return cl.items[:cl.size()]
|
||||
}
|
||||
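// Illustrative sketch (not part of the upstream file): candidateList is meant to be
// filled concurrently via add() and read with get() only after every writer has
// finished, e.g. once Parallelizer().Until has returned. The helper name
// exampleCollectCandidates and its inputs are hypothetical.
func exampleCollectCandidates(names []string) []Candidate {
	cl := newCandidateList(int32(len(names)))
	var wg sync.WaitGroup
	for _, name := range names {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			// add() bumps the index atomically, so concurrent writers never clobber a slot.
			cl.add(&candidate{name: n, victims: &extenderv1.Victims{}})
		}(name)
	}
	wg.Wait() // all add() calls have completed, so size() and get() are now safe
	return cl.get()
}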
|
||||
// Interface is expected to be implemented by different preemption plugins as all those member
|
||||
// methods might have different behavior compared with the default preemption.
|
||||
type Interface interface {
|
||||
// GetOffsetAndNumCandidates chooses a random offset and calculates the number of candidates that should be
|
||||
// shortlisted for dry running preemption.
|
||||
GetOffsetAndNumCandidates(nodes int32) (int32, int32)
|
||||
// CandidatesToVictimsMap builds a map from the target node to a list of to-be-preempted Pods and the number of PDB violations.
|
||||
CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims
|
||||
// PodEligibleToPreemptOthers determines whether this pod should be considered
|
||||
// for preempting other pods or not.
|
||||
PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) bool
|
||||
// SelectVictimsOnNode finds minimum set of pods on the given node that should be preempted in order to make enough room
|
||||
// for "pod" to be scheduled.
|
||||
// Note that both `state` and `nodeInfo` are deep copied.
|
||||
SelectVictimsOnNode(ctx context.Context, state *framework.CycleState,
|
||||
pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status)
|
||||
}
|
||||
|
||||
type Evaluator struct {
|
||||
PluginName string
|
||||
Handler framework.Handle
|
||||
PodLister corelisters.PodLister
|
||||
PdbLister policylisters.PodDisruptionBudgetLister
|
||||
State *framework.CycleState
|
||||
Interface
|
||||
}
|
||||
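// Illustrative sketch (not part of the upstream file): a minimal out-of-tree
// preemption plugin could satisfy Interface roughly as below and then be wired
// into an Evaluator through the embedded Interface field. The type name
// minimalPreemption and the policy choices are hypothetical.
type minimalPreemption struct{}

// Dry-run every potential node, starting at the first one.
func (m *minimalPreemption) GetOffsetAndNumCandidates(nodes int32) (int32, int32) {
	return 0, nodes
}

// Candidates are assumed to carry unique node names, so a plain map is enough.
func (m *minimalPreemption) CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
	victims := make(map[string]*extenderv1.Victims, len(candidates))
	for _, c := range candidates {
		victims[c.Name()] = c.Victims()
	}
	return victims
}

// Always allow preemption; a real plugin would apply its own policy here.
func (m *minimalPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) bool {
	return true
}

// A real plugin would evict lower-priority pods until the preemptor fits; returning
// an Unschedulable status marks this node as not preemptable during the dry run.
func (m *minimalPreemption) SelectVictimsOnNode(ctx context.Context, state *framework.CycleState,
	pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) {
	return nil, 0, framework.NewStatus(framework.Unschedulable)
}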
|
||||
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
|
||||
|
||||
// 0) Fetch the latest version of <pod>.
|
||||
// It's safe to directly fetch pod here. Because the informer cache has already been
|
||||
// initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc().
|
||||
// However, tests may need to manually initialize the shared pod informer.
|
||||
podNamespace, podName := pod.Namespace, pod.Name
|
||||
pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
|
||||
return nil, framework.AsStatus(err)
|
||||
}
|
||||
|
||||
// 1) Ensure the preemptor is eligible to preempt other pods.
|
||||
if !ev.PodEligibleToPreemptOthers(pod, m[pod.Status.NominatedNodeName]) {
|
||||
klog.V(5).InfoS("Pod is not eligible for more preemption", "pod", klog.KObj(pod))
|
||||
return nil, framework.NewStatus(framework.Unschedulable)
|
||||
}
|
||||
|
||||
// 2) Find all preemption candidates.
|
||||
candidates, nodeToStatusMap, status := ev.findCandidates(ctx, pod, m)
|
||||
if !status.IsSuccess() {
|
||||
return nil, status
|
||||
}
|
||||
|
||||
// Return a FitError only when there are no candidates that fit the pod.
|
||||
if len(candidates) == 0 {
|
||||
fitError := &framework.FitError{
|
||||
Pod: pod,
|
||||
NumAllNodes: len(nodeToStatusMap),
|
||||
Diagnosis: framework.Diagnosis{
|
||||
NodeToStatusMap: nodeToStatusMap,
|
||||
// Leave FailedPlugins as nil as it won't be used on moving Pods.
|
||||
},
|
||||
}
|
||||
return nil, framework.NewStatus(framework.Unschedulable, fitError.Error())
|
||||
}
|
||||
|
||||
// 3) Interact with registered Extenders to filter out some candidates if needed.
|
||||
candidates, status = ev.callExtenders(pod, candidates)
|
||||
if !status.IsSuccess() {
|
||||
return nil, status
|
||||
}
|
||||
|
||||
// 4) Find the best candidate.
|
||||
bestCandidate := ev.SelectCandidate(candidates)
|
||||
if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
|
||||
return nil, framework.NewStatus(framework.Unschedulable)
|
||||
}
|
||||
|
||||
// 5) Perform preparation work before nominating the selected candidate.
|
||||
if status := ev.prepareCandidate(bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
|
||||
return nil, status
|
||||
}
|
||||
|
||||
return &framework.PostFilterResult{NominatedNodeName: bestCandidate.Name()}, framework.NewStatus(framework.Success)
|
||||
}
|
||||
|
||||
// findCandidates calculates a slice of preemption candidates.
// Each candidate is a node where the given <pod> can be scheduled once the candidate's victims are evicted.
|
||||
func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, *framework.Status) {
|
||||
allNodes, err := ev.Handler.SnapshotSharedLister().NodeInfos().List()
|
||||
if err != nil {
|
||||
return nil, nil, framework.AsStatus(err)
|
||||
}
|
||||
if len(allNodes) == 0 {
|
||||
return nil, nil, framework.NewStatus(framework.Error, "no nodes available")
|
||||
}
|
||||
potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m)
|
||||
if len(potentialNodes) == 0 {
|
||||
klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
|
||||
// In this case, we should clean-up any existing nominated node name of the pod.
|
||||
if err := util.ClearNominatedNodeName(ev.Handler.ClientSet(), pod); err != nil {
|
||||
klog.ErrorS(err, "cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod))
|
||||
// We do not return as this error is not critical.
|
||||
}
|
||||
return nil, unschedulableNodeStatus, nil
|
||||
}
|
||||
|
||||
pdbs, err := getPodDisruptionBudgets(ev.PdbLister)
|
||||
if err != nil {
|
||||
return nil, nil, framework.AsStatus(err)
|
||||
}
|
||||
|
||||
offset, numCandidates := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes)))
|
||||
if klog.V(5).Enabled() {
|
||||
var sample []string
|
||||
for i := offset; i < offset+10 && i < int32(len(potentialNodes)); i++ {
|
||||
sample = append(sample, potentialNodes[i].Node().Name)
|
||||
}
|
||||
klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates)
|
||||
}
|
||||
candidates, nodeStatuses := ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, numCandidates)
|
||||
for node, status := range unschedulableNodeStatus {
|
||||
nodeStatuses[node] = status
|
||||
}
|
||||
return candidates, nodeStatuses, nil
|
||||
}
|
||||
|
||||
// callExtenders calls given <extenders> to select the list of feasible candidates.
|
||||
// We will only check <candidates> with extenders that support preemption.
|
||||
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
|
||||
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
|
||||
func (ev *Evaluator) callExtenders(pod *v1.Pod, candidates []Candidate) ([]Candidate, *framework.Status) {
|
||||
extenders := ev.Handler.Extenders()
|
||||
nodeLister := ev.Handler.SnapshotSharedLister().NodeInfos()
|
||||
if len(extenders) == 0 {
|
||||
return candidates, nil
|
||||
}
|
||||
|
||||
// Migrate candidate slice to victimsMap to adapt to the Extender interface.
|
||||
// It's only applicable to candidates that have a unique nominated node name.
|
||||
victimsMap := ev.CandidatesToVictimsMap(candidates)
|
||||
if len(victimsMap) == 0 {
|
||||
return candidates, nil
|
||||
}
|
||||
for _, extender := range extenders {
|
||||
if !extender.SupportsPreemption() || !extender.IsInterested(pod) {
|
||||
continue
|
||||
}
|
||||
nodeNameToVictims, err := extender.ProcessPreemption(pod, victimsMap, nodeLister)
|
||||
if err != nil {
|
||||
if extender.IsIgnorable() {
|
||||
klog.InfoS("Skipping extender as it returned error and has ignorable flag set",
|
||||
"extender", extender, "err", err)
|
||||
continue
|
||||
}
|
||||
return nil, framework.AsStatus(err)
|
||||
}
|
||||
// Check if the returned victims are valid.
|
||||
for nodeName, victims := range nodeNameToVictims {
|
||||
if victims == nil || len(victims.Pods) == 0 {
|
||||
if extender.IsIgnorable() {
|
||||
delete(nodeNameToVictims, nodeName)
|
||||
klog.InfoS("Ignoring node without victims", "node", nodeName)
|
||||
continue
|
||||
}
|
||||
return nil, framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeName))
|
||||
}
|
||||
}
|
||||
|
||||
// Replace victimsMap with new result after preemption. So the
|
||||
// rest of extenders can continue use it as parameter.
|
||||
victimsMap = nodeNameToVictims
|
||||
|
||||
// If node list becomes empty, no preemption can happen regardless of other extenders.
|
||||
if len(victimsMap) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
var newCandidates []Candidate
|
||||
for nodeName := range victimsMap {
|
||||
newCandidates = append(newCandidates, &candidate{
|
||||
victims: victimsMap[nodeName],
|
||||
name: nodeName,
|
||||
})
|
||||
}
|
||||
return newCandidates, nil
|
||||
}
|
||||
|
||||
// SelectCandidate chooses the best-fit candidate from the given <candidates> and returns it.
|
||||
// NOTE: This method is exported for easier testing in default preemption.
|
||||
func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
|
||||
if len(candidates) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(candidates) == 1 {
|
||||
return candidates[0]
|
||||
}
|
||||
|
||||
victimsMap := ev.CandidatesToVictimsMap(candidates)
|
||||
candidateNode := pickOneNodeForPreemption(victimsMap)
|
||||
|
||||
// Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
|
||||
// preemption plugins that exercise different candidates on the same nominated node.
|
||||
if victims := victimsMap[candidateNode]; victims != nil {
|
||||
return &candidate{
|
||||
victims: victims,
|
||||
name: candidateNode,
|
||||
}
|
||||
}
|
||||
|
||||
// We shouldn't reach here.
|
||||
klog.ErrorS(errors.New("no candidate selected"), "should not reach here", "candidates", candidates)
|
||||
// To not break the whole flow, return the first candidate.
|
||||
return candidates[0]
|
||||
}
|
||||
|
||||
// prepareCandidate does some preparation work before nominating the selected candidate:
|
||||
// - Evict the victim pods
|
||||
// - Reject the victim pods if they are in waitingPod map
|
||||
// - Clear the low-priority pods' nominatedNodeName status if needed
|
||||
func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
|
||||
fh := ev.Handler
|
||||
cs := ev.Handler.ClientSet()
|
||||
for _, victim := range c.Victims().Pods {
|
||||
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
|
||||
// Otherwise we should delete the victim.
|
||||
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
|
||||
waitingPod.Reject(pluginName, "preempted")
|
||||
} else if err := util.DeletePod(cs, victim); err != nil {
|
||||
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
|
||||
return framework.AsStatus(err)
|
||||
}
|
||||
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
|
||||
pod.Namespace, pod.Name, c.Name())
|
||||
}
|
||||
metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
|
||||
|
||||
// Lower priority pods nominated to run on this node, may no longer fit on
|
||||
// this node. So, we should remove their nomination. Removing their
|
||||
// nomination updates these pods and moves them to the active queue. It
|
||||
// lets scheduler find another place for them.
|
||||
nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
|
||||
if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
|
||||
klog.ErrorS(err, "cannot clear 'NominatedNodeName' field")
|
||||
// We do not return as this error is not critical.
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
|
||||
// that may be satisfied by removing pods from the node.
|
||||
func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) ([]*framework.NodeInfo, framework.NodeToStatusMap) {
|
||||
var potentialNodes []*framework.NodeInfo
|
||||
nodeStatuses := make(framework.NodeToStatusMap)
|
||||
for _, node := range nodes {
|
||||
name := node.Node().Name
|
||||
// We rely on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
|
||||
// to determine whether preemption may help or not on the node.
|
||||
if m[name].Code() == framework.UnschedulableAndUnresolvable {
|
||||
nodeStatuses[node.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "Preemption is not helpful for scheduling")
|
||||
continue
|
||||
}
|
||||
potentialNodes = append(potentialNodes, node)
|
||||
}
|
||||
return potentialNodes, nodeStatuses
|
||||
}
|
||||
|
||||
func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) {
|
||||
if pdbLister != nil {
|
||||
return pdbLister.List(labels.Everything())
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
// pods in each map entry are ordered by decreasing priority.
// It picks a node based on the following criteria:
// 1. The node with the minimum number of PDB violations.
// 2. The node with the minimum highest-priority victim.
// 3. Ties are broken by the sum of priorities of all victims.
// 4. If there are still ties, the node with the minimum number of victims.
// 5. If there are still ties, the node with the latest start time of all highest-priority victims.
// 6. If there are still ties, the first such node is picked (sort of randomly).
// The 'minNodes1' and 'minNodes2' slices are reused to save memory
// allocation and garbage collection time.
|
||||
func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string {
|
||||
if len(nodesToVictims) == 0 {
|
||||
return ""
|
||||
}
|
||||
minNumPDBViolatingPods := int64(math.MaxInt32)
|
||||
var minNodes1 []string
|
||||
lenNodes1 := 0
|
||||
for node, victims := range nodesToVictims {
|
||||
numPDBViolatingPods := victims.NumPDBViolations
|
||||
if numPDBViolatingPods < minNumPDBViolatingPods {
|
||||
minNumPDBViolatingPods = numPDBViolatingPods
|
||||
minNodes1 = nil
|
||||
lenNodes1 = 0
|
||||
}
|
||||
if numPDBViolatingPods == minNumPDBViolatingPods {
|
||||
minNodes1 = append(minNodes1, node)
|
||||
lenNodes1++
|
||||
}
|
||||
}
|
||||
if lenNodes1 == 1 {
|
||||
return minNodes1[0]
|
||||
}
|
||||
|
||||
// More than one node has the minimum number of PDB-violating pods. Find
// the one with the minimum highest-priority victim.
|
||||
minHighestPriority := int32(math.MaxInt32)
|
||||
var minNodes2 = make([]string, lenNodes1)
|
||||
lenNodes2 := 0
|
||||
for i := 0; i < lenNodes1; i++ {
|
||||
node := minNodes1[i]
|
||||
victims := nodesToVictims[node]
|
||||
// highestPodPriority is the highest priority among the victims on this node.
|
||||
highestPodPriority := corev1helpers.PodPriority(victims.Pods[0])
|
||||
if highestPodPriority < minHighestPriority {
|
||||
minHighestPriority = highestPodPriority
|
||||
lenNodes2 = 0
|
||||
}
|
||||
if highestPodPriority == minHighestPriority {
|
||||
minNodes2[lenNodes2] = node
|
||||
lenNodes2++
|
||||
}
|
||||
}
|
||||
if lenNodes2 == 1 {
|
||||
return minNodes2[0]
|
||||
}
|
||||
|
||||
// There are a few nodes with minimum highest priority victim. Find the
|
||||
// smallest sum of priorities.
|
||||
minSumPriorities := int64(math.MaxInt64)
|
||||
lenNodes1 = 0
|
||||
for i := 0; i < lenNodes2; i++ {
|
||||
var sumPriorities int64
|
||||
node := minNodes2[i]
|
||||
for _, pod := range nodesToVictims[node].Pods {
|
||||
// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
|
||||
// needed so that a node with a few pods with negative priority is not
|
||||
// picked over a node with a smaller number of pods with the same negative
|
||||
// priority (and similar scenarios).
|
||||
sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1)
|
||||
}
|
||||
if sumPriorities < minSumPriorities {
|
||||
minSumPriorities = sumPriorities
|
||||
lenNodes1 = 0
|
||||
}
|
||||
if sumPriorities == minSumPriorities {
|
||||
minNodes1[lenNodes1] = node
|
||||
lenNodes1++
|
||||
}
|
||||
}
|
||||
if lenNodes1 == 1 {
|
||||
return minNodes1[0]
|
||||
}
|
||||
|
||||
// There are a few nodes with minimum highest priority victim and sum of priorities.
|
||||
// Find one with the minimum number of pods.
|
||||
minNumPods := math.MaxInt32
|
||||
lenNodes2 = 0
|
||||
for i := 0; i < lenNodes1; i++ {
|
||||
node := minNodes1[i]
|
||||
numPods := len(nodesToVictims[node].Pods)
|
||||
if numPods < minNumPods {
|
||||
minNumPods = numPods
|
||||
lenNodes2 = 0
|
||||
}
|
||||
if numPods == minNumPods {
|
||||
minNodes2[lenNodes2] = node
|
||||
lenNodes2++
|
||||
}
|
||||
}
|
||||
if lenNodes2 == 1 {
|
||||
return minNodes2[0]
|
||||
}
|
||||
|
||||
// There are a few nodes with same number of pods.
|
||||
// Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node))
|
||||
latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
|
||||
if latestStartTime == nil {
|
||||
// If the earliest start time of all pods on the 1st node is nil, just return it,
|
||||
// which is not expected to happen.
|
||||
klog.ErrorS(errors.New("earliestStartTime is nil for node"), "should not reach here", "node", minNodes2[0])
|
||||
return minNodes2[0]
|
||||
}
|
||||
nodeToReturn := minNodes2[0]
|
||||
for i := 1; i < lenNodes2; i++ {
|
||||
node := minNodes2[i]
|
||||
// Get earliest start time of all pods on the current node.
|
||||
earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
|
||||
if earliestStartTimeOnNode == nil {
|
||||
klog.ErrorS(errors.New("earliestStartTime is nil for node"), "should not reach here", "node", node)
|
||||
continue
|
||||
}
|
||||
if earliestStartTimeOnNode.After(latestStartTime.Time) {
|
||||
latestStartTime = earliestStartTimeOnNode
|
||||
nodeToReturn = node
|
||||
}
|
||||
}
|
||||
|
||||
return nodeToReturn
|
||||
}
|
||||
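// Illustrative sketch (not part of the upstream file): with two candidate nodes,
// criterion 1 alone decides the winner because "node-a" has fewer PDB violations,
// so the later tie-breakers (victim priorities, counts, start times) are never reached.
// The helper name examplePickNode is hypothetical.
func examplePickNode() string {
	nodesToVictims := map[string]*extenderv1.Victims{
		"node-a": {NumPDBViolations: 0},
		"node-b": {NumPDBViolations: 1},
	}
	// Returns "node-a": a unique minimum on PDB violations short-circuits the
	// remaining criteria, so Victims.Pods is never inspected in this case.
	return pickOneNodeForPreemption(nodesToVictims)
}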
|
||||
// getLowerPriorityNominatedPods returns pods whose priority is smaller than the
|
||||
// priority of the given "pod" and are nominated to run on the given node.
|
||||
// Note: We could possibly check if the nominated lower priority pods still fit
|
||||
// and return those that no longer fit, but that would require lots of
|
||||
// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
|
||||
// worth the complexity, especially because we generally expect to have a very
|
||||
// small number of nominated pods per node.
|
||||
func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
|
||||
podInfos := pn.NominatedPodsForNode(nodeName)
|
||||
|
||||
if len(podInfos) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var lowerPriorityPods []*v1.Pod
|
||||
podPriority := corev1helpers.PodPriority(pod)
|
||||
for _, pi := range podInfos {
|
||||
if corev1helpers.PodPriority(pi.Pod) < podPriority {
|
||||
lowerPriorityPods = append(lowerPriorityPods, pi.Pod)
|
||||
}
|
||||
}
|
||||
return lowerPriorityPods
|
||||
}
|
||||
|
||||
// DryRunPreemption simulates Preemption logic on <potentialNodes> in parallel,
|
||||
// returns preemption candidates and a map indicating filtered nodes statuses.
|
||||
// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of
|
||||
// candidates, ones that do not violate PDB are preferred over ones that do.
|
||||
// NOTE: This method is exported for easier testing in default preemption.
|
||||
func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
|
||||
pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap) {
|
||||
fh := ev.Handler
|
||||
nonViolatingCandidates := newCandidateList(numCandidates)
|
||||
violatingCandidates := newCandidateList(numCandidates)
|
||||
parallelCtx, cancel := context.WithCancel(ctx)
|
||||
nodeStatuses := make(framework.NodeToStatusMap)
|
||||
var statusesLock sync.Mutex
|
||||
checkNode := func(i int) {
|
||||
nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
|
||||
stateCopy := ev.State.Clone()
|
||||
pods, numPDBViolations, status := ev.SelectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
|
||||
if status.IsSuccess() && len(pods) != 0 {
|
||||
victims := extenderv1.Victims{
|
||||
Pods: pods,
|
||||
NumPDBViolations: int64(numPDBViolations),
|
||||
}
|
||||
c := &candidate{
|
||||
victims: &victims,
|
||||
name: nodeInfoCopy.Node().Name,
|
||||
}
|
||||
if numPDBViolations == 0 {
|
||||
nonViolatingCandidates.add(c)
|
||||
} else {
|
||||
violatingCandidates.add(c)
|
||||
}
|
||||
nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size()
|
||||
if nvcSize > 0 && nvcSize+vcSize >= numCandidates {
|
||||
cancel()
|
||||
}
|
||||
return
|
||||
}
|
||||
if status.IsSuccess() && len(pods) == 0 {
|
||||
status = framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name))
|
||||
}
|
||||
statusesLock.Lock()
|
||||
nodeStatuses[nodeInfoCopy.Node().Name] = status
|
||||
statusesLock.Unlock()
|
||||
}
|
||||
fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode)
|
||||
return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses
|
||||
}
|
334
pkg/scheduler/framework/preemption/preemption_test.go
Normal file
@ -0,0 +1,334 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package preemption
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
policy "k8s.io/api/policy/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/informers"
|
||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||
extenderv1 "k8s.io/kube-scheduler/extender/v1"
|
||||
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
|
||||
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
|
||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
)
|
||||
|
||||
var (
|
||||
midPriority, highPriority = int32(100), int32(1000)
|
||||
|
||||
veryLargeRes = map[v1.ResourceName]string{
|
||||
v1.ResourceCPU: "500m",
|
||||
v1.ResourceMemory: "500",
|
||||
}
|
||||
)
|
||||
|
||||
type FakePostFilterPlugin struct {
|
||||
numViolatingVictim int
|
||||
}
|
||||
|
||||
func (pl *FakePostFilterPlugin) SelectVictimsOnNode(
|
||||
ctx context.Context, state *framework.CycleState, pod *v1.Pod,
|
||||
nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) (victims []*v1.Pod, numViolatingVictim int, status *framework.Status) {
|
||||
return append(victims, nodeInfo.Pods[0].Pod), pl.numViolatingVictim, nil
|
||||
}
|
||||
|
||||
func (pl *FakePostFilterPlugin) GetOffsetAndNumCandidates(nodes int32) (int32, int32) {
|
||||
return 0, nodes
|
||||
}
|
||||
|
||||
func (pl *FakePostFilterPlugin) CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pl *FakePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func TestNodesWherePreemptionMightHelp(t *testing.T) {
|
||||
// Prepare 4 node names.
|
||||
nodeNames := []string{"node1", "node2", "node3", "node4"}
|
||||
tests := []struct {
|
||||
name string
|
||||
nodesStatuses framework.NodeToStatusMap
|
||||
expected sets.String // set of expected node names.
|
||||
}{
|
||||
{
|
||||
name: "No node should be attempted",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeaffinity.ErrReasonPod),
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
|
||||
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, tainttoleration.ErrReasonNotMatch),
|
||||
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodelabel.ErrReasonPresenceViolated),
|
||||
},
|
||||
expected: sets.NewString(),
|
||||
},
|
||||
{
|
||||
name: "ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch),
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
|
||||
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnschedulable),
|
||||
},
|
||||
expected: sets.NewString("node1", "node4"),
|
||||
},
|
||||
{
|
||||
name: "ErrReasonAffinityRulesNotMatch should not be tried as it indicates that the pod is unschedulable due to inter-pod affinity, but ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, interpodaffinity.ErrReasonAffinityRulesNotMatch),
|
||||
"node2": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch),
|
||||
},
|
||||
expected: sets.NewString("node2", "node3", "node4"),
|
||||
},
|
||||
{
|
||||
name: "Mix of failed predicates works fine",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumerestrictions.ErrReasonDiskConflict),
|
||||
"node2": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Insufficient %v", v1.ResourceMemory)),
|
||||
},
|
||||
expected: sets.NewString("node2", "node3", "node4"),
|
||||
},
|
||||
{
|
||||
name: "Node condition errors should be considered unresolvable",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnknownCondition),
|
||||
},
|
||||
expected: sets.NewString("node2", "node3", "node4"),
|
||||
},
|
||||
{
|
||||
name: "ErrVolume... errors should not be tried as it indicates that the pod is unschedulable due to no matching volumes for pod on node",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumezone.ErrReasonConflict),
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumescheduling.ErrReasonNodeConflict)),
|
||||
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumescheduling.ErrReasonBindConflict)),
|
||||
},
|
||||
expected: sets.NewString("node4"),
|
||||
},
|
||||
{
|
||||
name: "ErrReasonConstraintsNotMatch should be tried as it indicates that the pod is unschedulable due to topology spread constraints",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch),
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
|
||||
"node3": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch),
|
||||
},
|
||||
expected: sets.NewString("node1", "node3", "node4"),
|
||||
},
|
||||
{
|
||||
name: "UnschedulableAndUnresolvable status should be skipped but Unschedulable should be tried",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
|
||||
"node3": framework.NewStatus(framework.Unschedulable, ""),
|
||||
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
|
||||
},
|
||||
expected: sets.NewString("node1", "node3"),
|
||||
},
|
||||
{
|
||||
name: "ErrReasonNodeLabelNotMatch should not be tried as it indicates that the pod is unschedulable due to node doesn't have the required label",
|
||||
nodesStatuses: framework.NodeToStatusMap{
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, podtopologyspread.ErrReasonNodeLabelNotMatch),
|
||||
"node3": framework.NewStatus(framework.Unschedulable, ""),
|
||||
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
|
||||
},
|
||||
expected: sets.NewString("node1", "node3"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
var nodeInfos []*framework.NodeInfo
|
||||
for _, name := range nodeNames {
|
||||
ni := framework.NewNodeInfo()
|
||||
ni.SetNode(st.MakeNode().Name(name).Obj())
|
||||
nodeInfos = append(nodeInfos, ni)
|
||||
}
|
||||
nodes, _ := nodesWherePreemptionMightHelp(nodeInfos, tt.nodesStatuses)
|
||||
if len(tt.expected) != len(nodes) {
|
||||
t.Errorf("number of nodes is not the same as expected. exptectd: %d, got: %d. Nodes: %v", len(tt.expected), len(nodes), nodes)
|
||||
}
|
||||
for _, node := range nodes {
|
||||
name := node.Node().Name
|
||||
if _, found := tt.expected[name]; !found {
|
||||
t.Errorf("node %v is not expected.", name)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDryRunPreemption(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
nodes []*v1.Node
|
||||
testPods []*v1.Pod
|
||||
initPods []*v1.Pod
|
||||
numViolatingVictim int
|
||||
expected [][]Candidate
|
||||
}{
|
||||
{
|
||||
name: "no pdb violation",
|
||||
nodes: []*v1.Node{
|
||||
st.MakeNode().Name("node1").Capacity(veryLargeRes).Obj(),
|
||||
st.MakeNode().Name("node2").Capacity(veryLargeRes).Obj(),
|
||||
},
|
||||
testPods: []*v1.Pod{
|
||||
st.MakePod().Name("p").UID("p").Priority(highPriority).Obj(),
|
||||
},
|
||||
initPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(),
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(),
|
||||
},
|
||||
expected: [][]Candidate{
|
||||
{
|
||||
&candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj()},
|
||||
},
|
||||
name: "node1",
|
||||
},
|
||||
&candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj()},
|
||||
},
|
||||
name: "node2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "pdb violation on each node",
|
||||
nodes: []*v1.Node{
|
||||
st.MakeNode().Name("node1").Capacity(veryLargeRes).Obj(),
|
||||
st.MakeNode().Name("node2").Capacity(veryLargeRes).Obj(),
|
||||
},
|
||||
testPods: []*v1.Pod{
|
||||
st.MakePod().Name("p").UID("p").Priority(highPriority).Obj(),
|
||||
},
|
||||
initPods: []*v1.Pod{
|
||||
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(),
|
||||
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(),
|
||||
},
|
||||
numViolatingVictim: 1,
|
||||
expected: [][]Candidate{
|
||||
{
|
||||
&candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj()},
|
||||
NumPDBViolations: 1,
|
||||
},
|
||||
name: "node1",
|
||||
},
|
||||
&candidate{
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj()},
|
||||
NumPDBViolations: 1,
|
||||
},
|
||||
name: "node2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
registeredPlugins := append([]st.RegisterPluginFunc{
|
||||
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
|
||||
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||
)
|
||||
var objs []runtime.Object
|
||||
for _, p := range append(tt.testPods, tt.initPods...) {
|
||||
objs = append(objs, p)
|
||||
}
|
||||
for _, n := range tt.nodes {
|
||||
objs = append(objs, n)
|
||||
}
|
||||
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
|
||||
parallelism := parallelize.DefaultParallelism
|
||||
fwk, err := st.NewFramework(
|
||||
registeredPlugins, "",
|
||||
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
|
||||
frameworkruntime.WithInformerFactory(informerFactory),
|
||||
frameworkruntime.WithParallelism(parallelism),
|
||||
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
informerFactory.Start(ctx.Done())
|
||||
informerFactory.WaitForCacheSync(ctx.Done())
|
||||
snapshot := internalcache.NewSnapshot(tt.initPods, tt.nodes)
|
||||
nodeInfos, err := snapshot.NodeInfos().List()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
sort.Slice(nodeInfos, func(i, j int) bool {
|
||||
return nodeInfos[i].Node().Name < nodeInfos[j].Node().Name
|
||||
})
|
||||
|
||||
fakePostPlugin := &FakePostFilterPlugin{numViolatingVictim: tt.numViolatingVictim}
|
||||
|
||||
for cycle, pod := range tt.testPods {
|
||||
state := framework.NewCycleState()
|
||||
pe := Evaluator{
|
||||
PluginName: "FakePostFilter",
|
||||
Handler: fwk,
|
||||
Interface: fakePostPlugin,
|
||||
State: state,
|
||||
}
|
||||
got, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
|
||||
// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
|
||||
for i := range got {
|
||||
victims := got[i].Victims().Pods
|
||||
sort.Slice(victims, func(i, j int) bool {
|
||||
return victims[i].Name < victims[j].Name
|
||||
})
|
||||
}
|
||||
sort.Slice(got, func(i, j int) bool {
|
||||
return got[i].Name() < got[j].Name()
|
||||
})
|
||||
if diff := cmp.Diff(tt.expected[cycle], got, cmp.AllowUnexported(candidate{})); diff != "" {
|
||||
t.Errorf("cycle %d: unexpected candidates (-want, +got): %s", cycle, diff)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|