diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 8f829cfaa93..88a2e857fe5 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -14,6 +14,7 @@ go_library( "//pkg/util/mount:go_default_library", "//pkg/util/strings:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/util:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/golang.org/x/net/context:go_default_library", @@ -38,7 +39,6 @@ go_test( importpath = "k8s.io/kubernetes/pkg/volume/csi", library = ":go_default_library", deps = [ - "//pkg/util/strings:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/csi/fake:go_default_library", "//pkg/volume/testing:go_default_library", diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index 9db7f868e09..ee1776ce4f8 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -32,6 +32,7 @@ import ( type csiClient interface { AssertSupportedVersion(ctx grpctx.Context, ver *csipb.Version) error + NodeProbe(ctx grpctx.Context, ver *csipb.Version) error NodePublishVolume( ctx grpctx.Context, volumeid string, @@ -135,6 +136,13 @@ func (c *csiDriverClient) AssertSupportedVersion(ctx grpctx.Context, ver *csipb. return nil } +func (c *csiDriverClient) NodeProbe(ctx grpctx.Context, ver *csipb.Version) error { + glog.V(4).Info(log("sending NodeProbe rpc call to csi driver: [version %v]", ver)) + req := &csipb.NodeProbeRequest{Version: ver} + _, err := c.nodeClient.NodeProbe(ctx, req) + return err +} + func (c *csiDriverClient) NodePublishVolume( ctx grpctx.Context, volID string, @@ -145,7 +153,7 @@ func (c *csiDriverClient) NodePublishVolume( volumeAttribs map[string]string, fsType string, ) error { - + glog.V(4).Info(log("calling NodePublishVolume rpc [volid=%s,target_path=%s]", volID, targetPath)) if volID == "" { return errors.New("missing volume id") } @@ -182,7 +190,7 @@ func (c *csiDriverClient) NodePublishVolume( } func (c *csiDriverClient) NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error { - + glog.V(4).Info(log("calling NodeUnpublishVolume rpc: [volid=%s, target_path=%s", volID, targetPath)) if volID == "" { return errors.New("missing volume id") } diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index 18d2e83639c..744f2bcc8cc 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -62,6 +62,28 @@ func TestClientAssertSupportedVersion(t *testing.T) { } } +func TestClientNodeProbe(t *testing.T) { + testCases := []struct { + testName string + ver *csipb.Version + mustFail bool + err error + }{ + {testName: "supported version", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}}, + {testName: "grpc error", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}, mustFail: true, err: errors.New("grpc error")}, + } + + for _, tc := range testCases { + t.Log("case: ", tc.testName) + client := setupClient(t) + client.nodeClient.(*fake.NodeClient).SetNextError(tc.err) + err := client.NodeProbe(grpctx.Background(), tc.ver) + if tc.mustFail && err == nil { + t.Error("must fail, but err = nil") + } + } +} + func TestClientNodePublishVolume(t *testing.T) { testCases := []struct { name string diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index 2c4e0ad8492..84d10362bdb 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -20,6 +20,7 @@ import ( "encoding/json" "errors" "fmt" + "os" "path" "github.com/golang/glog" @@ -30,20 +31,39 @@ import ( "k8s.io/client-go/kubernetes" kstrings "k8s.io/kubernetes/pkg/util/strings" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util" +) + +//TODO (vladimirvivien) move this in a central loc later +var ( + volDataKey = struct { + specVolID, + volHandle, + driverName, + nodeName, + attachmentID string + }{ + "specVolID", + "volumeHandle", + "driverName", + "nodeName", + "attachmentID", + } ) type csiMountMgr struct { - k8s kubernetes.Interface - csiClient csiClient - plugin *csiPlugin - driverName string - volumeID string - readOnly bool - spec *volume.Spec - pod *api.Pod - podUID types.UID - options volume.VolumeOptions - volumeInfo map[string]string + k8s kubernetes.Interface + csiClient csiClient + plugin *csiPlugin + driverName string + volumeID string + specVolumeID string + readOnly bool + spec *volume.Spec + pod *api.Pod + podUID types.UID + options volume.VolumeOptions + volumeInfo map[string]string volume.MetricsNil } @@ -51,14 +71,14 @@ type csiMountMgr struct { var _ volume.Volume = &csiMountMgr{} func (c *csiMountMgr) GetPath() string { - return getTargetPath(c.podUID, c.driverName, c.volumeID, c.plugin.host) + dir := path.Join(getTargetPath(c.podUID, c.specVolumeID, c.plugin.host), "/mount") + glog.V(4).Info(log("mounter.GetPath generated [%s]", dir)) + return dir } -func getTargetPath(uid types.UID, driverName string, volID string, host volume.VolumeHost) string { - // driverName validated at Mounter creation - // sanitize (replace / with ~) in volumeID before it's appended to path:w - driverPath := fmt.Sprintf("%s/%s", driverName, kstrings.EscapeQualifiedNameForDisk(volID)) - return host.GetPodVolumeDir(uid, kstrings.EscapeQualifiedNameForDisk(csiPluginName), driverPath) +func getTargetPath(uid types.UID, specVolumeID string, host volume.VolumeHost) string { + specVolID := kstrings.EscapeQualifiedNameForDisk(specVolumeID) + return host.GetPodVolumeDir(uid, kstrings.EscapeQualifiedNameForDisk(csiPluginName), specVolID) } // volume.Mounter methods @@ -77,6 +97,17 @@ func (c *csiMountMgr) SetUp(fsGroup *int64) error { func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { glog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir)) + mounted, err := isDirMounted(c.plugin, dir) + if err != nil { + glog.Error(log("mounter.SetUpAt failed while checking mount status for dir [%s]", dir)) + return err + } + + if mounted { + glog.V(4).Info(log("mounter.SetUpAt skipping mount, dir already mounted [%s]", dir)) + return nil + } + csiSource, err := getCSISourceFromSpec(c.spec) if err != nil { glog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err)) @@ -92,13 +123,19 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { // ensure version is supported if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil { - glog.Errorf(log("failed to assert version: %v", err)) + glog.Error(log("mounter.SetUpAt failed to assert version: %v", err)) + return err + } + + // probe driver + // TODO (vladimirvivien) move probe call where it is done only when it is needed. + if err := csi.NodeProbe(ctx, csiVersion); err != nil { + glog.Error(log("mounter.SetUpAt failed to probe driver: %v", err)) return err } // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName if c.volumeInfo == nil { - attachment, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{}) if err != nil { glog.Error(log("mounter.SetupAt failed while getting volume attachment [id=%v]: %v", attachID, err)) @@ -121,6 +158,31 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { return err } + // create target_dir before call to NodePublish + if err := os.MkdirAll(dir, 0750); err != nil { + glog.Error(log("mouter.SetUpAt failed to create dir %#v: %v", dir, err)) + return err + } + glog.V(4).Info(log("created target path successfully [%s]", dir)) + + // persist volume info data for teardown + volData := map[string]string{ + volDataKey.specVolID: c.spec.Name(), + volDataKey.volHandle: csiSource.VolumeHandle, + volDataKey.driverName: csiSource.Driver, + volDataKey.nodeName: nodeName, + volDataKey.attachmentID: attachID, + } + + if err := saveVolumeData(c.plugin, c.podUID, c.spec.Name(), volData); err != nil { + glog.Error(log("mounter.SetUpAt failed to save volume info data: %v", err)) + if err := removeMountDir(c.plugin, dir); err != nil { + glog.Error(log("mounter.SetUpAt failed to remove mount dir after a saveVolumeData() error [%s]: %v", dir, err)) + return err + } + return err + } + //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI accessMode := api.ReadWriteOnce if c.spec.PersistentVolume.Spec.AccessModes != nil { @@ -139,11 +201,15 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { ) if err != nil { - glog.Errorf(log("Mounter.SetupAt failed: %v", err)) + glog.Errorf(log("mounter.SetupAt failed: %v", err)) + if err := removeMountDir(c.plugin, dir); err != nil { + glog.Error(log("mounter.SetuAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, err)) + return err + } return err } - glog.V(4).Infof(log("successfully mounted %s", dir)) + glog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir)) return nil } @@ -164,10 +230,30 @@ func (c *csiMountMgr) TearDown() error { func (c *csiMountMgr) TearDownAt(dir string) error { glog.V(4).Infof(log("Unmounter.TearDown(%s)", dir)) - // extract driverName and volID from path - base, volID := path.Split(dir) - volID = kstrings.UnescapeQualifiedNameForDisk(volID) - driverName := path.Base(base) + // is dir even mounted ? + // TODO (vladimirvivien) this check may not work for an emptyDir or local storage + // see https://github.com/kubernetes/kubernetes/pull/56836#discussion_r155834524 + mounted, err := isDirMounted(c.plugin, dir) + if err != nil { + glog.Error(log("unmounter.Teardown failed while checking mount status for dir [%s]: %v", dir, err)) + return err + } + + if !mounted { + glog.V(4).Info(log("unmounter.Teardown skipping unmout, dir not mounted [%s]", dir)) + return nil + } + + // load volume info from file + dataDir := path.Dir(dir) // dropoff /mount at end + data, err := loadVolumeData(dataDir, volDataFileName) + if err != nil { + glog.Error(log("unmounter.Teardown failed to load volume data file using dir [%s]: %v", dir, err)) + return err + } + + volID := data[volDataKey.volHandle] + driverName := data[volDataKey.driverName] if c.csiClient == nil { addr := fmt.Sprintf(csiAddrTemplate, driverName) @@ -183,18 +269,21 @@ func (c *csiMountMgr) TearDownAt(dir string) error { // TODO make all assertion calls private within the client itself if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil { - glog.Errorf(log("failed to assert version: %v", err)) + glog.Errorf(log("mounter.SetUpAt failed to assert version: %v", err)) return err } - err := csi.NodeUnpublishVolume(ctx, volID, dir) - - if err != nil { - glog.Errorf(log("Mounter.Setup failed: %v", err)) + if err := csi.NodeUnpublishVolume(ctx, volID, dir); err != nil { + glog.Errorf(log("mounter.SetUpAt failed: %v", err)) return err } - glog.V(4).Infof(log("successfully unmounted %s", dir)) + // clean mount point dir + if err := removeMountDir(c.plugin, dir); err != nil { + glog.Error(log("mounter.SetUpAt failed to clean mount dir [%s]: %v", dir, err)) + return err + } + glog.V(4).Infof(log("mounte.SetUpAt successfully unmounted dir [%s]", dir)) return nil } @@ -221,3 +310,92 @@ func getVolAttribsFromSpec(spec *volume.Spec) (map[string]string, error) { } return attribs, nil } + +// saveVolumeData persists parameter data as json file using the locagion +// generated by /var/lib/kubelet/pods//volumes/kubernetes.io~csi//volume_data.json +func saveVolumeData(p *csiPlugin, podUID types.UID, specVolID string, data map[string]string) error { + dir := getTargetPath(podUID, specVolID, p.host) + dataFilePath := path.Join(dir, volDataFileName) + + file, err := os.Create(dataFilePath) + if err != nil { + glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err)) + return err + } + defer file.Close() + if err := json.NewEncoder(file).Encode(data); err != nil { + glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err)) + return err + } + glog.V(4).Info(log("volume data file saved successfully [%s]", dataFilePath)) + return nil +} + +// loadVolumeData uses the directory returned by mounter.GetPath with value +// /var/lib/kubelet/pods//volumes/kubernetes.io~csi//mount. +// The function extracts specVolumeID and uses it to load the json data file from dir +// /var/lib/kubelet/pods//volumes/kubernetes.io~csi//volume_data.json +func loadVolumeData(dir string, fileName string) (map[string]string, error) { + // remove /mount at the end + dataFileName := path.Join(dir, fileName) + glog.V(4).Info(log("loading volume data file [%s]", dataFileName)) + + file, err := os.Open(dataFileName) + if err != nil { + glog.Error(log("failed to open volume data file [%s]: %v", dataFileName, err)) + return nil, err + } + defer file.Close() + data := map[string]string{} + if err := json.NewDecoder(file).Decode(&data); err != nil { + glog.Error(log("failed to parse volume data file [%s]: %v", dataFileName, err)) + return nil, err + } + + return data, nil +} + +// isDirMounted returns the !notMounted result from IsLikelyNotMountPoint check +func isDirMounted(plug *csiPlugin, dir string) (bool, error) { + mounter := plug.host.GetMounter(plug.GetPluginName()) + notMnt, err := mounter.IsLikelyNotMountPoint(dir) + if err != nil && !os.IsNotExist(err) { + glog.Error(log("isDirMounted IsLikelyNotMountPoint test failed for dir [%v]", dir)) + return false, err + } + return !notMnt, nil +} + +// removeMountDir cleans the mount dir when dir is not mounted and removed the volume data file in dir +func removeMountDir(plug *csiPlugin, mountPath string) error { + glog.V(4).Info(log("removing mount path [%s]", mountPath)) + if pathExists, pathErr := util.PathExists(mountPath); pathErr != nil { + glog.Error(log("failed while checking mount path stat [%s]", pathErr)) + return pathErr + } else if !pathExists { + glog.Warning(log("skipping mount dir removal, path does not exist [%v]", mountPath)) + return nil + } + + mounter := plug.host.GetMounter(plug.GetPluginName()) + notMnt, err := mounter.IsLikelyNotMountPoint(mountPath) + if err != nil { + glog.Error(log("mount dir removal failed [%s]: %v", mountPath, err)) + return err + } + if notMnt { + glog.V(4).Info(log("dir not mounted, deleting it [%s]", mountPath)) + if err := os.Remove(mountPath); err != nil && !os.IsNotExist(err) { + glog.Error(log("failed to remove dir [%s]: %v", mountPath, err)) + return err + } + // remove volume data file as well + dataFile := path.Join(path.Dir(mountPath), volDataFileName) + glog.V(4).Info(log("also deleting volume info data file [%s]", dataFile)) + if err := os.Remove(dataFile); err != nil && !os.IsNotExist(err) { + glog.Error(log("failed to delete volume data file [%s]: %v", dataFile, err)) + return err + } + } + return nil +} diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go index 19e931ed4ce..13423a52f58 100644 --- a/pkg/volume/csi/csi_mounter_test.go +++ b/pkg/volume/csi/csi_mounter_test.go @@ -17,7 +17,10 @@ limitations under the License. package csi import ( + "bytes" + "encoding/json" "fmt" + "io/ioutil" "os" "path" "testing" @@ -43,28 +46,44 @@ func TestMounterGetPath(t *testing.T) { plug, tmpDir := newTestPlugin(t) defer os.RemoveAll(tmpDir) - pv := makeTestPV("test-pv", 10, testDriver, testVol) - - mounter, err := plug.NewMounter( - volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly), - &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, - volume.VolumeOptions{}, - ) - if err != nil { - t.Fatalf("Failed to make a new Mounter: %v", err) - } - csiMounter := mounter.(*csiMountMgr) - expectedPath := path.Join(tmpDir, fmt.Sprintf( - "pods/%s/volumes/kubernetes.io~csi/%s/%s", - testPodUID, - csiMounter.driverName, - csiMounter.volumeID, - )) - mountPath := csiMounter.GetPath() - if mountPath != expectedPath { - t.Errorf("Got unexpected path: %s", mountPath) + // TODO (vladimirvivien) specName with slashes will not work + testCases := []struct { + name string + specVolumeName string + path string + }{ + { + name: "simple specName", + specVolumeName: "spec-0", + path: path.Join(tmpDir, fmt.Sprintf("pods/%s/volumes/kubernetes.io~csi/%s/%s", testPodUID, "spec-0", "/mount")), + }, + { + name: "specName with dots", + specVolumeName: "test.spec.1", + path: path.Join(tmpDir, fmt.Sprintf("pods/%s/volumes/kubernetes.io~csi/%s/%s", testPodUID, "test.spec.1", "/mount")), + }, } + for _, tc := range testCases { + t.Log("test case:", tc.name) + pv := makeTestPV(tc.specVolumeName, 10, testDriver, testVol) + spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + mounter, err := plug.NewMounter( + spec, + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if err != nil { + t.Fatalf("Failed to make a new Mounter: %v", err) + } + csiMounter := mounter.(*csiMountMgr) + path := csiMounter.GetPath() + t.Log("*** GetPath: ", path) + + if tc.path != path { + t.Errorf("expecting path %s, got %s", tc.path, path) + } + } } func TestMounterSetUp(t *testing.T) { @@ -125,6 +144,14 @@ func TestMounterSetUp(t *testing.T) { if err := csiMounter.SetUp(nil); err != nil { t.Fatalf("mounter.Setup failed: %v", err) } + path := csiMounter.GetPath() + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + t.Errorf("SetUp() failed, volume path not created: %s", path) + } else { + t.Errorf("SetUp() failed: %v", err) + } + } // ensure call went all the way pubs := csiMounter.csiClient.(*csiDriverClient).nodeClient.(*fake.NodeClient).GetNodePublishedVolumes() @@ -149,6 +176,19 @@ func TestUnmounterTeardown(t *testing.T) { dir := csiUnmounter.GetPath() + // save the data file prior to unmount + if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) { + t.Errorf("failed to create dir [%s]: %v", dir, err) + } + if err := saveVolumeData( + plug, + testPodUID, + "test-pv", + map[string]string{volDataKey.specVolID: "test-pv", volDataKey.driverName: "driver", volDataKey.volHandle: "vol-handle"}, + ); err != nil { + t.Fatal("failed to save volume data:", err) + } + err = csiUnmounter.TearDownAt(dir) if err != nil { t.Fatal(err) @@ -208,3 +248,51 @@ func TestGetVolAttribsFromSpec(t *testing.T) { } } } + +func TestSaveVolumeData(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + testCases := []struct { + name string + data map[string]string + shouldFail bool + }{ + {name: "test with data ok", data: map[string]string{"key0": "val0", "_key1": "val1", "key2": "val2"}}, + {name: "test with data ok 2 ", data: map[string]string{"_key0_": "val0", "&key1": "val1", "key2": "val2"}}, + } + + for i, tc := range testCases { + t.Log("test case:", tc.name) + specVolID := fmt.Sprintf("spec-volid-%d", i) + mountDir := path.Join(getTargetPath(testPodUID, specVolID, plug.host), "/mount") + if err := os.MkdirAll(mountDir, 0755); err != nil && !os.IsNotExist(err) { + t.Errorf("failed to create dir [%s]: %v", mountDir, err) + } + + err := saveVolumeData(plug, testPodUID, specVolID, tc.data) + + if !tc.shouldFail && err != nil { + t.Error("unexpected failure: ", err) + } + // did file get created + dataDir := getTargetPath(testPodUID, specVolID, plug.host) + file := path.Join(dataDir, volDataFileName) + if _, err := os.Stat(file); err != nil { + t.Error("failed to create data dir:", err) + } + + // validate content + data, err := ioutil.ReadFile(file) + if !tc.shouldFail && err != nil { + t.Error("failed to read data file:", err) + } + + jsonData := new(bytes.Buffer) + if err := json.NewEncoder(jsonData).Encode(tc.data); err != nil { + t.Error("failed to encode json:", err) + } + if string(data) != jsonData.String() { + t.Errorf("expecting encoded data %v, got %v", string(data), jsonData) + } + } +} diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 8862cda98d9..be40992df04 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -19,7 +19,6 @@ package csi import ( "errors" "fmt" - "path" "regexp" "time" @@ -29,7 +28,6 @@ import ( meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/util/mount" - kstrings "k8s.io/kubernetes/pkg/util/strings" "k8s.io/kubernetes/pkg/volume" ) @@ -44,6 +42,7 @@ const ( csiAddrTemplate = "/var/lib/kubelet/plugins/%v/csi.sock" csiTimeout = 15 * time.Second volNameSep = "^" + volDataFileName = "vol_data.json" ) var ( @@ -134,14 +133,15 @@ func (p *csiPlugin) NewMounter( } mounter := &csiMountMgr{ - plugin: p, - k8s: k8s, - spec: spec, - pod: pod, - podUID: pod.UID, - driverName: pvSource.Driver, - volumeID: pvSource.VolumeHandle, - csiClient: client, + plugin: p, + k8s: k8s, + spec: spec, + pod: pod, + podUID: pod.UID, + driverName: pvSource.Driver, + volumeID: pvSource.VolumeHandle, + specVolumeID: spec.Name(), + csiClient: client, } return mounter, nil } @@ -149,37 +149,33 @@ func (p *csiPlugin) NewMounter( func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) { glog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID)) unmounter := &csiMountMgr{ - plugin: p, - podUID: podUID, + plugin: p, + podUID: podUID, + specVolumeID: specName, } return unmounter, nil } func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { - glog.V(4).Infof(log("constructing volume spec [pv.Name=%v, path=%v]", volumeName, mountPath)) + glog.V(4).Info(log("plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]", volumeName, mountPath)) - // extract driverName/volumeId from end of mountPath - dir, volID := path.Split(mountPath) - volID = kstrings.UnescapeQualifiedNameForDisk(volID) - driverName := path.Base(dir) - - // TODO (vladimirvivien) consider moving this check in API validation - if !isDriverNameValid(driverName) { - glog.Error(log("failed while reconstructing volume spec csi: driver name extracted from path is invalid: [path=%s; driverName=%s]", mountPath, driverName)) - return nil, errors.New("invalid csi driver name from path") + volData, err := loadVolumeData(mountPath, volDataFileName) + if err != nil { + glog.Error(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err)) + return nil, err } - glog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [volumeID=%s; driverName=%s]", volID, driverName)) + glog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData)) pv := &api.PersistentVolume{ ObjectMeta: meta.ObjectMeta{ - Name: volumeName, + Name: volData[volDataKey.specVolID], }, Spec: api.PersistentVolumeSpec{ PersistentVolumeSource: api.PersistentVolumeSource{ CSI: &api.CSIPersistentVolumeSource{ - Driver: driverName, - VolumeHandle: volID, + Driver: volData[volDataKey.driverName], + VolumeHandle: volData[volDataKey.volHandle], }, }, }, diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index 020430bb313..26d5f8c14de 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -19,6 +19,7 @@ package csi import ( "fmt" "os" + "path" "testing" api "k8s.io/api/core/v1" @@ -27,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/types" fakeclient "k8s.io/client-go/kubernetes/fake" utiltesting "k8s.io/client-go/util/testing" - kstrings "k8s.io/kubernetes/pkg/util/strings" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" ) @@ -140,17 +140,31 @@ func TestPluginConstructVolumeSpec(t *testing.T) { testCases := []struct { name string - driverName string - volID string + specVolID string + data map[string]string shouldFail bool }{ - {"valid driver and vol", "test.csi-driver", "abc-cde", false}, - {"valid driver + vol with slash", "test.csi-driver", "a/b/c/d", false}, - {"invalid driver name", "_test.csi.driver>", "a/b/c/d", true}, + { + name: "valid spec name", + specVolID: "test.vol.id", + data: map[string]string{volDataKey.specVolID: "test.vol.id", volDataKey.volHandle: "test-vol0", volDataKey.driverName: "test-driver0"}, + }, } for _, tc := range testCases { - dir := getTargetPath(testPodUID, tc.driverName, tc.volID, plug.host) + t.Logf("test case: %s", tc.name) + dir := getTargetPath(testPodUID, tc.specVolID, plug.host) + + // create the data file + if tc.data != nil { + mountDir := path.Join(getTargetPath(testPodUID, tc.specVolID, plug.host), "/mount") + if err := os.MkdirAll(mountDir, 0755); err != nil && !os.IsNotExist(err) { + t.Errorf("failed to create dir [%s]: %v", mountDir, err) + } + if err := saveVolumeData(plug, testPodUID, tc.specVolID, tc.data); err != nil { + t.Fatal(err) + } + } // rebuild spec spec, err := plug.ConstructVolumeSpec("test-pv", dir) @@ -161,13 +175,12 @@ func TestPluginConstructVolumeSpec(t *testing.T) { continue } - volID := spec.PersistentVolume.Spec.CSI.VolumeHandle - unsanitizedVolID := kstrings.UnescapeQualifiedNameForDisk(tc.volID) - if volID != unsanitizedVolID { - t.Errorf("expected unsanitized volID %s, got volID %s", unsanitizedVolID, volID) + volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle + if volHandle != tc.data[volDataKey.volHandle] { + t.Errorf("expected volID %s, got volID %s", tc.data[volDataKey.volHandle], volHandle) } - if spec.Name() != "test-pv" { + if spec.Name() != tc.specVolID { t.Errorf("Unexpected spec name %s", spec.Name()) } } diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go index 0d790cc9e58..d96ed620db0 100644 --- a/pkg/volume/csi/fake/fake_client.go +++ b/pkg/volume/csi/fake/fake_client.go @@ -109,6 +109,17 @@ func (f *NodeClient) NodePublishVolume(ctx grpctx.Context, req *csipb.NodePublis return &csipb.NodePublishVolumeResponse{}, nil } +// NodeProbe implements csi NodeProbe +func (f *NodeClient) NodeProbe(ctx context.Context, req *csipb.NodeProbeRequest, opts ...grpc.CallOption) (*csipb.NodeProbeResponse, error) { + if f.nextErr != nil { + return nil, f.nextErr + } + if req.Version == nil { + return nil, errors.New("missing version") + } + return &csipb.NodeProbeResponse{}, nil +} + // NodeUnpublishVolume implements csi method func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnpublishVolumeResponse, error) { if f.nextErr != nil { @@ -130,11 +141,6 @@ func (f *NodeClient) GetNodeID(ctx context.Context, in *csipb.GetNodeIDRequest, return nil, nil } -// NodeProbe implements csi method -func (f *NodeClient) NodeProbe(ctx context.Context, in *csipb.NodeProbeRequest, opts ...grpc.CallOption) (*csipb.NodeProbeResponse, error) { - return nil, nil -} - // NodeGetCapabilities implements csi method func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.NodeGetCapabilitiesResponse, error) { return nil, nil