mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #79076 from draveness/feature/read-nodes-from-scheduler-cache
fix: predicates read nodes from scheduler cache
This commit is contained in:
commit
dcd57c9e5e
@ -33,6 +33,7 @@ import (
|
|||||||
|
|
||||||
cadvisorapi "github.com/google/cadvisor/info/v1"
|
cadvisorapi "github.com/google/cadvisor/info/v1"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
@ -454,7 +455,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
|
r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
|
||||||
go r.Run(wait.NeverStop)
|
go r.Run(wait.NeverStop)
|
||||||
}
|
}
|
||||||
nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
|
nodeInfo := &CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
|
||||||
|
|
||||||
// TODO: get the real node object of ourself,
|
// TODO: get the real node object of ourself,
|
||||||
// and use the real node name and UID.
|
// and use the real node name and UID.
|
||||||
@ -2287,3 +2288,23 @@ func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kub
|
|||||||
}
|
}
|
||||||
return config
|
return config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CachedNodeInfo implements NodeInfo
|
||||||
|
type CachedNodeInfo struct {
|
||||||
|
corelisters.NodeLister
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetNodeInfo returns cached data for the node name.
|
||||||
|
func (c *CachedNodeInfo) GetNodeInfo(nodeName string) (*v1.Node, error) {
|
||||||
|
node, err := c.Get(nodeName)
|
||||||
|
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", nodeName, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return node, nil
|
||||||
|
}
|
||||||
|
@ -196,26 +196,6 @@ func (c *CachedPersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace
|
|||||||
return c.PersistentVolumeClaims(namespace).Get(name)
|
return c.PersistentVolumeClaims(namespace).Get(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CachedNodeInfo implements NodeInfo
|
|
||||||
type CachedNodeInfo struct {
|
|
||||||
corelisters.NodeLister
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetNodeInfo returns cached data for the node 'id'.
|
|
||||||
func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) {
|
|
||||||
node, err := c.Get(id)
|
|
||||||
|
|
||||||
if apierrors.IsNotFound(err) {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return node, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// StorageClassInfo interface represents anything that can get a storage class object by class name.
|
// StorageClassInfo interface represents anything that can get a storage class object by class name.
|
||||||
type StorageClassInfo interface {
|
type StorageClassInfo interface {
|
||||||
GetStorageClassInfo(className string) (*storagev1.StorageClass, error)
|
GetStorageClassInfo(className string) (*storagev1.StorageClass, error)
|
||||||
|
@ -576,7 +576,7 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
|
|||||||
StatefulSetLister: c.statefulSetLister,
|
StatefulSetLister: c.statefulSetLister,
|
||||||
NodeLister: &nodeLister{c.nodeLister},
|
NodeLister: &nodeLister{c.nodeLister},
|
||||||
PDBLister: c.pdbLister,
|
PDBLister: c.pdbLister,
|
||||||
NodeInfo: &predicates.CachedNodeInfo{NodeLister: c.nodeLister},
|
NodeInfo: c.schedulerCache,
|
||||||
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
|
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
|
||||||
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
|
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
|
||||||
StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},
|
StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},
|
||||||
|
13
pkg/scheduler/internal/cache/cache.go
vendored
13
pkg/scheduler/internal/cache/cache.go
vendored
@ -707,3 +707,16 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
|
|||||||
func (cache *schedulerCache) NodeTree() *NodeTree {
|
func (cache *schedulerCache) NodeTree() *NodeTree {
|
||||||
return cache.nodeTree
|
return cache.nodeTree
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetNodeInfo returns cached data for the node name.
|
||||||
|
func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
|
||||||
|
cache.mu.RLock()
|
||||||
|
defer cache.mu.RUnlock()
|
||||||
|
|
||||||
|
n, ok := cache.nodes[nodeName]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("error retrieving node '%v' from cache", nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return n.info.Node(), nil
|
||||||
|
}
|
||||||
|
@ -104,3 +104,8 @@ func (c *Cache) Snapshot() *internalcache.Snapshot {
|
|||||||
|
|
||||||
// NodeTree is a fake method for testing.
|
// NodeTree is a fake method for testing.
|
||||||
func (c *Cache) NodeTree() *internalcache.NodeTree { return nil }
|
func (c *Cache) NodeTree() *internalcache.NodeTree { return nil }
|
||||||
|
|
||||||
|
// GetNodeInfo is a fake method for testing.
|
||||||
|
func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
3
pkg/scheduler/internal/cache/interface.go
vendored
3
pkg/scheduler/internal/cache/interface.go
vendored
@ -110,6 +110,9 @@ type Cache interface {
|
|||||||
// RemoveCSINode removes overall CSI-related information about node.
|
// RemoveCSINode removes overall CSI-related information about node.
|
||||||
RemoveCSINode(csiNode *storagev1beta1.CSINode) error
|
RemoveCSINode(csiNode *storagev1beta1.CSINode) error
|
||||||
|
|
||||||
|
// GetNodeInfo returns the node object with node string.
|
||||||
|
GetNodeInfo(nodeName string) (*v1.Node, error)
|
||||||
|
|
||||||
// List lists all cached pods (including assumed ones).
|
// List lists all cached pods (including assumed ones).
|
||||||
List(labels.Selector) ([]*v1.Pod, error)
|
List(labels.Selector) ([]*v1.Pod, error)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user