diff --git a/test/e2e/kubelet_stats.go b/test/e2e/kubelet_stats.go
index 36aef115b85..48ec7b5face 100644
--- a/test/e2e/kubelet_stats.go
+++ b/test/e2e/kubelet_stats.go
@@ -33,7 +33,11 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	cadvisor "github.com/google/cadvisor/info/v1"
+
+	"github.com/prometheus/client_golang/extraction"
+	"github.com/prometheus/client_golang/model"
 )
 
 // KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
@@ -56,96 +60,58 @@ func (a KubeletMetricByLatency) Len() int           { return len(a) }
 func (a KubeletMetricByLatency) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
 func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
 
+// kubeletMetricIngester implements extraction.Ingester
+type kubeletMetricIngester []KubeletMetric
+
+func (k *kubeletMetricIngester) Ingest(samples model.Samples) error {
+	acceptedMethods := util.NewStringSet(
+		metrics.PodWorkerLatencyKey,
+		metrics.PodWorkerStartLatencyKey,
+		metrics.SyncPodsLatencyKey,
+		metrics.PodStartLatencyKey,
+		metrics.PodStatusLatencyKey,
+		metrics.ContainerManagerOperationsKey,
+		metrics.DockerOperationsKey,
+		metrics.DockerErrorsKey,
+	)
+
+	for _, sample := range samples {
+		const prefix = metrics.KubeletSubsystem + "_"
+		metricName := string(sample.Metric[model.MetricNameLabel])
+		if !strings.HasPrefix(metricName, prefix) {
+			// Not a kubelet metric.
+			continue
+		}
+
+		method := strings.TrimPrefix(metricName, prefix)
+		if !acceptedMethods.Has(method) {
+			continue
+		}
+
+		if method == metrics.DockerErrorsKey {
+			Logf("ERROR %v", sample)
+		}
+
+		latency := sample.Value
+		operation := string(sample.Metric["operation_type"])
+		var quantile float64
+		if val, ok := sample.Metric[model.QuantileLabel]; ok {
+			var err error
+			if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
+				continue
+			}
+		}
+
+		*k = append(*k, KubeletMetric{operation, method, quantile, time.Duration(int64(latency)) * time.Microsecond})
+	}
+	return nil
+}
+
 // ReadKubeletMetrics reads metrics from the kubelet server running on the given node
 func ParseKubeletMetrics(metricsBlob string) ([]KubeletMetric, error) {
-	metric := make([]KubeletMetric, 0)
-	for _, line := range strings.Split(metricsBlob, "\n") {
-
-		// A kubelet stats line starts with the KubeletSubsystem marker, followed by a stat name, followed by fields
-		// that vary by stat described on a case by case basis below.
-		// TODO: String parsing is such a hack, but getting our rest client/proxy to cooperate with prometheus
-		// client is weird, we should eventually invest some time in doing this the right way.
-		if !strings.HasPrefix(line, fmt.Sprintf("%v_", metrics.KubeletSubsystem)) {
-			continue
-		}
-		keyVal := strings.Split(line, " ")
-		if len(keyVal) != 2 {
-			return nil, fmt.Errorf("Error parsing metric %q", line)
-		}
-		keyElems := strings.Split(line, "\"")
-
-		latency, err := strconv.ParseFloat(keyVal[1], 64)
-		if err != nil {
-			continue
-		}
-
-		methodLine := strings.Split(keyElems[0], "{")
-		methodList := strings.Split(methodLine[0], "_")
-		if len(methodLine) != 2 || len(methodList) == 1 {
-			continue
-		}
-		method := strings.Join(methodList[1:], "_")
-
-		var operation, rawQuantile string
-		var quantile float64
-
-		switch method {
-		case metrics.PodWorkerLatencyKey:
-			// eg: kubelet_pod_worker_latency_microseconds{operation_type="create",pod_name="foopause3_default",quantile="0.99"} 1344
-			if len(keyElems) != 7 {
-				continue
-			}
-			operation = keyElems[1]
-			rawQuantile = keyElems[5]
-			break
-
-		case metrics.PodWorkerStartLatencyKey:
-			// eg: kubelet_pod_worker_start_latency_microseconds{quantile="0.99"} 12
-			fallthrough
-
-		case metrics.SyncPodsLatencyKey:
-			// eg: kubelet_sync_pods_latency_microseconds{quantile="0.5"} 9949
-			fallthrough
-
-		case metrics.PodStartLatencyKey:
-			// eg: kubelet_pod_start_latency_microseconds{quantile="0.5"} 123
-			fallthrough
-
-		case metrics.PodStatusLatencyKey:
-			// eg: kubelet_generate_pod_status_latency_microseconds{quantile="0.5"} 12715
-			if len(keyElems) != 3 {
-				continue
-			}
-			operation = ""
-			rawQuantile = keyElems[1]
-			break
-
-		case metrics.ContainerManagerOperationsKey:
-			// eg: kubelet_container_manager_latency_microseconds{operation_type="SyncPod",quantile="0.5"} 6705
-			fallthrough
-
-		case metrics.DockerOperationsKey:
-			// eg: kubelet_docker_operations_latency_microseconds{operation_type="info",quantile="0.5"} 31590
-			if len(keyElems) != 5 {
-				continue
-			}
-			operation = keyElems[1]
-			rawQuantile = keyElems[3]
-			break
-
-		case metrics.DockerErrorsKey:
-			Logf("ERROR %v", line)
-
-		default:
-			continue
-		}
-		quantile, err = strconv.ParseFloat(rawQuantile, 64)
-		if err != nil {
-			continue
-		}
-		metric = append(metric, KubeletMetric{operation, method, quantile, time.Duration(int64(latency)) * time.Microsecond})
-	}
-	return metric, nil
+	var ingester kubeletMetricIngester
+	err := extraction.Processor004.ProcessSingle(strings.NewReader(metricsBlob), &ingester, &extraction.ProcessOptions{})
+	return ingester, err
 }
 
 // HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
