Merge pull request #106744 from BinacsLee/binacs/fix-race-condition-in-scheduler-eventhandler

scheduler: fix race condition during cache refresh
Kubernetes Prow Robot 2021-12-11 00:31:59 -08:00 committed by GitHub
commit 0cae5f5006
3 changed files with 56 additions and 71 deletions
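
The diffs below move all podStates / assumedPods bookkeeping into the cache's internal addPod and removePod helpers, so that AssumePod, ForgetPod, AddPod, UpdatePod, RemovePod and the assumed-pod expiry all mutate those maps through one code path while holding cache.mu. A rough sketch of that shape, using hypothetical types rather than the real scheduler ones:

package main

import (
	"fmt"
	"sync"
)

// miniCache stands in for schedulerCache: two maps that must never drift apart.
type miniCache struct {
	mu      sync.Mutex
	states  map[string]string   // analogous to podStates (key -> node)
	assumed map[string]struct{} // analogous to assumedPods
}

// add is the only writer of both maps, mirroring addPod(pod, assumePod).
func (c *miniCache) add(key, node string, assume bool) {
	c.states[key] = node
	if assume {
		c.assumed[key] = struct{}{}
	}
}

// remove is the only place both maps are cleared, mirroring removePod.
func (c *miniCache) remove(key string) {
	delete(c.states, key)
	delete(c.assumed, key)
}

func (c *miniCache) Assume(key, node string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.add(key, node, true)
}

func (c *miniCache) Forget(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.remove(key)
}

func main() {
	c := &miniCache{states: map[string]string{}, assumed: map[string]struct{}{}}
	c.Assume("pod-uid-1", "node-a")
	c.Forget("pod-uid-1")
	fmt.Println(len(c.states), len(c.assumed)) // 0 0: the maps stay in sync
}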

@@ -203,18 +203,10 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
 		klog.ErrorS(nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj)
 		return
 	}
-	// A Pod delete event followed by an immediate Pod add event may be merged
-	// into a Pod update event. In this case, we should invalidate the old Pod, and
-	// then add the new Pod.
-	if oldPod.UID != newPod.UID {
-		sched.deletePodFromCache(oldObj)
-		sched.addPodToCache(newObj)
-		return
-	}
 	klog.V(4).InfoS("Update event for scheduled pod", "pod", klog.KObj(oldPod))
 	if err := sched.SchedulerCache.UpdatePod(oldPod, newPod); err != nil {
-		klog.ErrorS(err, "Scheduler cache UpdatePod failed", "oldPod", klog.KObj(oldPod), "newPod", klog.KObj(newPod))
+		klog.ErrorS(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod))
 	}
 	sched.SchedulingQueue.AssignedPodUpdated(newPod)
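
This hunk drops the UID-mismatch special case from the event handler. Previously a delete immediately followed by an add, delivered as a single update event, was turned into two separate calls (deletePodFromCache, then addPodToCache), each acquiring and releasing the cache lock, which appears to be the window in which the expiry goroutine could interleave. After the change the handler always calls SchedulerCache.UpdatePod, and the cache's updatePod (remove old, then add new, under one lock acquisition) covers the common recreated-pod case because entries are keyed by the pod UID. A minimal sketch of that idea, with hypothetical types:

package main

import "fmt"

type pod struct{ uid, node string }

// cache is keyed by UID, like the scheduler cache's podStates map.
type cache map[string]pod

// update is remove-then-add, like schedulerCache.updatePod after this commit,
// so an "update" that really represents a delete+recreate still lands on the
// new UID and drops the old one.
func (c cache) update(oldPod, newPod pod) {
	delete(c, oldPod.uid)
	c[newPod.uid] = newPod
}

func main() {
	c := cache{"uid-1": {uid: "uid-1", node: "node-a"}}
	c.update(pod{uid: "uid-1"}, pod{uid: "uid-2", node: "node-a"})
	fmt.Println(len(c), c["uid-2"].node) // 1 node-a: only the recreated pod remains
}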

@@ -25,7 +25,7 @@ import (
 	"github.com/google/go-cmp/cmp"
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	storagev1beta1 "k8s.io/api/storage/v1beta1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -224,6 +224,12 @@ func TestUpdatePodInCache(t *testing.T) {
 			}
 			sched.addPodToCache(tt.oldObj)
 			sched.updatePodInCache(tt.oldObj, tt.newObj)
+			if tt.oldObj.(*v1.Pod).UID != tt.newObj.(*v1.Pod).UID {
+				if pod, err := sched.SchedulerCache.GetPod(tt.oldObj.(*v1.Pod)); err == nil {
+					t.Errorf("Get pod UID %v from SchedulerCache but it should not happen", pod.UID)
+				}
+			}
 			pod, err := sched.SchedulerCache.GetPod(tt.newObj.(*v1.Pod))
 			if err != nil {
 				t.Errorf("Failed to get pod from scheduler: %v", err)

@@ -359,13 +359,7 @@ func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
 		return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
 	}
-	cache.addPod(pod)
-	ps := &podState{
-		pod: pod,
-	}
-	cache.podStates[key] = ps
-	cache.assumedPods.Insert(key)
-	return nil
+	return cache.addPod(pod, true)
 }

 func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
@@ -406,23 +400,19 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
 		return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
 	}
-	switch {
 	// Only assumed pod can be forgotten.
-	case ok && cache.assumedPods.Has(key):
-		err := cache.removePod(pod)
-		if err != nil {
-			return err
-		}
-		delete(cache.assumedPods, key)
-		delete(cache.podStates, key)
-	default:
-		return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
+	if ok && cache.assumedPods.Has(key) {
+		return cache.removePod(pod)
 	}
-	return nil
+	return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
 }

 // Assumes that lock is already acquired.
-func (cache *schedulerCache) addPod(pod *v1.Pod) {
+func (cache *schedulerCache) addPod(pod *v1.Pod, assumePod bool) error {
+	key, err := framework.GetPodKey(pod)
+	if err != nil {
+		return err
+	}
 	n, ok := cache.nodes[pod.Spec.NodeName]
 	if !ok {
 		n = newNodeInfoListItem(framework.NewNodeInfo())
@@ -430,6 +420,14 @@
 	}
 	n.info.AddPod(pod)
 	cache.moveNodeInfoToHead(pod.Spec.NodeName)
+	ps := &podState{
+		pod: pod,
+	}
+	cache.podStates[key] = ps
+	if assumePod {
+		cache.assumedPods.Insert(key)
+	}
+	return nil
 }

 // Assumes that lock is already acquired.
@@ -437,8 +435,7 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
 	if err := cache.removePod(oldPod); err != nil {
 		return err
 	}
-	cache.addPod(newPod)
-	return nil
+	return cache.addPod(newPod, false)
 }

 // Assumes that lock is already acquired.
@@ -446,19 +443,27 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
 // removed and there are no more pods left in the node, cleans up the node from
 // the cache.
 func (cache *schedulerCache) removePod(pod *v1.Pod) error {
+	key, err := framework.GetPodKey(pod)
+	if err != nil {
+		return err
+	}
 	n, ok := cache.nodes[pod.Spec.NodeName]
 	if !ok {
 		klog.ErrorS(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "pod", klog.KObj(pod))
-		return nil
-	}
-	if err := n.info.RemovePod(pod); err != nil {
-		return err
-	}
-	if len(n.info.Pods) == 0 && n.info.Node() == nil {
-		cache.removeNodeInfoFromList(pod.Spec.NodeName)
-	} else {
-		cache.moveNodeInfoToHead(pod.Spec.NodeName)
+	} else {
+		if err := n.info.RemovePod(pod); err != nil {
+			return err
+		}
+		if len(n.info.Pods) == 0 && n.info.Node() == nil {
+			cache.removeNodeInfoFromList(pod.Spec.NodeName)
+		} else {
+			cache.moveNodeInfoToHead(pod.Spec.NodeName)
+		}
 	}
+	delete(cache.podStates, key)
+	delete(cache.assumedPods, key)
 	return nil
 }
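
Two behavioural points in this removePod hunk: the function no longer returns early when the pod's node is missing from the cache, and it now always clears the podStates and assumedPods entries itself, so a pod whose node has already been cleaned up still gets its bookkeeping removed. A small sketch of that pattern (hypothetical types):

package main

import "fmt"

type store struct {
	nodes   map[string][]string // node name -> pod keys, stand-in for nodeInfo
	states  map[string]bool     // stand-in for podStates
	assumed map[string]bool     // stand-in for assumedPods
}

func (s *store) removePod(node, key string) {
	// Best-effort update of the per-node view; a missing node is only logged
	// in the real code, not treated as fatal.
	if pods, ok := s.nodes[node]; ok {
		kept := pods[:0]
		for _, p := range pods {
			if p != key {
				kept = append(kept, p)
			}
		}
		s.nodes[node] = kept
	}
	// Bookkeeping is cleared unconditionally, like the delete() calls added above.
	delete(s.states, key)
	delete(s.assumed, key)
}

func main() {
	s := &store{
		nodes:   map[string][]string{},
		states:  map[string]bool{"p1": true},
		assumed: map[string]bool{"p1": true},
	}
	s.removePod("already-deleted-node", "p1")
	fmt.Println(len(s.states), len(s.assumed)) // 0 0: no stale entries left behind
}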
@@ -477,22 +482,19 @@ func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
 		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
 			// The pod was added to a different node than it was assumed to.
 			klog.InfoS("Pod was added to a different node than it was assumed", "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName))
-			// Clean this up.
-			if err = cache.removePod(currState.pod); err != nil {
-				klog.ErrorS(err, "Error occurred while removing pod")
+			if err = cache.updatePod(currState.pod, pod); err != nil {
+				klog.ErrorS(err, "Error occurred while updating pod")
 			}
-			cache.addPod(pod)
+		} else {
+			delete(cache.assumedPods, key)
+			cache.podStates[key].deadline = nil
+			cache.podStates[key].pod = pod
 		}
-		delete(cache.assumedPods, key)
-		cache.podStates[key].deadline = nil
-		cache.podStates[key].pod = pod
 	case !ok:
 		// Pod was expired. We should add it back.
-		cache.addPod(pod)
-		ps := &podState{
-			pod: pod,
+		if err = cache.addPod(pod, false); err != nil {
+			klog.ErrorS(err, "Error occurred while adding pod")
 		}
-		cache.podStates[key] = ps
 	default:
 		return fmt.Errorf("pod %v was already in added state", key)
 	}
@@ -509,23 +511,17 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
 	defer cache.mu.Unlock()
 	currState, ok := cache.podStates[key]
-	switch {
 	// An assumed pod won't have Update/Remove event. It needs to have Add event
 	// before Update event, in which case the state would change from Assumed to Added.
-	case ok && !cache.assumedPods.Has(key):
+	if ok && !cache.assumedPods.Has(key) {
 		if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
 			klog.ErrorS(nil, "Pod updated on a different node than previously added to", "pod", klog.KObj(oldPod))
 			klog.ErrorS(nil, "SchedulerCache is corrupted and can badly affect scheduling decisions")
 			os.Exit(1)
 		}
-		if err := cache.updatePod(oldPod, newPod); err != nil {
-			return err
-		}
-		currState.pod = newPod
-	default:
-		return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
+		return cache.updatePod(oldPod, newPod)
 	}
-	return nil
+	return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
 }

 func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
@@ -549,7 +545,7 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
 				os.Exit(1)
 			}
 		}
-		return cache.expirePod(key, currState)
+		return cache.removePod(currState.pod)
 	default:
 		return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
 	}
@@ -736,22 +732,13 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
 		}
 		if now.After(*ps.deadline) {
 			klog.InfoS("Pod expired", "pod", klog.KObj(ps.pod))
-			if err := cache.expirePod(key, ps); err != nil {
+			if err := cache.removePod(ps.pod); err != nil {
 				klog.ErrorS(err, "ExpirePod failed", "pod", klog.KObj(ps.pod))
 			}
 		}
 	}
 }

-func (cache *schedulerCache) expirePod(key string, ps *podState) error {
-	if err := cache.removePod(ps.pod); err != nil {
-		return err
-	}
-	delete(cache.assumedPods, key)
-	delete(cache.podStates, key)
-	return nil
-}

 // updateMetrics updates cache size metric values for pods, assumed pods, and nodes
 func (cache *schedulerCache) updateMetrics() {
 	metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
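
With expirePod deleted, the expiry path in cleanupAssumedPods and the event-driven RemovePod / ForgetPod paths all end in the same removePod, under the same cache.mu, so there is no longer a second, slightly different cleanup routine that has to be kept in step by hand. A compact sketch of that arrangement (hypothetical types):

package main

import (
	"fmt"
	"sync"
	"time"
)

type expiringCache struct {
	mu       sync.Mutex
	deadline map[string]time.Time // stand-in for podState.deadline
	states   map[string]bool      // stand-in for podStates
	assumed  map[string]bool      // stand-in for assumedPods
}

// remove is the single cleanup routine shared by both paths below.
func (c *expiringCache) remove(key string) {
	delete(c.states, key)
	delete(c.assumed, key)
	delete(c.deadline, key)
}

// RemovePod models the event-driven path.
func (c *expiringCache) RemovePod(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.remove(key)
}

// cleanupAssumed models the periodic expiry path; it uses the same remove.
func (c *expiringCache) cleanupAssumed(now time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for key, d := range c.deadline {
		if now.After(d) {
			c.remove(key)
		}
	}
}

func main() {
	c := &expiringCache{
		deadline: map[string]time.Time{"p1": time.Now().Add(-time.Minute)},
		states:   map[string]bool{"p1": true},
		assumed:  map[string]bool{"p1": true},
	}
	c.cleanupAssumed(time.Now())
	c.RemovePod("p1") // already expired; deleting absent keys is a no-op
	fmt.Println(len(c.states), len(c.assumed), len(c.deadline)) // 0 0 0
}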