add +build linux to density_test, resource_usage_test and resource_collector

This commit is contained in:
Zhou Fang 2016-08-03 11:43:24 -07:00
parent 5841f6e164
commit 637e0f91ce
3 changed files with 1193 additions and 0 deletions

View File

@ -0,0 +1,432 @@
// +build linux
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"errors"
"fmt"
"sort"
"strconv"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
controllerframework "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
kubemetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/metrics"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
const (
kubeletAddr = "localhost:10255"
)
var _ = framework.KubeDescribe("Density [Serial] [Slow]", func() {
const (
// the data collection time of `resource collector' and the standalone cadvisor
// is not synchronizated. Therefore `resource collector' may miss data or
// collect duplicated data
monitoringInterval = 500 * time.Millisecond
sleepBeforeCreatePods = 30 * time.Second
)
var (
ns string
nodeName string
)
f := framework.NewDefaultFramework("density-test")
podType := "density_test_pod"
BeforeEach(func() {
ns = f.Namespace.Name
nodeName = framework.TestContext.NodeName
})
AfterEach(func() {
})
Context("create a batch of pods", func() {
densityTests := []DensityTest{
{
podsNr: 10,
interval: 0 * time.Millisecond,
cpuLimits: framework.ContainersCPUSummary{
stats.SystemContainerKubelet: {0.50: 0.20, 0.95: 0.30},
stats.SystemContainerRuntime: {0.50: 0.40, 0.95: 0.60},
},
memLimits: framework.ResourceUsagePerContainer{
stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024},
stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 400 * 1024 * 1024},
},
// percentile limit of single pod startup latency
podStartupLimits: framework.LatencyMetric{
Perc50: 10 * time.Second,
Perc90: 15 * time.Second,
Perc99: 20 * time.Second,
},
// upbound of startup latency of a batch of pods
podBatchStartupLimit: 25 * time.Second,
},
}
for _, testArg := range densityTests {
itArg := testArg
It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval",
itArg.podsNr, itArg.interval), func() {
var (
mutex = &sync.Mutex{}
watchTimes = make(map[string]unversioned.Time, 0)
stopCh = make(chan struct{})
)
// create specifications of the test pods
pods := newTestPods(itArg.podsNr, ImageRegistry[pauseImage], podType)
// start a standalone cadvisor pod
// it uses `createSync', so the pod is running when it returns
createCadvisorPod(f)
// `resource collector' monitoring fine-grain CPU/memory usage by a standalone Cadvisor with
// 1s housingkeeping interval
rc := NewResourceCollector(monitoringInterval)
// the controller watches the change of pod status
controller := newInformerWatchPod(f, mutex, watchTimes, podType)
go controller.Run(stopCh)
// Zhou: In test we see kubelet starts while it is busy on something, as a result `syncLoop'
// does not response to pod creation immediately. Creating the first pod has a delay around 5s.
// The node status has been `ready' so `wait and check node being ready' does not help here.
// Now wait here for a grace period to have `syncLoop' be ready
time.Sleep(sleepBeforeCreatePods)
// the density test only monitors the overhead of creating pod
// or start earliest and call `rc.Reset()' here to clear the buffer
rc.Start()
By("Creating a batch of pods")
// it returns a map[`pod name']`creation time' as the creation timestamps
createTimes := createBatchPodWithRateControl(f, pods, itArg.interval)
By("Waiting for all Pods to be observed by the watch...")
// checks every 10s util all pods are running. it times out ater 10min
Eventually(func() bool {
return len(watchTimes) == itArg.podsNr
}, 10*time.Minute, 10*time.Second).Should(BeTrue())
if len(watchTimes) < itArg.podsNr {
framework.Failf("Timeout reached waiting for all Pods to be observed by the watch.")
}
// stop the watching controller, and the resource collector
close(stopCh)
rc.Stop()
// data analyis
var (
firstCreate unversioned.Time
lastRunning unversioned.Time
init = true
e2eLags = make([]framework.PodLatencyData, 0)
)
for name, create := range createTimes {
watch, ok := watchTimes[name]
Expect(ok).To(Equal(true))
e2eLags = append(e2eLags,
framework.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})
if !init {
if firstCreate.Time.After(create.Time) {
firstCreate = create
}
if lastRunning.Time.Before(watch.Time) {
lastRunning = watch
}
} else {
init = false
firstCreate, lastRunning = create, watch
}
}
sort.Sort(framework.LatencySlice(e2eLags))
// verify latency
By("Verifying latency")
verifyLatency(lastRunning.Time.Sub(firstCreate.Time), e2eLags, itArg)
// verify resource
By("Verifying resource")
verifyResource(f, testArg, rc)
})
}
})
Context("create a sequence of pods", func() {
densityTests := []DensityTest{
{
podsNr: 10,
bgPodsNr: 10,
cpuLimits: framework.ContainersCPUSummary{
stats.SystemContainerKubelet: {0.50: 0.20, 0.95: 0.25},
stats.SystemContainerRuntime: {0.50: 0.40, 0.95: 0.60},
},
memLimits: framework.ResourceUsagePerContainer{
stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024},
stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 400 * 1024 * 1024},
},
podStartupLimits: framework.LatencyMetric{
Perc50: 3000 * time.Millisecond,
Perc90: 4000 * time.Millisecond,
Perc99: 5000 * time.Millisecond,
},
},
}
for _, testArg := range densityTests {
itArg := testArg
It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %d background pods",
itArg.podsNr, itArg.bgPodsNr), func() {
bgPods := newTestPods(itArg.bgPodsNr, ImageRegistry[pauseImage], "background_pod")
testPods := newTestPods(itArg.podsNr, ImageRegistry[pauseImage], podType)
createCadvisorPod(f)
rc := NewResourceCollector(monitoringInterval)
By("Creating a batch of background pods")
// creatBatch is synchronized
// all pods are running when it returns
f.PodClient().CreateBatch(bgPods)
time.Sleep(sleepBeforeCreatePods)
// starting resource monitoring
rc.Start()
// do a sequential creation of pod (back to back)
batchlag, e2eLags := createBatchPodSequential(f, testPods)
rc.Stop()
// verify latency
By("Verifying latency")
verifyLatency(batchlag, e2eLags, itArg)
// verify resource
By("Verifying resource")
verifyResource(f, testArg, rc)
})
}
})
})
type DensityTest struct {
// number of pods
podsNr int
// number of background pods
bgPodsNr int
// interval between creating pod (rate control)
interval time.Duration
// resource bound
cpuLimits framework.ContainersCPUSummary
memLimits framework.ResourceUsagePerContainer
podStartupLimits framework.LatencyMetric
podBatchStartupLimit time.Duration
}
// it creates a batch of pods concurrently, uses one goroutine for each creation.
// between creations there is an interval for throughput control
func createBatchPodWithRateControl(f *framework.Framework, pods []*api.Pod, interval time.Duration) map[string]unversioned.Time {
createTimes := make(map[string]unversioned.Time)
for _, pod := range pods {
createTimes[pod.ObjectMeta.Name] = unversioned.Now()
go f.PodClient().Create(pod)
time.Sleep(interval)
}
return createTimes
}
func checkPodDeleted(f *framework.Framework, podName string) error {
ns := f.Namespace.Name
_, err := f.Client.Pods(ns).Get(podName)
if apierrors.IsNotFound(err) {
return nil
}
return errors.New("Pod Not Deleted")
}
// get prometheus metric `pod start latency' from kubelet
func getPodStartLatency(node string) (framework.KubeletLatencyMetrics, error) {
latencyMetrics := framework.KubeletLatencyMetrics{}
ms, err := metrics.GrabKubeletMetricsWithoutProxy(node)
Expect(err).NotTo(HaveOccurred())
for _, samples := range ms {
for _, sample := range samples {
if sample.Metric["__name__"] == kubemetrics.KubeletSubsystem+"_"+kubemetrics.PodStartLatencyKey {
quantile, _ := strconv.ParseFloat(string(sample.Metric["quantile"]), 64)
latencyMetrics = append(latencyMetrics,
framework.KubeletLatencyMetric{
Quantile: quantile,
Method: kubemetrics.PodStartLatencyKey,
Latency: time.Duration(int(sample.Value)) * time.Microsecond})
}
}
}
return latencyMetrics, nil
}
// Verifies whether 50, 90 and 99th percentiles of PodStartupLatency are
// within the threshold.
func verifyPodStartupLatency(expect, actual framework.LatencyMetric) error {
if actual.Perc50 > expect.Perc50 {
return fmt.Errorf("too high pod startup latency 50th percentile: %v", actual.Perc50)
}
if actual.Perc90 > expect.Perc90 {
return fmt.Errorf("too high pod startup latency 90th percentile: %v", actual.Perc90)
}
if actual.Perc99 > actual.Perc99 {
return fmt.Errorf("too high pod startup latency 99th percentil: %v", actual.Perc99)
}
return nil
}
func newInformerWatchPod(f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]unversioned.Time,
podType string) *controllerframework.Controller {
ns := f.Namespace.Name
checkPodRunning := func(p *api.Pod) {
mutex.Lock()
defer mutex.Unlock()
defer GinkgoRecover()
if p.Status.Phase == api.PodRunning {
if _, found := watchTimes[p.Name]; !found {
watchTimes[p.Name] = unversioned.Now()
}
}
}
_, controller := controllerframework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType})
return f.Client.Pods(ns).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType})
return f.Client.Pods(ns).Watch(options)
},
},
&api.Pod{},
0,
controllerframework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
p, ok := obj.(*api.Pod)
Expect(ok).To(Equal(true))
go checkPodRunning(p)
},
UpdateFunc: func(oldObj, newObj interface{}) {
p, ok := newObj.(*api.Pod)
Expect(ok).To(Equal(true))
go checkPodRunning(p)
},
},
)
return controller
}
func verifyLatency(batchLag time.Duration, e2eLags []framework.PodLatencyData, testArg DensityTest) {
framework.PrintLatencies(e2eLags, "worst client e2e total latencies")
// Zhou: do not trust `kubelet' metrics since they are not reset!
latencyMetrics, _ := getPodStartLatency(kubeletAddr)
framework.Logf("Kubelet Prometheus metrics (not reset):\n%s", framework.PrettyPrintJSON(latencyMetrics))
// check whether e2e pod startup time is acceptable.
podCreateLatency := framework.PodStartupLatency{Latency: framework.ExtractLatencyMetrics(e2eLags)}
framework.Logf("Pod create latency: %s", framework.PrettyPrintJSON(podCreateLatency))
framework.ExpectNoError(verifyPodStartupLatency(testArg.podStartupLimits, podCreateLatency.Latency))
// check bactch pod creation latency
if testArg.podBatchStartupLimit > 0 {
Expect(batchLag <= testArg.podBatchStartupLimit).To(Equal(true), "Batch creation startup time %v exceed limit %v",
batchLag, testArg.podBatchStartupLimit)
}
// calculate and log throughput
throughputBatch := float64(testArg.podsNr) / batchLag.Minutes()
framework.Logf("Batch creation throughput is %.1f pods/min", throughputBatch)
throughputSequential := 1.0 / e2eLags[len(e2eLags)-1].Latency.Minutes()
framework.Logf("Sequential creation throughput is %.1f pods/min", throughputSequential)
}
func verifyResource(f *framework.Framework, testArg DensityTest, rc *ResourceCollector) {
nodeName := framework.TestContext.NodeName
// verify and log memory
usagePerContainer, err := rc.GetLatest()
Expect(err).NotTo(HaveOccurred())
framework.Logf("%s", formatResourceUsageStats(usagePerContainer))
usagePerNode := make(framework.ResourceUsagePerNode)
usagePerNode[nodeName] = usagePerContainer
memPerfData := framework.ResourceUsageToPerfData(usagePerNode)
framework.PrintPerfData(memPerfData)
verifyMemoryLimits(f.Client, testArg.memLimits, usagePerNode)
// verify and log cpu
cpuSummary := rc.GetCPUSummary()
framework.Logf("%s", formatCPUSummary(cpuSummary))
cpuSummaryPerNode := make(framework.NodesCPUSummary)
cpuSummaryPerNode[nodeName] = cpuSummary
cpuPerfData := framework.CPUUsageToPerfData(cpuSummaryPerNode)
framework.PrintPerfData(cpuPerfData)
verifyCPULimits(testArg.cpuLimits, cpuSummaryPerNode)
}
func createBatchPodSequential(f *framework.Framework, pods []*api.Pod) (time.Duration, []framework.PodLatencyData) {
batchStartTime := unversioned.Now()
e2eLags := make([]framework.PodLatencyData, 0)
for _, pod := range pods {
create := unversioned.Now()
f.PodClient().CreateSync(pod)
e2eLags = append(e2eLags,
framework.PodLatencyData{Name: pod.ObjectMeta.Name, Latency: unversioned.Now().Time.Sub(create.Time)})
}
batchLag := unversioned.Now().Time.Sub(batchStartTime.Time)
sort.Sort(framework.LatencySlice(e2eLags))
return batchLag, e2eLags
}

