e2e: adapt kubelet_perf.go to use the new summary metrics API

This commit switches most functions in kubelet_stats.go to the new API.
However, the functions that perform one-time resource usage retrieval remain
unchanged so that they stay compatible with resource_usage_gatherer.go; they
should be handled separately.

Also, the new summary API does not report RSS memory yet, so all memory
checking tests will *always* pass for now. We plan to add this metric to the
API and restore the functionality of the test.
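For context, here is a rough, self-contained sketch (not part of this commit) of how a client consumes the summary API that the diff below switches to: fetch `/stats/summary` from the kubelet, decode the JSON, and read the per-node system container stats. The structs mirror only the fields of `stats.Summary` used here, and the kubelet read-only port URL is an assumption; the e2e code in the diff goes through the API server's node proxy instead.

```go
// Sketch only: hand-rolled structs that mirror a subset of the v1alpha1
// summary API (stats.Summary / NodeStats / ContainerStats).
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

type cpuStats struct {
	Time                 time.Time `json:"time"`
	UsageCoreNanoSeconds *uint64   `json:"usageCoreNanoSeconds,omitempty"`
}

type memoryStats struct {
	UsageBytes      *uint64 `json:"usageBytes,omitempty"`
	WorkingSetBytes *uint64 `json:"workingSetBytes,omitempty"`
	RSSBytes        *uint64 `json:"rssBytes,omitempty"` // not populated yet, per the commit message
}

type containerStats struct {
	Name   string       `json:"name"`
	CPU    *cpuStats    `json:"cpu,omitempty"`
	Memory *memoryStats `json:"memory,omitempty"`
}

type nodeStats struct {
	SystemContainers []containerStats `json:"systemContainers,omitempty"`
	CPU              *cpuStats        `json:"cpu,omitempty"`
	Memory           *memoryStats     `json:"memory,omitempty"`
}

type summary struct {
	Node nodeStats `json:"node"`
}

// fetchSummary hits the kubelet directly on its read-only port (assumed here);
// the e2e helpers instead proxy the request through the API server.
func fetchSummary(kubeletHost string) (*summary, error) {
	resp, err := http.Get(fmt.Sprintf("http://%s:10255/stats/summary", kubeletHost))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	var s summary
	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
		return nil, err
	}
	return &s, nil
}

// cpuUsageInCores derives average CPU usage from two cumulative samples, the
// same calculation the rewritten computeContainerResourceUsage performs.
// Both samples must have non-nil UsageCoreNanoSeconds.
func cpuUsageInCores(prev, cur *cpuStats) float64 {
	return float64(*cur.UsageCoreNanoSeconds-*prev.UsageCoreNanoSeconds) /
		float64(cur.Time.Sub(prev.Time).Nanoseconds())
}

func main() {
	s, err := fetchSummary("localhost")
	if err != nil {
		fmt.Println("error fetching summary:", err)
		return
	}
	// System containers ("kubelet", "runtime", "misc") are reported per node.
	for _, c := range s.Node.SystemContainers {
		if c.Memory != nil && c.Memory.WorkingSetBytes != nil {
			fmt.Printf("%s working set: %d bytes\n", c.Name, *c.Memory.WorkingSetBytes)
		}
	}
}
```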
Yu-Ju Hong 2016-04-07 15:20:50 -07:00
parent e93c0d727f
commit a8c685921f
2 changed files with 142 additions and 91 deletions

kubelet_perf.go

@@ -22,6 +22,7 @@ import (
 	"time"

 	client "k8s.io/kubernetes/pkg/client/unversioned"
+	"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/sets"
@@ -31,7 +32,7 @@ import (
 const (
 	// Interval to poll /stats/container on a node
-	containerStatsPollingPeriod = 3 * time.Second
+	containerStatsPollingPeriod = 10 * time.Second
 	// The monitoring time for one test.
 	monitoringTime = 20 * time.Minute
 	// The periodic reporting period.
@@ -210,27 +211,26 @@ var _ = KubeDescribe("Kubelet [Serial] [Slow]", func() {
 		{
 			podsPerNode: 0,
 			cpuLimits: containersCPUSummary{
-				"/kubelet":       {0.50: 0.06, 0.95: 0.08},
-				"/docker-daemon": {0.50: 0.05, 0.95: 0.06},
+				stats.SystemContainerKubelet: {0.50: 0.06, 0.95: 0.08},
+				stats.SystemContainerRuntime: {0.50: 0.05, 0.95: 0.06},
 			},
 			// We set the memory limits generously because the distribution
 			// of the addon pods affect the memory usage on each node.
 			memLimits: resourceUsagePerContainer{
-				"/kubelet":       &containerResourceUsage{MemoryRSSInBytes: 70 * 1024 * 1024},
-				"/docker-daemon": &containerResourceUsage{MemoryRSSInBytes: 85 * 1024 * 1024},
+				stats.SystemContainerKubelet: &containerResourceUsage{MemoryRSSInBytes: 70 * 1024 * 1024},
+				stats.SystemContainerRuntime: &containerResourceUsage{MemoryRSSInBytes: 85 * 1024 * 1024},
 			},
 		},
 		{
 			podsPerNode: 35,
 			cpuLimits: containersCPUSummary{
-				"/kubelet":       {0.50: 0.12, 0.95: 0.14},
-				"/docker-daemon": {0.50: 0.06, 0.95: 0.08},
+				stats.SystemContainerKubelet: {0.50: 0.12, 0.95: 0.14},
+				stats.SystemContainerRuntime: {0.50: 0.06, 0.95: 0.08},
 			},
 			// We set the memory limits generously because the distribution
 			// of the addon pods affect the memory usage on each node.
 			memLimits: resourceUsagePerContainer{
-				"/kubelet":       &containerResourceUsage{MemoryRSSInBytes: 75 * 1024 * 1024},
-				"/docker-daemon": &containerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024},
+				stats.SystemContainerRuntime: &containerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024},
 			},
 		},
 		{

kubelet_stats.go

@@ -33,8 +33,9 @@ import (
 	"github.com/prometheus/common/model"
 	"k8s.io/kubernetes/pkg/api"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
+	"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
-	"k8s.io/kubernetes/pkg/kubelet/server/stats"
+	kubeletstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
 	"k8s.io/kubernetes/pkg/master/ports"
 	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/sets"
@@ -148,7 +149,9 @@ func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nod
 // getContainerInfo contacts kubelet for the container information. The "Stats"
 // in the returned ContainerInfo is subject to the requirements in statsRequest.
-func getContainerInfo(c *client.Client, nodeName string, req *stats.StatsRequest) (map[string]cadvisorapi.ContainerInfo, error) {
+// TODO: This function uses the deprecated kubelet stats API; it should be
+// removed.
+func getContainerInfo(c *client.Client, nodeName string, req *kubeletstats.StatsRequest) (map[string]cadvisorapi.ContainerInfo, error) {
 	reqBody, err := json.Marshal(req)
 	if err != nil {
 		return nil, err
@@ -191,47 +194,6 @@ func getContainerInfo(c *client.Client, nodeName string, req *stats.StatsRequest
 	return containers, nil
 }

-const (
-	// cadvisor records stats about every second.
-	cadvisorStatsPollingIntervalInSeconds float64 = 1.0
-	// cadvisor caches up to 2 minutes of stats (configured by kubelet).
-	maxNumStatsToRequest int = 120
-)
-
-// A list of containers for which we want to collect resource usage.
-func targetContainers() []string {
-	if providerIs("gce", "gke") {
-		return []string{
-			"/",
-			"/docker-daemon",
-			"/kubelet",
-			"/system",
-		}
-	} else {
-		return []string{
-			"/",
-		}
-	}
-}
-
-type containerResourceUsage struct {
-	Name                    string
-	Timestamp               time.Time
-	CPUUsageInCores         float64
-	MemoryUsageInBytes      uint64
-	MemoryWorkingSetInBytes uint64
-	MemoryRSSInBytes        uint64
-	// The interval used to calculate CPUUsageInCores.
-	CPUInterval time.Duration
-}
-
-func (r *containerResourceUsage) isStrictlyGreaterThan(rhs *containerResourceUsage) bool {
-	return r.CPUUsageInCores > rhs.CPUUsageInCores && r.MemoryWorkingSetInBytes > rhs.MemoryWorkingSetInBytes
-}
-
-type resourceUsagePerContainer map[string]*containerResourceUsage
-type resourceUsagePerNode map[string]resourceUsagePerContainer
-
 // getOneTimeResourceUsageOnNode queries the node's /stats/container endpoint
 // and returns the resource usage of all containerNames for the past
 // cpuInterval.
@@ -252,6 +214,8 @@ type resourceUsagePerNode map[string]resourceUsagePerContainer
 // should fail if one of containers listed by containerNames is missing on any node
 // (useful e.g. when looking for system containers or daemons). If set to true function
 // is more forgiving and ignores missing containers.
+// TODO: This function relies on the deprecated kubelet stats API and should be
+// removed and/or rewritten.
 func getOneTimeResourceUsageOnNode(
 	c *client.Client,
 	nodeName string,
@@ -259,12 +223,19 @@ func getOneTimeResourceUsageOnNode(
 	containerNames func() []string,
 	expectMissingContainers bool,
 ) (resourceUsagePerContainer, error) {
+	const (
+		// cadvisor records stats about every second.
+		cadvisorStatsPollingIntervalInSeconds float64 = 1.0
+		// cadvisor caches up to 2 minutes of stats (configured by kubelet).
+		maxNumStatsToRequest int = 120
+	)
+
 	numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds)
 	if numStats < 2 || numStats > maxNumStatsToRequest {
 		return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest)
 	}
 	// Get information of all containers on the node.
-	containerInfos, err := getContainerInfo(c, nodeName, &stats.StatsRequest{
+	containerInfos, err := getContainerInfo(c, nodeName, &kubeletstats.StatsRequest{
 		ContainerName: "/",
 		NumStats:      numStats,
 		Subcontainers: true,
@@ -272,6 +243,18 @@ func getOneTimeResourceUsageOnNode(
 	if err != nil {
 		return nil, err
 	}
+
+	f := func(name string, oldStats, newStats *cadvisorapi.ContainerStats) *containerResourceUsage {
+		return &containerResourceUsage{
+			Name:                    name,
+			Timestamp:               newStats.Timestamp,
+			CPUUsageInCores:         float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
+			MemoryUsageInBytes:      newStats.Memory.Usage,
+			MemoryWorkingSetInBytes: newStats.Memory.WorkingSet,
+			MemoryRSSInBytes:        newStats.Memory.RSS,
+			CPUInterval:             newStats.Timestamp.Sub(oldStats.Timestamp),
+		}
+	}
 	// Process container infos that are relevant to us.
 	containers := containerNames()
 	usageMap := make(resourceUsagePerContainer, len(containers))
@@ -285,26 +268,96 @@ func getOneTimeResourceUsageOnNode(
 		}
 		first := info.Stats[0]
 		last := info.Stats[len(info.Stats)-1]
-		usageMap[name] = computeContainerResourceUsage(name, first, last)
+		usageMap[name] = f(name, first, last)
 	}
 	return usageMap, nil
 }

-// logOneTimeResourceUsageSummary collects container resource for the list of
-// nodes, formats and logs the stats.
-func logOneTimeResourceUsageSummary(c *client.Client, nodeNames []string, cpuInterval time.Duration) {
-	var summary []string
-	for _, nodeName := range nodeNames {
-		stats, err := getOneTimeResourceUsageOnNode(c, nodeName, cpuInterval, targetContainers, false)
-		if err != nil {
-			summary = append(summary, fmt.Sprintf("Error getting resource usage from node %q, err: %v", nodeName, err))
-		} else {
-			summary = append(summary, formatResourceUsageStats(nodeName, stats))
-		}
-	}
-	Logf("\n%s", strings.Join(summary, "\n"))
-}
+func getNodeStatsSummary(c *client.Client, nodeName string) (*stats.Summary, error) {
+	subResourceProxyAvailable, err := serverVersionGTE(subResourceServiceAndNodeProxyVersion, c)
+	if err != nil {
+		return nil, err
+	}
+
+	var data []byte
+	if subResourceProxyAvailable {
+		data, err = c.Get().
+			Resource("nodes").
+			SubResource("proxy").
+			Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
+			Suffix("stats/summary").
+			SetHeader("Content-Type", "application/json").
+			Do().Raw()
+	} else {
+		data, err = c.Get().
+			Prefix("proxy").
+			Resource("nodes").
+			Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
+			Suffix("stats/summary").
+			SetHeader("Content-Type", "application/json").
+			Do().Raw()
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	var summary *stats.Summary
+	err = json.Unmarshal(data, &summary)
+	if err != nil {
+		return nil, err
+	}
+	return summary, nil
+}
+
+func getSystemContainerStats(summary *stats.Summary) map[string]*stats.ContainerStats {
+	statsList := summary.Node.SystemContainers
+	statsMap := make(map[string]*stats.ContainerStats)
+	for i := range statsList {
+		statsMap[statsList[i].Name] = &statsList[i]
+	}
+
+	// Create a root container stats using information available in
+	// stats.NodeStats. This is necessary since it is a different type.
+	statsMap[rootContainerName] = &stats.ContainerStats{
+		CPU:    summary.Node.CPU,
+		Memory: summary.Node.Memory,
+	}
+	return statsMap
+}
+
+const (
+	rootContainerName = "/"
+)
+
+// A list of containers for which we want to collect resource usage.
+func targetContainers() []string {
+	return []string{
+		rootContainerName,
+		stats.SystemContainerRuntime,
+		stats.SystemContainerKubelet,
+		stats.SystemContainerMisc,
+	}
+}
+
+type containerResourceUsage struct {
+	Name                    string
+	Timestamp               time.Time
+	CPUUsageInCores         float64
+	MemoryUsageInBytes      uint64
+	MemoryWorkingSetInBytes uint64
+	MemoryRSSInBytes        uint64
+	// The interval used to calculate CPUUsageInCores.
+	CPUInterval time.Duration
+}
+
+func (r *containerResourceUsage) isStrictlyGreaterThan(rhs *containerResourceUsage) bool {
+	return r.CPUUsageInCores > rhs.CPUUsageInCores && r.MemoryWorkingSetInBytes > rhs.MemoryWorkingSetInBytes
+}
+
+type resourceUsagePerContainer map[string]*containerResourceUsage
+type resourceUsagePerNode map[string]resourceUsagePerContainer
+
 func formatResourceUsageStats(nodeName string, containerStats resourceUsagePerContainer) string {
 	// Example output:
 	//
@@ -395,15 +448,15 @@ func PrintAllKubeletPods(c *client.Client, nodeName string) {
 	}
 }

-func computeContainerResourceUsage(name string, oldStats, newStats *cadvisorapi.ContainerStats) *containerResourceUsage {
+func computeContainerResourceUsage(name string, oldStats, newStats *stats.ContainerStats) *containerResourceUsage {
 	return &containerResourceUsage{
 		Name:                    name,
-		Timestamp:               newStats.Timestamp,
-		CPUUsageInCores:         float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
-		MemoryUsageInBytes:      newStats.Memory.Usage,
-		MemoryWorkingSetInBytes: newStats.Memory.WorkingSet,
-		MemoryRSSInBytes:        newStats.Memory.RSS,
-		CPUInterval:             newStats.Timestamp.Sub(oldStats.Timestamp),
+		Timestamp:               newStats.CPU.Time.Time,
+		CPUUsageInCores:         float64(*newStats.CPU.UsageCoreNanoSeconds-*oldStats.CPU.UsageCoreNanoSeconds) / float64(newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time).Nanoseconds()),
+		MemoryUsageInBytes:      *newStats.Memory.UsageBytes,
+		MemoryWorkingSetInBytes: *newStats.Memory.WorkingSetBytes,
+		MemoryRSSInBytes:        *newStats.Memory.RSSBytes,
+		CPUInterval:             newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time),
 	}
 }
@@ -435,7 +488,7 @@ func newResourceCollector(c *client.Client, nodeName string, containerNames []st
 func (r *resourceCollector) Start() {
 	r.stopCh = make(chan struct{}, 1)
 	// Keep the last observed stats for comparison.
-	oldStats := make(map[string]*cadvisorapi.ContainerStats)
+	oldStats := make(map[string]*stats.ContainerStats)
 	go wait.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh)
 }
@@ -444,35 +497,33 @@ func (r *resourceCollector) Stop() {
 	close(r.stopCh)
 }

-// collectStats gets the latest stats from kubelet's /stats/container, computes
+// collectStats gets the latest stats from kubelet stats summary API, computes
 // the resource usage, and pushes it to the buffer.
-func (r *resourceCollector) collectStats(oldStats map[string]*cadvisorapi.ContainerStats) {
-	infos, err := getContainerInfo(r.client, r.node, &stats.StatsRequest{
-		ContainerName: "/",
-		NumStats:      1,
-		Subcontainers: true,
-	})
+func (r *resourceCollector) collectStats(oldStatsMap map[string]*stats.ContainerStats) {
+	summary, err := getNodeStatsSummary(r.client, r.node)
+	cStatsMap := getSystemContainerStats(summary)
 	if err != nil {
-		Logf("Error getting container info on %q, err: %v", r.node, err)
+		Logf("Error getting node stats summary on %q, err: %v", r.node, err)
 		return
 	}
 	r.lock.Lock()
 	defer r.lock.Unlock()
 	for _, name := range r.containers {
-		info, ok := infos[name]
-		if !ok || len(info.Stats) < 1 {
+		cStats, ok := cStatsMap[name]
+		if !ok {
 			Logf("Missing info/stats for container %q on node %q", name, r.node)
 			return
 		}
-		if oldInfo, ok := oldStats[name]; ok {
-			newInfo := info.Stats[0]
-			if oldInfo.Timestamp.Equal(newInfo.Timestamp) {
+		if oldStats, ok := oldStatsMap[name]; ok {
+			if oldStats.CPU.Time.Equal(cStats.CPU.Time) {
 				// No change -> skip this stat.
 				continue
 			}
-			r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldInfo, newInfo))
+			r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats, cStats))
 		}
-		oldStats[name] = info.Stats[0]
+		// Update the old stats.
+		oldStatsMap[name] = cStats
 	}
 }
@@ -613,9 +664,9 @@ func (r *resourceMonitor) FormatCPUSummary(summary nodesCPUSummary) string {
 	// CPU usage of containers on node "e2e-test-foo-minion-0vj7":
 	// container        5th%  50th% 90th% 95th%
 	// "/"              0.051 0.159 0.387 0.455
-	// "/docker-daemon" 0.000 0.000 0.146 0.166
+	// "/runtime"       0.000 0.000 0.146 0.166
 	// "/kubelet"       0.036 0.053 0.091 0.154
-	// "/system"        0.001 0.001 0.001 0.002
+	// "/misc"          0.001 0.001 0.001 0.002
 	var summaryStrings []string
 	var header []string
 	header = append(header, "container")