From cfe391d4eed271de16c30ea40ff24c345d03b0c1 Mon Sep 17 00:00:00 2001 From: gmarek Date: Thu, 29 Oct 2015 15:49:23 +0100 Subject: [PATCH] Add resource monitoring of kube-system pods --- test/e2e/density.go | 1 + test/e2e/framework.go | 13 +++ test/e2e/kubelet_stats.go | 192 ++++++++++++++++++++++++++++++++-- test/e2e/metrics_util.go | 3 +- test/e2e/monitor_resources.go | 13 ++- 5 files changed, 211 insertions(+), 11 deletions(-) diff --git a/test/e2e/density.go b/test/e2e/density.go index 4f1142e63b4..fa62b3cb0d9 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -112,6 +112,7 @@ var _ = Describe("Density", func() { framework := NewFramework("density") framework.NamespaceDeletionTimeout = time.Hour + framework.GatherKubeSystemResourceUsageData = true BeforeEach(func() { c = framework.Client diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 3c2927c921e..8c09775fe30 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -39,6 +39,11 @@ type Framework struct { Namespace *api.Namespace Client *client.Client NamespaceDeletionTimeout time.Duration + + // If set to true, the framework will start a goroutine monitoring resource usage of system add-ons. + // It will read the data every minute from all Nodes and print a summary during afterEach. + GatherKubeSystemResourceUsageData bool + gatherer containerResourceGatherer } // NewFramework makes a new framework and sets up a BeforeEach/AfterEach for @@ -75,6 +80,10 @@ func (f *Framework) beforeEach() { } else { Logf("Skipping waiting for service account") } + + if f.GatherKubeSystemResourceUsageData { + f.gatherer.startGatheringData(c, time.Minute) + } } // afterEach deletes the namespace, after reading its events. @@ -113,6 +122,10 @@ func (f *Framework) afterEach() { } else { Logf("Found DeleteNamespace=false, skipping namespace deletion!") } + + if f.GatherKubeSystemResourceUsageData { + f.gatherer.stopAndPrintData([]int{50, 90, 99, 100}) + } // Paranoia-- prevent reuse!
f.Namespace = nil f.Client = nil diff --git a/test/e2e/kubelet_stats.go b/test/e2e/kubelet_stats.go index 72ee8323e11..3e76cf492eb 100644 --- a/test/e2e/kubelet_stats.go +++ b/test/e2e/kubelet_stats.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math" "net/http" "sort" "strconv" @@ -209,8 +210,10 @@ func (r *containerResourceUsage) isStrictlyGreaterThan(rhs *containerResourceUsa return r.CPUUsageInCores > rhs.CPUUsageInCores && r.MemoryUsageInBytes > rhs.MemoryUsageInBytes && r.MemoryWorkingSetInBytes > rhs.MemoryWorkingSetInBytes } +type resourceUsagePerContainer map[string]*containerResourceUsage + // getOneTimeResourceUsageOnNode queries the node's /stats/container endpoint -// and returns the resource usage of targetContainers for the past +// and returns the resource usage of all containerNames for the past // cpuInterval. // The acceptable range of the interval is 2s~120s. Be warned that as the // interval (and #containers) increases, the size of kubelet's response @@ -223,7 +226,19 @@ func (r *containerResourceUsage) isStrictlyGreaterThan(rhs *containerResourceUsa // Note that this is an approximation and may not be accurate, hence we also // write the actual interval used for calculation (based on the timestamps of // the stats points in containerResourceUsage.CPUInterval. -func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterval time.Duration) (map[string]*containerResourceUsage, error) { +// +// containerNames is a function returning a collection of container names in which +the user is interested. expectMissingContainers is a flag which says whether the test +should fail if one of the containers listed by containerNames is missing on any node +(useful e.g. when looking for system containers or daemons). If set to true, the function +is more forgiving and ignores missing containers.
+func getOneTimeResourceUsageOnNode( + c *client.Client, + nodeName string, + cpuInterval time.Duration, + containerNames func() []string, + expectMissingContainers bool, +) (resourceUsagePerContainer, error) { numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds) if numStats < 2 || numStats > maxNumStatsToRequest { return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest) @@ -238,12 +253,15 @@ func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterva return nil, err } // Process container infos that are relevant to us. - containers := targetContainers() - usageMap := make(map[string]*containerResourceUsage, len(containers)) + containers := containerNames() + usageMap := make(resourceUsagePerContainer, len(containers)) for _, name := range containers { info, ok := containerInfos[name] if !ok { - return nil, fmt.Errorf("missing info for container %q on node %q", name, nodeName) + if !expectMissingContainers { + return nil, fmt.Errorf("missing info for container %q on node %q", name, nodeName) + } + continue } first := info.Stats[0] last := info.Stats[len(info.Stats)-1] @@ -252,12 +270,58 @@ func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterva return usageMap, nil } +func getKubeSystemContainersResourceUsage(c *client.Client) (resourceUsagePerContainer, error) { + pods, err := c.Pods("kube-system").List(labels.Everything(), fields.Everything()) + if err != nil { + return resourceUsagePerContainer{}, err + } + nodes, err := c.Nodes().List(labels.Everything(), fields.Everything()) + if err != nil { + return resourceUsagePerContainer{}, err + } + containerIDToNameMap := make(map[string]string) + containerIDs := make([]string, 0) + for _, pod := range pods.Items { + for _, container := range pod.Status.ContainerStatuses { + containerID := strings.TrimPrefix(container.ContainerID, "docker:/") + containerIDToNameMap[containerID] = pod.Name + "/" + container.Name + 
containerIDs = append(containerIDs, containerID) + } + } + + mutex := sync.Mutex{} + wg := sync.WaitGroup{} + wg.Add(len(nodes.Items)) + errors := make([]error, 0) + nameToUsageMap := make(resourceUsagePerContainer, len(containerIDToNameMap)) + for _, node := range nodes.Items { + go func(nodeName string) { + defer wg.Done() + nodeUsage, err := getOneTimeResourceUsageOnNode(c, nodeName, 5*time.Second, func() []string { return containerIDs }, true) + mutex.Lock() + defer mutex.Unlock() + if err != nil { + errors = append(errors, err) + return + } + for k, v := range nodeUsage { + nameToUsageMap[containerIDToNameMap[k]] = v + } + }(node.Name) + } + wg.Wait() + if len(errors) != 0 { + return resourceUsagePerContainer{}, fmt.Errorf("Errors while gathering usage data: %v", errors) + } + return nameToUsageMap, nil +} + // logOneTimeResourceUsageSummary collects container resource for the list of // nodes, formats and logs the stats. func logOneTimeResourceUsageSummary(c *client.Client, nodeNames []string, cpuInterval time.Duration) { var summary []string for _, nodeName := range nodeNames { - stats, err := getOneTimeResourceUsageOnNode(c, nodeName, cpuInterval) + stats, err := getOneTimeResourceUsageOnNode(c, nodeName, cpuInterval, targetContainers, false) if err != nil { summary = append(summary, fmt.Sprintf("Error getting resource usage from node %q, err: %v", nodeName, err)) } else { @@ -267,7 +331,7 @@ func logOneTimeResourceUsageSummary(c *client.Client, nodeNames []string, cpuInt Logf("\n%s", strings.Join(summary, "\n")) } -func formatResourceUsageStats(nodeName string, containerStats map[string]*containerResourceUsage) string { +func formatResourceUsageStats(nodeName string, containerStats resourceUsagePerContainer) string { // Example output: // // Resource usage for node "e2e-test-foo-minion-abcde": @@ -287,6 +351,120 @@ func formatResourceUsageStats(nodeName string, containerStats map[string]*contai return fmt.Sprintf("Resource usage on node %q:\n%s", nodeName, 
buf.String()) } +type int64arr []int64 + +func (a int64arr) Len() int { return len(a) } +func (a int64arr) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a int64arr) Less(i, j int) bool { return a[i] < a[j] } + +type usageDataPerContainer struct { + cpuData []float64 + memUseData []int64 + memWorkSetData []int64 +} + +func computePercentiles(timeSeries map[time.Time]resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer { + if len(timeSeries) == 0 { + return make(map[int]resourceUsagePerContainer) + } + dataMap := make(map[string]*usageDataPerContainer) + for _, v := range timeSeries { + for k := range v { + dataMap[k] = &usageDataPerContainer{ + cpuData: make([]float64, len(timeSeries)), + memUseData: make([]int64, len(timeSeries)), + memWorkSetData: make([]int64, len(timeSeries)), + } + } + break + } + for _, singleStatistic := range timeSeries { + for name, data := range singleStatistic { + dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores) + dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes) + dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes) + } + } + for _, v := range dataMap { + sort.Float64s(v.cpuData) + sort.Sort(int64arr(v.memUseData)) + sort.Sort(int64arr(v.memWorkSetData)) + } + + result := make(map[int]resourceUsagePerContainer) + for _, perc := range percentilesToCompute { + data := make(resourceUsagePerContainer) + for k, v := range dataMap { + percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1 + data[k] = &containerResourceUsage{ + Name: k, + CPUUsageInCores: v.cpuData[percentileIndex], + MemoryUsageInBytes: v.memUseData[percentileIndex], + MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex], + } + } + result[perc] = data + } + return result +} + +type containerResourceGatherer struct { + usageTimeseries map[time.Time]resourceUsagePerContainer + stopCh chan struct{} + timer 
*time.Ticker + wg sync.WaitGroup +} + +func (g *containerResourceGatherer) startGatheringData(c *client.Client, period time.Duration) { + g.usageTimeseries = make(map[time.Time]resourceUsagePerContainer) + g.wg.Add(1) + g.stopCh = make(chan struct{}) + g.timer = time.NewTicker(period) + go func() error { + for { + select { + case <-g.timer.C: + now := time.Now() + data, err := getKubeSystemContainersResourceUsage(c) + if err != nil { + return err + } + g.usageTimeseries[now] = data + case <-g.stopCh: + g.wg.Done() + return nil + } + } + }() +} + +func (g *containerResourceGatherer) stopAndPrintData(percentiles []int) { + close(g.stopCh) + g.timer.Stop() + g.wg.Wait() + if len(percentiles) == 0 { + Logf("Warning! Empty percentile list for stopAndPrintData.") + return + } + stats := computePercentiles(g.usageTimeseries, percentiles) + sortedKeys := []string{} + for name := range stats[percentiles[0]] { + sortedKeys = append(sortedKeys, name) + } + sort.Strings(sortedKeys) + for _, perc := range percentiles { + buf := &bytes.Buffer{} + w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0) + fmt.Fprintf(w, "container\tcpu(cores)\tmemory(MB)\n") + for _, name := range sortedKeys { + usage := stats[perc][name] + fmt.Fprintf(w, "%q\t%.3f\t%.2f\n", name, usage.CPUUsageInCores, float64(usage.MemoryWorkingSetInBytes)/(1024*1024)) + } + w.Flush() + Logf("%v percentile:\n%v", perc, buf.String()) + } +} + // Performs a get on a node proxy endpoint given the nodename and rest client. func nodeProxyRequest(c *client.Client, node, endpoint string) client.Result { return c.Get(). 
diff --git a/test/e2e/metrics_util.go b/test/e2e/metrics_util.go index ae008433c3c..ad1223e0895 100644 --- a/test/e2e/metrics_util.go +++ b/test/e2e/metrics_util.go @@ -110,7 +110,8 @@ func readLatencyMetrics(c *client.Client) (APIResponsiveness, error) { } ignoredResources := sets.NewString("events") - ignoredVerbs := sets.NewString("WATCHLIST", "PROXY") + // TODO: figure out why we're getting non-capitalized proxy and fix this. + ignoredVerbs := sets.NewString("WATCHLIST", "PROXY", "proxy") for _, sample := range samples { // Example line: diff --git a/test/e2e/monitor_resources.go b/test/e2e/monitor_resources.go index a3c279a0980..98f3748015a 100644 --- a/test/e2e/monitor_resources.go +++ b/test/e2e/monitor_resources.go @@ -29,8 +29,6 @@ import ( const datapointAmount = 5 -type resourceUsagePerContainer map[string]*containerResourceUsage - var systemContainers = []string{"/docker-daemon", "/kubelet", "/kube-proxy", "/system"} //TODO tweak those values. @@ -102,7 +100,13 @@ var _ = Describe("Resource usage of system containers", func() { for i := 0; i < datapointAmount; i++ { for _, node := range nodeList.Items { - resourceUsage, err := getOneTimeResourceUsageOnNode(c, node.Name, 5*time.Second) + resourceUsage, err := getOneTimeResourceUsageOnNode(c, node.Name, 5*time.Second, func() []string { + if providerIs("gce", "gke") { + return systemContainers + } else { + return []string{} + } + }, false) expectNoError(err) resourceUsagePerNode[node.Name] = append(resourceUsagePerNode[node.Name], resourceUsage) } @@ -119,6 +123,9 @@ var _ = Describe("Resource usage of system containers", func() { for container, cUsage := range usage { Logf("%v on %v usage: %#v", container, node, cUsage) if !allowedUsage[container].isStrictlyGreaterThan(cUsage) { + if _, ok := violating[node]; !ok { + violating[node] = make(resourceUsagePerContainer) + } if allowedUsage[container].CPUUsageInCores < cUsage.CPUUsageInCores { Logf("CPU is too high for %s (%v)", container, 
cUsage.CPUUsageInCores) }