Address review comments

This commit is contained in:
David Zhu 2019-03-05 17:35:54 -08:00
parent 7d2f4e97b8
commit 41b3579345
15 changed files with 156 additions and 91 deletions

View File

@ -102,6 +102,9 @@ const (
// https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md
EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time"
// TODO(dyzz) Comment // MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated
// list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode.
// This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or
// CSI Backend for a volume plugin on a specific node.
MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins" MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins"
) )

View File

@ -18,6 +18,7 @@ go_library(
"//pkg/controller/volume/attachdetach/reconciler:go_default_library", "//pkg/controller/volume/attachdetach/reconciler:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library", "//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/controller/volume/attachdetach/util:go_default_library", "//pkg/controller/volume/attachdetach/util:go_default_library",
"//pkg/features:go_default_library",
"//pkg/util/mount:go_default_library", "//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library", "//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library", "//pkg/volume/util:go_default_library",
@ -31,6 +32,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1beta1" storageinformers "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
@ -49,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util" volumeutil "k8s.io/kubernetes/pkg/volume/util"
@ -125,20 +127,24 @@ func NewAttachDetachController(
// dropped pods so they are continuously processed until it is accepted or // dropped pods so they are continuously processed until it is accepted or
// deleted (probably can't do this with sharedInformer), etc. // deleted (probably can't do this with sharedInformer), etc.
adc := &attachDetachController{ adc := &attachDetachController{
kubeClient: kubeClient, kubeClient: kubeClient,
pvcLister: pvcInformer.Lister(), pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced, pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(), pvLister: pvInformer.Lister(),
pvsSynced: pvInformer.Informer().HasSynced, pvsSynced: pvInformer.Informer().HasSynced,
podLister: podInformer.Lister(), podLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced, podsSynced: podInformer.Informer().HasSynced,
podIndexer: podInformer.Informer().GetIndexer(), podIndexer: podInformer.Informer().GetIndexer(),
nodeLister: nodeInformer.Lister(), nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced, nodesSynced: nodeInformer.Informer().HasSynced,
csiNodeLister: csiNodeInformer.Lister(), cloud: cloud,
csiNodeSynced: csiNodeInformer.Informer().HasSynced, pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
cloud: cloud, }
pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
adc.csiNodeLister = csiNodeInformer.Lister()
adc.csiNodeSynced = csiNodeInformer.Informer().HasSynced
} }
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
@ -317,7 +323,12 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
klog.Infof("Starting attach detach controller") klog.Infof("Starting attach detach controller")
defer klog.Infof("Shutting down attach detach controller") defer klog.Infof("Shutting down attach detach controller")
if !controller.WaitForCacheSync("attach detach", stopCh, adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced) { synced := []kcache.InformerSynced{adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced}
if adc.csiNodeSynced != nil {
synced = append(synced, adc.csiNodeSynced)
}
if !controller.WaitForCacheSync("attach detach", stopCh, synced...) {
return return
} }
@ -658,8 +669,8 @@ func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister
return adc.csiNodeLister return adc.csiNodeLister
} }
func (adc *attachDetachController) CSINodeSynced() bool { func (adc *attachDetachController) IsAttachDetachController() bool {
return adc.csiNodeSynced() return true
} }
// VolumeHost implementation // VolumeHost implementation

View File

