Add CSI block volume directory cleanup

CSI volume plugin creates number of files/directories when processing block
volumes. These files must be cleaned when the plugin is done with the
volume, i.e. at the end on TearDownDevice().
This commit is contained in:
Jan Safranek 2020-02-13 12:19:31 +01:00
parent 8ca96f3e07
commit e2d8e575f0
4 changed files with 328 additions and 30 deletions

View File

@ -72,9 +72,11 @@ import (
"os"
"path/filepath"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -299,6 +301,13 @@ func (m *csiBlockMapper) SetUpDevice() error {
// Call NodeStageVolume
_, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
if err != nil {
if volumetypes.IsOperationFinishedError(err) {
cleanupErr := m.cleanupOrphanDeviceFiles()
if cleanupErr != nil {
// V(4) for not so serious error
klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr)
}
}
return err
}
@ -435,6 +444,57 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
return err
}
}
if err = m.cleanupOrphanDeviceFiles(); err != nil {
return err
}
return nil
}
// Clean up any orphan files / directories when a block volume is being unstaged.
// At this point we can be sure that there is no pod using the volume and all
// files are indeed orphaned.
func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
// Remove artifacts of NodePublish.
// publishPath: xxx/plugins/kubernetes.io/csi/volumeDevices/publish/<volume name>/<pod UUID>
// publishPath was removed by the driver. We need to remove the <volume name>/ dir.
publishPath := m.getPublishPath()
publishDir := filepath.Dir(publishPath)
if m.podUID == "" {
// Pod UID is not known during device teardown ("NodeUnstage").
// getPublishPath() squashed "<volume name>/<pod UUID>" into "<volume name>/".
publishDir = publishPath
}
if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) {
return errors.New(log("failed to publish directory [%s]: %v", publishDir, err))
}
// Remove artifacts of NodeStage.
// stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/<volume name>
stagingPath := m.getStagingPath()
if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) {
return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err))
}
// Remove everything under xxx/plugins/kubernetes.io/csi/volumeDevices/<volume name>.
// At this point it contains only "data/vol_data.json" and empty "dev/".
dataDir := getVolumeDeviceDataDir(m.specName, m.plugin.host)
dataFile := filepath.Join(dataDir, volDataFileName)
if err := os.Remove(dataFile); err != nil && !os.IsNotExist(err) {
return errors.New(log("failed to delete volume data file [%s]: %v", dataFile, err))
}
if err := os.Remove(dataDir); err != nil && !os.IsNotExist(err) {
return errors.New(log("failed to delete volume data directory [%s]: %v", dataDir, err))
}
volumeDir := filepath.Dir(dataDir)
deviceDir := filepath.Join(volumeDir, "dev")
if err := os.Remove(deviceDir); err != nil && !os.IsNotExist(err) {
return errors.New(log("failed to delete volume directory [%s]: %v", deviceDir, err))
}
if err := os.Remove(volumeDir); err != nil && !os.IsNotExist(err) {
return errors.New(log("failed to delete volume directory [%s]: %v", volumeDir, err))
}
return nil
}

View File

