Merge pull request #24918 from Random-Liu/add-docker-operation-timeout

Automatic merge from submit-queue

Kubelet: Add docker operation timeout

For #23563.
Based on #24748, only the last 2 commits are new.

This PR:
1) Add timeout for all docker operations.
2) Add docker operation timeout metrics
3) Cleanup kubelet stats and add runtime operation error and timeout rate monitoring.
4) Monitor runtime operation error and timeout rate in kubelet perf.

@yujuhong 
/cc @gmarek Because of the metrics change.
/cc @kubernetes/sig-node
This commit is contained in:
k8s-merge-robot 2016-05-09 21:51:52 -07:00
commit c4214f743f
7 changed files with 401 additions and 140 deletions

View File

@ -38,13 +38,18 @@ func newInstrumentedDockerInterface(dockerClient DockerInterface) DockerInterfac
// recordOperation records the duration of the operation.
func recordOperation(operation string, start time.Time) {
metrics.DockerOperations.WithLabelValues(operation).Inc()
metrics.DockerOperationsLatency.WithLabelValues(operation).Observe(metrics.SinceInMicroseconds(start))
}
// recordError records error for metric if an error occurred.
func recordError(operation string, err error) {
if err != nil {
metrics.DockerErrors.WithLabelValues(operation).Inc()
if _, ok := err.(operationTimeout); ok {
metrics.DockerOperationsTimeout.WithLabelValues(operation).Inc()
}
// Docker operation timeout error is also a docker error, so we don't add else here.
metrics.DockerOperationsErrors.WithLabelValues(operation).Inc()
}
}

View File

