Add density and resource performance test to test-node-e2e

This commit is contained in:
Zhou Fang 2016-07-28 19:14:43 -07:00
parent 68def062e2
commit df55e8fb70
4 changed files with 1147 additions and 0 deletions

View File

@ -0,0 +1,490 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"errors"
"fmt"
"sort"
"strconv"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
controllerframework "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
kubemetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/metrics"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
const (
// kubeletAddr is the host:port handed to getPodStartLatency when the
// kubelet's Prometheus metrics are grabbed in verifyLatency.
kubeletAddr = "localhost:10255"
)
// Density measures pod startup latency and the CPU/memory usage of the kubelet
// and the container runtime on a single node. It covers two scenarios: creating
// a batch of pods at once, and creating pods one after another while a set of
// background pods is already running.
var _ = framework.KubeDescribe("Density", func() {
	const (
		// The data collection of the resource collector and of the standalone
		// cadvisor is not synchronized. Therefore the resource collector may
		// miss data or collect duplicated data.
		monitoringInterval    = 500 * time.Millisecond
		sleepBeforeEach       = 30 * time.Second
		sleepBeforeCreatePods = 30 * time.Second
		sleepAfterDeletePods  = 60 * time.Second
	)

	var (
		ns       string
		nodeName string
	)

	f := framework.NewDefaultFramework("density-test")
	podType := "density_test_pod"

	BeforeEach(func() {
		ns = f.Namespace.Name
		nodeName = framework.TestContext.NodeName
	})

	AfterEach(func() {
		// give the node time to settle after the pods of the previous test were deleted
		time.Sleep(sleepAfterDeletePods)
	})

	Context("create a batch of pods", func() {
		densityTests := []DensityTest{
			{
				podsNr:   10,
				interval: 0 * time.Millisecond,
				cpuLimits: framework.ContainersCPUSummary{
					stats.SystemContainerKubelet: {0.50: 0.10, 0.95: 0.20},
					stats.SystemContainerRuntime: {0.50: 0.10, 0.95: 0.50},
				},
				memLimits: framework.ResourceUsagePerContainer{
					stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024},
					stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 250 * 1024 * 1024},
				},
				// percentile limit of single pod startup latency
				podStartupLimits: framework.LatencyMetric{
					Perc50: 7 * time.Second,
					Perc90: 10 * time.Second,
					Perc99: 15 * time.Second,
				},
				// upper bound of startup latency of a batch of pods
				podBatchStartupLimit: 20 * time.Second,
			},
			{
				podsNr:   30,
				interval: 0 * time.Millisecond,
				cpuLimits: framework.ContainersCPUSummary{
					stats.SystemContainerKubelet: {0.50: 0.10, 0.95: 0.35},
					stats.SystemContainerRuntime: {0.50: 0.10, 0.95: 0.70},
				},
				memLimits: framework.ResourceUsagePerContainer{
					stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024},
					stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024},
				},
				// percentile limit of single pod startup latency
				podStartupLimits: framework.LatencyMetric{
					Perc50: 30 * time.Second,
					Perc90: 35 * time.Second,
					Perc99: 40 * time.Second,
				},
				// upper bound of startup latency of a batch of pods
				podBatchStartupLimit: 90 * time.Second,
			},
		}

		for _, testArg := range densityTests {
			// Copy the range variable: the It body runs after the loop has finished,
			// so the closure must only ever touch itArg, never testArg.
			itArg := testArg
			It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval",
				itArg.podsNr, itArg.interval), func() {
				var (
					mutex      = &sync.Mutex{}
					watchTimes = make(map[string]unversioned.Time, 0)
					stopCh     = make(chan struct{})
				)
				// observedPods returns how many pods the watch has recorded so far.
				// It takes the mutex because the informer handlers write watchTimes
				// from other goroutines.
				observedPods := func() int {
					mutex.Lock()
					defer mutex.Unlock()
					return len(watchTimes)
				}

				// create specifications of the test pods
				pods := newTestPods(itArg.podsNr, ImageRegistry[pauseImage], podType)

				// start a standalone cadvisor pod
				// it uses `createSync', so the pod is running when it returns
				createCadvisorPod(f)

				// `resource collector' monitors fine-grained CPU/memory usage through a
				// standalone cadvisor with 1s housekeeping interval
				rm := NewResourceCollector(monitoringInterval)

				// the controller watches the change of pod status
				controller := newInformerWatchPod(f, mutex, watchTimes, podType)
				go controller.Run(stopCh)

				// Zhou: In test we see kubelet starts while it is busy on sth, as a result `syncLoop'
				// does not respond to pod creation immediately. Creating the first pod has a delay around 5s.
				// The node status has been `ready' so `wait and check node being ready' does not help here.
				// Now wait here for a grace period to have `syncLoop' be ready
				time.Sleep(sleepBeforeCreatePods)

				// the density test only monitors the overhead of creating pod
				// or start earliest and call `rm.Reset()' here to clear the buffer
				rm.Start()

				By("Creating a batch of pods")
				// it returns a map[`pod name']`creation time' as the creation timestamps
				createTimes := createBatchPodWithRateControl(f, pods, itArg.interval)

				By("Waiting for all Pods begin observed by the watch...")
				// checks every 10s until all pods are running. it times out after 10min
				Eventually(func() bool {
					return observedPods() == itArg.podsNr
				}, 10*time.Minute, 10*time.Second).Should(BeTrue())

				if observedPods() < itArg.podsNr {
					framework.Failf("Timeout reached waiting for all Pods being observed by the watch.")
				}

				// stop the watching controller, and the resource collector
				close(stopCh)
				rm.Stop()

				// data analysis
				var (
					firstCreate unversioned.Time
					lastRunning unversioned.Time
					first       = true
					e2eLags     = make([]framework.PodLatencyData, 0)
				)

				for name, create := range createTimes {
					watch, ok := watchTimes[name]
					Expect(ok).To(Equal(true))

					e2eLags = append(e2eLags,
						framework.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})

					// track the earliest creation and the latest observed running time
					if first {
						first = false
						firstCreate, lastRunning = create, watch
						continue
					}
					if firstCreate.Time.After(create.Time) {
						firstCreate = create
					}
					if lastRunning.Time.Before(watch.Time) {
						lastRunning = watch
					}
				}

				sort.Sort(framework.LatencySlice(e2eLags))

				// verify latency
				By("Verifying latency")
				verifyLatency(lastRunning.Time.Sub(firstCreate.Time), e2eLags, itArg)

				// verify resource
				By("Verifying resource")
				verifyResource(f, itArg, rm)

				// delete pods
				By("Deleting a batch of pods")
				deleteBatchPod(f, pods)

				// tear down cadvisor
				Expect(f.Client.Pods(ns).Delete(cadvisorPodName, api.NewDeleteOptions(30))).
					NotTo(HaveOccurred())

				Eventually(func() error {
					return checkPodDeleted(f, cadvisorPodName)
				}, 10*time.Minute, time.Second*3).Should(BeNil())
			})
		}
	})

	Context("create a sequence of pods", func() {
		densityTests := []DensityTest{
			{
				podsNr:   10,
				bgPodsNr: 10,
				cpuLimits: framework.ContainersCPUSummary{
					stats.SystemContainerKubelet: {0.50: 0.10, 0.95: 0.12},
					stats.SystemContainerRuntime: {0.50: 0.16, 0.95: 0.20},
				},
				memLimits: framework.ResourceUsagePerContainer{
					stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024},
					stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024},
				},
				podStartupLimits: framework.LatencyMetric{
					Perc50: 1500 * time.Millisecond,
					Perc90: 2500 * time.Millisecond,
					Perc99: 3500 * time.Millisecond,
				},
			},
			{
				podsNr:   10,
				bgPodsNr: 30,
				cpuLimits: framework.ContainersCPUSummary{
					stats.SystemContainerKubelet: {0.50: 0.12, 0.95: 0.15},
					stats.SystemContainerRuntime: {0.50: 0.22, 0.95: 0.27},
				},
				memLimits: framework.ResourceUsagePerContainer{
					stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 40 * 1024 * 1024},
					stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024},
				},
				podStartupLimits: framework.LatencyMetric{
					Perc50: 1500 * time.Millisecond,
					Perc90: 2500 * time.Millisecond,
					Perc99: 3500 * time.Millisecond,
				},
			},
		}

		for _, testArg := range densityTests {
			// Copy the range variable: the It body runs after the loop has finished,
			// so the closure must only ever touch itArg, never testArg.
			itArg := testArg
			It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %d background pods",
				itArg.podsNr, itArg.bgPodsNr), func() {
				bgPods := newTestPods(itArg.bgPodsNr, ImageRegistry[pauseImage], "background_pod")
				testPods := newTestPods(itArg.podsNr, ImageRegistry[pauseImage], podType)

				createCadvisorPod(f)
				rm := NewResourceCollector(monitoringInterval)

				By("Creating a batch of background pods")
				// CreateBatch is synchronized
				// all pods are running when it returns
				f.PodClient().CreateBatch(bgPods)

				//time.Sleep(sleepBeforeCreatePods)

				// starting resource monitoring
				rm.Start()

				// do a sequential creation of pod (back to back)
				batchlag, e2eLags := createBatchPodSequential(f, testPods)

				rm.Stop()

				// verify latency
				By("Verifying latency")
				verifyLatency(batchlag, e2eLags, itArg)

				// verify resource
				By("Verifying resource")
				verifyResource(f, itArg, rm)

				// delete pods
				By("Deleting a batch of pods")
				deleteBatchPod(f, append(bgPods, testPods...))

				// tear down cadvisor
				Expect(f.Client.Pods(ns).Delete(cadvisorPodName, api.NewDeleteOptions(30))).
					NotTo(HaveOccurred())

				Eventually(func() error {
					return checkPodDeleted(f, cadvisorPodName)
				}, 10*time.Minute, time.Second*3).Should(BeNil())
			})
		}
	})
})
// DensityTest holds the parameters and the pass/fail limits of one density
// test case.
type DensityTest struct {
// number of pods
podsNr int
// number of background pods created before the measured pods
// (only set by the "create a sequence of pods" cases)
bgPodsNr int
// interval between creating pod (rate control)
interval time.Duration
// resource bound
// per-container CPU usage limits keyed by percentile, checked by verifyCPULimits
cpuLimits framework.ContainersCPUSummary
// per-container memory limits; only MemoryRSSInBytes is compared by verifyMemoryLimits
memLimits framework.ResourceUsagePerContainer
// limits on the 50th/90th/99th percentile of single pod startup latency
podStartupLimits framework.LatencyMetric
// upper bound on the startup latency of the whole batch; verifyLatency
// skips the check when this is zero
podBatchStartupLimit time.Duration
}
// createBatchPodWithRateControl creates the given pods concurrently, using one
// goroutine per creation, and sleeps `interval` between launches for throughput
// control. It returns a map from pod name to the time at which its creation was
// issued.
func createBatchPodWithRateControl(f *framework.Framework, pods []*api.Pod, interval time.Duration) map[string]unversioned.Time {
	createTimes := make(map[string]unversioned.Time, len(pods))
	for _, pod := range pods {
		createTimes[pod.ObjectMeta.Name] = unversioned.Now()
		go func(p *api.Pod) {
			// NOTE(review): PodClient().Create presumably asserts on failure, as the
			// framework helpers do. Ginkgo requires GinkgoRecover in every goroutine
			// that may fail an assertion; otherwise the panic escapes the goroutine
			// and aborts the whole test binary instead of failing this spec.
			defer GinkgoRecover()
			f.PodClient().Create(p)
		}(pod)
		time.Sleep(interval)
	}
	return createTimes
}
// checkPodDeleted looks the pod up in the framework's namespace and returns nil
// only when the lookup fails with a NotFound API error; any other outcome
// (including a successful Get) yields a "Pod Not Deleted" error.
func checkPodDeleted(f *framework.Framework, podName string) error {
	if _, getErr := f.Client.Pods(f.Namespace.Name).Get(podName); !apierrors.IsNotFound(getErr) {
		return errors.New("Pod Not Deleted")
	}
	return nil
}
// getPodStartLatency grabs the kubelet metrics of `node` and collects every
// sample of the pod start latency metric, one entry per reported quantile.
// A failure to grab the metrics fails the running spec; the returned error is
// always nil.
func getPodStartLatency(node string) (framework.KubeletLatencyMetrics, error) {
	grabbed, err := metrics.GrabKubeletMetricsWithoutProxy(node)
	Expect(err).NotTo(HaveOccurred())

	latencies := framework.KubeletLatencyMetrics{}
	for _, samples := range grabbed {
		for _, s := range samples {
			// skip everything except the pod start latency metric
			if s.Metric["__name__"] != kubemetrics.KubeletSubsystem+"_"+kubemetrics.PodStartLatencyKey {
				continue
			}
			// a quantile label that does not parse is recorded as 0
			quantile, _ := strconv.ParseFloat(string(s.Metric["quantile"]), 64)
			// the sample value is truncated to an integer and treated as microseconds
			micros := time.Duration(int(s.Value))
			latencies = append(latencies, framework.KubeletLatencyMetric{
				Quantile: quantile,
				Method:   kubemetrics.PodStartLatencyKey,
				Latency:  micros * time.Microsecond,
			})
		}
	}
	return latencies, nil
}
// verifyPodStartupLatency verifies whether the 50th, 90th and 99th percentiles
// of the pod startup latency in `actual` are within the thresholds given in
// `expect`. It returns an error naming the first percentile that exceeds its
// threshold, or nil when all three are within bounds.
func verifyPodStartupLatency(expect, actual framework.LatencyMetric) error {
	if actual.Perc50 > expect.Perc50 {
		return fmt.Errorf("too high pod startup latency 50th percentile: %v", actual.Perc50)
	}
	if actual.Perc90 > expect.Perc90 {
		return fmt.Errorf("too high pod startup latency 90th percentile: %v", actual.Perc90)
	}
	// compare against the expected limit; comparing actual with itself can never fail
	if actual.Perc99 > expect.Perc99 {
		return fmt.Errorf("too high pod startup latency 99th percentil: %v", actual.Perc99)
	}
	return nil
}
// newInformerWatchPod builds an informer controller that lists and watches the
// pods labelled type=podType in the framework's namespace. For every add or
// update event whose pod is in the Running phase, the first observation time
// is stored in watchTimes under the pod's name; all accesses to watchTimes
// made here are guarded by mutex.
func newInformerWatchPod(f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]unversioned.Time,
	podType string) *controllerframework.Controller {
	ns := f.Namespace.Name

	// recordIfRunning stores the current time for a running pod unless one was
	// already recorded.
	recordIfRunning := func(pod *api.Pod) {
		mutex.Lock()
		defer mutex.Unlock()
		defer GinkgoRecover()

		if pod.Status.Phase != api.PodRunning {
			return
		}
		if _, seen := watchTimes[pod.Name]; seen {
			return
		}
		watchTimes[pod.Name] = unversioned.Now()
	}

	// handle is shared by the add and update callbacks; the recording itself
	// happens on a separate goroutine.
	handle := func(obj interface{}) {
		pod, isPod := obj.(*api.Pod)
		Expect(isPod).To(Equal(true))
		go recordIfRunning(pod)
	}

	// restrict list/watch requests to pods of the requested type
	withTypeSelector := func(options api.ListOptions) api.ListOptions {
		options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType})
		return options
	}

	source := &cache.ListWatch{
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
			return f.Client.Pods(ns).List(withTypeSelector(options))
		},
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
			return f.Client.Pods(ns).Watch(withTypeSelector(options))
		},
	}

	_, controller := controllerframework.NewInformer(
		source,
		&api.Pod{},
		0,
		controllerframework.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				handle(obj)
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				handle(newObj)
			},
		},
	)
	return controller
}
// verifyLatency logs the measured latencies and fails the spec when they exceed
// the limits in testArg: the per-pod startup percentiles are always checked,
// the batch startup time only when podBatchStartupLimit is positive. It also
// logs the batch and sequential creation throughput.
func verifyLatency(batchLag time.Duration, e2eLags []framework.PodLatencyData, testArg DensityTest) {
	framework.PrintLatencies(e2eLags, "worst client e2e total latencies")

	// Zhou: do not trust `kubelet' metrics since they are not reset!
	kubeletMetrics, _ := getPodStartLatency(kubeletAddr)
	framework.Logf("Kubelet Prometheus metrics (not reset):\n%s", framework.PrettyPrintJSON(kubeletMetrics))

	// check whether e2e pod startup time is acceptable.
	startup := framework.PodStartupLatency{Latency: framework.ExtractLatencyMetrics(e2eLags)}
	framework.Logf("Pod create latency: %s", framework.PrettyPrintJSON(startup))
	framework.ExpectNoError(verifyPodStartupLatency(testArg.podStartupLimits, startup.Latency))

	// check batch pod creation latency
	if limit := testArg.podBatchStartupLimit; limit > 0 {
		Expect(batchLag <= limit).To(Equal(true), "Batch creation startup time %v exceed limit %v",
			batchLag, limit)
	}

	// calculate and log throughput
	framework.Logf("Batch creation throughput is %.1f pods/min", float64(testArg.podsNr)/batchLag.Minutes())

	// the last entry is used as the per-pod latency; callers in this file pass
	// e2eLags already sorted with framework.LatencySlice
	last := e2eLags[len(e2eLags)-1]
	framework.Logf("Sequential creation throughput is %.1f pods/min", 1.0/last.Latency.Minutes())
}
// verifyResource logs the latest memory usage and the CPU usage summary
// gathered by rm, prints both as perf data for the test node, and checks them
// against the memory and CPU limits carried by testArg.
func verifyResource(f *framework.Framework, testArg DensityTest, rm *ResourceCollector) {
	node := framework.TestContext.NodeName

	// verify and log memory
	latestUsage, err := rm.GetLatest()
	Expect(err).NotTo(HaveOccurred())
	framework.Logf("%s", formatResourceUsageStats(latestUsage))

	memByNode := framework.ResourceUsagePerNode{node: latestUsage}
	framework.PrintPerfData(framework.ResourceUsageToPerfData(memByNode))
	verifyMemoryLimits(f.Client, testArg.memLimits, memByNode)

	// verify and log cpu
	cpu := rm.GetCPUSummary()
	framework.Logf("%s", formatCPUSummary(cpu))

	cpuByNode := framework.NodesCPUSummary{node: cpu}
	framework.PrintPerfData(framework.CPUUsageToPerfData(cpuByNode))
	verifyCPULimits(testArg.cpuLimits, cpuByNode)
}
// createBatchPodSequential creates the pods one at a time with CreateSync and
// measures how long each call takes. It returns the wall-clock time of the
// whole sequence and the per-pod latencies sorted with framework.LatencySlice.
func createBatchPodSequential(f *framework.Framework, pods []*api.Pod) (time.Duration, []framework.PodLatencyData) {
	var (
		batchStart = unversioned.Now()
		lags       = make([]framework.PodLatencyData, 0)
	)

	for i := range pods {
		pod := pods[i]
		createdAt := unversioned.Now()
		f.PodClient().CreateSync(pod)
		lags = append(lags, framework.PodLatencyData{
			Name:    pod.ObjectMeta.Name,
			Latency: unversioned.Now().Time.Sub(createdAt.Time),
		})
	}

	total := unversioned.Now().Time.Sub(batchStart.Time)
	sort.Sort(framework.LatencySlice(lags))
	return total, lags
}