@ -18,6 +18,7 @@ package csi
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@ -282,6 +283,54 @@ func TestBlockMapperSetupDevice(t *testing.T) {
}
}
func TestBlockMapperSetupDeviceError(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
if err != nil {
t.Fatalf("Failed to make a new Mapper: %v", err)
}
pvName := pv.GetName()
nodeName := string(plug.host.GetNodeName())
csiMapper.csiClient = setupClient(t, true)
fClient := csiMapper.csiClient.(*fakeCsiDriverClient)
fClient.nodeClient.SetNextError(errors.New("mock final error"))
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
attachment := makeTestAttachment(attachID, nodeName, pvName)
attachment.Status.Attached = true
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to setup VolumeAttachment: %v", err)
}
t.Log("created attachement ", attachID)
err = csiMapper.SetUpDevice()
if err == nil {
t.Fatal("mapper unexpectedly succeeded")
}
// Check that all directories have been cleaned
// Check that all metadata / staging / publish directories were deleted
dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
if _, err := os.Stat(dataDir); err == nil {
t.Errorf("volume publish data directory %s was not deleted", dataDir)
}
devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
if _, err := os.Stat(devDir); err == nil {
t.Errorf("volume publish device directory %s was not deleted", devDir)
}
stagingPath := csiMapper.getStagingPath()
if _, err := os.Stat(stagingPath); err == nil {
t.Errorf("volume staging path %s was not deleted", stagingPath)
}
}
func TestBlockMapperMapPodDevice(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
@ -430,3 +479,124 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
t.Error("csi server may not have received NodeUnstageVolume call")
}
}
func TestVolumeSetupTeardown(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
csiMapper, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
if err != nil {
t.Fatalf("Failed to make a new Mapper: %v", err)
}
pvName := pv.GetName()
nodeName := string(plug.host.GetNodeName())
csiMapper.csiClient = setupClient(t, true)
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
attachment := makeTestAttachment(attachID, nodeName, pvName)
attachment.Status.Attached = true
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to setup VolumeAttachment: %v", err)
}
t.Log("created attachement ", attachID)
// SetupDevice
err = csiMapper.SetUpDevice()
if err != nil {
t.Fatalf("mapper failed to SetupDevice: %v", err)
}
// Check if NodeStageVolume staged to the right path
stagingPath := csiMapper.getStagingPath()
svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
svol, ok := svols[csiMapper.volumeID]
if !ok {
t.Error("csi server may not have received NodeStageVolume call")
}
if svol.Path != stagingPath {
t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path)
}
// MapPodDevice
path, err := csiMapper.MapPodDevice()
if err != nil {
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
}
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
pvol, ok := pvols[csiMapper.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}
publishPath := csiMapper.getPublishPath()
if pvol.Path != publishPath {
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
}
if path != publishPath {
t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path)
}
unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
if err != nil {
t.Fatalf("failed to make a new Unmapper: %v", err)
}
csiUnmapper := unmapper.(*csiBlockMapper)
csiUnmapper.csiClient = csiMapper.csiClient
globalMapPath, err := csiUnmapper.GetGlobalMapPath(spec)
if err != nil {
t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err)
}
// UnmapDevice
err = csiUnmapper.UnmapPodDevice()
if err != nil {
t.Errorf("unmapper failed to call UnmapPodDevice: %v", err)
}
// GenerateUnmapDeviceFunc uses "" as pod UUID, it is global operation over all pods that used the volume
unmapper, err = plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, "")
if err != nil {
t.Fatalf("failed to make a new Unmapper: %v", err)
}
csiUnmapper = unmapper.(*csiBlockMapper)
csiUnmapper.csiClient = csiMapper.csiClient
// TearDownDevice
err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test")
if err != nil {
t.Fatal(err)
}
pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
if _, ok := pubs[csiUnmapper.volumeID]; ok {
t.Error("csi server may not have received NodeUnpublishVolume call")
}
vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
if _, ok := vols[csiUnmapper.volumeID]; ok {
t.Error("csi server may not have received NodeUnstageVolume call")
}
// Check that all metadata / staging / publish directories were deleted
dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
if _, err := os.Stat(dataDir); err == nil {
t.Errorf("volume publish data directory %s was not deleted", dataDir)
}
devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
if _, err := os.Stat(devDir); err == nil {
t.Errorf("volume publish device directory %s was not deleted", devDir)
}
if _, err := os.Stat(publishPath); err == nil {
t.Errorf("volume publish path %s was not deleted", publishPath)
}
publishDir := filepath.Dir(publishPath)
if _, err := os.Stat(publishDir); err == nil {
t.Errorf("volume publish parent directory %s was not deleted", publishDir)
}
if _, err := os.Stat(stagingPath); err == nil {
t.Errorf("volume staging path %s was not deleted", stagingPath)
}
}

View File

