diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index 43484b3ad2c..38004d9210d 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -72,9 +72,11 @@ import ( "os" "path/filepath" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" + "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -299,6 +301,13 @@ func (m *csiBlockMapper) SetUpDevice() error { // Call NodeStageVolume _, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment) if err != nil { + if volumetypes.IsOperationFinishedError(err) { + cleanupErr := m.cleanupOrphanDeviceFiles() + if cleanupErr != nil { + // V(4) for not so serious error + klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr) + } + } return err } @@ -435,6 +444,57 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error return err } } + if err = m.cleanupOrphanDeviceFiles(); err != nil { + return err + } + + return nil +} + +// Clean up any orphan files / directories when a block volume is being unstaged. +// At this point we can be sure that there is no pod using the volume and all +// files are indeed orphaned. +func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error { + // Remove artifacts of NodePublish. + // publishPath: xxx/plugins/kubernetes.io/csi/volumeDevices/publish// + // publishPath was removed by the driver. We need to remove the / dir. + publishPath := m.getPublishPath() + publishDir := filepath.Dir(publishPath) + if m.podUID == "" { + // Pod UID is not known during device teardown ("NodeUnstage"). + // getPublishPath() squashed "/" into "/". + publishDir = publishPath + } + if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) { + return errors.New(log("failed to publish directory [%s]: %v", publishDir, err)) + } + + // Remove artifacts of NodeStage. + // stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/ + stagingPath := m.getStagingPath() + if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) { + return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err)) + } + + // Remove everything under xxx/plugins/kubernetes.io/csi/volumeDevices/. + // At this point it contains only "data/vol_data.json" and empty "dev/". + dataDir := getVolumeDeviceDataDir(m.specName, m.plugin.host) + dataFile := filepath.Join(dataDir, volDataFileName) + if err := os.Remove(dataFile); err != nil && !os.IsNotExist(err) { + return errors.New(log("failed to delete volume data file [%s]: %v", dataFile, err)) + } + if err := os.Remove(dataDir); err != nil && !os.IsNotExist(err) { + return errors.New(log("failed to delete volume data directory [%s]: %v", dataDir, err)) + } + + volumeDir := filepath.Dir(dataDir) + deviceDir := filepath.Join(volumeDir, "dev") + if err := os.Remove(deviceDir); err != nil && !os.IsNotExist(err) { + return errors.New(log("failed to delete volume directory [%s]: %v", deviceDir, err)) + } + if err := os.Remove(volumeDir); err != nil && !os.IsNotExist(err) { + return errors.New(log("failed to delete volume directory [%s]: %v", volumeDir, err)) + } return nil } diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index 0823e9d2107..762e74e003b 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -18,6 +18,7 @@ package csi import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -282,6 +283,54 @@ func TestBlockMapperSetupDevice(t *testing.T) { } } +func TestBlockMapperSetupDeviceError(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() + + plug, tmpDir := newTestPlugin(t, nil) + defer os.RemoveAll(tmpDir) + + csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t) + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } + + pvName := pv.GetName() + nodeName := string(plug.host.GetNodeName()) + + csiMapper.csiClient = setupClient(t, true) + fClient := csiMapper.csiClient.(*fakeCsiDriverClient) + fClient.nodeClient.SetNextError(errors.New("mock final error")) + + attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName)) + attachment := makeTestAttachment(attachID, nodeName, pvName) + attachment.Status.Attached = true + _, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to setup VolumeAttachment: %v", err) + } + t.Log("created attachement ", attachID) + + err = csiMapper.SetUpDevice() + if err == nil { + t.Fatal("mapper unexpectedly succeeded") + } + + // Check that all directories have been cleaned + // Check that all metadata / staging / publish directories were deleted + dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if _, err := os.Stat(dataDir); err == nil { + t.Errorf("volume publish data directory %s was not deleted", dataDir) + } + devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if _, err := os.Stat(devDir); err == nil { + t.Errorf("volume publish device directory %s was not deleted", devDir) + } + stagingPath := csiMapper.getStagingPath() + if _, err := os.Stat(stagingPath); err == nil { + t.Errorf("volume staging path %s was not deleted", stagingPath) + } +} + func TestBlockMapperMapPodDevice(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() @@ -430,3 +479,124 @@ func TestBlockMapperTearDownDevice(t *testing.T) { t.Error("csi server may not have received NodeUnstageVolume call") } } + +func TestVolumeSetupTeardown(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() + + plug, tmpDir := newTestPlugin(t, nil) + defer os.RemoveAll(tmpDir) + + csiMapper, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t) + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } + + pvName := pv.GetName() + nodeName := string(plug.host.GetNodeName()) + + csiMapper.csiClient = setupClient(t, true) + + attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName)) + attachment := makeTestAttachment(attachID, nodeName, pvName) + attachment.Status.Attached = true + _, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to setup VolumeAttachment: %v", err) + } + t.Log("created attachement ", attachID) + + // SetupDevice + err = csiMapper.SetUpDevice() + if err != nil { + t.Fatalf("mapper failed to SetupDevice: %v", err) + } + // Check if NodeStageVolume staged to the right path + stagingPath := csiMapper.getStagingPath() + svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() + svol, ok := svols[csiMapper.volumeID] + if !ok { + t.Error("csi server may not have received NodeStageVolume call") + } + if svol.Path != stagingPath { + t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path) + } + + // MapPodDevice + path, err := csiMapper.MapPodDevice() + if err != nil { + t.Fatalf("mapper failed to GetGlobalMapPath: %v", err) + } + pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + pvol, ok := pvols[csiMapper.volumeID] + if !ok { + t.Error("csi server may not have received NodePublishVolume call") + } + publishPath := csiMapper.getPublishPath() + if pvol.Path != publishPath { + t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path) + } + if path != publishPath { + t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path) + } + + unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID) + if err != nil { + t.Fatalf("failed to make a new Unmapper: %v", err) + } + + csiUnmapper := unmapper.(*csiBlockMapper) + csiUnmapper.csiClient = csiMapper.csiClient + + globalMapPath, err := csiUnmapper.GetGlobalMapPath(spec) + if err != nil { + t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err) + } + + // UnmapDevice + err = csiUnmapper.UnmapPodDevice() + if err != nil { + t.Errorf("unmapper failed to call UnmapPodDevice: %v", err) + } + + // GenerateUnmapDeviceFunc uses "" as pod UUID, it is global operation over all pods that used the volume + unmapper, err = plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, "") + if err != nil { + t.Fatalf("failed to make a new Unmapper: %v", err) + } + csiUnmapper = unmapper.(*csiBlockMapper) + csiUnmapper.csiClient = csiMapper.csiClient + + // TearDownDevice + err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test") + if err != nil { + t.Fatal(err) + } + pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + if _, ok := pubs[csiUnmapper.volumeID]; ok { + t.Error("csi server may not have received NodeUnpublishVolume call") + } + vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() + if _, ok := vols[csiUnmapper.volumeID]; ok { + t.Error("csi server may not have received NodeUnstageVolume call") + } + + // Check that all metadata / staging / publish directories were deleted + dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if _, err := os.Stat(dataDir); err == nil { + t.Errorf("volume publish data directory %s was not deleted", dataDir) + } + devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if _, err := os.Stat(devDir); err == nil { + t.Errorf("volume publish device directory %s was not deleted", devDir) + } + if _, err := os.Stat(publishPath); err == nil { + t.Errorf("volume publish path %s was not deleted", publishPath) + } + publishDir := filepath.Dir(publishPath) + if _, err := os.Stat(publishDir); err == nil { + t.Errorf("volume publish parent directory %s was not deleted", publishDir) + } + if _, err := os.Stat(stagingPath); err == nil { + t.Errorf("volume staging path %s was not deleted", stagingPath) + } +} diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index 83080da2575..97c6da7dea7 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -20,12 +20,15 @@ import ( "context" "errors" "io" + "os" + "path/filepath" "reflect" "testing" csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + utiltesting "k8s.io/client-go/util/testing" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi/fake" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" @@ -147,15 +150,22 @@ func (c *fakeCsiDriverClient) NodePublishVolume( AccessMode: &csipbv1.VolumeCapability_AccessMode{ Mode: asCSIAccessModeV1(accessMode), }, - AccessType: &csipbv1.VolumeCapability_Mount{ - Mount: &csipbv1.VolumeCapability_MountVolume{ - FsType: fsType, - MountFlags: mountOptions, - }, - }, }, } + if fsType == fsTypeBlockName { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ + Block: &csipbv1.VolumeCapability_BlockVolume{}, + } + } else { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{ + Mount: &csipbv1.VolumeCapability_MountVolume{ + FsType: fsType, + MountFlags: mountOptions, + }, + } + } + _, err := c.nodeClient.NodePublishVolume(ctx, req) if err != nil && !isFinalError(err) { return volumetypes.NewUncertainProgressError(err.Error()) @@ -193,16 +203,22 @@ func (c *fakeCsiDriverClient) NodeStageVolume(ctx context.Context, AccessMode: &csipbv1.VolumeCapability_AccessMode{ Mode: asCSIAccessModeV1(accessMode), }, - AccessType: &csipbv1.VolumeCapability_Mount{ - Mount: &csipbv1.VolumeCapability_MountVolume{ - FsType: fsType, - MountFlags: mountOptions, - }, - }, }, Secrets: secrets, VolumeContext: volumeContext, } + if fsType == fsTypeBlockName { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ + Block: &csipbv1.VolumeCapability_BlockVolume{}, + } + } else { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{ + Mount: &csipbv1.VolumeCapability_MountVolume{ + FsType: fsType, + MountFlags: mountOptions, + }, + } + } _, err := c.nodeClient.NodeStageVolume(ctx, req) if err != nil && !isFinalError(err) { @@ -370,6 +386,13 @@ func TestClientNodeGetInfo(t *testing.T) { } func TestClientNodePublishVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "path") + testCases := []struct { name string volID string @@ -378,11 +401,11 @@ func TestClientNodePublishVolume(t *testing.T) { mustFail bool err error }{ - {name: "test ok", volID: "vol-test", targetPath: "/test/path"}, - {name: "missing volID", targetPath: "/test/path", mustFail: true}, + {name: "test ok", volID: "vol-test", targetPath: testPath}, + {name: "missing volID", targetPath: testPath, mustFail: true}, {name: "missing target path", volID: "vol-test", mustFail: true}, - {name: "bad fs", volID: "vol-test", targetPath: "/test/path", fsType: "badfs", mustFail: true}, - {name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + {name: "bad fs", volID: "vol-test", targetPath: testPath, fsType: "badfs", mustFail: true}, + {name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")}, } for _, tc := range testCases { @@ -419,6 +442,13 @@ func TestClientNodePublishVolume(t *testing.T) { } func TestClientNodeUnpublishVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "path") + testCases := []struct { name string volID string @@ -426,10 +456,10 @@ func TestClientNodeUnpublishVolume(t *testing.T) { mustFail bool err error }{ - {name: "test ok", volID: "vol-test", targetPath: "/test/path"}, - {name: "missing volID", targetPath: "/test/path", mustFail: true}, - {name: "missing target path", volID: "vol-test", mustFail: true}, - {name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + {name: "test ok", volID: "vol-test", targetPath: testPath}, + {name: "missing volID", targetPath: testPath, mustFail: true}, + {name: "missing target path", volID: testPath, mustFail: true}, + {name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")}, } for _, tc := range testCases { @@ -454,6 +484,13 @@ func TestClientNodeUnpublishVolume(t *testing.T) { } func TestClientNodeStageVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "/test/path") + testCases := []struct { name string volID string @@ -464,11 +501,11 @@ func TestClientNodeStageVolume(t *testing.T) { mustFail bool err error }{ - {name: "test ok", volID: "vol-test", stagingTargetPath: "/test/path", fsType: "ext4", mountOptions: []string{"unvalidated"}}, - {name: "missing volID", stagingTargetPath: "/test/path", mustFail: true}, + {name: "test ok", volID: "vol-test", stagingTargetPath: testPath, fsType: "ext4", mountOptions: []string{"unvalidated"}}, + {name: "missing volID", stagingTargetPath: testPath, mustFail: true}, {name: "missing target path", volID: "vol-test", mustFail: true}, - {name: "bad fs", volID: "vol-test", stagingTargetPath: "/test/path", fsType: "badfs", mustFail: true}, - {name: "grpc error", volID: "vol-test", stagingTargetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + {name: "bad fs", volID: "vol-test", stagingTargetPath: testPath, fsType: "badfs", mustFail: true}, + {name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")}, } for _, tc := range testCases { @@ -503,6 +540,13 @@ func TestClientNodeStageVolume(t *testing.T) { } func TestClientNodeUnstageVolume(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + testPath := filepath.Join(tmpDir, "/test/path") + testCases := []struct { name string volID string @@ -510,10 +554,10 @@ func TestClientNodeUnstageVolume(t *testing.T) { mustFail bool err error }{ - {name: "test ok", volID: "vol-test", stagingTargetPath: "/test/path"}, - {name: "missing volID", stagingTargetPath: "/test/path", mustFail: true}, + {name: "test ok", volID: "vol-test", stagingTargetPath: testPath}, + {name: "missing volID", stagingTargetPath: testPath, mustFail: true}, {name: "missing target path", volID: "vol-test", mustFail: true}, - {name: "grpc error", volID: "vol-test", stagingTargetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + {name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")}, } for _, tc := range testCases { diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go index 0ce54f50ad8..463683894b9 100644 --- a/pkg/volume/csi/fake/fake_client.go +++ b/pkg/volume/csi/fake/fake_client.go @@ -19,6 +19,9 @@ package fake import ( "context" "errors" + "fmt" + "io/ioutil" + "os" "strings" csipb "github.com/container-storage-interface/spec/lib/go/csi" @@ -172,14 +175,29 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli return nil, timeoutErr } - f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{ + // "Creation of target_path is the responsibility of the SP." + // Our plugin depends on it. + if req.VolumeCapability.GetBlock() != nil { + if err := ioutil.WriteFile(req.TargetPath, []byte{}, 0644); err != nil { + return nil, fmt.Errorf("cannot create target path %s for block file: %s", req.TargetPath, err) + } + } else { + if err := os.MkdirAll(req.TargetPath, 0755); err != nil { + return nil, fmt.Errorf("cannot create target directory %s for mount: %s", req.TargetPath, err) + } + } + + publishedVolume := CSIVolume{ VolumeHandle: req.GetVolumeId(), Path: req.GetTargetPath(), DeviceMountPath: req.GetStagingTargetPath(), VolumeContext: req.GetVolumeContext(), FSType: req.GetVolumeCapability().GetMount().GetFsType(), - MountFlags: req.GetVolumeCapability().GetMount().MountFlags, } + if req.GetVolumeCapability().GetMount() != nil { + publishedVolume.MountFlags = req.GetVolumeCapability().GetMount().MountFlags + } + f.nodePublishedVolumes[req.GetVolumeId()] = publishedVolume return &csipb.NodePublishVolumeResponse{}, nil } @@ -196,6 +214,12 @@ func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnp return nil, errors.New("missing target path") } delete(f.nodePublishedVolumes, req.GetVolumeId()) + + // "The SP MUST delete the file or directory it created at this path." + if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) { + return nil, fmt.Errorf("failed to remove publish path %s: %s", req.TargetPath, err) + } + return &csipb.NodeUnpublishVolumeResponse{}, nil }