From bfe2a2b03c21c2a4c57d77dbddb5425fb81922e4 Mon Sep 17 00:00:00 2001 From: gmarek Date: Wed, 7 Dec 2016 15:51:57 +0100 Subject: [PATCH] Add Daemons to Load/Density tests --- test/e2e/density.go | 35 +++++++++++++-- test/e2e/framework/util.go | 6 +++ test/e2e/load.go | 51 ++++++++++++++-------- test/utils/BUILD | 1 + test/utils/runners.go | 88 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 160 insertions(+), 21 deletions(-) diff --git a/test/e2e/density.go b/test/e2e/density.go index e54e65f62de..be3b61ef5b5 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -66,6 +66,7 @@ type DensityTestConfig struct { // What kind of resource we want to create kind schema.GroupKind SecretConfigs []*testutils.SecretConfig + DaemonConfigs []*testutils.DaemonConfig } func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceConstraint { @@ -197,6 +198,11 @@ func runDensityTest(dtc DensityTestConfig) time.Duration { for i := range dtc.SecretConfigs { dtc.SecretConfigs[i].Run() } + + for i := range dtc.DaemonConfigs { + dtc.DaemonConfigs[i].Run() + } + // Start all replication controllers. startTime := time.Now() wg := sync.WaitGroup{} @@ -258,7 +264,7 @@ func cleanupDensityTest(dtc DensityTestConfig) { framework.ExpectNoError(err) } else { By(fmt.Sprintf("Cleaning up the %v and pods", kind)) - err := framework.DeleteResourceAndPods(dtc.ClientSet, dtc.InternalClientset, kind, dtc.Configs[i].GetNamespace(), name) + err := framework.DeleteResourceAndPods(dtc.ClientSet, dtc.InternalClientset, kind, namespace, name) framework.ExpectNoError(err) } } @@ -267,6 +273,16 @@ func cleanupDensityTest(dtc DensityTestConfig) { for i := range dtc.SecretConfigs { dtc.SecretConfigs[i].Stop() } + + for i := range dtc.DaemonConfigs { + framework.ExpectNoError(framework.DeleteResourceAndPods( + dtc.ClientSet, + dtc.InternalClientset, + extensions.Kind("DaemonSet"), + dtc.DaemonConfigs[i].Namespace, + dtc.DaemonConfigs[i].Name, + )) + } } // This test suite can take a long time to run, and can affect or be affected by other tests. @@ -367,8 +383,9 @@ var _ = framework.KubeDescribe("Density", func() { // Controls how often the apiserver is polled for pods interval time.Duration // What kind of resource we should be creating. Default: ReplicationController - kind schema.GroupKind - secretsPerPod int + kind schema.GroupKind + secretsPerPod int + daemonsPerNode int } densityTests := []Density{ @@ -405,7 +422,7 @@ var _ = framework.KubeDescribe("Density", func() { if podsPerNode == 30 { f.AddonResourceConstraints = func() map[string]framework.ResourceConstraint { return density30AddonResourceVerifier(nodeCount) }() } - totalPods = podsPerNode * nodeCount + totalPods = (podsPerNode - itArg.daemonsPerNode) * nodeCount fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid)) framework.ExpectNoError(err) defer fileHndl.Close() @@ -477,6 +494,16 @@ var _ = framework.KubeDescribe("Density", func() { kind: itArg.kind, SecretConfigs: secretConfigs, } + + for i := 0; i < itArg.daemonsPerNode; i++ { + dConfig.DaemonConfigs = append(dConfig.DaemonConfigs, + &testutils.DaemonConfig{ + Client: f.ClientSet, + Name: fmt.Sprintf("density-daemon-%v", i), + Namespace: f.Namespace.Name, + LogFunc: framework.Logf, + }) + } e2eStartupTime = runDensityTest(dConfig) if itArg.runLatencyTest { By("Scheduling additional Pods to measure startup latencies") diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 6a85fc4ffd4..528dde36f2e 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -2779,6 +2779,8 @@ func getRuntimeObjectForKind(c clientset.Interface, kind schema.GroupKind, ns, n return c.Extensions().ReplicaSets(ns).Get(name, metav1.GetOptions{}) case extensionsinternal.Kind("Deployment"): return c.Extensions().Deployments(ns).Get(name, metav1.GetOptions{}) + case extensionsinternal.Kind("DaemonSet"): + return c.Extensions().DaemonSets(ns).Get(name, metav1.GetOptions{}) default: return nil, fmt.Errorf("Unsupported kind when getting runtime object: %v", kind) } @@ -2805,6 +2807,8 @@ func getSelectorFromRuntimeObject(obj runtime.Object) (labels.Selector, error) { return metav1.LabelSelectorAsSelector(typed.Spec.Selector) case *extensions.Deployment: return metav1.LabelSelectorAsSelector(typed.Spec.Selector) + case *extensions.DaemonSet: + return metav1.LabelSelectorAsSelector(typed.Spec.Selector) default: return nil, fmt.Errorf("Unsupported kind when getting selector: %v", obj) } @@ -2840,6 +2844,8 @@ func getReaperForKind(internalClientset internalclientset.Interface, kind schema return kubectl.ReaperFor(extensionsinternal.Kind("ReplicaSet"), internalClientset) case extensionsinternal.Kind("Deployment"): return kubectl.ReaperFor(extensionsinternal.Kind("Deployment"), internalClientset) + case extensionsinternal.Kind("DaemonSet"): + return kubectl.ReaperFor(extensionsinternal.Kind("DaemonSet"), internalClientset) default: return nil, fmt.Errorf("Unsupported kind: %v", kind) } diff --git a/test/e2e/load.go b/test/e2e/load.go index 934e018cbc4..bab64360605 100644 --- a/test/e2e/load.go +++ b/test/e2e/load.go @@ -122,9 +122,10 @@ var _ = framework.KubeDescribe("Load capacity", func() { image string command []string // What kind of resource we want to create - kind schema.GroupKind - services bool - secretsPerPod int + kind schema.GroupKind + services bool + secretsPerPod int + daemonsPerNode int } loadTests := []Load{ @@ -149,9 +150,8 @@ var _ = framework.KubeDescribe("Load capacity", func() { namespaces, err := CreateNamespaces(f, namespaceCount, fmt.Sprintf("load-%v-nodepods", itArg.podsPerNode)) framework.ExpectNoError(err) - totalPods := itArg.podsPerNode * nodeCount + totalPods := (itArg.podsPerNode - itArg.daemonsPerNode) * nodeCount configs, secretConfigs = generateConfigs(totalPods, itArg.image, itArg.command, namespaces, itArg.kind, itArg.secretsPerPod) - var services []*v1.Service if itArg.services { framework.Logf("Creating services") services := generateServicesForConfigs(configs) @@ -160,12 +160,41 @@ var _ = framework.KubeDescribe("Load capacity", func() { framework.ExpectNoError(err) } framework.Logf("%v Services created.", len(services)) + defer func(services []*v1.Service) { + framework.Logf("Starting to delete services...") + for _, service := range services { + err := clientset.Core().Services(ns).Delete(service.Name, nil) + framework.ExpectNoError(err) + } + framework.Logf("Services deleted") + }(services) } else { framework.Logf("Skipping service creation") } // Create all secrets for i := range secretConfigs { secretConfigs[i].Run() + defer secretConfigs[i].Stop() + } + // StartDeamon if needed + for i := 0; i < itArg.daemonsPerNode; i++ { + daemonName := fmt.Sprintf("load-daemon-%v", i) + daemonConfig := &testutils.DaemonConfig{ + Client: f.ClientSet, + Name: daemonName, + Namespace: f.Namespace.Name, + LogFunc: framework.Logf, + } + daemonConfig.Run() + defer func(config *testutils.DaemonConfig) { + framework.ExpectNoError(framework.DeleteResourceAndPods( + f.ClientSet, + f.InternalClientset, + extensions.Kind("DaemonSet"), + config.Namespace, + config.Name, + )) + }(daemonConfig) } // Simulate lifetime of RC: @@ -207,18 +236,6 @@ var _ = framework.KubeDescribe("Load capacity", func() { deletingTime := time.Duration(totalPods/throughput) * time.Second framework.Logf("Starting to delete ReplicationControllers...") deleteAllResources(configs, deletingTime) - // Delete all secrets - for i := range secretConfigs { - secretConfigs[i].Stop() - } - if itArg.services { - framework.Logf("Starting to delete services...") - for _, service := range services { - err := clientset.Core().Services(ns).Delete(service.Name, nil) - framework.ExpectNoError(err) - } - framework.Logf("Services deleted") - } }) } }) diff --git a/test/utils/BUILD b/test/utils/BUILD index eb4301de584..691d067ca6d 100644 --- a/test/utils/BUILD +++ b/test/utils/BUILD @@ -37,6 +37,7 @@ go_library( "//pkg/runtime/schema:go_default_library", "//pkg/util/sets:go_default_library", "//pkg/util/uuid:go_default_library", + "//pkg/util/wait:go_default_library", "//pkg/util/workqueue:go_default_library", "//pkg/watch:go_default_library", "//vendor:github.com/golang/glog", diff --git a/test/utils/runners.go b/test/utils/runners.go index 19aa138a85c..7a79895e357 100644 --- a/test/utils/runners.go +++ b/test/utils/runners.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/runtime/schema" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/uuid" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "github.com/golang/glog" @@ -1000,3 +1001,90 @@ func attachSecrets(template *v1.PodTemplateSpec, secretNames []string) { template.Spec.Volumes = volumes template.Spec.Containers[0].VolumeMounts = mounts } + +type DaemonConfig struct { + Client clientset.Interface + Name string + Namespace string + Image string + // If set this function will be used to print log lines instead of glog. + LogFunc func(fmt string, args ...interface{}) + // How long we wait for DaemonSet to become running. + Timeout time.Duration +} + +func (config *DaemonConfig) Run() error { + if config.Image == "" { + config.Image = "kubernetes/pause" + } + nameLabel := map[string]string{ + "name": config.Name + "-daemon", + } + daemon := &extensions.DaemonSet{ + ObjectMeta: v1.ObjectMeta{ + Name: config.Name, + }, + Spec: extensions.DaemonSetSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: nameLabel, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: config.Name, + Image: config.Image, + }, + }, + }, + }, + }, + } + + _, err := config.Client.Extensions().DaemonSets(config.Namespace).Create(daemon) + if err != nil { + return fmt.Errorf("Error creating DaemonSet %v: %v", config.Name, err) + } + + var nodes *v1.NodeList + for i := 0; i < retries; i++ { + // Wait for all daemons to be running + nodes, err = config.Client.Core().Nodes().List(v1.ListOptions{ResourceVersion: "0"}) + if err == nil { + break + } else if i+1 == retries { + return fmt.Errorf("Error listing Nodes while waiting for DaemonSet %v: %v", config.Name, err) + } + } + + timeout := config.Timeout + if timeout <= 0 { + timeout = 5 * time.Minute + } + + podStore := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything()) + defer podStore.Stop() + + err = wait.Poll(time.Second, timeout, func() (bool, error) { + pods := podStore.List() + + nodeHasDaemon := sets.NewString() + for _, pod := range pods { + podReady, _ := PodRunningReady(pod) + if pod.Spec.NodeName != "" && podReady { + nodeHasDaemon.Insert(pod.Spec.NodeName) + } + } + + running := len(nodeHasDaemon) + config.LogFunc("Found %v/%v Daemons %v running", running, config.Name, len(nodes.Items)) + return running == len(nodes.Items), nil + }) + if err != nil { + config.LogFunc("Timed out while waiting for DaemonsSet %v/%v to be running.", config.Namespace, config.Name) + } else { + config.LogFunc("Created Daemon %v/%v", config.Namespace, config.Name) + } + + return err +}