mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 17:30:00 +00:00
Add independent cache for CSINode in scheduler
This commit is contained in:
parent
86ebaaa023
commit
09d2cdf384
@ -559,6 +559,7 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
|
|||||||
NodeLister: c.schedulerCache,
|
NodeLister: c.schedulerCache,
|
||||||
PDBLister: c.pdbLister,
|
PDBLister: c.pdbLister,
|
||||||
NodeInfo: c.schedulerCache,
|
NodeInfo: c.schedulerCache,
|
||||||
|
CSINodeInfo: c.schedulerCache,
|
||||||
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
|
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
|
||||||
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
|
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
|
||||||
StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},
|
StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},
|
||||||
|
@ -43,6 +43,7 @@ type PluginFactoryArgs struct {
|
|||||||
NodeLister algorithm.NodeLister
|
NodeLister algorithm.NodeLister
|
||||||
PDBLister algorithm.PDBLister
|
PDBLister algorithm.PDBLister
|
||||||
NodeInfo predicates.NodeInfo
|
NodeInfo predicates.NodeInfo
|
||||||
|
CSINodeInfo predicates.CSINodeInfo
|
||||||
PVInfo predicates.PersistentVolumeInfo
|
PVInfo predicates.PersistentVolumeInfo
|
||||||
PVCInfo predicates.PersistentVolumeClaimInfo
|
PVCInfo predicates.PersistentVolumeClaimInfo
|
||||||
StorageClassInfo predicates.StorageClassInfo
|
StorageClassInfo predicates.StorageClassInfo
|
||||||
|
37
pkg/scheduler/internal/cache/cache.go
vendored
37
pkg/scheduler/internal/cache/cache.go
vendored
@ -70,6 +70,7 @@ type schedulerCache struct {
|
|||||||
// a map from pod key to podState.
|
// a map from pod key to podState.
|
||||||
podStates map[string]*podState
|
podStates map[string]*podState
|
||||||
nodes map[string]*nodeInfoListItem
|
nodes map[string]*nodeInfoListItem
|
||||||
|
csiNodes map[string]*storagev1beta1.CSINode
|
||||||
// headNode points to the most recently updated NodeInfo in "nodes". It is the
|
// headNode points to the most recently updated NodeInfo in "nodes". It is the
|
||||||
// head of the linked list.
|
// head of the linked list.
|
||||||
headNode *nodeInfoListItem
|
headNode *nodeInfoListItem
|
||||||
@ -109,6 +110,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
|
|||||||
|
|
||||||
nodes: make(map[string]*nodeInfoListItem),
|
nodes: make(map[string]*nodeInfoListItem),
|
||||||
nodeTree: newNodeTree(nil),
|
nodeTree: newNodeTree(nil),
|
||||||
|
csiNodes: make(map[string]*storagev1beta1.CSINode),
|
||||||
assumedPods: make(map[string]bool),
|
assumedPods: make(map[string]bool),
|
||||||
podStates: make(map[string]*podState),
|
podStates: make(map[string]*podState),
|
||||||
imageStates: make(map[string]*imageState),
|
imageStates: make(map[string]*imageState),
|
||||||
@ -574,13 +576,7 @@ func (cache *schedulerCache) AddCSINode(csiNode *storagev1beta1.CSINode) error {
|
|||||||
cache.mu.Lock()
|
cache.mu.Lock()
|
||||||
defer cache.mu.Unlock()
|
defer cache.mu.Unlock()
|
||||||
|
|
||||||
n, ok := cache.nodes[csiNode.Name]
|
cache.csiNodes[csiNode.Name] = csiNode
|
||||||
if !ok {
|
|
||||||
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
|
|
||||||
cache.nodes[csiNode.Name] = n
|
|
||||||
}
|
|
||||||
n.info.SetCSINode(csiNode)
|
|
||||||
cache.moveNodeInfoToHead(csiNode.Name)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -588,13 +584,7 @@ func (cache *schedulerCache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta
|
|||||||
cache.mu.Lock()
|
cache.mu.Lock()
|
||||||
defer cache.mu.Unlock()
|
defer cache.mu.Unlock()
|
||||||
|
|
||||||
n, ok := cache.nodes[newCSINode.Name]
|
cache.csiNodes[newCSINode.Name] = newCSINode
|
||||||
if !ok {
|
|
||||||
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
|
|
||||||
cache.nodes[newCSINode.Name] = n
|
|
||||||
}
|
|
||||||
n.info.SetCSINode(newCSINode)
|
|
||||||
cache.moveNodeInfoToHead(newCSINode.Name)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -602,12 +592,11 @@ func (cache *schedulerCache) RemoveCSINode(csiNode *storagev1beta1.CSINode) erro
|
|||||||
cache.mu.Lock()
|
cache.mu.Lock()
|
||||||
defer cache.mu.Unlock()
|
defer cache.mu.Unlock()
|
||||||
|
|
||||||
n, ok := cache.nodes[csiNode.Name]
|
_, ok := cache.csiNodes[csiNode.Name]
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("node %v is not found", csiNode.Name)
|
return fmt.Errorf("csinode %v is not found", csiNode.Name)
|
||||||
}
|
}
|
||||||
n.info.SetCSINode(nil)
|
delete(cache.csiNodes, csiNode.Name)
|
||||||
cache.moveNodeInfoToHead(csiNode.Name)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -736,3 +725,15 @@ func (cache *schedulerCache) ListNodes() []*v1.Node {
|
|||||||
}
|
}
|
||||||
return nodes
|
return nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cache *schedulerCache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
|
||||||
|
cache.mu.RLock()
|
||||||
|
defer cache.mu.RUnlock()
|
||||||
|
|
||||||
|
n, ok := cache.csiNodes[nodeName]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("error retrieving csinode '%v' from cache", nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
@ -114,3 +114,8 @@ func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
|
|||||||
func (c *Cache) ListNodes() []*v1.Node {
|
func (c *Cache) ListNodes() []*v1.Node {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetCSINodeInfo is a fake method for testing.
|
||||||
|
func (c *Cache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
5
pkg/scheduler/internal/cache/interface.go
vendored
5
pkg/scheduler/internal/cache/interface.go
vendored
@ -17,7 +17,7 @@ limitations under the License.
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
storagev1beta1 "k8s.io/api/storage/v1beta1"
|
storagev1beta1 "k8s.io/api/storage/v1beta1"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||||
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||||
@ -115,6 +115,9 @@ type Cache interface {
|
|||||||
// GetNodeInfo returns the node object with node string.
|
// GetNodeInfo returns the node object with node string.
|
||||||
GetNodeInfo(nodeName string) (*v1.Node, error)
|
GetNodeInfo(nodeName string) (*v1.Node, error)
|
||||||
|
|
||||||
|
// GetCSINodeInfo returns the csinode object with the given name.
|
||||||
|
GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error)
|
||||||
|
|
||||||
// Snapshot takes a snapshot on current cache
|
// Snapshot takes a snapshot on current cache
|
||||||
Snapshot() *Snapshot
|
Snapshot() *Snapshot
|
||||||
|
|
||||||
|
@ -13,9 +13,7 @@ go_library(
|
|||||||
"//pkg/apis/core/v1/helper:go_default_library",
|
"//pkg/apis/core/v1/helper:go_default_library",
|
||||||
"//pkg/features:go_default_library",
|
"//pkg/features:go_default_library",
|
||||||
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
|
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
|
||||||
"//pkg/volume/util:go_default_library",
|
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
|
@ -23,14 +23,12 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
storagev1beta1 "k8s.io/api/storage/v1beta1"
|
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
"k8s.io/kubernetes/pkg/features"
|
||||||
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
|
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
|
||||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -50,7 +48,6 @@ type ImageStateSummary struct {
|
|||||||
type NodeInfo struct {
|
type NodeInfo struct {
|
||||||
// Overall node information.
|
// Overall node information.
|
||||||
node *v1.Node
|
node *v1.Node
|
||||||
csiNode *storagev1beta1.CSINode
|
|
||||||
|
|
||||||
pods []*v1.Pod
|
pods []*v1.Pod
|
||||||
podsWithAffinity []*v1.Pod
|
podsWithAffinity []*v1.Pod
|
||||||
@ -292,14 +289,6 @@ func (n *NodeInfo) Node() *v1.Node {
|
|||||||
return n.node
|
return n.node
|
||||||
}
|
}
|
||||||
|
|
||||||
// CSINode returns overall CSI-related information about this node.
|
|
||||||
func (n *NodeInfo) CSINode() *storagev1beta1.CSINode {
|
|
||||||
if n == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return n.csiNode
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pods return all pods scheduled (including assumed to be) on this node.
|
// Pods return all pods scheduled (including assumed to be) on this node.
|
||||||
func (n *NodeInfo) Pods() []*v1.Pod {
|
func (n *NodeInfo) Pods() []*v1.Pod {
|
||||||
if n == nil {
|
if n == nil {
|
||||||
@ -449,7 +438,6 @@ func (n *NodeInfo) SetGeneration(newGeneration int64) {
|
|||||||
func (n *NodeInfo) Clone() *NodeInfo {
|
func (n *NodeInfo) Clone() *NodeInfo {
|
||||||
clone := &NodeInfo{
|
clone := &NodeInfo{
|
||||||
node: n.node,
|
node: n.node,
|
||||||
csiNode: n.csiNode,
|
|
||||||
requestedResource: n.requestedResource.Clone(),
|
requestedResource: n.requestedResource.Clone(),
|
||||||
nonzeroRequest: n.nonzeroRequest.Clone(),
|
nonzeroRequest: n.nonzeroRequest.Clone(),
|
||||||
allocatableResource: n.allocatableResource.Clone(),
|
allocatableResource: n.allocatableResource.Clone(),
|
||||||
@ -487,24 +475,11 @@ func (n *NodeInfo) Clone() *NodeInfo {
|
|||||||
// VolumeLimits returns volume limits associated with the node
|
// VolumeLimits returns volume limits associated with the node
|
||||||
func (n *NodeInfo) VolumeLimits() map[v1.ResourceName]int64 {
|
func (n *NodeInfo) VolumeLimits() map[v1.ResourceName]int64 {
|
||||||
volumeLimits := map[v1.ResourceName]int64{}
|
volumeLimits := map[v1.ResourceName]int64{}
|
||||||
|
|
||||||
for k, v := range n.AllocatableResource().ScalarResources {
|
for k, v := range n.AllocatableResource().ScalarResources {
|
||||||
if v1helper.IsAttachableVolumeResourceName(k) {
|
if v1helper.IsAttachableVolumeResourceName(k) {
|
||||||
volumeLimits[k] = v
|
volumeLimits[k] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.csiNode != nil {
|
|
||||||
for i := range n.csiNode.Spec.Drivers {
|
|
||||||
d := n.csiNode.Spec.Drivers[i]
|
|
||||||
if d.Allocatable != nil && d.Allocatable.Count != nil {
|
|
||||||
// TODO: drop GetCSIAttachLimitKey once we don't get values from Node object
|
|
||||||
k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name))
|
|
||||||
volumeLimits[k] = int64(*d.Allocatable.Count)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return volumeLimits
|
return volumeLimits
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -688,11 +663,6 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetCSINode sets the overall CSI-related node information.
|
|
||||||
func (n *NodeInfo) SetCSINode(csiNode *storagev1beta1.CSINode) {
|
|
||||||
n.csiNode = csiNode
|
|
||||||
}
|
|
||||||
|
|
||||||
// FilterOutPods receives a list of pods and filters out those whose node names
|
// FilterOutPods receives a list of pods and filters out those whose node names
|
||||||
// are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
|
// are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
|
||||||
//
|
//
|
||||||
|
Loading…
Reference in New Issue
Block a user