From 831d7a36d01fed071ac9eef00b61fb22421a6ac8 Mon Sep 17 00:00:00 2001
From: Prashanth Balasubramanian
Date: Tue, 9 Jun 2015 14:01:23 -0700
Subject: [PATCH] Scrape /metrics of kubelets from e2e tests

---
 pkg/kubelet/dockertools/manager.go |  25 +++++
 pkg/kubelet/kubelet.go             |  14 ++-
 pkg/kubelet/metrics/metrics.go     |  62 +++++++---
 test/e2e/kubelet_stats.go          | 175 +++++++++++++++++++++++++++++
 4 files changed, 261 insertions(+), 15 deletions(-)
 create mode 100644 test/e2e/kubelet_stats.go

diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go
index ee9ee7230c2..7a7243c1d97 100644
--- a/pkg/kubelet/dockertools/manager.go
+++ b/pkg/kubelet/dockertools/manager.go
@@ -35,6 +35,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/lifecycle"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
 	kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
@@ -722,6 +723,10 @@ func (dm *DockerManager) GetContainers(all bool) ([]*kubecontainer.Container, er
 }
 
 func (dm *DockerManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
+	start := time.Now()
+	defer func() {
+		metrics.ContainerManagerLatency.WithLabelValues("GetPods").Observe(metrics.SinceInMicroseconds(start))
+	}()
 	pods := make(map[types.UID]*kubecontainer.Pod)
 	var result []*kubecontainer.Pod
 
@@ -1159,6 +1164,11 @@ func (dm *DockerManager) killContainer(containerID types.UID) error {
 
 // Run a single container from a pod. Returns the docker container ID
 func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Container, netMode, ipcMode string) (kubeletTypes.DockerID, error) {
+	start := time.Now()
+	defer func() {
+		metrics.ContainerManagerLatency.WithLabelValues("runContainerInPod").Observe(metrics.SinceInMicroseconds(start))
+	}()
+
 	ref, err := kubecontainer.GenerateContainerRef(pod, container)
 	if err != nil {
 		glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
@@ -1224,6 +1234,10 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
 
 // createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
 func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.DockerID, error) {
+	start := time.Now()
+	defer func() {
+		metrics.ContainerManagerLatency.WithLabelValues("createPodInfraContainer").Observe(metrics.SinceInMicroseconds(start))
+	}()
 	// Use host networking if specified.
 	netNamespace := ""
 	var ports []api.ContainerPort
@@ -1296,6 +1310,11 @@ type PodContainerChangesSpec struct {
 }
 
 func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) {
+	start := time.Now()
+	defer func() {
+		metrics.ContainerManagerLatency.WithLabelValues("computePodContainerChanges").Observe(metrics.SinceInMicroseconds(start))
+	}()
+
 	podFullName := kubecontainer.GetPodFullName(pod)
 	uid := pod.UID
 	glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
@@ -1442,6 +1461,12 @@ func (dm *DockerManager) pullImage(pod *api.Pod, container *api.Container, pullS
 
 // Sync the running pod to match the specified desired pod.
 func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error {
+
+	start := time.Now()
+	defer func() {
+		metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))
+	}()
+
 	podFullName := kubecontainer.GetPodFullName(pod)
 	containerChanges, err := dm.computePodContainerChanges(pod, runningPod, podStatus)
 	glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index f5013f06e38..958791a6ea3 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1092,6 +1092,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
 func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
 	podFullName := kubecontainer.GetPodFullName(pod)
 	uid := pod.UID
+	start := time.Now()
 
 	// Before returning, regenerate status and store it in the cache.
 	defer func() {
@@ -1108,6 +1109,11 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 			if mirrorPod != nil {
 				podToUpdate = mirrorPod
 			}
+			existingStatus, ok := kl.statusManager.GetPodStatus(podFullName)
+			if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning {
+				// TODO: Check the pod annotation instead of using `start`
+				metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(start))
+			}
 			kl.statusManager.SetPodStatus(podToUpdate, status)
 		}
 	}()
@@ -1379,7 +1385,7 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
 
 		// Run the sync in an async manifest worker.
 		kl.podWorkers.UpdatePod(pod, mirrorPods[podFullName], func() {
-			metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
+			metrics.PodWorkerLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
 		})
 
 		// Note the number of containers for new pods.
@@ -2094,6 +2100,12 @@ func getPodReadyCondition(spec *api.PodSpec, statuses []api.ContainerStatus) []a
 // By passing the pod directly, this method avoids pod lookup, which requires
 // grabbing a lock.
 func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
+
+	start := time.Now()
+	defer func() {
+		metrics.PodStatusLatency.Observe(metrics.SinceInMicroseconds(start))
+	}()
+
 	podFullName := kubecontainer.GetPodFullName(pod)
 	glog.V(3).Infof("Generating status for %q", podFullName)
 
diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go
index 199c52b9fef..06eb2330998 100644
--- a/pkg/kubelet/metrics/metrics.go
+++ b/pkg/kubelet/metrics/metrics.go
@@ -25,43 +25,74 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
-const kubeletSubsystem = "kubelet"
+const (
+	KubeletSubsystem              = "kubelet"
+	PodWorkerLatencyKey           = "pod_worker_latency_microseconds"
+	SyncPodsLatencyKey            = "sync_pods_latency_microseconds"
+	PodStartLatencyKey            = "pod_start_latency_microseconds"
+	PodStatusLatencyKey           = "generate_pod_status_latency_microseconds"
+	ContainerManagerOperationsKey = "container_manager_latency_microseconds"
+	DockerOperationsKey           = "docker_operations_latency_microseconds"
+	DockerErrorsKey               = "docker_errors"
+)
 
 var (
 	ContainersPerPodCount = prometheus.NewSummary(
 		prometheus.SummaryOpts{
-			Subsystem: kubeletSubsystem,
+			Subsystem: KubeletSubsystem,
 			Name:      "containers_per_pod_count",
 			Help:      "The number of containers per pod.",
 		},
 	)
-	SyncPodLatency = prometheus.NewSummaryVec(
+	PodWorkerLatency = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
-			Subsystem: kubeletSubsystem,
-			Name:      "sync_pod_latency_microseconds",
+			Subsystem: KubeletSubsystem,
+			Name:      PodWorkerLatencyKey,
 			Help:      "Latency in microseconds to sync a single pod. Broken down by operation type: create, update, or sync",
 		},
 		[]string{"operation_type"},
 	)
 	SyncPodsLatency = prometheus.NewSummary(
 		prometheus.SummaryOpts{
-			Subsystem: kubeletSubsystem,
-			Name:      "sync_pods_latency_microseconds",
+			Subsystem: KubeletSubsystem,
+			Name:      SyncPodsLatencyKey,
 			Help:      "Latency in microseconds to sync all pods.",
 		},
 	)
+	PodStartLatency = prometheus.NewSummary(
+		prometheus.SummaryOpts{
+			Subsystem: KubeletSubsystem,
+			Name:      PodStartLatencyKey,
+			Help:      "Latency in microseconds for a single pod to go from pending to running. Broken down by podname.",
+		},
+	)
+	PodStatusLatency = prometheus.NewSummary(
+		prometheus.SummaryOpts{
+			Subsystem: KubeletSubsystem,
+			Name:      PodStatusLatencyKey,
+			Help:      "Latency in microseconds to generate status for a single pod.",
+		},
+	)
+	ContainerManagerLatency = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Subsystem: KubeletSubsystem,
+			Name:      ContainerManagerOperationsKey,
+			Help:      "Latency in microseconds for container manager operations. Broken down by method.",
+		},
+		[]string{"operation_type"},
+	)
 	DockerOperationsLatency = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
-			Subsystem: kubeletSubsystem,
-			Name:      "docker_operations_latency_microseconds",
+			Subsystem: KubeletSubsystem,
+			Name:      DockerOperationsKey,
 			Help:      "Latency in microseconds of Docker operations. Broken down by operation type.",
 		},
 		[]string{"operation_type"},
 	)
 	DockerErrors = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
-			Subsystem: kubeletSubsystem,
-			Name:      "docker_errors",
+			Subsystem: KubeletSubsystem,
+			Name:      DockerErrorsKey,
 			Help:      "Cumulative number of Docker errors by operation type.",
 		},
 		[]string{"operation_type"},
@@ -74,8 +105,11 @@ var registerMetrics sync.Once
 func Register(containerCache kubecontainer.RuntimeCache) {
 	// Register the metrics.
 	registerMetrics.Do(func() {
-		prometheus.MustRegister(SyncPodLatency)
+		prometheus.MustRegister(PodWorkerLatency)
+		prometheus.MustRegister(PodStartLatency)
+		prometheus.MustRegister(PodStatusLatency)
 		prometheus.MustRegister(DockerOperationsLatency)
+		prometheus.MustRegister(ContainerManagerLatency)
 		prometheus.MustRegister(SyncPodsLatency)
 		prometheus.MustRegister(ContainersPerPodCount)
 		prometheus.MustRegister(DockerErrors)
@@ -103,11 +137,11 @@ type podAndContainerCollector struct {
 // TODO(vmarmol): Split by source?
 var (
 	runningPodCountDesc = prometheus.NewDesc(
-		prometheus.BuildFQName("", kubeletSubsystem, "running_pod_count"),
+		prometheus.BuildFQName("", KubeletSubsystem, "running_pod_count"),
 		"Number of pods currently running",
 		nil, nil)
 	runningContainerCountDesc = prometheus.NewDesc(
-		prometheus.BuildFQName("", kubeletSubsystem, "running_container_count"),
+		prometheus.BuildFQName("", KubeletSubsystem, "running_container_count"),
 		"Number of containers currently running",
 		nil, nil)
 )
diff --git a/test/e2e/kubelet_stats.go b/test/e2e/kubelet_stats.go
new file mode 100644
index 00000000000..6ac28f865ce
--- /dev/null
+++ b/test/e2e/kubelet_stats.go
@@ -0,0 +1,175 @@
+/*
+Copyright 2014 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"fmt"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
+)
+
+// KubeletMetric stores metrics scraped from the kubelet server's /metrics endpoint.
+// TODO: Get some more structure around the metrics and this type
+type KubeletMetric struct {
+	// eg: list, info, create
+	Operation string
+	// eg: sync_pods, pod_worker
+	Method string
+	// 0 <= quantile <= 1, e.g. 0.95 is the 95th percentile, 0.5 is the median.
+	Quantile float64
+	Latency  time.Duration
+}
+
+// KubeletMetricByLatency implements sort.Interface for []KubeletMetric based on
+// the latency field.
+type KubeletMetricByLatency []KubeletMetric
+
+func (a KubeletMetricByLatency) Len() int           { return len(a) }
+func (a KubeletMetricByLatency) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
+
+// ReadKubeletMetrics reads metrics from the kubelet server running on the given node.
+func ReadKubeletMetrics(c *client.Client, nodeName string) ([]KubeletMetric, error) {
+	body, err := getKubeletMetrics(c, nodeName)
+	if err != nil {
+		return nil, err
+	}
+
+	metric := make([]KubeletMetric, 0)
+	for _, line := range strings.Split(string(body), "\n") {
+
+		// A kubelet stats line starts with the KubeletSubsystem marker, followed by a stat name, followed by fields
+		// that vary by stat, described on a case-by-case basis below.
+		// TODO: String parsing is such a hack, but getting our rest client/proxy to cooperate with the prometheus
+		// client is weird; we should eventually invest some time in doing this the right way.
+		if !strings.HasPrefix(line, fmt.Sprintf("%v_", metrics.KubeletSubsystem)) {
+			continue
+		}
+		keyVal := strings.Split(line, " ")
+		if len(keyVal) != 2 {
+			return nil, fmt.Errorf("Error parsing metric %q", line)
+		}
+		keyElems := strings.Split(line, "\"")
+
+		latency, err := strconv.ParseFloat(keyVal[1], 64)
+		if err != nil {
+			continue
+		}
+
+		methodLine := strings.Split(keyElems[0], "{")
+		methodList := strings.Split(methodLine[0], "_")
+		if len(methodLine) != 2 || len(methodList) == 1 {
+			continue
+		}
+		method := strings.Join(methodList[1:], "_")
+
+		var operation, rawQuantile string
+		var quantile float64
+
+		switch method {
+		case metrics.PodWorkerLatencyKey:
+			// eg: kubelet_pod_worker_latency_microseconds{operation_type="create",pod_name="foopause3_default",quantile="0.99"} 1344
+			if len(keyElems) != 7 {
+				continue
+			}
+			operation = keyElems[1]
+			rawQuantile = keyElems[5]
+			break
+
+		case metrics.SyncPodsLatencyKey:
+			// eg: kubelet_sync_pods_latency_microseconds{quantile="0.5"} 9949
+			fallthrough
+
+		case metrics.PodStartLatencyKey:
+			// eg: kubelet_pod_start_latency_microseconds{quantile="0.5"} 123
+			fallthrough
+
+		case metrics.PodStatusLatencyKey:
+			// eg: kubelet_generate_pod_status_latency_microseconds{quantile="0.5"} 12715
+			if len(keyElems) != 3 {
+				continue
+			}
+			operation = ""
+			rawQuantile = keyElems[1]
+			break
+
+		case metrics.ContainerManagerOperationsKey:
+			// eg: kubelet_container_manager_latency_microseconds{operation_type="SyncPod",quantile="0.5"} 6705
+			fallthrough
+
+		case metrics.DockerOperationsKey:
+			// eg: kubelet_docker_operations_latency_microseconds{operation_type="info",quantile="0.5"} 31590
+			if len(keyElems) != 5 {
+				continue
+			}
+			operation = keyElems[1]
+			rawQuantile = keyElems[3]
+			break
+
+		case metrics.DockerErrorsKey:
+			Logf("ERROR %v", line)
+
+		default:
+			continue
+		}
+		quantile, err = strconv.ParseFloat(rawQuantile, 64)
+		if err != nil {
+			continue
+		}
+		metric = append(metric, KubeletMetric{operation, method, quantile, time.Duration(int64(latency)) * time.Microsecond})
+	}
+	return metric, nil
+}
+
+// HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
+func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) ([]KubeletMetric, error) {
+	metric, err := ReadKubeletMetrics(c, nodeName)
+	if err != nil {
+		return []KubeletMetric{}, err
+	}
+	sort.Sort(KubeletMetricByLatency(metric))
+	var badMetrics []KubeletMetric
+	Logf("Latency metrics for node %v", nodeName)
+	for _, m := range metric {
+		if m.Latency > threshold {
+			badMetrics = append(badMetrics, m)
+			Logf("%+v", m)
+		}
+	}
+	return badMetrics, nil
+}
+
+// Retrieve metrics from the kubelet server of the given node.
+func getKubeletMetrics(c *client.Client, node string) (string, error) {
+	metric, err := c.Get().
+		Prefix("proxy").
+		Resource("nodes").
+		Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
+		Suffix("metrics").
+		Do().
+		Raw()
+	if err != nil {
+		return "", err
+	}
+	return string(metric), nil
+}
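
The patch adds the scraping helpers but no caller yet. Below is a minimal usage sketch, not part of the patch, of how an e2e test might consume HighLatencyKubeletOperations; the wrapper name checkKubeletLatency and the 10-second threshold are illustrative assumptions, while HighLatencyKubeletOperations, Logf, and the client import come from the code above.

package e2e

import (
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
)

// checkKubeletLatency (hypothetical helper, not in the patch) scrapes /metrics on one
// node via HighLatencyKubeletOperations and logs how many operations exceeded an
// example 10s threshold. It returns an error only if the scrape itself failed.
func checkKubeletLatency(c *client.Client, nodeName string) error {
	badMetrics, err := HighLatencyKubeletOperations(c, 10*time.Second, nodeName)
	if err != nil {
		return err
	}
	// HighLatencyKubeletOperations already logs each offending metric; just summarize here.
	Logf("node %v had %d kubelet operations above threshold", nodeName, len(badMetrics))
	return nil
}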