From e2d8e575f04527bc2036bcaa0bae6d2d67409d0c Mon Sep 17 00:00:00 2001
From: Jan Safranek
Date: Thu, 13 Feb 2020 12:19:31 +0100
Subject: [PATCH 1/3] Add CSI block volume directory cleanup

The CSI volume plugin creates a number of files/directories when
processing block volumes. These files must be cleaned up when the
plugin is done with the volume, i.e. at the end of TearDownDevice().
---
 pkg/volume/csi/csi_block.go        |  62 ++++++++++-
 pkg/volume/csi/csi_block_test.go   | 170 +++++++++++++++++++++++++++++
 pkg/volume/csi/csi_client_test.go  |  98 ++++++++++++-----
 pkg/volume/csi/fake/fake_client.go |  28 ++++-
 4 files changed, 328 insertions(+), 30 deletions(-)

diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go
index 43484b3ad2c..38004d9210d 100644
--- a/pkg/volume/csi/csi_block.go
+++ b/pkg/volume/csi/csi_block.go
@@ -72,9 +72,11 @@ import (
 	"os"
 	"path/filepath"
 
+	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
+
 	"k8s.io/klog"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	storage "k8s.io/api/storage/v1"
 	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -299,6 +301,13 @@ func (m *csiBlockMapper) SetUpDevice() error {
 	// Call NodeStageVolume
 	_, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
 	if err != nil {
+		if volumetypes.IsOperationFinishedError(err) {
+			cleanupErr := m.cleanupOrphanDeviceFiles()
+			if cleanupErr != nil {
+				// V(4) because the error is not fatal here
+				klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr)
+			}
+		}
 		return err
 	}
 
@@ -435,6 +444,57 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
 			return err
 		}
 	}
+	if err = m.cleanupOrphanDeviceFiles(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Clean up any orphan files / directories when a block volume is being unstaged.
+// At this point we can be sure that there is no pod using the volume and all
+// files are indeed orphaned.
+func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
+	// Remove artifacts of NodePublish.
+	// publishPath: xxx/plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID}
+	// publishPath was removed by the driver. We need to remove the {specName}/ dir.
+	publishPath := m.getPublishPath()
+	publishDir := filepath.Dir(publishPath)
+	if m.podUID == "" {
+		// Pod UID is not known during device teardown ("NodeUnstage").
+		// getPublishPath() squashed "{specName}/{podUID}" into "{specName}/".
+		publishDir = publishPath
+	}
+	if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) {
+		return errors.New(log("failed to publish directory [%s]: %v", publishDir, err))
+	}
+
+	// Remove artifacts of NodeStage.
+	// stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/{specName}
+	stagingPath := m.getStagingPath()
+	if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) {
+		return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err))
+	}
+
+	// Remove everything under xxx/plugins/kubernetes.io/csi/volumeDevices/{specName}.
+	// At this point it contains only "data/vol_data.json" and empty "dev/".
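+	// The entries are removed leaf-first, so each os.Remove below only ever
+	// deletes a single file or an empty directory.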
+	dataDir := getVolumeDeviceDataDir(m.specName, m.plugin.host)
+	dataFile := filepath.Join(dataDir, volDataFileName)
+	if err := os.Remove(dataFile); err != nil && !os.IsNotExist(err) {
+		return errors.New(log("failed to delete volume data file [%s]: %v", dataFile, err))
+	}
+	if err := os.Remove(dataDir); err != nil && !os.IsNotExist(err) {
+		return errors.New(log("failed to delete volume data directory [%s]: %v", dataDir, err))
+	}
+
+	volumeDir := filepath.Dir(dataDir)
+	deviceDir := filepath.Join(volumeDir, "dev")
+	if err := os.Remove(deviceDir); err != nil && !os.IsNotExist(err) {
+		return errors.New(log("failed to delete volume directory [%s]: %v", deviceDir, err))
+	}
+	if err := os.Remove(volumeDir); err != nil && !os.IsNotExist(err) {
+		return errors.New(log("failed to delete volume directory [%s]: %v", volumeDir, err))
+	}
 
 	return nil
 }
diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go
index 0823e9d2107..762e74e003b 100644
--- a/pkg/volume/csi/csi_block_test.go
+++ b/pkg/volume/csi/csi_block_test.go
@@ -18,6 +18,7 @@ package csi
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -282,6 +283,54 @@ func TestBlockMapperSetupDevice(t *testing.T) {
 	}
 }
 
+func TestBlockMapperSetupDeviceError(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
+
+	plug, tmpDir := newTestPlugin(t, nil)
+	defer os.RemoveAll(tmpDir)
+
+	csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
+	if err != nil {
+		t.Fatalf("Failed to make a new Mapper: %v", err)
+	}
+
+	pvName := pv.GetName()
+	nodeName := string(plug.host.GetNodeName())
+
+	csiMapper.csiClient = setupClient(t, true)
+	fClient := csiMapper.csiClient.(*fakeCsiDriverClient)
+	fClient.nodeClient.SetNextError(errors.New("mock final error"))
+
+	attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
+	attachment := makeTestAttachment(attachID, nodeName, pvName)
+	attachment.Status.Attached = true
+	_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("failed to setup VolumeAttachment: %v", err)
+	}
+	t.Log("created attachment ", attachID)
+
+	err = csiMapper.SetUpDevice()
+	if err == nil {
+		t.Fatal("mapper unexpectedly succeeded")
+	}
+
+	// Check that all directories have been cleaned up:
+	// the metadata / staging / publish directories must be deleted.
+	dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
+	if _, err := os.Stat(dataDir); err == nil {
+		t.Errorf("volume data directory %s was not deleted", dataDir)
+	}
+	devDir := getVolumeDevicePluginDir(pv.ObjectMeta.Name, plug.host)
+	if _, err := os.Stat(devDir); err == nil {
+		t.Errorf("volume device directory %s was not deleted", devDir)
+	}
+	stagingPath := csiMapper.getStagingPath()
+	if _, err := os.Stat(stagingPath); err == nil {
+		t.Errorf("volume staging path %s was not deleted", stagingPath)
+	}
+}
+
 func TestBlockMapperMapPodDevice(t *testing.T) {
 	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
 
@@ -430,3 +479,124 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
 		t.Error("csi server may not have received NodeUnstageVolume call")
 	}
 }
+
+func TestVolumeSetupTeardown(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
+
+	plug, tmpDir := newTestPlugin(t, nil)
+	defer os.RemoveAll(tmpDir)
+
+	csiMapper, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
+	if err != nil {
+		t.Fatalf("Failed to make a new Mapper: %v", err)
+	}
+
+	pvName := pv.GetName()
+	nodeName := string(plug.host.GetNodeName())
+
+	csiMapper.csiClient = setupClient(t, true)
+
+	attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
+	attachment := makeTestAttachment(attachID, nodeName, pvName)
+	attachment.Status.Attached = true
+	_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("failed to setup VolumeAttachment: %v", err)
+	}
+	t.Log("created attachment ", attachID)
+
+	// SetupDevice
+	err = csiMapper.SetUpDevice()
+	if err != nil {
+		t.Fatalf("mapper failed to SetupDevice: %v", err)
+	}
+	// Check if NodeStageVolume staged to the right path
+	stagingPath := csiMapper.getStagingPath()
+	svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
+	svol, ok := svols[csiMapper.volumeID]
+	if !ok {
+		t.Error("csi server may not have received NodeStageVolume call")
+	}
+	if svol.Path != stagingPath {
+		t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path)
+	}
+
+	// MapPodDevice
+	path, err := csiMapper.MapPodDevice()
+	if err != nil {
+		t.Fatalf("mapper failed to MapPodDevice: %v", err)
+	}
+	pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
+	pvol, ok := pvols[csiMapper.volumeID]
+	if !ok {
+		t.Error("csi server may not have received NodePublishVolume call")
+	}
+	publishPath := csiMapper.getPublishPath()
+	if pvol.Path != publishPath {
+		t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
+	}
+	if path != publishPath {
+		t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path)
+	}
+
+	unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
+	if err != nil {
+		t.Fatalf("failed to make a new Unmapper: %v", err)
+	}
+
+	csiUnmapper := unmapper.(*csiBlockMapper)
+	csiUnmapper.csiClient = csiMapper.csiClient
+
+	globalMapPath, err := csiUnmapper.GetGlobalMapPath(spec)
+	if err != nil {
+		t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err)
+	}
+
+	// UnmapDevice
+	err = csiUnmapper.UnmapPodDevice()
+	if err != nil {
+		t.Errorf("unmapper failed to call UnmapPodDevice: %v", err)
+	}
+
+	// GenerateUnmapDeviceFunc uses "" as the pod UID; it is a global operation over all pods that used the volume.
+	unmapper, err = plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, "")
+	if err != nil {
+		t.Fatalf("failed to make a new Unmapper: %v", err)
+	}
+	csiUnmapper = unmapper.(*csiBlockMapper)
+	csiUnmapper.csiClient = csiMapper.csiClient
+
+	// TearDownDevice
+	err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test")
+	if err != nil {
+		t.Fatal(err)
+	}
+	pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
+	if _, ok := pubs[csiUnmapper.volumeID]; ok {
+		t.Error("csi server may not have received NodeUnpublishVolume call")
+	}
+	vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
+	if _, ok := vols[csiUnmapper.volumeID]; ok {
+		t.Error("csi server may not have received NodeUnstageVolume call")
+	}
+
+	// Check that all metadata / staging / publish directories were deleted
+	dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
+	if _, err := os.Stat(dataDir); err == nil {
t.Errorf("volume publish data directory %s was not deleted", dataDir) + } + devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if _, err := os.Stat(devDir); err == nil { + t.Errorf("volume publish device directory %s was not deleted", devDir) + } + if _, err := os.Stat(publishPath); err == nil { + t.Errorf("volume publish path %s was not deleted", publishPath) + } + publishDir := filepath.Dir(publishPath) + if _, err := os.Stat(publishDir); err == nil { + t.Errorf("volume publish parent directory %s was not deleted", publishDir) + } + if _, err := os.Stat(stagingPath); err == nil { + t.Errorf("volume staging path %s was not deleted", stagingPath) + } +} diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index 83080da2575..97c6da7dea7 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -20,12 +20,15 @@ import ( "context" "errors" "io" + "os" + "path/filepath" "reflect" "testing" csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + utiltesting "k8s.io/client-go/util/testing" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi/fake" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" @@ -147,15 +150,22 @@ func (c *fakeCsiDriverClient) NodePublishVolume( AccessMode: &csipbv1.VolumeCapability_AccessMode{ Mode: asCSIAccessModeV1(accessMode), }, - AccessType: &csipbv1.VolumeCapability_Mount{ - Mount: &csipbv1.VolumeCapability_MountVolume{ - FsType: fsType, - MountFlags: mountOptions, - }, - }, }, } + if fsType == fsTypeBlockName { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ + Block: &csipbv1.VolumeCapability_BlockVolume{}, + } + } else { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{ + Mount: &csipbv1.VolumeCapability_MountVolume{ + FsType: fsType, + MountFlags: mountOptions, + }, + } + } + _, err := c.nodeClient.NodePublishVolume(ctx, req) if err != nil && !isFinalError(err) { return volumetypes.NewUncertainProgressError(err.Error()) @@ -193,16 +203,22 @@ func (c *fakeCsiDriverClient) NodeStageVolume(ctx context.Context, AccessMode: &csipbv1.VolumeCapability_AccessMode{ Mode: asCSIAccessModeV1(accessMode), }, - AccessType: &csipbv1.VolumeCapability_Mount{ - Mount: &csipbv1.VolumeCapability_MountVolume{ - FsType: fsType, - MountFlags: mountOptions, - }, - }, }, Secrets: secrets, VolumeContext: volumeContext, } + if fsType == fsTypeBlockName { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ + Block: &csipbv1.VolumeCapability_BlockVolume{}, + } + } else { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{ + Mount: &csipbv1.VolumeCapability_MountVolume{ + FsType: fsType, + MountFlags: mountOptions, + }, + } + } _, err := c.nodeClient.NodeStageVolume(ctx, req) if err != nil && !isFinalError(err) { @@ -370,6 +386,13 @@ func TestClientNodeGetInfo(t *testing.T) { } func TestClientNodePublishVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "path") + testCases := []struct { name string volID string @@ -378,11 +401,11 @@ func TestClientNodePublishVolume(t *testing.T) { mustFail bool err error }{ - {name: "test ok", volID: "vol-test", targetPath: "/test/path"}, - {name: "missing volID", targetPath: "/test/path", mustFail: true}, + {name: "test ok", volID: "vol-test", targetPath: testPath}, + 
{name: "missing volID", targetPath: testPath, mustFail: true}, {name: "missing target path", volID: "vol-test", mustFail: true}, - {name: "bad fs", volID: "vol-test", targetPath: "/test/path", fsType: "badfs", mustFail: true}, - {name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + {name: "bad fs", volID: "vol-test", targetPath: testPath, fsType: "badfs", mustFail: true}, + {name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")}, } for _, tc := range testCases { @@ -419,6 +442,13 @@ func TestClientNodePublishVolume(t *testing.T) { } func TestClientNodeUnpublishVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "path") + testCases := []struct { name string volID string @@ -426,10 +456,10 @@ func TestClientNodeUnpublishVolume(t *testing.T) { mustFail bool err error }{ - {name: "test ok", volID: "vol-test", targetPath: "/test/path"}, - {name: "missing volID", targetPath: "/test/path", mustFail: true}, - {name: "missing target path", volID: "vol-test", mustFail: true}, - {name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + {name: "test ok", volID: "vol-test", targetPath: testPath}, + {name: "missing volID", targetPath: testPath, mustFail: true}, + {name: "missing target path", volID: testPath, mustFail: true}, + {name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")}, } for _, tc := range testCases { @@ -454,6 +484,13 @@ func TestClientNodeUnpublishVolume(t *testing.T) { } func TestClientNodeStageVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "/test/path") + testCases := []struct { name string volID string @@ -464,11 +501,11 @@ func TestClientNodeStageVolume(t *testing.T) { mustFail bool err error }{ - {name: "test ok", volID: "vol-test", stagingTargetPath: "/test/path", fsType: "ext4", mountOptions: []string{"unvalidated"}}, - {name: "missing volID", stagingTargetPath: "/test/path", mustFail: true}, + {name: "test ok", volID: "vol-test", stagingTargetPath: testPath, fsType: "ext4", mountOptions: []string{"unvalidated"}}, + {name: "missing volID", stagingTargetPath: testPath, mustFail: true}, {name: "missing target path", volID: "vol-test", mustFail: true}, - {name: "bad fs", volID: "vol-test", stagingTargetPath: "/test/path", fsType: "badfs", mustFail: true}, - {name: "grpc error", volID: "vol-test", stagingTargetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + {name: "bad fs", volID: "vol-test", stagingTargetPath: testPath, fsType: "badfs", mustFail: true}, + {name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")}, } for _, tc := range testCases { @@ -503,6 +540,13 @@ func TestClientNodeStageVolume(t *testing.T) { } func TestClientNodeUnstageVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "/test/path") + testCases := []struct { name string volID string @@ -510,10 +554,10 @@ func TestClientNodeUnstageVolume(t *testing.T) { 
 		mustFail          bool
 		err               error
 	}{
-		{name: "test ok", volID: "vol-test", stagingTargetPath: "/test/path"},
-		{name: "missing volID", stagingTargetPath: "/test/path", mustFail: true},
+		{name: "test ok", volID: "vol-test", stagingTargetPath: testPath},
+		{name: "missing volID", stagingTargetPath: testPath, mustFail: true},
 		{name: "missing target path", volID: "vol-test", mustFail: true},
-		{name: "grpc error", volID: "vol-test", stagingTargetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
+		{name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")},
 	}
 
 	for _, tc := range testCases {
diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go
index 0ce54f50ad8..463683894b9 100644
--- a/pkg/volume/csi/fake/fake_client.go
+++ b/pkg/volume/csi/fake/fake_client.go
@@ -19,6 +19,9 @@ package fake
 import (
 	"context"
 	"errors"
+	"fmt"
+	"io/ioutil"
+	"os"
 	"strings"
 
 	csipb "github.com/container-storage-interface/spec/lib/go/csi"
@@ -172,14 +175,29 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli
 		return nil, timeoutErr
 	}
 
-	f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{
+	// "Creation of target_path is the responsibility of the SP."
+	// Our plugin depends on it.
+	if req.VolumeCapability.GetBlock() != nil {
+		if err := ioutil.WriteFile(req.TargetPath, []byte{}, 0644); err != nil {
+			return nil, fmt.Errorf("cannot create target path %s for block file: %s", req.TargetPath, err)
+		}
+	} else {
+		if err := os.MkdirAll(req.TargetPath, 0755); err != nil {
+			return nil, fmt.Errorf("cannot create target directory %s for mount: %s", req.TargetPath, err)
+		}
+	}
+
+	publishedVolume := CSIVolume{
 		VolumeHandle:    req.GetVolumeId(),
 		Path:            req.GetTargetPath(),
 		DeviceMountPath: req.GetStagingTargetPath(),
 		VolumeContext:   req.GetVolumeContext(),
 		FSType:          req.GetVolumeCapability().GetMount().GetFsType(),
-		MountFlags:      req.GetVolumeCapability().GetMount().MountFlags,
 	}
+	if req.GetVolumeCapability().GetMount() != nil {
+		publishedVolume.MountFlags = req.GetVolumeCapability().GetMount().MountFlags
+	}
+	f.nodePublishedVolumes[req.GetVolumeId()] = publishedVolume
 
 	return &csipb.NodePublishVolumeResponse{}, nil
 }
@@ -196,6 +214,12 @@ func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnp
 		return nil, errors.New("missing target path")
 	}
 	delete(f.nodePublishedVolumes, req.GetVolumeId())
+
+	// "The SP MUST delete the file or directory it created at this path."
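+	// NodePublishVolume created either a file (block volume) or a directory
+	// (mount volume) at TargetPath; os.Remove handles both and tolerates a
+	// path that is already gone.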
+	if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) {
+		return nil, fmt.Errorf("failed to remove publish path %s: %s", req.TargetPath, err)
+	}
+
 	return &csipb.NodeUnpublishVolumeResponse{}, nil
 }

From 0bd2e629c73cb3b32364bb82fad7875fde3f8a43 Mon Sep 17 00:00:00 2001
From: Jan Safranek
Date: Thu, 13 Feb 2020 11:26:54 +0100
Subject: [PATCH 2/3] Fix unit tests

---
 pkg/volume/csi/csi_block.go        |  2 +-
 pkg/volume/csi/csi_block_test.go   | 13 ++++++-------
 pkg/volume/csi/fake/fake_client.go |  2 +-
 3 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go
index 38004d9210d..20cd9791790 100644
--- a/pkg/volume/csi/csi_block.go
+++ b/pkg/volume/csi/csi_block.go
@@ -466,7 +466,7 @@ func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
 		publishDir = publishPath
 	}
 	if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) {
-		return errors.New(log("failed to publish directory [%s]: %v", publishDir, err))
+		return errors.New(log("failed to remove publish directory [%s]: %v", publishDir, err))
 	}
 
 	// Remove artifacts of NodeStage.
diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go
index 762e74e003b..e856da79427 100644
--- a/pkg/volume/csi/csi_block_test.go
+++ b/pkg/volume/csi/csi_block_test.go
@@ -304,7 +304,7 @@ func TestBlockMapperSetupDeviceError(t *testing.T) {
 	attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
 	attachment := makeTestAttachment(attachID, nodeName, pvName)
 	attachment.Status.Attached = true
-	_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
+	_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.Background(), attachment, metav1.CreateOptions{})
 	if err != nil {
 		t.Fatalf("failed to setup VolumeAttachment: %v", err)
 	}
@@ -347,10 +347,10 @@ func TestBlockMapperMapPodDevice(t *testing.T) {
 
 	csiMapper.csiClient = setupClient(t, true)
 
-	attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
+	attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), nodeName)
 	attachment := makeTestAttachment(attachID, nodeName, pvName)
 	attachment.Status.Attached = true
-	_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
+	_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.Background(), attachment, metav1.CreateOptions{})
 	if err != nil {
 		t.Fatalf("failed to setup VolumeAttachment: %v", err)
 	}
@@ -483,6 +483,9 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
 func TestVolumeSetupTeardown(t *testing.T) {
 	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
 
+	// Follow the volume setup + teardown sequence described at the top of csi_block.go: set up and then clean up one CSI block device.
+	// Focus on verifying that no files are left behind after the cleanup.
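+	// The fake CSI client creates real files under the test's tmpDir, so any
+	// leftover artifacts are observable with os.Stat at the end of the test.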
+
 	plug, tmpDir := newTestPlugin(t, nil)
 	defer os.RemoveAll(tmpDir)
 
@@ -505,7 +508,6 @@ func TestVolumeSetupTeardown(t *testing.T) {
 	}
 	t.Log("created attachment ", attachID)
 
-	// SetupDevice
 	err = csiMapper.SetUpDevice()
 	if err != nil {
 		t.Fatalf("mapper failed to SetupDevice: %v", err)
@@ -521,7 +523,6 @@ func TestVolumeSetupTeardown(t *testing.T) {
 		t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path)
 	}
 
-	// MapPodDevice
 	path, err := csiMapper.MapPodDevice()
 	if err != nil {
 		t.Fatalf("mapper failed to MapPodDevice: %v", err)
@@ -552,7 +553,6 @@ func TestVolumeSetupTeardown(t *testing.T) {
 		t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err)
 	}
 
-	// UnmapDevice
 	err = csiUnmapper.UnmapPodDevice()
 	if err != nil {
 		t.Errorf("unmapper failed to call UnmapPodDevice: %v", err)
@@ -566,7 +566,6 @@ func TestVolumeSetupTeardown(t *testing.T) {
 	csiUnmapper = unmapper.(*csiBlockMapper)
 	csiUnmapper.csiClient = csiMapper.csiClient
 
-	// TearDownDevice
 	err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test")
 	if err != nil {
 		t.Fatal(err)
diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go
index 463683894b9..0c9c446fba1 100644
--- a/pkg/volume/csi/fake/fake_client.go
+++ b/pkg/volume/csi/fake/fake_client.go
@@ -192,9 +192,9 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli
 		Path:            req.GetTargetPath(),
 		DeviceMountPath: req.GetStagingTargetPath(),
 		VolumeContext:   req.GetVolumeContext(),
-		FSType:          req.GetVolumeCapability().GetMount().GetFsType(),
 	}
 	if req.GetVolumeCapability().GetMount() != nil {
+		publishedVolume.FSType = req.GetVolumeCapability().GetMount().FsType
 		publishedVolume.MountFlags = req.GetVolumeCapability().GetMount().MountFlags
 	}
 	f.nodePublishedVolumes[req.GetVolumeId()] = publishedVolume

From 073d0b234076b91b83f97085656c8b614955a04f Mon Sep 17 00:00:00 2001
From: Jan Safranek
Date: Mon, 17 Feb 2020 10:51:39 +0100
Subject: [PATCH 3/3] Add getPublishDir and getVolumePluginDir

So we don't need to compute these directories backwards from
getPublishPath and getVolumeDevicePluginDir.
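
Roughly, the helper layering looks like this (a sketch only; the real base
comes from the volume host's GetVolumeDevicePluginDir, and sanitization of
specName is omitted):

    base := ".../plugins/kubernetes.io/csi/volumeDevices"
    publishDir := filepath.Join(base, "publish", specName)  // getPublishDir()
    publishPath := filepath.Join(publishDir, podUID)        // getPublishPath()
    volumeDir := filepath.Join(base, specName)              // getVolumePluginDir()
    devDir := filepath.Join(volumeDir, "dev")               // getVolumeDevicePluginDir()
    dataDir := filepath.Join(volumeDir, "data")             // getVolumeDeviceDataDir()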
---
 pkg/volume/csi/BUILD        |  1 +
 pkg/volume/csi/csi_block.go | 51 +++++++++++++++----------------------
 pkg/volume/csi/csi_util.go  | 12 ++++++---
 3 files changed, 29 insertions(+), 35 deletions(-)

diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD
index f81314c7565..6d237e7f052 100644
--- a/pkg/volume/csi/BUILD
+++ b/pkg/volume/csi/BUILD
@@ -17,6 +17,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/features:go_default_library",
+        "//pkg/util/removeall:go_default_library",
         "//pkg/volume:go_default_library",
         "//pkg/volume/csi/nodeinfomanager:go_default_library",
         "//pkg/volume/util:go_default_library",
diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go
index 20cd9791790..ee2ae319ee4 100644
--- a/pkg/volume/csi/csi_block.go
+++ b/pkg/volume/csi/csi_block.go
@@ -72,16 +72,15 @@ import (
 	"os"
 	"path/filepath"
 
-	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
-
-	"k8s.io/klog"
-
 	v1 "k8s.io/api/core/v1"
 	storage "k8s.io/api/storage/v1"
 	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/klog"
+	"k8s.io/kubernetes/pkg/util/removeall"
 	"k8s.io/kubernetes/pkg/volume"
+	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
 	utilstrings "k8s.io/utils/strings"
 )
 
@@ -115,10 +114,16 @@ func (m *csiBlockMapper) getStagingPath() string {
 	return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName)
 }
 
+// getPublishDir returns the path to the directory where the volume is published for each pod.
+// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}
+func (m *csiBlockMapper) getPublishDir() string {
+	return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName)
+}
+
 // getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume
 // Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID}
 func (m *csiBlockMapper) getPublishPath() string {
-	return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName, string(m.podUID))
+	return filepath.Join(m.getPublishDir(), string(m.podUID))
 }
 
 // GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
@@ -445,7 +450,8 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
 		}
 	}
 	if err = m.cleanupOrphanDeviceFiles(); err != nil {
-		return err
+		// V(4) because the error is not fatal here
+		klog.V(4).Infof("Failed to clean up block volume directory %s", err)
 	}
 
 	return nil
@@ -456,15 +462,10 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
 // files are indeed orphaned.
 func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
 	// Remove artifacts of NodePublish.
-	// publishPath: xxx/plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID}
-	// publishPath was removed by the driver. We need to remove the {specName}/ dir.
-	publishPath := m.getPublishPath()
-	publishDir := filepath.Dir(publishPath)
-	if m.podUID == "" {
-		// Pod UID is not known during device teardown ("NodeUnstage").
-		// getPublishPath() squashed "{specName}/{podUID}" into "{specName}/".
-		publishDir = publishPath
-	}
+	// publishDir: xxx/plugins/kubernetes.io/csi/volumeDevices/publish/{specName}
+	// Each NodePublishVolume() created a subdirectory there. Since everything should
+	// already be unpublished at this point, the directory should be empty by now.
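+	// os.Remove below refuses to delete a non-empty directory, which doubles
+	// as a safety check here.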
+	publishDir := m.getPublishDir()
 	if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) {
 		return errors.New(log("failed to remove publish directory [%s]: %v", publishDir, err))
 	}
 
@@ -478,22 +479,10 @@ func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
 
 	// Remove everything under xxx/plugins/kubernetes.io/csi/volumeDevices/{specName}.
 	// At this point it contains only "data/vol_data.json" and empty "dev/".
-	dataDir := getVolumeDeviceDataDir(m.specName, m.plugin.host)
-	dataFile := filepath.Join(dataDir, volDataFileName)
-	if err := os.Remove(dataFile); err != nil && !os.IsNotExist(err) {
-		return errors.New(log("failed to delete volume data file [%s]: %v", dataFile, err))
-	}
-	if err := os.Remove(dataDir); err != nil && !os.IsNotExist(err) {
-		return errors.New(log("failed to delete volume data directory [%s]: %v", dataDir, err))
-	}
-
-	volumeDir := filepath.Dir(dataDir)
-	deviceDir := filepath.Join(volumeDir, "dev")
-	if err := os.Remove(deviceDir); err != nil && !os.IsNotExist(err) {
-		return errors.New(log("failed to delete volume directory [%s]: %v", deviceDir, err))
-	}
-	if err := os.Remove(volumeDir); err != nil && !os.IsNotExist(err) {
-		return errors.New(log("failed to delete volume directory [%s]: %v", volumeDir, err))
+	volumeDir := getVolumePluginDir(m.specName, m.plugin.host)
+	mounter := m.plugin.host.GetMounter(m.plugin.GetPluginName())
+	if err := removeall.RemoveAllOneFilesystem(mounter, volumeDir); err != nil {
+		return err
 	}
 
 	return nil
diff --git a/pkg/volume/csi/csi_util.go b/pkg/volume/csi/csi_util.go
index ca678bfac45..96b3c497f0e 100644
--- a/pkg/volume/csi/csi_util.go
+++ b/pkg/volume/csi/csi_util.go
@@ -108,20 +108,24 @@ func log(msg string, parts ...interface{}) string {
 	return fmt.Sprintf(fmt.Sprintf("%s: %s", CSIPluginName, msg), parts...)
 }
 
+// getVolumePluginDir returns the path where the CSI plugin keeps metadata for a given volume.
+func getVolumePluginDir(specVolID string, host volume.VolumeHost) string {
+	sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID)
+	return filepath.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID)
+}
+
 // getVolumeDevicePluginDir returns the path where the CSI plugin keeps the
 // symlink for a block device associated with a given specVolumeID.
 // path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/dev
 func getVolumeDevicePluginDir(specVolID string, host volume.VolumeHost) string {
-	sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID)
-	return filepath.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "dev")
+	return filepath.Join(getVolumePluginDir(specVolID, host), "dev")
 }
 
 // getVolumeDeviceDataDir returns the path where the CSI plugin keeps the
 // volume data for a block device associated with a given specVolumeID.
 // path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/data
 func getVolumeDeviceDataDir(specVolID string, host volume.VolumeHost) string {
-	sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID)
-	return filepath.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "data")
+	return filepath.Join(getVolumePluginDir(specVolID, host), "data")
 }
 
 // hasReadWriteOnce returns true if modes contains v1.ReadWriteOnce
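
Note on the final cleanup order (a minimal standalone sketch, not the plugin
code itself): after this series, cleanupOrphanDeviceFiles() removes the
per-volume publish and staging directories, which must already be empty, and
then deletes whatever remains under the per-volume directory. The sketch below
assumes a hypothetical base directory and uses plain os.RemoveAll in place of
removeall.RemoveAllOneFilesystem, which additionally refuses to cross mount
points:

    package main

    import (
    	"fmt"
    	"os"
    	"path/filepath"
    )

    // cleanupOrphanDeviceFiles mirrors the teardown order of the patched
    // plugin: publish dir, staging dir, then the whole per-volume directory.
    func cleanupOrphanDeviceFiles(base, specName string) error {
    	// Both directories must already be empty; os.Remove fails on a
    	// non-empty directory, which acts as a safety check.
    	for _, dir := range []string{
    		filepath.Join(base, "publish", specName),
    		filepath.Join(base, "staging", specName),
    	} {
    		if err := os.Remove(dir); err != nil && !os.IsNotExist(err) {
    			return fmt.Errorf("failed to remove %s: %v", dir, err)
    		}
    	}
    	// The per-volume directory still holds data/vol_data.json and dev/.
    	return os.RemoveAll(filepath.Join(base, specName))
    }

    func main() {
    	if err := cleanupOrphanDeviceFiles("/tmp/volumeDevices", "test-pv"); err != nil {
    		fmt.Fprintln(os.Stderr, err)
    		os.Exit(1)
    	}
    }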