Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-20 02:11:09 +00:00)
Update scheduler to use volume limits from CSINode
This commit is contained in:
parent 33c8bacd41
commit 00b0ab86af
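The change makes the scheduler derive per-driver attach limits from the CSINode object rather than from Node allocatable alone. Below is a rough, self-contained sketch of that mapping; it uses local stand-in types instead of k8s.io/api/storage/v1beta1, and the "attachable-volumes-csi-<driver>" key is an assumed stand-in for what volumeutil.GetCSIAttachLimitKey returns.

// Sketch: turning a CSINode's per-driver allocatable count into the
// scheduler's volume-limit map, mirroring the loop added to NodeInfo.VolumeLimits.
package main

import "fmt"

type csiNodeDriver struct {
	Name             string
	AllocatableCount *int32 // corresponds to CSINodeDriver.Allocatable.Count
}

type csiNode struct {
	Drivers []csiNodeDriver
}

// attachLimitKey stands in for volumeutil.GetCSIAttachLimitKey(driverName).
func attachLimitKey(driverName string) string {
	return "attachable-volumes-csi-" + driverName
}

// volumeLimits: every driver that reports an allocatable count contributes one limit entry.
func volumeLimits(n *csiNode) map[string]int64 {
	limits := map[string]int64{}
	if n == nil {
		return limits
	}
	for _, d := range n.Drivers {
		if d.AllocatableCount != nil {
			limits[attachLimitKey(d.Name)] = int64(*d.AllocatableCount)
		}
	}
	return limits
}

func main() {
	count := int32(25)
	n := &csiNode{Drivers: []csiNodeDriver{{Name: "ebs.csi.aws.com", AllocatableCount: &count}}}
	fmt.Println(volumeLimits(n)) // map[attachable-volumes-csi-ebs.csi.aws.com:25]
}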
@@ -172,6 +172,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error
cc.InformerFactory.Core().V1().Services(),
cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
cc.InformerFactory.Storage().V1().StorageClasses(),
cc.InformerFactory.Storage().V1beta1().CSINodes(),
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
@@ -10,6 +10,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
@@ -23,15 +24,18 @@ go_library(
"//pkg/scheduler/metrics:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
@@ -30,6 +30,7 @@ go_library(
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
@@ -41,6 +42,8 @@ go_library(
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@@ -19,9 +19,11 @@ package predicates
import (
"fmt"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/util/rand"
utilfeature "k8s.io/apiserver/pkg/util/feature"
csilib "k8s.io/csi-translation-lib"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@@ -30,9 +32,10 @@ import (

// CSIMaxVolumeLimitChecker defines predicate needed for counting CSI volumes
type CSIMaxVolumeLimitChecker struct {
pvInfo PersistentVolumeInfo
pvcInfo PersistentVolumeClaimInfo
scInfo StorageClassInfo
pvInfo PersistentVolumeInfo
pvcInfo PersistentVolumeClaimInfo
scInfo StorageClassInfo

randomVolumeIDPrefix string
}
@@ -50,52 +53,48 @@ func NewCSIMaxVolumeLimitPredicate(

func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {

// if feature gate is disable we return
if !utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
return true, nil, nil
}
// If a pod doesn't have any volume attached to it, the predicate will always be true.
// Thus we make a fast path for it, to avoid unnecessary computations in this case.
// If the new pod doesn't have any volume attached to it, the predicate will always be true
if len(pod.Spec.Volumes) == 0 {
return true, nil, nil
}

nodeVolumeLimits := nodeInfo.VolumeLimits()

// if node does not have volume limits this predicate should exit
if len(nodeVolumeLimits) == 0 {
if !utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
return true, nil, nil
}

// a map of unique volume name/csi volume handle and volume limit key
newVolumes := make(map[string]string)
if err := c.filterAttachableVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
if err := c.filterAttachableVolumes(nodeInfo, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
return false, nil, err
}

// If the pod doesn't have any new CSI volumes, the predicate will always be true
if len(newVolumes) == 0 {
return true, nil, nil
}

// a map of unique volume name/csi volume handle and volume limit key
// If the node doesn't have volume limits, the predicate will always be true
nodeVolumeLimits := nodeInfo.VolumeLimits()
if len(nodeVolumeLimits) == 0 {
return true, nil, nil
}

attachedVolumes := make(map[string]string)
for _, existingPod := range nodeInfo.Pods() {
if err := c.filterAttachableVolumes(existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
if err := c.filterAttachableVolumes(nodeInfo, existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
return false, nil, err
}
}

newVolumeCount := map[string]int{}
attachedVolumeCount := map[string]int{}

for volumeName, volumeLimitKey := range attachedVolumes {
if _, ok := newVolumes[volumeName]; ok {
delete(newVolumes, volumeName)
for volumeUniqueName, volumeLimitKey := range attachedVolumes {
if _, ok := newVolumes[volumeUniqueName]; ok {
// Don't count single volume used in multiple pods more than once
delete(newVolumes, volumeUniqueName)
}
attachedVolumeCount[volumeLimitKey]++
}

newVolumeCount := map[string]int{}
for _, volumeLimitKey := range newVolumes {
newVolumeCount[volumeLimitKey]++
}
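The hunk ends just before the unchanged tail of the predicate, where the per-driver counts are compared against the node's limits. A minimal sketch of that remaining step, with plain string keys standing in for v1.ResourceName:

// Sketch (assumed shape of the unchanged tail of attachableLimitPredicate):
// the new pod fits only if, for every limit key, existing plus new volumes
// stay within the node's published limit.
package main

import "fmt"

func withinVolumeLimits(newVolumeCount, attachedVolumeCount map[string]int, nodeVolumeLimits map[string]int64) bool {
	for limitKey, newCount := range newVolumeCount {
		maxLimit, ok := nodeVolumeLimits[limitKey]
		if !ok {
			continue // node publishes no limit for this driver
		}
		if int64(attachedVolumeCount[limitKey]+newCount) > maxLimit {
			return false
		}
	}
	return true
}

func main() {
	limits := map[string]int64{"attachable-volumes-csi-ebs.csi.aws.com": 25}
	attached := map[string]int{"attachable-volumes-csi-ebs.csi.aws.com": 24}
	fmt.Println(withinVolumeLimits(map[string]int{"attachable-volumes-csi-ebs.csi.aws.com": 1}, attached, limits)) // true
	fmt.Println(withinVolumeLimits(map[string]int{"attachable-volumes-csi-ebs.csi.aws.com": 2}, attached, limits)) // false
}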
@@ -114,7 +113,7 @@ func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
}

func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes(
volumes []v1.Volume, namespace string, result map[string]string) error {
nodeInfo *schedulernodeinfo.NodeInfo, volumes []v1.Volume, namespace string, result map[string]string) error {

for _, vol := range volumes {
// CSI volumes can only be used as persistent volumes
@@ -130,74 +129,119 @@ func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes(
pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)

if err != nil {
klog.V(4).Infof("Unable to look up PVC info for %s/%s", namespace, pvcName)
klog.V(5).Infof("Unable to look up PVC info for %s/%s", namespace, pvcName)
continue
}

driverName, volumeHandle := c.getCSIDriver(pvc)
// if we can't find driver name or volume handle - we don't count this volume.
csiNode := nodeInfo.CSINode()
driverName, volumeHandle := c.getCSIDriverInfo(csiNode, pvc)
if driverName == "" || volumeHandle == "" {
klog.V(5).Infof("Could not find a CSI driver name or volume handle, not counting volume")
continue
}
volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
result[volumeHandle] = volumeLimitKey

volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle)
volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
result[volumeUniqueName] = volumeLimitKey
}
return nil
}

func (c *CSIMaxVolumeLimitChecker) getCSIDriver(pvc *v1.PersistentVolumeClaim) (string, string) {
// getCSIDriverInfo returns the CSI driver name and volume ID of a given PVC.
// If the PVC is from a migrated in-tree plugin, this function will return
// the information of the CSI driver that the plugin has been migrated to.
func (c *CSIMaxVolumeLimitChecker) getCSIDriverInfo(csiNode *storagev1beta1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) {
pvName := pvc.Spec.VolumeName
namespace := pvc.Namespace
pvcName := pvc.Name

placeHolderCSIDriver := ""
placeHolderHandle := ""
if pvName == "" {
klog.V(5).Infof("Persistent volume had no name for claim %s/%s", namespace, pvcName)
return c.getDriverNameFromSC(pvc)
return c.getCSIDriverInfoFromSC(csiNode, pvc)
}
pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)

pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
if err != nil {
klog.V(4).Infof("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName)
klog.V(5).Infof("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName)
// If we can't fetch PV associated with PVC, may be it got deleted
// or PVC was prebound to a PVC that hasn't been created yet.
// fallback to using StorageClass for volume counting
return c.getDriverNameFromSC(pvc)
return c.getCSIDriverInfoFromSC(csiNode, pvc)
}

csiSource := pv.Spec.PersistentVolumeSource.CSI
if csiSource == nil {
klog.V(5).Infof("Not considering non-CSI volume %s/%s", namespace, pvcName)
return placeHolderCSIDriver, placeHolderHandle
// We make a fast path for non-CSI volumes that aren't migratable
if !csilib.IsPVMigratable(pv) {
return "", ""
}

pluginName, err := csilib.GetInTreePluginNameFromSpec(pv, nil)
if err != nil {
klog.V(5).Infof("Unable to look up plugin name from PV spec: %v", err)
return "", ""
}

if !isCSIMigrationOn(csiNode, pluginName) {
klog.V(5).Infof("CSI Migration of plugin %s is not enabled", pluginName)
return "", ""
}

csiPV, err := csilib.TranslateInTreePVToCSI(pv)
if err != nil {
klog.V(5).Infof("Unable to translate in-tree volume to CSI: %v", err)
return "", ""
}

if csiPV.Spec.PersistentVolumeSource.CSI == nil {
klog.V(5).Infof("Unable to get a valid volume source for translated PV %s", pvName)
return "", ""
}

csiSource = csiPV.Spec.PersistentVolumeSource.CSI
}

return csiSource.Driver, csiSource.VolumeHandle
}

func (c *CSIMaxVolumeLimitChecker) getDriverNameFromSC(pvc *v1.PersistentVolumeClaim) (string, string) {
// getCSIDriverInfoFromSC returns the CSI driver name and a random volume ID of a given PVC's StorageClass.
func (c *CSIMaxVolumeLimitChecker) getCSIDriverInfoFromSC(csiNode *storagev1beta1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) {
namespace := pvc.Namespace
pvcName := pvc.Name
scName := pvc.Spec.StorageClassName

placeHolderCSIDriver := ""
placeHolderHandle := ""
// If StorageClass is not set or not found, then PVC must be using immediate binding mode
// and hence it must be bound before scheduling. So it is safe to not count it.
if scName == nil {
// if StorageClass is not set or found, then PVC must be using immediate binding mode
// and hence it must be bound before scheduling. So it is safe to not count it.
klog.V(5).Infof("pvc %s/%s has no storageClass", namespace, pvcName)
return placeHolderCSIDriver, placeHolderHandle
klog.V(5).Infof("PVC %s/%s has no StorageClass", namespace, pvcName)
return "", ""
}

storageClass, err := c.scInfo.GetStorageClassInfo(*scName)
if err != nil {
klog.V(5).Infof("no storage %s found for pvc %s/%s", *scName, namespace, pvcName)
return placeHolderCSIDriver, placeHolderHandle
klog.V(5).Infof("Could not get StorageClass for PVC %s/%s: %v", namespace, pvcName, err)
return "", ""
}

// We use random prefix to avoid conflict with volume-ids. If PVC is bound in the middle
// predicate and there is another pod(on same node) that uses same volume then we will overcount
// We use random prefix to avoid conflict with volume IDs. If PVC is bound during the execution of the
// predicate and there is another pod on the same node that uses same volume, then we will overcount
// the volume and consider both volumes as different.
volumeHandle := fmt.Sprintf("%s-%s/%s", c.randomVolumeIDPrefix, namespace, pvcName)
return storageClass.Provisioner, volumeHandle

provisioner := storageClass.Provisioner
if csilib.IsMigratableIntreePluginByName(provisioner) {
if !isCSIMigrationOn(csiNode, provisioner) {
klog.V(5).Infof("CSI Migration of plugin %s is not enabled", provisioner)
return "", ""
}

driverName, err := csilib.GetCSINameFromInTreeName(provisioner)
if err != nil {
klog.V(5).Infof("Unable to look up driver name from plugin name: %v", err)
return "", ""
}
return driverName, volumeHandle
}

return provisioner, volumeHandle
}
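With this rewrite, attachable volumes are keyed by driverName/volumeHandle instead of the bare handle, so a PVC shared by several pods on the node is counted once per driver. A small illustration follows; the limit-key string is a stand-in for volumeutil.GetCSIAttachLimitKey.

// Sketch: two pods referencing the same bound PVC resolve to the same
// "<driver>/<handle>" key, so the shared volume counts once per driver.
package main

import "fmt"

func main() {
	attached := map[string]string{} // unique volume name -> limit key
	count := func(driver, handle string) {
		uniqueName := fmt.Sprintf("%s/%s", driver, handle)
		attached[uniqueName] = "attachable-volumes-csi-" + driver // stand-in limit key
	}

	// Pod A and pod B both mount the PVC backed by vol-0abc; pod C mounts vol-0def.
	count("ebs.csi.aws.com", "vol-0abc")
	count("ebs.csi.aws.com", "vol-0abc")
	count("ebs.csi.aws.com", "vol-0def")

	perDriver := map[string]int{}
	for _, limitKey := range attached {
		perDriver[limitKey]++
	}
	fmt.Println(perDriver) // map[attachable-volumes-csi-ebs.csi.aws.com:2]
}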
@@ -25,8 +25,9 @@ import (

"k8s.io/klog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@@ -37,6 +38,7 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
csilibplugins "k8s.io/csi-translation-lib/plugins"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/features"
@@ -315,6 +317,8 @@ type VolumeFilter struct {
// Filter normal volumes
FilterVolume func(vol *v1.Volume) (id string, relevant bool)
FilterPersistentVolume func(pv *v1.PersistentVolume) (id string, relevant bool)
// IsMigrated returns a boolean specifying whether the plugin is migrated to a CSI driver
IsMigrated func(csiNode *storagev1beta1.CSINode) bool
}

// NewMaxPDVolumeCountPredicate creates a predicate which evaluates whether a pod can fit based on the
@@ -484,6 +488,11 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta PredicateMetadata,
return true, nil, nil
}

// If a plugin has been migrated to a CSI driver, defer to the CSI predicate.
if c.filter.IsMigrated(nodeInfo.CSINode()) {
return true, nil, nil
}

// count unique volumes
existingVolumes := make(map[string]bool)
for _, existingPod := range nodeInfo.Pods() {
@@ -538,6 +547,10 @@ var EBSVolumeFilter = VolumeFilter{
}
return "", false
},

IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
return isCSIMigrationOn(csiNode, csilibplugins.AWSEBSInTreePluginName)
},
}

// GCEPDVolumeFilter is a VolumeFilter for filtering GCE PersistentDisk Volumes
@@ -555,6 +568,10 @@ var GCEPDVolumeFilter = VolumeFilter{
}
return "", false
},

IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
return isCSIMigrationOn(csiNode, csilibplugins.GCEPDInTreePluginName)
},
}

// AzureDiskVolumeFilter is a VolumeFilter for filtering Azure Disk Volumes
@@ -572,6 +589,10 @@ var AzureDiskVolumeFilter = VolumeFilter{
}
return "", false
},

IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
return isCSIMigrationOn(csiNode, csilibplugins.AzureDiskInTreePluginName)
},
}

// CinderVolumeFilter is a VolumeFilter for filtering Cinder Volumes
@@ -590,6 +611,10 @@ var CinderVolumeFilter = VolumeFilter{
}
return "", false
},

IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)
},
}

// VolumeZoneChecker contains information to check the volume zone for a predicate.
@@ -17,8 +17,15 @@ limitations under the License.
package predicates

import (
"k8s.io/api/core/v1"
"strings"

v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
csilibplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/kubernetes/pkg/features"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

@@ -87,3 +94,56 @@ func SetPredicatesOrderingDuringTest(value []string) func() {
predicatesOrdering = origVal
}
}

// isCSIMigrationOn returns a boolean value indicating whether
// the CSI migration has been enabled for a particular storage plugin.
func isCSIMigrationOn(csiNode *storagev1beta1.CSINode, pluginName string) bool {
if csiNode == nil || len(pluginName) == 0 {
return false
}

// In-tree storage to CSI driver migration feature should be enabled,
// along with the plugin-specific one
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
return false
}

switch pluginName {
case csilibplugins.AWSEBSInTreePluginName:
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS) {
return false
}
case csilibplugins.GCEPDInTreePluginName:
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE) {
return false
}
case csilibplugins.AzureDiskInTreePluginName:
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) {
return false
}
case csilibplugins.CinderInTreePluginName:
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack) {
return false
}
default:
return false
}

// The plugin name should be listed in the CSINode object annotation.
// This indicates that the plugin has been migrated to a CSI driver in the node.
csiNodeAnn := csiNode.GetAnnotations()
if csiNodeAnn == nil {
return false
}

var mpaSet sets.String
mpa := csiNodeAnn[v1.MigratedPluginsAnnotationKey]
if len(mpa) == 0 {
mpaSet = sets.NewString()
} else {
tok := strings.Split(mpa, ",")
mpaSet = sets.NewString(tok...)
}

return mpaSet.Has(pluginName)
}
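isCSIMigrationOn only reports true when the global and per-plugin migration gates are enabled and the CSINode lists the plugin in its migrated-plugins annotation. A hedged illustration of that annotation parsing; the annotation key is written out literally here as an assumption, while the real code reads it through the v1.MigratedPluginsAnnotationKey constant.

// Sketch: parsing the comma-separated migrated-plugins annotation on a
// CSINode into a membership check, mirroring the tail of isCSIMigrationOn.
package main

import (
	"fmt"
	"strings"
)

func main() {
	annotations := map[string]string{
		// Assumed literal value of v1.MigratedPluginsAnnotationKey.
		"storage.alpha.kubernetes.io/migrated-plugins": "kubernetes.io/aws-ebs,kubernetes.io/gce-pd",
	}

	migrated := map[string]bool{}
	if mpa := annotations["storage.alpha.kubernetes.io/migrated-plugins"]; len(mpa) > 0 {
		for _, plugin := range strings.Split(mpa, ",") {
			migrated[plugin] = true
		}
	}

	fmt.Println(migrated["kubernetes.io/aws-ebs"]) // true
	fmt.Println(migrated["kubernetes.io/cinder"])  // false
}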
@@ -18,15 +18,20 @@ package scheduler

import (
"fmt"
"k8s.io/klog"
"reflect"

"k8s.io/api/core/v1"
"k8s.io/klog"

v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/features"
)

func (sched *Scheduler) onPvAdd(obj interface{}) {
@@ -150,6 +155,63 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
klog.Errorf("scheduler cache RemoveNode failed: %v", err)
}
}

func (sched *Scheduler) onCSINodeAdd(obj interface{}) {
csiNode, ok := obj.(*storagev1beta1.CSINode)
if !ok {
klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", obj)
return
}

if err := sched.config.SchedulerCache.AddCSINode(csiNode); err != nil {
klog.Errorf("scheduler cache AddCSINode failed: %v", err)
}

sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
oldCSINode, ok := oldObj.(*storagev1beta1.CSINode)
if !ok {
klog.Errorf("cannot convert oldObj to *storagev1beta1.CSINode: %v", oldObj)
return
}

newCSINode, ok := newObj.(*storagev1beta1.CSINode)
if !ok {
klog.Errorf("cannot convert newObj to *storagev1beta1.CSINode: %v", newObj)
return
}

if err := sched.config.SchedulerCache.UpdateCSINode(oldCSINode, newCSINode); err != nil {
klog.Errorf("scheduler cache UpdateCSINode failed: %v", err)
}

sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onCSINodeDelete(obj interface{}) {
var csiNode *storagev1beta1.CSINode
switch t := obj.(type) {
case *storagev1beta1.CSINode:
csiNode = t
case cache.DeletedFinalStateUnknown:
var ok bool
csiNode, ok = t.Obj.(*storagev1beta1.CSINode)
if !ok {
klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", t)
return
}

if err := sched.config.SchedulerCache.RemoveCSINode(csiNode); err != nil {
klog.Errorf("scheduler cache RemoveCSINode failed: %v", err)
}
}

func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
if err := sched.config.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
@@ -324,7 +386,8 @@ func AddAllEventHandlers(
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
serviceInformer coreinformers.ServiceInformer,
storageClassInformer storageinformers.StorageClassInformer,
storageClassInformer storageinformersv1.StorageClassInformer,
csiNodeInformer storageinformersv1beta1.CSINodeInformer,
) {
// scheduled pod cache
podInformer.Informer().AddEventHandler(
@@ -385,6 +448,16 @@ func AddAllEventHandlers(
},
)

if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
csiNodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onCSINodeAdd,
UpdateFunc: sched.onCSINodeUpdate,
DeleteFunc: sched.onCSINodeDelete,
},
)
}

// On add and delete of PVs, it will affect equivalence cache items
// related to persistent volume
pvInformer.Informer().AddEventHandler(
@@ -38,11 +38,13 @@ go_library(
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
@@ -22,7 +22,7 @@ import (
"fmt"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@@ -36,12 +36,14 @@ import (
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
storageinformers "k8s.io/client-go/informers/storage/v1"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1beta1"
storagelisters "k8s.io/client-go/listers/storage/v1"
storagelistersv1 "k8s.io/client-go/listers/storage/v1"
storagelistersv1beta1 "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
@@ -184,7 +186,9 @@ type configFactory struct {
// a means to list all PodDisruptionBudgets
pdbLister policylisters.PodDisruptionBudgetLister
// a means to list all StorageClasses
storageClassLister storagelisters.StorageClassLister
storageClassLister storagelistersv1.StorageClassLister
// a means to list all CSINodes
csiNodeLister storagelistersv1beta1.CSINodeLister
// framework has a set of plugins and the context used for running them.
framework framework.Framework

@@ -236,7 +240,8 @@ type ConfigFactoryArgs struct {
StatefulSetInformer appsinformers.StatefulSetInformer
ServiceInformer coreinformers.ServiceInformer
PdbInformer policyinformers.PodDisruptionBudgetInformer
StorageClassInformer storageinformers.StorageClassInformer
StorageClassInformer storageinformersv1.StorageClassInformer
CSINodeInformer storageinformersv1beta1.CSINodeInformer
HardPodAffinitySymmetricWeight int32
DisablePreemption bool
PercentageOfNodesToScore int32
@@ -262,10 +267,16 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
}

// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelisters.StorageClassLister
var storageClassLister storagelistersv1.StorageClassLister
if args.StorageClassInformer != nil {
storageClassLister = args.StorageClassInformer.Lister()
}

var csiNodeLister storagelistersv1beta1.CSINodeLister
if args.CSINodeInformer != nil {
csiNodeLister = args.CSINodeInformer.Lister()
}

c := &configFactory{
client: args.Client,
podLister: schedulerCache,
@@ -279,6 +290,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
statefulSetLister: args.StatefulSetInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
storageClassLister: storageClassLister,
csiNodeLister: csiNodeLister,
framework: framework,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
@@ -23,7 +23,7 @@ import (
"testing"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
@@ -491,6 +491,7 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
hardPodAffinitySymmetricWeight,
disablePodPreemption,
schedulerapi.DefaultPercentageOfNodesToScore,
pkg/scheduler/internal/cache/BUILD (vendored)
@@ -15,6 +15,7 @@ go_library(
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
pkg/scheduler/internal/cache/cache.go (vendored)
@@ -21,7 +21,8 @@ import (
"sync"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@@ -569,6 +570,47 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
return nil
}

func (cache *schedulerCache) AddCSINode(csiNode *storagev1beta1.CSINode) error {
cache.mu.Lock()
defer cache.mu.Unlock()

n, ok := cache.nodes[csiNode.Name]
if !ok {
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
cache.nodes[csiNode.Name] = n
}
n.info.SetCSINode(csiNode)
cache.moveNodeInfoToHead(csiNode.Name)
return nil
}

func (cache *schedulerCache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) error {
cache.mu.Lock()
defer cache.mu.Unlock()

n, ok := cache.nodes[newCSINode.Name]
if !ok {
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
cache.nodes[newCSINode.Name] = n
}
n.info.SetCSINode(newCSINode)
cache.moveNodeInfoToHead(newCSINode.Name)
return nil
}

func (cache *schedulerCache) RemoveCSINode(csiNode *storagev1beta1.CSINode) error {
cache.mu.Lock()
defer cache.mu.Unlock()

n, ok := cache.nodes[csiNode.Name]
if !ok {
return fmt.Errorf("node %v is not found", csiNode.Name)
}
n.info.SetCSINode(nil)
cache.moveNodeInfoToHead(csiNode.Name)
return nil
}

// addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in
// scheduler cache. This function assumes the lock to scheduler cache has been acquired.
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulernodeinfo.NodeInfo) {
pkg/scheduler/internal/cache/fake/BUILD (vendored)
@@ -9,6 +9,7 @@ go_library(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],
)
pkg/scheduler/internal/cache/fake/fake_cache.go (vendored)
@@ -17,7 +17,8 @@ limitations under the License.
package fake

import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -74,6 +75,15 @@ func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) error { return nil }
// RemoveNode is a fake method for testing.
func (c *Cache) RemoveNode(node *v1.Node) error { return nil }

// AddCSINode is a fake method for testing.
func (c *Cache) AddCSINode(csiNode *storagev1beta1.CSINode) error { return nil }

// UpdateCSINode is a fake method for testing.
func (c *Cache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) error { return nil }

// RemoveCSINode is a fake method for testing.
func (c *Cache) RemoveCSINode(csiNode *storagev1beta1.CSINode) error { return nil }

// UpdateNodeInfoSnapshot is a fake method for testing.
func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *internalcache.NodeInfoSnapshot) error {
return nil
pkg/scheduler/internal/cache/interface.go (vendored)
@@ -17,7 +17,8 @@ limitations under the License.
package cache

import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@@ -100,6 +101,15 @@ type Cache interface {
// on this node.
UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error

// AddCSINode adds overall CSI-related information about node.
AddCSINode(csiNode *storagev1beta1.CSINode) error

// UpdateCSINode updates overall CSI-related information about node.
UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) error

// RemoveCSINode removes overall CSI-related information about node.
RemoveCSINode(csiNode *storagev1beta1.CSINode) error

// List lists all cached pods (including assumed ones).
List(labels.Selector) ([]*v1.Pod, error)
@@ -12,7 +12,9 @@ go_library(
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
@@ -22,12 +22,13 @@ import (
"sync"
"sync/atomic"

"k8s.io/klog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)

var (
@@ -46,7 +47,8 @@ type ImageStateSummary struct {
// NodeInfo is node level aggregated information.
type NodeInfo struct {
// Overall node information.
node *v1.Node
node *v1.Node
csiNode *storagev1beta1.CSINode

pods []*v1.Pod
podsWithAffinity []*v1.Pod
@@ -285,6 +287,14 @@ func (n *NodeInfo) Node() *v1.Node {
return n.node
}

// CSINode returns overall CSI-related information about this node.
func (n *NodeInfo) CSINode() *storagev1beta1.CSINode {
if n == nil {
return nil
}
return n.csiNode
}

// Pods return all pods scheduled (including assumed to be) on this node.
func (n *NodeInfo) Pods() []*v1.Pod {
if n == nil {
@@ -434,6 +444,7 @@ func (n *NodeInfo) SetGeneration(newGeneration int64) {
func (n *NodeInfo) Clone() *NodeInfo {
clone := &NodeInfo{
node: n.node,
csiNode: n.csiNode,
requestedResource: n.requestedResource.Clone(),
nonzeroRequest: n.nonzeroRequest.Clone(),
allocatableResource: n.allocatableResource.Clone(),
@@ -471,11 +482,24 @@ func (n *NodeInfo) Clone() *NodeInfo {
// VolumeLimits returns volume limits associated with the node
func (n *NodeInfo) VolumeLimits() map[v1.ResourceName]int64 {
volumeLimits := map[v1.ResourceName]int64{}

for k, v := range n.AllocatableResource().ScalarResources {
if v1helper.IsAttachableVolumeResourceName(k) {
volumeLimits[k] = v
}
}

if n.csiNode != nil {
for i := range n.csiNode.Spec.Drivers {
d := n.csiNode.Spec.Drivers[i]
if d.Allocatable != nil && d.Allocatable.Count != nil {
// TODO: drop GetCSIAttachLimitKey once we don't get values from Node object
k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name))
volumeLimits[k] = int64(*d.Allocatable.Count)
}
}
}

return volumeLimits
}

@@ -646,6 +670,11 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error {
return nil
}

// SetCSINode sets the overall CSI-related node information.
func (n *NodeInfo) SetCSINode(csiNode *storagev1beta1.CSINode) {
n.csiNode = csiNode
}

// FilterOutPods receives a list of pods and filters out those whose node names
// are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
//
@@ -24,14 +24,15 @@ import (

"k8s.io/klog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
storageinformers "k8s.io/client-go/informers/storage/v1"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@@ -127,7 +128,8 @@ func New(client clientset.Interface,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformers.StorageClassInformer,
storageClassInformer storageinformersv1.StorageClassInformer,
csiNodeInformer storageinformersv1beta1.CSINodeInformer,
recorder record.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
stopCh <-chan struct{},
@@ -154,6 +156,7 @@
ServiceInformer: serviceInformer,
PdbInformer: pdbInformer,
StorageClassInformer: storageClassInformer,
CSINodeInformer: csiNodeInformer,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
DisablePreemption: options.disablePreemption,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
@@ -201,7 +204,7 @@
// Create the scheduler.
sched := NewFromConfig(config)

AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, serviceInformer, storageClassInformer)
AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, serviceInformer, storageClassInformer, csiNodeInformer)
return sched, nil
}
@@ -26,7 +26,7 @@ import (
"testing"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -200,6 +200,7 @@ func TestSchedulerCreation(t *testing.T) {
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}),
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
stopCh,
@@ -414,34 +414,6 @@ func ClusterRoles() []rbacv1.ClusterRole {
rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("serviceaccounts/token").RuleOrDie(),
},
},
{
// a role to use for the kube-scheduler
ObjectMeta: metav1.ObjectMeta{Name: "system:kube-scheduler"},
Rules: []rbacv1.PolicyRule{
eventsRule(),

// this is for leaderlease access
// TODO: scope this to the kube-system namespace
rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("endpoints").RuleOrDie(),
rbacv1helpers.NewRule("get", "update", "patch", "delete").Groups(legacyGroup).Resources("endpoints").Names("kube-scheduler").RuleOrDie(),

// fundamental resources
rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("nodes").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("pods/binding", "bindings").RuleOrDie(),
rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
// things that select pods
rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("services", "replicationcontrollers").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(appsGroup, extensionsGroup).Resources("replicasets").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
// things that pods use or applies to them
rbacv1helpers.NewRule(Read...).Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("persistentvolumeclaims", "persistentvolumes").RuleOrDie(),
// Needed to check API access. These creates are non-mutating
rbacv1helpers.NewRule("create").Groups(authenticationGroup).Resources("tokenreviews").RuleOrDie(),
rbacv1helpers.NewRule("create").Groups(authorizationGroup).Resources("subjectaccessreviews").RuleOrDie(),
},
},
{
// a role to use for the kube-dns pod
ObjectMeta: metav1.ObjectMeta{Name: "system:kube-dns"},
@@ -498,6 +470,39 @@ func ClusterRoles() []rbacv1.ClusterRole {
},
}

kubeSchedulerRules := []rbacv1.PolicyRule{
eventsRule(),
// This is for leaderlease access
// TODO: scope this to the kube-system namespace
rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("endpoints").RuleOrDie(),
rbacv1helpers.NewRule("get", "update", "patch", "delete").Groups(legacyGroup).Resources("endpoints").Names("kube-scheduler").RuleOrDie(),

// Fundamental resources
rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("nodes").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("pods/binding", "bindings").RuleOrDie(),
rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
// Things that select pods
rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("services", "replicationcontrollers").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(appsGroup, extensionsGroup).Resources("replicasets").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
// Things that pods use or applies to them
rbacv1helpers.NewRule(Read...).Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("persistentvolumeclaims", "persistentvolumes").RuleOrDie(),
// Needed to check API access. These creates are non-mutating
rbacv1helpers.NewRule("create").Groups(authenticationGroup).Resources("tokenreviews").RuleOrDie(),
rbacv1helpers.NewRule("create").Groups(authorizationGroup).Resources("subjectaccessreviews").RuleOrDie(),
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) &&
utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
kubeSchedulerRules = append(kubeSchedulerRules, rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csinodes").RuleOrDie())
}
roles = append(roles, rbacv1.ClusterRole{
// a role to use for the kube-scheduler
ObjectMeta: metav1.ObjectMeta{Name: "system:kube-scheduler"},
Rules: kubeSchedulerRules,
})

externalProvisionerRules := []rbacv1.PolicyRule{
rbacv1helpers.NewRule("create", "delete", "get", "list", "watch").Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch", "update", "patch").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie(),
@@ -802,6 +802,14 @@ items:
- subjectaccessreviews
verbs:
- create
- apiGroups:
- storage.k8s.io
resources:
- csinodes
verbs:
- get
- list
- watch
- apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
@@ -23,7 +23,7 @@ import (
"time"

apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -131,6 +131,7 @@ func setupScheduler(
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().Services(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
)

eventBroadcaster := record.NewBroadcaster()
@@ -23,7 +23,7 @@ import (
"testing"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -254,6 +254,7 @@ priorities: []
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
kubeschedulerconfig.SchedulerAlgorithmSource{
Policy: &kubeschedulerconfig.SchedulerPolicySource{
@@ -325,6 +326,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
kubeschedulerconfig.SchedulerAlgorithmSource{
Policy: &kubeschedulerconfig.SchedulerPolicySource{
@@ -621,6 +623,7 @@ func TestMultiScheduler(t *testing.T) {
context.informerFactory.Core().V1().PersistentVolumeClaims(),
context.informerFactory.Core().V1().Services(),
context.informerFactory.Storage().V1().StorageClasses(),
context.informerFactory.Storage().V1beta1().CSINodes(),
)

go podInformer2.Informer().Run(stopCh)
@@ -23,7 +23,7 @@ import (
"testing"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -215,6 +215,7 @@ func initTestSchedulerWithOptions(
context.informerFactory.Core().V1().PersistentVolumeClaims(),
context.informerFactory.Core().V1().Services(),
context.informerFactory.Storage().V1().StorageClasses(),
context.informerFactory.Storage().V1beta1().CSINodes(),
)

// set setPodInformer if provided.
@@ -20,7 +20,7 @@ import (
"net/http"
"net/http/httptest"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -28,6 +28,7 @@ import (
"k8s.io/klog"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/scheduler"

// import DefaultProvider
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@@ -84,6 +85,7 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().Services(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
)

informerFactory.Start(stopCh)