From fb72b501352544c943652616a1955ff26364b839 Mon Sep 17 00:00:00 2001 From: Robert Rati Date: Fri, 20 May 2016 10:14:02 -0400 Subject: [PATCH] Added test to density that will run maximum capacity pods on all nodes --- test/e2e/density.go | 361 +++++++++++++++++++------------ test/e2e/framework/util.go | 71 ++++++ test/e2e/scheduler_predicates.go | 43 +--- 3 files changed, 294 insertions(+), 181 deletions(-) diff --git a/test/e2e/density.go b/test/e2e/density.go index 09ee63b7d31..094a9727a7a 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/sets" utiluuid "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/test/e2e/framework" @@ -50,6 +51,15 @@ const ( // Maximum container failures this test tolerates before failing. var MaxContainerFailures = 0 +type DensityTestConfig struct { + Configs []framework.RCConfig + Client *client.Client + Namespace string + PollInterval time.Duration + PodCount int + Timeout time.Duration +} + func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceConstraint { var apiserverMem uint64 var controllerMem uint64 @@ -167,6 +177,155 @@ func logPodStartupStatus(c *client.Client, expectedPods int, ns string, observed } } +// runDensityTest will perform a density test and return the time it took for +// all pods to start +func runDensityTest(dtc DensityTestConfig) time.Duration { + defer GinkgoRecover() + // Create a listener for events. + // eLock is a lock protects the events + var eLock sync.Mutex + events := make([](*api.Event), 0) + _, controller := controllerframework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return dtc.Client.Events(dtc.Namespace).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return dtc.Client.Events(dtc.Namespace).Watch(options) + }, + }, + &api.Event{}, + 0, + controllerframework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + eLock.Lock() + defer eLock.Unlock() + events = append(events, obj.(*api.Event)) + }, + }, + ) + stop := make(chan struct{}) + go controller.Run(stop) + + // Create a listener for api updates + // uLock is a lock protects the updateCount + var uLock sync.Mutex + updateCount := 0 + label := labels.SelectorFromSet(labels.Set(map[string]string{"type": "densityPod"})) + _, updateController := controllerframework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + options.LabelSelector = label + return dtc.Client.Pods(dtc.Namespace).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + options.LabelSelector = label + return dtc.Client.Pods(dtc.Namespace).Watch(options) + }, + }, + &api.Pod{}, + 0, + controllerframework.ResourceEventHandlerFuncs{ + UpdateFunc: func(_, _ interface{}) { + uLock.Lock() + defer uLock.Unlock() + updateCount++ + }, + }, + ) + go updateController.Run(stop) + + // Start all replication controllers. + startTime := time.Now() + wg := sync.WaitGroup{} + wg.Add(len(dtc.Configs)) + for i := range dtc.Configs { + rcConfig := dtc.Configs[i] + go func() { + framework.ExpectNoError(framework.RunRC(rcConfig)) + wg.Done() + }() + } + logStopCh := make(chan struct{}) + go logPodStartupStatus(dtc.Client, dtc.PodCount, dtc.Namespace, map[string]string{"type": "densityPod"}, dtc.PollInterval, logStopCh) + wg.Wait() + startupTime := time.Now().Sub(startTime) + close(logStopCh) + framework.Logf("E2E startup time for %d pods: %v", dtc.PodCount, startupTime) + framework.Logf("Throughput (pods/s) during cluster saturation phase: %v", float32(dtc.PodCount)/float32(startupTime/time.Second)) + + By("Waiting for all events to be recorded") + last := -1 + current := len(events) + lastCount := -1 + currentCount := updateCount + for start := time.Now(); (last < current || lastCount < currentCount) && time.Since(start) < dtc.Timeout; time.Sleep(10 * time.Second) { + func() { + eLock.Lock() + defer eLock.Unlock() + last = current + current = len(events) + }() + func() { + uLock.Lock() + defer uLock.Unlock() + lastCount = currentCount + currentCount = updateCount + }() + } + close(stop) + + if current != last { + framework.Logf("Warning: Not all events were recorded after waiting %.2f minutes", dtc.Timeout.Minutes()) + } + framework.Logf("Found %d events", current) + if currentCount != lastCount { + framework.Logf("Warning: Not all updates were recorded after waiting %.2f minutes", dtc.Timeout.Minutes()) + } + framework.Logf("Found %d updates", currentCount) + + // Tune the threshold for allowed failures. + badEvents := framework.BadEvents(events) + Expect(badEvents).NotTo(BeNumerically(">", int(math.Floor(0.01*float64(dtc.PodCount))))) + // Print some data about Pod to Node allocation + By("Printing Pod to Node allocation data") + podList, err := dtc.Client.Pods(api.NamespaceAll).List(api.ListOptions{}) + framework.ExpectNoError(err) + pausePodAllocation := make(map[string]int) + systemPodAllocation := make(map[string][]string) + for _, pod := range podList.Items { + if pod.Namespace == api.NamespaceSystem { + systemPodAllocation[pod.Spec.NodeName] = append(systemPodAllocation[pod.Spec.NodeName], pod.Name) + } else { + pausePodAllocation[pod.Spec.NodeName]++ + } + } + nodeNames := make([]string, 0) + for k := range pausePodAllocation { + nodeNames = append(nodeNames, k) + } + sort.Strings(nodeNames) + for _, node := range nodeNames { + framework.Logf("%v: %v pause pods, system pods: %v", node, pausePodAllocation[node], systemPodAllocation[node]) + } + return startupTime +} + +func cleanupDensityTest(dtc DensityTestConfig) { + defer GinkgoRecover() + By("Deleting ReplicationController") + // We explicitly delete all pods to have API calls necessary for deletion accounted in metrics. + for i := range dtc.Configs { + rcName := dtc.Configs[i].Name + rc, err := dtc.Client.ReplicationControllers(dtc.Namespace).Get(rcName) + if err == nil && rc.Spec.Replicas != 0 { + By("Cleaning up the replication controller") + err := framework.DeleteRC(dtc.Client, dtc.Namespace, rcName) + framework.ExpectNoError(err) + } + } +} + // This test suite can take a long time to run, and can affect or be affected by other tests. // So by default it is added to the ginkgo.skip list (see driver.go). // To run this suite you must explicitly ask for it by setting the @@ -185,6 +344,8 @@ var _ = framework.KubeDescribe("Density", func() { var totalPods int var nodeCpuCapacity int64 var nodeMemCapacity int64 + var nodes *api.NodeList + var masters sets.String // Gathers data prior to framework namespace teardown AfterEach(func() { @@ -225,8 +386,7 @@ var _ = framework.KubeDescribe("Density", func() { // of nodes without Routes created. Since this would make a node // unschedulable, we need to wait until all of them are schedulable. framework.ExpectNoError(framework.WaitForAllNodesSchedulable(c)) - - nodes := framework.GetReadySchedulableNodesOrDie(c) + masters, nodes = framework.GetMasterAndWorkerNodesOrDie(c) nodeCount = len(nodes.Items) Expect(nodeCount).NotTo(BeZero()) if nodeCount == 30 { @@ -291,16 +451,18 @@ var _ = framework.KubeDescribe("Density", func() { } itArg := testArg It(name, func() { + podsPerNode := itArg.podsPerNode + totalPods = podsPerNode * nodeCount fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid)) framework.ExpectNoError(err) defer fileHndl.Close() - podsPerNode := itArg.podsPerNode - totalPods = podsPerNode * nodeCount + timeout := 10 * time.Minute + // TODO: loop to podsPerNode instead of 1 when we're ready. numberOrRCs := 1 RCConfigs := make([]framework.RCConfig, numberOrRCs) for i := 0; i < numberOrRCs; i++ { - RCName = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid + RCName := "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid RCConfigs[i] = framework.RCConfig{Client: c, Image: framework.GetPauseImageName(f.Client), Name: RCName, @@ -316,135 +478,14 @@ var _ = framework.KubeDescribe("Density", func() { } } - // Create a listener for events. - // eLock is a lock protects the events - var eLock sync.Mutex - events := make([](*api.Event), 0) - _, controller := controllerframework.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return c.Events(ns).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return c.Events(ns).Watch(options) - }, - }, - &api.Event{}, - 0, - controllerframework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - eLock.Lock() - defer eLock.Unlock() - events = append(events, obj.(*api.Event)) - }, - }, - ) - stop := make(chan struct{}) - go controller.Run(stop) - - // Create a listener for api updates - // uLock is a lock protects the updateCount - var uLock sync.Mutex - updateCount := 0 - label := labels.SelectorFromSet(labels.Set(map[string]string{"type": "densityPod"})) - _, updateController := controllerframework.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - options.LabelSelector = label - return c.Pods(ns).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - options.LabelSelector = label - return c.Pods(ns).Watch(options) - }, - }, - &api.Pod{}, - 0, - controllerframework.ResourceEventHandlerFuncs{ - UpdateFunc: func(_, _ interface{}) { - uLock.Lock() - defer uLock.Unlock() - updateCount++ - }, - }, - ) - go updateController.Run(stop) - - // Start all replication controllers. - startTime := time.Now() - wg := sync.WaitGroup{} - wg.Add(len(RCConfigs)) - for i := range RCConfigs { - rcConfig := RCConfigs[i] - go func() { - framework.ExpectNoError(framework.RunRC(rcConfig)) - wg.Done() - }() + dConfig := DensityTestConfig{Client: c, + Configs: RCConfigs, + PodCount: totalPods, + Namespace: ns, + PollInterval: itArg.interval, + Timeout: timeout, } - logStopCh := make(chan struct{}) - go logPodStartupStatus(c, totalPods, ns, map[string]string{"type": "densityPod"}, itArg.interval, logStopCh) - wg.Wait() - e2eStartupTime = time.Now().Sub(startTime) - close(logStopCh) - framework.Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime) - framework.Logf("Throughput (pods/s) during cluster saturation phase: %v", float32(totalPods)/float32(e2eStartupTime/time.Second)) - - By("Waiting for all events to be recorded") - last := -1 - current := len(events) - lastCount := -1 - currentCount := updateCount - timeout := 10 * time.Minute - for start := time.Now(); (last < current || lastCount < currentCount) && time.Since(start) < timeout; time.Sleep(10 * time.Second) { - func() { - eLock.Lock() - defer eLock.Unlock() - last = current - current = len(events) - }() - func() { - uLock.Lock() - defer uLock.Unlock() - lastCount = currentCount - currentCount = updateCount - }() - } - close(stop) - - if current != last { - framework.Logf("Warning: Not all events were recorded after waiting %.2f minutes", timeout.Minutes()) - } - framework.Logf("Found %d events", current) - if currentCount != lastCount { - framework.Logf("Warning: Not all updates were recorded after waiting %.2f minutes", timeout.Minutes()) - } - framework.Logf("Found %d updates", currentCount) - - // Tune the threshold for allowed failures. - badEvents := framework.BadEvents(events) - Expect(badEvents).NotTo(BeNumerically(">", int(math.Floor(0.01*float64(totalPods))))) - // Print some data about Pod to Node allocation - By("Printing Pod to Node allocation data") - podList, err := c.Pods(api.NamespaceAll).List(api.ListOptions{}) - framework.ExpectNoError(err) - pausePodAllocation := make(map[string]int) - systemPodAllocation := make(map[string][]string) - for _, pod := range podList.Items { - if pod.Namespace == api.NamespaceSystem { - systemPodAllocation[pod.Spec.NodeName] = append(systemPodAllocation[pod.Spec.NodeName], pod.Name) - } else { - pausePodAllocation[pod.Spec.NodeName]++ - } - } - nodeNames := make([]string, 0) - for k := range pausePodAllocation { - nodeNames = append(nodeNames, k) - } - sort.Strings(nodeNames) - for _, node := range nodeNames { - framework.Logf("%v: %v pause pods, system pods: %v", node, pausePodAllocation[node], systemPodAllocation[node]) - } - + e2eStartupTime = runDensityTest(dConfig) if itArg.runLatencyTest { By("Scheduling additional Pods to measure startup latencies") @@ -613,17 +654,7 @@ var _ = framework.KubeDescribe("Density", func() { framework.LogSuspiciousLatency(startupLag, e2eLag, nodeCount, c) } - By("Deleting ReplicationController") - // We explicitly delete all pods to have API calls necessary for deletion accounted in metrics. - for i := range RCConfigs { - rcName := RCConfigs[i].Name - rc, err := c.ReplicationControllers(ns).Get(rcName) - if err == nil && rc.Spec.Replicas != 0 { - By("Cleaning up the replication controller") - err := framework.DeleteRC(c, ns, rcName) - framework.ExpectNoError(err) - } - } + cleanupDensityTest(dConfig) By("Removing additional replication controllers if any") for i := 1; i <= nodeCount; i++ { @@ -632,6 +663,48 @@ var _ = framework.KubeDescribe("Density", func() { } }) } + + // Calculate total number of pods from each node's max-pod + It("[Feature:ManualPerformance] should allow running maximum capacity pods on nodes", func() { + totalPods = 0 + for _, n := range nodes.Items { + totalPods += int(n.Status.Capacity.Pods().Value()) + } + totalPods -= framework.WaitForStableCluster(c, masters) + + fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid)) + framework.ExpectNoError(err) + defer fileHndl.Close() + rcCnt := 1 + RCConfigs := make([]framework.RCConfig, rcCnt) + podsPerRC := int(totalPods / rcCnt) + for i := 0; i < rcCnt; i++ { + if i == rcCnt-1 { + podsPerRC += int(math.Mod(float64(totalPods), float64(rcCnt))) + } + RCName = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid + RCConfigs[i] = framework.RCConfig{Client: c, + Image: "gcr.io/google_containers/pause-amd64:3.0", + Name: RCName, + Namespace: ns, + Labels: map[string]string{"type": "densityPod"}, + PollInterval: 10 * time.Second, + PodStatusFile: fileHndl, + Replicas: podsPerRC, + MaxContainerFailures: &MaxContainerFailures, + Silent: true, + } + } + dConfig := DensityTestConfig{Client: c, + Configs: RCConfigs, + PodCount: totalPods, + Namespace: ns, + PollInterval: 10 * time.Second, + Timeout: 10 * time.Minute, + } + e2eStartupTime = runDensityTest(dConfig) + cleanupDensityTest(dConfig) + }) }) func createRunningPodFromRC(wg *sync.WaitGroup, c *client.Client, name, ns, image, podType string, cpuRequest, memRequest resource.Quantity) { diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 69a25d1ca36..5ac4034c10f 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -65,6 +65,7 @@ import ( "k8s.io/kubernetes/pkg/types" labelsutil "k8s.io/kubernetes/pkg/util/labels" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/system" "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/wait" utilyaml "k8s.io/kubernetes/pkg/util/yaml" @@ -4665,3 +4666,73 @@ func retryCmd(command string, args ...string) (string, string, error) { }) return stdout, stderr, err } + +// GetPodsScheduled returns a number of currently scheduled and not scheduled Pods. +func GetPodsScheduled(masterNodes sets.String, pods *api.PodList) (scheduledPods, notScheduledPods []api.Pod) { + for _, pod := range pods.Items { + if !masterNodes.Has(pod.Spec.NodeName) { + if pod.Spec.NodeName != "" { + _, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled) + Expect(scheduledCondition != nil).To(Equal(true)) + Expect(scheduledCondition.Status).To(Equal(api.ConditionTrue)) + scheduledPods = append(scheduledPods, pod) + } else { + _, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled) + Expect(scheduledCondition != nil).To(Equal(true)) + Expect(scheduledCondition.Status).To(Equal(api.ConditionFalse)) + if scheduledCondition.Reason == "Unschedulable" { + + notScheduledPods = append(notScheduledPods, pod) + } + } + } + } + return +} + +// WaitForStableCluster waits until all existing pods are scheduled and returns their amount. +func WaitForStableCluster(c *client.Client, masterNodes sets.String) int { + timeout := 10 * time.Minute + startTime := time.Now() + + allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{}) + ExpectNoError(err) + // API server returns also Pods that succeeded. We need to filter them out. + currentPods := make([]api.Pod, 0, len(allPods.Items)) + for _, pod := range allPods.Items { + if pod.Status.Phase != api.PodSucceeded && pod.Status.Phase != api.PodFailed { + currentPods = append(currentPods, pod) + } + + } + allPods.Items = currentPods + scheduledPods, currentlyNotScheduledPods := GetPodsScheduled(masterNodes, allPods) + for len(currentlyNotScheduledPods) != 0 { + time.Sleep(2 * time.Second) + + allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{}) + ExpectNoError(err) + scheduledPods, currentlyNotScheduledPods = GetPodsScheduled(masterNodes, allPods) + + if startTime.Add(timeout).Before(time.Now()) { + Failf("Timed out after %v waiting for stable cluster.", timeout) + break + } + } + return len(scheduledPods) +} + +// GetMasterAndWorkerNodesOrDie will return a list masters and schedulable worker nodes +func GetMasterAndWorkerNodesOrDie(c *client.Client) (sets.String, *api.NodeList) { + nodes := &api.NodeList{} + masters := sets.NewString() + all, _ := c.Nodes().List(api.ListOptions{}) + for _, n := range all.Items { + if system.IsMasterNode(&n) { + masters.Insert(n.Name) + } else if isNodeSchedulable(&n) { + nodes.Items = append(nodes.Items, n) + } + } + return masters, nodes +} diff --git a/test/e2e/scheduler_predicates.go b/test/e2e/scheduler_predicates.go index 6e3e5ac6ee5..8689ffbbf4c 100644 --- a/test/e2e/scheduler_predicates.go +++ b/test/e2e/scheduler_predicates.go @@ -148,7 +148,7 @@ func getRequestedCPU(pod api.Pod) int64 { func verifyResult(c *client.Client, podName string, expectedScheduled int, expectedNotScheduled int, ns string) { allPods, err := c.Pods(ns).List(api.ListOptions{}) framework.ExpectNoError(err) - scheduledPods, notScheduledPods := getPodsScheduled(allPods) + scheduledPods, notScheduledPods := framework.GetPodsScheduled(masterNodes, allPods) printed := false printOnce := func(msg string) string { @@ -174,37 +174,6 @@ func cleanupPods(c *client.Client, ns string) { } } -// Waits until all existing pods are scheduled and returns their amount. -func waitForStableCluster(c *client.Client) int { - timeout := 10 * time.Minute - startTime := time.Now() - - allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{}) - framework.ExpectNoError(err) - // API server returns also Pods that succeeded. We need to filter them out. - currentPods := make([]api.Pod, 0, len(allPods.Items)) - for _, pod := range allPods.Items { - if pod.Status.Phase != api.PodSucceeded && pod.Status.Phase != api.PodFailed { - currentPods = append(currentPods, pod) - } - } - allPods.Items = currentPods - scheduledPods, currentlyNotScheduledPods := getPodsScheduled(allPods) - for len(currentlyNotScheduledPods) != 0 { - time.Sleep(2 * time.Second) - - allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{}) - framework.ExpectNoError(err) - scheduledPods, currentlyNotScheduledPods = getPodsScheduled(allPods) - - if startTime.Add(timeout).Before(time.Now()) { - framework.Failf("Timed out after %v waiting for stable cluster.", timeout) - break - } - } - return len(scheduledPods) -} - var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { var c *client.Client var nodeList *api.NodeList @@ -279,7 +248,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { totalPodCapacity += podCapacity.Value() } - currentlyScheduledPods := waitForStableCluster(c) + currentlyScheduledPods := framework.WaitForStableCluster(c, masterNodes) podsNeededForSaturation := int(totalPodCapacity) - currentlyScheduledPods By(fmt.Sprintf("Starting additional %v Pods to fully saturate the cluster max pods and trying to start another one", podsNeededForSaturation)) @@ -349,7 +318,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { nodeMaxCapacity = capacity.MilliValue() } } - waitForStableCluster(c) + framework.WaitForStableCluster(c, masterNodes) pods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{}) framework.ExpectNoError(err) @@ -444,7 +413,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { By("Trying to schedule Pod with nonempty NodeSelector.") podName := "restricted-pod" - waitForStableCluster(c) + framework.WaitForStableCluster(c, masterNodes) _, err := c.Pods(ns).Create(&api.Pod{ TypeMeta: unversioned.TypeMeta{ @@ -597,7 +566,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { By("Trying to schedule Pod with nonempty NodeSelector.") podName := "restricted-pod" - waitForStableCluster(c) + framework.WaitForStableCluster(c, masterNodes) _, err := c.Pods(ns).Create(&api.Pod{ TypeMeta: unversioned.TypeMeta{ @@ -853,7 +822,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { By("Trying to schedule Pod with nonempty Pod Affinity.") podName := "without-label-" + string(uuid.NewUUID()) - waitForStableCluster(c) + framework.WaitForStableCluster(c, masterNodes) _, err := c.Pods(ns).Create(&api.Pod{ TypeMeta: unversioned.TypeMeta{