Merge pull request #98979 from Jiawei0227/kubelet_csi

Add csi_operations_seconds metrics on kubelet
This commit is contained in:
Kubernetes Prow Robot 2021-02-18 19:04:36 -08:00 committed by GitHub
commit b2a5d67dd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 185 additions and 33 deletions

View File

@ -257,7 +257,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
}
csi := c.csiClient
ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout)
ctx, cancel := createCSIOperationContext(spec, c.watchTimeout)
defer cancel()
// Check whether "STAGE_UNSTAGE_VOLUME" is set
stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
@ -516,7 +516,8 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
}
csi := c.csiClient
ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout)
// could not get whether this is migrated because there is no spec
ctx, cancel := createCSIOperationContext(nil, csiTimeout)
defer cancel()
// Check whether "STAGE_UNSTAGE_VOLUME" is set
stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)

View File

@ -357,7 +357,7 @@ func (m *csiBlockMapper) MapPodDevice() (string, error) {
accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
defer cancel()
csiClient, err := m.csiClientGetter.Get()
@ -426,7 +426,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
return errors.New("CSIBlockVolume feature not enabled")
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
defer cancel()
csiClient, err := m.csiClientGetter.Get()
@ -499,7 +499,7 @@ func (m *csiBlockMapper) UnmapPodDevice() error {
return errors.New(log("blockMapper.UnmapPodDevice failed to get CSI client: %v", err))
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
defer cancel()
// Call NodeUnpublishVolume.

View File

@ -92,6 +92,7 @@ type csiDriverName string
type csiDriverClient struct {
driverName csiDriverName
addr csiAddr
metricsManager *MetricsManager
nodeV1ClientCreator nodeV1ClientCreator
}
@ -111,7 +112,7 @@ type csiResizeOptions struct {
var _ csiClient = &csiDriverClient{}
type nodeV1ClientCreator func(addr csiAddr) (
type nodeV1ClientCreator func(addr csiAddr, metricsManager *MetricsManager) (
nodeClient csipbv1.NodeClient,
closer io.Closer,
err error,
@ -122,9 +123,9 @@ type nodeV1ClientCreator func(addr csiAddr) (
// the gRPC connection when the NodeClient is not used anymore.
// This is the default implementation for the nodeV1ClientCreator, used in
// newCsiDriverClient.
func newV1NodeClient(addr csiAddr) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
func newV1NodeClient(addr csiAddr, metricsManager *MetricsManager) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
var conn *grpc.ClientConn
conn, err = newGrpcConn(addr)
conn, err = newGrpcConn(addr, metricsManager)
if err != nil {
return nil, nil, err
}
@ -148,6 +149,7 @@ func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
driverName: driverName,
addr: csiAddr(existingDriver.endpoint),
nodeV1ClientCreator: nodeV1ClientCreator,
metricsManager: NewCSIMetricsManager(string(driverName)),
}, nil
}
@ -172,7 +174,7 @@ func (c *csiDriverClient) nodeGetInfoV1(ctx context.Context) (
accessibleTopology map[string]string,
err error) {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return "", 0, nil, err
}
@ -216,7 +218,7 @@ func (c *csiDriverClient) NodePublishVolume(
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return err
}
@ -275,7 +277,7 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp
return opts.newSize, errors.New("size can not be less than 0")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return opts.newSize, err
}
@ -331,7 +333,7 @@ func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string,
return errors.New("nodeV1ClientCreate is nil")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return err
}
@ -367,7 +369,7 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
return errors.New("nodeV1ClientCreate is nil")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return err
}
@ -418,7 +420,7 @@ func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingT
return errors.New("nodeV1ClientCreate is nil")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return err
}
@ -438,7 +440,7 @@ func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, err
return false, errors.New("nodeV1ClientCreate is nil")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return false, err
}
@ -469,7 +471,7 @@ func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, e
return false, errors.New("nodeV1ClientCreate is nil")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return false, err
}
@ -508,7 +510,7 @@ func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapabili
return csipbv1.VolumeCapability_AccessMode_UNKNOWN
}
func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) {
func newGrpcConn(addr csiAddr, metricsManager *MetricsManager) (*grpc.ClientConn, error) {
network := "unix"
klog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr))
@ -518,6 +520,7 @@ func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) {
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
}),
grpc.WithChainUnaryInterceptor(metricsManager.RecordMetricsInterceptor),
)
}
@ -560,7 +563,7 @@ func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, er
return false, errors.New("nodeV1ClientCreate is nil")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return false, err
}
@ -594,7 +597,7 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string,
return nil, errors.New("nodeV1ClientCreate is nil")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return nil, err
}