View File

@ -252,6 +252,9 @@ func (es *e2eService) startKubeletServer() (*killCmd, error) {
"--file-check-frequency", "10s", // Check file frequently so tests won't wait too long
"--v", LOG_VERBOSITY_LEVEL, "--logtostderr",
"--pod-cidr=10.180.0.0/24", // Assign a fixed CIDR to the node because there is no node controller.
"--cgroup-root=/",
"--runtime-cgroups=/docker-daemon",
"--kubelet-cgroups=/kubelet",
)
if es.cgroupsPerQOS {
cmdArgs = append(cmdArgs,

View File

@ -0,0 +1,277 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"fmt"
"strings"
"time"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// Kubelet-perf tracks the CPU and memory usage of the kubelet and the container
// runtime while a fixed number of pods is running on the node, and compares the
// result against per-case limits.
var _ = framework.KubeDescribe("Kubelet-perf [Serial] [Slow]", func() {
const (
// Polling interval handed to NewResourceCollector.
containerStatsPollingPeriod = 10 * time.Second
// The monitoring time for one test.
monitoringTime = 6 * time.Minute
// The periodic reporting period.
reportingPeriod = 3 * time.Minute
// Pause after the pods are created, before the stats are flushed.
sleepAfterCreatePods = 10 * time.Second
// Pause at the end of each test, after the pods and cadvisor are deleted.
sleepAfterDeletePods = 120 * time.Second
)
var (
ns string
rm *ResourceCollector
om *framework.RuntimeOperationMonitor
)
f := framework.NewDefaultFramework("kubelet-perf")
BeforeEach(func() {
ns = f.Namespace.Name
om = framework.NewRuntimeOperationMonitor(f.Client)
})
AfterEach(func() {
// log the runtime operation error rate reported by the monitor created in BeforeEach
result := om.GetLatestRuntimeOperationErrorRate()
framework.Logf("runtime operation error metrics:\n%s", framework.FormatRuntimeOperationErrorRate(result))
})
Context("regular resource usage tracking", func() {
// one entry per test case: number of pods plus the CPU/memory limits to verify
rTests := []resourceTest{
{
podsPerNode: 0,
cpuLimits: framework.ContainersCPUSummary{
stats.SystemContainerKubelet: {0.50: 0.06, 0.95: 0.08},
stats.SystemContainerRuntime: {0.50: 0.05, 0.95: 0.06},
},
// We set the memory limits generously because the distribution
// of the addon pods affect the memory usage on each node.
memLimits: framework.ResourceUsagePerContainer{
stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 70 * 1024 * 1024},
stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 85 * 1024 * 1024},
},
},
{
podsPerNode: 35,
cpuLimits: framework.ContainersCPUSummary{
stats.SystemContainerKubelet: {0.50: 0.12, 0.95: 0.14},
stats.SystemContainerRuntime: {0.50: 0.05, 0.95: 0.07},
},
// We set the memory limits generously because the distribution
// of the addon pods affect the memory usage on each node.
memLimits: framework.ResourceUsagePerContainer{
stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 70 * 1024 * 1024},
stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 150 * 1024 * 1024},
},
},
{
podsPerNode: 100,
cpuLimits: framework.ContainersCPUSummary{
stats.SystemContainerKubelet: {0.50: 0.17, 0.95: 0.22},
stats.SystemContainerRuntime: {0.50: 0.06, 0.95: 0.09},
},
// We set the memory limits generously because the distribution
// of the addon pods affect the memory usage on each node.
memLimits: framework.ResourceUsagePerContainer{
stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 80 * 1024 * 1024},
stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024},
},
},
}
for _, testArg := range rTests {
// copy the range variable so the It closure below does not share it across iterations
itArg := testArg
podsPerNode := itArg.podsPerNode
name := fmt.Sprintf("resource tracking for %d pods per node", podsPerNode)
It(name, func() {
expectedCPU, expectedMemory := itArg.cpuLimits, itArg.memLimits
// start the standalone cadvisor pod and begin collecting stats from it
createCadvisorPod(f)
rm = NewResourceCollector(containerStatsPollingPeriod)
rm.Start()
By("Creating a batch of Pods")
pods := newTestPods(podsPerNode, ImageRegistry[pauseImage], "test_pod")
for _, pod := range pods {
f.PodClient().CreateSync(pod)
}
// wait for a while to let the node be steady
time.Sleep(sleepAfterCreatePods)
// Log once and flush the stats.
rm.LogLatest()
rm.Reset()
By("Start monitoring resource usage")
// Periodically log the pods on the node until the deadline is met.
// Note that without calling Reset() on the collector, the stats
// would occupy increasingly more memory. This should be fine
// for the current test duration, but we should reclaim the
// entries if we plan to monitor longer (e.g., 8 hours).
deadline := time.Now().Add(monitoringTime)
for time.Now().Before(deadline) {
timeLeft := deadline.Sub(time.Now())
framework.Logf("Still running...%v left", timeLeft)
// sleep one reporting period, or only the remaining time when less is left
if timeLeft < reportingPeriod {
time.Sleep(timeLeft)
} else {
time.Sleep(reportingPeriod)
}
logPodsOnNodes(f.Client)
}
By("Reporting overall resource usage")
logPodsOnNodes(f.Client)
usagePerContainer, err := rm.GetLatest()
Expect(err).NotTo(HaveOccurred())
// TODO(random-liu): Remove the original log when we migrate to new perfdash
nodeName := framework.TestContext.NodeName
framework.Logf("%s", formatResourceUsageStats(usagePerContainer))
// Log perf result
usagePerNode := make(framework.ResourceUsagePerNode)
usagePerNode[nodeName] = usagePerContainer
framework.PrintPerfData(framework.ResourceUsageToPerfData(usagePerNode))
verifyMemoryLimits(f.Client, expectedMemory, usagePerNode)
cpuSummary := rm.GetCPUSummary()
framework.Logf("%s", formatCPUSummary(cpuSummary))
// Log perf result
cpuSummaryPerNode := make(framework.NodesCPUSummary)
cpuSummaryPerNode[nodeName] = cpuSummary
framework.PrintPerfData(framework.CPUUsageToPerfData(cpuSummaryPerNode))
verifyCPULimits(expectedCPU, cpuSummaryPerNode)
// delete pods
By("Deleting a batch of pods")
deleteBatchPod(f, pods)
rm.Stop()
// tear down cadvisor
Expect(f.Client.Pods(ns).Delete(cadvisorPodName, api.NewDeleteOptions(30))).
NotTo(HaveOccurred())
Expect(framework.WaitForPodToDisappear(f.Client, ns, cadvisorPodName, labels.Everything(),
3*time.Second, 10*time.Minute)).
NotTo(HaveOccurred())
time.Sleep(sleepAfterDeletePods)
})
}
})
})
// resourceTest describes one resource tracking test case.
type resourceTest struct {
// number of test pods created before monitoring starts
podsPerNode int
// per-container CPU usage limits keyed by percentile, checked by verifyCPULimits
cpuLimits framework.ContainersCPUSummary
// per-container memory limits; only MemoryRSSInBytes is compared by verifyMemoryLimits
memLimits framework.ResourceUsagePerContainer
}
// verifyMemoryLimits compares the RSS memory of every container listed in
// `expected` against the usage recorded per node in `actual` and fails the
// spec when a container is missing or uses more RSS than allowed. A limit of
// zero disables the check for that container; a nil `expected` disables the
// verification entirely. For each offending node the kubelet heap stats are
// fetched through c and logged to help diagnosis.
func verifyMemoryLimits(c *client.Client, expected framework.ResourceUsagePerContainer, actual framework.ResourceUsagePerNode) {
	if expected == nil {
		return
	}
	// the limits are stored in bytes; report them in MB as the message says
	const bytesPerMB = 1024 * 1024

	var errList []string
	for nodeName, nodeSummary := range actual {
		var nodeErrs []string
		for cName, expectedResult := range expected {
			container, ok := nodeSummary[cName]
			if !ok {
				nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing", cName))
				continue
			}

			expectedValue := expectedResult.MemoryRSSInBytes
			actualValue := container.MemoryRSSInBytes
			if expectedValue != 0 && actualValue > expectedValue {
				nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: expected RSS memory (MB) < %.2f; got %.2f",
					cName, float64(expectedValue)/bytesPerMB, float64(actualValue)/bytesPerMB))
			}
		}
		if len(nodeErrs) > 0 {
			errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", ")))
			heapStats, err := framework.GetKubeletHeapStats(c, nodeName)
			if err != nil {
				// best effort: the heap stats are only extra diagnostics
				framework.Logf("Unable to get heap stats from %q: %v", nodeName, err)
			} else {
				framework.Logf("Heap stats on %q\n:%v", nodeName, heapStats)
			}
		}
	}
	if len(errList) > 0 {
		framework.Failf("Memory usage exceeding limits:\n %s", strings.Join(errList, "\n"))
	}
}
// verifyCPULimits checks, for every node in `actual`, that each container named
// in `expected` is present and that its usage at every expected percentile does
// not exceed the limit. All violations are collected and reported together
// through framework.Failf. A nil `expected` disables the verification.
func verifyCPULimits(expected framework.ContainersCPUSummary, actual framework.NodesCPUSummary) {
	if expected == nil {
		return
	}

	var failures []string
	for nodeName, perNodeSummary := range actual {
		var problems []string
		for cName, limits := range expected {
			measured, present := perNodeSummary[cName]
			if !present {
				problems = append(problems, fmt.Sprintf("container %q: missing", cName))
				continue
			}
			for p, limit := range limits {
				switch got, found := measured[p]; {
				case !found:
					problems = append(problems, fmt.Sprintf("container %q: missing percentile %v", cName, p))
				case got > limit:
					problems = append(problems, fmt.Sprintf("container %q: expected %.0fth%% usage < %.3f; got %.3f",
						cName, p*100, limit, got))
				}
			}
		}
		if len(problems) == 0 {
			continue
		}
		failures = append(failures, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(problems, ", ")))
	}

	if len(failures) == 0 {
		return
	}
	framework.Failf("CPU usage exceeding limits:\n %s", strings.Join(failures, "\n"))
}
// logPodsOnNodes logs how many pods the kubelet of the test node reports as
// running. When the pod list cannot be retrieved it logs the failure and
// returns without touching the (unusable) result.
func logPodsOnNodes(c *client.Client) {
	nodeName := framework.TestContext.NodeName
	podList, err := framework.GetKubeletRunningPods(c, nodeName)
	if err != nil {
		framework.Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err)
		// podList must not be dereferenced after a failed call
		return
	}
	framework.Logf("%d pods are running on node %v", len(podList.Items), nodeName)
}

