CSI - API source code implementation

This commit tracks the source code updates for the CSI volume plugin implementation.
Vladimir Vivien 2017-10-24 22:44:48 -04:00
parent a69f9dad90
commit dd08d1b489
12 changed files with 2049 additions and 0 deletions
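A minimal sketch of the PersistentVolume shape this plugin consumes may help orient the diff below. It is not part of the commit; the driver name and volume handle are made-up values, while the field names follow the CSIPersistentVolumeSource type used throughout the new files:

import (
	api "k8s.io/api/core/v1"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// newExamplePV returns a hypothetical CSI-backed PersistentVolume.
func newExamplePV() *api.PersistentVolume {
	return &api.PersistentVolume{
		ObjectMeta: meta.ObjectMeta{Name: "pv-csi-example"},
		Spec: api.PersistentVolumeSpec{
			PersistentVolumeSource: api.PersistentVolumeSource{
				CSI: &api.CSIPersistentVolumeSource{
					Driver:       "com.example.csi-driver", // hypothetical driver name
					VolumeHandle: "vol-0123456789",         // hypothetical volume handle
					ReadOnly:     false,
				},
			},
		},
	}
}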


@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/volume/azure_dd"
"k8s.io/kubernetes/pkg/volume/azure_file"
"k8s.io/kubernetes/pkg/volume/cinder"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/fc"
"k8s.io/kubernetes/pkg/volume/flexvolume"
"k8s.io/kubernetes/pkg/volume/flocker"
@ -58,6 +59,9 @@ import (
"k8s.io/kubernetes/pkg/volume/storageos"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/vsphere_volume"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
// ProbeAttachableVolumePlugins collects all volume plugins for the attach/
@ -79,6 +83,9 @@ func ProbeAttachableVolumePlugins() []volume.VolumePlugin {
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
}
return allPlugins
}
@ -105,6 +112,9 @@ func ProbeExpandableVolumePlugins(config componentconfig.VolumeConfiguration) []
allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
}
return allPlugins
}
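For context: the CSI registrations above are guarded by the CSIPersistentVolume feature gate, so the plugin is probed only when that gate is enabled, typically by starting the component with a flag along the lines of --feature-gates=CSIPersistentVolume=true.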


@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/volume/cephfs"
"k8s.io/kubernetes/pkg/volume/cinder"
"k8s.io/kubernetes/pkg/volume/configmap"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/downwardapi"
"k8s.io/kubernetes/pkg/volume/empty_dir"
"k8s.io/kubernetes/pkg/volume/fc"
@ -58,6 +59,9 @@ import (
"k8s.io/kubernetes/pkg/volume/vsphere_volume"
// Cloud providers
_ "k8s.io/kubernetes/pkg/cloudprovider/providers"
// features check
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
// ProbeVolumePlugins collects all volume plugins into an easy to use list.
@ -96,6 +100,9 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, local.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...)
if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
}
return allPlugins
}

pkg/volume/csi/OWNERS

@ -0,0 +1,4 @@
approvers:
- jsafrane
- saad-ali
- vladimirvivien


@ -0,0 +1,268 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"crypto/sha256"
"errors"
"fmt"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1alpha1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/volume"
)
type csiAttacher struct {
plugin *csiPlugin
k8s kubernetes.Interface
waitSleepTime time.Duration
}
// volume.Attacher methods
var _ volume.Attacher = &csiAttacher{}
func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
if spec == nil {
glog.Error(log("attacher.Attach missing volume.Spec"))
return "", errors.New("missing spec")
}
csiSource, err := getCSISourceFromSpec(spec)
if err != nil {
glog.Error(log("attacher.Attach failed to get CSI persistent source: %v", err))
return "", errors.New("missing CSI persistent volume")
}
pvName := spec.PersistentVolume.GetName()
attachID := getAttachmentName(csiSource.VolumeHandle, string(nodeName))
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: attachID,
},
Spec: storage.VolumeAttachmentSpec{
NodeName: string(nodeName),
Attacher: csiPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
Status: storage.VolumeAttachmentStatus{Attached: false},
}
attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
alreadyExist := false
if err != nil {
if !apierrs.IsAlreadyExists(err) {
glog.Error(log("attacher.Attach failed: %v", err))
return "", err
}
alreadyExist = true
}
if alreadyExist {
glog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attach.GetName(), csiSource.VolumeHandle))
} else {
glog.V(4).Info(log("attachment [%v] for volume [%v] created successfully, will start probing for updates", attach.GetName(), csiSource.VolumeHandle))
}
// probe for attachment update here
// NOTE: any error from waiting for attachment is logged only. This is because
// the primary intent of the enclosing method is to create the VolumeAttachment.
// Do NOT return that error here as it is mitigated in attacher.WaitForAttach.
if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil {
glog.Error(log("attacher.Attach encountered error during attachment probing: %v", err))
}
return attachID, nil
}
func (c *csiAttacher) WaitForAttach(spec *volume.Spec, attachID string, pod *v1.Pod, timeout time.Duration) (string, error) {
source, err := getCSISourceFromSpec(spec)
if err != nil {
glog.Error(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err))
return "", err
}
return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout)
}
func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) {
glog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
ticker := time.NewTicker(c.waitSleepTime)
defer ticker.Stop()
timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
defer timer.Stop()
//TODO (vladimirvivien) instead of polling api-server, change to a api-server watch
for {
select {
case <-ticker.C:
glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
glog.Error(log("attacher.WaitForAttach failed (will continue to try): %v", err))
continue
}
// if being deleted, fail fast
if attach.GetDeletionTimestamp() != nil {
glog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachID))
return "", errors.New("volume attachment is being deleted")
}
// attachment OK
if attach.Status.Attached {
return attachID, nil
}
// driver reports attach error
attachErr := attach.Status.AttachError
if attachErr != nil {
glog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
return "", errors.New(attachErr.Message)
}
case <-timer.C:
glog.Error(log("attacher.WaitForAttach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return "", fmt.Errorf("attachment timeout for volume %v", volumeHandle)
}
}
}
func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
glog.V(4).Info(log("probing attachment status for %d volumes ", len(specs)))
attached := make(map[*volume.Spec]bool)
for _, spec := range specs {
if spec == nil {
glog.Error(log("attacher.VolumesAreAttached missing volume.Spec"))
return nil, errors.New("missing spec")
}
source, err := getCSISourceFromSpec(spec)
if err != nil {
glog.Error(log("attacher.VolumesAreAttached failed: %v", err))
continue
}
attachID := getAttachmentName(source.VolumeHandle, string(nodeName))
glog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
glog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))
continue
}
attached[spec] = attach.Status.Attached
}
return attached, nil
}
func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
glog.V(4).Info(log("attacher.GetDeviceMountPath is not implemented"))
return "", nil
}
func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
glog.V(4).Info(log("attacher.MountDevice is not implemented"))
return nil
}
var _ volume.Detacher = &csiAttacher{}
func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
// volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
if volumeName == "" {
glog.Error(log("detacher.Detach missing value for parameter volumeName"))
return errors.New("missing exepected parameter volumeName")
}
parts := strings.Split(volumeName, volNameSep)
if len(parts) != 2 {
glog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
return errors.New("volumeName missing expected data")
}
volID := parts[1]
attachID := getAttachmentName(volID, string(nodeName))
err := c.k8s.StorageV1alpha1().VolumeAttachments().Delete(attachID, nil)
if err != nil {
glog.Error(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err))
return err
}
glog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
return c.waitForVolumeDetachment(volID, attachID)
}
func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) error {
glog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
ticker := time.NewTicker(c.waitSleepTime)
defer ticker.Stop()
timeout := c.waitSleepTime * 10
timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
defer timer.Stop()
//TODO (vladimirvivien) instead of polling api-server, change to a api-server watch
for {
select {
case <-ticker.C:
glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
if apierrs.IsNotFound(err) {
//object deleted or never existed, done
glog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle))
return nil
}
glog.Error(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
continue
}
// driver reports attach error
detachErr := attach.Status.DetachError
if detachErr != nil {
glog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message))
return errors.New(detachErr.Message)
}
case <-timer.C:
glog.Error(log("detacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return fmt.Errorf("detachment timed out for volume %v", volumeHandle)
}
}
}
func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
glog.V(4).Info(log("detacher.UnmountDevice is not implemented"))
return nil
}
func hashAttachmentName(volName, nodeName string) string {
result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", volName, nodeName)))
return fmt.Sprintf("%x", result)
}
func getAttachmentName(volName, nodeName string) string {
// TODO consider using a different prefix for attachment
return fmt.Sprintf("pv-%s", hashAttachmentName(volName, nodeName))
}
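A quick worked example of the naming scheme above (standalone and illustrative, not part of the commit): the attachment object name is the "pv-" prefix followed by the hex SHA-256 of the volume handle concatenated with the node name.

package main

import (
	"crypto/sha256"
	"fmt"
)

func main() {
	// Reproduces getAttachmentName for a hypothetical volume handle "vol-01"
	// attached to node "test-node"; prints pv-<64 hex chars>.
	fmt.Printf("pv-%x\n", sha256.Sum256([]byte("vol-01"+"test-node")))
}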


@ -0,0 +1,277 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"crypto/sha256"
"fmt"
"os"
"testing"
"time"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1alpha1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
)
func makeTestAttachment(attachID, nodeName, pvName string) *storage.VolumeAttachment {
return &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: attachID,
},
Spec: storage.VolumeAttachmentSpec{
NodeName: nodeName,
Attacher: csiPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
Status: storage.VolumeAttachmentStatus{
Attached: false,
AttachError: nil,
DetachError: nil,
},
}
}
func TestAttacherAttach(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
testCases := []struct {
name string
pv *v1.PersistentVolume
nodeName string
attachHash [32]byte
shouldFail bool
}{
{
name: "test ok 1",
pv: makeTestPV("test-pv-001", 10, testDriver, "test-vol-1"),
nodeName: "test-node",
attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-1", "test-node"))),
},
{
name: "test ok 2",
pv: makeTestPV("test-pv-002", 10, testDriver, "test-vol-002"),
nodeName: "test-node",
attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-002", "test-node"))),
},
{
name: "missing spec",
pv: nil,
nodeName: "test-node",
attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-3", "test-node"))),
shouldFail: true,
},
}
for _, tc := range testCases {
var spec *volume.Spec
if tc.pv != nil {
spec = volume.NewSpecFromPersistentVolume(tc.pv, tc.pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
}
attachID, err := csiAttacher.Attach(spec, types.NodeName(tc.nodeName))
if tc.shouldFail && err == nil {
t.Error("expected failure, but got nil err")
}
if attachID != "" {
expectedID := fmt.Sprintf("pv-%x", tc.attachHash)
if attachID != expectedID {
t.Errorf("expecting attachID %v, got %v", expectedID, attachID)
}
}
}
}
func TestAttacherWaitForVolumeAttachment(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node"
testCases := []struct {
name string
attached bool
attachErr *storage.VolumeError
sleepTime time.Duration
timeout time.Duration
shouldFail bool
}{
{name: "attach ok", attached: true, sleepTime: 10 * time.Millisecond, timeout: 50 * time.Millisecond},
{name: "attachment error", attachErr: &storage.VolumeError{Message: "missing volume"}, sleepTime: 10 * time.Millisecond, timeout: 30 * time.Millisecond},
{name: "time ran out", attached: false, sleepTime: 5 * time.Millisecond},
}
for i, tc := range testCases {
t.Logf("running test: %v", tc.name)
pvName := fmt.Sprintf("test-pv-%d", i)
attachID := fmt.Sprintf("pv-%s", hashAttachmentName(pvName, nodeName))
attachment := makeTestAttachment(attachID, nodeName, pvName)
attachment.Status.Attached = tc.attached
attachment.Status.AttachError = tc.attachErr
csiAttacher.waitSleepTime = tc.sleepTime
go func() {
_, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
}()
retID, err := csiAttacher.waitForVolumeAttachment("test-vol", attachID, tc.timeout)
if tc.shouldFail && err == nil {
t.Error("expecting failure, but err is nil")
}
if tc.attachErr != nil {
if tc.attachErr.Message != err.Error() {
t.Errorf("expecting error [%v], got [%v]", tc.attachErr.Message, err.Error())
}
}
if err == nil && retID != attachID {
t.Errorf("attacher.WaitForAttach not returning attachment ID")
}
}
}
func TestAttacherVolumesAreAttached(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node"
testCases := []struct {
name string
attachedStats map[string]bool
}{
{"attach + detach", map[string]bool{"vol-01": true, "vol-02": true, "vol-03": false, "vol-04": false, "vol-05": true}},
{"all detached", map[string]bool{"vol-11": false, "vol-12": false, "vol-13": false, "vol-14": false, "vol-15": false}},
{"all attached", map[string]bool{"vol-21": true, "vol-22": true, "vol-23": true, "vol-24": true, "vol-25": true}},
}
for _, tc := range testCases {
var specs []*volume.Spec
// create and save volume attachments
for volName, stat := range tc.attachedStats {
pv := makeTestPV("test-pv", 10, testDriver, volName)
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
specs = append(specs, spec)
attachID := getAttachmentName(volName, nodeName)
attachment := makeTestAttachment(attachID, nodeName, pv.GetName())
attachment.Status.Attached = stat
_, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
}
// retrieve attached status
stats, err := csiAttacher.VolumesAreAttached(specs, types.NodeName(nodeName))
if err != nil {
t.Fatal(err)
}
if len(tc.attachedStats) != len(stats) {
t.Errorf("expecting %d attachment status, got %d", len(tc.attachedStats), len(stats))
}
// compare attachment status for each spec
for spec, stat := range stats {
source, err := getCSISourceFromSpec(spec)
if err != nil {
t.Error(err)
}
if stat != tc.attachedStats[source.VolumeHandle] {
t.Errorf("expecting volume attachment %t, got %t", tc.attachedStats[source.VolumeHandle], stat)
}
}
}
}
func TestAttacherDetach(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node"
testCases := []struct {
name string
volID string
attachID string
shouldFail bool
}{
{name: "normal test", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-001", nodeName))},
{name: "normal test 2", volID: "vol-002", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName))},
{name: "object not found", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName)), shouldFail: true},
}
for _, tc := range testCases {
pv := makeTestPV("test-pv", 10, testDriver, tc.volID)
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
attachment := makeTestAttachment(tc.attachID, nodeName, "test-pv")
_, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
volumeName, err := plug.GetVolumeName(spec)
if err != nil {
t.Errorf("test case %s failed: %v", tc.name, err)
}
err = csiAttacher.Detach(volumeName, types.NodeName(nodeName))
if tc.shouldFail && err == nil {
t.Fatal("expecting failure, but err = nil")
}
if !tc.shouldFail && err != nil {
t.Fatalf("unexpected err: %v", err)
}
attach, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Get(tc.attachID, meta.GetOptions{})
if err != nil {
if !apierrs.IsNotFound(err) {
t.Fatalf("unexpected err: %v", err)
}
} else {
if attach == nil {
t.Errorf("expecting attachment not to be nil, but it is")
}
}
}
}


@ -0,0 +1,233 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"bytes"
"errors"
"fmt"
"net"
"time"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
grpctx "golang.org/x/net/context"
"google.golang.org/grpc"
api "k8s.io/api/core/v1"
)
type csiClient interface {
AssertSupportedVersion(ctx grpctx.Context, ver *csipb.Version) error
NodePublishVolume(
ctx grpctx.Context,
volumeid string,
readOnly bool,
targetPath string,
accessMode api.PersistentVolumeAccessMode,
volumeInfo map[string]string,
fsType string,
) error
NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error
}
// csiDriverClient encapsulates all csi-plugin methods
type csiDriverClient struct {
network string
addr string
conn *grpc.ClientConn
idClient csipb.IdentityClient
nodeClient csipb.NodeClient
ctrlClient csipb.ControllerClient
versionAsserted bool
versionSupported bool
publishAsserted bool
publishCapable bool
}
func newCsiDriverClient(network, addr string) *csiDriverClient {
return &csiDriverClient{network: network, addr: addr}
}
// assertConnection ensures a valid connection has been established
// if not, it creates a new connection and associated clients
func (c *csiDriverClient) assertConnection() error {
if c.conn == nil {
conn, err := grpc.Dial(
c.addr,
grpc.WithInsecure(),
grpc.WithDialer(func(target string, timeout time.Duration) (net.Conn, error) {
return net.Dial(c.network, target)
}),
)
if err != nil {
return err
}
c.conn = conn
c.idClient = csipb.NewIdentityClient(conn)
c.nodeClient = csipb.NewNodeClient(conn)
c.ctrlClient = csipb.NewControllerClient(conn)
// set supported version
}
return nil
}
// AssertSupportedVersion ensures driver supports specified spec version.
// If version is not supported, the assertion fails with an error.
// This test should be done early during the storage operation flow to avoid
// unnecessary calls later.
func (c *csiDriverClient) AssertSupportedVersion(ctx grpctx.Context, ver *csipb.Version) error {
if c.versionAsserted {
if !c.versionSupported {
return fmt.Errorf("version %s not supported", verToStr(ver))
}
return nil
}
if err := c.assertConnection(); err != nil {
c.versionAsserted = false
return err
}
glog.V(4).Info(log("asserting version supported by driver"))
rsp, err := c.idClient.GetSupportedVersions(ctx, &csipb.GetSupportedVersionsRequest{})
if err != nil {
c.versionAsserted = false
return err
}
supported := false
vers := rsp.GetSupportedVersions()
glog.V(4).Info(log("driver reports %d versions supported: %s", len(vers), versToStr(vers)))
for _, v := range vers {
//TODO (vladimirvivien) use more lenient/heuristic for exact or match of ranges etc
if verToStr(v) == verToStr(ver) {
supported = true
break
}
}
c.versionAsserted = true
c.versionSupported = supported
if !supported {
return fmt.Errorf("version %s not supported", verToStr(ver))
}
glog.V(4).Info(log("version %s supported", verToStr(ver)))
return nil
}
func (c *csiDriverClient) NodePublishVolume(
ctx grpctx.Context,
volID string,
readOnly bool,
targetPath string,
accessMode api.PersistentVolumeAccessMode,
volumeInfo map[string]string,
fsType string,
) error {
if volID == "" {
return errors.New("missing volume id")
}
if targetPath == "" {
return errors.New("missing target path")
}
if err := c.assertConnection(); err != nil {
glog.Errorf("%v: failed to assert a connection: %v", csiPluginName, err)
return err
}
req := &csipb.NodePublishVolumeRequest{
Version: csiVersion,
VolumeId: volID,
TargetPath: targetPath,
Readonly: readOnly,
PublishVolumeInfo: volumeInfo,
VolumeCapability: &csipb.VolumeCapability{
AccessMode: &csipb.VolumeCapability_AccessMode{
Mode: asCSIAccessMode(accessMode),
},
AccessType: &csipb.VolumeCapability_Mount{
Mount: &csipb.VolumeCapability_MountVolume{
FsType: fsType,
},
},
},
}
_, err := c.nodeClient.NodePublishVolume(ctx, req)
return err
}
func (c *csiDriverClient) NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error {
if volID == "" {
return errors.New("missing volume id")
}
if targetPath == "" {
return errors.New("missing target path")
}
if err := c.assertConnection(); err != nil {
glog.Error(log("failed to assert a connection: %v", err))
return err
}
req := &csipb.NodeUnpublishVolumeRequest{
Version: csiVersion,
VolumeId: volID,
TargetPath: targetPath,
}
_, err := c.nodeClient.NodeUnpublishVolume(ctx, req)
return err
}
func asCSIAccessMode(am api.PersistentVolumeAccessMode) csipb.VolumeCapability_AccessMode_Mode {
switch am {
case api.ReadWriteOnce:
return csipb.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
case api.ReadOnlyMany:
return csipb.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
case api.ReadWriteMany:
return csipb.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
}
return csipb.VolumeCapability_AccessMode_UNKNOWN
}
func verToStr(ver *csipb.Version) string {
if ver == nil {
return ""
}
return fmt.Sprintf("%d.%d.%d", ver.GetMajor(), ver.GetMinor(), ver.GetPatch())
}
func versToStr(vers []*csipb.Version) string {
if vers == nil {
return ""
}
str := bytes.NewBufferString("[")
for _, v := range vers {
str.WriteString(fmt.Sprintf("{%s};", verToStr(v)))
}
str.WriteString("]")
return str.String()
}


@ -0,0 +1,126 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"errors"
"testing"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
grpctx "golang.org/x/net/context"
"google.golang.org/grpc"
api "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/volume/csi/fake"
)
func setupClient(t *testing.T) *csiDriverClient {
client := newCsiDriverClient("unix", "/tmp/test.sock")
client.conn = new(grpc.ClientConn) //avoids creating conn object
// setup mock grpc clients
client.idClient = fake.NewIdentityClient()
client.nodeClient = fake.NewNodeClient()
client.ctrlClient = fake.NewControllerClient()
return client
}
func TestClientAssertSupportedVersion(t *testing.T) {
testCases := []struct {
testName string
ver *csipb.Version
mustFail bool
err error
}{
{testName: "supported version", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}},
{testName: "unsupported version", ver: &csipb.Version{Major: 0, Minor: 0, Patch: 0}, mustFail: true},
{testName: "grpc error", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}, mustFail: true, err: errors.New("grpc error")},
}
for _, tc := range testCases {
t.Log("case: ", tc.testName)
client := setupClient(t)
client.idClient.(*fake.IdentityClient).SetNextError(tc.err)
err := client.AssertSupportedVersion(grpctx.Background(), tc.ver)
if tc.mustFail && err == nil {
t.Error("must fail, but err = nil")
}
}
}
func TestClientNodePublishVolume(t *testing.T) {
testCases := []struct {
name string
volID string
targetPath string
fsType string
mustFail bool
err error
}{
{name: "test ok", volID: "vol-test", targetPath: "/test/path"},
{name: "missing volID", targetPath: "/test/path", mustFail: true},
{name: "missing target path", volID: "vol-test", mustFail: true},
{name: "bad fs", volID: "vol-test", targetPath: "/test/path", fsType: "badfs", mustFail: true},
{name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
}
client := setupClient(t)
for _, tc := range testCases {
t.Log("case: ", tc.name)
client.nodeClient.(*fake.NodeClient).SetNextError(tc.err)
err := client.NodePublishVolume(
grpctx.Background(),
tc.volID,
false,
tc.targetPath,
api.ReadWriteOnce,
map[string]string{"device": "/dev/null"},
tc.fsType,
)
if tc.mustFail && err == nil {
t.Error("must fail, but err is nil: ", err)
}
}
}
func TestClientNodeUnpublishVolume(t *testing.T) {
testCases := []struct {
name string
volID string
targetPath string
mustFail bool
err error
}{
{name: "test ok", volID: "vol-test", targetPath: "/test/path"},
{name: "missing volID", targetPath: "/test/path", mustFail: true},
{name: "missing target path", volID: "vol-test", mustFail: true},
{name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
}
client := setupClient(t)
for _, tc := range testCases {
t.Log("case: ", tc.name)
client.nodeClient.(*fake.NodeClient).SetNextError(tc.err)
err := client.NodeUnpublishVolume(grpctx.Background(), tc.volID, tc.targetPath)
if tc.mustFail && err == nil {
t.Error("must fail, but err is nil: ", err)
}
}
}


@ -0,0 +1,194 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"errors"
"fmt"
"path"
"github.com/golang/glog"
grpctx "golang.org/x/net/context"
api "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1alpha1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
)
type csiMountMgr struct {
k8s kubernetes.Interface
csiClient csiClient
plugin *csiPlugin
driverName string
volumeID string
readOnly bool
spec *volume.Spec
pod *api.Pod
podUID types.UID
options volume.VolumeOptions
volumeInfo map[string]string
volume.MetricsNil
}
// volume.Volume methods
var _ volume.Volume = &csiMountMgr{}
func (c *csiMountMgr) GetPath() string {
return getTargetPath(c.podUID, c.driverName, c.volumeID, c.plugin.host)
}
func getTargetPath(uid types.UID, driverName string, volID string, host volume.VolumeHost) string {
// driverName validated at Mounter creation
// sanitize volumeID (replace / with ~) before it is appended to the path
driverPath := fmt.Sprintf("%s/%s", driverName, kstrings.EscapeQualifiedNameForDisk(volID))
return host.GetPodVolumeDir(uid, kstrings.EscapeQualifiedNameForDisk(csiPluginName), driverPath)
}
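// For illustration (hypothetical values): a pod with UID "test-pod" mounting
// volume handle "vol-123" from driver "test-driver" resolves to a path of the
// form <kubelet-root>/pods/test-pod/volumes/kubernetes.io~csi/test-driver/vol-123
// (TestMounterGetPath asserts this exact layout).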
// volume.Mounter methods
var _ volume.Mounter = &csiMountMgr{}
func (c *csiMountMgr) CanMount() error {
//TODO (vladimirvivien) use this method to probe controller using CSI.NodeProbe() call
// to ensure Node service is ready in the CSI plugin
return nil
}
func (c *csiMountMgr) SetUp(fsGroup *int64) error {
return c.SetUpAt(c.GetPath(), fsGroup)
}
func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
glog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
ctx, cancel := grpctx.WithTimeout(grpctx.Background(), csiTimeout)
defer cancel()
csi := c.csiClient
pvName := c.spec.PersistentVolume.GetName()
// ensure version is supported
if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil {
glog.Errorf(log("failed to assert version: %v", err))
return err
}
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
if c.volumeInfo == nil {
//TODO (vladimirvivien) consider using VolumesAttachments().Get() to retrieve
//the object directly. This requires the ability to reconstruct the ID using volumeName+nodeName (nodeName may not be available)
attachList, err := c.k8s.StorageV1alpha1().VolumeAttachments().List(meta.ListOptions{})
if err != nil {
glog.Error(log("failed to get volume attachments: %v", err))
return err
}
var attachment *storage.VolumeAttachment
for _, attach := range attachList.Items {
if attach.Spec.Source.PersistentVolumeName != nil &&
*attach.Spec.Source.PersistentVolumeName == pvName {
attachment = &attach
break
}
}
if attachment == nil {
glog.Error(log("unable to find VolumeAttachment with PV.name = %s", pvName))
return errors.New("no existing VolumeAttachment found")
}
c.volumeInfo = attachment.Status.AttachmentMetadata
}
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
accessMode := api.ReadWriteOnce
if c.spec.PersistentVolume.Spec.AccessModes != nil {
accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
}
err := csi.NodePublishVolume(
ctx,
c.volumeID,
c.readOnly,
dir,
accessMode,
c.volumeInfo,
"ext4", //TODO needs to be sourced from PV or somewhere else
)
if err != nil {
glog.Errorf(log("Mounter.Setup failed: %v", err))
return err
}
glog.V(4).Infof(log("successfully mounted %s", dir))
return nil
}
func (c *csiMountMgr) GetAttributes() volume.Attributes {
return volume.Attributes{
ReadOnly: c.readOnly,
Managed: !c.readOnly,
SupportsSELinux: false,
}
}
// volume.Unmounter methods
var _ volume.Unmounter = &csiMountMgr{}
func (c *csiMountMgr) TearDown() error {
return c.TearDownAt(c.GetPath())
}
func (c *csiMountMgr) TearDownAt(dir string) error {
glog.V(4).Infof(log("Unmounter.TearDown(%s)", dir))
// extract driverName and volID from path
base, volID := path.Split(dir)
volID = kstrings.UnescapeQualifiedNameForDisk(volID)
driverName := path.Base(base)
if c.csiClient == nil {
addr := fmt.Sprintf(csiAddrTemplate, driverName)
client := newCsiDriverClient("unix", addr)
glog.V(4).Infof(log("unmounter csiClient setup [volume=%v,driver=%v]", volID, driverName))
c.csiClient = client
}
ctx, cancel := grpctx.WithTimeout(grpctx.Background(), csiTimeout)
defer cancel()
csi := c.csiClient
// TODO make all assertion calls private within the client itself
if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil {
glog.Errorf(log("failed to assert version: %v", err))
return err
}
err := csi.NodeUnpublishVolume(ctx, volID, dir)
if err != nil {
glog.Errorf(log("Mounter.Setup failed: %v", err))
return err
}
glog.V(4).Infof(log("successfully unmounted %s", dir))
return nil
}


@ -0,0 +1,152 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"fmt"
"os"
"path"
"testing"
api "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1alpha1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/fake"
)
var (
testDriver = "test-driver"
testVol = "vol-123"
testns = "test-ns"
testPodUID = types.UID("test-pod")
)
func TestMounterGetPath(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
}
csiMounter := mounter.(*csiMountMgr)
expectedPath := path.Join(tmpDir, fmt.Sprintf(
"pods/%s/volumes/kubernetes.io~csi/%s/%s",
testPodUID,
csiMounter.driverName,
csiMounter.volumeID,
))
mountPath := csiMounter.GetPath()
if mountPath != expectedPath {
t.Errorf("Got unexpected path: %s", mountPath)
}
}
func TestMounterSetUp(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
pvName := pv.GetName()
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t)
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: "pv-1234556775313",
},
Spec: storage.VolumeAttachmentSpec{
NodeName: "test-node",
Attacher: csiPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
Status: storage.VolumeAttachmentStatus{
Attached: false,
AttachError: nil,
DetachError: nil,
},
}
_, err = csiMounter.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to setup VolumeAttachment: %v", err)
}
// Mounter.SetUp()
if err := csiMounter.SetUp(nil); err != nil {
t.Fatalf("mounter.Setup failed: %v", err)
}
// ensure call went all the way
pubs := csiMounter.csiClient.(*csiDriverClient).nodeClient.(*fake.NodeClient).GetNodePublishedVolumes()
if pubs[csiMounter.volumeID] != csiMounter.GetPath() {
t.Error("csi server may not have received NodePublishVolume call")
}
}
func TestUnmounterTeardown(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
unmounter, err := plug.NewUnmounter(pv.ObjectMeta.Name, testPodUID)
if err != nil {
t.Fatalf("failed to make a new Unmounter: %v", err)
}
csiUnmounter := unmounter.(*csiMountMgr)
csiUnmounter.csiClient = setupClient(t)
dir := csiUnmounter.GetPath()
err = csiUnmounter.TearDownAt(dir)
if err != nil {
t.Fatal(err)
}
// ensure csi client call
pubs := csiUnmounter.csiClient.(*csiDriverClient).nodeClient.(*fake.NodeClient).GetNodePublishedVolumes()
if _, ok := pubs[csiUnmounter.volumeID]; ok {
t.Error("csi server may not have received NodeUnpublishVolume call")
}
}


@ -0,0 +1,257 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"errors"
"fmt"
"path"
"regexp"
"time"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
)
const (
csiName = "csi"
csiPluginName = "kubernetes.io/csi"
// TODO (vladimirvivien) implement a more dynamic way to discover
// the unix domain socket path for each installed csi driver.
// TODO (vladimirvivien) would be nice to name socket with a .sock extension
// for consistency.
csiAddrTemplate = "/var/lib/kubelet/plugins/%v"
csiTimeout = 15 * time.Second
volNameSep = "^"
)
var (
// csiVersion supported csi version
csiVersion = &csipb.Version{Major: 0, Minor: 1, Patch: 0}
driverNameRexp = regexp.MustCompile(`^[A-Za-z]+(\.?-?_?[A-Za-z0-9-])+$`)
)
type csiPlugin struct {
host volume.VolumeHost
}
// ProbeVolumePlugins returns implemented plugins
func ProbeVolumePlugins() []volume.VolumePlugin {
p := &csiPlugin{
host: nil,
}
return []volume.VolumePlugin{p}
}
// volume.VolumePlugin methods
var _ volume.VolumePlugin = &csiPlugin{}
func (p *csiPlugin) Init(host volume.VolumeHost) error {
glog.Info(log("plugin initializing..."))
p.host = host
return nil
}
func (p *csiPlugin) GetPluginName() string {
return csiPluginName
}
// GetVolumeName returns a concatenated string of CSIVolumeSource.Driver<volNameSep>CSIVolumeSource.VolumeHandle.
// That string value is used in Detach() to extract the driver name and volume handle.
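// For example (hypothetical values): Driver "com.example.csi" with VolumeHandle
// "vol-0001" yields the volume name "com.example.csi^vol-0001", since volNameSep is "^".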
func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
csi, err := getCSISourceFromSpec(spec)
if err != nil {
glog.Error(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err))
return "", err
}
//TODO (vladimirvivien) this validation should be done at the API validation check
if !isDriverNameValid(csi.Driver) {
glog.Error(log("plugin.GetVolumeName failed to create volume name: invalid csi driver name %s", csi.Driver))
return "", errors.New("invalid csi driver name")
}
// return driverName<separator>volumeHandle
return fmt.Sprintf("%s%s%s", csi.Driver, volNameSep, csi.VolumeHandle), nil
}
func (p *csiPlugin) CanSupport(spec *volume.Spec) bool {
// TODO (vladimirvivien) CanSupport should also take into account
// the availability/registration of specified Driver in the volume source
return spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil
}
func (p *csiPlugin) RequiresRemount() bool {
return false
}
func (p *csiPlugin) NewMounter(
spec *volume.Spec,
pod *api.Pod,
_ volume.VolumeOptions) (volume.Mounter, error) {
pvSource, err := getCSISourceFromSpec(spec)
if err != nil {
return nil, err
}
// TODO (vladimirvivien) consider moving this check in API validation
// check that the Driver name conforms to the CSI spec before it is used in any paths (e.g. the socket address)
if !isDriverNameValid(pvSource.Driver) {
glog.Error(log("driver name does not conform to CSI spec: %s", pvSource.Driver))
return nil, errors.New("driver name is invalid")
}
// derive the driver's unix socket address from the (validated) driver name
addr := fmt.Sprintf(csiAddrTemplate, pvSource.Driver)
glog.V(4).Infof(log("setting up mounter for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
client := newCsiDriverClient("unix", addr)
k8s := p.host.GetKubeClient()
if k8s == nil {
glog.Error(log("failed to get a kubernetes client"))
return nil, errors.New("failed to get a Kubernetes client")
}
mounter := &csiMountMgr{
plugin: p,
k8s: k8s,
spec: spec,
pod: pod,
podUID: pod.UID,
driverName: pvSource.Driver,
volumeID: pvSource.VolumeHandle,
csiClient: client,
}
return mounter, nil
}
func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) {
glog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID))
unmounter := &csiMountMgr{
plugin: p,
podUID: podUID,
}
return unmounter, nil
}
func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
glog.V(4).Infof(log("constructing volume spec [pv.Name=%v, path=%v]", volumeName, mountPath))
// extract driverName/volumeId from end of mountPath
dir, volID := path.Split(mountPath)
volID = kstrings.UnescapeQualifiedNameForDisk(volID)
driverName := path.Base(dir)
// TODO (vladimirvivien) consider moving this check in API validation
if !isDriverNameValid(driverName) {
glog.Error(log("failed while reconstructing volume spec csi: driver name extracted from path is invalid: [path=%s; driverName=%s]", mountPath, driverName))
return nil, errors.New("invalid csi driver name from path")
}
glog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [volumeID=%s; driverName=%s]", volID, driverName))
pv := &api.PersistentVolume{
ObjectMeta: meta.ObjectMeta{
Name: volumeName,
},
Spec: api.PersistentVolumeSpec{
PersistentVolumeSource: api.PersistentVolumeSource{
CSI: &api.CSIPersistentVolumeSource{
Driver: driverName,
VolumeHandle: volID,
},
},
},
}
return volume.NewSpecFromPersistentVolume(pv, false), nil
}
func (p *csiPlugin) SupportsMountOption() bool {
// TODO (vladimirvivien) use CSI VolumeCapability.MountVolume.mount_flags
// to probe for the result for this method
return false
}
func (p *csiPlugin) SupportsBulkVolumeVerification() bool {
return false
}
// volume.AttachableVolumePlugin methods
var _ volume.AttachableVolumePlugin = &csiPlugin{}
func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
k8s := p.host.GetKubeClient()
if k8s == nil {
glog.Error(log("unable to get kubernetes client from host"))
return nil, errors.New("unable to get Kubernetes client")
}
return &csiAttacher{
plugin: p,
k8s: k8s,
waitSleepTime: 1 * time.Second,
}, nil
}
func (p *csiPlugin) NewDetacher() (volume.Detacher, error) {
k8s := p.host.GetKubeClient()
if k8s == nil {
glog.Error(log("unable to get kubernetes client from host"))
return nil, errors.New("unable to get Kubernetes client")
}
return &csiAttacher{
plugin: p,
k8s: k8s,
waitSleepTime: 1 * time.Second,
}, nil
}
func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
m := p.host.GetMounter(p.GetPluginName())
return mount.GetMountRefs(m, deviceMountPath)
}
func getCSISourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) {
if spec.PersistentVolume != nil &&
spec.PersistentVolume.Spec.CSI != nil {
return spec.PersistentVolume.Spec.CSI, nil
}
return nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
}
// log prepends log string with `kubernetes.io/csi`
func log(msg string, parts ...interface{}) string {
return fmt.Sprintf(fmt.Sprintf("%s: %s", csiPluginName, msg), parts...)
}
// isDriverNameValid validates the driverName using CSI spec
func isDriverNameValid(name string) bool {
if len(name) == 0 || len(name) > 63 {
return false
}
return driverNameRexp.MatchString(name)
}


@ -0,0 +1,297 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"fmt"
"os"
"testing"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
fakeclient "k8s.io/client-go/kubernetes/fake"
utiltesting "k8s.io/client-go/util/testing"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
// create a plugin mgr to load plugins and setup a fake client
func newTestPlugin(t *testing.T) (*csiPlugin, string) {
tmpDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHost(
tmpDir,
fakeClient,
nil,
)
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
plug, err := plugMgr.FindPluginByName(csiPluginName)
if err != nil {
t.Fatalf("can't find plugin %v", csiPluginName)
}
csiPlug, ok := plug.(*csiPlugin)
if !ok {
t.Fatalf("cannot assert plugin to be type csiPlugin")
}
return csiPlug, tmpDir
}
func makeTestPV(name string, sizeGig int, driverName, volID string) *api.PersistentVolume {
return &api.PersistentVolume{
ObjectMeta: meta.ObjectMeta{
Name: name,
Namespace: testns,
},
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse(
fmt.Sprintf("%dGi", sizeGig),
),
},
PersistentVolumeSource: api.PersistentVolumeSource{
CSI: &api.CSIPersistentVolumeSource{
Driver: driverName,
VolumeHandle: volID,
ReadOnly: false,
},
},
},
}
}
func TestPluginGetPluginName(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
if plug.GetPluginName() != "kubernetes.io/csi" {
t.Errorf("unexpected plugin name %v", plug.GetPluginName())
}
}
func TestPluginGetVolumeName(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
driverName string
volName string
shouldFail bool
}{
{"alphanum names", "testdr", "testvol", false},
{"mixchar driver", "test.dr.cc", "testvol", false},
{"mixchar volume", "testdr", "test-vol-name", false},
{"mixchars all", "test-driver", "test.vol.name", false},
}
for _, tc := range testCases {
t.Logf("testing: %s", tc.name)
pv := makeTestPV("test-pv", 10, tc.driverName, tc.volName)
spec := volume.NewSpecFromPersistentVolume(pv, false)
name, err := plug.GetVolumeName(spec)
if tc.shouldFail && err == nil {
t.Fatal("GetVolumeName should fail, but got err=nil")
}
if name != fmt.Sprintf("%s%s%s", tc.driverName, volNameSep, tc.volName) {
t.Errorf("unexpected volume name %s", name)
}
}
}
func TestPluginCanSupport(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
spec := volume.NewSpecFromPersistentVolume(pv, false)
if !plug.CanSupport(spec) {
t.Errorf("should support CSI spec")
}
}
func TestPluginConstructVolumeSpec(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
driverName string
volID string
shouldFail bool
}{
{"valid driver and vol", "test.csi-driver", "abc-cde", false},
{"valid driver + vol with slash", "test.csi-driver", "a/b/c/d", false},
{"invalid driver name", "_test.csi.driver>", "a/b/c/d", true},
}
for _, tc := range testCases {
dir := getTargetPath(testPodUID, tc.driverName, tc.volID, plug.host)
// rebuild spec
spec, err := plug.ConstructVolumeSpec("test-pv", dir)
if tc.shouldFail {
if err == nil {
t.Fatal("expecting ConstructVolumeSpec to fail, but got nil error")
}
continue
}
volID := spec.PersistentVolume.Spec.CSI.VolumeHandle
unsanitizedVolID := kstrings.UnescapeQualifiedNameForDisk(tc.volID)
if volID != unsanitizedVolID {
t.Errorf("expected unsanitized volID %s, got volID %s", unsanitizedVolID, volID)
}
if spec.Name() != "test-pv" {
t.Errorf("Unexpected spec name %s", spec.Name())
}
}
}
func TestPluginNewMounter(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
// validate mounter fields
if csiMounter.driverName != testDriver {
t.Error("mounter driver name not set")
}
if csiMounter.volumeID != testVol {
t.Error("mounter volume id not set")
}
if csiMounter.pod == nil {
t.Error("mounter pod not set")
}
if csiMounter.podUID == types.UID("") {
t.Error("mounter podUID mot set")
}
}
func TestPluginNewUnmounter(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
unmounter, err := plug.NewUnmounter(pv.ObjectMeta.Name, testPodUID)
if err != nil {
t.Fatalf("failed to make a new Unmounter: %v", err)
}
csiUnmounter := unmounter.(*csiMountMgr)
if csiUnmounter == nil {
t.Fatal("failed to create CSI mounter")
}
if csiUnmounter.podUID != testPodUID {
t.Error("podUID not set")
}
}
func TestValidateDriverName(t *testing.T) {
testCases := []struct {
name string
driverName string
valid bool
}{
{"ok no punctuations", "comgooglestoragecsigcepd", true},
{"ok dot only", "io.kubernetes.storage.csi.flex", true},
{"ok dash only", "io-kubernetes-storage-csi-flex", true},
{"ok underscore only", "io_kubernetes_storage_csi_flex", true},
{"ok dot underscores", "io.kubernetes.storage_csi.flex", true},
{"ok dot dash underscores", "io.kubernetes-storage.csi_flex", true},
{"invalid length 0", "", false},
{"invalid length > 63", "comgooglestoragecsigcepdcomgooglestoragecsigcepdcomgooglestoragecsigcepdcomgooglestoragecsigcepd", false},
{"invalid start char", "_comgooglestoragecsigcepd", false},
{"invalid end char", "comgooglestoragecsigcepd/", false},
{"invalid separators", "com/google/storage/csi~gcepd", false},
}
for _, tc := range testCases {
t.Logf("test case: %v", tc.name)
drValid := isDriverNameValid(tc.driverName)
if tc.valid != drValid {
t.Errorf("expecting driverName %s as valid=%t, but got valid=%t", tc.driverName, tc.valid, drValid)
}
}
}
func TestPluginNewAttacher(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
if csiAttacher.plugin == nil {
t.Error("plugin not set for attacher")
}
if csiAttacher.k8s == nil {
t.Error("Kubernetes client not set for attacher")
}
}
func TestPluginNewDetacher(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
detacher, err := plug.NewDetacher()
if err != nil {
t.Fatalf("failed to create new detacher: %v", err)
}
csiDetacher := detacher.(*csiAttacher)
if csiDetacher.plugin == nil {
t.Error("plugin not set for detacher")
}
if csiDetacher.k8s == nil {
t.Error("Kubernetes client not set for attacher")
}
}


@ -0,0 +1,224 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
"context"
"errors"
"strings"
"google.golang.org/grpc"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
grpctx "golang.org/x/net/context"
)
// IdentityClient is a CSI identity client used for testing
type IdentityClient struct {
nextErr error
}
// NewIdentityClient returns a new IdentityClient
func NewIdentityClient() *IdentityClient {
return &IdentityClient{}
}
// SetNextError injects expected error
func (f *IdentityClient) SetNextError(err error) {
f.nextErr = err
}
// GetSupportedVersions returns supported version
func (f *IdentityClient) GetSupportedVersions(ctx grpctx.Context, req *csipb.GetSupportedVersionsRequest, opts ...grpc.CallOption) (*csipb.GetSupportedVersionsResponse, error) {
// short circuit with an error
if f.nextErr != nil {
return nil, f.nextErr
}
rsp := &csipb.GetSupportedVersionsResponse{
SupportedVersions: []*csipb.Version{
{Major: 0, Minor: 0, Patch: 1},
{Major: 0, Minor: 1, Patch: 0},
{Major: 1, Minor: 0, Patch: 0},
{Major: 1, Minor: 0, Patch: 1},
{Major: 1, Minor: 1, Patch: 1},
},
}
return rsp, nil
}
// GetPluginInfo returns plugin info
func (f *IdentityClient) GetPluginInfo(ctx context.Context, in *csipb.GetPluginInfoRequest, opts ...grpc.CallOption) (*csipb.GetPluginInfoResponse, error) {
return nil, nil
}
// NodeClient returns CSI node client
type NodeClient struct {
nodePublishedVolumes map[string]string
nextErr error
}
// NewNodeClient returns fake node client
func NewNodeClient() *NodeClient {
return &NodeClient{nodePublishedVolumes: make(map[string]string)}
}
// SetNextError injects next expected error
func (f *NodeClient) SetNextError(err error) {
f.nextErr = err
}
// GetNodePublishedVolumes returns node published volumes
func (f *NodeClient) GetNodePublishedVolumes() map[string]string {
return f.nodePublishedVolumes
}
// NodePublishVolume implements CSI NodePublishVolume
func (f *NodeClient) NodePublishVolume(ctx grpctx.Context, req *csipb.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodePublishVolumeResponse, error) {
if f.nextErr != nil {
return nil, f.nextErr
}
if req.GetVolumeId() == "" {
return nil, errors.New("missing volume id")
}
if req.GetTargetPath() == "" {
return nil, errors.New("missing target path")
}
fsTypes := "ext4|xfs|zfs"
fsType := req.GetVolumeCapability().GetMount().GetFsType()
if !strings.Contains(fsTypes, fsType) {
return nil, errors.New("invlid fstype")
}
f.nodePublishedVolumes[req.GetVolumeId()] = req.GetTargetPath()
return &csipb.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume implements csi method
func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnpublishVolumeResponse, error) {
if f.nextErr != nil {
return nil, f.nextErr
}
if req.GetVolumeId() == "" {
return nil, errors.New("missing volume id")
}
if req.GetTargetPath() == "" {
return nil, errors.New("missing target path")
}
delete(f.nodePublishedVolumes, req.GetVolumeId())
return &csipb.NodeUnpublishVolumeResponse{}, nil
}
// GetNodeID implements method
func (f *NodeClient) GetNodeID(ctx context.Context, in *csipb.GetNodeIDRequest, opts ...grpc.CallOption) (*csipb.GetNodeIDResponse, error) {
return nil, nil
}
// NodeProbe implements csi method
func (f *NodeClient) NodeProbe(ctx context.Context, in *csipb.NodeProbeRequest, opts ...grpc.CallOption) (*csipb.NodeProbeResponse, error) {
return nil, nil
}
// NodeGetCapabilities implements csi method
func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.NodeGetCapabilitiesResponse, error) {
return nil, nil
}
// ControllerClient represents a CSI Controller client
type ControllerClient struct {
nextCapabilities []*csipb.ControllerServiceCapability
nextErr error
}
// NewControllerClient returns a ControllerClient
func NewControllerClient() *ControllerClient {
return &ControllerClient{}
}
// SetNextError injects next expected error
func (f *ControllerClient) SetNextError(err error) {
f.nextErr = err
}
// SetNextCapabilities injects next expected capabilities
func (f *ControllerClient) SetNextCapabilities(caps []*csipb.ControllerServiceCapability) {
f.nextCapabilities = caps
}
// ControllerGetCapabilities implements csi method
func (f *ControllerClient) ControllerGetCapabilities(ctx context.Context, in *csipb.ControllerGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ControllerGetCapabilitiesResponse, error) {
if f.nextErr != nil {
return nil, f.nextErr
}
if f.nextCapabilities == nil {
f.nextCapabilities = []*csipb.ControllerServiceCapability{
{
Type: &csipb.ControllerServiceCapability_Rpc{
Rpc: &csipb.ControllerServiceCapability_RPC{
Type: csipb.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
},
},
},
}
}
return &csipb.ControllerGetCapabilitiesResponse{
Capabilities: f.nextCapabilities,
}, nil
}
// CreateVolume implements csi method
func (f *ControllerClient) CreateVolume(ctx context.Context, in *csipb.CreateVolumeRequest, opts ...grpc.CallOption) (*csipb.CreateVolumeResponse, error) {
return nil, nil
}
// DeleteVolume implements csi method
func (f *ControllerClient) DeleteVolume(ctx context.Context, in *csipb.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipb.DeleteVolumeResponse, error) {
return nil, nil
}
// ControllerPublishVolume implements csi method
func (f *ControllerClient) ControllerPublishVolume(ctx context.Context, in *csipb.ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerPublishVolumeResponse, error) {
return nil, nil
}
// ControllerUnpublishVolume implements csi method
func (f *ControllerClient) ControllerUnpublishVolume(ctx context.Context, in *csipb.ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerUnpublishVolumeResponse, error) {
return nil, nil
}
// ValidateVolumeCapabilities implements csi method
func (f *ControllerClient) ValidateVolumeCapabilities(ctx context.Context, in *csipb.ValidateVolumeCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ValidateVolumeCapabilitiesResponse, error) {
return nil, nil
}
// ListVolumes implements csi method
func (f *ControllerClient) ListVolumes(ctx context.Context, in *csipb.ListVolumesRequest, opts ...grpc.CallOption) (*csipb.ListVolumesResponse, error) {
return nil, nil
}
// GetCapacity implements csi method
func (f *ControllerClient) GetCapacity(ctx context.Context, in *csipb.GetCapacityRequest, opts ...grpc.CallOption) (*csipb.GetCapacityResponse, error) {
return nil, nil
}
// ControllerProbe implements csi method
func (f *ControllerClient) ControllerProbe(ctx context.Context, in *csipb.ControllerProbeRequest, opts ...grpc.CallOption) (*csipb.ControllerProbeResponse, error) {
return nil, nil
}