Add pod eviction logic for scheduler preemption

Add Preempt to scheduler interface
Add preemption to the scheduling workflow
Minor changes to the scheduler integration test library
Bobby (Babak) Salamat 2017-08-09 18:15:40 -07:00
parent 1cec8bac9c
commit 4a08dff168
19 changed files with 1128 additions and 199 deletions

View File

@ -25,6 +25,11 @@ import (
var (
// The predicateName tries to be consistent as the predicate name used in DefaultAlgorithmProvider defined in
// defaults.go (which tend to be stable for backward compatibility)
// NOTE: If you add a new predicate failure error for a predicate that can never
// be made to pass by removing pods, or you change an existing predicate so that
// it can never be made to pass by removing pods, you need to add the predicate
// failure error in nodesWherePreemptionMightHelp() in scheduler/core/generic_scheduler.go
ErrDiskConflict = newPredicateFailureError("NoDiskConflict")
ErrVolumeZoneConflict = newPredicateFailureError("NoVolumeZoneConflict")
ErrNodeSelectorNotMatch = newPredicateFailureError("MatchNodeSelector")

View File

@ -40,8 +40,8 @@ type matchingPodAntiAffinityTerm struct {
node *v1.Node
}
// NOTE: When new fields are added/removed or logic is changed, please make sure
// that RemovePod and AddPod functions are updated to work with the new changes.
// NOTE: When new fields are added/removed or logic is changed, please make sure that
// RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes.
type predicateMetadata struct {
pod *v1.Pod
podBestEffort bool
@ -172,13 +172,17 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
podRequest: meta.podRequest,
serviceAffinityInUse: meta.serviceAffinityInUse,
}
newPredMeta.podPorts = map[int]bool{}
for k, v := range meta.podPorts {
newPredMeta.podPorts[k] = v
}
newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{}
for k, v := range meta.matchingAntiAffinityTerms {
newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...)
}
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil), meta.serviceAffinityMatchingPodServices...)
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil), meta.serviceAffinityMatchingPodList...)
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
meta.serviceAffinityMatchingPodServices...)
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),
meta.serviceAffinityMatchingPodList...)
return (algorithm.PredicateMetadata)(newPredMeta)
}

View File

@ -355,3 +355,46 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
}
}
}
// TestPredicateMetadata_ShallowCopy tests the ShallowCopy function. It is based
// on the idea that shallow-copy should produce an object that is deep-equal to the original
// object.
func TestPredicateMetadata_ShallowCopy(t *testing.T) {
source := predicateMetadata{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "testns",
},
},
podBestEffort: true,
podRequest: &schedulercache.Resource{
MilliCPU: 1000,
Memory: 300,
AllowedPodNumber: 4,
},
podPorts: map[int]bool{1234: true, 456: false},
matchingAntiAffinityTerms: map[string][]matchingPodAntiAffinityTerm{
"term1": {
{
term: &v1.PodAffinityTerm{TopologyKey: "node"},
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
},
},
},
},
serviceAffinityInUse: true,
serviceAffinityMatchingPodList: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}},
},
serviceAffinityMatchingPodServices: []*v1.Service{
{ObjectMeta: metav1.ObjectMeta{Name: "service1"}},
},
}
if !reflect.DeepEqual(source.ShallowCopy().(*predicateMetadata), &source) {
t.Errorf("Copy is not equal to source!")
}
}

View File

@ -45,6 +45,10 @@ import (
"github.com/golang/glog"
)
const (
MatchInterPodAffinity = "MatchInterPodAffinity"
)
// NodeInfo: Other types for predicate functions...
type NodeInfo interface {
GetNodeInfo(nodeID string) (*v1.Node, error)

View File

@ -47,6 +47,10 @@ type SchedulerExtender interface {
// onto machines.
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]FitPredicate
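For orientation, here is a minimal sketch of how a caller might drive the new method once Schedule fails. The helper name scheduleOrPreempt and the surrounding wiring are illustrative only (not part of this commit) and assume the code lives alongside this interface in the algorithm package:

func scheduleOrPreempt(algo ScheduleAlgorithm, lister NodeLister, pod *v1.Pod) {
	if _, err := algo.Schedule(pod, lister); err != nil {
		// Hand the same scheduling error to Preempt, which returns the chosen
		// node and the victims, or nils if preemption cannot help this pod.
		node, victims, perr := algo.Preempt(pod, lister, err)
		if perr == nil && node != nil {
			_ = victims // the caller would evict these and let the pod retry
		}
	}
}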

View File

@ -155,7 +155,7 @@ func defaultPredicates() sets.String {
),
// Fit is determined by inter-pod affinity.
factory.RegisterFitPredicateFactory(
"MatchInterPodAffinity",
predicates.MatchInterPodAffinity,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewPodAffinityPredicate(args.NodeInfo, args.PodLister)
},

View File

@ -183,6 +183,8 @@ func (f *FakeExtender) IsBinder() bool {
return true
}
var _ algorithm.SchedulerExtender = &FakeExtender{}
func TestGenericSchedulerWithExtenders(t *testing.T) {
tests := []struct {
name string

View File

@ -47,7 +47,14 @@ type FitError struct {
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
const NoNodeAvailableMsg = "No nodes are available that match all of the following predicates"
const (
NoNodeAvailableMsg = "No nodes are available that match all of the predicates"
// NominatedNodeAnnotationKey is used to annotate a pod that has preempted other pods.
// The scheduler uses the annotation to know that the pod shouldn't preempt more pods
// when it gets to the head of the scheduling queue again.
// See podEligibleToPreemptOthers() for more information.
NominatedNodeAnnotationKey = "NominatedNodeName"
)
// Error returns detailed information of why the pod failed to fit on each node
func (f *FitError) Error() string {
@ -163,23 +170,61 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns the node name if such a node is found.
// TODO(bsalamat): This function is under construction! DO NOT USE!
func (g *genericScheduler) preempt(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
nodes, err := nodeLister.List()
// returns the node and the list of preempted pods if such a node is found.
// TODO(bsalamat): Add priority-based scheduling. More info: today one or more
// pending pods (different from the pod that triggered the preemption(s)) may
// schedule into some portion of the resources freed up by the preemption(s)
// before the pod that triggered the preemption(s) has a chance to schedule
// there, thereby preventing the pod that triggered the preemption(s) from
// scheduling. Solution is given at:
// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/pod-preemption.md#preemption-mechanics
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
fitError, ok := scheduleErr.(*FitError)
if !ok || fitError == nil {
return nil, nil, nil
}
err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
return "", err
return nil, nil, err
}
if len(nodes) == 0 {
return "", ErrNoNodesAvailable
if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
glog.V(5).Infof("Pod %v is not eligible for more preemption.", pod.Name)
return nil, nil, nil
}
nodeToPods := selectNodesForPreemption(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.predicateMetaProducer)
if len(nodeToPods) == 0 {
return "", nil
allNodes, err := nodeLister.List()
if err != nil {
return nil, nil, err
}
node := pickOneNodeForPreemption(nodeToPods)
// TODO: Add actual preemption of pods
return node, nil
if len(allNodes) == 0 {
return nil, nil, ErrNoNodesAvailable
}
potentialNodes := nodesWherePreemptionMightHelp(pod, allNodes, fitError.FailedPredicates)
if len(potentialNodes) == 0 {
glog.V(3).Infof("Preemption will not help schedule pod %v on any node.", pod.Name)
return nil, nil, nil
}
nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer)
if err != nil {
return nil, nil, err
}
for len(nodeToPods) > 0 {
node := pickOneNodeForPreemption(nodeToPods)
if node == nil {
return nil, nil, err
}
passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
if passes && pErr == nil {
return node, nodeToPods[node], err
}
if pErr != nil {
glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
}
// Remove the node from the map and try to pick a different node.
delete(nodeToPods, node)
}
return nil, nil, err
}
// Filters the nodes to find the ones that fit based on the given predicate functions
@ -452,108 +497,151 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf
// 1. A node with minimum highest priority victim is picked.
// 2. Ties are broken by sum of priorities of all victims.
// 3. If there are still ties, node with the minimum number of victims is picked.
// 4. If there are still ties, the first such node in the map is picked (sort of randomly).
func pickOneNodeForPreemption(nodesToPods map[string][]*v1.Pod) string {
// 4. If there are still ties, the first such node is picked (sort of randomly).
//TODO(bsalamat): Try to reuse the "nodeScore" slices in order to save GC time.
func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
type nodeScore struct {
nodeName string
node *v1.Node
highestPriority int32
sumPriorities int64
numPods int
}
if len(nodesToPods) == 0 {
return ""
return nil
}
minHighestPriority := int32(math.MaxInt32)
nodeScores := []*nodeScore{}
for nodeName, pods := range nodesToPods {
minPriorityScores := []*nodeScore{}
for node, pods := range nodesToPods {
if len(pods) == 0 {
// We found a node that doesn't need any preemption. Return it!
// This should happen rarely when one or more pods are terminated between
// the time that scheduler tries to schedule the pod and the time that
// preemption logic tries to find nodes for preemption.
return nodeName
return node
}
// highestPodPriority is the highest priority among the victims on this node.
highestPodPriority := util.GetPodPriority(pods[0])
if highestPodPriority < minHighestPriority {
minHighestPriority = highestPodPriority
minPriorityScores = nil
}
if highestPodPriority == minHighestPriority {
minPriorityScores = append(minPriorityScores, &nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)})
}
nodeScores = append(nodeScores, &nodeScore{nodeName: nodeName, highestPriority: highestPodPriority, numPods: len(pods)})
}
// Find the nodes with minimum highest priority victim.
if len(minPriorityScores) == 1 {
return minPriorityScores[0].node
}
// There are a few nodes with minimum highest priority victim. Find the
// smallest sum of priorities.
minSumPriorities := int64(math.MaxInt64)
lowestHighPriorityNodes := []*nodeScore{}
for _, nodeScore := range nodeScores {
if nodeScore.highestPriority == minHighestPriority {
lowestHighPriorityNodes = append(lowestHighPriorityNodes, nodeScore)
var sumPriorities int64
for _, pod := range nodesToPods[nodeScore.nodeName] {
// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
// needed so that a node with a few pods with negative priority is not
// picked over a node with a smaller number of pods with the same negative
// priority (and similar scenarios).
sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
}
if sumPriorities < minSumPriorities {
minSumPriorities = sumPriorities
}
nodeScore.sumPriorities = sumPriorities
minSumPriorityScores := []*nodeScore{}
for _, nodeScore := range minPriorityScores {
var sumPriorities int64
for _, pod := range nodesToPods[nodeScore.node] {
// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
// needed so that a node with a few pods with negative priority is not
// picked over a node with a smaller number of pods with the same negative
// priority (and similar scenarios).
sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
}
if sumPriorities < minSumPriorities {
minSumPriorities = sumPriorities
minSumPriorityScores = nil
}
nodeScore.sumPriorities = sumPriorities
if sumPriorities == minSumPriorities {
minSumPriorityScores = append(minSumPriorityScores, nodeScore)
}
}
if len(lowestHighPriorityNodes) == 1 {
return lowestHighPriorityNodes[0].nodeName
if len(minSumPriorityScores) == 1 {
return minSumPriorityScores[0].node
}
// There are multiple nodes with the same minimum highest priority victim.
// Choose the one(s) with lowest sum of priorities.
// There are a few nodes with minimum highest priority victim and sum of priorities.
// Find one with the minimum number of pods.
minNumPods := math.MaxInt32
lowestSumPriorityNodes := []*nodeScore{}
for _, nodeScore := range lowestHighPriorityNodes {
if nodeScore.sumPriorities == minSumPriorities {
lowestSumPriorityNodes = append(lowestSumPriorityNodes, nodeScore)
if nodeScore.numPods < minNumPods {
minNumPods = nodeScore.numPods
}
minNumPodScores := []*nodeScore{}
for _, nodeScore := range minSumPriorityScores {
if nodeScore.numPods < minNumPods {
minNumPods = nodeScore.numPods
minNumPodScores = nil
}
}
if len(lowestSumPriorityNodes) == 1 {
return lowestSumPriorityNodes[0].nodeName
}
// There are still more than one node with minimum highest priority victim and
// lowest sum of victim priorities. Find the anyone with minimum number of victims.
for _, nodeScore := range lowestSumPriorityNodes {
if nodeScore.numPods == minNumPods {
return nodeScore.nodeName
minNumPodScores = append(minNumPodScores, nodeScore)
}
}
glog.Errorf("We should never reach here!")
return ""
// At this point, even if there are more than one node with the same score,
// return the first one.
if len(minNumPodScores) > 0 {
return minNumPodScores[0].node
}
glog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
return nil
}
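The four tie-breaking rules above are easier to follow with concrete numbers. The standalone sketch below uses made-up node names and victim priorities (and omits the MaxInt32 normalization for negative priorities) to show why a node with a single low-priority victim wins:

package main

import "fmt"

func main() {
	// victim priorities per candidate node (illustrative values only)
	victims := map[string][]int32{
		"nodeA": {9, 5}, // highest victim priority 9
		"nodeB": {7, 7}, // highest 7, sum 14, two victims
		"nodeC": {7},    // highest 7, sum 7, one victim
	}
	best, bestMax, bestSum, bestNum := "", int32(1<<30), int64(1<<62), 1<<30
	for name, prios := range victims {
		highest, sum := prios[0], int64(0)
		for _, p := range prios {
			if p > highest {
				highest = p
			}
			sum += int64(p)
		}
		// Rule 1: smaller highest-victim priority wins. Rules 2 and 3 break ties
		// by smaller priority sum, then by fewer victims. Rule 4 keeps the first.
		if highest < bestMax ||
			(highest == bestMax && sum < bestSum) ||
			(highest == bestMax && sum == bestSum && len(prios) < bestNum) {
			best, bestMax, bestSum, bestNum = name, highest, sum, len(prios)
		}
	}
	fmt.Println("picked:", best) // nodeC: ties with nodeB on rule 1, wins on rule 2
}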
// selectNodesForPreemption finds all the nodes with possible victims for
// preemption in parallel.
func selectNodesForPreemption(pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
nodes []*v1.Node,
potentialNodes []*v1.Node,
predicates map[string]algorithm.FitPredicate,
metadataProducer algorithm.PredicateMetadataProducer,
) map[string][]*v1.Pod {
) (map[*v1.Node][]*v1.Pod, error) {
nodeNameToPods := map[string][]*v1.Pod{}
nodeNameToPods := map[*v1.Node][]*v1.Pod{}
var resultLock sync.Mutex
// We can use the same metadata producer for all nodes.
meta := metadataProducer(pod, nodeNameToInfo)
checkNode := func(i int) {
nodeName := nodes[i].Name
pods, fits := selectVictimsOnNode(pod, meta.ShallowCopy(), nodeNameToInfo[nodeName], predicates)
nodeName := potentialNodes[i].Name
var metaCopy algorithm.PredicateMetadata
if meta != nil {
metaCopy = meta.ShallowCopy()
}
pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates)
if fits {
resultLock.Lock()
nodeNameToPods[nodeName] = pods
nodeNameToPods[potentialNodes[i]] = pods
resultLock.Unlock()
}
}
workqueue.Parallelize(16, len(nodes), checkNode)
return nodeNameToPods
workqueue.Parallelize(16, len(potentialNodes), checkNode)
return nodeNameToPods, nil
}
func nodePassesExtendersForPreemption(
pod *v1.Pod,
nodeName string,
victims []*v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
extenders []algorithm.SchedulerExtender) (bool, error) {
// If there are any extenders, run them and filter the list of candidate nodes.
if len(extenders) == 0 {
return true, nil
}
// Remove the victims from the corresponding nodeInfo and send nodes to the
// extenders for filtering.
originalNodeInfo := nodeNameToInfo[nodeName]
nodeInfoCopy := nodeNameToInfo[nodeName].Clone()
for _, victim := range victims {
nodeInfoCopy.RemovePod(victim)
}
nodeNameToInfo[nodeName] = nodeInfoCopy
defer func() { nodeNameToInfo[nodeName] = originalNodeInfo }()
filteredNodes := []*v1.Node{nodeInfoCopy.Node()}
for _, extender := range extenders {
var err error
var failedNodesMap map[string]string
filteredNodes, failedNodesMap, err = extender.Filter(pod, filteredNodes, nodeNameToInfo)
if err != nil {
return false, err
}
if _, found := failedNodesMap[nodeName]; found || len(filteredNodes) == 0 {
return false, nil
}
}
return true, nil
}
// selectVictimsOnNode finds minimum set of pods on the given node that should
@ -563,25 +651,31 @@ func selectNodesForPreemption(pod *v1.Pod,
// to one another, not relative to the preemptor "pod").
// The algorithm first checks if the pod can be scheduled on the node when all the
// lower priority pods are gone. If so, it sorts all the lower priority pods by
// their priority and starting from the highest priority one, tries to keep as
// their priority and starts from the highest priority one, tries to keep as
// many of them as possible while checking that the "pod" can still fit on the node.
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
// these predicates can be satisfied by removing more pods from the node.
func selectVictimsOnNode(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo, fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) {
higherPriority := func(pod1, pod2 interface{}) bool {
return util.GetPodPriority(pod1.(*v1.Pod)) > util.GetPodPriority(pod2.(*v1.Pod))
}
potentialVictims := util.SortableList{CompFunc: higherPriority}
// TODO(bsalamat): Add support for PodDisruptionBudget.
func selectVictimsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) {
potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
nodeInfoCopy := nodeInfo.Clone()
removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
meta.RemovePod(rp)
if meta != nil {
meta.RemovePod(rp)
}
}
addPod := func(ap *v1.Pod) {
nodeInfoCopy.AddPod(ap)
meta.AddPod(ap, nodeInfoCopy)
if meta != nil {
meta.AddPod(ap, nodeInfoCopy)
}
}
// As the first step, remove all the lower priority pods from the node and
// check if the given pod can be scheduled.
@ -597,57 +691,83 @@ func selectVictimsOnNode(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo
// we are almost done and this node is not suitable for preemption. The only condition
// that we should check is if the "pod" is failing to schedule due to pod affinity
// failure.
if fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
if err != nil {
glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
return nil, false
}
// If the new pod still cannot be scheduled for any reason other than pod
// affinity, the new pod will not fit on this node and we are done here.
affinity := pod.Spec.Affinity
if affinity == nil || affinity.PodAffinity == nil {
return nil, false
}
for _, failedPred := range failedPredicates {
if failedPred != predicates.ErrPodAffinityNotMatch {
return nil, false
}
}
// If we reach here, it means that the pod cannot be scheduled due to pod
// affinity or anti-affinity. Since failure reason for both affinity and
// anti-affinity is the same, we cannot say which one caused it. So, we try
// adding pods one at a time and see if any of them satisfies the affinity rules.
for i, p := range potentialVictims.Items {
existingPod := p.(*v1.Pod)
addPod(existingPod)
if fits, _, _ = podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
removePod(existingPod)
} else {
// We found the pod needed to satisfy pod affinity. Let's remove it from
// potential victims list.
// NOTE: We assume that pod affinity can be satisfied by only one pod,
// not multiple pods. This is how scheduler works today.
potentialVictims.Items = append(potentialVictims.Items[:i], potentialVictims.Items[i+1:]...)
break
}
}
if !fits {
return nil, false
}
return nil, false
}
victims := []*v1.Pod{}
// Try to reprieve as may pods as possible starting from the highest priority one.
// Try to reprieve as many pods as possible starting from the highest priority one.
for _, p := range potentialVictims.Items {
lpp := p.(*v1.Pod)
addPod(lpp)
if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
removePod(lpp)
victims = append(victims, lpp)
glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", lpp.Name, nodeInfo.Node().Name)
}
}
return victims, true
}
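A tiny standalone sketch of the remove-then-reprieve idea described above, reduced to a single made-up CPU dimension instead of the real NodeInfo and fit predicates; pod names, priorities, and requests are invented for illustration:

package main

import (
	"fmt"
	"sort"
)

type toyPod struct {
	name     string
	priority int32
	cpu      int64
}

func main() {
	const capacity = 10
	const usedByUnremovable = 2 // pods of equal or higher priority stay put
	const preemptorCPU = 5
	lower := []toyPod{{"a", 5, 3}, {"b", 4, 2}, {"c", 1, 3}}
	// Step 1 (implicit here): with all lower-priority pods gone, 2+5 <= 10,
	// so preemption can help on this node at all.
	// Step 2: sort by descending priority and reprieve greedily from the top.
	sort.Slice(lower, func(i, j int) bool { return lower[i].priority > lower[j].priority })
	used := int64(usedByUnremovable + preemptorCPU)
	victims := []string{}
	for _, p := range lower {
		if used+p.cpu <= capacity {
			used += p.cpu // this pod can be kept
		} else {
			victims = append(victims, p.name) // this pod must be evicted
		}
	}
	fmt.Println("victims:", victims) // [b c]: only the highest-priority pod "a" fits back
}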
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
potentialNodes := []*v1.Node{}
for _, node := range nodes {
unresolvableReasonExist := false
failedPredicates, found := failedPredicatesMap[node.Name]
// If we assume that scheduler looks at all nodes and populates the failedPredicateMap
// (which is the case today), the !found case should never happen, but we'd prefer
// to rely less on such assumptions in the code when checking does not impose
// significant overhead.
for _, failedPredicate := range failedPredicates {
switch failedPredicate {
case
predicates.ErrNodeSelectorNotMatch,
predicates.ErrPodNotMatchHostName,
predicates.ErrTaintsTolerationsNotMatch,
predicates.ErrNodeLabelPresenceViolated,
predicates.ErrNodeNotReady,
predicates.ErrNodeNetworkUnavailable,
predicates.ErrNodeUnschedulable,
predicates.ErrNodeUnknownCondition:
unresolvableReasonExist = true
break
// TODO(bsalamat): Please add affinity failure cases once we have specific affinity failure errors.
}
}
if !found || !unresolvableReasonExist {
glog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
potentialNodes = append(potentialNodes, node)
}
}
return potentialNodes
}
// podEligibleToPreemptOthers determines whether this pod should be considered
// for preempting other pods or not. If this pod has already preempted other
// pods and those are in their graceful termination period, it shouldn't be
// considered for preemption.
// We look at the node that is nominated for this pod and as long as there are
// terminating pods on the node, we don't consider this for preempting more pods.
// TODO(bsalamat): Revisit this algorithm once scheduling by priority is added.
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {
if nodeInfo, found := nodeNameToInfo[nodeName]; found {
for _, p := range nodeInfo.Pods() {
if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
// There is a terminating pod on the nominated node.
return false
}
}
}
}
return true
}
func NewGenericScheduler(
cache schedulercache.Cache,
eCache *EquivalenceCache,

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
algorithmpredicates "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
algorithmpriorities "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
@ -307,8 +308,7 @@ func TestGenericScheduler(t *testing.T) {
}
scheduler := NewGenericScheduler(
cache, nil, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{})
cache, nil, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{})
machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if !reflect.DeepEqual(err, test.wErr) {
@ -548,12 +548,24 @@ func TestZeroRequest(t *testing.T) {
}
}
func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[string][]*v1.Pod) error {
func printNodeToPods(nodeToPods map[*v1.Node][]*v1.Pod) string {
var output string
for node, pods := range nodeToPods {
output += node.Name + ": ["
for _, pod := range pods {
output += pod.Name + ", "
}
output += "]"
}
return output
}
func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node][]*v1.Pod) error {
if len(expected) == len(nodeToPods) {
for k, pods := range nodeToPods {
if expPods, ok := expected[k]; ok {
if expPods, ok := expected[k.Name]; ok {
if len(pods) != len(expPods) {
return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, nodeToPods)
return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
}
prevPriority := int32(math.MaxInt32)
for _, p := range pods {
@ -567,11 +579,11 @@ func checkPreemptionVictims(testName string, expected map[string]map[string]bool
}
}
} else {
return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, nodeToPods)
return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
}
}
} else {
return fmt.Errorf("test [%v]: unexpected number of machines. expected: %v, got: %v", testName, expected, nodeToPods)
return fmt.Errorf("test [%v]: unexpected number of machines. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
}
return nil
}
@ -724,67 +736,37 @@ func TestSelectNodesForPreemption(t *testing.T) {
expected: map[string]map[string]bool{"machine1": {"b": true, "c": true}},
},
{
name: "lower priority pod is not preempted to satisfy pod affinity",
name: "pod with anti-affinity is preempted",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "machine1",
Labels: map[string]string{"pod": "preemptor"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1", Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "pod",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"preemptor", "value2"},
},
},
},
TopologyKey: "hostname",
},
TopologyKey: "hostname",
},
},
}}}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}},
}}}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {"b": true, "c": true}},
addAffinityPredicate: true,
},
{
name: "between two pods that satisfy affinity, the higher priority one stays",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
TopologyKey: "hostname",
},
},
}}}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {"a": true, "c": true}},
expected: map[string]map[string]bool{"machine1": {"a": true}, "machine2": {}},
addAffinityPredicate: true,
},
}
for _, test := range tests {
nodes := []*v1.Node{}
for _, n := range test.nodes {
@ -793,10 +775,13 @@ func TestSelectNodesForPreemption(t *testing.T) {
nodes = append(nodes, node)
}
if test.addAffinityPredicate {
test.predicates["affinity"] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
test.predicates[predicates.MatchInterPodAffinity] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
nodeToPods := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata)
nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata)
if err != nil {
t.Error(err)
}
if err := checkPreemptionVictims(test.name, test.expected, nodeToPods); err != nil {
t.Error(err)
}
@ -950,10 +935,11 @@ func TestPickOneNodeForPreemption(t *testing.T) {
nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
node := pickOneNodeForPreemption(selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata))
candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata)
node := pickOneNodeForPreemption(candidateNodes)
found := false
for _, nodeName := range test.expected {
if node == nodeName {
if node.Name == nodeName {
found = true
break
}
@ -963,3 +949,272 @@ func TestPickOneNodeForPreemption(t *testing.T) {
}
}
}
func TestNodesWherePreemptionMightHelp(t *testing.T) {
// Prepare 4 node names.
nodeNames := []string{}
for i := 1; i < 5; i++ {
nodeNames = append(nodeNames, fmt.Sprintf("machine%d", i))
}
tests := []struct {
name string
failedPredMap FailedPredicateMap
pod *v1.Pod
expected map[string]bool // set of expected node names. Value is ignored.
}{
{
name: "No node should be attempted",
failedPredMap: FailedPredicateMap{
"machine1": []algorithm.PredicateFailureReason{predicates.ErrNodeSelectorNotMatch},
"machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName},
"machine3": []algorithm.PredicateFailureReason{predicates.ErrTaintsTolerationsNotMatch},
"machine4": []algorithm.PredicateFailureReason{predicates.ErrNodeLabelPresenceViolated},
},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}},
expected: map[string]bool{},
},
{
name: "pod affinity should be tried",
failedPredMap: FailedPredicateMap{
"machine1": []algorithm.PredicateFailureReason{predicates.ErrPodAffinityNotMatch},
"machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName},
"machine3": []algorithm.PredicateFailureReason{predicates.ErrNodeUnschedulable},
},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
TopologyKey: "hostname",
},
},
}}}},
expected: map[string]bool{"machine1": true, "machine4": true},
},
{
name: "pod with both pod affinity and anti-affinity should be tried",
failedPredMap: FailedPredicateMap{
"machine1": []algorithm.PredicateFailureReason{predicates.ErrPodAffinityNotMatch},
"machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName},
},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
TopologyKey: "hostname",
},
},
},
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"blah", "foo"},
},
},
},
TopologyKey: "region",
},
},
},
}}},
expected: map[string]bool{"machine1": true, "machine3": true, "machine4": true},
},
{
name: "Mix of failed predicates works fine",
failedPredMap: FailedPredicateMap{
"machine1": []algorithm.PredicateFailureReason{predicates.ErrNodeSelectorNotMatch, predicates.ErrNodeOutOfDisk, predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 500, 300)},
"machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName, predicates.ErrDiskConflict},
"machine3": []algorithm.PredicateFailureReason{predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 600, 400)},
"machine4": []algorithm.PredicateFailureReason{},
},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}},
expected: map[string]bool{"machine3": true, "machine4": true},
},
}
for _, test := range tests {
nodes := nodesWherePreemptionMightHelp(test.pod, makeNodeList(nodeNames), test.failedPredMap)
if len(test.expected) != len(nodes) {
t.Errorf("test [%v]:number of nodes is not the same as expected. exptectd: %d, got: %d. Nodes: %v", test.name, len(test.expected), len(nodes), nodes)
}
for _, node := range nodes {
if _, found := test.expected[node.Name]; !found {
t.Errorf("test [%v]: node %v is not expected.", test.name, node.Name)
}
}
}
}
func TestPreempt(t *testing.T) {
failedPredMap := FailedPredicateMap{
"machine1": []algorithm.PredicateFailureReason{predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 500, 300)},
"machine2": []algorithm.PredicateFailureReason{predicates.ErrDiskConflict},
"machine3": []algorithm.PredicateFailureReason{predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 600, 400)},
}
// Prepare 3 node names.
nodeNames := []string{}
for i := 1; i < 4; i++ {
nodeNames = append(nodeNames, fmt.Sprintf("machine%d", i))
}
tests := []struct {
name string
pod *v1.Pod
pods []*v1.Pod
extenders []*FakeExtender
expectedNode string
expectedPods []string // list of preempted pods
}{
{
name: "basic preemption logic",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{
Containers: veryLargeContainers,
Priority: &highPriority},
},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
},
{
name: "One node doesn't need any preemption",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{
Containers: veryLargeContainers,
Priority: &highPriority},
},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
expectedNode: "machine3",
expectedPods: []string{},
},
{
name: "Scheduler extenders allow only machine1, otherwise machine3 would have been chosen",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{
Containers: veryLargeContainers,
Priority: &highPriority},
},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
extenders: []*FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
},
{
predicates: []fitPredicate{machine1PredicateExtender},
},
},
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
},
{
name: "Scheduler extenders do not allow any preemption",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{
Containers: veryLargeContainers,
Priority: &highPriority},
},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
extenders: []*FakeExtender{
{
predicates: []fitPredicate{falsePredicateExtender},
},
},
expectedNode: "",
expectedPods: []string{},
},
}
for _, test := range tests {
stop := make(chan struct{})
cache := schedulercache.New(time.Duration(0), stop)
for _, pod := range test.pods {
cache.AddPod(pod)
}
for _, name := range nodeNames {
cache.AddNode(makeNode(name, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5))
}
extenders := []algorithm.SchedulerExtender{}
for _, extender := range test.extenders {
extenders = append(extenders, extender)
}
scheduler := NewGenericScheduler(
cache, nil, map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders)
// Call Preempt and check the expected results.
node, victims, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{test.pod, failedPredMap}))
if err != nil {
t.Errorf("test [%v]: unexpected error in preemption: %v", test.name, err)
}
if (node != nil && node.Name != test.expectedNode) || (node == nil && len(test.expectedNode) != 0) {
t.Errorf("test [%v]: expected node: %v, got: %v", test.name, test.expectedNode, node)
}
if len(victims) != len(test.expectedPods) {
t.Errorf("test [%v]: expected %v pods, got %v.", test.name, len(test.expectedPods), len(victims))
}
for _, victim := range victims {
found := false
for _, expPod := range test.expectedPods {
if expPod == victim.Name {
found = true
break
}
}
if !found {
t.Errorf("test [%v]: pod %v is not expected to be a victim.", test.name, victim.Name)
}
// Mark the victims for deletion and record the preemptor's nominated node name.
now := metav1.Now()
victim.DeletionTimestamp = &now
test.pod.Annotations = make(map[string]string)
test.pod.Annotations[NominatedNodeAnnotationKey] = node.Name
}
// Call preempt again and make sure it doesn't preempt any more pods.
node, victims, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{test.pod, failedPredMap}))
if err != nil {
t.Errorf("test [%v]: unexpected error in preemption: %v", test.name, err)
}
if node != nil && len(victims) > 0 {
t.Errorf("test [%v]: didn't expect any more preemption. Node %v is selected for preemption.", test.name, node)
}
close(stop)
}
}

View File

@ -716,6 +716,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
Algorithm: algo,
Binder: f.getBinder(extenders),
PodConditionUpdater: &podConditionUpdater{f.client},
PodPreemptor: &podPreemptor{f.client},
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodsHasSynced)
},
@ -991,3 +992,28 @@ func (p *podConditionUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) er
}
return nil
}
type podPreemptor struct {
Client clientset.Interface
}
func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
}
func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}
//TODO(bsalamat): change this to patch PodStatus to avoid overwriting potential pending status updates.
func (p *podPreemptor) UpdatePodAnnotations(pod *v1.Pod, annotations map[string]string) error {
podCopy := pod.DeepCopy()
if podCopy.Annotations == nil {
podCopy.Annotations = map[string]string{}
}
for k, v := range annotations {
podCopy.Annotations[k] = v
}
_, err := p.Client.CoreV1().Pods(podCopy.Namespace).UpdateStatus(podCopy)
return err
}

View File

@ -23,10 +23,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
@ -48,6 +50,14 @@ type PodConditionUpdater interface {
Update(pod *v1.Pod, podCondition *v1.PodCondition) error
}
// PodPreemptor has methods needed to delete a pod and to update
// annotations of the preemptor pod.
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error
}
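A minimal in-memory stub can satisfy this interface in unit tests. The type below is hypothetical (not part of this commit) and assumes it sits in a file that already imports v1:

type fakePodPreemptor struct {
	deleted     []string
	annotations map[string]map[string]string
}

func (f *fakePodPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { return pod, nil }

func (f *fakePodPreemptor) DeletePod(pod *v1.Pod) error {
	f.deleted = append(f.deleted, pod.Name)
	return nil
}

func (f *fakePodPreemptor) UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error {
	if f.annotations == nil {
		f.annotations = map[string]map[string]string{}
	}
	f.annotations[pod.Name] = annots
	return nil
}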
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
@ -102,6 +112,8 @@ type Config struct {
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
@ -176,6 +188,41 @@ func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
return host, err
}
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) {
glog.V(3).Infof("Pod priority feature is not enabled. No preemption is performed.")
return "", nil
}
preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
if err != nil {
glog.Errorf("Error getting the updated preemptor pod object: %v", err)
return "", err
}
node, victims, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
if err != nil {
glog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
return "", err
}
if node == nil {
return "", err
}
glog.Infof("Preempting %d pod(s) on node %v to make room for %v/%v.", len(victims), node.Name, preemptor.Namespace, preemptor.Name)
annotations := map[string]string{core.NominatedNodeAnnotationKey: node.Name}
err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
if err != nil {
glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
return "", err
}
for _, victim := range victims {
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
return "", err
}
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, node.Name)
}
return node.Name, err
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
@ -258,6 +305,13 @@ func (sched *Scheduler) scheduleOne() {
suggestedHost, err := sched.schedule(pod)
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {
// schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
if fitError, ok := err.(*core.FitError); ok {
sched.preempt(pod, fitError)
}
return
}

View File

@ -103,6 +103,10 @@ func (es mockScheduler) Prioritizers() []algorithm.PriorityConfig {
return nil
}
func (es mockScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
return nil, nil, nil
}
func TestScheduler(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(t.Logf).Stop()

View File

@ -320,7 +320,7 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) {
n.generation++
}
// RemovePod subtracts pod information to this NodeInfo.
// RemovePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
k1, err := getPodKey(pod)
if err != nil {

View File

@ -67,8 +67,8 @@ type SortableList struct {
CompFunc LessFunc
}
// LessFunc is a function that receives two Pods and returns true if the first
// pod should be placed before pod2 when the list is sorted.
// LessFunc is a function that receives two items and returns true if the first
// item should be placed before the second one when the list is sorted.
type LessFunc func(item1, item2 interface{}) bool
var _ = sort.Interface(&SortableList{})
@ -88,3 +88,10 @@ func (l *SortableList) Swap(i, j int) {
func (l *SortableList) Sort() {
sort.Sort(l)
}
// HigherPriorityPod return true when priority of the first pod is higher than
// the second one. It takes arguments of the type "interface{}" to be used with
// SortableList, but expects those arguments to be *v1.Pod.
func HigherPriorityPod(pod1, pod2 interface{}) bool {
return GetPodPriority(pod1.(*v1.Pod)) > GetPodPriority(pod2.(*v1.Pod))
}
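The new helper is meant to be plugged into SortableList, which is how selectVictimsOnNode orders its potential victims. A small usage sketch with invented priority values; the scheduler util import path is assumed to be the package this file belongs to:

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/util"
)

func main() {
	low, high := int32(10), int32(1000)
	list := util.SortableList{
		CompFunc: util.HigherPriorityPod,
		Items: []interface{}{
			&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "low"}, Spec: v1.PodSpec{Priority: &low}},
			&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "high"}, Spec: v1.PodSpec{Priority: &high}},
		},
	}
	list.Sort() // highest priority first: "high", then "low"
	for _, item := range list.Items {
		fmt.Println(item.(*v1.Pod).Name)
	}
}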

View File

@ -52,6 +52,7 @@ type pausePodConfig struct {
NodeName string
Ports []v1.ContainerPort
OwnerReferences []metav1.OwnerReference
PriorityClassName string
}
var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
@ -555,8 +556,9 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
Ports: conf.Ports,
},
},
Tolerations: conf.Tolerations,
NodeName: conf.NodeName,
Tolerations: conf.Tolerations,
NodeName: conf.NodeName,
PriorityClassName: conf.PriorityClassName,
},
}
if conf.Resources != nil {

View File

@ -0,0 +1,128 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduling
import (
"fmt"
"time"
"k8s.io/api/core/v1"
"k8s.io/api/scheduling/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
_ "github.com/stretchr/testify/assert"
)
var _ = SIGDescribe("SchedulerPreemption [Serial] [Feature:PodPreemption]", func() {
var cs clientset.Interface
var nodeList *v1.NodeList
var ns string
f := framework.NewDefaultFramework("sched-preemption")
lowPriority, mediumPriority, highPriority := int32(1), int32(100), int32(1000)
lowPriorityClassName := f.BaseName + "-low-priority"
mediumPriorityClassName := f.BaseName + "-medium-priority"
highPriorityClassName := f.BaseName + "-high-priority"
AfterEach(func() {
})
BeforeEach(func() {
cs = f.ClientSet
ns = f.Namespace.Name
nodeList = &v1.NodeList{}
_, err := f.ClientSet.SchedulingV1alpha1().PriorityClasses().Create(&v1alpha1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: highPriorityClassName}, Value: highPriority})
Expect(err == nil || errors.IsAlreadyExists(err)).To(Equal(true))
_, err = f.ClientSet.SchedulingV1alpha1().PriorityClasses().Create(&v1alpha1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: mediumPriorityClassName}, Value: mediumPriority})
Expect(err == nil || errors.IsAlreadyExists(err)).To(Equal(true))
_, err = f.ClientSet.SchedulingV1alpha1().PriorityClasses().Create(&v1alpha1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: lowPriorityClassName}, Value: lowPriority})
Expect(err == nil || errors.IsAlreadyExists(err)).To(Equal(true))
framework.WaitForAllNodesHealthy(cs, time.Minute)
masterNodes, nodeList = framework.GetMasterAndWorkerNodesOrDie(cs)
err = framework.CheckTestingNSDeletedExcept(cs, ns)
framework.ExpectNoError(err)
})
// This test verifies that when a higher priority pod is created and no node with
// enough resources is found, scheduler preempts a lower priority pod to schedule
// the high priority pod.
It("validates basic preemption works", func() {
var podRes v1.ResourceList
// Create one pod per node that uses a lot of the node's resources.
By("Create pods that use 60% of node resources.")
pods := make([]*v1.Pod, len(nodeList.Items))
for i, node := range nodeList.Items {
cpuAllocatable, found := node.Status.Allocatable["cpu"]
Expect(found).To(Equal(true))
milliCPU := cpuAllocatable.MilliValue() * 40 / 100
memAllocatable, found := node.Status.Allocatable["memory"]
Expect(found).To(Equal(true))
memory := memAllocatable.Value() * 60 / 100
podRes = v1.ResourceList{}
podRes[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(milliCPU), resource.DecimalSI)
podRes[v1.ResourceMemory] = *resource.NewQuantity(int64(memory), resource.BinarySI)
// make the first pod low priority and the rest medium priority.
priorityName := mediumPriorityClassName
if i == 0 {
priorityName = lowPriorityClassName
}
pods[i] = createPausePod(f, pausePodConfig{
Name: fmt.Sprintf("pod%d-%v", i, priorityName),
PriorityClassName: priorityName,
Resources: &v1.ResourceRequirements{
Requests: podRes,
},
})
framework.Logf("Created pod: %v", pods[i].Name)
}
By("Wait for pods to be scheduled.")
for _, pod := range pods {
framework.ExpectNoError(framework.WaitForPodRunningInNamespace(cs, pod))
}
By("Run a high priority pod that use 60% of a node resources.")
// Create a high priority pod and make sure it is scheduled.
runPausePod(f, pausePodConfig{
Name: "preemptor-pod",
PriorityClassName: highPriorityClassName,
Resources: &v1.ResourceRequirements{
Requests: podRes,
},
})
// Make sure that the lowest priority pod is deleted.
preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(pods[0].Name, metav1.GetOptions{})
podDeleted := (err != nil && errors.IsNotFound(err)) ||
(err == nil && preemptedPod.DeletionTimestamp != nil)
Expect(podDeleted).To(BeTrue())
// Other pods (mid priority ones) should be present.
for i := 1; i < len(pods); i++ {
livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(pods[i].Name, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(livePod.DeletionTimestamp).To(BeNil())
}
})
})

View File

@ -51,7 +51,7 @@ func TestNodeAffinity(t *testing.T) {
}
// Create a pod with node affinity.
podName := "pod-with-node-affinity"
pod, err := runPausePod(context.clientSet, &pausePodConfig{
pod, err := runPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{
Name: podName,
Namespace: context.ns.Name,
Affinity: &v1.Affinity{
@ -72,7 +72,7 @@ func TestNodeAffinity(t *testing.T) {
},
},
},
})
}))
if err != nil {
t.Fatalf("Error running pause pod: %v", err)
}
@ -110,11 +110,11 @@ func TestPodAffinity(t *testing.T) {
// Add a pod with a label and wait for it to schedule.
labelKey := "service"
labelValue := "S1"
_, err = runPausePod(context.clientSet, &pausePodConfig{
_, err = runPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{
Name: "attractor-pod",
Namespace: context.ns.Name,
Labels: map[string]string{labelKey: labelValue},
})
}))
if err != nil {
t.Fatalf("Error running the attractor pod: %v", err)
}
@ -125,7 +125,7 @@ func TestPodAffinity(t *testing.T) {
}
// Add a new pod with affinity to the attractor pod.
podName := "pod-with-podaffinity"
pod, err := runPausePod(context.clientSet, &pausePodConfig{
pod, err := runPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{
Name: podName,
Namespace: context.ns.Name,
Affinity: &v1.Affinity{
@ -158,7 +158,7 @@ func TestPodAffinity(t *testing.T) {
},
},
},
})
}))
if err != nil {
t.Fatalf("Error running pause pod: %v", err)
}

View File

@ -24,9 +24,11 @@ import (
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -36,15 +38,18 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app"
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
"k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/utils"
)
const enableEquivalenceCache = true
@ -457,13 +462,13 @@ func TestMultiScheduler(t *testing.T) {
}
defaultScheduler := "default-scheduler"
testPodFitsDefault, err := createPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-default", Namespace: context.ns.Name, SchedulerName: defaultScheduler})
testPodFitsDefault, err := createPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-default", Namespace: context.ns.Name, SchedulerName: defaultScheduler}))
if err != nil {
t.Fatalf("Failed to create pod: %v", err)
}
fooScheduler := "foo-scheduler"
testPodFitsFoo, err := createPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-foo", Namespace: context.ns.Name, SchedulerName: fooScheduler})
testPodFitsFoo, err := createPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-foo", Namespace: context.ns.Name, SchedulerName: fooScheduler}))
if err != nil {
t.Fatalf("Failed to create pod: %v", err)
}
@ -647,3 +652,251 @@ func TestAllocatable(t *testing.T) {
t.Logf("Test allocatable awareness: %s Pod not scheduled as expected", testAllocPod2.Name)
}
}
// TestPreemption tests a few preemption scenarios.
func TestPreemption(t *testing.T) {
// Enable PodPriority feature gate.
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.PodPriority))
// Initialize scheduler.
context := initTest(t, "preemption")
defer cleanupTest(t, context)
cs := context.clientSet
lowPriority, mediumPriority, highPriority := int32(100), int32(200), int32(300)
defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)},
}
tests := []struct {
description string
existingPods []*v1.Pod
pod *v1.Pod
preemptedPodIndexes map[int]struct{}
}{
{
description: "basic pod preemption",
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "victim-pod",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
},
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}},
},
{
description: "preemption is performed to satisfy anti-affinity",
existingPods: []*v1.Pod{
initPausePod(cs, &pausePodConfig{
Name: "pod-0", Namespace: context.ns.Name,
Priority: &mediumPriority,
Labels: map[string]string{"pod": "p0"},
Resources: defaultPodRes,
}),
initPausePod(cs, &pausePodConfig{
Name: "pod-1", Namespace: context.ns.Name,
Priority: &lowPriority,
Labels: map[string]string{"pod": "p1"},
Resources: defaultPodRes,
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "pod",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"preemptor"},
},
},
},
TopologyKey: "node",
},
},
},
},
}),
},
// A higher priority pod with anti-affinity.
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Labels: map[string]string{"pod": "preemptor"},
Resources: defaultPodRes,
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "pod",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"p0"},
},
},
},
TopologyKey: "node",
},
},
},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}},
},
{
// This is similar to the previous case, except that pod-1 is high priority.
description: "preemption is not performed when anti-affinity is not satisfied",
existingPods: []*v1.Pod{
initPausePod(cs, &pausePodConfig{
Name: "pod-0", Namespace: context.ns.Name,
Priority: &mediumPriority,
Labels: map[string]string{"pod": "p0"},
Resources: defaultPodRes,
}),
initPausePod(cs, &pausePodConfig{
Name: "pod-1", Namespace: context.ns.Name,
Priority: &highPriority,
Labels: map[string]string{"pod": "p1"},
Resources: defaultPodRes,
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "pod",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"preemptor"},
},
},
},
TopologyKey: "node",
},
},
},
},
}),
},
// A higher priority pod with anti-affinity.
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Labels: map[string]string{"pod": "preemptor"},
Resources: defaultPodRes,
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "pod",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"p0"},
},
},
},
TopologyKey: "node",
},
},
},
},
}),
preemptedPodIndexes: map[int]struct{}{},
},
}
// Create a node with some resources and a label.
nodeRes := &v1.ResourceList{
v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
}
node, err := createNode(context.clientSet, "node1", nodeRes)
if err != nil {
t.Fatalf("Error creating nodes: %v", err)
}
nodeLabels := map[string]string{"node": node.Name}
if err = testutils.AddLabelsToNode(context.clientSet, node.Name, nodeLabels); err != nil {
t.Fatalf("Cannot add labels to node: %v", err)
}
if err = waitForNodeLabels(context.clientSet, node.Name, nodeLabels); err != nil {
t.Fatalf("Adding labels to node didn't succeed: %v", err)
}
for _, test := range tests {
pods := make([]*v1.Pod, len(test.existingPods))
// Create and run existingPods.
for i, p := range test.existingPods {
pods[i], err = runPausePod(cs, p)
if err != nil {
t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
}
}
// Create the "pod".
preemptor, err := createPausePod(cs, test.pod)
if err != nil {
t.Errorf("Error while creating high priority pod: %v", err)
}
// Wait for the expected pods to be preempted and make sure the others are not.
for i, p := range pods {
if _, found := test.preemptedPodIndexes[i]; found {
if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil {
t.Errorf("Test [%v]: Pod %v is not getting evicted.", test.description, p.Name)
}
} else {
if p.DeletionTimestamp != nil {
t.Errorf("Test [%v]: Didn't expect pod %v to get preempted.", test.description, p.Name)
}
}
}
// Also check that the preemptor pod gets the annotation for nominated node name.
if len(test.preemptedPodIndexes) > 0 {
if err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
pod, err := context.clientSet.CoreV1().Pods(context.ns.Name).Get("preemptor-pod", metav1.GetOptions{})
if err != nil {
t.Errorf("Test [%v]: error getting pod: %v", test.description, err)
}
annot, found := pod.Annotations[core.NominatedNodeAnnotationKey]
if found && len(annot) > 0 {
return true, nil
}
return false, err
}); err != nil {
t.Errorf("Test [%v]: Pod annotation did not get set.", test.description)
}
}
// Cleanup
pods = append(pods, preemptor)
for _, p := range pods {
err = cs.CoreV1().Pods(p.Namespace).Delete(p.Name, metav1.NewDeleteOptions(0))
if err != nil && !errors.IsNotFound(err) {
t.Errorf("Test [%v]: error, %v, while deleting pod during test.", test.description, err)
}
err = wait.Poll(time.Second, wait.ForeverTestTimeout, podDeleted(cs, p.Namespace, p.Name))
if err != nil {
t.Errorf("Test [%v]: error, %v, while waiting for pod to get deleted.", test.description, err)
}
}
}
}
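The table-driven structure above makes it straightforward to add further scenarios with the same helpers. As a sketch only (this case is not part of the commit), an entry asserting that an incoming pod of equal priority does not trigger preemption could look roughly like the following; it reuses mediumPriority, initPausePod, and pausePodConfig exactly as the existing cases do, and the pod names are made up for illustration.

{
	description: "no preemption when the incoming pod has equal priority",
	existingPods: []*v1.Pod{
		initPausePod(cs, &pausePodConfig{
			Name:      "equal-victim",
			Namespace: context.ns.Name,
			Priority:  &mediumPriority,
			Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
				v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
			},
		}),
	},
	// The preemptor does not fit on the node, but it may only evict pods of
	// lower priority, so it is expected to stay pending and no victims are expected.
	pod: initPausePod(cs, &pausePodConfig{
		Name:      "equal-preemptor",
		Namespace: context.ns.Name,
		Priority:  &mediumPriority,
		Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
			v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
			v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
		},
	}),
	preemptedPodIndexes: map[int]struct{}{},
},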

View File

@ -205,6 +205,7 @@ type pausePodConfig struct {
Tolerations []v1.Toleration
NodeName string
SchedulerName string
Priority *int32
}
// initPausePod initializes a pod API object from the given config. It is used
@ -213,6 +214,7 @@ func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: conf.Name,
Namespace: conf.Namespace,
Labels: conf.Labels,
Annotations: conf.Annotations,
},
@ -228,6 +230,7 @@ func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
Tolerations: conf.Tolerations,
NodeName: conf.NodeName,
SchedulerName: conf.SchedulerName,
Priority: conf.Priority,
},
}
if conf.Resources != nil {
@ -238,9 +241,8 @@ func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
// createPausePod creates a pod with "Pause" image and the given config and
// return its pointer and error status.
func createPausePod(cs clientset.Interface, conf *pausePodConfig) (*v1.Pod, error) {
p := initPausePod(cs, conf)
return cs.CoreV1().Pods(conf.Namespace).Create(p)
func createPausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
return cs.CoreV1().Pods(p.Namespace).Create(p)
}
// createPausePodWithResource creates a pod with "Pause" image and the given
@ -262,22 +264,21 @@ func createPausePodWithResource(cs clientset.Interface, podName string, nsName s
},
}
}
return createPausePod(cs, &conf)
return createPausePod(cs, initPausePod(cs, &conf))
}
// runPausePod creates a pod with "Pause" image and the given config and waits
// until it is scheduled. It returns its pointer and error status.
func runPausePod(cs clientset.Interface, conf *pausePodConfig) (*v1.Pod, error) {
p := initPausePod(cs, conf)
pod, err := cs.CoreV1().Pods(conf.Namespace).Create(p)
func runPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
pod, err := cs.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil {
return nil, fmt.Errorf("Error creating pause pod: %v", err)
}
if err = waitForPodToSchedule(cs, pod); err != nil {
return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err)
}
if pod, err = cs.CoreV1().Pods(conf.Namespace).Get(conf.Name, metav1.GetOptions{}); err != nil {
return pod, fmt.Errorf("Error getting pod %v info: %v", conf.Name, err)
if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}); err != nil {
return pod, fmt.Errorf("Error getting pod %v info: %v", pod.Name, err)
}
return pod, nil
}
@ -285,7 +286,10 @@ func runPausePod(cs clientset.Interface, conf *pausePodConfig) (*v1.Pod, error)
// podDeleted returns true if a pod is not found in the given namespace or its deletion timestamp is set.
func podDeleted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
_, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
if err == nil && pod.DeletionTimestamp != nil {
return true, nil
}
if errors.IsNotFound(err) {
return true, nil
}
@ -293,6 +297,20 @@ func podDeleted(c clientset.Interface, podNamespace, podName string) wait.Condit
}
}
// podIsGettingEvicted returns true if the pod's deletion timestamp is set.
func podIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
if err != nil {
return false, err
}
if pod.DeletionTimestamp != nil {
return true, nil
}
return false, nil
}
}
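As in TestPreemption above, this helper is meant to be handed to wait.Poll. A typical call site, assuming cs is a clientset.Interface, victim is a *v1.Pod, and t is the *testing.T of the calling test, looks roughly like this:

// Block until the victim pod is marked for deletion; fail the test on timeout.
if err := wait.Poll(time.Second, wait.ForeverTestTimeout,
	podIsGettingEvicted(cs, victim.Namespace, victim.Name)); err != nil {
	t.Errorf("pod %v was not marked for eviction: %v", victim.Name, err)
}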
// podScheduled returns true if a node is assigned to the given pod.
func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {