From c1e7ab07bde02930895b274022e804ca9d38bd7d Mon Sep 17 00:00:00 2001
From: Wei Huang
Date: Thu, 15 Nov 2018 16:17:39 -0800
Subject: [PATCH] add an e2e test to verify preemption running path

---
 test/e2e/scheduling/BUILD         |   1 +
 test/e2e/scheduling/preemption.go | 220 ++++++++++++++++++++++++++++++
 2 files changed, 221 insertions(+)

diff --git a/test/e2e/scheduling/BUILD b/test/e2e/scheduling/BUILD
index 9da465bf82c..33eaa147b54 100644
--- a/test/e2e/scheduling/BUILD
+++ b/test/e2e/scheduling/BUILD
@@ -27,6 +27,7 @@ go_library(
         "//pkg/quota/v1/evaluator/core:go_default_library",
         "//pkg/scheduler/algorithm/priorities/util:go_default_library",
         "//pkg/scheduler/api:go_default_library",
+        "//staging/src/k8s.io/api/apps/v1:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/api/extensions/v1beta1:go_default_library",
         "//staging/src/k8s.io/api/scheduling/v1beta1:go_default_library",
diff --git a/test/e2e/scheduling/preemption.go b/test/e2e/scheduling/preemption.go
index f4d57dde116..6de8f8ed92e 100644
--- a/test/e2e/scheduling/preemption.go
+++ b/test/e2e/scheduling/preemption.go
@@ -18,13 +18,19 @@ package scheduling
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
+	"k8s.io/client-go/tools/cache"
+
+	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/api/core/v1"
 	schedulerapi "k8s.io/api/scheduling/v1beta1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/pkg/apis/scheduling"
 	"k8s.io/kubernetes/test/e2e/framework"
@@ -353,3 +359,217 @@ var _ = SIGDescribe("PodPriorityResolution [Serial]", func() {
 		}
 	})
 })
+
+// fakecpu is a fake resource set in the Node status; if we updated real values
+// such as CPU or memory instead, the kubelet would correct them right back.
+var fakecpu v1.ResourceName = "example.com/fakecpu"
+
+var _ = SIGDescribe("PreemptionExecutionPath", func() {
+	var cs clientset.Interface
+	var node *v1.Node
+	var ns string
+	f := framework.NewDefaultFramework("sched-preemption-path")
+
+	AfterEach(func() {
+		if node != nil {
+			nodeCopy := node.DeepCopy()
+			// force it to update
+			nodeCopy.ResourceVersion = "0"
+			delete(nodeCopy.Status.Capacity, fakecpu)
+			_, err := cs.CoreV1().Nodes().UpdateStatus(nodeCopy)
+			framework.ExpectNoError(err)
+		}
+	})
+
+	BeforeEach(func() {
+		cs = f.ClientSet
+		ns = f.Namespace.Name
+
+		// find an available node
+		By("Finding an available node")
+		nodeName := GetNodeThatCanRunPod(f)
+		framework.Logf("found a healthy node: %s", nodeName)
+
+		// get the node API object
+		var err error
+		node, err = cs.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
+		if err != nil {
+			framework.Failf("error getting node %q: %v", nodeName, err)
+		}
+
+		// update the Node API object with a fake resource
+		nodeCopy := node.DeepCopy()
+		// force it to update
+		nodeCopy.ResourceVersion = "0"
+		nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("800")
+		node, err = cs.CoreV1().Nodes().UpdateStatus(nodeCopy)
+		framework.ExpectNoError(err)
+
+		// create four PriorityClasses: p1, p2, p3, p4
+		for i := 1; i <= 4; i++ {
+			priorityName := fmt.Sprintf("p%d", i)
+			_, err := f.ClientSet.SchedulingV1beta1().PriorityClasses().Create(&schedulerapi.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: priorityName}, Value: int32(i)})
+			Expect(err == nil || errors.IsAlreadyExists(err)).To(Equal(true))
+		}
+	})
+
+	It("runs ReplicaSets to verify preemption running path", func() {
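+		// Test plan: ReplicaSets 1-3 below are sized to occupy 780 of the node's
+		// 800 units of fakecpu (5*40 + 4*50 + 4*95); ReplicaSet4 then requests 400
+		// units at the highest priority, which can only be satisfied by preempting
+		// lower-priority pods, while the pods of ReplicaSet3 must be left untouched.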
+		podNamesSeen := make(map[string]struct{})
+		stopCh := make(chan struct{})
+
+		// create a pod controller to list/watch pod events from the test framework namespace
+		_, podController := cache.NewInformer(
+			&cache.ListWatch{
+				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+					obj, err := f.ClientSet.CoreV1().Pods(ns).List(options)
+					return runtime.Object(obj), err
+				},
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return f.ClientSet.CoreV1().Pods(ns).Watch(options)
+				},
+			},
+			&v1.Pod{},
+			0,
+			cache.ResourceEventHandlerFuncs{
+				AddFunc: func(obj interface{}) {
+					if pod, ok := obj.(*v1.Pod); ok {
+						podNamesSeen[pod.Name] = struct{}{}
+					}
+				},
+			},
+		)
+		go podController.Run(stopCh)
+		defer close(stopCh)
+
+		// prepare four ReplicaSets
+		rsConfs := []pauseRSConfig{
+			{
+				Replicas: int32(5),
+				PodConfig: pausePodConfig{
+					Name:              "pod1",
+					Namespace:         ns,
+					Labels:            map[string]string{"name": "pod1"},
+					PriorityClassName: "p1",
+					NodeSelector:      map[string]string{"kubernetes.io/hostname": node.Name},
+					Resources: &v1.ResourceRequirements{
+						Requests: v1.ResourceList{fakecpu: resource.MustParse("40")},
+						Limits:   v1.ResourceList{fakecpu: resource.MustParse("40")},
+					},
+				},
+			},
+			{
+				Replicas: int32(4),
+				PodConfig: pausePodConfig{
+					Name:              "pod2",
+					Namespace:         ns,
+					Labels:            map[string]string{"name": "pod2"},
+					PriorityClassName: "p2",
+					NodeSelector:      map[string]string{"kubernetes.io/hostname": node.Name},
+					Resources: &v1.ResourceRequirements{
+						Requests: v1.ResourceList{fakecpu: resource.MustParse("50")},
+						Limits:   v1.ResourceList{fakecpu: resource.MustParse("50")},
+					},
+				},
+			},
+			{
+				Replicas: int32(4),
+				PodConfig: pausePodConfig{
+					Name:              "pod3",
+					Namespace:         ns,
+					Labels:            map[string]string{"name": "pod3"},
+					PriorityClassName: "p3",
+					NodeSelector:      map[string]string{"kubernetes.io/hostname": node.Name},
+					Resources: &v1.ResourceRequirements{
+						Requests: v1.ResourceList{fakecpu: resource.MustParse("95")},
+						Limits:   v1.ResourceList{fakecpu: resource.MustParse("95")},
+					},
+				},
+			},
+			{
+				Replicas: int32(1),
+				PodConfig: pausePodConfig{
+					Name:              "pod4",
+					Namespace:         ns,
+					Labels:            map[string]string{"name": "pod4"},
+					PriorityClassName: "p4",
+					NodeSelector:      map[string]string{"kubernetes.io/hostname": node.Name},
+					Resources: &v1.ResourceRequirements{
+						Requests: v1.ResourceList{fakecpu: resource.MustParse("400")},
+						Limits:   v1.ResourceList{fakecpu: resource.MustParse("400")},
+					},
+				},
+			},
+		}
+		// create ReplicaSets 1-3 so as to occupy 780/800 of the fake resource
+		rsNum := len(rsConfs)
+		for i := 0; i < rsNum-1; i++ {
+			runPauseRS(f, rsConfs[i])
+		}
+
+		framework.Logf("pods created so far: %v", podNamesSeen)
+		framework.Logf("length of pods created so far: %v", len(podNamesSeen))
+
+		// create ReplicaSet4; if runPauseRS fails, it means ReplicaSet4 could not
+		// be scheduled even after 1 minute, which is unacceptable
+		runPauseRS(f, rsConfs[rsNum-1])
+
+		framework.Logf("pods created so far: %v", podNamesSeen)
+		framework.Logf("length of pods created so far: %v", len(podNamesSeen))
+
+		// count the pods of ReplicaSet3: if we have seen more pod names than the
+		// original number of replicas (4), its pods have been preempted at least once
+		rs3PodsSeen := 0
+		for podName := range podNamesSeen {
+			if strings.HasPrefix(podName, "rs-pod3") {
+				rs3PodsSeen++
+			}
+		}
+		if rs3PodsSeen != 4 {
+			framework.Failf("some pods of ReplicaSet3 have been preempted: expected 4 pod names, but got %d", rs3PodsSeen)
+		}
+	})
+
+})
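+
+// pauseRSConfig describes a ReplicaSet of pause pods: the number of replicas to
+// run and the pausePodConfig from which each replica is created.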
+type pauseRSConfig struct {
+	Replicas  int32
+	PodConfig pausePodConfig
+}
+
+// initPauseRS constructs a ReplicaSet object that wraps the pause pod built from
+// conf.PodConfig, using the pod's labels as the selector.
+func initPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
+	pausePod := initPausePod(f, conf.PodConfig)
+	pauseRS := &appsv1.ReplicaSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "rs-" + pausePod.Name,
+			Namespace: pausePod.Namespace,
+		},
+		Spec: appsv1.ReplicaSetSpec{
+			Replicas: &conf.Replicas,
+			Selector: &metav1.LabelSelector{
+				MatchLabels: pausePod.Labels,
+			},
+			Template: v1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{Labels: pausePod.ObjectMeta.Labels},
+				Spec:       pausePod.Spec,
+			},
+		},
+	}
+	return pauseRS
+}
+
+// createPauseRS creates the ReplicaSet in the config's namespace (falling back to
+// the test framework's namespace) and fails the test on error.
+func createPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
+	namespace := conf.PodConfig.Namespace
+	if len(namespace) == 0 {
+		namespace = f.Namespace.Name
+	}
+	rs, err := f.ClientSet.AppsV1().ReplicaSets(namespace).Create(initPauseRS(f, conf))
+	framework.ExpectNoError(err)
+	return rs
+}
+
+// runPauseRS creates the ReplicaSet and waits until its target number of replicas
+// are available.
+func runPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
+	rs := createPauseRS(f, conf)
+	framework.ExpectNoError(framework.WaitForReplicaSetTargetAvailableReplicas(f.ClientSet, rs, conf.Replicas))
+	return rs
+}