Remove CSINode from scheduler cache.

This commit is contained in:
Abdullah Gharaibeh 2019-10-16 16:40:55 -04:00
parent aab740ffc2
commit a772722660
12 changed files with 50 additions and 163 deletions

View File

@ -30,7 +30,6 @@ go_library(
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",

View File

@ -40,6 +40,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",

View File

@ -26,8 +26,8 @@ import (
"k8s.io/klog"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
storage "k8s.io/api/storage/v1"
v1beta1storage "k8s.io/api/storage/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@ -37,6 +37,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
v1beta1storagelisters "k8s.io/client-go/listers/storage/v1beta1"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
csilibplugins "k8s.io/csi-translation-lib/plugins"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@ -169,7 +170,19 @@ type NodeInfo interface {
// CSINodeInfo interface represents anything that can get CSINode object from node name.
type CSINodeInfo interface {
GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error)
GetCSINodeInfo(nodeName string) (*v1beta1storage.CSINode, error)
}
var _ CSINodeInfo = &CachedCSINodeInfo{}
// CachedCSINodeInfo implements CSINodeInfoInfo
type CachedCSINodeInfo struct {
v1beta1storagelisters.CSINodeLister
}
// GetCSINodeInfo returns a persistent volume object by PV ID.
func (c *CachedCSINodeInfo) GetCSINodeInfo(nodeName string) (*v1beta1storage.CSINode, error) {
return c.Get(nodeName)
}
// PersistentVolumeInfo interface represents anything that can get persistent volume object by PV ID.
@ -209,7 +222,7 @@ func (c *CachedPersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace
// StorageClassInfo interface represents anything that can get a storage class object by class name.
type StorageClassInfo interface {
GetStorageClassInfo(className string) (*storagev1.StorageClass, error)
GetStorageClassInfo(className string) (*storage.StorageClass, error)
}
var _ StorageClassInfo = &CachedStorageClassInfo{}
@ -220,7 +233,7 @@ type CachedStorageClassInfo struct {
}
// GetStorageClassInfo get StorageClass by class name.
func (c *CachedStorageClassInfo) GetStorageClassInfo(className string) (*storagev1.StorageClass, error) {
func (c *CachedStorageClassInfo) GetStorageClassInfo(className string) (*storage.StorageClass, error) {
return c.Get(className)
}
@ -313,9 +326,9 @@ type VolumeFilter struct {
FilterVolume func(vol *v1.Volume) (id string, relevant bool)
FilterPersistentVolume func(pv *v1.PersistentVolume) (id string, relevant bool)
// MatchProvisioner evaluates if the StorageClass provisioner matches the running predicate
MatchProvisioner func(sc *storagev1.StorageClass) (relevant bool)
MatchProvisioner func(sc *storage.StorageClass) (relevant bool)
// IsMigrated returns a boolean specifying whether the plugin is migrated to a CSI driver
IsMigrated func(csiNode *storagev1beta1.CSINode) bool
IsMigrated func(csiNode *v1beta1storage.CSINode) bool
}
// NewMaxPDVolumeCountPredicate creates a predicate which evaluates whether a pod can fit based on the
@ -577,14 +590,14 @@ var EBSVolumeFilter = VolumeFilter{
return "", false
},
MatchProvisioner: func(sc *storagev1.StorageClass) (relevant bool) {
MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {
if sc.Provisioner == csilibplugins.AWSEBSInTreePluginName {
return true
}
return false
},
IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
IsMigrated: func(csiNode *v1beta1storage.CSINode) bool {
return isCSIMigrationOn(csiNode, csilibplugins.AWSEBSInTreePluginName)
},
}
@ -605,14 +618,14 @@ var GCEPDVolumeFilter = VolumeFilter{
return "", false
},
MatchProvisioner: func(sc *storagev1.StorageClass) (relevant bool) {
MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {
if sc.Provisioner == csilibplugins.GCEPDInTreePluginName {
return true
}
return false
},
IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
IsMigrated: func(csiNode *v1beta1storage.CSINode) bool {
return isCSIMigrationOn(csiNode, csilibplugins.GCEPDInTreePluginName)
},
}
@ -633,14 +646,14 @@ var AzureDiskVolumeFilter = VolumeFilter{
return "", false
},
MatchProvisioner: func(sc *storagev1.StorageClass) (relevant bool) {
MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {
if sc.Provisioner == csilibplugins.AzureDiskInTreePluginName {
return true
}
return false
},
IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
IsMigrated: func(csiNode *v1beta1storage.CSINode) bool {
return isCSIMigrationOn(csiNode, csilibplugins.AzureDiskInTreePluginName)
},
}
@ -662,14 +675,14 @@ var CinderVolumeFilter = VolumeFilter{
return "", false
},
MatchProvisioner: func(sc *storagev1.StorageClass) (relevant bool) {
MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {
if sc.Provisioner == csilibplugins.CinderInTreePluginName {
return true
}
return false
},
IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
IsMigrated: func(csiNode *v1beta1storage.CSINode) bool {
return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)
},
}
@ -758,7 +771,7 @@ func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta PredicateMetadata, nodeI
if class.VolumeBindingMode == nil {
return false, nil, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", scName)
}
if *class.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
// Skip unbound volumes
continue
}

View File

@ -24,7 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
@ -159,61 +158,13 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
}
func (sched *Scheduler) onCSINodeAdd(obj interface{}) {
csiNode, ok := obj.(*storagev1beta1.CSINode)
if !ok {
klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", obj)
return
}
if err := sched.SchedulerCache.AddCSINode(csiNode); err != nil {
klog.Errorf("scheduler cache AddCSINode failed: %v", err)
}
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd)
}
func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
oldCSINode, ok := oldObj.(*storagev1beta1.CSINode)
if !ok {
klog.Errorf("cannot convert oldObj to *storagev1beta1.CSINode: %v", oldObj)
return
}
newCSINode, ok := newObj.(*storagev1beta1.CSINode)
if !ok {
klog.Errorf("cannot convert newObj to *storagev1beta1.CSINode: %v", newObj)
return
}
if err := sched.SchedulerCache.UpdateCSINode(oldCSINode, newCSINode); err != nil {
klog.Errorf("scheduler cache UpdateCSINode failed: %v", err)
}
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate)
}
func (sched *Scheduler) onCSINodeDelete(obj interface{}) {
var csiNode *storagev1beta1.CSINode
switch t := obj.(type) {
case *storagev1beta1.CSINode:
csiNode = t
case cache.DeletedFinalStateUnknown:
var ok bool
csiNode, ok = t.Obj.(*storagev1beta1.CSINode)
if !ok {
klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", t)
return
}
if err := sched.SchedulerCache.RemoveCSINode(csiNode); err != nil {
klog.Errorf("scheduler cache RemoveCSINode failed: %v", err)
}
}
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
if err := sched.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
@ -450,7 +401,6 @@ func AddAllEventHandlers(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onCSINodeAdd,
UpdateFunc: sched.onCSINodeUpdate,
DeleteFunc: sched.onCSINodeDelete,
},
)
}

