use node informer to check volumes attachment status before backoff

fix unit tests
This commit is contained in:
Hemant Kumar 2021-12-08 11:41:55 -05:00
parent 5b7b2e2f6c
commit 7989f27044
13 changed files with 230 additions and 19 deletions

View File

@ -858,6 +858,10 @@ func (adc *attachDetachController) GetNodeAllocatable() (v1.ResourceList, error)
return v1.ResourceList{}, nil
}
func (adc *attachDetachController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
return map[v1.UniqueVolumeName]string{}, nil
}
func (adc *attachDetachController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
return func(_, _ string) (*v1.Secret, error) {
return nil, fmt.Errorf("GetSecret unsupported in attachDetachController")

View File

@ -456,6 +456,10 @@ func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*
}
}
func (expc *expandController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
return map[v1.UniqueVolumeName]string{}, nil
}
func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
return nil, fmt.Errorf("GetServiceAccountToken unsupported in expandController")

View File

@ -95,6 +95,10 @@ func (ctrl *PersistentVolumeController) GetNodeAllocatable() (v1.ResourceList, e
return v1.ResourceList{}, nil
}
func (ctrl *PersistentVolumeController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
return map[v1.UniqueVolumeName]string{}, nil
}
func (ctrl *PersistentVolumeController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
return func(_, _ string) (*v1.Secret, error) {
return nil, fmt.Errorf("GetSecret unsupported in PersistentVolumeController")

View File

@ -214,6 +214,12 @@ func newTestKubeletWithImageList(
Address: testKubeletHostIP,
},
},
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake/fake-device",
DevicePath: "fake/path",
},
},
},
},
},

View File

@ -270,6 +270,20 @@ func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
return node.Labels, nil
}
func (kvh *kubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
node, err := kvh.kubelet.GetNode()
if err != nil {
return nil, fmt.Errorf("error retrieving node: %v", err)
}
attachedVolumes := node.Status.VolumesAttached
result := map[v1.UniqueVolumeName]string{}
for i := range attachedVolumes {
attachedVolume := attachedVolumes[i]
result[attachedVolume.Name] = attachedVolume.DevicePath
}
return result, nil
}
func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
return kvh.kubelet.nodeName
}

View File

@ -709,5 +709,5 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
// ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
func isExpectedError(err error) bool {
return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err)
return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err) || operationexecutor.IsMountFailedPreconditionError(err)
}

View File

