mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
refactor: rename schedulerCache to cacheImpl in internal cache
Signed-off-by: kerthcet <kerthcet@gmail.com>
This commit is contained in:
parent
400b1dea17
commit
09623be0b1
70
pkg/scheduler/internal/cache/cache.go
vendored
70
pkg/scheduler/internal/cache/cache.go
vendored
@ -53,7 +53,7 @@ type nodeInfoListItem struct {
|
||||
prev *nodeInfoListItem
|
||||
}
|
||||
|
||||
type schedulerCache struct {
|
||||
type cacheImpl struct {
|
||||
stop <-chan struct{}
|
||||
ttl time.Duration
|
||||
period time.Duration
|
||||
@ -90,15 +90,15 @@ type imageState struct {
|
||||
}
|
||||
|
||||
// createImageStateSummary returns a summarizing snapshot of the given image's state.
|
||||
func (cache *schedulerCache) createImageStateSummary(state *imageState) *framework.ImageStateSummary {
|
||||
func (cache *cacheImpl) createImageStateSummary(state *imageState) *framework.ImageStateSummary {
|
||||
return &framework.ImageStateSummary{
|
||||
Size: state.size,
|
||||
NumNodes: len(state.nodes),
|
||||
}
|
||||
}
|
||||
|
||||
func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
|
||||
return &schedulerCache{
|
||||
func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *cacheImpl {
|
||||
return &cacheImpl{
|
||||
ttl: ttl,
|
||||
period: period,
|
||||
stop: stop,
|
||||
@ -121,7 +121,7 @@ func newNodeInfoListItem(ni *framework.NodeInfo) *nodeInfoListItem {
|
||||
// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly
|
||||
// linked list. The head is the most recently updated NodeInfo.
|
||||
// We assume cache lock is already acquired.
|
||||
func (cache *schedulerCache) moveNodeInfoToHead(name string) {
|
||||
func (cache *cacheImpl) moveNodeInfoToHead(name string) {
|
||||
ni, ok := cache.nodes[name]
|
||||
if !ok {
|
||||
klog.ErrorS(nil, "No node info with given name found in the cache", "node", klog.KRef("", name))
|
||||
@ -149,7 +149,7 @@ func (cache *schedulerCache) moveNodeInfoToHead(name string) {
|
||||
// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
|
||||
// linked list.
|
||||
// We assume cache lock is already acquired.
|
||||
func (cache *schedulerCache) removeNodeInfoFromList(name string) {
|
||||
func (cache *cacheImpl) removeNodeInfoFromList(name string) {
|
||||
ni, ok := cache.nodes[name]
|
||||
if !ok {
|
||||
klog.ErrorS(nil, "No node info with given name found in the cache", "node", klog.KRef("", name))
|
||||
@ -173,7 +173,7 @@ func (cache *schedulerCache) removeNodeInfoFromList(name string) {
|
||||
// debugging purposes only and shouldn't be confused with UpdateSnapshot
|
||||
// function.
|
||||
// This method is expensive, and should be only used in non-critical path.
|
||||
func (cache *schedulerCache) Dump() *Dump {
|
||||
func (cache *cacheImpl) Dump() *Dump {
|
||||
cache.mu.RLock()
|
||||
defer cache.mu.RUnlock()
|
||||
|
||||
@ -194,7 +194,7 @@ func (cache *schedulerCache) Dump() *Dump {
|
||||
// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
|
||||
// This function tracks generation number of NodeInfo and updates only the
|
||||
// entries of an existing snapshot that have changed after the snapshot was taken.
|
||||
func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
|
||||
func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
|
||||
cache.mu.Lock()
|
||||
defer cache.mu.Unlock()
|
||||
|
||||
@ -275,7 +275,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
|
||||
func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
|
||||
snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
|
||||
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
|
||||
if updateAll {
|
||||
@ -311,7 +311,7 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda
|
||||
}
|
||||
|
||||
// If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot.
|
||||
func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
|
||||
func (cache *cacheImpl) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
|
||||
toDelete := len(snapshot.nodeInfoMap) - cache.nodeTree.numNodes
|
||||
for name := range snapshot.nodeInfoMap {
|
||||
if toDelete <= 0 {
|
||||
@ -326,7 +326,7 @@ func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot)
|
||||
|
||||
// NodeCount returns the number of nodes in the cache.
|
||||
// DO NOT use outside of tests.
|
||||
func (cache *schedulerCache) NodeCount() int {
|
||||
func (cache *cacheImpl) NodeCount() int {
|
||||
cache.mu.RLock()
|
||||
defer cache.mu.RUnlock()
|
||||
return len(cache.nodes)
|
||||
@ -334,7 +334,7 @@ func (cache *schedulerCache) NodeCount() int {
|
||||
|
||||
// PodCount returns the number of pods in the cache (including those from deleted nodes).
|
||||
// DO NOT use outside of tests.
|
||||
func (cache *schedulerCache) PodCount() (int, error) {
|
||||
func (cache *cacheImpl) PodCount() (int, error) {
|
||||
cache.mu.RLock()
|
||||
defer cache.mu.RUnlock()
|
||||
// podFilter is expected to return true for most or all of the pods. We
|
||||
@ -347,7 +347,7 @@ func (cache *schedulerCache) PodCount() (int, error) {
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
|
||||
func (cache *cacheImpl) AssumePod(pod *v1.Pod) error {
|
||||
key, err := framework.GetPodKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -362,12 +362,12 @@ func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
|
||||
return cache.addPod(pod, true)
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
|
||||
func (cache *cacheImpl) FinishBinding(pod *v1.Pod) error {
|
||||
return cache.finishBinding(pod, time.Now())
|
||||
}
|
||||
|
||||
// finishBinding exists to make tests determinitistic by injecting now as an argument
|
||||
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
|
||||
func (cache *cacheImpl) finishBinding(pod *v1.Pod, now time.Time) error {
|
||||
key, err := framework.GetPodKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -386,7 +386,7 @@ func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
|
||||
func (cache *cacheImpl) ForgetPod(pod *v1.Pod) error {
|
||||
key, err := framework.GetPodKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -408,7 +408,7 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
|
||||
}
|
||||
|
||||
// Assumes that lock is already acquired.
|
||||
func (cache *schedulerCache) addPod(pod *v1.Pod, assumePod bool) error {
|
||||
func (cache *cacheImpl) addPod(pod *v1.Pod, assumePod bool) error {
|
||||
key, err := framework.GetPodKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -431,7 +431,7 @@ func (cache *schedulerCache) addPod(pod *v1.Pod, assumePod bool) error {
|
||||
}
|
||||
|
||||
// Assumes that lock is already acquired.
|
||||
func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
|
||||
func (cache *cacheImpl) updatePod(oldPod, newPod *v1.Pod) error {
|
||||
if err := cache.removePod(oldPod); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -442,7 +442,7 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
|
||||
// Removes a pod from the cached node info. If the node information was already
|
||||
// removed and there are no more pods left in the node, cleans up the node from
|
||||
// the cache.
|
||||
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
|
||||
func (cache *cacheImpl) removePod(pod *v1.Pod) error {
|
||||
key, err := framework.GetPodKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -467,7 +467,7 @@ func (cache *schedulerCache) removePod(pod *v1.Pod) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
|
||||
func (cache *cacheImpl) AddPod(pod *v1.Pod) error {
|
||||
key, err := framework.GetPodKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -501,7 +501,7 @@ func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
|
||||
func (cache *cacheImpl) UpdatePod(oldPod, newPod *v1.Pod) error {
|
||||
key, err := framework.GetPodKey(oldPod)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -516,7 +516,7 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
|
||||
if ok && !cache.assumedPods.Has(key) {
|
||||
if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
|
||||
klog.ErrorS(nil, "Pod updated on a different node than previously added to", "pod", klog.KObj(oldPod))
|
||||
klog.ErrorS(nil, "SchedulerCache is corrupted and can badly affect scheduling decisions")
|
||||
klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions")
|
||||
os.Exit(1)
|
||||
}
|
||||
return cache.updatePod(oldPod, newPod)
|
||||
@ -524,7 +524,7 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
|
||||
return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
|
||||
func (cache *cacheImpl) RemovePod(pod *v1.Pod) error {
|
||||
key, err := framework.GetPodKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -542,14 +542,14 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
|
||||
if pod.Spec.NodeName != "" {
|
||||
// An empty NodeName is possible when the scheduler misses a Delete
|
||||
// event and it gets the last known state from the informer cache.
|
||||
klog.ErrorS(nil, "SchedulerCache is corrupted and can badly affect scheduling decisions")
|
||||
klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions")
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
return cache.removePod(currState.pod)
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
|
||||
func (cache *cacheImpl) IsAssumedPod(pod *v1.Pod) (bool, error) {
|
||||
key, err := framework.GetPodKey(pod)
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -563,7 +563,7 @@ func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
|
||||
|
||||
// GetPod might return a pod for which its node has already been deleted from
|
||||
// the main cache. This is useful to properly process pod update events.
|
||||
func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
|
||||
func (cache *cacheImpl) GetPod(pod *v1.Pod) (*v1.Pod, error) {
|
||||
key, err := framework.GetPodKey(pod)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -580,7 +580,7 @@ func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
|
||||
return podState.pod, nil
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo {
|
||||
func (cache *cacheImpl) AddNode(node *v1.Node) *framework.NodeInfo {
|
||||
cache.mu.Lock()
|
||||
defer cache.mu.Unlock()
|
||||
|
||||
@ -599,7 +599,7 @@ func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo {
|
||||
return n.info.Clone()
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo {
|
||||
func (cache *cacheImpl) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo {
|
||||
cache.mu.Lock()
|
||||
defer cache.mu.Unlock()
|
||||
|
||||
@ -625,7 +625,7 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) *framework.No
|
||||
// the source of truth.
|
||||
// However, we keep a ghost node with the list of pods until all pod deletion
|
||||
// events have arrived. A ghost node is skipped from snapshots.
|
||||
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
|
||||
func (cache *cacheImpl) RemoveNode(node *v1.Node) error {
|
||||
cache.mu.Lock()
|
||||
defer cache.mu.Unlock()
|
||||
|
||||
@ -652,7 +652,7 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
|
||||
|
||||
// addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in
|
||||
// scheduler cache. This function assumes the lock to scheduler cache has been acquired.
|
||||
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *framework.NodeInfo) {
|
||||
func (cache *cacheImpl) addNodeImageStates(node *v1.Node, nodeInfo *framework.NodeInfo) {
|
||||
newSum := make(map[string]*framework.ImageStateSummary)
|
||||
|
||||
for _, image := range node.Status.Images {
|
||||
@ -680,7 +680,7 @@ func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *framewo
|
||||
// removeNodeImageStates removes the given node record from image entries having the node
|
||||
// in imageStates cache. After the removal, if any image becomes free, i.e., the image
|
||||
// is no longer available on any node, the image entry will be removed from imageStates.
|
||||
func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
|
||||
func (cache *cacheImpl) removeNodeImageStates(node *v1.Node) {
|
||||
if node == nil {
|
||||
return
|
||||
}
|
||||
@ -701,17 +701,17 @@ func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
|
||||
}
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) run() {
|
||||
func (cache *cacheImpl) run() {
|
||||
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
|
||||
func (cache *cacheImpl) cleanupExpiredAssumedPods() {
|
||||
cache.cleanupAssumedPods(time.Now())
|
||||
}
|
||||
|
||||
// cleanupAssumedPods exists for making test deterministic by taking time as input argument.
|
||||
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
|
||||
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
|
||||
func (cache *cacheImpl) cleanupAssumedPods(now time.Time) {
|
||||
cache.mu.Lock()
|
||||
defer cache.mu.Unlock()
|
||||
defer cache.updateMetrics()
|
||||
@ -738,7 +738,7 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
|
||||
}
|
||||
|
||||
// updateMetrics updates cache size metric values for pods, assumed pods, and nodes
|
||||
func (cache *schedulerCache) updateMetrics() {
|
||||
func (cache *cacheImpl) updateMetrics() {
|
||||
metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
|
||||
metrics.CacheSize.WithLabelValues("pods").Set(float64(len(cache.podStates)))
|
||||
metrics.CacheSize.WithLabelValues("nodes").Set(float64(len(cache.nodes)))
|
||||
|
16
pkg/scheduler/internal/cache/cache_test.go
vendored
16
pkg/scheduler/internal/cache/cache_test.go
vendored
@ -233,7 +233,7 @@ type testExpirePodStruct struct {
|
||||
assumedTime time.Time
|
||||
}
|
||||
|
||||
func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time.Time) error {
|
||||
func assumeAndFinishBinding(cache *cacheImpl, pod *v1.Pod, assumedTime time.Time) error {
|
||||
if err := cache.AssumePod(pod); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1087,7 +1087,7 @@ func TestNodeOperators(t *testing.T) {
|
||||
// Generations are globally unique. We check in our unit tests that they are incremented correctly.
|
||||
expected.Generation = got.info.Generation
|
||||
if !reflect.DeepEqual(got.info, expected) {
|
||||
t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
|
||||
t.Errorf("Failed to add node into scheduler cache:\n got: %+v \nexpected: %+v", got, expected)
|
||||
}
|
||||
|
||||
// Step 2: dump cached nodes successfully.
|
||||
@ -1239,7 +1239,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
|
||||
podsWithAffinity = append(podsWithAffinity, pod)
|
||||
}
|
||||
|
||||
var cache *schedulerCache
|
||||
var cache *cacheImpl
|
||||
var snapshot *Snapshot
|
||||
type operation = func(t *testing.T)
|
||||
|
||||
@ -1487,7 +1487,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *schedulerCache, snapshot *Snapshot) error {
|
||||
func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot *Snapshot) error {
|
||||
// Compare the map.
|
||||
if len(snapshot.nodeInfoMap) != cache.nodeTree.numNodes {
|
||||
return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", cache.nodeTree.numNodes, len(snapshot.nodeInfoMap))
|
||||
@ -1561,7 +1561,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
var cache *schedulerCache
|
||||
var cache *cacheImpl
|
||||
var snapshot *Snapshot
|
||||
|
||||
addNode := func(t *testing.T, i int) {
|
||||
@ -1770,7 +1770,7 @@ func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
|
||||
return cache
|
||||
}
|
||||
|
||||
func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *schedulerCache {
|
||||
func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *cacheImpl {
|
||||
cache := newSchedulerCache(time.Second, time.Second, nil)
|
||||
for i := 0; i < podNum; i++ {
|
||||
nodeName := fmt.Sprintf("node-%d", i/10)
|
||||
@ -1785,7 +1785,7 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time)
|
||||
return cache
|
||||
}
|
||||
|
||||
func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
|
||||
func isForgottenFromCache(p *v1.Pod, c *cacheImpl) error {
|
||||
if assumed, err := c.IsAssumedPod(p); err != nil {
|
||||
return err
|
||||
} else if assumed {
|
||||
@ -1798,7 +1798,7 @@ func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
|
||||
}
|
||||
|
||||
// getNodeInfo returns cached data for the node name.
|
||||
func (cache *schedulerCache) getNodeInfo(nodeName string) (*v1.Node, error) {
|
||||
func (cache *cacheImpl) getNodeInfo(nodeName string) (*v1.Node, error) {
|
||||
cache.mu.RLock()
|
||||
defer cache.mu.RUnlock()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user