View File

@ -0,0 +1,377 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"bytes"
"fmt"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"
cadvisorclient "github.com/google/cadvisor/client/v2"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/gomega"
)
const (
// resource monitoring
// cadvisorImageName is the image run by the pod built in createCadvisorPod.
cadvisorImageName = "google/cadvisor:latest"
// cadvisorPodName names both the cadvisor pod and its single container.
cadvisorPodName = "cadvisor"
// cadvisorPort is the port cadvisor is told to serve on (--port) and the
// port the ResourceCollector connects to on localhost.
cadvisorPort = 8090
)
var (
// systemContainers maps the stats name of each monitored system container
// to the container name that is queried from cadvisor; collectStats looks
// the result up under "/"+name.
systemContainers = map[string]string{
//"root": "/",
//stats.SystemContainerMisc: "misc"
stats.SystemContainerKubelet: "kubelet",
stats.SystemContainerRuntime: "docker-daemon",
}
)
// ResourceCollector periodically polls a cadvisor endpoint for the stats of the
// containers in systemContainers and buffers the derived resource usage.
type ResourceCollector struct {
// cadvisor client, created in Start
client *cadvisorclient.Client
// request options used for every stats query, set in Start
request *cadvisorapiv2.RequestOptions
// time between two collections
pollingInterval time.Duration
// collected usage samples, keyed by the container name from systemContainers
buffers map[string][]*framework.ContainerResourceUsage
// lock is taken by the methods that read or reset buffers
lock sync.RWMutex
// closing stopCh (see Stop) ends the collection loop started in Start
stopCh chan struct{}
}
// NewResourceCollector returns a collector with an empty sample buffer that
// will poll at the given interval once Start is called.
func NewResourceCollector(interval time.Duration) *ResourceCollector {
	rc := new(ResourceCollector)
	rc.pollingInterval = interval
	rc.buffers = map[string][]*framework.ContainerResourceUsage{}
	return rc
}
// Start creates the cadvisor client, retrying once per second for up to one
// minute, and then launches a goroutine that collects stats every
// pollingInterval until Stop is called. It fails the running spec when no
// client could be created.
func (r *ResourceCollector) Start() {
	endpoint := fmt.Sprintf("http://localhost:%d/", cadvisorPort)
	wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
		var err error
		r.client, err = cadvisorclient.NewClient(endpoint)
		if err != nil {
			// Returning the error would make wait.Poll give up immediately;
			// report "not done yet" instead so the next tick retries.
			framework.Logf("cadvisor client not ready yet: %v", err)
			return false, nil
		}
		return true, nil
	})

	Expect(r.client).NotTo(BeNil(), "cadvisor client not ready")

	r.request = &cadvisorapiv2.RequestOptions{IdType: "name", Count: 1, Recursive: false}
	r.stopCh = make(chan struct{})

	// oldStatsMap keeps the previous sample per container between collections
	oldStatsMap := make(map[string]*cadvisorapiv2.ContainerStats)
	go wait.Until(func() { r.collectStats(oldStatsMap) }, r.pollingInterval, r.stopCh)
}
// Stop ends the collection loop launched by Start by closing stopCh.
// NOTE(review): stopCh is only created in Start, so Stop presumably must be
// called exactly once after Start — confirm with callers.
func (r *ResourceCollector) Stop() {
close(r.stopCh)
}
// Reset discards all buffered samples of the monitored system containers,
// leaving an empty buffer for each of them.
func (r *ResourceCollector) Reset() {
	r.lock.Lock()
	defer r.lock.Unlock()

	for _, cgroupName := range systemContainers {
		r.buffers[cgroupName] = make([]*framework.ContainerResourceUsage, 0)
	}
}
// GetCPUSummary returns the CPU usage percentiles of every monitored system
// container, keyed by the container's stats name.
func (r *ResourceCollector) GetCPUSummary() framework.ContainersCPUSummary {
	summary := framework.ContainersCPUSummary{}
	for statsKey, cgroupName := range systemContainers {
		summary[statsKey] = r.GetBasicCPUStats(cgroupName)
	}
	return summary
}
// LogLatest logs the most recent resource usage sample of every monitored
// container. An error from GetLatest is logged as well; the usage table is
// logged in either case.
func (r *ResourceCollector) LogLatest() {
	latest, latestErr := r.GetLatest()
	if latestErr != nil {
		framework.Logf("%v", latestErr)
	}
	report := formatResourceUsageStats(latest)
	framework.Logf("%s", report)
}
// collectStats queries cadvisor once for every monitored system container.
// When the new sample is strictly newer than the previous one kept in
// oldStatsMap, the usage derived from the two is appended to the container's
// buffer. In every successful case the new sample becomes the previous sample
// for the next round. On a failed query, a missing container or an empty
// sample list the round is abandoned after logging.
func (r *ResourceCollector) collectStats(oldStatsMap map[string]*cadvisorapiv2.ContainerStats) {
	for _, name := range systemContainers {
		ret, err := r.client.Stats(name, r.request)
		if err != nil {
			framework.Logf("Error getting container stats, err: %v", err)
			return
		}
		cStats, ok := ret["/"+name]
		if !ok {
			framework.Logf("Missing info/stats for container %q", name)
			return
		}
		// guard the index below: a container may be known without any sample yet
		if len(cStats.Stats) == 0 {
			framework.Logf("Empty stats for container %q", name)
			return
		}

		newStats := cStats.Stats[0]

		// Samples with an equal (or older) timestamp are duplicates and yield no
		// usage entry; a zero interval would also make the CPU rate meaningless.
		if oldStats, ok := oldStatsMap[name]; ok && oldStats.Timestamp.Before(newStats.Timestamp) {
			usage := computeContainerResourceUsage(name, oldStats, newStats)
			// buffers is read under r.lock by the getters, so writes must hold it too
			r.lock.Lock()
			r.buffers[name] = append(r.buffers[name], usage)
			r.lock.Unlock()
		}
		oldStatsMap[name] = newStats
	}
}
// computeContainerResourceUsage derives one usage sample from two consecutive
// cadvisor samples: the CPU usage is the growth of the total CPU counter
// divided by the elapsed nanoseconds, and the memory figures are taken from
// the newer sample.
func computeContainerResourceUsage(name string, oldStats, newStats *cadvisorapiv2.ContainerStats) *framework.ContainerResourceUsage {
	elapsed := newStats.Timestamp.Sub(oldStats.Timestamp)
	cpuDelta := newStats.Cpu.Usage.Total - oldStats.Cpu.Usage.Total

	usage := &framework.ContainerResourceUsage{
		Name:      name,
		Timestamp: newStats.Timestamp,
	}
	usage.CPUUsageInCores = float64(cpuDelta) / float64(elapsed.Nanoseconds())
	usage.MemoryUsageInBytes = newStats.Memory.Usage
	usage.MemoryWorkingSetInBytes = newStats.Memory.WorkingSet
	usage.MemoryRSSInBytes = newStats.Memory.RSS
	usage.CPUInterval = elapsed
	return usage
}
// GetLatest returns, for every monitored system container, the last sample in
// its buffer, keyed by the container's stats name. It returns an error as soon
// as one container has no sample yet.
func (r *ResourceCollector) GetLatest() (framework.ResourceUsagePerContainer, error) {
	r.lock.RLock()
	defer r.lock.RUnlock()

	latest := framework.ResourceUsagePerContainer{}
	for statsKey, cgroupName := range systemContainers {
		// a missing buffer reads as a nil slice, which has length zero as well
		samples := r.buffers[cgroupName]
		if len(samples) == 0 {
			return nil, fmt.Errorf("Resource usage is not ready yet")
		}
		latest[statsKey] = samples[len(samples)-1]
	}
	return latest, nil
}
// resourceUsageByCPU implements sort.Interface, ordering usage samples by
// ascending CPU usage in cores.
type resourceUsageByCPU []*framework.ContainerResourceUsage

