Scrape /metrics of kubelets from e2e tests

This commit is contained in:
Prashanth Balasubramanian 2015-06-09 14:01:23 -07:00
parent dae03043d4
commit 831d7a36d0
4 changed files with 261 additions and 15 deletions

View File

@ -35,6 +35,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/lifecycle"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
@ -722,6 +723,10 @@ func (dm *DockerManager) GetContainers(all bool) ([]*kubecontainer.Container, er
}
func (dm *DockerManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("GetPods").Observe(metrics.SinceInMicroseconds(start))
}()
pods := make(map[types.UID]*kubecontainer.Pod)
var result []*kubecontainer.Pod
@ -1159,6 +1164,11 @@ func (dm *DockerManager) killContainer(containerID types.UID) error {
// Run a single container from a pod. Returns the docker container ID
func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Container, netMode, ipcMode string) (kubeletTypes.DockerID, error) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("runContainerInPod").Observe(metrics.SinceInMicroseconds(start))
}()
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
@ -1224,6 +1234,10 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
// createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.DockerID, error) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("createPodInfraContainer").Observe(metrics.SinceInMicroseconds(start))
}()
// Use host networking if specified.
netNamespace := ""
var ports []api.ContainerPort
@ -1296,6 +1310,11 @@ type PodContainerChangesSpec struct {
}
func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("computePodContainerChanges").Observe(metrics.SinceInMicroseconds(start))
}()
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
@ -1442,6 +1461,12 @@ func (dm *DockerManager) pullImage(pod *api.Pod, container *api.Container, pullS
// Sync the running pod to match the specified desired pod.
func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))
}()
podFullName := kubecontainer.GetPodFullName(pod)
containerChanges, err := dm.computePodContainerChanges(pod, runningPod, podStatus)
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)

View File

@ -1092,6 +1092,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
start := time.Now()
// Before returning, regenerate status and store it in the cache.
defer func() {
@ -1108,6 +1109,11 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
if mirrorPod != nil {
podToUpdate = mirrorPod
}
existingStatus, ok := kl.statusManager.GetPodStatus(podFullName)
if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning {
// TODO: Check the pod annotation instead of using `start`
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(start))
}
kl.statusManager.SetPodStatus(podToUpdate, status)
}
}()
@ -1379,7 +1385,7 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, mirrorPods[podFullName], func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
metrics.PodWorkerLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
// Note the number of containers for new pods.
@ -2094,6 +2100,12 @@ func getPodReadyCondition(spec *api.PodSpec, statuses []api.ContainerStatus) []a
// By passing the pod directly, this method avoids pod lookup, which requires
// grabbing a lock.
func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
start := time.Now()
defer func() {
metrics.PodStatusLatency.Observe(metrics.SinceInMicroseconds(start))
}()
podFullName := kubecontainer.GetPodFullName(pod)
glog.V(3).Infof("Generating status for %q", podFullName)

View File

