diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 67df0c96bd5..6558e6a31b5 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -203,18 +203,10 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { klog.ErrorS(nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj) return } - - // A Pod delete event followed by an immediate Pod add event may be merged - // into a Pod update event. In this case, we should invalidate the old Pod, and - // then add the new Pod. - if oldPod.UID != newPod.UID { - sched.deletePodFromCache(oldObj) - sched.addPodToCache(newObj) - return - } + klog.V(4).InfoS("Update event for scheduled pod", "pod", klog.KObj(oldPod)) if err := sched.SchedulerCache.UpdatePod(oldPod, newPod); err != nil { - klog.ErrorS(err, "Scheduler cache UpdatePod failed", "oldPod", klog.KObj(oldPod), "newPod", klog.KObj(newPod)) + klog.ErrorS(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod)) } sched.SchedulingQueue.AssignedPodUpdated(newPod) diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index 31bf9b666c8..0f53940fd9f 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -25,7 +25,7 @@ import ( "github.com/google/go-cmp/cmp" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -224,6 +224,12 @@ func TestUpdatePodInCache(t *testing.T) { } sched.addPodToCache(tt.oldObj) sched.updatePodInCache(tt.oldObj, tt.newObj) + + if tt.oldObj.(*v1.Pod).UID != tt.newObj.(*v1.Pod).UID { + if pod, err := sched.SchedulerCache.GetPod(tt.oldObj.(*v1.Pod)); err == nil { + t.Errorf("Get pod UID %v from SchedulerCache but it should not happen", pod.UID) + } + } pod, err := sched.SchedulerCache.GetPod(tt.newObj.(*v1.Pod)) if err != nil { t.Errorf("Failed to get pod from scheduler: %v", err) diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index 9519cc85e45..3ace2396c4f 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -359,13 +359,7 @@ func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { return fmt.Errorf("pod %v is in the cache, so can't be assumed", key) } - cache.addPod(pod) - ps := &podState{ - pod: pod, - } - cache.podStates[key] = ps - cache.assumedPods.Insert(key) - return nil + return cache.addPod(pod, true) } func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error { @@ -406,23 +400,19 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error { return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName) } - switch { // Only assumed pod can be forgotten. - case ok && cache.assumedPods.Has(key): - err := cache.removePod(pod) - if err != nil { - return err - } - delete(cache.assumedPods, key) - delete(cache.podStates, key) - default: - return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key) + if ok && cache.assumedPods.Has(key) { + return cache.removePod(pod) } - return nil + return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key) } // Assumes that lock is already acquired. -func (cache *schedulerCache) addPod(pod *v1.Pod) { +func (cache *schedulerCache) addPod(pod *v1.Pod, assumePod bool) error { + key, err := framework.GetPodKey(pod) + if err != nil { + return err + } n, ok := cache.nodes[pod.Spec.NodeName] if !ok { n = newNodeInfoListItem(framework.NewNodeInfo()) @@ -430,6 +420,14 @@ func (cache *schedulerCache) addPod(pod *v1.Pod) { } n.info.AddPod(pod) cache.moveNodeInfoToHead(pod.Spec.NodeName) + ps := &podState{ + pod: pod, + } + cache.podStates[key] = ps + if assumePod { + cache.assumedPods.Insert(key) + } + return nil } // Assumes that lock is already acquired. @@ -437,8 +435,7 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error { if err := cache.removePod(oldPod); err != nil { return err } - cache.addPod(newPod) - return nil + return cache.addPod(newPod, false) } // Assumes that lock is already acquired. @@ -446,19 +443,27 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error { // removed and there are no more pods left in the node, cleans up the node from // the cache. func (cache *schedulerCache) removePod(pod *v1.Pod) error { + key, err := framework.GetPodKey(pod) + if err != nil { + return err + } + n, ok := cache.nodes[pod.Spec.NodeName] if !ok { klog.ErrorS(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "pod", klog.KObj(pod)) - return nil - } - if err := n.info.RemovePod(pod); err != nil { - return err - } - if len(n.info.Pods) == 0 && n.info.Node() == nil { - cache.removeNodeInfoFromList(pod.Spec.NodeName) } else { - cache.moveNodeInfoToHead(pod.Spec.NodeName) + if err := n.info.RemovePod(pod); err != nil { + return err + } + if len(n.info.Pods) == 0 && n.info.Node() == nil { + cache.removeNodeInfoFromList(pod.Spec.NodeName) + } else { + cache.moveNodeInfoToHead(pod.Spec.NodeName) + } } + + delete(cache.podStates, key) + delete(cache.assumedPods, key) return nil } @@ -477,22 +482,19 @@ func (cache *schedulerCache) AddPod(pod *v1.Pod) error { if currState.pod.Spec.NodeName != pod.Spec.NodeName { // The pod was added to a different node than it was assumed to. klog.InfoS("Pod was added to a different node than it was assumed", "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName)) - // Clean this up. - if err = cache.removePod(currState.pod); err != nil { - klog.ErrorS(err, "Error occurred while removing pod") + if err = cache.updatePod(currState.pod, pod); err != nil { + klog.ErrorS(err, "Error occurred while updating pod") } - cache.addPod(pod) + } else { + delete(cache.assumedPods, key) + cache.podStates[key].deadline = nil + cache.podStates[key].pod = pod } - delete(cache.assumedPods, key) - cache.podStates[key].deadline = nil - cache.podStates[key].pod = pod case !ok: // Pod was expired. We should add it back. - cache.addPod(pod) - ps := &podState{ - pod: pod, + if err = cache.addPod(pod, false); err != nil { + klog.ErrorS(err, "Error occurred while adding pod") } - cache.podStates[key] = ps default: return fmt.Errorf("pod %v was already in added state", key) } @@ -509,23 +511,17 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error { defer cache.mu.Unlock() currState, ok := cache.podStates[key] - switch { // An assumed pod won't have Update/Remove event. It needs to have Add event // before Update event, in which case the state would change from Assumed to Added. - case ok && !cache.assumedPods.Has(key): + if ok && !cache.assumedPods.Has(key) { if currState.pod.Spec.NodeName != newPod.Spec.NodeName { klog.ErrorS(nil, "Pod updated on a different node than previously added to", "pod", klog.KObj(oldPod)) klog.ErrorS(nil, "SchedulerCache is corrupted and can badly affect scheduling decisions") os.Exit(1) } - if err := cache.updatePod(oldPod, newPod); err != nil { - return err - } - currState.pod = newPod - default: - return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key) + return cache.updatePod(oldPod, newPod) } - return nil + return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key) } func (cache *schedulerCache) RemovePod(pod *v1.Pod) error { @@ -549,7 +545,7 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error { os.Exit(1) } } - return cache.expirePod(key, currState) + return cache.removePod(currState.pod) default: return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key) } @@ -736,22 +732,13 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) { } if now.After(*ps.deadline) { klog.InfoS("Pod expired", "pod", klog.KObj(ps.pod)) - if err := cache.expirePod(key, ps); err != nil { + if err := cache.removePod(ps.pod); err != nil { klog.ErrorS(err, "ExpirePod failed", "pod", klog.KObj(ps.pod)) } } } } -func (cache *schedulerCache) expirePod(key string, ps *podState) error { - if err := cache.removePod(ps.pod); err != nil { - return err - } - delete(cache.assumedPods, key) - delete(cache.podStates, key) - return nil -} - // updateMetrics updates cache size metric values for pods, assumed pods, and nodes func (cache *schedulerCache) updateMetrics() { metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))