Merge pull request #21020 from yujuhong/rss

Auto commit by PR queue bot
k8s-merge-robot 2016-02-15 12:50:07 -08:00
commit 2778bc0c13
3 changed files with 93 additions and 29 deletions

View File

@@ -95,15 +95,53 @@ func runResourceTrackingTest(framework *Framework, podsPerNode int, nodeNames se
 	By("Reporting overall resource usage")
 	logPodsOnNodes(framework.Client, nodeNames.List())
 	rm.LogLatest()
+	usageSummary, err := rm.GetLatest()
+	Expect(err).NotTo(HaveOccurred())
+	Logf("%s", rm.FormatResourceUsage(usageSummary))
+	// TODO(yujuhong): Set realistic values after gathering enough data.
+	verifyMemoryLimits(resourceUsagePerContainer{
+		"/kubelet":       &containerResourceUsage{MemoryRSSInBytes: 500 * 1024 * 1024},
+		"/docker-daemon": &containerResourceUsage{MemoryRSSInBytes: 500 * 1024 * 1024},
+	}, usageSummary)
-	summary := rm.GetCPUSummary()
-	Logf("%s", rm.FormatCPUSummary(summary))
-	verifyCPULimits(expected, summary)
+	cpuSummary := rm.GetCPUSummary()
+	Logf("%s", rm.FormatCPUSummary(cpuSummary))
+	verifyCPULimits(expected, cpuSummary)
 	By("Deleting the RC")
 	DeleteRC(framework.Client, framework.Namespace.Name, rcName)
 }
 
+func verifyMemoryLimits(expected resourceUsagePerContainer, actual resourceUsagePerNode) {
+	if expected == nil {
+		return
+	}
+	var errList []string
+	for nodeName, nodeSummary := range actual {
+		var nodeErrs []string
+		for cName, expectedResult := range expected {
+			container, ok := nodeSummary[cName]
+			if !ok {
+				nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: missing", cName))
+				continue
+			}
+			expectedValue := expectedResult.MemoryRSSInBytes
+			actualValue := container.MemoryRSSInBytes
+			if expectedValue != 0 && actualValue > expectedValue {
+				nodeErrs = append(nodeErrs, fmt.Sprintf("container %q: expected RSS memory (MB) < %d; got %d",
+					cName, expectedValue, actualValue))
+			}
+		}
+		if len(nodeErrs) > 0 {
+			errList = append(errList, fmt.Sprintf("node %v:\n %s", nodeName, strings.Join(nodeErrs, ", ")))
+		}
+	}
+	if len(errList) > 0 {
+		Failf("Memory usage exceeding limits:\n %s", strings.Join(errList, "\n"))
+	}
+}
+
 func verifyCPULimits(expected containersCPUSummary, actual nodesCPUSummary) {
 	if expected == nil {
 		return
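For context, a minimal sketch (not part of this diff) of how the new memory check is driven, using the types introduced above; the node name and usage figures are hypothetical:

// Limits are keyed by container name; actual usage is keyed by node, then by container.
limits := resourceUsagePerContainer{
    "/kubelet":       &containerResourceUsage{MemoryRSSInBytes: 500 * 1024 * 1024},
    "/docker-daemon": &containerResourceUsage{MemoryRSSInBytes: 500 * 1024 * 1024},
}
actual := resourceUsagePerNode{
    "node-1": resourceUsagePerContainer{ // hypothetical node name and readings
        "/kubelet":       &containerResourceUsage{MemoryRSSInBytes: 120 * 1024 * 1024},
        "/docker-daemon": &containerResourceUsage{MemoryRSSInBytes: 300 * 1024 * 1024},
    },
}
verifyMemoryLimits(limits, actual) // passes; any RSS reading above its limit is reported via Failf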

View File

@@ -36,6 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/server/stats"
 	"k8s.io/kubernetes/pkg/master/ports"
+	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
@@ -196,8 +197,9 @@ type containerResourceUsage struct {
 	Name                    string
 	Timestamp               time.Time
 	CPUUsageInCores         float64
-	MemoryUsageInBytes      int64
-	MemoryWorkingSetInBytes int64
+	MemoryUsageInBytes      uint64
+	MemoryWorkingSetInBytes uint64
+	MemoryRSSInBytes        uint64
 	// The interval used to calculate CPUUsageInCores.
 	CPUInterval time.Duration
 }
@@ -207,6 +209,7 @@ func (r *containerResourceUsage) isStrictlyGreaterThan(rhs *containerResourceUsa
 }
 
 type resourceUsagePerContainer map[string]*containerResourceUsage
+type resourceUsagePerNode map[string]resourceUsagePerContainer
 
 // getOneTimeResourceUsageOnNode queries the node's /stats/container endpoint
 // and returns the resource usage of all containerNames for the past
@@ -292,24 +295,24 @@ func formatResourceUsageStats(nodeName string, containerStats resourceUsagePerCo
 	// "/system" 0.007 119.88
 	buf := &bytes.Buffer{}
 	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
-	fmt.Fprintf(w, "container\tcpu(cores)\tmemory(MB)\n")
+	fmt.Fprintf(w, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n")
 	for name, s := range containerStats {
-		fmt.Fprintf(w, "%q\t%.3f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024))
+		fmt.Fprintf(w, "%q\t%.3f\t%.2f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024), float64(s.MemoryRSSInBytes)/(1024*1024))
 	}
 	w.Flush()
 	return fmt.Sprintf("Resource usage on node %q:\n%s", nodeName, buf.String())
 }
 
-type int64arr []int64
+type uint64arr []uint64
 
-func (a int64arr) Len() int           { return len(a) }
-func (a int64arr) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a int64arr) Less(i, j int) bool { return a[i] < a[j] }
+func (a uint64arr) Len() int           { return len(a) }
+func (a uint64arr) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a uint64arr) Less(i, j int) bool { return a[i] < a[j] }
 
 type usageDataPerContainer struct {
 	cpuData        []float64
-	memUseData     []int64
-	memWorkSetData []int64
+	memUseData     []uint64
+	memWorkSetData []uint64
 }
 
 // Performs a get on a node proxy endpoint given the nodename and rest client.
@@ -362,8 +365,9 @@ func computeContainerResourceUsage(name string, oldStats, newStats *cadvisorapi.
 		Name:                    name,
 		Timestamp:               newStats.Timestamp,
 		CPUUsageInCores:         float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
-		MemoryUsageInBytes:      int64(newStats.Memory.Usage),
-		MemoryWorkingSetInBytes: int64(newStats.Memory.WorkingSet),
+		MemoryUsageInBytes:      newStats.Memory.Usage,
+		MemoryWorkingSetInBytes: newStats.Memory.WorkingSet,
+		MemoryRSSInBytes:        newStats.Memory.RSS,
 		CPUInterval:             newStats.Timestamp.Sub(oldStats.Timestamp),
 	}
 }
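CPUUsageInCores is a rate derived from two cumulative cadvisor samples: the growth of Cpu.Usage.Total (total CPU time consumed, in nanoseconds) divided by the wall-clock nanoseconds between the two timestamps. A small self-contained sketch of the same arithmetic, with made-up numbers:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Hypothetical cumulative CPU usage (in nanoseconds) from two samples taken 10s apart.
    oldTotal := uint64(100000000000) // 100.0 s of CPU time consumed so far
    newTotal := uint64(102500000000) // 102.5 s of CPU time consumed so far
    interval := 10 * time.Second     // newStats.Timestamp minus oldStats.Timestamp

    // Same formula as computeContainerResourceUsage: delta CPU ns / delta wall-clock ns.
    cores := float64(newTotal-oldTotal) / float64(interval.Nanoseconds())
    fmt.Printf("%.3f cores\n", cores) // prints 0.250
}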
@@ -437,20 +441,18 @@ func (r *resourceCollector) collectStats(oldStats map[string]*cadvisorapi.Contai
 	}
 }
 
-// LogLatest logs the latest resource usage of each container.
-func (r *resourceCollector) LogLatest() {
+func (r *resourceCollector) GetLatest() (resourceUsagePerContainer, error) {
 	r.lock.RLock()
 	defer r.lock.RUnlock()
-	stats := make(map[string]*containerResourceUsage)
+	stats := make(resourceUsagePerContainer)
 	for _, name := range r.containers {
 		contStats, ok := r.buffers[name]
 		if !ok || len(contStats) == 0 {
-			Logf("Resource usage on node %q is not ready yet", r.node)
-			return
+			return nil, fmt.Errorf("Resource usage on node %q is not ready yet", r.node)
 		}
 		stats[name] = contStats[len(contStats)-1]
 	}
-	Logf("\n%s", formatResourceUsageStats(r.node, stats))
+	return stats, nil
 }
 
 // Reset frees the stats and start over.
@@ -534,9 +536,33 @@ func (r *resourceMonitor) Reset() {
 }
 
 func (r *resourceMonitor) LogLatest() {
-	for _, collector := range r.collectors {
-		collector.LogLatest()
+	summary, err := r.GetLatest()
+	if err != nil {
+		Logf("%v", err)
 	}
+	Logf("%s", r.FormatResourceUsage(summary))
 }
 
+func (r *resourceMonitor) FormatResourceUsage(s resourceUsagePerNode) string {
+	summary := []string{}
+	for node, usage := range s {
+		summary = append(summary, formatResourceUsageStats(node, usage))
+	}
+	return strings.Join(summary, "\n")
+}
+
+func (r *resourceMonitor) GetLatest() (resourceUsagePerNode, error) {
+	result := make(resourceUsagePerNode)
+	errs := []error{}
+	for key, collector := range r.collectors {
+		s, err := collector.GetLatest()
+		if err != nil {
+			errs = append(errs, err)
+			continue
+		}
+		result[key] = s
+	}
+	return result, utilerrors.NewAggregate(errs)
+}
+
 // containersCPUSummary is indexed by the container name with each entry a
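GetLatest on the resourceMonitor keeps whatever per-node summaries it could collect and folds the individual collector errors into one with utilerrors.NewAggregate, which yields nil when no errors were recorded, so callers only see a failure if at least one node was not ready. A stand-alone sketch of that collect-partial-results pattern, with errors.Join standing in for NewAggregate; the fetch helper and node names are hypothetical:

package main

import (
    "errors"
    "fmt"
)

// fetch plays the role of a per-node collector's GetLatest and may fail for some nodes.
func fetch(node string) (string, error) {
    if node == "node-2" {
        return "", fmt.Errorf("resource usage on node %q is not ready yet", node)
    }
    return "ok", nil
}

func main() {
    result := map[string]string{}
    var errs []error
    for _, n := range []string{"node-1", "node-2", "node-3"} {
        s, err := fetch(n)
        if err != nil {
            errs = append(errs, err) // remember the failure but keep collecting
            continue
        }
        result[n] = s
    }
    // errors.Join returns nil when errs is empty, mirroring NewAggregate's behavior.
    fmt.Println(result, errors.Join(errs...))
}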

View File

@@ -38,7 +38,7 @@ const (
 type resourceConstraint struct {
 	cpuConstraint    float64
-	memoryConstraint int64
+	memoryConstraint uint64
 }
 
 type containerResourceGatherer struct {
@@ -51,7 +51,7 @@ type containerResourceGatherer struct {
 type SingleContainerSummary struct {
 	Name string
 	Cpu  float64
-	Mem  int64
+	Mem  uint64
 }
 
 // we can't have int here, as JSON does not accept integer keys.
@@ -165,8 +165,8 @@ func (g *containerResourceGatherer) computePercentiles(timeSeries map[time.Time]
 		if dataMap[name] == nil {
 			dataMap[name] = &usageDataPerContainer{
 				cpuData:        make([]float64, len(timeSeries)),
-				memUseData:     make([]int64, len(timeSeries)),
-				memWorkSetData: make([]int64, len(timeSeries)),
+				memUseData:     make([]uint64, len(timeSeries)),
+				memWorkSetData: make([]uint64, len(timeSeries)),
 			}
 		}
 		dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
@@ -176,8 +176,8 @@
 	}
 	for _, v := range dataMap {
 		sort.Float64s(v.cpuData)
-		sort.Sort(int64arr(v.memUseData))
-		sort.Sort(int64arr(v.memWorkSetData))
+		sort.Sort(uint64arr(v.memUseData))
+		sort.Sort(uint64arr(v.memWorkSetData))
 	}
 
 	result := make(map[int]resourceUsagePerContainer)
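The uint64arr helper exists only so sort.Sort can order the memory samples, since the standard library offers sort.Ints but no dedicated helper for []uint64. A minimal sketch of feeding the sorted data into a percentile lookup; the sample values and the nearest-rank indexing are illustrative rather than the gatherer's exact formula:

package main

import (
    "fmt"
    "sort"
)

// Mirrors the helper above: a sort.Interface over []uint64.
type uint64arr []uint64

func (a uint64arr) Len() int           { return len(a) }
func (a uint64arr) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a uint64arr) Less(i, j int) bool { return a[i] < a[j] }

func main() {
    // Hypothetical working-set samples in bytes.
    data := uint64arr{210e6, 180e6, 250e6, 190e6, 230e6}
    sort.Sort(data)

    perc := 90
    idx := (len(data)*perc+99)/100 - 1 // nearest-rank index, for illustration only
    fmt.Printf("p%d working set: %d bytes\n", perc, data[idx])
}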