1) Add docker operation timeout metrics.
2) Clean up kubelet stats and add runtime operation error and timeout rate monitoring.
3) Monitor runtime operation error and timeout rate in kubelet perf.
Random-Liu 2016-05-06 10:25:18 -07:00
parent 66678354a0
commit 148588e6a1
6 changed files with 252 additions and 105 deletions


@@ -38,13 +38,18 @@ func newInstrumentedDockerInterface(dockerClient DockerInterface) DockerInterfac
// recordOperation records the count and latency of the operation.
func recordOperation(operation string, start time.Time) {
metrics.DockerOperations.WithLabelValues(operation).Inc()
metrics.DockerOperationsLatency.WithLabelValues(operation).Observe(metrics.SinceInMicroseconds(start))
}
// recordError records the error and timeout metrics if an error occurred.
func recordError(operation string, err error) {
if err != nil {
metrics.DockerErrors.WithLabelValues(operation).Inc()
if _, ok := err.(operationTimeout); ok {
metrics.DockerOperationsTimeout.WithLabelValues(operation).Inc()
}
// A Docker operation timeout is also a Docker error, so there is no else branch here.
metrics.DockerOperationsErrors.WithLabelValues(operation).Inc()
}
}
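
These two helpers are meant to wrap every method of the instrumented DockerInterface; the wrapper methods themselves and the operationTimeout error type are defined outside this hunk. A minimal sketch of the assumed calling pattern (the method name, its signature, and the in.client field are illustrative assumptions, imports elided):

// Sketch only, not part of this diff: a hypothetical instrumented wrapper
// method showing how recordOperation and recordError are used together.
// The deferred call captures the start time on entry and observes the
// latency when the method returns.
func (in instrumentedDockerInterface) StopContainer(id string, timeout int) error {
	const operation = "stop_container"
	defer recordOperation(operation, time.Now())

	err := in.client.StopContainer(id, timeout)
	recordError(operation, err)
	return err
}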


@@ -32,8 +32,10 @@ const (
PodStartLatencyKey = "pod_start_latency_microseconds"
PodStatusLatencyKey = "generate_pod_status_latency_microseconds"
ContainerManagerOperationsKey = "container_manager_latency_microseconds"
DockerOperationsKey = "docker_operations_latency_microseconds"
DockerErrorsKey = "docker_errors"
DockerOperationsLatencyKey = "docker_operations_latency_microseconds"
DockerOperationsKey = "docker_operations"
DockerOperationsErrorsKey = "docker_operations_errors"
DockerOperationsTimeoutKey = "docker_operations_timeout"
PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds"
PLEGRelistLatencyKey = "pleg_relist_latency_microseconds"
PLEGRelistIntervalKey = "pleg_relist_interval_microseconds"
@@ -94,16 +96,32 @@ var (
DockerOperationsLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: KubeletSubsystem,
Name: DockerOperationsKey,
Name: DockerOperationsLatencyKey,
Help: "Latency in microseconds of Docker operations. Broken down by operation type.",
},
[]string{"operation_type"},
)
DockerErrors = prometheus.NewCounterVec(
DockerOperations = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeletSubsystem,
Name: DockerErrorsKey,
Help: "Cumulative number of Docker errors by operation type.",
Name: DockerOperationsKey,
Help: "Cumulative number of Docker operations by operation type.",
},
[]string{"operation_type"},
)
DockerOperationsErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeletSubsystem,
Name: DockerOperationsErrorsKey,
Help: "Cumulative number of Docker operation errors by operation type.",
},
[]string{"operation_type"},
)
DockerOperationsTimeout = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeletSubsystem,
Name: DockerOperationsTimeoutKey,
Help: "Cumulative number of Docker operation timeout by operation type.",
},
[]string{"operation_type"},
)
@@ -137,7 +155,9 @@ func Register(containerCache kubecontainer.RuntimeCache) {
prometheus.MustRegister(SyncPodsLatency)
prometheus.MustRegister(PodWorkerStartLatency)
prometheus.MustRegister(ContainersPerPodCount)
prometheus.MustRegister(DockerErrors)
prometheus.MustRegister(DockerOperations)
prometheus.MustRegister(DockerOperationsErrors)
prometheus.MustRegister(DockerOperationsTimeout)
prometheus.MustRegister(newPodAndContainerCollector(containerCache))
prometheus.MustRegister(PLEGRelistLatency)
prometheus.MustRegister(PLEGRelistInterval)


@@ -134,7 +134,6 @@ func parseMetrics(data string, knownMetrics map[string][]string, output *Metrics
if isKnownMetric || isCommonMetric {
(*output)[name] = append((*output)[name], metric)
} else {
glog.Warningf("Unknown metric %v", metric)
if unknownMetrics != nil {
unknownMetrics.Insert(name)
}


@@ -18,6 +18,8 @@ package metrics
import (
"fmt"
"io/ioutil"
"net/http"
"time"
"k8s.io/kubernetes/pkg/util/sets"
@@ -71,7 +73,9 @@ var NecessaryKubeletMetrics = map[string][]string{
"kubelet_containers_per_pod_count": {"quantile"},
"kubelet_containers_per_pod_count_count": {},
"kubelet_containers_per_pod_count_sum": {},
"kubelet_docker_errors": {"operation_type"},
"kubelet_docker_operations": {"operation_type"},
"kubelet_docker_operations_errors": {"operation_type"},
"kubelet_docker_operations_timeout": {"operation_type"},
"kubelet_docker_operations_latency_microseconds": {"operation_type", "quantile"},
"kubelet_docker_operations_latency_microseconds_count": {"operation_type"},
"kubelet_docker_operations_latency_microseconds_sum": {"operation_type"},
@@ -126,6 +130,22 @@ func NewKubeletMetrics() KubeletMetrics {
return KubeletMetrics(result)
}
// GrabKubeletMetricsWithoutProxy retrieves metrics from the kubelet on the given node using a simple GET over http.
// Currently only used in integration tests.
func GrabKubeletMetricsWithoutProxy(nodeName string) (KubeletMetrics, error) {
metricsEndpoint := "http://%s/metrics"
resp, err := http.Get(fmt.Sprintf(metricsEndpoint, nodeName))
if err != nil {
return KubeletMetrics{}, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return KubeletMetrics{}, err
}
return parseKubeletMetrics(string(body))
}
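
A hedged usage sketch for the new helper follows; the wrapper function, the plain-HTTP address, and the print statement are illustrative assumptions rather than part of this change (imports elided):

// Hypothetical caller (sketch only): the address must point at a kubelet
// serving /metrics over plain HTTP, for example the read-only port.
func dumpKubeletMetrics(address string) error {
	m, err := metrics.GrabKubeletMetricsWithoutProxy(address)
	if err != nil {
		return err
	}
	fmt.Printf("grabbed %d kubelet metric families\n", len(m))
	return nil
}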
func parseKubeletMetrics(data string) (KubeletMetrics, error) {
result := NewKubeletMetrics()
if err := parseMetrics(data, NecessaryKubeletMetrics, (*Metrics)(&result), nil); err != nil {


@@ -20,8 +20,6 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"sort"
"strconv"
"strings"
@@ -34,9 +32,10 @@ import (
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/kubelet/metrics"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
kubeletstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/metrics"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
@@ -44,7 +43,7 @@ import (
// KubeletMetric stores metrics scraped from the kubelet server's /metrics endpoint.
// TODO: Get some more structure around the metrics and this type
type KubeletMetric struct {
type KubeletLatencyMetric struct {
// eg: list, info, create
Operation string
// eg: sync_pods, pod_worker
@@ -56,89 +55,217 @@ type KubeletMetric struct {
// KubeletMetricByLatency implements sort.Interface for []KubeletMetric based on
// the latency field.
type KubeletMetricByLatency []KubeletMetric
type KubeletLatencyMetrics []KubeletLatencyMetric
func (a KubeletMetricByLatency) Len() int { return len(a) }
func (a KubeletMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
func (a KubeletLatencyMetrics) Len() int { return len(a) }
func (a KubeletLatencyMetrics) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
// ParseKubeletMetrics reads metrics from the kubelet server running on the given node
func ParseKubeletMetrics(metricsBlob string) ([]KubeletMetric, error) {
samples, err := extractMetricSamples(metricsBlob)
// If an apiserver client is passed in, the function will try to get kubelet metrics from the metrics grabber;
// otherwise, the function will try to get kubelet metrics directly from the node.
func getKubeletMetricsFromNode(c *client.Client, nodeName string) (metrics.KubeletMetrics, error) {
if c == nil {
return metrics.GrabKubeletMetricsWithoutProxy(nodeName)
}
grabber, err := metrics.NewMetricsGrabber(c, true, false, false, false)
if err != nil {
return nil, err
return metrics.KubeletMetrics{}, err
}
return grabber.GrabFromKubelet(nodeName)
}
// getKubeletMetrics gets all metrics in the kubelet subsystem from the specified node and trims
// the subsystem prefix.
func getKubeletMetrics(c *client.Client, nodeName string) (metrics.KubeletMetrics, error) {
ms, err := getKubeletMetricsFromNode(c, nodeName)
if err != nil {
return metrics.KubeletMetrics{}, err
}
acceptedMethods := sets.NewString(
metrics.PodWorkerLatencyKey,
metrics.PodWorkerStartLatencyKey,
metrics.SyncPodsLatencyKey,
metrics.PodStartLatencyKey,
metrics.PodStatusLatencyKey,
metrics.ContainerManagerOperationsKey,
metrics.DockerOperationsKey,
metrics.DockerErrorsKey,
)
var kms []KubeletMetric
for _, sample := range samples {
const prefix = metrics.KubeletSubsystem + "_"
metricName := string(sample.Metric[model.MetricNameLabel])
if !strings.HasPrefix(metricName, prefix) {
kubeletMetrics := make(metrics.KubeletMetrics)
for name, samples := range ms {
const prefix = kubeletmetrics.KubeletSubsystem + "_"
if !strings.HasPrefix(name, prefix) {
// Not a kubelet metric.
continue
}
method := strings.TrimPrefix(name, prefix)
kubeletMetrics[method] = samples
}
return kubeletMetrics, nil
}
method := strings.TrimPrefix(metricName, prefix)
if !acceptedMethods.Has(method) {
// GetKubeletLatencyMetrics gets all latency-related kubelet metrics. Note that the KubeletMetrics
// passed in should not contain the subsystem prefix.
func GetKubeletLatencyMetrics(ms metrics.KubeletMetrics) KubeletLatencyMetrics {
latencyMethods := sets.NewString(
kubeletmetrics.PodWorkerLatencyKey,
kubeletmetrics.PodWorkerStartLatencyKey,
kubeletmetrics.SyncPodsLatencyKey,
kubeletmetrics.PodStartLatencyKey,
kubeletmetrics.PodStatusLatencyKey,
kubeletmetrics.ContainerManagerOperationsKey,
kubeletmetrics.DockerOperationsLatencyKey,
kubeletmetrics.PLEGRelistLatencyKey,
)
var latencyMetrics KubeletLatencyMetrics
for method, samples := range ms {
if !latencyMethods.Has(method) {
continue
}
if method == metrics.DockerErrorsKey {
Logf("ERROR %v", sample)
}
latency := sample.Value
operation := string(sample.Metric["operation_type"])
var quantile float64
if val, ok := sample.Metric[model.QuantileLabel]; ok {
var err error
if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
continue
for _, sample := range samples {
latency := sample.Value
operation := string(sample.Metric["operation_type"])
var quantile float64
if val, ok := sample.Metric[model.QuantileLabel]; ok {
var err error
if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
continue
}
}
}
kms = append(kms, KubeletMetric{
operation,
method,
quantile,
time.Duration(int64(latency)) * time.Microsecond,
})
latencyMetrics = append(latencyMetrics, KubeletLatencyMetric{
Operation: operation,
Method: method,
Quantile: quantile,
Latency: time.Duration(int64(latency)) * time.Microsecond,
})
}
}
return kms, nil
return latencyMetrics
}
// RuntimeOperationMonitor is the tool for getting and parsing docker operation metrics.
type RuntimeOperationMonitor struct {
client *client.Client
nodesRuntimeOps map[string]NodeRuntimeOperationErrorRate
}
// NodeRuntimeOperationErrorRate is the runtime operation error rate on one node.
type NodeRuntimeOperationErrorRate map[string]*RuntimeOperationErrorRate
// RuntimeOperationErrorRate is the error rate of a specified runtime operation.
type RuntimeOperationErrorRate struct {
TotalNumber float64 // cumulative number of operations observed
ErrorRate float64 // number of errors divided by TotalNumber
TimeoutRate float64 // number of timeouts divided by TotalNumber
}
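// NewRuntimeOperationMonitor returns a RuntimeOperationMonitor initialized with the current
// runtime operation error rates of all nodes in the cluster.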
func NewRuntimeOperationMonitor(c *client.Client) *RuntimeOperationMonitor {
m := &RuntimeOperationMonitor{
client: c,
nodesRuntimeOps: make(map[string]NodeRuntimeOperationErrorRate),
}
nodes, err := m.client.Nodes().List(api.ListOptions{})
if err != nil {
Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
}
for _, node := range nodes.Items {
m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate)
}
// Initialize the runtime operation error rate
m.GetRuntimeOperationErrorRate()
return m
}
// GetRuntimeOperationErrorRate gets runtime operation records from kubelet metrics and calculates
// error rates of all runtime operations.
func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
for node := range m.nodesRuntimeOps {
nodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
if err != nil {
Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
continue
}
m.nodesRuntimeOps[node] = nodeResult
}
return m.nodesRuntimeOps
}
// GetLatestRuntimeOperationErrorRate gets the error and timeout rates accumulated since the last observation.
func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
result := make(map[string]NodeRuntimeOperationErrorRate)
for node := range m.nodesRuntimeOps {
result[node] = make(NodeRuntimeOperationErrorRate)
oldNodeResult := m.nodesRuntimeOps[node]
curNodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
if err != nil {
Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
continue
}
for op, cur := range curNodeResult {
t := *cur
if old, found := oldNodeResult[op]; found {
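// Rates stored per node are cumulative ratios (errors or timeouts divided by the total since
// kubelet start), so rate * TotalNumber recovers the raw counter; the differences below
// therefore yield the error and timeout rates over just the last observation interval.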
t.ErrorRate = (t.ErrorRate*t.TotalNumber - old.ErrorRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
t.TimeoutRate = (t.TimeoutRate*t.TotalNumber - old.TimeoutRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
t.TotalNumber -= old.TotalNumber
}
result[node][op] = &t
}
m.nodesRuntimeOps[node] = curNodeResult
}
return result
}
// FormatRuntimeOperationErrorRate formats the runtime operation error rates as a string.
func FormatRuntimeOperationErrorRate(nodesResult map[string]NodeRuntimeOperationErrorRate) string {
lines := []string{}
for node, nodeResult := range nodesResult {
lines = append(lines, fmt.Sprintf("node %q runtime operation error rate:", node))
for op, result := range nodeResult {
line := fmt.Sprintf("operation %q: total - %.0f; error rate - %f; timeout rate - %f", op,
result.TotalNumber, result.ErrorRate, result.TimeoutRate)
lines = append(lines, line)
}
lines = append(lines, fmt.Sprintln())
}
return strings.Join(lines, "\n")
}
// getNodeRuntimeOperationErrorRate gets the runtime operation error rate from the specified node.
func getNodeRuntimeOperationErrorRate(c *client.Client, node string) (NodeRuntimeOperationErrorRate, error) {
result := make(NodeRuntimeOperationErrorRate)
ms, err := getKubeletMetrics(c, node)
if err != nil {
return result, err
}
// If no corresponding metrics are found, the returned samples will be empty. Then the following
// loop will be skipped automatically.
allOps := ms[kubeletmetrics.DockerOperationsKey]
errOps := ms[kubeletmetrics.DockerOperationsErrorsKey]
timeoutOps := ms[kubeletmetrics.DockerOperationsTimeoutKey]
for _, sample := range allOps {
operation := string(sample.Metric["operation_type"])
result[operation] = &RuntimeOperationErrorRate{TotalNumber: float64(sample.Value)}
}
for _, sample := range errOps {
operation := string(sample.Metric["operation_type"])
// The corresponding entry should always exist; check just in case.
if _, found := result[operation]; found {
result[operation].ErrorRate = float64(sample.Value) / result[operation].TotalNumber
}
}
for _, sample := range timeoutOps {
operation := string(sample.Metric["operation_type"])
if _, found := result[operation]; found {
result[operation].TimeoutRate = float64(sample.Value) / result[operation].TotalNumber
}
}
return result, nil
}
// HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) ([]KubeletMetric, error) {
var metricsBlob string
var err error
// If we haven't been given a client try scraping the nodename directly for a /metrics endpoint.
if c == nil {
metricsBlob, err = getKubeletMetricsThroughNode(nodeName)
} else {
metricsBlob, err = getKubeletMetricsThroughProxy(c, nodeName)
}
func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) (KubeletLatencyMetrics, error) {
ms, err := getKubeletMetrics(c, nodeName)
if err != nil {
return []KubeletMetric{}, err
return KubeletLatencyMetrics{}, err
}
metric, err := ParseKubeletMetrics(metricsBlob)
if err != nil {
return []KubeletMetric{}, err
}
sort.Sort(KubeletMetricByLatency(metric))
var badMetrics []KubeletMetric
latencyMetrics := GetKubeletLatencyMetrics(ms)
sort.Sort(latencyMetrics)
var badMetrics KubeletLatencyMetrics
Logf("\nLatency metrics for node %v", nodeName)
for _, m := range metric {
for _, m := range latencyMetrics {
if m.Latency > threshold {
badMetrics = append(badMetrics, m)
Logf("%+v", m)
@@ -389,34 +516,6 @@ type usageDataPerContainer struct {
memWorkSetData []uint64
}
// Retrieve metrics from the kubelet server of the given node.
func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
client, err := NodeProxyRequest(c, node, "metrics")
if err != nil {
return "", err
}
metric, errRaw := client.Raw()
if errRaw != nil {
return "", err
}
return string(metric), nil
}
// Retrieve metrics from the kubelet on the given node using a simple GET over http.
// Currently only used in integration tests.
func getKubeletMetricsThroughNode(nodeName string) (string, error) {
resp, err := http.Get(fmt.Sprintf("http://%v/metrics", nodeName))
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}
func GetKubeletHeapStats(c *client.Client, nodeName string) (string, error) {
client, err := NodeProxyRequest(c, nodeName, "debug/pprof/heap")
if err != nil {


@@ -189,6 +189,7 @@ func verifyCPULimits(expected framework.ContainersCPUSummary, actual framework.N
var _ = framework.KubeDescribe("Kubelet [Serial] [Slow]", func() {
var nodeNames sets.String
f := framework.NewDefaultFramework("kubelet-perf")
var om *framework.RuntimeOperationMonitor
var rm *framework.ResourceMonitor
BeforeEach(func() {
@@ -197,12 +198,15 @@ var _ = framework.KubeDescribe("Kubelet [Serial] [Slow]", func() {
for _, node := range nodes.Items {
nodeNames.Insert(node.Name)
}
om = framework.NewRuntimeOperationMonitor(f.Client)
rm = framework.NewResourceMonitor(f.Client, framework.TargetContainers(), containerStatsPollingPeriod)
rm.Start()
})
AfterEach(func() {
rm.Stop()
result := om.GetLatestRuntimeOperationErrorRate()
framework.Logf("runtime operation error metrics:\n%s", framework.FormatRuntimeOperationErrorRate(result))
})
framework.KubeDescribe("regular resource usage tracking", func() {
// We assume that the scheduler will make reasonable scheduling choices