Merge pull request #65561 from k82cn/k8s_65372_1

Automatic merge from submit-queue (batch tested with PRs 65561, 67109, 67450, 67456, 67402). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Compared preemption by priority in Kubelet

Signed-off-by: Da K. Ma <klaus1982.cn@gmail.com>


**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #65372 

**Release note**:
```release-note
None
```
This commit is contained in:
Kubernetes Submit Queue 2018-08-15 18:15:06 -07:00 committed by GitHub
commit 6faf115870
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 96 additions and 32 deletions

View File

@ -31,6 +31,6 @@ func NewAdmissionFailureHandlerStub() *AdmissionFailureHandlerStub {
return &AdmissionFailureHandlerStub{}
}
func (n *AdmissionFailureHandlerStub) HandleAdmissionFailure(pod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) {
func (n *AdmissionFailureHandlerStub) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) {
return false, failureReasons, nil
}

View File

@ -36,7 +36,7 @@ type pluginResourceUpdateFuncType func(*schedulercache.NodeInfo, *PodAdmitAttrib
// AdmissionFailureHandler is an interface which defines how to deal with a failure to admit a pod.
// This allows for the graceful handling of pod admission failure.
type AdmissionFailureHandler interface {
HandleAdmissionFailure(pod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error)
HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error)
}
type predicateAdmitHandler struct {
@ -65,14 +65,14 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
Message: "Kubelet cannot get node info.",
}
}
pod := attrs.Pod
admitPod := attrs.Pod
pods := attrs.OtherPods
nodeInfo := schedulercache.NewNodeInfo(pods...)
nodeInfo.SetNode(node)
// ensure the node has enough plugin resources for that required in pods
if err = w.pluginResourceUpdateFunc(nodeInfo, attrs); err != nil {
message := fmt.Sprintf("Update plugin resources failed due to %v, which is unexpected.", err)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
return PodAdmitResult{
Admit: false,
Reason: "UnexpectedAdmissionError",
@ -88,12 +88,12 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
// node-level extended resource it requires is not found, then kubelet will
// not fail admission while it should. This issue will be addressed with
// the Resource Class API in the future.
podWithoutMissingExtendedResources := removeMissingExtendedResources(pod, nodeInfo)
podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo)
fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo)
if err != nil {
message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
return PodAdmitResult{
Admit: fit,
Reason: "UnexpectedAdmissionError",
@ -101,10 +101,10 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
}
}
if !fit {
fit, reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(pod, reasons)
fit, reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
if err != nil {
message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
return PodAdmitResult{
Admit: fit,
Reason: "UnexpectedAdmissionError",
@ -117,7 +117,7 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
var message string
if len(reasons) == 0 {
message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.")
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
return PodAdmitResult{
Admit: fit,
Reason: "UnknownReason",
@ -130,19 +130,19 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
case *predicates.PredicateFailureError:
reason = re.PredicateName
message = re.Error()
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
case *predicates.InsufficientResourceError:
reason = fmt.Sprintf("OutOf%s", re.ResourceName)
message = re.Error()
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
case *predicates.FailureReason:
reason = re.GetReason()
message = fmt.Sprintf("Failure: %s", re.GetReason())
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
default:
reason = "UnexpectedPredicateFailureType"
message = fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
}
return PodAdmitResult{
Admit: fit,

View File

@ -45,6 +45,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/apis/scheduling:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -61,8 +61,8 @@ func NewCriticalPodAdmissionHandler(getPodsFunc eviction.ActivePodsFunc, killPod
// HandleAdmissionFailure gracefully handles admission rejection, and, in some cases,
// to allow admission of the pod despite its previous failure.
func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(pod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) {
if !kubetypes.IsCriticalPod(pod) {
func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) {
if !kubetypes.IsCriticalPod(admitPod) {
return false, failureReasons, nil
}
// InsufficientResourceError is not a reason to reject a critical pod.
@ -83,16 +83,16 @@ func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(pod *v1.Pod, failur
// Return only reasons that are not resource related, since critical pods cannot fail admission for resource reasons.
return false, nonResourceReasons, nil
}
err := c.evictPodsToFreeRequests(admissionRequirementList(resourceReasons))
err := c.evictPodsToFreeRequests(admitPod, admissionRequirementList(resourceReasons))
// if no error is returned, preemption succeeded and the pod is safe to admit.
return err == nil, nil, err
}
// freeRequests takes a list of insufficient resources, and attempts to free them by evicting pods
// evictPodsToFreeRequests takes a list of insufficient resources, and attempts to free them by evicting pods
// based on requests. For example, if the only insufficient resource is 200Mb of memory, this function could
// evict a pod with request=250Mb.
func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(insufficientResources admissionRequirementList) error {
podsToPreempt, err := getPodsToPreempt(c.getPodsFunc(), insufficientResources)
func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(admitPod *v1.Pod, insufficientResources admissionRequirementList) error {
podsToPreempt, err := getPodsToPreempt(admitPod, c.getPodsFunc(), insufficientResources)
if err != nil {
return fmt.Errorf("preemption: error finding a set of pods to preempt: %v", err)
}
@ -116,8 +116,8 @@ func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(insufficientResour
}
// getPodsToPreempt returns a list of pods that could be preempted to free requests >= requirements
func getPodsToPreempt(pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
bestEffortPods, burstablePods, guaranteedPods := sortPodsByQOS(pods)
func getPodsToPreempt(pod *v1.Pod, pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
bestEffortPods, burstablePods, guaranteedPods := sortPodsByQOS(pod, pods)
// make sure that pods exist to reclaim the requirements
unableToMeetRequirements := requirements.subtract(append(append(bestEffortPods, burstablePods...), guaranteedPods...)...)
@ -142,7 +142,7 @@ func getPodsToPreempt(pods []*v1.Pod, requirements admissionRequirementList) ([]
return append(append(bestEffortToEvict, burstableToEvict...), guarateedToEvict...), nil
}
// finds the pods that have pod requests >= admission requirements.
// getPodsToPreemptByDistance finds the pods that have pod requests >= admission requirements.
// Chooses pods that minimize "distance" to the requirements.
// If more than one pod exists that fulfills the remaining requirements,
// it chooses the pod that has the "smaller resource request"
@ -183,8 +183,8 @@ type admissionRequirement struct {
type admissionRequirementList []*admissionRequirement
// distance of the pods requests from the admissionRequirements.
// distance is measured by the fraction of the requirement satisfied by the pod,
// distance returns distance of the pods requests from the admissionRequirements.
// The distance is measured by the fraction of the requirement satisfied by the pod,
// so that each requirement is weighted equally, regardless of absolute magnitude.
func (a admissionRequirementList) distance(pod *v1.Pod) float64 {
dist := float64(0)
@ -198,7 +198,7 @@ func (a admissionRequirementList) distance(pod *v1.Pod) float64 {
return dist
}
// returns a new admissionRequirementList containing remaining requirements if the provided pod
// subtract returns a new admissionRequirementList containing remaining requirements if the provided pod
// were to be preempted
func (a admissionRequirementList) subtract(pods ...*v1.Pod) admissionRequirementList {
newList := []*admissionRequirement{}
@ -225,10 +225,11 @@ func (a admissionRequirementList) toString() string {
return s + "]"
}
// returns lists containing non-critical besteffort, burstable, and guaranteed pods
func sortPodsByQOS(pods []*v1.Pod) (bestEffort, burstable, guaranteed []*v1.Pod) {
// sortPodsByQOS returns lists containing besteffort, burstable, and guaranteed pods that
// can be preempted by preemptor pod.
func sortPodsByQOS(preemptor *v1.Pod, pods []*v1.Pod) (bestEffort, burstable, guaranteed []*v1.Pod) {
for _, pod := range pods {
if !kubetypes.IsCriticalPod(pod) {
if kubetypes.Preemptable(preemptor, pod) {
switch v1qos.GetPodQOS(pod) {
case v1.PodQOSBestEffort:
bestEffort = append(bestEffort, pod)
@ -240,10 +241,11 @@ func sortPodsByQOS(pods []*v1.Pod) (bestEffort, burstable, guaranteed []*v1.Pod)
}
}
}
return
}
// returns true if pod1 has a smaller request than pod2
// smallerResourceRequest returns true if pod1 has a smaller request than pod2
func smallerResourceRequest(pod1 *v1.Pod, pod2 *v1.Pod) bool {
priorityList := []v1.ResourceName{
v1.ResourceMemory,

View File

@ -26,11 +26,14 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
kubeapi "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/scheduling"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
const (
critical = "critical"
clusterCritical = "cluster-critical"
nodeCritical = "node-critical"
bestEffort = "bestEffort"
burstable = "burstable"
highRequestBurstable = "high-request-burstable"
@ -127,7 +130,7 @@ func TestEvictPodsToFreeRequests(t *testing.T) {
}
for _, r := range runs {
podProvider.setPods(r.inputPods)
outErr := criticalPodAdmissionHandler.evictPodsToFreeRequests(r.insufficientResources)
outErr := criticalPodAdmissionHandler.evictPodsToFreeRequests(nil, r.insufficientResources)
outputPods := podKiller.getKilledPods()
if !r.expectErr && outErr != nil {
t.Errorf("evictPodsToFreeRequests returned an unexpected error during the %s test. Err: %v", r.testName, outErr)
@ -147,7 +150,7 @@ func BenchmarkGetPodsToPreempt(t *testing.B) {
inputPods = append(inputPods, allPods[tinyBurstable])
}
for n := 0; n < t.N; n++ {
getPodsToPreempt(inputPods, admissionRequirementList([]*admissionRequirement{
getPodsToPreempt(nil, inputPods, admissionRequirementList([]*admissionRequirement{
{
resourceName: v1.ResourceCPU,
quantity: parseCPUToInt64("110m"),
@ -158,6 +161,7 @@ func BenchmarkGetPodsToPreempt(t *testing.B) {
func TestGetPodsToPreempt(t *testing.T) {
type testRun struct {
testName string
preemptor *v1.Pod
inputPods []*v1.Pod
insufficientResources admissionRequirementList
expectErr bool
@ -235,9 +239,26 @@ func TestGetPodsToPreempt(t *testing.T) {
expectErr: false,
expectedOutput: []*v1.Pod{allPods[highRequestBurstable], allPods[highRequestGuaranteed]},
},
{
testName: "evict cluster critical pod for node critical pod",
preemptor: allPods[nodeCritical],
inputPods: []*v1.Pod{allPods[clusterCritical]},
insufficientResources: getAdmissionRequirementList(100, 0, 0),
expectErr: false,
expectedOutput: []*v1.Pod{allPods[clusterCritical]},
},
{
testName: "can not evict node critical pod for cluster critical pod",
preemptor: allPods[clusterCritical],
inputPods: []*v1.Pod{allPods[nodeCritical]},
insufficientResources: getAdmissionRequirementList(100, 0, 0),
expectErr: true,
expectedOutput: nil,
},
}
for _, r := range runs {
outputPods, outErr := getPodsToPreempt(r.inputPods, r.insufficientResources)
outputPods, outErr := getPodsToPreempt(r.preemptor, r.inputPods, r.insufficientResources)
if !r.expectErr && outErr != nil {
t.Errorf("getPodsToPreempt returned an unexpected error during the %s test. Err: %v", r.testName, outErr)
} else if r.expectErr && outErr == nil {
@ -353,6 +374,18 @@ func getTestPods() map[string]*v1.Pod {
v1.ResourceMemory: resource.MustParse("100Mi"),
},
}),
clusterCritical: getPodWithResources(clusterCritical, v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("100Mi"),
},
}),
nodeCritical: getPodWithResources(nodeCritical, v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("100Mi"),
},
}),
burstable: getPodWithResources(burstable, v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
@ -388,6 +421,17 @@ func getTestPods() map[string]*v1.Pod {
}
allPods[critical].Namespace = kubeapi.NamespaceSystem
allPods[critical].Annotations[kubetypes.CriticalPodAnnotationKey] = ""
allPods[clusterCritical].Namespace = kubeapi.NamespaceSystem
allPods[clusterCritical].Spec.PriorityClassName = scheduling.SystemClusterCritical
clusterPriority := scheduling.SystemCriticalPriority
allPods[clusterCritical].Spec.Priority = &clusterPriority
allPods[nodeCritical].Namespace = kubeapi.NamespaceSystem
allPods[nodeCritical].Spec.PriorityClassName = scheduling.SystemNodeCritical
nodePriority := scheduling.SystemCriticalPriority + 100
allPods[nodeCritical].Spec.Priority = &nodePriority
return allPods
}

View File

@ -159,6 +159,23 @@ func IsCriticalPod(pod *v1.Pod) bool {
return false
}
// Preemptable returns true if preemptor pod can preempt preemptee pod:
// - If preemptor's is greater than preemptee's priority, it's preemptable (return true)
// - If preemptor (or its priority) is nil and preemptee bears the critical pod annotation key,
// preemptee can not be preempted (return false)
// - If preemptor (or its priority) is nil and preemptee's priority is greater than or equal to
// SystemCriticalPriority, preemptee can not be preempted (return false)
func Preemptable(preemptor, preemptee *v1.Pod) bool {
if utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) {
if (preemptor != nil && preemptor.Spec.Priority != nil) &&
(preemptee != nil && preemptee.Spec.Priority != nil) {
return *(preemptor.Spec.Priority) > *(preemptee.Spec.Priority)
}
}
return !IsCriticalPod(preemptee)
}
// IsCritical returns true if parameters bear the critical pod annotation
// key. The DaemonSetController use this key directly to make scheduling decisions.
// TODO: @ravig - Deprecated. Remove this when we move to resolving critical pods based on priorityClassName.