From 43bc6fa80607c9541c9b82bc694e31817fbacd9d Mon Sep 17 00:00:00 2001 From: Jiawei Wang Date: Wed, 10 Feb 2021 21:09:38 -0800 Subject: [PATCH] Add csi_operations_seconds metrics on kubelet --- pkg/volume/csi/csi_attacher.go | 5 +-- pkg/volume/csi/csi_block.go | 6 ++-- pkg/volume/csi/csi_client.go | 31 ++++++++++-------- pkg/volume/csi/csi_client_test.go | 12 +++---- pkg/volume/csi/csi_metrics.go | 51 ++++++++++++++++++++++++++++++ pkg/volume/csi/csi_metrics_test.go | 4 +-- pkg/volume/csi/csi_mounter.go | 6 ++-- pkg/volume/csi/csi_util.go | 10 ++++++ pkg/volume/csi/csi_util_test.go | 48 ++++++++++++++++++++++++++++ pkg/volume/csi/expander.go | 3 +- pkg/volume/util/BUILD | 2 ++ pkg/volume/util/metrics.go | 39 +++++++++++++++++++++++ pkg/volume/util/types/types.go | 1 - 13 files changed, 185 insertions(+), 33 deletions(-) diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 0b668ec8fcf..31841132b41 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -257,7 +257,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo } csi := c.csiClient - ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout) + ctx, cancel := createCSIOperationContext(spec, c.watchTimeout) defer cancel() // Check whether "STAGE_UNSTAGE_VOLUME" is set stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) @@ -516,7 +516,8 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error { } csi := c.csiClient - ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout) + // could not get whether this is migrated because there is no spec + ctx, cancel := createCSIOperationContext(nil, csiTimeout) defer cancel() // Check whether "STAGE_UNSTAGE_VOLUME" is set stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index e5ca3efec40..41fdd1f081f 100644 --- a/pkg/volume/csi/csi_block.go +++ 
b/pkg/volume/csi/csi_block.go @@ -357,7 +357,7 @@ func (m *csiBlockMapper) MapPodDevice() (string, error) { accessMode = m.spec.PersistentVolume.Spec.AccessModes[0] } - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + ctx, cancel := createCSIOperationContext(m.spec, csiTimeout) defer cancel() csiClient, err := m.csiClientGetter.Get() @@ -426,7 +426,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error return errors.New("CSIBlockVolume feature not enabled") } - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + ctx, cancel := createCSIOperationContext(m.spec, csiTimeout) defer cancel() csiClient, err := m.csiClientGetter.Get() @@ -499,7 +499,7 @@ func (m *csiBlockMapper) UnmapPodDevice() error { return errors.New(log("blockMapper.UnmapPodDevice failed to get CSI client: %v", err)) } - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + ctx, cancel := createCSIOperationContext(m.spec, csiTimeout) defer cancel() // Call NodeUnpublishVolume. diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index 9c06736f790..c4175851378 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -92,6 +92,7 @@ type csiDriverName string type csiDriverClient struct { driverName csiDriverName addr csiAddr + metricsManager *MetricsManager nodeV1ClientCreator nodeV1ClientCreator } @@ -111,7 +112,7 @@ type csiResizeOptions struct { var _ csiClient = &csiDriverClient{} -type nodeV1ClientCreator func(addr csiAddr) ( +type nodeV1ClientCreator func(addr csiAddr, metricsManager *MetricsManager) ( nodeClient csipbv1.NodeClient, closer io.Closer, err error, @@ -122,9 +123,9 @@ type nodeV1ClientCreator func(addr csiAddr) ( // the gRPC connection when the NodeClient is not used anymore. // This is the default implementation for the nodeV1ClientCreator, used in // newCsiDriverClient. 
-func newV1NodeClient(addr csiAddr) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) { +func newV1NodeClient(addr csiAddr, metricsManager *MetricsManager) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) { var conn *grpc.ClientConn - conn, err = newGrpcConn(addr) + conn, err = newGrpcConn(addr, metricsManager) if err != nil { return nil, nil, err } @@ -148,6 +149,7 @@ func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) { driverName: driverName, addr: csiAddr(existingDriver.endpoint), nodeV1ClientCreator: nodeV1ClientCreator, + metricsManager: NewCSIMetricsManager(string(driverName)), }, nil } @@ -172,7 +174,7 @@ func (c *csiDriverClient) nodeGetInfoV1(ctx context.Context) ( accessibleTopology map[string]string, err error) { - nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return "", 0, nil, err } @@ -216,7 +218,7 @@ func (c *csiDriverClient) NodePublishVolume( } - nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return err } @@ -275,7 +277,7 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp return opts.newSize, errors.New("size can not be less than 0") } - nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return opts.newSize, err } @@ -331,7 +333,7 @@ func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string, return errors.New("nodeV1ClientCreate is nil") } - nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return err } @@ -367,7 +369,7 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context, return errors.New("nodeV1ClientCreate is nil") } - nodeClient, 
closer, err := c.nodeV1ClientCreator(c.addr) + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return err } @@ -418,7 +420,7 @@ func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingT return errors.New("nodeV1ClientCreate is nil") } - nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return err } @@ -438,7 +440,7 @@ func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, err return false, errors.New("nodeV1ClientCreate is nil") } - nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return false, err } @@ -469,7 +471,7 @@ func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, e return false, errors.New("nodeV1ClientCreate is nil") } - nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return false, err } @@ -508,7 +510,7 @@ func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapabili return csipbv1.VolumeCapability_AccessMode_UNKNOWN } -func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) { +func newGrpcConn(addr csiAddr, metricsManager *MetricsManager) (*grpc.ClientConn, error) { network := "unix" klog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr)) @@ -518,6 +520,7 @@ func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) { grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) { return (&net.Dialer{}).DialContext(ctx, network, target) }), + grpc.WithChainUnaryInterceptor(metricsManager.RecordMetricsInterceptor), ) } @@ -560,7 +563,7 @@ func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, er return false, errors.New("nodeV1ClientCreate is nil") } - 
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return false, err } @@ -594,7 +597,7 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, return nil, errors.New("nodeV1ClientCreate is nil") } - nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return nil, err } diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index 714c2181ad8..71861b4d96a 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -371,7 +371,7 @@ func TestClientNodeGetInfo(t *testing.T) { fakeCloser := fake.NewCloser(t) client := &csiDriverClient{ driverName: "Fake Driver Name", - nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) { + nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) { nodeClient := fake.NewNodeClient(false /* stagingCapable */) nodeClient.SetNextError(tc.err) nodeClient.SetNodeGetInfoResp(&csipbv1.NodeGetInfoResponse{ @@ -434,7 +434,7 @@ func TestClientNodePublishVolume(t *testing.T) { fakeCloser := fake.NewCloser(t) client := &csiDriverClient{ driverName: "Fake Driver Name", - nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) { + nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) { nodeClient := fake.NewNodeClient(false /* stagingCapable */) nodeClient.SetNextError(tc.err) return nodeClient, fakeCloser, nil @@ -488,7 +488,7 @@ func TestClientNodeUnpublishVolume(t *testing.T) { fakeCloser := fake.NewCloser(t) client := &csiDriverClient{ driverName: "Fake Driver Name", - nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) { + nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, 
io.Closer, error) { nodeClient := fake.NewNodeClient(false /* stagingCapable */) nodeClient.SetNextError(tc.err) return nodeClient, fakeCloser, nil @@ -534,7 +534,7 @@ func TestClientNodeStageVolume(t *testing.T) { fakeCloser := fake.NewCloser(t) client := &csiDriverClient{ driverName: "Fake Driver Name", - nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) { + nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) { nodeClient := fake.NewNodeClient(false /* stagingCapable */) nodeClient.SetNextError(tc.err) return nodeClient, fakeCloser, nil @@ -586,7 +586,7 @@ func TestClientNodeUnstageVolume(t *testing.T) { fakeCloser := fake.NewCloser(t) client := &csiDriverClient{ driverName: "Fake Driver Name", - nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) { + nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) { nodeClient := fake.NewNodeClient(false /* stagingCapable */) nodeClient.SetNextError(tc.err) return nodeClient, fakeCloser, nil @@ -647,7 +647,7 @@ func TestNodeExpandVolume(t *testing.T) { fakeCloser := fake.NewCloser(t) client := &csiDriverClient{ driverName: "Fake Driver Name", - nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) { + nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) { nodeClient := fake.NewNodeClient(false /* stagingCapable */) nodeClient.SetNextError(tc.err) return nodeClient, fakeCloser, nil diff --git a/pkg/volume/csi/csi_metrics.go b/pkg/volume/csi/csi_metrics.go index 2e0c984cad4..96fcff66a87 100644 --- a/pkg/volume/csi/csi_metrics.go +++ b/pkg/volume/csi/csi_metrics.go @@ -19,9 +19,12 @@ package csi import ( "context" "fmt" + "time" + "google.golang.org/grpc" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/volume" + volumeutil "k8s.io/kubernetes/pkg/volume/util" ) var _ volume.MetricsProvider = 
&metricsCsi{} @@ -79,3 +82,51 @@ func (mc *metricsCsi) GetMetrics() (*volume.Metrics, error) { metrics.Time = currentTime return metrics, nil } + +// MetricsManager defines the metrics manager for CSI operation +type MetricsManager struct { + driverName string +} + +// NewCSIMetricsManager creates a MetricsManager object +func NewCSIMetricsManager(driverName string) *MetricsManager { + cmm := MetricsManager{ + driverName: driverName, + } + return &cmm +} + +type additionalInfo struct { + Migrated string +} +type additionalInfoKeyType struct{} + +var additionalInfoKey additionalInfoKeyType + +// RecordMetricsInterceptor is a grpc interceptor that is used to +// record CSI operation +func (cmm *MetricsManager) RecordMetricsInterceptor( + ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption) error { + start := time.Now() + err := invoker(ctx, method, req, reply, cc, opts...) + duration := time.Since(start) + // Check if this is migrated operation + additionalInfoVal := ctx.Value(additionalInfoKey) + migrated := "false" + if additionalInfoVal != nil { + additionalInfoVal, ok := additionalInfoVal.(additionalInfo) + if !ok { + return err + } + migrated = additionalInfoVal.Migrated + } + // Record the metric latency + volumeutil.RecordCSIOperationLatencyMetrics(cmm.driverName, method, err, duration, migrated) + + return err +} diff --git a/pkg/volume/csi/csi_metrics_test.go b/pkg/volume/csi/csi_metrics_test.go index 9806587bb13..9eecdf62d5f 100644 --- a/pkg/volume/csi/csi_metrics_test.go +++ b/pkg/volume/csi/csi_metrics_test.go @@ -45,7 +45,7 @@ func TestGetMetrics(t *testing.T) { metricsGetter := &metricsCsi{volumeID: tc.volumeID, targetPath: tc.targetPath} metricsGetter.csiClient = &csiDriverClient{ driverName: "com.google.gcepd", - nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) { + nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) 
(csipbv1.NodeClient, io.Closer, error) { nodeClient := fake.NewNodeClientWithVolumeStats(true /* VolumeStatsCapable */) fakeCloser := fake.NewCloser(t) nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo()) @@ -114,7 +114,7 @@ func TestGetMetricsDriverNotSupportStats(t *testing.T) { metricsGetter := &metricsCsi{volumeID: tc.volumeID, targetPath: tc.targetPath} metricsGetter.csiClient = &csiDriverClient{ driverName: "com.simple.SimpleDriver", - nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) { + nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) { nodeClient := fake.NewNodeClientWithVolumeStats(false /* VolumeStatsCapable */) fakeCloser := fake.NewCloser(t) nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo()) diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index 2ff94fe8c80..d632ffd1433 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -17,7 +17,6 @@ limitations under the License. 
package csi import ( - "context" "crypto/sha256" "encoding/json" "errors" @@ -114,7 +113,7 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to get CSI client: %v", err)) } - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + ctx, cancel := createCSIOperationContext(c.spec, csiTimeout) defer cancel() volSrc, pvSrc, err := getSourceFromSpec(c.spec) @@ -396,7 +395,8 @@ func (c *csiMountMgr) TearDownAt(dir string) error { return errors.New(log("mounter.SetUpAt failed to get CSI client: %v", err)) } - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + // Could not get spec info on whether this is a migrated operation because c.spec is nil + ctx, cancel := createCSIOperationContext(c.spec, csiTimeout) defer cancel() if err := csi.NodeUnpublishVolume(ctx, volID, dir); err != nil { diff --git a/pkg/volume/csi/csi_util.go b/pkg/volume/csi/csi_util.go index 05cf175daae..dec72a46bad 100644 --- a/pkg/volume/csi/csi_util.go +++ b/pkg/volume/csi/csi_util.go @@ -23,6 +23,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "time" api "k8s.io/api/core/v1" @@ -193,3 +194,12 @@ func GetCSIDriverName(spec *volume.Spec) (string, error) { return "", errors.New(log("volume source not found in volume.Spec")) } } + +func createCSIOperationContext(volumeSpec *volume.Spec, timeout time.Duration) (context.Context, context.CancelFunc) { + migrated := false + if volumeSpec != nil { + migrated = volumeSpec.Migrated + } + ctx := context.WithValue(context.Background(), additionalInfoKey, additionalInfo{Migrated: strconv.FormatBool(migrated)}) + return context.WithTimeout(ctx, timeout) +} diff --git a/pkg/volume/csi/csi_util_test.go b/pkg/volume/csi/csi_util_test.go index f4d3582b004..04d66408f5a 100644 --- a/pkg/volume/csi/csi_util_test.go +++ b/pkg/volume/csi/csi_util_test.go @@ -26,12 +26,14 @@ import ( "path" "path/filepath" "testing" + "time" 
api "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/volume" ) // TestMain starting point for all tests. @@ -146,3 +148,49 @@ func TestSaveVolumeData(t *testing.T) { } } } + +func TestCreateCSIOperationContext(t *testing.T) { + testCases := []struct { + name string + spec *volume.Spec + migrated string + }{ + { + name: "test volume spec nil", + spec: nil, + migrated: "false", + }, + { + name: "test volume normal spec with migrated true", + spec: &volume.Spec{ + Migrated: true, + }, + migrated: "true", + }, + { + name: "test volume normal spec with migrated false", + spec: &volume.Spec{ + Migrated: false, + }, + migrated: "false", + }, + } + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + timeout := time.Minute + ctx, _ := createCSIOperationContext(tc.spec, timeout) + + additionalInfoVal := ctx.Value(additionalInfoKey) + if additionalInfoVal == nil { + t.Error("Could not load additional info from context") + } + additionalInfoV, ok := additionalInfoVal.(additionalInfo) + if !ok { + t.Errorf("Additional info type assertion fail, additionalInfo object: %v", additionalInfoVal) + } + migrated := additionalInfoV.Migrated + if migrated != tc.migrated { + t.Errorf("Expect migrated value: %v, got: %v", tc.migrated, migrated) + } + } +} diff --git a/pkg/volume/csi/expander.go b/pkg/volume/csi/expander.go index bfa6d714716..79f856a2feb 100644 --- a/pkg/volume/csi/expander.go +++ b/pkg/volume/csi/expander.go @@ -17,7 +17,6 @@ limitations under the License. 
package csi import ( - "context" "errors" "fmt" @@ -71,7 +70,7 @@ func (c *csiPlugin) nodeExpandWithClient( fsVolume bool) (bool, error) { driverName := csiSource.Driver - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + ctx, cancel := createCSIOperationContext(resizeOptions.VolumeSpec, csiTimeout) defer cancel() nodeExpandSet, err := csClient.NodeSupportsNodeExpand(ctx) diff --git a/pkg/volume/util/BUILD b/pkg/volume/util/BUILD index 9f0e8621cc0..6fde8009093 100644 --- a/pkg/volume/util/BUILD +++ b/pkg/volume/util/BUILD @@ -44,6 +44,8 @@ go_library( "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", "//staging/src/k8s.io/component-helpers/scheduling/corev1:go_default_library", "//staging/src/k8s.io/mount-utils:go_default_library", + "//vendor/google.golang.org/grpc/codes:go_default_library", + "//vendor/google.golang.org/grpc/status:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/strings:go_default_library", diff --git a/pkg/volume/util/metrics.go b/pkg/volume/util/metrics.go index c0052d18b2d..2cacae3f8bb 100644 --- a/pkg/volume/util/metrics.go +++ b/pkg/volume/util/metrics.go @@ -21,6 +21,8 @@ import ( "strconv" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" @@ -80,6 +82,17 @@ var storageOperationEndToEndLatencyMetric = metrics.NewHistogramVec( []string{"plugin_name", "operation_name"}, ) +var csiOperationsLatencyMetric = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: "csi", + Name: "operations_seconds", + Help: "Container Storage Interface operation duration with gRPC error code status total", + Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600}, + StabilityLevel: metrics.ALPHA, + }, + []string{"driver_name", "method_name", 
"grpc_status_code", "migrated"}, +) + func init() { registerMetrics() } @@ -91,6 +104,7 @@ func registerMetrics() { legacyregistry.MustRegister(storageOperationErrorMetric) legacyregistry.MustRegister(storageOperationStatusMetric) legacyregistry.MustRegister(storageOperationEndToEndLatencyMetric) + legacyregistry.MustRegister(csiOperationsLatencyMetric) } // OperationCompleteHook returns a hook to call when an operation is completed @@ -143,3 +157,28 @@ func GetFullQualifiedPluginNameForVolume(pluginName string, spec *volume.Spec) s func RecordOperationLatencyMetric(plugin, operationName string, secondsTaken float64) { storageOperationEndToEndLatencyMetric.WithLabelValues(plugin, operationName).Observe(secondsTaken) } + +// RecordCSIOperationLatencyMetrics records the CSI operation latency and grpc status +// into metric csi_operations_seconds +func RecordCSIOperationLatencyMetrics(driverName string, + operationName string, + operationErr error, + operationDuration time.Duration, + migrated string) { + csiOperationsLatencyMetric.WithLabelValues(driverName, operationName, getErrorCode(operationErr), migrated).Observe(operationDuration.Seconds()) +} + +func getErrorCode(err error) string { + if err == nil { + return codes.OK.String() + } + + st, ok := status.FromError(err) + if !ok { + // This is not a gRPC error. The operation must have failed before the gRPC + // method was called, otherwise we would get a gRPC error. + return "unknown-non-grpc" + } + + return st.Code().String() +} diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index 1262f50f3e1..c07dcfe215c 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -68,7 +68,6 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) { Err: &context.DetailedErr, Migrated: &context.Migrated, } - c.Err = &detailedErr defer o.CompleteFunc(c) } if o.EventRecorderFunc != nil {