diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD
index f877413f0ea..ba30ffa4000 100644
--- a/test/e2e/framework/BUILD
+++ b/test/e2e/framework/BUILD
@@ -35,6 +35,7 @@ go_library(
         "//pkg/controller:go_default_library",
         "//pkg/features:go_default_library",
         "//pkg/kubelet/apis/config:go_default_library",
+        "//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
         "//pkg/kubelet/events:go_default_library",
         "//pkg/kubelet/sysctl:go_default_library",
         "//pkg/master/ports:go_default_library",
@@ -80,7 +81,6 @@ go_library(
         "//staging/src/k8s.io/component-base/version:go_default_library",
         "//test/e2e/framework/auth:go_default_library",
         "//test/e2e/framework/ginkgowrapper:go_default_library",
-        "//test/e2e/framework/kubelet:go_default_library",
         "//test/e2e/framework/metrics:go_default_library",
         "//test/e2e/framework/node:go_default_library",
         "//test/e2e/framework/pod:go_default_library",
diff --git a/test/e2e/framework/kubelet/BUILD b/test/e2e/framework/kubelet/BUILD
index c48e9d3aa09..062c77dffc0 100644
--- a/test/e2e/framework/kubelet/BUILD
+++ b/test/e2e/framework/kubelet/BUILD
@@ -18,7 +18,7 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
-        "//test/e2e/framework/log:go_default_library",
+        "//test/e2e/framework:go_default_library",
         "//test/e2e/framework/metrics:go_default_library",
     ],
 )
diff --git a/test/e2e/framework/kubelet/kubelet_pods.go b/test/e2e/framework/kubelet/kubelet_pods.go
index 79f2973d88d..2e8d6093d95 100644
--- a/test/e2e/framework/kubelet/kubelet_pods.go
+++ b/test/e2e/framework/kubelet/kubelet_pods.go
@@ -20,7 +20,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/pkg/master/ports"
-	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
+	"k8s.io/kubernetes/test/e2e/framework"
 )
 
 // GetKubeletPods retrieves the list of pods on the kubelet.
@@ -51,13 +51,13 @@ func getKubeletPods(c clientset.Interface, node, resource string) (*v1.PodList,
 func PrintAllKubeletPods(c clientset.Interface, nodeName string) {
 	podList, err := GetKubeletPods(c, nodeName)
 	if err != nil {
-		e2elog.Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err)
+		framework.Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err)
 		return
 	}
 	for _, p := range podList.Items {
-		e2elog.Logf("%v from %v started at %v (%d container statuses recorded)", p.Name, p.Namespace, p.Status.StartTime, len(p.Status.ContainerStatuses))
+		framework.Logf("%v from %v started at %v (%d container statuses recorded)", p.Name, p.Namespace, p.Status.StartTime, len(p.Status.ContainerStatuses))
 		for _, c := range p.Status.ContainerStatuses {
-			e2elog.Logf("\tContainer %v ready: %v, restart count %v",
+			framework.Logf("\tContainer %v ready: %v, restart count %v",
 				c.Name, c.Ready, c.RestartCount)
 		}
 	}
diff --git a/test/e2e/framework/kubelet/stats.go b/test/e2e/framework/kubelet/stats.go
index e0e66445748..c315ae90802 100644
--- a/test/e2e/framework/kubelet/stats.go
+++ b/test/e2e/framework/kubelet/stats.go
@@ -36,7 +36,7 @@ import (
 	kubeletstatsv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
 	dockermetrics "k8s.io/kubernetes/pkg/kubelet/dockershim/metrics"
 	"k8s.io/kubernetes/pkg/master/ports"
-	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
+	"k8s.io/kubernetes/test/e2e/framework"
 	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
 )
 
@@ -118,7 +118,7 @@ func NewRuntimeOperationMonitor(c clientset.Interface) *RuntimeOperationMonitor
 	}
 	nodes, err := m.client.CoreV1().Nodes().List(metav1.ListOptions{})
 	if err != nil {
-		e2elog.Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
+		framework.Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
 	}
 	for _, node := range nodes.Items {
 		m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate)
