diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 947ae68cb80..96ae9536af5 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -67,7 +67,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) { return } - nodeInfo := sched.SchedulerCache.AddNode(node) + nodeInfo := sched.Cache.AddNode(node) klog.V(3).InfoS("Add event for node", "node", klog.KObj(node)) sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo)) } @@ -84,7 +84,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { return } - nodeInfo := sched.SchedulerCache.UpdateNode(oldNode, newNode) + nodeInfo := sched.Cache.UpdateNode(oldNode, newNode) // Only requeue unschedulable pods if the node became more schedulable. if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil { sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(*event, preCheckForNode(nodeInfo)) @@ -108,7 +108,7 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) { return } klog.V(3).InfoS("Delete event for node", "node", klog.KObj(node)) - if err := sched.SchedulerCache.RemoveNode(node); err != nil { + if err := sched.Cache.RemoveNode(node); err != nil { klog.ErrorS(err, "Scheduler cache RemoveNode failed") } } @@ -129,7 +129,7 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { return } - isAssumed, err := sched.SchedulerCache.IsAssumedPod(newPod) + isAssumed, err := sched.Cache.IsAssumedPod(newPod) if err != nil { utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", newPod.Namespace, newPod.Name, err)) } @@ -185,7 +185,7 @@ func (sched *Scheduler) addPodToCache(obj interface{}) { } klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod)) - if err := sched.SchedulerCache.AddPod(pod); err != nil { + if err := sched.Cache.AddPod(pod); err != nil { klog.ErrorS(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod)) } @@ -205,7 +205,7 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { } klog.V(4).InfoS("Update event for scheduled pod", "pod", klog.KObj(oldPod)) - if err := sched.SchedulerCache.UpdatePod(oldPod, newPod); err != nil { + if err := sched.Cache.UpdatePod(oldPod, newPod); err != nil { klog.ErrorS(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod)) } @@ -229,7 +229,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) { return } klog.V(3).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod)) - if err := sched.SchedulerCache.RemovePod(pod); err != nil { + if err := sched.Cache.RemovePod(pod); err != nil { klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod)) } diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index b61b4866960..81522ca9892 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -225,21 +225,19 @@ func TestUpdatePodInCache(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - schedulerCache := cache.New(ttl, ctx.Done()) - schedulerQueue := queue.NewTestQueue(ctx, nil) sched := &Scheduler{ - SchedulerCache: schedulerCache, - SchedulingQueue: schedulerQueue, + Cache: cache.New(ttl, ctx.Done()), + SchedulingQueue: queue.NewTestQueue(ctx, nil), } sched.addPodToCache(tt.oldObj) sched.updatePodInCache(tt.oldObj, tt.newObj) if tt.oldObj.(*v1.Pod).UID != tt.newObj.(*v1.Pod).UID { - if pod, err := sched.SchedulerCache.GetPod(tt.oldObj.(*v1.Pod)); err == nil { - t.Errorf("Get pod UID %v from SchedulerCache but it should not happen", pod.UID) + if pod, err := sched.Cache.GetPod(tt.oldObj.(*v1.Pod)); err == nil { + t.Errorf("Get pod UID %v from cache but it should not happen", pod.UID) } } - pod, err := sched.SchedulerCache.GetPod(tt.newObj.(*v1.Pod)) + pod, err := sched.Cache.GetPod(tt.newObj.(*v1.Pod)) if err != nil { t.Errorf("Failed to get pod from scheduler: %v", err) } diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 2e70f6a14c5..3643361b869 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -189,7 +189,7 @@ func (c *Configurator) create() (*Scheduler, error) { ) return &Scheduler{ - SchedulerCache: c.schedulerCache, + Cache: c.schedulerCache, Algorithm: algo, Extenders: extenders, Profiles: profiles, diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index 4efc11d9e01..73fb6c367b5 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -39,7 +39,7 @@ var ( // "ttl" is how long the assumed pod will get expired. // "stop" is the channel that would close the background goroutine. func New(ttl time.Duration, stop <-chan struct{}) Cache { - cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop) + cache := newCache(ttl, cleanAssumedPeriod, stop) cache.run() return cache } @@ -53,7 +53,7 @@ type nodeInfoListItem struct { prev *nodeInfoListItem } -type schedulerCache struct { +type cacheImpl struct { stop <-chan struct{} ttl time.Duration period time.Duration @@ -90,15 +90,15 @@ type imageState struct { } // createImageStateSummary returns a summarizing snapshot of the given image's state. -func (cache *schedulerCache) createImageStateSummary(state *imageState) *framework.ImageStateSummary { +func (cache *cacheImpl) createImageStateSummary(state *imageState) *framework.ImageStateSummary { return &framework.ImageStateSummary{ Size: state.size, NumNodes: len(state.nodes), } } -func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache { - return &schedulerCache{ +func newCache(ttl, period time.Duration, stop <-chan struct{}) *cacheImpl { + return &cacheImpl{ ttl: ttl, period: period, stop: stop, @@ -121,7 +121,7 @@ func newNodeInfoListItem(ni *framework.NodeInfo) *nodeInfoListItem { // moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly // linked list. The head is the most recently updated NodeInfo. // We assume cache lock is already acquired. -func (cache *schedulerCache) moveNodeInfoToHead(name string) { +func (cache *cacheImpl) moveNodeInfoToHead(name string) { ni, ok := cache.nodes[name] if !ok { klog.ErrorS(nil, "No node info with given name found in the cache", "node", klog.KRef("", name)) @@ -149,7 +149,7 @@ func (cache *schedulerCache) moveNodeInfoToHead(name string) { // removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly // linked list. // We assume cache lock is already acquired. -func (cache *schedulerCache) removeNodeInfoFromList(name string) { +func (cache *cacheImpl) removeNodeInfoFromList(name string) { ni, ok := cache.nodes[name] if !ok { klog.ErrorS(nil, "No node info with given name found in the cache", "node", klog.KRef("", name)) @@ -173,7 +173,7 @@ func (cache *schedulerCache) removeNodeInfoFromList(name string) { // debugging purposes only and shouldn't be confused with UpdateSnapshot // function. // This method is expensive, and should be only used in non-critical path. -func (cache *schedulerCache) Dump() *Dump { +func (cache *cacheImpl) Dump() *Dump { cache.mu.RLock() defer cache.mu.RUnlock() @@ -194,7 +194,7 @@ func (cache *schedulerCache) Dump() *Dump { // nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot. // This function tracks generation number of NodeInfo and updates only the // entries of an existing snapshot that have changed after the snapshot was taken. -func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error { +func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { cache.mu.Lock() defer cache.mu.Unlock() @@ -275,7 +275,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error { return nil } -func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) { +func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) { snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) if updateAll { @@ -311,7 +311,7 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda } // If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot. -func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) { +func (cache *cacheImpl) removeDeletedNodesFromSnapshot(snapshot *Snapshot) { toDelete := len(snapshot.nodeInfoMap) - cache.nodeTree.numNodes for name := range snapshot.nodeInfoMap { if toDelete <= 0 { @@ -326,7 +326,7 @@ func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) // NodeCount returns the number of nodes in the cache. // DO NOT use outside of tests. -func (cache *schedulerCache) NodeCount() int { +func (cache *cacheImpl) NodeCount() int { cache.mu.RLock() defer cache.mu.RUnlock() return len(cache.nodes) @@ -334,7 +334,7 @@ func (cache *schedulerCache) NodeCount() int { // PodCount returns the number of pods in the cache (including those from deleted nodes). // DO NOT use outside of tests. -func (cache *schedulerCache) PodCount() (int, error) { +func (cache *cacheImpl) PodCount() (int, error) { cache.mu.RLock() defer cache.mu.RUnlock() // podFilter is expected to return true for most or all of the pods. We @@ -347,7 +347,7 @@ func (cache *schedulerCache) PodCount() (int, error) { return count, nil } -func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { +func (cache *cacheImpl) AssumePod(pod *v1.Pod) error { key, err := framework.GetPodKey(pod) if err != nil { return err @@ -362,12 +362,12 @@ func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { return cache.addPod(pod, true) } -func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error { +func (cache *cacheImpl) FinishBinding(pod *v1.Pod) error { return cache.finishBinding(pod, time.Now()) } // finishBinding exists to make tests determinitistic by injecting now as an argument -func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error { +func (cache *cacheImpl) finishBinding(pod *v1.Pod, now time.Time) error { key, err := framework.GetPodKey(pod) if err != nil { return err @@ -386,7 +386,7 @@ func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error { return nil } -func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error { +func (cache *cacheImpl) ForgetPod(pod *v1.Pod) error { key, err := framework.GetPodKey(pod) if err != nil { return err @@ -408,7 +408,7 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error { } // Assumes that lock is already acquired. -func (cache *schedulerCache) addPod(pod *v1.Pod, assumePod bool) error { +func (cache *cacheImpl) addPod(pod *v1.Pod, assumePod bool) error { key, err := framework.GetPodKey(pod) if err != nil { return err @@ -431,7 +431,7 @@ func (cache *schedulerCache) addPod(pod *v1.Pod, assumePod bool) error { } // Assumes that lock is already acquired. -func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error { +func (cache *cacheImpl) updatePod(oldPod, newPod *v1.Pod) error { if err := cache.removePod(oldPod); err != nil { return err } @@ -442,7 +442,7 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error { // Removes a pod from the cached node info. If the node information was already // removed and there are no more pods left in the node, cleans up the node from // the cache. -func (cache *schedulerCache) removePod(pod *v1.Pod) error { +func (cache *cacheImpl) removePod(pod *v1.Pod) error { key, err := framework.GetPodKey(pod) if err != nil { return err @@ -467,7 +467,7 @@ func (cache *schedulerCache) removePod(pod *v1.Pod) error { return nil } -func (cache *schedulerCache) AddPod(pod *v1.Pod) error { +func (cache *cacheImpl) AddPod(pod *v1.Pod) error { key, err := framework.GetPodKey(pod) if err != nil { return err @@ -501,7 +501,7 @@ func (cache *schedulerCache) AddPod(pod *v1.Pod) error { return nil } -func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error { +func (cache *cacheImpl) UpdatePod(oldPod, newPod *v1.Pod) error { key, err := framework.GetPodKey(oldPod) if err != nil { return err @@ -516,7 +516,7 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error { if ok && !cache.assumedPods.Has(key) { if currState.pod.Spec.NodeName != newPod.Spec.NodeName { klog.ErrorS(nil, "Pod updated on a different node than previously added to", "pod", klog.KObj(oldPod)) - klog.ErrorS(nil, "SchedulerCache is corrupted and can badly affect scheduling decisions") + klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions") os.Exit(1) } return cache.updatePod(oldPod, newPod) @@ -524,7 +524,7 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error { return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key) } -func (cache *schedulerCache) RemovePod(pod *v1.Pod) error { +func (cache *cacheImpl) RemovePod(pod *v1.Pod) error { key, err := framework.GetPodKey(pod) if err != nil { return err @@ -542,14 +542,14 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error { if pod.Spec.NodeName != "" { // An empty NodeName is possible when the scheduler misses a Delete // event and it gets the last known state from the informer cache. - klog.ErrorS(nil, "SchedulerCache is corrupted and can badly affect scheduling decisions") + klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions") os.Exit(1) } } return cache.removePod(currState.pod) } -func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) { +func (cache *cacheImpl) IsAssumedPod(pod *v1.Pod) (bool, error) { key, err := framework.GetPodKey(pod) if err != nil { return false, err @@ -563,7 +563,7 @@ func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) { // GetPod might return a pod for which its node has already been deleted from // the main cache. This is useful to properly process pod update events. -func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) { +func (cache *cacheImpl) GetPod(pod *v1.Pod) (*v1.Pod, error) { key, err := framework.GetPodKey(pod) if err != nil { return nil, err @@ -580,7 +580,7 @@ func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) { return podState.pod, nil } -func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo { +func (cache *cacheImpl) AddNode(node *v1.Node) *framework.NodeInfo { cache.mu.Lock() defer cache.mu.Unlock() @@ -599,7 +599,7 @@ func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo { return n.info.Clone() } -func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo { +func (cache *cacheImpl) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo { cache.mu.Lock() defer cache.mu.Unlock() @@ -625,7 +625,7 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) *framework.No // the source of truth. // However, we keep a ghost node with the list of pods until all pod deletion // events have arrived. A ghost node is skipped from snapshots. -func (cache *schedulerCache) RemoveNode(node *v1.Node) error { +func (cache *cacheImpl) RemoveNode(node *v1.Node) error { cache.mu.Lock() defer cache.mu.Unlock() @@ -652,7 +652,7 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error { // addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in // scheduler cache. This function assumes the lock to scheduler cache has been acquired. -func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *framework.NodeInfo) { +func (cache *cacheImpl) addNodeImageStates(node *v1.Node, nodeInfo *framework.NodeInfo) { newSum := make(map[string]*framework.ImageStateSummary) for _, image := range node.Status.Images { @@ -680,7 +680,7 @@ func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *framewo // removeNodeImageStates removes the given node record from image entries having the node // in imageStates cache. After the removal, if any image becomes free, i.e., the image // is no longer available on any node, the image entry will be removed from imageStates. -func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) { +func (cache *cacheImpl) removeNodeImageStates(node *v1.Node) { if node == nil { return } @@ -701,17 +701,17 @@ func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) { } } -func (cache *schedulerCache) run() { +func (cache *cacheImpl) run() { go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop) } -func (cache *schedulerCache) cleanupExpiredAssumedPods() { +func (cache *cacheImpl) cleanupExpiredAssumedPods() { cache.cleanupAssumedPods(time.Now()) } // cleanupAssumedPods exists for making test deterministic by taking time as input argument. // It also reports metrics on the cache size for nodes, pods, and assumed pods. -func (cache *schedulerCache) cleanupAssumedPods(now time.Time) { +func (cache *cacheImpl) cleanupAssumedPods(now time.Time) { cache.mu.Lock() defer cache.mu.Unlock() defer cache.updateMetrics() @@ -738,7 +738,7 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) { } // updateMetrics updates cache size metric values for pods, assumed pods, and nodes -func (cache *schedulerCache) updateMetrics() { +func (cache *cacheImpl) updateMetrics() { metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods))) metrics.CacheSize.WithLabelValues("pods").Set(float64(len(cache.podStates))) metrics.CacheSize.WithLabelValues("nodes").Set(float64(len(cache.nodes))) diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index 08002f0a36c..9e8c890f3b7 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -204,7 +204,7 @@ func TestAssumePodScheduled(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { - cache := newSchedulerCache(time.Second, time.Second, nil) + cache := newCache(time.Second, time.Second, nil) for _, pod := range tt.pods { if err := cache.AssumePod(pod); err != nil { t.Fatalf("AssumePod failed: %v", err) @@ -233,7 +233,7 @@ type testExpirePodStruct struct { assumedTime time.Time } -func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time.Time) error { +func assumeAndFinishBinding(cache *cacheImpl, pod *v1.Pod, assumedTime time.Time) error { if err := cache.AssumePod(pod); err != nil { return err } @@ -287,7 +287,7 @@ func TestExpirePod(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { - cache := newSchedulerCache(ttl, time.Second, nil) + cache := newCache(ttl, time.Second, nil) for _, pod := range tt.pods { if err := cache.AssumePod(pod.pod); err != nil { @@ -347,7 +347,7 @@ func TestAddPodWillConfirm(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { - cache := newSchedulerCache(ttl, time.Second, nil) + cache := newCache(ttl, time.Second, nil) for _, podToAssume := range tt.podsToAssume { if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { t.Fatalf("assumePod failed: %v", err) @@ -387,7 +387,7 @@ func TestDump(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { - cache := newSchedulerCache(ttl, time.Second, nil) + cache := newCache(ttl, time.Second, nil) for _, podToAssume := range tt.podsToAssume { if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { t.Errorf("assumePod failed: %v", err) @@ -455,7 +455,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { - cache := newSchedulerCache(ttl, time.Second, nil) + cache := newCache(ttl, time.Second, nil) for _, podToAssume := range tt.podsToAssume { if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { t.Fatalf("assumePod failed: %v", err) @@ -510,7 +510,7 @@ func TestAddPodAfterExpiration(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { now := time.Now() - cache := newSchedulerCache(ttl, time.Second, nil) + cache := newCache(ttl, time.Second, nil) if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil { t.Fatalf("assumePod failed: %v", err) } @@ -576,7 +576,7 @@ func TestUpdatePod(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { - cache := newSchedulerCache(ttl, time.Second, nil) + cache := newCache(ttl, time.Second, nil) for _, podToAdd := range tt.podsToAdd { if err := cache.AddPod(podToAdd); err != nil { t.Fatalf("AddPod failed: %v", err) @@ -638,7 +638,7 @@ func TestUpdatePodAndGet(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { - cache := newSchedulerCache(ttl, time.Second, nil) + cache := newCache(ttl, time.Second, nil) if err := tt.handler(cache, tt.pod); err != nil { t.Fatalf("unexpected err: %v", err) @@ -709,7 +709,7 @@ func TestExpireAddUpdatePod(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { now := time.Now() - cache := newSchedulerCache(ttl, time.Second, nil) + cache := newCache(ttl, time.Second, nil) for _, podToAssume := range tt.podsToAssume { if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { t.Fatalf("assumePod failed: %v", err) @@ -786,7 +786,7 @@ func TestEphemeralStorageResource(t *testing.T) { } for i, tt := range tests { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { - cache := newSchedulerCache(time.Second, time.Second, nil) + cache := newCache(time.Second, time.Second, nil) if err := cache.AddPod(tt.pod); err != nil { t.Fatalf("AddPod failed: %v", err) } @@ -839,7 +839,7 @@ func TestRemovePod(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { nodeName := pod.Spec.NodeName - cache := newSchedulerCache(time.Second, time.Second, nil) + cache := newCache(time.Second, time.Second, nil) // Add/Assume pod succeeds even before adding the nodes. if tt.assume { if err := cache.AddPod(pod); err != nil { @@ -881,7 +881,7 @@ func TestForgetPod(t *testing.T) { now := time.Now() ttl := 10 * time.Second - cache := newSchedulerCache(ttl, time.Second, nil) + cache := newCache(ttl, time.Second, nil) for _, pod := range pods { if err := assumeAndFinishBinding(cache, pod, now); err != nil { t.Fatalf("assumePod failed: %v", err) @@ -1063,7 +1063,7 @@ func TestNodeOperators(t *testing.T) { expected := buildNodeInfo(test.node, test.pods) node := test.node - cache := newSchedulerCache(time.Second, time.Second, nil) + cache := newCache(time.Second, time.Second, nil) cache.AddNode(node) for _, pod := range test.pods { if err := cache.AddPod(pod); err != nil { @@ -1087,7 +1087,7 @@ func TestNodeOperators(t *testing.T) { // Generations are globally unique. We check in our unit tests that they are incremented correctly. expected.Generation = got.info.Generation if !reflect.DeepEqual(got.info, expected) { - t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected) + t.Errorf("Failed to add node into scheduler cache:\n got: %+v \nexpected: %+v", got, expected) } // Step 2: dump cached nodes successfully. @@ -1239,7 +1239,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { podsWithAffinity = append(podsWithAffinity, pod) } - var cache *schedulerCache + var cache *cacheImpl var snapshot *Snapshot type operation = func(t *testing.T) @@ -1448,7 +1448,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - cache = newSchedulerCache(time.Second, time.Second, nil) + cache = newCache(time.Second, time.Second, nil) snapshot = NewEmptySnapshot() for _, op := range test.operations { @@ -1487,7 +1487,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } } -func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *schedulerCache, snapshot *Snapshot) error { +func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot *Snapshot) error { // Compare the map. if len(snapshot.nodeInfoMap) != cache.nodeTree.numNodes { return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", cache.nodeTree.numNodes, len(snapshot.nodeInfoMap)) @@ -1561,7 +1561,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) { } } - var cache *schedulerCache + var cache *cacheImpl var snapshot *Snapshot addNode := func(t *testing.T, i int) { @@ -1663,7 +1663,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - cache = newSchedulerCache(time.Second, time.Second, nil) + cache = newCache(time.Second, time.Second, nil) snapshot = NewEmptySnapshot() test.operations(t) @@ -1755,7 +1755,7 @@ func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, po } func setupCacheOf1kNodes30kPods(b *testing.B) Cache { - cache := newSchedulerCache(time.Second, time.Second, nil) + cache := newCache(time.Second, time.Second, nil) for i := 0; i < 1000; i++ { nodeName := fmt.Sprintf("node-%d", i) for j := 0; j < 30; j++ { @@ -1770,8 +1770,8 @@ func setupCacheOf1kNodes30kPods(b *testing.B) Cache { return cache } -func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *schedulerCache { - cache := newSchedulerCache(time.Second, time.Second, nil) +func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *cacheImpl { + cache := newCache(time.Second, time.Second, nil) for i := 0; i < podNum; i++ { nodeName := fmt.Sprintf("node-%d", i/10) objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10) @@ -1785,7 +1785,7 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) return cache } -func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error { +func isForgottenFromCache(p *v1.Pod, c *cacheImpl) error { if assumed, err := c.IsAssumedPod(p); err != nil { return err } else if assumed { @@ -1798,7 +1798,7 @@ func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error { } // getNodeInfo returns cached data for the node name. -func (cache *schedulerCache) getNodeInfo(nodeName string) (*v1.Node, error) { +func (cache *cacheImpl) getNodeInfo(nodeName string) (*v1.Node, error) { cache.mu.RLock() defer cache.mu.RUnlock() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index dc78a5c804e..0d77793a896 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -65,9 +65,9 @@ const ( // Scheduler watches for new unscheduled pods. It attempts to find // nodes that they fit on and writes bindings back to the api server. type Scheduler struct { - // It is expected that changes made via SchedulerCache will be observed + // It is expected that changes made via Cache will be observed // by NodeLister and Algorithm. - SchedulerCache internalcache.Cache + Cache internalcache.Cache Algorithm ScheduleAlgorithm @@ -367,7 +367,7 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { // immediately. assumed.Spec.NodeName = host - if err := sched.SchedulerCache.AssumePod(assumed); err != nil { + if err := sched.Cache.AssumePod(assumed); err != nil { klog.ErrorS(err, "Scheduler cache AssumePod failed") return err } @@ -416,7 +416,7 @@ func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) } func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, err error) { - if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil { + if finErr := sched.Cache.FinishBinding(assumed); finErr != nil { klog.ErrorS(finErr, "Scheduler cache FinishBinding failed") } if err != nil { @@ -524,7 +524,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) // trigger un-reserve to clean up state associated with the reserved Pod fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { + if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil { klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed") } sched.handleSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode) @@ -544,7 +544,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { } // One of the plugins returned status different than success or wait. fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { + if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil { klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed") } sched.handleSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode) @@ -577,7 +577,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { } // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { + if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil { klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed") } else { // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, @@ -600,7 +600,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { + if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil { klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed") } else { // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, @@ -617,7 +617,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil { + if err := sched.Cache.ForgetPod(assumedPod); err != nil { klog.ErrorS(err, "scheduler cache ForgetPod failed") } else { // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, @@ -675,7 +675,7 @@ func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bo // Case 2: pod that has been assumed could be skipped. // An assumed pod can be added again to the scheduling queue if it got an update event // during its previous scheduling cycle but before getting assumed. - isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod) + isAssumed, err := sched.Cache.IsAssumedPod(pod) if err != nil { utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err)) return false diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index cf9efc2bffd..b146dd1cf25 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -398,7 +398,7 @@ func TestSchedulerScheduleOne(t *testing.T) { var gotForgetPod *v1.Pod var gotAssumedPod *v1.Pod var gotBinding *v1.Binding - sCache := &fakecache.Cache{ + cache := &fakecache.Cache{ ForgetFunc: func(pod *v1.Pod) { gotForgetPod = pod }, @@ -436,9 +436,9 @@ func TestSchedulerScheduleOne(t *testing.T) { defer cancel() s := &Scheduler{ - SchedulerCache: sCache, - Algorithm: item.algo, - client: client, + Cache: cache, + Algorithm: item.algo, + client: client, Error: func(p *framework.QueuedPodInfo, err error) { gotPod = p.Pod gotError = err @@ -881,7 +881,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { // queuedPodStore: pods queued before processing. // scache: scheduler cache that might contain assumed pods. -func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, cache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { bindingChan := make(chan *v1.Binding, 1) client := clientsetfake.NewSimpleClientset() client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { @@ -915,15 +915,15 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, s ) algo := NewGenericScheduler( - scache, + cache, internalcache.NewEmptySnapshot(), schedulerapi.DefaultPercentageOfNodesToScore, ) errChan := make(chan error, 1) sched := &Scheduler{ - SchedulerCache: scache, - Algorithm: algo, + Cache: cache, + Algorithm: algo, NextPod: func() *framework.QueuedPodInfo { return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))} }, @@ -1180,16 +1180,16 @@ func TestSchedulerBinding(t *testing.T) { } stop := make(chan struct{}) defer close(stop) - scache := internalcache.New(100*time.Millisecond, stop) + cache := internalcache.New(100*time.Millisecond, stop) algo := NewGenericScheduler( - scache, + cache, nil, 0, ) sched := Scheduler{ - Algorithm: algo, - Extenders: test.extenders, - SchedulerCache: scache, + Algorithm: algo, + Extenders: test.extenders, + Cache: cache, } err = sched.bind(context.Background(), fwk, pod, "node", nil) if err != nil { diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 61c793d8933..45b4d3f5469 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -168,7 +168,7 @@ func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, // createAndWaitForNodesInCache calls createNodes(), and wait for the created // nodes to be present in scheduler cache. func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) { - existingNodes := testCtx.Scheduler.SchedulerCache.NodeCount() + existingNodes := testCtx.Scheduler.Cache.NodeCount() nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes) if err != nil { return nodes, fmt.Errorf("cannot create nodes: %v", err) @@ -180,7 +180,7 @@ func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, // within 30 seconds; otherwise returns false. func waitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error { err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { - return sched.SchedulerCache.NodeCount() >= nodeCount, nil + return sched.Cache.NodeCount() >= nodeCount, nil }) if err != nil { return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err) @@ -432,7 +432,7 @@ func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisrupt // waitCachedPodsStable waits until scheduler cache has the given pods. func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error { return wait.Poll(time.Second, 30*time.Second, func() (bool, error) { - cachedPods, err := testCtx.Scheduler.SchedulerCache.PodCount() + cachedPods, err := testCtx.Scheduler.Cache.PodCount() if err != nil { return false, err } @@ -444,7 +444,7 @@ func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error if err1 != nil { return false, err1 } - cachedPod, err2 := testCtx.Scheduler.SchedulerCache.GetPod(actualPod) + cachedPod, err2 := testCtx.Scheduler.Cache.GetPod(actualPod) if err2 != nil || cachedPod == nil { return false, err2 } diff --git a/test/integration/util/util.go b/test/integration/util/util.go index fc3b30fbede..931d2d87de3 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -370,7 +370,7 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interf // WaitForSchedulerCacheCleanup waits for cleanup of scheduler's cache to complete func WaitForSchedulerCacheCleanup(sched *scheduler.Scheduler, t *testing.T) { schedulerCacheIsEmpty := func() (bool, error) { - dump := sched.SchedulerCache.Dump() + dump := sched.Cache.Dump() return len(dump.Nodes) == 0 && len(dump.AssumedPods) == 0, nil }