@ -1376,8 +1376,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis
var pluginName string var pluginName string
provisionerName := storageClass.Provisioner provisionerName := storageClass.Provisioner
if plugin != nil { if plugin != nil {
// TODO(dyzz) Just temporary to test without dynamic provisioning if plugin.IsMigratedToCSI() {
if plugin.IsMigratedToCSI() && false {
// pluginName is not set here to align with existing behavior // pluginName is not set here to align with existing behavior
// of not setting pluginName for external provisioners (including CSI) // of not setting pluginName for external provisioners (including CSI)
// Set provisionerName to CSI plugin name for setClaimProvisioner // Set provisionerName to CSI plugin name for setClaimProvisioner
@ -1402,7 +1401,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis
} }
claim = newClaim claim = newClaim
if plugin == nil { if plugin == nil || plugin.IsMigratedToCSI() {
// findProvisionablePlugin returned no error nor plugin. // findProvisionablePlugin returned no error nor plugin.
// This means that an unknown provisioner is requested. Report an event // This means that an unknown provisioner is requested. Report an event
// and wait for the external provisioner // and wait for the external provisioner

View File

@ -776,6 +776,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
tokenManager := token.NewManager(kubeDeps.KubeClient) tokenManager := token.NewManager(kubeDeps.KubeClient)
// NewInitializedVolumePluginMgr intializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
// which affects node ready status. This function must be called before Kubelet is initialized so that the Node
// ReadyState is accurate with the storage state.
klet.volumePluginMgr, err = klet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber) NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
if err != nil { if err != nil {

View File

@ -97,7 +97,6 @@ type kubeletVolumeHost struct {
func (kvh *kubeletVolumeHost) SetKubeletError(err error) { func (kvh *kubeletVolumeHost) SetKubeletError(err error) {
kvh.kubelet.runtimeState.setStorageState(err) kvh.kubelet.runtimeState.setStorageState(err)
return
} }
func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string { func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string {

View File

@ -222,7 +222,12 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
csitranslationplugins.GCEPDInTreePluginName: func() bool { csitranslationplugins.GCEPDInTreePluginName: func() bool {
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE) return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
}, },
// TODO(leakingtpan): Add AWS migration feature gates and place them here csitranslationplugins.AWSEBSInTreePluginName: func() bool {
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS)
},
csitranslationplugins.CinderInTreePluginName: func() bool {
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack)
},
} }
// Initializing the label management channels // Initializing the label management channels
@ -232,8 +237,7 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
// This function prevents Kubelet from posting Ready status until CSINodeInfo // This function prevents Kubelet from posting Ready status until CSINodeInfo
// is both installed and initialized // is both installed and initialized
err := initializeCSINodeInfo(host) if err := initializeCSINode(host); err != nil {
if err != nil {
return fmt.Errorf("failed to initialize CSINodeInfo: %v", err) return fmt.Errorf("failed to initialize CSINodeInfo: %v", err)
} }
} }
@ -241,20 +245,20 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
return nil return nil
} }
func initializeCSINodeInfo(host volume.VolumeHost) error { func initializeCSINode(host volume.VolumeHost) error {
kvh, ok := host.(volume.KubeletVolumeHost) kvh, ok := host.(volume.KubeletVolumeHost)
if !ok { if !ok {
klog.V(4).Infof("Skipping CSINodeInfo initialization, not running on Kubelet") klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINodeInfo initialization, not running on kubelet")
return nil return nil
} }
kubeClient := host.GetKubeClient() kubeClient := host.GetKubeClient()
if kubeClient == nil { if kubeClient == nil {
// Kubelet running in standalone mode. Skip CSINodeInfo initialization // Kubelet running in standalone mode. Skip CSINodeInfo initialization
klog.Warningf("Skipping CSINodeInfo initialization, Kubelet running in standalone mode") klog.Warning("Skipping CSINodeInfo initialization, kubelet running in standalone mode")
return nil return nil
} }
kvh.SetKubeletError(fmt.Errorf("CSINodeInfo is not yet intialized")) kvh.SetKubeletError(errors.New("CSINodeInfo is not yet initialized"))
go func() { go func() {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
@ -269,8 +273,7 @@ func initializeCSINodeInfo(host volume.VolumeHost) error {
} }
err := wait.ExponentialBackoff(initBackoff, func() (bool, error) { err := wait.ExponentialBackoff(initBackoff, func() (bool, error) {
klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo") klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo")
// TODO(dyzz): Just augment CreateCSINodeInfo to create the annotation on itself. Also update all updating functions to double check that the annotation is correct (yes) err := nim.InitializeCSINodeWithAnnotation()
_, err := nim.CreateCSINode()
if err != nil { if err != nil {
kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err)) kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err))
klog.Errorf("Failed to initialize CSINodeInfo: %v", err) klog.Errorf("Failed to initialize CSINodeInfo: %v", err)
@ -282,6 +285,10 @@ func initializeCSINodeInfo(host volume.VolumeHost) error {
return true, nil return true, nil
}) })
if err != nil { if err != nil {
// 2 releases after CSIMigration and all CSIMigrationX (where X is a volume plugin)
// are permanently enabled the apiserver/controllers can assume that the kubelet is
// using CSI for all Migrated volume plugins. Then all the CSINode initialization
// code can be dropped from Kubelet.
// Kill the Kubelet process and allow it to restart to retry initialization // Kill the Kubelet process and allow it to restart to retry initialization
klog.Fatalf("Failed to initialize CSINodeInfo after retrying") klog.Fatalf("Failed to initialize CSINodeInfo after retrying")
} }

View File

@ -20,6 +20,7 @@ package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomana
import ( import (
"encoding/json" "encoding/json"
goerrors "errors"
"fmt" "fmt"
"strings" "strings"
@ -73,6 +74,9 @@ type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error)
type Interface interface { type Interface interface {
CreateCSINode() (*storagev1beta1.CSINode, error) CreateCSINode() (*storagev1beta1.CSINode, error)
// Updates or Creates the CSINode object with annotations for CSI Migration
InitializeCSINodeWithAnnotation() error
// Record in the cluster the given node information from the CSI driver with the given name. // Record in the cluster the given node information from the CSI driver with the given name.
// Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls // Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls
// to other methods in this interface. // to other methods in this interface.
@ -388,6 +392,45 @@ func (nim *nodeInfoManager) tryUpdateCSINode(
return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, topology) return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, topology)
} }
func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error {
csiKubeClient := nim.volumeHost.GetKubeClient()
if csiKubeClient == nil {
return goerrors.New("error getting CSI client")
}
var updateErrs []error
err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
if err := nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); err != nil {
updateErrs = append(updateErrs, err)
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
}
return nil
}
func (nim *nodeInfoManager) tryInitializeCSINodeWithAnnotation(csiKubeClient clientset.Interface) error {
nodeInfo, err := csiKubeClient.StorageV1beta1().CSINodes().Get(string(nim.nodeName), metav1.GetOptions{})
if nodeInfo == nil || errors.IsNotFound(err) {
// CreateCSINode will set the annotation
_, err = nim.CreateCSINode()
return err
}
annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
if annotationModified {
_, err := csiKubeClient.StorageV1beta1().CSINodes().Update(nodeInfo)
return err
}
return nil
}
func (nim *nodeInfoManager) CreateCSINode() (*storagev1beta1.CSINode, error) { func (nim *nodeInfoManager) CreateCSINode() (*storagev1beta1.CSINode, error) {
kubeClient := nim.volumeHost.GetKubeClient() kubeClient := nim.volumeHost.GetKubeClient()
@ -437,9 +480,14 @@ func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *
nodeInfoAnnotations = map[string]string{} nodeInfoAnnotations = map[string]string{}
} }
var oldAnnotationSet sets.String
mpa := nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] mpa := nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey]
tok := strings.Split(mpa, ",") tok := strings.Split(mpa, ",")
oldAnnotationSet := sets.NewString(tok...) if len(mpa) == 0 {
oldAnnotationSet = sets.NewString()
} else {
oldAnnotationSet = sets.NewString(tok...)
}
newAnnotationSet := sets.NewString() newAnnotationSet := sets.NewString()
for pluginName, migratedFunc := range migratedPlugins { for pluginName, migratedFunc := range migratedPlugins {
@ -480,7 +528,6 @@ func (nim *nodeInfoManager) installDriverToCSINode(
} }
specModified := true specModified := true
statusModified := true
// Clone driver list, omitting the driver that matches the given driverName // Clone driver list, omitting the driver that matches the given driverName
newDriverSpecs := []storagev1beta1.CSINodeDriver{} newDriverSpecs := []storagev1beta1.CSINodeDriver{}
for _, driverInfoSpec := range nodeInfo.Spec.Drivers { for _, driverInfoSpec := range nodeInfo.Spec.Drivers {
@ -497,7 +544,7 @@ func (nim *nodeInfoManager) installDriverToCSINode(
annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo) annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
if !specModified && !statusModified && !annotationModified { if !specModified && !annotationModified {
return nil return nil
} }
@ -562,9 +609,7 @@ func (nim *nodeInfoManager) tryUninstallDriverFromCSINode(
} }
} }
annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo) if !hasModified {
if !hasModified && !annotationModified {
// No changes, don't update // No changes, don't update
return nil return nil
} }