// Len reports the number of samples.
func (u resourceUsageByCPU) Len() int {
	return len(u)
}

// Swap exchanges the samples at positions a and b.
func (u resourceUsageByCPU) Swap(a, b int) {
	u[a], u[b] = u[b], u[a]
}

// Less reports whether sample a used less CPU than sample b.
func (u resourceUsageByCPU) Less(a, b int) bool {
	return u[a].CPUUsageInCores < u[b].CPUUsageInCores
}
// percentiles lists the quantiles (as fractions in [0, 1]) for which
// GetBasicCPUStats computes CPU usage and formatCPUSummary prints a column.
var percentiles = [...]float64{0.05, 0.20, 0.50, 0.70, 0.90, 0.95, 0.99}
// GetBasicCPUStats returns the percentiles of the cpu usage in cores for
// containerName. This method examines all data currently in the buffer.
// Percentiles for which there is not enough data are reported as 0.
func (r *ResourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
	r.lock.RLock()
	defer r.lock.RUnlock()

	result := make(map[float64]float64, len(percentiles))

	// Sort a private copy: the buffer is shared, only a read lock is held, and
	// GetLatest relies on the buffer staying in collection order.
	usages := make([]*framework.ContainerResourceUsage, len(r.buffers[containerName]))
	copy(usages, r.buffers[containerName])
	sort.Sort(resourceUsageByCPU(usages))

	for _, q := range percentiles {
		index := int(float64(len(usages))*q) - 1
		if index < 0 {
			// We don't have enough data.
			result[q] = 0
			continue
		}
		result[q] = usages[index].CPUUsageInCores
	}
	return result
}
// formatResourceUsageStats renders the given per-container usage as an aligned
// table with one row per container, preceded by a "Resource usage:" line.
// Memory columns are printed in MB. Rows follow map iteration order.
func formatResourceUsageStats(containerStats framework.ResourceUsagePerContainer) string {
	// Example output (values are illustrative):
	//
	// Resource usage:
	// container cpu(cores) memory_working_set(MB) memory_rss(MB)
	// "kubelet" 0.086      424.37                 119.88
	// "runtime" 0.088      521.80                 101.20
	const bytesPerMB = 1024 * 1024

	var out bytes.Buffer
	tw := tabwriter.NewWriter(&out, 1, 0, 1, ' ', 0)
	fmt.Fprint(tw, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n")
	for name, usage := range containerStats {
		workingSetMB := float64(usage.MemoryWorkingSetInBytes) / bytesPerMB
		rssMB := float64(usage.MemoryRSSInBytes) / bytesPerMB
		fmt.Fprintf(tw, "%q\t%.3f\t%.2f\t%.2f\n", name, usage.CPUUsageInCores, workingSetMB, rssMB)
	}
	tw.Flush()
	return "Resource usage:\n" + out.String()
}
// formatCPUSummary renders the CPU usage percentiles as an aligned table: one
// column per entry of percentiles and one row per container returned by
// framework.TargetContainers. Containers absent from summary get "N/A" in
// every column.
func formatCPUSummary(summary framework.ContainersCPUSummary) string {
	// Example output (the percentiles may differ):
	// CPU usage of containers:
	// container 5th%  50th% 90th% 95th%
	// "/"       0.051 0.159 0.387 0.455
	// "/runtime 0.000 0.000 0.146 0.166
	// "/kubelet" 0.036 0.053 0.091 0.154
	// "/misc"   0.001 0.001 0.001 0.002
	header := make([]string, 0, len(percentiles)+1)
	header = append(header, "container")
	for _, p := range percentiles {
		header = append(header, fmt.Sprintf("%.0fth%%", p*100))
	}

	var out bytes.Buffer
	tw := tabwriter.NewWriter(&out, 1, 0, 1, ' ', 0)
	fmt.Fprintf(tw, "%s\n", strings.Join(header, "\t"))

	for _, containerName := range framework.TargetContainers() {
		data, found := summary[containerName]
		row := []string{fmt.Sprintf("%q", containerName)}
		for _, p := range percentiles {
			if found {
				row = append(row, fmt.Sprintf("%.3f", data[p]))
			} else {
				row = append(row, "N/A")
			}
		}
		fmt.Fprintf(tw, "%s\n", strings.Join(row, "\t"))
	}
	tw.Flush()

	return fmt.Sprintf("CPU usage of containers:\n%s", out.String())
}
// createCadvisorPod creates the standalone cadvisor pod with CreateSync. The
// pod uses the host network, serves on cadvisorPort with a 1s housekeeping
// interval, and mounts the host's /, /var/run, /sys and /var/lib/docker.
func createCadvisorPod(f *framework.Framework) {
	// hostPath builds a volume backed by the given path on the host.
	hostPath := func(name, path string) api.Volume {
		return api.Volume{
			Name:         name,
			VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: path}},
		}
	}
	// mount builds the container-side mount of the volume with the same name.
	mount := func(name, path string, readOnly bool) api.VolumeMount {
		return api.VolumeMount{Name: name, ReadOnly: readOnly, MountPath: path}
	}

	container := api.Container{
		Image: cadvisorImageName,
		Name:  cadvisorPodName,
		Ports: []api.ContainerPort{
			{
				Name:          "http",
				HostPort:      cadvisorPort,
				ContainerPort: cadvisorPort,
				Protocol:      api.ProtocolTCP,
			},
		},
		VolumeMounts: []api.VolumeMount{
			mount("sys", "/sys", true),
			mount("var-run", "/var/run", false),
			mount("docker", "/var/lib/docker/", true),
			mount("rootfs", "/rootfs", true),
		},
		Args: []string{
			"--profiling",
			"--housekeeping_interval=1s",
			fmt.Sprintf("--port=%d", cadvisorPort),
		},
	}

	pod := &api.Pod{
		ObjectMeta: api.ObjectMeta{
			Name: cadvisorPodName,
			//Labels: map[string]string{"type": cadvisorPodType, "name": cadvisorPodName},
		},
		Spec: api.PodSpec{
			// Don't restart the Pod since it is expected to exit
			RestartPolicy: api.RestartPolicyNever,
			SecurityContext: &api.PodSecurityContext{
				HostNetwork: true,
			},
			Containers: []api.Container{container},
			Volumes: []api.Volume{
				hostPath("rootfs", "/"),
				hostPath("var-run", "/var/run"),
				hostPath("sys", "/sys"),
				hostPath("docker", "/var/lib/docker"),
			},
		},
	}

	f.PodClient().CreateSync(pod)
}
// deleteBatchPod deletes all given pods in parallel, one goroutine per pod,
// and returns once every pod has been deleted and has disappeared from the
// framework's namespace.
func deleteBatchPod(f *framework.Framework, pods []*api.Pod) {
	var (
		ns      = f.Namespace.Name
		pending sync.WaitGroup
	)

	pending.Add(len(pods))
	for i := range pods {
		go func(target *api.Pod) {
			defer pending.Done()

			name := target.ObjectMeta.Name
			deleteErr := f.Client.Pods(ns).Delete(name, api.NewDeleteOptions(60))
			Expect(deleteErr).NotTo(HaveOccurred())

			// poll every 30s, for at most 10min, until the pod is gone
			goneErr := framework.WaitForPodToDisappear(f.Client, ns, target.ObjectMeta.Name, labels.Everything(),
				30*time.Second, 10*time.Minute)
			Expect(goneErr).NotTo(HaveOccurred())
		}(pods[i])
	}
	pending.Wait()
}
// newTestPods builds podsPerNode pod specifications. Every pod gets a unique
// "test-<uuid>" name, carries the labels type=podType and name=<pod name>,
// never restarts, and runs a single container of the given image named after
// the pod.
func newTestPods(podsPerNode int, imageName, podType string) []*api.Pod {
	var pods []*api.Pod
	for remaining := podsPerNode; remaining > 0; remaining-- {
		name := "test-" + string(util.NewUUID())

		pod := &api.Pod{}
		pod.ObjectMeta = api.ObjectMeta{
			Name: name,
			Labels: map[string]string{
				"type": podType,
				"name": name,
			},
		}
		pod.Spec = api.PodSpec{
			RestartPolicy: api.RestartPolicyNever,
			Containers: []api.Container{
				{Image: imageName, Name: name},
			},
		}

		pods = append(pods, pod)
	}
	return pods
}