Merge pull request #10362 from bprashanth/latency_integration

Scrape latency stats from integration tests
This commit is contained in:
Robert Bailey 2015-06-26 10:52:01 -07:00
commit e5f44535a9
2 changed files with 42 additions and 9 deletions

View File

@ -59,6 +59,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider" _ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
"github.com/GoogleCloudPlatform/kubernetes/test/e2e"
docker "github.com/fsouza/go-dockerclient" docker "github.com/fsouza/go-dockerclient"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
@ -1000,6 +1001,11 @@ func main() {
// parallel and also it schedules extra pods which would change the // parallel and also it schedules extra pods which would change the
// above pod counting logic. // above pod counting logic.
runSchedulerNoPhantomPodsTest(kubeClient) runSchedulerNoPhantomPodsTest(kubeClient)
glog.Infof("\n\nLogging high latency metrics from the 10250 kubelet")
e2e.HighLatencyKubeletOperations(nil, 1*time.Second, "localhost:10250")
glog.Infof("\n\nLogging high latency metrics from the 10251 kubelet")
e2e.HighLatencyKubeletOperations(nil, 1*time.Second, "localhost:10251")
} }
// ServeCachedManifestFile serves a file for kubelet to read. // ServeCachedManifestFile serves a file for kubelet to read.

View File

@ -18,6 +18,8 @@ package e2e
import ( import (
"fmt" "fmt"
"io/ioutil"
"net/http"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -49,14 +51,9 @@ func (a KubeletMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency } func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
// ReadKubeletMetrics reads metrics from the kubelet server running on the given node // ReadKubeletMetrics reads metrics from the kubelet server running on the given node
func ReadKubeletMetrics(c *client.Client, nodeName string) ([]KubeletMetric, error) { func ParseKubeletMetrics(metricsBlob string) ([]KubeletMetric, error) {
body, err := getKubeletMetrics(c, nodeName)
if err != nil {
return nil, err
}
metric := make([]KubeletMetric, 0) metric := make([]KubeletMetric, 0)
for _, line := range strings.Split(string(body), "\n") { for _, line := range strings.Split(metricsBlob, "\n") {
// A kubelet stats line starts with the KubeletSubsystem marker, followed by a stat name, followed by fields // A kubelet stats line starts with the KubeletSubsystem marker, followed by a stat name, followed by fields
// that vary by stat described on a case by case basis below. // that vary by stat described on a case by case basis below.
@ -96,6 +93,10 @@ func ReadKubeletMetrics(c *client.Client, nodeName string) ([]KubeletMetric, err
rawQuantile = keyElems[5] rawQuantile = keyElems[5]
break break
case metrics.PodWorkerStartLatencyKey:
// eg: kubelet_pod_worker_start_latency_microseconds{quantile="0.99"} 12
fallthrough
case metrics.SyncPodsLatencyKey: case metrics.SyncPodsLatencyKey:
// eg: kubelet_sync_pods_latency_microseconds{quantile="0.5"} 9949 // eg: kubelet_sync_pods_latency_microseconds{quantile="0.5"} 9949
fallthrough fallthrough
@ -143,7 +144,18 @@ func ReadKubeletMetrics(c *client.Client, nodeName string) ([]KubeletMetric, err
// HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics. // HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) ([]KubeletMetric, error) { func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) ([]KubeletMetric, error) {
metric, err := ReadKubeletMetrics(c, nodeName) var metricsBlob string
var err error
// If we haven't been given a client try scraping the nodename directly for a /metrics endpoint.
if c == nil {
metricsBlob, err = getKubeletMetricsThroughNode(nodeName)
} else {
metricsBlob, err = getKubeletMetricsThroughProxy(c, nodeName)
}
if err != nil {
return []KubeletMetric{}, err
}
metric, err := ParseKubeletMetrics(metricsBlob)
if err != nil { if err != nil {
return []KubeletMetric{}, err return []KubeletMetric{}, err
} }
@ -160,7 +172,7 @@ func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nod
} }
// Retrieve metrics from the kubelet server of the given node. // Retrieve metrics from the kubelet server of the given node.
func getKubeletMetrics(c *client.Client, node string) (string, error) { func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
metric, err := c.Get(). metric, err := c.Get().
Prefix("proxy"). Prefix("proxy").
Resource("nodes"). Resource("nodes").
@ -173,3 +185,18 @@ func getKubeletMetrics(c *client.Client, node string) (string, error) {
} }
return string(metric), nil return string(metric), nil
} }
// Retrieve metrics from the kubelet on the given node using a simple GET over http.
// Currently only used in integration tests.
func getKubeletMetricsThroughNode(nodeName string) (string, error) {
resp, err := http.Get(fmt.Sprintf("http://%v/metrics", nodeName))
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}