Move GeneralPredicates logic to kubelet.

This commit is contained in:
Wei Huang
2020-01-20 17:07:39 -08:00
parent 37f98ac3b6
commit 3f8b202266
10 changed files with 245 additions and 339 deletions

View File

@@ -21,7 +21,11 @@ go_library(
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/plugins/helper:go_default_library",
"//pkg/scheduler/framework/plugins/nodeaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/nodename:go_default_library",
"//pkg/scheduler/framework/plugins/nodeports:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/security/apparmor:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -40,11 +44,16 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/scheduler/framework/plugins/nodename:go_default_library",
"//pkg/scheduler/framework/plugins/nodeports:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
],
)

View File

@@ -18,7 +18,6 @@ package lifecycle
import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
)
// AdmissionFailureHandlerStub is an AdmissionFailureHandler that does not perform any handling of admission failure.
@@ -31,6 +30,6 @@ func NewAdmissionFailureHandlerStub() *AdmissionFailureHandlerStub {
return &AdmissionFailureHandlerStub{}
}
func (n *AdmissionFailureHandlerStub) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []predicates.PredicateFailureReason) (bool, []predicates.PredicateFailureReason, error) {
return false, failureReasons, nil
func (n *AdmissionFailureHandlerStub) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []PredicateFailureReason) ([]PredicateFailureReason, error) {
return failureReasons, nil
}

View File

