Merge pull request #124690 from mowangdk/test/add_e2e_test_for_volume_health

chore: Add e2e test for NodeGetVolumeStats
Kubernetes Prow Robot, 2024-08-13 13:00:43 -07:00 (committed by GitHub)
commit b2799bbda7
6 changed files with 264 additions and 31 deletions

View File

@@ -395,6 +395,19 @@ var (
// TODO: document the feature (owning SIG, when to use this feature for a test)
VolumeSourceXFS = framework.WithFeature(framework.ValidFeatures.Add("VolumeSourceXFS"))
// Owned by SIG Storage
// kep: https://kep.k8s.io/1432
// test-infra jobs:
// - pull-kubernetes-e2e-storage-kind-alpha-features (need manual trigger)
// - ci-kubernetes-e2e-storage-kind-alpha-features
// When this label is added to a test, it means that the cluster must be created
// with the feature-gate "CSIVolumeHealth=true".
//
// Once the feature is stable, this label should be removed and these tests will
// be run by default on any cluster. The test-infra jobs should also be updated to
// stop focusing on this feature.
CSIVolumeHealth = framework.WithFeature(framework.ValidFeatures.Add("CSIVolumeHealth"))
// TODO: document the feature (owning SIG, when to use this feature for a test)
Vsphere = framework.WithFeature(framework.ValidFeatures.Add("vsphere"))
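For context (not part of this diff): a spec opts into the new gate by combining the environment label with the version-gated feature check, exactly as the node-volume-health test added below does. A minimal sketch, assuming the usual e2e framework imports (feature, framework, features, utils):

// Minimal sketch mirroring the spec header in the new test file. The label restricts the
// test to jobs that opt in, and the feature-gate check requires a cluster started with
// --feature-gates=CSIVolumeHealth=true.
var _ = utils.SIGDescribe("example volume health spec", feature.CSIVolumeHealth,
    framework.WithFeatureGate(features.CSIVolumeHealth), func() {
    // ginkgo.It(...) specs go here.
})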

View File

@@ -56,6 +56,8 @@ const (
csiPodUnschedulableTimeout = 5 * time.Minute
csiResizeWaitPeriod = 5 * time.Minute
csiVolumeAttachmentTimeout = 7 * time.Minute
// how long to wait for NodeGetVolumeStats to be observed
csiNodeVolumeStatWaitPeriod = 2 * time.Minute
// how long to wait for Resizing Condition on PVC to appear
csiResizingConditionWait = 2 * time.Minute
@@ -96,6 +98,7 @@ type testParameters struct {
disableResizingOnDriver bool
enableSnapshot bool
enableVolumeMountGroup bool // enable the VOLUME_MOUNT_GROUP node capability in the CSI mock driver.
enableNodeVolumeCondition bool
hooks *drivers.Hooks
tokenRequests []storagev1.TokenRequest
requiresRepublish *bool
@@ -168,6 +171,7 @@ func (m *mockDriverSetup) init(ctx context.Context, tp testParameters) {
DisableAttach: tp.disableAttach,
EnableResizing: tp.enableResizing,
EnableNodeExpansion: tp.enableNodeExpansion,
EnableNodeVolumeCondition: tp.enableNodeVolumeCondition,
EnableSnapshot: tp.enableSnapshot,
EnableVolumeMountGroup: tp.enableVolumeMountGroup,
TokenRequests: tp.tokenRequests,
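For illustration (not part of this diff): a test turns the new knob on through testParameters, the same way the other toggles above are used; the real call site is in the node-volume-health test added below. A minimal sketch:

// Hypothetical use of the new parameter inside a spec body; mockDriverSetup.init forwards
// enableNodeVolumeCondition to the mock driver as EnableNodeVolumeCondition (see above).
m := newMockDriverSetup(f)
m.init(ctx, testParameters{
    registerDriver:            true,
    enableNodeVolumeCondition: true, // mock node plugin will advertise VOLUME_CONDITION
})
ginkgo.DeferCleanup(m.cleanup)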

View File

@@ -0,0 +1,199 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csimock
import (
"context"
"strings"
"time"
"google.golang.org/grpc/codes"
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/features"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
"k8s.io/kubernetes/test/e2e/storage/drivers"
"k8s.io/kubernetes/test/e2e/storage/utils"
admissionapi "k8s.io/pod-security-admission/api"
"github.com/onsi/ginkgo/v2"
)
var _ = utils.SIGDescribe("CSI Mock Node Volume Health", feature.CSIVolumeHealth, framework.WithFeatureGate(features.CSIVolumeHealth), func() {
f := framework.NewDefaultFramework("csi-mock-node-volume-health")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
m := newMockDriverSetup(f)
f.Context("CSI Mock Node Volume Health", f.WithSlow(), func() {
trackedCalls := []string{
"NodeGetVolumeStats",
}
tests := []struct {
name string
expectedCalls []csiCall
nodeVolumeConditionRequired bool
nodeAbnormalVolumeCondition bool
}{
{
name: "return normal volume stats",
expectedCalls: []csiCall{
{
expectedMethod: "NodeGetVolumeStats",
expectedError: codes.OK,
},
},
nodeVolumeConditionRequired: true,
nodeAbnormalVolumeCondition: false,
},
{
name: "return normal volume stats without volume condition",
expectedCalls: []csiCall{
{
expectedMethod: "NodeGetVolumeStats",
expectedError: codes.OK,
},
},
nodeVolumeConditionRequired: false,
nodeAbnormalVolumeCondition: false,
},
{
name: "return normal volume stats with abnormal volume condition",
expectedCalls: []csiCall{
{
expectedMethod: "NodeGetVolumeStats",
expectedError: codes.OK,
},
},
nodeVolumeConditionRequired: true,
nodeAbnormalVolumeCondition: true,
},
}
for _, t := range tests {
test := t
ginkgo.It(test.name, func(ctx context.Context) {
m.init(ctx, testParameters{
registerDriver: true,
enableNodeVolumeCondition: test.nodeVolumeConditionRequired,
hooks: createGetVolumeStatsHook(test.nodeAbnormalVolumeCondition),
})
ginkgo.DeferCleanup(m.cleanup)
_, claim, pod := m.createPod(ctx, pvcReference)
if pod == nil {
return
}
// Wait for PVC to get bound to make sure the CSI driver is fully started.
err := e2epv.WaitForPersistentVolumeClaimPhase(ctx, v1.ClaimBound, f.ClientSet, f.Namespace.Name, claim.Name, time.Second, framework.ClaimProvisionTimeout)
framework.ExpectNoError(err, "while waiting for PVC to get provisioned")
ginkgo.By("Waiting for pod to be running")
err = e2epod.WaitForPodNameRunningInNamespace(ctx, m.cs, pod.Name, pod.Namespace)
framework.ExpectNoError(err, "wait for running pod")
ginkgo.By("Waiting for all remaining expected CSI calls")
err = wait.PollUntilContextTimeout(ctx, time.Second, csiNodeVolumeStatWaitPeriod, true, func(c context.Context) (done bool, err error) {
var index int
_, index, err = compareCSICalls(ctx, trackedCalls, test.expectedCalls, m.driver.GetCalls)
if err != nil {
return true, err
}
if index == 0 {
// No CSI call received yet
return false, nil
}
if len(test.expectedCalls) == index {
// all calls received
return true, nil
}
return false, nil
})
framework.ExpectNoError(err, "while waiting for all CSI calls")
// Tried using csi.NewMetricsCsi(pv.handler).GetMetrics() to fetch metrics directly from the CSI mock driver,
// but that fails because the mock driver does not register itself the way a normal CSI driver does,
// so the volume health condition is verified through kubelet metrics instead.
if test.nodeVolumeConditionRequired {
pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "get running pod")
grabber, err := e2emetrics.NewMetricsGrabber(ctx, f.ClientSet, nil, f.ClientConfig(), true, false, false, false, false, false)
framework.ExpectNoError(err, "creating the metrics grabber")
waitErr := wait.PollUntilContextTimeout(ctx, 30*time.Second, csiNodeVolumeStatWaitPeriod, true, func(ctx context.Context) (bool, error) {
framework.Logf("Grabbing Kubelet metrics")
// Grab kubelet metrics from the node the pod was scheduled on
var err error
kubeMetrics, err := grabber.GrabFromKubelet(ctx, pod.Spec.NodeName)
if err != nil {
framework.Logf("Error fetching kubelet metrics err: %v", err)
return false, err
}
if !findVolumeConditionMetrics(f.Namespace.Name, claim.Name, kubeMetrics, test.nodeAbnormalVolumeCondition) {
return false, nil
}
return true, nil
})
framework.ExpectNoError(waitErr, "while waiting for kubelet volume health metrics to match the expected condition")
}
})
}
})
})
func findVolumeConditionMetrics(pvcNamespace, pvcName string, kubeMetrics e2emetrics.KubeletMetrics, nodeAbnormalVolumeCondition bool) bool {
found := false
framework.Logf("Looking for sample tagged with namespace `%s`, PVC `%s`", pvcNamespace, pvcName)
for key, value := range kubeMetrics {
for _, sample := range value {
framework.Logf("Found sample %++v with key: %s", sample, key)
samplePVC, ok := sample.Metric["persistentvolumeclaim"]
if !ok {
break
}
sampleNS, ok := sample.Metric["namespace"]
if !ok {
break
}
if string(samplePVC) == pvcName && string(sampleNS) == pvcNamespace && strings.Contains(key, kubeletmetrics.VolumeStatsHealthStatusAbnormalKey) {
if (nodeAbnormalVolumeCondition && sample.Value.String() == "1") || (!nodeAbnormalVolumeCondition && sample.Value.String() == "0") {
found = true
break
}
}
}
}
return found
}
func createGetVolumeStatsHook(abnormalVolumeCondition bool) *drivers.Hooks {
return &drivers.Hooks{
Pre: func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
if req, ok := request.(*csipbv1.NodeGetVolumeStatsRequest); ok {
if abnormalVolumeCondition {
req.VolumePath = "/tmp/csi/health/abnormal"
}
}
return nil, nil
},
}
}
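For reference (not part of this diff): findVolumeConditionMetrics above matches kubelet samples whose metric key contains kubeletmetrics.VolumeStatsHealthStatusAbnormalKey (the per-PVC health gauge, typically surfaced as kubelet_volume_stats_health_status_abnormal) and whose value is 1 for an abnormal volume and 0 for a healthy one. A rough sketch of such a sample; the namespace and claim names are hypothetical, and model is github.com/prometheus/common/model, which the e2emetrics.KubeletMetrics samples are built on:

// Illustrative sample shape only; real samples come from grabber.GrabFromKubelet above.
abnormalSample := &model.Sample{
    Metric: model.Metric{
        "namespace":             "csi-mock-volumes-1234", // hypothetical test namespace
        "persistentvolumeclaim": "pvc-abcde",             // hypothetical claim name
    },
    Value: 1, // 1 = abnormal volume condition, 0 = healthy
}
_ = abnormalSample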

View File

@@ -359,13 +359,6 @@ func (s *service) NodeGetCapabilities(
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
},
},
},
}
if s.config.NodeExpansionRequired {
capabilities = append(capabilities, &csi.NodeServiceCapability{
@@ -377,6 +370,16 @@
})
}
if s.config.NodeVolumeConditionRequired {
capabilities = append(capabilities, &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
},
},
})
}
if s.config.VolumeMountGroupRequired {
capabilities = append(capabilities, &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
@@ -417,7 +420,10 @@ func (s *service) NodeGetVolumeStats(ctx context.Context,
req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{},
VolumeCondition: &csi.VolumeCondition{
Abnormal: false,
Message: "volume is normal",
},
}
if len(req.GetVolumeId()) == 0 {
@@ -437,6 +443,19 @@ func (s *service) NodeGetVolumeStats(ctx context.Context,
nodeMntPathKey := path.Join(s.nodeID, req.VolumePath)
resp.Usage = []*csi.VolumeUsage{
{
Total: v.GetCapacityBytes(),
Unit: csi.VolumeUsage_BYTES,
},
}
if req.GetVolumePath() == "/tmp/csi/health/abnormal" {
resp.VolumeCondition.Abnormal = true
resp.VolumeCondition.Message = "volume is abnormal"
return resp, nil
}
_, exists := v.VolumeContext[nodeMntPathKey]
if !exists {
msg := fmt.Sprintf("volume %q does not exist on the specified path %q", req.VolumeId, req.VolumePath)
@@ -449,12 +468,5 @@ func (s *service) NodeGetVolumeStats(ctx context.Context,
return nil, status.Errorf(hookVal, hookMsg)
}
resp.Usage = []*csi.VolumeUsage{
{
Total: v.GetCapacityBytes(),
Unit: csi.VolumeUsage_BYTES,
},
}
return resp, nil
}
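Not part of this diff, but to make the interaction with the test's hook concrete: createGetVolumeStatsHook rewrites VolumePath to /tmp/csi/health/abnormal before the mock handles the request, and the handler above then reports an abnormal condition; with any other path (and a volume actually published there) the condition stays normal. A rough sketch of calling the handler directly, assuming a mock service instance s, a context ctx, and a volume vol-1 published at /mnt/vol-1 beforehand:

// Hypothetical direct calls against the mock service, for illustration only.
req := &csi.NodeGetVolumeStatsRequest{
    VolumeId:   "vol-1",      // hypothetical volume ID
    VolumePath: "/mnt/vol-1", // published path -> Abnormal=false, "volume is normal"
}
resp, err := s.NodeGetVolumeStats(ctx, req)

req.VolumePath = "/tmp/csi/health/abnormal" // the path the e2e hook injects
resp, err = s.NodeGetVolumeStats(ctx, req)  // -> Abnormal=true, "volume is abnormal"
_, _ = resp, err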

View File

@@ -55,6 +55,7 @@ type Config struct {
DriverName string
AttachLimit int64
NodeExpansionRequired bool
NodeVolumeConditionRequired bool
VolumeMountGroupRequired bool
DisableControllerExpansion bool
DisableOnlineExpansion bool

View File

@@ -344,6 +344,7 @@ type mockCSIDriver struct {
requiresRepublish *bool
fsGroupPolicy *storagev1.FSGroupPolicy
enableVolumeMountGroup bool
enableNodeVolumeCondition bool
embedded bool
calls MockCSICalls
embeddedCSIDriver *mockdriver.CSIDriver
@@ -393,6 +394,7 @@ type CSIMockDriverOpts struct {
EnableNodeExpansion bool
EnableSnapshot bool
EnableVolumeMountGroup bool
EnableNodeVolumeCondition bool
TokenRequests []storagev1.TokenRequest
RequiresRepublish *bool
FSGroupPolicy *storagev1.FSGroupPolicy
@@ -547,6 +549,7 @@ func InitMockCSIDriver(driverOpts CSIMockDriverOpts) MockCSITestDriver {
attachable: !driverOpts.DisableAttach,
attachLimit: driverOpts.AttachLimit,
enableNodeExpansion: driverOpts.EnableNodeExpansion,
enableNodeVolumeCondition: driverOpts.EnableNodeVolumeCondition,
tokenRequests: driverOpts.TokenRequests,
requiresRepublish: driverOpts.RequiresRepublish,
fsGroupPolicy: driverOpts.FSGroupPolicy,
@@ -625,6 +628,7 @@ func (m *mockCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework)
DriverName: "csi-mock-" + f.UniqueName,
AttachLimit: int64(m.attachLimit),
NodeExpansionRequired: m.enableNodeExpansion,
NodeVolumeConditionRequired: m.enableNodeVolumeCondition,
VolumeMountGroupRequired: m.enableVolumeMountGroup,
EnableTopology: m.enableTopology,
IO: proxy.PodDirIO{
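Taken together (illustration only): a test sets EnableNodeVolumeCondition on the mock driver options, PrepareTest copies it into the embedded service Config as NodeVolumeConditionRequired, and the mock node plugin then advertises the VOLUME_CONDITION capability (the NodeGetVolumeStats response carries VolumeCondition either way, as shown above). A minimal sketch using only the fields shown in this diff:

// Hypothetical direct construction; in the e2e suite this happens inside
// mockDriverSetup.init via testParameters.enableNodeVolumeCondition.
driver := drivers.InitMockCSIDriver(drivers.CSIMockDriverOpts{
    EnableNodeVolumeCondition: true, // -> Config.NodeVolumeConditionRequired -> VOLUME_CONDITION
})
_ = driver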