@ -52,8 +52,13 @@ type kubeDockerClient struct {
// Make sure that kubeDockerClient implemented the DockerInterface.
var _ DockerInterface = &kubeDockerClient{}
// the default ShmSize to use (in bytes) if not specified.
const defaultShmSize = int64(1024 * 1024 * 64)
const (
// defaultTimeout is the default timeout of all docker operations.
defaultTimeout = 2 * time.Minute
// defaultShmSize is the default ShmSize to use (in bytes) if not specified.
defaultShmSize = int64(1024 * 1024 * 64)
)
// newKubeDockerClient creates an kubeDockerClient from an existing docker client.
func newKubeDockerClient(dockerClient *dockerapi.Client) DockerInterface {
@ -62,27 +67,26 @@ func newKubeDockerClient(dockerClient *dockerapi.Client) DockerInterface {
}
}
// getDefaultContext returns the default context, now the default context is
// context.Background()
// TODO(random-liu): Add timeout and timeout handling mechanism.
func getDefaultContext() context.Context {
return context.Background()
}
func (k *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
containers, err := k.client.ContainerList(getDefaultContext(), options)
ctx, cancel := getDefaultContext()
defer cancel()
containers, err := k.client.ContainerList(ctx, options)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
apiContainers := []dockertypes.Container{}
for _, c := range containers {
apiContainers = append(apiContainers, dockertypes.Container(c))
}
return apiContainers, nil
return containers, nil
}
func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) {
containerJSON, err := d.client.ContainerInspect(getDefaultContext(), id)
ctx, cancel := getDefaultContext()
defer cancel()
containerJSON, err := d.client.ContainerInspect(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
if dockerapi.IsErrContainerNotFound(err) {
return nil, containerNotFoundError{ID: id}
@ -93,12 +97,17 @@ func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJS
}
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error) {
ctx, cancel := getDefaultContext()
defer cancel()
// we provide an explicit default shm size as to not depend on docker daemon.
// TODO: evaluate exposing this as a knob in the API
if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
opts.HostConfig.ShmSize = defaultShmSize
}
createResp, err := d.client.ContainerCreate(getDefaultContext(), opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -106,20 +115,43 @@ func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfi
}
func (d *kubeDockerClient) StartContainer(id string) error {
return d.client.ContainerStart(getDefaultContext(), id)
ctx, cancel := getDefaultContext()
defer cancel()
err := d.client.ContainerStart(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
// Stopping an already stopped container will not cause an error in engine-api.
func (d *kubeDockerClient) StopContainer(id string, timeout int) error {
return d.client.ContainerStop(getDefaultContext(), id, timeout)
ctx, cancel := getDefaultContext()
defer cancel()
err := d.client.ContainerStop(ctx, id, timeout)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error {
return d.client.ContainerRemove(getDefaultContext(), id, opts)
ctx, cancel := getDefaultContext()
defer cancel()
err := d.client.ContainerRemove(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect, error) {
resp, _, err := d.client.ImageInspectWithRaw(getDefaultContext(), image, true)
ctx, cancel := getDefaultContext()
defer cancel()
resp, _, err := d.client.ImageInspectWithRaw(ctx, image, true)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
if dockerapi.IsErrImageNotFound(err) {
err = imageNotFoundError{ID: image}
@ -130,11 +162,22 @@ func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect
}
func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, error) {
return d.client.ImageHistory(getDefaultContext(), id)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ImageHistory(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
return resp, err
}
func (d *kubeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error) {
images, err := d.client.ImageList(getDefaultContext(), opts)
ctx, cancel := getDefaultContext()
defer cancel()
images, err := d.client.ImageList(ctx, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -155,8 +198,13 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
if err != nil {
return err
}
ctx, cancel := getDefaultContext()
defer cancel()
opts.RegistryAuth = base64Auth
resp, err := d.client.ImagePull(getDefaultContext(), image, opts)
resp, err := d.client.ImagePull(ctx, image, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
@ -180,11 +228,22 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
}
func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error) {
return d.client.ImageRemove(getDefaultContext(), image, opts)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ImageRemove(ctx, image, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
return resp, err
}
func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error {
resp, err := d.client.ContainerLogs(getDefaultContext(), id, opts)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ContainerLogs(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
@ -193,7 +252,12 @@ func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions
}
func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
resp, err := d.client.ServerVersion(getDefaultContext())
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ServerVersion(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -201,7 +265,12 @@ func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
}
func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
resp, err := d.client.Info(getDefaultContext())
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.Info(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -210,7 +279,12 @@ func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
// TODO(random-liu): Add unit test for exec and attach functions, just like what go-dockerclient did.
func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error) {
resp, err := d.client.ContainerExecCreate(getDefaultContext(), id, opts)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ContainerExecCreate(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -218,13 +292,22 @@ func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*
}
func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
ctx, cancel := getDefaultContext()
defer cancel()
if opts.Detach {
return d.client.ContainerExecStart(getDefaultContext(), startExec, opts)
err := d.client.ContainerExecStart(ctx, startExec, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
resp, err := d.client.ContainerExecAttach(getDefaultContext(), startExec, dockertypes.ExecConfig{
resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecConfig{
Detach: opts.Detach,
Tty: opts.Tty,
})
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
@ -233,7 +316,12 @@ func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStar
}
func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecInspect, error) {
resp, err := d.client.ContainerExecInspect(getDefaultContext(), id)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ContainerExecInspect(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
@ -241,7 +329,12 @@ func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecIns
}
func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.ContainerAttachOptions, sopts StreamOptions) error {
resp, err := d.client.ContainerAttach(getDefaultContext(), id, opts)
ctx, cancel := getDefaultContext()
defer cancel()
resp, err := d.client.ContainerAttach(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
@ -303,6 +396,18 @@ func parseDockerTimestamp(s string) (time.Time, error) {
return time.Parse(time.RFC3339Nano, s)
}
func getDefaultContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultTimeout)
}
// contextError checks the context, and returns error if the context is timeout.
func contextError(ctx context.Context) error {
if ctx.Err() == context.DeadlineExceeded {
return operationTimeout{err: ctx.Err()}
}
return ctx.Err()
}
// StreamOptions are the options used to configure the stream redirection
type StreamOptions struct {
RawTerminal bool
@ -311,6 +416,15 @@ type StreamOptions struct {
ErrorStream io.Writer
}
// operationTimeout is the error returned when the docker operations are timeout.
type operationTimeout struct {
err error
}
func (e operationTimeout) Error() string {
return fmt.Sprintf("operation timeout: %v", e.err)
}
// containerNotFoundError is the error returned by InspectContainer when container not found. We
// add this error type for testability. We don't use the original error returned by engine-api
// because dockertypes.containerNotFoundError is private, we can't create and inject it in our test.
@ -319,7 +433,7 @@ type containerNotFoundError struct {
}
func (e containerNotFoundError) Error() string {
return fmt.Sprintf("Error: No such container: %s", e.ID)
return fmt.Sprintf("no such container: %q", e.ID)
}
// imageNotFoundError is the error returned by InspectImage when image not found.
@ -328,5 +442,5 @@ type imageNotFoundError struct {
}
func (e imageNotFoundError) Error() string {
return fmt.Sprintf("Error: No such image: %s", e.ID)
return fmt.Sprintf("no such image: %q", e.ID)
}

View File

@ -32,8 +32,10 @@ const (
PodStartLatencyKey = "pod_start_latency_microseconds"
PodStatusLatencyKey = "generate_pod_status_latency_microseconds"
ContainerManagerOperationsKey = "container_manager_latency_microseconds"
DockerOperationsKey = "docker_operations_latency_microseconds"
DockerErrorsKey = "docker_errors"
DockerOperationsLatencyKey = "docker_operations_latency_microseconds"
DockerOperationsKey = "docker_operations"
DockerOperationsErrorsKey = "docker_operations_errors"
DockerOperationsTimeoutKey = "docker_operations_timeout"
PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds"
PLEGRelistLatencyKey = "pleg_relist_latency_microseconds"
PLEGRelistIntervalKey = "pleg_relist_interval_microseconds"
@ -94,16 +96,32 @@ var (
DockerOperationsLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: KubeletSubsystem,
Name: DockerOperationsKey,
Name: DockerOperationsLatencyKey,
Help: "Latency in microseconds of Docker operations. Broken down by operation type.",
},
[]string{"operation_type"},
)
DockerErrors = prometheus.NewCounterVec(
DockerOperations = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeletSubsystem,
Name: DockerErrorsKey,
Help: "Cumulative number of Docker errors by operation type.",
Name: DockerOperationsKey,
Help: "Cumulative number of Docker operations by operation type.",
},
[]string{"operation_type"},
)
DockerOperationsErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeletSubsystem,
Name: DockerOperationsErrorsKey,
Help: "Cumulative number of Docker operation errors by operation type.",
},
[]string{"operation_type"},
)
DockerOperationsTimeout = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeletSubsystem,
Name: DockerOperationsTimeoutKey,
Help: "Cumulative number of Docker operation timeout by operation type.",
},
[]string{"operation_type"},
)
@ -137,7 +155,9 @@ func Register(containerCache kubecontainer.RuntimeCache) {
prometheus.MustRegister(SyncPodsLatency)
prometheus.MustRegister(PodWorkerStartLatency)
prometheus.MustRegister(ContainersPerPodCount)
prometheus.MustRegister(DockerErrors)
prometheus.MustRegister(DockerOperations)
prometheus.MustRegister(DockerOperationsErrors)
prometheus.MustRegister(DockerOperationsTimeout)
prometheus.MustRegister(newPodAndContainerCollector(containerCache))
prometheus.MustRegister(PLEGRelistLatency)
prometheus.MustRegister(PLEGRelistInterval)

View File

@ -134,7 +134,6 @@ func parseMetrics(data string, knownMetrics map[string][]string, output *Metrics
if isKnownMetric || isCommonMetric {
(*output)[name] = append((*output)[name], metric)
} else {
glog.Warningf("Unknown metric %v", metric)
if unknownMetrics != nil {
unknownMetrics.Insert(name)
}

View File

@ -18,6 +18,8 @@ package metrics
import (
"fmt"
"io/ioutil"
"net/http"
"time"
"k8s.io/kubernetes/pkg/util/sets"
@ -71,7 +73,9 @@ var NecessaryKubeletMetrics = map[string][]string{
"kubelet_containers_per_pod_count": {"quantile"},
"kubelet_containers_per_pod_count_count": {},
"kubelet_containers_per_pod_count_sum": {},
"kubelet_docker_errors": {"operation_type"},
"kubelet_docker_operations": {"operation_type"},
"kubelet_docker_operations_errors": {"operation_type"},
"kubelet_docker_operations_timeout": {"operation_type"},
"kubelet_docker_operations_latency_microseconds": {"operation_type", "quantile"},
"kubelet_docker_operations_latency_microseconds_count": {"operation_type"},
"kubelet_docker_operations_latency_microseconds_sum": {"operation_type"},
@ -126,6 +130,22 @@ func NewKubeletMetrics() KubeletMetrics {
return KubeletMetrics(result)
}
// GrabKubeletMetricsWithoutProxy retrieve metrics from the kubelet on the given node using a simple GET over http.
// Currently only used in integration tests.
func GrabKubeletMetricsWithoutProxy(nodeName string) (KubeletMetrics, error) {
metricsEndpoint := "http://%s/metrics"
resp, err := http.Get(fmt.Sprintf(metricsEndpoint, nodeName))
if err != nil {
return KubeletMetrics{}, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return KubeletMetrics{}, err
}
return parseKubeletMetrics(string(body))
}
func parseKubeletMetrics(data string) (KubeletMetrics, error) {
result := NewKubeletMetrics()
if err := parseMetrics(data, NecessaryKubeletMetrics, (*Metrics)(&result), nil); err != nil {

View File

@ -20,8 +20,6 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"sort"
"strconv"
"strings"
@ -34,9 +32,10 @@ import (
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/kubelet/metrics"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
kubeletstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/metrics"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
@ -44,7 +43,7 @@ import (
// KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
// TODO: Get some more structure around the metrics and this type
type KubeletMetric struct {
type KubeletLatencyMetric struct {
// eg: list, info, create
Operation string
// eg: sync_pods, pod_worker
@ -56,89 +55,217 @@ type KubeletMetric struct {
// KubeletMetricByLatency implements sort.Interface for []KubeletMetric based on
// the latency field.
type KubeletMetricByLatency []KubeletMetric
type KubeletLatencyMetrics []KubeletLatencyMetric
func (a KubeletMetricByLatency) Len() int { return len(a) }
func (a KubeletMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
func (a KubeletLatencyMetrics) Len() int { return len(a) }
func (a KubeletLatencyMetrics) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
// ParseKubeletMetrics reads metrics from the kubelet server running on the given node
func ParseKubeletMetrics(metricsBlob string) ([]KubeletMetric, error) {
samples, err := extractMetricSamples(metricsBlob)
// If a apiserver client is passed in, the function will try to get kubelet metrics from metrics grabber;
// or else, the function will try to get kubelet metrics directly from the node.
func getKubeletMetricsFromNode(c *client.Client, nodeName string) (metrics.KubeletMetrics, error) {
if c == nil {
return metrics.GrabKubeletMetricsWithoutProxy(nodeName)
}
grabber, err := metrics.NewMetricsGrabber(c, true, false, false, false)
if err != nil {
return nil, err
return metrics.KubeletMetrics{}, err
}
return grabber.GrabFromKubelet(nodeName)
}
// getKubeletMetrics gets all metrics in kubelet subsystem from specified node and trims
// the subsystem prefix.
func getKubeletMetrics(c *client.Client, nodeName string) (metrics.KubeletMetrics, error) {
ms, err := getKubeletMetricsFromNode(c, nodeName)
if err != nil {
return metrics.KubeletMetrics{}, err
}
acceptedMethods := sets.NewString(
metrics.PodWorkerLatencyKey,
metrics.PodWorkerStartLatencyKey,
metrics.SyncPodsLatencyKey,
metrics.PodStartLatencyKey,
metrics.PodStatusLatencyKey,
metrics.ContainerManagerOperationsKey,
metrics.DockerOperationsKey,
metrics.DockerErrorsKey,
)
var kms []KubeletMetric
for _, sample := range samples {
const prefix = metrics.KubeletSubsystem + "_"
metricName := string(sample.Metric[model.MetricNameLabel])
if !strings.HasPrefix(metricName, prefix) {
kubeletMetrics := make(metrics.KubeletMetrics)
for name, samples := range ms {
const prefix = kubeletmetrics.KubeletSubsystem + "_"
if !strings.HasPrefix(name, prefix) {
// Not a kubelet metric.
continue
}
method := strings.TrimPrefix(name, prefix)
kubeletMetrics[method] = samples
}
return kubeletMetrics, nil
}
method := strings.TrimPrefix(metricName, prefix)
if !acceptedMethods.Has(method) {
// GetKubeletLatencyMetrics gets all latency related kubelet metrics. Note that the KubeletMetrcis
// passed in should not contain subsystem prefix.
func GetKubeletLatencyMetrics(ms metrics.KubeletMetrics) KubeletLatencyMetrics {
latencyMethods := sets.NewString(
kubeletmetrics.PodWorkerLatencyKey,
kubeletmetrics.PodWorkerStartLatencyKey,
kubeletmetrics.SyncPodsLatencyKey,
kubeletmetrics.PodStartLatencyKey,
kubeletmetrics.PodStatusLatencyKey,
kubeletmetrics.ContainerManagerOperationsKey,
kubeletmetrics.DockerOperationsLatencyKey,
kubeletmetrics.PodWorkerStartLatencyKey,
kubeletmetrics.PLEGRelistLatencyKey,
)
var latencyMetrics KubeletLatencyMetrics
for method, samples := range ms {
if !latencyMethods.Has(method) {
continue
}
if method == metrics.DockerErrorsKey {
Logf("ERROR %v", sample)
}
latency := sample.Value
operation := string(sample.Metric["operation_type"])
var quantile float64
if val, ok := sample.Metric[model.QuantileLabel]; ok {
var err error
if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
continue
for _, sample := range samples {
latency := sample.Value
operation := string(sample.Metric["operation_type"])
var quantile float64
if val, ok := sample.Metric[model.QuantileLabel]; ok {
var err error
if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
continue
}
}
}
kms = append(kms, KubeletMetric{
operation,
method,
quantile,
time.Duration(int64(latency)) * time.Microsecond,
})
latencyMetrics = append(latencyMetrics, KubeletLatencyMetric{
Operation: operation,
Method: method,
Quantile: quantile,
Latency: time.Duration(int64(latency)) * time.Microsecond,
})
}
}
return kms, nil
return latencyMetrics
}
// RuntimeOperationMonitor is the tool getting and parsing docker operation metrics.
type RuntimeOperationMonitor struct {
client *client.Client
nodesRuntimeOps map[string]NodeRuntimeOperationErrorRate
}
// NodeRuntimeOperationErrorRate is the runtime operation error rate on one node.
type NodeRuntimeOperationErrorRate map[string]*RuntimeOperationErrorRate
// RuntimeOperationErrorRate is the error rate of a specified runtime operation.
type RuntimeOperationErrorRate struct {
TotalNumber float64
ErrorRate float64
TimeoutRate float64
}
func NewRuntimeOperationMonitor(c *client.Client) *RuntimeOperationMonitor {
m := &RuntimeOperationMonitor{
client: c,
nodesRuntimeOps: make(map[string]NodeRuntimeOperationErrorRate),
}
nodes, err := m.client.Nodes().List(api.ListOptions{})
if err != nil {
Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
}
for _, node := range nodes.Items {
m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate)
}
// Initialize the runtime operation error rate
m.GetRuntimeOperationErrorRate()
return m
}
// GetRuntimeOperationErrorRate gets runtime operation records from kubelet metrics and calculate
// error rates of all runtime operations.
func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
for node := range m.nodesRuntimeOps {
nodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
if err != nil {
Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
continue
}
m.nodesRuntimeOps[node] = nodeResult
}
return m.nodesRuntimeOps
}
// GetLatestRuntimeOperationErrorRate gets latest error rate and timeout rate from last observed RuntimeOperationErrorRate.
func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
result := make(map[string]NodeRuntimeOperationErrorRate)
for node := range m.nodesRuntimeOps {
result[node] = make(NodeRuntimeOperationErrorRate)
oldNodeResult := m.nodesRuntimeOps[node]
curNodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
if err != nil {
Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
continue
}
for op, cur := range curNodeResult {
t := *cur
if old, found := oldNodeResult[op]; found {
t.ErrorRate = (t.ErrorRate*t.TotalNumber - old.ErrorRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
t.TimeoutRate = (t.TimeoutRate*t.TotalNumber - old.TimeoutRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
t.TotalNumber -= old.TotalNumber
}
result[node][op] = &t
}
m.nodesRuntimeOps[node] = curNodeResult
}
return result
}
// FormatRuntimeOperationErrorRate formats the runtime operation error rate to string.
func FormatRuntimeOperationErrorRate(nodesResult map[string]NodeRuntimeOperationErrorRate) string {
lines := []string{}
for node, nodeResult := range nodesResult {
lines = append(lines, fmt.Sprintf("node %q runtime operation error rate:", node))
for op, result := range nodeResult {
line := fmt.Sprintf("operation %q: total - %.0f; error rate - %f; timeout rate - %f", op,
result.TotalNumber, result.ErrorRate, result.TimeoutRate)
lines = append(lines, line)
}
lines = append(lines, fmt.Sprintln())
}
return strings.Join(lines, "\n")
}
// getNodeRuntimeOperationErrorRate gets runtime operation error rate from specified node.
func getNodeRuntimeOperationErrorRate(c *client.Client, node string) (NodeRuntimeOperationErrorRate, error) {
result := make(NodeRuntimeOperationErrorRate)
ms, err := getKubeletMetrics(c, node)
if err != nil {
return result, err
}
// If no corresponding metrics are found, the returned samples will be empty. Then the following
// loop will be skipped automatically.
allOps := ms[kubeletmetrics.DockerOperationsKey]
errOps := ms[kubeletmetrics.DockerOperationsErrorsKey]
timeoutOps := ms[kubeletmetrics.DockerOperationsTimeoutKey]
for _, sample := range allOps {
operation := string(sample.Metric["operation_type"])
result[operation] = &RuntimeOperationErrorRate{TotalNumber: float64(sample.Value)}
}
for _, sample := range errOps {
operation := string(sample.Metric["operation_type"])
// Should always find the corresponding item, just in case
if _, found := result[operation]; found {
result[operation].ErrorRate = float64(sample.Value) / result[operation].TotalNumber
}
}
for _, sample := range timeoutOps {
operation := string(sample.Metric["operation_type"])
if _, found := result[operation]; found {
result[operation].TimeoutRate = float64(sample.Value) / result[operation].TotalNumber
}
}
return result, nil
}
// HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) ([]KubeletMetric, error) {
var metricsBlob string
var err error
// If we haven't been given a client try scraping the nodename directly for a /metrics endpoint.
if c == nil {
metricsBlob, err = getKubeletMetricsThroughNode(nodeName)
} else {
metricsBlob, err = getKubeletMetricsThroughProxy(c, nodeName)
}
func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) (KubeletLatencyMetrics, error) {
ms, err := getKubeletMetrics(c, nodeName)
if err != nil {
return []KubeletMetric{}, err
return KubeletLatencyMetrics{}, err
}
metric, err := ParseKubeletMetrics(metricsBlob)
if err != nil {
return []KubeletMetric{}, err
}
sort.Sort(KubeletMetricByLatency(metric))
var badMetrics []KubeletMetric
latencyMetrics := GetKubeletLatencyMetrics(ms)
sort.Sort(latencyMetrics)
var badMetrics KubeletLatencyMetrics
Logf("\nLatency metrics for node %v", nodeName)
for _, m := range metric {
for _, m := range latencyMetrics {
if m.Latency > threshold {
badMetrics = append(badMetrics, m)
Logf("%+v", m)
@ -389,34 +516,6 @@ type usageDataPerContainer struct {
memWorkSetData []uint64
}
// Retrieve metrics from the kubelet server of the given node.
func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
client, err := NodeProxyRequest(c, node, "metrics")
if err != nil {
return "", err
}
metric, errRaw := client.Raw()
if errRaw != nil {
return "", err
}
return string(metric), nil
}
// Retrieve metrics from the kubelet on the given node using a simple GET over http.
// Currently only used in integration tests.
func getKubeletMetricsThroughNode(nodeName string) (string, error) {
resp, err := http.Get(fmt.Sprintf("http://%v/metrics", nodeName))
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}
func GetKubeletHeapStats(c *client.Client, nodeName string) (string, error) {
client, err := NodeProxyRequest(c, nodeName, "debug/pprof/heap")
if err != nil {

View File

@ -189,6 +189,7 @@ func verifyCPULimits(expected framework.ContainersCPUSummary, actual framework.N
var _ = framework.KubeDescribe("Kubelet [Serial] [Slow]", func() {
var nodeNames sets.String
f := framework.NewDefaultFramework("kubelet-perf")
var om *framework.RuntimeOperationMonitor
var rm *framework.ResourceMonitor
BeforeEach(func() {
@ -197,12 +198,15 @@ var _ = framework.KubeDescribe("Kubelet [Serial] [Slow]", func() {
for _, node := range nodes.Items {
nodeNames.Insert(node.Name)
}
om = framework.NewRuntimeOperationMonitor(f.Client)
rm = framework.NewResourceMonitor(f.Client, framework.TargetContainers(), containerStatsPollingPeriod)
rm.Start()
})
AfterEach(func() {
rm.Stop()
result := om.GetLatestRuntimeOperationErrorRate()
framework.Logf("runtime operation error metrics:\n%s", framework.FormatRuntimeOperationErrorRate(result))
})
framework.KubeDescribe("regular resource usage tracking", func() {
// We assume that the scheduler will make reasonable scheduling choices