diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index 716539b9fe0..9851816bd6f 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/volume/azure_dd" "k8s.io/kubernetes/pkg/volume/azure_file" "k8s.io/kubernetes/pkg/volume/cinder" + "k8s.io/kubernetes/pkg/volume/csi" "k8s.io/kubernetes/pkg/volume/fc" "k8s.io/kubernetes/pkg/volume/flexvolume" "k8s.io/kubernetes/pkg/volume/flocker" @@ -58,6 +59,9 @@ import ( "k8s.io/kubernetes/pkg/volume/storageos" volumeutil "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/vsphere_volume" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" ) // ProbeAttachableVolumePlugins collects all volume plugins for the attach/ @@ -79,6 +83,9 @@ func ProbeAttachableVolumePlugins() []volume.VolumePlugin { allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...) allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...) allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...) + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { + allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...) + } return allPlugins } @@ -105,6 +112,9 @@ func ProbeExpandableVolumePlugins(config componentconfig.VolumeConfiguration) [] allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...) allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...) allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...) + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { + allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...) + } return allPlugins } diff --git a/cmd/kubelet/app/plugins.go b/cmd/kubelet/app/plugins.go index e14513a18b2..002b5de8c5d 100644 --- a/cmd/kubelet/app/plugins.go +++ b/cmd/kubelet/app/plugins.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/volume/cephfs" "k8s.io/kubernetes/pkg/volume/cinder" "k8s.io/kubernetes/pkg/volume/configmap" + "k8s.io/kubernetes/pkg/volume/csi" "k8s.io/kubernetes/pkg/volume/downwardapi" "k8s.io/kubernetes/pkg/volume/empty_dir" "k8s.io/kubernetes/pkg/volume/fc" @@ -58,6 +59,9 @@ import ( "k8s.io/kubernetes/pkg/volume/vsphere_volume" // Cloud providers _ "k8s.io/kubernetes/pkg/cloudprovider/providers" + // features check + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" ) // ProbeVolumePlugins collects all volume plugins into an easy to use list. @@ -96,6 +100,9 @@ func ProbeVolumePlugins() []volume.VolumePlugin { allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...) allPlugins = append(allPlugins, local.ProbeVolumePlugins()...) allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...) + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { + allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...) + } return allPlugins } diff --git a/pkg/volume/csi/OWNERS b/pkg/volume/csi/OWNERS new file mode 100644 index 00000000000..7d0605ba3dd --- /dev/null +++ b/pkg/volume/csi/OWNERS @@ -0,0 +1,4 @@ +approvers: +- jsafrane +- saad-ali +- vladimirvivien diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go new file mode 100644 index 00000000000..11ca86e66f5 --- /dev/null +++ b/pkg/volume/csi/csi_attacher.go @@ -0,0 +1,268 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "crypto/sha256" + "errors" + "fmt" + "strings" + "time" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + storage "k8s.io/api/storage/v1alpha1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/volume" +) + +type csiAttacher struct { + plugin *csiPlugin + k8s kubernetes.Interface + waitSleepTime time.Duration +} + +// volume.Attacher methods +var _ volume.Attacher = &csiAttacher{} + +func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) { + if spec == nil { + glog.Error(log("attacher.Attach missing volume.Spec")) + return "", errors.New("missing spec") + } + + csiSource, err := getCSISourceFromSpec(spec) + if err != nil { + glog.Error(log("attacher.Attach failed to get CSI persistent source: %v", err)) + return "", errors.New("missing CSI persistent volume") + } + + pvName := spec.PersistentVolume.GetName() + attachID := getAttachmentName(csiSource.VolumeHandle, string(nodeName)) + + attachment := &storage.VolumeAttachment{ + ObjectMeta: meta.ObjectMeta{ + Name: attachID, + }, + Spec: storage.VolumeAttachmentSpec{ + NodeName: string(nodeName), + Attacher: csiPluginName, + Source: storage.VolumeAttachmentSource{ + PersistentVolumeName: &pvName, + }, + }, + Status: storage.VolumeAttachmentStatus{Attached: false}, + } + + attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment) + alreadyExist := false + if err != nil { + if !apierrs.IsAlreadyExists(err) { + glog.Error(log("attacher.Attach failed: %v", err)) + return "", err + } + alreadyExist = true + } + + if alreadyExist { + glog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attach.GetName(), csiSource.VolumeHandle)) + } else { + glog.V(4).Info(log("attachment [%v] for volume [%v] created successfully, will start probing for updates", attach.GetName(), csiSource.VolumeHandle)) + } + + // probe for attachment update here + // NOTE: any error from waiting for attachment is logged only. This is because + // the primariy intent of the enclosing method is to create VolumeAttachment. + // DONOT return that error here as it is mitigated in attacher.WaitForAttach. + if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil { + glog.Error(log("attacher.Attach encountered error during attachment probing: %v", err)) + } + + return attachID, nil +} + +func (c *csiAttacher) WaitForAttach(spec *volume.Spec, attachID string, pod *v1.Pod, timeout time.Duration) (string, error) { + source, err := getCSISourceFromSpec(spec) + if err != nil { + glog.Error(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err)) + return "", err + } + + return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout) +} + +func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) { + glog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID)) + + ticker := time.NewTicker(c.waitSleepTime) + defer ticker.Stop() + + timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable + defer timer.Stop() + + //TODO (vladimirvivien) instead of polling api-server, change to a api-server watch + for { + select { + case <-ticker.C: + glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) + attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{}) + if err != nil { + glog.Error(log("attacher.WaitForAttach failed (will continue to try): %v", err)) + continue + } + // if being deleted, fail fast + if attach.GetDeletionTimestamp() != nil { + glog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachID)) + return "", errors.New("volume attachment is being deleted") + } + // attachment OK + if attach.Status.Attached { + return attachID, nil + } + // driver reports attach error + attachErr := attach.Status.AttachError + if attachErr != nil { + glog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message)) + return "", errors.New(attachErr.Message) + } + case <-timer.C: + glog.Error(log("attacher.WaitForAttach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID)) + return "", fmt.Errorf("attachment timeout for volume %v", volumeHandle) + } + } +} + +func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + glog.V(4).Info(log("probing attachment status for %d volumes ", len(specs))) + + attached := make(map[*volume.Spec]bool) + + for _, spec := range specs { + if spec == nil { + glog.Error(log("attacher.VolumesAreAttached missing volume.Spec")) + return nil, errors.New("missing spec") + } + source, err := getCSISourceFromSpec(spec) + if err != nil { + glog.Error(log("attacher.VolumesAreAttached failed: %v", err)) + continue + } + attachID := getAttachmentName(source.VolumeHandle, string(nodeName)) + glog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID)) + attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{}) + if err != nil { + glog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err)) + continue + } + attached[spec] = attach.Status.Attached + } + + return attached, nil +} + +func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) { + glog.V(4).Info(log("attacher.GetDeviceMountPath is not implemented")) + return "", nil +} + +func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error { + glog.V(4).Info(log("attacher.MountDevice is not implemented")) + return nil +} + +var _ volume.Detacher = &csiAttacher{} + +func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error { + // volumeName in format driverNamevolumeHandle generated by plugin.GetVolumeName() + if volumeName == "" { + glog.Error(log("detacher.Detach missing value for parameter volumeName")) + return errors.New("missing exepected parameter volumeName") + } + parts := strings.Split(volumeName, volNameSep) + if len(parts) != 2 { + glog.Error(log("detacher.Detach insufficient info encoded in volumeName")) + return errors.New("volumeName missing expected data") + } + volID := parts[1] + attachID := getAttachmentName(volID, string(nodeName)) + err := c.k8s.StorageV1alpha1().VolumeAttachments().Delete(attachID, nil) + if err != nil { + glog.Error(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err)) + return err + } + + glog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID)) + return c.waitForVolumeDetachment(volID, attachID) +} + +func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) error { + glog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID)) + + ticker := time.NewTicker(c.waitSleepTime) + defer ticker.Stop() + + timeout := c.waitSleepTime * 10 + timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable + defer timer.Stop() + + //TODO (vladimirvivien) instead of polling api-server, change to a api-server watch + for { + select { + case <-ticker.C: + glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) + attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{}) + if err != nil { + if apierrs.IsNotFound(err) { + //object deleted or never existed, done + glog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle)) + return nil + } + glog.Error(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err)) + continue + } + + // driver reports attach error + detachErr := attach.Status.DetachError + if detachErr != nil { + glog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message)) + return errors.New(detachErr.Message) + } + case <-timer.C: + glog.Error(log("detacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID)) + return fmt.Errorf("detachment timed out for volume %v", volumeHandle) + } + } +} + +func (c *csiAttacher) UnmountDevice(deviceMountPath string) error { + glog.V(4).Info(log("detacher.UnmountDevice is not implemented")) + return nil +} + +func hashAttachmentName(volName, nodeName string) string { + result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", volName, nodeName))) + return fmt.Sprintf("%x", result) +} + +func getAttachmentName(volName, nodeName string) string { + // TODO consider using a different prefix for attachment + return fmt.Sprintf("pv-%s", hashAttachmentName(volName, nodeName)) +} diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go new file mode 100644 index 00000000000..2e1cb3fc61f --- /dev/null +++ b/pkg/volume/csi/csi_attacher_test.go @@ -0,0 +1,277 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "crypto/sha256" + "fmt" + "os" + "testing" + "time" + + "k8s.io/api/core/v1" + storage "k8s.io/api/storage/v1alpha1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/volume" +) + +func makeTestAttachment(attachID, nodeName, pvName string) *storage.VolumeAttachment { + return &storage.VolumeAttachment{ + ObjectMeta: meta.ObjectMeta{ + Name: attachID, + }, + Spec: storage.VolumeAttachmentSpec{ + NodeName: nodeName, + Attacher: csiPluginName, + Source: storage.VolumeAttachmentSource{ + PersistentVolumeName: &pvName, + }, + }, + Status: storage.VolumeAttachmentStatus{ + Attached: false, + AttachError: nil, + DetachError: nil, + }, + } +} + +func TestAttacherAttach(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + attacher, err := plug.NewAttacher() + if err != nil { + t.Fatalf("failed to create new attacher: %v", err) + } + + csiAttacher := attacher.(*csiAttacher) + + testCases := []struct { + name string + pv *v1.PersistentVolume + nodeName string + attachHash [32]byte + shouldFail bool + }{ + { + name: "test ok 1", + pv: makeTestPV("test-pv-001", 10, testDriver, "test-vol-1"), + nodeName: "test-node", + attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-1", "test-node"))), + }, + { + name: "test ok 2", + pv: makeTestPV("test-pv-002", 10, testDriver, "test-vol-002"), + nodeName: "test-node", + attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-002", "test-node"))), + }, + { + name: "missing spec", + pv: nil, + nodeName: "test-node", + attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-3", "test-node"))), + shouldFail: true, + }, + } + + for _, tc := range testCases { + var spec *volume.Spec + if tc.pv != nil { + spec = volume.NewSpecFromPersistentVolume(tc.pv, tc.pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + } + + attachID, err := csiAttacher.Attach(spec, types.NodeName(tc.nodeName)) + if tc.shouldFail && err == nil { + t.Error("expected failure, but got nil err") + } + if attachID != "" { + expectedID := fmt.Sprintf("pv-%x", tc.attachHash) + if attachID != expectedID { + t.Errorf("expecting attachID %v, got %v", expectedID, attachID) + } + } + } +} + +func TestAttacherWaitForVolumeAttachment(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + attacher, err := plug.NewAttacher() + if err != nil { + t.Fatalf("failed to create new attacher: %v", err) + } + csiAttacher := attacher.(*csiAttacher) + nodeName := "test-node" + + testCases := []struct { + name string + attached bool + attachErr *storage.VolumeError + sleepTime time.Duration + timeout time.Duration + shouldFail bool + }{ + {name: "attach ok", attached: true, sleepTime: 10 * time.Millisecond, timeout: 50 * time.Millisecond}, + {name: "attachment error", attachErr: &storage.VolumeError{Message: "missing volume"}, sleepTime: 10 * time.Millisecond, timeout: 30 * time.Millisecond}, + {name: "time ran out", attached: false, sleepTime: 5 * time.Millisecond}, + } + + for i, tc := range testCases { + t.Logf("running test: %v", tc.name) + pvName := fmt.Sprintf("test-pv-%d", i) + attachID := fmt.Sprintf("pv-%s", hashAttachmentName(pvName, nodeName)) + + attachment := makeTestAttachment(attachID, nodeName, pvName) + attachment.Status.Attached = tc.attached + attachment.Status.AttachError = tc.attachErr + csiAttacher.waitSleepTime = tc.sleepTime + + go func() { + _, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to attach: %v", err) + } + }() + + retID, err := csiAttacher.waitForVolumeAttachment("test-vol", attachID, tc.timeout) + if tc.shouldFail && err == nil { + t.Error("expecting failure, but err is nil") + } + if tc.attachErr != nil { + if tc.attachErr.Message != err.Error() { + t.Errorf("expecting error [%v], got [%v]", tc.attachErr.Message, err.Error()) + } + } + if err == nil && retID != attachID { + t.Errorf("attacher.WaitForAttach not returning attachment ID") + } + } +} + +func TestAttacherVolumesAreAttached(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + attacher, err := plug.NewAttacher() + if err != nil { + t.Fatalf("failed to create new attacher: %v", err) + } + csiAttacher := attacher.(*csiAttacher) + nodeName := "test-node" + + testCases := []struct { + name string + attachedStats map[string]bool + }{ + {"attach + detach", map[string]bool{"vol-01": true, "vol-02": true, "vol-03": false, "vol-04": false, "vol-05": true}}, + {"all detached", map[string]bool{"vol-11": false, "vol-12": false, "vol-13": false, "vol-14": false, "vol-15": false}}, + {"all attached", map[string]bool{"vol-21": true, "vol-22": true, "vol-23": true, "vol-24": true, "vol-25": true}}, + } + + for _, tc := range testCases { + var specs []*volume.Spec + // create and save volume attchments + for volName, stat := range tc.attachedStats { + pv := makeTestPV("test-pv", 10, testDriver, volName) + spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + specs = append(specs, spec) + attachID := getAttachmentName(volName, nodeName) + attachment := makeTestAttachment(attachID, nodeName, pv.GetName()) + attachment.Status.Attached = stat + _, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to attach: %v", err) + } + } + + // retrieve attached status + stats, err := csiAttacher.VolumesAreAttached(specs, types.NodeName(nodeName)) + if err != nil { + t.Fatal(err) + } + if len(tc.attachedStats) != len(stats) { + t.Errorf("expecting %d attachment status, got %d", len(tc.attachedStats), len(stats)) + } + + // compare attachment status for each spec + for spec, stat := range stats { + source, err := getCSISourceFromSpec(spec) + if err != nil { + t.Error(err) + } + if stat != tc.attachedStats[source.VolumeHandle] { + t.Errorf("expecting volume attachment %t, got %t", tc.attachedStats[source.VolumeHandle], stat) + } + } + } +} + +func TestAttacherDetach(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + attacher, err := plug.NewAttacher() + if err != nil { + t.Fatalf("failed to create new attacher: %v", err) + } + csiAttacher := attacher.(*csiAttacher) + nodeName := "test-node" + testCases := []struct { + name string + volID string + attachID string + shouldFail bool + }{ + {name: "normal test", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-001", nodeName))}, + {name: "normal test 2", volID: "vol-002", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName))}, + {name: "object not found", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName)), shouldFail: true}, + } + + for _, tc := range testCases { + pv := makeTestPV("test-pv", 10, testDriver, tc.volID) + spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + attachment := makeTestAttachment(tc.attachID, nodeName, "test-pv") + _, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to attach: %v", err) + } + volumeName, err := plug.GetVolumeName(spec) + if err != nil { + t.Errorf("test case %s failed: %v", tc.name, err) + } + err = csiAttacher.Detach(volumeName, types.NodeName(nodeName)) + if tc.shouldFail && err == nil { + t.Fatal("expecting failure, but err = nil") + } + if !tc.shouldFail && err != nil { + t.Fatalf("unexpected err: %v", err) + } + attach, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Get(tc.attachID, meta.GetOptions{}) + if err != nil { + if !apierrs.IsNotFound(err) { + t.Fatalf("unexpected err: %v", err) + } + } else { + if attach == nil { + t.Errorf("expecting attachment not to be nil, but it is") + } + } + } +} diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go new file mode 100644 index 00000000000..22b4d5bae6b --- /dev/null +++ b/pkg/volume/csi/csi_client.go @@ -0,0 +1,233 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "bytes" + "errors" + "fmt" + "net" + "time" + + csipb "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/glog" + grpctx "golang.org/x/net/context" + "google.golang.org/grpc" + api "k8s.io/api/core/v1" +) + +type csiClient interface { + AssertSupportedVersion(ctx grpctx.Context, ver *csipb.Version) error + NodePublishVolume( + ctx grpctx.Context, + volumeid string, + readOnly bool, + targetPath string, + accessMode api.PersistentVolumeAccessMode, + volumeInfo map[string]string, + fsType string, + ) error + NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error +} + +// csiClient encapsulates all csi-plugin methods +type csiDriverClient struct { + network string + addr string + conn *grpc.ClientConn + idClient csipb.IdentityClient + nodeClient csipb.NodeClient + ctrlClient csipb.ControllerClient + versionAsserted bool + versionSupported bool + publishAsserted bool + publishCapable bool +} + +func newCsiDriverClient(network, addr string) *csiDriverClient { + return &csiDriverClient{network: network, addr: addr} +} + +// assertConnection ensures a valid connection has been established +// if not, it creates a new connection and associated clients +func (c *csiDriverClient) assertConnection() error { + if c.conn == nil { + conn, err := grpc.Dial( + c.addr, + grpc.WithInsecure(), + grpc.WithDialer(func(target string, timeout time.Duration) (net.Conn, error) { + return net.Dial(c.network, target) + }), + ) + if err != nil { + return err + } + c.conn = conn + c.idClient = csipb.NewIdentityClient(conn) + c.nodeClient = csipb.NewNodeClient(conn) + c.ctrlClient = csipb.NewControllerClient(conn) + + // set supported version + } + + return nil +} + +// AssertSupportedVersion ensures driver supports specified spec version. +// If version is not supported, the assertion fails with an error. +// This test should be done early during the storage operation flow to avoid +// unnecessary calls later. +func (c *csiDriverClient) AssertSupportedVersion(ctx grpctx.Context, ver *csipb.Version) error { + if c.versionAsserted { + if !c.versionSupported { + return fmt.Errorf("version %s not supported", verToStr(ver)) + } + return nil + } + + if err := c.assertConnection(); err != nil { + c.versionAsserted = false + return err + } + + glog.V(4).Info(log("asserting version supported by driver")) + rsp, err := c.idClient.GetSupportedVersions(ctx, &csipb.GetSupportedVersionsRequest{}) + if err != nil { + c.versionAsserted = false + return err + } + + supported := false + vers := rsp.GetSupportedVersions() + glog.V(4).Info(log("driver reports %d versions supported: %s", len(vers), versToStr(vers))) + + for _, v := range vers { + //TODO (vladimirvivien) use more lenient/heuristic for exact or match of ranges etc + if verToStr(v) == verToStr(ver) { + supported = true + break + } + } + + c.versionAsserted = true + c.versionSupported = supported + + if !supported { + return fmt.Errorf("version %s not supported", verToStr(ver)) + } + + glog.V(4).Info(log("version %s supported", verToStr(ver))) + return nil +} + +func (c *csiDriverClient) NodePublishVolume( + ctx grpctx.Context, + volID string, + readOnly bool, + targetPath string, + accessMode api.PersistentVolumeAccessMode, + volumeInfo map[string]string, + fsType string, +) error { + + if volID == "" { + return errors.New("missing volume id") + } + if targetPath == "" { + return errors.New("missing target path") + } + if err := c.assertConnection(); err != nil { + glog.Errorf("%v: failed to assert a connection: %v", csiPluginName, err) + return err + } + + req := &csipb.NodePublishVolumeRequest{ + Version: csiVersion, + VolumeId: volID, + TargetPath: targetPath, + Readonly: readOnly, + PublishVolumeInfo: volumeInfo, + + VolumeCapability: &csipb.VolumeCapability{ + AccessMode: &csipb.VolumeCapability_AccessMode{ + Mode: asCSIAccessMode(accessMode), + }, + AccessType: &csipb.VolumeCapability_Mount{ + Mount: &csipb.VolumeCapability_MountVolume{ + FsType: fsType, + }, + }, + }, + } + + _, err := c.nodeClient.NodePublishVolume(ctx, req) + return err +} + +func (c *csiDriverClient) NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error { + + if volID == "" { + return errors.New("missing volume id") + } + if targetPath == "" { + return errors.New("missing target path") + } + if err := c.assertConnection(); err != nil { + glog.Error(log("failed to assert a connection: %v", err)) + return err + } + + req := &csipb.NodeUnpublishVolumeRequest{ + Version: csiVersion, + VolumeId: volID, + TargetPath: targetPath, + } + + _, err := c.nodeClient.NodeUnpublishVolume(ctx, req) + return err +} + +func asCSIAccessMode(am api.PersistentVolumeAccessMode) csipb.VolumeCapability_AccessMode_Mode { + switch am { + case api.ReadWriteOnce: + return csipb.VolumeCapability_AccessMode_SINGLE_NODE_WRITER + case api.ReadOnlyMany: + return csipb.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER + case api.ReadWriteMany: + return csipb.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER + } + return csipb.VolumeCapability_AccessMode_UNKNOWN +} + +func verToStr(ver *csipb.Version) string { + if ver == nil { + return "" + } + return fmt.Sprintf("%d.%d.%d", ver.GetMajor(), ver.GetMinor(), ver.GetPatch()) +} + +func versToStr(vers []*csipb.Version) string { + if vers == nil { + return "" + } + str := bytes.NewBufferString("[") + for _, v := range vers { + str.WriteString(fmt.Sprintf("{%s};", verToStr(v))) + } + str.WriteString("]") + return str.String() +} diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go new file mode 100644 index 00000000000..54605cdb483 --- /dev/null +++ b/pkg/volume/csi/csi_client_test.go @@ -0,0 +1,126 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "errors" + "testing" + + csipb "github.com/container-storage-interface/spec/lib/go/csi" + grpctx "golang.org/x/net/context" + "google.golang.org/grpc" + api "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/volume/csi/fake" +) + +func setupClient(t *testing.T) *csiDriverClient { + client := newCsiDriverClient("unix", "/tmp/test.sock") + client.conn = new(grpc.ClientConn) //avoids creating conn object + + // setup mock grpc clients + client.idClient = fake.NewIdentityClient() + client.nodeClient = fake.NewNodeClient() + client.ctrlClient = fake.NewControllerClient() + + return client +} + +func TestClientAssertSupportedVersion(t *testing.T) { + testCases := []struct { + testName string + ver *csipb.Version + mustFail bool + err error + }{ + {testName: "supported version", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}}, + {testName: "unsupported version", ver: &csipb.Version{Major: 0, Minor: 0, Patch: 0}, mustFail: true}, + {testName: "grpc error", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}, mustFail: true, err: errors.New("grpc error")}, + } + + for _, tc := range testCases { + t.Log("case: ", tc.testName) + client := setupClient(t) + client.idClient.(*fake.IdentityClient).SetNextError(tc.err) + err := client.AssertSupportedVersion(grpctx.Background(), tc.ver) + if tc.mustFail && err == nil { + t.Error("must fail, but err = nil") + } + } +} + +func TestClientNodePublishVolume(t *testing.T) { + testCases := []struct { + name string + volID string + targetPath string + fsType string + mustFail bool + err error + }{ + {name: "test ok", volID: "vol-test", targetPath: "/test/path"}, + {name: "missing volID", targetPath: "/test/path", mustFail: true}, + {name: "missing target path", volID: "vol-test", mustFail: true}, + {name: "bad fs", volID: "vol-test", targetPath: "/test/path", fsType: "badfs", mustFail: true}, + {name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + } + + client := setupClient(t) + + for _, tc := range testCases { + t.Log("case: ", tc.name) + client.nodeClient.(*fake.NodeClient).SetNextError(tc.err) + err := client.NodePublishVolume( + grpctx.Background(), + tc.volID, + false, + tc.targetPath, + api.ReadWriteOnce, + map[string]string{"device": "/dev/null"}, + tc.fsType, + ) + + if tc.mustFail && err == nil { + t.Error("must fail, but err is nil: ", err) + } + } +} + +func TestClientNodeUnpublishVolume(t *testing.T) { + testCases := []struct { + name string + volID string + targetPath string + mustFail bool + err error + }{ + {name: "test ok", volID: "vol-test", targetPath: "/test/path"}, + {name: "missing volID", targetPath: "/test/path", mustFail: true}, + {name: "missing target path", volID: "vol-test", mustFail: true}, + {name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")}, + } + + client := setupClient(t) + + for _, tc := range testCases { + t.Log("case: ", tc.name) + client.nodeClient.(*fake.NodeClient).SetNextError(tc.err) + err := client.NodeUnpublishVolume(grpctx.Background(), tc.volID, tc.targetPath) + if tc.mustFail && err == nil { + t.Error("must fail, but err is nil: ", err) + } + } +} diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go new file mode 100644 index 00000000000..8645752ad24 --- /dev/null +++ b/pkg/volume/csi/csi_mounter.go @@ -0,0 +1,194 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "errors" + "fmt" + "path" + + "github.com/golang/glog" + grpctx "golang.org/x/net/context" + api "k8s.io/api/core/v1" + storage "k8s.io/api/storage/v1alpha1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + kstrings "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/volume" +) + +type csiMountMgr struct { + k8s kubernetes.Interface + csiClient csiClient + plugin *csiPlugin + driverName string + volumeID string + readOnly bool + spec *volume.Spec + pod *api.Pod + podUID types.UID + options volume.VolumeOptions + volumeInfo map[string]string + volume.MetricsNil +} + +// volume.Volume methods +var _ volume.Volume = &csiMountMgr{} + +func (c *csiMountMgr) GetPath() string { + return getTargetPath(c.podUID, c.driverName, c.volumeID, c.plugin.host) +} + +func getTargetPath(uid types.UID, driverName string, volID string, host volume.VolumeHost) string { + // driverName validated at Mounter creation + // sanitize (replace / with ~) in volumeID before it's appended to path:w + driverPath := fmt.Sprintf("%s/%s", driverName, kstrings.EscapeQualifiedNameForDisk(volID)) + return host.GetPodVolumeDir(uid, kstrings.EscapeQualifiedNameForDisk(csiPluginName), driverPath) +} + +// volume.Mounter methods +var _ volume.Mounter = &csiMountMgr{} + +func (c *csiMountMgr) CanMount() error { + //TODO (vladimirvivien) use this method to probe controller using CSI.NodeProbe() call + // to ensure Node service is ready in the CSI plugin + return nil +} + +func (c *csiMountMgr) SetUp(fsGroup *int64) error { + return c.SetUpAt(c.GetPath(), fsGroup) +} + +func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { + glog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir)) + + ctx, cancel := grpctx.WithTimeout(grpctx.Background(), csiTimeout) + defer cancel() + + csi := c.csiClient + pvName := c.spec.PersistentVolume.GetName() + + // ensure version is supported + if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil { + glog.Errorf(log("failed to assert version: %v", err)) + return err + } + + // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName + if c.volumeInfo == nil { + + //TODO (vladimirvivien) consider using VolumesAttachments().Get() to retrieve + //the object directly. This requires the ability to reconstruct the ID using volumeName+nodeName (nodename may not be avilable) + attachList, err := c.k8s.StorageV1alpha1().VolumeAttachments().List(meta.ListOptions{}) + if err != nil { + glog.Error(log("failed to get volume attachments: %v", err)) + return err + } + + var attachment *storage.VolumeAttachment + for _, attach := range attachList.Items { + if attach.Spec.Source.PersistentVolumeName != nil && + *attach.Spec.Source.PersistentVolumeName == pvName { + attachment = &attach + break + } + } + + if attachment == nil { + glog.Error(log("unable to find VolumeAttachment with PV.name = %s", pvName)) + return errors.New("no existing VolumeAttachment found") + } + c.volumeInfo = attachment.Status.AttachmentMetadata + } + + //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI + accessMode := api.ReadWriteOnce + if c.spec.PersistentVolume.Spec.AccessModes != nil { + accessMode = c.spec.PersistentVolume.Spec.AccessModes[0] + } + + err := csi.NodePublishVolume( + ctx, + c.volumeID, + c.readOnly, + dir, + accessMode, + c.volumeInfo, + "ext4", //TODO needs to be sourced from PV or somewhere else + ) + + if err != nil { + glog.Errorf(log("Mounter.Setup failed: %v", err)) + return err + } + glog.V(4).Infof(log("successfully mounted %s", dir)) + + return nil +} + +func (c *csiMountMgr) GetAttributes() volume.Attributes { + return volume.Attributes{ + ReadOnly: c.readOnly, + Managed: !c.readOnly, + SupportsSELinux: false, + } +} + +// volume.Unmounter methods +var _ volume.Unmounter = &csiMountMgr{} + +func (c *csiMountMgr) TearDown() error { + return c.TearDownAt(c.GetPath()) +} +func (c *csiMountMgr) TearDownAt(dir string) error { + glog.V(4).Infof(log("Unmounter.TearDown(%s)", dir)) + + // extract driverName and volID from path + base, volID := path.Split(dir) + volID = kstrings.UnescapeQualifiedNameForDisk(volID) + driverName := path.Base(base) + + if c.csiClient == nil { + addr := fmt.Sprintf(csiAddrTemplate, driverName) + client := newCsiDriverClient("unix", addr) + glog.V(4).Infof(log("unmounter csiClient setup [volume=%v,driver=%v]", volID, driverName)) + c.csiClient = client + } + + ctx, cancel := grpctx.WithTimeout(grpctx.Background(), csiTimeout) + defer cancel() + + csi := c.csiClient + + // TODO make all assertion calls private within the client itself + if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil { + glog.Errorf(log("failed to assert version: %v", err)) + return err + } + + err := csi.NodeUnpublishVolume(ctx, volID, dir) + + if err != nil { + glog.Errorf(log("Mounter.Setup failed: %v", err)) + return err + } + + glog.V(4).Infof(log("successfully unmounted %s", dir)) + + return nil +} diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go new file mode 100644 index 00000000000..5e7f6083b60 --- /dev/null +++ b/pkg/volume/csi/csi_mounter_test.go @@ -0,0 +1,152 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "fmt" + "os" + "path" + "testing" + + api "k8s.io/api/core/v1" + storage "k8s.io/api/storage/v1alpha1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/csi/fake" +) + +var ( + testDriver = "test-driver" + testVol = "vol-123" + testns = "test-ns" + testPodUID = types.UID("test-pod") +) + +func TestMounterGetPath(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + pv := makeTestPV("test-pv", 10, testDriver, testVol) + + mounter, err := plug.NewMounter( + volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly), + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if err != nil { + t.Fatalf("Failed to make a new Mounter: %v", err) + } + csiMounter := mounter.(*csiMountMgr) + expectedPath := path.Join(tmpDir, fmt.Sprintf( + "pods/%s/volumes/kubernetes.io~csi/%s/%s", + testPodUID, + csiMounter.driverName, + csiMounter.volumeID, + )) + mountPath := csiMounter.GetPath() + if mountPath != expectedPath { + t.Errorf("Got unexpected path: %s", mountPath) + } + +} + +func TestMounterSetUp(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + pv := makeTestPV("test-pv", 10, testDriver, testVol) + pvName := pv.GetName() + + mounter, err := plug.NewMounter( + volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly), + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if err != nil { + t.Fatalf("Failed to make a new Mounter: %v", err) + } + + if mounter == nil { + t.Fatal("failed to create CSI mounter") + } + + csiMounter := mounter.(*csiMountMgr) + csiMounter.csiClient = setupClient(t) + + attachment := &storage.VolumeAttachment{ + ObjectMeta: meta.ObjectMeta{ + Name: "pv-1234556775313", + }, + Spec: storage.VolumeAttachmentSpec{ + NodeName: "test-node", + Attacher: csiPluginName, + Source: storage.VolumeAttachmentSource{ + PersistentVolumeName: &pvName, + }, + }, + Status: storage.VolumeAttachmentStatus{ + Attached: false, + AttachError: nil, + DetachError: nil, + }, + } + _, err = csiMounter.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to setup VolumeAttachment: %v", err) + } + + // Mounter.SetUp() + if err := csiMounter.SetUp(nil); err != nil { + t.Fatalf("mounter.Setup failed: %v", err) + } + + // ensure call went all the way + pubs := csiMounter.csiClient.(*csiDriverClient).nodeClient.(*fake.NodeClient).GetNodePublishedVolumes() + if pubs[csiMounter.volumeID] != csiMounter.GetPath() { + t.Error("csi server may not have received NodePublishVolume call") + } +} + +func TestUnmounterTeardown(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + pv := makeTestPV("test-pv", 10, testDriver, testVol) + + unmounter, err := plug.NewUnmounter(pv.ObjectMeta.Name, testPodUID) + if err != nil { + t.Fatalf("failed to make a new Unmounter: %v", err) + } + + csiUnmounter := unmounter.(*csiMountMgr) + csiUnmounter.csiClient = setupClient(t) + + dir := csiUnmounter.GetPath() + + err = csiUnmounter.TearDownAt(dir) + if err != nil { + t.Fatal(err) + } + + // ensure csi client call + pubs := csiUnmounter.csiClient.(*csiDriverClient).nodeClient.(*fake.NodeClient).GetNodePublishedVolumes() + if _, ok := pubs[csiUnmounter.volumeID]; ok { + t.Error("csi server may not have received NodeUnpublishVolume call") + } + +} diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go new file mode 100644 index 00000000000..876de2281bc --- /dev/null +++ b/pkg/volume/csi/csi_plugin.go @@ -0,0 +1,257 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "errors" + "fmt" + "path" + "regexp" + "time" + + csipb "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/glog" + api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/util/mount" + kstrings "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/volume" +) + +const ( + csiName = "csi" + csiPluginName = "kubernetes.io/csi" + + // TODO (vladimirvivien) implement a more dynamic way to discover + // the unix domain socket path for each installed csi driver. + // TODO (vladimirvivien) would be nice to name socket with a .sock extension + // for consistency. + csiAddrTemplate = "/var/lib/kubelet/plugins/%v" + csiTimeout = 15 * time.Second + volNameSep = "^" +) + +var ( + // csiVersion supported csi version + csiVersion = &csipb.Version{Major: 0, Minor: 1, Patch: 0} + driverNameRexp = regexp.MustCompile(`^[A-Za-z]+(\.?-?_?[A-Za-z0-9-])+$`) +) + +type csiPlugin struct { + host volume.VolumeHost +} + +// ProbeVolumePlugins returns implemented plugins +func ProbeVolumePlugins() []volume.VolumePlugin { + p := &csiPlugin{ + host: nil, + } + return []volume.VolumePlugin{p} +} + +// volume.VolumePlugin methods +var _ volume.VolumePlugin = &csiPlugin{} + +func (p *csiPlugin) Init(host volume.VolumeHost) error { + glog.Info(log("plugin initializing...")) + p.host = host + return nil +} + +func (p *csiPlugin) GetPluginName() string { + return csiPluginName +} + +// GetvolumeName returns a concatenated string of CSIVolumeSource.DriverCSIVolumeSource.VolumeHandle +// That string value is used in Detach() to extract driver name and volumeName. +func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) { + csi, err := getCSISourceFromSpec(spec) + if err != nil { + glog.Error(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err)) + return "", err + } + + //TODO (vladimirvivien) this validation should be done at the API validation check + if !isDriverNameValid(csi.Driver) { + glog.Error(log("plugin.GetVolumeName failed to create volume name: invalid csi driver name %s", csi.Driver)) + return "", errors.New("invalid csi driver name") + } + + // return driverNamevolumeHandle + return fmt.Sprintf("%s%s%s", csi.Driver, volNameSep, csi.VolumeHandle), nil +} + +func (p *csiPlugin) CanSupport(spec *volume.Spec) bool { + // TODO (vladimirvivien) CanSupport should also take into account + // the availability/registration of specified Driver in the volume source + return spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil +} + +func (p *csiPlugin) RequiresRemount() bool { + return false +} + +func (p *csiPlugin) NewMounter( + spec *volume.Spec, + pod *api.Pod, + _ volume.VolumeOptions) (volume.Mounter, error) { + pvSource, err := getCSISourceFromSpec(spec) + if err != nil { + return nil, err + } + + // TODO (vladimirvivien) consider moving this check in API validation + // check Driver name to conform to CSI spec + if !isDriverNameValid(pvSource.Driver) { + glog.Error(log("driver name does not conform to CSI spec: %s", pvSource.Driver)) + return nil, errors.New("driver name is invalid") + } + + // before it is used in any paths such as socket etc + addr := fmt.Sprintf(csiAddrTemplate, pvSource.Driver) + glog.V(4).Infof(log("setting up mounter for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver)) + client := newCsiDriverClient("unix", addr) + + k8s := p.host.GetKubeClient() + if k8s == nil { + glog.Error(log("failed to get a kubernetes client")) + return nil, errors.New("failed to get a Kubernetes client") + } + + mounter := &csiMountMgr{ + plugin: p, + k8s: k8s, + spec: spec, + pod: pod, + podUID: pod.UID, + driverName: pvSource.Driver, + volumeID: pvSource.VolumeHandle, + csiClient: client, + } + return mounter, nil +} + +func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) { + glog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID)) + unmounter := &csiMountMgr{ + plugin: p, + podUID: podUID, + } + return unmounter, nil +} + +func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + glog.V(4).Infof(log("constructing volume spec [pv.Name=%v, path=%v]", volumeName, mountPath)) + + // extract driverName/volumeId from end of mountPath + dir, volID := path.Split(mountPath) + volID = kstrings.UnescapeQualifiedNameForDisk(volID) + driverName := path.Base(dir) + + // TODO (vladimirvivien) consider moving this check in API validation + if !isDriverNameValid(driverName) { + glog.Error(log("failed while reconstructing volume spec csi: driver name extracted from path is invalid: [path=%s; driverName=%s]", mountPath, driverName)) + return nil, errors.New("invalid csi driver name from path") + } + + glog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [volumeID=%s; driverName=%s]", volID, driverName)) + + pv := &api.PersistentVolume{ + ObjectMeta: meta.ObjectMeta{ + Name: volumeName, + }, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeSource: api.PersistentVolumeSource{ + CSI: &api.CSIPersistentVolumeSource{ + Driver: driverName, + VolumeHandle: volID, + }, + }, + }, + } + + return volume.NewSpecFromPersistentVolume(pv, false), nil +} + +func (p *csiPlugin) SupportsMountOption() bool { + // TODO (vladimirvivien) use CSI VolumeCapability.MountVolume.mount_flags + // to probe for the result for this method:w + return false +} + +func (p *csiPlugin) SupportsBulkVolumeVerification() bool { + return false +} + +// volume.AttachableVolumePlugin methods +var _ volume.AttachableVolumePlugin = &csiPlugin{} + +func (p *csiPlugin) NewAttacher() (volume.Attacher, error) { + k8s := p.host.GetKubeClient() + if k8s == nil { + glog.Error(log("unable to get kubernetes client from host")) + return nil, errors.New("unable to get Kubernetes client") + } + + return &csiAttacher{ + plugin: p, + k8s: k8s, + waitSleepTime: 1 * time.Second, + }, nil +} + +func (p *csiPlugin) NewDetacher() (volume.Detacher, error) { + k8s := p.host.GetKubeClient() + if k8s == nil { + glog.Error(log("unable to get kubernetes client from host")) + return nil, errors.New("unable to get Kubernetes client") + } + + return &csiAttacher{ + plugin: p, + k8s: k8s, + waitSleepTime: 1 * time.Second, + }, nil +} + +func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) { + m := p.host.GetMounter(p.GetPluginName()) + return mount.GetMountRefs(m, deviceMountPath) +} + +func getCSISourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) { + if spec.PersistentVolume != nil && + spec.PersistentVolume.Spec.CSI != nil { + return spec.PersistentVolume.Spec.CSI, nil + } + + return nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec") +} + +// log prepends log string with `kubernetes.io/csi` +func log(msg string, parts ...interface{}) string { + return fmt.Sprintf(fmt.Sprintf("%s: %s", csiPluginName, msg), parts...) +} + +// isDriverNameValid validates the driverName using CSI spec +func isDriverNameValid(name string) bool { + if len(name) == 0 || len(name) > 63 { + return false + } + return driverNameRexp.MatchString(name) +} diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go new file mode 100644 index 00000000000..020430bb313 --- /dev/null +++ b/pkg/volume/csi/csi_plugin_test.go @@ -0,0 +1,297 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "fmt" + "os" + "testing" + + api "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + fakeclient "k8s.io/client-go/kubernetes/fake" + utiltesting "k8s.io/client-go/util/testing" + kstrings "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/volume" + volumetest "k8s.io/kubernetes/pkg/volume/testing" +) + +// create a plugin mgr to load plugins and setup a fake client +func newTestPlugin(t *testing.T) (*csiPlugin, string) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + + fakeClient := fakeclient.NewSimpleClientset() + host := volumetest.NewFakeVolumeHost( + tmpDir, + fakeClient, + nil, + ) + plugMgr := &volume.VolumePluginMgr{} + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) + + plug, err := plugMgr.FindPluginByName(csiPluginName) + if err != nil { + t.Fatalf("can't find plugin %v", csiPluginName) + } + + csiPlug, ok := plug.(*csiPlugin) + if !ok { + t.Fatalf("cannot assert plugin to be type csiPlugin") + } + + return csiPlug, tmpDir +} + +func makeTestPV(name string, sizeGig int, driverName, volID string) *api.PersistentVolume { + return &api.PersistentVolume{ + ObjectMeta: meta.ObjectMeta{ + Name: name, + Namespace: testns, + }, + Spec: api.PersistentVolumeSpec{ + AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse( + fmt.Sprintf("%dGi", sizeGig), + ), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + CSI: &api.CSIPersistentVolumeSource{ + Driver: driverName, + VolumeHandle: volID, + ReadOnly: false, + }, + }, + }, + } +} + +func TestPluginGetPluginName(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + if plug.GetPluginName() != "kubernetes.io/csi" { + t.Errorf("unexpected plugin name %v", plug.GetPluginName()) + } +} + +func TestPluginGetVolumeName(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + testCases := []struct { + name string + driverName string + volName string + shouldFail bool + }{ + {"alphanum names", "testdr", "testvol", false}, + {"mixchar driver", "test.dr.cc", "testvol", false}, + {"mixchar volume", "testdr", "test-vol-name", false}, + {"mixchars all", "test-driver", "test.vol.name", false}, + } + + for _, tc := range testCases { + t.Logf("testing: %s", tc.name) + pv := makeTestPV("test-pv", 10, tc.driverName, tc.volName) + spec := volume.NewSpecFromPersistentVolume(pv, false) + name, err := plug.GetVolumeName(spec) + if tc.shouldFail && err == nil { + t.Fatal("GetVolumeName should fail, but got err=nil") + } + if name != fmt.Sprintf("%s%s%s", tc.driverName, volNameSep, tc.volName) { + t.Errorf("unexpected volume name %s", name) + } + } +} + +func TestPluginCanSupport(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + pv := makeTestPV("test-pv", 10, testDriver, testVol) + spec := volume.NewSpecFromPersistentVolume(pv, false) + + if !plug.CanSupport(spec) { + t.Errorf("should support CSI spec") + } +} + +func TestPluginConstructVolumeSpec(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + testCases := []struct { + name string + driverName string + volID string + shouldFail bool + }{ + {"valid driver and vol", "test.csi-driver", "abc-cde", false}, + {"valid driver + vol with slash", "test.csi-driver", "a/b/c/d", false}, + {"invalid driver name", "_test.csi.driver>", "a/b/c/d", true}, + } + + for _, tc := range testCases { + dir := getTargetPath(testPodUID, tc.driverName, tc.volID, plug.host) + + // rebuild spec + spec, err := plug.ConstructVolumeSpec("test-pv", dir) + if tc.shouldFail { + if err == nil { + t.Fatal("expecting ConstructVolumeSpec to fail, but got nil error") + } + continue + } + + volID := spec.PersistentVolume.Spec.CSI.VolumeHandle + unsanitizedVolID := kstrings.UnescapeQualifiedNameForDisk(tc.volID) + if volID != unsanitizedVolID { + t.Errorf("expected unsanitized volID %s, got volID %s", unsanitizedVolID, volID) + } + + if spec.Name() != "test-pv" { + t.Errorf("Unexpected spec name %s", spec.Name()) + } + } +} + +func TestPluginNewMounter(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + pv := makeTestPV("test-pv", 10, testDriver, testVol) + mounter, err := plug.NewMounter( + volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly), + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if err != nil { + t.Fatalf("Failed to make a new Mounter: %v", err) + } + + if mounter == nil { + t.Fatal("failed to create CSI mounter") + } + csiMounter := mounter.(*csiMountMgr) + + // validate mounter fields + if csiMounter.driverName != testDriver { + t.Error("mounter driver name not set") + } + if csiMounter.volumeID != testVol { + t.Error("mounter volume id not set") + } + if csiMounter.pod == nil { + t.Error("mounter pod not set") + } + if csiMounter.podUID == types.UID("") { + t.Error("mounter podUID mot set") + } +} + +func TestPluginNewUnmounter(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + pv := makeTestPV("test-pv", 10, testDriver, testVol) + + unmounter, err := plug.NewUnmounter(pv.ObjectMeta.Name, testPodUID) + csiUnmounter := unmounter.(*csiMountMgr) + + if err != nil { + t.Fatalf("Failed to make a new Mounter: %v", err) + } + + if csiUnmounter == nil { + t.Fatal("failed to create CSI mounter") + } + + if csiUnmounter.podUID != testPodUID { + t.Error("podUID not set") + } + +} + +func TestValidateDriverName(t *testing.T) { + testCases := []struct { + name string + driverName string + valid bool + }{ + + {"ok no punctuations", "comgooglestoragecsigcepd", true}, + {"ok dot only", "io.kubernetes.storage.csi.flex", true}, + {"ok dash only", "io-kubernetes-storage-csi-flex", true}, + {"ok underscore only", "io_kubernetes_storage_csi_flex", true}, + {"ok dot underscores", "io.kubernetes.storage_csi.flex", true}, + {"ok dot dash underscores", "io.kubernetes-storage.csi_flex", true}, + + {"invalid length 0", "", false}, + {"invalid length > 63", "comgooglestoragecsigcepdcomgooglestoragecsigcepdcomgooglestoragecsigcepdcomgooglestoragecsigcepd", false}, + {"invalid start char", "_comgooglestoragecsigcepd", false}, + {"invalid end char", "comgooglestoragecsigcepd/", false}, + {"invalid separators", "com/google/storage/csi~gcepd", false}, + } + + for _, tc := range testCases { + t.Logf("test case: %v", tc.name) + drValid := isDriverNameValid(tc.driverName) + if tc.valid != drValid { + t.Errorf("expecting driverName %s as valid=%t, but got valid=%t", tc.driverName, tc.valid, drValid) + } + } +} + +func TestPluginNewAttacher(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + attacher, err := plug.NewAttacher() + if err != nil { + t.Fatalf("failed to create new attacher: %v", err) + } + + csiAttacher := attacher.(*csiAttacher) + if csiAttacher.plugin == nil { + t.Error("plugin not set for attacher") + } + if csiAttacher.k8s == nil { + t.Error("Kubernetes client not set for attacher") + } +} + +func TestPluginNewDetacher(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + detacher, err := plug.NewDetacher() + if err != nil { + t.Fatalf("failed to create new detacher: %v", err) + } + + csiDetacher := detacher.(*csiAttacher) + if csiDetacher.plugin == nil { + t.Error("plugin not set for detacher") + } + if csiDetacher.k8s == nil { + t.Error("Kubernetes client not set for attacher") + } +} diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go new file mode 100644 index 00000000000..0d790cc9e58 --- /dev/null +++ b/pkg/volume/csi/fake/fake_client.go @@ -0,0 +1,224 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + "context" + "errors" + "strings" + + "google.golang.org/grpc" + + csipb "github.com/container-storage-interface/spec/lib/go/csi" + grpctx "golang.org/x/net/context" +) + +// IdentityClient is a CSI identity client used for testing +type IdentityClient struct { + nextErr error +} + +// NewIdentityClient returns a new IdentityClient +func NewIdentityClient() *IdentityClient { + return &IdentityClient{} +} + +// SetNextError injects expected error +func (f *IdentityClient) SetNextError(err error) { + f.nextErr = err +} + +// GetSupportedVersions returns supported version +func (f *IdentityClient) GetSupportedVersions(ctx grpctx.Context, req *csipb.GetSupportedVersionsRequest, opts ...grpc.CallOption) (*csipb.GetSupportedVersionsResponse, error) { + // short circuit with an error + if f.nextErr != nil { + return nil, f.nextErr + } + + rsp := &csipb.GetSupportedVersionsResponse{ + SupportedVersions: []*csipb.Version{ + {Major: 0, Minor: 0, Patch: 1}, + {Major: 0, Minor: 1, Patch: 0}, + {Major: 1, Minor: 0, Patch: 0}, + {Major: 1, Minor: 0, Patch: 1}, + {Major: 1, Minor: 1, Patch: 1}, + }, + } + return rsp, nil +} + +// GetPluginInfo returns plugin info +func (f *IdentityClient) GetPluginInfo(ctx context.Context, in *csipb.GetPluginInfoRequest, opts ...grpc.CallOption) (*csipb.GetPluginInfoResponse, error) { + return nil, nil +} + +// NodeClient returns CSI node client +type NodeClient struct { + nodePublishedVolumes map[string]string + nextErr error +} + +// NewNodeClient returns fake node client +func NewNodeClient() *NodeClient { + return &NodeClient{nodePublishedVolumes: make(map[string]string)} +} + +// SetNextError injects next expected error +func (f *NodeClient) SetNextError(err error) { + f.nextErr = err +} + +// GetNodePublishedVolumes returns node published volumes +func (f *NodeClient) GetNodePublishedVolumes() map[string]string { + return f.nodePublishedVolumes +} + +// NodePublishVolume implements CSI NodePublishVolume +func (f *NodeClient) NodePublishVolume(ctx grpctx.Context, req *csipb.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodePublishVolumeResponse, error) { + + if f.nextErr != nil { + return nil, f.nextErr + } + + if req.GetVolumeId() == "" { + return nil, errors.New("missing volume id") + } + if req.GetTargetPath() == "" { + return nil, errors.New("missing target path") + } + fsTypes := "ext4|xfs|zfs" + fsType := req.GetVolumeCapability().GetMount().GetFsType() + if !strings.Contains(fsTypes, fsType) { + return nil, errors.New("invlid fstype") + } + f.nodePublishedVolumes[req.GetVolumeId()] = req.GetTargetPath() + return &csipb.NodePublishVolumeResponse{}, nil +} + +// NodeUnpublishVolume implements csi method +func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnpublishVolumeResponse, error) { + if f.nextErr != nil { + return nil, f.nextErr + } + + if req.GetVolumeId() == "" { + return nil, errors.New("missing volume id") + } + if req.GetTargetPath() == "" { + return nil, errors.New("missing target path") + } + delete(f.nodePublishedVolumes, req.GetVolumeId()) + return &csipb.NodeUnpublishVolumeResponse{}, nil +} + +// GetNodeID implements method +func (f *NodeClient) GetNodeID(ctx context.Context, in *csipb.GetNodeIDRequest, opts ...grpc.CallOption) (*csipb.GetNodeIDResponse, error) { + return nil, nil +} + +// NodeProbe implements csi method +func (f *NodeClient) NodeProbe(ctx context.Context, in *csipb.NodeProbeRequest, opts ...grpc.CallOption) (*csipb.NodeProbeResponse, error) { + return nil, nil +} + +// NodeGetCapabilities implements csi method +func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.NodeGetCapabilitiesResponse, error) { + return nil, nil +} + +// ControllerClient represents a CSI Controller client +type ControllerClient struct { + nextCapabilities []*csipb.ControllerServiceCapability + nextErr error +} + +// NewControllerClient returns a ControllerClient +func NewControllerClient() *ControllerClient { + return &ControllerClient{} +} + +// SetNextError injects next expected error +func (f *ControllerClient) SetNextError(err error) { + f.nextErr = err +} + +// SetNextCapabilities injects next expected capabilities +func (f *ControllerClient) SetNextCapabilities(caps []*csipb.ControllerServiceCapability) { + f.nextCapabilities = caps +} + +// ControllerGetCapabilities implements csi method +func (f *ControllerClient) ControllerGetCapabilities(ctx context.Context, in *csipb.ControllerGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ControllerGetCapabilitiesResponse, error) { + if f.nextErr != nil { + return nil, f.nextErr + } + + if f.nextCapabilities == nil { + f.nextCapabilities = []*csipb.ControllerServiceCapability{ + { + Type: &csipb.ControllerServiceCapability_Rpc{ + Rpc: &csipb.ControllerServiceCapability_RPC{ + Type: csipb.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, + }, + }, + }, + } + } + return &csipb.ControllerGetCapabilitiesResponse{ + Capabilities: f.nextCapabilities, + }, nil +} + +// CreateVolume implements csi method +func (f *ControllerClient) CreateVolume(ctx context.Context, in *csipb.CreateVolumeRequest, opts ...grpc.CallOption) (*csipb.CreateVolumeResponse, error) { + return nil, nil +} + +// DeleteVolume implements csi method +func (f *ControllerClient) DeleteVolume(ctx context.Context, in *csipb.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipb.DeleteVolumeResponse, error) { + return nil, nil +} + +// ControllerPublishVolume implements csi method +func (f *ControllerClient) ControllerPublishVolume(ctx context.Context, in *csipb.ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerPublishVolumeResponse, error) { + return nil, nil +} + +// ControllerUnpublishVolume implements csi method +func (f *ControllerClient) ControllerUnpublishVolume(ctx context.Context, in *csipb.ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerUnpublishVolumeResponse, error) { + return nil, nil +} + +// ValidateVolumeCapabilities implements csi method +func (f *ControllerClient) ValidateVolumeCapabilities(ctx context.Context, in *csipb.ValidateVolumeCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ValidateVolumeCapabilitiesResponse, error) { + return nil, nil +} + +// ListVolumes implements csi method +func (f *ControllerClient) ListVolumes(ctx context.Context, in *csipb.ListVolumesRequest, opts ...grpc.CallOption) (*csipb.ListVolumesResponse, error) { + return nil, nil +} + +// GetCapacity implements csi method +func (f *ControllerClient) GetCapacity(ctx context.Context, in *csipb.GetCapacityRequest, opts ...grpc.CallOption) (*csipb.GetCapacityResponse, error) { + return nil, nil +} + +// ControllerProbe implements csi method +func (f *ControllerClient) ControllerProbe(ctx context.Context, in *csipb.ControllerProbeRequest, opts ...grpc.CallOption) (*csipb.ControllerProbeResponse, error) { + return nil, nil +}