Refactor and expose common preemption functions

This commit is contained in:
Wei Huang 2020-07-11 23:17:21 -07:00 committed by Wei Huang
parent 888255bf3a
commit 4e8ccf0187
No known key found for this signature in database
GPG Key ID: BE5E9752F8B6E005
4 changed files with 340 additions and 175 deletions

View File

@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["default_preemption.go"],
srcs = [
"candidate.go",
"default_preemption.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption",
visibility = ["//visibility:public"],
deps = [
@ -20,6 +23,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",

View File

@ -0,0 +1,45 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package defaultpreemption
import (
extenderv1 "k8s.io/kube-scheduler/extender/v1"
)
// Candidate represents a nominated node on which the preemptor can be scheduled,
// along with the list of victims that should be evicted for the preemptor to fit the node.
type Candidate interface {
	// Victims wraps a list of to-be-preempted Pods and the number of PDB violations.
	Victims() *extenderv1.Victims
	// Name returns the target node name where the preemptor gets nominated to run.
	Name() string
}
// candidate is the unexported, concrete implementation of the Candidate interface.
type candidate struct {
	// victims wraps the to-be-preempted Pods and the number of PDB violations on the node.
	victims *extenderv1.Victims
	// name is the target node name where the preemptor gets nominated to run.
	name string
}
// Victims returns the wrapped victim list held by this candidate.
func (c *candidate) Victims() *extenderv1.Victims {
	return c.victims
}
// Name returns the nominated node name held by this candidate.
func (c *candidate) Name() string {
	return c.name
}

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
policylisters "k8s.io/client-go/listers/policy/v1beta1"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@ -101,6 +102,10 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
// before it is retried after many other pending pods.
func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
cs := pl.fh.ClientSet()
ph := pl.fh.PreemptHandle()
nodeLister := pl.fh.SnapshotSharedLister().NodeInfos()
// 0) Fetch the latest version of <pod>.
// TODO(Huang-Wei): get pod from informer cache instead of API server.
pod, err := util.GetUpdatedPod(cs, pod)
if err != nil {
@ -108,17 +113,51 @@ func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.Cycle
return "", err
}
if !podEligibleToPreemptOthers(pod, pl.fh.SnapshotSharedLister().NodeInfos(), m[pod.Status.NominatedNodeName]) {
// 1) Ensure the preemptor is eligible to preempt other pods.
if !PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName]) {
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return "", nil
}
allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List()
// 2) Find all preemption candidates.
candidates, err := FindCandidates(ctx, cs, state, pod, m, ph, nodeLister, pl.pdbLister)
if err != nil || len(candidates) == 0 {
return "", err
}
// 3) Interact with registered Extenders to filter out some candidates if needed.
candidates, err = CallExtenders(ph.Extenders(), pod, nodeLister, candidates)
if err != nil {
return "", err
}
if len(allNodes) == 0 {
return "", core.ErrNoNodesAvailable
// 4) Find the best candidate.
bestCandidate := SelectCandidate(candidates)
if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
return "", nil
}
// 5) Perform preparation work before nominating the selected candidate.
if err := PrepareCandidate(bestCandidate, pl.fh, cs, pod); err != nil {
return "", err
}
return bestCandidate.Name(), nil
}
// FindCandidates calculates a slice of preemption candidates.
// Each candidate is executable to make the given <pod> schedulable.
func FindCandidates(ctx context.Context, cs kubernetes.Interface, state *framework.CycleState, pod *v1.Pod,
m framework.NodeToStatusMap, ph framework.PreemptHandle, nodeLister framework.NodeInfoLister,
pdbLister policylisters.PodDisruptionBudgetLister) ([]Candidate, error) {
allNodes, err := nodeLister.List()
if err != nil {
return nil, err
}
if len(allNodes) == 0 {
return nil, core.ErrNoNodesAvailable
}
potentialNodes := nodesWherePreemptionMightHelp(allNodes, m)
if len(potentialNodes) == 0 {
klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
@ -127,7 +166,7 @@ func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.Cycle
klog.Errorf("Cannot clear 'NominatedNodeName' field of pod %v/%v: %v", pod.Namespace, pod.Name, err)
// We do not return as this error is not critical.
}
return "", nil
return nil, nil
}
if klog.V(5).Enabled() {
var sample []string
@ -136,62 +175,20 @@ func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.Cycle
}
klog.Infof("%v potential nodes for preemption, first %v are: %v", len(potentialNodes), len(sample), sample)
}
pdbs, err := getPodDisruptionBudgets(pl.pdbLister)
pdbs, err := getPodDisruptionBudgets(pdbLister)
if err != nil {
return "", err
return nil, err
}
nodeNameToVictims, err := selectNodesForPreemption(ctx, pl.fh.PreemptHandle(), state, pod, potentialNodes, pdbs)
if err != nil {
return "", err
}
// We will only check nodeNameToVictims with extenders that support preemption.
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
nodeNameToVictims, err = processPreemptionWithExtenders(pl.fh, pod, nodeNameToVictims)
if err != nil {
return "", err
}
candidateNode := pickOneNodeForPreemption(nodeNameToVictims)
if len(candidateNode) == 0 {
return "", nil
}
victims := nodeNameToVictims[candidateNode].Pods
for _, victim := range victims {
if err := util.DeletePod(cs, victim); err != nil {
klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
return "", err
}
// If the victim is a WaitingPod, send a reject message to the PermitPlugin
if waitingPod := pl.fh.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject("preempted")
}
pl.fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
}
metrics.PreemptionVictims.Observe(float64(len(victims)))
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := getLowerPriorityNominatedPods(pl.fh.PreemptHandle(), pod, candidateNode)
if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
// We do not return as this error is not critical.
}
return candidateNode, nil
return dryRunPreemption(ctx, ph, state, pod, potentialNodes, pdbs), nil
}
// podEligibleToPreemptOthers determines whether this pod should be considered
// PodEligibleToPreemptOthers determines whether this pod should be considered
// for preempting other pods or not. If this pod has already preempted other
// pods and those are in their graceful termination period, it shouldn't be
// considered for preemption.
// We look at the node that is nominated for this pod and as long as there are
// terminating pods on the node, we don't consider this for preempting more pods.
func podEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister, nominatedNodeStatus *framework.Status) bool {
func PodEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister, nominatedNodeStatus *framework.Status) bool {
if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
return false
@ -233,18 +230,12 @@ func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.Node
return potentialNodes
}
// selectNodesForPreemption finds all the nodes with possible victims for
// preemption in parallel.
func selectNodesForPreemption(
ctx context.Context,
fh framework.PreemptHandle,
state *framework.CycleState,
pod *v1.Pod,
potentialNodes []*framework.NodeInfo,
pdbs []*policy.PodDisruptionBudget,
) (map[string]*extenderv1.Victims, error) {
nodeNameToVictims := map[string]*extenderv1.Victims{}
// dryRunPreemption simulates Preemption logic on <potentialNodes> in parallel,
// and returns all possible preemption candidates.
func dryRunPreemption(ctx context.Context, fh framework.PreemptHandle, state *framework.CycleState,
pod *v1.Pod, potentialNodes []*framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) []Candidate {
var resultLock sync.Mutex
var candidates []Candidate
checkNode := func(i int) {
nodeInfoCopy := potentialNodes[i].Clone()
@ -256,46 +247,100 @@ func selectNodesForPreemption(
Pods: pods,
NumPDBViolations: int64(numPDBViolations),
}
nodeNameToVictims[potentialNodes[i].Node().Name] = &victims
c := candidate{
victims: &victims,
name: nodeInfoCopy.Node().Name,
}
candidates = append(candidates, &c)
resultLock.Unlock()
}
}
parallelize.Until(ctx, len(potentialNodes), checkNode)
return nodeNameToVictims, nil
return candidates
}
// processPreemptionWithExtenders processes preemption with extenders
func processPreemptionWithExtenders(fh framework.FrameworkHandle, pod *v1.Pod, nodeNameToVictims map[string]*extenderv1.Victims) (map[string]*extenderv1.Victims, error) {
if len(nodeNameToVictims) > 0 {
for _, extender := range fh.PreemptHandle().Extenders() {
if extender.SupportsPreemption() && extender.IsInterested(pod) {
newNodeNameToVictims, err := extender.ProcessPreemption(
pod,
nodeNameToVictims,
fh.SnapshotSharedLister().NodeInfos(),
)
if err != nil {
if extender.IsIgnorable() {
klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
extender, err)
continue
}
return nil, err
}
// CallExtenders calls given <extenders> to select the list of feasible candidates.
// We will only check <candidates> with extenders that support preemption.
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
func CallExtenders(extenders []framework.Extender, pod *v1.Pod, nodeLister framework.NodeInfoLister,
candidates []Candidate) ([]Candidate, error) {
if len(extenders) == 0 {
return candidates, nil
}
// Replace nodeNameToVictims with new result after preemption. So the
// rest of extenders can continue use it as parameter.
nodeNameToVictims = newNodeNameToVictims
// If node list becomes empty, no preemption can happen regardless of other extenders.
if len(nodeNameToVictims) == 0 {
break
}
// Migrate candidate slice to victimsMap to adapt to the Extender interface.
// It's only applicable for candidate slice that have unique nominated node name.
victimsMap := candidatesToVictimsMap(candidates)
if len(victimsMap) == 0 {
return candidates, nil
}
for _, extender := range extenders {
if !extender.SupportsPreemption() || !extender.IsInterested(pod) {
continue
}
nodeNameToVictims, err := extender.ProcessPreemption(pod, victimsMap, nodeLister)
if err != nil {
if extender.IsIgnorable() {
klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
extender, err)
continue
}
return nil, err
}
// Replace victimsMap with new result after preemption. So the
// rest of extenders can continue use it as parameter.
victimsMap = nodeNameToVictims
// If node list becomes empty, no preemption can happen regardless of other extenders.
if len(victimsMap) == 0 {
break
}
}
return nodeNameToVictims, nil
var newCandidates []Candidate
for nodeName := range victimsMap {
newCandidates = append(newCandidates, &candidate{
victims: victimsMap[nodeName],
name: nodeName,
})
}
return newCandidates, nil
}
// candidatesToVictimsMap flattens a Candidate slice into a map keyed by the
// candidate's nominated node name. If several candidates share a node name,
// later entries overwrite earlier ones.
//
// This function is not applicable for out-of-tree preemption plugins that exercise
// different preemption candidates on the same nominated node.
func candidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
	victimsMap := make(map[string]*extenderv1.Victims, len(candidates))
	for i := range candidates {
		victimsMap[candidates[i].Name()] = candidates[i].Victims()
	}
	return victimsMap
}
// SelectCandidate chooses the best-fit candidate from given <candidates> and returns it.
// It may return nil (no candidates) but relies on pickOneNodeForPreemption to pick
// the winner when there is more than one candidate.
func SelectCandidate(candidates []Candidate) Candidate {
	if len(candidates) == 0 {
		return nil
	}
	if len(candidates) == 1 {
		return candidates[0]
	}

	victimsMap := candidatesToVictimsMap(candidates)
	candidateNode := pickOneNodeForPreemption(victimsMap)

	// Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
	// preemption plugins that exercise different candidates on the same nominated node.
	// The loop variable is named `c` (not `candidate`) to avoid shadowing the
	// unexported `candidate` type declared in this package.
	for _, c := range candidates {
		if candidateNode == c.Name() {
			return c
		}
	}
	// We shouldn't reach here.
	klog.Errorf("No candidate can be picked from %v.", candidates)
	// To not break the whole flow, return the first candidate.
	return candidates[0]
}
// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
@ -541,6 +586,38 @@ func selectVictimsOnNode(
return victims, numViolatingVictim, true
}
// PrepareCandidate does some preparation work before nominating the selected candidate:
// - Evict the victim pods
// - Reject the victim pods if they are in waitingPod map
// - Clear the low-priority pods' nominatedNodeName status if needed
func PrepareCandidate(c Candidate, fh framework.FrameworkHandle, cs kubernetes.Interface, pod *v1.Pod) error {
	victimPods := c.Victims().Pods
	for _, victim := range victimPods {
		// Evict the victim; abort the whole preparation on the first failure.
		if err := util.DeletePod(cs, victim); err != nil {
			klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
			return err
		}
		// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
		if wp := fh.GetWaitingPod(victim.UID); wp != nil {
			wp.Reject("preempted")
		}
		fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
			pod.Namespace, pod.Name, c.Name())
	}
	metrics.PreemptionVictims.Observe(float64(len(victimPods)))

	// Lower priority pods nominated to run on this node, may no longer fit on
	// this node. So, we should remove their nomination. Removing their
	// nomination updates these pods and moves them to the active queue. It
	// lets scheduler find another place for them.
	nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, c.Name())
	if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
		klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
		// We do not return as this error is not critical.
	}
	return nil
}
// getLowerPriorityNominatedPods returns pods whose priority is smaller than the
// priority of the given "pod" and are nominated to run on the given node.
// Note: We could possibly check if the nominated lower priority pods still fit

