diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index be6e8bf0738..b121e91fbda 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -386,10 +386,16 @@ const ( // // Allow TTL controller to clean up Pods and Jobs after they finish. TTLAfterFinished utilfeature.Feature = "TTLAfterFinished" + // owner: @jsafrane // Kubernetes skips attaching CSI volumes that don't require attachment. // CSISkipAttach utilfeature.Feature = "CSISkipAttach" + + // owner: @jsafrane + // + // Kubelet sends pod information in NodePublish CSI call when a CSI driver wants so. + CSIPodInfo utilfeature.Feature = "CSIPodInfo" ) func init() { @@ -456,6 +462,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS ProcMountType: {Default: false, PreRelease: utilfeature.Alpha}, TTLAfterFinished: {Default: false, PreRelease: utilfeature.Alpha}, CSISkipAttach: {Default: false, PreRelease: utilfeature.Alpha}, + CSIPodInfo: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 611049dbbd2..e57b9446bce 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -47,6 +47,7 @@ go_test( ], embed = [":go_default_library"], deps = [ + "//pkg/features:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/csi/fake:go_default_library", "//pkg/volume/testing:go_default_library", @@ -60,6 +61,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 99f4f3f094a..c39d536fe72 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -747,12 +747,12 @@ func TestAttacherMountDevice(t *testing.T) { t.Errorf("got wrong number of staged volumes, expecting %v got: %v", numStaged, len(staged)) } if tc.stageUnstageSet { - gotPath, ok := staged[tc.volName] + vol, ok := staged[tc.volName] if !ok { t.Errorf("could not find staged volume: %s", tc.volName) } - if gotPath != tc.deviceMountPath { - t.Errorf("expected mount path: %s. got: %s", tc.deviceMountPath, gotPath) + if vol.Path != tc.deviceMountPath { + t.Errorf("expected mount path: %s. got: %s", tc.deviceMountPath, vol.Path) } } } @@ -836,7 +836,7 @@ func TestAttacherUnmountDevice(t *testing.T) { // Add the volume to NodeStagedVolumes cdc := csiAttacher.csiClient.(*fakeCsiDriverClient) - cdc.nodeClient.AddNodeStagedVolume(tc.volID, tc.deviceMountPath) + cdc.nodeClient.AddNodeStagedVolume(tc.volID, tc.deviceMountPath, nil) // Make JSON for this object if tc.deviceMountPath != "" { diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index 20b9057d0d5..b9de55609a1 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -129,9 +129,13 @@ func TestBlockMapperSetupDevice(t *testing.T) { } vols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() - if vols[csiMapper.volumeID] != devicePath { + vol, ok := vols[csiMapper.volumeID] + if !ok { t.Error("csi server may not have received NodePublishVolume call") } + if vol.Path != devicePath { + t.Errorf("csi server expected device path %s, got %s", devicePath, vol.Path) + } } func TestBlockMapperMapDevice(t *testing.T) { @@ -198,9 +202,13 @@ func TestBlockMapperMapDevice(t *testing.T) { } pubs := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() - if pubs[csiMapper.volumeID] != podVolumeBlockFilePath { + vol, ok := pubs[csiMapper.volumeID] + if !ok { t.Error("csi server may not have received NodePublishVolume call") } + if vol.Path != podVolumeBlockFilePath { + t.Errorf("csi server expected path %s, got %s", podVolumeBlockFilePath, vol.Path) + } } func TestBlockMapperTearDownDevice(t *testing.T) { diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index 4d57e4f6fa3..b1e8ea958ae 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -18,6 +18,7 @@ package csi import ( "context" + "errors" "fmt" "os" "path" @@ -25,8 +26,11 @@ import ( "github.com/golang/glog" api "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/features" kstrings "k8s.io/kubernetes/pkg/util/strings" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" @@ -47,6 +51,7 @@ var ( "nodeName", "attachmentID", } + currentPodInfoMountVersion = "v1" ) type csiMountMgr struct { @@ -162,6 +167,22 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { accessMode = c.spec.PersistentVolume.Spec.AccessModes[0] } + // Inject pod information into volume_attributes + podAttrs, err := c.podAttributes() + if err != nil { + glog.Error(log("mouter.SetUpAt failed to assemble volume attributes: %v", err)) + return err + } + if podAttrs != nil { + if attribs == nil { + attribs = podAttrs + } else { + for k, v := range podAttrs { + attribs[k] = v + } + } + } + fsType := csiSource.FSType err = csi.NodePublishVolume( ctx, @@ -216,6 +237,39 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { return nil } +func (c *csiMountMgr) podAttributes() (map[string]string, error) { + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPodInfo) { + return nil, nil + } + if c.plugin.csiDriverLister == nil { + return nil, errors.New("CSIDriver lister does not exist") + } + + csiDriver, err := c.plugin.csiDriverLister.Get(c.driverName) + if err != nil { + if apierrs.IsNotFound(err) { + glog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", c.driverName)) + return nil, nil + } + return nil, err + } + + // if PodInfoOnMountVersion is not set or not v1 we do not set pod attributes + if csiDriver.Spec.PodInfoOnMountVersion == nil || *csiDriver.Spec.PodInfoOnMountVersion != currentPodInfoMountVersion { + glog.V(4).Infof(log("CSIDriver %q does not require pod information", c.driverName)) + return nil, nil + } + + attrs := map[string]string{ + "csi.storage.k8s.io/pod.name": c.pod.Name, + "csi.storage.k8s.io/pod.namespace": c.pod.Namespace, + "csi.storage.k8s.io/pod.uid": string(c.pod.UID), + "csi.storage.k8s.io/serviceAccount.name": c.pod.Spec.ServiceAccountName, + } + glog.V(4).Infof(log("CSIDriver %q requires pod information", c.driverName)) + return attrs, nil +} + func (c *csiMountMgr) GetAttributes() volume.Attributes { mounter := c.plugin.host.GetMounter(c.plugin.GetPluginName()) path := c.GetPath() diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go index f0334d84ff4..6d3eb493628 100644 --- a/pkg/volume/csi/csi_mounter_test.go +++ b/pkg/volume/csi/csi_mounter_test.go @@ -25,14 +25,20 @@ import ( "path" "testing" + "reflect" + + "github.com/golang/glog" api "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" + utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" fakeclient "k8s.io/client-go/kubernetes/fake" csiapi "k8s.io/csi-api/pkg/apis/csi/v1alpha1" + fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" - volumetest "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" ) @@ -88,86 +94,171 @@ func TestMounterGetPath(t *testing.T) { } } +func MounterSetUpTests(t *testing.T, podInfoEnabled bool) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIPodInfo, podInfoEnabled)() + tests := []struct { + name string + driver string + attributes map[string]string + + expectedAttributes map[string]string + }{ + { + name: "no pod info", + driver: "no-info", + attributes: nil, + expectedAttributes: nil, + }, + { + name: "no CSIDriver -> no pod info", + driver: "unknown-driver", + attributes: nil, + expectedAttributes: nil, + }, + { + name: "CSIDriver with PodInfoRequiredOnMount=nil -> no pod info", + driver: "nil", + attributes: nil, + expectedAttributes: nil, + }, + { + name: "no pod info -> keep existing attributes", + driver: "no-info", + attributes: map[string]string{"foo": "bar"}, + expectedAttributes: map[string]string{"foo": "bar"}, + }, + { + name: "add pod info", + driver: "info", + attributes: nil, + expectedAttributes: map[string]string{"csi.storage.k8s.io/pod.uid": "test-pod", "csi.storage.k8s.io/serviceAccount.name": "test-service-account", "csi.storage.k8s.io/pod.name": "test-pod", "csi.storage.k8s.io/pod.namespace": "test-ns"}, + }, + { + name: "add pod info -> keep existing attributes", + driver: "info", + attributes: map[string]string{"foo": "bar"}, + expectedAttributes: map[string]string{"foo": "bar", "csi.storage.k8s.io/pod.uid": "test-pod", "csi.storage.k8s.io/serviceAccount.name": "test-service-account", "csi.storage.k8s.io/pod.name": "test-pod", "csi.storage.k8s.io/pod.namespace": "test-ns"}, + }, + } + + emptyPodMountInfoVersion := "" + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + glog.Infof("Starting test %s", test.name) + fakeClient := fakeclient.NewSimpleClientset() + fakeCSIClient := fakecsi.NewSimpleClientset( + getCSIDriver("no-info", &emptyPodMountInfoVersion, nil), + getCSIDriver("info", ¤tPodInfoMountVersion, nil), + getCSIDriver("nil", nil, nil), + ) + plug, tmpDir := newTestPlugin(t, fakeClient, fakeCSIClient) + defer os.RemoveAll(tmpDir) + + for { + // Wait until the informer in CSI volume plugin has all CSIDrivers. + if plug.csiDriverInformer.Informer().HasSynced() { + break + } + } + pv := makeTestPV("test-pv", 10, test.driver, testVol) + pv.Spec.CSI.VolumeAttributes = test.attributes + pvName := pv.GetName() + + mounter, err := plug.NewMounter( + volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly), + &api.Pod{ + ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns, Name: testPod}, + Spec: api.PodSpec{ + ServiceAccountName: testAccount, + }, + }, + volume.VolumeOptions{}, + ) + if err != nil { + t.Fatalf("failed to make a new Mounter: %v", err) + } + + if mounter == nil { + t.Fatal("failed to create CSI mounter") + } + + csiMounter := mounter.(*csiMountMgr) + csiMounter.csiClient = setupClient(t, true) + + attachID := getAttachmentName(csiMounter.volumeID, csiMounter.driverName, string(plug.host.GetNodeName())) + + attachment := &storage.VolumeAttachment{ + ObjectMeta: meta.ObjectMeta{ + Name: attachID, + }, + Spec: storage.VolumeAttachmentSpec{ + NodeName: "test-node", + Attacher: csiPluginName, + Source: storage.VolumeAttachmentSource{ + PersistentVolumeName: &pvName, + }, + }, + Status: storage.VolumeAttachmentStatus{ + Attached: false, + AttachError: nil, + DetachError: nil, + }, + } + _, err = csiMounter.k8s.StorageV1beta1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to setup VolumeAttachment: %v", err) + } + + // Mounter.SetUp() + fsGroup := int64(2000) + if err := csiMounter.SetUp(&fsGroup); err != nil { + t.Fatalf("mounter.Setup failed: %v", err) + } + + //Test the default value of file system type is not overridden + if len(csiMounter.spec.PersistentVolume.Spec.CSI.FSType) != 0 { + t.Errorf("default value of file system type was overridden by type %s", csiMounter.spec.PersistentVolume.Spec.CSI.FSType) + } + + path := csiMounter.GetPath() + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + t.Errorf("SetUp() failed, volume path not created: %s", path) + } else { + t.Errorf("SetUp() failed: %v", err) + } + } + + // ensure call went all the way + pubs := csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + vol, ok := pubs[csiMounter.volumeID] + if !ok { + t.Error("csi server may not have received NodePublishVolume call") + } + if vol.Path != csiMounter.GetPath() { + t.Errorf("csi server expected path %s, got %s", csiMounter.GetPath(), vol.Path) + } + if podInfoEnabled { + if !reflect.DeepEqual(vol.Attributes, test.expectedAttributes) { + t.Errorf("csi server expected attributes %+v, got %+v", test.expectedAttributes, vol.Attributes) + } + } else { + // CSIPodInfo feature is disabled, we expect no modifications to attributes. + if !reflect.DeepEqual(vol.Attributes, test.attributes) { + t.Errorf("csi server expected attributes %+v, got %+v", test.attributes, vol.Attributes) + } + } + }) + } +} + func TestMounterSetUp(t *testing.T) { - plug, tmpDir := newTestPlugin(t, nil, nil) - defer os.RemoveAll(tmpDir) - fakeClient := fakeclient.NewSimpleClientset() - host := volumetest.NewFakeVolumeHostWithCSINodeName( - tmpDir, - fakeClient, - nil, - nil, - "fakeNode", - ) - plug.host = host - pv := makeTestPV("test-pv", 10, testDriver, testVol) - pvName := pv.GetName() - - mounter, err := plug.NewMounter( - volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly), - &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, - volume.VolumeOptions{}, - ) - if err != nil { - t.Fatalf("Failed to make a new Mounter: %v", err) - } - - if mounter == nil { - t.Fatal("failed to create CSI mounter") - } - - csiMounter := mounter.(*csiMountMgr) - csiMounter.csiClient = setupClient(t, true) - - attachID := getAttachmentName(csiMounter.volumeID, csiMounter.driverName, string(plug.host.GetNodeName())) - - attachment := &storage.VolumeAttachment{ - ObjectMeta: meta.ObjectMeta{ - Name: attachID, - }, - Spec: storage.VolumeAttachmentSpec{ - NodeName: "test-node", - Attacher: csiPluginName, - Source: storage.VolumeAttachmentSource{ - PersistentVolumeName: &pvName, - }, - }, - Status: storage.VolumeAttachmentStatus{ - Attached: false, - AttachError: nil, - DetachError: nil, - }, - } - _, err = csiMounter.k8s.StorageV1beta1().VolumeAttachments().Create(attachment) - if err != nil { - t.Fatalf("failed to setup VolumeAttachment: %v", err) - } - - // Mounter.SetUp() - fsGroup := int64(2000) - if err := csiMounter.SetUp(&fsGroup); err != nil { - t.Fatalf("mounter.Setup failed: %v", err) - } - - //Test the default value of file system type is not overridden - if len(csiMounter.spec.PersistentVolume.Spec.CSI.FSType) != 0 { - t.Errorf("default value of file system type was overridden by type %s", csiMounter.spec.PersistentVolume.Spec.CSI.FSType) - } - - path := csiMounter.GetPath() - if _, err := os.Stat(path); err != nil { - if os.IsNotExist(err) { - t.Errorf("SetUp() failed, volume path not created: %s", path) - } else { - t.Errorf("SetUp() failed: %v", err) - } - } - - // ensure call went all the way - pubs := csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() - if pubs[csiMounter.volumeID] != csiMounter.GetPath() { - t.Error("csi server may not have received NodePublishVolume call") - } + t.Run("WithCSIPodInfo", func(t *testing.T) { + MounterSetUpTests(t, true) + }) + t.Run("WithoutCSIPodInfo", func(t *testing.T) { + MounterSetUpTests(t, false) + }) } func TestUnmounterTeardown(t *testing.T) { @@ -267,14 +358,13 @@ func TestSaveVolumeData(t *testing.T) { } } -func getCSIDriver(name string, requiresPodInfo *bool, attachable *bool) *csiapi.CSIDriver { - podInfoMountVersion := "v1" +func getCSIDriver(name string, podInfoMountVersion *string, attachable *bool) *csiapi.CSIDriver { return &csiapi.CSIDriver{ ObjectMeta: meta.ObjectMeta{ Name: name, }, Spec: csiapi.CSIDriverSpec{ - PodInfoOnMountVersion: &podInfoMountVersion, + PodInfoOnMountVersion: podInfoMountVersion, AttachRequired: attachable, }, } diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go index 346c9949170..40b08f264bb 100644 --- a/pkg/volume/csi/fake/fake_client.go +++ b/pkg/volume/csi/fake/fake_client.go @@ -56,10 +56,15 @@ func (f *IdentityClient) Probe(ctx context.Context, in *csipb.ProbeRequest, opts return nil, nil } +type CSIVolume struct { + Attributes map[string]string + Path string +} + // NodeClient returns CSI node client type NodeClient struct { - nodePublishedVolumes map[string]string - nodeStagedVolumes map[string]string + nodePublishedVolumes map[string]CSIVolume + nodeStagedVolumes map[string]CSIVolume stageUnstageSet bool nodeGetInfoResp *csipb.NodeGetInfoResponse nextErr error @@ -68,8 +73,8 @@ type NodeClient struct { // NewNodeClient returns fake node client func NewNodeClient(stageUnstageSet bool) *NodeClient { return &NodeClient{ - nodePublishedVolumes: make(map[string]string), - nodeStagedVolumes: make(map[string]string), + nodePublishedVolumes: make(map[string]CSIVolume), + nodeStagedVolumes: make(map[string]CSIVolume), stageUnstageSet: stageUnstageSet, } } @@ -84,17 +89,20 @@ func (f *NodeClient) SetNodeGetInfoResp(resp *csipb.NodeGetInfoResponse) { } // GetNodePublishedVolumes returns node published volumes -func (f *NodeClient) GetNodePublishedVolumes() map[string]string { +func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume { return f.nodePublishedVolumes } // GetNodeStagedVolumes returns node staged volumes -func (f *NodeClient) GetNodeStagedVolumes() map[string]string { +func (f *NodeClient) GetNodeStagedVolumes() map[string]CSIVolume { return f.nodeStagedVolumes } -func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string) { - f.nodeStagedVolumes[volID] = deviceMountPath +func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, attributes map[string]string) { + f.nodeStagedVolumes[volID] = CSIVolume{ + Path: deviceMountPath, + Attributes: attributes, + } } // NodePublishVolume implements CSI NodePublishVolume @@ -115,7 +123,10 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli if !strings.Contains(fsTypes, fsType) { return nil, errors.New("invalid fstype") } - f.nodePublishedVolumes[req.GetVolumeId()] = req.GetTargetPath() + f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{ + Path: req.GetTargetPath(), + Attributes: req.GetVolumeAttributes(), + } return &csipb.NodePublishVolumeResponse{}, nil } @@ -158,7 +169,10 @@ func (f *NodeClient) NodeStageVolume(ctx context.Context, req *csipb.NodeStageVo return nil, errors.New("invalid fstype") } - f.nodeStagedVolumes[req.GetVolumeId()] = req.GetStagingTargetPath() + f.nodeStagedVolumes[req.GetVolumeId()] = CSIVolume{ + Path: req.GetStagingTargetPath(), + Attributes: req.GetVolumeAttributes(), + } return &csipb.NodeStageVolumeResponse{}, nil } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 5909c4daf15..d5b7845ea59 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -159,7 +159,7 @@ func NodeRules() []rbacv1.PolicyRule { if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { volAttachRule := rbacv1helpers.NewRule("get").Groups(storageGroup).Resources("volumeattachments").RuleOrDie() nodePolicyRules = append(nodePolicyRules, volAttachRule) - if utilfeature.DefaultFeatureGate.Enabled(features.CSISkipAttach) { + if utilfeature.DefaultFeatureGate.Enabled(features.CSISkipAttach) || utilfeature.DefaultFeatureGate.Enabled(features.CSIPodInfo) { csiDriverRule := rbacv1helpers.NewRule("get", "watch", "list").Groups("csi.storage.k8s.io").Resources("csidrivers").RuleOrDie() nodePolicyRules = append(nodePolicyRules, csiDriverRule) }