View File

@ -371,7 +371,7 @@ func TestClientNodeGetInfo(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
nodeClient.SetNodeGetInfoResp(&csipbv1.NodeGetInfoResponse{
@ -434,7 +434,7 @@ func TestClientNodePublishVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
@ -488,7 +488,7 @@ func TestClientNodeUnpublishVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
@ -534,7 +534,7 @@ func TestClientNodeStageVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
@ -586,7 +586,7 @@ func TestClientNodeUnstageVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
@ -647,7 +647,7 @@ func TestNodeExpandVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil

View File

@ -19,9 +19,12 @@ package csi
import (
"context"
"fmt"
"time"
"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
var _ volume.MetricsProvider = &metricsCsi{}
@ -79,3 +82,51 @@ func (mc *metricsCsi) GetMetrics() (*volume.Metrics, error) {
metrics.Time = currentTime
return metrics, nil
}
// MetricsManager defines the metrics mananger for CSI operation
type MetricsManager struct {
driverName string
}
// NewCSIMetricsManager creates a CSIMetricsManager object
func NewCSIMetricsManager(driverName string) *MetricsManager {
cmm := MetricsManager{
driverName: driverName,
}
return &cmm
}
type additionalInfo struct {
Migrated string
}
type additionalInfoKeyType struct{}
var additionalInfoKey additionalInfoKeyType
// RecordMetricsInterceptor is a grpc interceptor that is used to
// record CSI operation
func (cmm *MetricsManager) RecordMetricsInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
duration := time.Since(start)
// Check if this is migrated operation
additionalInfoVal := ctx.Value(additionalInfoKey)
migrated := "false"
if additionalInfoVal != nil {
additionalInfoVal, ok := additionalInfoVal.(additionalInfo)
if !ok {
return err
}
migrated = additionalInfoVal.Migrated
}
// Record the metric latency
volumeutil.RecordCSIOperationLatencyMetrics(cmm.driverName, method, err, duration, migrated)
return err
}

View File

@ -45,7 +45,7 @@ func TestGetMetrics(t *testing.T) {
metricsGetter := &metricsCsi{volumeID: tc.volumeID, targetPath: tc.targetPath}
metricsGetter.csiClient = &csiDriverClient{
driverName: "com.google.gcepd",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClientWithVolumeStats(true /* VolumeStatsCapable */)
fakeCloser := fake.NewCloser(t)
nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo())
@ -114,7 +114,7 @@ func TestGetMetricsDriverNotSupportStats(t *testing.T) {
metricsGetter := &metricsCsi{volumeID: tc.volumeID, targetPath: tc.targetPath}
metricsGetter.csiClient = &csiDriverClient{
driverName: "com.simple.SimpleDriver",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClientWithVolumeStats(false /* VolumeStatsCapable */)
fakeCloser := fake.NewCloser(t)
nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo())

View File

@ -17,7 +17,6 @@ limitations under the License.
package csi
import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
@ -114,7 +113,7 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to get CSI client: %v", err))
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
ctx, cancel := createCSIOperationContext(c.spec, csiTimeout)
defer cancel()
volSrc, pvSrc, err := getSourceFromSpec(c.spec)
@ -396,7 +395,8 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
return errors.New(log("mounter.SetUpAt failed to get CSI client: %v", err))
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
// Could not get spec info on whether this is a migrated operation because c.spec is nil
ctx, cancel := createCSIOperationContext(c.spec, csiTimeout)
defer cancel()
if err := csi.NodeUnpublishVolume(ctx, volID, dir); err != nil {

View File

@ -23,6 +23,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"time"
api "k8s.io/api/core/v1"
@ -193,3 +194,12 @@ func GetCSIDriverName(spec *volume.Spec) (string, error) {
return "", errors.New(log("volume source not found in volume.Spec"))
}
}
func createCSIOperationContext(volumeSpec *volume.Spec, timeout time.Duration) (context.Context, context.CancelFunc) {
migrated := false
if volumeSpec != nil {
migrated = volumeSpec.Migrated
}
ctx := context.WithValue(context.Background(), additionalInfoKey, additionalInfo{Migrated: strconv.FormatBool(migrated)})
return context.WithTimeout(ctx, timeout)
}

View File

@ -26,12 +26,14 @@ import (
"path"
"path/filepath"
"testing"
"time"
api "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume"
)
// TestMain starting point for all tests.
@ -146,3 +148,49 @@ func TestSaveVolumeData(t *testing.T) {
}
}
}
func TestCreateCSIOperationContext(t *testing.T) {
testCases := []struct {
name string
spec *volume.Spec
migrated string
}{
{
name: "test volume spec nil",
spec: nil,
migrated: "false",
},
{
name: "test volume normal spec with migrated true",
spec: &volume.Spec{
Migrated: true,
},
migrated: "true",
},
{
name: "test volume normal spec with migrated false",
spec: &volume.Spec{
Migrated: false,
},
migrated: "false",
},
}
for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
timeout := time.Minute
ctx, _ := createCSIOperationContext(tc.spec, timeout)
additionalInfoVal := ctx.Value(additionalInfoKey)
if additionalInfoVal == nil {
t.Error("Could not load additional info from context")
}
additionalInfoV, ok := additionalInfoVal.(additionalInfo)
if !ok {
t.Errorf("Additional info type assertion fail, additionalInfo object: %v", additionalInfoVal)
}
migrated := additionalInfoV.Migrated
if migrated != tc.migrated {
t.Errorf("Expect migrated value: %v, got: %v", tc.migrated, migrated)
}
}
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package csi
import (
"context"
"errors"
"fmt"
@ -71,7 +70,7 @@ func (c *csiPlugin) nodeExpandWithClient(
fsVolume bool) (bool, error) {
driverName := csiSource.Driver
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
ctx, cancel := createCSIOperationContext(resizeOptions.VolumeSpec, csiTimeout)
defer cancel()
nodeExpandSet, err := csClient.NodeSupportsNodeExpand(ctx)

View File

@ -44,6 +44,8 @@ go_library(
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/component-helpers/scheduling/corev1:go_default_library",
"//staging/src/k8s.io/mount-utils:go_default_library",
"//vendor/google.golang.org/grpc/codes:go_default_library",
"//vendor/google.golang.org/grpc/status:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/strings:go_default_library",

View File

@ -21,6 +21,8 @@ import (
"strconv"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
@ -80,6 +82,17 @@ var storageOperationEndToEndLatencyMetric = metrics.NewHistogramVec(
[]string{"plugin_name", "operation_name"},
)
var csiOperationsLatencyMetric = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: "csi",
Name: "operations_seconds",
Help: "Container Storage Interface operation duration with gRPC error code status total",
Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600},
StabilityLevel: metrics.ALPHA,
},
[]string{"driver_name", "method_name", "grpc_status_code", "migrated"},
)
func init() {
registerMetrics()
}
@ -91,6 +104,7 @@ func registerMetrics() {
legacyregistry.MustRegister(storageOperationErrorMetric)
legacyregistry.MustRegister(storageOperationStatusMetric)
legacyregistry.MustRegister(storageOperationEndToEndLatencyMetric)
legacyregistry.MustRegister(csiOperationsLatencyMetric)
}
// OperationCompleteHook returns a hook to call when an operation is completed
@ -143,3 +157,28 @@ func GetFullQualifiedPluginNameForVolume(pluginName string, spec *volume.Spec) s
func RecordOperationLatencyMetric(plugin, operationName string, secondsTaken float64) {
storageOperationEndToEndLatencyMetric.WithLabelValues(plugin, operationName).Observe(secondsTaken)
}
// RecordCSIOperationLatencyMetrics records the CSI operation latency and grpc status
// into metric csi_kubelet_operations_seconds
func RecordCSIOperationLatencyMetrics(driverName string,
operationName string,
operationErr error,
operationDuration time.Duration,
migrated string) {
csiOperationsLatencyMetric.WithLabelValues(driverName, operationName, getErrorCode(operationErr), migrated).Observe(operationDuration.Seconds())
}
func getErrorCode(err error) string {
if err == nil {
return codes.OK.String()
}
st, ok := status.FromError(err)
if !ok {
// This is not gRPC error. The operation must have failed before gRPC
// method was called, otherwise we would get gRPC error.
return "unknown-non-grpc"
}
return st.Code().String()
}

View File

@ -68,7 +68,6 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
Err: &context.DetailedErr,
Migrated: &context.Migrated,
}
c.Err = &detailedErr
defer o.CompleteFunc(c)
}
if o.EventRecorderFunc != nil {