@ -20,12 +20,15 @@ import (
"context"
"errors"
"io"
"os"
"path/filepath"
"reflect"
"testing"
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/fake"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
@ -147,15 +150,22 @@ func (c *fakeCsiDriverClient) NodePublishVolume(
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV1(accessMode),
},
AccessType: &csipbv1.VolumeCapability_Mount{
Mount: &csipbv1.VolumeCapability_MountVolume{
FsType: fsType,
MountFlags: mountOptions,
},
},
},
}
if fsType == fsTypeBlockName {
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
Block: &csipbv1.VolumeCapability_BlockVolume{},
}
} else {
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
Mount: &csipbv1.VolumeCapability_MountVolume{
FsType: fsType,
MountFlags: mountOptions,
},
}
}
_, err := c.nodeClient.NodePublishVolume(ctx, req)
if err != nil && !isFinalError(err) {
return volumetypes.NewUncertainProgressError(err.Error())
@ -193,16 +203,22 @@ func (c *fakeCsiDriverClient) NodeStageVolume(ctx context.Context,
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV1(accessMode),
},
AccessType: &csipbv1.VolumeCapability_Mount{
Mount: &csipbv1.VolumeCapability_MountVolume{
FsType: fsType,
MountFlags: mountOptions,
},
},
},
Secrets: secrets,
VolumeContext: volumeContext,
}
if fsType == fsTypeBlockName {
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
Block: &csipbv1.VolumeCapability_BlockVolume{},
}
} else {
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
Mount: &csipbv1.VolumeCapability_MountVolume{
FsType: fsType,
MountFlags: mountOptions,
},
}
}
_, err := c.nodeClient.NodeStageVolume(ctx, req)
if err != nil && !isFinalError(err) {
@ -370,6 +386,13 @@ func TestClientNodeGetInfo(t *testing.T) {
}
func TestClientNodePublishVolume(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
testPath := filepath.Join(tmpDir, "path")
testCases := []struct {
name string
volID string
@ -378,11 +401,11 @@ func TestClientNodePublishVolume(t *testing.T) {
mustFail bool
err error
}{
{name: "test ok", volID: "vol-test", targetPath: "/test/path"},
{name: "missing volID", targetPath: "/test/path", mustFail: true},
{name: "test ok", volID: "vol-test", targetPath: testPath},
{name: "missing volID", targetPath: testPath, mustFail: true},
{name: "missing target path", volID: "vol-test", mustFail: true},
{name: "bad fs", volID: "vol-test", targetPath: "/test/path", fsType: "badfs", mustFail: true},
{name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
{name: "bad fs", volID: "vol-test", targetPath: testPath, fsType: "badfs", mustFail: true},
{name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")},
}
for _, tc := range testCases {
@ -419,6 +442,13 @@ func TestClientNodePublishVolume(t *testing.T) {
}
func TestClientNodeUnpublishVolume(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
testPath := filepath.Join(tmpDir, "path")
testCases := []struct {
name string
volID string
@ -426,10 +456,10 @@ func TestClientNodeUnpublishVolume(t *testing.T) {
mustFail bool
err error
}{
{name: "test ok", volID: "vol-test", targetPath: "/test/path"},
{name: "missing volID", targetPath: "/test/path", mustFail: true},
{name: "missing target path", volID: "vol-test", mustFail: true},
{name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
{name: "test ok", volID: "vol-test", targetPath: testPath},
{name: "missing volID", targetPath: testPath, mustFail: true},
{name: "missing target path", volID: testPath, mustFail: true},
{name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")},
}
for _, tc := range testCases {
@ -454,6 +484,13 @@ func TestClientNodeUnpublishVolume(t *testing.T) {
}
func TestClientNodeStageVolume(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
testPath := filepath.Join(tmpDir, "/test/path")
testCases := []struct {
name string
volID string
@ -464,11 +501,11 @@ func TestClientNodeStageVolume(t *testing.T) {
mustFail bool
err error
}{
{name: "test ok", volID: "vol-test", stagingTargetPath: "/test/path", fsType: "ext4", mountOptions: []string{"unvalidated"}},
{name: "missing volID", stagingTargetPath: "/test/path", mustFail: true},
{name: "test ok", volID: "vol-test", stagingTargetPath: testPath, fsType: "ext4", mountOptions: []string{"unvalidated"}},
{name: "missing volID", stagingTargetPath: testPath, mustFail: true},
{name: "missing target path", volID: "vol-test", mustFail: true},
{name: "bad fs", volID: "vol-test", stagingTargetPath: "/test/path", fsType: "badfs", mustFail: true},
{name: "grpc error", volID: "vol-test", stagingTargetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
{name: "bad fs", volID: "vol-test", stagingTargetPath: testPath, fsType: "badfs", mustFail: true},
{name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")},
}
for _, tc := range testCases {
@ -503,6 +540,13 @@ func TestClientNodeStageVolume(t *testing.T) {
}
func TestClientNodeUnstageVolume(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
testPath := filepath.Join(tmpDir, "/test/path")
testCases := []struct {
name string
volID string
@ -510,10 +554,10 @@ func TestClientNodeUnstageVolume(t *testing.T) {
mustFail bool
err error
}{
{name: "test ok", volID: "vol-test", stagingTargetPath: "/test/path"},
{name: "missing volID", stagingTargetPath: "/test/path", mustFail: true},
{name: "test ok", volID: "vol-test", stagingTargetPath: testPath},
{name: "missing volID", stagingTargetPath: testPath, mustFail: true},
{name: "missing target path", volID: "vol-test", mustFail: true},
{name: "grpc error", volID: "vol-test", stagingTargetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
{name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")},
}
for _, tc := range testCases {

View File

@ -19,6 +19,9 @@ package fake
import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
@ -172,14 +175,29 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli
return nil, timeoutErr
}
f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{
// "Creation of target_path is the responsibility of the SP."
// Our plugin depends on it.
if req.VolumeCapability.GetBlock() != nil {
if err := ioutil.WriteFile(req.TargetPath, []byte{}, 0644); err != nil {
return nil, fmt.Errorf("cannot create target path %s for block file: %s", req.TargetPath, err)
}
} else {
if err := os.MkdirAll(req.TargetPath, 0755); err != nil {
return nil, fmt.Errorf("cannot create target directory %s for mount: %s", req.TargetPath, err)
}
}
publishedVolume := CSIVolume{
VolumeHandle: req.GetVolumeId(),
Path: req.GetTargetPath(),
DeviceMountPath: req.GetStagingTargetPath(),
VolumeContext: req.GetVolumeContext(),
FSType: req.GetVolumeCapability().GetMount().GetFsType(),
MountFlags: req.GetVolumeCapability().GetMount().MountFlags,
}
if req.GetVolumeCapability().GetMount() != nil {
publishedVolume.MountFlags = req.GetVolumeCapability().GetMount().MountFlags
}
f.nodePublishedVolumes[req.GetVolumeId()] = publishedVolume
return &csipb.NodePublishVolumeResponse{}, nil
}
@ -196,6 +214,12 @@ func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnp
return nil, errors.New("missing target path")
}
delete(f.nodePublishedVolumes, req.GetVolumeId())
// "The SP MUST delete the file or directory it created at this path."
if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to remove publish path %s: %s", req.TargetPath, err)
}
return &csipb.NodeUnpublishVolumeResponse{}, nil
}