mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Merge pull request #87978 from jsafrane/block-csi-test
Add CSI block volume directory cleanup
This commit is contained in:
commit
da9db64f9c
@ -17,6 +17,7 @@ go_library(
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/util/removeall:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/csi/nodeinfomanager:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
|
@ -72,14 +72,15 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/util/removeall"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
|
||||
utilstrings "k8s.io/utils/strings"
|
||||
)
|
||||
|
||||
@ -113,10 +114,16 @@ func (m *csiBlockMapper) getStagingPath() string {
|
||||
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName)
|
||||
}
|
||||
|
||||
// getPublishDir returns path to a directory, where the volume is published to each pod.
|
||||
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}
|
||||
func (m *csiBlockMapper) getPublishDir() string {
|
||||
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName)
|
||||
}
|
||||
|
||||
// getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume
|
||||
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID}
|
||||
func (m *csiBlockMapper) getPublishPath() string {
|
||||
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName, string(m.podUID))
|
||||
return filepath.Join(m.getPublishDir(), string(m.podUID))
|
||||
}
|
||||
|
||||
// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
|
||||
@ -299,6 +306,13 @@ func (m *csiBlockMapper) SetUpDevice() error {
|
||||
// Call NodeStageVolume
|
||||
_, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
|
||||
if err != nil {
|
||||
if volumetypes.IsOperationFinishedError(err) {
|
||||
cleanupErr := m.cleanupOrphanDeviceFiles()
|
||||
if cleanupErr != nil {
|
||||
// V(4) for not so serious error
|
||||
klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -435,6 +449,41 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err = m.cleanupOrphanDeviceFiles(); err != nil {
|
||||
// V(4) for not so serious error
|
||||
klog.V(4).Infof("Failed to clean up block volume directory %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Clean up any orphan files / directories when a block volume is being unstaged.
|
||||
// At this point we can be sure that there is no pod using the volume and all
|
||||
// files are indeed orphaned.
|
||||
func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
|
||||
// Remove artifacts of NodePublish.
|
||||
// publishDir: xxx/plugins/kubernetes.io/csi/volumeDevices/publish/<volume name>
|
||||
// Each PublishVolume() created a subdirectory there. Since everything should be
|
||||
// already unpublished at this point, the directory should be empty by now.
|
||||
publishDir := m.getPublishDir()
|
||||
if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) {
|
||||
return errors.New(log("failed to remove publish directory [%s]: %v", publishDir, err))
|
||||
}
|
||||
|
||||
// Remove artifacts of NodeStage.
|
||||
// stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/<volume name>
|
||||
stagingPath := m.getStagingPath()
|
||||
if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) {
|
||||
return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err))
|
||||
}
|
||||
|
||||
// Remove everything under xxx/plugins/kubernetes.io/csi/volumeDevices/<volume name>.
|
||||
// At this point it contains only "data/vol_data.json" and empty "dev/".
|
||||
volumeDir := getVolumePluginDir(m.specName, m.plugin.host)
|
||||
mounter := m.plugin.host.GetMounter(m.plugin.GetPluginName())
|
||||
if err := removeall.RemoveAllOneFilesystem(mounter, volumeDir); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package csi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -282,6 +283,54 @@ func TestBlockMapperSetupDevice(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockMapperSetupDeviceError(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
|
||||
|
||||
plug, tmpDir := newTestPlugin(t, nil)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||
}
|
||||
|
||||
pvName := pv.GetName()
|
||||
nodeName := string(plug.host.GetNodeName())
|
||||
|
||||
csiMapper.csiClient = setupClient(t, true)
|
||||
fClient := csiMapper.csiClient.(*fakeCsiDriverClient)
|
||||
fClient.nodeClient.SetNextError(errors.New("mock final error"))
|
||||
|
||||
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
|
||||
attachment := makeTestAttachment(attachID, nodeName, pvName)
|
||||
attachment.Status.Attached = true
|
||||
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.Background(), attachment, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to setup VolumeAttachment: %v", err)
|
||||
}
|
||||
t.Log("created attachement ", attachID)
|
||||
|
||||
err = csiMapper.SetUpDevice()
|
||||
if err == nil {
|
||||
t.Fatal("mapper unexpectedly succeeded")
|
||||
}
|
||||
|
||||
// Check that all directories have been cleaned
|
||||
// Check that all metadata / staging / publish directories were deleted
|
||||
dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
|
||||
if _, err := os.Stat(dataDir); err == nil {
|
||||
t.Errorf("volume publish data directory %s was not deleted", dataDir)
|
||||
}
|
||||
devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
|
||||
if _, err := os.Stat(devDir); err == nil {
|
||||
t.Errorf("volume publish device directory %s was not deleted", devDir)
|
||||
}
|
||||
stagingPath := csiMapper.getStagingPath()
|
||||
if _, err := os.Stat(stagingPath); err == nil {
|
||||
t.Errorf("volume staging path %s was not deleted", stagingPath)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockMapperMapPodDevice(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
|
||||
|
||||
@ -298,10 +347,10 @@ func TestBlockMapperMapPodDevice(t *testing.T) {
|
||||
|
||||
csiMapper.csiClient = setupClient(t, true)
|
||||
|
||||
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
|
||||
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), nodeName)
|
||||
attachment := makeTestAttachment(attachID, nodeName, pvName)
|
||||
attachment.Status.Attached = true
|
||||
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
|
||||
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.Background(), attachment, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to setup VolumeAttachment: %v", err)
|
||||
}
|
||||
@ -430,3 +479,123 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
|
||||
t.Error("csi server may not have received NodeUnstageVolume call")
|
||||
}
|
||||
}
|
||||
|
||||
func TestVolumeSetupTeardown(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
|
||||
|
||||
// Follow volume setup + teardown sequences at top of cs_block.go and set up / clean up one CSI block device.
|
||||
// Focus on testing that there were no leftover files present after the cleanup.
|
||||
|
||||
plug, tmpDir := newTestPlugin(t, nil)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
csiMapper, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||
}
|
||||
|
||||
pvName := pv.GetName()
|
||||
nodeName := string(plug.host.GetNodeName())
|
||||
|
||||
csiMapper.csiClient = setupClient(t, true)
|
||||
|
||||
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
|
||||
attachment := makeTestAttachment(attachID, nodeName, pvName)
|
||||
attachment.Status.Attached = true
|
||||
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to setup VolumeAttachment: %v", err)
|
||||
}
|
||||
t.Log("created attachement ", attachID)
|
||||
|
||||
err = csiMapper.SetUpDevice()
|
||||
if err != nil {
|
||||
t.Fatalf("mapper failed to SetupDevice: %v", err)
|
||||
}
|
||||
// Check if NodeStageVolume staged to the right path
|
||||
stagingPath := csiMapper.getStagingPath()
|
||||
svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
|
||||
svol, ok := svols[csiMapper.volumeID]
|
||||
if !ok {
|
||||
t.Error("csi server may not have received NodeStageVolume call")
|
||||
}
|
||||
if svol.Path != stagingPath {
|
||||
t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path)
|
||||
}
|
||||
|
||||
path, err := csiMapper.MapPodDevice()
|
||||
if err != nil {
|
||||
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
|
||||
}
|
||||
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
|
||||
pvol, ok := pvols[csiMapper.volumeID]
|
||||
if !ok {
|
||||
t.Error("csi server may not have received NodePublishVolume call")
|
||||
}
|
||||
publishPath := csiMapper.getPublishPath()
|
||||
if pvol.Path != publishPath {
|
||||
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
|
||||
}
|
||||
if path != publishPath {
|
||||
t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path)
|
||||
}
|
||||
|
||||
unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to make a new Unmapper: %v", err)
|
||||
}
|
||||
|
||||
csiUnmapper := unmapper.(*csiBlockMapper)
|
||||
csiUnmapper.csiClient = csiMapper.csiClient
|
||||
|
||||
globalMapPath, err := csiUnmapper.GetGlobalMapPath(spec)
|
||||
if err != nil {
|
||||
t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err)
|
||||
}
|
||||
|
||||
err = csiUnmapper.UnmapPodDevice()
|
||||
if err != nil {
|
||||
t.Errorf("unmapper failed to call UnmapPodDevice: %v", err)
|
||||
}
|
||||
|
||||
// GenerateUnmapDeviceFunc uses "" as pod UUID, it is global operation over all pods that used the volume
|
||||
unmapper, err = plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, "")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to make a new Unmapper: %v", err)
|
||||
}
|
||||
csiUnmapper = unmapper.(*csiBlockMapper)
|
||||
csiUnmapper.csiClient = csiMapper.csiClient
|
||||
|
||||
err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
|
||||
if _, ok := pubs[csiUnmapper.volumeID]; ok {
|
||||
t.Error("csi server may not have received NodeUnpublishVolume call")
|
||||
}
|
||||
vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
|
||||
if _, ok := vols[csiUnmapper.volumeID]; ok {
|
||||
t.Error("csi server may not have received NodeUnstageVolume call")
|
||||
}
|
||||
|
||||
// Check that all metadata / staging / publish directories were deleted
|
||||
dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
|
||||
if _, err := os.Stat(dataDir); err == nil {
|
||||
t.Errorf("volume publish data directory %s was not deleted", dataDir)
|
||||
}
|
||||
devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
|
||||
if _, err := os.Stat(devDir); err == nil {
|
||||
t.Errorf("volume publish device directory %s was not deleted", devDir)
|
||||
}
|
||||
if _, err := os.Stat(publishPath); err == nil {
|
||||
t.Errorf("volume publish path %s was not deleted", publishPath)
|
||||
}
|
||||
publishDir := filepath.Dir(publishPath)
|
||||
if _, err := os.Stat(publishDir); err == nil {
|
||||
t.Errorf("volume publish parent directory %s was not deleted", publishDir)
|
||||
}
|
||||
if _, err := os.Stat(stagingPath); err == nil {
|
||||
t.Errorf("volume staging path %s was not deleted", stagingPath)
|
||||
}
|
||||
}
|
||||
|
@ -20,12 +20,15 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
|
||||
api "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
utiltesting "k8s.io/client-go/util/testing"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csi/fake"
|
||||
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
|
||||
@ -147,15 +150,22 @@ func (c *fakeCsiDriverClient) NodePublishVolume(
|
||||
AccessMode: &csipbv1.VolumeCapability_AccessMode{
|
||||
Mode: asCSIAccessModeV1(accessMode),
|
||||
},
|
||||
AccessType: &csipbv1.VolumeCapability_Mount{
|
||||
Mount: &csipbv1.VolumeCapability_MountVolume{
|
||||
FsType: fsType,
|
||||
MountFlags: mountOptions,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if fsType == fsTypeBlockName {
|
||||
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
|
||||
Block: &csipbv1.VolumeCapability_BlockVolume{},
|
||||
}
|
||||
} else {
|
||||
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
|
||||
Mount: &csipbv1.VolumeCapability_MountVolume{
|
||||
FsType: fsType,
|
||||
MountFlags: mountOptions,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
_, err := c.nodeClient.NodePublishVolume(ctx, req)
|
||||
if err != nil && !isFinalError(err) {
|
||||
return volumetypes.NewUncertainProgressError(err.Error())
|
||||
@ -193,16 +203,22 @@ func (c *fakeCsiDriverClient) NodeStageVolume(ctx context.Context,
|
||||
AccessMode: &csipbv1.VolumeCapability_AccessMode{
|
||||
Mode: asCSIAccessModeV1(accessMode),
|
||||
},
|
||||
AccessType: &csipbv1.VolumeCapability_Mount{
|
||||
Mount: &csipbv1.VolumeCapability_MountVolume{
|
||||
FsType: fsType,
|
||||
MountFlags: mountOptions,
|
||||
},
|
||||
},
|
||||
},
|
||||
Secrets: secrets,
|
||||
VolumeContext: volumeContext,
|
||||
}
|
||||
if fsType == fsTypeBlockName {
|
||||
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
|
||||
Block: &csipbv1.VolumeCapability_BlockVolume{},
|
||||
}
|
||||
} else {
|
||||
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
|
||||
Mount: &csipbv1.VolumeCapability_MountVolume{
|
||||
FsType: fsType,
|
||||
MountFlags: mountOptions,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
_, err := c.nodeClient.NodeStageVolume(ctx, req)
|
||||
if err != nil && !isFinalError(err) {
|
||||
@ -370,6 +386,13 @@ func TestClientNodeGetInfo(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestClientNodePublishVolume(t *testing.T) {
|
||||
tmpDir, err := utiltesting.MkTmpdir("csi-test")
|
||||
if err != nil {
|
||||
t.Fatalf("can't create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
testPath := filepath.Join(tmpDir, "path")
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
volID string
|
||||
@ -378,11 +401,11 @@ func TestClientNodePublishVolume(t *testing.T) {
|
||||
mustFail bool
|
||||
err error
|
||||
}{
|
||||
{name: "test ok", volID: "vol-test", targetPath: "/test/path"},
|
||||
{name: "missing volID", targetPath: "/test/path", mustFail: true},
|
||||
{name: "test ok", volID: "vol-test", targetPath: testPath},
|
||||
{name: "missing volID", targetPath: testPath, mustFail: true},
|
||||
{name: "missing target path", volID: "vol-test", mustFail: true},
|
||||
{name: "bad fs", volID: "vol-test", targetPath: "/test/path", fsType: "badfs", mustFail: true},
|
||||
{name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
|
||||
{name: "bad fs", volID: "vol-test", targetPath: testPath, fsType: "badfs", mustFail: true},
|
||||
{name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
@ -419,6 +442,13 @@ func TestClientNodePublishVolume(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestClientNodeUnpublishVolume(t *testing.T) {
|
||||
tmpDir, err := utiltesting.MkTmpdir("csi-test")
|
||||
if err != nil {
|
||||
t.Fatalf("can't create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
testPath := filepath.Join(tmpDir, "path")
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
volID string
|
||||
@ -426,10 +456,10 @@ func TestClientNodeUnpublishVolume(t *testing.T) {
|
||||
mustFail bool
|
||||
err error
|
||||
}{
|
||||
{name: "test ok", volID: "vol-test", targetPath: "/test/path"},
|
||||
{name: "missing volID", targetPath: "/test/path", mustFail: true},
|
||||
{name: "missing target path", volID: "vol-test", mustFail: true},
|
||||
{name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
|
||||
{name: "test ok", volID: "vol-test", targetPath: testPath},
|
||||
{name: "missing volID", targetPath: testPath, mustFail: true},
|
||||
{name: "missing target path", volID: testPath, mustFail: true},
|
||||
{name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
@ -454,6 +484,13 @@ func TestClientNodeUnpublishVolume(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestClientNodeStageVolume(t *testing.T) {
|
||||
tmpDir, err := utiltesting.MkTmpdir("csi-test")
|
||||
if err != nil {
|
||||
t.Fatalf("can't create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
testPath := filepath.Join(tmpDir, "/test/path")
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
volID string
|
||||
@ -464,11 +501,11 @@ func TestClientNodeStageVolume(t *testing.T) {
|
||||
mustFail bool
|
||||
err error
|
||||
}{
|
||||
{name: "test ok", volID: "vol-test", stagingTargetPath: "/test/path", fsType: "ext4", mountOptions: []string{"unvalidated"}},
|
||||
{name: "missing volID", stagingTargetPath: "/test/path", mustFail: true},
|
||||
{name: "test ok", volID: "vol-test", stagingTargetPath: testPath, fsType: "ext4", mountOptions: []string{"unvalidated"}},
|
||||
{name: "missing volID", stagingTargetPath: testPath, mustFail: true},
|
||||
{name: "missing target path", volID: "vol-test", mustFail: true},
|
||||
{name: "bad fs", volID: "vol-test", stagingTargetPath: "/test/path", fsType: "badfs", mustFail: true},
|
||||
{name: "grpc error", volID: "vol-test", stagingTargetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
|
||||
{name: "bad fs", volID: "vol-test", stagingTargetPath: testPath, fsType: "badfs", mustFail: true},
|
||||
{name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
@ -503,6 +540,13 @@ func TestClientNodeStageVolume(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestClientNodeUnstageVolume(t *testing.T) {
|
||||
tmpDir, err := utiltesting.MkTmpdir("csi-test")
|
||||
if err != nil {
|
||||
t.Fatalf("can't create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
testPath := filepath.Join(tmpDir, "/test/path")
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
volID string
|
||||
@ -510,10 +554,10 @@ func TestClientNodeUnstageVolume(t *testing.T) {
|
||||
mustFail bool
|
||||
err error
|
||||
}{
|
||||
{name: "test ok", volID: "vol-test", stagingTargetPath: "/test/path"},
|
||||
{name: "missing volID", stagingTargetPath: "/test/path", mustFail: true},
|
||||
{name: "test ok", volID: "vol-test", stagingTargetPath: testPath},
|
||||
{name: "missing volID", stagingTargetPath: testPath, mustFail: true},
|
||||
{name: "missing target path", volID: "vol-test", mustFail: true},
|
||||
{name: "grpc error", volID: "vol-test", stagingTargetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
|
||||
{name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
|
@ -108,20 +108,24 @@ func log(msg string, parts ...interface{}) string {
|
||||
return fmt.Sprintf(fmt.Sprintf("%s: %s", CSIPluginName, msg), parts...)
|
||||
}
|
||||
|
||||
// getVolumePluginDir returns the path where CSI plugin keeps metadata for given volume
|
||||
func getVolumePluginDir(specVolID string, host volume.VolumeHost) string {
|
||||
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID)
|
||||
return filepath.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID)
|
||||
}
|
||||
|
||||
// getVolumeDevicePluginDir returns the path where the CSI plugin keeps the
|
||||
// symlink for a block device associated with a given specVolumeID.
|
||||
// path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/dev
|
||||
func getVolumeDevicePluginDir(specVolID string, host volume.VolumeHost) string {
|
||||
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID)
|
||||
return filepath.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "dev")
|
||||
return filepath.Join(getVolumePluginDir(specVolID, host), "dev")
|
||||
}
|
||||
|
||||
// getVolumeDeviceDataDir returns the path where the CSI plugin keeps the
|
||||
// volume data for a block device associated with a given specVolumeID.
|
||||
// path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/data
|
||||
func getVolumeDeviceDataDir(specVolID string, host volume.VolumeHost) string {
|
||||
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID)
|
||||
return filepath.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "data")
|
||||
return filepath.Join(getVolumePluginDir(specVolID, host), "data")
|
||||
}
|
||||
|
||||
// hasReadWriteOnce returns true if modes contains v1.ReadWriteOnce
|
||||
|
@ -19,6 +19,9 @@ package fake
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
csipb "github.com/container-storage-interface/spec/lib/go/csi"
|
||||
@ -172,14 +175,29 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli
|
||||
return nil, timeoutErr
|
||||
}
|
||||
|
||||
f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{
|
||||
// "Creation of target_path is the responsibility of the SP."
|
||||
// Our plugin depends on it.
|
||||
if req.VolumeCapability.GetBlock() != nil {
|
||||
if err := ioutil.WriteFile(req.TargetPath, []byte{}, 0644); err != nil {
|
||||
return nil, fmt.Errorf("cannot create target path %s for block file: %s", req.TargetPath, err)
|
||||
}
|
||||
} else {
|
||||
if err := os.MkdirAll(req.TargetPath, 0755); err != nil {
|
||||
return nil, fmt.Errorf("cannot create target directory %s for mount: %s", req.TargetPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
publishedVolume := CSIVolume{
|
||||
VolumeHandle: req.GetVolumeId(),
|
||||
Path: req.GetTargetPath(),
|
||||
DeviceMountPath: req.GetStagingTargetPath(),
|
||||
VolumeContext: req.GetVolumeContext(),
|
||||
FSType: req.GetVolumeCapability().GetMount().GetFsType(),
|
||||
MountFlags: req.GetVolumeCapability().GetMount().MountFlags,
|
||||
}
|
||||
if req.GetVolumeCapability().GetMount() != nil {
|
||||
publishedVolume.FSType = req.GetVolumeCapability().GetMount().FsType
|
||||
publishedVolume.MountFlags = req.GetVolumeCapability().GetMount().MountFlags
|
||||
}
|
||||
f.nodePublishedVolumes[req.GetVolumeId()] = publishedVolume
|
||||
return &csipb.NodePublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
@ -196,6 +214,12 @@ func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnp
|
||||
return nil, errors.New("missing target path")
|
||||
}
|
||||
delete(f.nodePublishedVolumes, req.GetVolumeId())
|
||||
|
||||
// "The SP MUST delete the file or directory it created at this path."
|
||||
if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("failed to remove publish path %s: %s", req.TargetPath, err)
|
||||
}
|
||||
|
||||
return &csipb.NodeUnpublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user