diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index f81314c7565..6d237e7f052 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -17,6 +17,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/features:go_default_library", + "//pkg/util/removeall:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/csi/nodeinfomanager:go_default_library", "//pkg/volume/util:go_default_library", diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index 43484b3ad2c..ee2ae319ee4 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -72,14 +72,15 @@ import ( "os" "path/filepath" - "k8s.io/klog" - - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" + "k8s.io/klog" + "k8s.io/kubernetes/pkg/util/removeall" "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" utilstrings "k8s.io/utils/strings" ) @@ -113,10 +114,16 @@ func (m *csiBlockMapper) getStagingPath() string { return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName) } +// getPublishDir returns path to a directory, where the volume is published to each pod. +// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName} +func (m *csiBlockMapper) getPublishDir() string { + return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName) +} + // getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume // Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID} func (m *csiBlockMapper) getPublishPath() string { - return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName, string(m.podUID)) + return filepath.Join(m.getPublishDir(), string(m.podUID)) } // GetPodDeviceMapPath returns pod's device file which will be mapped to a volume @@ -299,6 +306,13 @@ func (m *csiBlockMapper) SetUpDevice() error { // Call NodeStageVolume _, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment) if err != nil { + if volumetypes.IsOperationFinishedError(err) { + cleanupErr := m.cleanupOrphanDeviceFiles() + if cleanupErr != nil { + // V(4) for not so serious error + klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr) + } + } return err } @@ -435,6 +449,41 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error return err } } + if err = m.cleanupOrphanDeviceFiles(); err != nil { + // V(4) for not so serious error + klog.V(4).Infof("Failed to clean up block volume directory %s", err) + } + + return nil +} + +// Clean up any orphan files / directories when a block volume is being unstaged. +// At this point we can be sure that there is no pod using the volume and all +// files are indeed orphaned. +func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error { + // Remove artifacts of NodePublish. + // publishDir: xxx/plugins/kubernetes.io/csi/volumeDevices/publish/ + // Each PublishVolume() created a subdirectory there. Since everything should be + // already unpublished at this point, the directory should be empty by now. + publishDir := m.getPublishDir() + if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) { + return errors.New(log("failed to remove publish directory [%s]: %v", publishDir, err)) + } + + // Remove artifacts of NodeStage. + // stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/ + stagingPath := m.getStagingPath() + if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) { + return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err)) + } + + // Remove everything under xxx/plugins/kubernetes.io/csi/volumeDevices/. + // At this point it contains only "data/vol_data.json" and empty "dev/". + volumeDir := getVolumePluginDir(m.specName, m.plugin.host) + mounter := m.plugin.host.GetMounter(m.plugin.GetPluginName()) + if err := removeall.RemoveAllOneFilesystem(mounter, volumeDir); err != nil { + return err + } return nil } diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index 0823e9d2107..e856da79427 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -18,6 +18,7 @@ package csi import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -282,6 +283,54 @@ func TestBlockMapperSetupDevice(t *testing.T) { } } +func TestBlockMapperSetupDeviceError(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() + + plug, tmpDir := newTestPlugin(t, nil) + defer os.RemoveAll(tmpDir) + + csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t) + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } + + pvName := pv.GetName() + nodeName := string(plug.host.GetNodeName()) + + csiMapper.csiClient = setupClient(t, true) + fClient := csiMapper.csiClient.(*fakeCsiDriverClient) + fClient.nodeClient.SetNextError(errors.New("mock final error")) + + attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName)) + attachment := makeTestAttachment(attachID, nodeName, pvName) + attachment.Status.Attached = true + _, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.Background(), attachment, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to setup VolumeAttachment: %v", err) + } + t.Log("created attachement ", attachID) + + err = csiMapper.SetUpDevice() + if err == nil { + t.Fatal("mapper unexpectedly succeeded") + } + + // Check that all directories have been cleaned + // Check that all metadata / staging / publish directories were deleted + dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if _, err := os.Stat(dataDir); err == nil { + t.Errorf("volume publish data directory %s was not deleted", dataDir) + } + devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if _, err := os.Stat(devDir); err == nil { + t.Errorf("volume publish device directory %s was not deleted", devDir) + } + stagingPath := csiMapper.getStagingPath() + if _, err := os.Stat(stagingPath); err == nil { + t.Errorf("volume staging path %s was not deleted", stagingPath) + } +} + func TestBlockMapperMapPodDevice(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() @@ -298,10 +347,10 @@ func TestBlockMapperMapPodDevice(t *testing.T) { csiMapper.csiClient = setupClient(t, true) - attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName)) + attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), nodeName) attachment := makeTestAttachment(attachID, nodeName, pvName) attachment.Status.Attached = true - _, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{}) + _, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.Background(), attachment, metav1.CreateOptions{}) if err != nil { t.Fatalf("failed to setup VolumeAttachment: %v", err) } @@ -430,3 +479,123 @@ func TestBlockMapperTearDownDevice(t *testing.T) { t.Error("csi server may not have received NodeUnstageVolume call") } } + +func TestVolumeSetupTeardown(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() + + // Follow volume setup + teardown sequences at top of cs_block.go and set up / clean up one CSI block device. + // Focus on testing that there were no leftover files present after the cleanup. + + plug, tmpDir := newTestPlugin(t, nil) + defer os.RemoveAll(tmpDir) + + csiMapper, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t) + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } + + pvName := pv.GetName() + nodeName := string(plug.host.GetNodeName()) + + csiMapper.csiClient = setupClient(t, true) + + attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName)) + attachment := makeTestAttachment(attachID, nodeName, pvName) + attachment.Status.Attached = true + _, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to setup VolumeAttachment: %v", err) + } + t.Log("created attachement ", attachID) + + err = csiMapper.SetUpDevice() + if err != nil { + t.Fatalf("mapper failed to SetupDevice: %v", err) + } + // Check if NodeStageVolume staged to the right path + stagingPath := csiMapper.getStagingPath() + svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() + svol, ok := svols[csiMapper.volumeID] + if !ok { + t.Error("csi server may not have received NodeStageVolume call") + } + if svol.Path != stagingPath { + t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path) + } + + path, err := csiMapper.MapPodDevice() + if err != nil { + t.Fatalf("mapper failed to GetGlobalMapPath: %v", err) + } + pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + pvol, ok := pvols[csiMapper.volumeID] + if !ok { + t.Error("csi server may not have received NodePublishVolume call") + } + publishPath := csiMapper.getPublishPath() + if pvol.Path != publishPath { + t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path) + } + if path != publishPath { + t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path) + } + + unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID) + if err != nil { + t.Fatalf("failed to make a new Unmapper: %v", err) + } + + csiUnmapper := unmapper.(*csiBlockMapper) + csiUnmapper.csiClient = csiMapper.csiClient + + globalMapPath, err := csiUnmapper.GetGlobalMapPath(spec) + if err != nil { + t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err) + } + + err = csiUnmapper.UnmapPodDevice() + if err != nil { + t.Errorf("unmapper failed to call UnmapPodDevice: %v", err) + } + + // GenerateUnmapDeviceFunc uses "" as pod UUID, it is global operation over all pods that used the volume + unmapper, err = plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, "") + if err != nil { + t.Fatalf("failed to make a new Unmapper: %v", err) + } + csiUnmapper = unmapper.(*csiBlockMapper) + csiUnmapper.csiClient = csiMapper.csiClient + + err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test") + if err != nil { + t.Fatal(err) + } + pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + if _, ok := pubs[csiUnmapper.volumeID]; ok { + t.Error("csi server may not have received NodeUnpublishVolume call") + } + vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() + if _, ok := vols[csiUnmapper.volumeID]; ok { + t.Error("csi server may not have received NodeUnstageVolume call") + } + + // Check that all metadata / staging / publish directories were deleted + dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if _, err := os.Stat(dataDir); err == nil { + t.Errorf("volume publish data directory %s was not deleted", dataDir) + } + devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if _, err := os.Stat(devDir); err == nil { + t.Errorf("volume publish device directory %s was not deleted", devDir) + } + if _, err := os.Stat(publishPath); err == nil { + t.Errorf("volume publish path %s was not deleted", publishPath) + } + publishDir := filepath.Dir(publishPath) + if _, err := os.Stat(publishDir); err == nil { + t.Errorf("volume publish parent directory %s was not deleted", publishDir) + } + if _, err := os.Stat(stagingPath); err == nil { + t.Errorf("volume staging path %s was not deleted", stagingPath) + } +} diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index 83080da2575..97c6da7dea7 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -20,12 +20,15 @@ import ( "context" "errors" "io" + "os" + "path/filepath" "reflect" "testing" csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + utiltesting "k8s.io/client-go/util/testing" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi/fake" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" @@ -147,15 +150,22 @@ func (c *fakeCsiDriverClient) NodePublishVolume( AccessMode: &csipbv1.VolumeCapability_AccessMode{ Mode: asCSIAccessModeV1(accessMode), }, - AccessType: &csipbv1.VolumeCapability_Mount{ - Mount: &csipbv1.VolumeCapability_MountVolume{ - FsType: fsType, - MountFlags: mountOptions, - }, - }, }, } + if fsType == fsTypeBlockName { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ + Block: &csipbv1.VolumeCapability_BlockVolume{}, + } + } else { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{ + Mount: &csipbv1.VolumeCapability_MountVolume{ + FsType: fsType, + MountFlags: mountOptions, + }, + } + } + _, err := c.nodeClient.NodePublishVolume(ctx, req) if err != nil && !isFinalError(err) { return volumetypes.NewUncertainProgressError(err.Error()) @@ -193,16 +203,22 @@ func (c *fakeCsiDriverClient) NodeStageVolume(ctx context.Context, AccessMode: &csipbv1.VolumeCapability_AccessMode{ Mode: asCSIAccessModeV1(accessMode), }, - AccessType: &csipbv1.VolumeCapability_Mount{ - Mount: &csipbv1.VolumeCapability_MountVolume{ - FsType: fsType, - MountFlags: mountOptions, - }, - }, }, Secrets: secrets, VolumeContext: volumeContext, } + if fsType == fsTypeBlockName { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ + Block: &csipbv1.VolumeCapability_BlockVolume{}, + } + } else { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{ + Mount: &csipbv1.VolumeCapability_MountVolume{ + FsType: fsType, + MountFlags: mountOptions, + }, + } + } _, err := c.nodeClient.NodeStageVolume(ctx, req) if err != nil && !isFinalError(err) { @@ -370,6 +386,13 @@ func TestClientNodeGetInfo(t *testing.T) { } func TestClientNodePublishVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "path") + testCases := []struct { name string volID string @@ -378,11 +401,11 @@ func TestClientNodePublishVolume(t *testing.T) { mustFail bool err error }{ - {name: "test ok", volID: "vol-test", targetPath: "/test/path"}, - {name: "missing volID", targetPath: "/test/path", mustFail: true}, + {name: "test ok", volID: "vol-test", targetPath: testPath}, + {name: "missing volID", targetPath: testPath, mustFail: true}, {name: "missing target path", volID: "vol-test", mustFail: true}, - {name: "bad fs", volID: "vol-test", targetPath: "/test/path", fsType: "badfs", mustFail: true}, - {name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + {name: "bad fs", volID: "vol-test", targetPath: testPath, fsType: "badfs", mustFail: true}, + {name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")}, } for _, tc := range testCases { @@ -419,6 +442,13 @@ func TestClientNodePublishVolume(t *testing.T) { } func TestClientNodeUnpublishVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "path") + testCases := []struct { name string volID string @@ -426,10 +456,10 @@ func TestClientNodeUnpublishVolume(t *testing.T) { mustFail bool err error }{ - {name: "test ok", volID: "vol-test", targetPath: "/test/path"}, - {name: "missing volID", targetPath: "/test/path", mustFail: true}, - {name: "missing target path", volID: "vol-test", mustFail: true}, - {name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + {name: "test ok", volID: "vol-test", targetPath: testPath}, + {name: "missing volID", targetPath: testPath, mustFail: true}, + {name: "missing target path", volID: testPath, mustFail: true}, + {name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")}, } for _, tc := range testCases { @@ -454,6 +484,13 @@ func TestClientNodeUnpublishVolume(t *testing.T) { } func TestClientNodeStageVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "/test/path") + testCases := []struct { name string volID string @@ -464,11 +501,11 @@ func TestClientNodeStageVolume(t *testing.T) { mustFail bool err error }{ - {name: "test ok", volID: "vol-test", stagingTargetPath: "/test/path", fsType: "ext4", mountOptions: []string{"unvalidated"}}, - {name: "missing volID", stagingTargetPath: "/test/path", mustFail: true}, + {name: "test ok", volID: "vol-test", stagingTargetPath: testPath, fsType: "ext4", mountOptions: []string{"unvalidated"}}, + {name: "missing volID", stagingTargetPath: testPath, mustFail: true}, {name: "missing target path", volID: "vol-test", mustFail: true}, - {name: "bad fs", volID: "vol-test", stagingTargetPath: "/test/path", fsType: "badfs", mustFail: true}, - {name: "grpc error", volID: "vol-test", stagingTargetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + {name: "bad fs", volID: "vol-test", stagingTargetPath: testPath, fsType: "badfs", mustFail: true}, + {name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")}, } for _, tc := range testCases { @@ -503,6 +540,13 @@ func TestClientNodeStageVolume(t *testing.T) { } func TestClientNodeUnstageVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "/test/path") + testCases := []struct { name string volID string @@ -510,10 +554,10 @@ func TestClientNodeUnstageVolume(t *testing.T) { mustFail bool err error }{ - {name: "test ok", volID: "vol-test", stagingTargetPath: "/test/path"}, - {name: "missing volID", stagingTargetPath: "/test/path", mustFail: true}, + {name: "test ok", volID: "vol-test", stagingTargetPath: testPath}, + {name: "missing volID", stagingTargetPath: testPath, mustFail: true}, {name: "missing target path", volID: "vol-test", mustFail: true}, - {name: "grpc error", volID: "vol-test", stagingTargetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + {name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")}, } for _, tc := range testCases { diff --git a/pkg/volume/csi/csi_util.go b/pkg/volume/csi/csi_util.go index ca678bfac45..96b3c497f0e 100644 --- a/pkg/volume/csi/csi_util.go +++ b/pkg/volume/csi/csi_util.go @@ -108,20 +108,24 @@ func log(msg string, parts ...interface{}) string { return fmt.Sprintf(fmt.Sprintf("%s: %s", CSIPluginName, msg), parts...) } +// getVolumePluginDir returns the path where CSI plugin keeps metadata for given volume +func getVolumePluginDir(specVolID string, host volume.VolumeHost) string { + sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID) + return filepath.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID) +} + // getVolumeDevicePluginDir returns the path where the CSI plugin keeps the // symlink for a block device associated with a given specVolumeID. // path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/dev func getVolumeDevicePluginDir(specVolID string, host volume.VolumeHost) string { - sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID) - return filepath.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "dev") + return filepath.Join(getVolumePluginDir(specVolID, host), "dev") } // getVolumeDeviceDataDir returns the path where the CSI plugin keeps the // volume data for a block device associated with a given specVolumeID. // path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/data func getVolumeDeviceDataDir(specVolID string, host volume.VolumeHost) string { - sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID) - return filepath.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "data") + return filepath.Join(getVolumePluginDir(specVolID, host), "data") } // hasReadWriteOnce returns true if modes contains v1.ReadWriteOnce diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go index 0ce54f50ad8..0c9c446fba1 100644 --- a/pkg/volume/csi/fake/fake_client.go +++ b/pkg/volume/csi/fake/fake_client.go @@ -19,6 +19,9 @@ package fake import ( "context" "errors" + "fmt" + "io/ioutil" + "os" "strings" csipb "github.com/container-storage-interface/spec/lib/go/csi" @@ -172,14 +175,29 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli return nil, timeoutErr } - f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{ + // "Creation of target_path is the responsibility of the SP." + // Our plugin depends on it. + if req.VolumeCapability.GetBlock() != nil { + if err := ioutil.WriteFile(req.TargetPath, []byte{}, 0644); err != nil { + return nil, fmt.Errorf("cannot create target path %s for block file: %s", req.TargetPath, err) + } + } else { + if err := os.MkdirAll(req.TargetPath, 0755); err != nil { + return nil, fmt.Errorf("cannot create target directory %s for mount: %s", req.TargetPath, err) + } + } + + publishedVolume := CSIVolume{ VolumeHandle: req.GetVolumeId(), Path: req.GetTargetPath(), DeviceMountPath: req.GetStagingTargetPath(), VolumeContext: req.GetVolumeContext(), - FSType: req.GetVolumeCapability().GetMount().GetFsType(), - MountFlags: req.GetVolumeCapability().GetMount().MountFlags, } + if req.GetVolumeCapability().GetMount() != nil { + publishedVolume.FSType = req.GetVolumeCapability().GetMount().FsType + publishedVolume.MountFlags = req.GetVolumeCapability().GetMount().MountFlags + } + f.nodePublishedVolumes[req.GetVolumeId()] = publishedVolume return &csipb.NodePublishVolumeResponse{}, nil } @@ -196,6 +214,12 @@ func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnp return nil, errors.New("missing target path") } delete(f.nodePublishedVolumes, req.GetVolumeId()) + + // "The SP MUST delete the file or directory it created at this path." + if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) { + return nil, fmt.Errorf("failed to remove publish path %s: %s", req.TargetPath, err) + } + return &csipb.NodeUnpublishVolumeResponse{}, nil }