From 31aa15284f124ccfd6ae2b926f26d1657aac447e Mon Sep 17 00:00:00 2001 From: phantooom Date: Sun, 21 Feb 2021 23:46:50 +0800 Subject: [PATCH 1/3] kubelet: fix raw block mode CSI NodePublishVolume stage miss pod info --- pkg/volume/csi/csi_block.go | 18 ++++++++++++-- pkg/volume/csi/csi_mounter.go | 46 +++-------------------------------- pkg/volume/csi/csi_plugin.go | 29 ++++++++++++++++++++++ pkg/volume/csi/csi_util.go | 15 ++++++++++++ 4 files changed, 64 insertions(+), 44 deletions(-) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index 2f1d542237a..1b595b70c67 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -93,6 +93,7 @@ type csiBlockMapper struct { volumeID string readOnly bool spec *volume.Spec + pod *v1.Pod podUID types.UID } @@ -210,8 +211,21 @@ func (m *csiBlockMapper) publishVolumeForBlock( publishVolumeInfo = attachment.Status.AttachmentMetadata } + // Inject pod information into volume_attributes + volAttribs := csiSource.VolumeAttributes + podInfoEnabled, err := m.plugin.podInfoEnabled(string(m.driverName)) + if err != nil { + return "", errors.New(log("blockMapper.publishVolumeForBlock failed to assemble volume attributes: %v", err)) + } + volumeLifecycleMode, err := m.plugin.getVolumeLifecycleMode(m.spec) + if err != nil { + return "", errors.New(log("blockMapper.publishVolumeForBlock failed to get VolumeLifecycleMode: %v", err)) + } + if podInfoEnabled { + volAttribs = mergeMap(volAttribs, GetPodInfoAttrs(m.pod, volumeLifecycleMode)) + } + nodePublishSecrets := map[string]string{} - var err error if csiSource.NodePublishSecretRef != nil { nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef) if err != nil { @@ -241,7 +255,7 @@ func (m *csiBlockMapper) publishVolumeForBlock( publishPath, accessMode, publishVolumeInfo, - csiSource.VolumeAttributes, + volAttribs, nodePublishSecrets, fsTypeBlockName, []string{}, diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index d632ffd1433..654133fe40e 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -23,7 +23,6 @@ import ( "fmt" "os" "path/filepath" - "strconv" "k8s.io/klog/v2" @@ -221,11 +220,13 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error } // Inject pod information into volume_attributes - podAttrs, err := c.podAttributes() + podInfoEnabled, err := c.plugin.podInfoEnabled(string(c.driverName)) if err != nil { return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to assemble volume attributes: %v", err)) } - volAttribs = mergeMap(volAttribs, podAttrs) + if podInfoEnabled { + volAttribs = mergeMap(volAttribs, GetPodInfoAttrs(c.pod, c.volumeLifecycleMode)) + } // Inject pod service account token into volume attributes if utilfeature.DefaultFeatureGate.Enabled(features.CSIServiceAccountToken) { @@ -282,45 +283,6 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error return nil } -func (c *csiMountMgr) podAttributes() (map[string]string, error) { - kletHost, ok := c.plugin.host.(volume.KubeletVolumeHost) - if ok { - kletHost.WaitForCacheSync() - } - - if c.plugin.csiDriverLister == nil { - return nil, fmt.Errorf("CSIDriverLister not found") - } - - csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName)) - if err != nil { - if apierrors.IsNotFound(err) { - klog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", c.driverName)) - return nil, nil - } - return nil, err - } - - // if PodInfoOnMount is not set or false we do not set pod attributes - if csiDriver.Spec.PodInfoOnMount == nil || *csiDriver.Spec.PodInfoOnMount == false { - klog.V(4).Infof(log("CSIDriver %q does not require pod information", c.driverName)) - return nil, nil - } - - attrs := map[string]string{ - "csi.storage.k8s.io/pod.name": c.pod.Name, - "csi.storage.k8s.io/pod.namespace": c.pod.Namespace, - "csi.storage.k8s.io/pod.uid": string(c.pod.UID), - "csi.storage.k8s.io/serviceAccount.name": c.pod.Spec.ServiceAccountName, - } - if utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) { - attrs["csi.storage.k8s.io/ephemeral"] = strconv.FormatBool(c.volumeLifecycleMode == storage.VolumeLifecycleEphemeral) - } - - klog.V(4).Infof(log("CSIDriver %q requires pod information", c.driverName)) - return attrs, nil -} - func (c *csiMountMgr) podServiceAccountTokenAttrs() (map[string]string, error) { if c.plugin.serviceAccountTokenGetter == nil { return nil, errors.New("ServiceAccountTokenGetter is nil") diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 85c1c1f3db1..7609d0a3c47 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -685,6 +685,7 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt readOnly: readOnly, spec: spec, specName: spec.Name(), + pod: podRef, podUID: podRef.UID, } mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver) @@ -959,6 +960,34 @@ func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) { }, nil } +// podInfoEnabled check CSIDriver enabled pod info flag +func (p *csiPlugin) podInfoEnabled(driverName string) (bool, error) { + kletHost, ok := p.host.(volume.KubeletVolumeHost) + if ok { + kletHost.WaitForCacheSync() + } + + if p.csiDriverLister == nil { + return false, fmt.Errorf("CSIDriverLister not found") + } + + csiDriver, err := p.csiDriverLister.Get(driverName) + if err != nil { + if apierrors.IsNotFound(err) { + klog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", driverName)) + return false, nil + } + return false, err + } + + // if PodInfoOnMount is not set or false we do not set pod attributes + if csiDriver.Spec.PodInfoOnMount == nil || *csiDriver.Spec.PodInfoOnMount == false { + klog.V(4).Infof(log("CSIDriver %q does not require pod information", driverName)) + return false, nil + } + return true, nil +} + func unregisterDriver(driverName string) error { csiDrivers.Delete(driverName) diff --git a/pkg/volume/csi/csi_util.go b/pkg/volume/csi/csi_util.go index dec72a46bad..2f71ffd12b4 100644 --- a/pkg/volume/csi/csi_util.go +++ b/pkg/volume/csi/csi_util.go @@ -27,6 +27,7 @@ import ( "time" api "k8s.io/api/core/v1" + storage "k8s.io/api/storage/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" @@ -203,3 +204,17 @@ func createCSIOperationContext(volumeSpec *volume.Spec, timeout time.Duration) ( ctx := context.WithValue(context.Background(), additionalInfoKey, additionalInfo{Migrated: strconv.FormatBool(migrated)}) return context.WithTimeout(ctx, timeout) } + +// GetPodInfoAttrs returns pod info for NodePublish +func GetPodInfoAttrs(pod *api.Pod, volumeMode storage.VolumeLifecycleMode) map[string]string { + attrs := map[string]string{ + "csi.storage.k8s.io/pod.name": pod.Name, + "csi.storage.k8s.io/pod.namespace": pod.Namespace, + "csi.storage.k8s.io/pod.uid": string(pod.UID), + "csi.storage.k8s.io/serviceAccount.name": pod.Spec.ServiceAccountName, + } + if utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) { + attrs["csi.storage.k8s.io/ephemeral"] = strconv.FormatBool(volumeMode == storage.VolumeLifecycleEphemeral) + } + return attrs +} From 0017b602beb8e8e08be540ebcece26778ab0336f Mon Sep 17 00:00:00 2001 From: phantooom Date: Sun, 21 Feb 2021 23:24:07 +0800 Subject: [PATCH 2/3] test: add csi raw block mode wiht podinfo ut --- pkg/volume/csi/csi_block_test.go | 49 +++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index 485998e738a..21dd8a599b7 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -21,6 +21,7 @@ import ( "fmt" "os" "path/filepath" + "reflect" "testing" "google.golang.org/grpc/codes" @@ -40,7 +41,7 @@ func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) mapper, err := plug.NewBlockVolumeMapper( spec, - &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns, Name: testPod}}, volume.VolumeOptions{}, ) if err != nil { @@ -367,6 +368,52 @@ func TestBlockMapperMapPodDeviceNotSupportAttach(t *testing.T) { } } +func TestBlockMapperMapPodDeviceWithPodInfo(t *testing.T) { + fakeClient := fakeclient.NewSimpleClientset() + attachRequired := false + podInfo := true + fakeDriver := &storagev1.CSIDriver{ + ObjectMeta: meta.ObjectMeta{ + Name: testDriver, + }, + Spec: storagev1.CSIDriverSpec{ + PodInfoOnMount: &podInfo, + AttachRequired: &attachRequired, + }, + } + _, err := fakeClient.StorageV1().CSIDrivers().Create(context.TODO(), fakeDriver, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create a fakeDriver: %v", err) + } + + // after the driver is created, create the plugin. newTestPlugin waits for the informer to sync, + // such that csiMapper.SetUpDevice below sees the VolumeAttachment object in the lister. + + plug, tmpDir := newTestPlugin(t, fakeClient) + defer os.RemoveAll(tmpDir) + + csiMapper, _, _, err := prepareBlockMapperTest(plug, "test-pv", t) + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } + csiMapper.csiClient = setupClient(t, true) + + // Map device to global and pod device map path + _, err = csiMapper.MapPodDevice() + if err != nil { + t.Fatalf("mapper failed to GetGlobalMapPath: %v", err) + } + pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + pvol, ok := pvols[csiMapper.volumeID] + if !ok { + t.Error("csi server may not have received NodePublishVolume call") + } + + if !reflect.DeepEqual(pvol.VolumeContext, map[string]string{"csi.storage.k8s.io/pod.uid": "test-pod", "csi.storage.k8s.io/serviceAccount.name": "", "csi.storage.k8s.io/pod.name": "test-pod", "csi.storage.k8s.io/pod.namespace": "test-ns", "csi.storage.k8s.io/ephemeral": "false"}) { + t.Error("csi mapper check pod info failed") + } +} + func TestBlockMapperTearDownDevice(t *testing.T) { plug, tmpDir := newTestPlugin(t, nil) defer os.RemoveAll(tmpDir) From 82c2266f25830f83ad701adba6edb0f5ac5d5094 Mon Sep 17 00:00:00 2001 From: phantooom Date: Thu, 15 Apr 2021 20:51:51 +0800 Subject: [PATCH 3/3] chore: make csi getPodInfoAttrs func private --- pkg/volume/csi/csi_block.go | 2 +- pkg/volume/csi/csi_mounter.go | 2 +- pkg/volume/csi/csi_util.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index 1b595b70c67..cfda3e69f8a 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -222,7 +222,7 @@ func (m *csiBlockMapper) publishVolumeForBlock( return "", errors.New(log("blockMapper.publishVolumeForBlock failed to get VolumeLifecycleMode: %v", err)) } if podInfoEnabled { - volAttribs = mergeMap(volAttribs, GetPodInfoAttrs(m.pod, volumeLifecycleMode)) + volAttribs = mergeMap(volAttribs, getPodInfoAttrs(m.pod, volumeLifecycleMode)) } nodePublishSecrets := map[string]string{} diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index 654133fe40e..f96e93ce512 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -225,7 +225,7 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to assemble volume attributes: %v", err)) } if podInfoEnabled { - volAttribs = mergeMap(volAttribs, GetPodInfoAttrs(c.pod, c.volumeLifecycleMode)) + volAttribs = mergeMap(volAttribs, getPodInfoAttrs(c.pod, c.volumeLifecycleMode)) } // Inject pod service account token into volume attributes diff --git a/pkg/volume/csi/csi_util.go b/pkg/volume/csi/csi_util.go index 2f71ffd12b4..ad9098b4ef8 100644 --- a/pkg/volume/csi/csi_util.go +++ b/pkg/volume/csi/csi_util.go @@ -205,8 +205,8 @@ func createCSIOperationContext(volumeSpec *volume.Spec, timeout time.Duration) ( return context.WithTimeout(ctx, timeout) } -// GetPodInfoAttrs returns pod info for NodePublish -func GetPodInfoAttrs(pod *api.Pod, volumeMode storage.VolumeLifecycleMode) map[string]string { +// getPodInfoAttrs returns pod info for NodePublish +func getPodInfoAttrs(pod *api.Pod, volumeMode storage.VolumeLifecycleMode) map[string]string { attrs := map[string]string{ "csi.storage.k8s.io/pod.name": pod.Name, "csi.storage.k8s.io/pod.namespace": pod.Namespace,