Add PDB support during pod preemption

Bobby (Babak) Salamat 2017-11-21 13:29:16 -08:00
parent 1489d19443
commit 3d4ae31d91
3 changed files with 438 additions and 77 deletions

View File

@@ -26,6 +26,9 @@ import (
"time"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/errors"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/client-go/util/workqueue"
@@ -46,6 +49,11 @@ type FitError struct {
FailedPredicates FailedPredicateMap
}
type Victims struct {
pods []*v1.Pod
numPDBViolations int
}
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
const (
@@ -209,29 +217,33 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
// In this case, we should clean-up any existing nominated node name of the pod.
return nil, nil, []*v1.Pod{pod}, nil
}
nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue)
pdbs, err := g.cache.ListPDBs(labels.Everything())
if err != nil {
return nil, nil, nil, err
}
for len(nodeToPods) > 0 {
node := pickOneNodeForPreemption(nodeToPods)
nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs)
if err != nil {
return nil, nil, nil, err
}
for len(nodeToVictims) > 0 {
node := pickOneNodeForPreemption(nodeToVictims)
if node == nil {
return nil, nil, nil, err
}
passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToVictims[node].pods, g.cachedNodeInfoMap, g.extenders)
if passes && pErr == nil {
// Lower priority pods nominated to run on this node may no longer fit on
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := g.getLowerPriorityNominatedPods(pod, node.Name)
return node, nodeToPods[node], nominatedPods, err
return node, nodeToVictims[node].pods, nominatedPods, err
}
if pErr != nil {
glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
}
// Remove the node from the map and try to pick a different node.
delete(nodeToPods, node)
delete(nodeToVictims, node)
}
return nil, nil, nil, err
}
@@ -625,51 +637,66 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInfo
// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
// pods in each map entry are ordered by decreasing priority.
// It picks a node based on the following criteria:
// 1. A node with minimum highest priority victim is picked.
// 2. Ties are broken by sum of priorities of all victims.
// 3. If there are still ties, node with the minimum number of victims is picked.
// 4. If there are still ties, the first such node is picked (sort of randomly).
//TODO(bsalamat): Try to reuse the "nodeScore" slices in order to save GC time.
func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
type nodeScore struct {
node *v1.Node
highestPriority int32
sumPriorities int64
numPods int
}
if len(nodesToPods) == 0 {
// 1. A node with minimum number of PDB violations is picked.
// 2. A node with minimum highest priority victim is picked.
// 3. Ties are broken by sum of priorities of all victims.
// 4. If there are still ties, node with the minimum number of victims is picked.
// 5. If there are still ties, the first such node is picked (sort of randomly).
//TODO(bsalamat): Try to reuse the "min*Nodes" slices in order to save GC time.
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
if len(nodesToVictims) == 0 {
return nil
}
minHighestPriority := int32(math.MaxInt32)
minPriorityScores := []*nodeScore{}
for node, pods := range nodesToPods {
if len(pods) == 0 {
minNumPDBViolatingPods := math.MaxInt32
var minPDBViolatingNodes []*v1.Node
for node, victims := range nodesToVictims {
if len(victims.pods) == 0 {
// We found a node that doesn't need any preemption. Return it!
// This should happen rarely when one or more pods are terminated between
// the time that scheduler tries to schedule the pod and the time that
// preemption logic tries to find nodes for preemption.
return node
}
numPDBViolatingPods := victims.numPDBViolations
if numPDBViolatingPods < minNumPDBViolatingPods {
minNumPDBViolatingPods = numPDBViolatingPods
minPDBViolatingNodes = nil
}
if numPDBViolatingPods == minNumPDBViolatingPods {
minPDBViolatingNodes = append(minPDBViolatingNodes, node)
}
}
if len(minPDBViolatingNodes) == 1 {
return minPDBViolatingNodes[0]
}
// There is more than one node with the minimum number of PDB violating pods. Find
// the one with minimum highest priority victim.
minHighestPriority := int32(math.MaxInt32)
var minPriorityNodes []*v1.Node
for _, node := range minPDBViolatingNodes {
victims := nodesToVictims[node]
// highestPodPriority is the highest priority among the victims on this node.
highestPodPriority := util.GetPodPriority(pods[0])
highestPodPriority := util.GetPodPriority(victims.pods[0])
if highestPodPriority < minHighestPriority {
minHighestPriority = highestPodPriority
minPriorityScores = nil
minPriorityNodes = nil
}
if highestPodPriority == minHighestPriority {
minPriorityScores = append(minPriorityScores, &nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)})
minPriorityNodes = append(minPriorityNodes, node)
}
}
if len(minPriorityScores) == 1 {
return minPriorityScores[0].node
if len(minPriorityNodes) == 1 {
return minPriorityNodes[0]
}
// There are a few nodes with minimum highest priority victim. Find the
// smallest sum of priorities.
minSumPriorities := int64(math.MaxInt64)
minSumPriorityScores := []*nodeScore{}
for _, nodeScore := range minPriorityScores {
var minSumPriorityNodes []*v1.Node
for _, node := range minPriorityNodes {
var sumPriorities int64
for _, pod := range nodesToPods[nodeScore.node] {
for _, pod := range nodesToVictims[node].pods {
// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
// needed so that a node with a few pods with negative priority is not
// picked over a node with a smaller number of pods with the same negative
@@ -678,33 +705,34 @@ func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
}
if sumPriorities < minSumPriorities {
minSumPriorities = sumPriorities
minSumPriorityScores = nil
minSumPriorityNodes = nil
}
nodeScore.sumPriorities = sumPriorities
if sumPriorities == minSumPriorities {
minSumPriorityScores = append(minSumPriorityScores, nodeScore)
minSumPriorityNodes = append(minSumPriorityNodes, node)
}
}
if len(minSumPriorityScores) == 1 {
return minSumPriorityScores[0].node
if len(minSumPriorityNodes) == 1 {
return minSumPriorityNodes[0]
}
// There are a few nodes with minimum highest priority victim and sum of priorities.
// Find one with the minimum number of pods.
minNumPods := math.MaxInt32
minNumPodScores := []*nodeScore{}
for _, nodeScore := range minSumPriorityScores {
if nodeScore.numPods < minNumPods {
minNumPods = nodeScore.numPods
minNumPodScores = nil
var minNumPodNodes []*v1.Node
for _, node := range minSumPriorityNodes {
numPods := len(nodesToVictims[node].pods)
if numPods < minNumPods {
minNumPods = numPods
minNumPodNodes = nil
}
if nodeScore.numPods == minNumPods {
minNumPodScores = append(minNumPodScores, nodeScore)
if numPods == minNumPods {
minNumPodNodes = append(minNumPodNodes, node)
}
}
// At this point, even if more than one node has the same score,
// return the first one.
if len(minNumPodScores) > 0 {
return minNumPodScores[0].node
if len(minNumPodNodes) > 0 {
return minNumPodNodes[0]
}
glog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
return nil
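
For reference, the following is a small, self-contained sketch (not part of this commit) of how criteria 1 and 2 above interact; the toyVictims type and pickNode function are illustrative names only, and the remaining tie-breakers are omitted:

package main

import (
	"fmt"
	"math"
)

// toyVictims mirrors the shape of the scheduler's Victims value for this sketch.
type toyVictims struct {
	numPDBViolations int
	highestPriority  int32 // priority of the highest-priority victim on the node
}

// pickNode prefers the node with the fewest PDB violations and breaks ties by
// the lowest "highest-priority victim", i.e. criteria 1 and 2 above.
func pickNode(nodes map[string]toyVictims) string {
	best, minViolations, minTopPriority := "", math.MaxInt32, int32(math.MaxInt32)
	for name, v := range nodes {
		if v.numPDBViolations < minViolations ||
			(v.numPDBViolations == minViolations && v.highestPriority < minTopPriority) {
			best, minViolations, minTopPriority = name, v.numPDBViolations, v.highestPriority
		}
	}
	return best
}

func main() {
	nodes := map[string]toyVictims{
		"node-1": {numPDBViolations: 1, highestPriority: 100},
		"node-2": {numPDBViolations: 0, highestPriority: 500},
		"node-3": {numPDBViolations: 0, highestPriority: 200},
	}
	// node-3 wins: no PDB violations, and its highest-priority victim has a
	// lower priority than node-2's.
	fmt.Println(pickNode(nodes))
}
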
@@ -718,9 +746,10 @@ func selectNodesForPreemption(pod *v1.Pod,
predicates map[string]algorithm.FitPredicate,
metadataProducer algorithm.PredicateMetadataProducer,
queue SchedulingQueue,
) (map[*v1.Node][]*v1.Pod, error) {
pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*Victims, error) {
nodeNameToPods := map[*v1.Node][]*v1.Pod{}
nodeNameToVictims := map[*v1.Node]*Victims{}
var resultLock sync.Mutex
// We can use the same metadata producer for all nodes.
@@ -731,15 +760,19 @@ func selectNodesForPreemption(pod *v1.Pod,
if meta != nil {
metaCopy = meta.ShallowCopy()
}
pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue)
pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
if fits {
resultLock.Lock()
nodeNameToPods[potentialNodes[i]] = pods
victims := Victims{
pods: pods,
numPDBViolations: numPDBViolations,
}
nodeNameToVictims[potentialNodes[i]] = &victims
resultLock.Unlock()
}
}
workqueue.Parallelize(16, len(potentialNodes), checkNode)
return nodeNameToPods, nil
return nodeNameToVictims, nil
}
func nodePassesExtendersForPreemption(
@@ -776,6 +809,45 @@ func nodePassesExtendersForPreemption(
return true, nil
}
// filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
// and "nonViolatingPods" based on whether their PDBs will be violated if they are
// preempted.
// This function is stable and does not change the order of received pods. So, if it
// receives a sorted list, grouping will preserve the order of the input list.
func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
for _, obj := range pods {
pod := obj.(*v1.Pod)
pdbForPodIsViolated := false
// A pod with no labels will not match any PDB. So, no need to check.
if len(pod.Labels) != 0 {
for _, pdb := range pdbs {
if pdb.Namespace != pod.Namespace {
continue
}
selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
if err != nil {
continue
}
// A PDB with a nil or empty selector matches nothing.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
// We have found a matching PDB.
if pdb.Status.PodDisruptionsAllowed <= 0 {
pdbForPodIsViolated = true
break
}
}
}
if pdbForPodIsViolated {
violatingPods = append(violatingPods, pod)
} else {
nonViolatingPods = append(nonViolatingPods, pod)
}
}
return violatingPods, nonViolatingPods
}
// selectVictimsOnNode finds minimum set of pods on the given node that should
// be preempted in order to make enough room for "pod" to be scheduled. The
// minimum set selected is subject to the constraint that a higher-priority pod
@@ -783,19 +855,22 @@ func nodePassesExtendersForPreemption(
// to one another, not relative to the preemptor "pod").
// The algorithm first checks if the pod can be scheduled on the node when all the
// lower priority pods are gone. If so, it sorts all the lower priority pods by
// their priority and starts from the highest priority one, tries to keep as
// many of them as possible while checking that the "pod" can still fit on the node.
// their priority and then puts them into two groups: those whose PodDisruptionBudget
// will be violated if they are preempted, and the non-violating ones. Both groups are
// sorted by priority. It first tries to reprieve as many PDB-violating pods as
// possible and then does the same for non-PDB-violating pods while checking
// that the "pod" can still fit on the node.
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
// these predicates can be satisfied by removing more pods from the node.
func selectVictimsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
fitPredicates map[string]algorithm.FitPredicate,
queue SchedulingQueue,
) ([]*v1.Pod, bool) {
pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, bool) {
potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
nodeInfoCopy := nodeInfo.Clone()
@@ -830,20 +905,34 @@ func selectVictimsOnNode(
if err != nil {
glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
return nil, false
return nil, 0, false
}
victims := []*v1.Pod{}
// Try to reprieve as many pods as possible starting from the highest priority one.
for _, p := range potentialVictims.Items {
lpp := p.(*v1.Pod)
addPod(lpp)
if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue); !fits {
removePod(lpp)
victims = append(victims, lpp)
glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", lpp.Name, nodeInfo.Node().Name)
var victims []*v1.Pod
numViolatingVictim := 0
// Try to reprieve as many pods as possible. We first try to reprieve the PDB
// violating victims and then other non-violating ones. In both cases, we start
// from the highest priority victims.
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
reprievePod := func(p *v1.Pod) bool {
addPod(p)
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue)
if !fits {
removePod(p)
victims = append(victims, p)
glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
}
return fits
}
for _, p := range violatingVictims {
if !reprievePod(p) {
numViolatingVictim++
}
}
return victims, true
// Now we try to reprieve non-violating victims.
for _, p := range nonViolatingVictims {
reprievePod(p)
}
return victims, numViolatingVictim, true
}
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
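
The per-pod check inside filterPodsWithPDBViolation can be exercised on its own. Below is a minimal, standalone sketch using the same apimachinery calls (metav1.LabelSelectorAsSelector, labels.Set); violatesAnyPDB is an illustrative name and is not part of the scheduler code above:

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// violatesAnyPDB reports whether evicting the pod would violate one of the
// given PDBs: the PDB must be in the pod's namespace, its selector must match
// the pod's labels, and it must have no disruptions left.
func violatesAnyPDB(pod *v1.Pod, pdbs []*policy.PodDisruptionBudget) bool {
	if len(pod.Labels) == 0 {
		return false // a pod with no labels cannot match any PDB selector
	}
	for _, pdb := range pdbs {
		if pdb.Namespace != pod.Namespace {
			continue
		}
		selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
		if err != nil || selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
			continue
		}
		if pdb.Status.PodDisruptionsAllowed <= 0 {
			return true
		}
	}
	return false
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name: "p", Namespace: "ns1", Labels: map[string]string{"foo": "bar"},
	}}
	pdb := &policy.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{Name: "pdb-1", Namespace: "ns1"},
		Spec: policy.PodDisruptionBudgetSpec{
			Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
		},
		Status: policy.PodDisruptionBudgetStatus{PodDisruptionsAllowed: 0},
	}
	// Prints true: the PDB matches the pod and allows no further disruptions.
	fmt.Println(violatesAnyPDB(pod, []*policy.PodDisruptionBudget{pdb}))
}
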

View File

@@ -560,11 +560,11 @@ func TestZeroRequest(t *testing.T) {
}
}
func printNodeToPods(nodeToPods map[*v1.Node][]*v1.Pod) string {
func printNodeToVictims(nodeToVictims map[*v1.Node]*Victims) string {
var output string
for node, pods := range nodeToPods {
for node, victims := range nodeToVictims {
output += node.Name + ": ["
for _, pod := range pods {
for _, pod := range victims.pods {
output += pod.Name + ", "
}
output += "]"
@@ -572,15 +572,15 @@ func printNodeToPods(nodeToPods map[*v1.Node][]*v1.Pod) string {
return output
}
func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node][]*v1.Pod) error {
func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node]*Victims) error {
if len(expected) == len(nodeToPods) {
for k, pods := range nodeToPods {
for k, victims := range nodeToPods {
if expPods, ok := expected[k.Name]; ok {
if len(pods) != len(expPods) {
return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
if len(victims.pods) != len(expPods) {
return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods))
}
prevPriority := int32(math.MaxInt32)
for _, p := range pods {
for _, p := range victims.pods {
// Check that pods are sorted by their priority.
if *p.Spec.Priority > prevPriority {
return fmt.Errorf("test [%v]: pod %v of node %v was not sorted by priority", testName, p.Name, k)
@@ -591,11 +591,11 @@ func checkPreemptionVictims(testName string, expected map[string]map[string]bool
}
}
} else {
return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods))
}
}
} else {
return fmt.Errorf("test [%v]: unexpected number of machines. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
return fmt.Errorf("test [%v]: unexpected number of machines. expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods))
}
return nil
}
@@ -790,7 +790,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
test.predicates[predicates.MatchInterPodAffinity] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil)
nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
if err != nil {
t.Error(err)
}
@@ -947,7 +947,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil)
candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
node := pickOneNodeForPreemption(candidateNodes)
found := false
for _, nodeName := range test.expected {

View File

@@ -24,8 +24,11 @@ import (
"time"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
@@ -495,3 +498,272 @@ func TestNominatedNodeCleanUp(t *testing.T) {
t.Errorf("The nominated node name of the medium priority pod was not cleared: %v", err)
}
}
func mkMinAvailablePDB(name, namespace string, minAvailable int, matchLabels map[string]string) *policy.PodDisruptionBudget {
intMinAvailable := intstr.FromInt(minAvailable)
return &policy.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: policy.PodDisruptionBudgetSpec{
MinAvailable: &intMinAvailable,
Selector: &metav1.LabelSelector{MatchLabels: matchLabels},
},
}
}
// TestPDBInPreemption tests PodDisruptionBudget support in preemption.
func TestPDBInPreemption(t *testing.T) {
// Enable PodPriority feature gate.
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.PodPriority))
// Initialize scheduler.
context := initTest(t, "preemption-pdb")
defer cleanupTest(t, context)
cs := context.clientSet
defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)},
}
defaultNodeRes := &v1.ResourceList{
v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
}
type nodeConfig struct {
name string
res *v1.ResourceList
}
tests := []struct {
description string
nodes []*nodeConfig
pdbs []*policy.PodDisruptionBudget
existingPods []*v1.Pod
pod *v1.Pod
preemptedPodIndexes map[int]struct{}
}{
{
description: "A non-PDB violating pod is preempted despite its higher priority",
nodes: []*nodeConfig{{name: "node-1", res: defaultNodeRes}},
pdbs: []*policy.PodDisruptionBudget{
mkMinAvailablePDB("pdb-1", context.ns.Name, 2, map[string]string{"foo": "bar"}),
},
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod1",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
Labels: map[string]string{"foo": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod2",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
Labels: map[string]string{"foo": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "mid-pod3",
Namespace: context.ns.Name,
Priority: &mediumPriority,
Resources: defaultPodRes,
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
},
}),
preemptedPodIndexes: map[int]struct{}{2: {}},
},
{
description: "A node without any PDB violating pods is preferred for preemption",
nodes: []*nodeConfig{
{name: "node-1", res: defaultNodeRes},
{name: "node-2", res: defaultNodeRes},
},
pdbs: []*policy.PodDisruptionBudget{
mkMinAvailablePDB("pdb-1", context.ns.Name, 2, map[string]string{"foo": "bar"}),
},
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod1",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-1",
Labels: map[string]string{"foo": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "mid-pod2",
Namespace: context.ns.Name,
Priority: &mediumPriority,
NodeName: "node-2",
Resources: defaultPodRes,
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
},
}),
preemptedPodIndexes: map[int]struct{}{1: {}},
},
{
description: "A node with fewer PDB violating pods is preferred for preemption",
nodes: []*nodeConfig{
{name: "node-1", res: defaultNodeRes},
{name: "node-2", res: defaultNodeRes},
{name: "node-3", res: defaultNodeRes},
},
pdbs: []*policy.PodDisruptionBudget{
mkMinAvailablePDB("pdb-1", context.ns.Name, 2, map[string]string{"foo1": "bar"}),
mkMinAvailablePDB("pdb-2", context.ns.Name, 2, map[string]string{"foo2": "bar"}),
},
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod1",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-1",
Labels: map[string]string{"foo1": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "mid-pod1",
Namespace: context.ns.Name,
Priority: &mediumPriority,
Resources: defaultPodRes,
NodeName: "node-1",
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod2",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-2",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "mid-pod2",
Namespace: context.ns.Name,
Priority: &mediumPriority,
Resources: defaultPodRes,
NodeName: "node-2",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod4",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-3",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod5",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-3",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod6",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-3",
Labels: map[string]string{"foo2": "bar"},
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}},
},
}
for _, test := range tests {
for _, nodeConf := range test.nodes {
_, err := createNode(cs, nodeConf.name, nodeConf.res)
if err != nil {
t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
}
}
// Create PDBs.
for _, pdb := range test.pdbs {
_, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb)
if err != nil {
t.Fatalf("Failed to create PDB: %v", err)
}
}
// Wait for PDBs to show up in the scheduler's cache.
if err := wait.Poll(time.Second, 15*time.Second, func() (bool, error) {
cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
if err != nil {
t.Errorf("Error while polling for PDB: %v", err)
return false, err
}
return len(cachedPDBs) == len(test.pdbs), err
}); err != nil {
t.Fatalf("Not all PDBs were added to the cache: %v", err)
}
pods := make([]*v1.Pod, len(test.existingPods))
var err error
// Create and run existingPods.
for i, p := range test.existingPods {
if pods[i], err = runPausePod(cs, p); err != nil {
t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
}
}
// Create the "pod".
preemptor, err := createPausePod(cs, test.pod)
if err != nil {
t.Errorf("Error while creating high priority pod: %v", err)
}
// Wait for preemption of pods and make sure the other ones are not preempted.
for i, p := range pods {
if _, found := test.preemptedPodIndexes[i]; found {
if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil {
t.Errorf("Test [%v]: Pod %v is not getting evicted.", test.description, p.Name)
}
} else {
if p.DeletionTimestamp != nil {
t.Errorf("Test [%v]: Didn't expect pod %v to get preempted.", test.description, p.Name)
}
}
}
// Also check that the preemptor pod gets the annotation for nominated node name.
if len(test.preemptedPodIndexes) > 0 {
if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil {
t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err)
}
}
// Cleanup
pods = append(pods, preemptor)
cleanupPods(cs, t, pods)
cs.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).DeleteCollection(nil, metav1.ListOptions{})
cs.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
}
}
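
podIsGettingEvicted used above is an existing integration-test helper and is not part of this diff; a plausible sketch, assuming it simply polls the pod and reports whether a deletion timestamp has been set (and relying on the clientset, wait, and metav1 imports already present in this file), is:

// Hypothetical sketch only; the real helper lives in the shared test utilities
// and may differ in detail.
func podIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		// The pod counts as "getting evicted" once its deletion timestamp is set.
		return pod.DeletionTimestamp != nil, nil
	}
}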