@ -25,43 +25,74 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
const kubeletSubsystem = "kubelet"
const (
KubeletSubsystem = "kubelet"
PodWorkerLatencyKey = "pod_worker_latency_microseconds"
SyncPodsLatencyKey = "sync_pods_latency_microseconds"
PodStartLatencyKey = "pod_start_latency_microseconds"
PodStatusLatencyKey = "generate_pod_status_latency_microseconds"
ContainerManagerOperationsKey = "container_manager_latency_microseconds"
DockerOperationsKey = "docker_operations_latency_microseconds"
DockerErrorsKey = "docker_errors"
)
var (
ContainersPerPodCount = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: kubeletSubsystem,
Subsystem: KubeletSubsystem,
Name: "containers_per_pod_count",
Help: "The number of containers per pod.",
},
)
SyncPodLatency = prometheus.NewSummaryVec(
PodWorkerLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: kubeletSubsystem,
Name: "sync_pod_latency_microseconds",
Subsystem: KubeletSubsystem,
Name: PodWorkerLatencyKey,
Help: "Latency in microseconds to sync a single pod. Broken down by operation type: create, update, or sync",
},
[]string{"operation_type"},
)
SyncPodsLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: kubeletSubsystem,
Name: "sync_pods_latency_microseconds",
Subsystem: KubeletSubsystem,
Name: SyncPodsLatencyKey,
Help: "Latency in microseconds to sync all pods.",
},
)
PodStartLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: KubeletSubsystem,
Name: PodStartLatencyKey,
Help: "Latency in microseconds for a single pod to go from pending to running. Broken down by podname.",
},
)
PodStatusLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: KubeletSubsystem,
Name: PodStatusLatencyKey,
Help: "Latency in microseconds to generate status for a single pod.",
},
)
ContainerManagerLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: KubeletSubsystem,
Name: ContainerManagerOperationsKey,
Help: "Latency in microseconds for container manager operations. Broken down by method.",
},
[]string{"operation_type"},
)
DockerOperationsLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: kubeletSubsystem,
Name: "docker_operations_latency_microseconds",
Subsystem: KubeletSubsystem,
Name: DockerOperationsKey,
Help: "Latency in microseconds of Docker operations. Broken down by operation type.",
},
[]string{"operation_type"},
)
DockerErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: kubeletSubsystem,
Name: "docker_errors",
Subsystem: KubeletSubsystem,
Name: DockerErrorsKey,
Help: "Cumulative number of Docker errors by operation type.",
},
[]string{"operation_type"},
@ -74,8 +105,11 @@ var registerMetrics sync.Once
func Register(containerCache kubecontainer.RuntimeCache) {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(SyncPodLatency)
prometheus.MustRegister(PodWorkerLatency)
prometheus.MustRegister(PodStartLatency)
prometheus.MustRegister(PodStatusLatency)
prometheus.MustRegister(DockerOperationsLatency)
prometheus.MustRegister(ContainerManagerLatency)
prometheus.MustRegister(SyncPodsLatency)
prometheus.MustRegister(ContainersPerPodCount)
prometheus.MustRegister(DockerErrors)
@ -103,11 +137,11 @@ type podAndContainerCollector struct {
// TODO(vmarmol): Split by source?
var (
runningPodCountDesc = prometheus.NewDesc(
prometheus.BuildFQName("", kubeletSubsystem, "running_pod_count"),
prometheus.BuildFQName("", KubeletSubsystem, "running_pod_count"),
"Number of pods currently running",
nil, nil)
runningContainerCountDesc = prometheus.NewDesc(
prometheus.BuildFQName("", kubeletSubsystem, "running_container_count"),
prometheus.BuildFQName("", KubeletSubsystem, "running_container_count"),
"Number of containers currently running",
nil, nil)
)

175
test/e2e/kubelet_stats.go Normal file
View File

@ -0,0 +1,175 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
)
// KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
// TODO: Get some more structure aroud the metrics and this type
type KubeletMetric struct {
// eg: list, info, create
Operation string
// eg: sync_pods, pod_worker
Method string
// 0 <= quantile <=1, e.g. 0.95 is 95%tile, 0.5 is median.
Quantile float64
Latency time.Duration
}
// KubeletMetricByLatency implements sort.Interface for []KubeletMetric based on
// the latency field.
type KubeletMetricByLatency []KubeletMetric
func (a KubeletMetricByLatency) Len() int { return len(a) }
func (a KubeletMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
// ReadKubeletMetrics reads metrics from the kubelet server running on the given node
func ReadKubeletMetrics(c *client.Client, nodeName string) ([]KubeletMetric, error) {
body, err := getKubeletMetrics(c, nodeName)
if err != nil {
return nil, err
}
metric := make([]KubeletMetric, 0)
for _, line := range strings.Split(string(body), "\n") {
// A kubelet stats line starts with the KubeletSubsystem marker, followed by a stat name, followed by fields
// that vary by stat described on a case by case basis below.
// TODO: String parsing is such a hack, but getting our rest client/proxy to cooperate with prometheus
// client is weird, we should eventually invest some time in doing this the right way.
if !strings.HasPrefix(line, fmt.Sprintf("%v_", metrics.KubeletSubsystem)) {
continue
}
keyVal := strings.Split(line, " ")
if len(keyVal) != 2 {
return nil, fmt.Errorf("Error parsing metric %q", line)
}
keyElems := strings.Split(line, "\"")
latency, err := strconv.ParseFloat(keyVal[1], 64)
if err != nil {
continue
}
methodLine := strings.Split(keyElems[0], "{")
methodList := strings.Split(methodLine[0], "_")
if len(methodLine) != 2 || len(methodList) == 1 {
continue
}
method := strings.Join(methodList[1:], "_")
var operation, rawQuantile string
var quantile float64
switch method {
case metrics.PodWorkerLatencyKey:
// eg: kubelet_pod_worker_latency_microseconds{operation_type="create",pod_name="foopause3_default",quantile="0.99"} 1344
if len(keyElems) != 7 {
continue
}
operation = keyElems[1]
rawQuantile = keyElems[5]
break
case metrics.SyncPodsLatencyKey:
// eg: kubelet_sync_pods_latency_microseconds{quantile="0.5"} 9949
fallthrough
case metrics.PodStartLatencyKey:
// eg: kubelet_pod_start_latency_microseconds{quantile="0.5"} 123
fallthrough
case metrics.PodStatusLatencyKey:
// eg: kubelet_generate_pod_status_latency_microseconds{quantile="0.5"} 12715
if len(keyElems) != 3 {
continue
}
operation = ""
rawQuantile = keyElems[1]
break
case metrics.ContainerManagerOperationsKey:
// eg: kubelet_container_manager_latency_microseconds{operation_type="SyncPod",quantile="0.5"} 6705
fallthrough
case metrics.DockerOperationsKey:
// eg: kubelet_docker_operations_latency_microseconds{operation_type="info",quantile="0.5"} 31590
if len(keyElems) != 5 {
continue
}
operation = keyElems[1]
rawQuantile = keyElems[3]
break
case metrics.DockerErrorsKey:
Logf("ERROR %v", line)
default:
continue
}
quantile, err = strconv.ParseFloat(rawQuantile, 64)
if err != nil {
continue
}
metric = append(metric, KubeletMetric{operation, method, quantile, time.Duration(int64(latency)) * time.Microsecond})
}
return metric, nil
}
// HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) ([]KubeletMetric, error) {
metric, err := ReadKubeletMetrics(c, nodeName)
if err != nil {
return []KubeletMetric{}, err
}
sort.Sort(KubeletMetricByLatency(metric))
var badMetrics []KubeletMetric
Logf("Latency metrics for node %v", nodeName)
for _, m := range metric {
if m.Latency > threshold {
badMetrics = append(badMetrics, m)
Logf("%+v", m)
}
}
return badMetrics, nil
}
// Retrieve metrics from the kubelet server of the given node.
func getKubeletMetrics(c *client.Client, node string) (string, error) {
metric, err := c.Get().
Prefix("proxy").
Resource("nodes").
Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
Suffix("metrics").
Do().
Raw()
if err != nil {
return "", err
}
return string(metric), nil
}