mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
scheduler: fix race condition during cache refresh
This commit is contained in:
parent
c1153d3353
commit
1027b8de40
@ -203,18 +203,10 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
|
||||
klog.ErrorS(nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj)
|
||||
return
|
||||
}
|
||||
|
||||
// A Pod delete event followed by an immediate Pod add event may be merged
|
||||
// into a Pod update event. In this case, we should invalidate the old Pod, and
|
||||
// then add the new Pod.
|
||||
if oldPod.UID != newPod.UID {
|
||||
sched.deletePodFromCache(oldObj)
|
||||
sched.addPodToCache(newObj)
|
||||
return
|
||||
}
|
||||
klog.V(4).InfoS("Update event for scheduled pod", "pod", klog.KObj(oldPod))
|
||||
|
||||
if err := sched.SchedulerCache.UpdatePod(oldPod, newPod); err != nil {
|
||||
klog.ErrorS(err, "Scheduler cache UpdatePod failed", "oldPod", klog.KObj(oldPod), "newPod", klog.KObj(newPod))
|
||||
klog.ErrorS(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod))
|
||||
}
|
||||
|
||||
sched.SchedulingQueue.AssignedPodUpdated(newPod)
|
||||
|
@ -25,7 +25,7 @@ import (
|
||||
"github.com/google/go-cmp/cmp"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storagev1beta1 "k8s.io/api/storage/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -224,6 +224,12 @@ func TestUpdatePodInCache(t *testing.T) {
|
||||
}
|
||||
sched.addPodToCache(tt.oldObj)
|
||||
sched.updatePodInCache(tt.oldObj, tt.newObj)
|
||||
|
||||
if tt.oldObj.(*v1.Pod).UID != tt.newObj.(*v1.Pod).UID {
|
||||
if pod, err := sched.SchedulerCache.GetPod(tt.oldObj.(*v1.Pod)); err == nil {
|
||||
t.Errorf("Get pod UID %v from SchedulerCache but it should not happen", pod.UID)
|
||||
}
|
||||
}
|
||||
pod, err := sched.SchedulerCache.GetPod(tt.newObj.(*v1.Pod))
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get pod from scheduler: %v", err)
|
||||
|
107
pkg/scheduler/internal/cache/cache.go
vendored
107
pkg/scheduler/internal/cache/cache.go
vendored
@ -359,13 +359,7 @@ func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
|
||||
return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
|
||||
}
|
||||
|
||||
cache.addPod(pod)
|
||||
ps := &podState{
|
||||
pod: pod,
|
||||
}
|
||||
cache.podStates[key] = ps
|
||||
cache.assumedPods.Insert(key)
|
||||
return nil
|
||||
return cache.addPod(pod, true)
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
|
||||
@ -406,23 +400,19 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
|
||||
return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
|
||||
}
|
||||
|
||||
switch {
|
||||
// Only assumed pod can be forgotten.
|
||||
case ok && cache.assumedPods.Has(key):
|
||||
err := cache.removePod(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
delete(cache.assumedPods, key)
|
||||
delete(cache.podStates, key)
|
||||
default:
|
||||
return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
|
||||
if ok && cache.assumedPods.Has(key) {
|
||||
return cache.removePod(pod)
|
||||
}
|
||||
return nil
|
||||
return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
|
||||
}
|
||||
|
||||
// Assumes that lock is already acquired.
|
||||
func (cache *schedulerCache) addPod(pod *v1.Pod) {
|
||||
func (cache *schedulerCache) addPod(pod *v1.Pod, assumePod bool) error {
|
||||
key, err := framework.GetPodKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n, ok := cache.nodes[pod.Spec.NodeName]
|
||||
if !ok {
|
||||
n = newNodeInfoListItem(framework.NewNodeInfo())
|
||||
@ -430,6 +420,14 @@ func (cache *schedulerCache) addPod(pod *v1.Pod) {
|
||||
}
|
||||
n.info.AddPod(pod)
|
||||
cache.moveNodeInfoToHead(pod.Spec.NodeName)
|
||||
ps := &podState{
|
||||
pod: pod,
|
||||
}
|
||||
cache.podStates[key] = ps
|
||||
if assumePod {
|
||||
cache.assumedPods.Insert(key)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Assumes that lock is already acquired.
|
||||
@ -437,8 +435,7 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
|
||||
if err := cache.removePod(oldPod); err != nil {
|
||||
return err
|
||||
}
|
||||
cache.addPod(newPod)
|
||||
return nil
|
||||
return cache.addPod(newPod, false)
|
||||
}
|
||||
|
||||
// Assumes that lock is already acquired.
|
||||
@ -446,19 +443,27 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
|
||||
// removed and there are no more pods left in the node, cleans up the node from
|
||||
// the cache.
|
||||
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
|
||||
key, err := framework.GetPodKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n, ok := cache.nodes[pod.Spec.NodeName]
|
||||
if !ok {
|
||||
klog.ErrorS(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "pod", klog.KObj(pod))
|
||||
return nil
|
||||
}
|
||||
if err := n.info.RemovePod(pod); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(n.info.Pods) == 0 && n.info.Node() == nil {
|
||||
cache.removeNodeInfoFromList(pod.Spec.NodeName)
|
||||
} else {
|
||||
cache.moveNodeInfoToHead(pod.Spec.NodeName)
|
||||
if err := n.info.RemovePod(pod); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(n.info.Pods) == 0 && n.info.Node() == nil {
|
||||
cache.removeNodeInfoFromList(pod.Spec.NodeName)
|
||||
} else {
|
||||
cache.moveNodeInfoToHead(pod.Spec.NodeName)
|
||||
}
|
||||
}
|
||||
|
||||
delete(cache.podStates, key)
|
||||
delete(cache.assumedPods, key)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -477,22 +482,19 @@ func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
|
||||
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
|
||||
// The pod was added to a different node than it was assumed to.
|
||||
klog.InfoS("Pod was added to a different node than it was assumed", "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName))
|
||||
// Clean this up.
|
||||
if err = cache.removePod(currState.pod); err != nil {
|
||||
klog.ErrorS(err, "Error occurred while removing pod")
|
||||
if err = cache.updatePod(currState.pod, pod); err != nil {
|
||||
klog.ErrorS(err, "Error occurred while updating pod")
|
||||
}
|
||||
cache.addPod(pod)
|
||||
} else {
|
||||
delete(cache.assumedPods, key)
|
||||
cache.podStates[key].deadline = nil
|
||||
cache.podStates[key].pod = pod
|
||||
}
|
||||
delete(cache.assumedPods, key)
|
||||
cache.podStates[key].deadline = nil
|
||||
cache.podStates[key].pod = pod
|
||||
case !ok:
|
||||
// Pod was expired. We should add it back.
|
||||
cache.addPod(pod)
|
||||
ps := &podState{
|
||||
pod: pod,
|
||||
if err = cache.addPod(pod, false); err != nil {
|
||||
klog.ErrorS(err, "Error occurred while adding pod")
|
||||
}
|
||||
cache.podStates[key] = ps
|
||||
default:
|
||||
return fmt.Errorf("pod %v was already in added state", key)
|
||||
}
|
||||
@ -509,23 +511,17 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
|
||||
defer cache.mu.Unlock()
|
||||
|
||||
currState, ok := cache.podStates[key]
|
||||
switch {
|
||||
// An assumed pod won't have Update/Remove event. It needs to have Add event
|
||||
// before Update event, in which case the state would change from Assumed to Added.
|
||||
case ok && !cache.assumedPods.Has(key):
|
||||
if ok && !cache.assumedPods.Has(key) {
|
||||
if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
|
||||
klog.ErrorS(nil, "Pod updated on a different node than previously added to", "pod", klog.KObj(oldPod))
|
||||
klog.ErrorS(nil, "SchedulerCache is corrupted and can badly affect scheduling decisions")
|
||||
os.Exit(1)
|
||||
}
|
||||
if err := cache.updatePod(oldPod, newPod); err != nil {
|
||||
return err
|
||||
}
|
||||
currState.pod = newPod
|
||||
default:
|
||||
return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
|
||||
return cache.updatePod(oldPod, newPod)
|
||||
}
|
||||
return nil
|
||||
return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
|
||||
@ -549,7 +545,7 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
return cache.expirePod(key, currState)
|
||||
return cache.removePod(currState.pod)
|
||||
default:
|
||||
return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
|
||||
}
|
||||
@ -736,22 +732,13 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
|
||||
}
|
||||
if now.After(*ps.deadline) {
|
||||
klog.InfoS("Pod expired", "pod", klog.KObj(ps.pod))
|
||||
if err := cache.expirePod(key, ps); err != nil {
|
||||
if err := cache.removePod(ps.pod); err != nil {
|
||||
klog.ErrorS(err, "ExpirePod failed", "pod", klog.KObj(ps.pod))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
|
||||
if err := cache.removePod(ps.pod); err != nil {
|
||||
return err
|
||||
}
|
||||
delete(cache.assumedPods, key)
|
||||
delete(cache.podStates, key)
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateMetrics updates cache size metric values for pods, assumed pods, and nodes
|
||||
func (cache *schedulerCache) updateMetrics() {
|
||||
metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
|
||||
|
Loading…
Reference in New Issue
Block a user