From a75d625cc354ef853976e0fb5089fffb2b8bcefd Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Thu, 26 Jul 2018 15:03:02 +0800 Subject: [PATCH] Compared preemption by priority. Signed-off-by: Da K. Ma --- .../admission_failure_handler_stub.go | 2 +- pkg/kubelet/lifecycle/predicate.go | 24 ++++----- pkg/kubelet/preemption/BUILD | 1 + pkg/kubelet/preemption/preemption.go | 34 +++++++------ pkg/kubelet/preemption/preemption_test.go | 50 +++++++++++++++++-- pkg/kubelet/types/pod_update.go | 17 +++++++ 6 files changed, 96 insertions(+), 32 deletions(-) diff --git a/pkg/kubelet/lifecycle/admission_failure_handler_stub.go b/pkg/kubelet/lifecycle/admission_failure_handler_stub.go index 58e675e9a9b..7184392ead9 100644 --- a/pkg/kubelet/lifecycle/admission_failure_handler_stub.go +++ b/pkg/kubelet/lifecycle/admission_failure_handler_stub.go @@ -31,6 +31,6 @@ func NewAdmissionFailureHandlerStub() *AdmissionFailureHandlerStub { return &AdmissionFailureHandlerStub{} } -func (n *AdmissionFailureHandlerStub) HandleAdmissionFailure(pod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) { +func (n *AdmissionFailureHandlerStub) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) { return false, failureReasons, nil } diff --git a/pkg/kubelet/lifecycle/predicate.go b/pkg/kubelet/lifecycle/predicate.go index 420bae21672..ba6d25b584c 100644 --- a/pkg/kubelet/lifecycle/predicate.go +++ b/pkg/kubelet/lifecycle/predicate.go @@ -36,7 +36,7 @@ type pluginResourceUpdateFuncType func(*schedulercache.NodeInfo, *PodAdmitAttrib // AdmissionFailureHandler is an interface which defines how to deal with a failure to admit a pod. // This allows for the graceful handling of pod admission failure. 
type AdmissionFailureHandler interface { - HandleAdmissionFailure(pod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) + HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) } type predicateAdmitHandler struct { @@ -65,14 +65,14 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult Message: "Kubelet cannot get node info.", } } - pod := attrs.Pod + admitPod := attrs.Pod pods := attrs.OtherPods nodeInfo := schedulercache.NewNodeInfo(pods...) nodeInfo.SetNode(node) // ensure the node has enough plugin resources for that required in pods if err = w.pluginResourceUpdateFunc(nodeInfo, attrs); err != nil { message := fmt.Sprintf("Update plugin resources failed due to %v, which is unexpected.", err) - glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message) + glog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message) return PodAdmitResult{ Admit: false, Reason: "UnexpectedAdmissionError", @@ -88,12 +88,12 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult // node-level extended resource it requires is not found, then kubelet will // not fail admission while it should. This issue will be addressed with // the Resource Class API in the future. 
- podWithoutMissingExtendedResources := removeMissingExtendedResources(pod, nodeInfo) + podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo) fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo) if err != nil { message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err) - glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message) + glog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message) return PodAdmitResult{ Admit: fit, Reason: "UnexpectedAdmissionError", @@ -101,10 +101,10 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult } } if !fit { - fit, reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(pod, reasons) + fit, reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons) if err != nil { message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err) - glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message) + glog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message) return PodAdmitResult{ Admit: fit, Reason: "UnexpectedAdmissionError", @@ -117,7 +117,7 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult var message string if len(reasons) == 0 { message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.") - glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message) + glog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message) return PodAdmitResult{ Admit: fit, Reason: "UnknownReason", @@ -130,19 +130,19 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult case *predicates.PredicateFailureError: reason = re.PredicateName message = re.Error() - glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message) + glog.V(2).Infof("Predicate failed 
on Pod: %v, for reason: %v", format.Pod(admitPod), message) case *predicates.InsufficientResourceError: reason = fmt.Sprintf("OutOf%s", re.ResourceName) message = re.Error() - glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message) + glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message) case *predicates.FailureReason: reason = re.GetReason() message = fmt.Sprintf("Failure: %s", re.GetReason()) - glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message) + glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message) default: reason = "UnexpectedPredicateFailureType" message = fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r) - glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message) + glog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message) } return PodAdmitResult{ Admit: fit, diff --git a/pkg/kubelet/preemption/BUILD b/pkg/kubelet/preemption/BUILD index 1372e9e0cfc..9e8e44331b8 100644 --- a/pkg/kubelet/preemption/BUILD +++ b/pkg/kubelet/preemption/BUILD @@ -45,6 +45,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/apis/core:go_default_library", + "//pkg/apis/scheduling:go_default_library", "//pkg/kubelet/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", diff --git a/pkg/kubelet/preemption/preemption.go b/pkg/kubelet/preemption/preemption.go index ebd16a4f188..c7236e2c5f7 100644 --- a/pkg/kubelet/preemption/preemption.go +++ b/pkg/kubelet/preemption/preemption.go @@ -61,8 +61,8 @@ func NewCriticalPodAdmissionHandler(getPodsFunc eviction.ActivePodsFunc, killPod // HandleAdmissionFailure gracefully handles admission rejection, and, in some cases, // to allow admission of the pod despite its previous failure. 
-func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(pod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) { - if !kubetypes.IsCriticalPod(pod) { +func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) { + if !kubetypes.IsCriticalPod(admitPod) { return false, failureReasons, nil } // InsufficientResourceError is not a reason to reject a critical pod. @@ -83,16 +83,16 @@ func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(pod *v1.Pod, failur // Return only reasons that are not resource related, since critical pods cannot fail admission for resource reasons. return false, nonResourceReasons, nil } - err := c.evictPodsToFreeRequests(admissionRequirementList(resourceReasons)) + err := c.evictPodsToFreeRequests(admitPod, admissionRequirementList(resourceReasons)) // if no error is returned, preemption succeeded and the pod is safe to admit. return err == nil, nil, err } -// freeRequests takes a list of insufficient resources, and attempts to free them by evicting pods +// evictPodsToFreeRequests takes a list of insufficient resources, and attempts to free them by evicting pods // based on requests. For example, if the only insufficient resource is 200Mb of memory, this function could // evict a pod with request=250Mb. 
-func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(insufficientResources admissionRequirementList) error { - podsToPreempt, err := getPodsToPreempt(c.getPodsFunc(), insufficientResources) +func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(admitPod *v1.Pod, insufficientResources admissionRequirementList) error { + podsToPreempt, err := getPodsToPreempt(admitPod, c.getPodsFunc(), insufficientResources) if err != nil { return fmt.Errorf("preemption: error finding a set of pods to preempt: %v", err) } @@ -116,8 +116,8 @@ func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(insufficientResour } // getPodsToPreempt returns a list of pods that could be preempted to free requests >= requirements -func getPodsToPreempt(pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) { - bestEffortPods, burstablePods, guaranteedPods := sortPodsByQOS(pods) +func getPodsToPreempt(pod *v1.Pod, pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) { + bestEffortPods, burstablePods, guaranteedPods := sortPodsByQOS(pod, pods) // make sure that pods exist to reclaim the requirements unableToMeetRequirements := requirements.subtract(append(append(bestEffortPods, burstablePods...), guaranteedPods...)...) @@ -142,7 +142,7 @@ func getPodsToPreempt(pods []*v1.Pod, requirements admissionRequirementList) ([] return append(append(bestEffortToEvict, burstableToEvict...), guarateedToEvict...), nil } -// finds the pods that have pod requests >= admission requirements. +// getPodsToPreemptByDistance finds the pods that have pod requests >= admission requirements. // Chooses pods that minimize "distance" to the requirements. // If more than one pod exists that fulfills the remaining requirements, // it chooses the pod that has the "smaller resource request" @@ -183,8 +183,8 @@ type admissionRequirement struct { type admissionRequirementList []*admissionRequirement -// distance of the pods requests from the admissionRequirements. 
-// distance is measured by the fraction of the requirement satisfied by the pod, +// distance returns the distance of the pod's requests from the admissionRequirements. +// The distance is measured by the fraction of the requirement satisfied by the pod, // so that each requirement is weighted equally, regardless of absolute magnitude. func (a admissionRequirementList) distance(pod *v1.Pod) float64 { dist := float64(0) @@ -198,7 +198,7 @@ func (a admissionRequirementList) distance(pod *v1.Pod) float64 { return dist } -// returns a new admissionRequirementList containing remaining requirements if the provided pod +// subtract returns a new admissionRequirementList containing remaining requirements if the provided pod // were to be preempted func (a admissionRequirementList) subtract(pods ...*v1.Pod) admissionRequirementList { newList := []*admissionRequirement{} @@ -225,10 +225,11 @@ func (a admissionRequirementList) toString() string { return s + "]" } -// returns lists containing non-critical besteffort, burstable, and guaranteed pods -func sortPodsByQOS(pods []*v1.Pod) (bestEffort, burstable, guaranteed []*v1.Pod) { +// sortPodsByQOS returns lists containing besteffort, burstable, and guaranteed pods that +// can be preempted by preemptor pod. 
+func sortPodsByQOS(preemptor *v1.Pod, pods []*v1.Pod) (bestEffort, burstable, guaranteed []*v1.Pod) { for _, pod := range pods { - if !kubetypes.IsCriticalPod(pod) { + if kubetypes.Preemptable(preemptor, pod) { switch v1qos.GetPodQOS(pod) { case v1.PodQOSBestEffort: bestEffort = append(bestEffort, pod) @@ -240,10 +241,11 @@ func sortPodsByQOS(pods []*v1.Pod) (bestEffort, burstable, guaranteed []*v1.Pod) } } } + return } -// returns true if pod1 has a smaller request than pod2 +// smallerResourceRequest returns true if pod1 has a smaller request than pod2 func smallerResourceRequest(pod1 *v1.Pod, pod2 *v1.Pod) bool { priorityList := []v1.ResourceName{ v1.ResourceMemory, diff --git a/pkg/kubelet/preemption/preemption_test.go b/pkg/kubelet/preemption/preemption_test.go index 744cd41696b..e66837e1023 100644 --- a/pkg/kubelet/preemption/preemption_test.go +++ b/pkg/kubelet/preemption/preemption_test.go @@ -26,11 +26,14 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" kubeapi "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/apis/scheduling" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) const ( critical = "critical" + clusterCritical = "cluster-critical" + nodeCritical = "node-critical" bestEffort = "bestEffort" burstable = "burstable" highRequestBurstable = "high-request-burstable" @@ -127,7 +130,7 @@ func TestEvictPodsToFreeRequests(t *testing.T) { } for _, r := range runs { podProvider.setPods(r.inputPods) - outErr := criticalPodAdmissionHandler.evictPodsToFreeRequests(r.insufficientResources) + outErr := criticalPodAdmissionHandler.evictPodsToFreeRequests(nil, r.insufficientResources) outputPods := podKiller.getKilledPods() if !r.expectErr && outErr != nil { t.Errorf("evictPodsToFreeRequests returned an unexpected error during the %s test. 
Err: %v", r.testName, outErr) @@ -147,7 +150,7 @@ func BenchmarkGetPodsToPreempt(t *testing.B) { inputPods = append(inputPods, allPods[tinyBurstable]) } for n := 0; n < t.N; n++ { - getPodsToPreempt(inputPods, admissionRequirementList([]*admissionRequirement{ + getPodsToPreempt(nil, inputPods, admissionRequirementList([]*admissionRequirement{ { resourceName: v1.ResourceCPU, quantity: parseCPUToInt64("110m"), @@ -158,6 +161,7 @@ func BenchmarkGetPodsToPreempt(t *testing.B) { func TestGetPodsToPreempt(t *testing.T) { type testRun struct { testName string + preemptor *v1.Pod inputPods []*v1.Pod insufficientResources admissionRequirementList expectErr bool @@ -235,9 +239,26 @@ func TestGetPodsToPreempt(t *testing.T) { expectErr: false, expectedOutput: []*v1.Pod{allPods[highRequestBurstable], allPods[highRequestGuaranteed]}, }, + { + testName: "evict cluster critical pod for node critical pod", + preemptor: allPods[nodeCritical], + inputPods: []*v1.Pod{allPods[clusterCritical]}, + insufficientResources: getAdmissionRequirementList(100, 0, 0), + expectErr: false, + expectedOutput: []*v1.Pod{allPods[clusterCritical]}, + }, + { + testName: "can not evict node critical pod for cluster critical pod", + preemptor: allPods[clusterCritical], + inputPods: []*v1.Pod{allPods[nodeCritical]}, + insufficientResources: getAdmissionRequirementList(100, 0, 0), + expectErr: true, + expectedOutput: nil, + }, } + for _, r := range runs { - outputPods, outErr := getPodsToPreempt(r.inputPods, r.insufficientResources) + outputPods, outErr := getPodsToPreempt(r.preemptor, r.inputPods, r.insufficientResources) if !r.expectErr && outErr != nil { t.Errorf("getPodsToPreempt returned an unexpected error during the %s test. 
Err: %v", r.testName, outErr) } else if r.expectErr && outErr == nil { @@ -353,6 +374,18 @@ func getTestPods() map[string]*v1.Pod { v1.ResourceMemory: resource.MustParse("100Mi"), }, }), + clusterCritical: getPodWithResources(clusterCritical, v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("100m"), + v1.ResourceMemory: resource.MustParse("100Mi"), + }, + }), + nodeCritical: getPodWithResources(nodeCritical, v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("100m"), + v1.ResourceMemory: resource.MustParse("100Mi"), + }, + }), burstable: getPodWithResources(burstable, v1.ResourceRequirements{ Requests: v1.ResourceList{ v1.ResourceCPU: resource.MustParse("100m"), @@ -388,6 +421,17 @@ func getTestPods() map[string]*v1.Pod { } allPods[critical].Namespace = kubeapi.NamespaceSystem allPods[critical].Annotations[kubetypes.CriticalPodAnnotationKey] = "" + + allPods[clusterCritical].Namespace = kubeapi.NamespaceSystem + allPods[clusterCritical].Spec.PriorityClassName = scheduling.SystemClusterCritical + clusterPriority := scheduling.SystemCriticalPriority + allPods[clusterCritical].Spec.Priority = &clusterPriority + + allPods[nodeCritical].Namespace = kubeapi.NamespaceSystem + allPods[nodeCritical].Spec.PriorityClassName = scheduling.SystemNodeCritical + nodePriority := scheduling.SystemCriticalPriority + 100 + allPods[nodeCritical].Spec.Priority = &nodePriority + return allPods } diff --git a/pkg/kubelet/types/pod_update.go b/pkg/kubelet/types/pod_update.go index cb0d4893172..d2cc4aff74b 100644 --- a/pkg/kubelet/types/pod_update.go +++ b/pkg/kubelet/types/pod_update.go @@ -159,6 +159,23 @@ func IsCriticalPod(pod *v1.Pod) bool { return false } +// Preemptable returns true if preemptor pod can preempt preemptee pod: +// - If preemptor's priority is greater than preemptee's priority, it's preemptable (return true) +// - If preemptor (or its priority) is nil and preemptee bears the critical pod 
annotation key, +// preemptee can not be preempted (return false) +// - If preemptor (or its priority) is nil and preemptee's priority is greater than or equal to +// SystemCriticalPriority, preemptee can not be preempted (return false) +func Preemptable(preemptor, preemptee *v1.Pod) bool { + if utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) { + if (preemptor != nil && preemptor.Spec.Priority != nil) && + (preemptee != nil && preemptee.Spec.Priority != nil) { + return *(preemptor.Spec.Priority) > *(preemptee.Spec.Priority) + } + } + + return !IsCriticalPod(preemptee) +} + // IsCritical returns true if parameters bear the critical pod annotation // key. The DaemonSetController use this key directly to make scheduling decisions. // TODO: @ravig - Deprecated. Remove this when we move to resolving critical pods based on priorityClassName.