diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go
index ac2d5a7bbe0..00d7be003bf 100644
--- a/pkg/kubelet/dockertools/fake_docker_client.go
+++ b/pkg/kubelet/dockertools/fake_docker_client.go
@@ -297,8 +297,6 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
 		return err
 	}
 	f.Stopped = append(f.Stopped, id)
-	// Container status should be Updated before container moved to ExitedContainerList
-	f.updateContainerStatus(id, statusExitedPrefix)
 	var newList []docker.APIContainers
 	for _, container := range f.ContainerList {
 		if container.ID == id {
@@ -325,6 +323,7 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
 		container.State.Running = false
 	}
 	f.ContainerMap[id] = container
+	f.updateContainerStatus(id, statusExitedPrefix)
 	f.normalSleep(200, 50, 50)
 	return nil
 }
@@ -334,20 +333,11 @@ func (f *FakeDockerClient) RemoveContainer(opts docker.RemoveContainerOptions) e
 	defer f.Unlock()
 	f.called = append(f.called, "remove")
 	err := f.popError("remove")
-	if err != nil {
-		return err
+	if err == nil {
+		f.Removed = append(f.Removed, opts.ID)
 	}
-	for i := range f.ExitedContainerList {
-		if f.ExitedContainerList[i].ID == opts.ID {
-			delete(f.ContainerMap, opts.ID)
-			f.ExitedContainerList = append(f.ExitedContainerList[:i], f.ExitedContainerList[i+1:]...)
-			f.Removed = append(f.Removed, opts.ID)
-			return nil
-		}
-
-	}
-	// To be a good fake, report error if container is not stopped.
-	return fmt.Errorf("container not stopped")
+	delete(f.ContainerMap, opts.ID)
+	return err
 }
 
 // Logs is a test-spy implementation of DockerInterface.Logs.
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 34eaefb0cd7..ed7aca8791b 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1945,6 +1945,31 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
 	return nil
 }
 
+// Delete any pods that are no longer running and are marked for deletion.
+func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
+	var terminating []*api.Pod
+	for _, pod := range pods {
+		if pod.DeletionTimestamp != nil {
+			found := false
+			for _, runningPod := range runningPods {
+				if runningPod.ID == pod.UID {
+					found = true
+					break
+				}
+			}
+			if found {
+				glog.V(5).Infof("Keeping terminated pod %q, still running", format.Pod(pod))
+				continue
+			}
+			terminating = append(terminating, pod)
+		}
+	}
+	if !kl.statusManager.TerminatePods(terminating) {
+		return errors.New("not all pods were successfully terminated")
+	}
+	return nil
+}
+
 // pastActiveDeadline returns true if the pod has been active for more than
 // ActiveDeadlineSeconds.
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
@@ -2136,6 +2161,10 @@ func (kl *Kubelet) HandlePodCleanups() error {
 	// Remove any orphaned mirror pods.
 	kl.podManager.DeleteOrphanedMirrorPods()
+	if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
+		glog.Errorf("Failed to cleanup terminated pods: %v", err)
+	}
+
 	// Clear out any old bandwidth rules
 	if err = kl.cleanupBandwidthLimits(allPods); err != nil {
 		return err
 	}
@@ -2396,13 +2425,6 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 
 func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
 	if kl.podIsTerminated(pod) {
-		if pod.DeletionTimestamp != nil {
-			// If the pod is in a termianted state, there is no pod worker to
-			// handle the work item. Check if the DeletionTimestamp has been
-			// set, and force a status update to trigger a pod deletion request
-			// to the apiserver.
-			kl.statusManager.TerminatePod(pod)
-		}
 		return
 	}
 	// Run the sync in an async worker.
diff --git a/pkg/kubelet/status/manager.go b/pkg/kubelet/status/manager.go
index 20d2df64b5a..03396261431 100644
--- a/pkg/kubelet/status/manager.go
+++ b/pkg/kubelet/status/manager.go
@@ -83,9 +83,10 @@ type Manager interface {
 	// triggers a status update.
 	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
 
-	// TerminatePod resets the container status for the provided pod to terminated and triggers
-	// a status update.
-	TerminatePod(pod *api.Pod)
+	// TerminatePods resets the container status for the provided pods to terminated and triggers
+	// a status update. This function may not enqueue all the provided pods, in which case it will
+	// return false
+	TerminatePods(pods []*api.Pod) bool
 
 	// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
 	// the provided podUIDs.
@@ -148,7 +149,7 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	if err != nil {
 		return
 	}
-	m.updateStatusInternal(pod, status, false)
+	m.updateStatusInternal(pod, status)
 }
 
 func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
@@ -211,32 +212,31 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 		status.Conditions = append(status.Conditions, readyCondition)
 	}
 
-	m.updateStatusInternal(pod, status, false)
+	m.updateStatusInternal(pod, status)
 }
 
-func (m *manager) TerminatePod(pod *api.Pod) {
+func (m *manager) TerminatePods(pods []*api.Pod) bool {
+	allSent := true
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
-	oldStatus := &pod.Status
-	if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
-		oldStatus = &cachedStatus.status
-	}
-	status, err := copyStatus(oldStatus)
-	if err != nil {
-		return
-	}
-	for i := range status.ContainerStatuses {
-		status.ContainerStatuses[i].State = api.ContainerState{
-			Terminated: &api.ContainerStateTerminated{},
+	for _, pod := range pods {
+		for i := range pod.Status.ContainerStatuses {
+			pod.Status.ContainerStatuses[i].State = api.ContainerState{
+				Terminated: &api.ContainerStateTerminated{},
+			}
+		}
+		if sent := m.updateStatusInternal(pod, pod.Status); !sent {
+			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", format.Pod(pod))
+			allSent = false
 		}
 	}
-	m.updateStatusInternal(pod, pod.Status, true)
+	return allSent
 }
 
 // updateStatusInternal updates the internal status cache, and queues an update to the api server if
 // necessary. Returns whether an update was triggered.
 // This method IS NOT THREAD SAFE and must be called from a locked function.
-func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, forceUpdate bool) bool {
+func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
 	var oldStatus api.PodStatus
 	cachedStatus, isCached := m.podStatuses[pod.UID]
 	if isCached {
@@ -270,7 +270,7 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, force
 	normalizeStatus(&status)
 	// The intent here is to prevent concurrent updates to a pod's status from
 	// clobbering each other so the phase of a pod progresses monotonically.
-	if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
+	if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil {
 		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
 		return false // No new status.
 	}
@@ -289,8 +289,6 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, force
 	default:
 		// Let the periodic syncBatch handle the update if the channel is full.
 		// We can't block, since we hold the mutex lock.
-		glog.V(4).Infof("Skpping the status update for pod %q for now because the channel is full; status: %+v",
-			format.Pod(pod), status)
 		return false
 	}
 }
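
Note on the behavior this patch relies on: TerminatePods reports false when any termination notice cannot be enqueued, and cleanupTerminatedPods surfaces that as an error so the next HandlePodCleanups pass retries. Below is a minimal, self-contained sketch of the non-blocking enqueue pattern (the select/default in updateStatusInternal) and of the "all sent" aggregation in TerminatePods. The types and names here are simplified stand-ins for illustration only, not the real kubelet code.

package main

import "fmt"

// podStatusSyncRequest stands in for the status manager's queued update.
type podStatusSyncRequest struct {
	podUID string
}

type manager struct {
	podStatusChannel chan podStatusSyncRequest
}

// enqueue mirrors the select/default at the end of updateStatusInternal:
// never block, and report whether the update was actually queued.
func (m *manager) enqueue(uid string) bool {
	select {
	case m.podStatusChannel <- podStatusSyncRequest{podUID: uid}:
		return true
	default:
		// Channel full: drop the update and let the periodic sync pick it up.
		return false
	}
}

// terminatePods mirrors the new TerminatePods contract: return true only if
// every pod's termination notice was enqueued.
func (m *manager) terminatePods(uids []string) bool {
	allSent := true
	for _, uid := range uids {
		if !m.enqueue(uid) {
			allSent = false
		}
	}
	return allSent
}

func main() {
	m := &manager{podStatusChannel: make(chan podStatusSyncRequest, 1)}
	fmt.Println(m.terminatePods([]string{"a"}))      // true: the buffered channel has room
	fmt.Println(m.terminatePods([]string{"b", "c"})) // false: nothing drained the channel, so both sends are dropped
}

Dropping rather than blocking matters because updateStatusInternal runs while holding podStatusesLock, as the existing comment in the diff notes; the periodic syncBatch (and, after this change, the retried cleanupTerminatedPods call) is the safety net for dropped updates.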