diff --git a/pkg/kubelet/dockertools/instrumented_docker.go b/pkg/kubelet/dockertools/instrumented_docker.go
index 2cbaeb9c6e0..36e3fdd637c 100644
--- a/pkg/kubelet/dockertools/instrumented_docker.go
+++ b/pkg/kubelet/dockertools/instrumented_docker.go
@@ -38,13 +38,18 @@ func newInstrumentedDockerInterface(dockerClient DockerInterface) DockerInterfac
 // recordOperation records the duration of the operation.
 func recordOperation(operation string, start time.Time) {
+	metrics.DockerOperations.WithLabelValues(operation).Inc()
 	metrics.DockerOperationsLatency.WithLabelValues(operation).Observe(metrics.SinceInMicroseconds(start))
 }
 
 // recordError records error for metric if an error occurred.
 func recordError(operation string, err error) {
 	if err != nil {
-		metrics.DockerErrors.WithLabelValues(operation).Inc()
+		if _, ok := err.(operationTimeout); ok {
+			metrics.DockerOperationsTimeout.WithLabelValues(operation).Inc()
+		}
+		// A Docker operation timeout is also a Docker error, so we don't add an else here.
+		metrics.DockerOperationsErrors.WithLabelValues(operation).Inc()
 	}
 }
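For context, every instrumented method in this file follows the same wrapper shape, so the two helpers above are the only place the counters are touched. A minimal sketch of that shape (abridged for illustration; not part of this diff):

func (in instrumentedDockerInterface) InspectContainer(id string) (*docker.Container, error) {
	const operation = "inspect_container"
	// Count the call and observe its duration when the method returns.
	defer recordOperation(operation, time.Now())
	out, err := in.client.InspectContainer(id)
	// Only increments the error/timeout counters when err is non-nil.
	recordError(operation, err)
	return out, err
}

With this shape, DockerOperations counts every call, DockerOperationsLatency records its duration, and DockerOperationsErrors/DockerOperationsTimeout move only on failure.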
diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go
index 4c8a5215948..7d8294c1503 100644
--- a/pkg/kubelet/metrics/metrics.go
+++ b/pkg/kubelet/metrics/metrics.go
@@ -32,8 +32,10 @@ const (
 	PodStartLatencyKey = "pod_start_latency_microseconds"
 	PodStatusLatencyKey = "generate_pod_status_latency_microseconds"
 	ContainerManagerOperationsKey = "container_manager_latency_microseconds"
-	DockerOperationsKey = "docker_operations_latency_microseconds"
-	DockerErrorsKey = "docker_errors"
+	DockerOperationsLatencyKey = "docker_operations_latency_microseconds"
+	DockerOperationsKey = "docker_operations"
+	DockerOperationsErrorsKey = "docker_operations_errors"
+	DockerOperationsTimeoutKey = "docker_operations_timeout"
 	PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds"
 	PLEGRelistLatencyKey = "pleg_relist_latency_microseconds"
 	PLEGRelistIntervalKey = "pleg_relist_interval_microseconds"
@@ -94,16 +96,32 @@ var (
 	DockerOperationsLatency = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
 			Subsystem: KubeletSubsystem,
-			Name: DockerOperationsKey,
+			Name: DockerOperationsLatencyKey,
 			Help: "Latency in microseconds of Docker operations. Broken down by operation type.",
 		},
 		[]string{"operation_type"},
 	)
-	DockerErrors = prometheus.NewCounterVec(
+	DockerOperations = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Subsystem: KubeletSubsystem,
-			Name: DockerErrorsKey,
-			Help: "Cumulative number of Docker errors by operation type.",
+			Name: DockerOperationsKey,
+			Help: "Cumulative number of Docker operations by operation type.",
+		},
+		[]string{"operation_type"},
+	)
+	DockerOperationsErrors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: KubeletSubsystem,
+			Name: DockerOperationsErrorsKey,
+			Help: "Cumulative number of Docker operation errors by operation type.",
+		},
+		[]string{"operation_type"},
+	)
+	DockerOperationsTimeout = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: KubeletSubsystem,
+			Name: DockerOperationsTimeoutKey,
+			Help: "Cumulative number of Docker operation timeouts by operation type.",
 		},
 		[]string{"operation_type"},
 	)
@@ -137,7 +155,9 @@ func Register(containerCache kubecontainer.RuntimeCache) {
 	prometheus.MustRegister(SyncPodsLatency)
 	prometheus.MustRegister(PodWorkerStartLatency)
 	prometheus.MustRegister(ContainersPerPodCount)
-	prometheus.MustRegister(DockerErrors)
+	prometheus.MustRegister(DockerOperations)
+	prometheus.MustRegister(DockerOperationsErrors)
+	prometheus.MustRegister(DockerOperationsTimeout)
 	prometheus.MustRegister(newPodAndContainerCollector(containerCache))
 	prometheus.MustRegister(PLEGRelistLatency)
 	prometheus.MustRegister(PLEGRelistInterval)
diff --git a/pkg/metrics/generic_metrics.go b/pkg/metrics/generic_metrics.go
index b0905bc5eef..7a04e794e83 100644
--- a/pkg/metrics/generic_metrics.go
+++ b/pkg/metrics/generic_metrics.go
@@ -134,7 +134,6 @@ func parseMetrics(data string, knownMetrics map[string][]string, output *Metrics
 		if isKnownMetric || isCommonMetric {
 			(*output)[name] = append((*output)[name], metric)
 		} else {
-			glog.Warningf("Unknown metric %v", metric)
 			if unknownMetrics != nil {
 				unknownMetrics.Insert(name)
 			}
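Once registered, the renamed summary and the three counters are exposed on the kubelet's /metrics endpoint under the kubelet subsystem prefix, in the usual Prometheus text format. Roughly (sample values invented for illustration):

kubelet_docker_operations{operation_type="list_containers"} 892
kubelet_docker_operations_errors{operation_type="inspect_container"} 3
kubelet_docker_operations_timeout{operation_type="stop_container"} 1
kubelet_docker_operations_latency_microseconds{operation_type="list_containers",quantile="0.9"} 12843

These are exactly the series names that NecessaryKubeletMetrics is updated to expect below.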
diff --git a/pkg/metrics/kubelet_metrics.go b/pkg/metrics/kubelet_metrics.go
index 323b80fa5fc..fc715261da7 100644
--- a/pkg/metrics/kubelet_metrics.go
+++ b/pkg/metrics/kubelet_metrics.go
@@ -18,6 +18,8 @@ package metrics
 
 import (
 	"fmt"
+	"io/ioutil"
+	"net/http"
 	"time"
 
 	"k8s.io/kubernetes/pkg/util/sets"
@@ -71,7 +73,9 @@ var NecessaryKubeletMetrics = map[string][]string{
 	"kubelet_containers_per_pod_count": {"quantile"},
 	"kubelet_containers_per_pod_count_count": {},
 	"kubelet_containers_per_pod_count_sum": {},
-	"kubelet_docker_errors": {"operation_type"},
+	"kubelet_docker_operations": {"operation_type"},
+	"kubelet_docker_operations_errors": {"operation_type"},
+	"kubelet_docker_operations_timeout": {"operation_type"},
 	"kubelet_docker_operations_latency_microseconds": {"operation_type", "quantile"},
 	"kubelet_docker_operations_latency_microseconds_count": {"operation_type"},
 	"kubelet_docker_operations_latency_microseconds_sum": {"operation_type"},
@@ -126,6 +130,22 @@ func NewKubeletMetrics() KubeletMetrics {
 	return KubeletMetrics(result)
 }
 
+// GrabKubeletMetricsWithoutProxy retrieves metrics from the kubelet on the given node using a simple GET over http.
+// Currently only used in integration tests.
+func GrabKubeletMetricsWithoutProxy(nodeName string) (KubeletMetrics, error) {
+	metricsEndpoint := "http://%s/metrics"
+	resp, err := http.Get(fmt.Sprintf(metricsEndpoint, nodeName))
+	if err != nil {
+		return KubeletMetrics{}, err
+	}
+	defer resp.Body.Close()
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return KubeletMetrics{}, err
+	}
+	return parseKubeletMetrics(string(body))
+}
+
 func parseKubeletMetrics(data string) (KubeletMetrics, error) {
 	result := NewKubeletMetrics()
 	if err := parseMetrics(data, NecessaryKubeletMetrics, (*Metrics)(&result), nil); err != nil {
diff --git a/test/e2e/framework/kubelet_stats.go b/test/e2e/framework/kubelet_stats.go
index be7f4488c8d..4b796b2ff71 100644
--- a/test/e2e/framework/kubelet_stats.go
+++ b/test/e2e/framework/kubelet_stats.go
@@ -20,8 +20,6 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
-	"io/ioutil"
-	"net/http"
 	"sort"
 	"strconv"
 	"strings"
@@ -34,9 +32,10 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
-	"k8s.io/kubernetes/pkg/kubelet/metrics"
+	kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
 	kubeletstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
 	"k8s.io/kubernetes/pkg/master/ports"
+	"k8s.io/kubernetes/pkg/metrics"
 	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -44,7 +43,7 @@ import (
 
 // KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
 // TODO: Get some more structure around the metrics and this type
-type KubeletMetric struct {
+type KubeletLatencyMetric struct {
 	// eg: list, info, create
 	Operation string
 	// eg: sync_pods, pod_worker
@@ -56,89 +55,217 @@ type KubeletMetric struct {
 
 // KubeletMetricByLatency implements sort.Interface for []KubeletMetric based on
 // the latency field.
-type KubeletMetricByLatency []KubeletMetric
+type KubeletLatencyMetrics []KubeletLatencyMetric
 
-func (a KubeletMetricByLatency) Len() int { return len(a) }
-func (a KubeletMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
+func (a KubeletLatencyMetrics) Len() int { return len(a) }
+func (a KubeletLatencyMetrics) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
 
-// ParseKubeletMetrics reads metrics from the kubelet server running on the given node
-func ParseKubeletMetrics(metricsBlob string) ([]KubeletMetric, error) {
-	samples, err := extractMetricSamples(metricsBlob)
+// If an apiserver client is passed in, the function will try to get kubelet metrics from the metrics
+// grabber; otherwise, it will try to get kubelet metrics directly from the node.
+func getKubeletMetricsFromNode(c *client.Client, nodeName string) (metrics.KubeletMetrics, error) {
+	if c == nil {
+		return metrics.GrabKubeletMetricsWithoutProxy(nodeName)
+	}
+	grabber, err := metrics.NewMetricsGrabber(c, true, false, false, false)
 	if err != nil {
-		return nil, err
+		return metrics.KubeletMetrics{}, err
+	}
+	return grabber.GrabFromKubelet(nodeName)
+}
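A quick usage sketch of the direct path (the node address is assumed to already include the kubelet's metrics port; KubeletMetrics behaves as a map from metric name to its samples):

// Hypothetical address; in the integration tests the host:port comes from the test harness.
ms, err := metrics.GrabKubeletMetricsWithoutProxy("10.240.0.5:10255")
if err != nil {
	log.Fatalf("failed to grab kubelet metrics: %v", err)
}
for name, samples := range ms {
	fmt.Printf("%s: %d samples\n", name, len(samples))
}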
+
+// getKubeletMetrics gets all metrics in the kubelet subsystem from the specified node and trims
+// the subsystem prefix.
+func getKubeletMetrics(c *client.Client, nodeName string) (metrics.KubeletMetrics, error) {
+	ms, err := getKubeletMetricsFromNode(c, nodeName)
+	if err != nil {
+		return metrics.KubeletMetrics{}, err
 	}
-	acceptedMethods := sets.NewString(
-		metrics.PodWorkerLatencyKey,
-		metrics.PodWorkerStartLatencyKey,
-		metrics.SyncPodsLatencyKey,
-		metrics.PodStartLatencyKey,
-		metrics.PodStatusLatencyKey,
-		metrics.ContainerManagerOperationsKey,
-		metrics.DockerOperationsKey,
-		metrics.DockerErrorsKey,
-	)
-
-	var kms []KubeletMetric
-	for _, sample := range samples {
-		const prefix = metrics.KubeletSubsystem + "_"
-		metricName := string(sample.Metric[model.MetricNameLabel])
-		if !strings.HasPrefix(metricName, prefix) {
+	kubeletMetrics := make(metrics.KubeletMetrics)
+	for name, samples := range ms {
+		const prefix = kubeletmetrics.KubeletSubsystem + "_"
+		if !strings.HasPrefix(name, prefix) {
 			// Not a kubelet metric.
 			continue
 		}
+		method := strings.TrimPrefix(name, prefix)
+		kubeletMetrics[method] = samples
+	}
+	return kubeletMetrics, nil
+}
 
-		method := strings.TrimPrefix(metricName, prefix)
-		if !acceptedMethods.Has(method) {
+// GetKubeletLatencyMetrics gets all latency-related kubelet metrics. Note that the KubeletMetrics
+// passed in should not contain the subsystem prefix.
+func GetKubeletLatencyMetrics(ms metrics.KubeletMetrics) KubeletLatencyMetrics {
+	latencyMethods := sets.NewString(
+		kubeletmetrics.PodWorkerLatencyKey,
+		kubeletmetrics.PodWorkerStartLatencyKey,
+		kubeletmetrics.SyncPodsLatencyKey,
+		kubeletmetrics.PodStartLatencyKey,
+		kubeletmetrics.PodStatusLatencyKey,
+		kubeletmetrics.ContainerManagerOperationsKey,
+		kubeletmetrics.DockerOperationsLatencyKey,
+		kubeletmetrics.PLEGRelistLatencyKey,
+	)
+	var latencyMetrics KubeletLatencyMetrics
+	for method, samples := range ms {
+		if !latencyMethods.Has(method) {
 			continue
 		}
-
-		if method == metrics.DockerErrorsKey {
-			Logf("ERROR %v", sample)
-		}
-
-		latency := sample.Value
-		operation := string(sample.Metric["operation_type"])
-		var quantile float64
-		if val, ok := sample.Metric[model.QuantileLabel]; ok {
-			var err error
-			if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
-				continue
+		for _, sample := range samples {
+			latency := sample.Value
+			operation := string(sample.Metric["operation_type"])
+			var quantile float64
+			if val, ok := sample.Metric[model.QuantileLabel]; ok {
+				var err error
+				if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
+					continue
+				}
 			}
-		}
-		kms = append(kms, KubeletMetric{
-			operation,
-			method,
-			quantile,
-			time.Duration(int64(latency)) * time.Microsecond,
-		})
+			latencyMetrics = append(latencyMetrics, KubeletLatencyMetric{
+				Operation: operation,
+				Method: method,
+				Quantile: quantile,
+				Latency: time.Duration(int64(latency)) * time.Microsecond,
+			})
+		}
 	}
-	return kms, nil
+	return latencyMetrics
+}
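To make the parsing concrete: a summary sample that arrives here (after getKubeletMetrics has trimmed the kubelet_ prefix) as

pod_start_latency_microseconds{quantile="0.99"} 250000

carries no operation_type label, so it becomes KubeletLatencyMetric{Operation: "", Method: "pod_start_latency_microseconds", Quantile: 0.99, Latency: 250 * time.Millisecond}. Values invented for illustration.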
+
+// RuntimeOperationMonitor is a tool for getting and parsing Docker operation metrics.
+type RuntimeOperationMonitor struct {
+	client *client.Client
+	nodesRuntimeOps map[string]NodeRuntimeOperationErrorRate
+}
+
+// NodeRuntimeOperationErrorRate is the runtime operation error rate on one node.
+type NodeRuntimeOperationErrorRate map[string]*RuntimeOperationErrorRate
+
+// RuntimeOperationErrorRate is the error rate of a specified runtime operation.
+type RuntimeOperationErrorRate struct {
+	TotalNumber float64
+	ErrorRate float64
+	TimeoutRate float64
+}
+
+func NewRuntimeOperationMonitor(c *client.Client) *RuntimeOperationMonitor {
+	m := &RuntimeOperationMonitor{
+		client: c,
+		nodesRuntimeOps: make(map[string]NodeRuntimeOperationErrorRate),
+	}
+	nodes, err := m.client.Nodes().List(api.ListOptions{})
+	if err != nil {
+		Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
+	}
+	for _, node := range nodes.Items {
+		m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate)
+	}
+	// Initialize the runtime operation error rates.
+	m.GetRuntimeOperationErrorRate()
+	return m
+}
+
+// GetRuntimeOperationErrorRate gets runtime operation records from kubelet metrics and calculates
+// the error rates of all runtime operations.
+func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
+	for node := range m.nodesRuntimeOps {
+		nodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
+		if err != nil {
+			Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
+			continue
+		}
+		m.nodesRuntimeOps[node] = nodeResult
+	}
+	return m.nodesRuntimeOps
+}
+
+// GetLatestRuntimeOperationErrorRate gets the latest error and timeout rates, i.e. the rates over
+// the window since the last observed RuntimeOperationErrorRate.
+func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
+	result := make(map[string]NodeRuntimeOperationErrorRate)
+	for node := range m.nodesRuntimeOps {
+		result[node] = make(NodeRuntimeOperationErrorRate)
+		oldNodeResult := m.nodesRuntimeOps[node]
+		curNodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
+		if err != nil {
+			Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
+			continue
+		}
+		for op, cur := range curNodeResult {
+			t := *cur
+			if old, found := oldNodeResult[op]; found {
+				t.ErrorRate = (t.ErrorRate*t.TotalNumber - old.ErrorRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
+				t.TimeoutRate = (t.TimeoutRate*t.TotalNumber - old.TimeoutRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
+				t.TotalNumber -= old.TotalNumber
+			}
+			result[node][op] = &t
+		}
+		m.nodesRuntimeOps[node] = curNodeResult
+	}
+	return result
+}
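Because the kubelet counters are cumulative, the subtraction above converts two cumulative observations into rates for just the latest window. For example, if a node previously reported TotalNumber = 100 with ErrorRate = 0.02 (2 errors so far) and now reports TotalNumber = 150 with ErrorRate = 0.04 (6 errors so far), the latest window covers 50 operations with 4 new errors, so the returned ErrorRate is (0.04*150 - 0.02*100) / (150 - 100) = 0.08.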
+
+// FormatRuntimeOperationErrorRate formats the runtime operation error rates into a human-readable string.
+func FormatRuntimeOperationErrorRate(nodesResult map[string]NodeRuntimeOperationErrorRate) string {
+	lines := []string{}
+	for node, nodeResult := range nodesResult {
+		lines = append(lines, fmt.Sprintf("node %q runtime operation error rate:", node))
+		for op, result := range nodeResult {
+			line := fmt.Sprintf("operation %q: total - %.0f; error rate - %f; timeout rate - %f", op,
+				result.TotalNumber, result.ErrorRate, result.TimeoutRate)
+			lines = append(lines, line)
+		}
+		lines = append(lines, fmt.Sprintln())
+	}
+	return strings.Join(lines, "\n")
+}
+
+// getNodeRuntimeOperationErrorRate gets the runtime operation error rate from the specified node.
+func getNodeRuntimeOperationErrorRate(c *client.Client, node string) (NodeRuntimeOperationErrorRate, error) {
+	result := make(NodeRuntimeOperationErrorRate)
+	ms, err := getKubeletMetrics(c, node)
+	if err != nil {
+		return result, err
+	}
+	// If no corresponding metrics are found, the returned samples will be empty, and the following
+	// loops will be skipped automatically.
+	allOps := ms[kubeletmetrics.DockerOperationsKey]
+	errOps := ms[kubeletmetrics.DockerOperationsErrorsKey]
+	timeoutOps := ms[kubeletmetrics.DockerOperationsTimeoutKey]
+	for _, sample := range allOps {
+		operation := string(sample.Metric["operation_type"])
+		result[operation] = &RuntimeOperationErrorRate{TotalNumber: float64(sample.Value)}
+	}
+	for _, sample := range errOps {
+		operation := string(sample.Metric["operation_type"])
+		// We should always find the corresponding item, but check just in case.
+		if _, found := result[operation]; found {
+			result[operation].ErrorRate = float64(sample.Value) / result[operation].TotalNumber
+		}
+	}
+	for _, sample := range timeoutOps {
+		operation := string(sample.Metric["operation_type"])
+		if _, found := result[operation]; found {
+			result[operation].TimeoutRate = float64(sample.Value) / result[operation].TotalNumber
+		}
+	}
+	return result, nil
 }
 
 // HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
-func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) ([]KubeletMetric, error) {
-	var metricsBlob string
-	var err error
-	// If we haven't been given a client try scraping the nodename directly for a /metrics endpoint.
-	if c == nil {
-		metricsBlob, err = getKubeletMetricsThroughNode(nodeName)
-	} else {
-		metricsBlob, err = getKubeletMetricsThroughProxy(c, nodeName)
-	}
+func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) (KubeletLatencyMetrics, error) {
+	ms, err := getKubeletMetrics(c, nodeName)
 	if err != nil {
-		return []KubeletMetric{}, err
+		return KubeletLatencyMetrics{}, err
 	}
-	metric, err := ParseKubeletMetrics(metricsBlob)
-	if err != nil {
-		return []KubeletMetric{}, err
-	}
-	sort.Sort(KubeletMetricByLatency(metric))
-	var badMetrics []KubeletMetric
+	latencyMetrics := GetKubeletLatencyMetrics(ms)
+	sort.Sort(latencyMetrics)
+	var badMetrics KubeletLatencyMetrics
 	Logf("\nLatency metrics for node %v", nodeName)
-	for _, m := range metric {
+	for _, m := range latencyMetrics {
 		if m.Latency > threshold {
 			badMetrics = append(badMetrics, m)
 			Logf("%+v", m)
@@ -389,34 +516,6 @@ type usageDataPerContainer struct {
 	memWorkSetData []uint64
 }
 
-// Retrieve metrics from the kubelet server of the given node.
-func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
-	client, err := NodeProxyRequest(c, node, "metrics")
-	if err != nil {
-		return "", err
-	}
-	metric, errRaw := client.Raw()
-	if errRaw != nil {
-		return "", err
-	}
-	return string(metric), nil
-}
-
-// Retrieve metrics from the kubelet on the given node using a simple GET over http.
-// Currently only used in integration tests.
-func getKubeletMetricsThroughNode(nodeName string) (string, error) { - resp, err := http.Get(fmt.Sprintf("http://%v/metrics", nodeName)) - if err != nil { - return "", err - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return "", err - } - return string(body), nil -} - func GetKubeletHeapStats(c *client.Client, nodeName string) (string, error) { client, err := NodeProxyRequest(c, nodeName, "debug/pprof/heap") if err != nil { diff --git a/test/e2e/kubelet_perf.go b/test/e2e/kubelet_perf.go index afc87df2ee8..f5e5b485fd0 100644 --- a/test/e2e/kubelet_perf.go +++ b/test/e2e/kubelet_perf.go @@ -189,6 +189,7 @@ func verifyCPULimits(expected framework.ContainersCPUSummary, actual framework.N var _ = framework.KubeDescribe("Kubelet [Serial] [Slow]", func() { var nodeNames sets.String f := framework.NewDefaultFramework("kubelet-perf") + var om *framework.RuntimeOperationMonitor var rm *framework.ResourceMonitor BeforeEach(func() { @@ -197,12 +198,15 @@ var _ = framework.KubeDescribe("Kubelet [Serial] [Slow]", func() { for _, node := range nodes.Items { nodeNames.Insert(node.Name) } + om = framework.NewRuntimeOperationMonitor(f.Client) rm = framework.NewResourceMonitor(f.Client, framework.TargetContainers(), containerStatsPollingPeriod) rm.Start() }) AfterEach(func() { rm.Stop() + result := om.GetLatestRuntimeOperationErrorRate() + framework.Logf("runtime operation error metrics:\n%s", framework.FormatRuntimeOperationErrorRate(result)) }) framework.KubeDescribe("regular resource usage tracking", func() { // We assume that the scheduler will make reasonable scheduling choices