Merge pull request #67945 from jsafrane/csi-workload-csidriver3-saad

Automatic merge from submit-queue (batch tested with PRs 68171, 67945, 68233). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

CSI pod information in NodePublish

**What this PR does / why we need it**:
This is implementation of https://github.com/kubernetes/community/pull/2439. It brings CSI closer to Flex volume features, CSI driver can (optionally) get name/namespace/UID/ServiceAccount of the pod that requires the CSI volume mounted. This allows CSI drivers to either do AAA before the volume is mounted or tailor content of  volume to the pod.

Work in progress:
  * contains #67803 to get `CSIDriver` API. Ignore the first commit.

Related to #64984 #66362 (fixes only part of these issues)

/sig storage

cc: @saad-ali @vladimirvivien @verult @msau42 @gnufied 

**Release note**:

```release-note
CSI NodePublish call can optionally contain information about the pod that requested the CSI volume.
```
This commit is contained in:
Kubernetes Submit Queue 2018-09-05 21:36:37 -07:00 committed by GitHub
commit 7b756e6836
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 275 additions and 100 deletions

View File

@ -386,10 +386,16 @@ const (
//
// Allow TTL controller to clean up Pods and Jobs after they finish.
TTLAfterFinished utilfeature.Feature = "TTLAfterFinished"
// owner: @jsafrane
// Kubernetes skips attaching CSI volumes that don't require attachment.
//
CSISkipAttach utilfeature.Feature = "CSISkipAttach"
// owner: @jsafrane
//
// Kubelet sends pod information in NodePublish CSI call when a CSI driver wants so.
CSIPodInfo utilfeature.Feature = "CSIPodInfo"
)
func init() {
@ -456,6 +462,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
ProcMountType: {Default: false, PreRelease: utilfeature.Alpha},
TTLAfterFinished: {Default: false, PreRelease: utilfeature.Alpha},
CSISkipAttach: {Default: false, PreRelease: utilfeature.Alpha},
CSIPodInfo: {Default: false, PreRelease: utilfeature.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:

View File

@ -47,6 +47,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/features:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi/fake:go_default_library",
"//pkg/volume/testing:go_default_library",
@ -60,6 +61,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",

View File

@ -747,12 +747,12 @@ func TestAttacherMountDevice(t *testing.T) {
t.Errorf("got wrong number of staged volumes, expecting %v got: %v", numStaged, len(staged))
}
if tc.stageUnstageSet {
gotPath, ok := staged[tc.volName]
vol, ok := staged[tc.volName]
if !ok {
t.Errorf("could not find staged volume: %s", tc.volName)
}
if gotPath != tc.deviceMountPath {
t.Errorf("expected mount path: %s. got: %s", tc.deviceMountPath, gotPath)
if vol.Path != tc.deviceMountPath {
t.Errorf("expected mount path: %s. got: %s", tc.deviceMountPath, vol.Path)
}
}
}
@ -836,7 +836,7 @@ func TestAttacherUnmountDevice(t *testing.T) {
// Add the volume to NodeStagedVolumes
cdc := csiAttacher.csiClient.(*fakeCsiDriverClient)
cdc.nodeClient.AddNodeStagedVolume(tc.volID, tc.deviceMountPath)
cdc.nodeClient.AddNodeStagedVolume(tc.volID, tc.deviceMountPath, nil)
// Make JSON for this object
if tc.deviceMountPath != "" {

View File

@ -129,9 +129,13 @@ func TestBlockMapperSetupDevice(t *testing.T) {
}
vols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
if vols[csiMapper.volumeID] != devicePath {
vol, ok := vols[csiMapper.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}
if vol.Path != devicePath {
t.Errorf("csi server expected device path %s, got %s", devicePath, vol.Path)
}
}
func TestBlockMapperMapDevice(t *testing.T) {
@ -198,9 +202,13 @@ func TestBlockMapperMapDevice(t *testing.T) {
}
pubs := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
if pubs[csiMapper.volumeID] != podVolumeBlockFilePath {
vol, ok := pubs[csiMapper.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}
if vol.Path != podVolumeBlockFilePath {
t.Errorf("csi server expected path %s, got %s", podVolumeBlockFilePath, vol.Path)
}
}
func TestBlockMapperTearDownDevice(t *testing.T) {

View File

@ -18,6 +18,7 @@ package csi
import (
"context"
"errors"
"fmt"
"os"
"path"
@ -25,8 +26,11 @@ import (
"github.com/golang/glog"
api "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/features"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
@ -47,6 +51,7 @@ var (
"nodeName",
"attachmentID",
}
currentPodInfoMountVersion = "v1"
)
type csiMountMgr struct {
@ -162,6 +167,22 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
}
// Inject pod information into volume_attributes
podAttrs, err := c.podAttributes()
if err != nil {
glog.Error(log("mouter.SetUpAt failed to assemble volume attributes: %v", err))
return err
}
if podAttrs != nil {
if attribs == nil {
attribs = podAttrs
} else {
for k, v := range podAttrs {
attribs[k] = v
}
}
}
fsType := csiSource.FSType
err = csi.NodePublishVolume(
ctx,
@ -216,6 +237,39 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
return nil
}
func (c *csiMountMgr) podAttributes() (map[string]string, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPodInfo) {
return nil, nil
}
if c.plugin.csiDriverLister == nil {
return nil, errors.New("CSIDriver lister does not exist")
}
csiDriver, err := c.plugin.csiDriverLister.Get(c.driverName)
if err != nil {
if apierrs.IsNotFound(err) {
glog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", c.driverName))
return nil, nil
}
return nil, err
}
// if PodInfoOnMountVersion is not set or not v1 we do not set pod attributes
if csiDriver.Spec.PodInfoOnMountVersion == nil || *csiDriver.Spec.PodInfoOnMountVersion != currentPodInfoMountVersion {
glog.V(4).Infof(log("CSIDriver %q does not require pod information", c.driverName))
return nil, nil
}
attrs := map[string]string{
"csi.storage.k8s.io/pod.name": c.pod.Name,
"csi.storage.k8s.io/pod.namespace": c.pod.Namespace,
"csi.storage.k8s.io/pod.uid": string(c.pod.UID),
"csi.storage.k8s.io/serviceAccount.name": c.pod.Spec.ServiceAccountName,
}
glog.V(4).Infof(log("CSIDriver %q requires pod information", c.driverName))
return attrs, nil
}
func (c *csiMountMgr) GetAttributes() volume.Attributes {
mounter := c.plugin.host.GetMounter(c.plugin.GetPluginName())
path := c.GetPath()

View File

@ -25,14 +25,20 @@ import (
"path"
"testing"
"reflect"
"github.com/golang/glog"
api "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
fakeclient "k8s.io/client-go/kubernetes/fake"
csiapi "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
)
@ -88,86 +94,171 @@ func TestMounterGetPath(t *testing.T) {
}
}
func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIPodInfo, podInfoEnabled)()
tests := []struct {
name string
driver string
attributes map[string]string
expectedAttributes map[string]string
}{
{
name: "no pod info",
driver: "no-info",
attributes: nil,
expectedAttributes: nil,
},
{
name: "no CSIDriver -> no pod info",
driver: "unknown-driver",
attributes: nil,
expectedAttributes: nil,
},
{
name: "CSIDriver with PodInfoRequiredOnMount=nil -> no pod info",
driver: "nil",
attributes: nil,
expectedAttributes: nil,
},
{
name: "no pod info -> keep existing attributes",
driver: "no-info",
attributes: map[string]string{"foo": "bar"},
expectedAttributes: map[string]string{"foo": "bar"},
},
{
name: "add pod info",
driver: "info",
attributes: nil,
expectedAttributes: map[string]string{"csi.storage.k8s.io/pod.uid": "test-pod", "csi.storage.k8s.io/serviceAccount.name": "test-service-account", "csi.storage.k8s.io/pod.name": "test-pod", "csi.storage.k8s.io/pod.namespace": "test-ns"},
},
{
name: "add pod info -> keep existing attributes",
driver: "info",
attributes: map[string]string{"foo": "bar"},
expectedAttributes: map[string]string{"foo": "bar", "csi.storage.k8s.io/pod.uid": "test-pod", "csi.storage.k8s.io/serviceAccount.name": "test-service-account", "csi.storage.k8s.io/pod.name": "test-pod", "csi.storage.k8s.io/pod.namespace": "test-ns"},
},
}
emptyPodMountInfoVersion := ""
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
glog.Infof("Starting test %s", test.name)
fakeClient := fakeclient.NewSimpleClientset()
fakeCSIClient := fakecsi.NewSimpleClientset(
getCSIDriver("no-info", &emptyPodMountInfoVersion, nil),
getCSIDriver("info", &currentPodInfoMountVersion, nil),
getCSIDriver("nil", nil, nil),
)
plug, tmpDir := newTestPlugin(t, fakeClient, fakeCSIClient)
defer os.RemoveAll(tmpDir)
for {
// Wait until the informer in CSI volume plugin has all CSIDrivers.
if plug.csiDriverInformer.Informer().HasSynced() {
break
}
}
pv := makeTestPV("test-pv", 10, test.driver, testVol)
pv.Spec.CSI.VolumeAttributes = test.attributes
pvName := pv.GetName()
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
&api.Pod{
ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns, Name: testPod},
Spec: api.PodSpec{
ServiceAccountName: testAccount,
},
},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t, true)
attachID := getAttachmentName(csiMounter.volumeID, csiMounter.driverName, string(plug.host.GetNodeName()))
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: attachID,
},
Spec: storage.VolumeAttachmentSpec{
NodeName: "test-node",
Attacher: csiPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
Status: storage.VolumeAttachmentStatus{
Attached: false,
AttachError: nil,
DetachError: nil,
},
}
_, err = csiMounter.k8s.StorageV1beta1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to setup VolumeAttachment: %v", err)
}
// Mounter.SetUp()
fsGroup := int64(2000)
if err := csiMounter.SetUp(&fsGroup); err != nil {
t.Fatalf("mounter.Setup failed: %v", err)
}
//Test the default value of file system type is not overridden
if len(csiMounter.spec.PersistentVolume.Spec.CSI.FSType) != 0 {
t.Errorf("default value of file system type was overridden by type %s", csiMounter.spec.PersistentVolume.Spec.CSI.FSType)
}
path := csiMounter.GetPath()
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
t.Errorf("SetUp() failed, volume path not created: %s", path)
} else {
t.Errorf("SetUp() failed: %v", err)
}
}
// ensure call went all the way
pubs := csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
vol, ok := pubs[csiMounter.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}
if vol.Path != csiMounter.GetPath() {
t.Errorf("csi server expected path %s, got %s", csiMounter.GetPath(), vol.Path)
}
if podInfoEnabled {
if !reflect.DeepEqual(vol.Attributes, test.expectedAttributes) {
t.Errorf("csi server expected attributes %+v, got %+v", test.expectedAttributes, vol.Attributes)
}
} else {
// CSIPodInfo feature is disabled, we expect no modifications to attributes.
if !reflect.DeepEqual(vol.Attributes, test.attributes) {
t.Errorf("csi server expected attributes %+v, got %+v", test.attributes, vol.Attributes)
}
}
})
}
}
func TestMounterSetUp(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
nil,
"fakeNode",
)
plug.host = host
pv := makeTestPV("test-pv", 10, testDriver, testVol)
pvName := pv.GetName()
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t, true)
attachID := getAttachmentName(csiMounter.volumeID, csiMounter.driverName, string(plug.host.GetNodeName()))
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: attachID,
},
Spec: storage.VolumeAttachmentSpec{
NodeName: "test-node",
Attacher: csiPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
Status: storage.VolumeAttachmentStatus{
Attached: false,
AttachError: nil,
DetachError: nil,
},
}
_, err = csiMounter.k8s.StorageV1beta1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to setup VolumeAttachment: %v", err)
}
// Mounter.SetUp()
fsGroup := int64(2000)
if err := csiMounter.SetUp(&fsGroup); err != nil {
t.Fatalf("mounter.Setup failed: %v", err)
}
//Test the default value of file system type is not overridden
if len(csiMounter.spec.PersistentVolume.Spec.CSI.FSType) != 0 {
t.Errorf("default value of file system type was overridden by type %s", csiMounter.spec.PersistentVolume.Spec.CSI.FSType)
}
path := csiMounter.GetPath()
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
t.Errorf("SetUp() failed, volume path not created: %s", path)
} else {
t.Errorf("SetUp() failed: %v", err)
}
}
// ensure call went all the way
pubs := csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
if pubs[csiMounter.volumeID] != csiMounter.GetPath() {
t.Error("csi server may not have received NodePublishVolume call")
}
t.Run("WithCSIPodInfo", func(t *testing.T) {
MounterSetUpTests(t, true)
})
t.Run("WithoutCSIPodInfo", func(t *testing.T) {
MounterSetUpTests(t, false)
})
}
func TestUnmounterTeardown(t *testing.T) {
@ -267,14 +358,13 @@ func TestSaveVolumeData(t *testing.T) {
}
}
func getCSIDriver(name string, requiresPodInfo *bool, attachable *bool) *csiapi.CSIDriver {
podInfoMountVersion := "v1"
func getCSIDriver(name string, podInfoMountVersion *string, attachable *bool) *csiapi.CSIDriver {
return &csiapi.CSIDriver{
ObjectMeta: meta.ObjectMeta{
Name: name,
},
Spec: csiapi.CSIDriverSpec{
PodInfoOnMountVersion: &podInfoMountVersion,
PodInfoOnMountVersion: podInfoMountVersion,
AttachRequired: attachable,
},
}

View File

@ -56,10 +56,15 @@ func (f *IdentityClient) Probe(ctx context.Context, in *csipb.ProbeRequest, opts
return nil, nil
}
type CSIVolume struct {
Attributes map[string]string
Path string
}
// NodeClient returns CSI node client
type NodeClient struct {
nodePublishedVolumes map[string]string
nodeStagedVolumes map[string]string
nodePublishedVolumes map[string]CSIVolume
nodeStagedVolumes map[string]CSIVolume
stageUnstageSet bool
nodeGetInfoResp *csipb.NodeGetInfoResponse
nextErr error
@ -68,8 +73,8 @@ type NodeClient struct {
// NewNodeClient returns fake node client
func NewNodeClient(stageUnstageSet bool) *NodeClient {
return &NodeClient{
nodePublishedVolumes: make(map[string]string),
nodeStagedVolumes: make(map[string]string),
nodePublishedVolumes: make(map[string]CSIVolume),
nodeStagedVolumes: make(map[string]CSIVolume),
stageUnstageSet: stageUnstageSet,
}
}
@ -84,17 +89,20 @@ func (f *NodeClient) SetNodeGetInfoResp(resp *csipb.NodeGetInfoResponse) {
}
// GetNodePublishedVolumes returns node published volumes
func (f *NodeClient) GetNodePublishedVolumes() map[string]string {
func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume {
return f.nodePublishedVolumes
}
// GetNodeStagedVolumes returns node staged volumes
func (f *NodeClient) GetNodeStagedVolumes() map[string]string {
func (f *NodeClient) GetNodeStagedVolumes() map[string]CSIVolume {
return f.nodeStagedVolumes
}
func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string) {
f.nodeStagedVolumes[volID] = deviceMountPath
func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, attributes map[string]string) {
f.nodeStagedVolumes[volID] = CSIVolume{
Path: deviceMountPath,
Attributes: attributes,
}
}
// NodePublishVolume implements CSI NodePublishVolume
@ -115,7 +123,10 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli
if !strings.Contains(fsTypes, fsType) {
return nil, errors.New("invalid fstype")
}
f.nodePublishedVolumes[req.GetVolumeId()] = req.GetTargetPath()
f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{
Path: req.GetTargetPath(),
Attributes: req.GetVolumeAttributes(),
}
return &csipb.NodePublishVolumeResponse{}, nil
}
@ -158,7 +169,10 @@ func (f *NodeClient) NodeStageVolume(ctx context.Context, req *csipb.NodeStageVo
return nil, errors.New("invalid fstype")
}
f.nodeStagedVolumes[req.GetVolumeId()] = req.GetStagingTargetPath()
f.nodeStagedVolumes[req.GetVolumeId()] = CSIVolume{
Path: req.GetStagingTargetPath(),
Attributes: req.GetVolumeAttributes(),
}
return &csipb.NodeStageVolumeResponse{}, nil
}

View File

@ -159,7 +159,7 @@ func NodeRules() []rbacv1.PolicyRule {
if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
volAttachRule := rbacv1helpers.NewRule("get").Groups(storageGroup).Resources("volumeattachments").RuleOrDie()
nodePolicyRules = append(nodePolicyRules, volAttachRule)
if utilfeature.DefaultFeatureGate.Enabled(features.CSISkipAttach) {
if utilfeature.DefaultFeatureGate.Enabled(features.CSISkipAttach) || utilfeature.DefaultFeatureGate.Enabled(features.CSIPodInfo) {
csiDriverRule := rbacv1helpers.NewRule("get", "watch", "list").Groups("csi.storage.k8s.io").Resources("csidrivers").RuleOrDie()
nodePolicyRules = append(nodePolicyRules, csiDriverRule)
}