diff --git a/test/e2e/util.go b/test/e2e/util.go
index add60e07901..c26e010ae1f 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -49,6 +49,8 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
 
 	"github.com/davecgh/go-spew/spew"
+	"github.com/prometheus/client_golang/extraction"
+	"github.com/prometheus/client_golang/model"
 	"golang.org/x/crypto/ssh"
 
"github.com/onsi/ginkgo" @@ -1559,6 +1561,34 @@ type LatencyMetric struct { Latency time.Duration } +// latencyMetricIngestor implements extraction.Ingester +type latencyMetricIngester []LatencyMetric + +func (l *latencyMetricIngester) Ingest(samples model.Samples) error { + for _, sample := range samples { + // Example line: + // apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908 + if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" { + continue + } + + resource := string(sample.Metric["resource"]) + verb := string(sample.Metric["verb"]) + latency := sample.Value + quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64) + if err != nil { + return err + } + *l = append(*l, LatencyMetric{ + verb, + resource, + quantile, + time.Duration(int64(latency)) * time.Microsecond, + }) + } + return nil +} + // LatencyMetricByLatency implements sort.Interface for []LatencyMetric based on // the latency field. type LatencyMetricByLatency []LatencyMetric @@ -1572,34 +1602,9 @@ func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) { if err != nil { return nil, err } - metrics := make([]LatencyMetric, 0) - for _, line := range strings.Split(string(body), "\n") { - if strings.HasPrefix(line, "apiserver_request_latencies_summary{") { - // Example line: - // apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908 - // TODO: This parsing code is long and not readable. We should improve it. - keyVal := strings.Split(line, " ") - if len(keyVal) != 2 { - return nil, fmt.Errorf("Error parsing metric %q", line) - } - keyElems := strings.Split(line, "\"") - if len(keyElems) != 7 { - return nil, fmt.Errorf("Error parsing metric %q", line) - } - resource := keyElems[1] - verb := keyElems[3] - quantile, err := strconv.ParseFloat(keyElems[5], 64) - if err != nil { - return nil, fmt.Errorf("Error parsing metric %q", line) - } - latency, err := strconv.ParseFloat(keyVal[1], 64) - if err != nil { - return nil, fmt.Errorf("Error parsing metric %q", line) - } - metrics = append(metrics, LatencyMetric{verb, resource, quantile, time.Duration(int64(latency)) * time.Microsecond}) - } - } - return metrics, nil + var ingester latencyMetricIngester + err = extraction.Processor004.ProcessSingle(strings.NewReader(body), &ingester, &extraction.ProcessOptions{}) + return ingester, err } // Prints summary metrics for request types with latency above threshold