From 09d2cdf384309696c01307d249bc308f87402d0a Mon Sep 17 00:00:00 2001
From: Fabio Bertinatto
Date: Wed, 17 Jul 2019 09:55:06 +0200
Subject: [PATCH] Add independent cache for CSINode in scheduler

---
 pkg/scheduler/factory/factory.go          |  1 +
 pkg/scheduler/factory/plugins.go          |  1 +
 pkg/scheduler/internal/cache/cache.go     | 37 ++++++++++---------
 .../internal/cache/fake/fake_cache.go     |  5 +++
 pkg/scheduler/internal/cache/interface.go |  5 ++-
 pkg/scheduler/nodeinfo/BUILD              |  2 -
 pkg/scheduler/nodeinfo/node_info.go       | 32 +----------------
 7 files changed, 31 insertions(+), 52 deletions(-)

diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go
index b745f0f881f..c6c2c1f29cf 100644
--- a/pkg/scheduler/factory/factory.go
+++ b/pkg/scheduler/factory/factory.go
@@ -559,6 +559,7 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
 		NodeLister:       c.schedulerCache,
 		PDBLister:        c.pdbLister,
 		NodeInfo:         c.schedulerCache,
+		CSINodeInfo:      c.schedulerCache,
 		PVInfo:           &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
 		PVCInfo:          &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
 		StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},
diff --git a/pkg/scheduler/factory/plugins.go b/pkg/scheduler/factory/plugins.go
index ecbc772d7d1..d2ce06470f0 100644
--- a/pkg/scheduler/factory/plugins.go
+++ b/pkg/scheduler/factory/plugins.go
@@ -43,6 +43,7 @@ type PluginFactoryArgs struct {
 	NodeLister       algorithm.NodeLister
 	PDBLister        algorithm.PDBLister
 	NodeInfo         predicates.NodeInfo
+	CSINodeInfo      predicates.CSINodeInfo
 	PVInfo           predicates.PersistentVolumeInfo
 	PVCInfo          predicates.PersistentVolumeClaimInfo
 	StorageClassInfo predicates.StorageClassInfo
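The schedulerCache can only be handed to PluginFactoryArgs as CSINodeInfo above because it satisfies the corresponding interface in the predicates package. That interface is not part of this patch; as a rough sketch (the authoritative definition lives in pkg/scheduler/algorithm/predicates), it presumably amounts to the single method the cache gains below:

```go
package predicates

import storagev1beta1 "k8s.io/api/storage/v1beta1"

// CSINodeInfo is anything that can fetch a CSINode object by node name.
// Sketch only; the real definition is in the predicates package.
type CSINodeInfo interface {
	GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error)
}
```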
diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go
index 952a6124c7c..24bdfef2609 100644
--- a/pkg/scheduler/internal/cache/cache.go
+++ b/pkg/scheduler/internal/cache/cache.go
@@ -70,6 +70,7 @@ type schedulerCache struct {
 	// a map from pod key to podState.
 	podStates map[string]*podState
 	nodes     map[string]*nodeInfoListItem
+	csiNodes  map[string]*storagev1beta1.CSINode
 	// headNode points to the most recently updated NodeInfo in "nodes". It is the
 	// head of the linked list.
 	headNode *nodeInfoListItem
@@ -109,6 +110,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
 		nodes:       make(map[string]*nodeInfoListItem),
 		nodeTree:    newNodeTree(nil),
+		csiNodes:    make(map[string]*storagev1beta1.CSINode),
 		assumedPods: make(map[string]bool),
 		podStates:   make(map[string]*podState),
 		imageStates: make(map[string]*imageState),
@@ -574,13 +576,7 @@ func (cache *schedulerCache) AddCSINode(csiNode *storagev1beta1.CSINode) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 
-	n, ok := cache.nodes[csiNode.Name]
-	if !ok {
-		n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
-		cache.nodes[csiNode.Name] = n
-	}
-	n.info.SetCSINode(csiNode)
-	cache.moveNodeInfoToHead(csiNode.Name)
+	cache.csiNodes[csiNode.Name] = csiNode
 
 	return nil
 }
@@ -588,13 +584,7 @@ func (cache *schedulerCache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 
-	n, ok := cache.nodes[newCSINode.Name]
-	if !ok {
-		n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
-		cache.nodes[newCSINode.Name] = n
-	}
-	n.info.SetCSINode(newCSINode)
-	cache.moveNodeInfoToHead(newCSINode.Name)
+	cache.csiNodes[newCSINode.Name] = newCSINode
 
 	return nil
 }
@@ -602,12 +592,11 @@ func (cache *schedulerCache) RemoveCSINode(csiNode *storagev1beta1.CSINode) erro
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 
-	n, ok := cache.nodes[csiNode.Name]
+	_, ok := cache.csiNodes[csiNode.Name]
 	if !ok {
-		return fmt.Errorf("node %v is not found", csiNode.Name)
+		return fmt.Errorf("csinode %v is not found", csiNode.Name)
 	}
-	n.info.SetCSINode(nil)
-	cache.moveNodeInfoToHead(csiNode.Name)
+	delete(cache.csiNodes, csiNode.Name)
 
 	return nil
 }
@@ -736,3 +725,15 @@ func (cache *schedulerCache) ListNodes() []*v1.Node {
 	}
 	return nodes
 }
+
+func (cache *schedulerCache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
+	cache.mu.RLock()
+	defer cache.mu.RUnlock()
+
+	n, ok := cache.csiNodes[nodeName]
+	if !ok {
+		return nil, fmt.Errorf("error retrieving csinode '%v' from cache", nodeName)
+	}
+
+	return n, nil
+}
diff --git a/pkg/scheduler/internal/cache/fake/fake_cache.go b/pkg/scheduler/internal/cache/fake/fake_cache.go
index 1ac9ec36d6d..75ca7f37cf2 100644
--- a/pkg/scheduler/internal/cache/fake/fake_cache.go
+++ b/pkg/scheduler/internal/cache/fake/fake_cache.go
@@ -114,3 +114,8 @@ func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
 func (c *Cache) ListNodes() []*v1.Node {
 	return nil
 }
+
+// GetCSINodeInfo is a fake method for testing.
+func (c *Cache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
+	return nil, nil
+}
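With CSINode objects held in their own map, Add/Update/RemoveCSINode no longer materialize placeholder NodeInfo entries or churn the recently-updated linked list, and lookups take only a read lock on the shared RWMutex. A minimal test-style sketch of the lifecycle, assuming it runs inside the cache package so it can reach the unexported newSchedulerCache constructor shown above:

```go
package cache

import (
	"testing"
	"time"

	storagev1beta1 "k8s.io/api/storage/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestCSINodeCacheLifecycle(t *testing.T) {
	stop := make(chan struct{})
	defer close(stop)
	cache := newSchedulerCache(time.Second, time.Second, stop)

	// Add a CSINode and read it back through the new accessor.
	csiNode := &storagev1beta1.CSINode{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
	if err := cache.AddCSINode(csiNode); err != nil {
		t.Fatalf("AddCSINode: %v", err)
	}
	got, err := cache.GetCSINodeInfo("node-1")
	if err != nil || got.Name != "node-1" {
		t.Fatalf("GetCSINodeInfo: got %v, err %v", got, err)
	}

	// Removal deletes the map entry, so a subsequent lookup must fail.
	if err := cache.RemoveCSINode(csiNode); err != nil {
		t.Fatalf("RemoveCSINode: %v", err)
	}
	if _, err := cache.GetCSINodeInfo("node-1"); err == nil {
		t.Fatalf("expected lookup to fail after removal")
	}
}
```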
diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go
index b034bd7148d..2371abfcaea 100644
--- a/pkg/scheduler/internal/cache/interface.go
+++ b/pkg/scheduler/internal/cache/interface.go
@@ -17,7 +17,7 @@ limitations under the License.
 package cache
 
 import (
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	storagev1beta1 "k8s.io/api/storage/v1beta1"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm"
 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@@ -115,6 +115,9 @@ type Cache interface {
 	// GetNodeInfo returns the node object with node string.
 	GetNodeInfo(nodeName string) (*v1.Node, error)
 
+	// GetCSINodeInfo returns the csinode object with the given name.
+	GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error)
+
 	// Snapshot takes a snapshot on current cache
 	Snapshot() *Snapshot
diff --git a/pkg/scheduler/nodeinfo/BUILD b/pkg/scheduler/nodeinfo/BUILD
index 03a5094dd07..70c156041f7 100644
--- a/pkg/scheduler/nodeinfo/BUILD
+++ b/pkg/scheduler/nodeinfo/BUILD
@@ -13,9 +13,7 @@ go_library(
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/features:go_default_library",
         "//pkg/scheduler/algorithm/priorities/util:go_default_library",
-        "//pkg/volume/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
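Dropping the csiNode field from NodeInfo (below) also removes the CSI branch from VolumeLimits, which is why the //pkg/volume/util and storage/v1beta1 dependencies disappear from the BUILD file above. The computation itself still has to happen somewhere; a sketch of it as a standalone helper, assuming the caller now fetches the CSINode object from the independent cache (mergeCSIVolumeLimits is a hypothetical name, not part of this patch):

```go
package predicates

import (
	v1 "k8s.io/api/core/v1"
	storagev1beta1 "k8s.io/api/storage/v1beta1"
	volumeutil "k8s.io/kubernetes/pkg/volume/util"
)

// mergeCSIVolumeLimits folds per-driver attach limits from a CSINode object
// into an existing limits map, mirroring the loop removed from NodeInfo below.
func mergeCSIVolumeLimits(volumeLimits map[v1.ResourceName]int64, csiNode *storagev1beta1.CSINode) {
	if csiNode == nil {
		return
	}
	for i := range csiNode.Spec.Drivers {
		d := csiNode.Spec.Drivers[i]
		if d.Allocatable != nil && d.Allocatable.Count != nil {
			// Same key derivation the removed NodeInfo code used.
			k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name))
			volumeLimits[k] = int64(*d.Allocatable.Count)
		}
	}
}
```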
diff --git a/pkg/scheduler/nodeinfo/node_info.go b/pkg/scheduler/nodeinfo/node_info.go
index e985d227057..f1e3a9de527 100644
--- a/pkg/scheduler/nodeinfo/node_info.go
+++ b/pkg/scheduler/nodeinfo/node_info.go
@@ -23,14 +23,12 @@ import (
 	"sync/atomic"
 
 	v1 "k8s.io/api/core/v1"
-	storagev1beta1 "k8s.io/api/storage/v1beta1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/klog"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/features"
 	priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
-	volumeutil "k8s.io/kubernetes/pkg/volume/util"
 )
 
 var (
@@ -49,8 +47,7 @@ type ImageStateSummary struct {
 // NodeInfo is node level aggregated information.
 type NodeInfo struct {
 	// Overall node information.
-	node    *v1.Node
-	csiNode *storagev1beta1.CSINode
+	node *v1.Node
 
 	pods             []*v1.Pod
 	podsWithAffinity []*v1.Pod
@@ -292,14 +289,6 @@ func (n *NodeInfo) Node() *v1.Node {
 	return n.node
 }
 
-// CSINode returns overall CSI-related information about this node.
-func (n *NodeInfo) CSINode() *storagev1beta1.CSINode {
-	if n == nil {
-		return nil
-	}
-	return n.csiNode
-}
-
 // Pods return all pods scheduled (including assumed to be) on this node.
 func (n *NodeInfo) Pods() []*v1.Pod {
 	if n == nil {
@@ -449,7 +438,6 @@ func (n *NodeInfo) SetGeneration(newGeneration int64) {
 func (n *NodeInfo) Clone() *NodeInfo {
 	clone := &NodeInfo{
 		node:                n.node,
-		csiNode:             n.csiNode,
 		requestedResource:   n.requestedResource.Clone(),
 		nonzeroRequest:      n.nonzeroRequest.Clone(),
 		allocatableResource: n.allocatableResource.Clone(),
@@ -487,24 +475,11 @@ func (n *NodeInfo) Clone() *NodeInfo {
 // VolumeLimits returns volume limits associated with the node
 func (n *NodeInfo) VolumeLimits() map[v1.ResourceName]int64 {
 	volumeLimits := map[v1.ResourceName]int64{}
-
 	for k, v := range n.AllocatableResource().ScalarResources {
 		if v1helper.IsAttachableVolumeResourceName(k) {
 			volumeLimits[k] = v
 		}
 	}
-
-	if n.csiNode != nil {
-		for i := range n.csiNode.Spec.Drivers {
-			d := n.csiNode.Spec.Drivers[i]
-			if d.Allocatable != nil && d.Allocatable.Count != nil {
-				// TODO: drop GetCSIAttachLimitKey once we don't get values from Node object
-				k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name))
-				volumeLimits[k] = int64(*d.Allocatable.Count)
-			}
-		}
-	}
-
 	return volumeLimits
 }
 
@@ -688,11 +663,6 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error {
 	return nil
 }
 
-// SetCSINode sets the overall CSI-related node information.
-func (n *NodeInfo) SetCSINode(csiNode *storagev1beta1.CSINode) {
-	n.csiNode = csiNode
-}
-
 // FilterOutPods receives a list of pods and filters out those whose node names
 // are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
 //
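On the read side, a predicate that receives the cache through the CSINodeInfo argument can treat a cache miss as "no CSI information yet", since GetCSINodeInfo returns an error for absent objects. A consumer-side sketch (illustrative only; csiNodeForNode and its error-handling policy are not part of this patch):

```go
package predicates

import (
	storagev1beta1 "k8s.io/api/storage/v1beta1"
	"k8s.io/klog"
)

// csiNodeForNode looks a CSINode up through the independent cache and treats
// a miss as "driver limits unknown", letting callers fall back to limits
// reported on the Node object itself.
func csiNodeForNode(info CSINodeInfo, nodeName string) *storagev1beta1.CSINode {
	csiNode, err := info.GetCSINodeInfo(nodeName)
	if err != nil {
		klog.V(4).Infof("could not retrieve CSINode for %q: %v", nodeName, err)
		return nil
	}
	return csiNode
}
```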