Use prometheus extraction library for metric parsing in e2e tests

Tim St. Clair 2015-07-23 18:09:06 -07:00
parent 6676331663
commit a244357ccd
2 changed files with 87 additions and 116 deletions
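The diff below swaps hand-rolled string parsing of the Prometheus text format for the client_golang extraction API: each caller implements extraction.Ingester and lets extraction.Processor004 drive it over the scraped payload. A minimal, self-contained sketch of that pattern, not part of this commit (the package name, collector type, and metric value are hypothetical; the extraction/model calls are the same ones the diff uses):

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/extraction"
	"github.com/prometheus/client_golang/model"
)

// sampleCollector implements extraction.Ingester by appending every decoded sample.
type sampleCollector model.Samples

func (s *sampleCollector) Ingest(samples model.Samples) error {
	*s = append(*s, samples...)
	return nil
}

func main() {
	// Hypothetical scraped payload in the Prometheus text exposition format.
	blob := `apiserver_request_latencies_summary{resource="pods",verb="GET",quantile="0.99"} 1234` + "\n"

	var collector sampleCollector
	if err := extraction.Processor004.ProcessSingle(strings.NewReader(blob), &collector, &extraction.ProcessOptions{}); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, sample := range collector {
		fmt.Println(sample.Metric[model.MetricNameLabel], sample.Value)
	}
}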


@@ -33,7 +33,11 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
cadvisor "github.com/google/cadvisor/info/v1"
"github.com/prometheus/client_golang/extraction"
"github.com/prometheus/client_golang/model"
)
// KubeletMetric stores metrics scraped from the kubelet server's /metrics endpoint.
@@ -56,96 +60,58 @@ func (a KubeletMetricByLatency) Len() int { return len(a) }
func (a KubeletMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
// kubeletMetricIngester implements extraction.Ingester.
type kubeletMetricIngester []KubeletMetric
func (k *kubeletMetricIngester) Ingest(samples model.Samples) error {
acceptedMethods := util.NewStringSet(
metrics.PodWorkerLatencyKey,
metrics.PodWorkerStartLatencyKey,
metrics.SyncPodsLatencyKey,
metrics.PodStartLatencyKey,
metrics.PodStatusLatencyKey,
metrics.ContainerManagerOperationsKey,
metrics.DockerOperationsKey,
metrics.DockerErrorsKey,
)
for _, sample := range samples {
const prefix = metrics.KubeletSubsystem + "_"
metricName := string(sample.Metric[model.MetricNameLabel])
if !strings.HasPrefix(metricName, prefix) {
// Not a kubelet metric.
continue
}
method := strings.TrimPrefix(metricName, prefix)
if !acceptedMethods.Has(method) {
continue
}
if method == metrics.DockerErrorsKey {
Logf("ERROR %v", sample)
}
latency := sample.Value
operation := string(sample.Metric["operation_type"])
var quantile float64
if val, ok := sample.Metric[model.QuantileLabel]; ok {
var err error
if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
continue
}
}
*k = append(*k, KubeletMetric{operation, method, quantile, time.Duration(int64(latency)) * time.Microsecond})
}
return nil
}
// ParseKubeletMetrics parses metrics from the given blob, as scraped from the kubelet server's /metrics endpoint.
func ParseKubeletMetrics(metricsBlob string) ([]KubeletMetric, error) {
metric := make([]KubeletMetric, 0)
for _, line := range strings.Split(metricsBlob, "\n") {
// A kubelet stats line starts with the KubeletSubsystem marker, followed by a stat name, followed by fields
// that vary by stat, described case by case below.
// TODO: String parsing like this is a hack, but getting our REST client/proxy to cooperate with the prometheus
// client is awkward; we should eventually invest some time in doing this the right way.
if !strings.HasPrefix(line, fmt.Sprintf("%v_", metrics.KubeletSubsystem)) {
continue
}
keyVal := strings.Split(line, " ")
if len(keyVal) != 2 {
return nil, fmt.Errorf("Error parsing metric %q", line)
}
keyElems := strings.Split(line, "\"")
latency, err := strconv.ParseFloat(keyVal[1], 64)
if err != nil {
continue
}
methodLine := strings.Split(keyElems[0], "{")
methodList := strings.Split(methodLine[0], "_")
if len(methodLine) != 2 || len(methodList) == 1 {
continue
}
method := strings.Join(methodList[1:], "_")
var operation, rawQuantile string
var quantile float64
switch method {
case metrics.PodWorkerLatencyKey:
// eg: kubelet_pod_worker_latency_microseconds{operation_type="create",pod_name="foopause3_default",quantile="0.99"} 1344
if len(keyElems) != 7 {
continue
}
operation = keyElems[1]
rawQuantile = keyElems[5]
break
case metrics.PodWorkerStartLatencyKey:
// eg: kubelet_pod_worker_start_latency_microseconds{quantile="0.99"} 12
fallthrough
case metrics.SyncPodsLatencyKey:
// eg: kubelet_sync_pods_latency_microseconds{quantile="0.5"} 9949
fallthrough
case metrics.PodStartLatencyKey:
// eg: kubelet_pod_start_latency_microseconds{quantile="0.5"} 123
fallthrough
case metrics.PodStatusLatencyKey:
// eg: kubelet_generate_pod_status_latency_microseconds{quantile="0.5"} 12715
if len(keyElems) != 3 {
continue
}
operation = ""
rawQuantile = keyElems[1]
break
case metrics.ContainerManagerOperationsKey:
// eg: kubelet_container_manager_latency_microseconds{operation_type="SyncPod",quantile="0.5"} 6705
fallthrough
case metrics.DockerOperationsKey:
// eg: kubelet_docker_operations_latency_microseconds{operation_type="info",quantile="0.5"} 31590
if len(keyElems) != 5 {
continue
}
operation = keyElems[1]
rawQuantile = keyElems[3]
break
case metrics.DockerErrorsKey:
Logf("ERROR %v", line)
default:
continue
}
quantile, err = strconv.ParseFloat(rawQuantile, 64)
if err != nil {
continue
}
metric = append(metric, KubeletMetric{operation, method, quantile, time.Duration(int64(latency)) * time.Microsecond})
}
return metric, nil
var ingester kubeletMetricIngester
err := extraction.Processor004.ProcessSingle(strings.NewReader(metricsBlob), &ingester, &extraction.ProcessOptions{})
return ingester, err
}
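// Usage sketch (not part of this commit; the helper name and metric value are hypothetical,
// and a "sort" import is assumed): parse a scraped blob and order the results with the
// KubeletMetricByLatency helper defined above.
func exampleParseKubeletMetricsUsage() {
	blob := `kubelet_docker_operations_latency_microseconds{operation_type="info",quantile="0.5"} 31590` + "\n"
	kubeletMetrics, err := ParseKubeletMetrics(blob)
	if err != nil {
		Logf("failed to parse kubelet metrics: %v", err)
		return
	}
	sort.Sort(KubeletMetricByLatency(kubeletMetrics))
	for _, km := range kubeletMetrics {
		Logf("kubelet metric: %+v", km)
	}
}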
// HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.


@@ -49,6 +49,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/davecgh/go-spew/spew"
"github.com/prometheus/client_golang/extraction"
"github.com/prometheus/client_golang/model"
"golang.org/x/crypto/ssh"
. "github.com/onsi/ginkgo"
@@ -1559,6 +1561,34 @@ type LatencyMetric struct {
Latency time.Duration
}
// latencyMetricIngester implements extraction.Ingester.
type latencyMetricIngester []LatencyMetric
func (l *latencyMetricIngester) Ingest(samples model.Samples) error {
for _, sample := range samples {
// Example line:
// apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908
if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" {
continue
}
resource := string(sample.Metric["resource"])
verb := string(sample.Metric["verb"])
latency := sample.Value
quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
if err != nil {
return err
}
*l = append(*l, LatencyMetric{
verb,
resource,
quantile,
time.Duration(int64(latency)) * time.Microsecond,
})
}
return nil
}
// LatencyMetricByLatency implements sort.Interface for []LatencyMetric based on
// the latency field.
type LatencyMetricByLatency []LatencyMetric
@@ -1572,34 +1602,9 @@ func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) {
if err != nil {
return nil, err
}
metrics := make([]LatencyMetric, 0)
for _, line := range strings.Split(string(body), "\n") {
if strings.HasPrefix(line, "apiserver_request_latencies_summary{") {
// Example line:
// apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908
// TODO: This parsing code is long and not readable. We should improve it.
keyVal := strings.Split(line, " ")
if len(keyVal) != 2 {
return nil, fmt.Errorf("Error parsing metric %q", line)
}
keyElems := strings.Split(line, "\"")
if len(keyElems) != 7 {
return nil, fmt.Errorf("Error parsing metric %q", line)
}
resource := keyElems[1]
verb := keyElems[3]
quantile, err := strconv.ParseFloat(keyElems[5], 64)
if err != nil {
return nil, fmt.Errorf("Error parsing metric %q", line)
}
latency, err := strconv.ParseFloat(keyVal[1], 64)
if err != nil {
return nil, fmt.Errorf("Error parsing metric %q", line)
}
metrics = append(metrics, LatencyMetric{verb, resource, quantile, time.Duration(int64(latency)) * time.Microsecond})
}
}
return metrics, nil
var ingester latencyMetricIngester
err = extraction.Processor004.ProcessSingle(strings.NewReader(body), &ingester, &extraction.ProcessOptions{})
return ingester, err
}
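// Usage sketch (not part of this commit; the helper name and threshold are hypothetical,
// and a "sort" import is assumed): read the apiserver latency metrics, order them with
// LatencyMetricByLatency, and log any entry above a threshold.
func exampleReadLatencyMetricsUsage(c *client.Client, threshold time.Duration) error {
	latencyMetrics, err := ReadLatencyMetrics(c)
	if err != nil {
		return err
	}
	sort.Sort(LatencyMetricByLatency(latencyMetrics))
	for _, m := range latencyMetrics {
		if m.Latency > threshold {
			Logf("request above threshold: %+v", m)
		}
	}
	return nil
}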
// Prints summary metrics for request types with latency above threshold