diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD
index b670e5bb42d..d39015f2eb4 100644
--- a/test/integration/scheduler/BUILD
+++ b/test/integration/scheduler/BUILD
@@ -89,6 +89,8 @@ go_library(
     deps = [
         "//pkg/api/legacyscheme:go_default_library",
         "//pkg/api/v1/pod:go_default_library",
+        "//pkg/controller:go_default_library",
+        "//pkg/controller/disruption:go_default_library",
         "//pkg/features:go_default_library",
         "//pkg/scheduler:go_default_library",
         "//pkg/scheduler/algorithmprovider:go_default_library",
@@ -97,9 +99,11 @@ go_library(
         "//test/integration/framework:go_default_library",
         "//test/utils/image:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/api/policy/v1beta1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go
index 8e2e4976b79..d5832a0bd86 100644
--- a/test/integration/scheduler/preemption_test.go
+++ b/test/integration/scheduler/preemption_test.go
@@ -27,7 +27,6 @@ import (
 	policy "k8s.io/api/policy/v1beta1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -601,6 +600,18 @@ func mkMinAvailablePDB(name, namespace string, uid types.UID, minAvailable int,
 	}
 }
 
+func addPodConditionReady(pod *v1.Pod) {
+	pod.Status = v1.PodStatus{
+		Phase: v1.PodRunning,
+		Conditions: []v1.PodCondition{
+			{
+				Type:   v1.PodReady,
+				Status: v1.ConditionTrue,
+			},
+		},
+	}
+}
+
 // TestPDBInPreemption tests PodDisruptionBudget support in preemption.
 func TestPDBInPreemption(t *testing.T) {
 	// Enable PodPriority feature gate.
@@ -610,6 +621,8 @@ func TestPDBInPreemption(t *testing.T) {
 	defer cleanupTest(t, context)
 	cs := context.clientSet
 
+	initDisruptionController(context)
+
 	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
 		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
 		v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)},
@@ -629,6 +642,7 @@ func TestPDBInPreemption(t *testing.T) {
 		description         string
 		nodes               []*nodeConfig
 		pdbs                []*policy.PodDisruptionBudget
+		pdbPodNum           []int32
 		existingPods        []*v1.Pod
 		pod                 *v1.Pod
 		preemptedPodIndexes map[int]struct{}
@@ -639,6 +653,7 @@ func TestPDBInPreemption(t *testing.T) {
 			pdbs: []*policy.PodDisruptionBudget{
 				mkMinAvailablePDB("pdb-1", context.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo": "bar"}),
 			},
+			pdbPodNum: []int32{2},
 			existingPods: []*v1.Pod{
 				initPausePod(context.clientSet, &pausePodConfig{
 					Name:      "low-pod1",
@@ -681,6 +696,7 @@ func TestPDBInPreemption(t *testing.T) {
 			pdbs: []*policy.PodDisruptionBudget{
 				mkMinAvailablePDB("pdb-1", context.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo": "bar"}),
 			},
+			pdbPodNum: []int32{1},
 			existingPods: []*v1.Pod{
 				initPausePod(context.clientSet, &pausePodConfig{
 					Name:      "low-pod1",
@@ -720,6 +736,7 @@ func TestPDBInPreemption(t *testing.T) {
 				mkMinAvailablePDB("pdb-1", context.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo1": "bar"}),
 				mkMinAvailablePDB("pdb-2", context.ns.Name, types.UID("pdb-2-uid"), 2, map[string]string{"foo2": "bar"}),
 			},
+			pdbPodNum: []int32{1, 5},
 			existingPods: []*v1.Pod{
 				initPausePod(context.clientSet, &pausePodConfig{
 					Name:      "low-pod1",
@@ -783,38 +800,22 @@ func TestPDBInPreemption(t *testing.T) {
 				Priority: &highPriority,
 				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
 					v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
-					v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+					v1.ResourceMemory: *resource.NewQuantity(400, resource.BinarySI)},
 				},
 			}),
-			preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}},
+			// Node-3 is chosen because no PDB is violated there and its victims have lower priority than those on node-2.
+			preemptedPodIndexes: map[int]struct{}{4: {}, 5: {}, 6: {}},
 		},
 	}
 
 	for _, test := range tests {
+		t.Logf("================ Running test: %v\n", test.description)
 		for _, nodeConf := range test.nodes {
 			_, err := createNode(cs, nodeConf.name, nodeConf.res)
 			if err != nil {
 				t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
 			}
 		}
-		// Create PDBs.
-		for _, pdb := range test.pdbs {
-			_, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb)
-			if err != nil {
-				t.Fatalf("Failed to create PDB: %v", err)
-			}
-		}
-		// Wait for PDBs to show up in the scheduler's cache.
-		if err := wait.Poll(time.Second, 15*time.Second, func() (bool, error) {
-			cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
-			if err != nil {
-				t.Errorf("Error while polling for PDB: %v", err)
-				return false, err
-			}
-			return len(cachedPDBs) == len(test.pdbs), err
-		}); err != nil {
-			t.Fatalf("Not all PDBs were added to the cache: %v", err)
-		}
 
 		pods := make([]*v1.Pod, len(test.existingPods))
 		var err error
@@ -823,7 +824,29 @@ func TestPDBInPreemption(t *testing.T) {
 			if pods[i], err = runPausePod(cs, p); err != nil {
 				t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
 			}
+			// Set the pod condition to Ready so that the PDB status gets updated.
+			addPodConditionReady(p)
+			if _, err := context.clientSet.CoreV1().Pods(context.ns.Name).UpdateStatus(p); err != nil {
+				t.Fatal(err)
+			}
 		}
+		// Wait for the pods to be stable in the scheduler cache.
+		if err := waitCachedPodsStable(context, test.existingPods); err != nil {
+			t.Fatalf("Not all pods are stable in the cache: %v", err)
+		}
+
+		// Create PDBs.
+		for _, pdb := range test.pdbs {
+			_, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb)
+			if err != nil {
+				t.Fatalf("Failed to create PDB: %v", err)
+			}
+		}
+		// Wait for the PDBs to show up in the scheduler's cache and become stable.
+		if err := waitCachedPDBsStable(context, test.pdbs, test.pdbPodNum); err != nil {
+			t.Fatalf("Not all PDBs are stable in the cache: %v", err)
+		}
+
 		// Create the "pod".
 		preemptor, err := createPausePod(cs, test.pod)
 		if err != nil {
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index d42803c1786..b7035855471 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -24,9 +24,11 @@ import (
 	"time"
 
 	"k8s.io/api/core/v1"
+	policy "k8s.io/api/policy/v1beta1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -42,6 +44,8 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/disruption"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/scheduler"
 	_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
@@ -194,6 +198,7 @@ func initTestSchedulerWithOptions(
 	// set setPodInformer if provided.
 	if setPodInformer {
 		go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
+		controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced)
 	}
 
 	eventBroadcaster := record.NewBroadcaster()
@@ -218,6 +223,26 @@ func initTestSchedulerWithOptions(
 	return context
 }
 
+// initDisruptionController initializes and runs a Disruption Controller to properly
+// update PodDisruptionBudget objects.
+func initDisruptionController(context *TestContext) *disruption.DisruptionController {
+	informers := informers.NewSharedInformerFactory(context.clientSet, 12*time.Hour)
+
+	dc := disruption.NewDisruptionController(
+		informers.Core().V1().Pods(),
+		informers.Policy().V1beta1().PodDisruptionBudgets(),
+		informers.Core().V1().ReplicationControllers(),
+		informers.Extensions().V1beta1().ReplicaSets(),
+		informers.Extensions().V1beta1().Deployments(),
+		informers.Apps().V1beta1().StatefulSets(),
+		context.clientSet)
+
+	informers.Start(context.schedulerConfig.StopEverything)
+	informers.WaitForCacheSync(context.schedulerConfig.StopEverything)
+	go dc.Run(context.schedulerConfig.StopEverything)
+	return dc
+}
+
 // initTest initializes a test environment and creates master and scheduler with default
 // configuration.
 func initTest(t *testing.T, nsPrefix string) *TestContext {
@@ -514,6 +539,59 @@ func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
 	return waitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
 }
 
+// waitCachedPDBsStable waits until the PDBs in the scheduler cache report a
+// "CurrentHealthy" status equal to the expected values.
+func waitCachedPDBsStable(context *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
+	return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
+		cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
+		if err != nil {
+			return false, err
+		}
+		if len(cachedPDBs) != len(pdbs) {
+			return false, nil
+		}
+		for i, pdb := range pdbs {
+			found := false
+			for _, cpdb := range cachedPDBs {
+				if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace {
+					found = true
+					if cpdb.Status.CurrentHealthy != pdbPodNum[i] {
+						return false, nil
+					}
+				}
+			}
+			if !found {
+				return false, nil
+			}
+		}
+		return true, nil
+	})
+}
+
+// waitCachedPodsStable waits until the scheduler cache has the given pods.
+func waitCachedPodsStable(context *TestContext, pods []*v1.Pod) error {
+	return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
+		cachedPods, err := context.scheduler.Config().SchedulerCache.List(labels.Everything())
+		if err != nil {
+			return false, err
+		}
+		if len(pods) != len(cachedPods) {
+			return false, nil
+		}
+		for _, p := range pods {
+			actualPod, err1 := context.clientSet.CoreV1().Pods(p.Namespace).Get(p.Name, metav1.GetOptions{})
+			if err1 != nil {
+				return false, err1
+			}
+			cachedPod, err2 := context.scheduler.Config().SchedulerCache.GetPod(actualPod)
+			if err2 != nil || cachedPod == nil {
+				return false, err2
+			}
+		}
+		return true, nil
+	})
+}
+
 // deletePod deletes the given pod in the given namespace.
 func deletePod(cs clientset.Interface, podName string, nsName string) error {
 	return cs.CoreV1().Pods(nsName).Delete(podName, metav1.NewDeleteOptions(0))
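
Reviewer note: the test marks each existing pod Ready (and pushes that via UpdateStatus) before creating the PDBs because the disruption controller started by initDisruptionController only counts Ready pods toward PDB Status.CurrentHealthy, which is exactly what waitCachedPDBsStable polls for. The standalone sketch below is illustrative only and not part of the patch; it assumes the k8s.io/api and k8s.io/kubernetes packages are vendored as in this repo, copies the addPodConditionReady helper added above, and uses podutil.IsPodReady to show the readiness check the controller relies on.

    package main

    import (
    	"fmt"

    	"k8s.io/api/core/v1"
    	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    )

    // addPodConditionReady mirrors the helper added in preemption_test.go: it marks
    // the pod Running and Ready, which is the state the disruption controller
    // requires before counting a pod as healthy for its PDB.
    func addPodConditionReady(pod *v1.Pod) {
    	pod.Status = v1.PodStatus{
    		Phase: v1.PodRunning,
    		Conditions: []v1.PodCondition{
    			{
    				Type:   v1.PodReady,
    				Status: v1.ConditionTrue,
    			},
    		},
    	}
    }

    func main() {
    	pod := &v1.Pod{}
    	fmt.Println("ready before:", podutil.IsPodReady(pod)) // false
    	addPodConditionReady(pod)
    	fmt.Println("ready after:", podutil.IsPodReady(pod)) // true
    }

Without the UpdateStatus call that follows addPodConditionReady in the test, the readiness change would stay local to the test process, CurrentHealthy would stay at 0, and waitCachedPDBsStable would time out before the preemptor pod is ever created.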