@@ -20,11 +20,15 @@ import (
"fmt"
"k8s.io/klog"
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@@ -35,7 +39,7 @@ type pluginResourceUpdateFuncType func(*schedulernodeinfo.NodeInfo, *PodAdmitAtt
// AdmissionFailureHandler is an interface which defines how to deal with a failure to admit a pod.
// This allows for the graceful handling of pod admission failure.
type AdmissionFailureHandler interface {
HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []predicates.PredicateFailureReason) (bool, []predicates.PredicateFailureReason, error)
HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []PredicateFailureReason) ([]PredicateFailureReason, error)
}
type predicateAdmitHandler struct {
@@ -89,7 +93,8 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
// the Resource Class API in the future.
podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo)
fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo)
reasons, err := GeneralPredicates(podWithoutMissingExtendedResources, nodeInfo)
fit := len(reasons) == 0 && err == nil
if err != nil {
message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
klog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
@@ -100,7 +105,8 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
}
}
if !fit {
fit, reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
fit = len(reasons) == 0 && err == nil
if err != nil {
message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
klog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
@@ -126,11 +132,11 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
// If there are failed predicates, we only return the first one as a reason.
r := reasons[0]
switch re := r.(type) {
case *predicates.PredicateFailureError:
case *PredicateFailureError:
reason = re.PredicateName
message = re.Error()
klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
case *predicates.InsufficientResourceError:
case *InsufficientResourceError:
reason = fmt.Sprintf("OutOf%s", re.ResourceName)
message = re.Error()
klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
@@ -168,3 +174,68 @@ func removeMissingExtendedResources(pod *v1.Pod, nodeInfo *schedulernodeinfo.Nod
}
return podCopy
}
// InsufficientResourceError is an error type that indicates what kind of resource limit is
// hit and caused the unfitting failure.
type InsufficientResourceError struct {
noderesources.InsufficientResource
}
func (e *InsufficientResourceError) Error() string {
return fmt.Sprintf("Node didn't have enough resource: %s, requested: %d, used: %d, capacity: %d",
e.ResourceName, e.Requested, e.Used, e.Capacity)
}
// PredicateFailureReason interface represents the failure reason of a predicate.
type PredicateFailureReason interface {
GetReason() string
}
// GetReason returns the reason of the InsufficientResourceError.
func (e *InsufficientResourceError) GetReason() string {
return fmt.Sprintf("Insufficient %v", e.ResourceName)
}
// GetInsufficientAmount returns the amount of the insufficient resource of the error.
func (e *InsufficientResourceError) GetInsufficientAmount() int64 {
return e.Requested - (e.Capacity - e.Used)
}
// PredicateFailureError describes a failure error of predicate.
type PredicateFailureError struct {
PredicateName string
PredicateDesc string
}
func (e *PredicateFailureError) Error() string {
return fmt.Sprintf("Predicate %s failed", e.PredicateName)
}
// GetReason returns the reason of the PredicateFailureError.
func (e *PredicateFailureError) GetReason() string {
return e.PredicateDesc
}
// GeneralPredicates checks a group of predicates that the kubelet cares about.
func GeneralPredicates(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) ([]PredicateFailureReason, error) {
if nodeInfo.Node() == nil {
return nil, fmt.Errorf("node not found")
}
var reasons []PredicateFailureReason
for _, r := range noderesources.Fits(pod, nodeInfo, nil) {
reasons = append(reasons, &InsufficientResourceError{InsufficientResource: r})
}
if !pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, nodeInfo.Node()) {
reasons = append(reasons, &PredicateFailureError{nodeaffinity.Name, nodeaffinity.ErrReason})
}
if !nodename.Fits(pod, nodeInfo) {
reasons = append(reasons, &PredicateFailureError{nodename.Name, nodename.ErrReason})
}
if !nodeports.Fits(pod, nodeInfo) {
reasons = append(reasons, &PredicateFailureError{nodeports.Name, nodeports.ErrReason})
}
return reasons, nil
}

View File

@@ -22,6 +22,11 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@@ -110,3 +115,147 @@ func makeTestNode(allocatable v1.ResourceList) *v1.Node {
},
}
}
var (
extendedResourceA = v1.ResourceName("example.com/aaa")
hugePageResourceA = v1helper.HugePageResourceName(resource.MustParse("2Mi"))
)
func makeResources(milliCPU, memory, pods, extendedA, storage, hugePageA int64) v1.NodeResources {
return v1.NodeResources{
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
extendedResourceA: *resource.NewQuantity(extendedA, resource.DecimalSI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(storage, resource.BinarySI),
hugePageResourceA: *resource.NewQuantity(hugePageA, resource.BinarySI),
},
}
}
func makeAllocatableResources(milliCPU, memory, pods, extendedA, storage, hugePageA int64) v1.ResourceList {
return v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
extendedResourceA: *resource.NewQuantity(extendedA, resource.DecimalSI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(storage, resource.BinarySI),
hugePageResourceA: *resource.NewQuantity(hugePageA, resource.BinarySI),
}
}
func newResourcePod(usage ...schedulernodeinfo.Resource) *v1.Pod {
containers := []v1.Container{}
for _, req := range usage {
containers = append(containers, v1.Container{
Resources: v1.ResourceRequirements{Requests: req.ResourceList()},
})
}
return &v1.Pod{
Spec: v1.PodSpec{
Containers: containers,
},
}
}
func newPodWithPort(hostPorts ...int) *v1.Pod {
networkPorts := []v1.ContainerPort{}
for _, port := range hostPorts {
networkPorts = append(networkPorts, v1.ContainerPort{HostPort: int32(port)})
}
return &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Ports: networkPorts,
},
},
},
}
}
func TestGeneralPredicates(t *testing.T) {
resourceTests := []struct {
pod *v1.Pod
nodeInfo *schedulernodeinfo.NodeInfo
node *v1.Node
fits bool
name string
wErr error
reasons []PredicateFailureReason
}{
{
pod: &v1.Pod{},
nodeInfo: schedulernodeinfo.NewNodeInfo(
newResourcePod(schedulernodeinfo.Resource{MilliCPU: 9, Memory: 19})),
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 0, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 0, 0, 0)},
},
fits: true,
wErr: nil,
name: "no resources/port/host requested always fits",
},
{
pod: newResourcePod(schedulernodeinfo.Resource{MilliCPU: 8, Memory: 10}),
nodeInfo: schedulernodeinfo.NewNodeInfo(
newResourcePod(schedulernodeinfo.Resource{MilliCPU: 5, Memory: 19})),
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 0, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 0, 0, 0)},
},
fits: false,
wErr: nil,
reasons: []PredicateFailureReason{
&InsufficientResourceError{InsufficientResource: noderesources.InsufficientResource{ResourceName: v1.ResourceCPU, Requested: 8, Used: 5, Capacity: 10}},
&InsufficientResourceError{InsufficientResource: noderesources.InsufficientResource{ResourceName: v1.ResourceMemory, Requested: 10, Used: 19, Capacity: 20}},
},
name: "not enough cpu and memory resource",
},
{
pod: &v1.Pod{
Spec: v1.PodSpec{
NodeName: "machine2",
},
},
nodeInfo: schedulernodeinfo.NewNodeInfo(),
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 0, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 0, 0, 0)},
},
fits: false,
wErr: nil,
reasons: []PredicateFailureReason{&PredicateFailureError{nodename.Name, nodename.ErrReason}},
name: "host not match",
},
{
pod: newPodWithPort(123),
nodeInfo: schedulernodeinfo.NewNodeInfo(newPodWithPort(123)),
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 0, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 0, 0, 0)},
},
fits: false,
wErr: nil,
reasons: []PredicateFailureReason{&PredicateFailureError{nodeports.Name, nodeports.ErrReason}},
name: "hostport conflict",
},
}
for _, test := range resourceTests {
t.Run(test.name, func(t *testing.T) {
test.nodeInfo.SetNode(test.node)
reasons, err := GeneralPredicates(test.pod, test.nodeInfo)
fits := len(reasons) == 0 && err == nil
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !fits && !reflect.DeepEqual(reasons, test.reasons) {
t.Errorf("unexpected failure reasons: %v, want: %v", reasons, test.reasons)
}
if fits != test.fits {
t.Errorf("expected: %v got %v", test.fits, fits)
}
})
}
}