@@ -134,7 +134,7 @@ func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate() map[string]Node
 	for node := range m.nodesRuntimeOps {
 		nodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
 		if err != nil {
-			e2elog.Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
+			framework.Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
 			continue
 		}
 		m.nodesRuntimeOps[node] = nodeResult
@@ -150,7 +150,7 @@ func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate() map[strin
 		oldNodeResult := m.nodesRuntimeOps[node]
 		curNodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
 		if err != nil {
-			e2elog.Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
+			framework.Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
 			continue
 		}
 		for op, cur := range curNodeResult {
@@ -239,90 +239,6 @@ func GetStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alp
 	return &summary, nil
 }
 
-func removeUint64Ptr(ptr *uint64) uint64 {
-	if ptr == nil {
-		return 0
-	}
-	return *ptr
-}
-
-// GetOneTimeResourceUsageOnNode queries the node's /stats/summary endpoint
-// and returns the resource usage of all containerNames for the past
-// cpuInterval.
-// The acceptable range of the interval is 2s~120s. Be warned that as the
-// interval (and #containers) increases, the size of kubelet's response
-// could be significant. E.g., the 60s interval stats for ~20 containers is
-// ~1.5MB. Don't hammer the node with frequent, heavy requests.
-//
-// cadvisor records cumulative cpu usage in nanoseconds, so we need to have two
-// stats points to compute the cpu usage over the interval. Assuming cadvisor
-// polls every second, we'd need to get N stats points for N-second interval.
-// Note that this is an approximation and may not be accurate, hence we also
-// write the actual interval used for calculation (based on the timestamps of
-// the stats points in ContainerResourceUsage.CPUInterval.
-//
-// containerNames is a function returning a collection of container names in which
-// user is interested in.
-func GetOneTimeResourceUsageOnNode(
-	c clientset.Interface,
-	nodeName string,
-	cpuInterval time.Duration,
-	containerNames func() []string,
-) (ResourceUsagePerContainer, error) {
-	const (
-		// cadvisor records stats about every second.
-		cadvisorStatsPollingIntervalInSeconds float64 = 1.0
-		// cadvisor caches up to 2 minutes of stats (configured by kubelet).
-		maxNumStatsToRequest int = 120
-	)
-
-	numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds)
-	if numStats < 2 || numStats > maxNumStatsToRequest {
-		return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest)
-	}
-	// Get information of all containers on the node.
-	summary, err := GetStatsSummary(c, nodeName)
-	if err != nil {
-		return nil, err
-	}
-
-	f := func(name string, newStats *kubeletstatsv1alpha1.ContainerStats) *ContainerResourceUsage {
-		if newStats == nil || newStats.CPU == nil || newStats.Memory == nil {
-			return nil
-		}
-		return &ContainerResourceUsage{
-			Name:                    name,
-			Timestamp:               newStats.StartTime.Time,
-			CPUUsageInCores:         float64(removeUint64Ptr(newStats.CPU.UsageNanoCores)) / 1000000000,
-			MemoryUsageInBytes:      removeUint64Ptr(newStats.Memory.UsageBytes),
-			MemoryWorkingSetInBytes: removeUint64Ptr(newStats.Memory.WorkingSetBytes),
-			MemoryRSSInBytes:        removeUint64Ptr(newStats.Memory.RSSBytes),
-			CPUInterval:             0,
-		}
-	}
-	// Process container infos that are relevant to us.
-	containers := containerNames()
-	usageMap := make(ResourceUsagePerContainer, len(containers))
-	for _, pod := range summary.Pods {
-		for _, container := range pod.Containers {
-			isInteresting := false
-			for _, interestingContainerName := range containers {
-				if container.Name == interestingContainerName {
-					isInteresting = true
-					break
-				}
-			}
-			if !isInteresting {
-				continue
-			}
-			if usage := f(pod.PodRef.Name+"/"+container.Name, &container); usage != nil {
-				usageMap[pod.PodRef.Name+"/"+container.Name] = usage
-			}
-		}
-	}
-	return usageMap, nil
-}
-
 func getNodeStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) {
 	data, err := c.CoreV1().RESTClient().Get().
 		Resource("nodes").
@@ -463,7 +379,7 @@ func (r *resourceCollector) Stop() {
 func (r *resourceCollector) collectStats(oldStatsMap map[string]*kubeletstatsv1alpha1.ContainerStats) {
 	summary, err := getNodeStatsSummary(r.client, r.node)
 	if err != nil {
-		e2elog.Logf("Error getting node stats summary on %q, err: %v", r.node, err)
+		framework.Logf("Error getting node stats summary on %q, err: %v", r.node, err)
 		return
 	}
 	cStatsMap := getSystemContainerStats(summary)
@@ -472,7 +388,7 @@ func (r *resourceCollector) collectStats(oldStatsMap map[string]*kubeletstatsv1a
 	for _, name := range r.containers {
 		cStats, ok := cStatsMap[name]
 		if !ok {
-			e2elog.Logf("Missing info/stats for container %q on node %q", name, r.node)
+			framework.Logf("Missing info/stats for container %q on node %q", name, r.node)
 			return
 		}
 
@@ -565,7 +481,7 @@ func (r *ResourceMonitor) Start() {
 	// It should be OK to monitor unschedulable Nodes
 	nodes, err := r.client.CoreV1().Nodes().List(metav1.ListOptions{})
 	if err != nil {
-		e2elog.Failf("ResourceMonitor: unable to get list of nodes: %v", err)
+		framework.Failf("ResourceMonitor: unable to get list of nodes: %v", err)
 	}
 	r.collectors = make(map[string]*resourceCollector, 0)
 	for _, node := range nodes.Items {
@@ -593,9 +509,9 @@ func (r *ResourceMonitor) Reset() {
 func (r *ResourceMonitor) LogLatest() {
 	summary, err := r.GetLatest()
 	if err != nil {
-		e2elog.Logf("%v", err)
+		framework.Logf("%v", err)
 	}
-	e2elog.Logf("%s", r.FormatResourceUsage(summary))
+	framework.Logf("%s", r.FormatResourceUsage(summary))
 }
 
 // FormatResourceUsage returns the formatted string for LogLatest().
@@ -699,7 +615,7 @@ func (r *ResourceMonitor) FormatCPUSummary(summary NodesCPUSummary) string {
 // LogCPUSummary outputs summary of CPU into log.
 func (r *ResourceMonitor) LogCPUSummary() {
 	summary := r.GetCPUSummary()
-	e2elog.Logf("%s", r.FormatCPUSummary(summary))
+	framework.Logf("%s", r.FormatCPUSummary(summary))
 }
 
 // GetCPUSummary returns summary of CPU.
diff --git a/test/e2e/framework/resource_usage_gatherer.go b/test/e2e/framework/resource_usage_gatherer.go
index bbb6ce4a873..9ce357b081b 100644
--- a/test/e2e/framework/resource_usage_gatherer.go
+++ b/test/e2e/framework/resource_usage_gatherer.go
@@ -18,6 +18,8 @@ package framework
 
 import (
 	"bytes"
+	"context"
+	"encoding/json"
 	"fmt"
 	"math"
 	"sort"
@@ -31,7 +33,8 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	clientset "k8s.io/client-go/kubernetes"
-	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
+	kubeletstatsv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
+	"k8s.io/kubernetes/pkg/master/ports"
 	"k8s.io/kubernetes/test/e2e/system"
 )
 
@@ -48,6 +51,21 @@ type SingleContainerSummary struct {
 	Mem  uint64
 }
 
+// ContainerResourceUsage is a structure for gathering container resource usage.
+type ContainerResourceUsage struct {
+	Name                    string
+	Timestamp               time.Time
+	CPUUsageInCores         float64
+	MemoryUsageInBytes      uint64
+	MemoryWorkingSetInBytes uint64
+	MemoryRSSInBytes        uint64
+	// The interval used to calculate CPUUsageInCores.
+	CPUInterval time.Duration
+}
+
+// ResourceUsagePerContainer is map of ContainerResourceUsage
+type ResourceUsagePerContainer map[string]*ContainerResourceUsage
+
 // ResourceUsageSummary is a struct to hold resource usage summary.
 // we can't have int here, as JSON does not accept integer keys.
 type ResourceUsageSummary map[string][]SingleContainerSummary
@@ -92,9 +110,9 @@ type usageDataPerContainer struct {
 	memWorkSetData []uint64
 }
 
-func computePercentiles(timeSeries []e2ekubelet.ResourceUsagePerContainer, percentilesToCompute []int) map[int]e2ekubelet.ResourceUsagePerContainer {
+func computePercentiles(timeSeries []ResourceUsagePerContainer, percentilesToCompute []int) map[int]ResourceUsagePerContainer {
 	if len(timeSeries) == 0 {
-		return make(map[int]e2ekubelet.ResourceUsagePerContainer)
+		return make(map[int]ResourceUsagePerContainer)
 	}
 	dataMap := make(map[string]*usageDataPerContainer)
 	for i := range timeSeries {
@@ -117,12 +135,12 @@ func computePercentiles(timeSeries []e2ekubelet.ResourceUsagePerContainer, perce
 		sort.Sort(uint64arr(v.memWorkSetData))
 	}
 
-	result := make(map[int]e2ekubelet.ResourceUsagePerContainer)
+	result := make(map[int]ResourceUsagePerContainer)
 	for _, perc := range percentilesToCompute {
-		data := make(e2ekubelet.ResourceUsagePerContainer)
+		data := make(ResourceUsagePerContainer)
 		for k, v := range dataMap {
 			percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
-			data[k] = &e2ekubelet.ContainerResourceUsage{
+			data[k] = &ContainerResourceUsage{
 				Name:                    k,
 				CPUUsageInCores:         v.cpuData[percentileIndex],
 				MemoryUsageInBytes:      v.memUseData[percentileIndex],
@@ -134,8 +152,8 @@ func computePercentiles(timeSeries []e2ekubelet.ResourceUsagePerContainer, perce
 	return result
 }
 
-func leftMergeData(left, right map[int]e2ekubelet.ResourceUsagePerContainer) map[int]e2ekubelet.ResourceUsagePerContainer {
-	result := make(map[int]e2ekubelet.ResourceUsagePerContainer)
+func leftMergeData(left, right map[int]ResourceUsagePerContainer) map[int]ResourceUsagePerContainer {
+	result := make(map[int]ResourceUsagePerContainer)
 	for percentile, data := range left {
 		result[percentile] = data
 		if _, ok := right[percentile]; !ok {
@@ -154,7 +172,7 @@ type resourceGatherWorker struct {
 	wg                          *sync.WaitGroup
 	containerIDs                []string
 	stopCh                      chan struct{}
-	dataSeries                  []e2ekubelet.ResourceUsagePerContainer
+	dataSeries                  []ResourceUsagePerContainer
 	finished                    bool
 	inKubemark                  bool
 	resourceDataGatheringPeriod time.Duration
@@ -163,21 +181,21 @@ type resourceGatherWorker struct {
 }
 
 func (w *resourceGatherWorker) singleProbe() {
-	data := make(e2ekubelet.ResourceUsagePerContainer)
+	data := make(ResourceUsagePerContainer)
 	if w.inKubemark {
 		kubemarkData := GetKubemarkMasterComponentsResourceUsage()
 		if data == nil {
 			return
 		}
 		for k, v := range kubemarkData {
-			data[k] = &e2ekubelet.ContainerResourceUsage{
+			data[k] = &ContainerResourceUsage{
 				Name:                    v.Name,
 				MemoryWorkingSetInBytes: v.MemoryWorkingSetInBytes,
 				CPUUsageInCores:         v.CPUUsageInCores,
 			}
 		}
 	} else {
-		nodeUsage, err := e2ekubelet.GetOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })
+		nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })
 		if err != nil {
 			Logf("Error while reading data from %v: %v", w.nodeName, err)
 			return
@@ -192,6 +210,115 @@ func (w *resourceGatherWorker) singleProbe() {
 	w.dataSeries = append(w.dataSeries, data)
 }
 
+// getOneTimeResourceUsageOnNode queries the node's /stats/summary endpoint
+// and returns the resource usage of all containerNames for the past
+// cpuInterval.
+// The acceptable range of the interval is 2s~120s. Be warned that as the
+// interval (and #containers) increases, the size of kubelet's response
+// could be significant. E.g., the 60s interval stats for ~20 containers is
+// ~1.5MB. Don't hammer the node with frequent, heavy requests.
+//
+// cadvisor records cumulative cpu usage in nanoseconds, so we need to have two
+// stats points to compute the cpu usage over the interval. Assuming cadvisor
+// polls every second, we'd need to get N stats points for N-second interval.
+// Note that this is an approximation and may not be accurate, hence we also
+// write the actual interval used for calculation (based on the timestamps of
+// the stats points in ContainerResourceUsage.CPUInterval.
+//
+// containerNames is a function returning a collection of container names in which
+// user is interested in.
+func getOneTimeResourceUsageOnNode(
+	c clientset.Interface,
+	nodeName string,
+	cpuInterval time.Duration,
+	containerNames func() []string,
+) (ResourceUsagePerContainer, error) {
+	const (
+		// cadvisor records stats about every second.
+		cadvisorStatsPollingIntervalInSeconds float64 = 1.0
+		// cadvisor caches up to 2 minutes of stats (configured by kubelet).
+		maxNumStatsToRequest int = 120
+	)
+
+	numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds)
+	if numStats < 2 || numStats > maxNumStatsToRequest {
+		return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest)
+	}
+	// Get information of all containers on the node.
+	summary, err := getStatsSummary(c, nodeName)
+	if err != nil {
+		return nil, err
+	}
+
+	f := func(name string, newStats *kubeletstatsv1alpha1.ContainerStats) *ContainerResourceUsage {
+		if newStats == nil || newStats.CPU == nil || newStats.Memory == nil {
+			return nil
+		}
+		return &ContainerResourceUsage{
+			Name:                    name,
+			Timestamp:               newStats.StartTime.Time,
+			CPUUsageInCores:         float64(removeUint64Ptr(newStats.CPU.UsageNanoCores)) / 1000000000,
+			MemoryUsageInBytes:      removeUint64Ptr(newStats.Memory.UsageBytes),
+			MemoryWorkingSetInBytes: removeUint64Ptr(newStats.Memory.WorkingSetBytes),
+			MemoryRSSInBytes:        removeUint64Ptr(newStats.Memory.RSSBytes),
+			CPUInterval:             0,
+		}
+	}
+	// Process container infos that are relevant to us.
+	containers := containerNames()
+	usageMap := make(ResourceUsagePerContainer, len(containers))
+	for _, pod := range summary.Pods {
+		for _, container := range pod.Containers {
+			isInteresting := false
+			for _, interestingContainerName := range containers {
+				if container.Name == interestingContainerName {
+					isInteresting = true
+					break
+				}
+			}
+			if !isInteresting {
+				continue
+			}
+			if usage := f(pod.PodRef.Name+"/"+container.Name, &container); usage != nil {
+				usageMap[pod.PodRef.Name+"/"+container.Name] = usage
+			}
+		}
+	}
+	return usageMap, nil
+}
+
+// getStatsSummary contacts kubelet for the container information.
+func getStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
+	defer cancel()
+
+	data, err := c.CoreV1().RESTClient().Get().
+		Context(ctx).
+		Resource("nodes").
+		SubResource("proxy").
+		Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
+		Suffix("stats/summary").
+		Do().Raw()
+
+	if err != nil {
+		return nil, err
+	}
+
+	summary := kubeletstatsv1alpha1.Summary{}
+	err = json.Unmarshal(data, &summary)
+	if err != nil {
+		return nil, err
+	}
+	return &summary, nil
+}
+
+func removeUint64Ptr(ptr *uint64) uint64 {
+	if ptr == nil {
+		return 0
+	}
+	return *ptr
+}
+
 func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
 	defer utilruntime.HandleCrash()
 	defer w.wg.Done()
@@ -367,7 +494,7 @@ func (g *ContainerResourceGatherer) StopAndSummarize(percentiles []int, constrai
 		Logf("Warning! Empty percentile list for stopAndPrintData.")
 		return &ResourceUsageSummary{}, fmt.Errorf("Failed to get any resource usage data")
 	}
-	data := make(map[int]e2ekubelet.ResourceUsagePerContainer)
+	data := make(map[int]ResourceUsagePerContainer)
 	for i := range g.workers {
 		if g.workers[i].finished {
 			stats := computePercentiles(g.workers[i].dataSeries, percentiles)
diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go
index 0319b12e9dc..661cd29391a 100644
--- a/test/e2e/framework/util.go
+++ b/test/e2e/framework/util.go
@@ -74,7 +74,6 @@ import (
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/master/ports"
 	taintutils "k8s.io/kubernetes/pkg/util/taints"
-	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
 	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -103,6 +102,9 @@ const (
 	// PodDeleteTimeout is how long to wait for a pod to be deleted.
 	PodDeleteTimeout = 5 * time.Minute
 
+	// PodGetTimeout is how long to wait for a pod to be got.
+	PodGetTimeout = 2 * time.Minute
+
 	// PodEventTimeout is how much we wait for a pod event to occur.
 	PodEventTimeout = 2 * time.Minute
 
@@ -1370,7 +1372,7 @@ func DumpNodeDebugInfo(c clientset.Interface, nodeNames []string, logFunc func(f
 			e.Source, e.Type, e.Message, e.Reason, e.FirstTimestamp, e.LastTimestamp, e.InvolvedObject)
 	}
 	logFunc("\nLogging pods the kubelet thinks is on node %v", n)
-	podList, err := e2ekubelet.GetKubeletPods(c, n)
+	podList, err := getKubeletPods(c, n)
 	if err != nil {
 		logFunc("Unable to retrieve kubelet pods for node %v: %v", n, err)
 		continue
@@ -1391,6 +1393,33 @@ func DumpNodeDebugInfo(c clientset.Interface, nodeNames []string, logFunc func(f
 	}
 }
 
+// getKubeletPods retrieves the list of pods on the kubelet.
+func getKubeletPods(c clientset.Interface, node string) (*v1.PodList, error) {
+	result := &v1.PodList{}
+	var client restclient.Result
+	finished := make(chan struct{}, 1)
+	go func() {
+		// call chain tends to hang in some cases when Node is not ready. Add an artificial timeout for this call. #22165
+		client = c.CoreV1().RESTClient().Get().
+			Resource("nodes").
+			SubResource("proxy").
+			Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
+			Suffix("pods").
+			Do()
+
+		finished <- struct{}{}
+	}()
+	select {
+	case <-finished:
+		if err := client.Into(result); err != nil {
+			return &v1.PodList{}, err
+		}
+		return result, nil
+	case <-time.After(PodGetTimeout):
+		return &v1.PodList{}, fmt.Errorf("Waiting up to %v for getting the list of pods", PodGetTimeout)
+	}
+}
+
 // logNodeEvents logs kubelet events from the given node. This includes kubelet
 // restart and node unhealthy events. Note that listing events like this will mess
 // with latency metrics, beware of calling it during a test.