diff --git a/test/e2e/framework/kubelet_stats.go b/test/e2e/framework/kubelet_stats.go
index 684d8cc50b9..1464d210f50 100644
--- a/test/e2e/framework/kubelet_stats.go
+++ b/test/e2e/framework/kubelet_stats.go
@@ -43,7 +43,7 @@ import (
 	"github.com/prometheus/common/model"
 )
 
-// KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
+// KubeletLatencyMetric stores metrics scraped from the kubelet server's /metrics endpoint.
 // TODO: Get some more structure around the metrics and this type
 type KubeletLatencyMetric struct {
 	// eg: list, info, create
@@ -55,7 +55,7 @@ type KubeletLatencyMetric struct {
 	Latency time.Duration
 }
 
-// KubeletMetricByLatency implements sort.Interface for []KubeletMetric based on
+// KubeletLatencyMetrics implements sort.Interface for []KubeletLatencyMetric based on
 // the latency field.
 type KubeletLatencyMetrics []KubeletLatencyMetric
 
@@ -159,6 +159,7 @@ type RuntimeOperationErrorRate struct {
 	TimeoutRate float64
 }
 
+// NewRuntimeOperationMonitor returns a new RuntimeOperationMonitor.
 func NewRuntimeOperationMonitor(c clientset.Interface) *RuntimeOperationMonitor {
 	m := &RuntimeOperationMonitor{
 		client: c,
 	}
@@ -433,7 +434,7 @@ const (
 	rootContainerName = "/"
 )
 
-// A list of containers for which we want to collect resource usage.
+// TargetContainers returns a list of containers for which we want to collect resource usage.
 func TargetContainers() []string {
 	return []string{
 		rootContainerName,
@@ -442,6 +443,7 @@ func TargetContainers() []string {
 	}
 }
 
+// ContainerResourceUsage is a structure for gathering container resource usage.
 type ContainerResourceUsage struct {
 	Name                    string
 	Timestamp               time.Time
@@ -457,7 +459,10 @@ func (r *ContainerResourceUsage) isStrictlyGreaterThan(rhs *ContainerResourceUsa
 	return r.CPUUsageInCores > rhs.CPUUsageInCores && r.MemoryWorkingSetInBytes > rhs.MemoryWorkingSetInBytes
 }
 
+// ResourceUsagePerContainer is a map of ContainerResourceUsage.
 type ResourceUsagePerContainer map[string]*ContainerResourceUsage
+
+// ResourceUsagePerNode is a map of ResourceUsagePerContainer.
 type ResourceUsagePerNode map[string]ResourceUsagePerContainer
 
 func formatResourceUsageStats(nodeName string, containerStats ResourceUsagePerContainer) string {
@@ -491,6 +496,7 @@ type usageDataPerContainer struct {
 	memWorkSetData []uint64
 }
 
+// GetKubeletHeapStats returns stats of the kubelet heap.
 func GetKubeletHeapStats(c clientset.Interface, nodeName string) (string, error) {
 	client, err := NodeProxyRequest(c, nodeName, "debug/pprof/heap", ports.KubeletPort)
 	if err != nil {
@@ -507,6 +513,7 @@ func GetKubeletHeapStats(c clientset.Interface, nodeName string) (string, error)
 	return strings.Join(lines[len(lines)-numLines:], "\n"), nil
 }
 
+// PrintAllKubeletPods outputs the status of all kubelet pods into the log.
 func PrintAllKubeletPods(c clientset.Interface, nodeName string) {
 	podList, err := GetKubeletPods(c, nodeName)
 	if err != nil {
@@ -661,6 +668,7 @@ type ResourceMonitor struct {
 	collectors      map[string]*resourceCollector
 }
 
+// NewResourceMonitor returns a new ResourceMonitor.
 func NewResourceMonitor(c clientset.Interface, containerNames []string, pollingInterval time.Duration) *ResourceMonitor {
 	return &ResourceMonitor{
 		containers:      containerNames,
@@ -669,6 +677,7 @@ func NewResourceMonitor(c clientset.Interface, containerNames []string, pollingI
 	}
 }
 
+// Start starts collectors.
 func (r *ResourceMonitor) Start() {
 	// It should be OK to monitor unschedulable Nodes
 	nodes, err := r.client.CoreV1().Nodes().List(metav1.ListOptions{})
@@ -683,18 +692,21 @@ func (r *ResourceMonitor) Start() {
 	}
 }
 
+// Stop stops collectors.
 func (r *ResourceMonitor) Stop() {
 	for _, collector := range r.collectors {
 		collector.Stop()
 	}
 }
 
+// Reset resets collectors.
 func (r *ResourceMonitor) Reset() {
 	for _, collector := range r.collectors {
 		collector.Reset()
 	}
 }
 
+// LogLatest outputs the latest resource usage into the log.
 func (r *ResourceMonitor) LogLatest() {
 	summary, err := r.GetLatest()
 	if err != nil {
@@ -703,6 +715,8 @@ func (r *ResourceMonitor) LogLatest() {
 	Logf("%s", r.FormatResourceUsage(summary))
 }
 
+// FormatResourceUsage returns the formatted string for LogLatest().
+// TODO(oomichi): This can be made a local function after making test/e2e/node/kubelet_perf.go use LogLatest directly instead.
 func (r *ResourceMonitor) FormatResourceUsage(s ResourceUsagePerNode) string {
 	summary := []string{}
 	for node, usage := range s {
@@ -711,6 +725,7 @@ func (r *ResourceMonitor) FormatResourceUsage(s ResourceUsagePerNode) string {
 	return strings.Join(summary, "\n")
 }
 
+// GetLatest returns the latest resource usage.
 func (r *ResourceMonitor) GetLatest() (ResourceUsagePerNode, error) {
 	result := make(ResourceUsagePerNode)
 	errs := []error{}
@@ -725,6 +740,7 @@ func (r *ResourceMonitor) GetLatest() (ResourceUsagePerNode, error) {
 	return result, utilerrors.NewAggregate(errs)
 }
 
+// GetMasterNodeLatest returns the latest resource usage of the master and nodes.
 func (r *ResourceMonitor) GetMasterNodeLatest(usagePerNode ResourceUsagePerNode) ResourceUsagePerNode {
 	result := make(ResourceUsagePerNode)
 	var masterUsage ResourceUsagePerContainer
@@ -767,6 +783,7 @@ type ContainersCPUSummary map[string]map[float64]float64
 // ContainersCPUSummary map.
 type NodesCPUSummary map[string]ContainersCPUSummary
 
+// FormatCPUSummary returns a human-readable string of the CPU summary from the specified summary data.
 func (r *ResourceMonitor) FormatCPUSummary(summary NodesCPUSummary) string {
 	// Example output for a node (the percentiles may differ):
 	// CPU usage of containers on node "e2e-test-foo-node-0vj7":
@@ -804,11 +821,13 @@ func (r *ResourceMonitor) FormatCPUSummary(summary NodesCPUSummary) string {
 	return strings.Join(summaryStrings, "\n")
 }
 
+// LogCPUSummary outputs the CPU summary into the log.
 func (r *ResourceMonitor) LogCPUSummary() {
 	summary := r.GetCPUSummary()
 	Logf("%s", r.FormatCPUSummary(summary))
 }
 
+// GetCPUSummary returns the CPU usage summary.
 func (r *ResourceMonitor) GetCPUSummary() NodesCPUSummary {
 	result := make(NodesCPUSummary)
 	for nodeName, collector := range r.collectors {
@@ -821,6 +840,7 @@ func (r *ResourceMonitor) GetCPUSummary() NodesCPUSummary {
 	return result
 }
 
+// GetMasterNodeCPUSummary returns the CPU usage summary of the master and nodes.
 func (r *ResourceMonitor) GetMasterNodeCPUSummary(summaryPerNode NodesCPUSummary) NodesCPUSummary {
 	result := make(NodesCPUSummary)
 	var masterSummary ContainersCPUSummary
diff --git a/test/e2e/framework/log_size_monitoring.go b/test/e2e/framework/log_size_monitoring.go
index 9d50c94375c..f39c8ffa3b1 100644
--- a/test/e2e/framework/log_size_monitoring.go
+++ b/test/e2e/framework/log_size_monitoring.go
@@ -75,16 +75,20 @@ type LogsSizeVerifier struct {
 	workers []*LogSizeGatherer
 }
 
+// SingleLogSummary is a structure for handling average generation rate and number of probes.
 type SingleLogSummary struct {
 	AverageGenerationRate int
 	NumberOfProbes        int
 }
 
+// LogSizeDataTimeseries is a map of timestamped log sizes.
 type LogSizeDataTimeseries map[string]map[string][]TimestampedSize
 
+// LogsSizeDataSummary is a map of log summaries.
 // node -> file -> data
 type LogsSizeDataSummary map[string]map[string]SingleLogSummary
 
+// PrintHumanReadable returns a human-readable string of the log size data summary.
 // TODO: make sure that we don't need locking here
 func (s *LogsSizeDataSummary) PrintHumanReadable() string {
 	buf := &bytes.Buffer{}
@@ -100,14 +104,17 @@ func (s *LogsSizeDataSummary) PrintHumanReadable() string {
 	return buf.String()
 }
 
+// PrintJSON returns the summary of log size data in JSON format.
 func (s *LogsSizeDataSummary) PrintJSON() string {
 	return PrettyPrintJSON(*s)
 }
 
+// SummaryKind returns the kind of the log size data summary.
 func (s *LogsSizeDataSummary) SummaryKind() string {
 	return "LogSizeSummary"
 }
 
+// LogsSizeData is a structure for handling the timeseries of log size data, protected by a lock.
 type LogsSizeData struct {
 	data LogSizeDataTimeseries
 	lock sync.Mutex
@@ -133,7 +140,7 @@ func prepareData(masterAddress string, nodeAddresses []string) *LogsSizeData {
 	}
 }
 
-func (d *LogsSizeData) AddNewData(ip, path string, timestamp time.Time, size int) {
+func (d *LogsSizeData) addNewData(ip, path string, timestamp time.Time, size int) {
 	d.lock.Lock()
 	defer d.lock.Unlock()
 	d.data[ip][path] = append(
@@ -197,26 +204,27 @@ func (s *LogsSizeVerifier) GetSummary() *LogsSizeDataSummary {
 }
 
 // Run starts log size gathering. It starts a gorouting for every worker and then blocks until stopChannel is closed
-func (v *LogsSizeVerifier) Run() {
-	v.workChannel <- WorkItem{
-		ip:                v.masterAddress,
+func (s *LogsSizeVerifier) Run() {
+	s.workChannel <- WorkItem{
+		ip:                s.masterAddress,
 		paths:             masterLogsToCheck,
 		backoffMultiplier: 1,
 	}
-	for _, node := range v.nodeAddresses {
-		v.workChannel <- WorkItem{
+	for _, node := range s.nodeAddresses {
+		s.workChannel <- WorkItem{
 			ip:                node,
 			paths:             nodeLogsToCheck,
 			backoffMultiplier: 1,
 		}
 	}
-	for _, worker := range v.workers {
+	for _, worker := range s.workers {
 		go worker.Run()
 	}
-	<-v.stopChannel
-	v.wg.Wait()
+	<-s.stopChannel
+	s.wg.Wait()
 }
 
+// Run starts the log size gathering loop.
 func (g *LogSizeGatherer) Run() {
 	for g.Work() {
 	}
@@ -270,7 +278,7 @@ func (g *LogSizeGatherer) Work() bool {
 			Logf("Error during conversion to int: %v, skipping data. Error: %v", results[i+1], err)
 			continue
 		}
-		g.data.AddNewData(workItem.ip, path, now, size)
+		g.data.addNewData(workItem.ip, path, now, size)
 	}
 	go g.pushWorkItem(workItem)
 	return true