From 290a7f12ceced1e7f30cbd5be1e6a385112f9964 Mon Sep 17 00:00:00 2001 From: David Zhu Date: Mon, 22 Jul 2019 16:10:48 -0700 Subject: [PATCH] Add migration shim for VerifyVolumesAreAttached and BulkVolumeVerify --- .../operationexecutor/operation_executor.go | 48 +++++++++++++---- .../operationexecutor/operation_generator.go | 52 +++++++++++++++---- 2 files changed, 82 insertions(+), 18 deletions(-) diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 4a93472e62d..8e059dc11f0 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/csi" "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" @@ -635,19 +636,48 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( klog.Errorf("VerifyVolumesAreAttached: nil spec for volume %s", volumeAttached.VolumeName) continue } - volumePlugin, err := - oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec) - if err != nil || volumePlugin == nil { - klog.Errorf( - "VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v", - volumeAttached.VolumeName, - volumeAttached.VolumeSpec.Name(), - volumeAttached.NodeName, - err) + // Migration: Must also check the Node since Attach would have been done with in-tree if node is not using Migration + nu, err := nodeUsingCSIPlugin(oe.operationGenerator, volumeAttached.VolumeSpec, node) + if err != nil { + klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.NodeUsingCSIPlugin failed", err).Error()) continue } + var volumePlugin volume.VolumePlugin + if useCSIPlugin(oe.operationGenerator.GetVolumePluginMgr(), volumeAttached.VolumeSpec) && nu { + // The volume represented by this spec is CSI and thus should be migrated + volumePlugin, err = oe.operationGenerator.GetVolumePluginMgr().FindPluginByName(csi.CSIPluginName) + if err != nil || volumePlugin == nil { + klog.Errorf( + "VolumesAreAttached.Name failed for volume %q (spec.Name: %q) on node %q with error: %v", + volumeAttached.VolumeName, + volumeAttached.VolumeSpec.Name(), + volumeAttached.NodeName, + err) + continue + } + + csiSpec, err := translateSpec(volumeAttached.VolumeSpec) + if err != nil { + klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.TranslateSpec failed", err).Error()) + continue + } + volumeAttached.VolumeSpec = csiSpec + } else { + volumePlugin, err = + oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec) + if err != nil || volumePlugin == nil { + klog.Errorf( + "VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v", + volumeAttached.VolumeName, + volumeAttached.VolumeSpec.Name(), + volumeAttached.NodeName, + err) + continue + } + } + pluginName := volumePlugin.GetPluginName() if volumePlugin.SupportsBulkVolumeVerification() { diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index fc6e2ffea67..79245d18afc 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -138,31 +138,58 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc( attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { - // volumesPerPlugin maps from a volume plugin to a list of volume specs which belong // to this type of plugin volumesPerPlugin := make(map[string][]*volume.Spec) // volumeSpecMap maps from a volume spec to its unique volumeName which will be used // when calling MarkVolumeAsDetached volumeSpecMap := make(map[*volume.Spec]v1.UniqueVolumeName) + // Iterate each volume spec and put them into a map index by the pluginName for _, volumeAttached := range attachedVolumes { if volumeAttached.VolumeSpec == nil { klog.Errorf("VerifyVolumesAreAttached.GenerateVolumesAreAttachedFunc: nil spec for volume %s", volumeAttached.VolumeName) continue } - volumePlugin, err := - og.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec) - if err != nil || volumePlugin == nil { - klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.FindPluginBySpec failed", err).Error()) + + // Migration: Must also check the Node since Attach would have been done with in-tree if node is not using Migration + nu, err := nodeUsingCSIPlugin(og, volumeAttached.VolumeSpec, nodeName) + if err != nil { + klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.NodeUsingCSIPlugin failed", err).Error()) continue } + + var volumePlugin volume.VolumePlugin + if useCSIPlugin(og.volumePluginMgr, volumeAttached.VolumeSpec) && nu { + // The volume represented by this spec is CSI and thus should be migrated + volumePlugin, err = og.volumePluginMgr.FindPluginByName(csi.CSIPluginName) + if err != nil || volumePlugin == nil { + klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.FindPluginByName failed", err).Error()) + continue + } + + csiSpec, err := translateSpec(volumeAttached.VolumeSpec) + if err != nil { + klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.TranslateSpec failed", err).Error()) + continue + } + volumeAttached.VolumeSpec = csiSpec + } else { + volumePlugin, err = + og.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec) + if err != nil || volumePlugin == nil { + klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.FindPluginBySpec failed", err).Error()) + continue + } + } + volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()] if !pluginExists { volumeSpecList = []*volume.Spec{} } volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec) volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList + // Migration: VolumeSpecMap contains original VolumeName for use in ActualStateOfWorld volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName } @@ -207,6 +234,7 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc( } } } + return nil, nil } @@ -224,6 +252,10 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + // Migration: All inputs already should be translated by caller for this + // function except volumeSpecMap which contains original volume names for + // use with actualStateOfWorld + bulkVolumeVerifyFunc := func() (error, error) { attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginByName(pluginName) @@ -1774,8 +1806,10 @@ func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool { return false } -func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) { - migratable, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec) +func nodeUsingCSIPlugin(og OperationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) { + vpm := og.GetVolumePluginMgr() + + migratable, err := vpm.IsPluginMigratableBySpec(spec) if err != nil { return false, err } @@ -1789,7 +1823,7 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type return false, goerrors.New("nodeName is empty") } - kubeClient := og.volumePluginMgr.Host.GetKubeClient() + kubeClient := vpm.Host.GetKubeClient() if kubeClient == nil { // Don't handle the controller/kubelet version skew check and fallback // to just checking the feature gates. This can happen if @@ -1797,7 +1831,7 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type return true, nil } - adcHost, ok := og.volumePluginMgr.Host.(volume.AttachDetachVolumeHost) + adcHost, ok := vpm.Host.(volume.AttachDetachVolumeHost) if !ok { // Don't handle the controller/kubelet version skew check and fallback // to just checking the feature gates. This can happen if