diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go
index 00d7be003bf..ac2d5a7bbe0 100644
--- a/pkg/kubelet/dockertools/fake_docker_client.go
+++ b/pkg/kubelet/dockertools/fake_docker_client.go
@@ -297,6 +297,8 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
 		return err
 	}
 	f.Stopped = append(f.Stopped, id)
+	// Container status should be updated before the container is moved to ExitedContainerList.
+	f.updateContainerStatus(id, statusExitedPrefix)
 	var newList []docker.APIContainers
 	for _, container := range f.ContainerList {
 		if container.ID == id {
@@ -323,7 +325,6 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
 		container.State.Running = false
 	}
 	f.ContainerMap[id] = container
-	f.updateContainerStatus(id, statusExitedPrefix)
 	f.normalSleep(200, 50, 50)
 	return nil
 }
@@ -333,11 +334,20 @@ func (f *FakeDockerClient) RemoveContainer(opts docker.RemoveContainerOptions) e
 	defer f.Unlock()
 	f.called = append(f.called, "remove")
 	err := f.popError("remove")
-	if err == nil {
-		f.Removed = append(f.Removed, opts.ID)
+	if err != nil {
+		return err
 	}
-	delete(f.ContainerMap, opts.ID)
-	return err
+	for i := range f.ExitedContainerList {
+		if f.ExitedContainerList[i].ID == opts.ID {
+			delete(f.ContainerMap, opts.ID)
+			f.ExitedContainerList = append(f.ExitedContainerList[:i], f.ExitedContainerList[i+1:]...)
+			f.Removed = append(f.Removed, opts.ID)
+			return nil
+		}
+
+	}
+	// To be a good fake, report an error if the container is not stopped.
+	return fmt.Errorf("container not stopped")
 }
 
 // Logs is a test-spy implementation of DockerInterface.Logs.
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 345e208d789..7dc241afa71 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1950,31 +1950,6 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
 	return nil
 }
 
-// Delete any pods that are no longer running and are marked for deletion.
-func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
-	var terminating []*api.Pod
-	for _, pod := range pods {
-		if pod.DeletionTimestamp != nil {
-			found := false
-			for _, runningPod := range runningPods {
-				if runningPod.ID == pod.UID {
-					found = true
-					break
-				}
-			}
-			if found {
-				glog.V(5).Infof("Keeping terminated pod %q, still running", format.Pod(pod))
-				continue
-			}
-			terminating = append(terminating, pod)
-		}
-	}
-	if !kl.statusManager.TerminatePods(terminating) {
-		return errors.New("not all pods were successfully terminated")
-	}
-	return nil
-}
-
 // pastActiveDeadline returns true if the pod has been active for more than
 // ActiveDeadlineSeconds.
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
@@ -2166,10 +2141,6 @@ func (kl *Kubelet) HandlePodCleanups() error {
 	// Remove any orphaned mirror pods.
 	kl.podManager.DeleteOrphanedMirrorPods()
 
-	if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
-		glog.Errorf("Failed to cleanup terminated pods: %v", err)
-	}
-
 	// Clear out any old bandwidth rules
 	if err = kl.cleanupBandwidthLimits(allPods); err != nil {
 		return err
@@ -2430,6 +2401,13 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 
 func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
 	if kl.podIsTerminated(pod) {
+		if pod.DeletionTimestamp != nil {
+			// If the pod is in a terminated state, there is no pod worker to
+			// handle the work item. Check if the DeletionTimestamp has been
+			// set, and force a status update to trigger a pod deletion request
+			// to the apiserver.
+			kl.statusManager.TerminatePod(pod)
+		}
 		return
 	}
 	// Run the sync in an async worker.
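
The dispatchWork change above short-circuits terminated pods but still forces a final status update when a DeletionTimestamp is present. Below is a minimal, self-contained sketch of that control flow (not part of the diff); the pod and statusManager types are simplified stand-ins for the real api and kubelet types.

package main

import (
	"fmt"
	"time"
)

// pod and statusManager are simplified stand-ins for api.Pod and the kubelet
// status manager; they only carry what the sketch needs.
type pod struct {
	Name              string
	DeletionTimestamp *time.Time // non-nil once the apiserver marks the pod for deletion
	Terminated        bool
}

type statusManager interface {
	TerminatePod(p *pod)
}

type recordingStatusManager struct{ terminated []string }

func (m *recordingStatusManager) TerminatePod(p *pod) {
	m.terminated = append(m.terminated, p.Name)
}

// dispatchWork mirrors the control flow added to Kubelet.dispatchWork: a
// terminated pod gets no pod worker, but a pending deletion still forces a
// final status update so the apiserver can delete the pod object.
func dispatchWork(sm statusManager, p *pod) {
	if p.Terminated {
		if p.DeletionTimestamp != nil {
			sm.TerminatePod(p)
		}
		return
	}
	fmt.Printf("handing pod %s to a pod worker\n", p.Name)
}

func main() {
	sm := &recordingStatusManager{}
	now := time.Now()
	dispatchWork(sm, &pod{Name: "running-pod"})
	dispatchWork(sm, &pod{Name: "deleted-pod", Terminated: true, DeletionTimestamp: &now})
	fmt.Println("forced terminal status updates for:", sm.terminated)
}
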
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index 2d89157d17d..1f091e49723 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -124,17 +124,17 @@ func generateEvent(podID types.UID, cid string, oldState, newState plegContainer
 	case plegContainerExited:
 		return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid}
 	case plegContainerUnknown:
-		// Don't generate any event if the status is unknown.
-		return nil
+		return &PodLifecycleEvent{ID: podID, Type: ContainerChanged, Data: cid}
 	case plegContainerNonExistent:
 		// We report "ContainerDied" when container was stopped OR removed. We
 		// may want to distinguish the two cases in the future.
 		switch oldState {
 		case plegContainerExited:
-			// We already reported that the container died before. There is no
-			// need to do it again.
-			return nil
+			// We already reported that the container died before.
+			return &PodLifecycleEvent{ID: podID, Type: ContainerRemoved, Data: cid}
 		default:
+			// TODO: We may want to generate a ContainerRemoved event as well.
+			// It's okay for now because no one relies on the ContainerRemoved event.
 			return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid}
 		}
 	default:
@@ -165,9 +165,7 @@ func (g *GenericPLEG) relist() {
 		return
 	}
 	pods := kubecontainer.Pods(podList)
-	for _, pod := range pods {
-		g.podRecords.setCurrent(pod)
-	}
+	g.podRecords.setCurrent(pods)
 
 	// Compare the old and the current pods, and generate events.
 	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
@@ -204,6 +202,10 @@ func (g *GenericPLEG) relist() {
 		// Update the internal storage and send out the events.
 		g.podRecords.update(pid)
 		for i := range events {
+			// Filter out events that are not reliable and that no other components use yet.
+			if events[i].Type == ContainerChanged || events[i].Type == ContainerRemoved {
+				continue
+			}
 			g.eventChannel <- events[i]
 		}
 	}
@@ -304,12 +306,17 @@ func (pr podRecords) getCurrent(id types.UID) *kubecontainer.Pod {
 	return r.current
 }
 
-func (pr podRecords) setCurrent(pod *kubecontainer.Pod) {
-	if r, ok := pr[pod.ID]; ok {
-		r.current = pod
-		return
+func (pr podRecords) setCurrent(pods []*kubecontainer.Pod) {
+	for i := range pr {
+		pr[i].current = nil
+	}
+	for _, pod := range pods {
+		if r, ok := pr[pod.ID]; ok {
+			r.current = pod
+		} else {
+			pr[pod.ID] = &podRecord{current: pod}
+		}
 	}
-	pr[pod.ID] = &podRecord{current: pod}
 }
 
 func (pr podRecords) update(id types.UID) {
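
The podRecords.setCurrent change above takes the full relist result, so any record not refreshed in the current pass has its current pod cleared; that is what lets the exited-to-nonexistent transition be observed. A rough sketch of those semantics, using simplified stand-in types rather than the real kubecontainer/pleg types:

package main

import "fmt"

// containerPod and podRecord are simplified stand-ins for the kubecontainer
// and pleg types used by podRecords.
type containerPod struct{ ID, Name string }

type podRecord struct{ old, current *containerPod }

type podRecords map[string]*podRecord

// setCurrent mirrors the new semantics: clear every record's current pod
// first, so anything missing from the latest relist shows up as gone.
func (pr podRecords) setCurrent(pods []*containerPod) {
	for id := range pr {
		pr[id].current = nil
	}
	for _, pod := range pods {
		if r, ok := pr[pod.ID]; ok {
			r.current = pod
		} else {
			pr[pod.ID] = &podRecord{current: pod}
		}
	}
}

func main() {
	pr := podRecords{}
	pr.setCurrent([]*containerPod{{ID: "a", Name: "pod-a"}, {ID: "b", Name: "pod-b"}})
	// The next relist no longer sees pod-b, so its record's current pod is cleared.
	pr.setCurrent([]*containerPod{{ID: "a", Name: "pod-a"}})
	fmt.Println("pod-b still present:", pr["b"].current != nil)
}
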
diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go
index 57a80f4481a..c23472ca925 100644
--- a/pkg/kubelet/pleg/pleg.go
+++ b/pkg/kubelet/pleg/pleg.go
@@ -23,13 +23,14 @@ import (
 type PodLifeCycleEventType string
 
 const (
-	ContainerStarted      PodLifeCycleEventType = "ContainerStarted"
-	ContainerDied         PodLifeCycleEventType = "ContainerDied"
-	NetworkSetupCompleted PodLifeCycleEventType = "NetworkSetupCompleted"
-	NetworkFailed         PodLifeCycleEventType = "NetworkFailed"
+	ContainerStarted PodLifeCycleEventType = "ContainerStarted"
+	ContainerDied    PodLifeCycleEventType = "ContainerDied"
 	// PodSync is used to trigger syncing of a pod when the observed change of
 	// the state of the pod cannot be captured by any single event above.
 	PodSync PodLifeCycleEventType = "PodSync"
+	// Do not use the events below because they are disabled in GenericPLEG.
+	ContainerRemoved PodLifeCycleEventType = "ContainerRemoved"
+	ContainerChanged PodLifeCycleEventType = "ContainerChanged"
 )
 
 // PodLifecycleEvent is an event that reflects the change of the pod state.
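
The two new event types above are generated internally but filtered out in GenericPLEG.relist before they reach the event channel. A small sketch of that filtering, with simplified stand-in types (the real PLEG types differ):

package main

import "fmt"

// eventType and event are simplified stand-ins for the pleg package's
// PodLifeCycleEventType and PodLifecycleEvent.
type eventType string

const (
	containerDied    eventType = "ContainerDied"
	containerRemoved eventType = "ContainerRemoved"
	containerChanged eventType = "ContainerChanged"
)

type event struct {
	PodID string
	Type  eventType
}

// publish forwards only the event types subscribers are expected to act on,
// dropping the internal-only ContainerChanged and ContainerRemoved events.
func publish(ch chan<- *event, events []*event) {
	for _, e := range events {
		if e.Type == containerChanged || e.Type == containerRemoved {
			continue
		}
		ch <- e
	}
}

func main() {
	ch := make(chan *event, 8)
	publish(ch, []*event{
		{PodID: "p1", Type: containerDied},
		{PodID: "p1", Type: containerRemoved},
		{PodID: "p2", Type: containerChanged},
	})
	close(ch)
	for e := range ch {
		fmt.Printf("delivered %s for pod %s\n", e.Type, e.PodID) // only ContainerDied is delivered
	}
}
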
diff --git a/pkg/kubelet/status/manager.go b/pkg/kubelet/status/manager.go
index 03396261431..20d2df64b5a 100644
--- a/pkg/kubelet/status/manager.go
+++ b/pkg/kubelet/status/manager.go
@@ -83,10 +83,9 @@ type Manager interface {
 	// triggers a status update.
 	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
 
-	// TerminatePods resets the container status for the provided pods to terminated and triggers
-	// a status update. This function may not enqueue all the provided pods, in which case it will
-	// return false
-	TerminatePods(pods []*api.Pod) bool
+	// TerminatePod resets the container status for the provided pod to terminated and triggers
+	// a status update.
+	TerminatePod(pod *api.Pod)
 
 	// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
 	// the provided podUIDs.
@@ -149,7 +148,7 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	if err != nil {
 		return
 	}
-	m.updateStatusInternal(pod, status)
+	m.updateStatusInternal(pod, status, false)
 }
 
 func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
@@ -212,31 +211,32 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 		status.Conditions = append(status.Conditions, readyCondition)
 	}
 
-	m.updateStatusInternal(pod, status)
+	m.updateStatusInternal(pod, status, false)
 }
 
-func (m *manager) TerminatePods(pods []*api.Pod) bool {
-	allSent := true
+func (m *manager) TerminatePod(pod *api.Pod) {
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
-	for _, pod := range pods {
-		for i := range pod.Status.ContainerStatuses {
-			pod.Status.ContainerStatuses[i].State = api.ContainerState{
-				Terminated: &api.ContainerStateTerminated{},
-			}
-		}
-		if sent := m.updateStatusInternal(pod, pod.Status); !sent {
-			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", format.Pod(pod))
-			allSent = false
+	oldStatus := &pod.Status
+	if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
+		oldStatus = &cachedStatus.status
+	}
+	status, err := copyStatus(oldStatus)
+	if err != nil {
+		return
+	}
+	for i := range status.ContainerStatuses {
+		status.ContainerStatuses[i].State = api.ContainerState{
+			Terminated: &api.ContainerStateTerminated{},
 		}
 	}
-	return allSent
+	m.updateStatusInternal(pod, status, true)
 }
 
 // updateStatusInternal updates the internal status cache, and queues an update to the api server if
 // necessary. Returns whether an update was triggered.
 // This method IS NOT THREAD SAFE and must be called from a locked function.
-func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
+func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, forceUpdate bool) bool {
 	var oldStatus api.PodStatus
 	cachedStatus, isCached := m.podStatuses[pod.UID]
 	if isCached {
@@ -270,7 +270,7 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool
 	normalizeStatus(&status)
 	// The intent here is to prevent concurrent updates to a pod's status from
 	// clobbering each other so the phase of a pod progresses monotonically.
-	if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil {
+	if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
 		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
 		return false // No new status.
 	}
@@ -289,6 +289,8 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool
 	default:
 		// Let the periodic syncBatch handle the update if the channel is full.
 		// We can't block, since we hold the mutex lock.
+		glog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
+			format.Pod(pod), status)
 		return false
 	}
 }
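
The TerminatePod and forceUpdate changes above mark every container state terminated and bypass the "same status" short-circuit, so the deletion-triggered update is always queued. A compact sketch of that behavior, using simplified stand-in types rather than the real status manager types:

package main

import (
	"fmt"
	"reflect"
)

// podStatus and manager are simplified stand-ins for api.PodStatus and the
// kubelet status manager; cache and queue stand in for podStatuses and the
// apiserver sync channel.
type containerState struct{ Terminated bool }

type podStatus struct{ Containers []containerState }

type manager struct {
	cache map[string]podStatus
	queue []string // pod UIDs queued for an apiserver sync
}

// updateStatusInternal skips updates whose status matches the cache, unless
// forceUpdate is set, mirroring the forceUpdate flag added in the diff.
func (m *manager) updateStatusInternal(uid string, status podStatus, forceUpdate bool) bool {
	if cached, ok := m.cache[uid]; ok && reflect.DeepEqual(cached, status) && !forceUpdate {
		return false
	}
	m.cache[uid] = status
	m.queue = append(m.queue, uid)
	return true
}

// terminatePod copies the cached status, marks every container terminated,
// and forces the update through the "same status" check.
func (m *manager) terminatePod(uid string) {
	cached := m.cache[uid]
	status := podStatus{Containers: append([]containerState(nil), cached.Containers...)}
	for i := range status.Containers {
		status.Containers[i].Terminated = true
	}
	m.updateStatusInternal(uid, status, true)
}

func main() {
	m := &manager{cache: map[string]podStatus{
		"pod-1": {Containers: []containerState{{Terminated: true}}},
	}}
	// The computed status equals the cached one, but the forced update is
	// still queued, which is the behavior the diff introduces.
	m.terminatePod("pod-1")
	fmt.Println("queued status updates:", m.queue)
}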