From 8f1d240f73959b99ef0eb7a6503f78505452078e Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Wed, 23 Nov 2016 15:33:59 +0100 Subject: [PATCH] Try self-repair scheduler cache or panic --- plugin/pkg/scheduler/schedulercache/cache.go | 100 +++++++++++-------- 1 file changed, 61 insertions(+), 39 deletions(-) diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go index 697b3789073..472b9fb853d 100644 --- a/plugin/pkg/scheduler/schedulercache/cache.go +++ b/plugin/pkg/scheduler/schedulercache/cache.go @@ -110,15 +110,15 @@ func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { // assumePod exists for making test deterministic by taking time as input argument. func (cache *schedulerCache) assumePod(pod *v1.Pod, now time.Time) error { - cache.mu.Lock() - defer cache.mu.Unlock() - key, err := getPodKey(pod) if err != nil { return err } + + cache.mu.Lock() + defer cache.mu.Unlock() if _, ok := cache.podStates[key]; ok { - return fmt.Errorf("pod state wasn't initial but get assumed. Pod key: %v", key) + return fmt.Errorf("pod %v state wasn't initial but get assumed", key) } cache.addPod(pod) @@ -141,7 +141,11 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error { cache.mu.Lock() defer cache.mu.Unlock() - _, ok := cache.podStates[key] + currState, ok := cache.podStates[key] + if currState.pod.Spec.NodeName != pod.Spec.NodeName { + return fmt.Errorf("pod %v state was assumed on a different node", key) + } + switch { // Only assumed pod can be forgotten. case ok && cache.assumedPods[key]: @@ -152,7 +156,38 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error { delete(cache.assumedPods, key) delete(cache.podStates, key) default: - return fmt.Errorf("pod state wasn't assumed but get forgotten. Pod key: %v", key) + return fmt.Errorf("pod %v state wasn't assumed but get forgotten", key) + } + return nil +} + +// Assumes that lock is already acquired. +func (cache *schedulerCache) addPod(pod *v1.Pod) { + n, ok := cache.nodes[pod.Spec.NodeName] + if !ok { + n = NewNodeInfo() + cache.nodes[pod.Spec.NodeName] = n + } + n.addPod(pod) +} + +// Assumes that lock is already acquired. +func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error { + if err := cache.removePod(oldPod); err != nil { + return err + } + cache.addPod(newPod) + return nil +} + +// Assumes that lock is already acquired. +func (cache *schedulerCache) removePod(pod *v1.Pod) error { + n := cache.nodes[pod.Spec.NodeName] + if err := n.removePod(pod); err != nil { + return err + } + if len(n.pods) == 0 && n.node == nil { + delete(cache.nodes, pod.Spec.NodeName) } return nil } @@ -166,9 +201,16 @@ func (cache *schedulerCache) AddPod(pod *v1.Pod) error { cache.mu.Lock() defer cache.mu.Unlock() - _, ok := cache.podStates[key] + currState, ok := cache.podStates[key] switch { case ok && cache.assumedPods[key]: + if currState.pod.Spec.NodeName != pod.Spec.NodeName { + // The pod was added to a different node than it was assumed to. + glog.Warningf("Pod %v assumed to a different node than added to.", key) + // Clean this up. + cache.removePod(currState.pod) + cache.addPod(pod) + } delete(cache.assumedPods, key) cache.podStates[key].deadline = nil case !ok: @@ -193,44 +235,20 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error { cache.mu.Lock() defer cache.mu.Unlock() - _, ok := cache.podStates[key] + currState, ok := cache.podStates[key] switch { // An assumed pod won't have Update/Remove event. It needs to have Add event // before Update event, in which case the state would change from Assumed to Added. case ok && !cache.assumedPods[key]: + if currState.pod.Spec.NodeName != newPod.Spec.NodeName { + glog.Errorf("Pod %v updated on a different node than previously added to.", key) + glog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions") + } if err := cache.updatePod(oldPod, newPod); err != nil { return err } default: - return fmt.Errorf("pod state wasn't added but get updated. Pod key: %v", key) - } - return nil -} - -func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error { - if err := cache.removePod(oldPod); err != nil { - return err - } - cache.addPod(newPod) - return nil -} - -func (cache *schedulerCache) addPod(pod *v1.Pod) { - n, ok := cache.nodes[pod.Spec.NodeName] - if !ok { - n = NewNodeInfo() - cache.nodes[pod.Spec.NodeName] = n - } - n.addPod(pod) -} - -func (cache *schedulerCache) removePod(pod *v1.Pod) error { - n := cache.nodes[pod.Spec.NodeName] - if err := n.removePod(pod); err != nil { - return err - } - if len(n.pods) == 0 && n.node == nil { - delete(cache.nodes, pod.Spec.NodeName) + return fmt.Errorf("pod %v state wasn't added but get updated", key) } return nil } @@ -244,12 +262,16 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error { cache.mu.Lock() defer cache.mu.Unlock() - cachedstate, ok := cache.podStates[key] + currState, ok := cache.podStates[key] switch { // An assumed pod won't have Delete/Remove event. It needs to have Add event // before Remove event, in which case the state would change from Assumed to Added. case ok && !cache.assumedPods[key]: - err := cache.removePod(cachedstate.pod) + if currState.pod.Spec.NodeName != pod.Spec.NodeName { + glog.Errorf("Pod %v removed from a different node than previously added to.", key) + glog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions") + } + err := cache.removePod(currState.pod) if err != nil { return err }