View File

@ -594,7 +594,7 @@ func (c *Configurator) getAlgorithmArgs() (*PluginFactoryArgs, *plugins.ConfigPr
StatefulSetLister: c.statefulSetLister,
PDBLister: c.pdbLister,
NodeInfo: c.schedulerCache,
CSINodeInfo: c.schedulerCache,
CSINodeInfo: &predicates.CachedCSINodeInfo{CSINodeLister: c.csiNodeLister},
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},

View File

@ -63,7 +63,7 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry {
},
volumerestrictions.Name: volumerestrictions.New,
volumezone.Name: volumezone.New,
nodevolumelimits.Name: nodevolumelimits.New(args.SchedulerCache),
nodevolumelimits.Name: nodevolumelimits.New,
interpodaffinity.Name: func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return interpodaffinity.New(args.SchedulerCache, args.SchedulerCache), nil
},

View File

@ -49,21 +49,23 @@ func (pl *NodeVolumeLimits) Filter(ctx context.Context, _ *framework.CycleState,
return migration.PredicateResultToFrameworkStatus(reasons, err)
}
// New returns function that initializes a new plugin and returns it.
func New(csiNodeInfo predicates.CSINodeInfo) framework.PluginFactory {
return func(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
informerFactory := handle.SharedInformerFactory()
pvInfo := &predicates.CachedPersistentVolumeInfo{
PersistentVolumeLister: informerFactory.Core().V1().PersistentVolumes().Lister(),
}
pvcInfo := &predicates.CachedPersistentVolumeClaimInfo{
PersistentVolumeClaimLister: informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
}
classInfo := &predicates.CachedStorageClassInfo{
StorageClassLister: informerFactory.Storage().V1().StorageClasses().Lister(),
}
return &NodeVolumeLimits{
predicate: predicates.NewCSIMaxVolumeLimitPredicate(csiNodeInfo, pvInfo, pvcInfo, classInfo),
}, nil
// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
informerFactory := handle.SharedInformerFactory()
csiNodeInfo := &predicates.CachedCSINodeInfo{
CSINodeLister: informerFactory.Storage().V1beta1().CSINodes().Lister(),
}
pvInfo := &predicates.CachedPersistentVolumeInfo{
PersistentVolumeLister: informerFactory.Core().V1().PersistentVolumes().Lister(),
}
pvcInfo := &predicates.CachedPersistentVolumeClaimInfo{
PersistentVolumeClaimLister: informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
}
classInfo := &predicates.CachedStorageClassInfo{
StorageClassLister: informerFactory.Storage().V1().StorageClasses().Lister(),
}
return &NodeVolumeLimits{
predicate: predicates.NewCSIMaxVolumeLimitPredicate(csiNodeInfo, pvInfo, pvcInfo, classInfo),
}, nil
}

View File

@ -15,7 +15,6 @@ go_library(
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -22,7 +22,6 @@ import (
"time"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@ -70,7 +69,6 @@ type schedulerCache struct {
// a map from pod key to podState.
podStates map[string]*podState
nodes map[string]*nodeInfoListItem
csiNodes map[string]*storagev1beta1.CSINode
// headNode points to the most recently updated NodeInfo in "nodes". It is the
// head of the linked list.
headNode *nodeInfoListItem
@ -110,7 +108,6 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
nodes: make(map[string]*nodeInfoListItem),
nodeTree: newNodeTree(nil),
csiNodes: make(map[string]*storagev1beta1.CSINode),
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
imageStates: make(map[string]*imageState),
@ -578,34 +575,6 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
return nil
}
func (cache *schedulerCache) AddCSINode(csiNode *storagev1beta1.CSINode) error {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.csiNodes[csiNode.Name] = csiNode
return nil
}
func (cache *schedulerCache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) error {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.csiNodes[newCSINode.Name] = newCSINode
return nil
}
func (cache *schedulerCache) RemoveCSINode(csiNode *storagev1beta1.CSINode) error {
cache.mu.Lock()
defer cache.mu.Unlock()
_, ok := cache.csiNodes[csiNode.Name]
if !ok {
return fmt.Errorf("csinode %v is not found", csiNode.Name)
}
delete(cache.csiNodes, csiNode.Name)
return nil
}
// addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in
// scheduler cache. This function assumes the lock to scheduler cache has been acquired.
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulernodeinfo.NodeInfo) {
@ -711,15 +680,3 @@ func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
return n.info.Node(), nil
}
func (cache *schedulerCache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
n, ok := cache.csiNodes[nodeName]
if !ok {
return nil, fmt.Errorf("error retrieving csinode '%v' from cache", nodeName)
}
return n, nil
}

View File

@ -10,7 +10,6 @@ go_library(
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],
)

View File

@ -18,7 +18,6 @@ package fake
import (
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@ -76,15 +75,6 @@ func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) error { return nil }
// RemoveNode is a fake method for testing.
func (c *Cache) RemoveNode(node *v1.Node) error { return nil }
// AddCSINode is a fake method for testing.
func (c *Cache) AddCSINode(csiNode *storagev1beta1.CSINode) error { return nil }
// UpdateCSINode is a fake method for testing.
func (c *Cache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) error { return nil }
// RemoveCSINode is a fake method for testing.
func (c *Cache) RemoveCSINode(csiNode *storagev1beta1.CSINode) error { return nil }
// UpdateNodeInfoSnapshot is a fake method for testing.
func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error {
return nil
@ -112,13 +102,3 @@ func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
func (c *Cache) ListNodes() []*v1.Node {
return nil
}
// GetCSINodeInfo is a fake method for testing.
func (c *Cache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
return nil, nil
}
// NumNodes is a fake method for testing.
func (c *Cache) NumNodes() int {
return 0
}

View File

@ -18,7 +18,6 @@ package cache
import (
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -102,21 +101,9 @@ type Cache interface {
// on this node.
UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error
// AddCSINode adds overall CSI-related information about node.
AddCSINode(csiNode *storagev1beta1.CSINode) error
// UpdateCSINode updates overall CSI-related information about node.
UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) error
// RemoveCSINode removes overall CSI-related information about node.
RemoveCSINode(csiNode *storagev1beta1.CSINode) error
// GetNodeInfo returns the node object with node string.
GetNodeInfo(nodeName string) (*v1.Node, error)
// GetCSINodeInfo returns the csinode object with the given name.
GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error)
// Snapshot takes a snapshot on current cache
Snapshot() *Snapshot
}