From 92b0310cc5cbe9b483d8d6b6ede69f227c9e4887 Mon Sep 17 00:00:00 2001
From: WanLinghao
Date: Mon, 29 Jul 2019 16:23:58 +0800
Subject: [PATCH] Refactor and clean up e2e framework utils; this patch handles
 the test/e2e/framework/kubelet_stats.go file

---
 test/e2e/framework/BUILD | 6 -
 test/e2e/framework/kubelet/BUILD | 7 +
 test/e2e/framework/kubelet/kubelet_pods.go | 17 +
 test/e2e/framework/kubelet/stats.go | 677 ++++++++++++++
 test/e2e/framework/kubelet_stats.go | 858 ------------------
 test/e2e/framework/metrics/kubelet_metrics.go | 6 +-
 test/e2e/framework/resource_usage_gatherer.go | 14 +-
 test/e2e/framework/util.go | 3 +-
 test/e2e/node/kubelet.go | 4 +-
 test/e2e/node/kubelet_perf.go | 16 +-
 test/e2e/node/node_problem_detector.go | 3 +-
 test/e2e/scheduling/BUILD | 1 +
 .../equivalence_cache_predicates.go | 3 +-
 test/e2e/scheduling/predicates.go | 3 +-
 test/e2e_node/density_test.go | 6 +-
 test/e2e_node/resource_collector.go | 2 +-
 test/e2e_node/resource_usage_test.go | 10 +-
 test/e2e_node/util.go | 3 +-
 18 files changed, 747 insertions(+), 892 deletions(-)
 delete mode 100644 test/e2e/framework/kubelet_stats.go

diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD
index b851e6e99be..64a7bc414bd 100644
--- a/test/e2e/framework/BUILD
+++ b/test/e2e/framework/BUILD
@@ -12,7 +12,6 @@ go_library(
 "framework.go",
 "get-kubemark-resource-usage.go",
 "google_compute.go",
- "kubelet_stats.go",
 "log_size_monitoring.go",
 "networking_utils.go",
 "nodes_util.go",
@@ -37,10 +36,7 @@ go_library(
 "//pkg/controller/service:go_default_library",
 "//pkg/features:go_default_library",
 "//pkg/kubelet/apis/config:go_default_library",
- "//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
- "//pkg/kubelet/dockershim/metrics:go_default_library",
 "//pkg/kubelet/events:go_default_library",
- "//pkg/kubelet/metrics:go_default_library",
 "//pkg/kubelet/sysctl:go_default_library",
 "//pkg/master/ports:go_default_library",
 "//pkg/scheduler/algorithm/predicates:go_default_library",
@@ -63,7 +59,6 @@ go_library(
 "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
- "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
@@ -109,7 +104,6 @@ go_library(
 "//vendor/github.com/onsi/gomega:go_default_library",
 "//vendor/github.com/onsi/gomega/types:go_default_library",
 "//vendor/github.com/pkg/errors:go_default_library",
- "//vendor/github.com/prometheus/common/model:go_default_library",
 "//vendor/golang.org/x/net/websocket:go_default_library",
 "//vendor/k8s.io/klog:go_default_library",
 "//vendor/k8s.io/utils/exec:go_default_library",
diff --git a/test/e2e/framework/kubelet/BUILD b/test/e2e/framework/kubelet/BUILD
index 194c1b60fac..ed8151cbb2a 100644
--- a/test/e2e/framework/kubelet/BUILD
+++ b/test/e2e/framework/kubelet/BUILD
@@ -9,9 +9,16 @@ go_library(
 importpath = "k8s.io/kubernetes/test/e2e/framework/kubelet",
 visibility = ["//visibility:public"],
 deps = [
+ "//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
+ "//pkg/kubelet/dockershim/metrics:go_default_library",
 "//pkg/master/ports:go_default_library",
 "//staging/src/k8s.io/api/core/v1:go_default_library",
+ "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//test/e2e/framework/log:go_default_library", + "//test/e2e/framework/metrics:go_default_library", "//test/e2e/framework/node:go_default_library", ], ) diff --git a/test/e2e/framework/kubelet/kubelet_pods.go b/test/e2e/framework/kubelet/kubelet_pods.go index 06dee8f98fb..012cf25170a 100644 --- a/test/e2e/framework/kubelet/kubelet_pods.go +++ b/test/e2e/framework/kubelet/kubelet_pods.go @@ -20,6 +20,7 @@ import ( v1 "k8s.io/api/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/master/ports" + e2elog "k8s.io/kubernetes/test/e2e/framework/log" e2enode "k8s.io/kubernetes/test/e2e/framework/node" ) @@ -46,3 +47,19 @@ func getKubeletPods(c clientset.Interface, node, resource string) (*v1.PodList, } return result, nil } + +// PrintAllKubeletPods outputs status of all kubelet pods into log. +func PrintAllKubeletPods(c clientset.Interface, nodeName string) { + podList, err := GetKubeletPods(c, nodeName) + if err != nil { + e2elog.Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err) + return + } + for _, p := range podList.Items { + e2elog.Logf("%v from %v started at %v (%d container statuses recorded)", p.Name, p.Namespace, p.Status.StartTime, len(p.Status.ContainerStatuses)) + for _, c := range p.Status.ContainerStatuses { + e2elog.Logf("\tContainer %v ready: %v, restart count %v", + c.Name, c.Ready, c.RestartCount) + } + } +} diff --git a/test/e2e/framework/kubelet/stats.go b/test/e2e/framework/kubelet/stats.go index fdd17fb0976..b1d1f28789f 100644 --- a/test/e2e/framework/kubelet/stats.go +++ b/test/e2e/framework/kubelet/stats.go @@ -17,7 +17,27 @@ limitations under the License. package kubelet import ( + "bytes" + "context" + "encoding/json" + "fmt" + "sort" + "strings" + "sync" + "text/tabwriter" "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" + + clientset "k8s.io/client-go/kubernetes" + kubeletstatsv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" + dockermetrics "k8s.io/kubernetes/pkg/kubelet/dockershim/metrics" + "k8s.io/kubernetes/pkg/master/ports" + e2elog "k8s.io/kubernetes/test/e2e/framework/log" + e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" ) // ContainerResourceUsage is a structure for gathering container resource usage. @@ -45,3 +65,660 @@ type ContainersCPUSummary map[string]map[float64]float64 // NodesCPUSummary is indexed by the node name with each entry a // ContainersCPUSummary map. type NodesCPUSummary map[string]ContainersCPUSummary + +// RuntimeOperationMonitor is the tool getting and parsing docker operation metrics. +type RuntimeOperationMonitor struct { + client clientset.Interface + nodesRuntimeOps map[string]NodeRuntimeOperationErrorRate +} + +// NodeRuntimeOperationErrorRate is the runtime operation error rate on one node. +type NodeRuntimeOperationErrorRate map[string]*RuntimeOperationErrorRate + +// RuntimeOperationErrorRate is the error rate of a specified runtime operation. +type RuntimeOperationErrorRate struct { + TotalNumber float64 + ErrorRate float64 + TimeoutRate float64 +} + +// NewRuntimeOperationMonitor returns a new RuntimeOperationMonitor. 
+func NewRuntimeOperationMonitor(c clientset.Interface) *RuntimeOperationMonitor {
+ m := &RuntimeOperationMonitor{
+ client: c,
+ nodesRuntimeOps: make(map[string]NodeRuntimeOperationErrorRate),
+ }
+ nodes, err := m.client.CoreV1().Nodes().List(metav1.ListOptions{})
+ if err != nil {
+ e2elog.Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
+ }
+ for _, node := range nodes.Items {
+ m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate)
+ }
+ // Initialize the runtime operation error rate
+ m.GetRuntimeOperationErrorRate()
+ return m
+}
+
+// GetRuntimeOperationErrorRate gets runtime operation records from kubelet metrics and calculates
+// the error rates of all runtime operations.
+func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
+ for node := range m.nodesRuntimeOps {
+ nodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
+ if err != nil {
+ e2elog.Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
+ continue
+ }
+ m.nodesRuntimeOps[node] = nodeResult
+ }
+ return m.nodesRuntimeOps
+}
+
+// GetLatestRuntimeOperationErrorRate gets the latest error and timeout rates relative to the last observed RuntimeOperationErrorRate.
+func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
+ result := make(map[string]NodeRuntimeOperationErrorRate)
+ for node := range m.nodesRuntimeOps {
+ result[node] = make(NodeRuntimeOperationErrorRate)
+ oldNodeResult := m.nodesRuntimeOps[node]
+ curNodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
+ if err != nil {
+ e2elog.Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
+ continue
+ }
+ for op, cur := range curNodeResult {
+ t := *cur
+ if old, found := oldNodeResult[op]; found {
+ // Convert the cumulative rates into rates over the interval:
+ // (new errors - old errors) / (new total - old total).
+ t.ErrorRate = (t.ErrorRate*t.TotalNumber - old.ErrorRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
+ t.TimeoutRate = (t.TimeoutRate*t.TotalNumber - old.TimeoutRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
+ t.TotalNumber -= old.TotalNumber
+ }
+ result[node][op] = &t
+ }
+ m.nodesRuntimeOps[node] = curNodeResult
+ }
+ return result
+}
+
+// FormatRuntimeOperationErrorRate formats the runtime operation error rate to string.
+func FormatRuntimeOperationErrorRate(nodesResult map[string]NodeRuntimeOperationErrorRate) string {
+ lines := []string{}
+ for node, nodeResult := range nodesResult {
+ lines = append(lines, fmt.Sprintf("node %q runtime operation error rate:", node))
+ for op, result := range nodeResult {
+ line := fmt.Sprintf("operation %q: total - %.0f; error rate - %f; timeout rate - %f", op,
+ result.TotalNumber, result.ErrorRate, result.TimeoutRate)
+ lines = append(lines, line)
+ }
+ lines = append(lines, fmt.Sprintln())
+ }
+ return strings.Join(lines, "\n")
+}
+
+// getNodeRuntimeOperationErrorRate gets the runtime operation error rate from the specified node.
+func getNodeRuntimeOperationErrorRate(c clientset.Interface, node string) (NodeRuntimeOperationErrorRate, error) {
+ result := make(NodeRuntimeOperationErrorRate)
+ ms, err := e2emetrics.GetKubeletMetrics(c, node)
+ if err != nil {
+ return result, err
+ }
+ // If no corresponding metrics are found, the returned samples will be empty. Then the following
+ // loop will be skipped automatically.
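+ // The three series below are cumulative counters keyed by operation_type, so
+ // each rate is a ratio of totals: e.g. (hypothetical numbers) 10 errors out
+ // of 200 "create_container" operations gives an ErrorRate of 0.05.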
+ allOps := ms[dockermetrics.DockerOperationsKey]
+ errOps := ms[dockermetrics.DockerOperationsErrorsKey]
+ timeoutOps := ms[dockermetrics.DockerOperationsTimeoutKey]
+ for _, sample := range allOps {
+ operation := string(sample.Metric["operation_type"])
+ result[operation] = &RuntimeOperationErrorRate{TotalNumber: float64(sample.Value)}
+ }
+ for _, sample := range errOps {
+ operation := string(sample.Metric["operation_type"])
+ // We should always find the corresponding item; the check is just in case.
+ if _, found := result[operation]; found {
+ result[operation].ErrorRate = float64(sample.Value) / result[operation].TotalNumber
+ }
+ }
+ for _, sample := range timeoutOps {
+ operation := string(sample.Metric["operation_type"])
+ if _, found := result[operation]; found {
+ result[operation].TimeoutRate = float64(sample.Value) / result[operation].TotalNumber
+ }
+ }
+ return result, nil
+}
+
+// GetStatsSummary contacts kubelet for the container information.
+func GetStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), e2emetrics.SingleCallTimeout)
+ defer cancel()
+
+ data, err := c.CoreV1().RESTClient().Get().
+ Context(ctx).
+ Resource("nodes").
+ SubResource("proxy").
+ Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
+ Suffix("stats/summary").
+ Do().Raw()
+
+ if err != nil {
+ return nil, err
+ }
+
+ summary := kubeletstatsv1alpha1.Summary{}
+ err = json.Unmarshal(data, &summary)
+ if err != nil {
+ return nil, err
+ }
+ return &summary, nil
+}
+
+func removeUint64Ptr(ptr *uint64) uint64 {
+ if ptr == nil {
+ return 0
+ }
+ return *ptr
+}
+
+// GetOneTimeResourceUsageOnNode queries the node's /stats/summary endpoint
+// and returns the resource usage of all containerNames for the past
+// cpuInterval.
+// The acceptable range of the interval is 2s~120s. Be warned that as the
+// interval (and #containers) increases, the size of kubelet's response
+// could be significant. E.g., the 60s interval stats for ~20 containers is
+// ~1.5MB. Don't hammer the node with frequent, heavy requests.
+//
+// cadvisor records cumulative cpu usage in nanoseconds, so we need to have two
+// stats points to compute the cpu usage over the interval. Assuming cadvisor
+// polls every second, we'd need to get N stats points for an N-second interval.
+// Note that this is an approximation and may not be accurate, hence we also
+// write the actual interval used for calculation (based on the timestamps of
+// the stats points) in ContainerResourceUsage.CPUInterval.
+//
+// containerNames is a function returning the collection of container names
+// the user is interested in.
+func GetOneTimeResourceUsageOnNode(
+ c clientset.Interface,
+ nodeName string,
+ cpuInterval time.Duration,
+ containerNames func() []string,
+) (ResourceUsagePerContainer, error) {
+ const (
+ // cadvisor records stats about every second.
+ cadvisorStatsPollingIntervalInSeconds float64 = 1.0
+ // cadvisor caches up to 2 minutes of stats (configured by kubelet).
+ maxNumStatsToRequest int = 120
+ )
+
+ numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds)
+ if numStats < 2 || numStats > maxNumStatsToRequest {
+ return nil, fmt.Errorf("numStats needs to be >= 2 and <= %d", maxNumStatsToRequest)
+ }
+ // Get information of all containers on the node.
+ summary, err := GetStatsSummary(c, nodeName) + if err != nil { + return nil, err + } + + f := func(name string, newStats *kubeletstatsv1alpha1.ContainerStats) *ContainerResourceUsage { + if newStats == nil || newStats.CPU == nil || newStats.Memory == nil { + return nil + } + return &ContainerResourceUsage{ + Name: name, + Timestamp: newStats.StartTime.Time, + CPUUsageInCores: float64(removeUint64Ptr(newStats.CPU.UsageNanoCores)) / 1000000000, + MemoryUsageInBytes: removeUint64Ptr(newStats.Memory.UsageBytes), + MemoryWorkingSetInBytes: removeUint64Ptr(newStats.Memory.WorkingSetBytes), + MemoryRSSInBytes: removeUint64Ptr(newStats.Memory.RSSBytes), + CPUInterval: 0, + } + } + // Process container infos that are relevant to us. + containers := containerNames() + usageMap := make(ResourceUsagePerContainer, len(containers)) + observedContainers := []string{} + for _, pod := range summary.Pods { + for _, container := range pod.Containers { + isInteresting := false + for _, interestingContainerName := range containers { + if container.Name == interestingContainerName { + isInteresting = true + observedContainers = append(observedContainers, container.Name) + break + } + } + if !isInteresting { + continue + } + if usage := f(pod.PodRef.Name+"/"+container.Name, &container); usage != nil { + usageMap[pod.PodRef.Name+"/"+container.Name] = usage + } + } + } + return usageMap, nil +} + +func getNodeStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) { + data, err := c.CoreV1().RESTClient().Get(). + Resource("nodes"). + SubResource("proxy"). + Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)). + Suffix("stats/summary"). + SetHeader("Content-Type", "application/json"). + Do().Raw() + + if err != nil { + return nil, err + } + + var summary *kubeletstatsv1alpha1.Summary + err = json.Unmarshal(data, &summary) + if err != nil { + return nil, err + } + return summary, nil +} + +func getSystemContainerStats(summary *kubeletstatsv1alpha1.Summary) map[string]*kubeletstatsv1alpha1.ContainerStats { + statsList := summary.Node.SystemContainers + statsMap := make(map[string]*kubeletstatsv1alpha1.ContainerStats) + for i := range statsList { + statsMap[statsList[i].Name] = &statsList[i] + } + + // Create a root container stats using information available in + // stats.NodeStats. This is necessary since it is a different type. + statsMap[rootContainerName] = &kubeletstatsv1alpha1.ContainerStats{ + CPU: summary.Node.CPU, + Memory: summary.Node.Memory, + } + return statsMap +} + +const ( + rootContainerName = "/" +) + +// TargetContainers returns a list of containers for which we want to collect resource usage. 
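+// This covers the node root cgroup ("/") plus the runtime and kubelet
+// system containers defined by the stats API.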
+func TargetContainers() []string {
+ return []string{
+ rootContainerName,
+ kubeletstatsv1alpha1.SystemContainerRuntime,
+ kubeletstatsv1alpha1.SystemContainerKubelet,
+ }
+}
+
+func formatResourceUsageStats(nodeName string, containerStats ResourceUsagePerContainer) string {
+ // Example output:
+ //
+ // Resource usage for node "e2e-test-foo-node-abcde":
+ // container cpu(cores) memory(MB)
+ // "/" 0.363 2942.09
+ // "/docker-daemon" 0.088 521.80
+ // "/kubelet" 0.086 424.37
+ // "/system" 0.007 119.88
+ buf := &bytes.Buffer{}
+ w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
+ fmt.Fprintf(w, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n")
+ for name, s := range containerStats {
+ fmt.Fprintf(w, "%q\t%.3f\t%.2f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024), float64(s.MemoryRSSInBytes)/(1024*1024))
+ }
+ w.Flush()
+ return fmt.Sprintf("Resource usage on node %q:\n%s", nodeName, buf.String())
+}
+
+// GetKubeletHeapStats returns stats of the kubelet heap.
+func GetKubeletHeapStats(c clientset.Interface, nodeName string) (string, error) {
+ client, err := e2enode.ProxyRequest(c, nodeName, "debug/pprof/heap", ports.KubeletPort)
+ if err != nil {
+ return "", err
+ }
+ raw, errRaw := client.Raw()
+ if errRaw != nil {
+ return "", errRaw
+ }
+ heapStats := string(raw)
+ // Only dumping the runtime.MemStats numbers to avoid polluting the log.
+ numLines := 23
+ lines := strings.Split(heapStats, "\n")
+ return strings.Join(lines[len(lines)-numLines:], "\n"), nil
+}
+
+func computeContainerResourceUsage(name string, oldStats, newStats *kubeletstatsv1alpha1.ContainerStats) *ContainerResourceUsage {
+ return &ContainerResourceUsage{
+ Name: name,
+ Timestamp: newStats.CPU.Time.Time,
+ CPUUsageInCores: float64(*newStats.CPU.UsageCoreNanoSeconds-*oldStats.CPU.UsageCoreNanoSeconds) / float64(newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time).Nanoseconds()),
+ MemoryUsageInBytes: *newStats.Memory.UsageBytes,
+ MemoryWorkingSetInBytes: *newStats.Memory.WorkingSetBytes,
+ MemoryRSSInBytes: *newStats.Memory.RSSBytes,
+ CPUInterval: newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time),
+ }
+}
+
+// resourceCollector periodically polls the node, collects stats for a given
+// list of containers, and computes and caches resource usage up to
+// maxEntriesPerContainer for each container.
+type resourceCollector struct {
+ lock sync.RWMutex
+ node string
+ containers []string
+ client clientset.Interface
+ buffers map[string][]*ContainerResourceUsage
+ pollingInterval time.Duration
+ stopCh chan struct{}
+}
+
+func newResourceCollector(c clientset.Interface, nodeName string, containerNames []string, pollingInterval time.Duration) *resourceCollector {
+ buffers := make(map[string][]*ContainerResourceUsage)
+ return &resourceCollector{
+ node: nodeName,
+ containers: containerNames,
+ client: c,
+ buffers: buffers,
+ pollingInterval: pollingInterval,
+ }
+}
+
+// Start starts a goroutine to poll the node every pollingInterval.
+func (r *resourceCollector) Start() {
+ r.stopCh = make(chan struct{}, 1)
+ // Keep the last observed stats for comparison.
+ oldStats := make(map[string]*kubeletstatsv1alpha1.ContainerStats)
+ go wait.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh)
+}
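For reference, the CPUUsageInCores figure produced by computeContainerResourceUsage above is the average core usage between two cumulative samples. A minimal, self-contained sketch of the same arithmetic, using hypothetical sample values:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Two hypothetical cumulative CPU counters read 10s apart, in core-nanoseconds.
	oldUsage := uint64(5000000000)
	newUsage := uint64(9000000000)
	interval := 10 * time.Second
	// Same formula as computeContainerResourceUsage: delta of the cumulative
	// counter divided by the elapsed wall-clock nanoseconds.
	cores := float64(newUsage-oldUsage) / float64(interval.Nanoseconds())
	fmt.Printf("%.3f cores\n", cores) // prints "0.400 cores"
}

+// Stop sends a signal to terminate the stats collecting goroutine.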
+func (r *resourceCollector) Stop() {
+ close(r.stopCh)
+}
+
+// collectStats gets the latest stats from the kubelet stats summary API, computes
+// the resource usage, and pushes it to the buffer.
+func (r *resourceCollector) collectStats(oldStatsMap map[string]*kubeletstatsv1alpha1.ContainerStats) {
+ summary, err := getNodeStatsSummary(r.client, r.node)
+ if err != nil {
+ e2elog.Logf("Error getting node stats summary on %q, err: %v", r.node, err)
+ return
+ }
+ cStatsMap := getSystemContainerStats(summary)
+ r.lock.Lock()
+ defer r.lock.Unlock()
+ for _, name := range r.containers {
+ cStats, ok := cStatsMap[name]
+ if !ok {
+ e2elog.Logf("Missing info/stats for container %q on node %q", name, r.node)
+ return
+ }
+
+ if oldStats, ok := oldStatsMap[name]; ok {
+ if oldStats.CPU == nil || cStats.CPU == nil || oldStats.Memory == nil || cStats.Memory == nil {
+ continue
+ }
+ if oldStats.CPU.Time.Equal(&cStats.CPU.Time) {
+ // No change -> skip this stat.
+ continue
+ }
+ r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats, cStats))
+ }
+ // Update the old stats.
+ oldStatsMap[name] = cStats
+ }
+}
+
+func (r *resourceCollector) GetLatest() (ResourceUsagePerContainer, error) {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+ latest := make(ResourceUsagePerContainer)
+ for _, name := range r.containers {
+ contStats, ok := r.buffers[name]
+ if !ok || len(contStats) == 0 {
+ return nil, fmt.Errorf("resource usage on node %q is not ready yet", r.node)
+ }
+ latest[name] = contStats[len(contStats)-1]
+ }
+ return latest, nil
+}
+
+// Reset frees the stats and starts over.
+func (r *resourceCollector) Reset() {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+ for _, name := range r.containers {
+ r.buffers[name] = []*ContainerResourceUsage{}
+ }
+}
+
+type resourceUsageByCPU []*ContainerResourceUsage
+
+func (r resourceUsageByCPU) Len() int { return len(r) }
+func (r resourceUsageByCPU) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
+func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }
+
+// The percentiles to report.
+var percentiles = [...]float64{0.05, 0.20, 0.50, 0.70, 0.90, 0.95, 0.99}
+
+// GetBasicCPUStats returns the percentiles of the CPU usage in cores for
+// containerName. This method examines all data currently in the buffer.
+func (r *resourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+ result := make(map[float64]float64, len(percentiles))
+ usages := r.buffers[containerName]
+ sort.Sort(resourceUsageByCPU(usages))
+ for _, q := range percentiles {
+ index := int(float64(len(usages))*q) - 1
+ if index < 0 {
+ // We don't have enough data.
+ result[q] = 0
+ continue
+ }
+ result[q] = usages[index].CPUUsageInCores
+ }
+ return result
+}
+
+// ResourceMonitor manages a resourceCollector per node.
+type ResourceMonitor struct {
+ client clientset.Interface
+ containers []string
+ pollingInterval time.Duration
+ collectors map[string]*resourceCollector
+}
+
+// NewResourceMonitor returns a new ResourceMonitor.
+func NewResourceMonitor(c clientset.Interface, containerNames []string, pollingInterval time.Duration) *ResourceMonitor {
+ return &ResourceMonitor{
+ containers: containerNames,
+ client: c,
+ pollingInterval: pollingInterval,
+ }
+}
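For illustration, the index arithmetic in GetBasicCPUStats can be traced on a trimmed-down copy of the logic that operates on plain float64 slices (the usage values below are hypothetical):

package main

import (
	"fmt"
	"sort"
)

// percentilesOf mirrors resourceCollector.GetBasicCPUStats: sort the samples,
// then pick index int(len*q)-1, reporting 0 when there is not enough data yet.
func percentilesOf(usages []float64, qs []float64) map[float64]float64 {
	sort.Float64s(usages)
	out := make(map[float64]float64, len(qs))
	for _, q := range qs {
		index := int(float64(len(usages))*q) - 1
		if index < 0 {
			out[q] = 0 // too few samples for this percentile
			continue
		}
		out[q] = usages[index]
	}
	return out
}

func main() {
	usages := []float64{0.10, 0.12, 0.15, 0.20, 0.22, 0.25, 0.30, 0.35, 0.40, 0.90}
	// With 10 samples: q=0.05 -> index -1 -> 0; q=0.50 -> index 4 -> 0.22;
	// q=0.95 -> index 8 -> 0.40.
	fmt.Println(percentilesOf(usages, []float64{0.05, 0.50, 0.95}))
}

+// Start starts collectors.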
+func (r *ResourceMonitor) Start() { + // It should be OK to monitor unschedulable Nodes + nodes, err := r.client.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + e2elog.Failf("ResourceMonitor: unable to get list of nodes: %v", err) + } + r.collectors = make(map[string]*resourceCollector, 0) + for _, node := range nodes.Items { + collector := newResourceCollector(r.client, node.Name, r.containers, r.pollingInterval) + r.collectors[node.Name] = collector + collector.Start() + } +} + +// Stop stops collectors. +func (r *ResourceMonitor) Stop() { + for _, collector := range r.collectors { + collector.Stop() + } +} + +// Reset resets collectors. +func (r *ResourceMonitor) Reset() { + for _, collector := range r.collectors { + collector.Reset() + } +} + +// LogLatest outputs the latest resource usage into log. +func (r *ResourceMonitor) LogLatest() { + summary, err := r.GetLatest() + if err != nil { + e2elog.Logf("%v", err) + } + e2elog.Logf("%s", r.FormatResourceUsage(summary)) +} + +// FormatResourceUsage returns the formatted string for LogLatest(). +// TODO(oomichi): This can be made to local function after making test/e2e/node/kubelet_perf.go use LogLatest directly instead. +func (r *ResourceMonitor) FormatResourceUsage(s ResourceUsagePerNode) string { + summary := []string{} + for node, usage := range s { + summary = append(summary, formatResourceUsageStats(node, usage)) + } + return strings.Join(summary, "\n") +} + +// GetLatest returns the latest resource usage. +func (r *ResourceMonitor) GetLatest() (ResourceUsagePerNode, error) { + result := make(ResourceUsagePerNode) + errs := []error{} + for key, collector := range r.collectors { + s, err := collector.GetLatest() + if err != nil { + errs = append(errs, err) + continue + } + result[key] = s + } + return result, utilerrors.NewAggregate(errs) +} + +// GetMasterNodeLatest returns the latest resource usage of master and node. +func (r *ResourceMonitor) GetMasterNodeLatest(usagePerNode ResourceUsagePerNode) ResourceUsagePerNode { + result := make(ResourceUsagePerNode) + var masterUsage ResourceUsagePerContainer + var nodesUsage []ResourceUsagePerContainer + for node, usage := range usagePerNode { + if strings.HasSuffix(node, "master") { + masterUsage = usage + } else { + nodesUsage = append(nodesUsage, usage) + } + } + nodeAvgUsage := make(ResourceUsagePerContainer) + for _, nodeUsage := range nodesUsage { + for c, usage := range nodeUsage { + if _, found := nodeAvgUsage[c]; !found { + nodeAvgUsage[c] = &ContainerResourceUsage{Name: usage.Name} + } + nodeAvgUsage[c].CPUUsageInCores += usage.CPUUsageInCores + nodeAvgUsage[c].MemoryUsageInBytes += usage.MemoryUsageInBytes + nodeAvgUsage[c].MemoryWorkingSetInBytes += usage.MemoryWorkingSetInBytes + nodeAvgUsage[c].MemoryRSSInBytes += usage.MemoryRSSInBytes + } + } + for c := range nodeAvgUsage { + nodeAvgUsage[c].CPUUsageInCores /= float64(len(nodesUsage)) + nodeAvgUsage[c].MemoryUsageInBytes /= uint64(len(nodesUsage)) + nodeAvgUsage[c].MemoryWorkingSetInBytes /= uint64(len(nodesUsage)) + nodeAvgUsage[c].MemoryRSSInBytes /= uint64(len(nodesUsage)) + } + result["master"] = masterUsage + result["node"] = nodeAvgUsage + return result +} + +// FormatCPUSummary returns the string of human-readable CPU summary from the specified summary data. 
+func (r *ResourceMonitor) FormatCPUSummary(summary NodesCPUSummary) string {
+ // Example output for a node (the percentiles may differ):
+ // CPU usage of containers on node "e2e-test-foo-node-0vj7":
+ // container 5th% 50th% 90th% 95th%
+ // "/" 0.051 0.159 0.387 0.455
+ // "/runtime" 0.000 0.000 0.146 0.166
+ // "/kubelet" 0.036 0.053 0.091 0.154
+ // "/misc" 0.001 0.001 0.001 0.002
+ var summaryStrings []string
+ var header []string
+ header = append(header, "container")
+ for _, p := range percentiles {
+ header = append(header, fmt.Sprintf("%.0fth%%", p*100))
+ }
+ for nodeName, containers := range summary {
+ buf := &bytes.Buffer{}
+ w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
+ fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
+ for _, containerName := range TargetContainers() {
+ var s []string
+ s = append(s, fmt.Sprintf("%q", containerName))
+ data, ok := containers[containerName]
+ for _, p := range percentiles {
+ value := "N/A"
+ if ok {
+ value = fmt.Sprintf("%.3f", data[p])
+ }
+ s = append(s, value)
+ }
+ fmt.Fprintf(w, "%s\n", strings.Join(s, "\t"))
+ }
+ w.Flush()
+ summaryStrings = append(summaryStrings, fmt.Sprintf("CPU usage of containers on node %q:\n%s", nodeName, buf.String()))
+ }
+ return strings.Join(summaryStrings, "\n")
+}
+
+// LogCPUSummary outputs a CPU usage summary to the log.
+func (r *ResourceMonitor) LogCPUSummary() {
+ summary := r.GetCPUSummary()
+ e2elog.Logf("%s", r.FormatCPUSummary(summary))
+}
+
+// GetCPUSummary returns a per-node summary of CPU usage.
+func (r *ResourceMonitor) GetCPUSummary() NodesCPUSummary {
+ result := make(NodesCPUSummary)
+ for nodeName, collector := range r.collectors {
+ result[nodeName] = make(ContainersCPUSummary)
+ for _, containerName := range TargetContainers() {
+ data := collector.GetBasicCPUStats(containerName)
+ result[nodeName][containerName] = data
+ }
+ }
+ return result
+}
+
+// GetMasterNodeCPUSummary returns the master's CPU summary and the average across the remaining nodes.
+func (r *ResourceMonitor) GetMasterNodeCPUSummary(summaryPerNode NodesCPUSummary) NodesCPUSummary {
+ result := make(NodesCPUSummary)
+ var masterSummary ContainersCPUSummary
+ var nodesSummaries []ContainersCPUSummary
+ for node, summary := range summaryPerNode {
+ if strings.HasSuffix(node, "master") {
+ masterSummary = summary
+ } else {
+ nodesSummaries = append(nodesSummaries, summary)
+ }
+ }
+
+ nodeAvgSummary := make(ContainersCPUSummary)
+ for _, nodeSummary := range nodesSummaries {
+ for c, summary := range nodeSummary {
+ if _, found := nodeAvgSummary[c]; !found {
+ nodeAvgSummary[c] = map[float64]float64{}
+ }
+ for perc, value := range summary {
+ nodeAvgSummary[c][perc] += value
+ }
+ }
+ }
+ for c := range nodeAvgSummary {
+ for perc := range nodeAvgSummary[c] {
+ nodeAvgSummary[c][perc] /= float64(len(nodesSummaries))
+ }
+ }
+ result["master"] = masterSummary
+ result["node"] = nodeAvgSummary
+ return result
+}
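Taken together, these types give an e2e test a simple monitoring loop. A minimal sketch of how a caller outside this package might drive the monitor (the 10s interval and the helper name monitorNodes are illustrative, not part of the change):

import (
	"time"

	clientset "k8s.io/client-go/kubernetes"
	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
)

func monitorNodes(c clientset.Interface) {
	rm := e2ekubelet.NewResourceMonitor(c, e2ekubelet.TargetContainers(), 10*time.Second)
	rm.Start()
	defer rm.Stop()

	// ... run the workload under test ...

	rm.LogLatest()     // latest CPU/memory usage per node and container
	rm.LogCPUSummary() // CPU percentiles over everything collected so far
}

diff --git a/test/e2e/framework/kubelet_stats.go b/test/e2e/framework/kubelet_stats.go
deleted file mode 100644
index eabaa4714d2..00000000000
--- a/test/e2e/framework/kubelet_stats.go
+++ /dev/null
@@ -1,858 +0,0 @@
-/*
-Copyright 2014 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.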
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package framework - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "sort" - "strconv" - "strings" - "sync" - "text/tabwriter" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" - - clientset "k8s.io/client-go/kubernetes" - kubeletstatsv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" - dockermetrics "k8s.io/kubernetes/pkg/kubelet/dockershim/metrics" - kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics" - "k8s.io/kubernetes/pkg/master/ports" - e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet" - e2elog "k8s.io/kubernetes/test/e2e/framework/log" - "k8s.io/kubernetes/test/e2e/framework/metrics" - e2enode "k8s.io/kubernetes/test/e2e/framework/node" - - "github.com/prometheus/common/model" -) - -// KubeletLatencyMetric stores metrics scraped from the kubelet server's /metric endpoint. -// TODO: Get some more structure around the metrics and this type -// TODO(alejandrox1): this is already present in test/e2e/framework/metrics. -type KubeletLatencyMetric struct { - // eg: list, info, create - Operation string - // eg: sync_pods, pod_worker - Method string - // 0 <= quantile <=1, e.g. 0.95 is 95%tile, 0.5 is median. - Quantile float64 - Latency time.Duration -} - -// KubeletLatencyMetrics implements sort.Interface for []KubeletMetric based on -// the latency field. -// TODO(alejandrox1): this is already present in test/e2e/framework/metrics. -type KubeletLatencyMetrics []KubeletLatencyMetric - -func (a KubeletLatencyMetrics) Len() int { return len(a) } -func (a KubeletLatencyMetrics) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j].Latency } - -// If a apiserver client is passed in, the function will try to get kubelet metrics from metrics grabber; -// or else, the function will try to get kubelet metrics directly from the node. -// TODO(alejandrox1): this is already present in test/e2e/framework/metrics. -func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (metrics.KubeletMetrics, error) { - if c == nil { - return metrics.GrabKubeletMetricsWithoutProxy(nodeName, "/metrics") - } - grabber, err := metrics.NewMetricsGrabber(c, nil, true, false, false, false, false) - if err != nil { - return metrics.KubeletMetrics{}, err - } - return grabber.GrabFromKubelet(nodeName) -} - -// getKubeletMetrics gets all metrics in kubelet subsystem from specified node and trims -// the subsystem prefix. -// TODO(alejandrox1): this is already present in test/e2e/framework/metrics. -func getKubeletMetrics(c clientset.Interface, nodeName string) (metrics.KubeletMetrics, error) { - ms, err := getKubeletMetricsFromNode(c, nodeName) - if err != nil { - return metrics.KubeletMetrics{}, err - } - - kubeletMetrics := make(metrics.KubeletMetrics) - for name, samples := range ms { - const prefix = kubeletmetrics.KubeletSubsystem + "_" - if !strings.HasPrefix(name, prefix) { - // Not a kubelet metric. 
- continue - } - method := strings.TrimPrefix(name, prefix) - kubeletMetrics[method] = samples - } - return kubeletMetrics, nil -} - -// GetDefaultKubeletLatencyMetrics calls GetKubeletLatencyMetrics with a set of default metricNames -// identifying common latency metrics. -// Note that the KubeletMetrics passed in should not contain subsystem prefix. -// TODO(alejandrox1): this is already present in test/e2e/framework/metrics. -func GetDefaultKubeletLatencyMetrics(ms metrics.KubeletMetrics) KubeletLatencyMetrics { - latencyMetricNames := sets.NewString( - kubeletmetrics.PodWorkerDurationKey, - kubeletmetrics.PodWorkerStartDurationKey, - kubeletmetrics.PodStartDurationKey, - kubeletmetrics.CgroupManagerOperationsKey, - dockermetrics.DockerOperationsLatencyKey, - kubeletmetrics.PodWorkerStartDurationKey, - kubeletmetrics.PLEGRelistDurationKey, - ) - return GetKubeletLatencyMetrics(ms, latencyMetricNames) -} - -// GetKubeletLatencyMetrics filters ms to include only those contained in the metricNames set, -// then constructs a KubeletLatencyMetrics list based on the samples associated with those metrics. -// TODO(alejandrox1): this is already present in test/e2e/framework/metrics. -func GetKubeletLatencyMetrics(ms metrics.KubeletMetrics, filterMetricNames sets.String) KubeletLatencyMetrics { - var latencyMetrics KubeletLatencyMetrics - for name, samples := range ms { - if !filterMetricNames.Has(name) { - continue - } - for _, sample := range samples { - latency := sample.Value - operation := string(sample.Metric["operation_type"]) - var quantile float64 - if val, ok := sample.Metric[model.QuantileLabel]; ok { - var err error - if quantile, err = strconv.ParseFloat(string(val), 64); err != nil { - continue - } - } - - latencyMetrics = append(latencyMetrics, KubeletLatencyMetric{ - Operation: operation, - Method: name, - Quantile: quantile, - Latency: time.Duration(int64(latency)) * time.Microsecond, - }) - } - } - return latencyMetrics -} - -// RuntimeOperationMonitor is the tool getting and parsing docker operation metrics. -type RuntimeOperationMonitor struct { - client clientset.Interface - nodesRuntimeOps map[string]NodeRuntimeOperationErrorRate -} - -// NodeRuntimeOperationErrorRate is the runtime operation error rate on one node. -type NodeRuntimeOperationErrorRate map[string]*RuntimeOperationErrorRate - -// RuntimeOperationErrorRate is the error rate of a specified runtime operation. -type RuntimeOperationErrorRate struct { - TotalNumber float64 - ErrorRate float64 - TimeoutRate float64 -} - -// NewRuntimeOperationMonitor returns a new RuntimeOperationMonitor. -func NewRuntimeOperationMonitor(c clientset.Interface) *RuntimeOperationMonitor { - m := &RuntimeOperationMonitor{ - client: c, - nodesRuntimeOps: make(map[string]NodeRuntimeOperationErrorRate), - } - nodes, err := m.client.CoreV1().Nodes().List(metav1.ListOptions{}) - if err != nil { - e2elog.Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err) - } - for _, node := range nodes.Items { - m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate) - } - // Initialize the runtime operation error rate - m.GetRuntimeOperationErrorRate() - return m -} - -// GetRuntimeOperationErrorRate gets runtime operation records from kubelet metrics and calculate -// error rates of all runtime operations. 
-func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate { - for node := range m.nodesRuntimeOps { - nodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node) - if err != nil { - e2elog.Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err) - continue - } - m.nodesRuntimeOps[node] = nodeResult - } - return m.nodesRuntimeOps -} - -// GetLatestRuntimeOperationErrorRate gets latest error rate and timeout rate from last observed RuntimeOperationErrorRate. -func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate { - result := make(map[string]NodeRuntimeOperationErrorRate) - for node := range m.nodesRuntimeOps { - result[node] = make(NodeRuntimeOperationErrorRate) - oldNodeResult := m.nodesRuntimeOps[node] - curNodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node) - if err != nil { - e2elog.Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err) - continue - } - for op, cur := range curNodeResult { - t := *cur - if old, found := oldNodeResult[op]; found { - t.ErrorRate = (t.ErrorRate*t.TotalNumber - old.ErrorRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber) - t.TimeoutRate = (t.TimeoutRate*t.TotalNumber - old.TimeoutRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber) - t.TotalNumber -= old.TotalNumber - } - result[node][op] = &t - } - m.nodesRuntimeOps[node] = curNodeResult - } - return result -} - -// FormatRuntimeOperationErrorRate formats the runtime operation error rate to string. -func FormatRuntimeOperationErrorRate(nodesResult map[string]NodeRuntimeOperationErrorRate) string { - lines := []string{} - for node, nodeResult := range nodesResult { - lines = append(lines, fmt.Sprintf("node %q runtime operation error rate:", node)) - for op, result := range nodeResult { - line := fmt.Sprintf("operation %q: total - %.0f; error rate - %f; timeout rate - %f", op, - result.TotalNumber, result.ErrorRate, result.TimeoutRate) - lines = append(lines, line) - } - lines = append(lines, fmt.Sprintln()) - } - return strings.Join(lines, "\n") -} - -// getNodeRuntimeOperationErrorRate gets runtime operation error rate from specified node. -func getNodeRuntimeOperationErrorRate(c clientset.Interface, node string) (NodeRuntimeOperationErrorRate, error) { - result := make(NodeRuntimeOperationErrorRate) - ms, err := getKubeletMetrics(c, node) - if err != nil { - return result, err - } - // If no corresponding metrics are found, the returned samples will be empty. Then the following - // loop will be skipped automatically. 
- allOps := ms[dockermetrics.DockerOperationsKey] - errOps := ms[dockermetrics.DockerOperationsErrorsKey] - timeoutOps := ms[dockermetrics.DockerOperationsTimeoutKey] - for _, sample := range allOps { - operation := string(sample.Metric["operation_type"]) - result[operation] = &RuntimeOperationErrorRate{TotalNumber: float64(sample.Value)} - } - for _, sample := range errOps { - operation := string(sample.Metric["operation_type"]) - // Should always find the corresponding item, just in case - if _, found := result[operation]; found { - result[operation].ErrorRate = float64(sample.Value) / result[operation].TotalNumber - } - } - for _, sample := range timeoutOps { - operation := string(sample.Metric["operation_type"]) - if _, found := result[operation]; found { - result[operation].TimeoutRate = float64(sample.Value) / result[operation].TotalNumber - } - } - return result, nil -} - -// HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics. -// TODO(alejandrox1): this is already present in test/e2e/framework/metrics. -func HighLatencyKubeletOperations(c clientset.Interface, threshold time.Duration, nodeName string, logFunc func(fmt string, args ...interface{})) (KubeletLatencyMetrics, error) { - ms, err := getKubeletMetrics(c, nodeName) - if err != nil { - return KubeletLatencyMetrics{}, err - } - latencyMetrics := GetDefaultKubeletLatencyMetrics(ms) - sort.Sort(latencyMetrics) - var badMetrics KubeletLatencyMetrics - logFunc("\nLatency metrics for node %v", nodeName) - for _, m := range latencyMetrics { - if m.Latency > threshold { - badMetrics = append(badMetrics, m) - e2elog.Logf("%+v", m) - } - } - return badMetrics, nil -} - -// GetStatsSummary contacts kubelet for the container information. -func GetStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) { - ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout) - defer cancel() - - data, err := c.CoreV1().RESTClient().Get(). - Context(ctx). - Resource("nodes"). - SubResource("proxy"). - Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)). - Suffix("stats/summary"). - Do().Raw() - - if err != nil { - return nil, err - } - - summary := kubeletstatsv1alpha1.Summary{} - err = json.Unmarshal(data, &summary) - if err != nil { - return nil, err - } - return &summary, nil -} - -func removeUint64Ptr(ptr *uint64) uint64 { - if ptr == nil { - return 0 - } - return *ptr -} - -// getOneTimeResourceUsageOnNode queries the node's /stats/summary endpoint -// and returns the resource usage of all containerNames for the past -// cpuInterval. -// The acceptable range of the interval is 2s~120s. Be warned that as the -// interval (and #containers) increases, the size of kubelet's response -// could be significant. E.g., the 60s interval stats for ~20 containers is -// ~1.5MB. Don't hammer the node with frequent, heavy requests. -// -// cadvisor records cumulative cpu usage in nanoseconds, so we need to have two -// stats points to compute the cpu usage over the interval. Assuming cadvisor -// polls every second, we'd need to get N stats points for N-second interval. -// Note that this is an approximation and may not be accurate, hence we also -// write the actual interval used for calculation (based on the timestamps of -// the stats points in ContainerResourceUsage.CPUInterval. -// -// containerNames is a function returning a collection of container names in which -// user is interested in. 
-func getOneTimeResourceUsageOnNode( - c clientset.Interface, - nodeName string, - cpuInterval time.Duration, - containerNames func() []string, -) (e2ekubelet.ResourceUsagePerContainer, error) { - const ( - // cadvisor records stats about every second. - cadvisorStatsPollingIntervalInSeconds float64 = 1.0 - // cadvisor caches up to 2 minutes of stats (configured by kubelet). - maxNumStatsToRequest int = 120 - ) - - numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds) - if numStats < 2 || numStats > maxNumStatsToRequest { - return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest) - } - // Get information of all containers on the node. - summary, err := GetStatsSummary(c, nodeName) - if err != nil { - return nil, err - } - - f := func(name string, newStats *kubeletstatsv1alpha1.ContainerStats) *e2ekubelet.ContainerResourceUsage { - if newStats == nil || newStats.CPU == nil || newStats.Memory == nil { - return nil - } - return &e2ekubelet.ContainerResourceUsage{ - Name: name, - Timestamp: newStats.StartTime.Time, - CPUUsageInCores: float64(removeUint64Ptr(newStats.CPU.UsageNanoCores)) / 1000000000, - MemoryUsageInBytes: removeUint64Ptr(newStats.Memory.UsageBytes), - MemoryWorkingSetInBytes: removeUint64Ptr(newStats.Memory.WorkingSetBytes), - MemoryRSSInBytes: removeUint64Ptr(newStats.Memory.RSSBytes), - CPUInterval: 0, - } - } - // Process container infos that are relevant to us. - containers := containerNames() - usageMap := make(e2ekubelet.ResourceUsagePerContainer, len(containers)) - observedContainers := []string{} - for _, pod := range summary.Pods { - for _, container := range pod.Containers { - isInteresting := false - for _, interestingContainerName := range containers { - if container.Name == interestingContainerName { - isInteresting = true - observedContainers = append(observedContainers, container.Name) - break - } - } - if !isInteresting { - continue - } - if usage := f(pod.PodRef.Name+"/"+container.Name, &container); usage != nil { - usageMap[pod.PodRef.Name+"/"+container.Name] = usage - } - } - } - return usageMap, nil -} - -func getNodeStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) { - data, err := c.CoreV1().RESTClient().Get(). - Resource("nodes"). - SubResource("proxy"). - Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)). - Suffix("stats/summary"). - SetHeader("Content-Type", "application/json"). - Do().Raw() - - if err != nil { - return nil, err - } - - var summary *kubeletstatsv1alpha1.Summary - err = json.Unmarshal(data, &summary) - if err != nil { - return nil, err - } - return summary, nil -} - -func getSystemContainerStats(summary *kubeletstatsv1alpha1.Summary) map[string]*kubeletstatsv1alpha1.ContainerStats { - statsList := summary.Node.SystemContainers - statsMap := make(map[string]*kubeletstatsv1alpha1.ContainerStats) - for i := range statsList { - statsMap[statsList[i].Name] = &statsList[i] - } - - // Create a root container stats using information available in - // stats.NodeStats. This is necessary since it is a different type. - statsMap[rootContainerName] = &kubeletstatsv1alpha1.ContainerStats{ - CPU: summary.Node.CPU, - Memory: summary.Node.Memory, - } - return statsMap -} - -const ( - rootContainerName = "/" -) - -// TargetContainers returns a list of containers for which we want to collect resource usage. 
-func TargetContainers() []string { - return []string{ - rootContainerName, - kubeletstatsv1alpha1.SystemContainerRuntime, - kubeletstatsv1alpha1.SystemContainerKubelet, - } -} - -func formatResourceUsageStats(nodeName string, containerStats e2ekubelet.ResourceUsagePerContainer) string { - // Example output: - // - // Resource usage for node "e2e-test-foo-node-abcde": - // container cpu(cores) memory(MB) - // "/" 0.363 2942.09 - // "/docker-daemon" 0.088 521.80 - // "/kubelet" 0.086 424.37 - // "/system" 0.007 119.88 - buf := &bytes.Buffer{} - w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0) - fmt.Fprintf(w, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n") - for name, s := range containerStats { - fmt.Fprintf(w, "%q\t%.3f\t%.2f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024), float64(s.MemoryRSSInBytes)/(1024*1024)) - } - w.Flush() - return fmt.Sprintf("Resource usage on node %q:\n%s", nodeName, buf.String()) -} - -type uint64arr []uint64 - -func (a uint64arr) Len() int { return len(a) } -func (a uint64arr) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a uint64arr) Less(i, j int) bool { return a[i] < a[j] } - -type usageDataPerContainer struct { - cpuData []float64 - memUseData []uint64 - memWorkSetData []uint64 -} - -// GetKubeletHeapStats returns stats of kubelet heap. -func GetKubeletHeapStats(c clientset.Interface, nodeName string) (string, error) { - client, err := e2enode.ProxyRequest(c, nodeName, "debug/pprof/heap", ports.KubeletPort) - if err != nil { - return "", err - } - raw, errRaw := client.Raw() - if errRaw != nil { - return "", err - } - kubeletstatsv1alpha1 := string(raw) - // Only dumping the runtime.MemStats numbers to avoid polluting the log. - numLines := 23 - lines := strings.Split(kubeletstatsv1alpha1, "\n") - return strings.Join(lines[len(lines)-numLines:], "\n"), nil -} - -// PrintAllKubeletPods outputs status of all kubelet pods into log. -func PrintAllKubeletPods(c clientset.Interface, nodeName string) { - podList, err := e2ekubelet.GetKubeletPods(c, nodeName) - if err != nil { - e2elog.Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err) - return - } - for _, p := range podList.Items { - e2elog.Logf("%v from %v started at %v (%d container statuses recorded)", p.Name, p.Namespace, p.Status.StartTime, len(p.Status.ContainerStatuses)) - for _, c := range p.Status.ContainerStatuses { - e2elog.Logf("\tContainer %v ready: %v, restart count %v", - c.Name, c.Ready, c.RestartCount) - } - } -} - -func computeContainerResourceUsage(name string, oldStats, newStats *kubeletstatsv1alpha1.ContainerStats) *e2ekubelet.ContainerResourceUsage { - return &e2ekubelet.ContainerResourceUsage{ - Name: name, - Timestamp: newStats.CPU.Time.Time, - CPUUsageInCores: float64(*newStats.CPU.UsageCoreNanoSeconds-*oldStats.CPU.UsageCoreNanoSeconds) / float64(newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time).Nanoseconds()), - MemoryUsageInBytes: *newStats.Memory.UsageBytes, - MemoryWorkingSetInBytes: *newStats.Memory.WorkingSetBytes, - MemoryRSSInBytes: *newStats.Memory.RSSBytes, - CPUInterval: newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time), - } -} - -// resourceCollector periodically polls the node, collect stats for a given -// list of containers, computes and cache resource usage up to -// maxEntriesPerContainer for each container. 
-type resourceCollector struct { - lock sync.RWMutex - node string - containers []string - client clientset.Interface - buffers map[string][]*e2ekubelet.ContainerResourceUsage - pollingInterval time.Duration - stopCh chan struct{} -} - -func newResourceCollector(c clientset.Interface, nodeName string, containerNames []string, pollingInterval time.Duration) *resourceCollector { - buffers := make(map[string][]*e2ekubelet.ContainerResourceUsage) - return &resourceCollector{ - node: nodeName, - containers: containerNames, - client: c, - buffers: buffers, - pollingInterval: pollingInterval, - } -} - -// Start starts a goroutine to Poll the node every pollingInterval. -func (r *resourceCollector) Start() { - r.stopCh = make(chan struct{}, 1) - // Keep the last observed stats for comparison. - oldStats := make(map[string]*kubeletstatsv1alpha1.ContainerStats) - go wait.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh) -} - -// Stop sends a signal to terminate the stats collecting goroutine. -func (r *resourceCollector) Stop() { - close(r.stopCh) -} - -// collectStats gets the latest stats from kubelet stats summary API, computes -// the resource usage, and pushes it to the buffer. -func (r *resourceCollector) collectStats(oldStatsMap map[string]*kubeletstatsv1alpha1.ContainerStats) { - summary, err := getNodeStatsSummary(r.client, r.node) - if err != nil { - e2elog.Logf("Error getting node stats summary on %q, err: %v", r.node, err) - return - } - cStatsMap := getSystemContainerStats(summary) - r.lock.Lock() - defer r.lock.Unlock() - for _, name := range r.containers { - cStats, ok := cStatsMap[name] - if !ok { - e2elog.Logf("Missing info/stats for container %q on node %q", name, r.node) - return - } - - if oldStats, ok := oldStatsMap[name]; ok { - if oldStats.CPU == nil || cStats.CPU == nil || oldStats.Memory == nil || cStats.Memory == nil { - continue - } - if oldStats.CPU.Time.Equal(&cStats.CPU.Time) { - // No change -> skip this stat. - continue - } - r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats, cStats)) - } - // Update the old stats. - oldStatsMap[name] = cStats - } -} - -func (r *resourceCollector) GetLatest() (e2ekubelet.ResourceUsagePerContainer, error) { - r.lock.RLock() - defer r.lock.RUnlock() - kubeletstatsv1alpha1 := make(e2ekubelet.ResourceUsagePerContainer) - for _, name := range r.containers { - contStats, ok := r.buffers[name] - if !ok || len(contStats) == 0 { - return nil, fmt.Errorf("Resource usage on node %q is not ready yet", r.node) - } - kubeletstatsv1alpha1[name] = contStats[len(contStats)-1] - } - return kubeletstatsv1alpha1, nil -} - -// Reset frees the stats and start over. -func (r *resourceCollector) Reset() { - r.lock.Lock() - defer r.lock.Unlock() - for _, name := range r.containers { - r.buffers[name] = []*e2ekubelet.ContainerResourceUsage{} - } -} - -type resourceUsageByCPU []*e2ekubelet.ContainerResourceUsage - -func (r resourceUsageByCPU) Len() int { return len(r) } -func (r resourceUsageByCPU) Swap(i, j int) { r[i], r[j] = r[j], r[i] } -func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores } - -// The percentiles to report. -var percentiles = [...]float64{0.05, 0.20, 0.50, 0.70, 0.90, 0.95, 0.99} - -// GetBasicCPUStats returns the percentiles the cpu usage in cores for -// containerName. This method examines all data currently in the buffer. 
-func (r *resourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 { - r.lock.RLock() - defer r.lock.RUnlock() - result := make(map[float64]float64, len(percentiles)) - usages := r.buffers[containerName] - sort.Sort(resourceUsageByCPU(usages)) - for _, q := range percentiles { - index := int(float64(len(usages))*q) - 1 - if index < 0 { - // We don't have enough data. - result[q] = 0 - continue - } - result[q] = usages[index].CPUUsageInCores - } - return result -} - -// ResourceMonitor manages a resourceCollector per node. -type ResourceMonitor struct { - client clientset.Interface - containers []string - pollingInterval time.Duration - collectors map[string]*resourceCollector -} - -// NewResourceMonitor returns a new ResourceMonitor. -func NewResourceMonitor(c clientset.Interface, containerNames []string, pollingInterval time.Duration) *ResourceMonitor { - return &ResourceMonitor{ - containers: containerNames, - client: c, - pollingInterval: pollingInterval, - } -} - -// Start starts collectors. -func (r *ResourceMonitor) Start() { - // It should be OK to monitor unschedulable Nodes - nodes, err := r.client.CoreV1().Nodes().List(metav1.ListOptions{}) - if err != nil { - e2elog.Failf("ResourceMonitor: unable to get list of nodes: %v", err) - } - r.collectors = make(map[string]*resourceCollector, 0) - for _, node := range nodes.Items { - collector := newResourceCollector(r.client, node.Name, r.containers, r.pollingInterval) - r.collectors[node.Name] = collector - collector.Start() - } -} - -// Stop stops collectors. -func (r *ResourceMonitor) Stop() { - for _, collector := range r.collectors { - collector.Stop() - } -} - -// Reset resets collectors. -func (r *ResourceMonitor) Reset() { - for _, collector := range r.collectors { - collector.Reset() - } -} - -// LogLatest outputs the latest resource usage into log. -func (r *ResourceMonitor) LogLatest() { - summary, err := r.GetLatest() - if err != nil { - e2elog.Logf("%v", err) - } - e2elog.Logf("%s", r.FormatResourceUsage(summary)) -} - -// FormatResourceUsage returns the formatted string for LogLatest(). -// TODO(oomichi): This can be made to local function after making test/e2e/node/kubelet_perf.go use LogLatest directly instead. -func (r *ResourceMonitor) FormatResourceUsage(s e2ekubelet.ResourceUsagePerNode) string { - summary := []string{} - for node, usage := range s { - summary = append(summary, formatResourceUsageStats(node, usage)) - } - return strings.Join(summary, "\n") -} - -// GetLatest returns the latest resource usage. -func (r *ResourceMonitor) GetLatest() (e2ekubelet.ResourceUsagePerNode, error) { - result := make(e2ekubelet.ResourceUsagePerNode) - errs := []error{} - for key, collector := range r.collectors { - s, err := collector.GetLatest() - if err != nil { - errs = append(errs, err) - continue - } - result[key] = s - } - return result, utilerrors.NewAggregate(errs) -} - -// GetMasterNodeLatest returns the latest resource usage of master and node. 
-func (r *ResourceMonitor) GetMasterNodeLatest(usagePerNode e2ekubelet.ResourceUsagePerNode) e2ekubelet.ResourceUsagePerNode { - result := make(e2ekubelet.ResourceUsagePerNode) - var masterUsage e2ekubelet.ResourceUsagePerContainer - var nodesUsage []e2ekubelet.ResourceUsagePerContainer - for node, usage := range usagePerNode { - if strings.HasSuffix(node, "master") { - masterUsage = usage - } else { - nodesUsage = append(nodesUsage, usage) - } - } - nodeAvgUsage := make(e2ekubelet.ResourceUsagePerContainer) - for _, nodeUsage := range nodesUsage { - for c, usage := range nodeUsage { - if _, found := nodeAvgUsage[c]; !found { - nodeAvgUsage[c] = &e2ekubelet.ContainerResourceUsage{Name: usage.Name} - } - nodeAvgUsage[c].CPUUsageInCores += usage.CPUUsageInCores - nodeAvgUsage[c].MemoryUsageInBytes += usage.MemoryUsageInBytes - nodeAvgUsage[c].MemoryWorkingSetInBytes += usage.MemoryWorkingSetInBytes - nodeAvgUsage[c].MemoryRSSInBytes += usage.MemoryRSSInBytes - } - } - for c := range nodeAvgUsage { - nodeAvgUsage[c].CPUUsageInCores /= float64(len(nodesUsage)) - nodeAvgUsage[c].MemoryUsageInBytes /= uint64(len(nodesUsage)) - nodeAvgUsage[c].MemoryWorkingSetInBytes /= uint64(len(nodesUsage)) - nodeAvgUsage[c].MemoryRSSInBytes /= uint64(len(nodesUsage)) - } - result["master"] = masterUsage - result["node"] = nodeAvgUsage - return result -} - -// FormatCPUSummary returns the string of human-readable CPU summary from the specified summary data. -func (r *ResourceMonitor) FormatCPUSummary(summary e2ekubelet.NodesCPUSummary) string { - // Example output for a node (the percentiles may differ): - // CPU usage of containers on node "e2e-test-foo-node-0vj7": - // container 5th% 50th% 90th% 95th% - // "/" 0.051 0.159 0.387 0.455 - // "/runtime 0.000 0.000 0.146 0.166 - // "/kubelet" 0.036 0.053 0.091 0.154 - // "/misc" 0.001 0.001 0.001 0.002 - var summaryStrings []string - var header []string - header = append(header, "container") - for _, p := range percentiles { - header = append(header, fmt.Sprintf("%.0fth%%", p*100)) - } - for nodeName, containers := range summary { - buf := &bytes.Buffer{} - w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0) - fmt.Fprintf(w, "%s\n", strings.Join(header, "\t")) - for _, containerName := range TargetContainers() { - var s []string - s = append(s, fmt.Sprintf("%q", containerName)) - data, ok := containers[containerName] - for _, p := range percentiles { - value := "N/A" - if ok { - value = fmt.Sprintf("%.3f", data[p]) - } - s = append(s, value) - } - fmt.Fprintf(w, "%s\n", strings.Join(s, "\t")) - } - w.Flush() - summaryStrings = append(summaryStrings, fmt.Sprintf("CPU usage of containers on node %q\n:%s", nodeName, buf.String())) - } - return strings.Join(summaryStrings, "\n") -} - -// LogCPUSummary outputs summary of CPU into log. -func (r *ResourceMonitor) LogCPUSummary() { - summary := r.GetCPUSummary() - e2elog.Logf("%s", r.FormatCPUSummary(summary)) -} - -// GetCPUSummary returns summary of CPU. -func (r *ResourceMonitor) GetCPUSummary() e2ekubelet.NodesCPUSummary { - result := make(e2ekubelet.NodesCPUSummary) - for nodeName, collector := range r.collectors { - result[nodeName] = make(e2ekubelet.ContainersCPUSummary) - for _, containerName := range TargetContainers() { - data := collector.GetBasicCPUStats(containerName) - result[nodeName][containerName] = data - } - } - return result -} - -// GetMasterNodeCPUSummary returns summary of master node CPUs. 
-func (r *ResourceMonitor) GetMasterNodeCPUSummary(summaryPerNode e2ekubelet.NodesCPUSummary) e2ekubelet.NodesCPUSummary {
-	result := make(e2ekubelet.NodesCPUSummary)
-	var masterSummary e2ekubelet.ContainersCPUSummary
-	var nodesSummaries []e2ekubelet.ContainersCPUSummary
-	for node, summary := range summaryPerNode {
-		if strings.HasSuffix(node, "master") {
-			masterSummary = summary
-		} else {
-			nodesSummaries = append(nodesSummaries, summary)
-		}
-	}
-
-	nodeAvgSummary := make(e2ekubelet.ContainersCPUSummary)
-	for _, nodeSummary := range nodesSummaries {
-		for c, summary := range nodeSummary {
-			if _, found := nodeAvgSummary[c]; !found {
-				nodeAvgSummary[c] = map[float64]float64{}
-			}
-			for perc, value := range summary {
-				nodeAvgSummary[c][perc] += value
-			}
-		}
-	}
-	for c := range nodeAvgSummary {
-		for perc := range nodeAvgSummary[c] {
-			nodeAvgSummary[c][perc] /= float64(len(nodesSummaries))
-		}
-	}
-	result["master"] = masterSummary
-	result["node"] = nodeAvgSummary
-	return result
-}
diff --git a/test/e2e/framework/metrics/kubelet_metrics.go b/test/e2e/framework/metrics/kubelet_metrics.go
index c0a53ba1afa..21a5f19e3a7 100644
--- a/test/e2e/framework/metrics/kubelet_metrics.go
+++ b/test/e2e/framework/metrics/kubelet_metrics.go
@@ -134,9 +134,9 @@ func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (KubeletM
 	return grabber.GrabFromKubelet(nodeName)
 }
 
-// getKubeletMetrics gets all metrics in kubelet subsystem from specified node and trims
+// GetKubeletMetrics gets all metrics in kubelet subsystem from specified node and trims
 // the subsystem prefix.
-func getKubeletMetrics(c clientset.Interface, nodeName string) (KubeletMetrics, error) {
+func GetKubeletMetrics(c clientset.Interface, nodeName string) (KubeletMetrics, error) {
 	ms, err := getKubeletMetricsFromNode(c, nodeName)
 	if err != nil {
 		return KubeletMetrics{}, err
@@ -203,7 +203,7 @@ func GetKubeletLatencyMetrics(ms KubeletMetrics, filterMetricNames sets.String)
 
 // HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
 func HighLatencyKubeletOperations(c clientset.Interface, threshold time.Duration, nodeName string, logFunc func(fmt string, args ...interface{})) (KubeletLatencyMetrics, error) {
-	ms, err := getKubeletMetrics(c, nodeName)
+	ms, err := GetKubeletMetrics(c, nodeName)
 	if err != nil {
 		return KubeletLatencyMetrics{}, err
 	}
diff --git a/test/e2e/framework/resource_usage_gatherer.go b/test/e2e/framework/resource_usage_gatherer.go
index 44461598dd1..83ef42f8541 100644
--- a/test/e2e/framework/resource_usage_gatherer.go
+++ b/test/e2e/framework/resource_usage_gatherer.go
@@ -82,6 +82,18 @@ func (s *ResourceUsageSummary) SummaryKind() string {
 	return "ResourceUsageSummary"
 }
 
+type uint64arr []uint64
+
+func (a uint64arr) Len() int           { return len(a) }
+func (a uint64arr) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a uint64arr) Less(i, j int) bool { return a[i] < a[j] }
+
+type usageDataPerContainer struct {
+	cpuData        []float64
+	memUseData     []uint64
+	memWorkSetData []uint64
+}
+
 func computePercentiles(timeSeries []e2ekubelet.ResourceUsagePerContainer, percentilesToCompute []int) map[int]e2ekubelet.ResourceUsagePerContainer {
 	if len(timeSeries) == 0 {
 		return make(map[int]e2ekubelet.ResourceUsagePerContainer)
@@ -167,7 +179,7 @@ func (w *resourceGatherWorker) singleProbe() {
 			}
 		}
 	} else {
-		nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })
+		nodeUsage, err := e2ekubelet.GetOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })
 		if err != nil {
 			e2elog.Logf("Error while reading data from %v: %v", w.nodeName, err)
 			return
diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go
index 40cf97d1cc4..61b35b318a5 100644
--- a/test/e2e/framework/util.go
+++ b/test/e2e/framework/util.go
@@ -83,6 +83,7 @@ import (
 	"k8s.io/kubernetes/test/e2e/framework/ginkgowrapper"
 	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
 	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
+	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
@@ -1799,7 +1800,7 @@ func DumpNodeDebugInfo(c clientset.Interface, nodeNames []string, logFunc func(f
 				c.Name, c.Ready, c.RestartCount)
 			}
 		}
-		HighLatencyKubeletOperations(c, 10*time.Second, n, logFunc)
+		e2emetrics.HighLatencyKubeletOperations(c, 10*time.Second, n, logFunc)
 		// TODO: Log node resource info
 	}
 }
diff --git a/test/e2e/node/kubelet.go b/test/e2e/node/kubelet.go
index d04aa6d30ae..ddca8215d81 100644
--- a/test/e2e/node/kubelet.go
+++ b/test/e2e/node/kubelet.go
@@ -255,7 +255,7 @@ var _ = SIGDescribe("kubelet", func() {
 		numNodes        int
 		nodeNames       sets.String
 		nodeLabels      map[string]string
-		resourceMonitor *framework.ResourceMonitor
+		resourceMonitor *e2ekubelet.ResourceMonitor
 	)
 	type DeleteTest struct {
 		podsPerNode int
@@ -293,7 +293,7 @@ var _ = SIGDescribe("kubelet", func() {
 		// Start resourceMonitor only in small clusters.
 		if len(nodes.Items) <= maxNodesToCheck {
-			resourceMonitor = framework.NewResourceMonitor(f.ClientSet, framework.TargetContainers(), containerStatsPollingInterval)
+			resourceMonitor = e2ekubelet.NewResourceMonitor(f.ClientSet, e2ekubelet.TargetContainers(), containerStatsPollingInterval)
 			resourceMonitor.Start()
 		}
 	})
diff --git a/test/e2e/node/kubelet_perf.go b/test/e2e/node/kubelet_perf.go
index 9bda7a9d44b..96d8de99e41 100644
--- a/test/e2e/node/kubelet_perf.go
+++ b/test/e2e/node/kubelet_perf.go
@@ -63,7 +63,7 @@ func logPodsOnNodes(c clientset.Interface, nodeNames []string) {
 	}
 }
 
-func runResourceTrackingTest(f *framework.Framework, podsPerNode int, nodeNames sets.String, rm *framework.ResourceMonitor,
+func runResourceTrackingTest(f *framework.Framework, podsPerNode int, nodeNames sets.String, rm *e2ekubelet.ResourceMonitor,
 	expectedCPU map[string]map[float64]float64, expectedMemory e2ekubelet.ResourceUsagePerContainer) {
 	numNodes := nodeNames.Len()
 	totalPods := podsPerNode * numNodes
@@ -86,7 +86,7 @@ func runResourceTrackingTest(f *framework.Framework, podsPerNode int, nodeNames
 
 	ginkgo.By("Start monitoring resource usage")
 	// Periodically dump the cpu summary until the deadline is met.
-	// Note that without calling framework.ResourceMonitor.Reset(), the stats
+	// Note that without calling e2ekubelet.ResourceMonitor.Reset(), the stats
 	// would occupy increasingly more memory. This should be fine
 	// for the current test duration, but we should reclaim the
 	// entries if we plan to monitor longer (e.g., 8 hours).
@@ -145,7 +145,7 @@ func verifyMemoryLimits(c clientset.Interface, expected e2ekubelet.ResourceUsage
 		}
 		if len(nodeErrs) > 0 {
 			errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", ")))
-			heapStats, err := framework.GetKubeletHeapStats(c, nodeName)
+			heapStats, err := e2ekubelet.GetKubeletHeapStats(c, nodeName)
 			if err != nil {
 				e2elog.Logf("Unable to get heap stats from %q", nodeName)
 			} else {
@@ -196,8 +196,8 @@ func verifyCPULimits(expected e2ekubelet.ContainersCPUSummary, actual e2ekubelet
 var _ = SIGDescribe("Kubelet [Serial] [Slow]", func() {
 	var nodeNames sets.String
 	f := framework.NewDefaultFramework("kubelet-perf")
-	var om *framework.RuntimeOperationMonitor
-	var rm *framework.ResourceMonitor
+	var om *e2ekubelet.RuntimeOperationMonitor
+	var rm *e2ekubelet.ResourceMonitor
 
 	ginkgo.BeforeEach(func() {
 		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
@@ -205,15 +205,15 @@ var _ = SIGDescribe("Kubelet [Serial] [Slow]", func() {
 		for _, node := range nodes.Items {
 			nodeNames.Insert(node.Name)
 		}
-		om = framework.NewRuntimeOperationMonitor(f.ClientSet)
-		rm = framework.NewResourceMonitor(f.ClientSet, framework.TargetContainers(), containerStatsPollingPeriod)
+		om = e2ekubelet.NewRuntimeOperationMonitor(f.ClientSet)
+		rm = e2ekubelet.NewResourceMonitor(f.ClientSet, e2ekubelet.TargetContainers(), containerStatsPollingPeriod)
 		rm.Start()
 	})
 
 	ginkgo.AfterEach(func() {
 		rm.Stop()
 		result := om.GetLatestRuntimeOperationErrorRate()
-		e2elog.Logf("runtime operation error metrics:\n%s", framework.FormatRuntimeOperationErrorRate(result))
+		e2elog.Logf("runtime operation error metrics:\n%s", e2ekubelet.FormatRuntimeOperationErrorRate(result))
 	})
 	SIGDescribe("regular resource usage tracking", func() {
 		// We assume that the scheduler will make reasonable scheduling choices
diff --git a/test/e2e/node/node_problem_detector.go b/test/e2e/node/node_problem_detector.go
index d28dcfc3ab5..9eaeb12d713 100644
--- a/test/e2e/node/node_problem_detector.go
+++ b/test/e2e/node/node_problem_detector.go
@@ -28,6 +28,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/kubernetes/test/e2e/framework"
+	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
 	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
@@ -266,7 +267,7 @@ func getCPUStat(f *framework.Framework, host string) (usage, uptime float64) {
 }
 
 func getNpdPodStat(f *framework.Framework, nodeName string) (cpuUsage, rss, workingSet float64) {
-	summary, err := framework.GetStatsSummary(f.ClientSet, nodeName)
+	summary, err := e2ekubelet.GetStatsSummary(f.ClientSet, nodeName)
 	framework.ExpectNoError(err)
 
 	hasNpdPod := false
diff --git a/test/e2e/scheduling/BUILD b/test/e2e/scheduling/BUILD
index 7e211943992..e33ecb2c379 100644
--- a/test/e2e/scheduling/BUILD
+++ b/test/e2e/scheduling/BUILD
@@ -47,6 +47,7 @@ go_library(
         "//test/e2e/framework:go_default_library",
         "//test/e2e/framework/gpu:go_default_library",
         "//test/e2e/framework/job:go_default_library",
+        "//test/e2e/framework/kubelet:go_default_library",
         "//test/e2e/framework/log:go_default_library",
         "//test/e2e/framework/node:go_default_library",
         "//test/e2e/framework/pod:go_default_library",
diff --git a/test/e2e/scheduling/equivalence_cache_predicates.go b/test/e2e/scheduling/equivalence_cache_predicates.go
index cd2406e7cf4..19bb9343ed5 100644
--- a/test/e2e/scheduling/equivalence_cache_predicates.go
+++ b/test/e2e/scheduling/equivalence_cache_predicates.go
@@ -27,6 +27,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	api "k8s.io/kubernetes/pkg/apis/core"
 	"k8s.io/kubernetes/test/e2e/framework"
+	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
 	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -82,7 +83,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
 
 		for _, node := range nodeList.Items {
 			e2elog.Logf("\nLogging pods the kubelet thinks is on node %v before test", node.Name)
-			framework.PrintAllKubeletPods(cs, node.Name)
+			e2ekubelet.PrintAllKubeletPods(cs, node.Name)
 		}
 
 	})
diff --git a/test/e2e/scheduling/predicates.go b/test/e2e/scheduling/predicates.go
index 6b5bb99a261..e0202e79c87 100644
--- a/test/e2e/scheduling/predicates.go
+++ b/test/e2e/scheduling/predicates.go
@@ -30,6 +30,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/common"
 	"k8s.io/kubernetes/test/e2e/framework"
+	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
 	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -98,7 +99,7 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
 
 		for _, node := range nodeList.Items {
 			e2elog.Logf("\nLogging pods the kubelet thinks is on node %v before test", node.Name)
-			framework.PrintAllKubeletPods(cs, node.Name)
+			e2ekubelet.PrintAllKubeletPods(cs, node.Name)
 		}
 	})
diff --git a/test/e2e_node/density_test.go b/test/e2e_node/density_test.go
index 73147163fa7..bd0178756d6 100644
--- a/test/e2e_node/density_test.go
+++ b/test/e2e_node/density_test.go
@@ -454,8 +454,8 @@ func createBatchPodWithRateControl(f *framework.Framework, pods []*v1.Pod, inter
 }
 
 // getPodStartLatency gets prometheus metric 'pod start latency' from kubelet
-func getPodStartLatency(node string) (framework.KubeletLatencyMetrics, error) {
-	latencyMetrics := framework.KubeletLatencyMetrics{}
+func getPodStartLatency(node string) (e2emetrics.KubeletLatencyMetrics, error) {
+	latencyMetrics := e2emetrics.KubeletLatencyMetrics{}
 	ms, err := e2emetrics.GrabKubeletMetricsWithoutProxy(node, "/metrics")
 	framework.ExpectNoError(err, "Failed to get kubelet metrics without proxy in node %s", node)
 
@@ -464,7 +464,7 @@ func getPodStartLatency(node string) (framework.KubeletLatencyMetrics, error) {
 		if sample.Metric["__name__"] == kubemetrics.KubeletSubsystem+"_"+kubemetrics.PodStartDurationKey {
 			quantile, _ := strconv.ParseFloat(string(sample.Metric["quantile"]), 64)
 			latencyMetrics = append(latencyMetrics,
-				framework.KubeletLatencyMetric{
+				e2emetrics.KubeletLatencyMetric{
 					Quantile: quantile,
 					Method:   kubemetrics.PodStartDurationKey,
 					Latency:  time.Duration(int(sample.Value)) * time.Microsecond})
diff --git a/test/e2e_node/resource_collector.go b/test/e2e_node/resource_collector.go
index 32c71077c15..b27dbdc14cd 100644
--- a/test/e2e_node/resource_collector.go
+++ b/test/e2e_node/resource_collector.go
@@ -272,7 +272,7 @@ func formatCPUSummary(summary e2ekubelet.ContainersCPUSummary) string {
 	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
 	fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
 
-	for _, containerName := range framework.TargetContainers() {
+	for _, containerName := range e2ekubelet.TargetContainers() {
 		var s []string
 		s = append(s, fmt.Sprintf("%q", containerName))
 		data, ok := summary[containerName]
diff --git a/test/e2e_node/resource_usage_test.go b/test/e2e_node/resource_usage_test.go
index d5869289fb4..ffa06d101e2 100644
--- a/test/e2e_node/resource_usage_test.go
+++ b/test/e2e_node/resource_usage_test.go
@@ -42,13 +42,13 @@ var _ = SIGDescribe("Resource-usage [Serial] [Slow]", func() {
 
 	var (
 		rc *ResourceCollector
-		om *framework.RuntimeOperationMonitor
+		om *e2ekubelet.RuntimeOperationMonitor
 	)
 
 	f := framework.NewDefaultFramework("resource-usage")
 
 	ginkgo.BeforeEach(func() {
-		om = framework.NewRuntimeOperationMonitor(f.ClientSet)
+		om = e2ekubelet.NewRuntimeOperationMonitor(f.ClientSet)
 		// The test collects resource usage from a standalone Cadvisor pod.
 		// The Cadvsior of Kubelet has a housekeeping interval of 10s, which is too long to
 		// show the resource usage spikes. But changing its interval increases the overhead
@@ -59,7 +59,7 @@ var _ = SIGDescribe("Resource-usage [Serial] [Slow]", func() {
 
 	ginkgo.AfterEach(func() {
 		result := om.GetLatestRuntimeOperationErrorRate()
-		e2elog.Logf("runtime operation error metrics:\n%s", framework.FormatRuntimeOperationErrorRate(result))
+		e2elog.Logf("runtime operation error metrics:\n%s", e2ekubelet.FormatRuntimeOperationErrorRate(result))
 	})
 
 	// This test measures and verifies the steady resource usage of node is within limit
@@ -164,7 +164,7 @@ func runResourceUsageTest(f *framework.Framework, rc *ResourceCollector, testArg
 
 	ginkgo.By("Start monitoring resource usage")
 	// Periodically dump the cpu summary until the deadline is met.
-	// Note that without calling framework.ResourceMonitor.Reset(), the stats
+	// Note that without calling e2ekubelet.ResourceMonitor.Reset(), the stats
 	// would occupy increasingly more memory. This should be fine
 	// for the current test duration, but we should reclaim the
 	// entries if we plan to monitor longer (e.g., 8 hours).
@@ -238,7 +238,7 @@ func verifyMemoryLimits(c clientset.Interface, expected e2ekubelet.ResourceUsage
 		}
 		if len(nodeErrs) > 0 {
 			errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", ")))
-			heapStats, err := framework.GetKubeletHeapStats(c, nodeName)
+			heapStats, err := e2ekubelet.GetKubeletHeapStats(c, nodeName)
 			if err != nil {
 				e2elog.Logf("Unable to get heap stats from %q", nodeName)
 			} else {
diff --git a/test/e2e_node/util.go b/test/e2e_node/util.go
index f296afa6df6..c632fbc69bb 100644
--- a/test/e2e_node/util.go
+++ b/test/e2e_node/util.go
@@ -51,6 +51,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/util"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
+	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
 	frameworkmetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
 
 	imageutils "k8s.io/kubernetes/test/utils/image"
@@ -363,7 +364,7 @@ func logKubeletLatencyMetrics(metricNames ...string) {
 	if err != nil {
 		e2elog.Logf("Error getting kubelet metrics: %v", err)
 	} else {
-		e2elog.Logf("Kubelet Metrics: %+v", framework.GetKubeletLatencyMetrics(metric, metricSet))
+		e2elog.Logf("Kubelet Metrics: %+v", e2emetrics.GetKubeletLatencyMetrics(metric, metricSet))
 	}
 }
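
Usage sketch (illustrative only, not part of the patch): how a caller consumes the relocated helpers after this change. The clientset `c`, the node name `nodeName`, and the helper `trackUsage` are hypothetical; every framework identifier below is one this diff either moves into the e2ekubelet package or exports from e2emetrics.

package example

import (
	"time"

	clientset "k8s.io/client-go/kubernetes"
	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
)

// trackUsage (hypothetical) polls per-container resource usage through the
// relocated e2ekubelet.ResourceMonitor and reads kubelet metrics through the
// newly exported e2emetrics.GetKubeletMetrics.
func trackUsage(c clientset.Interface, nodeName string) {
	// NewResourceMonitor and TargetContainers now live in
	// test/e2e/framework/kubelet instead of the framework package.
	rm := e2ekubelet.NewResourceMonitor(c, e2ekubelet.TargetContainers(), 10*time.Second)
	rm.Start()
	defer rm.Stop()

	time.Sleep(time.Minute) // let the collectors accumulate a few samples
	rm.LogLatest()
	rm.LogCPUSummary()

	// Formerly the unexported framework.getKubeletMetrics; callers outside
	// the metrics package can now fetch and trim kubelet metrics directly.
	if ms, err := e2emetrics.GetKubeletMetrics(c, nodeName); err == nil {
		e2elog.Logf("grabbed %d kubelet metric families from %s", len(ms), nodeName)
	}
}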