From df55e8fb7061c37b4b6f2af52e854a2c4adcbb6e Mon Sep 17 00:00:00 2001 From: Zhou Fang Date: Thu, 28 Jul 2016 19:14:43 -0700 Subject: [PATCH 1/4] Add density and resource performance test to test-node-e2e --- test/e2e_node/density_test.go | 490 +++++++++++++++++++++++++++ test/e2e_node/e2e_service.go | 3 + test/e2e_node/kubelet_perf_test.go | 277 +++++++++++++++ test/e2e_node/resource_controller.go | 377 +++++++++++++++++++++ 4 files changed, 1147 insertions(+) create mode 100644 test/e2e_node/density_test.go create mode 100644 test/e2e_node/kubelet_perf_test.go create mode 100644 test/e2e_node/resource_controller.go diff --git a/test/e2e_node/density_test.go b/test/e2e_node/density_test.go new file mode 100644 index 00000000000..4d9a0371e53 --- /dev/null +++ b/test/e2e_node/density_test.go @@ -0,0 +1,490 @@ +/* +Copyright 2016 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e_node + +import ( + "errors" + "fmt" + "sort" + "strconv" + "sync" + "time" + + "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/cache" + controllerframework "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" + kubemetrics "k8s.io/kubernetes/pkg/kubelet/metrics" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/metrics" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +const ( + kubeletAddr = "localhost:10255" +) + +var _ = framework.KubeDescribe("Density", func() { + const ( + // the data collection time of `resource collector' and the standalone cadvisor + // is not synchronizated. 
Therefore `resource collector' may miss data or + // collect duplicated data + monitoringInterval = 500 * time.Millisecond + sleepBeforeEach = 30 * time.Second + sleepBeforeCreatePods = 30 * time.Second + sleepAfterDeletePods = 60 * time.Second + ) + + var ( + ns string + nodeName string + ) + + f := framework.NewDefaultFramework("density-test") + podType := "density_test_pod" + + BeforeEach(func() { + ns = f.Namespace.Name + nodeName = framework.TestContext.NodeName + }) + + AfterEach(func() { + time.Sleep(sleepAfterDeletePods) + }) + + Context("create a batch of pods", func() { + densityTests := []DensityTest{ + { + podsNr: 10, + interval: 0 * time.Millisecond, + cpuLimits: framework.ContainersCPUSummary{ + stats.SystemContainerKubelet: {0.50: 0.10, 0.95: 0.20}, + stats.SystemContainerRuntime: {0.50: 0.10, 0.95: 0.50}, + }, + memLimits: framework.ResourceUsagePerContainer{ + stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024}, + stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 250 * 1024 * 1024}, + }, + // percentile limit of single pod startup latency + podStartupLimits: framework.LatencyMetric{ + Perc50: 7 * time.Second, + Perc90: 10 * time.Second, + Perc99: 15 * time.Second, + }, + // upbound of startup latency of a batch of pods + podBatchStartupLimit: 20 * time.Second, + }, + { + podsNr: 30, + interval: 0 * time.Millisecond, + cpuLimits: framework.ContainersCPUSummary{ + stats.SystemContainerKubelet: {0.50: 0.10, 0.95: 0.35}, + stats.SystemContainerRuntime: {0.50: 0.10, 0.95: 0.70}, + }, + memLimits: framework.ResourceUsagePerContainer{ + stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024}, + stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024}, + }, + // percentile limit of single pod startup latency + podStartupLimits: framework.LatencyMetric{ + Perc50: 30 * time.Second, + Perc90: 35 * time.Second, + Perc99: 40 * time.Second, + }, + // upbound of startup latency of a batch of pods + podBatchStartupLimit: 90 * time.Second, + }, + } + + for _, testArg := range densityTests { + itArg := testArg + It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval", + itArg.podsNr, itArg.interval), func() { + var ( + mutex = &sync.Mutex{} + watchTimes = make(map[string]unversioned.Time, 0) + stopCh = make(chan struct{}) + ) + + // create specifications of the test pods + pods := newTestPods(itArg.podsNr, ImageRegistry[pauseImage], podType) + + // start a standalone cadvisor pod + // it uses `createSync', so the pod is running when it returns + createCadvisorPod(f) + + // `resource collector' monitoring fine-grain CPU/memory usage by a standalone Cadvisor with + // 1s housingkeeping interval + rm := NewResourceCollector(monitoringInterval) + + // the controller watches the change of pod status + controller := newInformerWatchPod(f, mutex, watchTimes, podType) + go controller.Run(stopCh) + + // Zhou: In test we see kubelet starts while it is busy on sth, as a result `syncLoop' + // does not response to pod creation immediately. Creating the first pod has a delay around 5s. + // The node status has been `ready' so `wait and check node being ready' does not help here. 
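+				// (node readiness is hence a necessary but not a sufficient signal
+				// that `syncLoop' is actually processing pod updates)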
+				// Now wait here for a grace period to have `syncLoop' be ready
+				time.Sleep(sleepBeforeCreatePods)
+
+				// the density test only monitors the overhead of creating pods;
+				// alternatively, start earlier and call `rm.Reset()' here to clear the buffer
+				rm.Start()
+
+				By("Creating a batch of pods")
+				// it returns a map[`pod name']`creation time' as the creation timestamps
+				createTimes := createBatchPodWithRateControl(f, pods, itArg.interval)
+
+				By("Waiting for all Pods begin observed by the watch...")
+				// checks every 10s util all pods are running. it timeouts ater 10min
+				Eventually(func() bool {
+					return len(watchTimes) == itArg.podsNr
+				}, 10*time.Minute, 10*time.Second).Should(BeTrue())
+
+				if len(watchTimes) < itArg.podsNr {
+					framework.Failf("Timeout reached waiting for all Pods being observed by the watch.")
+				}
+
+				// stop the watching controller, and the resource collector
+				close(stopCh)
+				rm.Stop()
+
+				// data analysis
+				var (
+					firstCreate unversioned.Time
+					lastRunning unversioned.Time
+					init        = true
+					e2eLags     = make([]framework.PodLatencyData, 0)
+				)
+
+				for name, create := range createTimes {
+					watch, ok := watchTimes[name]
+					Expect(ok).To(Equal(true))
+
+					e2eLags = append(e2eLags,
+						framework.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})
+
+					if !init {
+						if firstCreate.Time.After(create.Time) {
+							firstCreate = create
+						}
+						if lastRunning.Time.Before(watch.Time) {
+							lastRunning = watch
+						}
+					} else {
+						init = false
+						firstCreate, lastRunning = create, watch
+					}
+				}
+
+				sort.Sort(framework.LatencySlice(e2eLags))
+
+				// verify latency
+				By("Verifying latency")
+				verifyLatency(lastRunning.Time.Sub(firstCreate.Time), e2eLags, itArg)
+
+				// verify resource
+				By("Verifying resource")
+				verifyResource(f, testArg, rm)
+
+				// delete pods
+				By("Deleting a batch of pods")
+				deleteBatchPod(f, pods)
+
+				// tear down cadvisor
+				Expect(f.Client.Pods(ns).Delete(cadvisorPodName, api.NewDeleteOptions(30))).
+ NotTo(HaveOccurred()) + + Eventually(func() error { + return checkPodDeleted(f, cadvisorPodName) + }, 10*time.Minute, time.Second*3).Should(BeNil()) + }) + } + }) + + Context("create a sequence of pods", func() { + densityTests := []DensityTest{ + { + podsNr: 10, + bgPodsNr: 10, + cpuLimits: framework.ContainersCPUSummary{ + stats.SystemContainerKubelet: {0.50: 0.10, 0.95: 0.12}, + stats.SystemContainerRuntime: {0.50: 0.16, 0.95: 0.20}, + }, + memLimits: framework.ResourceUsagePerContainer{ + stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024}, + stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024}, + }, + podStartupLimits: framework.LatencyMetric{ + Perc50: 1500 * time.Millisecond, + Perc90: 2500 * time.Millisecond, + Perc99: 3500 * time.Millisecond, + }, + }, + { + podsNr: 10, + bgPodsNr: 30, + cpuLimits: framework.ContainersCPUSummary{ + stats.SystemContainerKubelet: {0.50: 0.12, 0.95: 0.15}, + stats.SystemContainerRuntime: {0.50: 0.22, 0.95: 0.27}, + }, + memLimits: framework.ResourceUsagePerContainer{ + stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024}, + stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024}, + }, + podStartupLimits: framework.LatencyMetric{ + Perc50: 1500 * time.Millisecond, + Perc90: 2500 * time.Millisecond, + Perc99: 3500 * time.Millisecond, + }, + }, + } + + for _, testArg := range densityTests { + itArg := testArg + It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %d background pods", + itArg.podsNr, itArg.bgPodsNr), func() { + bgPods := newTestPods(itArg.bgPodsNr, ImageRegistry[pauseImage], "background_pod") + testPods := newTestPods(itArg.podsNr, ImageRegistry[pauseImage], podType) + + createCadvisorPod(f) + rm := NewResourceCollector(monitoringInterval) + + By("Creating a batch of background pods") + // creatBatch is synchronized + // all pods are running when it returns + f.PodClient().CreateBatch(bgPods) + + //time.Sleep(sleepBeforeCreatePods) + + // starting resource monitoring + rm.Start() + + // do a sequential creation of pod (back to back) + batchlag, e2eLags := createBatchPodSequential(f, testPods) + + rm.Stop() + + // verify latency + By("Verifying latency") + verifyLatency(batchlag, e2eLags, itArg) + + // verify resource + By("Verifying resource") + verifyResource(f, testArg, rm) + + // delete pods + By("Deleting a batch of pods") + deleteBatchPod(f, append(bgPods, testPods...)) + + // tear down cadvisor + Expect(f.Client.Pods(ns).Delete(cadvisorPodName, api.NewDeleteOptions(30))). + NotTo(HaveOccurred()) + + Eventually(func() error { + return checkPodDeleted(f, cadvisorPodName) + }, 10*time.Minute, time.Second*3).Should(BeNil()) + }) + } + }) +}) + +type DensityTest struct { + // number of pods + podsNr int + bgPodsNr int + // interval between creating pod (rate control) + interval time.Duration + // resource bound + cpuLimits framework.ContainersCPUSummary + memLimits framework.ResourceUsagePerContainer + podStartupLimits framework.LatencyMetric + podBatchStartupLimit time.Duration +} + +// it creates a batch of pods concurrently, uses one goroutine for each creation. 
+// between creations there is an interval for throughput control
+func createBatchPodWithRateControl(f *framework.Framework, pods []*api.Pod, interval time.Duration) map[string]unversioned.Time {
+	createTimes := make(map[string]unversioned.Time)
+	for _, pod := range pods {
+		createTimes[pod.ObjectMeta.Name] = unversioned.Now()
+		go f.PodClient().Create(pod)
+		time.Sleep(interval)
+	}
+	return createTimes
+}
+
+func checkPodDeleted(f *framework.Framework, podName string) error {
+	ns := f.Namespace.Name
+	_, err := f.Client.Pods(ns).Get(podName)
+	if apierrors.IsNotFound(err) {
+		return nil
+	}
+	return errors.New("Pod Not Deleted")
+}
+
+// get Prometheus metric `pod start latency' from kubelet
+func getPodStartLatency(node string) (framework.KubeletLatencyMetrics, error) {
+	latencyMetrics := framework.KubeletLatencyMetrics{}
+	ms, err := metrics.GrabKubeletMetricsWithoutProxy(node)
+	Expect(err).NotTo(HaveOccurred())
+
+	for _, samples := range ms {
+		for _, sample := range samples {
+			if sample.Metric["__name__"] == kubemetrics.KubeletSubsystem+"_"+kubemetrics.PodStartLatencyKey {
+				quantile, _ := strconv.ParseFloat(string(sample.Metric["quantile"]), 64)
+				latencyMetrics = append(latencyMetrics,
+					framework.KubeletLatencyMetric{
+						Quantile: quantile,
+						Method:   kubemetrics.PodStartLatencyKey,
+						Latency:  time.Duration(int(sample.Value)) * time.Microsecond})
+			}
+		}
+	}
+	return latencyMetrics, nil
+}
+
+// Verifies whether 50, 90 and 99th percentiles of PodStartupLatency are
+// within the threshold.
+func verifyPodStartupLatency(expect, actual framework.LatencyMetric) error {
+	if actual.Perc50 > expect.Perc50 {
+		return fmt.Errorf("too high pod startup latency 50th percentile: %v", actual.Perc50)
+	}
+	if actual.Perc90 > expect.Perc90 {
+		return fmt.Errorf("too high pod startup latency 90th percentile: %v", actual.Perc90)
+	}
+	if actual.Perc99 > expect.Perc99 {
+		return fmt.Errorf("too high pod startup latency 99th percentile: %v", actual.Perc99)
+	}
+	return nil
+}
+
+func newInformerWatchPod(f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]unversioned.Time,
+	podType string) *controllerframework.Controller {
+	ns := f.Namespace.Name
+	checkPodRunning := func(p *api.Pod) {
+		mutex.Lock()
+		defer mutex.Unlock()
+		defer GinkgoRecover()
+
+		if p.Status.Phase == api.PodRunning {
+			if _, found := watchTimes[p.Name]; !found {
+				watchTimes[p.Name] = unversioned.Now()
+			}
+		}
+	}
+
+	_, controller := controllerframework.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType})
+				return f.Client.Pods(ns).List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType})
+				return f.Client.Pods(ns).Watch(options)
+			},
+		},
+		&api.Pod{},
+		0,
+		controllerframework.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				p, ok := obj.(*api.Pod)
+				Expect(ok).To(Equal(true))
+				go checkPodRunning(p)
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				p, ok := newObj.(*api.Pod)
+				Expect(ok).To(Equal(true))
+				go checkPodRunning(p)
+			},
+		},
+	)
+	return controller
+}
+
+func verifyLatency(batchLag time.Duration, e2eLags []framework.PodLatencyData, testArg DensityTest) {
+	framework.PrintLatencies(e2eLags, "worst client e2e total latencies")
+
+	// Zhou: do not trust `kubelet' metrics since they are not reset!
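+	// (the metrics are cumulative for the lifetime of the kubelet process, so
+	// they may include pods started outside this test; logged for reference only)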
+	latencyMetrics, _ := getPodStartLatency(kubeletAddr)
+	framework.Logf("Kubelet Prometheus metrics (not reset):\n%s", framework.PrettyPrintJSON(latencyMetrics))
+
+	// check whether e2e pod startup time is acceptable.
+	podCreateLatency := framework.PodStartupLatency{Latency: framework.ExtractLatencyMetrics(e2eLags)}
+	framework.Logf("Pod create latency: %s", framework.PrettyPrintJSON(podCreateLatency))
+	framework.ExpectNoError(verifyPodStartupLatency(testArg.podStartupLimits, podCreateLatency.Latency))
+
+	// check batch pod creation latency
+	if testArg.podBatchStartupLimit > 0 {
+		Expect(batchLag <= testArg.podBatchStartupLimit).To(Equal(true), "Batch creation startup time %v exceeds limit %v",
+			batchLag, testArg.podBatchStartupLimit)
+	}
+
+	// calculate and log throughput
+	throughputBatch := float64(testArg.podsNr) / batchLag.Minutes()
+	framework.Logf("Batch creation throughput is %.1f pods/min", throughputBatch)
+	throughputSequential := 1.0 / e2eLags[len(e2eLags)-1].Latency.Minutes()
+	framework.Logf("Sequential creation throughput is %.1f pods/min", throughputSequential)
+}
+
+func verifyResource(f *framework.Framework, testArg DensityTest, rm *ResourceCollector) {
+	nodeName := framework.TestContext.NodeName
+
+	// verify and log memory
+	usagePerContainer, err := rm.GetLatest()
+	Expect(err).NotTo(HaveOccurred())
+	framework.Logf("%s", formatResourceUsageStats(usagePerContainer))
+
+	usagePerNode := make(framework.ResourceUsagePerNode)
+	usagePerNode[nodeName] = usagePerContainer
+
+	memPerfData := framework.ResourceUsageToPerfData(usagePerNode)
+	framework.PrintPerfData(memPerfData)
+
+	verifyMemoryLimits(f.Client, testArg.memLimits, usagePerNode)
+
+	// verify and log cpu
+	cpuSummary := rm.GetCPUSummary()
+	framework.Logf("%s", formatCPUSummary(cpuSummary))
+
+	cpuSummaryPerNode := make(framework.NodesCPUSummary)
+	cpuSummaryPerNode[nodeName] = cpuSummary
+
+	cpuPerfData := framework.CPUUsageToPerfData(cpuSummaryPerNode)
+	framework.PrintPerfData(cpuPerfData)
+
+	verifyCPULimits(testArg.cpuLimits, cpuSummaryPerNode)
+}
+
+func createBatchPodSequential(f *framework.Framework, pods []*api.Pod) (time.Duration, []framework.PodLatencyData) {
+	batchStartTime := unversioned.Now()
+	e2eLags := make([]framework.PodLatencyData, 0)
+	for _, pod := range pods {
+		create := unversioned.Now()
+		f.PodClient().CreateSync(pod)
+		e2eLags = append(e2eLags,
+			framework.PodLatencyData{Name: pod.ObjectMeta.Name, Latency: unversioned.Now().Time.Sub(create.Time)})
+	}
+	batchLag := unversioned.Now().Time.Sub(batchStartTime.Time)
+	sort.Sort(framework.LatencySlice(e2eLags))
+	return batchLag, e2eLags
+}
diff --git a/test/e2e_node/e2e_service.go b/test/e2e_node/e2e_service.go
index 37914d2c91c..125cdaebf49 100644
--- a/test/e2e_node/e2e_service.go
+++ b/test/e2e_node/e2e_service.go
@@ -252,6 +252,9 @@ func (es *e2eService) startKubeletServer() (*killCmd, error) {
 		"--file-check-frequency", "10s", // Check file frequently so tests won't wait too long
 		"--v", LOG_VERBOSITY_LEVEL, "--logtostderr",
 		"--pod-cidr=10.180.0.0/24", // Assign a fixed CIDR to the node because there is no node controller.
+		"--cgroup-root=/",
+		"--runtime-cgroups=/docker-daemon",
+		"--kubelet-cgroups=/kubelet",
 	)
 	if es.cgroupsPerQOS {
 		cmdArgs = append(cmdArgs,
diff --git a/test/e2e_node/kubelet_perf_test.go b/test/e2e_node/kubelet_perf_test.go
new file mode 100644
index 00000000000..ef562afe90c
--- /dev/null
+++ b/test/e2e_node/kubelet_perf_test.go
@@ -0,0 +1,277 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e_node + +import ( + "fmt" + "strings" + "time" + + "k8s.io/kubernetes/pkg/api" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = framework.KubeDescribe("Kubelet-perf [Serial] [Slow]", func() { + const ( + // Interval to poll /stats/container on a node + containerStatsPollingPeriod = 10 * time.Second + // The monitoring time for one test. + monitoringTime = 6 * time.Minute + // The periodic reporting period. + reportingPeriod = 3 * time.Minute + + sleepAfterCreatePods = 10 * time.Second + sleepAfterDeletePods = 120 * time.Second + ) + + var ( + ns string + rm *ResourceCollector + om *framework.RuntimeOperationMonitor + ) + + f := framework.NewDefaultFramework("kubelet-perf") + + BeforeEach(func() { + ns = f.Namespace.Name + om = framework.NewRuntimeOperationMonitor(f.Client) + }) + + AfterEach(func() { + result := om.GetLatestRuntimeOperationErrorRate() + framework.Logf("runtime operation error metrics:\n%s", framework.FormatRuntimeOperationErrorRate(result)) + }) + + Context("regular resource usage tracking", func() { + rTests := []resourceTest{ + { + podsPerNode: 0, + cpuLimits: framework.ContainersCPUSummary{ + stats.SystemContainerKubelet: {0.50: 0.06, 0.95: 0.08}, + stats.SystemContainerRuntime: {0.50: 0.05, 0.95: 0.06}, + }, + // We set the memory limits generously because the distribution + // of the addon pods affect the memory usage on each node. + memLimits: framework.ResourceUsagePerContainer{ + stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 70 * 1024 * 1024}, + stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 85 * 1024 * 1024}, + }, + }, + { + podsPerNode: 35, + cpuLimits: framework.ContainersCPUSummary{ + stats.SystemContainerKubelet: {0.50: 0.12, 0.95: 0.14}, + stats.SystemContainerRuntime: {0.50: 0.05, 0.95: 0.07}, + }, + // We set the memory limits generously because the distribution + // of the addon pods affect the memory usage on each node. + memLimits: framework.ResourceUsagePerContainer{ + stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 70 * 1024 * 1024}, + stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 150 * 1024 * 1024}, + }, + }, + { + podsPerNode: 100, + cpuLimits: framework.ContainersCPUSummary{ + stats.SystemContainerKubelet: {0.50: 0.17, 0.95: 0.22}, + stats.SystemContainerRuntime: {0.50: 0.06, 0.95: 0.09}, + }, + // We set the memory limits generously because the distribution + // of the addon pods affect the memory usage on each node. 
+ memLimits: framework.ResourceUsagePerContainer{ + stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 80 * 1024 * 1024}, + stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024}, + }, + }, + } + + for _, testArg := range rTests { + itArg := testArg + + podsPerNode := itArg.podsPerNode + name := fmt.Sprintf("resource tracking for %d pods per node", podsPerNode) + + It(name, func() { + expectedCPU, expectedMemory := itArg.cpuLimits, itArg.memLimits + + createCadvisorPod(f) + rm = NewResourceCollector(containerStatsPollingPeriod) + rm.Start() + + By("Creating a batch of Pods") + pods := newTestPods(podsPerNode, ImageRegistry[pauseImage], "test_pod") + for _, pod := range pods { + f.PodClient().CreateSync(pod) + } + + // wait for a while to let the node be steady + time.Sleep(sleepAfterCreatePods) + + // Log once and flush the stats. + rm.LogLatest() + rm.Reset() + + By("Start monitoring resource usage") + // Periodically dump the cpu summary until the deadline is met. + // Note that without calling framework.ResourceMonitor.Reset(), the stats + // would occupy increasingly more memory. This should be fine + // for the current test duration, but we should reclaim the + // entries if we plan to monitor longer (e.g., 8 hours). + deadline := time.Now().Add(monitoringTime) + for time.Now().Before(deadline) { + timeLeft := deadline.Sub(time.Now()) + framework.Logf("Still running...%v left", timeLeft) + if timeLeft < reportingPeriod { + time.Sleep(timeLeft) + } else { + time.Sleep(reportingPeriod) + } + logPodsOnNodes(f.Client) + } + + By("Reporting overall resource usage") + logPodsOnNodes(f.Client) + + usagePerContainer, err := rm.GetLatest() + Expect(err).NotTo(HaveOccurred()) + + // TODO(random-liu): Remove the original log when we migrate to new perfdash + nodeName := framework.TestContext.NodeName + framework.Logf("%s", formatResourceUsageStats(usagePerContainer)) + + // Log perf result + usagePerNode := make(framework.ResourceUsagePerNode) + usagePerNode[nodeName] = usagePerContainer + + framework.PrintPerfData(framework.ResourceUsageToPerfData(usagePerNode)) + verifyMemoryLimits(f.Client, expectedMemory, usagePerNode) + + cpuSummary := rm.GetCPUSummary() + framework.Logf("%s", formatCPUSummary(cpuSummary)) + + // Log perf result + cpuSummaryPerNode := make(framework.NodesCPUSummary) + cpuSummaryPerNode[nodeName] = cpuSummary + framework.PrintPerfData(framework.CPUUsageToPerfData(cpuSummaryPerNode)) + verifyCPULimits(expectedCPU, cpuSummaryPerNode) + + // delete pods + By("Deleting a batch of pods") + deleteBatchPod(f, pods) + + rm.Stop() + + // tear down cadvisor + Expect(f.Client.Pods(ns).Delete(cadvisorPodName, api.NewDeleteOptions(30))). + NotTo(HaveOccurred()) + Expect(framework.WaitForPodToDisappear(f.Client, ns, cadvisorPodName, labels.Everything(), + 3*time.Second, 10*time.Minute)). 
+ NotTo(HaveOccurred()) + + time.Sleep(sleepAfterDeletePods) + }) + } + }) +}) + +type resourceTest struct { + podsPerNode int + cpuLimits framework.ContainersCPUSummary + memLimits framework.ResourceUsagePerContainer +} + +func verifyMemoryLimits(c *client.Client, expected framework.ResourceUsagePerContainer, actual framework.ResourceUsagePerNode) { + if expected == nil { + return + } + var errList []string + for nodeName, nodeSummary := range actual { + var nodeErrs []string + for cName, expectedResult := range expected { + container, ok := nodeSummary[cName] + if !ok { + nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing", cName)) + continue + } + + expectedValue := expectedResult.MemoryRSSInBytes + actualValue := container.MemoryRSSInBytes + if expectedValue != 0 && actualValue > expectedValue { + nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: expected RSS memory (MB) < %d; got %d", + cName, expectedValue, actualValue)) + } + } + if len(nodeErrs) > 0 { + errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", "))) + heapStats, err := framework.GetKubeletHeapStats(c, nodeName) + if err != nil { + framework.Logf("Unable to get heap stats from %q", nodeName) + } else { + framework.Logf("Heap stats on %q\n:%v", nodeName, heapStats) + } + } + } + if len(errList) > 0 { + framework.Failf("Memory usage exceeding limits:\n %s", strings.Join(errList, "\n")) + } +} + +func verifyCPULimits(expected framework.ContainersCPUSummary, actual framework.NodesCPUSummary) { + if expected == nil { + return + } + var errList []string + for nodeName, perNodeSummary := range actual { + var nodeErrs []string + for cName, expectedResult := range expected { + perContainerSummary, ok := perNodeSummary[cName] + if !ok { + nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing", cName)) + continue + } + for p, expectedValue := range expectedResult { + actualValue, ok := perContainerSummary[p] + if !ok { + nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing percentile %v", cName, p)) + continue + } + if actualValue > expectedValue { + nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: expected %.0fth%% usage < %.3f; got %.3f", + cName, p*100, expectedValue, actualValue)) + } + } + } + if len(nodeErrs) > 0 { + errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", "))) + } + } + if len(errList) > 0 { + framework.Failf("CPU usage exceeding limits:\n %s", strings.Join(errList, "\n")) + } +} + +func logPodsOnNodes(c *client.Client) { + nodeName := framework.TestContext.NodeName + podList, err := framework.GetKubeletRunningPods(c, nodeName) + if err != nil { + framework.Logf("Unable to retrieve kubelet pods for node %v", nodeName) + } + framework.Logf("%d pods are running on node %v", len(podList.Items), nodeName) +} diff --git a/test/e2e_node/resource_controller.go b/test/e2e_node/resource_controller.go new file mode 100644 index 00000000000..f643c82a658 --- /dev/null +++ b/test/e2e_node/resource_controller.go @@ -0,0 +1,377 @@ +/* +Copyright 2016 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e_node
+
+import (
+	"bytes"
+	"fmt"
+	"sort"
+	"strings"
+	"sync"
+	"text/tabwriter"
+	"time"
+
+	cadvisorclient "github.com/google/cadvisor/client/v2"
+	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
+	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/wait"
+	"k8s.io/kubernetes/test/e2e/framework"
+
+	. "github.com/onsi/gomega"
+)
+
+const (
+	// resource monitoring
+	cadvisorImageName = "google/cadvisor:latest"
+	cadvisorPodName   = "cadvisor"
+	cadvisorPort      = 8090
+)
+
+var (
+	systemContainers = map[string]string{
+		//"root": "/",
+		//stats.SystemContainerMisc: "misc"
+		stats.SystemContainerKubelet: "kubelet",
+		stats.SystemContainerRuntime: "docker-daemon",
+	}
+)
+
+type ResourceCollector struct {
+	client  *cadvisorclient.Client
+	request *cadvisorapiv2.RequestOptions
+
+	pollingInterval time.Duration
+	buffers         map[string][]*framework.ContainerResourceUsage
+	lock            sync.RWMutex
+	stopCh          chan struct{}
+}
+
+func NewResourceCollector(interval time.Duration) *ResourceCollector {
+	buffers := make(map[string][]*framework.ContainerResourceUsage)
+	return &ResourceCollector{
+		pollingInterval: interval,
+		buffers:         buffers,
+	}
+}
+
+func (r *ResourceCollector) Start() {
+	wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
+		var err error
+		r.client, err = cadvisorclient.NewClient(fmt.Sprintf("http://localhost:%d/", cadvisorPort))
+		if err == nil {
+			return true, nil
+		}
+		return false, err
+	})
+
+	Expect(r.client).NotTo(BeNil(), "cadvisor client not ready")
+
+	r.request = &cadvisorapiv2.RequestOptions{IdType: "name", Count: 1, Recursive: false}
+	r.stopCh = make(chan struct{})
+
+	oldStatsMap := make(map[string]*cadvisorapiv2.ContainerStats)
+	go wait.Until(func() { r.collectStats(oldStatsMap) }, r.pollingInterval, r.stopCh)
+}
+
+func (r *ResourceCollector) Stop() {
+	close(r.stopCh)
+}
+
+func (r *ResourceCollector) Reset() {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	for _, name := range systemContainers {
+		r.buffers[name] = []*framework.ContainerResourceUsage{}
+	}
+}
+
+func (r *ResourceCollector) GetCPUSummary() framework.ContainersCPUSummary {
+	result := make(framework.ContainersCPUSummary)
+	for key, name := range systemContainers {
+		data := r.GetBasicCPUStats(name)
+		result[key] = data
+	}
+	return result
+}
+
+func (r *ResourceCollector) LogLatest() {
+	summary, err := r.GetLatest()
+	if err != nil {
+		framework.Logf("%v", err)
+	}
+	framework.Logf("%s", formatResourceUsageStats(summary))
+}
+
+func (r *ResourceCollector) collectStats(oldStatsMap map[string]*cadvisorapiv2.ContainerStats) {
+	for _, name := range systemContainers {
+		ret, err := r.client.Stats(name, r.request)
+		if err != nil {
+			framework.Logf("Error getting container stats, err: %v", err)
+			return
+		}
+		cStats, ok := ret["/"+name]
+		if !ok {
+			framework.Logf("Missing info/stats for container %q", name)
+			return
+		}
+
+		newStats := cStats.Stats[0]
+
+		// only record a sample once cadvisor has produced a strictly newer stat
+		if oldStats, ok := oldStatsMap[name]; ok && oldStats.Timestamp.Before(newStats.Timestamp) {
+			r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats, newStats))
+		}
+		oldStatsMap[name] = newStats
+	}
+}
+
+func computeContainerResourceUsage(name string, oldStats, newStats *cadvisorapiv2.ContainerStats) *framework.ContainerResourceUsage {
+	return &framework.ContainerResourceUsage{
+		Name:                    name,
+		Timestamp:               newStats.Timestamp,
+		CPUUsageInCores:         float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
+		MemoryUsageInBytes:      newStats.Memory.Usage,
+		MemoryWorkingSetInBytes: newStats.Memory.WorkingSet,
+		MemoryRSSInBytes:        newStats.Memory.RSS,
+		CPUInterval:             newStats.Timestamp.Sub(oldStats.Timestamp),
+	}
+}
+
+func (r *ResourceCollector) GetLatest() (framework.ResourceUsagePerContainer, error) {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+	stats := make(framework.ResourceUsagePerContainer)
+	for key, name := range systemContainers {
+		contStats, ok := r.buffers[name]
+		if !ok || len(contStats) == 0 {
+			return nil, fmt.Errorf("Resource usage is not ready yet")
+		}
+		stats[key] = contStats[len(contStats)-1]
+	}
+	return stats, nil
+}
+
+type resourceUsageByCPU []*framework.ContainerResourceUsage
+
+func (r resourceUsageByCPU) Len() int           { return len(r) }
+func (r resourceUsageByCPU) Swap(i, j int)      { r[i], r[j] = r[j], r[i] }
+func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }
+
+// The percentiles to report.
+var percentiles = [...]float64{0.05, 0.20, 0.50, 0.70, 0.90, 0.95, 0.99}
+
+// GetBasicCPUStats returns percentiles of the CPU usage in cores for
+// containerName. This method examines all data currently in the buffer.
+func (r *ResourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+	result := make(map[float64]float64, len(percentiles))
+	usages := r.buffers[containerName]
+	sort.Sort(resourceUsageByCPU(usages))
+	for _, q := range percentiles {
+		index := int(float64(len(usages))*q) - 1
+		if index < 0 {
+			// We don't have enough data.
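+			// (for example, q = 0.05 with 10 samples gives index = int(10*0.05)-1 = -1,
+			// so the 5th percentile cannot be estimated and is reported as 0)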
+ result[q] = 0 + continue + } + result[q] = usages[index].CPUUsageInCores + } + return result +} + +func formatResourceUsageStats(containerStats framework.ResourceUsagePerContainer) string { + // Example output: + // + // Resource usage for node "e2e-test-foo-minion-abcde": + // container cpu(cores) memory(MB) + // "/" 0.363 2942.09 + // "/docker-daemon" 0.088 521.80 + // "/kubelet" 0.086 424.37 + // "/system" 0.007 119.88 + buf := &bytes.Buffer{} + w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0) + fmt.Fprintf(w, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n") + for name, s := range containerStats { + fmt.Fprintf(w, "%q\t%.3f\t%.2f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024), float64(s.MemoryRSSInBytes)/(1024*1024)) + } + w.Flush() + return fmt.Sprintf("Resource usage:\n%s", buf.String()) +} + +func formatCPUSummary(summary framework.ContainersCPUSummary) string { + // Example output for a node (the percentiles may differ): + // CPU usage of containers on node "e2e-test-foo-minion-0vj7": + // container 5th% 50th% 90th% 95th% + // "/" 0.051 0.159 0.387 0.455 + // "/runtime 0.000 0.000 0.146 0.166 + // "/kubelet" 0.036 0.053 0.091 0.154 + // "/misc" 0.001 0.001 0.001 0.002 + var summaryStrings []string + var header []string + header = append(header, "container") + for _, p := range percentiles { + header = append(header, fmt.Sprintf("%.0fth%%", p*100)) + } + + buf := &bytes.Buffer{} + w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0) + fmt.Fprintf(w, "%s\n", strings.Join(header, "\t")) + + for _, containerName := range framework.TargetContainers() { + var s []string + s = append(s, fmt.Sprintf("%q", containerName)) + data, ok := summary[containerName] + for _, p := range percentiles { + value := "N/A" + if ok { + value = fmt.Sprintf("%.3f", data[p]) + } + s = append(s, value) + } + fmt.Fprintf(w, "%s\n", strings.Join(s, "\t")) + } + w.Flush() + summaryStrings = append(summaryStrings, fmt.Sprintf("CPU usage of containers:\n%s", buf.String())) + + return strings.Join(summaryStrings, "\n") +} + +func createCadvisorPod(f *framework.Framework) { + f.PodClient().CreateSync(&api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: cadvisorPodName, + //Labels: map[string]string{"type": cadvisorPodType, "name": cadvisorPodName}, + }, + Spec: api.PodSpec{ + // Don't restart the Pod since it is expected to exit + RestartPolicy: api.RestartPolicyNever, + SecurityContext: &api.PodSecurityContext{ + HostNetwork: true, + }, + Containers: []api.Container{ + { + Image: cadvisorImageName, + Name: cadvisorPodName, + Ports: []api.ContainerPort{ + { + Name: "http", + HostPort: cadvisorPort, + ContainerPort: cadvisorPort, + Protocol: api.ProtocolTCP, + }, + }, + VolumeMounts: []api.VolumeMount{ + { + Name: "sys", + ReadOnly: true, + MountPath: "/sys", + }, + { + Name: "var-run", + ReadOnly: false, + MountPath: "/var/run", + }, + { + Name: "docker", + ReadOnly: true, + MountPath: "/var/lib/docker/", + }, + { + Name: "rootfs", + ReadOnly: true, + MountPath: "/rootfs", + }, + }, + Args: []string{ + "--profiling", + "--housekeeping_interval=1s", + fmt.Sprintf("--port=%d", cadvisorPort), + }, + }, + }, + Volumes: []api.Volume{ + { + Name: "rootfs", + VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/"}}, + }, + { + Name: "var-run", + VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/var/run"}}, + }, + { + Name: "sys", + VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/sys"}}, + }, + { + Name: 
"docker", + VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/var/lib/docker"}}, + }, + }, + }, + }) +} + +func deleteBatchPod(f *framework.Framework, pods []*api.Pod) { + ns := f.Namespace.Name + var wg sync.WaitGroup + for _, pod := range pods { + wg.Add(1) + go func(pod *api.Pod) { + defer wg.Done() + + err := f.Client.Pods(ns).Delete(pod.ObjectMeta.Name, api.NewDeleteOptions(60)) + Expect(err).NotTo(HaveOccurred()) + + Expect(framework.WaitForPodToDisappear(f.Client, ns, pod.ObjectMeta.Name, labels.Everything(), + 30*time.Second, 10*time.Minute)). + NotTo(HaveOccurred()) + }(pod) + } + wg.Wait() + return +} + +func newTestPods(podsPerNode int, imageName, podType string) []*api.Pod { + var pods []*api.Pod + for i := 0; i < podsPerNode; i++ { + podName := "test-" + string(util.NewUUID()) + labels := map[string]string{ + "type": podType, + "name": podName, + } + pods = append(pods, + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: podName, + Labels: labels, + }, + Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicyNever, + Containers: []api.Container{ + { + Image: imageName, + Name: podName, + }, + }, + }, + }) + } + return pods +} From 32e1db16c5fa489fbf97d6a595276518cdbd056c Mon Sep 17 00:00:00 2001 From: Zhou Fang Date: Thu, 28 Jul 2016 19:13:55 -0700 Subject: [PATCH 2/4] Update Godeps to include cadvisor client v2 --- Godeps/Godeps.json | 5 + Godeps/LICENSES | 198 ++++++++++++++++++ .../google/cadvisor/client/v2/README.md | 69 ++++++ .../google/cadvisor/client/v2/client.go | 176 ++++++++++++++++ 4 files changed, 448 insertions(+) create mode 100644 vendor/github.com/google/cadvisor/client/v2/README.md create mode 100644 vendor/github.com/google/cadvisor/client/v2/client.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 642d97ebe54..8c8858da203 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1043,6 +1043,11 @@ "Comment": "v0.23.2-63-ge47efa0", "Rev": "e47efa0e8af65e9a2a2eb2ce955e156eac067852" }, + { + "ImportPath": "github.com/google/cadvisor/client/v2", + "Comment": "v0.23.2-63-ge47efa0", + "Rev": "e47efa0e8af65e9a2a2eb2ce955e156eac067852" + }, { "ImportPath": "github.com/google/cadvisor/collector", "Comment": "v0.23.2-63-ge47efa0", diff --git a/Godeps/LICENSES b/Godeps/LICENSES index af322ca35d5..0c358708663 100644 --- a/Godeps/LICENSES +++ b/Godeps/LICENSES @@ -33571,6 +33571,204 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ================================================================================ +================================================================================ += vendor/github.com/google/cadvisor/client/v2 licensed under: = + + Copyright 2014 The cAdvisor Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + += vendor/github.com/google/cadvisor/LICENSE e7790b946bfacb700e8a8f2baedb3205 - +================================================================================ + + ================================================================================ = vendor/github.com/google/cadvisor/collector licensed under: = diff --git a/vendor/github.com/google/cadvisor/client/v2/README.md b/vendor/github.com/google/cadvisor/client/v2/README.md new file mode 100644 index 00000000000..64cc30d5ce6 --- /dev/null +++ b/vendor/github.com/google/cadvisor/client/v2/README.md @@ -0,0 +1,69 @@ +# Example REST API Client + +This is an implementation of a cAdvisor REST API in Go. You can use it like this: + +```go +client, err := client.NewClient("http://192.168.59.103:8080/") +``` + +Obviously, replace the URL with the path to your actual cAdvisor REST endpoint. 
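+
+For example, the following sketch polls the most recent stats sample for the
+root container (this assumes a cAdvisor instance listening on localhost:8080;
+adjust the URL and container name for your setup):
+
+```go
+package main
+
+import (
+	"fmt"
+	"log"
+
+	client "github.com/google/cadvisor/client/v2"
+	"github.com/google/cadvisor/info/v2"
+)
+
+func main() {
+	// Assumes cAdvisor is reachable on localhost:8080.
+	c, err := client.NewClient("http://localhost:8080/")
+	if err != nil {
+		log.Fatalf("unable to create cAdvisor client: %v", err)
+	}
+
+	// Request the single most recent stats sample for the root container.
+	opts := &v2.RequestOptions{IdType: "name", Count: 1, Recursive: false}
+	stats, err := c.Stats("/", opts)
+	if err != nil {
+		log.Fatalf("unable to fetch stats: %v", err)
+	}
+	for name, info := range stats {
+		fmt.Printf("%s: %d stats sample(s)\n", name, len(info.Stats))
+	}
+}
+```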
+ + +### MachineInfo + +```go +client.MachineInfo() +``` + +There is no v2 MachineInfo API, so the v2 client exposes the [v1 MachineInfo](../../info/v1/machine.go#L131) + +``` +(*v1.MachineInfo)(0xc208022b10)({ + NumCores: (int) 4, + MemoryCapacity: (int64) 2106028032, + Filesystems: ([]v1.FsInfo) (len=1 cap=4) { + (v1.FsInfo) { + Device: (string) (len=9) "/dev/sda1", + Capacity: (uint64) 19507089408 + } + } +}) +``` + +You can see the full specification of the [MachineInfo struct in the source](../../info/v1/machine.go#L131) + +### VersionInfo + +```go +client.VersionInfo() +``` + +This method returns the cAdvisor version. + +### Attributes + +```go +client.Attributes() +``` + +This method returns a [cadvisor/info/v2/Attributes](../../info/v2/machine.go#L24) struct with all the fields filled in. Attributes includes hardware attributes (as returned by MachineInfo) as well as software attributes (eg. software versions). Here is an example return value: + +``` +(*v2.Attributes)({ + KernelVersion: (string) (len=17) "3.13.0-44-generic" + ContainerOsVersion: (string) (len=18) "Ubuntu 14.04.1 LTS" + DockerVersion: (string) (len=9) "1.5.0-rc4" + CadvisorVersion: (string) (len=6) "0.10.1" + NumCores: (int) 4, + MemoryCapacity: (int64) 2106028032, + Filesystems: ([]v2.FsInfo) (len=1 cap=4) { + (v2.FsInfo) { + Device: (string) (len=9) "/dev/sda1", + Capacity: (uint64) 19507089408 + } + } +}) +``` + +You can see the full specification of the [Attributes struct in the source](../../info/v2/machine.go#L24) + diff --git a/vendor/github.com/google/cadvisor/client/v2/client.go b/vendor/github.com/google/cadvisor/client/v2/client.go new file mode 100644 index 00000000000..716517c3381 --- /dev/null +++ b/vendor/github.com/google/cadvisor/client/v2/client.go @@ -0,0 +1,176 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Client library to programmatically access cAdvisor API. +package v2 + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "path" + "strconv" + "strings" + + v1 "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/info/v2" +) + +// Client represents the base URL for a cAdvisor client. +type Client struct { + baseUrl string +} + +// NewClient returns a new client with the specified base URL. +func NewClient(url string) (*Client, error) { + if !strings.HasSuffix(url, "/") { + url += "/" + } + + return &Client{ + baseUrl: fmt.Sprintf("%sapi/v2.1/", url), + }, nil +} + +// MachineInfo returns the JSON machine information for this client. +// A non-nil error result indicates a problem with obtaining +// the JSON machine information data. +func (self *Client) MachineInfo() (minfo *v1.MachineInfo, err error) { + u := self.machineInfoUrl() + ret := new(v1.MachineInfo) + if err = self.httpGetJsonData(ret, nil, u, "machine info"); err != nil { + return + } + minfo = ret + return +} + +// MachineStats returns the JSON machine statistics for this client. 
+// A non-nil error result indicates a problem with obtaining +// the JSON machine information data. +func (self *Client) MachineStats() ([]v2.MachineStats, error) { + var ret []v2.MachineStats + u := self.machineStatsUrl() + err := self.httpGetJsonData(&ret, nil, u, "machine stats") + return ret, err +} + +// VersionInfo returns the version info for cAdvisor. +func (self *Client) VersionInfo() (version string, err error) { + u := self.versionInfoUrl() + version, err = self.httpGetString(u, "version info") + return +} + +// Attributes returns hardware and software attributes of the machine. +func (self *Client) Attributes() (attr *v2.Attributes, err error) { + u := self.attributesUrl() + ret := new(v2.Attributes) + if err = self.httpGetJsonData(ret, nil, u, "attributes"); err != nil { + return + } + attr = ret + return +} + +// Stats returns stats for the requested container. +func (self *Client) Stats(name string, request *v2.RequestOptions) (map[string]v2.ContainerInfo, error) { + u := self.statsUrl(name) + ret := make(map[string]v2.ContainerInfo) + data := url.Values{ + "type": []string{request.IdType}, + "count": []string{strconv.Itoa(request.Count)}, + "recursive": []string{strconv.FormatBool(request.Recursive)}, + } + + u = fmt.Sprintf("%s?%s", u, data.Encode()) + if err := self.httpGetJsonData(&ret, nil, u, "stats"); err != nil { + return nil, err + } + return ret, nil +} + +func (self *Client) machineInfoUrl() string { + return self.baseUrl + path.Join("machine") +} + +func (self *Client) machineStatsUrl() string { + return self.baseUrl + path.Join("machinestats") +} + +func (self *Client) versionInfoUrl() string { + return self.baseUrl + path.Join("version") +} + +func (self *Client) attributesUrl() string { + return self.baseUrl + path.Join("attributes") +} + +func (self *Client) statsUrl(name string) string { + return self.baseUrl + path.Join("stats", name) +} + +func (self *Client) httpGetResponse(postData interface{}, urlPath, infoName string) ([]byte, error) { + var resp *http.Response + var err error + + if postData != nil { + data, marshalErr := json.Marshal(postData) + if marshalErr != nil { + return nil, fmt.Errorf("unable to marshal data: %v", marshalErr) + } + resp, err = http.Post(urlPath, "application/json", bytes.NewBuffer(data)) + } else { + resp, err = http.Get(urlPath) + } + if err != nil { + return nil, fmt.Errorf("unable to post %q to %q: %v", infoName, urlPath, err) + } + if resp == nil { + return nil, fmt.Errorf("received empty response for %q from %q", infoName, urlPath) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + err = fmt.Errorf("unable to read all %q from %q: %v", infoName, urlPath, err) + return nil, err + } + if resp.StatusCode != 200 { + return nil, fmt.Errorf("request %q failed with error: %q", urlPath, strings.TrimSpace(string(body))) + } + return body, nil +} + +func (self *Client) httpGetString(url, infoName string) (string, error) { + body, err := self.httpGetResponse(nil, url, infoName) + if err != nil { + return "", err + } + return string(body), nil +} + +func (self *Client) httpGetJsonData(data, postData interface{}, url, infoName string) error { + body, err := self.httpGetResponse(postData, url, infoName) + if err != nil { + return err + } + if err = json.Unmarshal(body, data); err != nil { + err = fmt.Errorf("unable to unmarshal %q (Body: %q) from %q with error: %v", infoName, string(body), url, err) + return err + } + return nil +} From f3f3e965cc993ec23d75f96318fd1ffd3eea59bd Mon Sep 17 
00:00:00 2001 From: Zhou Fang Date: Fri, 29 Jul 2016 14:25:49 -0700 Subject: [PATCH 3/4] modify resource_collector.go to get container names of kubelet and docker dynamically --- test/e2e_node/density_test.go | 114 +++--------- test/e2e_node/e2e_service.go | 7 +- test/e2e_node/resource_controller.go | 171 ++++++++++++++++-- ...et_perf_test.go => resource_usage_test.go} | 98 ++++------ 4 files changed, 217 insertions(+), 173 deletions(-) rename test/e2e_node/{kubelet_perf_test.go => resource_usage_test.go} (72%) diff --git a/test/e2e_node/density_test.go b/test/e2e_node/density_test.go index 4d9a0371e53..7df004114bd 100644 --- a/test/e2e_node/density_test.go +++ b/test/e2e_node/density_test.go @@ -1,9 +1,12 @@ /* -Copyright 2016 The Kubernetes Authors. +Copyright 2015 The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -42,15 +45,13 @@ const ( kubeletAddr = "localhost:10255" ) -var _ = framework.KubeDescribe("Density", func() { +var _ = framework.KubeDescribe("Density [Serial] [Slow]", func() { const ( // the data collection time of `resource collector' and the standalone cadvisor // is not synchronizated. Therefore `resource collector' may miss data or // collect duplicated data monitoringInterval = 500 * time.Millisecond - sleepBeforeEach = 30 * time.Second sleepBeforeCreatePods = 30 * time.Second - sleepAfterDeletePods = 60 * time.Second ) var ( @@ -67,7 +68,6 @@ var _ = framework.KubeDescribe("Density", func() { }) AfterEach(func() { - time.Sleep(sleepAfterDeletePods) }) Context("create a batch of pods", func() { @@ -76,41 +76,21 @@ var _ = framework.KubeDescribe("Density", func() { podsNr: 10, interval: 0 * time.Millisecond, cpuLimits: framework.ContainersCPUSummary{ - stats.SystemContainerKubelet: {0.50: 0.10, 0.95: 0.20}, - stats.SystemContainerRuntime: {0.50: 0.10, 0.95: 0.50}, + stats.SystemContainerKubelet: {0.50: 0.20, 0.95: 0.30}, + stats.SystemContainerRuntime: {0.50: 0.40, 0.95: 0.60}, }, memLimits: framework.ResourceUsagePerContainer{ - stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024}, - stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 250 * 1024 * 1024}, + stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024}, + stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 400 * 1024 * 1024}, }, // percentile limit of single pod startup latency podStartupLimits: framework.LatencyMetric{ - Perc50: 7 * time.Second, - Perc90: 10 * time.Second, - Perc99: 15 * time.Second, + Perc50: 10 * time.Second, + Perc90: 15 * time.Second, + Perc99: 20 * time.Second, }, // upbound of startup latency of a batch of pods - podBatchStartupLimit: 20 * time.Second, - }, - { - podsNr: 30, - interval: 0 * time.Millisecond, - cpuLimits: framework.ContainersCPUSummary{ - stats.SystemContainerKubelet: {0.50: 0.10, 0.95: 0.35}, - stats.SystemContainerRuntime: {0.50: 0.10, 0.95: 0.70}, - }, - memLimits: framework.ResourceUsagePerContainer{ - stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024}, - 
stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024},
-				},
-				// percentile limit of single pod startup latency
-				podStartupLimits: framework.LatencyMetric{
-					Perc50: 30 * time.Second,
-					Perc90: 35 * time.Second,
-					Perc99: 40 * time.Second,
-				},
-				// upbound of startup latency of a batch of pods
-				podBatchStartupLimit: 90 * time.Second,
+				podBatchStartupLimit: 25 * time.Second,
 			},
 		}
 
@@ -139,7 +119,7 @@ var _ = framework.KubeDescribe("Density", func() {
 			controller := newInformerWatchPod(f, mutex, watchTimes, podType)
 			go controller.Run(stopCh)
 
-			// Zhou: In test we see kubelet starts while it is busy on sth, as a result `syncLoop'
+			// Zhou: In test we see kubelet starts while it is busy on something, as a result `syncLoop'
 			// does not respond to pod creation immediately. Creating the first pod has a delay around 5s.
 			// The node status has been `ready' so `wait and check node being ready' does not help here.
 			// Now wait here for a grace period to have `syncLoop' be ready
@@ -153,14 +133,14 @@ var _ = framework.KubeDescribe("Density", func() {
 			// it returns a map[`pod name']`creation time' as the creation timestamps
 			createTimes := createBatchPodWithRateControl(f, pods, itArg.interval)
 
-			By("Waiting for all Pods begin observed by the watch...")
-			// checks every 10s util all pods are running. it timeouts ater 10min
+			By("Waiting for all Pods to be observed by the watch...")
+			// checks every 10s until all pods are running. it times out after 10min
 			Eventually(func() bool {
 				return len(watchTimes) == itArg.podsNr
 			}, 10*time.Minute, 10*time.Second).Should(BeTrue())
 
 			if len(watchTimes) < itArg.podsNr {
-				framework.Failf("Timeout reached waiting for all Pods being observed by the watch.")
+				framework.Failf("Timeout reached waiting for all Pods to be observed by the watch.")
 			}
 
 			// stop the watching controller, and the resource collector
@@ -204,18 +184,6 @@ var _ = framework.KubeDescribe("Density", func() {
 			// verify resource
 			By("Verifying resource")
 			verifyResource(f, testArg, rm)
-
-			// delete pods
-			By("Deleting a batch of pods")
-			deleteBatchPod(f, pods)
-
-			// tear down cadvisor
-			Expect(f.Client.Pods(ns).Delete(cadvisorPodName, api.NewDeleteOptions(30))).
- NotTo(HaveOccurred()) - - Eventually(func() error { - return checkPodDeleted(f, cadvisorPodName) - }, 10*time.Minute, time.Second*3).Should(BeNil()) }) } }) @@ -226,34 +194,17 @@ var _ = framework.KubeDescribe("Density", func() { podsNr: 10, bgPodsNr: 10, cpuLimits: framework.ContainersCPUSummary{ - stats.SystemContainerKubelet: {0.50: 0.10, 0.95: 0.12}, - stats.SystemContainerRuntime: {0.50: 0.16, 0.95: 0.20}, + stats.SystemContainerKubelet: {0.50: 0.20, 0.95: 0.25}, + stats.SystemContainerRuntime: {0.50: 0.40, 0.95: 0.60}, }, memLimits: framework.ResourceUsagePerContainer{ - stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024}, - stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024}, + stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024}, + stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 400 * 1024 * 1024}, }, podStartupLimits: framework.LatencyMetric{ - Perc50: 1500 * time.Millisecond, - Perc90: 2500 * time.Millisecond, - Perc99: 3500 * time.Millisecond, - }, - }, - { - podsNr: 10, - bgPodsNr: 30, - cpuLimits: framework.ContainersCPUSummary{ - stats.SystemContainerKubelet: {0.50: 0.12, 0.95: 0.15}, - stats.SystemContainerRuntime: {0.50: 0.22, 0.95: 0.27}, - }, - memLimits: framework.ResourceUsagePerContainer{ - stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024}, - stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024}, - }, - podStartupLimits: framework.LatencyMetric{ - Perc50: 1500 * time.Millisecond, - Perc90: 2500 * time.Millisecond, - Perc99: 3500 * time.Millisecond, + Perc50: 3000 * time.Millisecond, + Perc90: 4000 * time.Millisecond, + Perc99: 5000 * time.Millisecond, }, }, } @@ -273,7 +224,7 @@ var _ = framework.KubeDescribe("Density", func() { // all pods are running when it returns f.PodClient().CreateBatch(bgPods) - //time.Sleep(sleepBeforeCreatePods) + time.Sleep(sleepBeforeCreatePods) // starting resource monitoring rm.Start() @@ -290,18 +241,6 @@ var _ = framework.KubeDescribe("Density", func() { // verify resource By("Verifying resource") verifyResource(f, testArg, rm) - - // delete pods - By("Deleting a batch of pods") - deleteBatchPod(f, append(bgPods, testPods...)) - - // tear down cadvisor - Expect(f.Client.Pods(ns).Delete(cadvisorPodName, api.NewDeleteOptions(30))). 
- NotTo(HaveOccurred()) - - Eventually(func() error { - return checkPodDeleted(f, cadvisorPodName) - }, 10*time.Minute, time.Second*3).Should(BeNil()) }) } }) @@ -309,7 +248,8 @@ var _ = framework.KubeDescribe("Density", func() { type DensityTest struct { // number of pods - podsNr int + podsNr int + // number of background pods bgPodsNr int // interval between creating pod (rate control) interval time.Duration diff --git a/test/e2e_node/e2e_service.go b/test/e2e_node/e2e_service.go index 125cdaebf49..e26e2702a2a 100644 --- a/test/e2e_node/e2e_service.go +++ b/test/e2e_node/e2e_service.go @@ -239,6 +239,10 @@ func (es *e2eService) startKubeletServer() (*killCmd, error) { } } else { cmdArgs = append(cmdArgs, getKubeletServerBin()) + cmdArgs = append(cmdArgs, + "--kubelet-cgroups=/kubelet", + "--runtime-cgroups=/docker-daemon", + ) } cmdArgs = append(cmdArgs, "--api-servers", "http://127.0.0.1:8080", @@ -252,9 +256,6 @@ func (es *e2eService) startKubeletServer() (*killCmd, error) { "--file-check-frequency", "10s", // Check file frequently so tests won't wait too long "--v", LOG_VERBOSITY_LEVEL, "--logtostderr", "--pod-cidr=10.180.0.0/24", // Assign a fixed CIDR to the node because there is no node controller. - "--cgroup-root=/", - "--runtime-cgroups=/docker-daemon", - "--kubelet-cgroups=/kubelet", ) if es.cgroupsPerQOS { cmdArgs = append(cmdArgs, diff --git a/test/e2e_node/resource_controller.go b/test/e2e_node/resource_controller.go index f643c82a658..388a22d6655 100644 --- a/test/e2e_node/resource_controller.go +++ b/test/e2e_node/resource_controller.go @@ -1,13 +1,16 @@ /* -Copyright 2016 The Kubernetes Authors. +Copyright 2015 The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing perissions and +See the License for the specific language governing permissions and limitations under the License. 
 */
@@ -16,7 +19,12 @@ package e2e_node
 import (
 	"bytes"
 	"fmt"
+	"io/ioutil"
+	"log"
+	"os"
+	"os/exec"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"text/tabwriter"
@@ -24,10 +32,12 @@ import (
 	cadvisorclient "github.com/google/cadvisor/client/v2"
 	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
+	"github.com/opencontainers/runc/libcontainer/cgroups"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/test/e2e/framework"
 
@@ -39,15 +49,12 @@ const (
 	cadvisorImageName = "google/cadvisor:latest"
 	cadvisorPodName   = "cadvisor"
 	cadvisorPort      = 8090
+	// housekeeping interval of Cadvisor (seconds)
+	houseKeepingInterval = 1
 )
 
 var (
-	systemContainers = map[string]string{
-		//"root": "/",
-		//stats.SystemContainerMisc: "misc"
-		stats.SystemContainerKubelet: "kubelet",
-		stats.SystemContainerRuntime: "docker-daemon",
-	}
+	systemContainers map[string]string
 )
 
 type ResourceCollector struct {
@@ -69,6 +76,18 @@ func NewResourceCollector(interval time.Duration) *ResourceCollector {
 }
 
 func (r *ResourceCollector) Start() {
+	// Get the cgroup containers for kubelet and docker
+	kubeletContainer, err1 := getContainerNameForProcess(kubeletProcessName, "")
+	dockerContainer, err2 := getContainerNameForProcess(dockerProcessName, dockerPidFile)
+	if err1 == nil && err2 == nil {
+		systemContainers = map[string]string{
+			stats.SystemContainerKubelet: kubeletContainer,
+			stats.SystemContainerRuntime: dockerContainer,
+		}
+	} else {
+		framework.Failf("Failed to get kubelet or docker container name in test-e2e-node resource collector.")
+	}
+
 	wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
 		var err error
 		r.client, err = cadvisorclient.NewClient(fmt.Sprintf("http://localhost:%d/", cadvisorPort))
@@ -123,7 +142,7 @@ func (r *ResourceCollector) collectStats(oldStatsMap map[string]*cadvisorapiv2.C
 		framework.Logf("Error getting container stats, err: %v", err)
 		return
 	}
-	cStats, ok := ret["/"+name]
+	cStats, ok := ret[name]
 	if !ok {
 		framework.Logf("Missing info/stats for container %q", name)
 		return
@@ -160,7 +179,7 @@ func (r *ResourceCollector) GetLatest() (framework.ResourceUsagePerContainer, er
 	for key, name := range systemContainers {
 		contStats, ok := r.buffers[name]
 		if !ok || len(contStats) == 0 {
-			return nil, fmt.Errorf("Resource usage is not ready yet")
+			return nil, fmt.Errorf("Resource usage of %s:%s is not ready yet", key, name)
 		}
 		stats[key] = contStats[len(contStats)-1]
 	}
@@ -257,11 +276,10 @@ func createCadvisorPod(f *framework.Framework) {
 	f.PodClient().CreateSync(&api.Pod{
 		ObjectMeta: api.ObjectMeta{
 			Name: cadvisorPodName,
-			//Labels: map[string]string{"type": cadvisorPodType, "name": cadvisorPodName},
 		},
 		Spec: api.PodSpec{
-			// Don't restart the Pod since it is expected to exit
-			RestartPolicy: api.RestartPolicyNever,
+			// It uses a host port for the tests to collect data.
+			// Currently we cannot use port mapping in test-e2e-node.
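+			// The pod runs with host networking, so the tests can reach cadvisorPort directly on the node.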
 			SecurityContext: &api.PodSecurityContext{
 				HostNetwork: true,
 			},
@@ -301,7 +319,7 @@ func createCadvisorPod(f *framework.Framework) {
 				},
 				Args: []string{
 					"--profiling",
-					"--housekeeping_interval=1s",
+					fmt.Sprintf("--housekeeping_interval=%ds", houseKeepingInterval),
 					fmt.Sprintf("--port=%d", cadvisorPort),
 				},
 			},
@@ -336,7 +354,7 @@ func deleteBatchPod(f *framework.Framework, pods []*api.Pod) {
 		go func(pod *api.Pod) {
 			defer wg.Done()
 
-			err := f.Client.Pods(ns).Delete(pod.ObjectMeta.Name, api.NewDeleteOptions(60))
+			err := f.Client.Pods(ns).Delete(pod.ObjectMeta.Name, api.NewDeleteOptions(30))
 			Expect(err).NotTo(HaveOccurred())
 
 			Expect(framework.WaitForPodToDisappear(f.Client, ns, pod.ObjectMeta.Name, labels.Everything(),
@@ -348,9 +366,9 @@ func deleteBatchPod(f *framework.Framework, pods []*api.Pod) {
 	return
 }
 
-func newTestPods(podsPerNode int, imageName, podType string) []*api.Pod {
+func newTestPods(numPods int, imageName, podType string) []*api.Pod {
 	var pods []*api.Pod
-	for i := 0; i < podsPerNode; i++ {
+	for i := 0; i < numPods; i++ {
 		podName := "test-" + string(util.NewUUID())
 		labels := map[string]string{
 			"type": podType,
@@ -363,7 +381,8 @@ func newTestPods(podsPerNode int, imageName, podType string) []*api.Pod {
 				Labels: labels,
 			},
 			Spec: api.PodSpec{
-				RestartPolicy: api.RestartPolicyNever,
+				// TODO: the restart policy is now the default Always;
+				// check whether pods restart at the end of tests
 				Containers: []api.Container{
 					{
 						Image: imageName,
@@ -375,3 +394,119 @@ func newTestPods(podsPerNode int, imageName, podType string) []*api.Pod {
 	}
 	return pods
 }
+
+// Helpers for finding the cgroup container names of the kubelet and docker processes.
+const (
+	kubeletProcessName    = "kubelet"
+	dockerProcessName     = "docker"
+	dockerPidFile         = "/var/run/docker.pid"
+	containerdProcessName = "docker-containerd"
+	containerdPidFile     = "/run/docker/libcontainerd/docker-containerd.pid"
+)
+
+func getContainerNameForProcess(name, pidFile string) (string, error) {
+	pids, err := getPidsForProcess(name, pidFile)
+	if err != nil {
+		return "", fmt.Errorf("failed to detect process id for %q - %v", name, err)
+	}
+	if len(pids) == 0 {
+		return "", nil
+	}
+	cont, err := getContainer(pids[0])
+	if err != nil {
+		return "", err
+	}
+	return cont, nil
+}
+
+func getPidFromPidFile(pidFile string) (int, error) {
+	file, err := os.Open(pidFile)
+	if err != nil {
+		return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err)
+	}
+	defer file.Close()
+
+	data, err := ioutil.ReadAll(file)
+	if err != nil {
+		return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err)
+	}
+
+	pid, err := strconv.Atoi(string(data))
+	if err != nil {
+		return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err)
+	}
+
+	return pid, nil
+}
+
+func getPidsForProcess(name, pidFile string) ([]int, error) {
+	if len(pidFile) > 0 {
+		if pid, err := getPidFromPidFile(pidFile); err == nil {
+			return []int{pid}, nil
+		} else {
+			// log the error and fall back to pidof
+			runtime.HandleError(err)
+		}
+	}
+
+	out, err := exec.Command("pidof", name).Output()
+	if err != nil {
+		return []int{}, fmt.Errorf("failed to find pid of %q: %v", name, err)
+	}
+
+	// The output of pidof is a list of pids.
+	pids := []int{}
+	for _, pidStr := range strings.Split(strings.TrimSpace(string(out)), " ") {
+		pid, err := strconv.Atoi(pidStr)
+		if err != nil {
+			continue
+		}
+		pids = append(pids, pid)
+	}
+	return pids, nil
+}
+
+// getContainer returns the cgroup associated with the specified pid.
+// It enforces a unified hierarchy for memory and cpu cgroups.
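+// Cgroup membership is read from /proc/<pid>/cgroup and parsed with cgroups.ParseCgroupFile.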
+// On systemd environments, it uses the name=systemd cgroup for the specified pid. +func getContainer(pid int) (string, error) { + cgs, err := cgroups.ParseCgroupFile(fmt.Sprintf("/proc/%d/cgroup", pid)) + if err != nil { + return "", err + } + + cpu, found := cgs["cpu"] + if !found { + return "", cgroups.NewNotFoundError("cpu") + } + memory, found := cgs["memory"] + if !found { + return "", cgroups.NewNotFoundError("memory") + } + + // since we use this container for accounting, we need to ensure its a unified hierarchy. + if cpu != memory { + return "", fmt.Errorf("cpu and memory cgroup hierarchy not unified. cpu: %s, memory: %s", cpu, memory) + } + + // on systemd, every pid is in a unified cgroup hierarchy (name=systemd as seen in systemd-cgls) + // cpu and memory accounting is off by default, users may choose to enable it per unit or globally. + // users could enable CPU and memory accounting globally via /etc/systemd/system.conf (DefaultCPUAccounting=true DefaultMemoryAccounting=true). + // users could also enable CPU and memory accounting per unit via CPUAccounting=true and MemoryAccounting=true + // we only warn if accounting is not enabled for CPU or memory so as to not break local development flows where kubelet is launched in a terminal. + // for example, the cgroup for the user session will be something like /user.slice/user-X.slice/session-X.scope, but the cpu and memory + // cgroup will be the closest ancestor where accounting is performed (most likely /) on systems that launch docker containers. + // as a result, on those systems, you will not get cpu or memory accounting statistics for kubelet. + // in addition, you would not get memory or cpu accounting for the runtime unless accounting was enabled on its unit (or globally). + if systemd, found := cgs["name=systemd"]; found { + if systemd != cpu { + log.Printf("CPUAccounting not enabled for pid: %d", pid) + } + if systemd != memory { + log.Printf("MemoryAccounting not enabled for pid: %d", pid) + } + return systemd, nil + } + + return cpu, nil +} diff --git a/test/e2e_node/kubelet_perf_test.go b/test/e2e_node/resource_usage_test.go similarity index 72% rename from test/e2e_node/kubelet_perf_test.go rename to test/e2e_node/resource_usage_test.go index ef562afe90c..9c30da75fa8 100644 --- a/test/e2e_node/kubelet_perf_test.go +++ b/test/e2e_node/resource_usage_test.go @@ -1,9 +1,12 @@ /* -Copyright 2016 The Kubernetes Authors. +Copyright 2015 The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,36 +21,33 @@ import ( "strings" "time" - "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" - "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/test/e2e/framework" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) -var _ = framework.KubeDescribe("Kubelet-perf [Serial] [Slow]", func() { +var _ = framework.KubeDescribe("Resource-usage [Serial] [Slow]", func() { const ( // Interval to poll /stats/container on a node containerStatsPollingPeriod = 10 * time.Second // The monitoring time for one test. 
-	monitoringTime = 6 * time.Minute
+	monitoringTime = 10 * time.Minute
 	// The periodic reporting period.
-	reportingPeriod = 3 * time.Minute
+	reportingPeriod = 5 * time.Minute
 
 	sleepAfterCreatePods = 10 * time.Second
-	sleepAfterDeletePods = 120 * time.Second
 )
 
 var (
 	ns string
-	rm *ResourceCollector
+	rc *ResourceCollector
 	om *framework.RuntimeOperationMonitor
 )
 
-	f := framework.NewDefaultFramework("kubelet-perf")
+	f := framework.NewDefaultFramework("resource-usage")
 
 	BeforeEach(func() {
 		ns = f.Namespace.Name
@@ -59,45 +59,22 @@ var _ = framework.KubeDescribe("Kubelet-perf [Serial] [Slow]", func() {
 		framework.Logf("runtime operation error metrics:\n%s", framework.FormatRuntimeOperationErrorRate(result))
 	})
 
+	// This test measures and verifies that the steady-state resource usage of the node is within limits.
+	// It collects data from a standalone Cadvisor pod with a 1s housekeeping interval.
+	// It verifies CPU percentiles and the latest memory usage.
 	Context("regular resource usage tracking", func() {
 		rTests := []resourceTest{
 			{
-				podsPerNode: 0,
+				podsPerNode: 10,
 				cpuLimits: framework.ContainersCPUSummary{
-					stats.SystemContainerKubelet: {0.50: 0.06, 0.95: 0.08},
-					stats.SystemContainerRuntime: {0.50: 0.05, 0.95: 0.06},
+					stats.SystemContainerKubelet: {0.50: 0.25, 0.95: 0.30},
+					stats.SystemContainerRuntime: {0.50: 0.30, 0.95: 0.40},
 				},
 				// We set the memory limits generously because the distribution
 				// of the addon pods affects the memory usage on each node.
 				memLimits: framework.ResourceUsagePerContainer{
-					stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 70 * 1024 * 1024},
-					stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 85 * 1024 * 1024},
-				},
-			},
-			{
-				podsPerNode: 35,
-				cpuLimits: framework.ContainersCPUSummary{
-					stats.SystemContainerKubelet: {0.50: 0.12, 0.95: 0.14},
-					stats.SystemContainerRuntime: {0.50: 0.05, 0.95: 0.07},
-				},
-				// We set the memory limits generously because the distribution
-				// of the addon pods affect the memory usage on each node.
-				memLimits: framework.ResourceUsagePerContainer{
-					stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 70 * 1024 * 1024},
-					stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 150 * 1024 * 1024},
-				},
-			},
-			{
-				podsPerNode: 100,
-				cpuLimits: framework.ContainersCPUSummary{
-					stats.SystemContainerKubelet: {0.50: 0.17, 0.95: 0.22},
-					stats.SystemContainerRuntime: {0.50: 0.06, 0.95: 0.09},
-				},
-				// We set the memory limits generously because the distribution
-				// of the addon pods affect the memory usage on each node.
-				memLimits: framework.ResourceUsagePerContainer{
-					stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 80 * 1024 * 1024},
-					stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024},
+					stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024},
+					stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 400 * 1024 * 1024},
 				},
 			},
 		}
@@ -111,9 +88,13 @@ var _ = framework.KubeDescribe("Kubelet-perf [Serial] [Slow]", func() {
 
 			It(name, func() {
 				expectedCPU, expectedMemory := itArg.cpuLimits, itArg.memLimits
+				// The test collects resource usage from a standalone Cadvisor pod.
+				// The cAdvisor in the kubelet has a housekeeping interval of 10s, which is too long to
+				// show the resource usage spikes. But changing its interval increases the overhead
+				// of kubelet. Hence we use a Cadvisor pod.
 				createCadvisorPod(f)
-				rm = NewResourceCollector(containerStatsPollingPeriod)
-				rm.Start()
+				rc = NewResourceCollector(containerStatsPollingPeriod)
+				rc.Start()
 
 				By("Creating a batch of Pods")
 				pods := newTestPods(podsPerNode, ImageRegistry[pauseImage], "test_pod")
@@ -125,8 +106,8 @@ var _ = framework.KubeDescribe("Kubelet-perf [Serial] [Slow]", func() {
 				time.Sleep(sleepAfterCreatePods)
 
 				// Log once and flush the stats.
-				rm.LogLatest()
-				rm.Reset()
+				rc.LogLatest()
+				rc.Reset()
 
 				By("Start monitoring resource usage")
 				// Periodically dump the cpu summary until the deadline is met.
@@ -143,13 +124,15 @@ var _ = framework.KubeDescribe("Kubelet-perf [Serial] [Slow]", func() {
 				} else {
 					time.Sleep(reportingPeriod)
 				}
-				logPodsOnNodes(f.Client)
+				logPodsOnNode(f.Client)
 			}
 
-			By("Reporting overall resource usage")
-			logPodsOnNodes(f.Client)
+			rc.Stop()
 
-			usagePerContainer, err := rm.GetLatest()
+			By("Reporting overall resource usage")
+			logPodsOnNode(f.Client)
+
+			usagePerContainer, err := rc.GetLatest()
 			Expect(err).NotTo(HaveOccurred())
 
 			// TODO(random-liu): Remove the original log when we migrate to new perfdash
@@ -163,7 +146,7 @@ var _ = framework.KubeDescribe("Kubelet-perf [Serial] [Slow]", func() {
 			framework.PrintPerfData(framework.ResourceUsageToPerfData(usagePerNode))
 			verifyMemoryLimits(f.Client, expectedMemory, usagePerNode)
 
-			cpuSummary := rm.GetCPUSummary()
+			cpuSummary := rc.GetCPUSummary()
 			framework.Logf("%s", formatCPUSummary(cpuSummary))
 
 			// Log perf result
@@ -171,21 +154,6 @@ var _ = framework.KubeDescribe("Kubelet-perf [Serial] [Slow]", func() {
 			cpuSummaryPerNode[nodeName] = cpuSummary
 			framework.PrintPerfData(framework.CPUUsageToPerfData(cpuSummaryPerNode))
 			verifyCPULimits(expectedCPU, cpuSummaryPerNode)
-
-			// delete pods
-			By("Deleting a batch of pods")
-			deleteBatchPod(f, pods)
-
-			rm.Stop()
-
-			// tear down cadvisor
-			Expect(f.Client.Pods(ns).Delete(cadvisorPodName, api.NewDeleteOptions(30))).
-				NotTo(HaveOccurred())
-			Expect(framework.WaitForPodToDisappear(f.Client, ns, cadvisorPodName, labels.Everything(),
-				3*time.Second, 10*time.Minute)).
- NotTo(HaveOccurred()) - - time.Sleep(sleepAfterDeletePods) }) } }) @@ -267,7 +235,7 @@ func verifyCPULimits(expected framework.ContainersCPUSummary, actual framework.N } } -func logPodsOnNodes(c *client.Client) { +func logPodsOnNode(c *client.Client) { nodeName := framework.TestContext.NodeName podList, err := framework.GetKubeletRunningPods(c, nodeName) if err != nil { From 04f83c79e7cfebe5c625f091ef151a6de12d550b Mon Sep 17 00:00:00 2001 From: Zhou Fang Date: Tue, 2 Aug 2016 09:22:14 -0700 Subject: [PATCH 4/4] change the package of uuid in resource_controller --- test/e2e_node/resource_controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e_node/resource_controller.go b/test/e2e_node/resource_controller.go index 388a22d6655..e884add06ee 100644 --- a/test/e2e_node/resource_controller.go +++ b/test/e2e_node/resource_controller.go @@ -36,8 +36,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" @@ -369,7 +369,7 @@ func deleteBatchPod(f *framework.Framework, pods []*api.Pod) { func newTestPods(numPods int, imageName, podType string) []*api.Pod { var pods []*api.Pod for i := 0; i < numPods; i++ { - podName := "test-" + string(util.NewUUID()) + podName := "test-" + string(uuid.NewUUID()) labels := map[string]string{ "type": podType, "name": podName,