Merge pull request #56178 from bsalamat/pdb

Automatic merge from submit-queue (batch tested with PRs 55952, 49112, 55450, 56178, 56151). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Add PodDisruptionBudget support in pod preemption

**What this PR does / why we need it**:
This PR adds the logic to make scheduler preemption aware of PodDisruptionBudget (PDB). Preemption tries to avoid choosing victims whose PDBs would be violated by evicting them. If preemption cannot find any other pods to preempt, it will still preempt pods even though their PDBs are violated.
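
To make that two-pass "reprieve" order concrete, here is a minimal, self-contained sketch. It uses simplified types and a toy CPU-only fit check, so it is only an illustration of the ordering, not the scheduler's actual implementation (which is in the `selectVictimsOnNode` change below): PDB-violating candidates are considered for reprieve first, and a pod is kept as a victim only if the preemptor still cannot fit without evicting it.

```go
package main

import "fmt"

// victim is a simplified stand-in for a pod that preemption may evict.
type victim struct {
	name        string
	violatesPDB bool  // true if evicting this pod would violate its PodDisruptionBudget
	milliCPU    int64 // CPU the pod currently holds on the node
}

// pickVictims mirrors the two-pass reprieve order: assume every candidate is evicted,
// then try to "reprieve" (keep) PDB-violating pods first and the rest afterwards,
// keeping a pod only if the preemptor still fits without evicting it.
func pickVictims(candidates []victim, freeMilliCPU, neededMilliCPU int64) (victims []victim, numPDBViolations int) {
	available := freeMilliCPU
	for _, c := range candidates {
		available += c.milliCPU
	}
	reprieve := func(v victim) bool {
		if available-v.milliCPU >= neededMilliCPU {
			available -= v.milliCPU // the pod can stay; the preemptor still fits
			return true
		}
		victims = append(victims, v) // the pod must be evicted
		return false
	}
	for _, v := range candidates {
		if v.violatesPDB && !reprieve(v) {
			numPDBViolations++
		}
	}
	for _, v := range candidates {
		if !v.violatesPDB {
			reprieve(v)
		}
	}
	return victims, numPDBViolations
}

func main() {
	candidates := []victim{
		{name: "pdb-protected-pod", violatesPDB: true, milliCPU: 200},
		{name: "unprotected-pod", violatesPDB: false, milliCPU: 200},
	}
	victims, violations := pickVictims(candidates, 100, 300)
	fmt.Println(victims, violations) // only the unprotected pod is chosen; no PDB is violated
}
```

The real implementation additionally orders each group by decreasing pod priority and re-runs the scheduler's fit predicates after every add/remove, rather than doing simple CPU arithmetic.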

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #53913

**Special notes for your reviewer**:

**Release note**:

```release-note
Add PodDisruptionBudget support during pod preemption
```

ref/ #47604

/sig scheduling
Kubernetes Submit Queue 2017-11-22 21:48:48 -08:00 committed by GitHub
commit 82c88982c0
4 changed files with 440 additions and 77 deletions


@@ -57,7 +57,9 @@ go_library(
        "//vendor/github.com/golang/glog:go_default_library",
        "//vendor/github.com/golang/groupcache/lru:go_default_library",
        "//vendor/k8s.io/api/core/v1:go_default_library",
+       "//vendor/k8s.io/api/policy/v1beta1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+       "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",


@@ -26,6 +26,9 @@ import (
    "time"

    "k8s.io/api/core/v1"
+   policy "k8s.io/api/policy/v1beta1"
+   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+   "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/util/errors"
    utiltrace "k8s.io/apiserver/pkg/util/trace"
    "k8s.io/client-go/util/workqueue"
@@ -47,6 +50,11 @@ type FitError struct {
    FailedPredicates FailedPredicateMap
}

+type Victims struct {
+    pods             []*v1.Pod
+    numPDBViolations int
+}

var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")

const (
@@ -211,29 +219,33 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
        // In this case, we should clean-up any existing nominated node name of the pod.
        return nil, nil, []*v1.Pod{pod}, nil
    }
-   nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue)
+   pdbs, err := g.cache.ListPDBs(labels.Everything())
    if err != nil {
        return nil, nil, nil, err
    }
-   for len(nodeToPods) > 0 {
-       node := pickOneNodeForPreemption(nodeToPods)
+   nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs)
+   if err != nil {
+       return nil, nil, nil, err
+   }
+   for len(nodeToVictims) > 0 {
+       node := pickOneNodeForPreemption(nodeToVictims)
        if node == nil {
            return nil, nil, nil, err
        }
-       passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
+       passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToVictims[node].pods, g.cachedNodeInfoMap, g.extenders)
        if passes && pErr == nil {
            // Lower priority pods nominated to run on this node, may no longer fit on
            // this node. So, we should remove their nomination. Removing their
            // nomination updates these pods and moves them to the active queue. It
            // lets scheduler find another place for them.
            nominatedPods := g.getLowerPriorityNominatedPods(pod, node.Name)
-           return node, nodeToPods[node], nominatedPods, err
+           return node, nodeToVictims[node].pods, nominatedPods, err
        }
        if pErr != nil {
            glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
        }
        // Remove the node from the map and try to pick a different node.
-       delete(nodeToPods, node)
+       delete(nodeToVictims, node)
    }
    return nil, nil, nil, err
}
@@ -627,51 +639,66 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf
// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
// pods in each map entry are ordered by decreasing priority.
// It picks a node based on the following criteria:
-// 1. A node with minimum highest priority victim is picked.
-// 2. Ties are broken by sum of priorities of all victims.
-// 3. If there are still ties, node with the minimum number of victims is picked.
-// 4. If there are still ties, the first such node is picked (sort of randomly).
-//TODO(bsalamat): Try to reuse the "nodeScore" slices in order to save GC time.
-func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
-   type nodeScore struct {
-       node            *v1.Node
-       highestPriority int32
-       sumPriorities   int64
-       numPods         int
-   }
-   if len(nodesToPods) == 0 {
+// 1. A node with minimum number of PDB violations.
+// 2. A node with minimum highest priority victim is picked.
+// 3. Ties are broken by sum of priorities of all victims.
+// 4. If there are still ties, node with the minimum number of victims is picked.
+// 5. If there are still ties, the first such node is picked (sort of randomly).
+//TODO(bsalamat): Try to reuse the "min*Nodes" slices in order to save GC time.
+func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
+   if len(nodesToVictims) == 0 {
        return nil
    }
-   minHighestPriority := int32(math.MaxInt32)
-   minPriorityScores := []*nodeScore{}
-   for node, pods := range nodesToPods {
-       if len(pods) == 0 {
+   minNumPDBViolatingPods := math.MaxInt32
+   var minPDBViolatingNodes []*v1.Node
+   for node, victims := range nodesToVictims {
+       if len(victims.pods) == 0 {
            // We found a node that doesn't need any preemption. Return it!
            // This should happen rarely when one or more pods are terminated between
            // the time that scheduler tries to schedule the pod and the time that
            // preemption logic tries to find nodes for preemption.
            return node
        }
+       numPDBViolatingPods := victims.numPDBViolations
+       if numPDBViolatingPods < minNumPDBViolatingPods {
+           minNumPDBViolatingPods = numPDBViolatingPods
+           minPDBViolatingNodes = nil
+       }
+       if numPDBViolatingPods == minNumPDBViolatingPods {
+           minPDBViolatingNodes = append(minPDBViolatingNodes, node)
+       }
+   }
+   if len(minPDBViolatingNodes) == 1 {
+       return minPDBViolatingNodes[0]
+   }
+   // There are more than one node with minimum number PDB violating pods. Find
+   // the one with minimum highest priority victim.
+   minHighestPriority := int32(math.MaxInt32)
+   var minPriorityNodes []*v1.Node
+   for _, node := range minPDBViolatingNodes {
+       victims := nodesToVictims[node]
        // highestPodPriority is the highest priority among the victims on this node.
-       highestPodPriority := util.GetPodPriority(pods[0])
+       highestPodPriority := util.GetPodPriority(victims.pods[0])
        if highestPodPriority < minHighestPriority {
            minHighestPriority = highestPodPriority
-           minPriorityScores = nil
+           minPriorityNodes = nil
        }
        if highestPodPriority == minHighestPriority {
-           minPriorityScores = append(minPriorityScores, &nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)})
+           minPriorityNodes = append(minPriorityNodes, node)
        }
    }
-   if len(minPriorityScores) == 1 {
-       return minPriorityScores[0].node
+   if len(minPriorityNodes) == 1 {
+       return minPriorityNodes[0]
    }
    // There are a few nodes with minimum highest priority victim. Find the
    // smallest sum of priorities.
    minSumPriorities := int64(math.MaxInt64)
-   minSumPriorityScores := []*nodeScore{}
-   for _, nodeScore := range minPriorityScores {
+   var minSumPriorityNodes []*v1.Node
+   for _, node := range minPriorityNodes {
        var sumPriorities int64
-       for _, pod := range nodesToPods[nodeScore.node] {
+       for _, pod := range nodesToVictims[node].pods {
            // We add MaxInt32+1 to all priorities to make all of them >= 0. This is
            // needed so that a node with a few pods with negative priority is not
            // picked over a node with a smaller number of pods with the same negative
@@ -680,33 +707,34 @@ func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
        }
        if sumPriorities < minSumPriorities {
            minSumPriorities = sumPriorities
-           minSumPriorityScores = nil
+           minSumPriorityNodes = nil
        }
-       nodeScore.sumPriorities = sumPriorities
        if sumPriorities == minSumPriorities {
-           minSumPriorityScores = append(minSumPriorityScores, nodeScore)
+           minSumPriorityNodes = append(minSumPriorityNodes, node)
        }
    }
-   if len(minSumPriorityScores) == 1 {
-       return minSumPriorityScores[0].node
+   if len(minSumPriorityNodes) == 1 {
+       return minSumPriorityNodes[0]
    }
    // There are a few nodes with minimum highest priority victim and sum of priorities.
    // Find one with the minimum number of pods.
    minNumPods := math.MaxInt32
-   minNumPodScores := []*nodeScore{}
-   for _, nodeScore := range minSumPriorityScores {
-       if nodeScore.numPods < minNumPods {
-           minNumPods = nodeScore.numPods
-           minNumPodScores = nil
+   var minNumPodNodes []*v1.Node
+   for _, node := range minSumPriorityNodes {
+       numPods := len(nodesToVictims[node].pods)
+       if numPods < minNumPods {
+           minNumPods = numPods
+           minNumPodNodes = nil
        }
-       if nodeScore.numPods == minNumPods {
-           minNumPodScores = append(minNumPodScores, nodeScore)
+       if numPods == minNumPods {
+           minNumPodNodes = append(minNumPodNodes, node)
        }
    }
    // At this point, even if there are more than one node with the same score,
    // return the first one.
-   if len(minNumPodScores) > 0 {
-       return minNumPodScores[0].node
+   if len(minNumPodNodes) > 0 {
+       return minNumPodNodes[0]
    }
    glog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
    return nil
@@ -720,9 +748,10 @@ func selectNodesForPreemption(pod *v1.Pod,
    predicates map[string]algorithm.FitPredicate,
    metadataProducer algorithm.PredicateMetadataProducer,
    queue SchedulingQueue,
-) (map[*v1.Node][]*v1.Pod, error) {
-   nodeNameToPods := map[*v1.Node][]*v1.Pod{}
+   pdbs []*policy.PodDisruptionBudget,
+) (map[*v1.Node]*Victims, error) {
+   nodeNameToVictims := map[*v1.Node]*Victims{}
    var resultLock sync.Mutex

    // We can use the same metadata producer for all nodes.
@@ -733,15 +762,19 @@ func selectNodesForPreemption(pod *v1.Pod,
        if meta != nil {
            metaCopy = meta.ShallowCopy()
        }
-       pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue)
+       pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
        if fits {
            resultLock.Lock()
-           nodeNameToPods[potentialNodes[i]] = pods
+           victims := Victims{
+               pods:             pods,
+               numPDBViolations: numPDBViolations,
+           }
+           nodeNameToVictims[potentialNodes[i]] = &victims
            resultLock.Unlock()
        }
    }
    workqueue.Parallelize(16, len(potentialNodes), checkNode)
-   return nodeNameToPods, nil
+   return nodeNameToVictims, nil
}
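
For context, `selectNodesForPreemption` keeps the existing fan-out pattern: each candidate node is evaluated independently and the per-node result is written into a shared map only while holding a mutex, with `workqueue.Parallelize` running the checks on 16 workers. A rough stand-alone equivalent using only the standard library is sketched below; the `evaluate` helper is hypothetical and stands in for `selectVictimsOnNode`.

```go
package main

import (
	"fmt"
	"sync"
)

// evaluate is a hypothetical per-node check standing in for selectVictimsOnNode.
func evaluate(node string) (victims []string, ok bool) {
	if node == "node-2" {
		return nil, false // pretend preemption cannot help on this node
	}
	return []string{"victim-on-" + node}, true
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3"}
	results := make(map[string][]string)

	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	sem := make(chan struct{}, 16) // cap concurrency at 16 workers

	for _, n := range nodes {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			if victims, ok := evaluate(n); ok {
				mu.Lock()
				results[n] = victims // the shared map is only touched under the lock
				mu.Unlock()
			}
		}(n)
	}
	wg.Wait()
	fmt.Println(results)
}
```

Bounding the number of workers keeps per-node predicate evaluation from overwhelming the scheduler on large clusters, while the mutex confines all writes to the shared result map.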
@@ -778,6 +811,45 @@ func nodePassesExtendersForPreemption(
    return true, nil
}

+// filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
+// and "nonViolatingPods" based on whether their PDBs will be violated if they are
+// preempted.
+// This function is stable and does not change the order of received pods. So, if it
+// receives a sorted list, grouping will preserve the order of the input list.
+func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
+   for _, obj := range pods {
+       pod := obj.(*v1.Pod)
+       pdbForPodIsViolated := false
+       // A pod with no labels will not match any PDB. So, no need to check.
+       if len(pod.Labels) != 0 {
+           for _, pdb := range pdbs {
+               if pdb.Namespace != pod.Namespace {
+                   continue
+               }
+               selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
+               if err != nil {
+                   continue
+               }
+               // A PDB with a nil or empty selector matches nothing.
+               if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
+                   continue
+               }
+               // We have found a matching PDB.
+               if pdb.Status.PodDisruptionsAllowed <= 0 {
+                   pdbForPodIsViolated = true
+                   break
+               }
+           }
+       }
+       if pdbForPodIsViolated {
+           violatingPods = append(violatingPods, pod)
+       } else {
+           nonViolatingPods = append(nonViolatingPods, pod)
+       }
+   }
+   return violatingPods, nonViolatingPods
+}
+
// selectVictimsOnNode finds minimum set of pods on the given node that should
// be preempted in order to make enough room for "pod" to be scheduled. The
// minimum set selected is subject to the constraint that a higher-priority pod
// is never preempted when a lower-priority pod could be (higher/lower relative
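
The same per-pod check can be exercised on its own. A small example follows, assuming the vendored `k8s.io/api` and `k8s.io/apimachinery` packages added to the BUILD file above; it reproduces the three conditions the new helper uses (same namespace, matching non-empty selector, `PodDisruptionsAllowed <= 0`) for a single pod and PDB rather than for whole slices.

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// pdbWouldBeViolated reports whether evicting pod would violate pdb, using the same
// checks as filterPodsWithPDBViolation: same namespace, matching non-empty selector,
// and no disruptions currently allowed.
func pdbWouldBeViolated(pod *v1.Pod, pdb *policy.PodDisruptionBudget) bool {
	if pdb.Namespace != pod.Namespace || len(pod.Labels) == 0 {
		return false
	}
	selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
	if err != nil || selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
		return false
	}
	return pdb.Status.PodDisruptionsAllowed <= 0
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:      "low-pod1",
		Namespace: "default",
		Labels:    map[string]string{"foo": "bar"},
	}}
	pdb := &policy.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{Name: "pdb-1", Namespace: "default"},
		Spec:       policy.PodDisruptionBudgetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}},
		Status:     policy.PodDisruptionBudgetStatus{PodDisruptionsAllowed: 0},
	}
	fmt.Println(pdbWouldBeViolated(pod, pdb)) // true: matching PDB with no disruptions allowed
}
```
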
@@ -785,19 +857,22 @@ func nodePassesExtendersForPreemption(
// to one another, not relative to the preemptor "pod").
// The algorithm first checks if the pod can be scheduled on the node when all the
// lower priority pods are gone. If so, it sorts all the lower priority pods by
-// their priority and starts from the highest priority one, tries to keep as
-// many of them as possible while checking that the "pod" can still fit on the node.
+// their priority and then puts them into two groups of those whose PodDisruptionBudget
+// will be violated if preempted and other non-violating pods. Both groups are
+// sorted by priority. It first tries to reprieve as many PDB violating pods as
+// possible and then does them same for non-PDB-violating pods while checking
+// that the "pod" can still fit on the node.
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
// these predicates can be satisfied by removing more pods from the node.
-// TODO(bsalamat): Add support for PodDisruptionBudget.
func selectVictimsOnNode(
    pod *v1.Pod,
    meta algorithm.PredicateMetadata,
    nodeInfo *schedulercache.NodeInfo,
    fitPredicates map[string]algorithm.FitPredicate,
    queue SchedulingQueue,
-) ([]*v1.Pod, bool) {
+   pdbs []*policy.PodDisruptionBudget,
+) ([]*v1.Pod, int, bool) {
    potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
    nodeInfoCopy := nodeInfo.Clone()
@@ -832,20 +907,34 @@ func selectVictimsOnNode(
        if err != nil {
            glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
        }
-       return nil, false
+       return nil, 0, false
    }
-   victims := []*v1.Pod{}
-   // Try to reprieve as many pods as possible starting from the highest priority one.
-   for _, p := range potentialVictims.Items {
-       lpp := p.(*v1.Pod)
-       addPod(lpp)
-       if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue); !fits {
-           removePod(lpp)
-           victims = append(victims, lpp)
-           glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", lpp.Name, nodeInfo.Node().Name)
+   var victims []*v1.Pod
+   numViolatingVictim := 0
+   // Try to reprieve as many pods as possible. We first try to reprieve the PDB
+   // violating victims and then other non-violating ones. In both cases, we start
+   // from the highest priority victims.
+   violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
+   reprievePod := func(p *v1.Pod) bool {
+       addPod(p)
+       fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue)
+       if !fits {
+           removePod(p)
+           victims = append(victims, p)
+           glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
+       }
+       return fits
+   }
+   for _, p := range violatingVictims {
+       if !reprievePod(p) {
+           numViolatingVictim++
        }
    }
-   return victims, true
+   // Now we try to reprieve non-violating victims.
+   for _, p := range nonViolatingVictims {
+       reprievePod(p)
+   }
+   return victims, numViolatingVictim, true
}

// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates


@@ -560,11 +560,11 @@ func TestZeroRequest(t *testing.T) {
    }
}

-func printNodeToPods(nodeToPods map[*v1.Node][]*v1.Pod) string {
+func printNodeToVictims(nodeToVictims map[*v1.Node]*Victims) string {
    var output string
-   for node, pods := range nodeToPods {
+   for node, victims := range nodeToVictims {
        output += node.Name + ": ["
-       for _, pod := range pods {
+       for _, pod := range victims.pods {
            output += pod.Name + ", "
        }
        output += "]"
@@ -572,15 +572,15 @@ func printNodeToPods(nodeToPods map[*v1.Node][]*v1.Pod) string {
    return output
}

-func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node][]*v1.Pod) error {
+func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node]*Victims) error {
    if len(expected) == len(nodeToPods) {
-       for k, pods := range nodeToPods {
+       for k, victims := range nodeToPods {
            if expPods, ok := expected[k.Name]; ok {
-               if len(pods) != len(expPods) {
-                   return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
+               if len(victims.pods) != len(expPods) {
+                   return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods))
                }
                prevPriority := int32(math.MaxInt32)
-               for _, p := range pods {
+               for _, p := range victims.pods {
                    // Check that pods are sorted by their priority.
                    if *p.Spec.Priority > prevPriority {
                        return fmt.Errorf("test [%v]: pod %v of node %v was not sorted by priority", testName, p.Name, k)
@@ -591,11 +591,11 @@ func checkPreemptionVictims(testName string, expected map[string]map[string]bool
                    }
                }
            } else {
-               return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
+               return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods))
            }
        }
    } else {
-       return fmt.Errorf("test [%v]: unexpected number of machines. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
+       return fmt.Errorf("test [%v]: unexpected number of machines. expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods))
    }
    return nil
}
@@ -790,7 +790,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
            test.predicates[predicates.MatchInterPodAffinity] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
        }
        nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
-       nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil)
+       nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
        if err != nil {
            t.Error(err)
        }
@@ -947,7 +947,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
            nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5))
        }
        nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
-       candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil)
+       candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
        node := pickOneNodeForPreemption(candidateNodes)
        found := false
        for _, nodeName := range test.expected {


@@ -24,8 +24,11 @@ import (
    "time"

    "k8s.io/api/core/v1"
+   policy "k8s.io/api/policy/v1beta1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+   "k8s.io/apimachinery/pkg/labels"
+   "k8s.io/apimachinery/pkg/util/intstr"
    "k8s.io/apimachinery/pkg/util/wait"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    clientset "k8s.io/client-go/kubernetes"
@@ -495,3 +498,272 @@ func TestNominatedNodeCleanUp(t *testing.T) {
        t.Errorf("The nominated node name of the medium priority pod was not cleared: %v", err)
    }
}
func mkMinAvailablePDB(name, namespace string, minAvailable int, matchLabels map[string]string) *policy.PodDisruptionBudget {
intMinAvailable := intstr.FromInt(minAvailable)
return &policy.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: policy.PodDisruptionBudgetSpec{
MinAvailable: &intMinAvailable,
Selector: &metav1.LabelSelector{MatchLabels: matchLabels},
},
}
}
// TestPDBInPreemption tests PodDisruptionBudget support in preemption.
func TestPDBInPreemption(t *testing.T) {
// Enable PodPriority feature gate.
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.PodPriority))
// Initialize scheduler.
context := initTest(t, "preemption-pdb")
defer cleanupTest(t, context)
cs := context.clientSet
defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)},
}
defaultNodeRes := &v1.ResourceList{
v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
}
type nodeConfig struct {
name string
res *v1.ResourceList
}
tests := []struct {
description string
nodes []*nodeConfig
pdbs []*policy.PodDisruptionBudget
existingPods []*v1.Pod
pod *v1.Pod
preemptedPodIndexes map[int]struct{}
}{
{
description: "A non-PDB violating pod is preempted despite its higher priority",
nodes: []*nodeConfig{{name: "node-1", res: defaultNodeRes}},
pdbs: []*policy.PodDisruptionBudget{
mkMinAvailablePDB("pdb-1", context.ns.Name, 2, map[string]string{"foo": "bar"}),
},
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod1",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
Labels: map[string]string{"foo": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod2",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
Labels: map[string]string{"foo": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "mid-pod3",
Namespace: context.ns.Name,
Priority: &mediumPriority,
Resources: defaultPodRes,
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
},
}),
preemptedPodIndexes: map[int]struct{}{2: {}},
},
{
description: "A node without any PDB violating pods is preferred for preemption",
nodes: []*nodeConfig{
{name: "node-1", res: defaultNodeRes},
{name: "node-2", res: defaultNodeRes},
},
pdbs: []*policy.PodDisruptionBudget{
mkMinAvailablePDB("pdb-1", context.ns.Name, 2, map[string]string{"foo": "bar"}),
},
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod1",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-1",
Labels: map[string]string{"foo": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "mid-pod2",
Namespace: context.ns.Name,
Priority: &mediumPriority,
NodeName: "node-2",
Resources: defaultPodRes,
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
},
}),
preemptedPodIndexes: map[int]struct{}{1: {}},
},
{
description: "A node with fewer PDB violating pods is preferred for preemption",
nodes: []*nodeConfig{
{name: "node-1", res: defaultNodeRes},
{name: "node-2", res: defaultNodeRes},
{name: "node-3", res: defaultNodeRes},
},
pdbs: []*policy.PodDisruptionBudget{
mkMinAvailablePDB("pdb-1", context.ns.Name, 2, map[string]string{"foo1": "bar"}),
mkMinAvailablePDB("pdb-2", context.ns.Name, 2, map[string]string{"foo2": "bar"}),
},
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod1",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-1",
Labels: map[string]string{"foo1": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "mid-pod1",
Namespace: context.ns.Name,
Priority: &mediumPriority,
Resources: defaultPodRes,
NodeName: "node-1",
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod2",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-2",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "mid-pod2",
Namespace: context.ns.Name,
Priority: &mediumPriority,
Resources: defaultPodRes,
NodeName: "node-2",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod4",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-3",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod5",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-3",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod6",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-3",
Labels: map[string]string{"foo2": "bar"},
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}},
},
}
for _, test := range tests {
for _, nodeConf := range test.nodes {
_, err := createNode(cs, nodeConf.name, nodeConf.res)
if err != nil {
t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
}
}
// Create PDBs.
for _, pdb := range test.pdbs {
_, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb)
if err != nil {
t.Fatalf("Failed to create PDB: %v", err)
}
}
// Wait for PDBs to show up in the scheduler's cache.
if err := wait.Poll(time.Second, 15*time.Second, func() (bool, error) {
cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
if err != nil {
t.Errorf("Error while polling for PDB: %v", err)
return false, err
}
return len(cachedPDBs) == len(test.pdbs), err
}); err != nil {
t.Fatalf("Not all PDBs were added to the cache: %v", err)
}
pods := make([]*v1.Pod, len(test.existingPods))
var err error
// Create and run existingPods.
for i, p := range test.existingPods {
if pods[i], err = runPausePod(cs, p); err != nil {
t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
}
}
// Create the "pod".
preemptor, err := createPausePod(cs, test.pod)
if err != nil {
t.Errorf("Error while creating high priority pod: %v", err)
}
// Wait for preemption of pods and make sure the other ones are not preempted.
for i, p := range pods {
if _, found := test.preemptedPodIndexes[i]; found {
if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil {
t.Errorf("Test [%v]: Pod %v is not getting evicted.", test.description, p.Name)
}
} else {
if p.DeletionTimestamp != nil {
t.Errorf("Test [%v]: Didn't expect pod %v to get preempted.", test.description, p.Name)
}
}
}
// Also check that the preemptor pod gets the annotation for nominated node name.
if len(test.preemptedPodIndexes) > 0 {
if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil {
t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err)
}
}
// Cleanup
pods = append(pods, preemptor)
cleanupPods(cs, t, pods)
cs.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).DeleteCollection(nil, metav1.ListOptions{})
cs.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
}
}