test:Add volume condition test cases

This commit is contained in:
mowangdk 2024-06-05 09:39:10 +08:00
parent 726eeb7778
commit 4c1a2d6851
7 changed files with 189 additions and 46 deletions

View File

@ -364,6 +364,19 @@ var (
// TODO: document the feature (owning SIG, when to use this feature for a test)
VolumeSourceXFS = framework.WithFeature(framework.ValidFeatures.Add("VolumeSourceXFS"))
// Ownerd by SIG Storage
// kep: https://kep.k8s.io/1432
// test-infra jobs:
// - pull-kubernetes-e2e-storage-kind-alpha-features (need manual trigger)
// - ci-kubernetes-e2e-storage-kind-alpha-features
// When this label is added to a test, it means that the cluster must be created
// with the feature-gate "CSIVolumeHealth=true".
//
// Once the feature is stable, this label should be removed and these tests will
// be run by default on any cluster. The test-infra job also should be updated to
// not focus on this feature anymore.
CSIVolumeHealth = framework.WithFeature(framework.ValidFeatures.Add("CSIVolumeHealth"))
// TODO: document the feature (owning SIG, when to use this feature for a test)
Vsphere = framework.WithFeature(framework.ValidFeatures.Add("vsphere"))

View File

@ -856,3 +856,14 @@ func WaitForContainerTerminated(ctx context.Context, c clientset.Interface, name
return false, nil
})
}
func GetPodNodeName(ctx context.Context, c clientset.Interface, ns, name string) (string, error) {
pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return "", err
}
if pod.Spec.NodeName == "" {
return "", fmt.Errorf("pod %s/%s has no node name", ns, name)
}
return pod.Spec.NodeName, nil
}

View File