View File

@ -0,0 +1,514 @@
// +build linux
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"sort"
"strconv"
"strings"
"sync"
"text/tabwriter"
"time"
cadvisorclient "github.com/google/cadvisor/client/v2"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/opencontainers/runc/libcontainer/cgroups"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/gomega"
)
const (
// resource monitoring
cadvisorImageName = "google/cadvisor:latest"
cadvisorPodName = "cadvisor"
cadvisorPort = 8090
// housekeeping interval of Cadvisor (second)
houseKeepingInterval = 1
)
var (
systemContainers map[string]string
)
type ResourceCollector struct {
client *cadvisorclient.Client
request *cadvisorapiv2.RequestOptions
pollingInterval time.Duration
buffers map[string][]*framework.ContainerResourceUsage
lock sync.RWMutex
stopCh chan struct{}
}
func NewResourceCollector(interval time.Duration) *ResourceCollector {
buffers := make(map[string][]*framework.ContainerResourceUsage)
return &ResourceCollector{
pollingInterval: interval,
buffers: buffers,
}
}
func (r *ResourceCollector) Start() {
// Get the cgroup containers for kubelet and docker
kubeletContainer, err := getContainerNameForProcess(kubeletProcessName, "")
dockerContainer, err := getContainerNameForProcess(dockerProcessName, dockerPidFile)
if err == nil {
systemContainers = map[string]string{
stats.SystemContainerKubelet: kubeletContainer,
stats.SystemContainerRuntime: dockerContainer,
}
} else {
framework.Failf("Failed to get docker container name in test-e2e-node resource collector.")
}
wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
var err error
r.client, err = cadvisorclient.NewClient(fmt.Sprintf("http://localhost:%d/", cadvisorPort))
if err == nil {
return true, nil
}
return false, err
})
Expect(r.client).NotTo(BeNil(), "cadvisor client not ready")
r.request = &cadvisorapiv2.RequestOptions{IdType: "name", Count: 1, Recursive: false}
r.stopCh = make(chan struct{})
oldStatsMap := make(map[string]*cadvisorapiv2.ContainerStats)
go wait.Until(func() { r.collectStats(oldStatsMap) }, r.pollingInterval, r.stopCh)
}
func (r *ResourceCollector) Stop() {
close(r.stopCh)
}
func (r *ResourceCollector) Reset() {
r.lock.Lock()
defer r.lock.Unlock()
for _, name := range systemContainers {
r.buffers[name] = []*framework.ContainerResourceUsage{}
}
}
func (r *ResourceCollector) GetCPUSummary() framework.ContainersCPUSummary {
result := make(framework.ContainersCPUSummary)
for key, name := range systemContainers {
data := r.GetBasicCPUStats(name)
result[key] = data
}
return result
}
func (r *ResourceCollector) LogLatest() {
summary, err := r.GetLatest()
if err != nil {
framework.Logf("%v", err)
}
framework.Logf("%s", formatResourceUsageStats(summary))
}
func (r *ResourceCollector) collectStats(oldStatsMap map[string]*cadvisorapiv2.ContainerStats) {
for _, name := range systemContainers {
ret, err := r.client.Stats(name, r.request)
if err != nil {
framework.Logf("Error getting container stats, err: %v", err)
return
}
cStats, ok := ret[name]
if !ok {
framework.Logf("Missing info/stats for container %q", name)
return
}
newStats := cStats.Stats[0]
if oldStats, ok := oldStatsMap[name]; ok && oldStats.Timestamp.Before(newStats.Timestamp) {
if oldStats.Timestamp.Equal(newStats.Timestamp) {
continue
}
r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats, newStats))
}
oldStatsMap[name] = newStats
}
}
func computeContainerResourceUsage(name string, oldStats, newStats *cadvisorapiv2.ContainerStats) *framework.ContainerResourceUsage {
return &framework.ContainerResourceUsage{
Name: name,
Timestamp: newStats.Timestamp,
CPUUsageInCores: float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
MemoryUsageInBytes: newStats.Memory.Usage,
MemoryWorkingSetInBytes: newStats.Memory.WorkingSet,
MemoryRSSInBytes: newStats.Memory.RSS,
CPUInterval: newStats.Timestamp.Sub(oldStats.Timestamp),
}
}
func (r *ResourceCollector) GetLatest() (framework.ResourceUsagePerContainer, error) {
r.lock.RLock()
defer r.lock.RUnlock()
stats := make(framework.ResourceUsagePerContainer)
for key, name := range systemContainers {
contStats, ok := r.buffers[name]
if !ok || len(contStats) == 0 {
return nil, fmt.Errorf("Resource usage of %s:%s is not ready yet", key, name)
}
stats[key] = contStats[len(contStats)-1]
}
return stats, nil
}
type resourceUsageByCPU []*framework.ContainerResourceUsage
func (r resourceUsageByCPU) Len() int { return len(r) }
func (r resourceUsageByCPU) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }
// The percentiles to report.
var percentiles = [...]float64{0.05, 0.20, 0.50, 0.70, 0.90, 0.95, 0.99}
// GetBasicCPUStats returns the percentiles the cpu usage in cores for
// containerName. This method examines all data currently in the buffer.
func (r *ResourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
r.lock.RLock()
defer r.lock.RUnlock()
result := make(map[float64]float64, len(percentiles))
usages := r.buffers[containerName]
sort.Sort(resourceUsageByCPU(usages))
for _, q := range percentiles {
index := int(float64(len(usages))*q) - 1
if index < 0 {
// We don't have enough data.
result[q] = 0
continue
}
result[q] = usages[index].CPUUsageInCores
}
return result
}
func formatResourceUsageStats(containerStats framework.ResourceUsagePerContainer) string {
// Example output:
//
// Resource usage for node "e2e-test-foo-minion-abcde":
// container cpu(cores) memory(MB)
// "/" 0.363 2942.09
// "/docker-daemon" 0.088 521.80
// "/kubelet" 0.086 424.37
// "/system" 0.007 119.88
buf := &bytes.Buffer{}
w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
fmt.Fprintf(w, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n")
for name, s := range containerStats {
fmt.Fprintf(w, "%q\t%.3f\t%.2f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024), float64(s.MemoryRSSInBytes)/(1024*1024))
}
w.Flush()
return fmt.Sprintf("Resource usage:\n%s", buf.String())
}
func formatCPUSummary(summary framework.ContainersCPUSummary) string {
// Example output for a node (the percentiles may differ):
// CPU usage of containers on node "e2e-test-foo-minion-0vj7":
// container 5th% 50th% 90th% 95th%
// "/" 0.051 0.159 0.387 0.455
// "/runtime 0.000 0.000 0.146 0.166
// "/kubelet" 0.036 0.053 0.091 0.154
// "/misc" 0.001 0.001 0.001 0.002
var summaryStrings []string
var header []string
header = append(header, "container")
for _, p := range percentiles {
header = append(header, fmt.Sprintf("%.0fth%%", p*100))
}
buf := &bytes.Buffer{}
w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
for _, containerName := range framework.TargetContainers() {
var s []string
s = append(s, fmt.Sprintf("%q", containerName))
data, ok := summary[containerName]
for _, p := range percentiles {
value := "N/A"
if ok {
value = fmt.Sprintf("%.3f", data[p])
}
s = append(s, value)
}
fmt.Fprintf(w, "%s\n", strings.Join(s, "\t"))
}
w.Flush()
summaryStrings = append(summaryStrings, fmt.Sprintf("CPU usage of containers:\n%s", buf.String()))
return strings.Join(summaryStrings, "\n")
}
func createCadvisorPod(f *framework.Framework) {
f.PodClient().CreateSync(&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: cadvisorPodName,
},
Spec: api.PodSpec{
// It uses a host port for the tests to collect data.
// Currently we can not use port mapping in test-e2e-node.
SecurityContext: &api.PodSecurityContext{
HostNetwork: true,
},
Containers: []api.Container{
{
Image: cadvisorImageName,
Name: cadvisorPodName,
Ports: []api.ContainerPort{
{
Name: "http",
HostPort: cadvisorPort,
ContainerPort: cadvisorPort,
Protocol: api.ProtocolTCP,
},
},
VolumeMounts: []api.VolumeMount{
{
Name: "sys",
ReadOnly: true,
MountPath: "/sys",
},
{
Name: "var-run",
ReadOnly: false,
MountPath: "/var/run",
},
{
Name: "docker",
ReadOnly: true,
MountPath: "/var/lib/docker/",
},
{
Name: "rootfs",
ReadOnly: true,
MountPath: "/rootfs",
},
},
Args: []string{
"--profiling",
fmt.Sprintf("--housekeeping_interval=%ds", houseKeepingInterval),
fmt.Sprintf("--port=%d", cadvisorPort),
},
},
},
Volumes: []api.Volume{
{
Name: "rootfs",
VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/"}},
},
{
Name: "var-run",
VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/var/run"}},
},
{
Name: "sys",
VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/sys"}},
},
{
Name: "docker",
VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/var/lib/docker"}},
},
},
},
})
}
func deleteBatchPod(f *framework.Framework, pods []*api.Pod) {
ns := f.Namespace.Name
var wg sync.WaitGroup
for _, pod := range pods {
wg.Add(1)
go func(pod *api.Pod) {
defer wg.Done()
err := f.Client.Pods(ns).Delete(pod.ObjectMeta.Name, api.NewDeleteOptions(30))
Expect(err).NotTo(HaveOccurred())
Expect(framework.WaitForPodToDisappear(f.Client, ns, pod.ObjectMeta.Name, labels.Everything(),
30*time.Second, 10*time.Minute)).
NotTo(HaveOccurred())
}(pod)
}
wg.Wait()
return
}
func newTestPods(numPods int, imageName, podType string) []*api.Pod {
var pods []*api.Pod
for i := 0; i < numPods; i++ {
podName := "test-" + string(uuid.NewUUID())
labels := map[string]string{
"type": podType,
"name": podName,
}
pods = append(pods,
&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
Labels: labels,
},
Spec: api.PodSpec{
// ToDo: restart policy is always
// check whether pods restart at the end of tests
Containers: []api.Container{
{
Image: imageName,
Name: podName,
},
},
},
})
}
return pods
}
// code for getting container name of docker
const (
kubeletProcessName = "kubelet"
dockerProcessName = "docker"
dockerPidFile = "/var/run/docker.pid"
containerdProcessName = "docker-containerd"
containerdPidFile = "/run/docker/libcontainerd/docker-containerd.pid"
)
func getContainerNameForProcess(name, pidFile string) (string, error) {
pids, err := getPidsForProcess(name, pidFile)
if err != nil {
return "", fmt.Errorf("failed to detect process id for %q - %v", name, err)
}
if len(pids) == 0 {
return "", nil
}
cont, err := getContainer(pids[0])
if err != nil {
return "", err
}
return cont, nil
}
func getPidFromPidFile(pidFile string) (int, error) {
file, err := os.Open(pidFile)
if err != nil {
return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err)
}
defer file.Close()
data, err := ioutil.ReadAll(file)
if err != nil {
return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err)
}
pid, err := strconv.Atoi(string(data))
if err != nil {
return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err)
}
return pid, nil
}
func getPidsForProcess(name, pidFile string) ([]int, error) {
if len(pidFile) > 0 {
if pid, err := getPidFromPidFile(pidFile); err == nil {
return []int{pid}, nil
} else {
// log the error and fall back to pidof
runtime.HandleError(err)
}
}
out, err := exec.Command("pidof", name).Output()
if err != nil {
return []int{}, fmt.Errorf("failed to find pid of %q: %v", name, err)
}
// The output of pidof is a list of pids.
pids := []int{}
for _, pidStr := range strings.Split(strings.TrimSpace(string(out)), " ") {
pid, err := strconv.Atoi(pidStr)
if err != nil {
continue
}
pids = append(pids, pid)
}
return pids, nil
}
// getContainer returns the cgroup associated with the specified pid.
// It enforces a unified hierarchy for memory and cpu cgroups.
// On systemd environments, it uses the name=systemd cgroup for the specified pid.
func getContainer(pid int) (string, error) {
cgs, err := cgroups.ParseCgroupFile(fmt.Sprintf("/proc/%d/cgroup", pid))
if err != nil {
return "", err
}
cpu, found := cgs["cpu"]
if !found {
return "", cgroups.NewNotFoundError("cpu")
}
memory, found := cgs["memory"]
if !found {
return "", cgroups.NewNotFoundError("memory")
}
// since we use this container for accounting, we need to ensure its a unified hierarchy.
if cpu != memory {
return "", fmt.Errorf("cpu and memory cgroup hierarchy not unified. cpu: %s, memory: %s", cpu, memory)
}
// on systemd, every pid is in a unified cgroup hierarchy (name=systemd as seen in systemd-cgls)
// cpu and memory accounting is off by default, users may choose to enable it per unit or globally.
// users could enable CPU and memory accounting globally via /etc/systemd/system.conf (DefaultCPUAccounting=true DefaultMemoryAccounting=true).
// users could also enable CPU and memory accounting per unit via CPUAccounting=true and MemoryAccounting=true
// we only warn if accounting is not enabled for CPU or memory so as to not break local development flows where kubelet is launched in a terminal.
// for example, the cgroup for the user session will be something like /user.slice/user-X.slice/session-X.scope, but the cpu and memory
// cgroup will be the closest ancestor where accounting is performed (most likely /) on systems that launch docker containers.
// as a result, on those systems, you will not get cpu or memory accounting statistics for kubelet.
// in addition, you would not get memory or cpu accounting for the runtime unless accounting was enabled on its unit (or globally).
if systemd, found := cgs["name=systemd"]; found {
if systemd != cpu {
log.Printf("CPUAccounting not enabled for pid: %d", pid)
}
if systemd != memory {
log.Printf("MemoryAccounting not enabled for pid: %d", pid)
}
return systemd, nil
}
return cpu, nil
}

