diff --git a/pkg/kubelet/dockertools/instrumented_docker.go b/pkg/kubelet/dockertools/instrumented_docker.go
index 2cbaeb9c6e0..36e3fdd637c 100644
--- a/pkg/kubelet/dockertools/instrumented_docker.go
+++ b/pkg/kubelet/dockertools/instrumented_docker.go
@@ -38,13 +38,18 @@ func newInstrumentedDockerInterface(dockerClient DockerInterface) DockerInterfac
 
 // recordOperation records the duration of the operation.
 func recordOperation(operation string, start time.Time) {
+	metrics.DockerOperations.WithLabelValues(operation).Inc()
 	metrics.DockerOperationsLatency.WithLabelValues(operation).Observe(metrics.SinceInMicroseconds(start))
 }
 
 // recordError records error for metric if an error occurred.
 func recordError(operation string, err error) {
 	if err != nil {
-		metrics.DockerErrors.WithLabelValues(operation).Inc()
+		if _, ok := err.(operationTimeout); ok {
+			metrics.DockerOperationsTimeout.WithLabelValues(operation).Inc()
+		}
+		// A docker operation timeout is also a docker error, so we don't add an else here.
+		metrics.DockerOperationsErrors.WithLabelValues(operation).Inc()
 	}
 }
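
Note: every method of instrumentedDockerInterface is built around these two helpers. A representative wrapped method, sketched from the surrounding pattern in instrumented_docker.go (illustrative, not copied from this diff):

	func (in instrumentedDockerInterface) StartContainer(id string) error {
		const operation = "start_container"
		// time.Now() is evaluated when the defer statement runs, so the deferred
		// recordOperation observes the full call duration when the method returns.
		defer recordOperation(operation, time.Now())

		err := in.client.StartContainer(id)
		recordError(operation, err)
		return err
	}

With the counter added to recordOperation, every call now increments kubelet_docker_operations, while kubelet_docker_operations_errors and kubelet_docker_operations_timeout only move when recordError sees a non-nil error.
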
diff --git a/pkg/kubelet/dockertools/kube_docker_client.go b/pkg/kubelet/dockertools/kube_docker_client.go
index 76bb89e548e..6e440b25279 100644
--- a/pkg/kubelet/dockertools/kube_docker_client.go
+++ b/pkg/kubelet/dockertools/kube_docker_client.go
@@ -52,8 +52,13 @@ type kubeDockerClient struct {
 // Make sure that kubeDockerClient implemented the DockerInterface.
 var _ DockerInterface = &kubeDockerClient{}
 
-// the default ShmSize to use (in bytes) if not specified.
-const defaultShmSize = int64(1024 * 1024 * 64)
+const (
+	// defaultTimeout is the default timeout for all docker operations.
+	defaultTimeout = 2 * time.Minute
+
+	// defaultShmSize is the default ShmSize to use (in bytes) if not specified.
+	defaultShmSize = int64(1024 * 1024 * 64)
+)
 
 // newKubeDockerClient creates an kubeDockerClient from an existing docker client.
 func newKubeDockerClient(dockerClient *dockerapi.Client) DockerInterface {
@@ -62,27 +67,26 @@ func newKubeDockerClient(dockerClient *dockerapi.Client) DockerInterface {
 	}
 }
 
-// getDefaultContext returns the default context, now the default context is
-// context.Background()
-// TODO(random-liu): Add timeout and timeout handling mechanism.
-func getDefaultContext() context.Context {
-	return context.Background()
-}
-
 func (k *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
-	containers, err := k.client.ContainerList(getDefaultContext(), options)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	containers, err := k.client.ContainerList(ctx, options)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return nil, ctxErr
+	}
 	if err != nil {
 		return nil, err
 	}
-	apiContainers := []dockertypes.Container{}
-	for _, c := range containers {
-		apiContainers = append(apiContainers, dockertypes.Container(c))
-	}
-	return apiContainers, nil
+	return containers, nil
 }
 
 func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) {
-	containerJSON, err := d.client.ContainerInspect(getDefaultContext(), id)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	containerJSON, err := d.client.ContainerInspect(ctx, id)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return nil, ctxErr
+	}
 	if err != nil {
 		if dockerapi.IsErrContainerNotFound(err) {
 			return nil, containerNotFoundError{ID: id}
@@ -93,12 +97,17 @@ func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJS
 }
 
 func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error) {
+	ctx, cancel := getDefaultContext()
+	defer cancel()
 	// we provide an explicit default shm size as to not depend on docker daemon.
 	// TODO: evaluate exposing this as a knob in the API
 	if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
 		opts.HostConfig.ShmSize = defaultShmSize
 	}
-	createResp, err := d.client.ContainerCreate(getDefaultContext(), opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
+	createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return nil, ctxErr
+	}
 	if err != nil {
 		return nil, err
 	}
@@ -106,20 +115,43 @@ func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfi
 }
 
 func (d *kubeDockerClient) StartContainer(id string) error {
-	return d.client.ContainerStart(getDefaultContext(), id)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	err := d.client.ContainerStart(ctx, id)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return ctxErr
+	}
+	return err
 }
 
 // Stopping an already stopped container will not cause an error in engine-api.
 func (d *kubeDockerClient) StopContainer(id string, timeout int) error {
-	return d.client.ContainerStop(getDefaultContext(), id, timeout)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	err := d.client.ContainerStop(ctx, id, timeout)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return ctxErr
+	}
+	return err
 }
 
 func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error {
-	return d.client.ContainerRemove(getDefaultContext(), id, opts)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	err := d.client.ContainerRemove(ctx, id, opts)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return ctxErr
+	}
+	return err
 }
 
 func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect, error) {
-	resp, _, err := d.client.ImageInspectWithRaw(getDefaultContext(), image, true)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	resp, _, err := d.client.ImageInspectWithRaw(ctx, image, true)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return nil, ctxErr
+	}
 	if err != nil {
 		if dockerapi.IsErrImageNotFound(err) {
 			err = imageNotFoundError{ID: image}
@@ -130,11 +162,22 @@ func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect
 }
 
 func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, error) {
-	return d.client.ImageHistory(getDefaultContext(), id)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	resp, err := d.client.ImageHistory(ctx, id)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return nil, ctxErr
+	}
+	return resp, err
 }
 
 func (d *kubeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error) {
-	images, err := d.client.ImageList(getDefaultContext(), opts)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	images, err := d.client.ImageList(ctx, opts)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return nil, ctxErr
+	}
 	if err != nil {
 		return nil, err
 	}
@@ -155,8 +198,13 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, 
 	if err != nil {
 		return err
 	}
+	ctx, cancel := getDefaultContext()
+	defer cancel()
 	opts.RegistryAuth = base64Auth
-	resp, err := d.client.ImagePull(getDefaultContext(), image, opts)
+	resp, err := d.client.ImagePull(ctx, image, opts)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return ctxErr
+	}
 	if err != nil {
 		return err
 	}
@@ -180,11 +228,22 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, 
 }
 
 func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error) {
-	return d.client.ImageRemove(getDefaultContext(), image, opts)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	resp, err := d.client.ImageRemove(ctx, image, opts)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return nil, ctxErr
+	}
+	return resp, err
 }
 
 func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error {
-	resp, err := d.client.ContainerLogs(getDefaultContext(), id, opts)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	resp, err := d.client.ContainerLogs(ctx, id, opts)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return ctxErr
+	}
 	if err != nil {
 		return err
 	}
@@ -193,7 +252,12 @@ func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions
 }
 
 func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
-	resp, err := d.client.ServerVersion(getDefaultContext())
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	resp, err := d.client.ServerVersion(ctx)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return nil, ctxErr
+	}
 	if err != nil {
 		return nil, err
 	}
@@ -201,7 +265,12 @@ func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
 }
 
 func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
-	resp, err := d.client.Info(getDefaultContext())
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	resp, err := d.client.Info(ctx)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return nil, ctxErr
+	}
 	if err != nil {
 		return nil, err
 	}
@@ -210,7 +279,12 @@
 
 // TODO(random-liu): Add unit test for exec and attach functions, just like what go-dockerclient did.
 func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error) {
-	resp, err := d.client.ContainerExecCreate(getDefaultContext(), id, opts)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	resp, err := d.client.ContainerExecCreate(ctx, id, opts)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return nil, ctxErr
+	}
 	if err != nil {
 		return nil, err
 	}
@@ -218,13 +292,22 @@ func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*
 }
 
 func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
+	ctx, cancel := getDefaultContext()
+	defer cancel()
 	if opts.Detach {
-		return d.client.ContainerExecStart(getDefaultContext(), startExec, opts)
+		err := d.client.ContainerExecStart(ctx, startExec, opts)
+		if ctxErr := contextError(ctx); ctxErr != nil {
+			return ctxErr
+		}
+		return err
 	}
-	resp, err := d.client.ContainerExecAttach(getDefaultContext(), startExec, dockertypes.ExecConfig{
+	resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecConfig{
 		Detach: opts.Detach,
 		Tty:    opts.Tty,
 	})
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return ctxErr
+	}
 	if err != nil {
 		return err
 	}
@@ -233,7 +316,12 @@ func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStar
 }
 
 func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecInspect, error) {
-	resp, err := d.client.ContainerExecInspect(getDefaultContext(), id)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	resp, err := d.client.ContainerExecInspect(ctx, id)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return nil, ctxErr
+	}
 	if err != nil {
 		return nil, err
 	}
@@ -241,7 +329,12 @@ func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecIns
 }
 
 func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.ContainerAttachOptions, sopts StreamOptions) error {
-	resp, err := d.client.ContainerAttach(getDefaultContext(), id, opts)
+	ctx, cancel := getDefaultContext()
+	defer cancel()
+	resp, err := d.client.ContainerAttach(ctx, id, opts)
+	if ctxErr := contextError(ctx); ctxErr != nil {
+		return ctxErr
+	}
 	if err != nil {
 		return err
 	}
@@ -303,6 +396,18 @@ func parseDockerTimestamp(s string) (time.Time, error) {
 	return time.Parse(time.RFC3339Nano, s)
 }
 
+func getDefaultContext() (context.Context, context.CancelFunc) {
+	return context.WithTimeout(context.Background(), defaultTimeout)
+}
+
+// contextError checks the context and returns an operationTimeout error if the context has timed out.
+func contextError(ctx context.Context) error {
+	if ctx.Err() == context.DeadlineExceeded {
+		return operationTimeout{err: ctx.Err()}
+	}
+	return ctx.Err()
+}
+
 // StreamOptions are the options used to configure the stream redirection
 type StreamOptions struct {
 	RawTerminal  bool
@@ -311,6 +416,15 @@ type StreamOptions struct {
 	ErrorStream  io.Writer
 }
 
+// operationTimeout is the error returned when a docker operation times out.
+type operationTimeout struct {
+	err error
+}
+
+func (e operationTimeout) Error() string {
+	return fmt.Sprintf("operation timeout: %v", e.err)
+}
+
 // containerNotFoundError is the error returned by InspectContainer when container not found. We
 // add this error type for testability. We don't use the original error returned by engine-api
 // because dockertypes.containerNotFoundError is private, we can't create and inject it in our test.
@@ -319,7 +433,7 @@ type containerNotFoundError struct {
 }
 
 func (e containerNotFoundError) Error() string {
-	return fmt.Sprintf("Error: No such container: %s", e.ID)
+	return fmt.Sprintf("no such container: %q", e.ID)
 }
 
 // imageNotFoundError is the error returned by InspectImage when image not found.
@@ -328,5 +442,5 @@ type imageNotFoundError struct {
 }
 
 func (e imageNotFoundError) Error() string {
-	return fmt.Sprintf("Error: No such image: %s", e.ID)
+	return fmt.Sprintf("no such image: %q", e.ID)
 }
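
Note: every wrapper above checks contextError in addition to the client's returned error because, once the deadline expires, the engine-api call may fail with a generic transport error, while ctx.Err() reliably reports context.DeadlineExceeded; checking the context first is what lets recordError classify the failure as a timeout. A minimal standalone sketch of that mechanism (hypothetical demo code using the standard library context package, not part of this diff):

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// operationTimeout mirrors the error type introduced in this diff.
	type operationTimeout struct{ err error }

	func (e operationTimeout) Error() string { return fmt.Sprintf("operation timeout: %v", e.err) }

	// contextError maps a deadline-exceeded context to operationTimeout and
	// passes any other context state through unchanged.
	func contextError(ctx context.Context) error {
		if ctx.Err() == context.DeadlineExceeded {
			return operationTimeout{err: ctx.Err()}
		}
		return ctx.Err()
	}

	func main() {
		// A 10ms deadline stands in for defaultTimeout; the sleep stands in for a
		// docker API call that hangs past the deadline.
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		defer cancel()
		time.Sleep(20 * time.Millisecond)

		err := contextError(ctx)
		if _, ok := err.(operationTimeout); ok {
			fmt.Println("classified as timeout:", err)
		}
	}
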
diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go
index 4c8a5215948..7d8294c1503 100644
--- a/pkg/kubelet/metrics/metrics.go
+++ b/pkg/kubelet/metrics/metrics.go
@@ -32,8 +32,10 @@ const (
 	PodStartLatencyKey            = "pod_start_latency_microseconds"
 	PodStatusLatencyKey           = "generate_pod_status_latency_microseconds"
 	ContainerManagerOperationsKey = "container_manager_latency_microseconds"
-	DockerOperationsKey           = "docker_operations_latency_microseconds"
-	DockerErrorsKey               = "docker_errors"
+	DockerOperationsLatencyKey    = "docker_operations_latency_microseconds"
+	DockerOperationsKey           = "docker_operations"
+	DockerOperationsErrorsKey     = "docker_operations_errors"
+	DockerOperationsTimeoutKey    = "docker_operations_timeout"
 	PodWorkerStartLatencyKey      = "pod_worker_start_latency_microseconds"
 	PLEGRelistLatencyKey          = "pleg_relist_latency_microseconds"
 	PLEGRelistIntervalKey         = "pleg_relist_interval_microseconds"
@@ -94,16 +96,32 @@
 	DockerOperationsLatency = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
 			Subsystem: KubeletSubsystem,
-			Name:      DockerOperationsKey,
+			Name:      DockerOperationsLatencyKey,
 			Help:      "Latency in microseconds of Docker operations. Broken down by operation type.",
 		},
 		[]string{"operation_type"},
 	)
-	DockerErrors = prometheus.NewCounterVec(
+	DockerOperations = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Subsystem: KubeletSubsystem,
-			Name:      DockerErrorsKey,
-			Help:      "Cumulative number of Docker errors by operation type.",
+			Name:      DockerOperationsKey,
+			Help:      "Cumulative number of Docker operations by operation type.",
+		},
+		[]string{"operation_type"},
+	)
+	DockerOperationsErrors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: KubeletSubsystem,
+			Name:      DockerOperationsErrorsKey,
+			Help:      "Cumulative number of Docker operation errors by operation type.",
+		},
+		[]string{"operation_type"},
+	)
+	DockerOperationsTimeout = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: KubeletSubsystem,
+			Name:      DockerOperationsTimeoutKey,
+			Help:      "Cumulative number of Docker operation timeouts by operation type.",
 		},
 		[]string{"operation_type"},
 	)
@@ -137,7 +155,9 @@ func Register(containerCache kubecontainer.RuntimeCache) {
 		prometheus.MustRegister(SyncPodsLatency)
 		prometheus.MustRegister(PodWorkerStartLatency)
 		prometheus.MustRegister(ContainersPerPodCount)
-		prometheus.MustRegister(DockerErrors)
+		prometheus.MustRegister(DockerOperations)
+		prometheus.MustRegister(DockerOperationsErrors)
+		prometheus.MustRegister(DockerOperationsTimeout)
 		prometheus.MustRegister(newPodAndContainerCollector(containerCache))
 		prometheus.MustRegister(PLEGRelistLatency)
 		prometheus.MustRegister(PLEGRelistInterval)
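
Note: the three counters share the operation_type label, and (per the recordError comment in instrumented_docker.go above) a timeout also increments the errors counter, so timeouts are a subset of errors, which are a subset of operations, and per-operation rates fall out by plain division. A small self-contained sketch of how a scrape sees one of these counters (hypothetical demo code, not part of this diff):

	package main

	import (
		"fmt"

		"github.com/prometheus/client_golang/prometheus"
		dto "github.com/prometheus/client_model/go"
	)

	func main() {
		// Same shape as the DockerOperations vector registered above.
		dockerOperations := prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Subsystem: "kubelet",
				Name:      "docker_operations",
				Help:      "Cumulative number of Docker operations by operation type.",
			},
			[]string{"operation_type"},
		)
		prometheus.MustRegister(dockerOperations)

		// Each instrumented docker call does the equivalent of:
		dockerOperations.WithLabelValues("list_containers").Inc()

		// A scrape sees the fully qualified name kubelet_docker_operations, which
		// is why that is the key listed in NecessaryKubeletMetrics further down.
		m := &dto.Metric{}
		if err := dockerOperations.WithLabelValues("list_containers").Write(m); err == nil {
			fmt.Printf("kubelet_docker_operations{operation_type=%q} = %v\n",
				"list_containers", m.GetCounter().GetValue())
		}
	}
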
diff --git a/pkg/metrics/generic_metrics.go b/pkg/metrics/generic_metrics.go
index b0905bc5eef..7a04e794e83 100644
--- a/pkg/metrics/generic_metrics.go
+++ b/pkg/metrics/generic_metrics.go
@@ -134,7 +134,6 @@ func parseMetrics(data string, knownMetrics map[string][]string, output *Metrics
 		if isKnownMetric || isCommonMetric {
 			(*output)[name] = append((*output)[name], metric)
 		} else {
-			glog.Warningf("Unknown metric %v", metric)
 			if unknownMetrics != nil {
 				unknownMetrics.Insert(name)
 			}
diff --git a/pkg/metrics/kubelet_metrics.go b/pkg/metrics/kubelet_metrics.go
index 323b80fa5fc..fc715261da7 100644
--- a/pkg/metrics/kubelet_metrics.go
+++ b/pkg/metrics/kubelet_metrics.go
@@ -18,6 +18,8 @@ package metrics
 
 import (
 	"fmt"
+	"io/ioutil"
+	"net/http"
 	"time"
 
 	"k8s.io/kubernetes/pkg/util/sets"
@@ -71,7 +73,9 @@ var NecessaryKubeletMetrics = map[string][]string{
 	"kubelet_containers_per_pod_count":       {"quantile"},
 	"kubelet_containers_per_pod_count_count": {},
 	"kubelet_containers_per_pod_count_sum":   {},
-	"kubelet_docker_errors":                  {"operation_type"},
+	"kubelet_docker_operations":              {"operation_type"},
+	"kubelet_docker_operations_errors":       {"operation_type"},
+	"kubelet_docker_operations_timeout":      {"operation_type"},
 	"kubelet_docker_operations_latency_microseconds":       {"operation_type", "quantile"},
 	"kubelet_docker_operations_latency_microseconds_count": {"operation_type"},
 	"kubelet_docker_operations_latency_microseconds_sum":   {"operation_type"},
@@ -126,6 +130,22 @@ func NewKubeletMetrics() KubeletMetrics {
 	return KubeletMetrics(result)
 }
 
+// GrabKubeletMetricsWithoutProxy retrieves metrics from the kubelet on the given node using a simple GET over http.
+// Currently only used in integration tests.
+func GrabKubeletMetricsWithoutProxy(nodeName string) (KubeletMetrics, error) {
+	metricsEndpoint := "http://%s/metrics"
+	resp, err := http.Get(fmt.Sprintf(metricsEndpoint, nodeName))
+	if err != nil {
+		return KubeletMetrics{}, err
+	}
+	defer resp.Body.Close()
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return KubeletMetrics{}, err
+	}
+	return parseKubeletMetrics(string(body))
+}
+
 func parseKubeletMetrics(data string) (KubeletMetrics, error) {
 	result := NewKubeletMetrics()
 	if err := parseMetrics(data, NecessaryKubeletMetrics, (*Metrics)(&result), nil); err != nil {
diff --git a/test/e2e/framework/kubelet_stats.go b/test/e2e/framework/kubelet_stats.go
index be7f4488c8d..4b796b2ff71 100644
--- a/test/e2e/framework/kubelet_stats.go
+++ b/test/e2e/framework/kubelet_stats.go
@@ -20,8 +20,6 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
-	"io/ioutil"
-	"net/http"
 	"sort"
 	"strconv"
 	"strings"
@@ -34,9 +32,10 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
-	"k8s.io/kubernetes/pkg/kubelet/metrics"
+	kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
 	kubeletstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
 	"k8s.io/kubernetes/pkg/master/ports"
+	"k8s.io/kubernetes/pkg/metrics"
 	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -44,7 +43,7 @@
 
 // KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
 // TODO: Get some more structure around the metrics and this type
-type KubeletMetric struct {
+type KubeletLatencyMetric struct {
 	// eg: list, info, create
 	Operation string
 	// eg: sync_pods, pod_worker
@@ -56,89 +55,217 @@
 
 // KubeletMetricByLatency implements sort.Interface for []KubeletMetric based on
 // the latency field.
-type KubeletMetricByLatency []KubeletMetric
+type KubeletLatencyMetrics []KubeletLatencyMetric
 
-func (a KubeletMetricByLatency) Len() int           { return len(a) }
-func (a KubeletMetricByLatency) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a KubeletMetricByLatency) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
+func (a KubeletLatencyMetrics) Len() int           { return len(a) }
+func (a KubeletLatencyMetrics) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
 
-// ParseKubeletMetrics reads metrics from the kubelet server running on the given node
-func ParseKubeletMetrics(metricsBlob string) ([]KubeletMetric, error) {
-	samples, err := extractMetricSamples(metricsBlob)
+// getKubeletMetricsFromNode gets kubelet metrics through the metrics grabber if an apiserver
+// client is passed in; otherwise it gets the metrics directly from the node.
+func getKubeletMetricsFromNode(c *client.Client, nodeName string) (metrics.KubeletMetrics, error) {
+	if c == nil {
+		return metrics.GrabKubeletMetricsWithoutProxy(nodeName)
+	}
+	grabber, err := metrics.NewMetricsGrabber(c, true, false, false, false)
 	if err != nil {
-		return nil, err
+		return metrics.KubeletMetrics{}, err
+	}
+	return grabber.GrabFromKubelet(nodeName)
+}
+
+// getKubeletMetrics gets all metrics in the kubelet subsystem from the specified node and trims
+// the subsystem prefix.
+func getKubeletMetrics(c *client.Client, nodeName string) (metrics.KubeletMetrics, error) {
+	ms, err := getKubeletMetricsFromNode(c, nodeName)
+	if err != nil {
+		return metrics.KubeletMetrics{}, err
 	}
-	acceptedMethods := sets.NewString(
-		metrics.PodWorkerLatencyKey,
-		metrics.PodWorkerStartLatencyKey,
-		metrics.SyncPodsLatencyKey,
-		metrics.PodStartLatencyKey,
-		metrics.PodStatusLatencyKey,
-		metrics.ContainerManagerOperationsKey,
-		metrics.DockerOperationsKey,
-		metrics.DockerErrorsKey,
-	)
-
-	var kms []KubeletMetric
-	for _, sample := range samples {
-		const prefix = metrics.KubeletSubsystem + "_"
-		metricName := string(sample.Metric[model.MetricNameLabel])
-		if !strings.HasPrefix(metricName, prefix) {
+	kubeletMetrics := make(metrics.KubeletMetrics)
+	for name, samples := range ms {
+		const prefix = kubeletmetrics.KubeletSubsystem + "_"
+		if !strings.HasPrefix(name, prefix) {
 			// Not a kubelet metric.
 			continue
 		}
+		method := strings.TrimPrefix(name, prefix)
+		kubeletMetrics[method] = samples
+	}
+	return kubeletMetrics, nil
+}
 
-		method := strings.TrimPrefix(metricName, prefix)
-		if !acceptedMethods.Has(method) {
+// GetKubeletLatencyMetrics gets all latency-related kubelet metrics. Note that the KubeletMetrics
+// passed in should not contain the subsystem prefix.
+func GetKubeletLatencyMetrics(ms metrics.KubeletMetrics) KubeletLatencyMetrics {
+	latencyMethods := sets.NewString(
+		kubeletmetrics.PodWorkerLatencyKey,
+		kubeletmetrics.PodWorkerStartLatencyKey,
+		kubeletmetrics.SyncPodsLatencyKey,
+		kubeletmetrics.PodStartLatencyKey,
+		kubeletmetrics.PodStatusLatencyKey,
+		kubeletmetrics.ContainerManagerOperationsKey,
+		kubeletmetrics.DockerOperationsLatencyKey,
+		kubeletmetrics.PLEGRelistLatencyKey,
+	)
+	var latencyMetrics KubeletLatencyMetrics
+	for method, samples := range ms {
+		if !latencyMethods.Has(method) {
 			continue
 		}
-
-		if method == metrics.DockerErrorsKey {
-			Logf("ERROR %v", sample)
-		}
-
-		latency := sample.Value
-		operation := string(sample.Metric["operation_type"])
-		var quantile float64
-		if val, ok := sample.Metric[model.QuantileLabel]; ok {
-			var err error
-			if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
-				continue
+		for _, sample := range samples {
+			latency := sample.Value
+			operation := string(sample.Metric["operation_type"])
+			var quantile float64
+			if val, ok := sample.Metric[model.QuantileLabel]; ok {
+				var err error
+				if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
+					continue
+				}
 			}
-		}
-
-		kms = append(kms, KubeletMetric{
-			operation,
-			method,
-			quantile,
-			time.Duration(int64(latency)) * time.Microsecond,
-		})
+			latencyMetrics = append(latencyMetrics, KubeletLatencyMetric{
+				Operation: operation,
+				Method:    method,
+				Quantile:  quantile,
+				Latency:   time.Duration(int64(latency)) * time.Microsecond,
+			})
+		}
 	}
-	return kms, nil
+	return latencyMetrics
+}
+
+// RuntimeOperationMonitor is the tool for getting and parsing docker operation metrics.
+type RuntimeOperationMonitor struct {
+	client          *client.Client
+	nodesRuntimeOps map[string]NodeRuntimeOperationErrorRate
+}
+
+// NodeRuntimeOperationErrorRate is the runtime operation error rate on one node.
+type NodeRuntimeOperationErrorRate map[string]*RuntimeOperationErrorRate
+
+// RuntimeOperationErrorRate is the error rate of a specified runtime operation.
+type RuntimeOperationErrorRate struct {
+	TotalNumber float64
+	ErrorRate   float64
+	TimeoutRate float64
+}
+
+// NewRuntimeOperationMonitor returns a new RuntimeOperationMonitor for all current nodes.
+func NewRuntimeOperationMonitor(c *client.Client) *RuntimeOperationMonitor {
+	m := &RuntimeOperationMonitor{
+		client:          c,
+		nodesRuntimeOps: make(map[string]NodeRuntimeOperationErrorRate),
+	}
+	nodes, err := m.client.Nodes().List(api.ListOptions{})
+	if err != nil {
+		Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
+	}
+	for _, node := range nodes.Items {
+		m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate)
+	}
+	// Initialize the runtime operation error rate.
+	m.GetRuntimeOperationErrorRate()
+	return m
+}
+
+// GetRuntimeOperationErrorRate gets runtime operation records from kubelet metrics and calculates
+// the error rates of all runtime operations.
+func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
+	for node := range m.nodesRuntimeOps {
+		nodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
+		if err != nil {
+			Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
+			continue
+		}
+		m.nodesRuntimeOps[node] = nodeResult
+	}
+	return m.nodesRuntimeOps
+}
+
+// GetLatestRuntimeOperationErrorRate gets the error rate and timeout rate accumulated since the
+// last observed RuntimeOperationErrorRate.
+func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
+	result := make(map[string]NodeRuntimeOperationErrorRate)
+	for node := range m.nodesRuntimeOps {
+		result[node] = make(NodeRuntimeOperationErrorRate)
+		oldNodeResult := m.nodesRuntimeOps[node]
+		curNodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
+		if err != nil {
+			Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
+			continue
+		}
+		for op, cur := range curNodeResult {
+			t := *cur
+			if old, found := oldNodeResult[op]; found {
+				t.ErrorRate = (t.ErrorRate*t.TotalNumber - old.ErrorRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
+				t.TimeoutRate = (t.TimeoutRate*t.TotalNumber - old.TimeoutRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
+				t.TotalNumber -= old.TotalNumber
+			}
+			result[node][op] = &t
+		}
+		m.nodesRuntimeOps[node] = curNodeResult
+	}
+	return result
+}
+
+// FormatRuntimeOperationErrorRate formats the runtime operation error rate as a string.
+func FormatRuntimeOperationErrorRate(nodesResult map[string]NodeRuntimeOperationErrorRate) string {
+	lines := []string{}
+	for node, nodeResult := range nodesResult {
+		lines = append(lines, fmt.Sprintf("node %q runtime operation error rate:", node))
+		for op, result := range nodeResult {
+			line := fmt.Sprintf("operation %q: total - %.0f; error rate - %f; timeout rate - %f", op,
+				result.TotalNumber, result.ErrorRate, result.TimeoutRate)
+			lines = append(lines, line)
+		}
+		lines = append(lines, fmt.Sprintln())
+	}
+	return strings.Join(lines, "\n")
+}
+
+// getNodeRuntimeOperationErrorRate gets the runtime operation error rate from the specified node.
+func getNodeRuntimeOperationErrorRate(c *client.Client, node string) (NodeRuntimeOperationErrorRate, error) {
+	result := make(NodeRuntimeOperationErrorRate)
+	ms, err := getKubeletMetrics(c, node)
+	if err != nil {
+		return result, err
+	}
+	// If no corresponding metrics are found, the returned samples will be empty
+	// and the following loops will be skipped automatically.
+	allOps := ms[kubeletmetrics.DockerOperationsKey]
+	errOps := ms[kubeletmetrics.DockerOperationsErrorsKey]
+	timeoutOps := ms[kubeletmetrics.DockerOperationsTimeoutKey]
+	for _, sample := range allOps {
+		operation := string(sample.Metric["operation_type"])
+		result[operation] = &RuntimeOperationErrorRate{TotalNumber: float64(sample.Value)}
+	}
+	for _, sample := range errOps {
+		operation := string(sample.Metric["operation_type"])
+		// We should always find the corresponding item; check just in case.
+		if _, found := result[operation]; found {
+			result[operation].ErrorRate = float64(sample.Value) / result[operation].TotalNumber
+		}
+	}
+	for _, sample := range timeoutOps {
+		operation := string(sample.Metric["operation_type"])
+		if _, found := result[operation]; found {
+			result[operation].TimeoutRate = float64(sample.Value) / result[operation].TotalNumber
+		}
+	}
+	return result, nil
 }
 
 // HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
-func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) ([]KubeletMetric, error) {
-	var metricsBlob string
-	var err error
-	// If we haven't been given a client try scraping the nodename directly for a /metrics endpoint.
-	if c == nil {
-		metricsBlob, err = getKubeletMetricsThroughNode(nodeName)
-	} else {
-		metricsBlob, err = getKubeletMetricsThroughProxy(c, nodeName)
-	}
+func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) (KubeletLatencyMetrics, error) {
+	ms, err := getKubeletMetrics(c, nodeName)
 	if err != nil {
-		return []KubeletMetric{}, err
+		return KubeletLatencyMetrics{}, err
 	}
-	metric, err := ParseKubeletMetrics(metricsBlob)
-	if err != nil {
-		return []KubeletMetric{}, err
-	}
-	sort.Sort(KubeletMetricByLatency(metric))
-	var badMetrics []KubeletMetric
+	latencyMetrics := GetKubeletLatencyMetrics(ms)
+	sort.Sort(latencyMetrics)
+	var badMetrics KubeletLatencyMetrics
 	Logf("\nLatency metrics for node %v", nodeName)
-	for _, m := range metric {
+	for _, m := range latencyMetrics {
 		if m.Latency > threshold {
 			badMetrics = append(badMetrics, m)
 			Logf("%+v", m)
@@ -389,34 +516,6 @@ type usageDataPerContainer struct {
 	memWorkSetData []uint64
 }
 
-// Retrieve metrics from the kubelet server of the given node.
-func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
-	client, err := NodeProxyRequest(c, node, "metrics")
-	if err != nil {
-		return "", err
-	}
-	metric, errRaw := client.Raw()
-	if errRaw != nil {
-		return "", err
-	}
-	return string(metric), nil
-}
-
-// Retrieve metrics from the kubelet on the given node using a simple GET over http.
-// Currently only used in integration tests.
-func getKubeletMetricsThroughNode(nodeName string) (string, error) {
-	resp, err := http.Get(fmt.Sprintf("http://%v/metrics", nodeName))
-	if err != nil {
-		return "", err
-	}
-	defer resp.Body.Close()
-	body, err := ioutil.ReadAll(resp.Body)
-	if err != nil {
-		return "", err
-	}
-	return string(body), nil
-}
-
 func GetKubeletHeapStats(c *client.Client, nodeName string) (string, error) {
 	client, err := NodeProxyRequest(c, nodeName, "debug/pprof/heap")
 	if err != nil {
diff --git a/test/e2e/kubelet_perf.go b/test/e2e/kubelet_perf.go
index a1b627468ae..01ce169e9bf 100644
--- a/test/e2e/kubelet_perf.go
+++ b/test/e2e/kubelet_perf.go
@@ -189,6 +189,7 @@ func verifyCPULimits(expected framework.ContainersCPUSummary, actual framework.N
 var _ = framework.KubeDescribe("Kubelet [Serial] [Slow]", func() {
 	var nodeNames sets.String
 	f := framework.NewDefaultFramework("kubelet-perf")
+	var om *framework.RuntimeOperationMonitor
 	var rm *framework.ResourceMonitor
 
 	BeforeEach(func() {
@@ -197,12 +198,15 @@ var _ = framework.KubeDescribe("Kubelet [Serial] [Slow]", func() {
 		for _, node := range nodes.Items {
 			nodeNames.Insert(node.Name)
 		}
+		om = framework.NewRuntimeOperationMonitor(f.Client)
 		rm = framework.NewResourceMonitor(f.Client, framework.TargetContainers(), containerStatsPollingPeriod)
 		rm.Start()
 	})
 
 	AfterEach(func() {
 		rm.Stop()
+		result := om.GetLatestRuntimeOperationErrorRate()
+		framework.Logf("runtime operation error metrics:\n%s", framework.FormatRuntimeOperationErrorRate(result))
 	})
 
 	framework.KubeDescribe("regular resource usage tracking", func() {
 		// We assume that the scheduler will make reasonable scheduling choices