View File

@ -300,9 +300,9 @@ type AttachDetachVolumeHost interface {
// CSINodeLister returns the informer lister for the CSINode API Object // CSINodeLister returns the informer lister for the CSINode API Object
CSINodeLister() storagelisters.CSINodeLister CSINodeLister() storagelisters.CSINodeLister
// CSINodeSynced returns a boolean representing whether the CSINode API Object // IsAttachDetachController is an interface marker to strictly tie AttachDetachVolumeHost
// informer has been synced // to the attachDetachController
CSINodeSynced() bool IsAttachDetachController() bool
} }
// VolumeHost is an interface that plugins can use to access the kubelet. // VolumeHost is an interface that plugins can use to access the kubelet.

View File

@ -25,7 +25,6 @@ go_library(
"//pkg/volume/util/types:go_default_library", "//pkg/volume/util/types:go_default_library",
"//pkg/volume/util/volumepathhandler:go_default_library", "//pkg/volume/util/volumepathhandler:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -17,13 +17,13 @@ limitations under the License.
package operationexecutor package operationexecutor
import ( import (
goerrors "errors"
"fmt" "fmt"
"path" "path"
"strings" "strings"
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -86,7 +86,6 @@ func NewOperationGenerator(kubeClient clientset.Interface,
checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount, checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount,
blkUtil: blkUtil, blkUtil: blkUtil,
} }
// TODO(dyzz) look at default resync time
} }
// OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable // OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable
@ -875,7 +874,7 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
if deviceOpened { if deviceOpened {
return deviceToDetach.GenerateError( return deviceToDetach.GenerateError(
"UnmountDevice failed", "UnmountDevice failed",
fmt.Errorf("the device is in use when it was no longer expected to be in use")) goerrors.New("the device is in use when it was no longer expected to be in use"))
} }
klog.Infof(deviceToDetach.GenerateMsg("UnmountDevice succeeded", "")) klog.Infof(deviceToDetach.GenerateMsg("UnmountDevice succeeded", ""))
@ -982,7 +981,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
devicePath = pluginDevicePath devicePath = pluginDevicePath
} }
if len(devicePath) == 0 { if len(devicePath) == 0 {
return volumeToMount.GenerateError("MapVolume failed", fmt.Errorf("Device path of the volume is empty")) return volumeToMount.GenerateError("MapVolume failed", goerrors.New("Device path of the volume is empty"))
} }
// When kubelet is containerized, devicePath may be a symlink at a place unavailable to // When kubelet is containerized, devicePath may be a symlink at a place unavailable to
@ -1547,8 +1546,11 @@ func isDeviceOpened(deviceToDetach AttachedVolume, mounter mount.Interface) (boo
return deviceOpened, nil return deviceOpened, nil
} }
// TODO(dyzz): need to also add logic to check CSINodeInfo for Kubelet migration status
func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool { func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool {
// TODO(#75146) Check whether the driver is installed as well so that
// we can throw a better error when the driver is not installed.
// The error should be of the approximate form:
// fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName)
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
return false return false
} }
@ -1562,8 +1564,6 @@ func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool {
} }
func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) { func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) {
var err error
migratable, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec) migratable, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec)
if err != nil { if err != nil {
return false, err return false, err
@ -1575,37 +1575,32 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type
} }
if len(nodeName) == 0 { if len(nodeName) == 0 {
return false, fmt.Errorf("nodeName is empty") return false, goerrors.New("nodeName is empty")
} }
vpm := og.volumePluginMgr kubeClient := og.volumePluginMgr.Host.GetKubeClient()
kubeClient := vpm.Host.GetKubeClient()
if kubeClient == nil { if kubeClient == nil {
// TODO(dyzz) check this error case, what should we do in standalone kubelet mode? // Don't handle the controller/kubelet version skew check and fallback
return false, fmt.Errorf("failed to get kube client from volume host") // to just checking the feature gates. This can happen if
// we are in a standalone (headless) Kubelet
return true, nil
} }
adcHost, ok := og.volumePluginMgr.Host.(volume.AttachDetachVolumeHost) adcHost, ok := og.volumePluginMgr.Host.(volume.AttachDetachVolumeHost)
if !ok { if !ok {
// This function is running not on the AttachDetachController // Don't handle the controller/kubelet version skew check and fallback
// We assume that Kubelet is servicing this function and therefore is // to just checking the feature gates. This can happen if
// trivially "using CSI Plugin" // "enableControllerAttachDetach" is set to true on kubelet
return true, nil return true, nil
} }
var csiNode *storagev1beta1.CSINode
if adcHost.CSINodeSynced() { if adcHost.CSINodeLister() == nil {
csiNode, err = adcHost.CSINodeLister().Get(string(nodeName)) return false, goerrors.New("could not find CSINodeLister in attachDetachController")
if err != nil { }
return false, err
} csiNode, err := adcHost.CSINodeLister().Get(string(nodeName))
} else { if err != nil {
// Fallback to GET return false, err
klog.Warningf("CSINode informer not synced, falling back to GET directly from API Server")
csiNode, err = kubeClient.StorageV1beta1().CSINodes().Get(string(nodeName), metav1.GetOptions{})
if err != nil {
return false, err
}
} }
ann := csiNode.GetAnnotations() ann := csiNode.GetAnnotations()
@ -1613,7 +1608,14 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type
return false, nil return false, nil
} }
mpaSet := sets.NewString(strings.Split(ann[v1.MigratedPluginsAnnotationKey], ",")...) var mpaSet sets.String
mpa := ann[v1.MigratedPluginsAnnotationKey]
tok := strings.Split(mpa, ",")
if len(mpa) == 0 {
mpaSet = sets.NewString()
} else {
mpaSet = sets.NewString(tok...)
}
pluginName, err := csilib.GetInTreePluginNameFromSpec(spec.PersistentVolume, spec.Volume) pluginName, err := csilib.GetInTreePluginNameFromSpec(spec.PersistentVolume, spec.Volume)
if err != nil { if err != nil {
@ -1621,7 +1623,7 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type
} }
if len(pluginName) == 0 { if len(pluginName) == 0 {
// Could not find a plugin name from translation directory // Could not find a plugin name from translation directory, assume not translated
return false, nil return false, nil
} }
@ -1629,7 +1631,7 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type
if isMigratedOnNode { if isMigratedOnNode {
installed := false installed := false
driverName, err := csilib.GetCSINameFromIntreeName(pluginName) driverName, err := csilib.GetCSINameFromInTreeName(pluginName)
if err != nil { if err != nil {
return isMigratedOnNode, err return isMigratedOnNode, err
} }
@ -1640,7 +1642,7 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type
} }
} }
if !installed { if !installed {
return true, fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed.", pluginName, string(nodeName), driverName) return true, fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName)
} }
} }
@ -1660,8 +1662,8 @@ func translateSpec(spec *volume.Spec) (*volume.Spec, error) {
ReadOnly: spec.ReadOnly, ReadOnly: spec.ReadOnly,
}, nil }, nil
} else if spec.Volume != nil { } else if spec.Volume != nil {
return &volume.Spec{}, fmt.Errorf("translation is not supported for in-line volumes yet") return &volume.Spec{}, goerrors.New("translation is not supported for in-line volumes yet")
} else { } else {
return &volume.Spec{}, fmt.Errorf("not a valid volume spec") return &volume.Spec{}, goerrors.New("not a valid volume spec")
} }
} }

