Replace pod key with KObj(pod) in scheduler cache error msg

Add back pod key in scheduler cache

Add additional changes

Change podInfo to pod
This commit is contained in:
Yuan Chen 2022-08-12 17:01:24 -07:00
parent 773a2aa5a6
commit 974a41e55a

View File

@ -379,7 +379,7 @@ func (cache *cacheImpl) AssumePod(pod *v1.Pod) error {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
if _, ok := cache.podStates[key]; ok { if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod %v is in the cache, so can't be assumed", key) return fmt.Errorf("pod %v(%v) is in the cache, so can't be assumed", key, klog.KObj(pod))
} }
return cache.addPod(pod, true) return cache.addPod(pod, true)
@ -399,7 +399,7 @@ func (cache *cacheImpl) finishBinding(pod *v1.Pod, now time.Time) error {
cache.mu.RLock() cache.mu.RLock()
defer cache.mu.RUnlock() defer cache.mu.RUnlock()
klog.V(5).InfoS("Finished binding for pod, can be expired", "pod", klog.KObj(pod)) klog.V(5).InfoS("Finished binding for pod, can be expired", "podKey", key, "pod", klog.KObj(pod))
currState, ok := cache.podStates[key] currState, ok := cache.podStates[key]
if ok && cache.assumedPods.Has(key) { if ok && cache.assumedPods.Has(key) {
if cache.ttl == time.Duration(0) { if cache.ttl == time.Duration(0) {
@ -424,14 +424,14 @@ func (cache *cacheImpl) ForgetPod(pod *v1.Pod) error {
currState, ok := cache.podStates[key] currState, ok := cache.podStates[key]
if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName { if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName) return fmt.Errorf("pod %v(%v) was assumed on %v but assigned to %v", key, klog.KObj(pod), pod.Spec.NodeName, currState.pod.Spec.NodeName)
} }
// Only assumed pod can be forgotten. // Only assumed pod can be forgotten.
if ok && cache.assumedPods.Has(key) { if ok && cache.assumedPods.Has(key) {
return cache.removePod(pod) return cache.removePod(pod)
} }
return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key) return fmt.Errorf("pod %v(%v) wasn't assumed so cannot be forgotten", key, klog.KObj(pod))
} }
// Assumes that lock is already acquired. // Assumes that lock is already acquired.
@ -477,7 +477,8 @@ func (cache *cacheImpl) removePod(pod *v1.Pod) error {
n, ok := cache.nodes[pod.Spec.NodeName] n, ok := cache.nodes[pod.Spec.NodeName]
if !ok { if !ok {
klog.ErrorS(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "pod", klog.KObj(pod)) klog.ErrorS(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "podKey", key, "pod", klog.KObj(pod))
} else { } else {
if err := n.info.RemovePod(pod); err != nil { if err := n.info.RemovePod(pod); err != nil {
return err return err
@ -508,7 +509,7 @@ func (cache *cacheImpl) AddPod(pod *v1.Pod) error {
case ok && cache.assumedPods.Has(key): case ok && cache.assumedPods.Has(key):
if currState.pod.Spec.NodeName != pod.Spec.NodeName { if currState.pod.Spec.NodeName != pod.Spec.NodeName {
// The pod was added to a different node than it was assumed to. // The pod was added to a different node than it was assumed to.
klog.InfoS("Pod was added to a different node than it was assumed", "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName)) klog.InfoS("Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName))
if err = cache.updatePod(currState.pod, pod); err != nil { if err = cache.updatePod(currState.pod, pod); err != nil {
klog.ErrorS(err, "Error occurred while updating pod") klog.ErrorS(err, "Error occurred while updating pod")
} }
@ -523,7 +524,7 @@ func (cache *cacheImpl) AddPod(pod *v1.Pod) error {
klog.ErrorS(err, "Error occurred while adding pod") klog.ErrorS(err, "Error occurred while adding pod")
} }
default: default:
return fmt.Errorf("pod %v was already in added state", key) return fmt.Errorf("pod %v(%v) was already in added state", key, klog.KObj(pod))
} }
return nil return nil
} }
@ -542,13 +543,13 @@ func (cache *cacheImpl) UpdatePod(oldPod, newPod *v1.Pod) error {
// before Update event, in which case the state would change from Assumed to Added. // before Update event, in which case the state would change from Assumed to Added.
if ok && !cache.assumedPods.Has(key) { if ok && !cache.assumedPods.Has(key) {
if currState.pod.Spec.NodeName != newPod.Spec.NodeName { if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
klog.ErrorS(nil, "Pod updated on a different node than previously added to", "pod", klog.KObj(oldPod)) klog.ErrorS(nil, "Pod updated on a different node than previously added to", "podKey", key, "pod", klog.KObj(oldPod))
klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions") klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions")
os.Exit(1) os.Exit(1)
} }
return cache.updatePod(oldPod, newPod) return cache.updatePod(oldPod, newPod)
} }
return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key) return fmt.Errorf("pod %v(%v) is not added to scheduler cache, so cannot be updated", key, klog.KObj(oldPod))
} }
func (cache *cacheImpl) RemovePod(pod *v1.Pod) error { func (cache *cacheImpl) RemovePod(pod *v1.Pod) error {
@ -562,10 +563,10 @@ func (cache *cacheImpl) RemovePod(pod *v1.Pod) error {
currState, ok := cache.podStates[key] currState, ok := cache.podStates[key]
if !ok { if !ok {
return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key) return fmt.Errorf("pod %v(%v) is not found in scheduler cache, so cannot be removed from it", key, klog.KObj(pod))
} }
if currState.pod.Spec.NodeName != pod.Spec.NodeName { if currState.pod.Spec.NodeName != pod.Spec.NodeName {
klog.ErrorS(nil, "Pod was added to a different node than it was assumed", "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName)) klog.ErrorS(nil, "Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName))
if pod.Spec.NodeName != "" { if pod.Spec.NodeName != "" {
// An empty NodeName is possible when the scheduler misses a Delete // An empty NodeName is possible when the scheduler misses a Delete
// event and it gets the last known state from the informer cache. // event and it gets the last known state from the informer cache.
@ -601,7 +602,7 @@ func (cache *cacheImpl) GetPod(pod *v1.Pod) (*v1.Pod, error) {
podState, ok := cache.podStates[key] podState, ok := cache.podStates[key]
if !ok { if !ok {
return nil, fmt.Errorf("pod %v does not exist in scheduler cache", key) return nil, fmt.Errorf("pod %v(%v) does not exist in scheduler cache", key, klog.KObj(pod))
} }
return podState.pod, nil return podState.pod, nil
@ -751,14 +752,13 @@ func (cache *cacheImpl) cleanupAssumedPods(now time.Time) {
os.Exit(1) os.Exit(1)
} }
if !ps.bindingFinished { if !ps.bindingFinished {
klog.V(5).InfoS("Could not expire cache for pod as binding is still in progress", klog.V(5).InfoS("Could not expire cache for pod as binding is still in progress", "podKey", key, "pod", klog.KObj(ps.pod))
"pod", klog.KObj(ps.pod))
continue continue
} }
if cache.ttl != 0 && now.After(*ps.deadline) { if cache.ttl != 0 && now.After(*ps.deadline) {
klog.InfoS("Pod expired", "pod", klog.KObj(ps.pod)) klog.InfoS("Pod expired", "podKey", key, "pod", klog.KObj(ps.pod))
if err := cache.removePod(ps.pod); err != nil { if err := cache.removePod(ps.pod); err != nil {
klog.ErrorS(err, "ExpirePod failed", "pod", klog.KObj(ps.pod)) klog.ErrorS(err, "ExpirePod failed", "podKey", key, "pod", klog.KObj(ps.pod))
} }
} }
} }