View File

@ -0,0 +1,247 @@
// +build linux
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"fmt"
"strings"
"time"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = framework.KubeDescribe("Resource-usage [Serial] [Slow]", func() {
const (
// Interval to poll /stats/container on a node
containerStatsPollingPeriod = 10 * time.Second
// The monitoring time for one test.
monitoringTime = 10 * time.Minute
// The periodic reporting period.
reportingPeriod = 5 * time.Minute
sleepAfterCreatePods = 10 * time.Second
)
var (
ns string
rc *ResourceCollector
om *framework.RuntimeOperationMonitor
)
f := framework.NewDefaultFramework("resource-usage")
BeforeEach(func() {
ns = f.Namespace.Name
om = framework.NewRuntimeOperationMonitor(f.Client)
})
AfterEach(func() {
result := om.GetLatestRuntimeOperationErrorRate()
framework.Logf("runtime operation error metrics:\n%s", framework.FormatRuntimeOperationErrorRate(result))
})
// This test measures and verifies the steady resource usage of node is within limit
// It collects data from a standalone Cadvisor with housekeeping interval 1s.
// It verifies CPU percentiles and the lastest memory usage.
Context("regular resource usage tracking", func() {
rTests := []resourceTest{
{
podsPerNode: 10,
cpuLimits: framework.ContainersCPUSummary{
stats.SystemContainerKubelet: {0.50: 0.25, 0.95: 0.30},
stats.SystemContainerRuntime: {0.50: 0.30, 0.95: 0.40},
},
// We set the memory limits generously because the distribution
// of the addon pods affect the memory usage on each node.
memLimits: framework.ResourceUsagePerContainer{
stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024},
stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 400 * 1024 * 1024},
},
},
}
for _, testArg := range rTests {
itArg := testArg
podsPerNode := itArg.podsPerNode
name := fmt.Sprintf("resource tracking for %d pods per node", podsPerNode)
It(name, func() {
expectedCPU, expectedMemory := itArg.cpuLimits, itArg.memLimits
// The test collects resource usage from a standalone Cadvisor pod.
// The Cadvsior of Kubelet has a housekeeping interval of 10s, which is too long to
// show the resource usage spikes. But changing its interval increases the overhead
// of kubelet. Hence we use a Cadvisor pod.
createCadvisorPod(f)
rc = NewResourceCollector(containerStatsPollingPeriod)
rc.Start()
By("Creating a batch of Pods")
pods := newTestPods(podsPerNode, ImageRegistry[pauseImage], "test_pod")
for _, pod := range pods {
f.PodClient().CreateSync(pod)
}
// wait for a while to let the node be steady
time.Sleep(sleepAfterCreatePods)
// Log once and flush the stats.
rc.LogLatest()
rc.Reset()
By("Start monitoring resource usage")
// Periodically dump the cpu summary until the deadline is met.
// Note that without calling framework.ResourceMonitor.Reset(), the stats
// would occupy increasingly more memory. This should be fine
// for the current test duration, but we should reclaim the
// entries if we plan to monitor longer (e.g., 8 hours).
deadline := time.Now().Add(monitoringTime)
for time.Now().Before(deadline) {
timeLeft := deadline.Sub(time.Now())
framework.Logf("Still running...%v left", timeLeft)
if timeLeft < reportingPeriod {
time.Sleep(timeLeft)
} else {
time.Sleep(reportingPeriod)
}
logPodsOnNode(f.Client)
}
rc.Stop()
By("Reporting overall resource usage")
logPodsOnNode(f.Client)
usagePerContainer, err := rc.GetLatest()
Expect(err).NotTo(HaveOccurred())
// TODO(random-liu): Remove the original log when we migrate to new perfdash
nodeName := framework.TestContext.NodeName
framework.Logf("%s", formatResourceUsageStats(usagePerContainer))
// Log perf result
usagePerNode := make(framework.ResourceUsagePerNode)
usagePerNode[nodeName] = usagePerContainer
framework.PrintPerfData(framework.ResourceUsageToPerfData(usagePerNode))
verifyMemoryLimits(f.Client, expectedMemory, usagePerNode)
cpuSummary := rc.GetCPUSummary()
framework.Logf("%s", formatCPUSummary(cpuSummary))
// Log perf result
cpuSummaryPerNode := make(framework.NodesCPUSummary)
cpuSummaryPerNode[nodeName] = cpuSummary
framework.PrintPerfData(framework.CPUUsageToPerfData(cpuSummaryPerNode))
verifyCPULimits(expectedCPU, cpuSummaryPerNode)
})
}
})
})
type resourceTest struct {
podsPerNode int
cpuLimits framework.ContainersCPUSummary
memLimits framework.ResourceUsagePerContainer
}
func verifyMemoryLimits(c *client.Client, expected framework.ResourceUsagePerContainer, actual framework.ResourceUsagePerNode) {
if expected == nil {
return
}
var errList []string
for nodeName, nodeSummary := range actual {
var nodeErrs []string
for cName, expectedResult := range expected {
container, ok := nodeSummary[cName]
if !ok {
nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing", cName))
continue
}
expectedValue := expectedResult.MemoryRSSInBytes
actualValue := container.MemoryRSSInBytes
if expectedValue != 0 && actualValue > expectedValue {
nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: expected RSS memory (MB) < %d; got %d",
cName, expectedValue, actualValue))
}
}
if len(nodeErrs) > 0 {
errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", ")))
heapStats, err := framework.GetKubeletHeapStats(c, nodeName)
if err != nil {
framework.Logf("Unable to get heap stats from %q", nodeName)
} else {
framework.Logf("Heap stats on %q\n:%v", nodeName, heapStats)
}
}
}
if len(errList) > 0 {
framework.Failf("Memory usage exceeding limits:\n %s", strings.Join(errList, "\n"))
}
}
func verifyCPULimits(expected framework.ContainersCPUSummary, actual framework.NodesCPUSummary) {
if expected == nil {
return
}
var errList []string
for nodeName, perNodeSummary := range actual {
var nodeErrs []string
for cName, expectedResult := range expected {
perContainerSummary, ok := perNodeSummary[cName]
if !ok {
nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing", cName))
continue
}
for p, expectedValue := range expectedResult {
actualValue, ok := perContainerSummary[p]
if !ok {
nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing percentile %v", cName, p))
continue
}
if actualValue > expectedValue {
nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: expected %.0fth%% usage < %.3f; got %.3f",
cName, p*100, expectedValue, actualValue))
}
}
}
if len(nodeErrs) > 0 {
errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", ")))
}
}
if len(errList) > 0 {
framework.Failf("CPU usage exceeding limits:\n %s", strings.Join(errList, "\n"))
}
}
func logPodsOnNode(c *client.Client) {
nodeName := framework.TestContext.NodeName
podList, err := framework.GetKubeletRunningPods(c, nodeName)
if err != nil {
framework.Logf("Unable to retrieve kubelet pods for node %v", nodeName)
}
framework.Logf("%d pods are running on node %v", len(podList.Items), nodeName)
}