diff --git a/pkg/apis/core/annotation_key_constants.go b/pkg/apis/core/annotation_key_constants.go index 85b7a590429..688287611e8 100644 --- a/pkg/apis/core/annotation_key_constants.go +++ b/pkg/apis/core/annotation_key_constants.go @@ -102,6 +102,9 @@ const ( // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" - // TODO(dyzz) Comment + // MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated + // list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode. + // This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or + // CSI Backend for a volume plugin on a specific node. MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins" ) diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD index ec124c66ecd..cc1c9ce6c6c 100644 --- a/pkg/controller/volume/attachdetach/BUILD +++ b/pkg/controller/volume/attachdetach/BUILD @@ -18,6 +18,7 @@ go_library( "//pkg/controller/volume/attachdetach/reconciler:go_default_library", "//pkg/controller/volume/attachdetach/statusupdater:go_default_library", "//pkg/controller/volume/attachdetach/util:go_default_library", + "//pkg/features:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/util:go_default_library", @@ -31,6 +32,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 27992b6e5e0..77ffbc7fb39 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" storageinformers "k8s.io/client-go/informers/storage/v1beta1" clientset "k8s.io/client-go/kubernetes" @@ -49,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/util" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" volumeutil "k8s.io/kubernetes/pkg/volume/util" @@ -125,20 +127,24 @@ func NewAttachDetachController( // dropped pods so they are continuously processed until it is accepted or // deleted (probably can't do this with sharedInformer), etc. adc := &attachDetachController{ - kubeClient: kubeClient, - pvcLister: pvcInformer.Lister(), - pvcsSynced: pvcInformer.Informer().HasSynced, - pvLister: pvInformer.Lister(), - pvsSynced: pvInformer.Informer().HasSynced, - podLister: podInformer.Lister(), - podsSynced: podInformer.Informer().HasSynced, - podIndexer: podInformer.Informer().GetIndexer(), - nodeLister: nodeInformer.Lister(), - nodesSynced: nodeInformer.Informer().HasSynced, - csiNodeLister: csiNodeInformer.Lister(), - csiNodeSynced: csiNodeInformer.Informer().HasSynced, - cloud: cloud, - pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"), + kubeClient: kubeClient, + pvcLister: pvcInformer.Lister(), + pvcsSynced: pvcInformer.Informer().HasSynced, + pvLister: pvInformer.Lister(), + pvsSynced: pvInformer.Informer().HasSynced, + podLister: podInformer.Lister(), + podsSynced: podInformer.Informer().HasSynced, + podIndexer: podInformer.Informer().GetIndexer(), + nodeLister: nodeInformer.Lister(), + nodesSynced: nodeInformer.Informer().HasSynced, + cloud: cloud, + pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"), + } + + if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && + utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { + adc.csiNodeLister = csiNodeInformer.Lister() + adc.csiNodeSynced = csiNodeInformer.Informer().HasSynced } if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { @@ -317,7 +323,12 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) { klog.Infof("Starting attach detach controller") defer klog.Infof("Shutting down attach detach controller") - if !controller.WaitForCacheSync("attach detach", stopCh, adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced) { + synced := []kcache.InformerSynced{adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced} + if adc.csiNodeSynced != nil { + synced = append(synced, adc.csiNodeSynced) + } + + if !controller.WaitForCacheSync("attach detach", stopCh, synced...) { return } @@ -658,8 +669,8 @@ func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister return adc.csiNodeLister } -func (adc *attachDetachController) CSINodeSynced() bool { - return adc.csiNodeSynced() +func (adc *attachDetachController) IsAttachDetachController() bool { + return true } // VolumeHost implementation diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index 2db7d8b7108..e8f0e75f191 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -1376,8 +1376,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis var pluginName string provisionerName := storageClass.Provisioner if plugin != nil { - // TODO(dyzz) Just temporary to test without dynamic provisioning - if plugin.IsMigratedToCSI() && false { + if plugin.IsMigratedToCSI() { // pluginName is not set here to align with existing behavior // of not setting pluginName for external provisioners (including CSI) // Set provisionerName to CSI plugin name for setClaimProvisioner @@ -1402,7 +1401,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis } claim = newClaim - if plugin == nil { + if plugin == nil || plugin.IsMigratedToCSI() { // findProvisionablePlugin returned no error nor plugin. // This means that an unknown provisioner is requested. Report an event // and wait for the external provisioner diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index edee172ef10..4a934497321 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -776,6 +776,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tokenManager := token.NewManager(kubeDeps.KubeClient) + // NewInitializedVolumePluginMgr intializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init) + // which affects node ready status. This function must be called before Kubelet is initialized so that the Node + // ReadyState is accurate with the storage state. klet.volumePluginMgr, err = NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber) if err != nil { diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go index 2efe38a720b..5b2077f348d 100644 --- a/pkg/kubelet/volume_host.go +++ b/pkg/kubelet/volume_host.go @@ -97,7 +97,6 @@ type kubeletVolumeHost struct { func (kvh *kubeletVolumeHost) SetKubeletError(err error) { kvh.kubelet.runtimeState.setStorageState(err) - return } func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string { diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 26bfa64f33c..31cbdf72968 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -222,7 +222,12 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { csitranslationplugins.GCEPDInTreePluginName: func() bool { return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE) }, - // TODO(leakingtpan): Add AWS migration feature gates and place them here + csitranslationplugins.AWSEBSInTreePluginName: func() bool { + return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS) + }, + csitranslationplugins.CinderInTreePluginName: func() bool { + return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack) + }, } // Initializing the label management channels @@ -232,8 +237,7 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { // This function prevents Kubelet from posting Ready status until CSINodeInfo // is both installed and initialized - err := initializeCSINodeInfo(host) - if err != nil { + if err := initializeCSINode(host); err != nil { return fmt.Errorf("failed to initialize CSINodeInfo: %v", err) } } @@ -241,20 +245,20 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { return nil } -func initializeCSINodeInfo(host volume.VolumeHost) error { +func initializeCSINode(host volume.VolumeHost) error { kvh, ok := host.(volume.KubeletVolumeHost) if !ok { - klog.V(4).Infof("Skipping CSINodeInfo initialization, not running on Kubelet") + klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINodeInfo initialization, not running on kubelet") return nil } kubeClient := host.GetKubeClient() if kubeClient == nil { // Kubelet running in standalone mode. Skip CSINodeInfo initialization - klog.Warningf("Skipping CSINodeInfo initialization, Kubelet running in standalone mode") + klog.Warning("Skipping CSINodeInfo initialization, kubelet running in standalone mode") return nil } - kvh.SetKubeletError(fmt.Errorf("CSINodeInfo is not yet intialized")) + kvh.SetKubeletError(errors.New("CSINodeInfo is not yet initialized")) go func() { defer utilruntime.HandleCrash() @@ -269,8 +273,7 @@ func initializeCSINodeInfo(host volume.VolumeHost) error { } err := wait.ExponentialBackoff(initBackoff, func() (bool, error) { klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo") - // TODO(dyzz): Just augment CreateCSINodeInfo to create the annotation on itself. Also update all updating functions to double check that the annotation is correct (yes) - _, err := nim.CreateCSINode() + err := nim.InitializeCSINodeWithAnnotation() if err != nil { kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err)) klog.Errorf("Failed to initialize CSINodeInfo: %v", err) @@ -282,6 +285,10 @@ func initializeCSINodeInfo(host volume.VolumeHost) error { return true, nil }) if err != nil { + // 2 releases after CSIMigration and all CSIMigrationX (where X is a volume plugin) + // are permanently enabled the apiserver/controllers can assume that the kubelet is + // using CSI for all Migrated volume plugins. Then all the CSINode initialization + // code can be dropped from Kubelet. // Kill the Kubelet process and allow it to restart to retry initialization klog.Fatalf("Failed to initialize CSINodeInfo after retrying") } diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go index be84560f9f2..ba25b07e653 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go @@ -20,6 +20,7 @@ package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomana import ( "encoding/json" + goerrors "errors" "fmt" "strings" @@ -73,6 +74,9 @@ type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error) type Interface interface { CreateCSINode() (*storagev1beta1.CSINode, error) + // Updates or Creates the CSINode object with annotations for CSI Migration + InitializeCSINodeWithAnnotation() error + // Record in the cluster the given node information from the CSI driver with the given name. // Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls // to other methods in this interface. @@ -388,6 +392,45 @@ func (nim *nodeInfoManager) tryUpdateCSINode( return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, topology) } +func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error { + csiKubeClient := nim.volumeHost.GetKubeClient() + if csiKubeClient == nil { + return goerrors.New("error getting CSI client") + } + + var updateErrs []error + err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) { + if err := nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); err != nil { + updateErrs = append(updateErrs, err) + return false, nil + } + return true, nil + }) + if err != nil { + return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs)) + } + + return nil +} + +func (nim *nodeInfoManager) tryInitializeCSINodeWithAnnotation(csiKubeClient clientset.Interface) error { + nodeInfo, err := csiKubeClient.StorageV1beta1().CSINodes().Get(string(nim.nodeName), metav1.GetOptions{}) + if nodeInfo == nil || errors.IsNotFound(err) { + // CreateCSINode will set the annotation + _, err = nim.CreateCSINode() + return err + } + + annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo) + + if annotationModified { + _, err := csiKubeClient.StorageV1beta1().CSINodes().Update(nodeInfo) + return err + } + return nil + +} + func (nim *nodeInfoManager) CreateCSINode() (*storagev1beta1.CSINode, error) { kubeClient := nim.volumeHost.GetKubeClient() @@ -437,9 +480,14 @@ func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo * nodeInfoAnnotations = map[string]string{} } + var oldAnnotationSet sets.String mpa := nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] tok := strings.Split(mpa, ",") - oldAnnotationSet := sets.NewString(tok...) + if len(mpa) == 0 { + oldAnnotationSet = sets.NewString() + } else { + oldAnnotationSet = sets.NewString(tok...) + } newAnnotationSet := sets.NewString() for pluginName, migratedFunc := range migratedPlugins { @@ -480,7 +528,6 @@ func (nim *nodeInfoManager) installDriverToCSINode( } specModified := true - statusModified := true // Clone driver list, omitting the driver that matches the given driverName newDriverSpecs := []storagev1beta1.CSINodeDriver{} for _, driverInfoSpec := range nodeInfo.Spec.Drivers { @@ -497,7 +544,7 @@ func (nim *nodeInfoManager) installDriverToCSINode( annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo) - if !specModified && !statusModified && !annotationModified { + if !specModified && !annotationModified { return nil } @@ -562,9 +609,7 @@ func (nim *nodeInfoManager) tryUninstallDriverFromCSINode( } } - annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo) - - if !hasModified && !annotationModified { + if !hasModified { // No changes, don't update return nil } diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 85f5223e0af..76780a5f6f1 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -300,9 +300,9 @@ type AttachDetachVolumeHost interface { // CSINodeLister returns the informer lister for the CSINode API Object CSINodeLister() storagelisters.CSINodeLister - // CSINodeSynced returns a boolean representing whether the CSINode API Object - // informer has been synced - CSINodeSynced() bool + // IsAttachDetachController is an interface marker to strictly tie AttachDetachVolumeHost + // to the attachDetachController + IsAttachDetachController() bool } // VolumeHost is an interface that plugins can use to access the kubelet. diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index f7e40f612ed..314dcaa8c32 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -25,7 +25,6 @@ go_library( "//pkg/volume/util/types:go_default_library", "//pkg/volume/util/volumepathhandler:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index e49d10ccb08..d2ed9edc690 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -17,13 +17,13 @@ limitations under the License. package operationexecutor import ( + goerrors "errors" "fmt" "path" "strings" "time" "k8s.io/api/core/v1" - storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -86,7 +86,6 @@ func NewOperationGenerator(kubeClient clientset.Interface, checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount, blkUtil: blkUtil, } - // TODO(dyzz) look at default resync time } // OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable @@ -875,7 +874,7 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( if deviceOpened { return deviceToDetach.GenerateError( "UnmountDevice failed", - fmt.Errorf("the device is in use when it was no longer expected to be in use")) + goerrors.New("the device is in use when it was no longer expected to be in use")) } klog.Infof(deviceToDetach.GenerateMsg("UnmountDevice succeeded", "")) @@ -982,7 +981,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( devicePath = pluginDevicePath } if len(devicePath) == 0 { - return volumeToMount.GenerateError("MapVolume failed", fmt.Errorf("Device path of the volume is empty")) + return volumeToMount.GenerateError("MapVolume failed", goerrors.New("Device path of the volume is empty")) } // When kubelet is containerized, devicePath may be a symlink at a place unavailable to @@ -1547,8 +1546,11 @@ func isDeviceOpened(deviceToDetach AttachedVolume, mounter mount.Interface) (boo return deviceOpened, nil } -// TODO(dyzz): need to also add logic to check CSINodeInfo for Kubelet migration status func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool { + // TODO(#75146) Check whether the driver is installed as well so that + // we can throw a better error when the driver is not installed. + // The error should be of the approximate form: + // fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName) if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { return false } @@ -1562,8 +1564,6 @@ func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool { } func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) { - var err error - migratable, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec) if err != nil { return false, err @@ -1575,37 +1575,32 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type } if len(nodeName) == 0 { - return false, fmt.Errorf("nodeName is empty") + return false, goerrors.New("nodeName is empty") } - vpm := og.volumePluginMgr - - kubeClient := vpm.Host.GetKubeClient() + kubeClient := og.volumePluginMgr.Host.GetKubeClient() if kubeClient == nil { - // TODO(dyzz) check this error case, what should we do in standalone kubelet mode? - return false, fmt.Errorf("failed to get kube client from volume host") + // Don't handle the controller/kubelet version skew check and fallback + // to just checking the feature gates. This can happen if + // we are in a standalone (headless) Kubelet + return true, nil } adcHost, ok := og.volumePluginMgr.Host.(volume.AttachDetachVolumeHost) if !ok { - // This function is running not on the AttachDetachController - // We assume that Kubelet is servicing this function and therefore is - // trivially "using CSI Plugin" + // Don't handle the controller/kubelet version skew check and fallback + // to just checking the feature gates. This can happen if + // "enableControllerAttachDetach" is set to true on kubelet return true, nil } - var csiNode *storagev1beta1.CSINode - if adcHost.CSINodeSynced() { - csiNode, err = adcHost.CSINodeLister().Get(string(nodeName)) - if err != nil { - return false, err - } - } else { - // Fallback to GET - klog.Warningf("CSINode informer not synced, falling back to GET directly from API Server") - csiNode, err = kubeClient.StorageV1beta1().CSINodes().Get(string(nodeName), metav1.GetOptions{}) - if err != nil { - return false, err - } + + if adcHost.CSINodeLister() == nil { + return false, goerrors.New("could not find CSINodeLister in attachDetachController") + } + + csiNode, err := adcHost.CSINodeLister().Get(string(nodeName)) + if err != nil { + return false, err } ann := csiNode.GetAnnotations() @@ -1613,7 +1608,14 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type return false, nil } - mpaSet := sets.NewString(strings.Split(ann[v1.MigratedPluginsAnnotationKey], ",")...) + var mpaSet sets.String + mpa := ann[v1.MigratedPluginsAnnotationKey] + tok := strings.Split(mpa, ",") + if len(mpa) == 0 { + mpaSet = sets.NewString() + } else { + mpaSet = sets.NewString(tok...) + } pluginName, err := csilib.GetInTreePluginNameFromSpec(spec.PersistentVolume, spec.Volume) if err != nil { @@ -1621,7 +1623,7 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type } if len(pluginName) == 0 { - // Could not find a plugin name from translation directory + // Could not find a plugin name from translation directory, assume not translated return false, nil } @@ -1629,7 +1631,7 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type if isMigratedOnNode { installed := false - driverName, err := csilib.GetCSINameFromIntreeName(pluginName) + driverName, err := csilib.GetCSINameFromInTreeName(pluginName) if err != nil { return isMigratedOnNode, err } @@ -1640,7 +1642,7 @@ func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName type } } if !installed { - return true, fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed.", pluginName, string(nodeName), driverName) + return true, fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName) } } @@ -1660,8 +1662,8 @@ func translateSpec(spec *volume.Spec) (*volume.Spec, error) { ReadOnly: spec.ReadOnly, }, nil } else if spec.Volume != nil { - return &volume.Spec{}, fmt.Errorf("translation is not supported for in-line volumes yet") + return &volume.Spec{}, goerrors.New("translation is not supported for in-line volumes yet") } else { - return &volume.Spec{}, fmt.Errorf("not a valid volume spec") + return &volume.Spec{}, goerrors.New("not a valid volume spec") } } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 6bbe8d23d8b..d1e83dd99cd 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -76,7 +76,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csidrivers").RuleOrDie()) } - if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { + if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csinodes").RuleOrDie()) } } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index bad6b830b40..2c64f63a963 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -66,14 +66,6 @@ items: - get - list - watch - - apiGroups: - - storage.k8s.io - resources: - - csinodes - verbs: - - get - - list - - watch - apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: diff --git a/staging/src/k8s.io/api/core/v1/annotation_key_constants.go b/staging/src/k8s.io/api/core/v1/annotation_key_constants.go index 442245ff8a0..edc9b4d6008 100644 --- a/staging/src/k8s.io/api/core/v1/annotation_key_constants.go +++ b/staging/src/k8s.io/api/core/v1/annotation_key_constants.go @@ -98,6 +98,9 @@ const ( // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" - // TODO(dyzz) Comment + // MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated + // list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode. + // This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or + // CSI Backend for a volume plugin on a specific node. MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins" ) diff --git a/staging/src/k8s.io/csi-translation-lib/translate.go b/staging/src/k8s.io/csi-translation-lib/translate.go index 477fb680f2a..ac1a4dd3d86 100644 --- a/staging/src/k8s.io/csi-translation-lib/translate.go +++ b/staging/src/k8s.io/csi-translation-lib/translate.go @@ -17,6 +17,7 @@ limitations under the License. package csitranslation import ( + "errors" "fmt" "k8s.io/api/core/v1" @@ -48,7 +49,7 @@ func TranslateInTreeStorageClassParametersToCSI(inTreePluginName string, scParam // be modified func TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) { if pv == nil { - return nil, fmt.Errorf("persistent volume was nil") + return nil, errors.New("persistent volume was nil") } copiedPV := pv.DeepCopy() for _, curPlugin := range inTreePlugins { @@ -64,7 +65,7 @@ func TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, erro // by the `Driver` field in the CSI Source. The input PV object will not be modified. func TranslateCSIPVToInTree(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) { if pv == nil || pv.Spec.CSI == nil { - return nil, fmt.Errorf("CSI persistent volume was nil") + return nil, errors.New("CSI persistent volume was nil") } copiedPV := pv.DeepCopy() for driverName, curPlugin := range inTreePlugins { @@ -106,16 +107,15 @@ func GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (strin return "", fmt.Errorf("could not find in-tree plugin name from persistent volume %v", pv) } else if vol != nil { // TODO(dyzz): Implement inline volume migration support - return "", fmt.Errorf("inline volume migration not yet supported") + return "", errors.New("inline volume migration not yet supported") } else { - return "", fmt.Errorf("both persistent volume and volume are nil") + return "", errors.New("both persistent volume and volume are nil") } } // GetCSINameFromInTreeName returns the name of a CSI driver that supersedes the // in-tree plugin with the given name func GetCSINameFromInTreeName(pluginName string) (string, error) { - for csiDriverName, curPlugin := range inTreePlugins { if curPlugin.GetInTreePluginName() == pluginName { return csiDriverName, nil