diff --git a/test/e2e/kubelet.go b/test/e2e/kubelet.go
index 6c9c52de2b7..d67e5aa7b96 100644
--- a/test/e2e/kubelet.go
+++ b/test/e2e/kubelet.go
@@ -34,8 +34,9 @@ import (
 const (
 	// Interval to poll /runningpods on a node
 	pollInterval = 1 * time.Second
-	// Interval used compute cpu usage of a container
-	cpuIntervalInSeconds = 60
+	// Interval to poll /stats/container on a node
+	containerStatsPollingInterval = 5 * time.Second
+	resourceCollectionTime        = 1 * time.Minute
 )
 
 // getPodMatches returns a set of pod names on the given node that matches the
@@ -87,10 +88,11 @@ func waitTillNPodsRunningOnNodes(c *client.Client, nodeNames util.StringSet, pod
 	})
 }
 
-var _ = Describe("Clean up pods on node", func() {
+var _ = Describe("kubelet", func() {
 	var numNodes int
 	var nodeNames util.StringSet
-	framework := NewFramework("kubelet-delete")
+	framework := NewFramework("kubelet")
+	var resourceMonitor *resourceMonitor
 
 	BeforeEach(func() {
 		nodes, err := framework.Client.Nodes().List(labels.Everything(), fields.Everything())
@@ -100,56 +102,73 @@ var _ = Describe("Clean up pods on node", func() {
 		for _, node := range nodes.Items {
 			nodeNames.Insert(node.Name)
 		}
-		logOneTimeResourceUsageSummary(framework.Client, nodeNames.List(), cpuIntervalInSeconds)
+		resourceMonitor = newResourceMonitor(framework.Client, targetContainers, containerStatsPollingInterval)
+		resourceMonitor.Start()
 	})
 
-	type DeleteTest struct {
-		podsPerNode int
-		timeout     time.Duration
-	}
+	AfterEach(func() {
+		resourceMonitor.Stop()
+	})
 
-	deleteTests := []DeleteTest{
-		{podsPerNode: 10, timeout: 1 * time.Minute},
-	}
+	Describe("Clean up pods on node", func() {
+		type DeleteTest struct {
+			podsPerNode int
+			timeout     time.Duration
+		}
+		deleteTests := []DeleteTest{
+			{podsPerNode: 10, timeout: 1 * time.Minute},
+		}
+		for _, itArg := range deleteTests {
+			name := fmt.Sprintf(
+				"kubelet should be able to delete %d pods per node in %v.", itArg.podsPerNode, itArg.timeout)
+			It(name, func() {
+				totalPods := itArg.podsPerNode * numNodes
+				By(fmt.Sprintf("Creating an RC of %d pods and waiting until all pods of this RC are running", totalPods))
+				rcName := fmt.Sprintf("cleanup%d-%s", totalPods, string(util.NewUUID()))
 
-	for _, itArg := range deleteTests {
-		name := fmt.Sprintf(
-			"kubelet should be able to delete %d pods per node in %v.", itArg.podsPerNode, itArg.timeout)
-		It(name, func() {
-			totalPods := itArg.podsPerNode * numNodes
+				Expect(RunRC(RCConfig{
+					Client:    framework.Client,
+					Name:      rcName,
+					Namespace: framework.Namespace.Name,
+					Image:     "gcr.io/google_containers/pause:go",
+					Replicas:  totalPods,
+				})).NotTo(HaveOccurred())
+				// Perform a sanity check so that we know all desired pods are
+				// running on the nodes according to kubelet. The timeout is set to
+				// only 30 seconds here because RunRC already waited for all pods to
+				// transition to the running status.
+				Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, totalPods,
+					time.Second*30)).NotTo(HaveOccurred())
+				resourceMonitor.LogLatest()
 
-			By(fmt.Sprintf("Creating a RC of %d pods and wait until all pods of this RC are running", totalPods))
-			rcName := fmt.Sprintf("cleanup%d-%s", totalPods, string(util.NewUUID()))
+				By("Deleting the RC")
+				DeleteRC(framework.Client, framework.Namespace.Name, rcName)
+				// Check that the pods really are gone by querying /runningpods on the
+				// node. The /runningpods handler checks the container runtime (or its
+				// cache) and returns a list of running pods. Some possible causes of
+				// failures are:
+				// - kubelet deadlock
+				// - a bug in graceful termination (if it is enabled)
+				// - docker slow to delete pods (or resource problems causing slowness)
+				start := time.Now()
+				Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, 0,
+					itArg.timeout)).NotTo(HaveOccurred())
+				Logf("Deleting %d pods on %d nodes completed in %v after the RC was deleted", totalPods, len(nodeNames),
+					time.Since(start))
+				resourceMonitor.LogCPUSummary()
+			})
+		}
+	})
 
-			Expect(RunRC(RCConfig{
-				Client:    framework.Client,
-				Name:      rcName,
-				Namespace: framework.Namespace.Name,
-				Image:     "gcr.io/google_containers/pause:go",
-				Replicas:  totalPods,
-			})).NotTo(HaveOccurred())
-			// Perform a sanity check so that we know all desired pods are
-			// running on the nodes according to kubelet. The timeout is set to
-			// only 30 seconds here because RunRC already waited for all pods to
-			// transition to the running status.
-			Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, totalPods,
-				time.Second*30)).NotTo(HaveOccurred())
-			logOneTimeResourceUsageSummary(framework.Client, nodeNames.List(), cpuIntervalInSeconds)
-
-			By("Deleting the RC")
-			DeleteRC(framework.Client, framework.Namespace.Name, rcName)
-			// Check that the pods really are gone by querying /runningpods on the
-			// node. The /runningpods handler checks the container runtime (or its
-			// cache) and returns a list of running pods. Some possible causes of
-			// failures are:
-			// - kubelet deadlock
-			// - a bug in graceful termination (if it is enabled)
-			// - docker slow to delete pods (or resource problems causing slowness)
-			start := time.Now()
-			Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, 0,
-				itArg.timeout)).NotTo(HaveOccurred())
-			Logf("Deleting %d pods on %d nodes completed in %v after the RC was deleted", totalPods, len(nodeNames),
-				time.Since(start))
+	Describe("Monitor resource usage on node", func() {
+		It("Ask kubelet to report container resource usage", func() {
+			// TODO: After gathering some numbers, we should set a resource
+			// limit for each container and fail the test if the usage exceeds
+			// the preset limit.
+			By(fmt.Sprintf("Waiting %v to collect resource usage on node", resourceCollectionTime))
+			time.Sleep(resourceCollectionTime)
+			resourceMonitor.LogLatest()
+			resourceMonitor.LogCPUSummary()
 		})
-	}
+	})
 })
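The TODO in the new resource-usage test points at enforcing per-container limits once baseline numbers exist. A minimal sketch of what that check could look like, reusing GetBasicCPUStats from the kubelet_stats.go changes below; the helper name expectCPUUnderLimit and the limit values are hypothetical, not part of this patch:

```go
// Hypothetical follow-up to the TODO above; assumes it lives in the e2e
// package next to resourceMonitor. The limit values are placeholders.
func expectCPUUnderLimit(monitor *resourceMonitor, limits map[string]float64) {
	for nodeName, collector := range monitor.collectors {
		for containerName, limit := range limits {
			stats := collector.GetBasicCPUStats(containerName)
			if usage := stats[0.95]; usage > limit {
				Failf("CPU usage of %q on node %q: %.3f cores at the 95th percentile, above the %.3f-core limit",
					containerName, nodeName, usage, limit)
			}
		}
	}
}

// Possible use at the end of the It block:
//   expectCPUUnderLimit(resourceMonitor, map[string]float64{"/kubelet": 0.5})
```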
diff --git a/test/e2e/kubelet_stats.go b/test/e2e/kubelet_stats.go
index 48ec7b5face..d94fdb88c03 100644
--- a/test/e2e/kubelet_stats.go
+++ b/test/e2e/kubelet_stats.go
@@ -30,8 +30,10 @@ import (
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	cadvisor "github.com/google/cadvisor/info/v1"
@@ -200,9 +202,6 @@ type containerResourceUsage struct {
 // interval (and #containers) increases, the size of kubelet's response
 // could be significant. E.g., the 60s interval stats for ~20 containers is
 // ~1.5MB. Don't hammer the node with frequent, heavy requests.
-// TODO: Implement a constant, lightweight resource monitor, which polls
-// kubelet every few second, stores the data, and reports meaningful statistics
-// numbers over a longer period (e.g., max/mean cpu usage in the last hour).
 //
 // cadvisor records cumulative cpu usage in nanoseconds, so we need to have two
 // stats points to compute the cpu usage over the interval. Assuming cadvisor
@@ -233,14 +232,7 @@ func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterva
 		}
 		first := info.Stats[0]
 		last := info.Stats[len(info.Stats)-1]
-		usageMap[name] = &containerResourceUsage{
-			Name:                    name,
-			Timestamp:               last.Timestamp,
-			CPUUsageInCores:         float64(last.Cpu.Usage.Total-first.Cpu.Usage.Total) / float64(last.Timestamp.Sub(first.Timestamp).Nanoseconds()),
-			MemoryUsageInBytes:      int64(last.Memory.Usage),
-			MemoryWorkingSetInBytes: int64(last.Memory.WorkingSet),
-			CPUInterval:             last.Timestamp.Sub(first.Timestamp),
-		}
+		usageMap[name] = computeContainerResourceUsage(name, first, last)
 	}
 	return usageMap, nil
 }
@@ -324,3 +316,186 @@ func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) {
 	}
 	return result, nil
 }
+
+func computeContainerResourceUsage(name string, oldStats, newStats *cadvisor.ContainerStats) *containerResourceUsage {
+	return &containerResourceUsage{
+		Name:                    name,
+		Timestamp:               newStats.Timestamp,
+		CPUUsageInCores:         float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
+		MemoryUsageInBytes:      int64(newStats.Memory.Usage),
+		MemoryWorkingSetInBytes: int64(newStats.Memory.WorkingSet),
+		CPUInterval:             newStats.Timestamp.Sub(oldStats.Timestamp),
+	}
+}
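As the comment above notes, cadvisor reports cumulative CPU time in nanoseconds, so usage in cores is the delta between two samples divided by the wall-clock interval. A standalone sketch of the same arithmetic with made-up numbers:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Two synthetic samples taken 10s apart. Cumulative CPU time grows by
	// 5e9 ns, i.e. the container used 5s of CPU over 10s of wall time,
	// which is 0.5 cores. The values are illustrative only.
	interval := 10 * time.Second
	var oldTotal, newTotal uint64 = 20e9, 25e9 // cumulative ns, as cadvisor reports
	cores := float64(newTotal-oldTotal) / float64(interval.Nanoseconds())
	fmt.Printf("%.2f cores\n", cores) // prints "0.50 cores"
}
```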
+
+// resourceCollector periodically polls the node, collects stats for a given
+// list of containers, and computes and buffers the resource usage per
+// container.
+type resourceCollector struct {
+	node            string
+	containers      []string
+	client          *client.Client
+	buffers         map[string][]*containerResourceUsage
+	pollingInterval time.Duration
+	stopCh          chan struct{}
+}
+
+func newResourceCollector(c *client.Client, nodeName string, containerNames []string, pollingInterval time.Duration) *resourceCollector {
+	buffers := make(map[string][]*containerResourceUsage)
+	return &resourceCollector{
+		node:            nodeName,
+		containers:      containerNames,
+		client:          c,
+		buffers:         buffers,
+		pollingInterval: pollingInterval,
+	}
+}
+
+// Start starts a goroutine to poll the node every pollingInterval.
+func (r *resourceCollector) Start() {
+	r.stopCh = make(chan struct{}, 1)
+	// Keep the last observed stats for comparison.
+	oldStats := make(map[string]*cadvisor.ContainerStats)
+	go util.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh)
+}
+
+// Stop sends a signal to terminate the stats collecting goroutine.
+func (r *resourceCollector) Stop() {
+	close(r.stopCh)
+}
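Start and Stop lean on the stop-channel convention of the k8s util package: util.Until runs the given func every period until the channel is closed, so close(r.stopCh) ends the polling goroutine. A self-contained sketch of that contract (not the actual pkg/util implementation, which may differ in when it first invokes f):

```go
package main

import (
	"fmt"
	"time"
)

// until mimics the poll-until-stopped contract used by Start/Stop above:
// run f every period until stopCh is closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		case <-time.After(period):
			f()
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	go until(func() { fmt.Println("poll") }, 100*time.Millisecond, stopCh)
	time.Sleep(350 * time.Millisecond)
	close(stopCh) // as in Stop(): closing the channel terminates the goroutine
	time.Sleep(100 * time.Millisecond)
}
```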
+
+// collectStats gets the latest stats from kubelet's /stats/container, computes
+// the resource usage, and pushes it to the buffer.
+func (r *resourceCollector) collectStats(oldStats map[string]*cadvisor.ContainerStats) {
+	infos, err := getContainerInfo(r.client, r.node, &kubelet.StatsRequest{
+		ContainerName: "/",
+		NumStats:      1,
+		Subcontainers: true,
+	})
+	if err != nil {
+		Logf("Error getting container info on %q, err: %v", r.node, err)
+		return
+	}
+	for _, name := range r.containers {
+		info, ok := infos[name]
+		if !ok || len(info.Stats) < 1 {
+			Logf("Missing info/stats for container %q on node %q", name, r.node)
+			return
+		}
+		// Compute the usage only if we have a previous sample to compare against.
+		if _, ok := oldStats[name]; ok {
+			r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats[name], info.Stats[0]))
+		}
+		oldStats[name] = info.Stats[0]
+	}
+}
+
+// LogLatest logs the latest resource usage of each container.
+func (r *resourceCollector) LogLatest() {
+	stats := make(map[string]*containerResourceUsage)
+	for _, name := range r.containers {
+		contStats := r.buffers[name]
+		if len(contStats) == 0 {
+			Logf("Resource usage on node %q is not ready yet", r.node)
+			return
+		}
+		stats[name] = contStats[len(contStats)-1]
+	}
+	Logf("\n%s", formatResourceUsageStats(r.node, stats))
+}
+
+type resourceUsageByCPU []*containerResourceUsage
+
+func (r resourceUsageByCPU) Len() int           { return len(r) }
+func (r resourceUsageByCPU) Swap(i, j int)      { r[i], r[j] = r[j], r[i] }
+func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }
+
+var percentiles = [...]float64{0.05, 0.50, 0.90, 0.95}
+
+// GetBasicCPUStats returns the 5th, 50th, 90th, and 95th percentiles of the
+// cpu usage in cores for containerName. This method examines all data
+// currently in the buffer.
+func (r *resourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
+	result := make(map[float64]float64, len(percentiles))
+	usages := r.buffers[containerName]
+	if len(usages) == 0 {
+		Logf("No resource usage data for container %q on node %q", containerName, r.node)
+		return result
+	}
+	sort.Sort(resourceUsageByCPU(usages))
+	for _, q := range percentiles {
+		index := int(float64(len(usages))*q) - 1
+		if index < 0 {
+			index = 0
+		}
+		result[q] = usages[index].CPUUsageInCores
+	}
+	return result
+}
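GetBasicCPUStats uses a nearest-rank style index: with n sorted samples, the q-th quantile is the value at index int(n*q)-1, clamped at 0, so for n=20 and q=0.95 it selects index 18, the 19th-smallest sample. A runnable snippet with made-up usage values showing the same index rule:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Synthetic per-interval CPU usages in cores (illustrative data).
	usages := []float64{0.31, 0.12, 0.44, 0.05, 0.27, 0.19, 0.38, 0.09, 0.22, 0.15}
	sort.Float64s(usages)
	for _, q := range []float64{0.05, 0.50, 0.90, 0.95} {
		// Same index rule as GetBasicCPUStats above.
		index := int(float64(len(usages))*q) - 1
		if index < 0 {
			index = 0
		}
		fmt.Printf("%.0fth%%: %.2f\n", q*100, usages[index])
	}
}
```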
+
+// resourceMonitor manages a resourceCollector per node.
+type resourceMonitor struct {
+	client          *client.Client
+	containers      []string
+	pollingInterval time.Duration
+	collectors      map[string]*resourceCollector
+}
+
+func newResourceMonitor(c *client.Client, containerNames []string, pollingInterval time.Duration) *resourceMonitor {
+	return &resourceMonitor{
+		containers:      containerNames,
+		client:          c,
+		pollingInterval: pollingInterval,
+	}
+}
+
+func (r *resourceMonitor) Start() {
+	nodes, err := r.client.Nodes().List(labels.Everything(), fields.Everything())
+	if err != nil {
+		Failf("resourceMonitor: unable to get list of nodes: %v", err)
+	}
+	r.collectors = make(map[string]*resourceCollector)
+	for _, node := range nodes.Items {
+		collector := newResourceCollector(r.client, node.Name, r.containers, r.pollingInterval)
+		r.collectors[node.Name] = collector
+		collector.Start()
+	}
+}
+
+func (r *resourceMonitor) Stop() {
+	for _, collector := range r.collectors {
+		collector.Stop()
+	}
+}
+
+func (r *resourceMonitor) LogLatest() {
+	for _, collector := range r.collectors {
+		collector.LogLatest()
+	}
+}
+
+func (r *resourceMonitor) LogCPUSummary() {
+	// Example output for a node:
+	// CPU usage of containers on node "e2e-test-yjhong-minion-0vj7":
+	// container        5th%  50th% 90th% 95th%
+	// "/"              0.051 0.159 0.387 0.455
+	// "/docker-daemon" 0.000 0.000 0.146 0.166
+	// "/kubelet"       0.036 0.053 0.091 0.154
+	// "/kube-proxy"    0.017 0.000 0.000 0.000
+	// "/system"        0.001 0.001 0.001 0.002
+	var header []string
+	header = append(header, "container")
+	for _, p := range percentiles {
+		header = append(header, fmt.Sprintf("%.0fth%%", p*100))
+	}
+	for nodeName, collector := range r.collectors {
+		buf := &bytes.Buffer{}
+		w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
+		fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
+		for _, containerName := range r.containers {
+			data := collector.GetBasicCPUStats(containerName)
+			var s []string
+			s = append(s, fmt.Sprintf("%q", containerName))
+			for _, p := range percentiles {
+				s = append(s, fmt.Sprintf("%.3f", data[p]))
+			}
+			fmt.Fprintf(w, "%s\n", strings.Join(s, "\t"))
+		}
+		w.Flush()
+		Logf("\nCPU usage of containers on node %q:\n%s", nodeName, buf.String())
+	}
+}
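LogCPUSummary relies on text/tabwriter to turn tab-separated cells into aligned columns. The same writer settings (minwidth 1, tabwidth 0, padding 1, pad with spaces) reproduced in a standalone snippet; the row values are made up:

```go
package main

import (
	"bytes"
	"fmt"
	"text/tabwriter"
)

func main() {
	buf := &bytes.Buffer{}
	// Same settings as LogCPUSummary above.
	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
	fmt.Fprintf(w, "container\t5th%%\t50th%%\t90th%%\t95th%%\n")
	fmt.Fprintf(w, "%q\t%.3f\t%.3f\t%.3f\t%.3f\n", "/kubelet", 0.036, 0.053, 0.091, 0.154)
	w.Flush() // flushing computes the column widths and writes the aligned table
	fmt.Print(buf.String())
}
```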