Move fsGroupPolicy from NewMounter to SetUpAt

getFSGroupPolicy needs to get CSIDriver from the API server, which may not
be available during volume reconstruction at kubelet startup.
This commit is contained in:
Jan Safranek 2022-10-24 15:30:12 +02:00
parent 04183005e4
commit 483fd45e8e
4 changed files with 96 additions and 93 deletions

View File

@ -67,7 +67,6 @@ type csiMountMgr struct {
plugin *csiPlugin
driverName csiDriverName
volumeLifecycleMode storage.VolumeLifecycleMode
fsGroupPolicy storage.FSGroupPolicy
volumeID string
specVolumeID string
readOnly bool
@ -123,6 +122,11 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
return volumetypes.NewTransientOperationFailure(log("mounter.SetupAt failed to check volume lifecycle mode: %s", err))
}
fsGroupPolicy, err := c.getFSGroupPolicy()
if err != nil {
return volumetypes.NewTransientOperationFailure(log("mounter.SetupAt failed to check fsGroup policy: %s", err))
}
driverName := c.driverName
volumeHandle := c.volumeID
readOnly := c.readOnly
@ -294,7 +298,7 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
}
}
if !driverSupportsCSIVolumeMountGroup && c.supportsFSGroup(fsType, mounterArgs.FsGroup, c.fsGroupPolicy) {
if !driverSupportsCSIVolumeMountGroup && c.supportsFSGroup(fsType, mounterArgs.FsGroup, fsGroupPolicy) {
// Driver doesn't support applying FSGroup. Kubelet must apply it instead.
// fullPluginName helps to distinguish different driver from csi plugin
@ -441,6 +445,35 @@ func (c *csiMountMgr) supportsFSGroup(fsType string, fsGroup *int64, driverPolic
return false
}
// getFSGroupPolicy returns if the CSI driver supports a volume in the given mode.
// An error indicates that it isn't supported and explains why.
func (c *csiMountMgr) getFSGroupPolicy() (storage.FSGroupPolicy, error) {
// Retrieve CSIDriver. It's not an error if that isn't
// possible (we don't have the lister if CSIDriverRegistry is
// disabled) or the driver isn't found (CSIDriver is
// optional)
var csiDriver *storage.CSIDriver
driver := string(c.driverName)
if c.plugin.csiDriverLister != nil {
c, err := c.plugin.getCSIDriver(driver)
if err != nil && !apierrors.IsNotFound(err) {
// Some internal error.
return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, err
}
csiDriver = c
}
// If the csiDriver isn't defined, return the default behavior
if csiDriver == nil {
return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, nil
}
// If the csiDriver exists but the fsGroupPolicy isn't defined, return an error
if csiDriver.Spec.FSGroupPolicy == nil || *csiDriver.Spec.FSGroupPolicy == "" {
return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, errors.New(log("expected valid fsGroupPolicy, received nil value or empty string"))
}
return *csiDriver.Spec.FSGroupPolicy, nil
}
// supportsVolumeMode checks whether the CSI driver supports a volume in the given mode.
// An error indicates that it isn't supported and explains why.
func (c *csiMountMgr) supportsVolumeLifecycleMode() error {

View File

@ -30,7 +30,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
authenticationv1 "k8s.io/api/authentication/v1"
corev1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
@ -895,9 +894,6 @@ func TestMounterSetUpWithFSGroup(t *testing.T) {
}
csiMounter := mounter.(*csiMountMgr)
if tc.driverFSGroupPolicy {
csiMounter.fsGroupPolicy = tc.supportMode
}
csiMounter.csiClient = setupClientWithVolumeMountGroup(t, true /* stageUnstageSet */, tc.driverSupportsVolumeMountGroup)
attachID := getAttachmentName(csiMounter.volumeID, string(csiMounter.driverName), string(plug.host.GetNodeName()))
@ -1145,7 +1141,6 @@ func Test_csiMountMgr_supportsFSGroup(t *testing.T) {
plugin *csiPlugin
driverName csiDriverName
volumeLifecycleMode storage.VolumeLifecycleMode
fsGroupPolicy storage.FSGroupPolicy
volumeID string
specVolumeID string
readOnly bool
@ -1271,7 +1266,6 @@ func Test_csiMountMgr_supportsFSGroup(t *testing.T) {
plugin: tt.fields.plugin,
driverName: tt.fields.driverName,
volumeLifecycleMode: tt.fields.volumeLifecycleMode,
fsGroupPolicy: tt.fields.fsGroupPolicy,
volumeID: tt.fields.volumeID,
specVolumeID: tt.fields.specVolumeID,
readOnly: tt.fields.readOnly,
@ -1289,3 +1283,64 @@ func Test_csiMountMgr_supportsFSGroup(t *testing.T) {
})
}
}
func TestMounterGetFSGroupPolicy(t *testing.T) {
defaultPolicy := storage.ReadWriteOnceWithFSTypeFSGroupPolicy
testCases := []struct {
name string
defined bool
expectedFSGroupPolicy storage.FSGroupPolicy
}{
{
name: "no FSGroupPolicy defined, expect default",
defined: false,
expectedFSGroupPolicy: storage.ReadWriteOnceWithFSTypeFSGroupPolicy,
},
{
name: "File FSGroupPolicy defined, expect File",
defined: true,
expectedFSGroupPolicy: storage.FileFSGroupPolicy,
},
{
name: "None FSGroupPolicy defined, expected None",
defined: true,
expectedFSGroupPolicy: storage.NoneFSGroupPolicy,
},
}
for _, tc := range testCases {
t.Logf("testing: %s", tc.name)
// Define the driver and set the FSGroupPolicy
driver := getTestCSIDriver(testDriver, nil, nil, nil)
if tc.defined {
driver.Spec.FSGroupPolicy = &tc.expectedFSGroupPolicy
} else {
driver.Spec.FSGroupPolicy = &defaultPolicy
}
// Create the client and register the resources
fakeClient := fakeclient.NewSimpleClientset(driver)
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.3.0"}, t)
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(makeTestPV("test.vol.id", 20, testDriver, "testvol-handle1"), true),
&corev1.Pod{ObjectMeta: meta.ObjectMeta{UID: "1", Namespace: testns}},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("Error creating a new mounter: %s", err)
}
csiMounter := mounter.(*csiMountMgr)
// Check to see if we can obtain the CSIDriver, along with examining its FSGroupPolicy
fsGroup, err := csiMounter.getFSGroupPolicy()
if err != nil {
t.Fatalf("Error attempting to obtain FSGroupPolicy: %v", err)
}
if fsGroup != *driver.Spec.FSGroupPolicy {
t.Fatalf("FSGroupPolicy doesn't match expected value: %v, %v", fsGroup, tc.expectedFSGroupPolicy)
}
}
}

View File

@ -388,11 +388,6 @@ func (p *csiPlugin) NewMounter(
return nil, err
}
fsGroupPolicy, err := p.getFSGroupPolicy(driverName)
if err != nil {
return nil, err
}
k8s := p.host.GetKubeClient()
if k8s == nil {
return nil, errors.New(log("failed to get a kubernetes client"))
@ -411,7 +406,6 @@ func (p *csiPlugin) NewMounter(
podUID: pod.UID,
driverName: csiDriverName(driverName),
volumeLifecycleMode: volumeLifecycleMode,
fsGroupPolicy: fsGroupPolicy,
volumeID: volumeHandle,
specVolumeID: spec.Name(),
readOnly: readOnly,
@ -834,34 +828,6 @@ func (p *csiPlugin) getVolumeLifecycleMode(spec *volume.Spec) (storage.VolumeLif
return storage.VolumeLifecyclePersistent, nil
}
// getFSGroupPolicy returns if the CSI driver supports a volume in the given mode.
// An error indicates that it isn't supported and explains why.
func (p *csiPlugin) getFSGroupPolicy(driver string) (storage.FSGroupPolicy, error) {
// Retrieve CSIDriver. It's not an error if that isn't
// possible (we don't have the lister if CSIDriverRegistry is
// disabled) or the driver isn't found (CSIDriver is
// optional)
var csiDriver *storage.CSIDriver
if p.csiDriverLister != nil {
c, err := p.getCSIDriver(driver)
if err != nil && !apierrors.IsNotFound(err) {
// Some internal error.
return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, err
}
csiDriver = c
}
// If the csiDriver isn't defined, return the default behavior
if csiDriver == nil {
return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, nil
}
// If the csiDriver exists but the fsGroupPolicy isn't defined, return an error
if csiDriver.Spec.FSGroupPolicy == nil || *csiDriver.Spec.FSGroupPolicy == "" {
return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, errors.New(log("expected valid fsGroupPolicy, received nil value or empty string"))
}
return *csiDriver.Spec.FSGroupPolicy, nil
}
func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) {
skip, err := p.skipAttach(driver)
if err != nil {

View File

@ -159,57 +159,6 @@ func TestPluginGetPluginName(t *testing.T) {
}
}
func TestPluginGetFSGroupPolicy(t *testing.T) {
defaultPolicy := storage.ReadWriteOnceWithFSTypeFSGroupPolicy
testCases := []struct {
name string
defined bool
expectedFSGroupPolicy storage.FSGroupPolicy
}{
{
name: "no FSGroupPolicy defined, expect default",
defined: false,
expectedFSGroupPolicy: storage.ReadWriteOnceWithFSTypeFSGroupPolicy,
},
{
name: "File FSGroupPolicy defined, expect File",
defined: true,
expectedFSGroupPolicy: storage.FileFSGroupPolicy,
},
{
name: "None FSGroupPolicy defined, expected None",
defined: true,
expectedFSGroupPolicy: storage.NoneFSGroupPolicy,
},
}
for _, tc := range testCases {
t.Logf("testing: %s", tc.name)
// Define the driver and set the FSGroupPolicy
driver := getTestCSIDriver(testDriver, nil, nil, nil)
if tc.defined {
driver.Spec.FSGroupPolicy = &tc.expectedFSGroupPolicy
} else {
driver.Spec.FSGroupPolicy = &defaultPolicy
}
// Create the client and register the resources
fakeClient := fakeclient.NewSimpleClientset(driver)
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.3.0"}, t)
// Check to see if we can obtain the CSIDriver, along with examining its FSGroupPolicy
fsGroup, err := plug.getFSGroupPolicy(testDriver)
if err != nil {
t.Fatalf("Error attempting to obtain FSGroupPolicy: %v", err)
}
if fsGroup != *driver.Spec.FSGroupPolicy {
t.Fatalf("FSGroupPolicy doesn't match expected value: %v, %v", fsGroup, tc.expectedFSGroupPolicy)
}
}
}
func TestPluginGetVolumeName(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)