Merge pull request #108269 from kerthcet/refactor/rename-schedulercache-to-cache

refactor: rename SchedulerCache to Cache in Scheduler
commit d3ece70f0b
Kubernetes Prow Robot 2022-02-24 14:46:13 -08:00 committed by GitHub
9 changed files with 102 additions and 104 deletions

View File

@@ -67,7 +67,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) {
		return
	}

-	nodeInfo := sched.SchedulerCache.AddNode(node)
+	nodeInfo := sched.Cache.AddNode(node)
	klog.V(3).InfoS("Add event for node", "node", klog.KObj(node))
	sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo))
}
@@ -84,7 +84,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
		return
	}

-	nodeInfo := sched.SchedulerCache.UpdateNode(oldNode, newNode)
+	nodeInfo := sched.Cache.UpdateNode(oldNode, newNode)
	// Only requeue unschedulable pods if the node became more schedulable.
	if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil {
		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(*event, preCheckForNode(nodeInfo))
@@ -108,7 +108,7 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
		return
	}
	klog.V(3).InfoS("Delete event for node", "node", klog.KObj(node))
-	if err := sched.SchedulerCache.RemoveNode(node); err != nil {
+	if err := sched.Cache.RemoveNode(node); err != nil {
		klog.ErrorS(err, "Scheduler cache RemoveNode failed")
	}
}
@@ -129,7 +129,7 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
		return
	}

-	isAssumed, err := sched.SchedulerCache.IsAssumedPod(newPod)
+	isAssumed, err := sched.Cache.IsAssumedPod(newPod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", newPod.Namespace, newPod.Name, err))
	}
@@ -185,7 +185,7 @@ func (sched *Scheduler) addPodToCache(obj interface{}) {
	}
	klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod))

-	if err := sched.SchedulerCache.AddPod(pod); err != nil {
+	if err := sched.Cache.AddPod(pod); err != nil {
		klog.ErrorS(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod))
	}
@@ -205,7 +205,7 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
	}
	klog.V(4).InfoS("Update event for scheduled pod", "pod", klog.KObj(oldPod))

-	if err := sched.SchedulerCache.UpdatePod(oldPod, newPod); err != nil {
+	if err := sched.Cache.UpdatePod(oldPod, newPod); err != nil {
		klog.ErrorS(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod))
	}
@@ -229,7 +229,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
		return
	}
	klog.V(3).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod))
-	if err := sched.SchedulerCache.RemovePod(pod); err != nil {
+	if err := sched.Cache.RemovePod(pod); err != nil {
		klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
	}
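
For context, handlers like the ones in this file are registered with client-go informers. A condensed sketch, assuming a shared informer factory variable and the client-go tools/cache package (aliased clientcache here to avoid clashing with the scheduler's own cache); the actual registration site is not part of this diff:

// Hypothetical wiring of the renamed handlers into a node informer.
informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
	clientcache.ResourceEventHandlerFuncs{
		AddFunc:    sched.addNodeToCache,
		UpdateFunc: sched.updateNodeInCache,
		DeleteFunc: sched.deleteNodeFromCache,
	},
)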

View File

@@ -225,21 +225,19 @@ func TestUpdatePodInCache(t *testing.T) {
		t.Run(tt.name, func(t *testing.T) {
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
-			schedulerCache := cache.New(ttl, ctx.Done())
-			schedulerQueue := queue.NewTestQueue(ctx, nil)
			sched := &Scheduler{
-				SchedulerCache:  schedulerCache,
-				SchedulingQueue: schedulerQueue,
+				Cache:           cache.New(ttl, ctx.Done()),
+				SchedulingQueue: queue.NewTestQueue(ctx, nil),
			}
			sched.addPodToCache(tt.oldObj)
			sched.updatePodInCache(tt.oldObj, tt.newObj)

			if tt.oldObj.(*v1.Pod).UID != tt.newObj.(*v1.Pod).UID {
-				if pod, err := sched.SchedulerCache.GetPod(tt.oldObj.(*v1.Pod)); err == nil {
-					t.Errorf("Get pod UID %v from SchedulerCache but it should not happen", pod.UID)
+				if pod, err := sched.Cache.GetPod(tt.oldObj.(*v1.Pod)); err == nil {
+					t.Errorf("Get pod UID %v from cache but it should not happen", pod.UID)
				}
			}
-			pod, err := sched.SchedulerCache.GetPod(tt.newObj.(*v1.Pod))
+			pod, err := sched.Cache.GetPod(tt.newObj.(*v1.Pod))
			if err != nil {
				t.Errorf("Failed to get pod from scheduler: %v", err)
			}

View File

@@ -189,7 +189,7 @@ func (c *Configurator) create() (*Scheduler, error) {
	)

	return &Scheduler{
-		SchedulerCache:  c.schedulerCache,
+		Cache:           c.schedulerCache,
		Algorithm:       algo,
		Extenders:       extenders,
		Profiles:        profiles,

View File

@@ -39,7 +39,7 @@ var (
// "ttl" is how long an assumed pod is kept before it expires.
// "stop" is the channel that, when closed, stops the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache {
-	cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
+	cache := newCache(ttl, cleanAssumedPeriod, stop)
	cache.run()
	return cache
}
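
For orientation, a minimal usage sketch of the constructor above, as the scheduler itself uses it elsewhere in this commit (the internalcache alias and the 30-second TTL are illustrative assumptions; time must be imported):

stop := make(chan struct{})
defer close(stop)
// New returns the Cache interface; closing stop terminates the
// cleanup goroutine started by run().
c := internalcache.New(30*time.Second, stop)
_ = c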
@@ -53,7 +53,7 @@ type nodeInfoListItem struct {
	prev *nodeInfoListItem
}

-type schedulerCache struct {
+type cacheImpl struct {
	stop   <-chan struct{}
	ttl    time.Duration
	period time.Duration
@@ -90,15 +90,15 @@ type imageState struct {
}

// createImageStateSummary returns a summarizing snapshot of the given image's state.
-func (cache *schedulerCache) createImageStateSummary(state *imageState) *framework.ImageStateSummary {
+func (cache *cacheImpl) createImageStateSummary(state *imageState) *framework.ImageStateSummary {
	return &framework.ImageStateSummary{
		Size:     state.size,
		NumNodes: len(state.nodes),
	}
}

-func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
-	return &schedulerCache{
+func newCache(ttl, period time.Duration, stop <-chan struct{}) *cacheImpl {
+	return &cacheImpl{
		ttl:    ttl,
		period: period,
		stop:   stop,
@@ -121,7 +121,7 @@ func newNodeInfoListItem(ni *framework.NodeInfo) *nodeInfoListItem {
// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly
// linked list. The head is the most recently updated NodeInfo.
// We assume cache lock is already acquired.
-func (cache *schedulerCache) moveNodeInfoToHead(name string) {
+func (cache *cacheImpl) moveNodeInfoToHead(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		klog.ErrorS(nil, "No node info with given name found in the cache", "node", klog.KRef("", name))
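
A generic sketch of the move-to-head operation the comment above describes, with a hypothetical item type rather than the cache's real nodeInfoListItem: unlink the item, then splice it in before the current head, so the most recently updated entry is always first.

type listItem struct {
	next, prev *listItem
}

func moveToHead(head **listItem, it *listItem) {
	if *head == it {
		return // already the most recent entry
	}
	// Unlink it from its current position.
	if it.prev != nil {
		it.prev.next = it.next
	}
	if it.next != nil {
		it.next.prev = it.prev
	}
	// Splice it in front of the old head.
	it.prev = nil
	it.next = *head
	if *head != nil {
		(*head).prev = it
	}
	*head = it
}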
@@ -149,7 +149,7 @@ func (cache *schedulerCache) moveNodeInfoToHead(name string) {
// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
// linked list.
// We assume cache lock is already acquired.
-func (cache *schedulerCache) removeNodeInfoFromList(name string) {
+func (cache *cacheImpl) removeNodeInfoFromList(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		klog.ErrorS(nil, "No node info with given name found in the cache", "node", klog.KRef("", name))
@@ -173,7 +173,7 @@ func (cache *schedulerCache) removeNodeInfoFromList(name string) {
// debugging purposes only and shouldn't be confused with UpdateSnapshot
// function.
// This method is expensive, and should be only used in non-critical path.
-func (cache *schedulerCache) Dump() *Dump {
+func (cache *cacheImpl) Dump() *Dump {
	cache.mu.RLock()
	defer cache.mu.RUnlock()
@@ -194,7 +194,7 @@ func (cache *schedulerCache) Dump() *Dump {
// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
// This function tracks generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
-func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
+func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
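
A simplified sketch of the generation tracking mentioned above, with hypothetical types: every NodeInfo carries a monotonically increasing generation, the list is ordered most-recently-updated first (thanks to moveNodeInfoToHead), and the snapshot remembers the newest generation it has copied, so the walk can stop at the first entry that is not newer.

type genNode struct {
	name       string
	generation int64
	next       *genNode
}

type genSnapshot struct {
	generation int64
	nodes      map[string]genNode
}

func updateSnapshotSketch(head *genNode, s *genSnapshot) {
	for n := head; n != nil; n = n.next {
		if n.generation <= s.generation {
			break // everything older is already in the snapshot
		}
		s.nodes[n.name] = *n
	}
	if head != nil {
		s.generation = head.generation
	}
}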
@@ -275,7 +275,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
	return nil
}

-func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
+func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
	snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	if updateAll {
@@ -311,7 +311,7 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
}

// If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot.
-func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
+func (cache *cacheImpl) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
	toDelete := len(snapshot.nodeInfoMap) - cache.nodeTree.numNodes
	for name := range snapshot.nodeInfoMap {
		if toDelete <= 0 {
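
A simplified sketch of the pruning loop that starts above: the snapshot may hold entries for nodes deleted since it was taken, so stale entries are removed until the snapshot is no larger than the live node count (the map-of-bool types here are illustrative stand-ins).

func pruneSnapshotSketch(snapshotNodes map[string]bool, live map[string]bool) {
	toDelete := len(snapshotNodes) - len(live)
	for name := range snapshotNodes {
		if toDelete <= 0 {
			break
		}
		if !live[name] {
			delete(snapshotNodes, name) // node gone since last snapshot
			toDelete--
		}
	}
}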
@@ -326,7 +326,7 @@ func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
// NodeCount returns the number of nodes in the cache.
// DO NOT use outside of tests.
-func (cache *schedulerCache) NodeCount() int {
+func (cache *cacheImpl) NodeCount() int {
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	return len(cache.nodes)
@@ -334,7 +334,7 @@ func (cache *schedulerCache) NodeCount() int {
// PodCount returns the number of pods in the cache (including those from deleted nodes).
// DO NOT use outside of tests.
-func (cache *schedulerCache) PodCount() (int, error) {
+func (cache *cacheImpl) PodCount() (int, error) {
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	// podFilter is expected to return true for most or all of the pods. We
@@ -347,7 +347,7 @@ func (cache *schedulerCache) PodCount() (int, error) {
	return count, nil
}

-func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
+func (cache *cacheImpl) AssumePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
@@ -362,12 +362,12 @@ func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
	return cache.addPod(pod, true)
}

-func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
+func (cache *cacheImpl) FinishBinding(pod *v1.Pod) error {
	return cache.finishBinding(pod, time.Now())
}

// finishBinding exists to make tests deterministic by injecting "now" as an argument
-func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
+func (cache *cacheImpl) finishBinding(pod *v1.Pod, now time.Time) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
@@ -386,7 +386,7 @@ func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
	return nil
}

-func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
+func (cache *cacheImpl) ForgetPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
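
The FinishBinding/finishBinding split above is a common testability pattern; a generic sketch with a hypothetical type (assumes the standard time package is imported): the exported method supplies time.Now(), while the unexported variant accepts the timestamp so tests can pass a fixed value.

type clockedCache struct {
	ttl       time.Duration
	deadlines map[string]time.Time
}

func (c *clockedCache) FinishBinding(key string) error {
	return c.finishBinding(key, time.Now())
}

func (c *clockedCache) finishBinding(key string, now time.Time) error {
	c.deadlines[key] = now.Add(c.ttl) // expiry derived from the injected now
	return nil
}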
@@ -408,7 +408,7 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
}

// Assumes that lock is already acquired.
-func (cache *schedulerCache) addPod(pod *v1.Pod, assumePod bool) error {
+func (cache *cacheImpl) addPod(pod *v1.Pod, assumePod bool) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
@@ -431,7 +431,7 @@ func (cache *schedulerCache) addPod(pod *v1.Pod, assumePod bool) error {
}

// Assumes that lock is already acquired.
-func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
+func (cache *cacheImpl) updatePod(oldPod, newPod *v1.Pod) error {
	if err := cache.removePod(oldPod); err != nil {
		return err
	}
@@ -442,7 +442,7 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
// Removes a pod from the cached node info. If the node information was already
// removed and there are no more pods left in the node, cleans up the node from
// the cache.
-func (cache *schedulerCache) removePod(pod *v1.Pod) error {
+func (cache *cacheImpl) removePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
@@ -467,7 +467,7 @@ func (cache *schedulerCache) removePod(pod *v1.Pod) error {
	return nil
}

-func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
+func (cache *cacheImpl) AddPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
@@ -501,7 +501,7 @@ func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
	return nil
}

-func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
+func (cache *cacheImpl) UpdatePod(oldPod, newPod *v1.Pod) error {
	key, err := framework.GetPodKey(oldPod)
	if err != nil {
		return err
@@ -516,7 +516,7 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
	if ok && !cache.assumedPods.Has(key) {
		if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
			klog.ErrorS(nil, "Pod updated on a different node than previously added to", "pod", klog.KObj(oldPod))
-			klog.ErrorS(nil, "SchedulerCache is corrupted and can badly affect scheduling decisions")
+			klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions")
			os.Exit(1)
		}
		return cache.updatePod(oldPod, newPod)
@@ -524,7 +524,7 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
	return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
}

-func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
+func (cache *cacheImpl) RemovePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
@@ -542,14 +542,14 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
		if pod.Spec.NodeName != "" {
			// An empty NodeName is possible when the scheduler misses a Delete
			// event and it gets the last known state from the informer cache.
-			klog.ErrorS(nil, "SchedulerCache is corrupted and can badly affect scheduling decisions")
+			klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions")
			os.Exit(1)
		}
	}
	return cache.removePod(currState.pod)
}

-func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
+func (cache *cacheImpl) IsAssumedPod(pod *v1.Pod) (bool, error) {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return false, err
@@ -563,7 +563,7 @@ func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
// GetPod might return a pod whose node has already been deleted from
// the main cache. This is useful to properly process pod update events.
-func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
+func (cache *cacheImpl) GetPod(pod *v1.Pod) (*v1.Pod, error) {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return nil, err
@@ -580,7 +580,7 @@ func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
	return podState.pod, nil
}

-func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo {
+func (cache *cacheImpl) AddNode(node *v1.Node) *framework.NodeInfo {
	cache.mu.Lock()
	defer cache.mu.Unlock()
@@ -599,7 +599,7 @@ func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo {
	return n.info.Clone()
}

-func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo {
+func (cache *cacheImpl) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo {
	cache.mu.Lock()
	defer cache.mu.Unlock()
@@ -625,7 +625,7 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo {
// the source of truth.
// However, we keep a ghost node with the list of pods until all pod deletion
// events have arrived. A ghost node is skipped from snapshots.
-func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
+func (cache *cacheImpl) RemoveNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
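
A simplified sketch of the "ghost node" idea described above, with hypothetical types: on removal, an entry that still has pods is kept but marked node-less so snapshots skip it, and it only disappears once the last pod delete event drains.

type ghostNodeInfo struct {
	hasNode bool     // false: ghost entry, excluded from snapshots
	pods    []string // keys of pods still bound to this node
}

func removeNodeSketch(nodes map[string]*ghostNodeInfo, name string) {
	ni, ok := nodes[name]
	if !ok {
		return
	}
	if len(ni.pods) == 0 {
		delete(nodes, name) // no pods left: drop the entry now
	} else {
		ni.hasNode = false // keep a ghost until pod deletes arrive
	}
}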
@@ -652,7 +652,7 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
// addNodeImageStates adds the states of the images on the given node to the given nodeInfo and updates the
// imageStates in the scheduler cache. This function assumes the lock to the scheduler cache has been acquired.
-func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *framework.NodeInfo) {
+func (cache *cacheImpl) addNodeImageStates(node *v1.Node, nodeInfo *framework.NodeInfo) {
	newSum := make(map[string]*framework.ImageStateSummary)

	for _, image := range node.Status.Images {
@@ -680,7 +680,7 @@ func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *framework.NodeInfo) {
// removeNodeImageStates removes the given node record from image entries having the node
// in imageStates cache. After the removal, if any image becomes free, i.e., the image
// is no longer available on any node, the image entry will be removed from imageStates.
-func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
+func (cache *cacheImpl) removeNodeImageStates(node *v1.Node) {
	if node == nil {
		return
	}
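
A simplified sketch of the bookkeeping removeNodeImageStates describes: each image maps to the set of nodes that hold it, and removing a node from the last such set removes the image entry entirely (the map types are illustrative stand-ins for the cache's real imageState records).

func removeNodeImageStatesSketch(imageStates map[string]map[string]struct{}, nodeName string) {
	for image, nodes := range imageStates {
		delete(nodes, nodeName)
		if len(nodes) == 0 {
			delete(imageStates, image) // image no longer on any node
		}
	}
}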
@@ -701,17 +701,17 @@ func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
	}
}

-func (cache *schedulerCache) run() {
+func (cache *cacheImpl) run() {
	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}

-func (cache *schedulerCache) cleanupExpiredAssumedPods() {
+func (cache *cacheImpl) cleanupExpiredAssumedPods() {
	cache.cleanupAssumedPods(time.Now())
}

// cleanupAssumedPods exists to make tests deterministic by taking the time as an input argument.
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
-func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
+func (cache *cacheImpl) cleanupAssumedPods(now time.Time) {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	defer cache.updateMetrics()
@@ -738,7 +738,7 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
}

// updateMetrics updates cache size metric values for pods, assumed pods, and nodes
-func (cache *schedulerCache) updateMetrics() {
+func (cache *cacheImpl) updateMetrics() {
	metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
	metrics.CacheSize.WithLabelValues("pods").Set(float64(len(cache.podStates)))
	metrics.CacheSize.WithLabelValues("nodes").Set(float64(len(cache.nodes)))

View File

@@ -204,7 +204,7 @@ func TestAssumePodScheduled(t *testing.T) {
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
-			cache := newSchedulerCache(time.Second, time.Second, nil)
+			cache := newCache(time.Second, time.Second, nil)
			for _, pod := range tt.pods {
				if err := cache.AssumePod(pod); err != nil {
					t.Fatalf("AssumePod failed: %v", err)
@@ -233,7 +233,7 @@ type testExpirePodStruct struct {
	assumedTime time.Time
}

-func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time.Time) error {
+func assumeAndFinishBinding(cache *cacheImpl, pod *v1.Pod, assumedTime time.Time) error {
	if err := cache.AssumePod(pod); err != nil {
		return err
	}
@@ -287,7 +287,7 @@ func TestExpirePod(t *testing.T) {
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
-			cache := newSchedulerCache(ttl, time.Second, nil)
+			cache := newCache(ttl, time.Second, nil)

			for _, pod := range tt.pods {
				if err := cache.AssumePod(pod.pod); err != nil {
@@ -347,7 +347,7 @@ func TestAddPodWillConfirm(t *testing.T) {
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
-			cache := newSchedulerCache(ttl, time.Second, nil)
+			cache := newCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
@@ -387,7 +387,7 @@ func TestDump(t *testing.T) {
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
-			cache := newSchedulerCache(ttl, time.Second, nil)
+			cache := newCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Errorf("assumePod failed: %v", err)
@@ -455,7 +455,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
-			cache := newSchedulerCache(ttl, time.Second, nil)
+			cache := newCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
@@ -510,7 +510,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			now := time.Now()
-			cache := newSchedulerCache(ttl, time.Second, nil)
+			cache := newCache(ttl, time.Second, nil)
			if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
				t.Fatalf("assumePod failed: %v", err)
			}
@@ -576,7 +576,7 @@ func TestUpdatePod(t *testing.T) {
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
-			cache := newSchedulerCache(ttl, time.Second, nil)
+			cache := newCache(ttl, time.Second, nil)
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
@@ -638,7 +638,7 @@ func TestUpdatePodAndGet(t *testing.T) {
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
-			cache := newSchedulerCache(ttl, time.Second, nil)
+			cache := newCache(ttl, time.Second, nil)

			if err := tt.handler(cache, tt.pod); err != nil {
				t.Fatalf("unexpected err: %v", err)
@@ -709,7 +709,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			now := time.Now()
-			cache := newSchedulerCache(ttl, time.Second, nil)
+			cache := newCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
@@ -786,7 +786,7 @@ func TestEphemeralStorageResource(t *testing.T) {
	}
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
-			cache := newSchedulerCache(time.Second, time.Second, nil)
+			cache := newCache(time.Second, time.Second, nil)
			if err := cache.AddPod(tt.pod); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
@@ -839,7 +839,7 @@ func TestRemovePod(t *testing.T) {
	for name, tt := range tests {
		t.Run(name, func(t *testing.T) {
			nodeName := pod.Spec.NodeName
-			cache := newSchedulerCache(time.Second, time.Second, nil)
+			cache := newCache(time.Second, time.Second, nil)
			// Add/Assume pod succeeds even before adding the nodes.
			if tt.assume {
				if err := cache.AddPod(pod); err != nil {
@@ -881,7 +881,7 @@ func TestForgetPod(t *testing.T) {
	now := time.Now()
	ttl := 10 * time.Second

-	cache := newSchedulerCache(ttl, time.Second, nil)
+	cache := newCache(ttl, time.Second, nil)
	for _, pod := range pods {
		if err := assumeAndFinishBinding(cache, pod, now); err != nil {
			t.Fatalf("assumePod failed: %v", err)
@@ -1063,7 +1063,7 @@ func TestNodeOperators(t *testing.T) {
	expected := buildNodeInfo(test.node, test.pods)
	node := test.node

-	cache := newSchedulerCache(time.Second, time.Second, nil)
+	cache := newCache(time.Second, time.Second, nil)
	cache.AddNode(node)
	for _, pod := range test.pods {
		if err := cache.AddPod(pod); err != nil {
@@ -1087,7 +1087,7 @@ func TestNodeOperators(t *testing.T) {
	// Generations are globally unique. We check in our unit tests that they are incremented correctly.
	expected.Generation = got.info.Generation
	if !reflect.DeepEqual(got.info, expected) {
-		t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
+		t.Errorf("Failed to add node into scheduler cache:\n got: %+v \nexpected: %+v", got, expected)
	}

	// Step 2: dump cached nodes successfully.
@@ -1239,7 +1239,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
		podsWithAffinity = append(podsWithAffinity, pod)
	}

-	var cache *schedulerCache
+	var cache *cacheImpl
	var snapshot *Snapshot
	type operation = func(t *testing.T)
@@ -1448,7 +1448,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
-			cache = newSchedulerCache(time.Second, time.Second, nil)
+			cache = newCache(time.Second, time.Second, nil)
			snapshot = NewEmptySnapshot()

			for _, op := range test.operations {
@@ -1487,7 +1487,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
	}
}

-func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *schedulerCache, snapshot *Snapshot) error {
+func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot *Snapshot) error {
	// Compare the map.
	if len(snapshot.nodeInfoMap) != cache.nodeTree.numNodes {
		return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", cache.nodeTree.numNodes, len(snapshot.nodeInfoMap))
@@ -1561,7 +1561,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
		}
	}

-	var cache *schedulerCache
+	var cache *cacheImpl
	var snapshot *Snapshot

	addNode := func(t *testing.T, i int) {
@@ -1663,7 +1663,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
-			cache = newSchedulerCache(time.Second, time.Second, nil)
+			cache = newCache(time.Second, time.Second, nil)
			snapshot = NewEmptySnapshot()
			test.operations(t)
@@ -1755,7 +1755,7 @@ func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, ports []v1.ContainerPort) *v1.Pod {
}

func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
-	cache := newSchedulerCache(time.Second, time.Second, nil)
+	cache := newCache(time.Second, time.Second, nil)
	for i := 0; i < 1000; i++ {
		nodeName := fmt.Sprintf("node-%d", i)
		for j := 0; j < 30; j++ {
@@ -1770,8 +1770,8 @@ func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
	return cache
}

-func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *schedulerCache {
-	cache := newSchedulerCache(time.Second, time.Second, nil)
+func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *cacheImpl {
+	cache := newCache(time.Second, time.Second, nil)
	for i := 0; i < podNum; i++ {
		nodeName := fmt.Sprintf("node-%d", i/10)
		objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
@@ -1785,7 +1785,7 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *schedulerCache {
	return cache
}

-func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
+func isForgottenFromCache(p *v1.Pod, c *cacheImpl) error {
	if assumed, err := c.IsAssumedPod(p); err != nil {
		return err
	} else if assumed {
@@ -1798,7 +1798,7 @@ func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
}

// getNodeInfo returns cached data for the node name.
-func (cache *schedulerCache) getNodeInfo(nodeName string) (*v1.Node, error) {
+func (cache *cacheImpl) getNodeInfo(nodeName string) (*v1.Node, error) {
	cache.mu.RLock()
	defer cache.mu.RUnlock()

View File

@@ -65,9 +65,9 @@ const (
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
-	// It is expected that changes made via SchedulerCache will be observed
+	// It is expected that changes made via Cache will be observed
	// by NodeLister and Algorithm.
-	SchedulerCache internalcache.Cache
+	Cache internalcache.Cache

	Algorithm ScheduleAlgorithm
@@ -367,7 +367,7 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
	// immediately.
	assumed.Spec.NodeName = host

-	if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
+	if err := sched.Cache.AssumePod(assumed); err != nil {
		klog.ErrorS(err, "Scheduler cache AssumePod failed")
		return err
	}
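
The assume step above is the scheduler's optimistic-binding trick; a condensed, hypothetical sketch of the flow (not the actual method body; assumes the v1 and internalcache import aliases used elsewhere in this commit):

// Record the chosen host in the cache first, so subsequent
// scheduling cycles see the pod as placed before the API server
// has actually bound it.
func assumeSketch(c internalcache.Cache, pod *v1.Pod, host string) error {
	assumed := pod.DeepCopy() // never mutate the informer's copy
	assumed.Spec.NodeName = host
	return c.AssumePod(assumed) // visible to the next cycle immediately
}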
@@ -416,7 +416,7 @@ func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) {
}

func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, err error) {
-	if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil {
+	if finErr := sched.Cache.FinishBinding(assumed); finErr != nil {
		klog.ErrorS(finErr, "Scheduler cache FinishBinding failed")
	}
	if err != nil {
@@ -524,7 +524,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
		// trigger un-reserve to clean up state associated with the reserved Pod
		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
+		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
		}
		sched.handleSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
@@ -544,7 +544,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
		}
		// One of the plugins returned status different than success or wait.
		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
+		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
		}
		sched.handleSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
@@ -577,7 +577,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
			}
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
+			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
			} else {
				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
@@ -600,7 +600,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
+			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
			} else {
				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
@@ -617,7 +617,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
+			if err := sched.Cache.ForgetPod(assumedPod); err != nil {
				klog.ErrorS(err, "scheduler cache ForgetPod failed")
			} else {
				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
@@ -675,7 +675,7 @@ func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bool {
	// Case 2: pod that has been assumed could be skipped.
	// An assumed pod can be added again to the scheduling queue if it got an update event
	// during its previous scheduling cycle but before getting assumed.
-	isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
+	isAssumed, err := sched.Cache.IsAssumedPod(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
		return false
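
A hypothetical condensed form of the check in this last hunk: a pod that is already assumed into the cache can reappear in the queue via a stale update event and may be skipped; on error the sketch errs on the side of scheduling.

func shouldSkipAssumed(c internalcache.Cache, pod *v1.Pod) bool {
	isAssumed, err := c.IsAssumedPod(pod)
	if err != nil {
		return false // on error, schedule rather than silently drop
	}
	return isAssumed
}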

View File

@@ -398,7 +398,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
	var gotForgetPod *v1.Pod
	var gotAssumedPod *v1.Pod
	var gotBinding *v1.Binding
-	sCache := &fakecache.Cache{
+	cache := &fakecache.Cache{
		ForgetFunc: func(pod *v1.Pod) {
			gotForgetPod = pod
		},
@@ -436,7 +436,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
			defer cancel()

			s := &Scheduler{
-				SchedulerCache: sCache,
+				Cache:          cache,
				Algorithm:      item.algo,
				client:         client,
				Error: func(p *framework.QueuedPodInfo, err error) {
@@ -881,7 +881,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
-func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
+func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, cache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
	bindingChan := make(chan *v1.Binding, 1)
	client := clientsetfake.NewSimpleClientset()
	client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
@@ -915,14 +915,14 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
	)

	algo := NewGenericScheduler(
-		scache,
+		cache,
		internalcache.NewEmptySnapshot(),
		schedulerapi.DefaultPercentageOfNodesToScore,
	)

	errChan := make(chan error, 1)
	sched := &Scheduler{
-		SchedulerCache: scache,
+		Cache:          cache,
		Algorithm:      algo,
		NextPod: func() *framework.QueuedPodInfo {
			return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))}
@@ -1180,16 +1180,16 @@ func TestSchedulerBinding(t *testing.T) {
	}

	stop := make(chan struct{})
	defer close(stop)
-	scache := internalcache.New(100*time.Millisecond, stop)
+	cache := internalcache.New(100*time.Millisecond, stop)
	algo := NewGenericScheduler(
-		scache,
+		cache,
		nil,
		0,
	)
	sched := Scheduler{
		Algorithm:  algo,
		Extenders:  test.extenders,
-		SchedulerCache: scache,
+		Cache:      cache,
	}
	err = sched.bind(context.Background(), fwk, pod, "node", nil)
	if err != nil {

View File

@@ -168,7 +168,7 @@ func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
// createAndWaitForNodesInCache calls createNodes(), and waits for the created
// nodes to be present in the scheduler cache.
func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
-	existingNodes := testCtx.Scheduler.SchedulerCache.NodeCount()
+	existingNodes := testCtx.Scheduler.Cache.NodeCount()
	nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes)
	if err != nil {
		return nodes, fmt.Errorf("cannot create nodes: %v", err)
@@ -180,7 +180,7 @@ func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
// within 30 seconds; otherwise returns false.
func waitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error {
	err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
-		return sched.SchedulerCache.NodeCount() >= nodeCount, nil
+		return sched.Cache.NodeCount() >= nodeCount, nil
	})
	if err != nil {
		return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err)
@@ -432,7 +432,7 @@ func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisruptionBudget,
// waitCachedPodsStable waits until the scheduler cache has the given pods.
func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error {
	return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
-		cachedPods, err := testCtx.Scheduler.SchedulerCache.PodCount()
+		cachedPods, err := testCtx.Scheduler.Cache.PodCount()
		if err != nil {
			return false, err
		}
@@ -444,7 +444,7 @@ func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error {
		if err1 != nil {
			return false, err1
		}
-		cachedPod, err2 := testCtx.Scheduler.SchedulerCache.GetPod(actualPod)
+		cachedPod, err2 := testCtx.Scheduler.Cache.GetPod(actualPod)
		if err2 != nil || cachedPod == nil {
			return false, err2
		}

View File

@@ -370,7 +370,7 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interface)
// WaitForSchedulerCacheCleanup waits for cleanup of the scheduler's cache to complete
func WaitForSchedulerCacheCleanup(sched *scheduler.Scheduler, t *testing.T) {
	schedulerCacheIsEmpty := func() (bool, error) {
-		dump := sched.SchedulerCache.Dump()
+		dump := sched.Cache.Dump()
		return len(dump.Nodes) == 0 && len(dump.AssumedPods) == 0, nil
	}