mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 02:41:25 +00:00
Merge pull request #99069 from phantooom/master
kubelet: fix raw block mode CSI NodePublishVolume stage miss pod info
This commit is contained in:
commit
a0e6e491e9
@ -93,6 +93,7 @@ type csiBlockMapper struct {
|
|||||||
volumeID string
|
volumeID string
|
||||||
readOnly bool
|
readOnly bool
|
||||||
spec *volume.Spec
|
spec *volume.Spec
|
||||||
|
pod *v1.Pod
|
||||||
podUID types.UID
|
podUID types.UID
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,8 +211,21 @@ func (m *csiBlockMapper) publishVolumeForBlock(
|
|||||||
publishVolumeInfo = attachment.Status.AttachmentMetadata
|
publishVolumeInfo = attachment.Status.AttachmentMetadata
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Inject pod information into volume_attributes
|
||||||
|
volAttribs := csiSource.VolumeAttributes
|
||||||
|
podInfoEnabled, err := m.plugin.podInfoEnabled(string(m.driverName))
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.New(log("blockMapper.publishVolumeForBlock failed to assemble volume attributes: %v", err))
|
||||||
|
}
|
||||||
|
volumeLifecycleMode, err := m.plugin.getVolumeLifecycleMode(m.spec)
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.New(log("blockMapper.publishVolumeForBlock failed to get VolumeLifecycleMode: %v", err))
|
||||||
|
}
|
||||||
|
if podInfoEnabled {
|
||||||
|
volAttribs = mergeMap(volAttribs, getPodInfoAttrs(m.pod, volumeLifecycleMode))
|
||||||
|
}
|
||||||
|
|
||||||
nodePublishSecrets := map[string]string{}
|
nodePublishSecrets := map[string]string{}
|
||||||
var err error
|
|
||||||
if csiSource.NodePublishSecretRef != nil {
|
if csiSource.NodePublishSecretRef != nil {
|
||||||
nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef)
|
nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -241,7 +255,7 @@ func (m *csiBlockMapper) publishVolumeForBlock(
|
|||||||
publishPath,
|
publishPath,
|
||||||
accessMode,
|
accessMode,
|
||||||
publishVolumeInfo,
|
publishVolumeInfo,
|
||||||
csiSource.VolumeAttributes,
|
volAttribs,
|
||||||
nodePublishSecrets,
|
nodePublishSecrets,
|
||||||
fsTypeBlockName,
|
fsTypeBlockName,
|
||||||
[]string{},
|
[]string{},
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
@ -40,7 +41,7 @@ func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T
|
|||||||
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
||||||
mapper, err := plug.NewBlockVolumeMapper(
|
mapper, err := plug.NewBlockVolumeMapper(
|
||||||
spec,
|
spec,
|
||||||
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
|
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns, Name: testPod}},
|
||||||
volume.VolumeOptions{},
|
volume.VolumeOptions{},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -367,6 +368,52 @@ func TestBlockMapperMapPodDeviceNotSupportAttach(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBlockMapperMapPodDeviceWithPodInfo(t *testing.T) {
|
||||||
|
fakeClient := fakeclient.NewSimpleClientset()
|
||||||
|
attachRequired := false
|
||||||
|
podInfo := true
|
||||||
|
fakeDriver := &storagev1.CSIDriver{
|
||||||
|
ObjectMeta: meta.ObjectMeta{
|
||||||
|
Name: testDriver,
|
||||||
|
},
|
||||||
|
Spec: storagev1.CSIDriverSpec{
|
||||||
|
PodInfoOnMount: &podInfo,
|
||||||
|
AttachRequired: &attachRequired,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, err := fakeClient.StorageV1().CSIDrivers().Create(context.TODO(), fakeDriver, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create a fakeDriver: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// after the driver is created, create the plugin. newTestPlugin waits for the informer to sync,
|
||||||
|
// such that csiMapper.SetUpDevice below sees the VolumeAttachment object in the lister.
|
||||||
|
|
||||||
|
plug, tmpDir := newTestPlugin(t, fakeClient)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
csiMapper, _, _, err := prepareBlockMapperTest(plug, "test-pv", t)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||||
|
}
|
||||||
|
csiMapper.csiClient = setupClient(t, true)
|
||||||
|
|
||||||
|
// Map device to global and pod device map path
|
||||||
|
_, err = csiMapper.MapPodDevice()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
|
||||||
|
}
|
||||||
|
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
|
||||||
|
pvol, ok := pvols[csiMapper.volumeID]
|
||||||
|
if !ok {
|
||||||
|
t.Error("csi server may not have received NodePublishVolume call")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(pvol.VolumeContext, map[string]string{"csi.storage.k8s.io/pod.uid": "test-pod", "csi.storage.k8s.io/serviceAccount.name": "", "csi.storage.k8s.io/pod.name": "test-pod", "csi.storage.k8s.io/pod.namespace": "test-ns", "csi.storage.k8s.io/ephemeral": "false"}) {
|
||||||
|
t.Error("csi mapper check pod info failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestBlockMapperTearDownDevice(t *testing.T) {
|
func TestBlockMapperTearDownDevice(t *testing.T) {
|
||||||
plug, tmpDir := newTestPlugin(t, nil)
|
plug, tmpDir := newTestPlugin(t, nil)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
|
@ -23,7 +23,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
@ -221,11 +220,13 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Inject pod information into volume_attributes
|
// Inject pod information into volume_attributes
|
||||||
podAttrs, err := c.podAttributes()
|
podInfoEnabled, err := c.plugin.podInfoEnabled(string(c.driverName))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to assemble volume attributes: %v", err))
|
return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to assemble volume attributes: %v", err))
|
||||||
}
|
}
|
||||||
volAttribs = mergeMap(volAttribs, podAttrs)
|
if podInfoEnabled {
|
||||||
|
volAttribs = mergeMap(volAttribs, getPodInfoAttrs(c.pod, c.volumeLifecycleMode))
|
||||||
|
}
|
||||||
|
|
||||||
// Inject pod service account token into volume attributes
|
// Inject pod service account token into volume attributes
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIServiceAccountToken) {
|
if utilfeature.DefaultFeatureGate.Enabled(features.CSIServiceAccountToken) {
|
||||||
@ -282,45 +283,6 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *csiMountMgr) podAttributes() (map[string]string, error) {
|
|
||||||
kletHost, ok := c.plugin.host.(volume.KubeletVolumeHost)
|
|
||||||
if ok {
|
|
||||||
kletHost.WaitForCacheSync()
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.plugin.csiDriverLister == nil {
|
|
||||||
return nil, fmt.Errorf("CSIDriverLister not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName))
|
|
||||||
if err != nil {
|
|
||||||
if apierrors.IsNotFound(err) {
|
|
||||||
klog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", c.driverName))
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// if PodInfoOnMount is not set or false we do not set pod attributes
|
|
||||||
if csiDriver.Spec.PodInfoOnMount == nil || *csiDriver.Spec.PodInfoOnMount == false {
|
|
||||||
klog.V(4).Infof(log("CSIDriver %q does not require pod information", c.driverName))
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
attrs := map[string]string{
|
|
||||||
"csi.storage.k8s.io/pod.name": c.pod.Name,
|
|
||||||
"csi.storage.k8s.io/pod.namespace": c.pod.Namespace,
|
|
||||||
"csi.storage.k8s.io/pod.uid": string(c.pod.UID),
|
|
||||||
"csi.storage.k8s.io/serviceAccount.name": c.pod.Spec.ServiceAccountName,
|
|
||||||
}
|
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
|
|
||||||
attrs["csi.storage.k8s.io/ephemeral"] = strconv.FormatBool(c.volumeLifecycleMode == storage.VolumeLifecycleEphemeral)
|
|
||||||
}
|
|
||||||
|
|
||||||
klog.V(4).Infof(log("CSIDriver %q requires pod information", c.driverName))
|
|
||||||
return attrs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *csiMountMgr) podServiceAccountTokenAttrs() (map[string]string, error) {
|
func (c *csiMountMgr) podServiceAccountTokenAttrs() (map[string]string, error) {
|
||||||
if c.plugin.serviceAccountTokenGetter == nil {
|
if c.plugin.serviceAccountTokenGetter == nil {
|
||||||
return nil, errors.New("ServiceAccountTokenGetter is nil")
|
return nil, errors.New("ServiceAccountTokenGetter is nil")
|
||||||
|
@ -685,6 +685,7 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
|
|||||||
readOnly: readOnly,
|
readOnly: readOnly,
|
||||||
spec: spec,
|
spec: spec,
|
||||||
specName: spec.Name(),
|
specName: spec.Name(),
|
||||||
|
pod: podRef,
|
||||||
podUID: podRef.UID,
|
podUID: podRef.UID,
|
||||||
}
|
}
|
||||||
mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver)
|
mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver)
|
||||||
@ -959,6 +960,34 @@ func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// podInfoEnabled check CSIDriver enabled pod info flag
|
||||||
|
func (p *csiPlugin) podInfoEnabled(driverName string) (bool, error) {
|
||||||
|
kletHost, ok := p.host.(volume.KubeletVolumeHost)
|
||||||
|
if ok {
|
||||||
|
kletHost.WaitForCacheSync()
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.csiDriverLister == nil {
|
||||||
|
return false, fmt.Errorf("CSIDriverLister not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
csiDriver, err := p.csiDriverLister.Get(driverName)
|
||||||
|
if err != nil {
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
klog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", driverName))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// if PodInfoOnMount is not set or false we do not set pod attributes
|
||||||
|
if csiDriver.Spec.PodInfoOnMount == nil || *csiDriver.Spec.PodInfoOnMount == false {
|
||||||
|
klog.V(4).Infof(log("CSIDriver %q does not require pod information", driverName))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
func unregisterDriver(driverName string) error {
|
func unregisterDriver(driverName string) error {
|
||||||
csiDrivers.Delete(driverName)
|
csiDrivers.Delete(driverName)
|
||||||
|
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
|
storage "k8s.io/api/storage/v1"
|
||||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
@ -203,3 +204,17 @@ func createCSIOperationContext(volumeSpec *volume.Spec, timeout time.Duration) (
|
|||||||
ctx := context.WithValue(context.Background(), additionalInfoKey, additionalInfo{Migrated: strconv.FormatBool(migrated)})
|
ctx := context.WithValue(context.Background(), additionalInfoKey, additionalInfo{Migrated: strconv.FormatBool(migrated)})
|
||||||
return context.WithTimeout(ctx, timeout)
|
return context.WithTimeout(ctx, timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getPodInfoAttrs returns pod info for NodePublish
|
||||||
|
func getPodInfoAttrs(pod *api.Pod, volumeMode storage.VolumeLifecycleMode) map[string]string {
|
||||||
|
attrs := map[string]string{
|
||||||
|
"csi.storage.k8s.io/pod.name": pod.Name,
|
||||||
|
"csi.storage.k8s.io/pod.namespace": pod.Namespace,
|
||||||
|
"csi.storage.k8s.io/pod.uid": string(pod.UID),
|
||||||
|
"csi.storage.k8s.io/serviceAccount.name": pod.Spec.ServiceAccountName,
|
||||||
|
}
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
|
||||||
|
attrs["csi.storage.k8s.io/ephemeral"] = strconv.FormatBool(volumeMode == storage.VolumeLifecycleEphemeral)
|
||||||
|
}
|
||||||
|
return attrs
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user