Merge pull request #35489 from Random-Liu/instrumented-cri-service

Automatic merge from submit-queue

CRI: Instrumented cri service

For https://github.com/kubernetes/kubernetes/issues/29478.

This PR adds an instrumented CRI service. Because the instrumented wrapper is added inside kuberuntime, it should work for both the grpc and non-grpc integrations.

This will be useful for comparing the latency of the grpc and non-grpc integrations, although there shouldn't be much of a difference.

@yujuhong @feiskyer 
/cc @kubernetes/sig-node
This commit is contained in:
Kubernetes Submit Queue 2016-10-25 20:48:14 -07:00 committed by GitHub
commit 6d81e916a6
4 changed files with 260 additions and 2 deletions

View File

@ -16,6 +16,7 @@ go_library(
"doc.go", "doc.go",
"fake_kuberuntime_manager.go", "fake_kuberuntime_manager.go",
"helpers.go", "helpers.go",
"instrumented_services.go",
"kuberuntime_container.go", "kuberuntime_container.go",
"kuberuntime_gc.go", "kuberuntime_gc.go",
"kuberuntime_image.go", "kuberuntime_image.go",
@ -38,6 +39,7 @@ go_library(
"//pkg/kubelet/events:go_default_library", "//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/images:go_default_library", "//pkg/kubelet/images:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/network:go_default_library", "//pkg/kubelet/network:go_default_library",
"//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/prober/results:go_default_library",
"//pkg/kubelet/qos:go_default_library", "//pkg/kubelet/qos:go_default_library",

View File

@ -0,0 +1,223 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kuberuntime
import (
"io"
"time"
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/metrics"
)
// instrumentedRuntimeService wraps the RuntimeService and records the operations
// and errors metrics. Each method forwards the call to the wrapped service and
// reports it through recordOperation and recordError under a per-method
// operation label.
type instrumentedRuntimeService struct {
// service is the wrapped RuntimeService that actually handles every call.
service internalApi.RuntimeService
}
// NewInstrumentedRuntimeService returns a RuntimeService that wraps the given
// service in an instrumentedRuntimeService, so that calls made through the
// returned value are recorded in the runtime operation metrics.
func NewInstrumentedRuntimeService(service internalApi.RuntimeService) internalApi.RuntimeService {
	wrapper := new(instrumentedRuntimeService)
	wrapper.service = service
	return wrapper
}
// instrumentedImageManagerService wraps the ImageManagerService and records the operations
// and errors metrics. Each method forwards the call to the wrapped service and
// reports it through recordOperation and recordError under a per-method
// operation label.
type instrumentedImageManagerService struct {
// service is the wrapped ImageManagerService that actually handles every call.
service internalApi.ImageManagerService
}
// NewInstrumentedImageManagerService returns an ImageManagerService that wraps
// the given service in an instrumentedImageManagerService, so that calls made
// through the returned value are recorded in the runtime operation metrics.
func NewInstrumentedImageManagerService(service internalApi.ImageManagerService) internalApi.ImageManagerService {
	wrapper := new(instrumentedImageManagerService)
	wrapper.service = service
	return wrapper
}
// recordOperation counts one occurrence of the operation and observes the time
// elapsed since start, in microseconds, under the same operation label.
func recordOperation(operation string, start time.Time) {
	metrics.RuntimeOperations.WithLabelValues(operation).Inc()

	// Look up the summary first and measure afterwards, so the elapsed time is
	// taken as late as possible before it is observed.
	latency := metrics.RuntimeOperationsLatency.WithLabelValues(operation)
	latency.Observe(metrics.SinceInMicroseconds(start))
}
// recordError increments the error counter for the operation when err is
// non-nil; a nil err leaves the metrics untouched.
func recordError(operation string, err error) {
	if err == nil {
		return
	}
	metrics.RuntimeOperationsErrors.WithLabelValues(operation).Inc()
}
// Version forwards to the wrapped RuntimeService. The call is counted and timed
// under the "version" operation label, and a non-nil error is counted as an
// operation error.
func (in instrumentedRuntimeService) Version(apiVersion string) (*runtimeApi.VersionResponse, error) {
	const op = "version"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	resp, err := in.service.Version(apiVersion)
	recordError(op, err)
	return resp, err
}
// CreateContainer forwards to the wrapped RuntimeService. The call is counted
// and timed under the "create_container" operation label, and a non-nil error
// is counted as an operation error.
func (in instrumentedRuntimeService) CreateContainer(podSandboxID string, config *runtimeApi.ContainerConfig, sandboxConfig *runtimeApi.PodSandboxConfig) (string, error) {
	const op = "create_container"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	id, err := in.service.CreateContainer(podSandboxID, config, sandboxConfig)
	recordError(op, err)
	return id, err
}
// StartContainer forwards to the wrapped RuntimeService. The call is counted
// and timed under the "start_container" operation label, and a non-nil error
// is counted as an operation error.
func (in instrumentedRuntimeService) StartContainer(containerID string) error {
	const op = "start_container"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	err := in.service.StartContainer(containerID)
	recordError(op, err)
	return err
}
// StopContainer forwards to the wrapped RuntimeService. The call is counted
// and timed under the "stop_container" operation label, and a non-nil error is
// counted as an operation error.
func (in instrumentedRuntimeService) StopContainer(containerID string, timeout int64) error {
	const op = "stop_container"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	err := in.service.StopContainer(containerID, timeout)
	recordError(op, err)
	return err
}
// RemoveContainer forwards to the wrapped RuntimeService. The call is counted
// and timed under the "remove_container" operation label, and a non-nil error
// is counted as an operation error.
func (in instrumentedRuntimeService) RemoveContainer(containerID string) error {
	const op = "remove_container"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	err := in.service.RemoveContainer(containerID)
	recordError(op, err)
	return err
}
// ListContainers forwards to the wrapped RuntimeService. The call is counted
// and timed under the "list_containers" operation label, and a non-nil error
// is counted as an operation error.
func (in instrumentedRuntimeService) ListContainers(filter *runtimeApi.ContainerFilter) ([]*runtimeApi.Container, error) {
	const op = "list_containers"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	containers, err := in.service.ListContainers(filter)
	recordError(op, err)
	return containers, err
}
// ContainerStatus forwards to the wrapped RuntimeService. The call is counted
// and timed under the "container_status" operation label, and a non-nil error
// is counted as an operation error.
func (in instrumentedRuntimeService) ContainerStatus(containerID string) (*runtimeApi.ContainerStatus, error) {
	const op = "container_status"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	status, err := in.service.ContainerStatus(containerID)
	recordError(op, err)
	return status, err
}
// Exec forwards to the wrapped RuntimeService. The call is counted and timed
// under the "exec" operation label, and a non-nil error is counted as an
// operation error. The recorded latency spans the entire wrapped Exec call.
func (in instrumentedRuntimeService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error {
	const op = "exec"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	err := in.service.Exec(containerID, cmd, tty, stdin, stdout, stderr)
	recordError(op, err)
	return err
}
// RunPodSandbox forwards to the wrapped RuntimeService. The call is counted
// and timed under the "run_podsandbox" operation label, and a non-nil error is
// counted as an operation error.
func (in instrumentedRuntimeService) RunPodSandbox(config *runtimeApi.PodSandboxConfig) (string, error) {
	const op = "run_podsandbox"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	id, err := in.service.RunPodSandbox(config)
	recordError(op, err)
	return id, err
}
// StopPodSandbox forwards to the wrapped RuntimeService. The call is counted
// and timed under the "stop_podsandbox" operation label, and a non-nil error
// is counted as an operation error.
func (in instrumentedRuntimeService) StopPodSandbox(podSandboxID string) error {
	const op = "stop_podsandbox"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	err := in.service.StopPodSandbox(podSandboxID)
	recordError(op, err)
	return err
}
// RemovePodSandbox forwards to the wrapped RuntimeService. The call is counted
// and timed under the "remove_podsandbox" operation label, and a non-nil error
// is counted as an operation error.
func (in instrumentedRuntimeService) RemovePodSandbox(podSandboxID string) error {
	const op = "remove_podsandbox"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	err := in.service.RemovePodSandbox(podSandboxID)
	recordError(op, err)
	return err
}
// PodSandboxStatus forwards to the wrapped RuntimeService. The call is counted
// and timed under the "podsandbox_status" operation label, and a non-nil error
// is counted as an operation error.
func (in instrumentedRuntimeService) PodSandboxStatus(podSandboxID string) (*runtimeApi.PodSandboxStatus, error) {
	const op = "podsandbox_status"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	status, err := in.service.PodSandboxStatus(podSandboxID)
	recordError(op, err)
	return status, err
}
// ListPodSandbox forwards to the wrapped RuntimeService. The call is counted
// and timed under the "list_podsandbox" operation label, and a non-nil error
// is counted as an operation error.
func (in instrumentedRuntimeService) ListPodSandbox(filter *runtimeApi.PodSandboxFilter) ([]*runtimeApi.PodSandbox, error) {
	const op = "list_podsandbox"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	sandboxes, err := in.service.ListPodSandbox(filter)
	recordError(op, err)
	return sandboxes, err
}
// UpdateRuntimeConfig forwards to the wrapped RuntimeService. The call is
// counted and timed under the "update_runtime_config" operation label, and a
// non-nil error is counted as an operation error.
func (in instrumentedRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error {
	const op = "update_runtime_config"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	err := in.service.UpdateRuntimeConfig(runtimeConfig)
	recordError(op, err)
	return err
}
// ListImages forwards to the wrapped ImageManagerService. The call is counted
// and timed under the "list_images" operation label, and a non-nil error is
// counted as an operation error.
func (in instrumentedImageManagerService) ListImages(filter *runtimeApi.ImageFilter) ([]*runtimeApi.Image, error) {
	const op = "list_images"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	images, err := in.service.ListImages(filter)
	recordError(op, err)
	return images, err
}
// ImageStatus forwards to the wrapped ImageManagerService. The call is counted
// and timed under the "image_status" operation label, and a non-nil error is
// counted as an operation error.
func (in instrumentedImageManagerService) ImageStatus(image *runtimeApi.ImageSpec) (*runtimeApi.Image, error) {
	const op = "image_status"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	status, err := in.service.ImageStatus(image)
	recordError(op, err)
	return status, err
}
// PullImage forwards to the wrapped ImageManagerService. The call is counted
// and timed under the "pull_image" operation label, and a non-nil error is
// counted as an operation error.
func (in instrumentedImageManagerService) PullImage(image *runtimeApi.ImageSpec, auth *runtimeApi.AuthConfig) error {
	const op = "pull_image"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	err := in.service.PullImage(image, auth)
	recordError(op, err)
	return err
}
// RemoveImage forwards to the wrapped ImageManagerService. The call is counted
// and timed under the "remove_image" operation label, and a non-nil error is
// counted as an operation error.
func (in instrumentedImageManagerService) RemoveImage(image *runtimeApi.ImageSpec) error {
	const op = "remove_image"
	begin := time.Now()
	// Deferred so the recorded latency covers the whole wrapped call.
	defer func() { recordOperation(op, begin) }()

	err := in.service.RemoveImage(image)
	recordError(op, err)
	return err
}

View File

@ -138,8 +138,8 @@ func NewKubeGenericRuntimeManager(
osInterface: osInterface, osInterface: osInterface,
networkPlugin: networkPlugin, networkPlugin: networkPlugin,
runtimeHelper: runtimeHelper, runtimeHelper: runtimeHelper,
runtimeService: runtimeService, runtimeService: NewInstrumentedRuntimeService(runtimeService),
imageService: imageService, imageService: NewInstrumentedImageManagerService(imageService),
keyring: credentialprovider.NewDockerKeyring(), keyring: credentialprovider.NewDockerKeyring(),
} }

View File

@ -39,6 +39,10 @@ const (
PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds" PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds"
PLEGRelistLatencyKey = "pleg_relist_latency_microseconds" PLEGRelistLatencyKey = "pleg_relist_latency_microseconds"
PLEGRelistIntervalKey = "pleg_relist_interval_microseconds" PLEGRelistIntervalKey = "pleg_relist_interval_microseconds"
// Metrics keys of remote runtime operations. Each key is used as the Name of
// the metric with the matching identifier (RuntimeOperations,
// RuntimeOperationsLatency, RuntimeOperationsErrors).
RuntimeOperationsKey = "runtime_operations"
RuntimeOperationsLatencyKey = "runtime_operations_latency_microseconds"
RuntimeOperationsErrorsKey = "runtime_operations_errors"
) )
var ( var (
@ -93,6 +97,7 @@ var (
Help: "Latency in microseconds from seeing a pod to starting a worker.", Help: "Latency in microseconds from seeing a pod to starting a worker.",
}, },
) )
// TODO(random-liu): Move the following docker metrics into shim once dockertools is deprecated.
DockerOperationsLatency = prometheus.NewSummaryVec( DockerOperationsLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{ prometheus.SummaryOpts{
Subsystem: KubeletSubsystem, Subsystem: KubeletSubsystem,
@ -139,6 +144,31 @@ var (
Help: "Interval in microseconds between relisting in PLEG.", Help: "Interval in microseconds between relisting in PLEG.",
}, },
) )
// Metrics of remote runtime operations.
// RuntimeOperations is a counter vector named RuntimeOperationsKey under
// KubeletSubsystem, partitioned by the "operation_type" label.
RuntimeOperations = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeletSubsystem,
Name: RuntimeOperationsKey,
Help: "Cumulative number of runtime operations by operation type.",
},
[]string{"operation_type"},
)
// RuntimeOperationsLatency is a summary vector named RuntimeOperationsLatencyKey
// under KubeletSubsystem, partitioned by the "operation_type" label. Per its
// help text, observations are latencies in microseconds.
RuntimeOperationsLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: KubeletSubsystem,
Name: RuntimeOperationsLatencyKey,
Help: "Latency in microseconds of runtime operations. Broken down by operation type.",
},
[]string{"operation_type"},
)
// RuntimeOperationsErrors is a counter vector named RuntimeOperationsErrorsKey
// under KubeletSubsystem, partitioned by the "operation_type" label.
RuntimeOperationsErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeletSubsystem,
Name: RuntimeOperationsErrorsKey,
Help: "Cumulative number of runtime operation errors by operation type.",
},
[]string{"operation_type"},
)
) )
var registerMetrics sync.Once var registerMetrics sync.Once
@ -161,6 +191,9 @@ func Register(containerCache kubecontainer.RuntimeCache) {
prometheus.MustRegister(newPodAndContainerCollector(containerCache)) prometheus.MustRegister(newPodAndContainerCollector(containerCache))
prometheus.MustRegister(PLEGRelistLatency) prometheus.MustRegister(PLEGRelistLatency)
prometheus.MustRegister(PLEGRelistInterval) prometheus.MustRegister(PLEGRelistInterval)
prometheus.MustRegister(RuntimeOperations)
prometheus.MustRegister(RuntimeOperationsLatency)
prometheus.MustRegister(RuntimeOperationsErrors)
}) })
} }