From 3a94f3b5c2d1526a4ff936c9675d71af05176c9b Mon Sep 17 00:00:00 2001 From: "Tim St. Clair" Date: Fri, 18 Sep 2015 13:28:53 -0700 Subject: [PATCH] Use up-to-date prometheus extraction libraries --- test/e2e/kubelet_stats.go | 30 ++++++------- test/e2e/util.go | 92 +++++++++++++++++++++++++-------------- 2 files changed, 74 insertions(+), 48 deletions(-) diff --git a/test/e2e/kubelet_stats.go b/test/e2e/kubelet_stats.go index 98fca776661..5bee8fe7ac6 100644 --- a/test/e2e/kubelet_stats.go +++ b/test/e2e/kubelet_stats.go @@ -29,6 +29,7 @@ import ( "time" cadvisor "github.com/google/cadvisor/info/v1" + "github.com/prometheus/common/model" "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" @@ -38,9 +39,6 @@ import ( "k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" - - "github.com/prometheus/client_golang/extraction" - "github.com/prometheus/client_golang/model" ) // KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint. @@ -63,10 +61,13 @@ func (a KubeletMetricByLatency) Len() int { return len(a) } func (a KubeletMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency } -// KubeletMetricIngester implements extraction.Ingester -type kubeletMetricIngester []KubeletMetric +// ParseKubeletMetrics reads metrics from the kubelet server running on the given node +func ParseKubeletMetrics(metricsBlob string) ([]KubeletMetric, error) { + samples, err := extractMetricSamples(metricsBlob) + if err != nil { + return nil, err + } -func (k *kubeletMetricIngester) Ingest(samples model.Samples) error { acceptedMethods := sets.NewString( metrics.PodWorkerLatencyKey, metrics.PodWorkerStartLatencyKey, @@ -78,6 +79,7 @@ func (k *kubeletMetricIngester) Ingest(samples model.Samples) error { metrics.DockerErrorsKey, ) + var kms []KubeletMetric for _, sample := range samples { const prefix = metrics.KubeletSubsystem + "_" metricName := string(sample.Metric[model.MetricNameLabel]) @@ -105,16 +107,14 @@ func (k *kubeletMetricIngester) Ingest(samples model.Samples) error { } } - *k = append(*k, KubeletMetric{operation, method, quantile, time.Duration(int64(latency)) * time.Microsecond}) + kms = append(kms, KubeletMetric{ + operation, + method, + quantile, + time.Duration(int64(latency)) * time.Microsecond, + }) } - return nil -} - -// ReadKubeletMetrics reads metrics from the kubelet server running on the given node -func ParseKubeletMetrics(metricsBlob string) ([]KubeletMetric, error) { - var ingester kubeletMetricIngester - err := extraction.Processor004.ProcessSingle(strings.NewReader(metricsBlob), &ingester, &extraction.ProcessOptions{}) - return ingester, err + return kms, nil } // HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics. diff --git a/test/e2e/util.go b/test/e2e/util.go index 8bf253e7677..c86e9587e8e 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -50,8 +50,8 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/davecgh/go-spew/spew" - "github.com/prometheus/client_golang/extraction" - "github.com/prometheus/client_golang/model" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" "golang.org/x/crypto/ssh" . "github.com/onsi/ginkgo" @@ -1817,34 +1817,6 @@ type LatencyMetric struct { Latency time.Duration } -// latencyMetricIngestor implements extraction.Ingester -type latencyMetricIngester []LatencyMetric - -func (l *latencyMetricIngester) Ingest(samples model.Samples) error { - for _, sample := range samples { - // Example line: - // apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908 - if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" { - continue - } - - resource := string(sample.Metric["resource"]) - verb := string(sample.Metric["verb"]) - latency := sample.Value - quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64) - if err != nil { - return err - } - *l = append(*l, LatencyMetric{ - verb, - resource, - quantile, - time.Duration(int64(latency)) * time.Microsecond, - }) - } - return nil -} - // LatencyMetricByLatency implements sort.Interface for []LatencyMetric based on // the latency field. type LatencyMetricByLatency []LatencyMetric @@ -1858,9 +1830,37 @@ func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) { if err != nil { return nil, err } - var ingester latencyMetricIngester - err = extraction.Processor004.ProcessSingle(strings.NewReader(body), &ingester, &extraction.ProcessOptions{}) - return ingester, err + + samples, err := extractMetricSamples(body) + if err != nil { + return nil, err + } + + var metrics []LatencyMetric + for _, sample := range samples { + // Example line: + // apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908 + if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" { + continue + } + + resource := string(sample.Metric["resource"]) + verb := string(sample.Metric["verb"]) + latency := sample.Value + quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64) + if err != nil { + return metrics, err + } + + metrics = append(metrics, LatencyMetric{ + verb, + resource, + quantile, + time.Duration(int64(latency)) * time.Microsecond, + }) + } + + return metrics, nil } // Prints summary metrics for request types with latency above threshold @@ -2070,3 +2070,29 @@ func waitForClusterSize(c *client.Client, size int, timeout time.Duration) error } return fmt.Errorf("timeout waiting %v for cluster size to be %d", timeout, size) } + +// extractMetricSamples parses the prometheus metric samples from the input string. +func extractMetricSamples(metricsBlob string) ([]*model.Sample, error) { + dec, err := expfmt.NewDecoder(strings.NewReader(metricsBlob), + http.Header{"Content-Type": []string{"text/plain"}}) // FIXME + if err != nil { + return nil, err + } + decoder := expfmt.SampleDecoder{ + Dec: dec, + Opts: &expfmt.DecodeOptions{}, + } + + var samples []*model.Sample + for { + var v model.Vector + if err = decoder.Decode(&v); err != nil { + if err == io.EOF { + // Expected loop termination condition. + return samples, nil + } + return nil, err + } + samples = append(samples, v...) + } +}