Merge pull request #120330 from rohitssingh/master
Retry NodeStageVolume if CSI Driver Is Missing; Treat this Error as Transient
commit a06e5a7307
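Every change below follows one pattern: where a missing CSI driver used to surface as a plain errors.New(...) failure, it now surfaces as a volumetypes.TransientOperationFailure, which the kubelet's volume operation machinery treats as retryable. A minimal, self-contained sketch of that pattern — the TransientOperationFailure type here mirrors the helper in k8s.io/kubernetes/pkg/volume/util/types, and stageVolume plus the retry check are illustrative stand-ins, not the real kubelet code:

package main

import (
	"errors"
	"fmt"
)

// TransientOperationFailure marks an error that may resolve on its own,
// e.g. a CSI driver that has not finished registering with the kubelet yet.
type TransientOperationFailure struct{ msg string }

func (e *TransientOperationFailure) Error() string { return e.msg }

func NewTransientOperationFailure(msg string) *TransientOperationFailure {
	return &TransientOperationFailure{msg: msg}
}

// stageVolume stands in for attacher.MountDevice / NodeStageVolume: a missing
// driver client is wrapped as transient instead of returned as a plain error.
func stageVolume(haveDriver bool) error {
	if !haveDriver {
		return NewTransientOperationFailure("CSI driver not yet registered")
	}
	return nil
}

func main() {
	err := stageVolume(false)
	var transient *TransientOperationFailure
	// A retrying caller can distinguish "try again later" from a hard failure.
	if errors.As(err, &transient) {
		fmt.Println("transient, will retry:", err)
	}
}

In-tree, the retry decision is made by the volume operation executor; the effect of the PR is that a not-yet-registered driver no longer permanently fails MountDevice and the other call sites patched below.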
pkg/volume/csi/csi_attacher.go

@@ -287,7 +287,9 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
 	if c.csiClient == nil {
 		c.csiClient, err = newCsiDriverClient(csiDriverName(csiSource.Driver))
 		if err != nil {
-			return errors.New(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err))
+			// Treat the absence of the CSI driver as a transient error
+			// See https://github.com/kubernetes/kubernetes/issues/120268
+			return volumetypes.NewTransientOperationFailure(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err))
 		}
 	}
 	csi := c.csiClient
@@ -607,7 +609,9 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
 	if c.csiClient == nil {
 		c.csiClient, err = newCsiDriverClient(csiDriverName(driverName))
 		if err != nil {
-			return errors.New(log("attacher.UnmountDevice failed to create newCsiDriverClient: %v", err))
+			// Treat the absence of the CSI driver as a transient error
+			// See https://github.com/kubernetes/kubernetes/issues/120268
+			return volumetypes.NewTransientOperationFailure(log("attacher.UnmountDevice failed to create newCsiDriverClient: %v", err))
 		}
 	}
 	csi := c.csiClient

pkg/volume/csi/csi_attacher_test.go

@@ -1109,6 +1109,7 @@ func TestAttacherMountDevice(t *testing.T) {
 		exitError       error
 		spec            *volume.Spec
 		watchTimeout    time.Duration
+		skipClientSetup bool
 	}{
 		{
 			testName: "normal PV",
@@ -1249,6 +1250,20 @@ func TestAttacherMountDevice(t *testing.T) {
 			createAttachment: true,
 			spec:             volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
 		},
+		{
+			testName:                "driver not specified",
+			volName:                 "test-vol1",
+			devicePath:              "path1",
+			deviceMountPath:         "path2",
+			fsGroup:                 &testFSGroup,
+			stageUnstageSet:         true,
+			createAttachment:        true,
+			populateDeviceMountPath: false,
+			spec:                    volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, "not-found", "test-vol1"), false),
+			exitError:               transientError,
+			shouldFail:              true,
+			skipClientSetup:         true,
+		},
 	}

 	for _, tc := range testCases {
@@ -1277,7 +1292,9 @@ func TestAttacherMountDevice(t *testing.T) {
 				t.Fatalf("failed to create new attacher: %v", err0)
 			}
 			csiAttacher := getCsiAttacherFromVolumeAttacher(attacher, tc.watchTimeout)
-			csiAttacher.csiClient = setupClientWithVolumeMountGroup(t, tc.stageUnstageSet, tc.driverSupportsVolumeMountGroup)
+			if !tc.skipClientSetup {
+				csiAttacher.csiClient = setupClientWithVolumeMountGroup(t, tc.stageUnstageSet, tc.driverSupportsVolumeMountGroup)
+			}

 			if tc.deviceMountPath != "" {
 				tc.deviceMountPath = filepath.Join(tmpDir, tc.deviceMountPath)
@@ -1342,16 +1359,15 @@ func TestAttacherMountDevice(t *testing.T) {
 					t.Errorf("failed to modify permissions after test: %v", err)
 				}
 			}
+			if tc.exitError != nil && reflect.TypeOf(tc.exitError) != reflect.TypeOf(err) {
+				t.Fatalf("expected exitError type: %v got: %v (%v)", reflect.TypeOf(tc.exitError), reflect.TypeOf(err), err)
+			}
 			return
 		}
 		if tc.shouldFail {
 			t.Errorf("test should fail, but no error occurred")
 		}

-		if tc.exitError != nil && reflect.TypeOf(tc.exitError) != reflect.TypeOf(err) {
-			t.Fatalf("expected exitError: %v got: %v", tc.exitError, err)
-		}
-
 		// Verify call goes through all the way
 		numStaged := 1
 		if !tc.stageUnstageSet {
@@ -1569,6 +1585,7 @@ func TestAttacherMountDeviceWithInline(t *testing.T) {
 }

 func TestAttacherUnmountDevice(t *testing.T) {
+	transientError := volumetypes.NewTransientOperationFailure("")
 	testCases := []struct {
 		testName string
 		volID    string
@@ -1578,6 +1595,8 @@ func TestAttacherUnmountDevice(t *testing.T) {
 		stageUnstageSet bool
 		shouldFail      bool
 		watchTimeout    time.Duration
+		exitError       error
+		unsetClient     bool
 	}{
 		// PV agnostic path positive test cases
 		{
@@ -1609,6 +1628,17 @@ func TestAttacherUnmountDevice(t *testing.T) {
 			stageUnstageSet: true,
 			shouldFail:      true,
 		},
+		// Ensure that a transient error is returned if the client is not established
+		{
+			testName:        "fail with transient error, json file exists but client not found",
+			volID:           "project/zone/test-vol1",
+			deviceMountPath: "plugins/csi/" + generateSha("project/zone/test-vol1") + "/globalmount",
+			jsonFile:        `{"driverName": "unknown-driver", "volumeHandle":"project/zone/test-vol1"}`,
+			stageUnstageSet: true,
+			shouldFail:      true,
+			exitError:       transientError,
+			unsetClient:     true,
+		},
 	}

 	for _, tc := range testCases {
@@ -1656,6 +1686,11 @@ func TestAttacherUnmountDevice(t *testing.T) {
 				t.Fatalf("Failed to create PV: %v", err)
 			}
 		}
+		// Clear out the client if specified
+		// The lookup to generate a new client will fail
+		if tc.unsetClient {
+			csiAttacher.csiClient = nil
+		}

 		// Run
 		err := csiAttacher.UnmountDevice(tc.deviceMountPath)
@@ -1664,6 +1699,9 @@ func TestAttacherUnmountDevice(t *testing.T) {
 			if !tc.shouldFail {
 				t.Errorf("test should not fail, but error occurred: %v", err)
 			}
+			if tc.exitError != nil && reflect.TypeOf(tc.exitError) != reflect.TypeOf(err) {
+				t.Fatalf("expected exitError type: %v got: %v (%v)", reflect.TypeOf(tc.exitError), reflect.TypeOf(err), err)
+			}
 			return
 		}
 		if tc.shouldFail {

pkg/volume/csi/csi_block.go

@@ -319,7 +319,9 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) {

 	csiClient, err := m.csiClientGetter.Get()
 	if err != nil {
-		return "", errors.New(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
+		// Treat the absence of the CSI driver as a transient error
+		// See https://github.com/kubernetes/kubernetes/issues/120268
+		return "", volumetypes.NewTransientOperationFailure(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
 	}

 	// Call NodeStageVolume
@@ -379,7 +381,9 @@ func (m *csiBlockMapper) MapPodDevice() (string, error) {

 	csiClient, err := m.csiClientGetter.Get()
 	if err != nil {
-		return "", errors.New(log("blockMapper.MapPodDevice failed to get CSI client: %v", err))
+		// Treat the absence of the CSI driver as a transient error
+		// See https://github.com/kubernetes/kubernetes/issues/120268
+		return "", volumetypes.NewTransientOperationFailure(log("blockMapper.MapPodDevice failed to get CSI client: %v", err))
 	}

 	// Call NodePublishVolume
@@ -444,7 +448,9 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error

 	csiClient, err := m.csiClientGetter.Get()
 	if err != nil {
-		return errors.New(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
+		// Treat the absence of the CSI driver as a transient error
+		// See https://github.com/kubernetes/kubernetes/issues/120268
+		return volumetypes.NewTransientOperationFailure(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
 	}

 	// Call NodeUnstageVolume
@@ -506,7 +512,9 @@ func (m *csiBlockMapper) UnmapPodDevice() error {

 	csiClient, err := m.csiClientGetter.Get()
 	if err != nil {
-		return errors.New(log("blockMapper.UnmapPodDevice failed to get CSI client: %v", err))
+		// Treat the absence of the CSI driver as a transient error
+		// See https://github.com/kubernetes/kubernetes/issues/120268
+		return volumetypes.NewTransientOperationFailure(log("blockMapper.UnmapPodDevice failed to get CSI client: %v", err))
 	}

 	ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)

pkg/volume/csi/csi_block_test.go

@@ -32,6 +32,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	fakeclient "k8s.io/client-go/kubernetes/fake"
 	"k8s.io/kubernetes/pkg/volume"
+	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
 )

 func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) {
@@ -283,6 +284,45 @@ func TestBlockMapperSetupDeviceError(t *testing.T) {
 	}
 }

+func TestBlockMapperSetupDeviceNoClientError(t *testing.T) {
+	transientError := volumetypes.NewTransientOperationFailure("")
+	plug, tmpDir := newTestPlugin(t, nil)
+	defer os.RemoveAll(tmpDir)
+
+	csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
+	if err != nil {
+		t.Fatalf("Failed to make a new Mapper: %v", err)
+	}
+
+	pvName := pv.GetName()
+	nodeName := string(plug.host.GetNodeName())
+
+	csiMapper.csiClient = setupClient(t, true)
+
+	attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
+	attachment := makeTestAttachment(attachID, nodeName, pvName)
+	attachment.Status.Attached = true
+	_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("failed to setup VolumeAttachment: %v", err)
+	}
+	t.Log("created attachment ", attachID)
+
+	// Clear out the clients
+	// The lookup to generate a new client will fail when it tries to query a driver with an unknown name
+	csiMapper.csiClient = nil
+	csiMapper.csiClientGetter.csiClient = nil
+	// Note that prepareBlockMapperTest above will create a driver with a name of "test-driver"
+	csiMapper.csiClientGetter.driverName = "unknown-driver"
+
+	_, err = csiMapper.SetUpDevice()
+	if err == nil {
+		t.Errorf("test should fail, but no error occurred")
+	} else if reflect.TypeOf(transientError) != reflect.TypeOf(err) {
+		t.Fatalf("expected exitError type: %v got: %v (%v)", reflect.TypeOf(transientError), reflect.TypeOf(err), err)
+	}
+}
+
 func TestBlockMapperMapPodDevice(t *testing.T) {
 	plug, tmpDir := newTestPlugin(t, nil)
 	defer os.RemoveAll(tmpDir)
@@ -413,6 +453,45 @@ func TestBlockMapperMapPodDeviceWithPodInfo(t *testing.T) {
 	}
 }

+func TestBlockMapperMapPodDeviceNoClientError(t *testing.T) {
+	transientError := volumetypes.NewTransientOperationFailure("")
+	plug, tmpDir := newTestPlugin(t, nil)
+	defer os.RemoveAll(tmpDir)
+
+	csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
+	if err != nil {
+		t.Fatalf("Failed to make a new Mapper: %v", err)
+	}
+
+	pvName := pv.GetName()
+	nodeName := string(plug.host.GetNodeName())
+
+	csiMapper.csiClient = setupClient(t, true)
+
+	attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), nodeName)
+	attachment := makeTestAttachment(attachID, nodeName, pvName)
+	attachment.Status.Attached = true
+	_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.Background(), attachment, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("failed to setup VolumeAttachment: %v", err)
+	}
+	t.Log("created attachment ", attachID)
+
+	// Clear out the clients
+	// The lookup to generate a new client will fail when it tries to query a driver with an unknown name
+	csiMapper.csiClient = nil
+	csiMapper.csiClientGetter.csiClient = nil
+	// Note that prepareBlockMapperTest above will create a driver with a name of "test-driver"
+	csiMapper.csiClientGetter.driverName = "unknown-driver"
+
+	_, err = csiMapper.MapPodDevice()
+	if err == nil {
+		t.Errorf("test should fail, but no error occurred")
+	} else if reflect.TypeOf(transientError) != reflect.TypeOf(err) {
+		t.Fatalf("expected exitError type: %v got: %v (%v)", reflect.TypeOf(transientError), reflect.TypeOf(err), err)
+	}
+}
+
 func TestBlockMapperTearDownDevice(t *testing.T) {
 	plug, tmpDir := newTestPlugin(t, nil)
 	defer os.RemoveAll(tmpDir)
@@ -471,6 +550,62 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
 	}
 }

+func TestBlockMapperTearDownDeviceNoClientError(t *testing.T) {
+	transientError := volumetypes.NewTransientOperationFailure("")
+	plug, tmpDir := newTestPlugin(t, nil)
+	defer os.RemoveAll(tmpDir)
+
+	_, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
+	if err != nil {
+		t.Fatalf("Failed to make a new Mapper: %v", err)
+	}
+
+	// save volume data
+	dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
+	if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) {
+		t.Errorf("failed to create dir [%s]: %v", dir, err)
+	}
+
+	if err := saveVolumeData(
+		dir,
+		volDataFileName,
+		map[string]string{
+			volDataKey.specVolID:  pv.ObjectMeta.Name,
+			volDataKey.driverName: testDriver,
+			volDataKey.volHandle:  testVol,
+		},
+	); err != nil {
+		t.Fatalf("failed to save volume data: %v", err)
+	}
+
+	unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
+	if err != nil {
+		t.Fatalf("failed to make a new Unmapper: %v", err)
+	}
+
+	csiUnmapper := unmapper.(*csiBlockMapper)
+	csiUnmapper.csiClient = setupClient(t, true)
+
+	globalMapPath, err := csiUnmapper.GetGlobalMapPath(spec)
+	if err != nil {
+		t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err)
+	}
+
+	// Clear out the clients
+	// The lookup to generate a new client will fail when it tries to query a driver with an unknown name
+	csiUnmapper.csiClient = nil
+	csiUnmapper.csiClientGetter.csiClient = nil
+	// Note that prepareBlockMapperTest above will create a driver with a name of "test-driver"
+	csiUnmapper.csiClientGetter.driverName = "unknown-driver"
+
+	err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test")
+	if err == nil {
+		t.Errorf("test should fail, but no error occurred")
+	} else if reflect.TypeOf(transientError) != reflect.TypeOf(err) {
+		t.Fatalf("expected exitError type: %v got: %v (%v)", reflect.TypeOf(transientError), reflect.TypeOf(err), err)
+	}
+}
+
 func TestVolumeSetupTeardown(t *testing.T) {
 	// Follow volume setup + teardown sequences at top of cs_block.go and set up / clean up one CSI block device.
 	// Focus on testing that there were no leftover files present after the cleanup.
@@ -587,3 +722,86 @@ func TestVolumeSetupTeardown(t *testing.T) {
 		t.Errorf("volume staging path %s was not deleted", stagingPath)
 	}
 }
+
+func TestUnmapPodDeviceNoClientError(t *testing.T) {
+	transientError := volumetypes.NewTransientOperationFailure("")
+	plug, tmpDir := newTestPlugin(t, nil)
+	defer os.RemoveAll(tmpDir)
+
+	csiMapper, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
+	if err != nil {
+		t.Fatalf("Failed to make a new Mapper: %v", err)
+	}
+
+	pvName := pv.GetName()
+	nodeName := string(plug.host.GetNodeName())
+
+	csiMapper.csiClient = setupClient(t, true)
+
+	attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
+	attachment := makeTestAttachment(attachID, nodeName, pvName)
+	attachment.Status.Attached = true
+	_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("failed to setup VolumeAttachment: %v", err)
+	}
+	t.Log("created attachment ", attachID)
+
+	stagingPath, err := csiMapper.SetUpDevice()
+	if err != nil {
+		t.Fatalf("mapper failed to SetupDevice: %v", err)
+	}
+	// Check if NodeStageVolume staged to the right path
+	svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
+	svol, ok := svols[csiMapper.volumeID]
+	if !ok {
+		t.Error("csi server may not have received NodeStageVolume call")
+	}
+	if svol.Path != stagingPath {
+		t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path)
+	}
+
+	path, err := csiMapper.MapPodDevice()
+	if err != nil {
+		t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
+	}
+	pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
+	pvol, ok := pvols[csiMapper.volumeID]
+	if !ok {
+		t.Error("csi server may not have received NodePublishVolume call")
+	}
+	publishPath := csiMapper.getPublishPath()
+	if pvol.Path != publishPath {
+		t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
+	}
+	if path != publishPath {
+		t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path)
+	}
+
+	unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
+	if err != nil {
+		t.Fatalf("failed to make a new Unmapper: %v", err)
+	}
+
+	csiUnmapper := unmapper.(*csiBlockMapper)
+	csiUnmapper.csiClient = csiMapper.csiClient
+
+	_, err = csiUnmapper.GetGlobalMapPath(spec)
+	if err != nil {
+		t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err)
+	}
+
+	// Clear out the clients
+	// The lookup to generate a new client will fail when it tries to query a driver with an unknown name
+	csiUnmapper.csiClient = nil
+	csiUnmapper.csiClientGetter.csiClient = nil
+	// Note that prepareBlockMapperTest above will create a driver with a name of "test-driver"
+	csiUnmapper.csiClientGetter.driverName = "unknown-driver"
+
+	err = csiUnmapper.UnmapPodDevice()
+	if err == nil {
+		t.Errorf("test should fail, but no error occurred")
+	} else if reflect.TypeOf(transientError) != reflect.TypeOf(err) {
+		t.Fatalf("expected exitError type: %v got: %v (%v)", reflect.TypeOf(transientError), reflect.TypeOf(err), err)
+	}
+}

pkg/volume/csi/csi_metrics.go

@@ -26,6 +26,7 @@ import (
 	servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
 	"k8s.io/kubernetes/pkg/volume"
 	volumeutil "k8s.io/kubernetes/pkg/volume/util"
+	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
 )

 var _ volume.MetricsProvider = &metricsCsi{}
@@ -60,7 +61,9 @@ func (mc *metricsCsi) GetMetrics() (*volume.Metrics, error) {
 	// Get CSI client
 	csiClient, err := mc.csiClientGetter.Get()
 	if err != nil {
-		return nil, err
+		// Treat the absence of the CSI driver as a transient error
+		// See https://github.com/kubernetes/kubernetes/issues/120268
+		return nil, volumetypes.NewTransientOperationFailure(err.Error())
 	}
 	// Check whether "GET_VOLUME_STATS" is set
 	volumeStatsSet, err := csiClient.NodeSupportsVolumeStats(ctx)

pkg/volume/csi/csi_metrics_test.go

@@ -18,12 +18,14 @@ package csi

 import (
 	"io"
+	"reflect"
 	"testing"

 	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/csi/fake"
+	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
 )

 func TestGetMetrics(t *testing.T) {
@@ -137,6 +139,36 @@ func TestGetMetricsDriverNotSupportStats(t *testing.T) {

 }

+// test GetMetrics when the CSI driver cannot be found
+func TestGetMetricsDriverNotFound(t *testing.T) {
+	transientError := volumetypes.NewTransientOperationFailure("")
+	tests := []struct {
+		name       string
+		volumeID   string
+		targetPath string
+		exitError  error
+	}{
+		{
+			name:       "volume with no driver",
+			volumeID:   "foobar",
+			targetPath: "/mnt/foo",
+			exitError:  transientError,
+		},
+	}
+
+	for _, tc := range tests {
+		metricsGetter := &metricsCsi{volumeID: tc.volumeID, targetPath: tc.targetPath}
+		metricsGetter.driverName = "unknown-driver"
+		_, err := metricsGetter.GetMetrics()
+		if err == nil {
+			t.Errorf("test should fail, but no error occurred")
+		} else if reflect.TypeOf(tc.exitError) != reflect.TypeOf(err) {
+			t.Fatalf("expected exitError type: %v got: %v (%v)", reflect.TypeOf(transientError), reflect.TypeOf(err), err)
+		}
+	}
+
+}
+
 func getRawVolumeInfo() *csipbv1.NodeGetVolumeStatsResponse {
 	return &csipbv1.NodeGetVolumeStatsResponse{
 		Usage: []*csipbv1.VolumeUsage{

pkg/volume/csi/csi_mounter.go

@@ -105,6 +105,8 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error

 	csi, err := c.csiClientGetter.Get()
 	if err != nil {
+		// Treat the absence of the CSI driver as a transient error
+		// See https://github.com/kubernetes/kubernetes/issues/120268
 		return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to get CSI client: %v", err))
 	}

@@ -419,7 +421,9 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
 	volID := c.volumeID
 	csi, err := c.csiClientGetter.Get()
 	if err != nil {
-		return errors.New(log("Unmounter.TearDownAt failed to get CSI client: %v", err))
+		// Treat the absence of the CSI driver as a transient error
+		// See https://github.com/kubernetes/kubernetes/issues/120268
+		return volumetypes.NewTransientOperationFailure(log("Unmounter.TearDownAt failed to get CSI client: %v", err))
 	}

 	// Could not get spec info on whether this is a migrated operation because c.spec is nil

pkg/volume/csi/csi_mounter_test.go

@@ -366,6 +366,7 @@ func TestMounterSetUp(t *testing.T) {
 func TestMounterSetUpSimple(t *testing.T) {
 	fakeClient := fakeclient.NewSimpleClientset()
 	plug, tmpDir := newTestPlugin(t, fakeClient)
+	transientError := volumetypes.NewTransientOperationFailure("")
 	defer os.RemoveAll(tmpDir)

 	testCases := []struct {
@@ -377,6 +378,8 @@ func TestMounterSetUpSimple(t *testing.T) {
 		spec                 func(string, []string) *volume.Spec
 		newMounterShouldFail bool
 		setupShouldFail      bool
+		unsetClient          bool
+		exitError            error
 	}{
 		{
 			name: "setup with ephemeral source",
@@ -415,6 +418,21 @@ func TestMounterSetUpSimple(t *testing.T) {
 			newMounterShouldFail: true,
 			spec:                 func(fsType string, options []string) *volume.Spec { return nil },
 		},
+		{
+			name:   "setup with unknown CSI driver",
+			podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
+			mode:   storage.VolumeLifecyclePersistent,
+			fsType: "zfs",
+			spec: func(fsType string, options []string) *volume.Spec {
+				pvSrc := makeTestPV("pv1", 20, "unknown-driver", "vol1")
+				pvSrc.Spec.CSI.FSType = fsType
+				pvSrc.Spec.MountOptions = options
+				return volume.NewSpecFromPersistentVolume(pvSrc, false)
+			},
+			setupShouldFail: true,
+			unsetClient:     true,
+			exitError:       transientError,
+		},
 	}

 	for _, tc := range testCases {
@@ -450,13 +468,26 @@ func TestMounterSetUpSimple(t *testing.T) {
 			t.Fatalf("failed to setup VolumeAttachment: %v", err)
 		}

+		if tc.unsetClient {
+			// Clear out the clients
+			csiMounter.csiClient = nil
+			csiMounter.csiClientGetter.csiClient = nil
+			t.Log("driver name is ", csiMounter.csiClientGetter.driverName)
+		}
+
 		// Mounter.SetUp()
 		err = csiMounter.SetUp(volume.MounterArgs{})
-		if tc.setupShouldFail && err != nil {
-			t.Log(err)
-			return
-		}
-		if !tc.setupShouldFail && err != nil {
-			t.Fatal("unexpected error:", err)
-		}
+		if tc.setupShouldFail {
+			if err != nil {
+				if tc.exitError != nil && reflect.TypeOf(tc.exitError) != reflect.TypeOf(err) {
+					t.Fatalf("expected exitError type: %v got: %v (%v)", reflect.TypeOf(tc.exitError), reflect.TypeOf(err), err)
+				}
+				t.Log(err)
+				return
+			} else {
+				t.Error("test should fail, but no error occurred")
+			}
+		} else if err != nil {
+			t.Fatal("unexpected error:", err)
+		}

@@ -1063,6 +1094,64 @@ func TestUnmounterTeardown(t *testing.T) {

 }

+func TestUnmounterTeardownNoClientError(t *testing.T) {
+	transientError := volumetypes.NewTransientOperationFailure("")
+	plug, tmpDir := newTestPlugin(t, nil)
+	defer os.RemoveAll(tmpDir)
+	registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
+	pv := makeTestPV("test-pv", 10, testDriver, testVol)
+
+	// save the data file prior to unmount
+	targetDir := getTargetPath(testPodUID, pv.ObjectMeta.Name, plug.host)
+	dir := filepath.Join(targetDir, "mount")
+	if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) {
+		t.Errorf("failed to create dir [%s]: %v", dir, err)
+	}
+
+	// do a fake local mount
+	diskMounter := util.NewSafeFormatAndMountFromHost(plug.GetPluginName(), plug.host)
+	device := "/fake/device"
+	if goruntime.GOOS == "windows" {
+		// We need disk numbers on Windows.
+		device = "1"
+	}
+	if err := diskMounter.FormatAndMount(device, dir, "testfs", nil); err != nil {
+		t.Errorf("failed to mount dir [%s]: %v", dir, err)
+	}
+
+	if err := saveVolumeData(
+		targetDir,
+		volDataFileName,
+		map[string]string{
+			volDataKey.specVolID:  pv.ObjectMeta.Name,
+			volDataKey.driverName: testDriver,
+			volDataKey.volHandle:  testVol,
+		},
+	); err != nil {
+		t.Fatalf("failed to save volume data: %v", err)
+	}
+
+	unmounter, err := plug.NewUnmounter(pv.ObjectMeta.Name, testPodUID)
+	if err != nil {
+		t.Fatalf("failed to make a new Unmounter: %v", err)
+	}
+
+	csiUnmounter := unmounter.(*csiMountMgr)
+
+	// Clear out the cached client
+	// The lookup to generate a new client will fail when it tries to query a driver with an unknown name
+	csiUnmounter.csiClientGetter.csiClient = nil
+	// Note that registerFakePlugin above will create a driver with a name of "test-driver"
+	csiUnmounter.csiClientGetter.driverName = "unknown-driver"
+
+	err = csiUnmounter.TearDownAt(dir)
+	if err == nil {
+		t.Errorf("test should fail, but no error occurred")
+	} else if reflect.TypeOf(transientError) != reflect.TypeOf(err) {
+		t.Fatalf("expected exitError type: %v got: %v (%v)", reflect.TypeOf(transientError), reflect.TypeOf(err), err)
+	}
+}
+
 func TestIsCorruptedDir(t *testing.T) {
 	existingMountPath, err := os.MkdirTemp(os.TempDir(), "blobfuse-csi-mount-test")
 	if err != nil {

pkg/volume/csi/expander.go

@@ -46,7 +46,9 @@ func (c *csiPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, er

 	csClient, err := newCsiDriverClient(csiDriverName(csiSource.Driver))
 	if err != nil {
-		return false, err
+		// Treat the absence of the CSI driver as a transient error
+		// See https://github.com/kubernetes/kubernetes/issues/120268
+		return false, volumetypes.NewTransientOperationFailure(err.Error())
 	}
 	fsVolume, err := util.CheckVolumeModeFilesystem(resizeOptions.VolumeSpec)
 	if err != nil {

pkg/volume/csi/expander_test.go

@@ -19,6 +19,7 @@ package csi
 import (
 	"context"
 	"os"
+	"reflect"
 	"testing"

 	"google.golang.org/grpc/codes"
@@ -192,3 +193,28 @@ func TestNodeExpand(t *testing.T) {
 		})
 	}
 }
+
+func TestNodeExpandNoClientError(t *testing.T) {
+	transientError := volumetypes.NewTransientOperationFailure("")
+	plug, tmpDir := newTestPlugin(t, nil)
+	defer os.RemoveAll(tmpDir)
+	spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "expandable", "test-vol"), false)
+
+	newSize, _ := resource.ParseQuantity("20Gi")
+
+	resizeOptions := volume.NodeResizeOptions{
+		VolumeSpec:      spec,
+		NewSize:         newSize,
+		DeviceMountPath: "/foo/bar",
+		DeviceStagePath: "/foo/bar",
+		DevicePath:      "/mnt/foobar",
+	}
+
+	_, err := plug.NodeExpand(resizeOptions)
+
+	if err == nil {
+		t.Errorf("test should fail, but no error occurred")
+	} else if reflect.TypeOf(transientError) != reflect.TypeOf(err) {
+		t.Fatalf("expected exitError type: %v got: %v (%v)", reflect.TypeOf(transientError), reflect.TypeOf(err), err)
+	}
+}
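The new tests all assert on the error's concrete type rather than its message, comparing against a sentinel transientError built with an empty string. A standalone illustration of why the reflect.TypeOf comparison works for this (the transientError struct below is a stand-in for volumetypes.TransientOperationFailure, not the real type):

package main

import (
	"errors"
	"fmt"
	"reflect"
)

// transientError is a stand-in for volumetypes.TransientOperationFailure.
type transientError struct{ msg string }

func (e *transientError) Error() string { return e.msg }

func main() {
	sentinel := &transientError{""} // the message is irrelevant to the check
	var err error = &transientError{"driver not registered"}

	// Same concrete type (*transientError), so the test-style check passes
	// even though the messages differ.
	fmt.Println(reflect.TypeOf(sentinel) == reflect.TypeOf(err)) // true

	// A plain error has a different dynamic type, so it fails the check.
	fmt.Println(reflect.TypeOf(sentinel) == reflect.TypeOf(errors.New("x"))) // false
}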