mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 02:09:56 +00:00
Merge pull request #37379 from wojtek-t/safe_schedulercache
Automatic merge from submit-queue. Try self-repair scheduler cache or panic. Fix #37232.
This commit is contained in:
commit
a894bde225
@ -110,15 +110,15 @@ func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
|
|||||||
|
|
||||||
// assumePod exists for making test deterministic by taking time as input argument.
|
// assumePod exists for making test deterministic by taking time as input argument.
|
||||||
func (cache *schedulerCache) assumePod(pod *v1.Pod, now time.Time) error {
|
func (cache *schedulerCache) assumePod(pod *v1.Pod, now time.Time) error {
|
||||||
cache.mu.Lock()
|
|
||||||
defer cache.mu.Unlock()
|
|
||||||
|
|
||||||
key, err := getPodKey(pod)
|
key, err := getPodKey(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cache.mu.Lock()
|
||||||
|
defer cache.mu.Unlock()
|
||||||
if _, ok := cache.podStates[key]; ok {
|
if _, ok := cache.podStates[key]; ok {
|
||||||
return fmt.Errorf("pod state wasn't initial but get assumed. Pod key: %v", key)
|
return fmt.Errorf("pod %v state wasn't initial but get assumed", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
cache.addPod(pod)
|
cache.addPod(pod)
|
||||||
@ -141,7 +141,11 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
|
|||||||
cache.mu.Lock()
|
cache.mu.Lock()
|
||||||
defer cache.mu.Unlock()
|
defer cache.mu.Unlock()
|
||||||
|
|
||||||
_, ok := cache.podStates[key]
|
currState, ok := cache.podStates[key]
|
||||||
|
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
|
||||||
|
return fmt.Errorf("pod %v state was assumed on a different node", key)
|
||||||
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
// Only assumed pod can be forgotten.
|
// Only assumed pod can be forgotten.
|
||||||
case ok && cache.assumedPods[key]:
|
case ok && cache.assumedPods[key]:
|
||||||
@ -152,7 +156,38 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
|
|||||||
delete(cache.assumedPods, key)
|
delete(cache.assumedPods, key)
|
||||||
delete(cache.podStates, key)
|
delete(cache.podStates, key)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("pod state wasn't assumed but get forgotten. Pod key: %v", key)
|
return fmt.Errorf("pod %v state wasn't assumed but get forgotten", key)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assumes that lock is already acquired.
|
||||||
|
func (cache *schedulerCache) addPod(pod *v1.Pod) {
|
||||||
|
n, ok := cache.nodes[pod.Spec.NodeName]
|
||||||
|
if !ok {
|
||||||
|
n = NewNodeInfo()
|
||||||
|
cache.nodes[pod.Spec.NodeName] = n
|
||||||
|
}
|
||||||
|
n.addPod(pod)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assumes that lock is already acquired.
|
||||||
|
func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
|
||||||
|
if err := cache.removePod(oldPod); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cache.addPod(newPod)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assumes that lock is already acquired.
|
||||||
|
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
|
||||||
|
n := cache.nodes[pod.Spec.NodeName]
|
||||||
|
if err := n.removePod(pod); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(n.pods) == 0 && n.node == nil {
|
||||||
|
delete(cache.nodes, pod.Spec.NodeName)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -166,9 +201,16 @@ func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
|
|||||||
cache.mu.Lock()
|
cache.mu.Lock()
|
||||||
defer cache.mu.Unlock()
|
defer cache.mu.Unlock()
|
||||||
|
|
||||||
_, ok := cache.podStates[key]
|
currState, ok := cache.podStates[key]
|
||||||
switch {
|
switch {
|
||||||
case ok && cache.assumedPods[key]:
|
case ok && cache.assumedPods[key]:
|
||||||
|
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
|
||||||
|
// The pod was added to a different node than it was assumed to.
|
||||||
|
glog.Warningf("Pod %v assumed to a different node than added to.", key)
|
||||||
|
// Clean this up.
|
||||||
|
cache.removePod(currState.pod)
|
||||||
|
cache.addPod(pod)
|
||||||
|
}
|
||||||
delete(cache.assumedPods, key)
|
delete(cache.assumedPods, key)
|
||||||
cache.podStates[key].deadline = nil
|
cache.podStates[key].deadline = nil
|
||||||
case !ok:
|
case !ok:
|
||||||
@ -193,44 +235,20 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
|
|||||||
cache.mu.Lock()
|
cache.mu.Lock()
|
||||||
defer cache.mu.Unlock()
|
defer cache.mu.Unlock()
|
||||||
|
|
||||||
_, ok := cache.podStates[key]
|
currState, ok := cache.podStates[key]
|
||||||
switch {
|
switch {
|
||||||
// An assumed pod won't have Update/Remove event. It needs to have Add event
|
// An assumed pod won't have Update/Remove event. It needs to have Add event
|
||||||
// before Update event, in which case the state would change from Assumed to Added.
|
// before Update event, in which case the state would change from Assumed to Added.
|
||||||
case ok && !cache.assumedPods[key]:
|
case ok && !cache.assumedPods[key]:
|
||||||
|
if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
|
||||||
|
glog.Errorf("Pod %v updated on a different node than previously added to.", key)
|
||||||
|
glog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
|
||||||
|
}
|
||||||
if err := cache.updatePod(oldPod, newPod); err != nil {
|
if err := cache.updatePod(oldPod, newPod); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("pod state wasn't added but get updated. Pod key: %v", key)
|
return fmt.Errorf("pod %v state wasn't added but get updated", key)
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
|
|
||||||
if err := cache.removePod(oldPod); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
cache.addPod(newPod)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cache *schedulerCache) addPod(pod *v1.Pod) {
|
|
||||||
n, ok := cache.nodes[pod.Spec.NodeName]
|
|
||||||
if !ok {
|
|
||||||
n = NewNodeInfo()
|
|
||||||
cache.nodes[pod.Spec.NodeName] = n
|
|
||||||
}
|
|
||||||
n.addPod(pod)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
|
|
||||||
n := cache.nodes[pod.Spec.NodeName]
|
|
||||||
if err := n.removePod(pod); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if len(n.pods) == 0 && n.node == nil {
|
|
||||||
delete(cache.nodes, pod.Spec.NodeName)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -244,12 +262,16 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
|
|||||||
cache.mu.Lock()
|
cache.mu.Lock()
|
||||||
defer cache.mu.Unlock()
|
defer cache.mu.Unlock()
|
||||||
|
|
||||||
cachedstate, ok := cache.podStates[key]
|
currState, ok := cache.podStates[key]
|
||||||
switch {
|
switch {
|
||||||
// An assumed pod won't have Delete/Remove event. It needs to have Add event
|
// An assumed pod won't have Delete/Remove event. It needs to have Add event
|
||||||
// before Remove event, in which case the state would change from Assumed to Added.
|
// before Remove event, in which case the state would change from Assumed to Added.
|
||||||
case ok && !cache.assumedPods[key]:
|
case ok && !cache.assumedPods[key]:
|
||||||
err := cache.removePod(cachedstate.pod)
|
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
|
||||||
|
glog.Errorf("Pod %v removed from a different node than previously added to.", key)
|
||||||
|
glog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
|
||||||
|
}
|
||||||
|
err := cache.removePod(currState.pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user