View File

@ -76,7 +76,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csidrivers").RuleOrDie()) role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csidrivers").RuleOrDie())
} }
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csinodes").RuleOrDie()) role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csinodes").RuleOrDie())
} }
} }

View File

@ -66,14 +66,6 @@ items:
- get - get
- list - list
- watch - watch
- apiGroups:
- storage.k8s.io
resources:
- csinodes
verbs:
- get
- list
- watch
- apiVersion: rbac.authorization.k8s.io/v1 - apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole kind: ClusterRole
metadata: metadata:

View File

@ -98,6 +98,9 @@ const (
// https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md
EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time"
// TODO(dyzz) Comment // MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated
// list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode.
// This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or
// CSI Backend for a volume plugin on a specific node.
MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins" MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins"
) )

View File

@ -17,6 +17,7 @@ limitations under the License.
package csitranslation package csitranslation
import ( import (
"errors"
"fmt" "fmt"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
@ -48,7 +49,7 @@ func TranslateInTreeStorageClassParametersToCSI(inTreePluginName string, scParam
// be modified // be modified
func TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) { func TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
if pv == nil { if pv == nil {
return nil, fmt.Errorf("persistent volume was nil") return nil, errors.New("persistent volume was nil")
} }
copiedPV := pv.DeepCopy() copiedPV := pv.DeepCopy()
for _, curPlugin := range inTreePlugins { for _, curPlugin := range inTreePlugins {
@ -64,7 +65,7 @@ func TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, erro
// by the `Driver` field in the CSI Source. The input PV object will not be modified. // by the `Driver` field in the CSI Source. The input PV object will not be modified.
func TranslateCSIPVToInTree(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) { func TranslateCSIPVToInTree(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
if pv == nil || pv.Spec.CSI == nil { if pv == nil || pv.Spec.CSI == nil {
return nil, fmt.Errorf("CSI persistent volume was nil") return nil, errors.New("CSI persistent volume was nil")
} }
copiedPV := pv.DeepCopy() copiedPV := pv.DeepCopy()
for driverName, curPlugin := range inTreePlugins { for driverName, curPlugin := range inTreePlugins {
@ -106,16 +107,15 @@ func GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (strin
return "", fmt.Errorf("could not find in-tree plugin name from persistent volume %v", pv) return "", fmt.Errorf("could not find in-tree plugin name from persistent volume %v", pv)
} else if vol != nil { } else if vol != nil {
// TODO(dyzz): Implement inline volume migration support // TODO(dyzz): Implement inline volume migration support
return "", fmt.Errorf("inline volume migration not yet supported") return "", errors.New("inline volume migration not yet supported")
} else { } else {
return "", fmt.Errorf("both persistent volume and volume are nil") return "", errors.New("both persistent volume and volume are nil")
} }
} }
// GetCSINameFromInTreeName returns the name of a CSI driver that supersedes the // GetCSINameFromInTreeName returns the name of a CSI driver that supersedes the
// in-tree plugin with the given name // in-tree plugin with the given name
func GetCSINameFromInTreeName(pluginName string) (string, error) { func GetCSINameFromInTreeName(pluginName string) (string, error) {
for csiDriverName, curPlugin := range inTreePlugins { for csiDriverName, curPlugin := range inTreePlugins {
if curPlugin.GetInTreePluginName() == pluginName { if curPlugin.GetInTreePluginName() == pluginName {
return csiDriverName, nil return csiDriverName, nil