Revert "kubelet: fix duplicated status updates at pod cleanup"

Wojciech Tyczynski 2016-02-22 14:27:49 +01:00
parent 9b9d63ac5e
commit 93a0eac476
2 changed files with 53 additions and 40 deletions

View File

@@ -1894,6 +1894,31 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
 	return nil
 }
 
+// Delete any pods that are no longer running and are marked for deletion.
+func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
+	var terminating []*api.Pod
+	for _, pod := range pods {
+		if pod.DeletionTimestamp != nil {
+			found := false
+			for _, runningPod := range runningPods {
+				if runningPod.ID == pod.UID {
+					found = true
+					break
+				}
+			}
+			if found {
+				glog.V(5).Infof("Keeping terminated pod %q, still running", format.Pod(pod))
+				continue
+			}
+			terminating = append(terminating, pod)
+		}
+	}
+	if !kl.statusManager.TerminatePods(terminating) {
+		return errors.New("not all pods were successfully terminated")
+	}
+	return nil
+}
+
 // pastActiveDeadline returns true if the pod has been active for more than
 // ActiveDeadlineSeconds.
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
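Reviewer note: the restored cleanupTerminatedPods filters for pods that are marked for deletion but no longer have a running counterpart. A minimal standalone sketch of that filtering step, with hypothetical stand-in types (apiPod, runningPod) replacing api.Pod and kubecontainer.Pod, and a UID set in place of the inner linear scan:

package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-ins for api.Pod and kubecontainer.Pod in the diff above.
type apiPod struct {
	UID               string
	DeletionTimestamp *time.Time
}

type runningPod struct {
	ID string
}

// terminatedCandidates mirrors the filtering loop in cleanupTerminatedPods:
// keep only pods that are marked for deletion and have no running counterpart.
// A UID set replaces the nested loop for clarity; the behavior is the same.
func terminatedCandidates(pods []*apiPod, running []*runningPod) []*apiPod {
	runningUIDs := make(map[string]bool, len(running))
	for _, rp := range running {
		runningUIDs[rp.ID] = true
	}
	var terminating []*apiPod
	for _, pod := range pods {
		if pod.DeletionTimestamp == nil {
			continue // not marked for deletion
		}
		if runningUIDs[pod.UID] {
			continue // still running; keep the pod for now
		}
		terminating = append(terminating, pod)
	}
	return terminating
}

func main() {
	now := time.Now()
	pods := []*apiPod{
		{UID: "a", DeletionTimestamp: &now}, // deleted and stopped: selected
		{UID: "b", DeletionTimestamp: &now}, // deleted but still running: kept
		{UID: "c"},                          // not marked for deletion: kept
	}
	running := []*runningPod{{ID: "b"}}
	for _, p := range terminatedCandidates(pods, running) {
		fmt.Println("terminating:", p.UID) // prints: terminating: a
	}
}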
@@ -2085,6 +2110,10 @@ func (kl *Kubelet) HandlePodCleanups() error {
 	// Remove any orphaned mirror pods.
 	kl.podManager.DeleteOrphanedMirrorPods()
 
+	if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
+		glog.Errorf("Failed to cleanup terminated pods: %v", err)
+	}
+
 	// Clear out any old bandwidth rules
 	if err = kl.cleanupBandwidthLimits(allPods); err != nil {
 		return err
@@ -2345,13 +2374,6 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
 	if kl.podIsTerminated(pod) {
-		if pod.DeletionTimestamp != nil {
-			// If the pod is in a terminated state, there is no pod worker to
-			// handle the work item. Check if the DeletionTimestamp has been
-			// set, and force a status update to trigger a pod deletion request
-			// to the apiserver.
-			kl.statusManager.TerminatePod(pod)
-		}
 		return
 	}
 	// Run the sync in an async worker.

View File

@@ -83,9 +83,10 @@ type Manager interface {
 	// triggers a status update.
 	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
 
-	// TerminatePod resets the container status for the provided pod to terminated and triggers
-	// a status update.
-	TerminatePod(pod *api.Pod)
+	// TerminatePods resets the container status for the provided pods to terminated and triggers
+	// a status update. This function may not enqueue all the provided pods, in which case it will
+	// return false
+	TerminatePods(pods []*api.Pod) bool
 
 	// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
 	// the provided podUIDs.
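A short sketch of the caller-side contract this restored signature implies: TerminatePods reports whether every pod was enqueued, and the caller (cleanupTerminatedPods in the first file) turns a false result into an error. The types here are simplified stand-ins, not the real status Manager interface:

package main

import (
	"errors"
	"fmt"
)

type pod struct{ name string }

// terminator is a hypothetical stand-in for the fragment of the status
// Manager interface restored above.
type terminator interface {
	TerminatePods(pods []*pod) bool
}

// fullChannelManager simulates a manager whose status channel is full, so no
// termination notice can be enqueued.
type fullChannelManager struct{}

func (fullChannelManager) TerminatePods(pods []*pod) bool { return false }

// cleanup mirrors how cleanupTerminatedPods consumes the boolean result.
func cleanup(m terminator, terminating []*pod) error {
	if !m.TerminatePods(terminating) {
		return errors.New("not all pods were successfully terminated")
	}
	return nil
}

func main() {
	err := cleanup(fullChannelManager{}, []*pod{{name: "p"}})
	fmt.Println(err) // not all pods were successfully terminated
}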
@@ -146,7 +147,7 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
-	m.updateStatusInternal(pod, status, false)
+	m.updateStatusInternal(pod, status)
 }
 
 func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
@@ -186,10 +187,13 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 		return
 	}
 
-	status, err := copyStatus(&oldStatus.status)
+	// Make sure we're not updating the cached version.
+	clone, err := api.Scheme.DeepCopy(&oldStatus.status)
 	if err != nil {
+		glog.Errorf("Failed to clone status %+v: %v", oldStatus.status, err)
 		return
 	}
+	status := *clone.(*api.PodStatus)
 	status.ContainerStatuses[containerIndex].Ready = ready
 
 	// Update pod condition.
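The restored code calls the scheme-based deep copy inline rather than through the copyStatus helper removed further down. A minimal sketch of the same clone-then-assert shape, with a hypothetical deepCopy standing in for api.Scheme.DeepCopy, showing that mutating the copy leaves the cached value untouched:

package main

import "fmt"

type containerStatus struct{ Ready bool }

type podStatus struct {
	Phase             string
	ContainerStatuses []containerStatus
}

// deepCopy is a hypothetical stand-in for api.Scheme.DeepCopy: it returns an
// interface{} clone that the caller must assert back to the concrete type,
// the same clone-then-assert shape as the diff above.
func deepCopy(src interface{}) (interface{}, error) {
	in := src.(*podStatus)
	out := *in
	out.ContainerStatuses = append([]containerStatus(nil), in.ContainerStatuses...)
	return &out, nil
}

func main() {
	cached := &podStatus{Phase: "Running", ContainerStatuses: []containerStatus{{Ready: false}}}
	clone, err := deepCopy(cached)
	if err != nil {
		return
	}
	status := *clone.(*podStatus)
	status.ContainerStatuses[0].Ready = true
	// The cached value is unchanged; only the clone was mutated.
	fmt.Println(cached.ContainerStatuses[0].Ready, status.ContainerStatuses[0].Ready) // false true
}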
@@ -208,32 +212,31 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 		status.Conditions = append(status.Conditions, readyCondition)
 	}
 
-	m.updateStatusInternal(pod, status, false)
+	m.updateStatusInternal(pod, status)
 }
 
-func (m *manager) TerminatePod(pod *api.Pod) {
+func (m *manager) TerminatePods(pods []*api.Pod) bool {
+	allSent := true
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
-	oldStatus := &pod.Status
-	if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
-		oldStatus = &cachedStatus.status
-	}
-	status, err := copyStatus(oldStatus)
-	if err != nil {
-		return
-	}
-	for i := range status.ContainerStatuses {
-		status.ContainerStatuses[i].State = api.ContainerState{
-			Terminated: &api.ContainerStateTerminated{},
-		}
-	}
-	m.updateStatusInternal(pod, pod.Status, true)
+	for _, pod := range pods {
+		for i := range pod.Status.ContainerStatuses {
+			pod.Status.ContainerStatuses[i].State = api.ContainerState{
+				Terminated: &api.ContainerStateTerminated{},
+			}
+		}
+		if sent := m.updateStatusInternal(pod, pod.Status); !sent {
+			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", format.Pod(pod))
+			allSent = false
+		}
+	}
+	return allSent
 }
 
 // updateStatusInternal updates the internal status cache, and queues an update to the api server if
 // necessary. Returns whether an update was triggered.
 // This method IS NOT THREAD SAFE and must be called from a locked function.
-func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, forceUpdate bool) bool {
+func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
 	var oldStatus api.PodStatus
 	cachedStatus, isCached := m.podStatuses[pod.UID]
 	if isCached {
@@ -266,7 +269,7 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, force
 	// The intent here is to prevent concurrent updates to a pod's status from
 	// clobbering each other so the phase of a pod progresses monotonically.
-	if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
+	if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil {
 		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
 		return false // No new status.
 	}
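This guard is the crux of the revert on the status-manager side: identical statuses are normally deduplicated, the reverted code punched through that with an explicit forceUpdate flag, and the restored code keys off the deletion timestamp instead, so the terminal status that triggers the apiserver delete is never dropped. A tiny sketch of the restored predicate with simplified inputs:

package main

import (
	"fmt"
	"time"
)

// shouldSkipUpdate mirrors the restored guard: an unchanged status is dropped
// unless the pod has a deletion timestamp, in which case it always goes through.
func shouldSkipUpdate(isCached, statusEqual bool, deletionTimestamp *time.Time) bool {
	return isCached && statusEqual && deletionTimestamp == nil
}

func main() {
	now := time.Now()
	fmt.Println(shouldSkipUpdate(true, true, nil))  // true: same status, live pod, skip
	fmt.Println(shouldSkipUpdate(true, true, &now)) // false: deleted pods always update
}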
@@ -285,8 +288,6 @@
 	default:
 		// Let the periodic syncBatch handle the update if the channel is full.
 		// We can't block, since we hold the mutex lock.
-		glog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
-			format.Pod(pod), status)
 		return false
 	}
 }
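The non-blocking enqueue described by the surviving comment is the standard select-with-default pattern; because updateStatusInternal runs with podStatusesLock held, it must never block on the channel. A self-contained sketch of that pattern (statusUpdate is a placeholder type, not the real one):

package main

import "fmt"

type statusUpdate struct{ podUID string }

// trySend mirrors the non-blocking enqueue in updateStatusInternal: with the
// mutex held, we must not block, so a full channel drops the update and the
// periodic syncBatch is left to pick it up later.
func trySend(ch chan statusUpdate, u statusUpdate) bool {
	select {
	case ch <- u:
		return true
	default:
		return false // channel full; rely on syncBatch
	}
}

func main() {
	ch := make(chan statusUpdate, 1)
	fmt.Println(trySend(ch, statusUpdate{"a"})) // true
	fmt.Println(trySend(ch, statusUpdate{"b"})) // false: buffer of 1 is full
}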
@@ -494,13 +495,3 @@ func notRunning(statuses []api.ContainerStatus) bool {
 	}
 	return true
 }
-
-func copyStatus(source *api.PodStatus) (api.PodStatus, error) {
-	clone, err := api.Scheme.DeepCopy(source)
-	if err != nil {
-		glog.Errorf("Failed to clone status %+v: %v", source, err)
-		return api.PodStatus{}, err
-	}
-	status := *clone.(*api.PodStatus)
-	return status, nil
-}