diff --git a/test/e2e/feature/feature.go b/test/e2e/feature/feature.go
index a3aaf165093..e0bf1591639 100644
--- a/test/e2e/feature/feature.go
+++ b/test/e2e/feature/feature.go
@@ -395,6 +395,19 @@ var (
 	// TODO: document the feature (owning SIG, when to use this feature for a test)
 	VolumeSourceXFS = framework.WithFeature(framework.ValidFeatures.Add("VolumeSourceXFS"))
 
+	// Owned by SIG Storage
+	// kep: https://kep.k8s.io/1432
+	// test-infra jobs:
+	// - pull-kubernetes-e2e-storage-kind-alpha-features (needs a manual trigger)
+	// - ci-kubernetes-e2e-storage-kind-alpha-features
+	// When this label is added to a test, it means that the cluster must be created
+	// with the feature-gate "CSIVolumeHealth=true".
+	//
+	// Once the feature is stable, this label should be removed and these tests will
+	// run by default on any cluster. The test-infra jobs should also be updated to
+	// no longer focus on this feature.
+	CSIVolumeHealth = framework.WithFeature(framework.ValidFeatures.Add("CSIVolumeHealth"))
+
 	// TODO: document the feature (owning SIG, when to use this feature for a test)
 	Vsphere = framework.WithFeature(framework.ValidFeatures.Add("vsphere"))
 
diff --git a/test/e2e/storage/csimock/base.go b/test/e2e/storage/csimock/base.go
index 6a73998eb6a..71cfeb8bcbd 100644
--- a/test/e2e/storage/csimock/base.go
+++ b/test/e2e/storage/csimock/base.go
@@ -56,6 +56,8 @@ const (
 	csiPodUnschedulableTimeout = 5 * time.Minute
 	csiResizeWaitPeriod        = 5 * time.Minute
 	csiVolumeAttachmentTimeout = 7 * time.Minute
+	// how long to wait for NodeGetVolumeStats calls and the resulting kubelet metrics
+	csiNodeVolumeStatWaitPeriod = 2 * time.Minute
 
 	// how long to wait for Resizing Condition on PVC to appear
 	csiResizingConditionWait = 2 * time.Minute
@@ -96,6 +98,7 @@ type testParameters struct {
 	disableResizingOnDriver bool
 	enableSnapshot          bool
 	enableVolumeMountGroup  bool // enable the VOLUME_MOUNT_GROUP node capability in the CSI mock driver.
+	enableNodeVolumeCondition bool // enable the VOLUME_CONDITION node capability in the CSI mock driver.
 	hooks                   *drivers.Hooks
 	tokenRequests           []storagev1.TokenRequest
 	requiresRepublish       *bool
@@ -168,6 +171,7 @@ func (m *mockDriverSetup) init(ctx context.Context, tp testParameters) {
 		DisableAttach:       tp.disableAttach,
 		EnableResizing:      tp.enableResizing,
 		EnableNodeExpansion: tp.enableNodeExpansion,
+		EnableNodeVolumeCondition: tp.enableNodeVolumeCondition,
 		EnableSnapshot:      tp.enableSnapshot,
 		EnableVolumeMountGroup: tp.enableVolumeMountGroup,
 		TokenRequests:       tp.tokenRequests,
diff --git a/test/e2e/storage/csimock/csi_node_volume_health.go b/test/e2e/storage/csimock/csi_node_volume_health.go
new file mode 100644
index 00000000000..c98433aaca2
--- /dev/null
+++ b/test/e2e/storage/csimock/csi_node_volume_health.go
@@ -0,0 +1,199 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package csimock
+
+import (
+	"context"
+	"strings"
+	"time"
+
+	"google.golang.org/grpc/codes"
+
+	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/features"
+	kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
+	"k8s.io/kubernetes/test/e2e/feature"
+	"k8s.io/kubernetes/test/e2e/framework"
+	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
+	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
+	"k8s.io/kubernetes/test/e2e/storage/drivers"
+	"k8s.io/kubernetes/test/e2e/storage/utils"
+	admissionapi "k8s.io/pod-security-admission/api"
+
+	"github.com/onsi/ginkgo/v2"
+)
+
+var _ = utils.SIGDescribe("CSI Mock Node Volume Health", feature.CSIVolumeHealth, framework.WithFeatureGate(features.CSIVolumeHealth), func() {
+	f := framework.NewDefaultFramework("csi-mock-node-volume-health")
+	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
+	m := newMockDriverSetup(f)
+
+	f.Context("CSI Mock Node Volume Health", f.WithSlow(), func() {
+		trackedCalls := []string{
+			"NodeGetVolumeStats",
+		}
+		tests := []struct {
+			name                        string
+			expectedCalls               []csiCall
+			nodeVolumeConditionRequired bool
+			nodeAbnormalVolumeCondition bool
+		}{
+			{
+				name: "return normal volume stats",
+				expectedCalls: []csiCall{
+					{
+						expectedMethod: "NodeGetVolumeStats",
+						expectedError:  codes.OK,
+					},
+				},
+				nodeVolumeConditionRequired: true,
+				nodeAbnormalVolumeCondition: false,
+			},
+			{
+				name: "return normal volume stats without volume condition",
+				expectedCalls: []csiCall{
+					{
+						expectedMethod: "NodeGetVolumeStats",
+						expectedError:  codes.OK,
+					},
+				},
+				nodeVolumeConditionRequired: false,
+				nodeAbnormalVolumeCondition: false,
+			},
+			{
+				name: "return normal volume stats with abnormal volume condition",
+				expectedCalls: []csiCall{
+					{
+						expectedMethod: "NodeGetVolumeStats",
+						expectedError:  codes.OK,
+					},
+				},
+				nodeVolumeConditionRequired: true,
+				nodeAbnormalVolumeCondition: true,
+			},
+		}
+		for _, t := range tests {
+			test := t
+			ginkgo.It(test.name, func(ctx context.Context) {
+				m.init(ctx, testParameters{
+					registerDriver:            true,
+					enableNodeVolumeCondition: test.nodeVolumeConditionRequired,
+					hooks:                     createGetVolumeStatsHook(test.nodeAbnormalVolumeCondition),
+				})
+				ginkgo.DeferCleanup(m.cleanup)
+
+				_, claim, pod := m.createPod(ctx, pvcReference)
+				if pod == nil {
+					return
+				}
+				// Wait for PVC to get bound to make sure the CSI driver is fully started.
+				err := e2epv.WaitForPersistentVolumeClaimPhase(ctx, v1.ClaimBound, f.ClientSet, f.Namespace.Name, claim.Name, time.Second, framework.ClaimProvisionTimeout)
+				framework.ExpectNoError(err, "while waiting for PVC to get provisioned")
+
+				ginkgo.By("Waiting for pod to be running")
+				err = e2epod.WaitForPodNameRunningInNamespace(ctx, m.cs, pod.Name, pod.Namespace)
+				framework.ExpectNoError(err, "wait for running pod")
+
+				ginkgo.By("Waiting for all remaining expected CSI calls")
+				err = wait.PollUntilContextTimeout(ctx, time.Second, csiNodeVolumeStatWaitPeriod, true, func(c context.Context) (done bool, err error) {
+					var index int
+					_, index, err = compareCSICalls(ctx, trackedCalls, test.expectedCalls, m.driver.GetCalls)
+					if err != nil {
+						return true, err
+					}
+					if index == 0 {
+						// No CSI call received yet
+						return false, nil
+					}
+					if len(test.expectedCalls) == index {
+						// all calls received
+						return true, nil
+					}
+					return false, nil
+				})
+				framework.ExpectNoError(err, "while waiting for all CSI calls")
+
+				// Fetching metrics via csi.NewMetricsCsi(pv.handler).GetMetrics() does not
+				// work here: the mock CSI driver does not register itself the way a normal
+				// CSI driver does. Scrape the kubelet metrics endpoint instead.
+				if test.nodeVolumeConditionRequired {
+					pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
+					framework.ExpectNoError(err, "get running pod")
+					grabber, err := e2emetrics.NewMetricsGrabber(ctx, f.ClientSet, nil, f.ClientConfig(), true, false, false, false, false, false)
+					framework.ExpectNoError(err, "creating the metrics grabber")
+					waitErr := wait.PollUntilContextTimeout(ctx, 30*time.Second, csiNodeVolumeStatWaitPeriod, true, func(ctx context.Context) (bool, error) {
+						framework.Logf("Grabbing Kubelet metrics")
+						// Grab kubelet metrics from the node the pod was scheduled on
+						var err error
+						kubeMetrics, err := grabber.GrabFromKubelet(ctx, pod.Spec.NodeName)
+						if err != nil {
+							framework.Logf("Error fetching kubelet metrics err: %v", err)
+							return false, err
+						}
+						if !findVolumeConditionMetrics(f.Namespace.Name, claim.Name, kubeMetrics, test.nodeAbnormalVolumeCondition) {
+							return false, nil
+						}
+						return true, nil
+					})
+					framework.ExpectNoError(waitErr, "while waiting for the expected volume health metric")
+				}
+			})
+		}
+	})
+})
+
+// findVolumeConditionMetrics reports whether the kubelet metrics contain a
+// VolumeStatsHealthStatusAbnormal sample for the given PVC with the expected value.
+func findVolumeConditionMetrics(pvcNamespace, pvcName string, kubeMetrics e2emetrics.KubeletMetrics, nodeAbnormalVolumeCondition bool) bool {
+	found := false
+	framework.Logf("Looking for sample tagged with namespace `%s`, PVC `%s`", pvcNamespace, pvcName)
+	for key, value := range kubeMetrics {
+		for _, sample := range value {
+			framework.Logf("Found sample %+v with key: %s", sample, key)
+			samplePVC, ok := sample.Metric["persistentvolumeclaim"]
+			if !ok {
+				break
+			}
+			sampleNS, ok := sample.Metric["namespace"]
+			if !ok {
+				break
+			}
+
+			if string(samplePVC) == pvcName && string(sampleNS) == pvcNamespace && strings.Contains(key, kubeletmetrics.VolumeStatsHealthStatusAbnormalKey) {
+				// abnormal condition => 1, normal condition => 0
+				if (nodeAbnormalVolumeCondition && sample.Value.String() == "1") || (!nodeAbnormalVolumeCondition && sample.Value.String() == "0") {
+					found = true
+					break
+				}
+			}
+		}
+	}
+	return found
+}
+
+// createGetVolumeStatsHook returns a pre-call hook that, for abnormal test cases,
+// rewrites the NodeGetVolumeStats volume path to the path the mock driver treats as abnormal.
+func createGetVolumeStatsHook(abnormalVolumeCondition bool) *drivers.Hooks {
+	return &drivers.Hooks{
+		Pre: func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
+			if req, ok := request.(*csipbv1.NodeGetVolumeStatsRequest); ok {
+				if abnormalVolumeCondition {
+					req.VolumePath = "/tmp/csi/health/abnormal"
+				}
+			}
+			return nil, nil
+		},
+	}
+}
diff --git a/test/e2e/storage/drivers/csi-test/mock/service/node.go b/test/e2e/storage/drivers/csi-test/mock/service/node.go
index ce79904aa5e..60e80601e5f 100644
--- a/test/e2e/storage/drivers/csi-test/mock/service/node.go
+++ b/test/e2e/storage/drivers/csi-test/mock/service/node.go
@@ -359,13 +359,6 @@ func (s *service) NodeGetCapabilities(
 				},
 			},
 		},
-		{
-			Type: &csi.NodeServiceCapability_Rpc{
-				Rpc: &csi.NodeServiceCapability_RPC{
-					Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
-				},
-			},
-		},
 	}
 	if s.config.NodeExpansionRequired {
 		capabilities = append(capabilities, &csi.NodeServiceCapability{
@@ -377,6 +370,16 @@ func (s *service) NodeGetCapabilities(
 		})
 	}
 
+	if s.config.NodeVolumeConditionRequired {
+		capabilities = append(capabilities, &csi.NodeServiceCapability{
+			Type: &csi.NodeServiceCapability_Rpc{
+				Rpc: &csi.NodeServiceCapability_RPC{
+					Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
+				},
+			},
+		})
+	}
+
 	if s.config.VolumeMountGroupRequired {
 		capabilities = append(capabilities, &csi.NodeServiceCapability{
 			Type: &csi.NodeServiceCapability_Rpc{
@@ -417,7 +420,10 @@ func (s *service) NodeGetVolumeStats(ctx context.Context,
 	req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
 
 	resp := &csi.NodeGetVolumeStatsResponse{
-		VolumeCondition: &csi.VolumeCondition{},
+		VolumeCondition: &csi.VolumeCondition{
+			Abnormal: false,
+			Message:  "volume is normal",
+		},
 	}
 
 	if len(req.GetVolumeId()) == 0 {
@@ -437,6 +443,19 @@ func (s *service) NodeGetVolumeStats(ctx context.Context,
 
 	nodeMntPathKey := path.Join(s.nodeID, req.VolumePath)
 
+	resp.Usage = []*csi.VolumeUsage{
+		{
+			Total: v.GetCapacityBytes(),
+			Unit:  csi.VolumeUsage_BYTES,
+		},
+	}
+
+	if req.GetVolumePath() == "/tmp/csi/health/abnormal" {
+		resp.VolumeCondition.Abnormal = true
+		resp.VolumeCondition.Message = "volume is abnormal"
+		return resp, nil
+	}
+
 	_, exists := v.VolumeContext[nodeMntPathKey]
 	if !exists {
 		msg := fmt.Sprintf("volume %q doest not exist on the specified path %q", req.VolumeId, req.VolumePath)
@@ -449,12 +468,5 @@ func (s *service) NodeGetVolumeStats(ctx context.Context,
 		return nil, status.Errorf(hookVal, hookMsg)
 	}
 
-	resp.Usage = []*csi.VolumeUsage{
-		{
-			Total: v.GetCapacityBytes(),
-			Unit:  csi.VolumeUsage_BYTES,
-		},
-	}
-
 	return resp, nil
 }
diff --git a/test/e2e/storage/drivers/csi-test/mock/service/service.go b/test/e2e/storage/drivers/csi-test/mock/service/service.go
index 1c96ddc2982..17ad811ca74 100644
--- a/test/e2e/storage/drivers/csi-test/mock/service/service.go
+++ b/test/e2e/storage/drivers/csi-test/mock/service/service.go
@@ -51,16 +51,17 @@ var Manifest = map[string]string{
 }
 
 type Config struct {
-	DisableAttach              bool
-	DriverName                 string
-	AttachLimit                int64
-	NodeExpansionRequired      bool
-	VolumeMountGroupRequired   bool
-	DisableControllerExpansion bool
-	DisableOnlineExpansion     bool
-	PermissiveTargetPath       bool
-	EnableTopology             bool
-	IO                         DirIO
+	DisableAttach               bool
+	DriverName                  string
+	AttachLimit                 int64
+	NodeExpansionRequired       bool
+	NodeVolumeConditionRequired bool
+	VolumeMountGroupRequired    bool
+	DisableControllerExpansion  bool
+	DisableOnlineExpansion      bool
+	PermissiveTargetPath        bool
+	EnableTopology              bool
+	IO                          DirIO
 }
 
 // DirIO is an abstraction over direct os calls.
diff --git a/test/e2e/storage/drivers/csi.go b/test/e2e/storage/drivers/csi.go
index 68930456f19..f11964a0bb3 100644
--- a/test/e2e/storage/drivers/csi.go
+++ b/test/e2e/storage/drivers/csi.go
@@ -344,6 +344,7 @@ type mockCSIDriver struct {
 	requiresRepublish         *bool
 	fsGroupPolicy             *storagev1.FSGroupPolicy
 	enableVolumeMountGroup    bool
+	enableNodeVolumeCondition bool
 	embedded                  bool
 	calls                     MockCSICalls
 	embeddedCSIDriver         *mockdriver.CSIDriver
@@ -393,6 +394,7 @@ type CSIMockDriverOpts struct {
 	EnableNodeExpansion       bool
 	EnableSnapshot            bool
 	EnableVolumeMountGroup    bool
+	EnableNodeVolumeCondition bool
 	TokenRequests             []storagev1.TokenRequest
 	RequiresRepublish         *bool
 	FSGroupPolicy             *storagev1.FSGroupPolicy
@@ -547,6 +549,7 @@ func InitMockCSIDriver(driverOpts CSIMockDriverOpts) MockCSITestDriver {
 		attachable:                !driverOpts.DisableAttach,
 		attachLimit:               driverOpts.AttachLimit,
 		enableNodeExpansion:       driverOpts.EnableNodeExpansion,
+		enableNodeVolumeCondition: driverOpts.EnableNodeVolumeCondition,
 		tokenRequests:             driverOpts.TokenRequests,
 		requiresRepublish:         driverOpts.RequiresRepublish,
 		fsGroupPolicy:             driverOpts.FSGroupPolicy,
@@ -621,12 +624,13 @@ func (m *mockCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework)
 	// for cleanup callbacks.
 	ctx, cancel := context.WithCancel(context.Background())
 	serviceConfig := mockservice.Config{
-		DisableAttach:            !m.attachable,
-		DriverName:               "csi-mock-" + f.UniqueName,
-		AttachLimit:              int64(m.attachLimit),
-		NodeExpansionRequired:    m.enableNodeExpansion,
-		VolumeMountGroupRequired: m.enableVolumeMountGroup,
-		EnableTopology:           m.enableTopology,
+		DisableAttach:               !m.attachable,
+		DriverName:                  "csi-mock-" + f.UniqueName,
+		AttachLimit:                 int64(m.attachLimit),
+		NodeExpansionRequired:       m.enableNodeExpansion,
+		NodeVolumeConditionRequired: m.enableNodeVolumeCondition,
+		VolumeMountGroupRequired:    m.enableVolumeMountGroup,
+		EnableTopology:              m.enableTopology,
 		IO: proxy.PodDirIO{
 			F:         f,
 			Namespace: m.driverNamespace.Name,
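
Note (not part of the diff): a sketch of how these tests can be exercised locally. The alpha-features jobs referenced in feature.go are configured in test-infra and may differ; the commands and paths below are illustrative assumptions, not taken from this PR. Create a kind cluster with the feature gate enabled (kind's v1alpha4 Cluster config accepts a featureGates map), then point the e2e binary at the new suite:

    # Assumed local workflow: bring up a cluster with CSIVolumeHealth=true
    cat <<EOF | kind create cluster --config=-
    kind: Cluster
    apiVersion: kind.x-k8s.io/v1alpha4
    featureGates:
      CSIVolumeHealth: true
    EOF

    # Build the e2e binary and run only the new suite via its ginkgo description
    make WHAT=test/e2e/e2e.test
    _output/bin/e2e.test -kubeconfig="$HOME/.kube/config" -provider=skeleton \
      -ginkgo.focus='CSI Mock Node Volume Health'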