From 3d4ae31d917dd48be36a755162ad9a2f0524cb18 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Tue, 21 Nov 2017 13:29:16 -0800 Subject: [PATCH 1/2] Add PDB support during pod preemption --- .../pkg/scheduler/core/generic_scheduler.go | 219 +++++++++----- .../scheduler/core/generic_scheduler_test.go | 24 +- test/integration/scheduler/preemption_test.go | 272 ++++++++++++++++++ 3 files changed, 438 insertions(+), 77 deletions(-) diff --git a/plugin/pkg/scheduler/core/generic_scheduler.go b/plugin/pkg/scheduler/core/generic_scheduler.go index 2c2f3e3dd6e..9eb1be9581b 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler.go +++ b/plugin/pkg/scheduler/core/generic_scheduler.go @@ -26,6 +26,9 @@ import ( "time" "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/errors" utiltrace "k8s.io/apiserver/pkg/util/trace" "k8s.io/client-go/util/workqueue" @@ -46,6 +49,11 @@ type FitError struct { FailedPredicates FailedPredicateMap } +type Victims struct { + pods []*v1.Pod + numPDBViolations int +} + var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods") const ( @@ -209,29 +217,33 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, // In this case, we should clean-up any existing nominated node name of the pod. return nil, nil, []*v1.Pod{pod}, nil } - nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue) + pdbs, err := g.cache.ListPDBs(labels.Everything()) if err != nil { return nil, nil, nil, err } - for len(nodeToPods) > 0 { - node := pickOneNodeForPreemption(nodeToPods) + nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs) + if err != nil { + return nil, nil, nil, err + } + for len(nodeToVictims) > 0 { + node := pickOneNodeForPreemption(nodeToVictims) if node == nil { return nil, nil, nil, err } - passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders) + passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToVictims[node].pods, g.cachedNodeInfoMap, g.extenders) if passes && pErr == nil { // Lower priority pods nominated to run on this node, may no longer fit on // this node. So, we should remove their nomination. Removing their // nomination updates these pods and moves them to the active queue. It // lets scheduler find another place for them. nominatedPods := g.getLowerPriorityNominatedPods(pod, node.Name) - return node, nodeToPods[node], nominatedPods, err + return node, nodeToVictims[node].pods, nominatedPods, err } if pErr != nil { glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr) } // Remove the node from the map and try to pick a different node. - delete(nodeToPods, node) + delete(nodeToVictims, node) } return nil, nil, nil, err } @@ -625,51 +637,66 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf // pickOneNodeForPreemption chooses one node among the given nodes. It assumes // pods in each map entry are ordered by decreasing priority. // It picks a node based on the following criteria: -// 1. A node with minimum highest priority victim is picked. -// 2. Ties are broken by sum of priorities of all victims. -// 3. 
If there are still ties, node with the minimum number of victims is picked. -// 4. If there are still ties, the first such node is picked (sort of randomly). -//TODO(bsalamat): Try to reuse the "nodeScore" slices in order to save GC time. -func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node { - type nodeScore struct { - node *v1.Node - highestPriority int32 - sumPriorities int64 - numPods int - } - if len(nodesToPods) == 0 { +// 1. A node with minimum number of PDB violations. +// 2. A node with minimum highest priority victim is picked. +// 3. Ties are broken by sum of priorities of all victims. +// 4. If there are still ties, node with the minimum number of victims is picked. +// 5. If there are still ties, the first such node is picked (sort of randomly). +//TODO(bsalamat): Try to reuse the "min*Nodes" slices in order to save GC time. +func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node { + if len(nodesToVictims) == 0 { return nil } - minHighestPriority := int32(math.MaxInt32) - minPriorityScores := []*nodeScore{} - for node, pods := range nodesToPods { - if len(pods) == 0 { + minNumPDBViolatingPods := math.MaxInt32 + var minPDBViolatingNodes []*v1.Node + for node, victims := range nodesToVictims { + if len(victims.pods) == 0 { // We found a node that doesn't need any preemption. Return it! // This should happen rarely when one or more pods are terminated between // the time that scheduler tries to schedule the pod and the time that // preemption logic tries to find nodes for preemption. return node } + numPDBViolatingPods := victims.numPDBViolations + if numPDBViolatingPods < minNumPDBViolatingPods { + minNumPDBViolatingPods = numPDBViolatingPods + minPDBViolatingNodes = nil + } + if numPDBViolatingPods == minNumPDBViolatingPods { + minPDBViolatingNodes = append(minPDBViolatingNodes, node) + } + } + if len(minPDBViolatingNodes) == 1 { + return minPDBViolatingNodes[0] + } + + // There are more than one node with minimum number PDB violating pods. Find + // the one with minimum highest priority victim. + minHighestPriority := int32(math.MaxInt32) + var minPriorityNodes []*v1.Node + for _, node := range minPDBViolatingNodes { + victims := nodesToVictims[node] // highestPodPriority is the highest priority among the victims on this node. - highestPodPriority := util.GetPodPriority(pods[0]) + highestPodPriority := util.GetPodPriority(victims.pods[0]) if highestPodPriority < minHighestPriority { minHighestPriority = highestPodPriority - minPriorityScores = nil + minPriorityNodes = nil } if highestPodPriority == minHighestPriority { - minPriorityScores = append(minPriorityScores, &nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)}) + minPriorityNodes = append(minPriorityNodes, node) } } - if len(minPriorityScores) == 1 { - return minPriorityScores[0].node + if len(minPriorityNodes) == 1 { + return minPriorityNodes[0] } + // There are a few nodes with minimum highest priority victim. Find the // smallest sum of priorities. minSumPriorities := int64(math.MaxInt64) - minSumPriorityScores := []*nodeScore{} - for _, nodeScore := range minPriorityScores { + var minSumPriorityNodes []*v1.Node + for _, node := range minPriorityNodes { var sumPriorities int64 - for _, pod := range nodesToPods[nodeScore.node] { + for _, pod := range nodesToVictims[node].pods { // We add MaxInt32+1 to all priorities to make all of them >= 0. 
This is // needed so that a node with a few pods with negative priority is not // picked over a node with a smaller number of pods with the same negative @@ -678,33 +705,34 @@ func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node { } if sumPriorities < minSumPriorities { minSumPriorities = sumPriorities - minSumPriorityScores = nil + minSumPriorityNodes = nil } - nodeScore.sumPriorities = sumPriorities if sumPriorities == minSumPriorities { - minSumPriorityScores = append(minSumPriorityScores, nodeScore) + minSumPriorityNodes = append(minSumPriorityNodes, node) } } - if len(minSumPriorityScores) == 1 { - return minSumPriorityScores[0].node + if len(minSumPriorityNodes) == 1 { + return minSumPriorityNodes[0] } + // There are a few nodes with minimum highest priority victim and sum of priorities. // Find one with the minimum number of pods. minNumPods := math.MaxInt32 - minNumPodScores := []*nodeScore{} - for _, nodeScore := range minSumPriorityScores { - if nodeScore.numPods < minNumPods { - minNumPods = nodeScore.numPods - minNumPodScores = nil + var minNumPodNodes []*v1.Node + for _, node := range minSumPriorityNodes { + numPods := len(nodesToVictims[node].pods) + if numPods < minNumPods { + minNumPods = numPods + minNumPodNodes = nil } - if nodeScore.numPods == minNumPods { - minNumPodScores = append(minNumPodScores, nodeScore) + if numPods == minNumPods { + minNumPodNodes = append(minNumPodNodes, node) } } // At this point, even if there are more than one node with the same score, // return the first one. - if len(minNumPodScores) > 0 { - return minNumPodScores[0].node + if len(minNumPodNodes) > 0 { + return minNumPodNodes[0] } glog.Errorf("Error in logic of node scoring for preemption. We should never reach here!") return nil @@ -718,9 +746,10 @@ func selectNodesForPreemption(pod *v1.Pod, predicates map[string]algorithm.FitPredicate, metadataProducer algorithm.PredicateMetadataProducer, queue SchedulingQueue, -) (map[*v1.Node][]*v1.Pod, error) { + pdbs []*policy.PodDisruptionBudget, +) (map[*v1.Node]*Victims, error) { - nodeNameToPods := map[*v1.Node][]*v1.Pod{} + nodeNameToVictims := map[*v1.Node]*Victims{} var resultLock sync.Mutex // We can use the same metadata producer for all nodes. @@ -731,15 +760,19 @@ func selectNodesForPreemption(pod *v1.Pod, if meta != nil { metaCopy = meta.ShallowCopy() } - pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue) + pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs) if fits { resultLock.Lock() - nodeNameToPods[potentialNodes[i]] = pods + victims := Victims{ + pods: pods, + numPDBViolations: numPDBViolations, + } + nodeNameToVictims[potentialNodes[i]] = &victims resultLock.Unlock() } } workqueue.Parallelize(16, len(potentialNodes), checkNode) - return nodeNameToPods, nil + return nodeNameToVictims, nil } func nodePassesExtendersForPreemption( @@ -776,6 +809,45 @@ func nodePassesExtendersForPreemption( return true, nil } +// filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods" +// and "nonViolatingPods" based on whether their PDBs will be violated if they are +// preempted. +// This function is stable and does not change the order of received pods. So, if it +// receives a sorted list, grouping will preserve the order of the input list. 
+func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) { + for _, obj := range pods { + pod := obj.(*v1.Pod) + pdbForPodIsViolated := false + // A pod with no labels will not match any PDB. So, no need to check. + if len(pod.Labels) != 0 { + for _, pdb := range pdbs { + if pdb.Namespace != pod.Namespace { + continue + } + selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector) + if err != nil { + continue + } + // A PDB with a nil or empty selector matches nothing. + if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + // We have found a matching PDB. + if pdb.Status.PodDisruptionsAllowed <= 0 { + pdbForPodIsViolated = true + break + } + } + } + if pdbForPodIsViolated { + violatingPods = append(violatingPods, pod) + } else { + nonViolatingPods = append(nonViolatingPods, pod) + } + } + return violatingPods, nonViolatingPods +} + // selectVictimsOnNode finds minimum set of pods on the given node that should // be preempted in order to make enough room for "pod" to be scheduled. The // minimum set selected is subject to the constraint that a higher-priority pod @@ -783,19 +855,22 @@ func nodePassesExtendersForPreemption( // to one another, not relative to the preemptor "pod"). // The algorithm first checks if the pod can be scheduled on the node when all the // lower priority pods are gone. If so, it sorts all the lower priority pods by -// their priority and starts from the highest priority one, tries to keep as -// many of them as possible while checking that the "pod" can still fit on the node. +// their priority and then puts them into two groups of those whose PodDisruptionBudget +// will be violated if preempted and other non-violating pods. Both groups are +// sorted by priority. It first tries to reprieve as many PDB violating pods as +// possible and then does them same for non-PDB-violating pods while checking +// that the "pod" can still fit on the node. // NOTE: This function assumes that it is never called if "pod" cannot be scheduled // due to pod affinity, node affinity, or node anti-affinity reasons. None of // these predicates can be satisfied by removing more pods from the node. -// TODO(bsalamat): Add support for PodDisruptionBudget. func selectVictimsOnNode( pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo, fitPredicates map[string]algorithm.FitPredicate, queue SchedulingQueue, -) ([]*v1.Pod, bool) { + pdbs []*policy.PodDisruptionBudget, +) ([]*v1.Pod, int, bool) { potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod} nodeInfoCopy := nodeInfo.Clone() @@ -830,20 +905,34 @@ func selectVictimsOnNode( if err != nil { glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) } - return nil, false + return nil, 0, false } - victims := []*v1.Pod{} - // Try to reprieve as many pods as possible starting from the highest priority one. - for _, p := range potentialVictims.Items { - lpp := p.(*v1.Pod) - addPod(lpp) - if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue); !fits { - removePod(lpp) - victims = append(victims, lpp) - glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", lpp.Name, nodeInfo.Node().Name) + var victims []*v1.Pod + numViolatingVictim := 0 + // Try to reprieve as many pods as possible. We first try to reprieve the PDB + // violating victims and then other non-violating ones. 
In both cases, we start + // from the highest priority victims. + violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs) + reprievePod := func(p *v1.Pod) bool { + addPod(p) + fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue) + if !fits { + removePod(p) + victims = append(victims, p) + glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name) + } + return fits + } + for _, p := range violatingVictims { + if !reprievePod(p) { + numViolatingVictim++ } } - return victims, true + // Now we try to reprieve non-violating victims. + for _, p := range nonViolatingVictims { + reprievePod(p) + } + return victims, numViolatingVictim, true } // nodesWherePreemptionMightHelp returns a list of nodes with failed predicates diff --git a/plugin/pkg/scheduler/core/generic_scheduler_test.go b/plugin/pkg/scheduler/core/generic_scheduler_test.go index 99015676537..4f7298817d8 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/core/generic_scheduler_test.go @@ -560,11 +560,11 @@ func TestZeroRequest(t *testing.T) { } } -func printNodeToPods(nodeToPods map[*v1.Node][]*v1.Pod) string { +func printNodeToVictims(nodeToVictims map[*v1.Node]*Victims) string { var output string - for node, pods := range nodeToPods { + for node, victims := range nodeToVictims { output += node.Name + ": [" - for _, pod := range pods { + for _, pod := range victims.pods { output += pod.Name + ", " } output += "]" @@ -572,15 +572,15 @@ func printNodeToPods(nodeToPods map[*v1.Node][]*v1.Pod) string { return output } -func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node][]*v1.Pod) error { +func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node]*Victims) error { if len(expected) == len(nodeToPods) { - for k, pods := range nodeToPods { + for k, victims := range nodeToPods { if expPods, ok := expected[k.Name]; ok { - if len(pods) != len(expPods) { - return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods)) + if len(victims.pods) != len(expPods) { + return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods)) } prevPriority := int32(math.MaxInt32) - for _, p := range pods { + for _, p := range victims.pods { // Check that pods are sorted by their priority. if *p.Spec.Priority > prevPriority { return fmt.Errorf("test [%v]: pod %v of node %v was not sorted by priority", testName, p.Name, k) @@ -591,11 +591,11 @@ func checkPreemptionVictims(testName string, expected map[string]map[string]bool } } } else { - return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods)) + return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods)) } } } else { - return fmt.Errorf("test [%v]: unexpected number of machines. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods)) + return fmt.Errorf("test [%v]: unexpected number of machines. 
expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods)) } return nil } @@ -790,7 +790,7 @@ func TestSelectNodesForPreemption(t *testing.T) { test.predicates[predicates.MatchInterPodAffinity] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods)) } nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes) - nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil) + nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil) if err != nil { t.Error(err) } @@ -947,7 +947,7 @@ func TestPickOneNodeForPreemption(t *testing.T) { nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5)) } nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes) - candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil) + candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil) node := pickOneNodeForPreemption(candidateNodes) found := false for _, nodeName := range test.expected { diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index fbfb2803055..ca92dbae836 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -24,8 +24,11 @@ import ( "time" "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" @@ -495,3 +498,272 @@ func TestNominatedNodeCleanUp(t *testing.T) { t.Errorf("The nominated node name of the medium priority pod was not cleared: %v", err) } } + +func mkMinAvailablePDB(name, namespace string, minAvailable int, matchLabels map[string]string) *policy.PodDisruptionBudget { + intMinAvailable := intstr.FromInt(minAvailable) + return &policy.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: policy.PodDisruptionBudgetSpec{ + MinAvailable: &intMinAvailable, + Selector: &metav1.LabelSelector{MatchLabels: matchLabels}, + }, + } +} + +// TestPDBInPreemption tests PodDisruptionBudget support in preemption. +func TestPDBInPreemption(t *testing.T) { + // Enable PodPriority feature gate. + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.PodPriority)) + // Initialize scheduler. 
+ context := initTest(t, "preemption-pdb") + defer cleanupTest(t, context) + cs := context.clientSet + + defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)}, + } + defaultNodeRes := &v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI), + } + + type nodeConfig struct { + name string + res *v1.ResourceList + } + + tests := []struct { + description string + nodes []*nodeConfig + pdbs []*policy.PodDisruptionBudget + existingPods []*v1.Pod + pod *v1.Pod + preemptedPodIndexes map[int]struct{} + }{ + { + description: "A non-PDB violating pod is preempted despite its higher priority", + nodes: []*nodeConfig{{name: "node-1", res: defaultNodeRes}}, + pdbs: []*policy.PodDisruptionBudget{ + mkMinAvailablePDB("pdb-1", context.ns.Name, 2, map[string]string{"foo": "bar"}), + }, + existingPods: []*v1.Pod{ + initPausePod(context.clientSet, &pausePodConfig{ + Name: "low-pod1", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: defaultPodRes, + Labels: map[string]string{"foo": "bar"}, + }), + initPausePod(context.clientSet, &pausePodConfig{ + Name: "low-pod2", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: defaultPodRes, + Labels: map[string]string{"foo": "bar"}, + }), + initPausePod(context.clientSet, &pausePodConfig{ + Name: "mid-pod3", + Namespace: context.ns.Name, + Priority: &mediumPriority, + Resources: defaultPodRes, + }), + }, + pod: initPausePod(cs, &pausePodConfig{ + Name: "preemptor-pod", + Namespace: context.ns.Name, + Priority: &highPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, + }, + }), + preemptedPodIndexes: map[int]struct{}{2: {}}, + }, + { + description: "A node without any PDB violating pods is preferred for preemption", + nodes: []*nodeConfig{ + {name: "node-1", res: defaultNodeRes}, + {name: "node-2", res: defaultNodeRes}, + }, + pdbs: []*policy.PodDisruptionBudget{ + mkMinAvailablePDB("pdb-1", context.ns.Name, 2, map[string]string{"foo": "bar"}), + }, + existingPods: []*v1.Pod{ + initPausePod(context.clientSet, &pausePodConfig{ + Name: "low-pod1", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: defaultPodRes, + NodeName: "node-1", + Labels: map[string]string{"foo": "bar"}, + }), + initPausePod(context.clientSet, &pausePodConfig{ + Name: "mid-pod2", + Namespace: context.ns.Name, + Priority: &mediumPriority, + NodeName: "node-2", + Resources: defaultPodRes, + }), + }, + pod: initPausePod(cs, &pausePodConfig{ + Name: "preemptor-pod", + Namespace: context.ns.Name, + Priority: &highPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, + }, + }), + preemptedPodIndexes: map[int]struct{}{1: {}}, + }, + { + description: "A node with fewer PDB violating pods is preferred for preemption", + nodes: []*nodeConfig{ + {name: "node-1", res: defaultNodeRes}, + {name: "node-2", res: defaultNodeRes}, + {name: "node-3", res: defaultNodeRes}, + }, + pdbs: []*policy.PodDisruptionBudget{ + 
mkMinAvailablePDB("pdb-1", context.ns.Name, 2, map[string]string{"foo1": "bar"}), + mkMinAvailablePDB("pdb-2", context.ns.Name, 2, map[string]string{"foo2": "bar"}), + }, + existingPods: []*v1.Pod{ + initPausePod(context.clientSet, &pausePodConfig{ + Name: "low-pod1", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: defaultPodRes, + NodeName: "node-1", + Labels: map[string]string{"foo1": "bar"}, + }), + initPausePod(context.clientSet, &pausePodConfig{ + Name: "mid-pod1", + Namespace: context.ns.Name, + Priority: &mediumPriority, + Resources: defaultPodRes, + NodeName: "node-1", + }), + initPausePod(context.clientSet, &pausePodConfig{ + Name: "low-pod2", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: defaultPodRes, + NodeName: "node-2", + Labels: map[string]string{"foo2": "bar"}, + }), + initPausePod(context.clientSet, &pausePodConfig{ + Name: "mid-pod2", + Namespace: context.ns.Name, + Priority: &mediumPriority, + Resources: defaultPodRes, + NodeName: "node-2", + Labels: map[string]string{"foo2": "bar"}, + }), + initPausePod(context.clientSet, &pausePodConfig{ + Name: "low-pod4", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: defaultPodRes, + NodeName: "node-3", + Labels: map[string]string{"foo2": "bar"}, + }), + initPausePod(context.clientSet, &pausePodConfig{ + Name: "low-pod5", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: defaultPodRes, + NodeName: "node-3", + Labels: map[string]string{"foo2": "bar"}, + }), + initPausePod(context.clientSet, &pausePodConfig{ + Name: "low-pod6", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: defaultPodRes, + NodeName: "node-3", + Labels: map[string]string{"foo2": "bar"}, + }), + }, + pod: initPausePod(cs, &pausePodConfig{ + Name: "preemptor-pod", + Namespace: context.ns.Name, + Priority: &highPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, + }, + }), + preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}}, + }, + } + + for _, test := range tests { + for _, nodeConf := range test.nodes { + _, err := createNode(cs, nodeConf.name, nodeConf.res) + if err != nil { + t.Fatalf("Error creating node %v: %v", nodeConf.name, err) + } + } + // Create PDBs. + for _, pdb := range test.pdbs { + _, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb) + if err != nil { + t.Fatalf("Failed to create PDB: %v", err) + } + } + // Wait for PDBs to show up in the scheduler's cache. + if err := wait.Poll(time.Second, 15*time.Second, func() (bool, error) { + cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) + if err != nil { + t.Errorf("Error while polling for PDB: %v", err) + return false, err + } + return len(cachedPDBs) == len(test.pdbs), err + }); err != nil { + t.Fatalf("Not all PDBs were added to the cache: %v", err) + } + + pods := make([]*v1.Pod, len(test.existingPods)) + var err error + // Create and run existingPods. + for i, p := range test.existingPods { + if pods[i], err = runPausePod(cs, p); err != nil { + t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err) + } + } + // Create the "pod". + preemptor, err := createPausePod(cs, test.pod) + if err != nil { + t.Errorf("Error while creating high priority pod: %v", err) + } + // Wait for preemption of pods and make sure the other ones are not preempted. 
+ for i, p := range pods { + if _, found := test.preemptedPodIndexes[i]; found { + if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { + t.Errorf("Test [%v]: Pod %v is not getting evicted.", test.description, p.Name) + } + } else { + if p.DeletionTimestamp != nil { + t.Errorf("Test [%v]: Didn't expect pod %v to get preempted.", test.description, p.Name) + } + } + } + // Also check that the preemptor pod gets the annotation for nominated node name. + if len(test.preemptedPodIndexes) > 0 { + if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil { + t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err) + } + } + + // Cleanup + pods = append(pods, preemptor) + cleanupPods(cs, t, pods) + cs.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).DeleteCollection(nil, metav1.ListOptions{}) + cs.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{}) + } +} From a0ef9cd09a3701b4d3901ea9311eef468b1e91ec Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Tue, 21 Nov 2017 16:19:53 -0800 Subject: [PATCH 2/2] Autogenerated files --- plugin/pkg/scheduler/core/BUILD | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugin/pkg/scheduler/core/BUILD b/plugin/pkg/scheduler/core/BUILD index 60d7cde6902..8a97b597f0d 100644 --- a/plugin/pkg/scheduler/core/BUILD +++ b/plugin/pkg/scheduler/core/BUILD @@ -56,7 +56,9 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/groupcache/lru:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
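
A standalone illustration may help reviewers see the grouping rule that the new filterPodsWithPDBViolation applies, outside the scheduler plumbing. The sketch below is not part of the patch: the helper name splitByPDBViolation, the package main scaffolding, and the ns1/app=db fixtures are hypothetical, and it takes a []*v1.Pod directly rather than the []interface{} slice the patch's helper receives. It follows the same matching rules as the patch (same namespace, non-empty matching selector, PodDisruptionsAllowed <= 0) and likewise preserves input order within each group.

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// splitByPDBViolation mirrors the idea of filterPodsWithPDBViolation: a pod is
// "violating" if some PDB in its namespace selects it and that PDB has no
// disruptions left. The order of the input slice is preserved in each group.
func splitByPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudget) (violating, nonViolating []*v1.Pod) {
	for _, pod := range pods {
		violated := false
		for _, pdb := range pdbs {
			if pdb.Namespace != pod.Namespace {
				continue
			}
			selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
			// Skip PDBs whose selector is invalid, empty, or does not match the pod.
			if err != nil || selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
				continue
			}
			if pdb.Status.PodDisruptionsAllowed <= 0 {
				violated = true
				break
			}
		}
		if violated {
			violating = append(violating, pod)
		} else {
			nonViolating = append(nonViolating, pod)
		}
	}
	return violating, nonViolating
}

func main() {
	mkPod := func(name string, lbls map[string]string) *v1.Pod {
		return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "ns1", Labels: lbls}}
	}
	// A PDB that selects app=db pods and currently allows no further disruptions.
	pdbs := []*policy.PodDisruptionBudget{{
		ObjectMeta: metav1.ObjectMeta{Name: "pdb-1", Namespace: "ns1"},
		Spec: policy.PodDisruptionBudgetSpec{
			Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "db"}},
		},
		Status: policy.PodDisruptionBudgetStatus{PodDisruptionsAllowed: 0},
	}}
	pods := []*v1.Pod{
		mkPod("db-0", map[string]string{"app": "db"}),
		mkPod("web-0", map[string]string{"app": "web"}),
	}
	violating, nonViolating := splitByPDBViolation(pods, pdbs)
	for _, p := range violating {
		fmt.Println("violating:", p.Name) // db-0
	}
	for _, p := range nonViolating {
		fmt.Println("non-violating:", p.Name) // web-0
	}
}

With PodDisruptionsAllowed at zero, db-0 lands in the violating group and web-0 does not; this is the signal pickOneNodeForPreemption uses to prefer nodes whose victims cause the fewest PDB violations before falling back to the priority-based tie breakers.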