From edc1c62471d710d246ca0647375fa62cf6f2cdb8 Mon Sep 17 00:00:00 2001
From: fengzixu
Date: Wed, 10 Mar 2021 01:16:37 +0900
Subject: [PATCH] feature: add CSIVolumeHealth feature and gate

1. add EventRecorder to ResourceAnalyzer
2. add CSIVolumeHealth feature and gate
---
 pkg/features/kube_features.go                 |   7 +
 pkg/kubelet/kubelet.go                        |   2 +-
 pkg/kubelet/kubelet_test.go                   |   2 +-
 pkg/kubelet/runonce_test.go                   |   2 +-
 pkg/kubelet/server/server_test.go             |   3 +-
 .../server/stats/fs_resource_analyzer.go      |   7 +-
 pkg/kubelet/server/stats/resource_analyzer.go |   5 +-
 .../server/stats/volume_stat_calculator.go    |  18 ++-
 .../stats/volume_stat_calculator_test.go      |  78 ++++++++++-
 pkg/volume/csi/csi_client.go                  |  39 ++++++
 pkg/volume/csi/csi_client_test.go             | 130 +++++++++++++++++-
 pkg/volume/csi/fake/fake_client.go            |  18 +++
 pkg/volume/volume.go                          |  11 ++
 13 files changed, 304 insertions(+), 18 deletions(-)

diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go
index 5b2ce6fa433..2a818ec9fbd 100644
--- a/pkg/features/kube_features.go
+++ b/pkg/features/kube_features.go
@@ -729,6 +729,12 @@ const (
 	//
 	// Labels all namespaces with a default label "kubernetes.io/metadata.name: "
 	NamespaceDefaultLabelName featuregate.Feature = "NamespaceDefaultLabelName"
+
+	// owner: @fengzixu
+	// alpha: v1.21
+	//
+	// Enables the kubelet to detect a CSI volume's condition and send an event about the abnormal volume to the pod that is using it.
+	CSIVolumeHealth featuregate.Feature = "CSIVolumeHealth"
 )
 
 func init() {
@@ -839,6 +845,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 	ServiceInternalTrafficPolicy: {Default: false, PreRelease: featuregate.Alpha},
 	SuspendJob:                   {Default: false, PreRelease: featuregate.Alpha},
 	NamespaceDefaultLabelName:    {Default: true, PreRelease: featuregate.Beta}, // graduate to GA and lock to default in 1.22, remove in 1.24
+	CSIVolumeHealth:              {Default: false, PreRelease: featuregate.Alpha},
 
 	// inherited features from generic apiserver, relisted here to get a conflict if it is changed
 	// unintentionally on either side:
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index a626cac7474..b5c9b86e1a4 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -601,7 +601,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 
 	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
 
-	klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)
+	klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
 
 	klet.dockerLegacyService = kubeDeps.dockerLegacyService
 	klet.runtimeService = kubeDeps.RemoteRuntimeService
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index ea8f135d770..6959e65354d 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -255,7 +255,7 @@ func newTestKubeletWithImageList(
 	}
 
 	volumeStatsAggPeriod := time.Second * 10
-	kubelet.resourceAnalyzer = serverstats.NewResourceAnalyzer(kubelet, volumeStatsAggPeriod)
+	kubelet.resourceAnalyzer = serverstats.NewResourceAnalyzer(kubelet, volumeStatsAggPeriod, kubelet.recorder)
 
 	fakeHostStatsProvider := stats.NewFakeHostStatsProvider()
 
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index 351fea2b47b..75f695560d0 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -115,7 +115,7 @@ func TestRunOnce(t *testing.T) {
 	// TODO: Factor out "stats.Provider" from Kubelet so we don't have a cyclic dependency
 	volumeStatsAggPeriod := time.Second * 10
-	kb.resourceAnalyzer = stats.NewResourceAnalyzer(kb, volumeStatsAggPeriod)
+	kb.resourceAnalyzer = stats.NewResourceAnalyzer(kb, volumeStatsAggPeriod, kb.recorder)
 	nodeRef := &v1.ObjectReference{
 		Kind:      "Node",
 		Name:      string(kb.nodeName),
diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go
index 12c32f71d5e..99974c1d7d3 100644
--- a/pkg/kubelet/server/server_test.go
+++ b/pkg/kubelet/server/server_test.go
@@ -45,6 +45,7 @@ import (
 	"k8s.io/apiserver/pkg/authentication/authenticator"
 	"k8s.io/apiserver/pkg/authentication/user"
 	"k8s.io/apiserver/pkg/authorization/authorizer"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/tools/remotecommand"
 	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
 	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
@@ -343,7 +344,7 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo
 	}
 	server := NewServer(
 		fw.fakeKubelet,
-		stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute),
+		stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}),
 		fw.fakeAuth,
 		kubeCfg)
 	fw.serverUnderTest = &server
diff --git a/pkg/kubelet/server/stats/fs_resource_analyzer.go b/pkg/kubelet/server/stats/fs_resource_analyzer.go
index 956e67d21b4..01ad396422a 100644
--- a/pkg/kubelet/server/stats/fs_resource_analyzer.go
+++ b/pkg/kubelet/server/stats/fs_resource_analyzer.go
@@ -23,6 +23,7 @@ import (
 
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 )
 
@@ -40,15 +41,17 @@ type fsResourceAnalyzer struct {
 	calcPeriod        time.Duration
 	cachedVolumeStats atomic.Value
 	startOnce         sync.Once
+	eventRecorder     record.EventRecorder
 }
 
 var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{}
 
 // newFsResourceAnalyzer returns a new fsResourceAnalyzer implementation
-func newFsResourceAnalyzer(statsProvider Provider, calcVolumePeriod time.Duration) *fsResourceAnalyzer {
+func newFsResourceAnalyzer(statsProvider Provider, calcVolumePeriod time.Duration, eventRecorder record.EventRecorder) *fsResourceAnalyzer {
 	r := &fsResourceAnalyzer{
 		statsProvider: statsProvider,
 		calcPeriod:    calcVolumePeriod,
+		eventRecorder: eventRecorder,
 	}
 	r.cachedVolumeStats.Store(make(statCache))
 	return r
@@ -74,7 +77,7 @@ func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() {
 	// Copy existing entries to new map, creating/starting new entries for pods missing from the cache
 	for _, pod := range s.statsProvider.GetPods() {
 		if value, found := oldCache[pod.GetUID()]; !found {
-			newCache[pod.GetUID()] = newVolumeStatCalculator(s.statsProvider, s.calcPeriod, pod).StartOnce()
+			newCache[pod.GetUID()] = newVolumeStatCalculator(s.statsProvider, s.calcPeriod, pod, s.eventRecorder).StartOnce()
 		} else {
 			newCache[pod.GetUID()] = value
 		}
diff --git a/pkg/kubelet/server/stats/resource_analyzer.go b/pkg/kubelet/server/stats/resource_analyzer.go
index bcddff0b3c4..853eeb67b0f 100644
--- a/pkg/kubelet/server/stats/resource_analyzer.go
+++ b/pkg/kubelet/server/stats/resource_analyzer.go
@@ -17,6 +17,7 @@ limitations under the License.
 package stats
 
 import (
+	"k8s.io/client-go/tools/record"
 	"time"
 )
 
@@ -37,8 +38,8 @@ type resourceAnalyzer struct {
 
 var _ ResourceAnalyzer = &resourceAnalyzer{}
 
 // NewResourceAnalyzer returns a new ResourceAnalyzer
-func NewResourceAnalyzer(statsProvider Provider, calVolumeFrequency time.Duration) ResourceAnalyzer {
-	fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency)
+func NewResourceAnalyzer(statsProvider Provider, calVolumeFrequency time.Duration, eventRecorder record.EventRecorder) ResourceAnalyzer {
+	fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency, eventRecorder)
 	summaryProvider := NewSummaryProvider(statsProvider)
 	return &resourceAnalyzer{fsAnalyzer, summaryProvider}
 }
diff --git a/pkg/kubelet/server/stats/volume_stat_calculator.go b/pkg/kubelet/server/stats/volume_stat_calculator.go
index fae72f9a6e6..b12349a1e5a 100644
--- a/pkg/kubelet/server/stats/volume_stat_calculator.go
+++ b/pkg/kubelet/server/stats/volume_stat_calculator.go
@@ -17,17 +17,20 @@ limitations under the License.
 package stats
 
 import (
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/klog/v2"
 	stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/util"
-
-	"k8s.io/klog/v2"
 )
 
 // volumeStatCalculator calculates volume metrics for a given pod periodically in the background and caches the result
@@ -39,6 +42,7 @@ type volumeStatCalculator struct {
 	startO        sync.Once
 	stopO         sync.Once
 	latest        atomic.Value
+	eventRecorder record.EventRecorder
 }
 
 // PodVolumeStats encapsulates the VolumeStats for a pod.
@@ -49,12 +53,13 @@ type PodVolumeStats struct {
 }
 
 // newVolumeStatCalculator creates a new VolumeStatCalculator
-func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod) *volumeStatCalculator {
+func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod, eventRecorder record.EventRecorder) *volumeStatCalculator {
 	return &volumeStatCalculator{
 		statsProvider: statsProvider,
 		jitterPeriod:  jitterPeriod,
 		pod:           pod,
 		stopChannel:   make(chan struct{}),
+		eventRecorder: eventRecorder,
 	}
 }
 
@@ -129,6 +134,11 @@ func (s *volumeStatCalculator) calcAndStoreStats() {
 			persistentStats = append(persistentStats, volumeStats)
 		}
 
+		if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
+			if metric.Abnormal != nil && metric.Message != nil && (*metric.Abnormal) {
+				s.eventRecorder.Event(s.pod, v1.EventTypeWarning, "VolumeConditionAbnormal", fmt.Sprintf("Volume %s: %s", name, *metric.Message))
+			}
+		}
 	}
 
 	// Store the new stats
diff --git a/pkg/kubelet/server/stats/volume_stat_calculator_test.go b/pkg/kubelet/server/stats/volume_stat_calculator_test.go
index 69d3eec7f2c..687586b7f91 100644
--- a/pkg/kubelet/server/stats/volume_stat_calculator_test.go
+++ b/pkg/kubelet/server/stats/volume_stat_calculator_test.go
@@ -17,15 +17,22 @@ limitations under the License.
 package stats
 
 import (
+	"errors"
+	"fmt"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
 
+	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
 	k8sv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/client-go/tools/record"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	kubestats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
+	"k8s.io/kubernetes/pkg/features"
 	statstest "k8s.io/kubernetes/pkg/kubelet/server/stats/testing"
 	"k8s.io/kubernetes/pkg/volume"
 )
@@ -43,9 +50,10 @@ const (
 	pvcClaimName = "pvc-fake"
 )
 
-func TestPVCRef(t *testing.T) {
+var (
+	ErrorWatchTimeout = errors.New("watch event timeout")
 	// Create pod spec to test against
-	podVolumes := []k8sv1.Volume{
+	podVolumes = []k8sv1.Volume{
 		{
 			Name: vol0,
 			VolumeSource: k8sv1.VolumeSource{
 		},
 	}
 
-	fakePod := &k8sv1.Pod{
+	fakePod = &k8sv1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      pName0,
 			Namespace: namespace0,
 		},
 	}
 
+	volumeCondition = &csipbv1.VolumeCondition{}
+)
+
+func TestPVCRef(t *testing.T) {
 	// Setup mock stats provider
 	mockStats := new(statstest.StatsProvider)
 	volumes := map[string]volume.Volume{vol0: &fakeVolume{}, vol1: &fakeVolume{}}
 	mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true)
 
+	eventStore := make(chan string, 1)
+	fakeEventRecorder := record.FakeRecorder{
+		Events: eventStore,
+	}
+
 	// Calculate stats for pod
-	statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod)
+	statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod, &fakeEventRecorder)
 	statsCalculator.calcAndStoreStats()
 	vs, _ := statsCalculator.GetLatest()
 
@@ -102,6 +119,57 @@
 	})
 }
 
+func TestNormalVolumeEvent(t *testing.T) {
+	mockStats := new(statstest.StatsProvider)
+	volumes := map[string]volume.Volume{vol0: &fakeVolume{}, vol1: &fakeVolume{}}
+	mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true)
+
+	eventStore := make(chan string, 2)
+	fakeEventRecorder := record.FakeRecorder{
+		Events: eventStore,
+	}
+
+	// Calculate stats for pod
+	statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod, &fakeEventRecorder)
+	statsCalculator.calcAndStoreStats()
+
+	event, err := WatchEvent(eventStore)
+	assert.NotNil(t, err)
+	assert.Equal(t, "", event)
+}
+
+func TestAbnormalVolumeEvent(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, true)()
+	// Setup mock stats provider
+	mockStats := new(statstest.StatsProvider)
+	volumes := map[string]volume.Volume{vol0: &fakeVolume{}}
+	mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true)
+
+	eventStore := make(chan string, 2)
+	fakeEventRecorder := record.FakeRecorder{
+		Events: eventStore,
+	}
+
+	// Calculate stats for pod
+	volumeCondition.Message = "The target path of the volume doesn't exist"
+	volumeCondition.Abnormal = true
+	statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod, &fakeEventRecorder)
+	statsCalculator.calcAndStoreStats()
+
+	event, err := WatchEvent(eventStore)
+	assert.Nil(t, err)
+	assert.Equal(t, fmt.Sprintf("Warning VolumeConditionAbnormal Volume %s: The target path of the volume doesn't exist", "vol0"), event)
+}
+
+func WatchEvent(eventChan <-chan string) (string, error) {
+	select {
+	case event := <-eventChan:
+		return event, nil
+	case <-time.After(5 * time.Second):
+		return "", ErrorWatchTimeout
+	}
+}
+
 // Fake volume/metrics provider
 var _ volume.Volume = &fakeVolume{}
 
@@ -121,6 +189,8 @@ func expectedMetrics() *volume.Metrics {
 		Inodes:     resource.NewQuantity(inodesTotal, resource.BinarySI),
 		InodesFree: resource.NewQuantity(inodesFree, resource.BinarySI),
 		InodesUsed: resource.NewQuantity(inodesTotal-inodesFree, resource.BinarySI),
+		Message:    &volumeCondition.Message,
+		Abnormal:   &volumeCondition.Abnormal,
 	}
 }
 
diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go
index c4175851378..ce75b0bd589 100644
--- a/pkg/volume/csi/csi_client.go
+++ b/pkg/volume/csi/csi_client.go
@@ -30,7 +30,9 @@ import (
 	"google.golang.org/grpc/status"
 	api "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/volume"
 	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
 )
@@ -624,6 +626,19 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string,
 		Inodes:     resource.NewQuantity(int64(0), resource.BinarySI),
 		InodesFree: resource.NewQuantity(int64(0), resource.BinarySI),
 	}
+
+	if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
+		isSupportNodeVolumeCondition, err := supportNodeGetVolumeCondition(ctx, nodeClient)
+		if err != nil {
+			return nil, err
+		}
+
+		if isSupportNodeVolumeCondition {
+			abnormal, message := resp.VolumeCondition.GetAbnormal(), resp.VolumeCondition.GetMessage()
+			metrics.Abnormal, metrics.Message = &abnormal, &message
+		}
+	}
+
 	for _, usage := range usages {
 		if usage == nil {
 			continue
@@ -646,6 +661,30 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string,
 	return metrics, nil
 }
 
+func supportNodeGetVolumeCondition(ctx context.Context, nodeClient csipbv1.NodeClient) (supportNodeGetVolumeCondition bool, err error) {
+	req := csipbv1.NodeGetCapabilitiesRequest{}
+	rsp, err := nodeClient.NodeGetCapabilities(ctx, &req)
+	if err != nil {
+		return false, err
+	}
+
+	for _, cap := range rsp.GetCapabilities() {
+		if cap == nil {
+			continue
+		}
+		rpc := cap.GetRpc()
+		if rpc == nil {
+			continue
+		}
+		t := rpc.GetType()
+		if t == csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION {
+			return true, nil
+		}
+	}
+
+	return false, nil
+}
+
 func isFinalError(err error) bool {
 	// Sources:
 	// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go
index 71861b4d96a..0729d5b2afb 100644
--- a/pkg/volume/csi/csi_client_test.go
+++ b/pkg/volume/csi/csi_client_test.go
@@ -26,9 +26,14 @@ import (
 	"testing"
 
 	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
+	"github.com/stretchr/testify/assert"
+
 	api "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	utiltesting "k8s.io/client-go/util/testing"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/csi/fake"
 	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
@@ -60,6 +65,13 @@ func newFakeCsiDriverClientWithVolumeStats(t *testing.T, volumeStatsSet bool) *f
 	}
 }
 
+func newFakeCsiDriverClientWithVolumeStatsAndCondition(t *testing.T, volumeStatsSet, volumeConditionSet bool) *fakeCsiDriverClient {
+	return &fakeCsiDriverClient{
+		t:          t,
+		nodeClient: fake.NewNodeClientWithVolumeStatsAndCondition(volumeStatsSet, volumeConditionSet),
+	}
+}
+
 func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) (
 	nodeID string,
 	maxVolumePerNode int64,
@@ -80,15 +92,30 @@ func (c *fakeCsiDriverClient) NodeGetVolumeStats(ctx context.Context, volID stri
 		VolumeId:   volID,
 		VolumePath: targetPath,
 	}
+
+	c.nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo())
 	resp, err := c.nodeClient.NodeGetVolumeStats(ctx, req)
 	if err != nil {
 		return nil, err
 	}
+
 	usages := resp.GetUsage()
-	metrics := &volume.Metrics{}
 	if usages == nil {
 		return nil, nil
 	}
+
+	metrics := &volume.Metrics{}
+
+	isSupportNodeVolumeCondition, err := supportNodeGetVolumeCondition(ctx, c.nodeClient)
+	if err != nil {
+		return nil, err
+	}
+
+	if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) && isSupportNodeVolumeCondition {
+		abnormal, message := resp.VolumeCondition.GetAbnormal(), resp.VolumeCondition.GetMessage()
+		metrics.Abnormal, metrics.Message = &abnormal, &message
+	}
+
 	for _, usage := range usages {
 		if usage == nil {
 			continue
@@ -325,6 +352,10 @@ func setupClientWithExpansion(t *testing.T, stageUnstageSet bool, expansionSet b
 	return newFakeCsiDriverClientWithExpansion(t, stageUnstageSet, expansionSet)
 }
 
+func setupClientWithVolumeStatsAndCondition(t *testing.T, volumeStatsSet, volumeConditionSet bool) csiClient {
+	return newFakeCsiDriverClientWithVolumeStatsAndCondition(t, volumeStatsSet, volumeConditionSet)
+}
+
 func setupClientWithVolumeStats(t *testing.T, volumeStatsSet bool) csiClient {
 	return newFakeCsiDriverClientWithVolumeStats(t, volumeStatsSet)
 }
@@ -674,13 +705,108 @@ type VolumeStatsOptions struct {
 	DeviceMountPath string
 }
 
-func TestVolumeStats(t *testing.T) {
+func TestVolumeHealthEnable(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, true)()
+	spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
+	tests := []struct {
+		name               string
+		volumeStatsSet     bool
+		volumeConditionSet bool
+		volumeData         VolumeStatsOptions
+		success            bool
+	}{
+		{
+			name:               "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=on",
+			volumeStatsSet:     true,
+			volumeConditionSet: true,
+			volumeData: VolumeStatsOptions{
+				VolumeSpec:      spec,
+				VolumeID:        "volume1",
+				DeviceMountPath: "/foo/bar",
+			},
+			success: true,
+		},
+		{
+			name:               "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=off",
+			volumeStatsSet:     true,
+			volumeConditionSet: false,
+			volumeData: VolumeStatsOptions{
+				VolumeSpec:      spec,
+				VolumeID:        "volume1",
+				DeviceMountPath: "/foo/bar",
+			},
+			success: true,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
+			defer cancel()
+			csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec)
+			csClient := setupClientWithVolumeStatsAndCondition(t, tc.volumeStatsSet, tc.volumeConditionSet)
+			metrics, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath)
+			if tc.success {
+				assert.Nil(t, err)
+			}
+
+			if tc.volumeConditionSet {
+				assert.NotNil(t, metrics.Abnormal)
+				assert.NotNil(t, metrics.Message)
+			} else {
+				assert.Nil(t, metrics.Abnormal)
+				assert.Nil(t, metrics.Message)
+			}
+		})
+	}
+}
+
+func TestVolumeHealthDisable(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, false)()
+	spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
+	tests := []struct {
+		name           string
+		volumeStatsSet bool
+		volumeData     VolumeStatsOptions
+		success        bool
+	}{
+		{
+			name:           "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=off",
+			volumeStatsSet: true,
+			volumeData: VolumeStatsOptions{
+				VolumeSpec:      spec,
+				VolumeID:        "volume1",
+				DeviceMountPath: "/foo/bar",
+			},
+			success: true,
+		},
+	}
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
+			defer cancel()
+			csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec)
+			csClient := setupClientWithVolumeStatsAndCondition(t, tc.volumeStatsSet, false)
+			metrics, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath)
+			if tc.success {
+				assert.Nil(t, err)
+			}
+
+			assert.Nil(t, metrics.Abnormal)
+			assert.Nil(t, metrics.Message)
+		})
+	}
+}
+
+func TestVolumeStats(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, true)()
+	spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
 	tests := []struct {
-		name           string
-		volumeStatsSet bool
-		volumeData     VolumeStatsOptions
-		success        bool
+		name               string
+		volumeStatsSet     bool
+		volumeConditionSet bool
+		volumeData         VolumeStatsOptions
+		success            bool
 	}{
 		{
 			name: "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on",
diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go
index 4cd276f719a..f5c470b2ed3 100644
--- a/pkg/volume/csi/fake/fake_client.go
+++ b/pkg/volume/csi/fake/fake_client.go
@@ -84,6 +84,7 @@ type NodeClient struct {
 	stageUnstageSet          bool
 	expansionSet             bool
 	volumeStatsSet           bool
+	volumeConditionSet       bool
 	nodeGetInfoResp          *csipb.NodeGetInfoResponse
 	nodeVolumeStatsResp      *csipb.NodeGetVolumeStatsResponse
 	FakeNodeExpansionRequest *csipb.NodeExpandVolumeRequest
@@ -115,6 +116,13 @@ func NewNodeClientWithVolumeStats(volumeStatsSet bool) *NodeClient {
 	}
 }
 
+func NewNodeClientWithVolumeStatsAndCondition(volumeStatsSet, volumeConditionSet bool) *NodeClient {
+	return &NodeClient{
+		volumeStatsSet:     volumeStatsSet,
+		volumeConditionSet: volumeConditionSet,
+	}
+}
+
 // SetNextError injects next expected error
 func (f *NodeClient) SetNextError(err error) {
 	f.nextErr = err
@@ -346,6 +354,16 @@ func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetC
 			},
 		})
 	}
+
+	if f.volumeConditionSet {
+		resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
+			Type: &csipb.NodeServiceCapability_Rpc{
+				Rpc: &csipb.NodeServiceCapability_RPC{
+					Type: csipb.NodeServiceCapability_RPC_VOLUME_CONDITION,
+				},
+			},
+		})
+	}
 	return resp, nil
 }
diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go
index 85b8c4aaa52..2ec1e5233ba 100644
--- a/pkg/volume/volume.go
+++ b/pkg/volume/volume.go
@@ -92,6 +92,17 @@ type Metrics struct {
 	// a filesystem with the host (e.g. emptydir, hostpath), this is the free inodes
 	// on the underlying storage, and is shared with host processes and other volumes
 	InodesFree *resource.Quantity
+
+	// Normal volumes are available for use and operating optimally.
+	// An abnormal volume does not meet these criteria.
+	// This field is OPTIONAL. Only CSI drivers that support NodeServiceCapability_RPC_VOLUME_CONDITION
+	// need to fill it.
+	Abnormal *bool
+
+	// The message describing the condition of the volume.
+	// This field is OPTIONAL. Only CSI drivers that support NodeServiceCapability_RPC_VOLUME_CONDITION
+	// need to fill it.
+	Message *string
 }
 
 // Attributes represents the attributes of this mounter.
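
Context on the driver side of this feature: the kubelet changes above only consume a volume condition. A CSI driver has to opt in by advertising the VOLUME_CONDITION node capability and by filling VolumeCondition in its NodeGetVolumeStats response, which is exactly what supportNodeGetVolumeCondition() probes for in csi_client.go. Below is a minimal, hypothetical sketch of that driver-side contract, written against the same csipbv1 bindings this patch imports. It is not part of this commit: the driver package name, the nodeServer type, and the os.Stat-based health check are illustrative assumptions; a real plugin implements the full csi.NodeServer interface and its own health probe.

package driver

import (
	"context"
	"os"

	csi "github.com/container-storage-interface/spec/lib/go/csi"
)

// nodeServer is a hypothetical CSI node plugin; the remaining NodeServer
// methods are omitted for brevity.
type nodeServer struct{}

// NodeGetCapabilities advertises VOLUME_CONDITION so that the kubelet's
// capability probe (supportNodeGetVolumeCondition in csi_client.go)
// returns true for this driver.
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
	return &csi.NodeGetCapabilitiesResponse{
		Capabilities: []*csi.NodeServiceCapability{
			{
				Type: &csi.NodeServiceCapability_Rpc{
					Rpc: &csi.NodeServiceCapability_RPC{
						Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
					},
				},
			},
		},
	}, nil
}

// NodeGetVolumeStats returns a VolumeCondition alongside the usual usage
// figures. With the CSIVolumeHealth gate on, csiDriverClient copies
// Abnormal/Message into volume.Metrics, and volumeStatCalculator turns an
// abnormal condition into a "VolumeConditionAbnormal" Warning event on the
// pod that uses the volume.
func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
	condition := &csi.VolumeCondition{Abnormal: false, Message: "volume is healthy"}
	// Illustrative check only: report the volume as abnormal when its
	// target path has disappeared from the node.
	if _, err := os.Stat(req.GetVolumePath()); err != nil {
		condition = &csi.VolumeCondition{
			Abnormal: true,
			Message:  "The target path of the volume doesn't exist",
		}
	}
	return &csi.NodeGetVolumeStatsResponse{
		Usage:           []*csi.VolumeUsage{}, // capacity and inode figures would go here
		VolumeCondition: condition,
	}, nil
}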
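
Two behavioral notes for anyone trying the gate out. First, the feature is alpha and off by default, so it has to be enabled explicitly, for example by starting the kubelet with --feature-gates=CSIVolumeHealth=true. Second, the event path is deliberately conservative: volumeStatCalculator only emits when metric.Abnormal and metric.Message are both non-nil and the condition is abnormal, so in-tree volumes (whose Metrics never set these pointers) and CSI drivers without the VOLUME_CONDITION capability keep their current behavior, and a healthy condition produces no event at all, which is exactly what TestNormalVolumeEvent asserts.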