Merge pull request #80824 from damemi/preemption-e2e-to-integration

Move PodPriorityResolution e2e to integration
This commit is contained in:
Kubernetes Prow Robot 2019-09-20 12:27:25 -07:00 committed by GitHub
commit c7619bd770
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 134 additions and 65 deletions

View File

@ -46,31 +46,31 @@ const (
// Register registers a plugin // Register registers a plugin
func Register(plugins *admission.Plugins) { func Register(plugins *admission.Plugins) {
plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) { plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
return newPlugin(), nil return NewPlugin(), nil
}) })
} }
// priorityPlugin is an implementation of admission.Interface. // Plugin is an implementation of admission.Interface.
type priorityPlugin struct { type Plugin struct {
*admission.Handler *admission.Handler
client kubernetes.Interface client kubernetes.Interface
lister schedulingv1listers.PriorityClassLister lister schedulingv1listers.PriorityClassLister
} }
var _ admission.MutationInterface = &priorityPlugin{} var _ admission.MutationInterface = &Plugin{}
var _ admission.ValidationInterface = &priorityPlugin{} var _ admission.ValidationInterface = &Plugin{}
var _ = genericadmissioninitializers.WantsExternalKubeInformerFactory(&priorityPlugin{}) var _ = genericadmissioninitializers.WantsExternalKubeInformerFactory(&Plugin{})
var _ = genericadmissioninitializers.WantsExternalKubeClientSet(&priorityPlugin{}) var _ = genericadmissioninitializers.WantsExternalKubeClientSet(&Plugin{})
// NewPlugin creates a new priority admission plugin. // NewPlugin creates a new priority admission plugin.
func newPlugin() *priorityPlugin { func NewPlugin() *Plugin {
return &priorityPlugin{ return &Plugin{
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete), Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
} }
} }
// ValidateInitialization implements the InitializationValidator interface. // ValidateInitialization implements the InitializationValidator interface.
func (p *priorityPlugin) ValidateInitialization() error { func (p *Plugin) ValidateInitialization() error {
if p.client == nil { if p.client == nil {
return fmt.Errorf("%s requires a client", PluginName) return fmt.Errorf("%s requires a client", PluginName)
} }
@ -80,13 +80,13 @@ func (p *priorityPlugin) ValidateInitialization() error {
return nil return nil
} }
// SetInternalKubeClientSet implements the WantsInternalKubeClientSet interface. // SetExternalKubeClientSet implements the WantsInternalKubeClientSet interface.
func (p *priorityPlugin) SetExternalKubeClientSet(client kubernetes.Interface) { func (p *Plugin) SetExternalKubeClientSet(client kubernetes.Interface) {
p.client = client p.client = client
} }
// SetInternalKubeInformerFactory implements the WantsInternalKubeInformerFactory interface. // SetExternalKubeInformerFactory implements the WantsInternalKubeInformerFactory interface.
func (p *priorityPlugin) SetExternalKubeInformerFactory(f informers.SharedInformerFactory) { func (p *Plugin) SetExternalKubeInformerFactory(f informers.SharedInformerFactory) {
priorityInformer := f.Scheduling().V1().PriorityClasses() priorityInformer := f.Scheduling().V1().PriorityClasses()
p.lister = priorityInformer.Lister() p.lister = priorityInformer.Lister()
p.SetReadyFunc(priorityInformer.Informer().HasSynced) p.SetReadyFunc(priorityInformer.Informer().HasSynced)
@ -99,7 +99,7 @@ var (
// Admit checks Pods and admits or rejects them. It also resolves the priority of pods based on their PriorityClass. // Admit checks Pods and admits or rejects them. It also resolves the priority of pods based on their PriorityClass.
// Note that pod validation mechanism prevents update of a pod priority. // Note that pod validation mechanism prevents update of a pod priority.
func (p *priorityPlugin) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error { func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
operation := a.GetOperation() operation := a.GetOperation()
// Ignore all calls to subresources // Ignore all calls to subresources
if len(a.GetSubresource()) != 0 { if len(a.GetSubresource()) != 0 {
@ -119,7 +119,7 @@ func (p *priorityPlugin) Admit(ctx context.Context, a admission.Attributes, o ad
} }
// Validate checks PriorityClasses and admits or rejects them. // Validate checks PriorityClasses and admits or rejects them.
func (p *priorityPlugin) Validate(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error { func (p *Plugin) Validate(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
operation := a.GetOperation() operation := a.GetOperation()
// Ignore all calls to subresources // Ignore all calls to subresources
if len(a.GetSubresource()) != 0 { if len(a.GetSubresource()) != 0 {
@ -153,7 +153,7 @@ func priorityClassPermittedInNamespace(priorityClassName string, namespace strin
} }
// admitPod makes sure a new pod does not set spec.Priority field. It also makes sure that the PriorityClassName exists if it is provided and resolves the pod priority from the PriorityClassName. // admitPod makes sure a new pod does not set spec.Priority field. It also makes sure that the PriorityClassName exists if it is provided and resolves the pod priority from the PriorityClassName.
func (p *priorityPlugin) admitPod(a admission.Attributes) error { func (p *Plugin) admitPod(a admission.Attributes) error {
operation := a.GetOperation() operation := a.GetOperation()
pod, ok := a.GetObject().(*api.Pod) pod, ok := a.GetObject().(*api.Pod)
if !ok { if !ok {
@ -226,7 +226,7 @@ func (p *priorityPlugin) admitPod(a admission.Attributes) error {
} }
// validatePriorityClass ensures that the value field is not larger than the highest user definable priority. If the GlobalDefault is set, it ensures that there is no other PriorityClass whose GlobalDefault is set. // validatePriorityClass ensures that the value field is not larger than the highest user definable priority. If the GlobalDefault is set, it ensures that there is no other PriorityClass whose GlobalDefault is set.
func (p *priorityPlugin) validatePriorityClass(a admission.Attributes) error { func (p *Plugin) validatePriorityClass(a admission.Attributes) error {
operation := a.GetOperation() operation := a.GetOperation()
pc, ok := a.GetObject().(*scheduling.PriorityClass) pc, ok := a.GetObject().(*scheduling.PriorityClass)
if !ok { if !ok {
@ -248,7 +248,7 @@ func (p *priorityPlugin) validatePriorityClass(a admission.Attributes) error {
return nil return nil
} }
func (p *priorityPlugin) getDefaultPriorityClass() (*schedulingv1.PriorityClass, error) { func (p *Plugin) getDefaultPriorityClass() (*schedulingv1.PriorityClass, error) {
list, err := p.lister.List(labels.Everything()) list, err := p.lister.List(labels.Everything())
if err != nil { if err != nil {
return nil, err return nil, err
@ -266,7 +266,7 @@ func (p *priorityPlugin) getDefaultPriorityClass() (*schedulingv1.PriorityClass,
return defaultPC, nil return defaultPC, nil
} }
func (p *priorityPlugin) getDefaultPriority() (string, int32, *apiv1.PreemptionPolicy, error) { func (p *Plugin) getDefaultPriority() (string, int32, *apiv1.PreemptionPolicy, error) {
dpc, err := p.getDefaultPriorityClass() dpc, err := p.getDefaultPriorityClass()
if err != nil { if err != nil {
return "", 0, nil, err return "", 0, nil, err

View File

@ -37,7 +37,7 @@ import (
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
) )
func addPriorityClasses(ctrl *priorityPlugin, priorityClasses []*scheduling.PriorityClass) error { func addPriorityClasses(ctrl *Plugin, priorityClasses []*scheduling.PriorityClass) error {
informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc())
ctrl.SetExternalKubeInformerFactory(informerFactory) ctrl.SetExternalKubeInformerFactory(informerFactory)
// First add the existing classes to the cache. // First add the existing classes to the cache.
@ -173,7 +173,7 @@ func TestPriorityClassAdmission(t *testing.T) {
for _, test := range tests { for _, test := range tests {
klog.V(4).Infof("starting test %q", test.name) klog.V(4).Infof("starting test %q", test.name)
ctrl := newPlugin() ctrl := NewPlugin()
// Add existing priority classes. // Add existing priority classes.
if err := addPriorityClasses(ctrl, test.existingClasses); err != nil { if err := addPriorityClasses(ctrl, test.existingClasses); err != nil {
t.Errorf("Test %q: unable to add object to informer: %v", test.name, err) t.Errorf("Test %q: unable to add object to informer: %v", test.name, err)
@ -274,7 +274,7 @@ func TestDefaultPriority(t *testing.T) {
for _, test := range tests { for _, test := range tests {
klog.V(4).Infof("starting test %q", test.name) klog.V(4).Infof("starting test %q", test.name)
ctrl := newPlugin() ctrl := NewPlugin()
if err := addPriorityClasses(ctrl, test.classesBefore); err != nil { if err := addPriorityClasses(ctrl, test.classesBefore); err != nil {
t.Errorf("Test %q: unable to add object to informer: %v", test.name, err) t.Errorf("Test %q: unable to add object to informer: %v", test.name, err)
} }
@ -682,7 +682,7 @@ func TestPodAdmission(t *testing.T) {
for _, test := range tests { for _, test := range tests {
klog.V(4).Infof("starting test %q", test.name) klog.V(4).Infof("starting test %q", test.name)
ctrl := newPlugin() ctrl := NewPlugin()
// Add existing priority classes. // Add existing priority classes.
if err := addPriorityClasses(ctrl, test.existingClasses); err != nil { if err := addPriorityClasses(ctrl, test.existingClasses); err != nil {
t.Errorf("Test %q: unable to add object to informer: %v", test.name, err) t.Errorf("Test %q: unable to add object to informer: %v", test.name, err)

View File

@ -221,43 +221,6 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
}) })
}) })
var _ = SIGDescribe("PodPriorityResolution [Serial]", func() {
var cs clientset.Interface
var ns string
f := framework.NewDefaultFramework("sched-pod-priority")
ginkgo.BeforeEach(func() {
cs = f.ClientSet
ns = f.Namespace.Name
err := framework.CheckTestingNSDeletedExcept(cs, ns)
framework.ExpectNoError(err)
})
// This test verifies that system critical priorities are created automatically and resolved properly.
ginkgo.It("validates critical system priorities are created and resolved", func() {
// Create pods that use system critical priorities and
ginkgo.By("Create pods that use critical system priorities.")
systemPriorityClasses := []string{
scheduling.SystemNodeCritical, scheduling.SystemClusterCritical,
}
for i, spc := range systemPriorityClasses {
pod := createPausePod(f, pausePodConfig{
Name: fmt.Sprintf("pod%d-%v", i, spc),
Namespace: metav1.NamespaceSystem,
PriorityClassName: spc,
})
defer func() {
// Clean-up the pod.
err := f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(pod.Name, metav1.NewDeleteOptions(0))
framework.ExpectNoError(err)
}()
gomega.Expect(pod.Spec.Priority).NotTo(gomega.BeNil())
framework.Logf("Created pod: %v", pod.Name)
}
})
})
// construct a fakecpu so as to set it to status of Node object // construct a fakecpu so as to set it to status of Node object
// otherwise if we update CPU/Memory/etc, those values will be corrected back by kubelet // otherwise if we update CPU/Memory/etc, those values will be corrected back by kubelet
var fakecpu v1.ResourceName = "example.com/fakecpu" var fakecpu v1.ResourceName = "example.com/fakecpu"

View File

@ -24,6 +24,7 @@ go_test(
deps = [ deps = [
"//pkg/api/legacyscheme:go_default_library", "//pkg/api/legacyscheme:go_default_library",
"//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/pod:go_default_library",
"//pkg/apis/scheduling:go_default_library",
"//pkg/controller/nodelifecycle:go_default_library", "//pkg/controller/nodelifecycle:go_default_library",
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/scheduler:go_default_library", "//pkg/scheduler:go_default_library",
@ -39,6 +40,7 @@ go_test(
"//plugin/pkg/admission/defaulttolerationseconds:go_default_library", "//plugin/pkg/admission/defaulttolerationseconds:go_default_library",
"//plugin/pkg/admission/podtolerationrestriction:go_default_library", "//plugin/pkg/admission/podtolerationrestriction:go_default_library",
"//plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction:go_default_library", "//plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction:go_default_library",
"//plugin/pkg/admission/priority:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",

View File

@ -27,12 +27,18 @@ import (
policy "k8s.io/api/policy/v1beta1" policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/scheduling"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/plugin/pkg/admission/priority"
testutils "k8s.io/kubernetes/test/utils" testutils "k8s.io/kubernetes/test/utils"
"k8s.io/klog" "k8s.io/klog"
@ -366,6 +372,102 @@ func TestDisablePreemption(t *testing.T) {
} }
} }
// This test verifies that system critical priorities are created automatically and resolved properly.
func TestPodPriorityResolution(t *testing.T) {
admission := priority.NewPlugin()
context := initTestScheduler(t, initTestMaster(t, "preemption", admission), true, nil)
defer cleanupTest(t, context)
cs := context.clientSet
// Build clientset and informers for controllers.
externalClientset := kubernetes.NewForConfigOrDie(&restclient.Config{
QPS: -1,
Host: context.httpServer.URL,
ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
externalInformers := informers.NewSharedInformerFactory(externalClientset, time.Second)
admission.SetExternalKubeClientSet(externalClientset)
admission.SetExternalKubeInformerFactory(externalInformers)
externalInformers.Start(context.stopCh)
externalInformers.WaitForCacheSync(context.stopCh)
tests := []struct {
Name string
PriorityClass string
Pod *v1.Pod
ExpectedPriority int32
ExpectedError error
}{
{
Name: "SystemNodeCritical priority class",
PriorityClass: scheduling.SystemNodeCritical,
ExpectedPriority: scheduling.SystemCriticalPriority + 1000,
Pod: initPausePod(cs, &pausePodConfig{
Name: fmt.Sprintf("pod1-%v", scheduling.SystemNodeCritical),
Namespace: metav1.NamespaceSystem,
PriorityClassName: scheduling.SystemNodeCritical,
}),
},
{
Name: "SystemClusterCritical priority class",
PriorityClass: scheduling.SystemClusterCritical,
ExpectedPriority: scheduling.SystemCriticalPriority,
Pod: initPausePod(cs, &pausePodConfig{
Name: fmt.Sprintf("pod2-%v", scheduling.SystemClusterCritical),
Namespace: metav1.NamespaceSystem,
PriorityClassName: scheduling.SystemClusterCritical,
}),
},
{
Name: "Invalid priority class should result in error",
PriorityClass: "foo",
ExpectedPriority: scheduling.SystemCriticalPriority,
Pod: initPausePod(cs, &pausePodConfig{
Name: fmt.Sprintf("pod3-%v", scheduling.SystemClusterCritical),
Namespace: metav1.NamespaceSystem,
PriorityClassName: "foo",
}),
ExpectedError: fmt.Errorf("Error creating pause pod: pods \"pod3-system-cluster-critical\" is forbidden: no PriorityClass with name foo was found"),
},
}
// Create a node with some resources and a label.
nodeRes := &v1.ResourceList{
v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
}
_, err := createNode(context.clientSet, "node1", nodeRes)
if err != nil {
t.Fatalf("Error creating nodes: %v", err)
}
pods := make([]*v1.Pod, 0, len(tests))
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
pod, err := runPausePod(cs, test.Pod)
if err != nil {
if test.ExpectedError == nil {
t.Fatalf("Test [PodPriority/%v]: Error running pause pod: %v", test.PriorityClass, err)
}
if err.Error() != test.ExpectedError.Error() {
t.Fatalf("Test [PodPriority/%v]: Expected error %v but got error %v", test.PriorityClass, test.ExpectedError, err)
}
return
}
pods = append(pods, pod)
if pod.Spec.Priority != nil {
if *pod.Spec.Priority != test.ExpectedPriority {
t.Errorf("Expected pod %v to have priority %v but was %v", pod.Name, test.ExpectedPriority, pod.Spec.Priority)
}
} else {
t.Errorf("Expected pod %v to have priority %v but was nil", pod.Name, test.PriorityClass)
}
})
}
cleanupPods(cs, t, pods)
cleanupNodes(cs, t)
}
func mkPriorityPodWithGrace(tc *testContext, name string, priority int32, grace int64) *v1.Pod { func mkPriorityPodWithGrace(tc *testContext, name string, priority int32, grace int64) *v1.Pod {
defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{ defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),

View File

@ -463,6 +463,7 @@ type pausePodConfig struct {
NodeName string NodeName string
SchedulerName string SchedulerName string
Priority *int32 Priority *int32
PriorityClassName string
} }
// initPausePod initializes a pod API object from the given config. It is used // initPausePod initializes a pod API object from the given config. It is used
@ -484,10 +485,11 @@ func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
Image: imageutils.GetPauseImageName(), Image: imageutils.GetPauseImageName(),
}, },
}, },
Tolerations: conf.Tolerations, Tolerations: conf.Tolerations,
NodeName: conf.NodeName, NodeName: conf.NodeName,
SchedulerName: conf.SchedulerName, SchedulerName: conf.SchedulerName,
Priority: conf.Priority, Priority: conf.Priority,
PriorityClassName: conf.PriorityClassName,
}, },
} }
if conf.Resources != nil { if conf.Resources != nil {