From 070f0979c29748584693c47b6b194d31c99c0785 Mon Sep 17 00:00:00 2001 From: gmarek Date: Fri, 25 Nov 2016 18:15:00 +0100 Subject: [PATCH] Make it possible to run Load test using Deployments or ReplicaSets --- test/e2e/density.go | 111 ++++++++------ test/e2e/example_cluster_dns.go | 3 +- test/e2e/framework/util.go | 247 ++++++++++++++++++++++--------- test/e2e/load.go | 142 ++++++++++-------- test/e2e/serviceloadbalancers.go | 4 +- test/utils/BUILD | 2 + test/utils/runners.go | 66 +++++++++ 7 files changed, 392 insertions(+), 183 deletions(-) diff --git a/test/e2e/density.go b/test/e2e/density.go index a69b0822912..f156baf137c 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apis/extensions" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -35,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/schema" "k8s.io/kubernetes/pkg/util/sets" utiluuid "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/workqueue" @@ -56,11 +58,13 @@ const ( var MaxContainerFailures = 0 type DensityTestConfig struct { - Configs []testutils.RCConfig + Configs []testutils.RunObjectConfig ClientSet clientset.Interface InternalClientset internalclientset.Interface PollInterval time.Duration PodCount int + // What kind of resource we want to create + kind schema.GroupKind } func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceConstraint { @@ -193,13 +197,13 @@ func runDensityTest(dtc DensityTestConfig) time.Duration { wg := sync.WaitGroup{} wg.Add(len(dtc.Configs)) for i := range dtc.Configs { - rcConfig := dtc.Configs[i] + config := dtc.Configs[i] go func() { defer GinkgoRecover() // Call wg.Done() in defer to avoid blocking whole test // in case of error from RunRC. defer wg.Done() - framework.ExpectNoError(framework.RunRC(rcConfig)) + framework.ExpectNoError(config.Run()) }() } logStopCh := make(chan struct{}) @@ -236,21 +240,21 @@ func runDensityTest(dtc DensityTestConfig) time.Duration { func cleanupDensityTest(dtc DensityTestConfig) { defer GinkgoRecover() - By("Deleting ReplicationController") + By("Deleting created Collections") // We explicitly delete all pods to have API calls necessary for deletion accounted in metrics. for i := range dtc.Configs { - rcName := dtc.Configs[i].Name - rc, err := dtc.ClientSet.Core().ReplicationControllers(dtc.Configs[i].Namespace).Get(rcName) - if err == nil && *(rc.Spec.Replicas) != 0 { - if framework.TestContext.GarbageCollectorEnabled { - By("Cleaning up only the replication controller, garbage collector will clean up the pods") - err := framework.DeleteRCAndWaitForGC(dtc.ClientSet, dtc.Configs[i].Namespace, rcName) - framework.ExpectNoError(err) - } else { - By("Cleaning up the replication controller and pods") - err := framework.DeleteRCAndPods(dtc.ClientSet, dtc.InternalClientset, dtc.Configs[i].Namespace, rcName) - framework.ExpectNoError(err) - } + name := dtc.Configs[i].GetName() + namespace := dtc.Configs[i].GetNamespace() + kind := dtc.Configs[i].GetKind() + // TODO: Remove Deployment guard once GC is implemented for Deployments. + if framework.TestContext.GarbageCollectorEnabled && kind != extensions.Kind("Deployment") { + By(fmt.Sprintf("Cleaning up only the %v, garbage collector will clean up the pods", kind)) + err := framework.DeleteResourceAndWaitForGC(dtc.ClientSet, kind, namespace, name) + framework.ExpectNoError(err) + } else { + By(fmt.Sprintf("Cleaning up the %v and pods", kind)) + err := framework.DeleteResourceAndPods(dtc.ClientSet, dtc.InternalClientset, kind, dtc.Configs[i].GetNamespace(), name) + framework.ExpectNoError(err) } } } @@ -265,7 +269,7 @@ func cleanupDensityTest(dtc DensityTestConfig) { var _ = framework.KubeDescribe("Density", func() { var c clientset.Interface var nodeCount int - var RCName string + var name string var additionalPodsPrefix string var ns string var uuid string @@ -352,27 +356,31 @@ var _ = framework.KubeDescribe("Density", func() { podsPerNode int // Controls how often the apiserver is polled for pods interval time.Duration + // What kind of resource we should be creating. Default: ReplicationController + kind schema.GroupKind } densityTests := []Density{ // TODO: Expose runLatencyTest as ginkgo flag. - {podsPerNode: 3, runLatencyTest: false}, - {podsPerNode: 30, runLatencyTest: true}, - {podsPerNode: 50, runLatencyTest: false}, - {podsPerNode: 95, runLatencyTest: true}, - {podsPerNode: 100, runLatencyTest: false}, + {podsPerNode: 3, runLatencyTest: false, kind: api.Kind("ReplicationController")}, + {podsPerNode: 30, runLatencyTest: true, kind: api.Kind("ReplicationController")}, + {podsPerNode: 50, runLatencyTest: false, kind: api.Kind("ReplicationController")}, + {podsPerNode: 95, runLatencyTest: true, kind: api.Kind("ReplicationController")}, + {podsPerNode: 100, runLatencyTest: false, kind: api.Kind("ReplicationController")}, } for _, testArg := range densityTests { feature := "ManualPerformance" switch testArg.podsPerNode { case 30: - feature = "Performance" + if testArg.kind == api.Kind("ReplicationController") { + feature = "Performance" + } case 95: feature = "HighDensityPerformance" } - name := fmt.Sprintf("[Feature:%s] should allow starting %d pods per node", feature, testArg.podsPerNode) + name := fmt.Sprintf("[Feature:%s] should allow starting %d pods per node using %v", feature, testArg.podsPerNode, testArg.kind) itArg := testArg It(name, func() { nodePreparer := framework.NewE2ETestNodePreparer( @@ -392,44 +400,55 @@ var _ = framework.KubeDescribe("Density", func() { defer fileHndl.Close() // nodeCountPerNamespace and CreateNamespaces are defined in load.go - numberOfRCs := (nodeCount + nodeCountPerNamespace - 1) / nodeCountPerNamespace - namespaces, err := CreateNamespaces(f, numberOfRCs, fmt.Sprintf("density-%v", testArg.podsPerNode)) + numberOfCollections := (nodeCount + nodeCountPerNamespace - 1) / nodeCountPerNamespace + namespaces, err := CreateNamespaces(f, numberOfCollections, fmt.Sprintf("density-%v", testArg.podsPerNode)) framework.ExpectNoError(err) - RCConfigs := make([]testutils.RCConfig, numberOfRCs) + configs := make([]testutils.RunObjectConfig, numberOfCollections) // Since all RCs are created at the same time, timeout for each config // has to assume that it will be run at the very end. podThroughput := 20 timeout := time.Duration(totalPods/podThroughput)*time.Second + 3*time.Minute // createClients is defined in load.go - clients, internalClients, err := createClients(numberOfRCs) - for i := 0; i < numberOfRCs; i++ { - RCName := fmt.Sprintf("density%v-%v-%v", totalPods, i, uuid) + clients, internalClients, err := createClients(numberOfCollections) + for i := 0; i < numberOfCollections; i++ { + name := fmt.Sprintf("density%v-%v-%v", totalPods, i, uuid) nsName := namespaces[i].Name - RCConfigs[i] = testutils.RCConfig{ + baseConfig := &testutils.RCConfig{ Client: clients[i], InternalClient: internalClients[i], Image: framework.GetPauseImageName(f.ClientSet), - Name: RCName, + Name: name, Namespace: nsName, Labels: map[string]string{"type": "densityPod"}, PollInterval: DensityPollInterval, Timeout: timeout, PodStatusFile: fileHndl, - Replicas: (totalPods + numberOfRCs - 1) / numberOfRCs, + Replicas: (totalPods + numberOfCollections - 1) / numberOfCollections, CpuRequest: nodeCpuCapacity / 100, MemRequest: nodeMemCapacity / 100, MaxContainerFailures: &MaxContainerFailures, Silent: true, } + switch itArg.kind { + case api.Kind("ReplicationController"): + configs[i] = baseConfig + case extensions.Kind("ReplicaSet"): + configs[i] = &testutils.ReplicaSetConfig{RCConfig: *baseConfig} + case extensions.Kind("Deployment"): + configs[i] = &testutils.DeploymentConfig{RCConfig: *baseConfig} + default: + framework.Failf("Unsupported kind: %v", itArg.kind) + } } dConfig := DensityTestConfig{ ClientSet: f.ClientSet, InternalClientset: f.InternalClientset, - Configs: RCConfigs, + Configs: configs, PodCount: totalPods, PollInterval: DensityPollInterval, + kind: itArg.kind, } e2eStartupTime = runDensityTest(dConfig) if itArg.runLatencyTest { @@ -657,29 +676,29 @@ var _ = framework.KubeDescribe("Density", func() { fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid)) framework.ExpectNoError(err) defer fileHndl.Close() - rcCnt := 1 - RCConfigs := make([]testutils.RCConfig, rcCnt) - podsPerRC := int(totalPods / rcCnt) - for i := 0; i < rcCnt; i++ { - if i == rcCnt-1 { - podsPerRC += int(math.Mod(float64(totalPods), float64(rcCnt))) + collectionCount := 1 + configs := make([]testutils.RunObjectConfig, collectionCount) + podsPerCollection := int(totalPods / collectionCount) + for i := 0; i < collectionCount; i++ { + if i == collectionCount-1 { + podsPerCollection += int(math.Mod(float64(totalPods), float64(collectionCount))) } - RCName = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid - RCConfigs[i] = testutils.RCConfig{Client: c, + name = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid + configs[i] = &testutils.RCConfig{Client: c, Image: framework.GetPauseImageName(f.ClientSet), - Name: RCName, + Name: name, Namespace: ns, Labels: map[string]string{"type": "densityPod"}, PollInterval: DensityPollInterval, PodStatusFile: fileHndl, - Replicas: podsPerRC, + Replicas: podsPerCollection, MaxContainerFailures: &MaxContainerFailures, Silent: true, } } dConfig := DensityTestConfig{ ClientSet: f.ClientSet, - Configs: RCConfigs, + Configs: configs, PodCount: totalPods, PollInterval: DensityPollInterval, } @@ -727,6 +746,6 @@ func createRunningPodFromRC(wg *sync.WaitGroup, c clientset.Interface, name, ns, } _, err := c.Core().ReplicationControllers(ns).Create(rc) framework.ExpectNoError(err) - framework.ExpectNoError(framework.WaitForRCPodsRunning(c, ns, name)) + framework.ExpectNoError(framework.WaitForControlledPodsRunning(c, ns, name, api.Kind("ReplicationController"))) framework.Logf("Found pod '%s' running", name) } diff --git a/test/e2e/example_cluster_dns.go b/test/e2e/example_cluster_dns.go index f89717fd965..0302b77f009 100644 --- a/test/e2e/example_cluster_dns.go +++ b/test/e2e/example_cluster_dns.go @@ -21,6 +21,7 @@ import ( "path/filepath" "time" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" "k8s.io/kubernetes/pkg/labels" @@ -90,7 +91,7 @@ var _ = framework.KubeDescribe("ClusterDns [Feature:Example]", func() { // wait for objects for _, ns := range namespaces { - framework.WaitForRCPodsRunning(c, ns.Name, backendRcName) + framework.WaitForControlledPodsRunning(c, ns.Name, backendRcName, api.Kind("ReplicationController")) framework.WaitForService(c, ns.Name, backendSvcName, true, framework.Poll, framework.ServiceStartTimeout) } // it is not enough that pods are running because they may be set to running, but diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index fb469760cef..ce8c2e034d6 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -2622,9 +2622,29 @@ func RemoveTaintOffNode(c clientset.Interface, nodeName string, taint v1.Taint) } } -func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error { - By(fmt.Sprintf("Scaling replication controller %s in namespace %s to %d", name, ns, size)) - scaler, err := kubectl.ScalerFor(api.Kind("ReplicationController"), internalClientset) +func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Scaler, error) { + switch kind { + case api.Kind("ReplicationController"): + return kubectl.ScalerFor(api.Kind("ReplicationController"), internalClientset) + case extensionsinternal.Kind("ReplicaSet"): + return kubectl.ScalerFor(extensionsinternal.Kind("ReplicaSet"), internalClientset) + case extensionsinternal.Kind("Deployment"): + return kubectl.ScalerFor(extensionsinternal.Kind("Deployment"), internalClientset) + default: + return nil, fmt.Errorf("Unsupported kind for getting Scaler: %v", kind) + } +} + +func ScaleResource( + clientset clientset.Interface, + internalClientset internalclientset.Interface, + ns, name string, + size uint, + wait bool, + kind schema.GroupKind, +) error { + By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size)) + scaler, err := getScalerForKind(internalClientset, kind) if err != nil { return err } @@ -2636,51 +2656,32 @@ func ScaleRC(clientset clientset.Interface, internalClientset internalclientset. if !wait { return nil } - return WaitForRCPodsRunning(clientset, ns, name) + return WaitForControlledPodsRunning(clientset, ns, name, kind) } // Wait up to 10 minutes for pods to become Running. -func WaitForRCPodsRunning(c clientset.Interface, ns, rcName string) error { - rc, err := c.Core().ReplicationControllers(ns).Get(rcName) +func WaitForControlledPodsRunning(c clientset.Interface, ns, name string, kind schema.GroupKind) error { + rtObject, err := getRuntimeObjectForKind(c, kind, ns, name) + if err != nil { + return err + } + selector, err := getSelectorFromRuntimeObject(rtObject) if err != nil { return err } - selector := labels.SelectorFromSet(labels.Set(rc.Spec.Selector)) err = testutils.WaitForPodsWithLabelRunning(c, ns, selector) if err != nil { - return fmt.Errorf("Error while waiting for replication controller %s pods to be running: %v", rcName, err) + return fmt.Errorf("Error while waiting for replication controller %s pods to be running: %v", name, err) } return nil } +func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error { + return ScaleResource(clientset, internalClientset, ns, name, size, wait, api.Kind("ReplicationController")) +} + func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error { - By(fmt.Sprintf("Scaling Deployment %s in namespace %s to %d", name, ns, size)) - scaler, err := kubectl.ScalerFor(extensionsinternal.Kind("Deployment"), internalClientset) - if err != nil { - return err - } - waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute) - waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute) - if err = scaler.Scale(ns, name, size, nil, waitForScale, waitForReplicas); err != nil { - return fmt.Errorf("error while scaling Deployment %s to %d replicas: %v", name, size, err) - } - if !wait { - return nil - } - return WaitForDeploymentPodsRunning(clientset, ns, name) -} - -func WaitForDeploymentPodsRunning(c clientset.Interface, ns, name string) error { - deployment, err := c.Extensions().Deployments(ns).Get(name) - if err != nil { - return err - } - selector := labels.SelectorFromSet(labels.Set(deployment.Spec.Selector.MatchLabels)) - err = testutils.WaitForPodsWithLabelRunning(c, ns, selector) - if err != nil { - return fmt.Errorf("Error while waiting for Deployment %s pods to be running: %v", name, err) - } - return nil + return ScaleResource(clientset, internalClientset, ns, name, size, wait, extensionsinternal.Kind("Deployment")) } // Returns true if all the specified pods are scheduled, else returns false. @@ -2760,26 +2761,102 @@ func WaitForPodsWithLabelRunningReady(c clientset.Interface, ns string, label la return pods, err } -// DeleteRCAndPods a Replication Controller and all pods it spawned -func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string) error { - By(fmt.Sprintf("deleting replication controller %s in namespace %s", name, ns)) - rc, err := clientset.Core().ReplicationControllers(ns).Get(name) +func getRuntimeObjectForKind(c clientset.Interface, kind schema.GroupKind, ns, name string) (runtime.Object, error) { + switch kind { + case api.Kind("ReplicationController"): + return c.Core().ReplicationControllers(ns).Get(name) + case extensionsinternal.Kind("ReplicaSet"): + return c.Extensions().ReplicaSets(ns).Get(name) + case extensionsinternal.Kind("Deployment"): + return c.Extensions().Deployments(ns).Get(name) + default: + return nil, fmt.Errorf("Unsupported kind when getting runtime object: %v", kind) + } +} + +func deleteResource(c clientset.Interface, kind schema.GroupKind, ns, name string, deleteOption *v1.DeleteOptions) error { + switch kind { + case api.Kind("ReplicationController"): + return c.Core().ReplicationControllers(ns).Delete(name, deleteOption) + case extensionsinternal.Kind("ReplicaSet"): + return c.Extensions().ReplicaSets(ns).Delete(name, deleteOption) + case extensionsinternal.Kind("Deployment"): + return c.Extensions().Deployments(ns).Delete(name, deleteOption) + default: + return fmt.Errorf("Unsupported kind when deleting: %v", kind) + } +} + +func getSelectorFromRuntimeObject(obj runtime.Object) (labels.Selector, error) { + switch typed := obj.(type) { + case *v1.ReplicationController: + return labels.SelectorFromSet(typed.Spec.Selector), nil + case *extensions.ReplicaSet: + return metav1.LabelSelectorAsSelector(typed.Spec.Selector) + case *extensions.Deployment: + return metav1.LabelSelectorAsSelector(typed.Spec.Selector) + default: + return nil, fmt.Errorf("Unsupported kind when getting selector: %v", obj) + } +} + +func getReplicasFromRuntimeObject(obj runtime.Object) (int32, error) { + switch typed := obj.(type) { + case *v1.ReplicationController: + if typed.Spec.Replicas != nil { + return *typed.Spec.Replicas, nil + } + return 0, nil + case *extensions.ReplicaSet: + if typed.Spec.Replicas != nil { + return *typed.Spec.Replicas, nil + } + return 0, nil + case *extensions.Deployment: + if typed.Spec.Replicas != nil { + return *typed.Spec.Replicas, nil + } + return 0, nil + default: + return -1, fmt.Errorf("Unsupported kind when getting number of replicas: %v", obj) + } +} + +func getReaperForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Reaper, error) { + switch kind { + case api.Kind("ReplicationController"): + return kubectl.ReaperFor(api.Kind("ReplicationController"), internalClientset) + case extensionsinternal.Kind("ReplicaSet"): + return kubectl.ReaperFor(extensionsinternal.Kind("ReplicaSet"), internalClientset) + case extensionsinternal.Kind("Deployment"): + return kubectl.ReaperFor(extensionsinternal.Kind("Deployment"), internalClientset) + default: + return nil, fmt.Errorf("Unsupported kind: %v", kind) + } +} + +// DeleteResourceAndPods deletes a given resource and all pods it spawned +func DeleteResourceAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, kind schema.GroupKind, ns, name string) error { + By(fmt.Sprintf("deleting %v %s in namespace %s", kind, name, ns)) + + rtObject, err := getRuntimeObjectForKind(clientset, kind, ns, name) if err != nil { if apierrs.IsNotFound(err) { - Logf("RC %s was already deleted: %v", name, err) + Logf("%v %s not found: %v", kind, name, err) return nil } return err } - reaper, err := kubectl.ReaperForReplicationController(internalClientset.Core(), 10*time.Minute) + selector, err := getSelectorFromRuntimeObject(rtObject) if err != nil { - if apierrs.IsNotFound(err) { - Logf("RC %s was already deleted: %v", name, err) - return nil - } return err } - ps, err := podStoreForRC(clientset, rc) + reaper, err := getReaperForKind(internalClientset, kind) + if err != nil { + return err + } + + ps, err := podStoreForSelector(clientset, ns, selector) if err != nil { return err } @@ -2787,20 +2864,20 @@ func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalcl startTime := time.Now() err = reaper.Stop(ns, name, 0, nil) if apierrs.IsNotFound(err) { - Logf("RC %s was already deleted: %v", name, err) + Logf("%v %s was already deleted: %v", kind, name, err) return nil } if err != nil { - return fmt.Errorf("error while stopping RC: %s: %v", name, err) + return fmt.Errorf("error while stopping %v: %s: %v", kind, name, err) } - deleteRCTime := time.Now().Sub(startTime) - Logf("Deleting RC %s took: %v", name, deleteRCTime) + deleteTime := time.Now().Sub(startTime) + Logf("Deleting %v %s took: %v", kind, name, deleteTime) err = waitForPodsInactive(ps, 10*time.Millisecond, 10*time.Minute) if err != nil { return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err) } - terminatePodTime := time.Now().Sub(startTime) - deleteRCTime - Logf("Terminating RC %s pods took: %v", name, terminatePodTime) + terminatePodTime := time.Now().Sub(startTime) - deleteTime + Logf("Terminating %v %s pods took: %v", kind, name, terminatePodTime) // this is to relieve namespace controller's pressure when deleting the // namespace after a test. err = waitForPodsGone(ps, 10*time.Second, 10*time.Minute) @@ -2810,57 +2887,75 @@ func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalcl return nil } -// DeleteRCAndWaitForGC deletes only the Replication Controller and waits for GC to delete the pods. -func DeleteRCAndWaitForGC(c clientset.Interface, ns, name string) error { - By(fmt.Sprintf("deleting replication controller %s in namespace %s, will wait for the garbage collector to delete the pods", name, ns)) - rc, err := c.Core().ReplicationControllers(ns).Get(name) +func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string) error { + return DeleteResourceAndPods(clientset, internalClientset, api.Kind("ReplicationController"), ns, name) +} + +// DeleteResourceAndWaitForGC deletes only given resource and waits for GC to delete the pods. +func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns, name string) error { + By(fmt.Sprintf("deleting %v %s in namespace %s, will wait for the garbage collector to delete the pods", kind, name, ns)) + + rtObject, err := getRuntimeObjectForKind(c, kind, ns, name) if err != nil { if apierrs.IsNotFound(err) { - Logf("RC %s was already deleted: %v", name, err) + Logf("%v %s not found: %v", kind, name, err) return nil } return err } - ps, err := podStoreForRC(c, rc) + selector, err := getSelectorFromRuntimeObject(rtObject) if err != nil { return err } + replicas, err := getReplicasFromRuntimeObject(rtObject) + if err != nil { + return err + } + + ps, err := podStoreForSelector(c, ns, selector) + if err != nil { + return err + } + defer ps.Stop() startTime := time.Now() falseVar := false deleteOption := &v1.DeleteOptions{OrphanDependents: &falseVar} - err = c.Core().ReplicationControllers(ns).Delete(name, deleteOption) + err = deleteResource(c, kind, ns, name, deleteOption) if err != nil && apierrs.IsNotFound(err) { - Logf("RC %s was already deleted: %v", name, err) + Logf("%v %s was already deleted: %v", kind, name, err) return nil } if err != nil { return err } - deleteRCTime := time.Now().Sub(startTime) - Logf("Deleting RC %s took: %v", name, deleteRCTime) + deleteTime := time.Now().Sub(startTime) + Logf("Deleting %v %s took: %v", kind, name, deleteTime) + var interval, timeout time.Duration switch { - case *(rc.Spec.Replicas) < 100: + case replicas < 100: interval = 100 * time.Millisecond - case *(rc.Spec.Replicas) < 1000: + case replicas < 1000: interval = 1 * time.Second default: interval = 10 * time.Second } - if *(rc.Spec.Replicas) < 5000 { + if replicas < 5000 { timeout = 10 * time.Minute } else { - timeout = time.Duration(*(rc.Spec.Replicas)/gcThroughput) * time.Second + timeout = time.Duration(replicas/gcThroughput) * time.Second // gcThroughput is pretty strict now, add a bit more to it timeout = timeout + 3*time.Minute } + err = waitForPodsInactive(ps, interval, timeout) if err != nil { return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err) } - terminatePodTime := time.Now().Sub(startTime) - deleteRCTime - Logf("Terminating RC %s pods took: %v", name, terminatePodTime) + terminatePodTime := time.Now().Sub(startTime) - deleteTime + Logf("Terminating %v %s pods took: %v", kind, name, terminatePodTime) + err = waitForPodsGone(ps, interval, 10*time.Minute) if err != nil { return fmt.Errorf("error while waiting for pods gone %s: %v", name, err) @@ -2868,11 +2963,15 @@ func DeleteRCAndWaitForGC(c clientset.Interface, ns, name string) error { return nil } -// podStoreForRC creates a PodStore that monitors pods belong to the rc. It -// waits until the reflector does a List() before returning. -func podStoreForRC(c clientset.Interface, rc *v1.ReplicationController) (*testutils.PodStore, error) { - labels := labels.SelectorFromSet(rc.Spec.Selector) - ps := testutils.NewPodStore(c, rc.Namespace, labels, fields.Everything()) +// DeleteRCAndWaitForGC deletes only the Replication Controller and waits for GC to delete the pods. +func DeleteRCAndWaitForGC(c clientset.Interface, ns, name string) error { + return DeleteResourceAndWaitForGC(c, api.Kind("ReplicationController"), ns, name) +} + +// podStoreForSelector creates a PodStore that monitors pods from given namespace matching given selector. +// It waits until the reflector does a List() before returning. +func podStoreForSelector(c clientset.Interface, ns string, selector labels.Selector) (*testutils.PodStore, error) { + ps := testutils.NewPodStore(c, ns, selector, fields.Everything()) err := wait.Poll(1*time.Second, 2*time.Minute, func() (bool, error) { if len(ps.Reflector.LastSyncResourceVersion()) != 0 { return true, nil @@ -4312,7 +4411,7 @@ func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalcl return err } if replicas == 0 { - ps, err := podStoreForRC(clientset, rc) + ps, err := podStoreForSelector(clientset, rc.Namespace, labels.SelectorFromSet(rc.Spec.Selector)) if err != nil { return err } diff --git a/test/e2e/load.go b/test/e2e/load.go index c6e57e6ac1e..370d4b2c02c 100644 --- a/test/e2e/load.go +++ b/test/e2e/load.go @@ -27,12 +27,15 @@ import ( "sync" "time" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/transport" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime/schema" "k8s.io/kubernetes/pkg/util/intstr" utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/test/e2e/framework" @@ -43,15 +46,15 @@ import ( ) const ( - smallRCSize = 5 - mediumRCSize = 30 - bigRCSize = 250 - smallRCGroupName = "load-small-rc" - mediumRCGroupName = "load-medium-rc" - bigRCGroupName = "load-big-rc" - smallRCBatchSize = 30 - mediumRCBatchSize = 5 - bigRCBatchSize = 1 + smallGroupSize = 5 + mediumGroupSize = 30 + bigGroupSize = 250 + smallGroupName = "load-small" + mediumGroupName = "load-medium" + bigGroupName = "load-big" + smallGroupBatchSize = 30 + mediumGroupBatchSize = 5 + bigGroupBatchSize = 1 // We start RCs/Services/pods/... in different namespace in this test. // nodeCountPerNamespace determines how many namespaces we will be using // depending on the number of nodes in the underlying cluster. @@ -66,7 +69,7 @@ var _ = framework.KubeDescribe("Load capacity", func() { var clientset clientset.Interface var nodeCount int var ns string - var configs []*testutils.RCConfig + var configs []testutils.RunObjectConfig // Gathers metrics before teardown // TODO add flag that allows to skip cleanup on failure @@ -117,20 +120,22 @@ var _ = framework.KubeDescribe("Load capacity", func() { podsPerNode int image string command []string + // What kind of resource we want to create + kind schema.GroupKind } loadTests := []Load{ // The container will consume 1 cpu and 512mb of memory. - {podsPerNode: 3, image: "jess/stress", command: []string{"stress", "-c", "1", "-m", "2"}}, - {podsPerNode: 30, image: "gcr.io/google_containers/serve_hostname:v1.4"}, + {podsPerNode: 3, image: "jess/stress", command: []string{"stress", "-c", "1", "-m", "2"}, kind: api.Kind("ReplicationController")}, + {podsPerNode: 30, image: "gcr.io/google_containers/serve_hostname:v1.4", kind: api.Kind("ReplicationController")}, } for _, testArg := range loadTests { feature := "ManualPerformance" - if testArg.podsPerNode == 30 { + if testArg.podsPerNode == 30 && testArg.kind == api.Kind("ReplicationController") { feature = "Performance" } - name := fmt.Sprintf("[Feature:%s] should be able to handle %v pods per node", feature, testArg.podsPerNode) + name := fmt.Sprintf("[Feature:%s] should be able to handle %v pods per node %v", feature, testArg.podsPerNode, testArg.kind) itArg := testArg It(name, func() { @@ -140,7 +145,7 @@ var _ = framework.KubeDescribe("Load capacity", func() { framework.ExpectNoError(err) totalPods := itArg.podsPerNode * nodeCount - configs = generateRCConfigs(totalPods, itArg.image, itArg.command, namespaces) + configs = generateConfigs(totalPods, itArg.image, itArg.command, namespaces, itArg.kind) var services []*v1.Service // Read the environment variable to see if we want to create services createServices := os.Getenv("CREATE_SERVICES") @@ -173,7 +178,7 @@ var _ = framework.KubeDescribe("Load capacity", func() { // We may want to revisit it in the future. framework.Logf("Starting to create ReplicationControllers...") creatingTime := time.Duration(totalPods/throughput) * time.Second - createAllRC(configs, creatingTime) + createAllResources(configs, creatingTime) By("============================================================================") // We would like to spread scaling replication controllers over time @@ -182,11 +187,11 @@ var _ = framework.KubeDescribe("Load capacity", func() { // The expected number of created/deleted pods is less than totalPods/3. scalingTime := time.Duration(totalPods/(3*throughput)) * time.Second framework.Logf("Starting to scale ReplicationControllers first time...") - scaleAllRC(configs, scalingTime) + scaleAllResources(configs, scalingTime) By("============================================================================") framework.Logf("Starting to scale ReplicationControllers second time...") - scaleAllRC(configs, scalingTime) + scaleAllResources(configs, scalingTime) By("============================================================================") // Cleanup all created replication controllers. @@ -194,7 +199,7 @@ var _ = framework.KubeDescribe("Load capacity", func() { // We may want to revisit it in the future. deletingTime := time.Duration(totalPods/throughput) * time.Second framework.Logf("Starting to delete ReplicationControllers...") - deleteAllRC(configs, deletingTime) + deleteAllResources(configs, deletingTime) if createServices == "true" { framework.Logf("Starting to delete services...") for _, service := range services { @@ -259,27 +264,27 @@ func createClients(numberOfClients int) ([]*clientset.Clientset, []*internalclie return clients, internalClients, nil } -func computeRCCounts(total int) (int, int, int) { +func computePodCounts(total int) (int, int, int) { // Small RCs owns ~0.5 of total number of pods, medium and big RCs ~0.25 each. // For example for 3000 pods (100 nodes, 30 pods per node) there are: // - 300 small RCs each 5 pods // - 25 medium RCs each 30 pods // - 3 big RCs each 250 pods - bigRCCount := total / 4 / bigRCSize - total -= bigRCCount * bigRCSize - mediumRCCount := total / 3 / mediumRCSize - total -= mediumRCCount * mediumRCSize - smallRCCount := total / smallRCSize + bigRCCount := total / 4 / bigGroupSize + total -= bigRCCount * bigGroupSize + mediumRCCount := total / 3 / mediumGroupSize + total -= mediumRCCount * mediumGroupSize + smallRCCount := total / smallGroupSize return smallRCCount, mediumRCCount, bigRCCount } -func generateRCConfigs(totalPods int, image string, command []string, nss []*v1.Namespace) []*testutils.RCConfig { - configs := make([]*testutils.RCConfig, 0) +func generateConfigs(totalPods int, image string, command []string, nss []*v1.Namespace, kind schema.GroupKind) []testutils.RunObjectConfig { + configs := make([]testutils.RunObjectConfig, 0) - smallRCCount, mediumRCCount, bigRCCount := computeRCCounts(totalPods) - configs = append(configs, generateRCConfigsForGroup(nss, smallRCGroupName, smallRCSize, smallRCCount, image, command)...) - configs = append(configs, generateRCConfigsForGroup(nss, mediumRCGroupName, mediumRCSize, mediumRCCount, image, command)...) - configs = append(configs, generateRCConfigsForGroup(nss, bigRCGroupName, bigRCSize, bigRCCount, image, command)...) + smallRCCount, mediumRCCount, bigRCCount := computePodCounts(totalPods) + configs = append(configs, generateConfigsForGroup(nss, smallGroupName, smallGroupSize, smallRCCount, image, command, kind)...) + configs = append(configs, generateConfigsForGroup(nss, mediumGroupName, mediumGroupSize, mediumRCCount, image, command, kind)...) + configs = append(configs, generateConfigsForGroup(nss, bigGroupName, bigGroupSize, bigRCCount, image, command, kind)...) // Create a number of clients to better simulate real usecase // where not everyone is using exactly the same client. @@ -288,18 +293,18 @@ func generateRCConfigs(totalPods int, image string, command []string, nss []*v1. framework.ExpectNoError(err) for i := 0; i < len(configs); i++ { - configs[i].Client = clients[i%len(clients)] - configs[i].InternalClient = internalClients[i%len(internalClients)] + configs[i].SetClient(clients[i%len(clients)]) + configs[i].SetInternalClient(internalClients[i%len(internalClients)]) } return configs } -func generateRCConfigsForGroup( - nss []*v1.Namespace, groupName string, size, count int, image string, command []string) []*testutils.RCConfig { - configs := make([]*testutils.RCConfig, 0, count) +func generateConfigsForGroup( + nss []*v1.Namespace, groupName string, size, count int, image string, command []string, kind schema.GroupKind) []testutils.RunObjectConfig { + configs := make([]testutils.RunObjectConfig, 0, count) for i := 1; i <= count; i++ { - config := &testutils.RCConfig{ + baseConfig := &testutils.RCConfig{ Client: nil, // this will be overwritten later InternalClient: nil, // this will be overwritten later Name: groupName + "-" + strconv.Itoa(i), @@ -311,20 +316,31 @@ func generateRCConfigsForGroup( CpuRequest: 10, // 0.01 core MemRequest: 26214400, // 25MB } + var config testutils.RunObjectConfig + switch kind { + case api.Kind("ReplicationController"): + config = baseConfig + case extensions.Kind("ReplicaSet"): + config = &testutils.ReplicaSetConfig{RCConfig: *baseConfig} + case extensions.Kind("Deployment"): + config = &testutils.DeploymentConfig{RCConfig: *baseConfig} + default: + framework.Failf("Unsupported kind for config creation: %v", kind) + } configs = append(configs, config) } return configs } -func generateServicesForConfigs(configs []*testutils.RCConfig) []*v1.Service { +func generateServicesForConfigs(configs []testutils.RunObjectConfig) []*v1.Service { services := make([]*v1.Service, 0, len(configs)) for _, config := range configs { - serviceName := config.Name + "-svc" - labels := map[string]string{"name": config.Name} + serviceName := config.GetName() + "-svc" + labels := map[string]string{"name": config.GetName()} service := &v1.Service{ ObjectMeta: v1.ObjectMeta{ Name: serviceName, - Namespace: config.Namespace, + Namespace: config.GetNamespace(), }, Spec: v1.ServiceSpec{ Selector: labels, @@ -343,69 +359,75 @@ func sleepUpTo(d time.Duration) { time.Sleep(time.Duration(rand.Int63n(d.Nanoseconds()))) } -func createAllRC(configs []*testutils.RCConfig, creatingTime time.Duration) { +func createAllResources(configs []testutils.RunObjectConfig, creatingTime time.Duration) { var wg sync.WaitGroup wg.Add(len(configs)) for _, config := range configs { - go createRC(&wg, config, creatingTime) + go createResource(&wg, config, creatingTime) } wg.Wait() } -func createRC(wg *sync.WaitGroup, config *testutils.RCConfig, creatingTime time.Duration) { +func createResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, creatingTime time.Duration) { defer GinkgoRecover() defer wg.Done() sleepUpTo(creatingTime) - framework.ExpectNoError(framework.RunRC(*config), fmt.Sprintf("creating rc %s", config.Name)) + framework.ExpectNoError(config.Run(), fmt.Sprintf("creating %v %s", config.GetKind(), config.GetName())) } -func scaleAllRC(configs []*testutils.RCConfig, scalingTime time.Duration) { +func scaleAllResources(configs []testutils.RunObjectConfig, scalingTime time.Duration) { var wg sync.WaitGroup wg.Add(len(configs)) for _, config := range configs { - go scaleRC(&wg, config, scalingTime) + go scaleResource(&wg, config, scalingTime) } wg.Wait() } // Scales RC to a random size within [0.5*size, 1.5*size] and lists all the pods afterwards. // Scaling happens always based on original size, not the current size. -func scaleRC(wg *sync.WaitGroup, config *testutils.RCConfig, scalingTime time.Duration) { +func scaleResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, scalingTime time.Duration) { defer GinkgoRecover() defer wg.Done() sleepUpTo(scalingTime) - newSize := uint(rand.Intn(config.Replicas) + config.Replicas/2) - framework.ExpectNoError(framework.ScaleRC(config.Client, config.InternalClient, config.Namespace, config.Name, newSize, true), - fmt.Sprintf("scaling rc %s for the first time", config.Name)) - selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name})) + newSize := uint(rand.Intn(config.GetReplicas()) + config.GetReplicas()/2) + framework.ExpectNoError(framework.ScaleResource( + config.GetClient(), config.GetInternalClient(), config.GetNamespace(), config.GetName(), newSize, true, config.GetKind()), + fmt.Sprintf("scaling rc %s for the first time", config.GetName())) + + selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.GetName()})) options := v1.ListOptions{ LabelSelector: selector.String(), ResourceVersion: "0", } - _, err := config.Client.Core().Pods(config.Namespace).List(options) - framework.ExpectNoError(err, fmt.Sprintf("listing pods from rc %v", config.Name)) + _, err := config.GetClient().Core().Pods(config.GetNamespace()).List(options) + framework.ExpectNoError(err, fmt.Sprintf("listing pods from rc %v", config.GetName())) } -func deleteAllRC(configs []*testutils.RCConfig, deletingTime time.Duration) { +func deleteAllResources(configs []testutils.RunObjectConfig, deletingTime time.Duration) { var wg sync.WaitGroup wg.Add(len(configs)) for _, config := range configs { - go deleteRC(&wg, config, deletingTime) + go deleteResource(&wg, config, deletingTime) } wg.Wait() } -func deleteRC(wg *sync.WaitGroup, config *testutils.RCConfig, deletingTime time.Duration) { +func deleteResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, deletingTime time.Duration) { defer GinkgoRecover() defer wg.Done() sleepUpTo(deletingTime) - if framework.TestContext.GarbageCollectorEnabled { - framework.ExpectNoError(framework.DeleteRCAndWaitForGC(config.Client, config.Namespace, config.Name), fmt.Sprintf("deleting rc %s", config.Name)) + if framework.TestContext.GarbageCollectorEnabled && config.GetKind() != extensions.Kind("Deployment") { + framework.ExpectNoError(framework.DeleteResourceAndWaitForGC( + config.GetClient(), config.GetKind(), config.GetNamespace(), config.GetName()), + fmt.Sprintf("deleting %v %s", config.GetKind(), config.GetName())) } else { - framework.ExpectNoError(framework.DeleteRCAndPods(config.Client, config.InternalClient, config.Namespace, config.Name), fmt.Sprintf("deleting rc %s", config.Name)) + framework.ExpectNoError(framework.DeleteResourceAndPods( + config.GetClient(), config.GetInternalClient(), config.GetKind(), config.GetNamespace(), config.GetName()), + fmt.Sprintf("deleting %v %s", config.GetKind(), config.GetName())) } } diff --git a/test/e2e/serviceloadbalancers.go b/test/e2e/serviceloadbalancers.go index 6249cedf03a..b0d27dfad11 100644 --- a/test/e2e/serviceloadbalancers.go +++ b/test/e2e/serviceloadbalancers.go @@ -104,7 +104,7 @@ func (h *haproxyControllerTester) start(namespace string) (err error) { if err != nil { return } - if err = framework.WaitForRCPodsRunning(h.client, namespace, rc.Name); err != nil { + if err = framework.WaitForControlledPodsRunning(h.client, namespace, rc.Name, api.Kind("ReplicationController")); err != nil { return } h.rcName = rc.Name @@ -171,7 +171,7 @@ func (s *ingManager) start(namespace string) (err error) { if err != nil { return } - if err = framework.WaitForRCPodsRunning(s.client, rc.Namespace, rc.Name); err != nil { + if err = framework.WaitForControlledPodsRunning(s.client, rc.Namespace, rc.Name, api.Kind("ReplicationController")); err != nil { return } } diff --git a/test/utils/BUILD b/test/utils/BUILD index e862d8835a8..eb4301de584 100644 --- a/test/utils/BUILD +++ b/test/utils/BUILD @@ -25,6 +25,7 @@ go_library( "//pkg/api/errors:go_default_library", "//pkg/api/resource:go_default_library", "//pkg/api/v1:go_default_library", + "//pkg/apis/extensions:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/apis/meta/v1:go_default_library", "//pkg/client/cache:go_default_library", @@ -33,6 +34,7 @@ go_library( "//pkg/fields:go_default_library", "//pkg/labels:go_default_library", "//pkg/runtime:go_default_library", + "//pkg/runtime/schema:go_default_library", "//pkg/util/sets:go_default_library", "//pkg/util/uuid:go_default_library", "//pkg/util/workqueue:go_default_library", diff --git a/test/utils/runners.go b/test/utils/runners.go index 68d0a8eb04f..ee35c3aeaa9 100644 --- a/test/utils/runners.go +++ b/test/utils/runners.go @@ -27,12 +27,14 @@ import ( apierrs "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/v1" + extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime/schema" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/workqueue" @@ -45,6 +47,18 @@ const ( nonExist = "NonExist" ) +type RunObjectConfig interface { + Run() error + GetName() string + GetNamespace() string + GetKind() schema.GroupKind + GetClient() clientset.Interface + GetInternalClient() internalclientset.Interface + SetClient(clientset.Interface) + SetInternalClient(internalclientset.Interface) + GetReplicas() int +} + type RCConfig struct { Client clientset.Interface InternalClient internalclientset.Interface @@ -193,6 +207,14 @@ func RunDeployment(config DeploymentConfig) error { return config.start() } +func (config *DeploymentConfig) Run() error { + return RunDeployment(*config) +} + +func (config *DeploymentConfig) GetKind() schema.GroupKind { + return extensionsinternal.Kind("Deployment") +} + func (config *DeploymentConfig) create() error { deployment := &extensions.Deployment{ ObjectMeta: v1.ObjectMeta{ @@ -245,6 +267,14 @@ func RunReplicaSet(config ReplicaSetConfig) error { return config.start() } +func (config *ReplicaSetConfig) Run() error { + return RunReplicaSet(*config) +} + +func (config *ReplicaSetConfig) GetKind() schema.GroupKind { + return extensionsinternal.Kind("ReplicaSet") +} + func (config *ReplicaSetConfig) create() error { rs := &extensions.ReplicaSet{ ObjectMeta: v1.ObjectMeta{ @@ -297,6 +327,42 @@ func RunRC(config RCConfig) error { return config.start() } +func (config *RCConfig) Run() error { + return RunRC(*config) +} + +func (config *RCConfig) GetName() string { + return config.Name +} + +func (config *RCConfig) GetNamespace() string { + return config.Namespace +} + +func (config *RCConfig) GetKind() schema.GroupKind { + return api.Kind("ReplicationController") +} + +func (config *RCConfig) GetClient() clientset.Interface { + return config.Client +} + +func (config *RCConfig) GetInternalClient() internalclientset.Interface { + return config.InternalClient +} + +func (config *RCConfig) SetClient(c clientset.Interface) { + config.Client = c +} + +func (config *RCConfig) SetInternalClient(c internalclientset.Interface) { + config.InternalClient = c +} + +func (config *RCConfig) GetReplicas() int { + return config.Replicas +} + func (config *RCConfig) create() error { dnsDefault := v1.DNSDefault if config.DNSPolicy == nil {