View File

@ -243,9 +243,9 @@ func TestPostFilter(t *testing.T) {
}
}
// TestSelectNodesForPreemption tests selectNodesForPreemption. This test assumes
// TestSelectNodesForPreemption tests dryRunPreemption. This test assumes
// that podsFitsOnNode works correctly and is tested separately.
func TestSelectNodesForPreemption(t *testing.T) {
func TestDryRunPreemption(t *testing.T) {
tests := []struct {
name string
nodeNames []string
@ -254,7 +254,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
registerPlugins []st.RegisterPluginFunc
pdbs []*policy.PodDisruptionBudget
fakeFilterRC framework.Code // return code for fake filter plugin
expected map[string]*extenderv1.Victims
expected []Candidate
expectedNumFilterCalled int32
}{
{
@ -268,7 +268,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(),
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(),
},
expected: map[string]*extenderv1.Victims{},
expected: nil,
expectedNumFilterCalled: 2,
},
{
@ -282,9 +282,9 @@ func TestSelectNodesForPreemption(t *testing.T) {
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(),
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(),
},
expected: map[string]*extenderv1.Victims{
"node1": {},
"node2": {},
expected: []Candidate{
&candidate{victims: &extenderv1.Victims{}, name: "node1"},
&candidate{victims: &extenderv1.Victims{}, name: "node2"},
},
expectedNumFilterCalled: 4,
},
@ -294,13 +294,14 @@ func TestSelectNodesForPreemption(t *testing.T) {
st.RegisterFilterPlugin("MatchFilter", st.NewMatchFilterPlugin),
},
nodeNames: []string{"node1", "node2"},
pod: st.MakePod().Name("node1").UID("node1").Priority(highPriority).Obj(),
// Name the pod as "node1" to fit "MatchFilter" plugin.
pod: st.MakePod().Name("node1").UID("node1").Priority(highPriority).Obj(),
pods: []*v1.Pod{
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(),
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(),
},
expected: map[string]*extenderv1.Victims{
"node1": {},
expected: []Candidate{
&candidate{victims: &extenderv1.Victims{}, name: "node1"},
},
expectedNumFilterCalled: 3,
},
@ -315,12 +316,18 @@ func TestSelectNodesForPreemption(t *testing.T) {
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj(),
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(),
},
expected: map[string]*extenderv1.Victims{
"node1": {
Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj()},
expected: []Candidate{
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj()},
},
name: "node1",
},
"node2": {
Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj()},
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj()},
},
name: "node2",
},
},
expectedNumFilterCalled: 4,
@ -336,7 +343,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Req(largeRes).Obj(),
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(),
},
expected: map[string]*extenderv1.Victims{},
expected: nil,
expectedNumFilterCalled: 0,
},
{
@ -351,12 +358,18 @@ func TestSelectNodesForPreemption(t *testing.T) {
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Priority(midPriority).Req(largeRes).Obj(),
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(),
},
expected: map[string]*extenderv1.Victims{
"node1": {
Pods: []*v1.Pod{st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Priority(midPriority).Req(largeRes).Obj()},
expected: []Candidate{
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Priority(midPriority).Req(largeRes).Obj()},
},
name: "node1",
},
"node2": {
Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj()},
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj()},
},
name: "node2",
},
},
expectedNumFilterCalled: 5,
@ -375,12 +388,15 @@ func TestSelectNodesForPreemption(t *testing.T) {
st.MakePod().Name("p1.4").UID("p1.4").Node("node1").Priority(highPriority).Req(smallRes).Obj(),
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(highPriority).Req(largeRes).Obj(),
},
expected: map[string]*extenderv1.Victims{
"node1": {
Pods: []*v1.Pod{
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Priority(lowPriority).Req(smallRes).Obj(),
st.MakePod().Name("p1.3").UID("p1.3").Node("node1").Priority(midPriority).Req(mediumRes).Obj(),
expected: []Candidate{
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Priority(lowPriority).Req(smallRes).Obj(),
st.MakePod().Name("p1.3").UID("p1.3").Node("node1").Priority(midPriority).Req(mediumRes).Obj(),
},
},
name: "node1",
},
},
expectedNumFilterCalled: 4,
@ -399,12 +415,15 @@ func TestSelectNodesForPreemption(t *testing.T) {
st.MakePod().Name("p1.4").UID("p1.4").Node("node1").Priority(highPriority).Req(smallRes).StartTime(epochTime2).Obj(),
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(highPriority).Req(largeRes).StartTime(epochTime1).Obj(),
},
expected: map[string]*extenderv1.Victims{
"node1": {
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Priority(lowPriority).Req(smallRes).StartTime(epochTime5).Obj(),
st.MakePod().Name("p1.3").UID("p1.3").Node("node1").Priority(midPriority).Req(mediumRes).StartTime(epochTime3).Obj(),
expected: []Candidate{
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Priority(lowPriority).Req(smallRes).StartTime(epochTime5).Obj(),
st.MakePod().Name("p1.3").UID("p1.3").Node("node1").Priority(midPriority).Req(mediumRes).StartTime(epochTime3).Obj(),
},
},
name: "node1",
},
},
expectedNumFilterCalled: 4, // no preemption would happen on node2 and no filter call is counted.
@ -424,12 +443,15 @@ func TestSelectNodesForPreemption(t *testing.T) {
st.MakePod().Name("p1.3").UID("p1.3").Node("node1").Priority(highPriority).Req(smallRes).Obj(),
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(highPriority).Req(smallRes).Obj(),
},
expected: map[string]*extenderv1.Victims{
"node1": {
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("foo", "").Priority(lowPriority).Req(smallRes).
PodAntiAffinityExists("foo", "hostname", st.PodAntiAffinityWithRequiredReq).Obj(),
expected: []Candidate{
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("foo", "").Priority(lowPriority).Req(smallRes).
PodAntiAffinityExists("foo", "hostname", st.PodAntiAffinityWithRequiredReq).Obj(),
},
},
name: "node1",
},
},
expectedNumFilterCalled: 3, // no preemption would happen on node2 and no filter call is counted.
@ -451,12 +473,18 @@ func TestSelectNodesForPreemption(t *testing.T) {
st.MakePod().Name("pod-x1").UID("pod-x1").Node("node-x").Label("foo", "").Priority(highPriority).Obj(),
st.MakePod().Name("pod-x2").UID("pod-x2").Node("node-x").Label("foo", "").Priority(highPriority).Obj(),
},
expected: map[string]*extenderv1.Victims{
"node-a": {
Pods: []*v1.Pod{st.MakePod().Name("pod-a2").UID("pod-a2").Node("node-a").Label("foo", "").Priority(lowPriority).Obj()},
expected: []Candidate{
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{st.MakePod().Name("pod-a2").UID("pod-a2").Node("node-a").Label("foo", "").Priority(lowPriority).Obj()},
},
name: "node-a",
},
"node-b": {
Pods: []*v1.Pod{st.MakePod().Name("pod-b1").UID("pod-b1").Node("node-b").Label("foo", "").Priority(lowPriority).Obj()},
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{st.MakePod().Name("pod-b1").UID("pod-b1").Node("node-b").Label("foo", "").Priority(lowPriority).Obj()},
},
name: "node-b",
},
},
expectedNumFilterCalled: 5, // node-a (3), node-b (2), node-x (0)
@ -473,7 +501,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Req(largeRes).Obj(),
},
fakeFilterRC: framework.Unschedulable,
expected: map[string]*extenderv1.Victims{},
expected: nil,
expectedNumFilterCalled: 2,
},
{
@ -493,13 +521,16 @@ func TestSelectNodesForPreemption(t *testing.T) {
Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1},
},
},
expected: map[string]*extenderv1.Victims{
"node1": {
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
expected: []Candidate{
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
},
NumPDBViolations: 1,
},
NumPDBViolations: 1,
name: "node1",
},
},
expectedNumFilterCalled: 3,
@ -521,13 +552,16 @@ func TestSelectNodesForPreemption(t *testing.T) {
Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1, DisruptedPods: map[string]metav1.Time{"p2": {Time: time.Now()}}},
},
},
expected: map[string]*extenderv1.Victims{
"node1": {
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
expected: []Candidate{
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
},
NumPDBViolations: 1,
},
NumPDBViolations: 1,
name: "node1",
},
},
expectedNumFilterCalled: 3,
@ -549,13 +583,16 @@ func TestSelectNodesForPreemption(t *testing.T) {
Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1, DisruptedPods: map[string]metav1.Time{"p1.2": {Time: time.Now()}}},
},
},
expected: map[string]*extenderv1.Victims{
"node1": {
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
expected: []Candidate{
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
},
NumPDBViolations: 0,
},
NumPDBViolations: 0,
name: "node1",
},
},
expectedNumFilterCalled: 3,
@ -578,14 +615,17 @@ func TestSelectNodesForPreemption(t *testing.T) {
Status: policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1, DisruptedPods: map[string]metav1.Time{"p1.3": {Time: time.Now()}}},
},
},
expected: map[string]*extenderv1.Victims{
"node1": {
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
st.MakePod().Name("p1.3").UID("p1.3").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
expected: []Candidate{
&candidate{
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
st.MakePod().Name("p1.2").UID("p1.2").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
st.MakePod().Name("p1.3").UID("p1.3").Node("node1").Label("app", "foo").Priority(midPriority).Req(mediumRes).Obj(),
},
NumPDBViolations: 1,
},
NumPDBViolations: 1,
name: "node1",
},
},
expectedNumFilterCalled: 4,
@ -645,30 +685,32 @@ func TestSelectNodesForPreemption(t *testing.T) {
if err != nil {
t.Fatal(err)
}
got, err := selectNodesForPreemption(context.Background(), fwk.PreemptHandle(), state, tt.pod, nodeInfos, tt.pdbs)
got := dryRunPreemption(context.Background(), fwk.PreemptHandle(), state, tt.pod, nodeInfos, tt.pdbs)
if err != nil {
t.Fatal(err)
}
// Sort the values (inner victims).
// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
for i := range got {
victims := got[i].Pods
victims := got[i].Victims().Pods
sort.Slice(victims, func(i, j int) bool {
return victims[i].Name < victims[j].Name
})
}
sort.Slice(got, func(i, j int) bool {
return got[i].Name() < got[j].Name()
})
if tt.expectedNumFilterCalled != fakePlugin.NumFilterCalled {
t.Errorf("expected fakePlugin.numFilterCalled is %d, but got %d", tt.expectedNumFilterCalled, fakePlugin.NumFilterCalled)
}
if diff := cmp.Diff(tt.expected, got); diff != "" {
t.Errorf("Unexpected strategies (-want, +got): %s", diff)
if diff := cmp.Diff(tt.expected, got, cmp.AllowUnexported(candidate{})); diff != "" {
t.Errorf("Unexpected candidates (-want, +got): %s", diff)
}
})
}
}
// TestPickOneNodeForPreemption tests pickOneNodeForPreemption.
func TestPickOneNodeForPreemption(t *testing.T) {
func TestSelectBestCandidate(t *testing.T) {
tests := []struct {
name string
registerPlugin st.RegisterPluginFunc
@ -861,20 +903,17 @@ func TestPickOneNodeForPreemption(t *testing.T) {
if err != nil {
t.Fatal(err)
}
candidateNodes, err := selectNodesForPreemption(context.Background(), fwk.PreemptHandle(), state, tt.pod, nodeInfos, nil)
if err != nil {
t.Fatal(err)
}
node := pickOneNodeForPreemption(candidateNodes)
candidates := dryRunPreemption(context.Background(), fwk.PreemptHandle(), state, tt.pod, nodeInfos, nil)
s := SelectCandidate(candidates)
found := false
for _, nodeName := range tt.expected {
if node == nodeName {
if nodeName == s.Name() {
found = true
break
}
}
if !found {
t.Errorf("unexpected node: %v", node)
t.Errorf("expect any node in %v, but got %v", tt.expected, s.Name())
}
})
}
@ -929,7 +968,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
nodes = append(nodes, st.MakeNode().Name(n).Obj())
}
snapshot := internalcache.NewSnapshot(test.pods, nodes)
if got := podEligibleToPreemptOthers(test.pod, snapshot.NodeInfos(), test.nominatedNodeStatus); got != test.expected {
if got := PodEligibleToPreemptOthers(test.pod, snapshot.NodeInfos(), test.nominatedNodeStatus); got != test.expected {
t.Errorf("expected %t, got %t for pod: %s", test.expected, got, test.pod.Name)
}
}