Fix resource gatherer

gmarek 2017-04-13 18:29:14 +02:00
parent 3b9eb1a875
commit fbeac32a70
3 changed files with 68 additions and 87 deletions

test/e2e/framework/BUILD

@@ -69,7 +69,6 @@ go_library(
         "//pkg/kubelet/api/v1alpha1/stats:go_default_library",
         "//pkg/kubelet/events:go_default_library",
         "//pkg/kubelet/metrics:go_default_library",
-        "//pkg/kubelet/server/stats:go_default_library",
         "//pkg/kubelet/sysctl:go_default_library",
         "//pkg/kubelet/util/format:go_default_library",
         "//pkg/master/ports:go_default_library",
@@ -92,7 +91,6 @@ go_library(
         "//vendor:github.com/aws/aws-sdk-go/service/autoscaling",
         "//vendor:github.com/aws/aws-sdk-go/service/ec2",
         "//vendor:github.com/golang/glog",
-        "//vendor:github.com/google/cadvisor/info/v1",
         "//vendor:github.com/onsi/ginkgo",
         "//vendor:github.com/onsi/ginkgo/config",
         "//vendor:github.com/onsi/gomega",

test/e2e/framework/kubelet_stats.go

@@ -28,7 +28,6 @@ import (
     "text/tabwriter"
     "time"
-    cadvisorapi "github.com/google/cadvisor/info/v1"
     "github.com/prometheus/common/model"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     utilerrors "k8s.io/apimachinery/pkg/util/errors"
@@ -37,7 +36,6 @@ import (
     "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
     "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
     kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
-    kubeletstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
     "k8s.io/kubernetes/pkg/master/ports"
     "k8s.io/kubernetes/pkg/metrics"
 )
@@ -276,15 +274,8 @@ func HighLatencyKubeletOperations(c clientset.Interface, threshold time.Duration
     return badMetrics, nil
 }
 
-// getContainerInfo contacts kubelet for the container information. The "Stats"
-// in the returned ContainerInfo is subject to the requirements in statsRequest.
-// TODO: This function uses the deprecated kubelet stats API; it should be
-// removed.
-func getContainerInfo(c clientset.Interface, nodeName string, req *kubeletstats.StatsRequest) (map[string]cadvisorapi.ContainerInfo, error) {
-    reqBody, err := json.Marshal(req)
-    if err != nil {
-        return nil, err
-    }
+// getStatsSummary contacts kubelet for the container information.
+func getStatsSummary(c clientset.Interface, nodeName string) (*stats.Summary, error) {
     subResourceProxyAvailable, err := ServerVersionGTE(SubResourceServiceAndNodeProxyVersion, c.Discovery())
     if err != nil {
         return nil, err
@@ -295,40 +286,43 @@ func getContainerInfo(c clientset.Interface, nodeName string, req *kubeletstats.
     var data []byte
     if subResourceProxyAvailable {
-        data, err = c.Core().RESTClient().Post().
+        data, err = c.Core().RESTClient().Get().
             Context(ctx).
             Resource("nodes").
             SubResource("proxy").
             Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
-            Suffix("stats/container").
-            SetHeader("Content-Type", "application/json").
-            Body(reqBody).
+            Suffix("stats/summary").
             Do().Raw()
     } else {
-        data, err = c.Core().RESTClient().Post().
+        data, err = c.Core().RESTClient().Get().
             Context(ctx).
             Prefix("proxy").
             Resource("nodes").
             Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
-            Suffix("stats/container").
-            SetHeader("Content-Type", "application/json").
-            Body(reqBody).
+            Suffix("stats/summary").
             Do().Raw()
     }
     if err != nil {
         return nil, err
     }
-    var containers map[string]cadvisorapi.ContainerInfo
-    err = json.Unmarshal(data, &containers)
+    summary := stats.Summary{}
+    err = json.Unmarshal(data, &summary)
     if err != nil {
         return nil, err
     }
-    return containers, nil
+    return &summary, nil
 }
 
-// getOneTimeResourceUsageOnNode queries the node's /stats/container endpoint
+func removeUint64Ptr(ptr *uint64) uint64 {
+    if ptr == nil {
+        return 0
+    }
+    return *ptr
+}
+
+// getOneTimeResourceUsageOnNode queries the node's /stats/summary endpoint
 // and returns the resource usage of all containerNames for the past
 // cpuInterval.
 // The acceptable range of the interval is 2s~120s. Be warned that as the
@@ -344,18 +338,12 @@ func getContainerInfo(c clientset.Interface, nodeName string, req *kubeletstats.
 // the stats points in ContainerResourceUsage.CPUInterval.
 //
 // containerNames is a function returning a collection of container names in which
-// user is interested in. ExpectMissingContainers is a flag which says if the test
-// should fail if one of containers listed by containerNames is missing on any node
-// (useful e.g. when looking for system containers or daemons). If set to true function
-// is more forgiving and ignores missing containers.
-// TODO: This function relies on the deprecated kubelet stats API and should be
-// removed and/or rewritten.
+// user is interested in.
 func getOneTimeResourceUsageOnNode(
     c clientset.Interface,
     nodeName string,
     cpuInterval time.Duration,
     containerNames func() []string,
-    expectMissingContainers bool,
 ) (ResourceUsagePerContainer, error) {
     const (
         // cadvisor records stats about every second.
@@ -369,40 +357,41 @@ func getOneTimeResourceUsageOnNode(
         return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest)
     }
     // Get information of all containers on the node.
-    containerInfos, err := getContainerInfo(c, nodeName, &kubeletstats.StatsRequest{
-        ContainerName: "/",
-        NumStats:      numStats,
-        Subcontainers: true,
-    })
+    summary, err := getStatsSummary(c, nodeName)
     if err != nil {
         return nil, err
     }
-    f := func(name string, oldStats, newStats *cadvisorapi.ContainerStats) *ContainerResourceUsage {
+    f := func(name string, newStats *stats.ContainerStats) *ContainerResourceUsage {
         return &ContainerResourceUsage{
             Name:                    name,
-            Timestamp:               newStats.Timestamp,
-            CPUUsageInCores:         float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
-            MemoryUsageInBytes:      newStats.Memory.Usage,
-            MemoryWorkingSetInBytes: newStats.Memory.WorkingSet,
-            MemoryRSSInBytes:        newStats.Memory.RSS,
-            CPUInterval:             newStats.Timestamp.Sub(oldStats.Timestamp),
+            Timestamp:               newStats.StartTime.Time,
+            CPUUsageInCores:         float64(removeUint64Ptr(newStats.CPU.UsageNanoCores)) / 1000000000,
+            MemoryUsageInBytes:      removeUint64Ptr(newStats.Memory.UsageBytes),
+            MemoryWorkingSetInBytes: removeUint64Ptr(newStats.Memory.WorkingSetBytes),
+            MemoryRSSInBytes:        removeUint64Ptr(newStats.Memory.RSSBytes),
+            CPUInterval:             0,
         }
     }
 
     // Process container infos that are relevant to us.
     containers := containerNames()
     usageMap := make(ResourceUsagePerContainer, len(containers))
-    for _, name := range containers {
-        info, ok := containerInfos[name]
-        if !ok {
-            if !expectMissingContainers {
-                return nil, fmt.Errorf("missing info for container %q on node %q", name, nodeName)
-            }
-            continue
-        }
-        first := info.Stats[0]
-        last := info.Stats[len(info.Stats)-1]
-        usageMap[name] = f(name, first, last)
-    }
+    observedContainers := []string{}
+    for _, pod := range summary.Pods {
+        for _, container := range pod.Containers {
+            isInteresting := false
+            for _, interestingContainerName := range containers {
+                if container.Name == interestingContainerName {
+                    isInteresting = true
+                    observedContainers = append(observedContainers, container.Name)
+                    break
+                }
+            }
+            if !isInteresting {
+                continue
+            }
+            usageMap[pod.PodRef.Name+"/"+container.Name] = f(pod.PodRef.Name+"/"+container.Name, &container)
+        }
+    }
     return usageMap, nil
 }

test/e2e/framework/resource_usage_gatherer.go

@@ -127,15 +127,14 @@ func leftMergeData(left, right map[int]ResourceUsagePerContainer) map[int]Resour
 }
 
 type resourceGatherWorker struct {
-    c                    clientset.Interface
-    nodeName             string
-    wg                   *sync.WaitGroup
-    containerIDToNameMap map[string]string
-    containerIDs         []string
-    stopCh               chan struct{}
-    dataSeries           []ResourceUsagePerContainer
-    finished             bool
-    inKubemark           bool
+    c            clientset.Interface
+    nodeName     string
+    wg           *sync.WaitGroup
+    containerIDs []string
+    stopCh       chan struct{}
+    dataSeries   []ResourceUsagePerContainer
+    finished     bool
+    inKubemark   bool
 }
 
 func (w *resourceGatherWorker) singleProbe() {
@@ -153,13 +152,13 @@ func (w *resourceGatherWorker) singleProbe() {
            }
        }
    } else {
-        nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, probeDuration, func() []string { return w.containerIDs }, true)
+        nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, probeDuration, func() []string { return w.containerIDs })
        if err != nil {
            Logf("Error while reading data from %v: %v", w.nodeName, err)
            return
        }
        for k, v := range nodeUsage {
-            data[w.containerIDToNameMap[k]] = v
+            data[k] = v
        }
    }
    w.dataSeries = append(w.dataSeries, data)
@@ -200,13 +199,12 @@ func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c clien
 }
 
 type containerResourceGatherer struct {
-    client               clientset.Interface
-    stopCh               chan struct{}
-    workers              []resourceGatherWorker
-    workerWg             sync.WaitGroup
-    containerIDToNameMap map[string]string
-    containerIDs         []string
-    options              ResourceGathererOptions
+    client       clientset.Interface
+    stopCh       chan struct{}
+    workers      []resourceGatherWorker
+    workerWg     sync.WaitGroup
+    containerIDs []string
+    options      ResourceGathererOptions
 }
 
 type ResourceGathererOptions struct {
@@ -216,11 +214,10 @@ type ResourceGathererOptions struct {
 func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions) (*containerResourceGatherer, error) {
    g := containerResourceGatherer{
-        client:               c,
-        stopCh:               make(chan struct{}),
-        containerIDToNameMap: make(map[string]string),
-        containerIDs:         make([]string, 0),
-        options:              options,
+        client:       c,
+        stopCh:       make(chan struct{}),
+        containerIDs: make([]string, 0),
+        options:      options,
    }
 
    if options.inKubemark {
@@ -239,9 +236,7 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
    }
    for _, pod := range pods.Items {
        for _, container := range pod.Status.ContainerStatuses {
-            containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
-            g.containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
-            g.containerIDs = append(g.containerIDs, containerID)
+            g.containerIDs = append(g.containerIDs, container.Name)
        }
    }
    nodeList, err := c.Core().Nodes().List(metav1.ListOptions{})
@@ -254,14 +249,13 @@ func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOpt
        if !options.masterOnly || system.IsMasterNode(node.Name) {
            g.workerWg.Add(1)
            g.workers = append(g.workers, resourceGatherWorker{
-                c:                    c,
-                nodeName:             node.Name,
-                wg:                   &g.workerWg,
-                containerIDToNameMap: g.containerIDToNameMap,
-                containerIDs:         g.containerIDs,
-                stopCh:               g.stopCh,
-                finished:             false,
-                inKubemark:           false,
+                c:            c,
+                nodeName:     node.Name,
+                wg:           &g.workerWg,
+                containerIDs: g.containerIDs,
+                stopCh:       g.stopCh,
+                finished:     false,
+                inKubemark:   false,
            })
            if options.masterOnly {
                break