@ -189,7 +189,20 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
// Verifies there are no attach/detach calls.
func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake-plugin/fake-device1",
DevicePath: "fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient()
@ -438,7 +451,20 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
// Verifies there are no attach/detach calls made.
func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake-plugin/fake-device1",
DevicePath: "fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient()
@ -660,9 +686,22 @@ func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
volumeSpec := &volume.Spec{
PersistentVolume: gcepv,
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake-plugin/fake-device1",
DevicePath: "fake/path",
},
},
},
}
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
@ -870,8 +909,22 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
PersistentVolume: gcepv,
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake-plugin/fake-device1",
DevicePath: "/fake/path",
},
},
},
}
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
@ -1179,7 +1232,21 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
// deep copy before reconciler runs to avoid data race.
pvWithSize := pv.DeepCopy()
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
DevicePath: "fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
@ -1354,7 +1421,21 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
DevicePath: "fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
fakePlugin.SupportsRemount = tc.supportRemount
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
@ -1564,7 +1645,21 @@ func Test_UncertainVolumeMountState(t *testing.T) {
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
DevicePath: "fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
fakePlugin.SupportsRemount = tc.supportRemount
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
@ -1866,7 +1961,20 @@ func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolume
func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake-plugin/fake-device1",
DevicePath: "/fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)

View File

@ -99,7 +99,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
stopCh := runVolumeManager(manager)
defer close(stopCh)
@ -161,7 +161,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
stopCh := runVolumeManager(manager)
defer close(stopCh)
@ -251,7 +251,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
}
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
stopCh := runVolumeManager(manager)
defer close(stopCh)
@ -292,12 +292,15 @@ func (p *fakePodStateProvider) ShouldPodContainersBeTerminating(uid kubetypes.UI
return ok
}
func newTestVolumeManager(t *testing.T, tmpDir string, podManager kubepod.Manager, kubeClient clientset.Interface) VolumeManager {
func newTestVolumeManager(t *testing.T, tmpDir string, podManager kubepod.Manager, kubeClient clientset.Interface, node *v1.Node) VolumeManager {
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
fakeRecorder := &record.FakeRecorder{}
plugMgr := &volume.VolumePluginMgr{}
// TODO (#51147) inject mock prober
plugMgr.InitPlugins([]volume.VolumePlugin{plug}, nil /* prober */, volumetest.NewFakeKubeletVolumeHost(t, tmpDir, kubeClient, nil))
fakeVolumeHost := volumetest.NewFakeKubeletVolumeHost(t, tmpDir, kubeClient, nil)
fakeVolumeHost.WithNode(node)
plugMgr.InitPlugins([]volume.VolumePlugin{plug}, nil /* prober */, fakeVolumeHost)
stateProvider := &fakePodStateProvider{}
fakePathHandler := volumetest.NewBlockVolumePathHandler()
vm := NewVolumeManager(

View File

@ -449,6 +449,8 @@ type VolumeHost interface {
// Returns the name of the node
GetNodeName() types.NodeName
GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error)
// Returns the event recorder of kubelet.
GetEventRecorder() record.EventRecorder

View File

@ -1655,6 +1655,19 @@ func GetTestKubeletVolumePluginMgr(t *testing.T) (*VolumePluginMgr, *FakeVolumeP
return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
}
func GetTestKubeletVolumePluginMgrWithNode(t *testing.T, node *v1.Node) (*VolumePluginMgr, *FakeVolumePlugin) {
plugins := ProbeVolumePlugins(VolumeConfig{})
v := NewFakeKubeletVolumeHost(
t,
"", /* rootDir */
nil, /* kubeClient */
plugins, /* plugins */
)
v.WithNode(node)
return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
}
// CreateTestPVC returns a provisionable PVC for tests
func CreateTestPVC(capacity string, accessModes []v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim {
claim := v1.PersistentVolumeClaim{

View File

@ -64,6 +64,7 @@ type fakeVolumeHost struct {
nodeLabels map[string]string
nodeName string
subpather subpath.Interface
node *v1.Node
csiDriverLister storagelistersv1.CSIDriverLister
volumeAttachmentLister storagelistersv1.VolumeAttachmentLister
informerFactory informers.SharedInformerFactory
@ -153,6 +154,10 @@ func (f *fakeVolumeHost) GetPluginMgr() *VolumePluginMgr {
return f.pluginMgr
}
func (f *fakeVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
return map[v1.UniqueVolumeName]string{}, nil
}
func (f *fakeVolumeHost) NewWrapperMounter(volName string, spec Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) {
// The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
wrapperVolumeName := "wrapped_" + volName
@ -305,25 +310,25 @@ type fakeKubeletVolumeHost struct {
var _ KubeletVolumeHost = &fakeKubeletVolumeHost{}
var _ FakeVolumeHost = &fakeKubeletVolumeHost{}
func NewFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) FakeVolumeHost {
func NewFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeKubeletVolumeHost {
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
}
func NewFakeKubeletVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) FakeVolumeHost {
func NewFakeKubeletVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeKubeletVolumeHost {
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
}
func NewFakeKubeletVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) FakeVolumeHost {
func NewFakeKubeletVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) *fakeKubeletVolumeHost {
volHost := newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
volHost.nodeLabels = labels
return volHost
}
func NewFakeKubeletVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
func NewFakeKubeletVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeKubeletVolumeHost {
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
}
func NewFakeKubeletVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) FakeVolumeHost {
func NewFakeKubeletVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeKubeletVolumeHost {
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil, nil)
}
@ -351,6 +356,11 @@ func newFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset
return host
}
func (f *fakeKubeletVolumeHost) WithNode(node *v1.Node) *fakeKubeletVolumeHost {
f.node = node
return f
}
func (f *fakeKubeletVolumeHost) SetKubeletError(err error) {
f.mux.Lock()
defer f.mux.Unlock()
@ -362,6 +372,17 @@ func (f *fakeKubeletVolumeHost) GetInformerFactory() informers.SharedInformerFac
return f.informerFactory
}
func (f *fakeKubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
result := map[v1.UniqueVolumeName]string{}
if f.node != nil {
for _, av := range f.node.Status.VolumesAttached {
result[av.Name] = av.DevicePath
}
}
return result, nil
}
func (f *fakeKubeletVolumeHost) CSIDriverLister() storagelistersv1.CSIDriverLister {
return f.csiDriverLister
}

View File

@ -21,6 +21,7 @@ limitations under the License.
package operationexecutor
import (
"errors"
"fmt"
"time"
@ -418,6 +419,23 @@ const (
VolumeNotMounted VolumeMountState = "VolumeNotMounted"
)
type MountPreConditionFailed struct {
msg string
}
func (err *MountPreConditionFailed) Error() string {
return err.msg
}
func NewMountPreConditionFailedError(msg string) *MountPreConditionFailed {
return &MountPreConditionFailed{msg: msg}
}
func IsMountFailedPreconditionError(err error) bool {
var failedPreconditionError *MountPreConditionFailed
return errors.As(err, &failedPreconditionError)
}
// GenerateMsgDetailed returns detailed msgs for volumes to mount
func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)

View File

@ -1515,6 +1515,20 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
}
// For attachable volume types, lets check if volume is attached by reading from node lister.
// This would avoid exponential back-off and creation of goroutine unnecessarily. We still
// verify status of attached volume by directly reading from API server later on.This is necessarily
// to ensure any race conditions because of cached state in the informer.
if volumeToMount.PluginIsAttachable {
cachedAttachedVolumes, _ := og.volumePluginMgr.Host.GetAttachedVolumesFromNodeStatus()
if cachedAttachedVolumes != nil {
_, volumeFound := cachedAttachedVolumes[volumeToMount.VolumeName]
if !volumeFound {
return volumetypes.GeneratedOperations{}, NewMountPreConditionFailedError(fmt.Sprintf("volume %s is not yet in node's status", volumeToMount.VolumeName))
}
}
}
verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
if !volumeToMount.PluginIsAttachable {