kubelet: fix duplicated status updates at pod cleanup

cleanupTerminatedPods is responsible for checking whether a pod has been
terminated, and for forcing a status update to trigger the pod's deletion.
However, this function is called from the periodic cleanup routine, which runs
every 2 seconds. In other words, it forces a status update for every
non-running pod that has not yet been deleted from the apiserver. When
batch-deleting tens of pods, the rate of new updates surpasses what the status
manager can handle, causing numerous redundant requests (and filling up the
status channel).
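
The status channel mentioned above is a bounded queue that is filled with a
non-blocking send; once it is full, further updates are silently deferred to a
later periodic sync. Below is a minimal, self-contained Go sketch of that
back-pressure pattern; the type, buffer size, and pod counts are illustrative
stand-ins, not the kubelet's actual values.

package main

import "fmt"

// versionedStatus stands in for a queued status update; the name and shape
// are illustrative and not the kubelet's actual types.
type versionedStatus struct {
	podName string
	version int
}

func main() {
	// A small buffered channel plays the role of the status update queue.
	updates := make(chan versionedStatus, 3)

	// Simulate repeated cleanup passes that force an update for every
	// not-yet-deleted pod: the producer quickly outruns the consumer.
	dropped := 0
	for pass := 1; pass <= 5; pass++ {
		for pod := 0; pod < 10; pod++ {
			u := versionedStatus{podName: fmt.Sprintf("pod-%d", pod), version: pass}
			select {
			case updates <- u:
				// Enqueued successfully.
			default:
				// Channel full: the update is dropped and has to wait for a
				// later periodic sync, mirroring the manager's non-blocking send.
				dropped++
			}
		}
	}
	fmt.Printf("queued=%d dropped=%d\n", len(updates), dropped)
}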

This change forces a status update only when the DeletionTimestamp is set on
an already-terminated pod. Note that for other, non-terminated pods, the pod
workers are responsible for setting the correct status after killing all of
the containers.
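
Put differently, identical statuses are still deduplicated, but a forced update
for a deleted, terminated pod now bypasses that check. The following is a
hypothetical distillation of the decision made in updateStatusInternal after
this change (see the diff below); the helper name and boolean parameters are
simplified stand-ins, and the real code compares full api.PodStatus values.

package main

import "fmt"

// shouldQueueUpdate condenses the dedup-plus-force check into booleans.
func shouldQueueUpdate(isCached, statusUnchanged, forceUpdate bool) bool {
	// Unchanged statuses are normally ignored so the apiserver is not spammed
	// with redundant requests, but a forced update (a terminated pod whose
	// DeletionTimestamp is set) always goes through so deletion can progress.
	return !(isCached && statusUnchanged && !forceUpdate)
}

func main() {
	fmt.Println(shouldQueueUpdate(true, true, false)) // false: duplicate, skipped
	fmt.Println(shouldQueueUpdate(true, true, true))  // true: forced through
}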
Yu-Ju Hong 2016-02-17 15:56:54 -08:00
parent 80ff0cbbda
commit 386453a09d
2 changed files with 40 additions and 53 deletions


@@ -1894,31 +1894,6 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
     return nil
 }
 
-// Delete any pods that are no longer running and are marked for deletion.
-func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
-    var terminating []*api.Pod
-    for _, pod := range pods {
-        if pod.DeletionTimestamp != nil {
-            found := false
-            for _, runningPod := range runningPods {
-                if runningPod.ID == pod.UID {
-                    found = true
-                    break
-                }
-            }
-            if found {
-                glog.V(5).Infof("Keeping terminated pod %q, still running", format.Pod(pod))
-                continue
-            }
-            terminating = append(terminating, pod)
-        }
-    }
-    if !kl.statusManager.TerminatePods(terminating) {
-        return errors.New("not all pods were successfully terminated")
-    }
-    return nil
-}
-
 // pastActiveDeadline returns true if the pod has been active for more than
 // ActiveDeadlineSeconds.
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
@@ -2110,10 +2085,6 @@ func (kl *Kubelet) HandlePodCleanups() error {
     // Remove any orphaned mirror pods.
     kl.podManager.DeleteOrphanedMirrorPods()
 
-    if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
-        glog.Errorf("Failed to cleanup terminated pods: %v", err)
-    }
-
     // Clear out any old bandwidth rules
     if err = kl.cleanupBandwidthLimits(allPods); err != nil {
         return err
@@ -2374,6 +2345,13 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 
 func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
     if kl.podIsTerminated(pod) {
+        if pod.DeletionTimestamp != nil {
+            // If the pod is in a terminated state, there is no pod worker to
+            // handle the work item. Check if the DeletionTimestamp has been
+            // set, and force a status update to trigger a pod deletion request
+            // to the apiserver.
+            kl.statusManager.TerminatePod(pod)
+        }
         return
     }
     // Run the sync in an async worker.


@@ -83,10 +83,9 @@ type Manager interface {
     // triggers a status update.
     SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
 
-    // TerminatePods resets the container status for the provided pods to terminated and triggers
-    // a status update. This function may not enqueue all the provided pods, in which case it will
-    // return false
-    TerminatePods(pods []*api.Pod) bool
+    // TerminatePod resets the container status for the provided pod to terminated and triggers
+    // a status update.
+    TerminatePod(pod *api.Pod)
 
     // RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
     // the provided podUIDs.
@@ -147,7 +146,7 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
     m.podStatusesLock.Lock()
     defer m.podStatusesLock.Unlock()
-    m.updateStatusInternal(pod, status)
+    m.updateStatusInternal(pod, status, false)
 }
 
 func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
@@ -187,13 +186,10 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
         return
     }
 
-    // Make sure we're not updating the cached version.
-    clone, err := api.Scheme.DeepCopy(&oldStatus.status)
+    status, err := copyStatus(&oldStatus.status)
     if err != nil {
-        glog.Errorf("Failed to clone status %+v: %v", oldStatus.status, err)
         return
     }
-    status := *clone.(*api.PodStatus)
     status.ContainerStatuses[containerIndex].Ready = ready
 
     // Update pod condition.
@@ -212,31 +208,32 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
         status.Conditions = append(status.Conditions, readyCondition)
     }
 
-    m.updateStatusInternal(pod, status)
+    m.updateStatusInternal(pod, status, false)
 }
 
-func (m *manager) TerminatePods(pods []*api.Pod) bool {
-    allSent := true
+func (m *manager) TerminatePod(pod *api.Pod) {
     m.podStatusesLock.Lock()
     defer m.podStatusesLock.Unlock()
-    for _, pod := range pods {
-        for i := range pod.Status.ContainerStatuses {
-            pod.Status.ContainerStatuses[i].State = api.ContainerState{
-                Terminated: &api.ContainerStateTerminated{},
-            }
-        }
-        if sent := m.updateStatusInternal(pod, pod.Status); !sent {
-            glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", format.Pod(pod))
-            allSent = false
+    oldStatus := &pod.Status
+    if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
+        oldStatus = &cachedStatus.status
+    }
+    status, err := copyStatus(oldStatus)
+    if err != nil {
+        return
+    }
+    for i := range status.ContainerStatuses {
+        status.ContainerStatuses[i].State = api.ContainerState{
+            Terminated: &api.ContainerStateTerminated{},
         }
     }
-    return allSent
+    m.updateStatusInternal(pod, pod.Status, true)
 }
 
 // updateStatusInternal updates the internal status cache, and queues an update to the api server if
 // necessary. Returns whether an update was triggered.
 // This method IS NOT THREAD SAFE and must be called from a locked function.
-func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
+func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, forceUpdate bool) bool {
     var oldStatus api.PodStatus
     cachedStatus, isCached := m.podStatuses[pod.UID]
     if isCached {
@@ -269,7 +266,7 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool
     // The intent here is to prevent concurrent updates to a pod's status from
     // clobbering each other so the phase of a pod progresses monotonically.
-    if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil {
+    if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
         glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
         return false // No new status.
     }
@@ -288,6 +285,8 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool
     default:
         // Let the periodic syncBatch handle the update if the channel is full.
        // We can't block, since we hold the mutex lock.
+        glog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
+            format.Pod(pod), status)
         return false
     }
 }
@@ -495,3 +494,13 @@ func notRunning(statuses []api.ContainerStatus) bool {
     }
     return true
 }
+
+func copyStatus(source *api.PodStatus) (api.PodStatus, error) {
+    clone, err := api.Scheme.DeepCopy(source)
+    if err != nil {
+        glog.Errorf("Failed to clone status %+v: %v", source, err)
+        return api.PodStatus{}, err
+    }
+    status := *clone.(*api.PodStatus)
+    return status, nil
+}