diff --git a/plugin/pkg/scheduler/algorithm/predicates/error.go b/plugin/pkg/scheduler/algorithm/predicates/error.go index 8cc85bce2c0..155fe18f431 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/error.go +++ b/plugin/pkg/scheduler/algorithm/predicates/error.go @@ -25,6 +25,11 @@ import ( var ( // The predicateName tries to be consistent as the predicate name used in DefaultAlgorithmProvider defined in // defaults.go (which tend to be stable for backward compatibility) + + // NOTE: If you add a new predicate failure error for a predicate that can never + // be made to pass by removing pods, or you change an existing predicate so that + // it can never be made to pass by removing pods, you need to add the predicate + // failure error in nodesWherePreemptionMightHelp() in scheduler/core/generic_scheduler.go ErrDiskConflict = newPredicateFailureError("NoDiskConflict") ErrVolumeZoneConflict = newPredicateFailureError("NoVolumeZoneConflict") ErrNodeSelectorNotMatch = newPredicateFailureError("MatchNodeSelector") diff --git a/plugin/pkg/scheduler/algorithm/predicates/metadata.go b/plugin/pkg/scheduler/algorithm/predicates/metadata.go index 0adaf84dc81..bb15272db3d 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/metadata.go +++ b/plugin/pkg/scheduler/algorithm/predicates/metadata.go @@ -40,8 +40,8 @@ type matchingPodAntiAffinityTerm struct { node *v1.Node } -// NOTE: When new fields are added/removed or logic is changed, please make sure -// that RemovePod and AddPod functions are updated to work with the new changes. +// NOTE: When new fields are added/removed or logic is changed, please make sure that +// RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes. type predicateMetadata struct { pod *v1.Pod podBestEffort bool @@ -172,13 +172,17 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata { podRequest: meta.podRequest, serviceAffinityInUse: meta.serviceAffinityInUse, } + newPredMeta.podPorts = map[int]bool{} for k, v := range meta.podPorts { newPredMeta.podPorts[k] = v } + newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{} for k, v := range meta.matchingAntiAffinityTerms { newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...) } - newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil), meta.serviceAffinityMatchingPodServices...) - newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil), meta.serviceAffinityMatchingPodList...) + newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil), + meta.serviceAffinityMatchingPodServices...) + newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil), + meta.serviceAffinityMatchingPodList...) return (algorithm.PredicateMetadata)(newPredMeta) } diff --git a/plugin/pkg/scheduler/algorithm/predicates/metadata_test.go b/plugin/pkg/scheduler/algorithm/predicates/metadata_test.go index 40476283e8c..640da1fcace 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/metadata_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/metadata_test.go @@ -355,3 +355,46 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) { } } } + +// TestPredicateMetadata_ShallowCopy tests the ShallowCopy function. It is based +// on the idea that shallow-copy should produce an object that is deep-equal to the original +// object. 
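// Aside (not part of this patch): the ShallowCopy change above allocates the
// destination maps before copying entries into them, because writing into a
// nil map panics in Go. A minimal, self-contained sketch of that pattern with
// hypothetical names:
func copyPortMap(src map[int]bool) map[int]bool {
	dst := map[int]bool{} // must allocate first; assigning into a nil map panics
	for k, v := range src {
		dst[k] = v
	}
	return dst
}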
+func TestPredicateMetadata_ShallowCopy(t *testing.T) { + source := predicateMetadata{ + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "testns", + }, + }, + podBestEffort: true, + podRequest: &schedulercache.Resource{ + MilliCPU: 1000, + Memory: 300, + AllowedPodNumber: 4, + }, + podPorts: map[int]bool{1234: true, 456: false}, + matchingAntiAffinityTerms: map[string][]matchingPodAntiAffinityTerm{ + "term1": { + { + term: &v1.PodAffinityTerm{TopologyKey: "node"}, + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, + }, + }, + }, + }, + serviceAffinityInUse: true, + serviceAffinityMatchingPodList: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}, + }, + serviceAffinityMatchingPodServices: []*v1.Service{ + {ObjectMeta: metav1.ObjectMeta{Name: "service1"}}, + }, + } + + if !reflect.DeepEqual(source.ShallowCopy().(*predicateMetadata), &source) { + t.Errorf("Copy is not equal to source!") + } +} diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index f28c7a901ce..94e6aa70219 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -45,6 +45,10 @@ import ( "github.com/golang/glog" ) +const ( + MatchInterPodAffinity = "MatchInterPodAffinity" +) + // NodeInfo: Other types for predicate functions... type NodeInfo interface { GetNodeInfo(nodeID string) (*v1.Node, error) diff --git a/plugin/pkg/scheduler/algorithm/scheduler_interface.go b/plugin/pkg/scheduler/algorithm/scheduler_interface.go index 7214f8ac4d1..41a6dbb48bb 100644 --- a/plugin/pkg/scheduler/algorithm/scheduler_interface.go +++ b/plugin/pkg/scheduler/algorithm/scheduler_interface.go @@ -47,6 +47,10 @@ type SchedulerExtender interface { // onto machines. type ScheduleAlgorithm interface { Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error) + // Preempt receives scheduling errors for a pod and tries to create room for + // the pod by preempting lower priority pods if possible. + // It returns the node where preemption happened, a list of preempted pods, and error if any. + Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error) // Predicates() returns a pointer to a map of predicate functions. This is // exposed for testing. Predicates() map[string]FitPredicate diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index d38fbb78f62..668db25d64d 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -155,7 +155,7 @@ func defaultPredicates() sets.String { ), // Fit is determined by inter-pod affinity. 
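// Aside (not part of this patch): the registration below now uses the exported
// predicates.MatchInterPodAffinity constant rather than repeating the
// "MatchInterPodAffinity" string, so other packages (for example the
// preemption tests later in this change) can refer to the predicate by the
// same key. A hypothetical caller building its own predicate map under that
// assumption:
func interPodAffinityPredicates(info predicates.NodeInfo, podLister algorithm.PodLister) map[string]algorithm.FitPredicate {
	return map[string]algorithm.FitPredicate{
		predicates.MatchInterPodAffinity: predicates.NewPodAffinityPredicate(info, podLister),
	}
}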
factory.RegisterFitPredicateFactory( - "MatchInterPodAffinity", + predicates.MatchInterPodAffinity, func(args factory.PluginFactoryArgs) algorithm.FitPredicate { return predicates.NewPodAffinityPredicate(args.NodeInfo, args.PodLister) }, diff --git a/plugin/pkg/scheduler/core/extender_test.go b/plugin/pkg/scheduler/core/extender_test.go index cb211e27f46..8b26ccdfeba 100644 --- a/plugin/pkg/scheduler/core/extender_test.go +++ b/plugin/pkg/scheduler/core/extender_test.go @@ -183,6 +183,8 @@ func (f *FakeExtender) IsBinder() bool { return true } +var _ algorithm.SchedulerExtender = &FakeExtender{} + func TestGenericSchedulerWithExtenders(t *testing.T) { tests := []struct { name string diff --git a/plugin/pkg/scheduler/core/generic_scheduler.go b/plugin/pkg/scheduler/core/generic_scheduler.go index b3e35477c66..686b52e170c 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler.go +++ b/plugin/pkg/scheduler/core/generic_scheduler.go @@ -47,7 +47,14 @@ type FitError struct { var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods") -const NoNodeAvailableMsg = "No nodes are available that match all of the following predicates" +const ( + NoNodeAvailableMsg = "No nodes are available that match all of the predicates" + // NominatedNodeAnnotationKey is used to annotate a pod that has preempted other pods. + // The scheduler uses the annotation to find that the pod shouldn't preempt more pods + // when it gets to the head of scheduling queue again. + // See podEligibleToPreemptOthers() for more information. + NominatedNodeAnnotationKey = "NominatedNodeName" +) // Error returns detailed information of why the pod failed to fit on each node func (f *FitError) Error() string { @@ -163,23 +170,61 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList // preempt finds nodes with pods that can be preempted to make room for "pod" to // schedule. It chooses one of the nodes and preempts the pods on the node and -// returns the node name if such a node is found. -// TODO(bsalamat): This function is under construction! DO NOT USE! -func (g *genericScheduler) preempt(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) { - nodes, err := nodeLister.List() +// returns the node and the list of preempted pods if such a node is found. +// TODO(bsalamat): Add priority-based scheduling. More info: today one or more +// pending pods (different from the pod that triggered the preemption(s)) may +// schedule into some portion of the resources freed up by the preemption(s) +// before the pod that triggered the preemption(s) has a chance to schedule +// there, thereby preventing the pod that triggered the preemption(s) from +// scheduling. Solution is given at: +// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/pod-preemption.md#preemption-mechanics +func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) { + // Scheduler may return various types of errors. Consider preemption only if + // the error is of type FitError. 
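// Aside (not part of this patch): FitError is what Schedule returns when no
// node passes all predicates; its FailedPredicates map of per-node failure
// reasons is what nodesWherePreemptionMightHelp consults further down. A test
// can hand Preempt a synthetic one, as TestPreempt in this change does; a
// hypothetical helper for that would be simply:
func syntheticFitError(pod *v1.Pod, failed FailedPredicateMap) error {
	return &FitError{Pod: pod, FailedPredicates: failed}
}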
+ fitError, ok := scheduleErr.(*FitError) + if !ok || fitError == nil { + return nil, nil, nil + } + err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap) if err != nil { - return "", err + return nil, nil, err } - if len(nodes) == 0 { - return "", ErrNoNodesAvailable + if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) { + glog.V(5).Infof("Pod %v is not eligible for more preemption.", pod.Name) + return nil, nil, nil } - nodeToPods := selectNodesForPreemption(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.predicateMetaProducer) - if len(nodeToPods) == 0 { - return "", nil + allNodes, err := nodeLister.List() + if err != nil { + return nil, nil, err } - node := pickOneNodeForPreemption(nodeToPods) - // TODO: Add actual preemption of pods - return node, nil + if len(allNodes) == 0 { + return nil, nil, ErrNoNodesAvailable + } + potentialNodes := nodesWherePreemptionMightHelp(pod, allNodes, fitError.FailedPredicates) + if len(potentialNodes) == 0 { + glog.V(3).Infof("Preemption will not help schedule pod %v on any node.", pod.Name) + return nil, nil, nil + } + nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer) + if err != nil { + return nil, nil, err + } + for len(nodeToPods) > 0 { + node := pickOneNodeForPreemption(nodeToPods) + if node == nil { + return nil, nil, err + } + passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders) + if passes && pErr == nil { + return node, nodeToPods[node], err + } + if pErr != nil { + glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr) + } + // Remove the node from the map and try to pick a different node. + delete(nodeToPods, node) + } + return nil, nil, err } // Filters the nodes to find the ones that fit based on the given predicate functions @@ -452,108 +497,151 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf // 1. A node with minimum highest priority victim is picked. // 2. Ties are broken by sum of priorities of all victims. // 3. If there are still ties, node with the minimum number of victims is picked. -// 4. If there are still ties, the first such node in the map is picked (sort of randomly). -func pickOneNodeForPreemption(nodesToPods map[string][]*v1.Pod) string { +// 4. If there are still ties, the first such node is picked (sort of randomly). +//TODO(bsalamat): Try to reuse the "nodeScore" slices in order to save GC time. +func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node { type nodeScore struct { - nodeName string + node *v1.Node highestPriority int32 sumPriorities int64 numPods int } if len(nodesToPods) == 0 { - return "" + return nil } minHighestPriority := int32(math.MaxInt32) - nodeScores := []*nodeScore{} - for nodeName, pods := range nodesToPods { + minPriorityScores := []*nodeScore{} + for node, pods := range nodesToPods { if len(pods) == 0 { // We found a node that doesn't need any preemption. Return it! // This should happen rarely when one or more pods are terminated between // the time that scheduler tries to schedule the pod and the time that // preemption logic tries to find nodes for preemption. - return nodeName + return node } // highestPodPriority is the highest priority among the victims on this node. 
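// Aside (not part of this patch): each tie-breaking pass below follows the
// same "keep everything tied for the current minimum" shape: reset the
// candidate slice whenever a new minimum is found, and append while the value
// equals it. A self-contained sketch of that shape over plain ints:
func argminAll(values []int) []int {
	min := int(^uint(0) >> 1) // max int, so any value replaces it
	indices := []int{}
	for i, v := range values {
		if v < min {
			min = v
			indices = nil // a new minimum invalidates earlier candidates
		}
		if v == min {
			indices = append(indices, i)
		}
	}
	return indices
}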
highestPodPriority := util.GetPodPriority(pods[0]) if highestPodPriority < minHighestPriority { minHighestPriority = highestPodPriority + minPriorityScores = nil + } + if highestPodPriority == minHighestPriority { + minPriorityScores = append(minPriorityScores, &nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)}) } - nodeScores = append(nodeScores, &nodeScore{nodeName: nodeName, highestPriority: highestPodPriority, numPods: len(pods)}) } - // Find the nodes with minimum highest priority victim. + if len(minPriorityScores) == 1 { + return minPriorityScores[0].node + } + // There are a few nodes with minimum highest priority victim. Find the + // smallest sum of priorities. minSumPriorities := int64(math.MaxInt64) - lowestHighPriorityNodes := []*nodeScore{} - for _, nodeScore := range nodeScores { - if nodeScore.highestPriority == minHighestPriority { - lowestHighPriorityNodes = append(lowestHighPriorityNodes, nodeScore) - var sumPriorities int64 - for _, pod := range nodesToPods[nodeScore.nodeName] { - // We add MaxInt32+1 to all priorities to make all of them >= 0. This is - // needed so that a node with a few pods with negative priority is not - // picked over a node with a smaller number of pods with the same negative - // priority (and similar scenarios). - sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1) - } - if sumPriorities < minSumPriorities { - minSumPriorities = sumPriorities - } - nodeScore.sumPriorities = sumPriorities + minSumPriorityScores := []*nodeScore{} + for _, nodeScore := range minPriorityScores { + var sumPriorities int64 + for _, pod := range nodesToPods[nodeScore.node] { + // We add MaxInt32+1 to all priorities to make all of them >= 0. This is + // needed so that a node with a few pods with negative priority is not + // picked over a node with a smaller number of pods with the same negative + // priority (and similar scenarios). + sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1) + } + if sumPriorities < minSumPriorities { + minSumPriorities = sumPriorities + minSumPriorityScores = nil + } + nodeScore.sumPriorities = sumPriorities + if sumPriorities == minSumPriorities { + minSumPriorityScores = append(minSumPriorityScores, nodeScore) } } - if len(lowestHighPriorityNodes) == 1 { - return lowestHighPriorityNodes[0].nodeName + if len(minSumPriorityScores) == 1 { + return minSumPriorityScores[0].node } - // There are multiple nodes with the same minimum highest priority victim. - // Choose the one(s) with lowest sum of priorities. + // There are a few nodes with minimum highest priority victim and sum of priorities. + // Find one with the minimum number of pods. minNumPods := math.MaxInt32 - lowestSumPriorityNodes := []*nodeScore{} - for _, nodeScore := range lowestHighPriorityNodes { - if nodeScore.sumPriorities == minSumPriorities { - lowestSumPriorityNodes = append(lowestSumPriorityNodes, nodeScore) - if nodeScore.numPods < minNumPods { - minNumPods = nodeScore.numPods - } + minNumPodScores := []*nodeScore{} + for _, nodeScore := range minSumPriorityScores { + if nodeScore.numPods < minNumPods { + minNumPods = nodeScore.numPods + minNumPodScores = nil } - } - if len(lowestSumPriorityNodes) == 1 { - return lowestSumPriorityNodes[0].nodeName - } - // There are still more than one node with minimum highest priority victim and - // lowest sum of victim priorities. Find the anyone with minimum number of victims. 
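// Aside (not part of this patch): the MaxInt32+1 shift above matters because
// pod priorities can be negative. Without it, a node with two victims at
// priority -10 (raw sum -20) would look "cheaper" than a node with a single
// victim at -10 (raw sum -10), even though it loses more pods. Shifting every
// priority into the non-negative range makes the sum grow with both the count
// and the priorities of the victims. A sketch of the shifted sum (assumes
// "math" is imported):
func shiftedPrioritySum(priorities []int32) int64 {
	var sum int64
	for _, p := range priorities {
		sum += int64(p) + int64(math.MaxInt32+1)
	}
	return sum
}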
- for _, nodeScore := range lowestSumPriorityNodes { if nodeScore.numPods == minNumPods { - return nodeScore.nodeName + minNumPodScores = append(minNumPodScores, nodeScore) } } - glog.Errorf("We should never reach here!") - return "" + // At this point, even if there are more than one node with the same score, + // return the first one. + if len(minNumPodScores) > 0 { + return minNumPodScores[0].node + } + glog.Errorf("Error in logic of node scoring for preemption. We should never reach here!") + return nil } // selectNodesForPreemption finds all the nodes with possible victims for // preemption in parallel. func selectNodesForPreemption(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, - nodes []*v1.Node, + potentialNodes []*v1.Node, predicates map[string]algorithm.FitPredicate, metadataProducer algorithm.PredicateMetadataProducer, -) map[string][]*v1.Pod { +) (map[*v1.Node][]*v1.Pod, error) { - nodeNameToPods := map[string][]*v1.Pod{} + nodeNameToPods := map[*v1.Node][]*v1.Pod{} var resultLock sync.Mutex // We can use the same metadata producer for all nodes. meta := metadataProducer(pod, nodeNameToInfo) checkNode := func(i int) { - nodeName := nodes[i].Name - pods, fits := selectVictimsOnNode(pod, meta.ShallowCopy(), nodeNameToInfo[nodeName], predicates) + nodeName := potentialNodes[i].Name + var metaCopy algorithm.PredicateMetadata + if meta != nil { + metaCopy = meta.ShallowCopy() + } + pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates) if fits { resultLock.Lock() - nodeNameToPods[nodeName] = pods + nodeNameToPods[potentialNodes[i]] = pods resultLock.Unlock() } } - workqueue.Parallelize(16, len(nodes), checkNode) - return nodeNameToPods + workqueue.Parallelize(16, len(potentialNodes), checkNode) + return nodeNameToPods, nil +} + +func nodePassesExtendersForPreemption( + pod *v1.Pod, + nodeName string, + victims []*v1.Pod, + nodeNameToInfo map[string]*schedulercache.NodeInfo, + extenders []algorithm.SchedulerExtender) (bool, error) { + // If there are any extenders, run them and filter the list of candidate nodes. + if len(extenders) == 0 { + return true, nil + } + // Remove the victims from the corresponding nodeInfo and send nodes to the + // extenders for filtering. + originalNodeInfo := nodeNameToInfo[nodeName] + nodeInfoCopy := nodeNameToInfo[nodeName].Clone() + for _, victim := range victims { + nodeInfoCopy.RemovePod(victim) + } + nodeNameToInfo[nodeName] = nodeInfoCopy + defer func() { nodeNameToInfo[nodeName] = originalNodeInfo }() + filteredNodes := []*v1.Node{nodeInfoCopy.Node()} + for _, extender := range extenders { + var err error + var failedNodesMap map[string]string + filteredNodes, failedNodesMap, err = extender.Filter(pod, filteredNodes, nodeNameToInfo) + if err != nil { + return false, err + } + if _, found := failedNodesMap[nodeName]; found || len(filteredNodes) == 0 { + return false, nil + } + } + return true, nil } // selectVictimsOnNode finds minimum set of pods on the given node that should @@ -563,25 +651,31 @@ func selectNodesForPreemption(pod *v1.Pod, // to one another, not relative to the preemptor "pod"). // The algorithm first checks if the pod can be scheduled on the node when all the // lower priority pods are gone. If so, it sorts all the lower priority pods by -// their priority and starting from the highest priority one, tries to keep as +// their priority and starts from the highest priority one, tries to keep as // many of them as possible while checking that the "pod" can still fit on the node. 
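// Aside (not part of this patch): nodePassesExtendersForPreemption above
// temporarily swaps a cloned NodeInfo (with the victims removed) into the
// shared map and restores the original with a defer, so extenders see the
// node as if preemption had already happened without mutating cached state.
// The same swap-and-restore shape in miniature, with hypothetical names:
func withTemporaryValue(m map[string]int, key string, tmp int, fn func()) {
	orig, had := m[key]
	m[key] = tmp
	defer func() {
		if had {
			m[key] = orig
		} else {
			delete(m, key)
		}
	}()
	fn()
}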
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled // due to pod affinity, node affinity, or node anti-affinity reasons. None of // these predicates can be satisfied by removing more pods from the node. -func selectVictimsOnNode(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo, fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) { - higherPriority := func(pod1, pod2 interface{}) bool { - return util.GetPodPriority(pod1.(*v1.Pod)) > util.GetPodPriority(pod2.(*v1.Pod)) - } - potentialVictims := util.SortableList{CompFunc: higherPriority} +// TODO(bsalamat): Add support for PodDisruptionBudget. +func selectVictimsOnNode( + pod *v1.Pod, + meta algorithm.PredicateMetadata, + nodeInfo *schedulercache.NodeInfo, + fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) { + potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod} nodeInfoCopy := nodeInfo.Clone() removePod := func(rp *v1.Pod) { nodeInfoCopy.RemovePod(rp) - meta.RemovePod(rp) + if meta != nil { + meta.RemovePod(rp) + } } addPod := func(ap *v1.Pod) { nodeInfoCopy.AddPod(ap) - meta.AddPod(ap, nodeInfoCopy) + if meta != nil { + meta.AddPod(ap, nodeInfoCopy) + } } // As the first step, remove all the lower priority pods from the node and // check if the given pod can be scheduled. @@ -597,57 +691,83 @@ func selectVictimsOnNode(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo // we are almost done and this node is not suitable for preemption. The only condition // that we should check is if the "pod" is failing to schedule due to pod affinity // failure. - if fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits { + // TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance. + if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits { if err != nil { glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) - return nil, false - } - // If the new pod still cannot be scheduled for any reason other than pod - // affinity, the new pod will not fit on this node and we are done here. - affinity := pod.Spec.Affinity - if affinity == nil || affinity.PodAffinity == nil { - return nil, false - } - for _, failedPred := range failedPredicates { - if failedPred != predicates.ErrPodAffinityNotMatch { - return nil, false - } - } - // If we reach here, it means that the pod cannot be scheduled due to pod - // affinity or anti-affinity. Since failure reason for both affinity and - // anti-affinity is the same, we cannot say which one caused it. So, we try - // adding pods one at a time and see if any of them satisfies the affinity rules. - for i, p := range potentialVictims.Items { - existingPod := p.(*v1.Pod) - addPod(existingPod) - if fits, _, _ = podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits { - removePod(existingPod) - } else { - // We found the pod needed to satisfy pod affinity. Let's remove it from - // potential victims list. - // NOTE: We assume that pod affinity can be satisfied by only one pod, - // not multiple pods. This is how scheduler works today. - potentialVictims.Items = append(potentialVictims.Items[:i], potentialVictims.Items[i+1:]...) - break - } - } - if !fits { - return nil, false } + return nil, false } victims := []*v1.Pod{} - // Try to reprieve as may pods as possible starting from the highest priority one. 
+ // Try to reprieve as many pods as possible starting from the highest priority one. for _, p := range potentialVictims.Items { lpp := p.(*v1.Pod) addPod(lpp) if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits { removePod(lpp) victims = append(victims, lpp) + glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", lpp.Name, nodeInfo.Node().Name) } } return victims, true } +// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates +// that may be satisfied by removing pods from the node. +func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node { + potentialNodes := []*v1.Node{} + for _, node := range nodes { + unresolvableReasonExist := false + failedPredicates, found := failedPredicatesMap[node.Name] + // If we assume that scheduler looks at all nodes and populates the failedPredicateMap + // (which is the case today), the !found case should never happen, but we'd prefer + // to rely less on such assumptions in the code when checking does not impose + // significant overhead. + for _, failedPredicate := range failedPredicates { + switch failedPredicate { + case + predicates.ErrNodeSelectorNotMatch, + predicates.ErrPodNotMatchHostName, + predicates.ErrTaintsTolerationsNotMatch, + predicates.ErrNodeLabelPresenceViolated, + predicates.ErrNodeNotReady, + predicates.ErrNodeNetworkUnavailable, + predicates.ErrNodeUnschedulable, + predicates.ErrNodeUnknownCondition: + unresolvableReasonExist = true + break + // TODO(bsalamat): Please add affinity failure cases once we have specific affinity failure errors. + } + } + if !found || !unresolvableReasonExist { + glog.V(3).Infof("Node %v is a potential node for preemption.", node.Name) + potentialNodes = append(potentialNodes, node) + } + } + return potentialNodes +} + +// podEligibleToPreemptOthers determines whether this pod should be considered +// for preempting other pods or not. If this pod has already preempted other +// pods and those are in their graceful termination period, it shouldn't be +// considered for preemption. +// We look at the node that is nominated for this pod and as long as there are +// terminating pods on the node, we don't consider this for preempting more pods. +// TODO(bsalamat): Revisit this algorithm once scheduling by priority is added. +func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool { + if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found { + if nodeInfo, found := nodeNameToInfo[nodeName]; found { + for _, p := range nodeInfo.Pods() { + if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) { + // There is a terminating pod on the nominated node. 
+ return false + } + } + } + } + return true +} + func NewGenericScheduler( cache schedulercache.Cache, eCache *EquivalenceCache, diff --git a/plugin/pkg/scheduler/core/generic_scheduler_test.go b/plugin/pkg/scheduler/core/generic_scheduler_test.go index d2ed01dff7f..cde1b5e1b4e 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/core/generic_scheduler_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" algorithmpredicates "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" algorithmpriorities "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities" priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" @@ -307,8 +308,7 @@ func TestGenericScheduler(t *testing.T) { } scheduler := NewGenericScheduler( - cache, nil, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, - []algorithm.SchedulerExtender{}) + cache, nil, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}) machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) if !reflect.DeepEqual(err, test.wErr) { @@ -548,12 +548,24 @@ func TestZeroRequest(t *testing.T) { } } -func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[string][]*v1.Pod) error { +func printNodeToPods(nodeToPods map[*v1.Node][]*v1.Pod) string { + var output string + for node, pods := range nodeToPods { + output += node.Name + ": [" + for _, pod := range pods { + output += pod.Name + ", " + } + output += "]" + } + return output +} + +func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node][]*v1.Pod) error { if len(expected) == len(nodeToPods) { for k, pods := range nodeToPods { - if expPods, ok := expected[k]; ok { + if expPods, ok := expected[k.Name]; ok { if len(pods) != len(expPods) { - return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, nodeToPods) + return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods)) } prevPriority := int32(math.MaxInt32) for _, p := range pods { @@ -567,11 +579,11 @@ func checkPreemptionVictims(testName string, expected map[string]map[string]bool } } } else { - return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, nodeToPods) + return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods)) } } } else { - return fmt.Errorf("test [%v]: unexpected number of machines. expected: %v, got: %v", testName, expected, nodeToPods) + return fmt.Errorf("test [%v]: unexpected number of machines. 
expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods)) } return nil } @@ -724,67 +736,37 @@ func TestSelectNodesForPreemption(t *testing.T) { expected: map[string]map[string]bool{"machine1": {"b": true, "c": true}}, }, { - name: "lower priority pod is not preempted to satisfy pod affinity", + name: "pod with anti-affinity is preempted", predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, nodes: []string{"machine1", "machine2"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, Affinity: &v1.Affinity{ - PodAffinity: &v1.PodAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "service", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"securityscan", "value2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "machine1", + Labels: map[string]string{"pod": "preemptor"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority}}, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1", Affinity: &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "pod", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"preemptor", "value2"}, + }, }, }, + TopologyKey: "hostname", }, - TopologyKey: "hostname", }, - }, - }}}}, - pods: []*v1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}}, + }}}}, {ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}}, {ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}}, {ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}}, - expected: map[string]map[string]bool{"machine1": {"b": true, "c": true}}, - addAffinityPredicate: true, - }, - { - name: "between two pods that satisfy affinity, the higher priority one stays", - predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, - nodes: []string{"machine1", "machine2"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, Affinity: &v1.Affinity{ - PodAffinity: &v1.PodAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "service", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"securityscan", "value2"}, - }, - }, - }, - TopologyKey: "hostname", - }, - }, - }}}}, - pods: []*v1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{"service": "securityscan"}}, Spec: 
v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "b", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}}, - expected: map[string]map[string]bool{"machine1": {"a": true, "c": true}}, + expected: map[string]map[string]bool{"machine1": {"a": true}, "machine2": {}}, addAffinityPredicate: true, }, } - for _, test := range tests { nodes := []*v1.Node{} for _, n := range test.nodes { @@ -793,10 +775,13 @@ func TestSelectNodesForPreemption(t *testing.T) { nodes = append(nodes, node) } if test.addAffinityPredicate { - test.predicates["affinity"] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods)) + test.predicates[predicates.MatchInterPodAffinity] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods)) } nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes) - nodeToPods := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata) + nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata) + if err != nil { + t.Error(err) + } if err := checkPreemptionVictims(test.name, test.expected, nodeToPods); err != nil { t.Error(err) } @@ -950,10 +935,11 @@ func TestPickOneNodeForPreemption(t *testing.T) { nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5)) } nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes) - node := pickOneNodeForPreemption(selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata)) + candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata) + node := pickOneNodeForPreemption(candidateNodes) found := false for _, nodeName := range test.expected { - if node == nodeName { + if node.Name == nodeName { found = true break } @@ -963,3 +949,272 @@ func TestPickOneNodeForPreemption(t *testing.T) { } } } + +func TestNodesWherePreemptionMightHelp(t *testing.T) { + // Prepare 4 node names. + nodeNames := []string{} + for i := 1; i < 5; i++ { + nodeNames = append(nodeNames, fmt.Sprintf("machine%d", i)) + } + + tests := []struct { + name string + failedPredMap FailedPredicateMap + pod *v1.Pod + expected map[string]bool // set of expected node names. Value is ignored. 
+ }{ + { + name: "No node should be attempted", + failedPredMap: FailedPredicateMap{ + "machine1": []algorithm.PredicateFailureReason{predicates.ErrNodeSelectorNotMatch}, + "machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName}, + "machine3": []algorithm.PredicateFailureReason{predicates.ErrTaintsTolerationsNotMatch}, + "machine4": []algorithm.PredicateFailureReason{predicates.ErrNodeLabelPresenceViolated}, + }, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}, + expected: map[string]bool{}, + }, + { + name: "pod affinity should be tried", + failedPredMap: FailedPredicateMap{ + "machine1": []algorithm.PredicateFailureReason{predicates.ErrPodAffinityNotMatch}, + "machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName}, + "machine3": []algorithm.PredicateFailureReason{predicates.ErrNodeUnschedulable}, + }, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{Affinity: &v1.Affinity{ + PodAffinity: &v1.PodAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"securityscan", "value2"}, + }, + }, + }, + TopologyKey: "hostname", + }, + }, + }}}}, + expected: map[string]bool{"machine1": true, "machine4": true}, + }, + { + name: "pod with both pod affinity and anti-affinity should be tried", + failedPredMap: FailedPredicateMap{ + "machine1": []algorithm.PredicateFailureReason{predicates.ErrPodAffinityNotMatch}, + "machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName}, + }, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{Affinity: &v1.Affinity{ + PodAffinity: &v1.PodAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"securityscan", "value2"}, + }, + }, + }, + TopologyKey: "hostname", + }, + }, + }, + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"blah", "foo"}, + }, + }, + }, + TopologyKey: "region", + }, + }, + }, + }}}, + expected: map[string]bool{"machine1": true, "machine3": true, "machine4": true}, + }, + { + name: "Mix of failed predicates works fine", + failedPredMap: FailedPredicateMap{ + "machine1": []algorithm.PredicateFailureReason{predicates.ErrNodeSelectorNotMatch, predicates.ErrNodeOutOfDisk, predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 500, 300)}, + "machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName, predicates.ErrDiskConflict}, + "machine3": []algorithm.PredicateFailureReason{predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 600, 400)}, + "machine4": []algorithm.PredicateFailureReason{}, + }, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}, + expected: map[string]bool{"machine3": true, "machine4": true}, + }, + } + + for _, test := range tests { + nodes := nodesWherePreemptionMightHelp(test.pod, makeNodeList(nodeNames), test.failedPredMap) + if len(test.expected) != len(nodes) { + t.Errorf("test [%v]:number of nodes is 
not the same as expected. exptectd: %d, got: %d. Nodes: %v", test.name, len(test.expected), len(nodes), nodes) + } + for _, node := range nodes { + if _, found := test.expected[node.Name]; !found { + t.Errorf("test [%v]: node %v is not expected.", test.name, node.Name) + } + } + } +} + +func TestPreempt(t *testing.T) { + failedPredMap := FailedPredicateMap{ + "machine1": []algorithm.PredicateFailureReason{predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 500, 300)}, + "machine2": []algorithm.PredicateFailureReason{predicates.ErrDiskConflict}, + "machine3": []algorithm.PredicateFailureReason{predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 600, 400)}, + } + // Prepare 3 node names. + nodeNames := []string{} + for i := 1; i < 4; i++ { + nodeNames = append(nodeNames, fmt.Sprintf("machine%d", i)) + } + tests := []struct { + name string + pod *v1.Pod + pods []*v1.Pod + extenders []*FakeExtender + expectedNode string + expectedPods []string // list of preempted pods + }{ + { + name: "basic preemption logic", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{ + Containers: veryLargeContainers, + Priority: &highPriority}, + }, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + + {ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + + {ObjectMeta: metav1.ObjectMeta{Name: "m3.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + }, + expectedNode: "machine1", + expectedPods: []string{"m1.1", "m1.2"}, + }, + { + name: "One node doesn't need any preemption", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{ + Containers: veryLargeContainers, + Priority: &highPriority}, + }, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + + {ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + }, + expectedNode: "machine3", + expectedPods: []string{}, + }, + { + name: "Scheduler extenders allow only machine1, otherwise machine3 would have been chosen", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{ + Containers: veryLargeContainers, + Priority: &highPriority}, + }, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + + {ObjectMeta: 
metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + }, + extenders: []*FakeExtender{ + { + predicates: []fitPredicate{truePredicateExtender}, + }, + { + predicates: []fitPredicate{machine1PredicateExtender}, + }, + }, + expectedNode: "machine1", + expectedPods: []string{"m1.1", "m1.2"}, + }, + { + name: "Scheduler extenders do not allow any preemption", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{ + Containers: veryLargeContainers, + Priority: &highPriority}, + }, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + + {ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, + }, + extenders: []*FakeExtender{ + { + predicates: []fitPredicate{falsePredicateExtender}, + }, + }, + expectedNode: "", + expectedPods: []string{}, + }, + } + + for _, test := range tests { + stop := make(chan struct{}) + cache := schedulercache.New(time.Duration(0), stop) + for _, pod := range test.pods { + cache.AddPod(pod) + } + for _, name := range nodeNames { + cache.AddNode(makeNode(name, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5)) + } + extenders := []algorithm.SchedulerExtender{} + for _, extender := range test.extenders { + extenders = append(extenders, extender) + } + scheduler := NewGenericScheduler( + cache, nil, map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders) + // Call Preempt and check the expected results. + node, victims, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{test.pod, failedPredMap})) + if err != nil { + t.Errorf("test [%v]: unexpected error in preemption: %v", test.name, err) + } + if (node != nil && node.Name != test.expectedNode) || (node == nil && len(test.expectedNode) != 0) { + t.Errorf("test [%v]: expected node: %v, got: %v", test.name, test.expectedNode, node) + } + if len(victims) != len(test.expectedPods) { + t.Errorf("test [%v]: expected %v pods, got %v.", test.name, len(test.expectedPods), len(victims)) + } + for _, victim := range victims { + found := false + for _, expPod := range test.expectedPods { + if expPod == victim.Name { + found = true + break + } + } + if !found { + t.Errorf("test [%v]: pod %v is not expected to be a victim.", test.name, victim.Name) + } + // Mark the victims for deletion and record the preemptor's nominated node name. + now := metav1.Now() + victim.DeletionTimestamp = &now + test.pod.Annotations = make(map[string]string) + test.pod.Annotations[NominatedNodeAnnotationKey] = node.Name + } + // Call preempt again and make sure it doesn't preempt any more pods. 
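// Aside (not part of this patch): the second call is expected to be a no-op
// because the loop above stamped a DeletionTimestamp on every victim and
// recorded the chosen node under NominatedNodeAnnotationKey on the preemptor;
// podEligibleToPreemptOthers then sees terminating lower-priority pods on the
// nominated node and declines further preemption. Its core check, in
// miniature (hypothetical helper, reusing util.GetPodPriority from the
// scheduler util package):
func hasTerminatingLowerPriorityPod(podsOnNominatedNode []*v1.Pod, preemptorPriority int32) bool {
	for _, p := range podsOnNominatedNode {
		if p.DeletionTimestamp != nil && util.GetPodPriority(p) < preemptorPriority {
			return true
		}
	}
	return false
}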
+ node, victims, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{test.pod, failedPredMap})) + if err != nil { + t.Errorf("test [%v]: unexpected error in preemption: %v", test.name, err) + } + if node != nil && len(victims) > 0 { + t.Errorf("test [%v]: didn't expect any more preemption. Node %v is selected for preemption.", test.name, node) + } + close(stop) + } +} diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 31cb17548ed..069707ddf21 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -716,6 +716,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, Algorithm: algo, Binder: f.getBinder(extenders), PodConditionUpdater: &podConditionUpdater{f.client}, + PodPreemptor: &podPreemptor{f.client}, WaitForCacheSync: func() bool { return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodsHasSynced) }, @@ -991,3 +992,28 @@ func (p *podConditionUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) er } return nil } + +type podPreemptor struct { + Client clientset.Interface +} + +func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { + return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) +} + +func (p *podPreemptor) DeletePod(pod *v1.Pod) error { + return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}) +} + +//TODO(bsalamat): change this to patch PodStatus to avoid overwriting potential pending status updates. +func (p *podPreemptor) UpdatePodAnnotations(pod *v1.Pod, annotations map[string]string) error { + podCopy := pod.DeepCopy() + if podCopy.Annotations == nil { + podCopy.Annotations = map[string]string{} + } + for k, v := range annotations { + podCopy.Annotations[k] = v + } + _, err := p.Client.CoreV1().Pods(podCopy.Namespace).UpdateStatus(podCopy) + return err +} diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index a58a31bc34a..8f1c8ea38ea 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -23,10 +23,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/core" @@ -48,6 +50,14 @@ type PodConditionUpdater interface { Update(pod *v1.Pod, podCondition *v1.PodCondition) error } +// PodPreemptor has methods needed to delete a pod and to update +// annotations of the preemptor pod. +type PodPreemptor interface { + GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) + DeletePod(pod *v1.Pod) error + UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error +} + // Scheduler watches for new unscheduled pods. It attempts to find // nodes that they fit on and writes bindings back to the api server. type Scheduler struct { @@ -102,6 +112,8 @@ type Config struct { // with scheduling, PodScheduled condition will be updated in apiserver in /bind // handler so that binding and setting PodCondition it is atomic. 
PodConditionUpdater PodConditionUpdater + // PodPreemptor is used to evict pods and update pod annotations. + PodPreemptor PodPreemptor // NextPod should be a function that blocks until the next pod // is available. We don't use a channel for this, because scheduling @@ -176,6 +188,41 @@ func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) { return host, err } +func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) { + if !utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) { + glog.V(3).Infof("Pod priority feature is not enabled. No preemption is performed.") + return "", nil + } + preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor) + if err != nil { + glog.Errorf("Error getting the updated preemptor pod object: %v", err) + return "", err + } + node, victims, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr) + if err != nil { + glog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name) + return "", err + } + if node == nil { + return "", err + } + glog.Infof("Preempting %d pod(s) on node %v to make room for %v/%v.", len(victims), node.Name, preemptor.Namespace, preemptor.Name) + annotations := map[string]string{core.NominatedNodeAnnotationKey: node.Name} + err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations) + if err != nil { + glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err) + return "", err + } + for _, victim := range victims { + if err := sched.config.PodPreemptor.DeletePod(victim); err != nil { + glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) + return "", err + } + sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, node.Name) + } + return node.Name, err +} + // assume signals to the cache that a pod is already in the cache, so that binding can be asnychronous. // assume modifies `assumed`. func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { @@ -258,6 +305,13 @@ func (sched *Scheduler) scheduleOne() { suggestedHost, err := sched.schedule(pod) metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start)) if err != nil { + // schedule() may have failed because the pod would not fit on any host, so we try to + // preempt, with the expectation that the next time the pod is tried for scheduling it + // will fit due to the preemption. It is also possible that a different pod will schedule + // into the resources that were preempted, but this is harmless. 
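// Aside (not part of this patch): preemption here is strictly best effort; the
// return values of sched.preempt below are not needed for this scheduling
// attempt. What persists is the annotation written by preempt(), which the
// next attempt for this pod reads back via podEligibleToPreemptOthers. A tiny
// accessor for that annotation might look like this (hypothetical helper,
// using the exported key from the core package):
func nominatedNodeName(pod *v1.Pod) (string, bool) {
	name, ok := pod.Annotations[core.NominatedNodeAnnotationKey]
	return name, ok
}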
+ if fitError, ok := err.(*core.FitError); ok { + sched.preempt(pod, fitError) + } return } diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 3ad54d400b3..b0743b7e17b 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -103,6 +103,10 @@ func (es mockScheduler) Prioritizers() []algorithm.PriorityConfig { return nil } +func (es mockScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) { + return nil, nil, nil +} + func TestScheduler(t *testing.T) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(t.Logf).Stop() diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index 4192aea7264..cda57e8b433 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -320,7 +320,7 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) { n.generation++ } -// RemovePod subtracts pod information to this NodeInfo. +// RemovePod subtracts pod information from this NodeInfo. func (n *NodeInfo) RemovePod(pod *v1.Pod) error { k1, err := getPodKey(pod) if err != nil { diff --git a/plugin/pkg/scheduler/util/utils.go b/plugin/pkg/scheduler/util/utils.go index 2bf10adf6b1..2cbe26f2e3a 100644 --- a/plugin/pkg/scheduler/util/utils.go +++ b/plugin/pkg/scheduler/util/utils.go @@ -67,8 +67,8 @@ type SortableList struct { CompFunc LessFunc } -// LessFunc is a function that receives two Pods and returns true if the first -// pod should be placed before pod2 when the list is sorted. +// LessFunc is a function that receives two items and returns true if the first +// item should be placed before the second one when the list is sorted. type LessFunc func(item1, item2 interface{}) bool var _ = sort.Interface(&SortableList{}) @@ -88,3 +88,10 @@ func (l *SortableList) Swap(i, j int) { func (l *SortableList) Sort() { sort.Sort(l) } + +// HigherPriorityPod return true when priority of the first pod is higher than +// the second one. It takes arguments of the type "interface{}" to be used with +// SortableList, but expects those arguments to be *v1.Pod. +func HigherPriorityPod(pod1, pod2 interface{}) bool { + return GetPodPriority(pod1.(*v1.Pod)) > GetPodPriority(pod2.(*v1.Pod)) +} diff --git a/test/e2e/scheduling/predicates.go b/test/e2e/scheduling/predicates.go index 8443e26b38d..9c14b20a5f4 100644 --- a/test/e2e/scheduling/predicates.go +++ b/test/e2e/scheduling/predicates.go @@ -52,6 +52,7 @@ type pausePodConfig struct { NodeName string Ports []v1.ContainerPort OwnerReferences []metav1.OwnerReference + PriorityClassName string } var _ = SIGDescribe("SchedulerPredicates [Serial]", func() { @@ -555,8 +556,9 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod { Ports: conf.Ports, }, }, - Tolerations: conf.Tolerations, - NodeName: conf.NodeName, + Tolerations: conf.Tolerations, + NodeName: conf.NodeName, + PriorityClassName: conf.PriorityClassName, }, } if conf.Resources != nil { diff --git a/test/e2e/scheduling/preemption.go b/test/e2e/scheduling/preemption.go new file mode 100644 index 00000000000..ef1af225362 --- /dev/null +++ b/test/e2e/scheduling/preemption.go @@ -0,0 +1,128 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduling + +import ( + "fmt" + "time" + + "k8s.io/api/core/v1" + "k8s.io/api/scheduling/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + _ "github.com/stretchr/testify/assert" +) + +var _ = SIGDescribe("SchedulerPreemption [Serial] [Feature:PodPreemption]", func() { + var cs clientset.Interface + var nodeList *v1.NodeList + var ns string + f := framework.NewDefaultFramework("sched-preemption") + + lowPriority, mediumPriority, highPriority := int32(1), int32(100), int32(1000) + lowPriorityClassName := f.BaseName + "-low-priority" + mediumPriorityClassName := f.BaseName + "-medium-priority" + highPriorityClassName := f.BaseName + "-high-priority" + + AfterEach(func() { + }) + + BeforeEach(func() { + cs = f.ClientSet + ns = f.Namespace.Name + nodeList = &v1.NodeList{} + + _, err := f.ClientSet.SchedulingV1alpha1().PriorityClasses().Create(&v1alpha1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: highPriorityClassName}, Value: highPriority}) + Expect(err == nil || errors.IsAlreadyExists(err)).To(Equal(true)) + _, err = f.ClientSet.SchedulingV1alpha1().PriorityClasses().Create(&v1alpha1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: mediumPriorityClassName}, Value: mediumPriority}) + Expect(err == nil || errors.IsAlreadyExists(err)).To(Equal(true)) + _, err = f.ClientSet.SchedulingV1alpha1().PriorityClasses().Create(&v1alpha1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: lowPriorityClassName}, Value: lowPriority}) + Expect(err == nil || errors.IsAlreadyExists(err)).To(Equal(true)) + + framework.WaitForAllNodesHealthy(cs, time.Minute) + masterNodes, nodeList = framework.GetMasterAndWorkerNodesOrDie(cs) + + err = framework.CheckTestingNSDeletedExcept(cs, ns) + framework.ExpectNoError(err) + }) + + // This test verifies that when a higher priority pod is created and no node with + // enough resources is found, scheduler preempts a lower priority pod to schedule + // the high priority pod. + It("validates basic preemption works", func() { + var podRes v1.ResourceList + // Create one pod per node that uses a lot of the node's resources. + By("Create pods that use 60% of node resources.") + pods := make([]*v1.Pod, len(nodeList.Items)) + for i, node := range nodeList.Items { + cpuAllocatable, found := node.Status.Allocatable["cpu"] + Expect(found).To(Equal(true)) + milliCPU := cpuAllocatable.MilliValue() * 40 / 100 + memAllocatable, found := node.Status.Allocatable["memory"] + Expect(found).To(Equal(true)) + memory := memAllocatable.Value() * 60 / 100 + podRes = v1.ResourceList{} + podRes[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(milliCPU), resource.DecimalSI) + podRes[v1.ResourceMemory] = *resource.NewQuantity(int64(memory), resource.BinarySI) + + // make the first pod low priority and the rest medium priority. 
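// Aside (not part of this patch): the pods created below exercise the new
// PriorityClassName field that this change threads through pausePodConfig into
// the PodSpec. Outside the e2e helpers, the equivalent spec is just a regular
// pod with Spec.PriorityClassName set; the class and image names here are
// hypothetical:
func pausePodWithPriority(name, priorityClassName string) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Spec: v1.PodSpec{
			PriorityClassName: priorityClassName,
			Containers: []v1.Container{
				{Name: "pause", Image: "gcr.io/google_containers/pause-amd64:3.0"},
			},
		},
	}
}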
+			priorityName := mediumPriorityClassName
+			if i == 0 {
+				priorityName = lowPriorityClassName
+			}
+			pods[i] = createPausePod(f, pausePodConfig{
+				Name:              fmt.Sprintf("pod%d-%v", i, priorityName),
+				PriorityClassName: priorityName,
+				Resources: &v1.ResourceRequirements{
+					Requests: podRes,
+				},
+			})
+			framework.Logf("Created pod: %v", pods[i].Name)
+		}
+		By("Wait for pods to be scheduled.")
+		for _, pod := range pods {
+			framework.ExpectNoError(framework.WaitForPodRunningInNamespace(cs, pod))
+		}
+
+		By("Run a high priority pod that requests 40% of a node's CPU and 60% of its memory.")
+		// Create a high priority pod and make sure it is scheduled.
+		runPausePod(f, pausePodConfig{
+			Name:              "preemptor-pod",
+			PriorityClassName: highPriorityClassName,
+			Resources: &v1.ResourceRequirements{
+				Requests: podRes,
+			},
+		})
+		// Make sure that the lowest priority pod is deleted.
+		preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(pods[0].Name, metav1.GetOptions{})
+		podDeleted := (err != nil && errors.IsNotFound(err)) ||
+			(err == nil && preemptedPod.DeletionTimestamp != nil)
+		Expect(podDeleted).To(BeTrue())
+		// Other pods (mid priority ones) should be present.
+		for i := 1; i < len(pods); i++ {
+			livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(pods[i].Name, metav1.GetOptions{})
+			framework.ExpectNoError(err)
+			Expect(livePod.DeletionTimestamp).To(BeNil())
+		}
+	})
+})
diff --git a/test/integration/scheduler/priorities_test.go b/test/integration/scheduler/priorities_test.go
index 5271ad82ff3..6dd75ee670a 100644
--- a/test/integration/scheduler/priorities_test.go
+++ b/test/integration/scheduler/priorities_test.go
@@ -51,7 +51,7 @@ func TestNodeAffinity(t *testing.T) {
 	}
 	// Create a pod with node affinity.
 	podName := "pod-with-node-affinity"
-	pod, err := runPausePod(context.clientSet, &pausePodConfig{
+	pod, err := runPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{
 		Name:      podName,
 		Namespace: context.ns.Name,
 		Affinity: &v1.Affinity{
@@ -72,7 +72,7 @@ func TestNodeAffinity(t *testing.T) {
 				},
 			},
 		},
-	})
+	}))
 	if err != nil {
 		t.Fatalf("Error running pause pod: %v", err)
 	}
@@ -110,11 +110,11 @@ func TestPodAffinity(t *testing.T) {
 	// Add a pod with a label and wait for it to schedule.
 	labelKey := "service"
 	labelValue := "S1"
-	_, err = runPausePod(context.clientSet, &pausePodConfig{
+	_, err = runPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{
 		Name:      "attractor-pod",
 		Namespace: context.ns.Name,
 		Labels:    map[string]string{labelKey: labelValue},
-	})
+	}))
 	if err != nil {
 		t.Fatalf("Error running the attractor pod: %v", err)
 	}
@@ -125,7 +125,7 @@ func TestPodAffinity(t *testing.T) {
 	}
 	// Add a new pod with affinity to the attractor pod.
podName := "pod-with-podaffinity" - pod, err := runPausePod(context.clientSet, &pausePodConfig{ + pod, err := runPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{ Name: podName, Namespace: context.ns.Name, Affinity: &v1.Affinity{ @@ -158,7 +158,7 @@ func TestPodAffinity(t *testing.T) { }, }, }, - }) + })) if err != nil { t.Fatalf("Error running pause pod: %v", err) } diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index e0c2b586b46..770a9bead28 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -24,9 +24,11 @@ import ( "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" clientv1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -36,15 +38,18 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" "k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/core" "k8s.io/kubernetes/plugin/pkg/scheduler/factory" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/test/integration/framework" + testutils "k8s.io/kubernetes/test/utils" ) const enableEquivalenceCache = true @@ -457,13 +462,13 @@ func TestMultiScheduler(t *testing.T) { } defaultScheduler := "default-scheduler" - testPodFitsDefault, err := createPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-default", Namespace: context.ns.Name, SchedulerName: defaultScheduler}) + testPodFitsDefault, err := createPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-default", Namespace: context.ns.Name, SchedulerName: defaultScheduler})) if err != nil { t.Fatalf("Failed to create pod: %v", err) } fooScheduler := "foo-scheduler" - testPodFitsFoo, err := createPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-foo", Namespace: context.ns.Name, SchedulerName: fooScheduler}) + testPodFitsFoo, err := createPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-foo", Namespace: context.ns.Name, SchedulerName: fooScheduler})) if err != nil { t.Fatalf("Failed to create pod: %v", err) } @@ -647,3 +652,251 @@ func TestAllocatable(t *testing.T) { t.Logf("Test allocatable awareness: %s Pod not scheduled as expected", testAllocPod2.Name) } } + +// TestPreemption tests a few preemption scenarios. +func TestPreemption(t *testing.T) { + // Enable PodPriority feature gate. + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.PodPriority)) + // Initialize scheduler. 
+	context := initTest(t, "preemption")
+	defer cleanupTest(t, context)
+	cs := context.clientSet
+
+	lowPriority, mediumPriority, highPriority := int32(100), int32(200), int32(300)
+	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
+		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
+		v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)},
+	}
+
+	tests := []struct {
+		description         string
+		existingPods        []*v1.Pod
+		pod                 *v1.Pod
+		preemptedPodIndexes map[int]struct{}
+	}{
+		{
+			description: "basic pod preemption",
+			existingPods: []*v1.Pod{
+				initPausePod(context.clientSet, &pausePodConfig{
+					Name:      "victim-pod",
+					Namespace: context.ns.Name,
+					Priority:  &lowPriority,
+					Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
+						v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
+						v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+					},
+				}),
+			},
+			pod: initPausePod(cs, &pausePodConfig{
+				Name:      "preemptor-pod",
+				Namespace: context.ns.Name,
+				Priority:  &highPriority,
+				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
+					v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
+					v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+				},
+			}),
+			preemptedPodIndexes: map[int]struct{}{0: {}},
+		},
+		{
+			description: "preemption is performed to satisfy anti-affinity",
+			existingPods: []*v1.Pod{
+				initPausePod(cs, &pausePodConfig{
+					Name: "pod-0", Namespace: context.ns.Name,
+					Priority:  &mediumPriority,
+					Labels:    map[string]string{"pod": "p0"},
+					Resources: defaultPodRes,
+				}),
+				initPausePod(cs, &pausePodConfig{
+					Name: "pod-1", Namespace: context.ns.Name,
+					Priority:  &lowPriority,
+					Labels:    map[string]string{"pod": "p1"},
+					Resources: defaultPodRes,
+					Affinity: &v1.Affinity{
+						PodAntiAffinity: &v1.PodAntiAffinity{
+							RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
+								{
+									LabelSelector: &metav1.LabelSelector{
+										MatchExpressions: []metav1.LabelSelectorRequirement{
+											{
+												Key:      "pod",
+												Operator: metav1.LabelSelectorOpIn,
+												Values:   []string{"preemptor"},
+											},
+										},
+									},
+									TopologyKey: "node",
+								},
+							},
+						},
+					},
+				}),
+			},
+			// A higher priority pod with anti-affinity.
+			pod: initPausePod(cs, &pausePodConfig{
+				Name:      "preemptor-pod",
+				Namespace: context.ns.Name,
+				Priority:  &highPriority,
+				Labels:    map[string]string{"pod": "preemptor"},
+				Resources: defaultPodRes,
+				Affinity: &v1.Affinity{
+					PodAntiAffinity: &v1.PodAntiAffinity{
+						RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
+							{
+								LabelSelector: &metav1.LabelSelector{
+									MatchExpressions: []metav1.LabelSelectorRequirement{
+										{
+											Key:      "pod",
+											Operator: metav1.LabelSelectorOpIn,
+											Values:   []string{"p0"},
+										},
+									},
+								},
+								TopologyKey: "node",
+							},
+						},
+					},
+				},
+			}),
+			preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}},
+		},
+		{
+			// This is similar to the previous case, except that pod-1 is high priority.
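+			// pod-1 cannot be preempted because its priority is not lower than the preemptor's,
+			// and while it is running the preemptor's placement would violate pod-1's anti-affinity,
+			// so preempting pod-0 alone would not help and no pod should be preempted.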
+ description: "preemption is not performed when anti-affinity is not satisfied", + existingPods: []*v1.Pod{ + initPausePod(cs, &pausePodConfig{ + Name: "pod-0", Namespace: context.ns.Name, + Priority: &mediumPriority, + Labels: map[string]string{"pod": "p0"}, + Resources: defaultPodRes, + }), + initPausePod(cs, &pausePodConfig{ + Name: "pod-1", Namespace: context.ns.Name, + Priority: &highPriority, + Labels: map[string]string{"pod": "p1"}, + Resources: defaultPodRes, + Affinity: &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "pod", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"preemptor"}, + }, + }, + }, + TopologyKey: "node", + }, + }, + }, + }, + }), + }, + // A higher priority pod with anti-affinity. + pod: initPausePod(cs, &pausePodConfig{ + Name: "preemptor-pod", + Namespace: context.ns.Name, + Priority: &highPriority, + Labels: map[string]string{"pod": "preemptor"}, + Resources: defaultPodRes, + Affinity: &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "pod", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"p0"}, + }, + }, + }, + TopologyKey: "node", + }, + }, + }, + }, + }), + preemptedPodIndexes: map[int]struct{}{}, + }, + } + + // Create a node with some resources and a label. + nodeRes := &v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI), + } + node, err := createNode(context.clientSet, "node1", nodeRes) + if err != nil { + t.Fatalf("Error creating nodes: %v", err) + } + nodeLabels := map[string]string{"node": node.Name} + if err = testutils.AddLabelsToNode(context.clientSet, node.Name, nodeLabels); err != nil { + t.Fatalf("Cannot add labels to node: %v", err) + } + if err = waitForNodeLabels(context.clientSet, node.Name, nodeLabels); err != nil { + t.Fatalf("Adding labels to node didn't succeed: %v", err) + } + + for _, test := range tests { + pods := make([]*v1.Pod, len(test.existingPods)) + // Create and run existingPods. + for i, p := range test.existingPods { + pods[i], err = runPausePod(cs, p) + if err != nil { + t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err) + } + } + // Create the "pod". + preemptor, err := createPausePod(cs, test.pod) + if err != nil { + t.Errorf("Error while creating high priority pod: %v", err) + } + // Wait for preemption of pods and make sure the other ones are not preempted. + for i, p := range pods { + if _, found := test.preemptedPodIndexes[i]; found { + if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { + t.Errorf("Test [%v]: Pod %v is not getting evicted.", test.description, p.Name) + } + } else { + if p.DeletionTimestamp != nil { + t.Errorf("Test [%v]: Didn't expect pod %v to get preempted.", test.description, p.Name) + } + } + } + // Also check that the preemptor pod gets the annotation for nominated node name. 
+		if len(test.preemptedPodIndexes) > 0 {
+			if err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
+				pod, err := context.clientSet.CoreV1().Pods(context.ns.Name).Get("preemptor-pod", metav1.GetOptions{})
+				if err != nil {
+					t.Errorf("Test [%v]: error getting pod: %v", test.description, err)
+				}
+				annot, found := pod.Annotations[core.NominatedNodeAnnotationKey]
+				if found && len(annot) > 0 {
+					return true, nil
+				}
+				return false, err
+			}); err != nil {
+				t.Errorf("Test [%v]: Pod annotation did not get set.", test.description)
+			}
+		}
+
+		// Cleanup
+		pods = append(pods, preemptor)
+		for _, p := range pods {
+			err = cs.CoreV1().Pods(p.Namespace).Delete(p.Name, metav1.NewDeleteOptions(0))
+			if err != nil && !errors.IsNotFound(err) {
+				t.Errorf("Test [%v]: error, %v, while deleting pod during test.", test.description, err)
+			}
+			err = wait.Poll(time.Second, wait.ForeverTestTimeout, podDeleted(cs, p.Namespace, p.Name))
+			if err != nil {
+				t.Errorf("Test [%v]: error, %v, while waiting for pod to get deleted.", test.description, err)
+			}
+		}
+	}
+}
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index 8cf8c6c90c2..81726f1caba 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -205,6 +205,7 @@ type pausePodConfig struct {
 	Tolerations   []v1.Toleration
 	NodeName      string
 	SchedulerName string
+	Priority      *int32
 }
 
 // initPausePod initializes a pod API object from the given config. It is used
@@ -213,6 +214,7 @@ func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
 	pod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:        conf.Name,
+			Namespace:   conf.Namespace,
 			Labels:      conf.Labels,
 			Annotations: conf.Annotations,
 		},
@@ -228,6 +230,7 @@ func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
 			Tolerations:   conf.Tolerations,
 			NodeName:      conf.NodeName,
 			SchedulerName: conf.SchedulerName,
+			Priority:      conf.Priority,
 		},
 	}
 	if conf.Resources != nil {
@@ -238,9 +241,8 @@ func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
 
 // createPausePod creates a pod with "Pause" image and the given config and
 // return its pointer and error status.
-func createPausePod(cs clientset.Interface, conf *pausePodConfig) (*v1.Pod, error) {
-	p := initPausePod(cs, conf)
-	return cs.CoreV1().Pods(conf.Namespace).Create(p)
+func createPausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
+	return cs.CoreV1().Pods(p.Namespace).Create(p)
 }
 
 // createPausePodWithResource creates a pod with "Pause" image and the given
@@ -262,22 +264,21 @@ func createPausePodWithResource(cs clientset.Interface, podName string, nsName s
 			},
 		}
 	}
-	return createPausePod(cs, &conf)
+	return createPausePod(cs, initPausePod(cs, &conf))
 }
 
 // runPausePod creates a pod with "Pause" image and the given config and waits
 // until it is scheduled. It returns its pointer and error status.
-func runPausePod(cs clientset.Interface, conf *pausePodConfig) (*v1.Pod, error) {
-	p := initPausePod(cs, conf)
-	pod, err := cs.CoreV1().Pods(conf.Namespace).Create(p)
+func runPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
+	pod, err := cs.CoreV1().Pods(pod.Namespace).Create(pod)
 	if err != nil {
 		return nil, fmt.Errorf("Error creating pause pod: %v", err)
 	}
 	if err = waitForPodToSchedule(cs, pod); err != nil {
 		return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err)
Error: %v", pod.Name, err) } - if pod, err = cs.CoreV1().Pods(conf.Namespace).Get(conf.Name, metav1.GetOptions{}); err != nil { - return pod, fmt.Errorf("Error getting pod %v info: %v", conf.Name, err) + if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}); err != nil { + return pod, fmt.Errorf("Error getting pod %v info: %v", pod.Name, err) } return pod, nil } @@ -285,7 +286,10 @@ func runPausePod(cs clientset.Interface, conf *pausePodConfig) (*v1.Pod, error) // podDeleted returns true if a pod is not found in the given namespace. func podDeleted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { return func() (bool, error) { - _, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) + pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) + if pod.DeletionTimestamp != nil { + return true, nil + } if errors.IsNotFound(err) { return true, nil } @@ -293,6 +297,20 @@ func podDeleted(c clientset.Interface, podNamespace, podName string) wait.Condit } } +// podIsGettingEvicted returns true if the pod's deletion timestamp is set. +func podIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) + if err != nil { + return false, err + } + if pod.DeletionTimestamp != nil { + return true, nil + } + return false, nil + } +} + // podScheduled returns true if a node is assigned to the given pod. func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { return func() (bool, error) {