Use up-to-date prometheus extraction libraries

This commit is contained in:
Tim St. Clair 2015-09-18 13:28:53 -07:00
parent 495f6ef212
commit 3a94f3b5c2
2 changed files with 74 additions and 48 deletions

View File

@@ -29,6 +29,7 @@ import (
"time"
cadvisor "github.com/google/cadvisor/info/v1"
"github.com/prometheus/common/model"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
@@ -38,9 +39,6 @@ import (
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/prometheus/client_golang/extraction"
"github.com/prometheus/client_golang/model"
)
// KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
@@ -63,10 +61,13 @@ func (a KubeletMetricByLatency) Len() int { return len(a) }
func (a KubeletMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
// KubeletMetricIngester implements extraction.Ingester
type kubeletMetricIngester []KubeletMetric
// ParseKubeletMetrics reads metrics from the kubelet server running on the given node
func ParseKubeletMetrics(metricsBlob string) ([]KubeletMetric, error) {
samples, err := extractMetricSamples(metricsBlob)
if err != nil {
return nil, err
}
func (k *kubeletMetricIngester) Ingest(samples model.Samples) error {
acceptedMethods := sets.NewString(
metrics.PodWorkerLatencyKey,
metrics.PodWorkerStartLatencyKey,
@@ -78,6 +79,7 @@ func (k *kubeletMetricIngester) Ingest(samples model.Samples) error {
metrics.DockerErrorsKey,
)
var kms []KubeletMetric
for _, sample := range samples {
const prefix = metrics.KubeletSubsystem + "_"
metricName := string(sample.Metric[model.MetricNameLabel])
@@ -105,16 +107,14 @@ func (k *kubeletMetricIngester) Ingest(samples model.Samples) error {
}
}
*k = append(*k, KubeletMetric{operation, method, quantile, time.Duration(int64(latency)) * time.Microsecond})
kms = append(kms, KubeletMetric{
operation,
method,
quantile,
time.Duration(int64(latency)) * time.Microsecond,
})
}
return nil
}
// ReadKubeletMetrics reads metrics from the kubelet server running on the given node
func ParseKubeletMetrics(metricsBlob string) ([]KubeletMetric, error) {
var ingester kubeletMetricIngester
err := extraction.Processor004.ProcessSingle(strings.NewReader(metricsBlob), &ingester, &extraction.ProcessOptions{})
return ingester, err
return kms, nil
}
// HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.

View File

@@ -50,8 +50,8 @@ import (
"k8s.io/kubernetes/pkg/watch"
"github.com/davecgh/go-spew/spew"
"github.com/prometheus/client_golang/extraction"
"github.com/prometheus/client_golang/model"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"golang.org/x/crypto/ssh"
. "github.com/onsi/ginkgo"
@@ -1817,34 +1817,6 @@ type LatencyMetric struct {
Latency time.Duration
}
// latencyMetricIngestor implements extraction.Ingester
type latencyMetricIngester []LatencyMetric
// Ingest implements the (removed-in-this-commit) extraction.Ingester
// interface: it scans the batch of samples and appends one LatencyMetric
// per apiserver_request_latencies_summary sample to the receiver slice.
// NOTE(review): the sample value is interpreted as microseconds — confirm
// against the unit the apiserver exports for this summary.
func (l *latencyMetricIngester) Ingest(samples model.Samples) error {
for _, sample := range samples {
// Example line:
// apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908
// Only the request-latency summary metric is of interest; skip the rest.
if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" {
continue
}
// The resource/verb labels identify the API call; quantile is parsed
// from its label string into a float. A malformed quantile aborts the
// whole batch with the parse error.
resource := string(sample.Metric["resource"])
verb := string(sample.Metric["verb"])
latency := sample.Value
quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
if err != nil {
return err
}
*l = append(*l, LatencyMetric{
verb,
resource,
quantile,
time.Duration(int64(latency)) * time.Microsecond,
})
}
return nil
}
// LatencyMetricByLatency implements sort.Interface for []LatencyMetric based on
// the latency field.
type LatencyMetricByLatency []LatencyMetric
@@ -1858,9 +1830,37 @@ func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) {
if err != nil {
return nil, err
}
var ingester latencyMetricIngester
err = extraction.Processor004.ProcessSingle(strings.NewReader(body), &ingester, &extraction.ProcessOptions{})
return ingester, err
samples, err := extractMetricSamples(body)
if err != nil {
return nil, err
}
var metrics []LatencyMetric
for _, sample := range samples {
// Example line:
// apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908
if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" {
continue
}
resource := string(sample.Metric["resource"])
verb := string(sample.Metric["verb"])
latency := sample.Value
quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
if err != nil {
return metrics, err
}
metrics = append(metrics, LatencyMetric{
verb,
resource,
quantile,
time.Duration(int64(latency)) * time.Microsecond,
})
}
return metrics, nil
}
// Prints summary metrics for request types with latency above threshold
@@ -2070,3 +2070,29 @@ func waitForClusterSize(c *client.Client, size int, timeout time.Duration) error
}
return fmt.Errorf("timeout waiting %v for cluster size to be %d", timeout, size)
}
// extractMetricSamples parses the prometheus metric samples from the input string.
// It decodes the blob via the expfmt decoder and returns the flattened list of
// samples; io.EOF from the decoder is the normal end-of-input signal, any other
// decode error is returned as-is.
func extractMetricSamples(metricsBlob string) ([]*model.Sample, error) {
// NOTE(review): the content type is hardcoded to the text exposition
// format (hence the FIXME) — confirm callers never receive protobuf.
dec, err := expfmt.NewDecoder(strings.NewReader(metricsBlob),
http.Header{"Content-Type": []string{"text/plain"}}) // FIXME
if err != nil {
return nil, err
}
decoder := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{},
}
var samples []*model.Sample
for {
// Each Decode call yields one vector of samples; accumulate until EOF.
var v model.Vector
if err = decoder.Decode(&v); err != nil {
if err == io.EOF {
// Expected loop termination condition.
return samples, nil
}
return nil, err
}
samples = append(samples, v...)
}
}