@ -56,7 +56,7 @@ const (
csiResizeWaitPeriod = 5 * time.Minute
csiVolumeAttachmentTimeout = 7 * time.Minute
// how long to wait for GetVolumeStats
csiNodeVolumeStatWaitPeriod = 1 * time.Minute
csiNodeVolumeStatWaitPeriod = 2 * time.Minute
// how long to wait for Resizing Condition on PVC to appear
csiResizingConditionWait = 2 * time.Minute
@ -98,6 +98,7 @@ type testParameters struct {
enableSnapshot bool
enableVolumeMountGroup bool // enable the VOLUME_MOUNT_GROUP node capability in the CSI mock driver.
enableNodeVolumeStat bool
enableNodeVolumeCondition bool
hooks *drivers.Hooks
tokenRequests []storagev1.TokenRequest
requiresRepublish *bool
@ -168,6 +169,7 @@ func (m *mockDriverSetup) init(ctx context.Context, tp testParameters) {
EnableResizing: tp.enableResizing,
EnableNodeExpansion: tp.enableNodeExpansion,
EnableNodeVolumeStat: tp.enableNodeVolumeStat,
EnableNodeVolumeCondition: tp.enableNodeVolumeCondition,
EnableSnapshot: tp.enableSnapshot,
EnableVolumeMountGroup: tp.enableVolumeMountGroup,
TokenRequests: tp.tokenRequests,

View File

@ -19,23 +19,30 @@ package csimock
import (
"context"
"fmt"
"strings"
"time"
"github.com/onsi/gomega"
"google.golang.org/grpc/codes"
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/features"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
"k8s.io/kubernetes/test/e2e/storage/drivers"
"k8s.io/kubernetes/test/e2e/storage/utils"
admissionapi "k8s.io/pod-security-admission/api"
"github.com/onsi/ginkgo/v2"
)
var _ = utils.SIGDescribe("CSI Mock Node Volume Stats", func() {
var _ = utils.SIGDescribe("CSI Mock Node Volume Stats", feature.CSIVolumeHealth, framework.WithFeatureGate(features.CSIVolumeHealth), func() {
f := framework.NewDefaultFramework("csi-mock-node-volume-stats")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
m := newMockDriverSetup(f)
@ -48,15 +55,15 @@ var _ = utils.SIGDescribe("CSI Mock Node Volume Stats", func() {
name string
expectedCalls []csiCall
nodeVolumeStatRequired bool
nodeGetVolumeStatsHook func(counter int64) error
nodeVolumeConditionRequired bool
nodeAbnormalVolumeCondition bool
}{
{
name: "return abnormal volume stats",
name: "volume stats not returned",
expectedCalls: []csiCall{},
nodeVolumeStatRequired: false,
nodeGetVolumeStatsHook: func(counter int64) error {
return nil
},
nodeVolumeConditionRequired: false,
nodeAbnormalVolumeCondition: false,
},
{
name: "return normal volume stats",
@ -67,9 +74,32 @@ var _ = utils.SIGDescribe("CSI Mock Node Volume Stats", func() {
},
},
nodeVolumeStatRequired: true,
nodeGetVolumeStatsHook: func(counter int64) error {
return nil
nodeVolumeConditionRequired: true,
nodeAbnormalVolumeCondition: false,
},
{
name: "return normal volume stats without volume condition",
expectedCalls: []csiCall{
{
expectedMethod: "NodeGetVolumeStats",
expectedError: codes.OK,
},
},
nodeVolumeStatRequired: true,
nodeVolumeConditionRequired: false,
nodeAbnormalVolumeCondition: false,
},
{
name: "return normal volume stats with abnormal volume condition",
expectedCalls: []csiCall{
{
expectedMethod: "NodeGetVolumeStats",
expectedError: codes.OK,
},
},
nodeVolumeStatRequired: true,
nodeVolumeConditionRequired: true,
nodeAbnormalVolumeCondition: true,
},
}
for _, t := range tests {
@ -77,11 +107,11 @@ var _ = utils.SIGDescribe("CSI Mock Node Volume Stats", func() {
ginkgo.It(test.name, func(ctx context.Context) {
ginkgo.By(fmt.Sprintf("volume stats: %+v", test))
// Hooks appear to be required for enableNodeVolumeStat.
hooks := createPreHook("NodeGetVolumeStats", test.nodeGetVolumeStatsHook)
m.init(ctx, testParameters{
registerDriver: true,
enableNodeVolumeStat: test.nodeVolumeStatRequired,
hooks: hooks,
enableNodeVolumeCondition: test.nodeVolumeConditionRequired,
hooks: createGetVolumeStatsHook(test.nodeAbnormalVolumeCondition),
})
ginkgo.DeferCleanup(m.cleanup)
_, claim, pod := m.createPod(ctx, pvcReference)
@ -95,7 +125,6 @@ var _ = utils.SIGDescribe("CSI Mock Node Volume Stats", func() {
ginkgo.By("Waiting for pod to be running")
err = e2epod.WaitForPodNameRunningInNamespace(ctx, m.cs, pod.Name, pod.Namespace)
framework.ExpectNoError(err, "Failed to start pod: %v", err)
ginkgo.By("Waiting for all remaining expected CSI calls")
err = wait.PollUntilContextTimeout(ctx, time.Second, csiNodeVolumeStatWaitPeriod, true, func(c context.Context) (done bool, err error) {
var index int
@ -114,6 +143,29 @@ var _ = utils.SIGDescribe("CSI Mock Node Volume Stats", func() {
return false, nil
})
if test.nodeVolumeStatRequired {
// try to use ```csi.NewMetricsCsi(pv.handler).GetMetrics()``` to get metrics from csimock driver but failed.
// the mocked csidriver register doesn't regist itself to normal csidriver.
if test.nodeVolumeConditionRequired {
nodeName, err := e2epod.GetPodNodeName(ctx, f.ClientSet, pod.Namespace, pod.Name)
framework.ExpectNoError(err, "no error of node name")
grabber, err := e2emetrics.NewMetricsGrabber(ctx, f.ClientSet, nil, f.ClientConfig(), true, false, false, false, false, false)
framework.ExpectNoError(err, "creating the metrics grabber")
waitErr := wait.PollUntilContextTimeout(ctx, 30*time.Second, csiNodeVolumeStatWaitPeriod, true, func(ctx context.Context) (bool, error) {
framework.Logf("Grabbing Kubelet metrics")
// Grab kubelet metrics from the node the pod was scheduled on
var err error
kubeMetrics, err := grabber.GrabFromKubelet(ctx, nodeName)
if err != nil {
framework.Logf("Error fetching kubelet metrics err: %v", err)
return false, err
}
if !findVolumeConditionMetrics(f.Namespace.Name, claim.Name, kubeMetrics, test.nodeAbnormalVolumeCondition) {
return false, nil
}
return true, nil
})
framework.ExpectNoError(waitErr, "call metrics should not have any error")
}
framework.ExpectNoError(err, "while waiting for all CSI calls")
} else {
gomega.Expect(err).To(gomega.HaveOccurred(), "an error should have occurred")
@ -123,3 +175,44 @@ var _ = utils.SIGDescribe("CSI Mock Node Volume Stats", func() {
})
})
func findVolumeConditionMetrics(pvcNamespace, pvcName string, kubeMetrics e2emetrics.KubeletMetrics, nodeAbnormalVolumeCondition bool) bool {
found := false
framework.Logf("Looking for sample tagged with namespace `%s`, PVC `%s`", pvcNamespace, pvcName)
for key, value := range kubeMetrics {
for _, sample := range value {
framework.Logf("Found sample %++v with key: %s", sample, key)
samplePVC, ok := sample.Metric["persistentvolumeclaim"]
if !ok {
break
}
sampleNS, ok := sample.Metric["namespace"]
if !ok {
break
}
if string(samplePVC) == pvcName && string(sampleNS) == pvcNamespace && strings.Contains(key, kubeletmetrics.VolumeStatsHealthStatusAbnormalKey) {
if (nodeAbnormalVolumeCondition && sample.Value.String() == "1") || (!nodeAbnormalVolumeCondition && sample.Value.String() == "0") {
found = true
break
}
}
}
}
return found
}
func createGetVolumeStatsHook(abnormalVolumeCondition bool) *drivers.Hooks {
return &drivers.Hooks{
Pre: func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
if req, ok := request.(*csipbv1.NodeGetVolumeStatsRequest); ok {
if abnormalVolumeCondition {
req.VolumePath = "/tmp/csi/health/abnormal"
}
}
return nil, nil
},
}
}

View File

@ -380,6 +380,16 @@ func (s *service) NodeGetCapabilities(
})
}
if s.config.NodeVolumeConditionRequired {
capabilities = append(capabilities, &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
},
},
})
}
if s.config.VolumeMountGroupRequired {
capabilities = append(capabilities, &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
@ -420,7 +430,10 @@ func (s *service) NodeGetVolumeStats(ctx context.Context,
req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{},
VolumeCondition: &csi.VolumeCondition{
Abnormal: false,
Message: "volume is normal",
},
}
if len(req.GetVolumeId()) == 0 {
@ -440,6 +453,19 @@ func (s *service) NodeGetVolumeStats(ctx context.Context,
nodeMntPathKey := path.Join(s.nodeID, req.VolumePath)
resp.Usage = []*csi.VolumeUsage{
{
Total: v.GetCapacityBytes(),
Unit: csi.VolumeUsage_BYTES,
},
}
if req.GetVolumePath() == "/tmp/csi/health/abnormal" {
resp.VolumeCondition.Abnormal = true
resp.VolumeCondition.Message = "volume is abnormal"
return resp, nil
}
_, exists := v.VolumeContext[nodeMntPathKey]
if !exists {
msg := fmt.Sprintf("volume %q doest not exist on the specified path %q", req.VolumeId, req.VolumePath)
@ -452,12 +478,5 @@ func (s *service) NodeGetVolumeStats(ctx context.Context,
return nil, status.Errorf(hookVal, hookMsg)
}
resp.Usage = []*csi.VolumeUsage{
{
Total: v.GetCapacityBytes(),
Unit: csi.VolumeUsage_BYTES,
},
}
return resp, nil
}

View File

@ -56,6 +56,7 @@ type Config struct {
AttachLimit int64
NodeExpansionRequired bool
NodeVolumeStatRequired bool
NodeVolumeConditionRequired bool
VolumeMountGroupRequired bool
DisableControllerExpansion bool
DisableOnlineExpansion bool

View File

@ -307,6 +307,7 @@ type mockCSIDriver struct {
fsGroupPolicy *storagev1.FSGroupPolicy
enableVolumeMountGroup bool
enableNodeVolumeStat bool
enableNodeVolumeCondition bool
embedded bool
calls MockCSICalls
embeddedCSIDriver *mockdriver.CSIDriver
@ -356,6 +357,7 @@ type CSIMockDriverOpts struct {
EnableSnapshot bool
EnableVolumeMountGroup bool
EnableNodeVolumeStat bool
EnableNodeVolumeCondition bool
TokenRequests []storagev1.TokenRequest
RequiresRepublish *bool
FSGroupPolicy *storagev1.FSGroupPolicy
@ -510,6 +512,7 @@ func InitMockCSIDriver(driverOpts CSIMockDriverOpts) MockCSITestDriver {
attachLimit: driverOpts.AttachLimit,
enableNodeExpansion: driverOpts.EnableNodeExpansion,
enableNodeVolumeStat: driverOpts.EnableNodeVolumeStat,
enableNodeVolumeCondition: driverOpts.EnableNodeVolumeCondition,
tokenRequests: driverOpts.TokenRequests,
requiresRepublish: driverOpts.RequiresRepublish,
fsGroupPolicy: driverOpts.FSGroupPolicy,
@ -588,6 +591,7 @@ func (m *mockCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework)
AttachLimit: int64(m.attachLimit),
NodeExpansionRequired: m.enableNodeExpansion,
NodeVolumeStatRequired: m.enableNodeVolumeStat,
NodeVolumeConditionRequired: m.enableNodeVolumeCondition,
VolumeMountGroupRequired: m.enableVolumeMountGroup,
EnableTopology: m.enableTopology,
IO: proxy.PodDirIO{