Use log functions of core framework in test/e2e/framework/kubelet

1. move GetOneTimeResourceUsageOnNode() from test/e2e/framework/kubelet/stats.go to getOneTimeResourceUsageOnNode() in test/e2e/framework/resource_usage_gatherer.go
2. copy GetKubeletPods() from test/e2e/framework/kubelet/kubelet_pods.go to getKubeletPods() in test/e2e/framework/util.go
Signed-off-by: clarklee92 <clarklee1992@hotmail.com>
clarklee92 2019-11-01 18:30:41 +08:00
parent b1ac4cda11
commit dfa069a4bd
6 changed files with 187 additions and 115 deletions
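In practice the change inverts one dependency edge: test/e2e/framework no longer imports the e2ekubelet subpackage (see the BUILD hunks below), so the subpackage is free to import the core framework for logging. A minimal before/after sketch of the gatherer call site, taken from the resource_usage_gatherer.go hunks in this commit:

	// before: framework -> e2ekubelet dependency
	nodeUsage, err := e2ekubelet.GetOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })

	// after: package-local helper, no e2ekubelet import
	nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })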

test/e2e/framework/BUILD

@@ -35,6 +35,7 @@ go_library(
"//pkg/controller:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/sysctl:go_default_library",
"//pkg/master/ports:go_default_library",
@@ -80,7 +81,6 @@ go_library(
"//staging/src/k8s.io/component-base/version:go_default_library",
"//test/e2e/framework/auth:go_default_library",
"//test/e2e/framework/ginkgowrapper:go_default_library",
"//test/e2e/framework/kubelet:go_default_library",
"//test/e2e/framework/metrics:go_default_library",
"//test/e2e/framework/node:go_default_library",
"//test/e2e/framework/pod:go_default_library",

test/e2e/framework/kubelet/BUILD

@@ -18,7 +18,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//test/e2e/framework/log:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/metrics:go_default_library",
],
)

test/e2e/framework/kubelet/kubelet_pods.go

@@ -20,7 +20,7 @@ import (
v1 "k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/master/ports"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"k8s.io/kubernetes/test/e2e/framework"
)
// GetKubeletPods retrieves the list of pods on the kubelet.
@@ -51,13 +51,13 @@ func getKubeletPods(c clientset.Interface, node, resource string) (*v1.PodList,
func PrintAllKubeletPods(c clientset.Interface, nodeName string) {
podList, err := GetKubeletPods(c, nodeName)
if err != nil {
e2elog.Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err)
framework.Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err)
return
}
for _, p := range podList.Items {
e2elog.Logf("%v from %v started at %v (%d container statuses recorded)", p.Name, p.Namespace, p.Status.StartTime, len(p.Status.ContainerStatuses))
framework.Logf("%v from %v started at %v (%d container statuses recorded)", p.Name, p.Namespace, p.Status.StartTime, len(p.Status.ContainerStatuses))
for _, c := range p.Status.ContainerStatuses {
e2elog.Logf("\tContainer %v ready: %v, restart count %v",
framework.Logf("\tContainer %v ready: %v, restart count %v",
c.Name, c.Ready, c.RestartCount)
}
}

test/e2e/framework/kubelet/stats.go

@@ -36,7 +36,7 @@ import (
kubeletstatsv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
dockermetrics "k8s.io/kubernetes/pkg/kubelet/dockershim/metrics"
"k8s.io/kubernetes/pkg/master/ports"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"k8s.io/kubernetes/test/e2e/framework"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
)
@@ -118,7 +118,7 @@ func NewRuntimeOperationMonitor(c clientset.Interface) *RuntimeOperationMonitor
}
nodes, err := m.client.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
e2elog.Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
framework.Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
}
for _, node := range nodes.Items {
m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate)
@@ -134,7 +134,7 @@ func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate() map[string]Node
for node := range m.nodesRuntimeOps {
nodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
if err != nil {
e2elog.Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
framework.Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
continue
}
m.nodesRuntimeOps[node] = nodeResult
@@ -150,7 +150,7 @@ func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate() map[strin
oldNodeResult := m.nodesRuntimeOps[node]
curNodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
if err != nil {
e2elog.Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
framework.Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
continue
}
for op, cur := range curNodeResult {
@@ -239,90 +239,6 @@ func GetStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alp
return &summary, nil
}
func removeUint64Ptr(ptr *uint64) uint64 {
if ptr == nil {
return 0
}
return *ptr
}
// GetOneTimeResourceUsageOnNode queries the node's /stats/summary endpoint
// and returns the resource usage of all containerNames for the past
// cpuInterval.
// The acceptable range of the interval is 2s~120s. Be warned that as the
// interval (and #containers) increases, the size of kubelet's response
// could be significant. E.g., the 60s interval stats for ~20 containers is
// ~1.5MB. Don't hammer the node with frequent, heavy requests.
//
// cadvisor records cumulative cpu usage in nanoseconds, so we need to have two
// stats points to compute the cpu usage over the interval. Assuming cadvisor
// polls every second, we'd need to get N stats points for N-second interval.
// Note that this is an approximation and may not be accurate, hence we also
// write the actual interval used for calculation (based on the timestamps of
// the stats points) in ContainerResourceUsage.CPUInterval.
//
// containerNames is a function returning a collection of container names that
// the user is interested in.
func GetOneTimeResourceUsageOnNode(
c clientset.Interface,
nodeName string,
cpuInterval time.Duration,
containerNames func() []string,
) (ResourceUsagePerContainer, error) {
const (
// cadvisor records stats about every second.
cadvisorStatsPollingIntervalInSeconds float64 = 1.0
// cadvisor caches up to 2 minutes of stats (configured by kubelet).
maxNumStatsToRequest int = 120
)
numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds)
if numStats < 2 || numStats > maxNumStatsToRequest {
return nil, fmt.Errorf("numStats needs to be > 1 and <= %d", maxNumStatsToRequest)
}
// Get information of all containers on the node.
summary, err := GetStatsSummary(c, nodeName)
if err != nil {
return nil, err
}
f := func(name string, newStats *kubeletstatsv1alpha1.ContainerStats) *ContainerResourceUsage {
if newStats == nil || newStats.CPU == nil || newStats.Memory == nil {
return nil
}
return &ContainerResourceUsage{
Name: name,
Timestamp: newStats.StartTime.Time,
CPUUsageInCores: float64(removeUint64Ptr(newStats.CPU.UsageNanoCores)) / 1000000000,
MemoryUsageInBytes: removeUint64Ptr(newStats.Memory.UsageBytes),
MemoryWorkingSetInBytes: removeUint64Ptr(newStats.Memory.WorkingSetBytes),
MemoryRSSInBytes: removeUint64Ptr(newStats.Memory.RSSBytes),
CPUInterval: 0,
}
}
// Process container infos that are relevant to us.
containers := containerNames()
usageMap := make(ResourceUsagePerContainer, len(containers))
for _, pod := range summary.Pods {
for _, container := range pod.Containers {
isInteresting := false
for _, interestingContainerName := range containers {
if container.Name == interestingContainerName {
isInteresting = true
break
}
}
if !isInteresting {
continue
}
if usage := f(pod.PodRef.Name+"/"+container.Name, &container); usage != nil {
usageMap[pod.PodRef.Name+"/"+container.Name] = usage
}
}
}
return usageMap, nil
}
func getNodeStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) {
data, err := c.CoreV1().RESTClient().Get().
Resource("nodes").
@@ -463,7 +379,7 @@ func (r *resourceCollector) Stop() {
func (r *resourceCollector) collectStats(oldStatsMap map[string]*kubeletstatsv1alpha1.ContainerStats) {
summary, err := getNodeStatsSummary(r.client, r.node)
if err != nil {
e2elog.Logf("Error getting node stats summary on %q, err: %v", r.node, err)
framework.Logf("Error getting node stats summary on %q, err: %v", r.node, err)
return
}
cStatsMap := getSystemContainerStats(summary)
@@ -472,7 +388,7 @@ func (r *resourceCollector) collectStats(oldStatsMap map[string]*kubeletstatsv1a
for _, name := range r.containers {
cStats, ok := cStatsMap[name]
if !ok {
e2elog.Logf("Missing info/stats for container %q on node %q", name, r.node)
framework.Logf("Missing info/stats for container %q on node %q", name, r.node)
return
}
@@ -565,7 +481,7 @@ func (r *ResourceMonitor) Start() {
// It should be OK to monitor unschedulable Nodes
nodes, err := r.client.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
e2elog.Failf("ResourceMonitor: unable to get list of nodes: %v", err)
framework.Failf("ResourceMonitor: unable to get list of nodes: %v", err)
}
r.collectors = make(map[string]*resourceCollector, 0)
for _, node := range nodes.Items {
@@ -593,9 +509,9 @@ func (r *ResourceMonitor) Reset() {
func (r *ResourceMonitor) LogLatest() {
summary, err := r.GetLatest()
if err != nil {
e2elog.Logf("%v", err)
framework.Logf("%v", err)
}
e2elog.Logf("%s", r.FormatResourceUsage(summary))
framework.Logf("%s", r.FormatResourceUsage(summary))
}
// FormatResourceUsage returns the formatted string for LogLatest().
@@ -699,7 +615,7 @@ func (r *ResourceMonitor) FormatCPUSummary(summary NodesCPUSummary) string {
// LogCPUSummary outputs summary of CPU into log.
func (r *ResourceMonitor) LogCPUSummary() {
summary := r.GetCPUSummary()
e2elog.Logf("%s", r.FormatCPUSummary(summary))
framework.Logf("%s", r.FormatCPUSummary(summary))
}
// GetCPUSummary returns summary of CPU.

test/e2e/framework/resource_usage_gatherer.go

@@ -18,6 +18,8 @@ package framework
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math"
"sort"
@@ -31,7 +33,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientset "k8s.io/client-go/kubernetes"
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
kubeletstatsv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/test/e2e/system"
)
@@ -48,6 +51,21 @@ type SingleContainerSummary struct {
Mem uint64
}
// ContainerResourceUsage is a structure for gathering container resource usage.
type ContainerResourceUsage struct {
Name string
Timestamp time.Time
CPUUsageInCores float64
MemoryUsageInBytes uint64
MemoryWorkingSetInBytes uint64
MemoryRSSInBytes uint64
// The interval used to calculate CPUUsageInCores.
CPUInterval time.Duration
}
// ResourceUsagePerContainer is a map of ContainerResourceUsage.
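// In the non-kubemark path, keys take the form "<pod-name>/<container-name>",
// as built by getOneTimeResourceUsageOnNode below.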
type ResourceUsagePerContainer map[string]*ContainerResourceUsage
// ResourceUsageSummary is a struct to hold resource usage summary.
// we can't have int here, as JSON does not accept integer keys.
type ResourceUsageSummary map[string][]SingleContainerSummary
@@ -92,9 +110,9 @@ type usageDataPerContainer struct {
memWorkSetData []uint64
}
func computePercentiles(timeSeries []e2ekubelet.ResourceUsagePerContainer, percentilesToCompute []int) map[int]e2ekubelet.ResourceUsagePerContainer {
func computePercentiles(timeSeries []ResourceUsagePerContainer, percentilesToCompute []int) map[int]ResourceUsagePerContainer {
if len(timeSeries) == 0 {
return make(map[int]e2ekubelet.ResourceUsagePerContainer)
return make(map[int]ResourceUsagePerContainer)
}
dataMap := make(map[string]*usageDataPerContainer)
for i := range timeSeries {
@@ -117,12 +135,12 @@ func computePercentiles(timeSeries []e2ekubelet.ResourceUsagePerContainer, perce
sort.Sort(uint64arr(v.memWorkSetData))
}
result := make(map[int]e2ekubelet.ResourceUsagePerContainer)
result := make(map[int]ResourceUsagePerContainer)
for _, perc := range percentilesToCompute {
data := make(e2ekubelet.ResourceUsagePerContainer)
data := make(ResourceUsagePerContainer)
for k, v := range dataMap {
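// e.g. with 100 samples and perc=90, percentileIndex = ceil(9000/100)-1 = 89,
// i.e. the 90th-percentile sample (0-based) of the sorted series.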
percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
data[k] = &e2ekubelet.ContainerResourceUsage{
data[k] = &ContainerResourceUsage{
Name: k,
CPUUsageInCores: v.cpuData[percentileIndex],
MemoryUsageInBytes: v.memUseData[percentileIndex],
@@ -134,8 +152,8 @@ func computePercentiles(timeSeries []e2ekubelet.ResourceUsagePerContainer, perce
return result
}
func leftMergeData(left, right map[int]e2ekubelet.ResourceUsagePerContainer) map[int]e2ekubelet.ResourceUsagePerContainer {
result := make(map[int]e2ekubelet.ResourceUsagePerContainer)
func leftMergeData(left, right map[int]ResourceUsagePerContainer) map[int]ResourceUsagePerContainer {
result := make(map[int]ResourceUsagePerContainer)
for percentile, data := range left {
result[percentile] = data
if _, ok := right[percentile]; !ok {
@@ -154,7 +172,7 @@ type resourceGatherWorker struct {
wg *sync.WaitGroup
containerIDs []string
stopCh chan struct{}
dataSeries []e2ekubelet.ResourceUsagePerContainer
dataSeries []ResourceUsagePerContainer
finished bool
inKubemark bool
resourceDataGatheringPeriod time.Duration
@@ -163,21 +181,21 @@ }
}
func (w *resourceGatherWorker) singleProbe() {
data := make(e2ekubelet.ResourceUsagePerContainer)
data := make(ResourceUsagePerContainer)
if w.inKubemark {
kubemarkData := GetKubemarkMasterComponentsResourceUsage()
if kubemarkData == nil {
return
}
for k, v := range kubemarkData {
data[k] = &e2ekubelet.ContainerResourceUsage{
data[k] = &ContainerResourceUsage{
Name: v.Name,
MemoryWorkingSetInBytes: v.MemoryWorkingSetInBytes,
CPUUsageInCores: v.CPUUsageInCores,
}
}
} else {
nodeUsage, err := e2ekubelet.GetOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })
nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })
if err != nil {
Logf("Error while reading data from %v: %v", w.nodeName, err)
return
@@ -192,6 +210,115 @@ func (w *resourceGatherWorker) singleProbe() {
w.dataSeries = append(w.dataSeries, data)
}
// getOneTimeResourceUsageOnNode queries the node's /stats/summary endpoint
// and returns the resource usage of all containerNames for the past
// cpuInterval.
// The acceptable range of the interval is 2s~120s. Be warned that as the
// interval (and #containers) increases, the size of kubelet's response
// could be significant. E.g., the 60s interval stats for ~20 containers is
// ~1.5MB. Don't hammer the node with frequent, heavy requests.
//
// cadvisor records cumulative cpu usage in nanoseconds, so we need to have two
// stats points to compute the cpu usage over the interval. Assuming cadvisor
// polls every second, we'd need to get N stats points for N-second interval.
// Note that this is an approximation and may not be accurate, hence we also
// write the actual interval used for calculation (based on the timestamps of
// the stats points) in ContainerResourceUsage.CPUInterval.
//
// containerNames is a function returning a collection of container names that
// the user is interested in.
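//
// For example, cpuInterval=30s yields numStats=30, inside the 2..120 bound
// enforced below; a 1s or 3m interval is rejected before any request is sent
// to the kubelet.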
func getOneTimeResourceUsageOnNode(
c clientset.Interface,
nodeName string,
cpuInterval time.Duration,
containerNames func() []string,
) (ResourceUsagePerContainer, error) {
const (
// cadvisor records stats about every second.
cadvisorStatsPollingIntervalInSeconds float64 = 1.0
// cadvisor caches up to 2 minutes of stats (configured by kubelet).
maxNumStatsToRequest int = 120
)
numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds)
if numStats < 2 || numStats > maxNumStatsToRequest {
return nil, fmt.Errorf("numStats needs to be > 1 and <= %d", maxNumStatsToRequest)
}
// Get information of all containers on the node.
summary, err := getStatsSummary(c, nodeName)
if err != nil {
return nil, err
}
f := func(name string, newStats *kubeletstatsv1alpha1.ContainerStats) *ContainerResourceUsage {
if newStats == nil || newStats.CPU == nil || newStats.Memory == nil {
return nil
}
return &ContainerResourceUsage{
Name: name,
Timestamp: newStats.StartTime.Time,
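// UsageNanoCores is expressed in nanocores; dividing by 1e9 yields cores.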
CPUUsageInCores: float64(removeUint64Ptr(newStats.CPU.UsageNanoCores)) / 1000000000,
MemoryUsageInBytes: removeUint64Ptr(newStats.Memory.UsageBytes),
MemoryWorkingSetInBytes: removeUint64Ptr(newStats.Memory.WorkingSetBytes),
MemoryRSSInBytes: removeUint64Ptr(newStats.Memory.RSSBytes),
CPUInterval: 0,
}
}
// Process container infos that are relevant to us.
containers := containerNames()
usageMap := make(ResourceUsagePerContainer, len(containers))
for _, pod := range summary.Pods {
for _, container := range pod.Containers {
isInteresting := false
for _, interestingContainerName := range containers {
if container.Name == interestingContainerName {
isInteresting = true
break
}
}
if !isInteresting {
continue
}
if usage := f(pod.PodRef.Name+"/"+container.Name, &container); usage != nil {
usageMap[pod.PodRef.Name+"/"+container.Name] = usage
}
}
}
return usageMap, nil
}
// getStatsSummary contacts kubelet for the container information.
func getStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) {
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
defer cancel()
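// The request is proxied through the API server:
// GET /api/v1/nodes/<nodeName>:<kubelet-port>/proxy/stats/summary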
data, err := c.CoreV1().RESTClient().Get().
Context(ctx).
Resource("nodes").
SubResource("proxy").
Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
Suffix("stats/summary").
Do().Raw()
if err != nil {
return nil, err
}
summary := kubeletstatsv1alpha1.Summary{}
err = json.Unmarshal(data, &summary)
if err != nil {
return nil, err
}
return &summary, nil
}
func removeUint64Ptr(ptr *uint64) uint64 {
if ptr == nil {
return 0
}
return *ptr
}
func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
defer utilruntime.HandleCrash()
defer w.wg.Done()
@@ -367,7 +494,7 @@ func (g *ContainerResourceGatherer) StopAndSummarize(percentiles []int, constrai
Logf("Warning! Empty percentile list for stopAndPrintData.")
return &ResourceUsageSummary{}, fmt.Errorf("Failed to get any resource usage data")
}
data := make(map[int]e2ekubelet.ResourceUsagePerContainer)
data := make(map[int]ResourceUsagePerContainer)
for i := range g.workers {
if g.workers[i].finished {
stats := computePercentiles(g.workers[i].dataSeries, percentiles)

test/e2e/framework/util.go

@@ -74,7 +74,6 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/master/ports"
taintutils "k8s.io/kubernetes/pkg/util/taints"
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -103,6 +102,9 @@ const (
// PodDeleteTimeout is how long to wait for a pod to be deleted.
PodDeleteTimeout = 5 * time.Minute
// PodGetTimeout is how long to wait for a pod to be retrieved.
PodGetTimeout = 2 * time.Minute
// PodEventTimeout is how much we wait for a pod event to occur.
PodEventTimeout = 2 * time.Minute
@@ -1370,7 +1372,7 @@ func DumpNodeDebugInfo(c clientset.Interface, nodeNames []string, logFunc func(f
e.Source, e.Type, e.Message, e.Reason, e.FirstTimestamp, e.LastTimestamp, e.InvolvedObject)
}
logFunc("\nLogging pods the kubelet thinks is on node %v", n)
podList, err := e2ekubelet.GetKubeletPods(c, n)
podList, err := getKubeletPods(c, n)
if err != nil {
logFunc("Unable to retrieve kubelet pods for node %v: %v", n, err)
continue
@@ -1391,6 +1393,33 @@ func DumpNodeDebugInfo(c clientset.Interface, nodeNames []string, logFunc func(f
}
}
// getKubeletPods retrieves the list of pods on the kubelet.
func getKubeletPods(c clientset.Interface, node string) (*v1.PodList, error) {
result := &v1.PodList{}
var client restclient.Result
finished := make(chan struct{}, 1)
go func() {
// call chain tends to hang in some cases when Node is not ready. Add an artificial timeout for this call. #22165
client = c.CoreV1().RESTClient().Get().
Resource("nodes").
SubResource("proxy").
Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
Suffix("pods").
Do()
finished <- struct{}{}
}()
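// Wait for the proxied call to finish, but give up after PodGetTimeout so a
// wedged kubelet proxy cannot hang the caller.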
select {
case <-finished:
if err := client.Into(result); err != nil {
return &v1.PodList{}, err
}
return result, nil
case <-time.After(PodGetTimeout):
return &v1.PodList{}, fmt.Errorf("timed out after %v waiting for the list of pods", PodGetTimeout)
}
}
// logNodeEvents logs kubelet events from the given node. This includes kubelet
// restart and node unhealthy events. Note that listing events like this will mess
// with latency metrics, beware of calling it during a test.