Add migration shim for VerifyVolumesAreAttached and BulkVolumeVerify

This commit is contained in:
David Zhu 2019-07-22 16:10:48 -07:00
parent 04c9bd9bfa
commit 290a7f12ce
2 changed files with 82 additions and 18 deletions

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types" volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
@ -635,19 +636,48 @@ func (oe *operationExecutor) VerifyVolumesAreAttached(
klog.Errorf("VerifyVolumesAreAttached: nil spec for volume %s", volumeAttached.VolumeName) klog.Errorf("VerifyVolumesAreAttached: nil spec for volume %s", volumeAttached.VolumeName)
continue continue
} }
volumePlugin, err :=
oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec)
if err != nil || volumePlugin == nil { // Migration: Must also check the Node since Attach would have been done with in-tree if node is not using Migration
klog.Errorf( nu, err := nodeUsingCSIPlugin(oe.operationGenerator, volumeAttached.VolumeSpec, node)
"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v", if err != nil {
volumeAttached.VolumeName, klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.NodeUsingCSIPlugin failed", err).Error())
volumeAttached.VolumeSpec.Name(),
volumeAttached.NodeName,
err)
continue continue
} }
var volumePlugin volume.VolumePlugin
if useCSIPlugin(oe.operationGenerator.GetVolumePluginMgr(), volumeAttached.VolumeSpec) && nu {
// The volume represented by this spec is CSI and thus should be migrated
volumePlugin, err = oe.operationGenerator.GetVolumePluginMgr().FindPluginByName(csi.CSIPluginName)
if err != nil || volumePlugin == nil {
klog.Errorf(
"VolumesAreAttached.Name failed for volume %q (spec.Name: %q) on node %q with error: %v",
volumeAttached.VolumeName,
volumeAttached.VolumeSpec.Name(),
volumeAttached.NodeName,
err)
continue
}
csiSpec, err := translateSpec(volumeAttached.VolumeSpec)
if err != nil {
klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.TranslateSpec failed", err).Error())
continue
}
volumeAttached.VolumeSpec = csiSpec
} else {
volumePlugin, err =
oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec)
if err != nil || volumePlugin == nil {
klog.Errorf(
"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
volumeAttached.VolumeName,
volumeAttached.VolumeSpec.Name(),
volumeAttached.NodeName,
err)
continue
}
}
pluginName := volumePlugin.GetPluginName() pluginName := volumePlugin.GetPluginName()
if volumePlugin.SupportsBulkVolumeVerification() { if volumePlugin.SupportsBulkVolumeVerification() {

View File

@ -138,31 +138,58 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
attachedVolumes []AttachedVolume, attachedVolumes []AttachedVolume,
nodeName types.NodeName, nodeName types.NodeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
// volumesPerPlugin maps from a volume plugin to a list of volume specs which belong // volumesPerPlugin maps from a volume plugin to a list of volume specs which belong
// to this type of plugin // to this type of plugin
volumesPerPlugin := make(map[string][]*volume.Spec) volumesPerPlugin := make(map[string][]*volume.Spec)
// volumeSpecMap maps from a volume spec to its unique volumeName which will be used // volumeSpecMap maps from a volume spec to its unique volumeName which will be used
// when calling MarkVolumeAsDetached // when calling MarkVolumeAsDetached
volumeSpecMap := make(map[*volume.Spec]v1.UniqueVolumeName) volumeSpecMap := make(map[*volume.Spec]v1.UniqueVolumeName)
// Iterate each volume spec and put them into a map index by the pluginName // Iterate each volume spec and put them into a map index by the pluginName
for _, volumeAttached := range attachedVolumes { for _, volumeAttached := range attachedVolumes {
if volumeAttached.VolumeSpec == nil { if volumeAttached.VolumeSpec == nil {
klog.Errorf("VerifyVolumesAreAttached.GenerateVolumesAreAttachedFunc: nil spec for volume %s", volumeAttached.VolumeName) klog.Errorf("VerifyVolumesAreAttached.GenerateVolumesAreAttachedFunc: nil spec for volume %s", volumeAttached.VolumeName)
continue continue
} }
volumePlugin, err :=
og.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec) // Migration: Must also check the Node since Attach would have been done with in-tree if node is not using Migration
if err != nil || volumePlugin == nil { nu, err := nodeUsingCSIPlugin(og, volumeAttached.VolumeSpec, nodeName)
klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.FindPluginBySpec failed", err).Error()) if err != nil {
klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.NodeUsingCSIPlugin failed", err).Error())
continue continue
} }
var volumePlugin volume.VolumePlugin
if useCSIPlugin(og.volumePluginMgr, volumeAttached.VolumeSpec) && nu {
// The volume represented by this spec is CSI and thus should be migrated
volumePlugin, err = og.volumePluginMgr.FindPluginByName(csi.CSIPluginName)
if err != nil || volumePlugin == nil {
klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.FindPluginByName failed", err).Error())
continue
}
csiSpec, err := translateSpec(volumeAttached.VolumeSpec)
if err != nil {
klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.TranslateSpec failed", err).Error())
continue
}
volumeAttached.VolumeSpec = csiSpec
} else {
volumePlugin, err =
og.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec)
if err != nil || volumePlugin == nil {
klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.FindPluginBySpec failed", err).Error())
continue
}
}
volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()] volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()]
if !pluginExists { if !pluginExists {
volumeSpecList = []*volume.Spec{} volumeSpecList = []*volume.Spec{}
} }
volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec) volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList
// Migration: VolumeSpecMap contains original VolumeName for use in ActualStateOfWorld
volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
} }
@ -207,6 +234,7 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
} }
} }
} }
return nil, nil return nil, nil
} }
@ -224,6 +252,10 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName, volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
// Migration: All inputs already should be translated by caller for this
// function except volumeSpecMap which contains original volume names for
// use with actualStateOfWorld
bulkVolumeVerifyFunc := func() (error, error) { bulkVolumeVerifyFunc := func() (error, error) {
attachableVolumePlugin, err := attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginByName(pluginName) og.volumePluginMgr.FindAttachablePluginByName(pluginName)
@ -1774,8 +1806,10 @@ func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool {
return false return false
} }
func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) { func nodeUsingCSIPlugin(og OperationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) {
migratable, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec) vpm := og.GetVolumePluginMgr()
migratable, err := vpm.IsPluginMigratableBySpec(spec)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -1789,7 +1823,7 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type
return false, goerrors.New("nodeName is empty") return false, goerrors.New("nodeName is empty")
} }
kubeClient := og.volumePluginMgr.Host.GetKubeClient() kubeClient := vpm.Host.GetKubeClient()
if kubeClient == nil { if kubeClient == nil {
// Don't handle the controller/kubelet version skew check and fallback // Don't handle the controller/kubelet version skew check and fallback
// to just checking the feature gates. This can happen if // to just checking the feature gates. This can happen if
@ -1797,7 +1831,7 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type
return true, nil return true, nil
} }
adcHost, ok := og.volumePluginMgr.Host.(volume.AttachDetachVolumeHost) adcHost, ok := vpm.Host.(volume.AttachDetachVolumeHost)
if !ok { if !ok {
// Don't handle the controller/kubelet version skew check and fallback // Don't handle the controller/kubelet version skew check and fallback
// to just checking the feature gates. This can happen if // to just checking the feature gates. This can happen if