From 00b0ab86af9427bf60a8ea58fd4e83e14d47a77a Mon Sep 17 00:00:00 2001 From: Fabio Bertinatto Date: Wed, 22 May 2019 16:50:24 +0200 Subject: [PATCH] Update scheduler to use volume limits from CSINode --- cmd/kube-scheduler/app/server.go | 1 + pkg/scheduler/BUILD | 4 + pkg/scheduler/algorithm/predicates/BUILD | 3 + .../predicates/csi_volume_predicate.go | 146 ++++++++++++------ .../algorithm/predicates/predicates.go | 27 +++- pkg/scheduler/algorithm/predicates/utils.go | 62 +++++++- pkg/scheduler/eventhandlers.go | 81 +++++++++- pkg/scheduler/factory/BUILD | 2 + pkg/scheduler/factory/factory.go | 24 ++- pkg/scheduler/factory/factory_test.go | 3 +- pkg/scheduler/internal/cache/BUILD | 1 + pkg/scheduler/internal/cache/cache.go | 44 +++++- pkg/scheduler/internal/cache/fake/BUILD | 1 + .../internal/cache/fake/fake_cache.go | 12 +- pkg/scheduler/internal/cache/interface.go | 12 +- pkg/scheduler/nodeinfo/BUILD | 2 + pkg/scheduler/nodeinfo/node_info.go | 37 ++++- pkg/scheduler/scheduler.go | 11 +- pkg/scheduler/scheduler_test.go | 3 +- .../authorizer/rbac/bootstrappolicy/policy.go | 61 ++++---- .../testdata/cluster-roles.yaml | 8 + test/integration/daemonset/daemonset_test.go | 3 +- test/integration/scheduler/scheduler_test.go | 5 +- test/integration/scheduler/util.go | 3 +- test/integration/util/util.go | 4 +- 25 files changed, 452 insertions(+), 108 deletions(-) diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 58745fa9fda..833693f7b29 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -172,6 +172,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error cc.InformerFactory.Core().V1().Services(), cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(), cc.InformerFactory.Storage().V1().StorageClasses(), + cc.InformerFactory.Storage().V1beta1().CSINodes(), cc.Recorder, cc.ComponentConfig.AlgorithmSource, stopCh, diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 15eb641ab1f..9538ad0fcbd 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -10,6 +10,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/scheduler", visibility = ["//visibility:public"], deps = [ + "//pkg/features:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/api:go_default_library", @@ -23,15 +24,18 @@ go_library( "//pkg/scheduler/metrics:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", 
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", diff --git a/pkg/scheduler/algorithm/predicates/BUILD b/pkg/scheduler/algorithm/predicates/BUILD index 1cea069a4bb..449beac5f6f 100644 --- a/pkg/scheduler/algorithm/predicates/BUILD +++ b/pkg/scheduler/algorithm/predicates/BUILD @@ -30,6 +30,7 @@ go_library( "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", @@ -41,6 +42,8 @@ go_library( "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library", + "//staging/src/k8s.io/csi-translation-lib:go_default_library", + "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go b/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go index fd475403970..a93339c2dd6 100644 --- a/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go +++ b/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go @@ -19,9 +19,11 @@ package predicates import ( "fmt" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/util/rand" utilfeature "k8s.io/apiserver/pkg/util/feature" + csilib "k8s.io/csi-translation-lib" "k8s.io/klog" "k8s.io/kubernetes/pkg/features" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -30,9 +32,10 @@ import ( // CSIMaxVolumeLimitChecker defines predicate needed for counting CSI volumes type CSIMaxVolumeLimitChecker struct { - pvInfo PersistentVolumeInfo - pvcInfo PersistentVolumeClaimInfo - scInfo StorageClassInfo + pvInfo PersistentVolumeInfo + pvcInfo PersistentVolumeClaimInfo + scInfo StorageClassInfo + randomVolumeIDPrefix string } @@ -50,52 +53,48 @@ func NewCSIMaxVolumeLimitPredicate( func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate( pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) { - - // if feature gate is disable we return - if !utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { - return true, nil, nil - } - // If a pod doesn't have any volume attached to it, the predicate will always be true. - // Thus we make a fast path for it, to avoid unnecessary computations in this case. 
+ // If the new pod doesn't have any volume attached to it, the predicate will always be true if len(pod.Spec.Volumes) == 0 { return true, nil, nil } - nodeVolumeLimits := nodeInfo.VolumeLimits() - - // if node does not have volume limits this predicate should exit - if len(nodeVolumeLimits) == 0 { + if !utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { return true, nil, nil } - // a map of unique volume name/csi volume handle and volume limit key newVolumes := make(map[string]string) - if err := c.filterAttachableVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { + if err := c.filterAttachableVolumes(nodeInfo, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { return false, nil, err } + // If the pod doesn't have any new CSI volumes, the predicate will always be true if len(newVolumes) == 0 { return true, nil, nil } - // a map of unique volume name/csi volume handle and volume limit key + // If the node doesn't have volume limits, the predicate will always be true + nodeVolumeLimits := nodeInfo.VolumeLimits() + if len(nodeVolumeLimits) == 0 { + return true, nil, nil + } + attachedVolumes := make(map[string]string) for _, existingPod := range nodeInfo.Pods() { - if err := c.filterAttachableVolumes(existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil { + if err := c.filterAttachableVolumes(nodeInfo, existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil { return false, nil, err } } - newVolumeCount := map[string]int{} attachedVolumeCount := map[string]int{} - - for volumeName, volumeLimitKey := range attachedVolumes { - if _, ok := newVolumes[volumeName]; ok { - delete(newVolumes, volumeName) + for volumeUniqueName, volumeLimitKey := range attachedVolumes { + if _, ok := newVolumes[volumeUniqueName]; ok { + // Don't count single volume used in multiple pods more than once + delete(newVolumes, volumeUniqueName) } attachedVolumeCount[volumeLimitKey]++ } + newVolumeCount := map[string]int{} for _, volumeLimitKey := range newVolumes { newVolumeCount[volumeLimitKey]++ } @@ -114,7 +113,7 @@ func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate( } func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes( - volumes []v1.Volume, namespace string, result map[string]string) error { + nodeInfo *schedulernodeinfo.NodeInfo, volumes []v1.Volume, namespace string, result map[string]string) error { for _, vol := range volumes { // CSI volumes can only be used as persistent volumes @@ -130,74 +129,119 @@ func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes( pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName) if err != nil { - klog.V(4).Infof("Unable to look up PVC info for %s/%s", namespace, pvcName) + klog.V(5).Infof("Unable to look up PVC info for %s/%s", namespace, pvcName) continue } - driverName, volumeHandle := c.getCSIDriver(pvc) - // if we can't find driver name or volume handle - we don't count this volume. 
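A detail worth noting in the lines that follow: counted volumes are now keyed by "<driver name>/<volume handle>" instead of by the handle alone, so two different drivers that happen to produce the same handle string are no longer conflated, while a single volume used by several pods on the node still collapses to one entry. A hypothetical example of the two keys involved (driver and handle values are made up; the real limit key comes from volumeutil.GetCSIAttachLimitKey in pkg/volume/util, which also enforces a length limit on the resulting resource name):

    package main

    import "fmt"

    // Illustrative only: this mirrors the conventional prefix of the keys
    // produced by volumeutil.GetCSIAttachLimitKey.
    const csiAttachLimitPrefix = "attachable-volumes-csi-"

    func main() {
        driver, handle := "ebs.csi.aws.com", "vol-0faketest" // hypothetical values
        uniqueName := fmt.Sprintf("%s/%s", driver, handle)   // dedupe key shared across pods
        limitKey := csiAttachLimitPrefix + driver            // per-driver counting bucket
        fmt.Println(uniqueName, "->", limitKey)
    }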
+ csiNode := nodeInfo.CSINode() + driverName, volumeHandle := c.getCSIDriverInfo(csiNode, pvc) if driverName == "" || volumeHandle == "" { + klog.V(5).Infof("Could not find a CSI driver name or volume handle, not counting volume") continue } - volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName) - result[volumeHandle] = volumeLimitKey + volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle) + volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName) + result[volumeUniqueName] = volumeLimitKey } return nil } -func (c *CSIMaxVolumeLimitChecker) getCSIDriver(pvc *v1.PersistentVolumeClaim) (string, string) { +// getCSIDriverInfo returns the CSI driver name and volume ID of a given PVC. +// If the PVC is from a migrated in-tree plugin, this function will return +// the information of the CSI driver that the plugin has been migrated to. +func (c *CSIMaxVolumeLimitChecker) getCSIDriverInfo(csiNode *storagev1beta1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) { pvName := pvc.Spec.VolumeName namespace := pvc.Namespace pvcName := pvc.Name - placeHolderCSIDriver := "" - placeHolderHandle := "" if pvName == "" { klog.V(5).Infof("Persistent volume had no name for claim %s/%s", namespace, pvcName) - return c.getDriverNameFromSC(pvc) + return c.getCSIDriverInfoFromSC(csiNode, pvc) } - pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName) + pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName) if err != nil { - klog.V(4).Infof("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName) + klog.V(5).Infof("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName) // If we can't fetch PV associated with PVC, may be it got deleted // or PVC was prebound to a PVC that hasn't been created yet. // fallback to using StorageClass for volume counting - return c.getDriverNameFromSC(pvc) + return c.getCSIDriverInfoFromSC(csiNode, pvc) } csiSource := pv.Spec.PersistentVolumeSource.CSI if csiSource == nil { - klog.V(5).Infof("Not considering non-CSI volume %s/%s", namespace, pvcName) - return placeHolderCSIDriver, placeHolderHandle + // We make a fast path for non-CSI volumes that aren't migratable + if !csilib.IsPVMigratable(pv) { + return "", "" + } + + pluginName, err := csilib.GetInTreePluginNameFromSpec(pv, nil) + if err != nil { + klog.V(5).Infof("Unable to look up plugin name from PV spec: %v", err) + return "", "" + } + + if !isCSIMigrationOn(csiNode, pluginName) { + klog.V(5).Infof("CSI Migration of plugin %s is not enabled", pluginName) + return "", "" + } + + csiPV, err := csilib.TranslateInTreePVToCSI(pv) + if err != nil { + klog.V(5).Infof("Unable to translate in-tree volume to CSI: %v", err) + return "", "" + } + + if csiPV.Spec.PersistentVolumeSource.CSI == nil { + klog.V(5).Infof("Unable to get a valid volume source for translated PV %s", pvName) + return "", "" + } + + csiSource = csiPV.Spec.PersistentVolumeSource.CSI } + return csiSource.Driver, csiSource.VolumeHandle } -func (c *CSIMaxVolumeLimitChecker) getDriverNameFromSC(pvc *v1.PersistentVolumeClaim) (string, string) { +// getCSIDriverInfoFromSC returns the CSI driver name and a random volume ID of a given PVC's StorageClass. 
+func (c *CSIMaxVolumeLimitChecker) getCSIDriverInfoFromSC(csiNode *storagev1beta1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) { namespace := pvc.Namespace pvcName := pvc.Name scName := pvc.Spec.StorageClassName - placeHolderCSIDriver := "" - placeHolderHandle := "" + // If StorageClass is not set or not found, then PVC must be using immediate binding mode + // and hence it must be bound before scheduling. So it is safe to not count it. if scName == nil { - // if StorageClass is not set or found, then PVC must be using immediate binding mode - // and hence it must be bound before scheduling. So it is safe to not count it. - klog.V(5).Infof("pvc %s/%s has no storageClass", namespace, pvcName) - return placeHolderCSIDriver, placeHolderHandle + klog.V(5).Infof("PVC %s/%s has no StorageClass", namespace, pvcName) + return "", "" } storageClass, err := c.scInfo.GetStorageClassInfo(*scName) if err != nil { - klog.V(5).Infof("no storage %s found for pvc %s/%s", *scName, namespace, pvcName) - return placeHolderCSIDriver, placeHolderHandle + klog.V(5).Infof("Could not get StorageClass for PVC %s/%s: %v", namespace, pvcName, err) + return "", "" } - // We use random prefix to avoid conflict with volume-ids. If PVC is bound in the middle - // predicate and there is another pod(on same node) that uses same volume then we will overcount + // We use random prefix to avoid conflict with volume IDs. If PVC is bound during the execution of the + // predicate and there is another pod on the same node that uses same volume, then we will overcount // the volume and consider both volumes as different. volumeHandle := fmt.Sprintf("%s-%s/%s", c.randomVolumeIDPrefix, namespace, pvcName) - return storageClass.Provisioner, volumeHandle + + provisioner := storageClass.Provisioner + if csilib.IsMigratableIntreePluginByName(provisioner) { + if !isCSIMigrationOn(csiNode, provisioner) { + klog.V(5).Infof("CSI Migration of plugin %s is not enabled", provisioner) + return "", "" + } + + driverName, err := csilib.GetCSINameFromInTreeName(provisioner) + if err != nil { + klog.V(5).Infof("Unable to look up driver name from plugin name: %v", err) + return "", "" + } + return driverName, volumeHandle + } + + return provisioner, volumeHandle } diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index be61a41902c..ca7c10e5dd6 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -25,8 +25,9 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -37,6 +38,7 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" storagelisters "k8s.io/client-go/listers/storage/v1" volumehelpers "k8s.io/cloud-provider/volume/helpers" + csilibplugins "k8s.io/csi-translation-lib/plugins" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" "k8s.io/kubernetes/pkg/features" @@ -315,6 +317,8 @@ type VolumeFilter struct { // Filter normal volumes FilterVolume func(vol *v1.Volume) (id string, relevant bool) FilterPersistentVolume func(pv *v1.PersistentVolume) (id string, relevant bool) + // IsMigrated returns a boolean specifying whether the plugin is migrated to a CSI driver + IsMigrated func(csiNode *storagev1beta1.CSINode) bool } // 
NewMaxPDVolumeCountPredicate creates a predicate which evaluates whether a pod can fit based on the @@ -484,6 +488,11 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta PredicateMetadata, return true, nil, nil } + // If a plugin has been migrated to a CSI driver, defer to the CSI predicate. + if c.filter.IsMigrated(nodeInfo.CSINode()) { + return true, nil, nil + } + // count unique volumes existingVolumes := make(map[string]bool) for _, existingPod := range nodeInfo.Pods() { @@ -538,6 +547,10 @@ var EBSVolumeFilter = VolumeFilter{ } return "", false }, + + IsMigrated: func(csiNode *storagev1beta1.CSINode) bool { + return isCSIMigrationOn(csiNode, csilibplugins.AWSEBSInTreePluginName) + }, } // GCEPDVolumeFilter is a VolumeFilter for filtering GCE PersistentDisk Volumes @@ -555,6 +568,10 @@ var GCEPDVolumeFilter = VolumeFilter{ } return "", false }, + + IsMigrated: func(csiNode *storagev1beta1.CSINode) bool { + return isCSIMigrationOn(csiNode, csilibplugins.GCEPDInTreePluginName) + }, } // AzureDiskVolumeFilter is a VolumeFilter for filtering Azure Disk Volumes @@ -572,6 +589,10 @@ var AzureDiskVolumeFilter = VolumeFilter{ } return "", false }, + + IsMigrated: func(csiNode *storagev1beta1.CSINode) bool { + return isCSIMigrationOn(csiNode, csilibplugins.AzureDiskInTreePluginName) + }, } // CinderVolumeFilter is a VolumeFilter for filtering Cinder Volumes @@ -590,6 +611,10 @@ var CinderVolumeFilter = VolumeFilter{ } return "", false }, + + IsMigrated: func(csiNode *storagev1beta1.CSINode) bool { + return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName) + }, } // VolumeZoneChecker contains information to check the volume zone for a predicate. diff --git a/pkg/scheduler/algorithm/predicates/utils.go b/pkg/scheduler/algorithm/predicates/utils.go index 6bbbe0f6bdc..dc833b2d6fb 100644 --- a/pkg/scheduler/algorithm/predicates/utils.go +++ b/pkg/scheduler/algorithm/predicates/utils.go @@ -17,8 +17,15 @@ limitations under the License. package predicates import ( - "k8s.io/api/core/v1" + "strings" + + v1 "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" + csilibplugins "k8s.io/csi-translation-lib/plugins" + "k8s.io/kubernetes/pkg/features" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) @@ -87,3 +94,56 @@ func SetPredicatesOrderingDuringTest(value []string) func() { predicatesOrdering = origVal } } + +// isCSIMigrationOn returns a boolean value indicating whether +// the CSI migration has been enabled for a particular storage plugin. 
+func isCSIMigrationOn(csiNode *storagev1beta1.CSINode, pluginName string) bool { + if csiNode == nil || len(pluginName) == 0 { + return false + } + + // In-tree storage to CSI driver migration feature should be enabled, + // along with the plugin-specific one + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { + return false + } + + switch pluginName { + case csilibplugins.AWSEBSInTreePluginName: + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS) { + return false + } + case csilibplugins.GCEPDInTreePluginName: + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE) { + return false + } + case csilibplugins.AzureDiskInTreePluginName: + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) { + return false + } + case csilibplugins.CinderInTreePluginName: + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack) { + return false + } + default: + return false + } + + // The plugin name should be listed in the CSINode object annotation. + // This indicates that the plugin has been migrated to a CSI driver in the node. + csiNodeAnn := csiNode.GetAnnotations() + if csiNodeAnn == nil { + return false + } + + var mpaSet sets.String + mpa := csiNodeAnn[v1.MigratedPluginsAnnotationKey] + if len(mpa) == 0 { + mpaSet = sets.NewString() + } else { + tok := strings.Split(mpa, ",") + mpaSet = sets.NewString(tok...) + } + + return mpaSet.Has(pluginName) +} diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 3cff88e627e..5ad4be1eed5 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -18,15 +18,20 @@ package scheduler import ( "fmt" - "k8s.io/klog" "reflect" - "k8s.io/api/core/v1" + "k8s.io/klog" + + v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" - storageinformers "k8s.io/client-go/informers/storage/v1" + storageinformersv1 "k8s.io/client-go/informers/storage/v1" + storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1" "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/features" ) func (sched *Scheduler) onPvAdd(obj interface{}) { @@ -150,6 +155,63 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) { klog.Errorf("scheduler cache RemoveNode failed: %v", err) } } + +func (sched *Scheduler) onCSINodeAdd(obj interface{}) { + csiNode, ok := obj.(*storagev1beta1.CSINode) + if !ok { + klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", obj) + return + } + + if err := sched.config.SchedulerCache.AddCSINode(csiNode); err != nil { + klog.Errorf("scheduler cache AddCSINode failed: %v", err) + } + + sched.config.SchedulingQueue.MoveAllToActiveQueue() +} + +func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) { + oldCSINode, ok := oldObj.(*storagev1beta1.CSINode) + if !ok { + klog.Errorf("cannot convert oldObj to *storagev1beta1.CSINode: %v", oldObj) + return + } + + newCSINode, ok := newObj.(*storagev1beta1.CSINode) + if !ok { + klog.Errorf("cannot convert newObj to *storagev1beta1.CSINode: %v", newObj) + return + } + + if err := sched.config.SchedulerCache.UpdateCSINode(oldCSINode, newCSINode); err != nil { + klog.Errorf("scheduler cache UpdateCSINode failed: %v", err) + } + + sched.config.SchedulingQueue.MoveAllToActiveQueue() +} + +func (sched *Scheduler) onCSINodeDelete(obj interface{}) { + var 
csiNode *storagev1beta1.CSINode + switch t := obj.(type) { + case *storagev1beta1.CSINode: + csiNode = t + case cache.DeletedFinalStateUnknown: + var ok bool + csiNode, ok = t.Obj.(*storagev1beta1.CSINode) + if !ok { + klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", t.Obj) + return + } + default: + klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", t) + return + } + + if err := sched.config.SchedulerCache.RemoveCSINode(csiNode); err != nil { + klog.Errorf("scheduler cache RemoveCSINode failed: %v", err) + } +} + func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { if err := sched.config.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil { utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err)) @@ -324,7 +386,8 @@ func AddAllEventHandlers( pvInformer coreinformers.PersistentVolumeInformer, pvcInformer coreinformers.PersistentVolumeClaimInformer, serviceInformer coreinformers.ServiceInformer, - storageClassInformer storageinformers.StorageClassInformer, + storageClassInformer storageinformersv1.StorageClassInformer, + csiNodeInformer storageinformersv1beta1.CSINodeInformer, ) { // scheduled pod cache podInformer.Informer().AddEventHandler( @@ -385,6 +448,16 @@ func AddAllEventHandlers( }, ) + if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { + csiNodeInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: sched.onCSINodeAdd, + UpdateFunc: sched.onCSINodeUpdate, + DeleteFunc: sched.onCSINodeDelete, + }, + ) + } + // On add and delete of PVs, it will affect equivalence cache items // related to persistent volume pvInformer.Informer().AddEventHandler( diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index 924d8c0e18a..0ddc6a03c57 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -38,11 +38,13 @@ go_library( "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 579c4dccb44..a1dc84c804c 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -22,7 +22,7 @@ import ( "fmt" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -36,12 +36,14 @@ import ( appsinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" policyinformers "k8s.io/client-go/informers/policy/v1beta1" - storageinformers "k8s.io/client-go/informers/storage/v1" + storageinformersv1 "k8s.io/client-go/informers/storage/v1" + storageinformersv1beta1 
"k8s.io/client-go/informers/storage/v1beta1" clientset "k8s.io/client-go/kubernetes" appslisters "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" policylisters "k8s.io/client-go/listers/policy/v1beta1" - storagelisters "k8s.io/client-go/listers/storage/v1" + storagelistersv1 "k8s.io/client-go/listers/storage/v1" + storagelistersv1beta1 "k8s.io/client-go/listers/storage/v1beta1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog" @@ -184,7 +186,9 @@ type configFactory struct { // a means to list all PodDisruptionBudgets pdbLister policylisters.PodDisruptionBudgetLister // a means to list all StorageClasses - storageClassLister storagelisters.StorageClassLister + storageClassLister storagelistersv1.StorageClassLister + // a means to list all CSINodes + csiNodeLister storagelistersv1beta1.CSINodeLister // framework has a set of plugins and the context used for running them. framework framework.Framework @@ -236,7 +240,8 @@ type ConfigFactoryArgs struct { StatefulSetInformer appsinformers.StatefulSetInformer ServiceInformer coreinformers.ServiceInformer PdbInformer policyinformers.PodDisruptionBudgetInformer - StorageClassInformer storageinformers.StorageClassInformer + StorageClassInformer storageinformersv1.StorageClassInformer + CSINodeInformer storageinformersv1beta1.CSINodeInformer HardPodAffinitySymmetricWeight int32 DisablePreemption bool PercentageOfNodesToScore int32 @@ -262,10 +267,16 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { } // storageClassInformer is only enabled through VolumeScheduling feature gate - var storageClassLister storagelisters.StorageClassLister + var storageClassLister storagelistersv1.StorageClassLister if args.StorageClassInformer != nil { storageClassLister = args.StorageClassInformer.Lister() } + + var csiNodeLister storagelistersv1beta1.CSINodeLister + if args.CSINodeInformer != nil { + csiNodeLister = args.CSINodeInformer.Lister() + } + c := &configFactory{ client: args.Client, podLister: schedulerCache, @@ -279,6 +290,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { statefulSetLister: args.StatefulSetInformer.Lister(), pdbLister: args.PdbInformer.Lister(), storageClassLister: storageClassLister, + csiNodeLister: csiNodeLister, framework: framework, schedulerCache: schedulerCache, StopEverything: stopEverything, diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go index efe82c88057..3c7c09cc544 100644 --- a/pkg/scheduler/factory/factory_test.go +++ b/pkg/scheduler/factory/factory_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" @@ -491,6 +491,7 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), informerFactory.Storage().V1().StorageClasses(), + informerFactory.Storage().V1beta1().CSINodes(), hardPodAffinitySymmetricWeight, disablePodPreemption, schedulerapi.DefaultPercentageOfNodesToScore, diff --git a/pkg/scheduler/internal/cache/BUILD b/pkg/scheduler/internal/cache/BUILD index e300b5b1b10..7e002c10d26 100644 --- a/pkg/scheduler/internal/cache/BUILD +++ b/pkg/scheduler/internal/cache/BUILD @@ -15,6 +15,7 @@ go_library( "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/util/node:go_default_library", 
"//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index a663c0c0e22..10581f302e4 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -21,7 +21,8 @@ import ( "sync" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -569,6 +570,47 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error { return nil } +func (cache *schedulerCache) AddCSINode(csiNode *storagev1beta1.CSINode) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + n, ok := cache.nodes[csiNode.Name] + if !ok { + n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo()) + cache.nodes[csiNode.Name] = n + } + n.info.SetCSINode(csiNode) + cache.moveNodeInfoToHead(csiNode.Name) + return nil +} + +func (cache *schedulerCache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + n, ok := cache.nodes[newCSINode.Name] + if !ok { + n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo()) + cache.nodes[newCSINode.Name] = n + } + n.info.SetCSINode(newCSINode) + cache.moveNodeInfoToHead(newCSINode.Name) + return nil +} + +func (cache *schedulerCache) RemoveCSINode(csiNode *storagev1beta1.CSINode) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + n, ok := cache.nodes[csiNode.Name] + if !ok { + return fmt.Errorf("node %v is not found", csiNode.Name) + } + n.info.SetCSINode(nil) + cache.moveNodeInfoToHead(csiNode.Name) + return nil +} + // addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in // scheduler cache. This function assumes the lock to scheduler cache has been acquired. func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulernodeinfo.NodeInfo) { diff --git a/pkg/scheduler/internal/cache/fake/BUILD b/pkg/scheduler/internal/cache/fake/BUILD index caed79bb710..1193d9cc5d7 100644 --- a/pkg/scheduler/internal/cache/fake/BUILD +++ b/pkg/scheduler/internal/cache/fake/BUILD @@ -9,6 +9,7 @@ go_library( "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", ], ) diff --git a/pkg/scheduler/internal/cache/fake/fake_cache.go b/pkg/scheduler/internal/cache/fake/fake_cache.go index 868b5ea6313..3553248e985 100644 --- a/pkg/scheduler/internal/cache/fake/fake_cache.go +++ b/pkg/scheduler/internal/cache/fake/fake_cache.go @@ -17,7 +17,8 @@ limitations under the License. package fake import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/scheduler/algorithm" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -74,6 +75,15 @@ func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) error { return nil } // RemoveNode is a fake method for testing. 
func (c *Cache) RemoveNode(node *v1.Node) error { return nil } +// AddCSINode is a fake method for testing. +func (c *Cache) AddCSINode(csiNode *storagev1beta1.CSINode) error { return nil } + +// UpdateCSINode is a fake method for testing. +func (c *Cache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) error { return nil } + +// RemoveCSINode is a fake method for testing. +func (c *Cache) RemoveCSINode(csiNode *storagev1beta1.CSINode) error { return nil } + // UpdateNodeInfoSnapshot is a fake method for testing. func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *internalcache.NodeInfoSnapshot) error { return nil diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go index 699818b1e6e..b20174186ad 100644 --- a/pkg/scheduler/internal/cache/interface.go +++ b/pkg/scheduler/internal/cache/interface.go @@ -17,7 +17,8 @@ limitations under the License. package cache import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/scheduler/algorithm" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -100,6 +101,15 @@ type Cache interface { // on this node. UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error + // AddCSINode adds overall CSI-related information about node. + AddCSINode(csiNode *storagev1beta1.CSINode) error + + // UpdateCSINode updates overall CSI-related information about node. + UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) error + + // RemoveCSINode removes overall CSI-related information about node. + RemoveCSINode(csiNode *storagev1beta1.CSINode) error + // List lists all cached pods (including assumed ones). List(labels.Selector) ([]*v1.Pod, error) diff --git a/pkg/scheduler/nodeinfo/BUILD b/pkg/scheduler/nodeinfo/BUILD index c9631738f6f..7198d72d71f 100644 --- a/pkg/scheduler/nodeinfo/BUILD +++ b/pkg/scheduler/nodeinfo/BUILD @@ -12,7 +12,9 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library", + "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/pkg/scheduler/nodeinfo/node_info.go b/pkg/scheduler/nodeinfo/node_info.go index 6a6703a4fa6..82c7d493339 100644 --- a/pkg/scheduler/nodeinfo/node_info.go +++ b/pkg/scheduler/nodeinfo/node_info.go @@ -22,12 +22,13 @@ import ( "sync" "sync/atomic" - "k8s.io/klog" - - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" + volumeutil "k8s.io/kubernetes/pkg/volume/util" ) var ( @@ -46,7 +47,8 @@ type ImageStateSummary struct { // NodeInfo is node level aggregated information. type NodeInfo struct { // Overall node information. - node *v1.Node + node *v1.Node + csiNode *storagev1beta1.CSINode pods []*v1.Pod podsWithAffinity []*v1.Pod @@ -285,6 +287,14 @@ func (n *NodeInfo) Node() *v1.Node { return n.node } +// CSINode returns overall CSI-related information about this node. 
+func (n *NodeInfo) CSINode() *storagev1beta1.CSINode { + if n == nil { + return nil + } + return n.csiNode +} + // Pods return all pods scheduled (including assumed to be) on this node. func (n *NodeInfo) Pods() []*v1.Pod { if n == nil { @@ -434,6 +444,7 @@ func (n *NodeInfo) SetGeneration(newGeneration int64) { func (n *NodeInfo) Clone() *NodeInfo { clone := &NodeInfo{ node: n.node, + csiNode: n.csiNode, requestedResource: n.requestedResource.Clone(), nonzeroRequest: n.nonzeroRequest.Clone(), allocatableResource: n.allocatableResource.Clone(), @@ -471,11 +482,24 @@ func (n *NodeInfo) Clone() *NodeInfo { // VolumeLimits returns volume limits associated with the node func (n *NodeInfo) VolumeLimits() map[v1.ResourceName]int64 { volumeLimits := map[v1.ResourceName]int64{} + for k, v := range n.AllocatableResource().ScalarResources { if v1helper.IsAttachableVolumeResourceName(k) { volumeLimits[k] = v } } + + if n.csiNode != nil { + for i := range n.csiNode.Spec.Drivers { + d := n.csiNode.Spec.Drivers[i] + if d.Allocatable != nil && d.Allocatable.Count != nil { + // TODO: drop GetCSIAttachLimitKey once we don't get values from Node object + k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name)) + volumeLimits[k] = int64(*d.Allocatable.Count) + } + } + } + return volumeLimits } @@ -646,6 +670,11 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error { return nil } +// SetCSINode sets the overall CSI-related node information. +func (n *NodeInfo) SetCSINode(csiNode *storagev1beta1.CSINode) { + n.csiNode = csiNode +} + // FilterOutPods receives a list of pods and filters out those whose node names // are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo. // diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index e9721b1be94..8aa67512497 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -24,14 +24,15 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" appsinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" policyinformers "k8s.io/client-go/informers/policy/v1beta1" - storageinformers "k8s.io/client-go/informers/storage/v1" + storageinformersv1 "k8s.io/client-go/informers/storage/v1" + storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" @@ -127,7 +128,8 @@ func New(client clientset.Interface, statefulSetInformer appsinformers.StatefulSetInformer, serviceInformer coreinformers.ServiceInformer, pdbInformer policyinformers.PodDisruptionBudgetInformer, - storageClassInformer storageinformers.StorageClassInformer, + storageClassInformer storageinformersv1.StorageClassInformer, + csiNodeInformer storageinformersv1beta1.CSINodeInformer, recorder record.EventRecorder, schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource, stopCh <-chan struct{}, @@ -154,6 +156,7 @@ func New(client clientset.Interface, ServiceInformer: serviceInformer, PdbInformer: pdbInformer, StorageClassInformer: storageClassInformer, + CSINodeInformer: csiNodeInformer, HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight, DisablePreemption: options.disablePreemption, PercentageOfNodesToScore: options.percentageOfNodesToScore, @@ -201,7 +204,7 @@ func New(client clientset.Interface, // Create the 
scheduler. sched := NewFromConfig(config) - AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, serviceInformer, storageClassInformer) + AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, serviceInformer, storageClassInformer, csiNodeInformer) return sched, nil } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index e027ddce673..93c2bf5b769 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -26,7 +26,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -200,6 +200,7 @@ func TestSchedulerCreation(t *testing.T) { informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), informerFactory.Storage().V1().StorageClasses(), + informerFactory.Storage().V1beta1().CSINodes(), eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}), kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource}, stopCh, diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 4fdd92623cb..5293683e142 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -414,34 +414,6 @@ func ClusterRoles() []rbacv1.ClusterRole { rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("serviceaccounts/token").RuleOrDie(), }, }, - { - // a role to use for the kube-scheduler - ObjectMeta: metav1.ObjectMeta{Name: "system:kube-scheduler"}, - Rules: []rbacv1.PolicyRule{ - eventsRule(), - - // this is for leaderlease access - // TODO: scope this to the kube-system namespace - rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("endpoints").RuleOrDie(), - rbacv1helpers.NewRule("get", "update", "patch", "delete").Groups(legacyGroup).Resources("endpoints").Names("kube-scheduler").RuleOrDie(), - - // fundamental resources - rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("nodes").RuleOrDie(), - rbacv1helpers.NewRule("get", "list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), - rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("pods/binding", "bindings").RuleOrDie(), - rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(), - // things that select pods - rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("services", "replicationcontrollers").RuleOrDie(), - rbacv1helpers.NewRule(Read...).Groups(appsGroup, extensionsGroup).Resources("replicasets").RuleOrDie(), - rbacv1helpers.NewRule(Read...).Groups(appsGroup).Resources("statefulsets").RuleOrDie(), - // things that pods use or applies to them - rbacv1helpers.NewRule(Read...).Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(), - rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("persistentvolumeclaims", "persistentvolumes").RuleOrDie(), - // Needed to check API access. 
These creates are non-mutating - rbacv1helpers.NewRule("create").Groups(authenticationGroup).Resources("tokenreviews").RuleOrDie(), - rbacv1helpers.NewRule("create").Groups(authorizationGroup).Resources("subjectaccessreviews").RuleOrDie(), - }, - }, { // a role to use for the kube-dns pod ObjectMeta: metav1.ObjectMeta{Name: "system:kube-dns"}, @@ -498,6 +470,39 @@ func ClusterRoles() []rbacv1.ClusterRole { }, } + kubeSchedulerRules := []rbacv1.PolicyRule{ + eventsRule(), + // This is for leaderlease access + // TODO: scope this to the kube-system namespace + rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("endpoints").RuleOrDie(), + rbacv1helpers.NewRule("get", "update", "patch", "delete").Groups(legacyGroup).Resources("endpoints").Names("kube-scheduler").RuleOrDie(), + + // Fundamental resources + rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("nodes").RuleOrDie(), + rbacv1helpers.NewRule("get", "list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), + rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("pods/binding", "bindings").RuleOrDie(), + rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(), + // Things that select pods + rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("services", "replicationcontrollers").RuleOrDie(), + rbacv1helpers.NewRule(Read...).Groups(appsGroup, extensionsGroup).Resources("replicasets").RuleOrDie(), + rbacv1helpers.NewRule(Read...).Groups(appsGroup).Resources("statefulsets").RuleOrDie(), + // Things that pods use or applies to them + rbacv1helpers.NewRule(Read...).Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(), + rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("persistentvolumeclaims", "persistentvolumes").RuleOrDie(), + // Needed to check API access. 
These creates are non-mutating + rbacv1helpers.NewRule("create").Groups(authenticationGroup).Resources("tokenreviews").RuleOrDie(), + rbacv1helpers.NewRule("create").Groups(authorizationGroup).Resources("subjectaccessreviews").RuleOrDie(), + } + if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) && + utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + kubeSchedulerRules = append(kubeSchedulerRules, rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csinodes").RuleOrDie()) + } + roles = append(roles, rbacv1.ClusterRole{ + // a role to use for the kube-scheduler + ObjectMeta: metav1.ObjectMeta{Name: "system:kube-scheduler"}, + Rules: kubeSchedulerRules, + }) + externalProvisionerRules := []rbacv1.PolicyRule{ rbacv1helpers.NewRule("create", "delete", "get", "list", "watch").Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie(), rbacv1helpers.NewRule("get", "list", "watch", "update", "patch").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie(), diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index adec063f180..a7d28c1385d 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -802,6 +802,14 @@ items: - subjectaccessreviews verbs: - create + - apiGroups: + - storage.k8s.io + resources: + - csinodes + verbs: + - get + - list + - watch - apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index b90c7ddb17c..668ba67e6c2 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -23,7 +23,7 @@ import ( "time" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -131,6 +131,7 @@ func setupScheduler( informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().Services(), informerFactory.Storage().V1().StorageClasses(), + informerFactory.Storage().V1beta1().CSINodes(), ) eventBroadcaster := record.NewBroadcaster() diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 6f82103a237..9dfdc2e65a0 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -254,6 +254,7 @@ priorities: [] informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), informerFactory.Storage().V1().StorageClasses(), + informerFactory.Storage().V1beta1().CSINodes(), eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), kubeschedulerconfig.SchedulerAlgorithmSource{ Policy: &kubeschedulerconfig.SchedulerPolicySource{ @@ -325,6 +326,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), informerFactory.Storage().V1().StorageClasses(), + 
informerFactory.Storage().V1beta1().CSINodes(), eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), kubeschedulerconfig.SchedulerAlgorithmSource{ Policy: &kubeschedulerconfig.SchedulerPolicySource{ @@ -621,6 +623,7 @@ func TestMultiScheduler(t *testing.T) { context.informerFactory.Core().V1().PersistentVolumeClaims(), context.informerFactory.Core().V1().Services(), context.informerFactory.Storage().V1().StorageClasses(), + context.informerFactory.Storage().V1beta1().CSINodes(), ) go podInformer2.Informer().Run(stopCh) diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 449795cb4cd..638968158e2 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -215,6 +215,7 @@ func initTestSchedulerWithOptions( context.informerFactory.Core().V1().PersistentVolumeClaims(), context.informerFactory.Core().V1().Services(), context.informerFactory.Storage().V1().StorageClasses(), + context.informerFactory.Storage().V1beta1().CSINodes(), ) // set setPodInformer if provided. diff --git a/test/integration/util/util.go b/test/integration/util/util.go index bfbe415a679..438d9a0dabe 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -20,7 +20,7 @@ import ( "net/http" "net/http/httptest" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" clientv1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -28,6 +28,7 @@ import ( "k8s.io/klog" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/scheduler" + // import DefaultProvider _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" @@ -84,6 +85,7 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().Services(), informerFactory.Storage().V1().StorageClasses(), + informerFactory.Storage().V1beta1().CSINodes(), ) informerFactory.Start(stopCh)
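After all of the wiring above, NodeInfo.VolumeLimits is the single point where the two sources of limits meet: attachable-volume scalar resources that the kubelet reported on the Node object form the baseline, and any per-driver Allocatable.Count published through CSINode then overrides the corresponding CSI key, matching the TODO in the hunk about eventually dropping the Node-derived values. A condensed sketch of that merge under simplified types; plain string keys replace v1.ResourceName and the CSINode driver list, and the helpers are mirrored by their conventional prefixes rather than called directly:

    package main

    import (
        "fmt"
        "strings"
    )

    // mergedVolumeLimits condenses NodeInfo.VolumeLimits from the patch: scalar
    // resources on the Node provide the baseline, and CSINode driver counts,
    // when present, override the per-driver CSI keys.
    func mergedVolumeLimits(nodeScalars map[string]int64, csiDriverCounts map[string]*int32) map[string]int64 {
        limits := map[string]int64{}
        for name, v := range nodeScalars {
            // mirrors v1helper.IsAttachableVolumeResourceName
            if strings.HasPrefix(name, "attachable-volumes-") {
                limits[name] = v
            }
        }
        for driver, count := range csiDriverCounts {
            if count != nil {
                // mirrors volumeutil.GetCSIAttachLimitKey(driver)
                limits["attachable-volumes-csi-"+driver] = int64(*count)
            }
        }
        return limits
    }

    func main() {
        csiLimit := int32(25)
        fmt.Println(mergedVolumeLimits(
            map[string]int64{"attachable-volumes-csi-ebs.csi.aws.com": 39, "cpu": 4},
            map[string]*int32{"ebs.csi.aws.com": &csiLimit},
        ))
        // prints: map[attachable-volumes-csi-ebs.csi.aws.com:25]; the CSINode value wins
    }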