From 388689238b7dfe5c8ac55b66c1288a02a42729af Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Fri, 26 Feb 2016 18:41:38 -0800 Subject: [PATCH 1/4] pleg: ensure the cache is updated whenever container are removed Even though we don't rely on the cache for garbage collection yet, we should keep it up-to-date. --- pkg/kubelet/pleg/generic.go | 14 +++++++++----- pkg/kubelet/pleg/pleg.go | 9 +++++---- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 2d89157d17d..51e795a9913 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -124,17 +124,17 @@ func generateEvent(podID types.UID, cid string, oldState, newState plegContainer case plegContainerExited: return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid} case plegContainerUnknown: - // Don't generate any event if the status is unknown. - return nil + return &PodLifecycleEvent{ID: podID, Type: ContainerChanged, Data: cid} case plegContainerNonExistent: // We report "ContainerDied" when container was stopped OR removed. We // may want to distinguish the two cases in the future. switch oldState { case plegContainerExited: - // We already reported that the container died before. There is no - // need to do it again. - return nil + // We already reported that the container died before. + return &PodLifecycleEvent{ID: podID, Type: ContainerRemoved, Data: cid} default: + // TODO: We may want to generate a ContainerRemoved event as well. + // It's ok now because no one relies on the ContainerRemoved event. return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid} } default: @@ -204,6 +204,10 @@ func (g *GenericPLEG) relist() { // Update the internal storage and send out the events. g.podRecords.update(pid) for i := range events { + // Filter out events that are not reliable and no other components use yet. + if events[i].Type == ContainerChanged || events[i].Type == ContainerRemoved { + continue + } g.eventChannel <- events[i] } } diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go index 57a80f4481a..c23472ca925 100644 --- a/pkg/kubelet/pleg/pleg.go +++ b/pkg/kubelet/pleg/pleg.go @@ -23,13 +23,14 @@ import ( type PodLifeCycleEventType string const ( - ContainerStarted PodLifeCycleEventType = "ContainerStarted" - ContainerDied PodLifeCycleEventType = "ContainerDied" - NetworkSetupCompleted PodLifeCycleEventType = "NetworkSetupCompleted" - NetworkFailed PodLifeCycleEventType = "NetworkFailed" + ContainerStarted PodLifeCycleEventType = "ContainerStarted" + ContainerDied PodLifeCycleEventType = "ContainerDied" // PodSync is used to trigger syncing of a pod when the observed change of // the state of the pod cannot be captured by any single event above. PodSync PodLifeCycleEventType = "PodSync" + // Do not use the events below because they are disabled in GenericPLEG. + ContainerRemoved PodLifeCycleEventType = "ContainerRemoved" + ContainerChanged PodLifeCycleEventType = "ContainerChanged" ) // PodLifecycleEvent is an event that reflects the change of the pod state. From 96eeb812ff13ee12e8450991fe1b9bbcf8b3e179 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Sun, 28 Feb 2016 13:00:29 -0800 Subject: [PATCH 2/4] kubelet: clear current pod records before relist --- pkg/kubelet/pleg/generic.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 51e795a9913..1f091e49723 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -165,9 +165,7 @@ func (g *GenericPLEG) relist() { return } pods := kubecontainer.Pods(podList) - for _, pod := range pods { - g.podRecords.setCurrent(pod) - } + g.podRecords.setCurrent(pods) // Compare the old and the current pods, and generate events. eventsByPodID := map[types.UID][]*PodLifecycleEvent{} @@ -308,12 +306,17 @@ func (pr podRecords) getCurrent(id types.UID) *kubecontainer.Pod { return r.current } -func (pr podRecords) setCurrent(pod *kubecontainer.Pod) { - if r, ok := pr[pod.ID]; ok { - r.current = pod - return +func (pr podRecords) setCurrent(pods []*kubecontainer.Pod) { + for i := range pr { + pr[i].current = nil + } + for _, pod := range pods { + if r, ok := pr[pod.ID]; ok { + r.current = pod + } else { + pr[pod.ID] = &podRecord{current: pod} + } } - pr[pod.ID] = &podRecord{current: pod} } func (pr podRecords) update(id types.UID) { From 98283d894c60846be62244ab840f1d3ef354cc67 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Mon, 22 Feb 2016 17:44:38 -0800 Subject: [PATCH 3/4] dockertools: fix error exited container list in fake docker client --- pkg/kubelet/dockertools/fake_docker_client.go | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index 00d7be003bf..ac2d5a7bbe0 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -297,6 +297,8 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error { return err } f.Stopped = append(f.Stopped, id) + // Container status should be Updated before container moved to ExitedContainerList + f.updateContainerStatus(id, statusExitedPrefix) var newList []docker.APIContainers for _, container := range f.ContainerList { if container.ID == id { @@ -323,7 +325,6 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error { container.State.Running = false } f.ContainerMap[id] = container - f.updateContainerStatus(id, statusExitedPrefix) f.normalSleep(200, 50, 50) return nil } @@ -333,11 +334,20 @@ func (f *FakeDockerClient) RemoveContainer(opts docker.RemoveContainerOptions) e defer f.Unlock() f.called = append(f.called, "remove") err := f.popError("remove") - if err == nil { - f.Removed = append(f.Removed, opts.ID) + if err != nil { + return err } - delete(f.ContainerMap, opts.ID) - return err + for i := range f.ExitedContainerList { + if f.ExitedContainerList[i].ID == opts.ID { + delete(f.ContainerMap, opts.ID) + f.ExitedContainerList = append(f.ExitedContainerList[:i], f.ExitedContainerList[i+1:]...) + f.Removed = append(f.Removed, opts.ID) + return nil + } + + } + // To be a good fake, report error if container is not stopped. + return fmt.Errorf("container not stopped") } // Logs is a test-spy implementation of DockerInterface.Logs. From 866c52c8a9a49cba149bfb05c7ad730c4b017632 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Wed, 17 Feb 2016 15:56:54 -0800 Subject: [PATCH 4/4] kubelet: fix duplicated status updates at pod cleanup cleanupTerminatedPods is responsible for checking whether a pod has been terminated and force a status update to trigger the pod deletion. However, this function is called in the periodic clenup routine, which runs every 2 seconds. In other words, it forces a status update for each non-running (and not yet deleted in the apiserver) pod. When batch deleting tens of pods, the rate of new updates surpasses what the status manager can handle, causing numerous redundant requests (and the status channel to be full). This change forces a status update only when detecting the DeletionTimestamp is set for a terminated pod. Note that for other non-terminated pods, the pod workers should be responsible for setting the correct status after killling all the containers. --- pkg/kubelet/kubelet.go | 36 ++++++------------------------ pkg/kubelet/status/manager.go | 42 ++++++++++++++++++----------------- 2 files changed, 29 insertions(+), 49 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 345e208d789..7dc241afa71 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1950,31 +1950,6 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco return nil } -// Delete any pods that are no longer running and are marked for deletion. -func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error { - var terminating []*api.Pod - for _, pod := range pods { - if pod.DeletionTimestamp != nil { - found := false - for _, runningPod := range runningPods { - if runningPod.ID == pod.UID { - found = true - break - } - } - if found { - glog.V(5).Infof("Keeping terminated pod %q, still running", format.Pod(pod)) - continue - } - terminating = append(terminating, pod) - } - } - if !kl.statusManager.TerminatePods(terminating) { - return errors.New("not all pods were successfully terminated") - } - return nil -} - // pastActiveDeadline returns true if the pod has been active for more than // ActiveDeadlineSeconds. func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool { @@ -2166,10 +2141,6 @@ func (kl *Kubelet) HandlePodCleanups() error { // Remove any orphaned mirror pods. kl.podManager.DeleteOrphanedMirrorPods() - if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil { - glog.Errorf("Failed to cleanup terminated pods: %v", err) - } - // Clear out any old bandwidth rules if err = kl.cleanupBandwidthLimits(allPods); err != nil { return err @@ -2430,6 +2401,13 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) { if kl.podIsTerminated(pod) { + if pod.DeletionTimestamp != nil { + // If the pod is in a termianted state, there is no pod worker to + // handle the work item. Check if the DeletionTimestamp has been + // set, and force a status update to trigger a pod deletion request + // to the apiserver. + kl.statusManager.TerminatePod(pod) + } return } // Run the sync in an async worker. diff --git a/pkg/kubelet/status/manager.go b/pkg/kubelet/status/manager.go index 03396261431..20d2df64b5a 100644 --- a/pkg/kubelet/status/manager.go +++ b/pkg/kubelet/status/manager.go @@ -83,10 +83,9 @@ type Manager interface { // triggers a status update. SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) - // TerminatePods resets the container status for the provided pods to terminated and triggers - // a status update. This function may not enqueue all the provided pods, in which case it will - // return false - TerminatePods(pods []*api.Pod) bool + // TerminatePod resets the container status for the provided pod to terminated and triggers + // a status update. + TerminatePod(pod *api.Pod) // RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in // the provided podUIDs. @@ -149,7 +148,7 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) { if err != nil { return } - m.updateStatusInternal(pod, status) + m.updateStatusInternal(pod, status, false) } func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) { @@ -212,31 +211,32 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai status.Conditions = append(status.Conditions, readyCondition) } - m.updateStatusInternal(pod, status) + m.updateStatusInternal(pod, status, false) } -func (m *manager) TerminatePods(pods []*api.Pod) bool { - allSent := true +func (m *manager) TerminatePod(pod *api.Pod) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() - for _, pod := range pods { - for i := range pod.Status.ContainerStatuses { - pod.Status.ContainerStatuses[i].State = api.ContainerState{ - Terminated: &api.ContainerStateTerminated{}, - } - } - if sent := m.updateStatusInternal(pod, pod.Status); !sent { - glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", format.Pod(pod)) - allSent = false + oldStatus := &pod.Status + if cachedStatus, ok := m.podStatuses[pod.UID]; ok { + oldStatus = &cachedStatus.status + } + status, err := copyStatus(oldStatus) + if err != nil { + return + } + for i := range status.ContainerStatuses { + status.ContainerStatuses[i].State = api.ContainerState{ + Terminated: &api.ContainerStateTerminated{}, } } - return allSent + m.updateStatusInternal(pod, pod.Status, true) } // updateStatusInternal updates the internal status cache, and queues an update to the api server if // necessary. Returns whether an update was triggered. // This method IS NOT THREAD SAFE and must be called from a locked function. -func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool { +func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, forceUpdate bool) bool { var oldStatus api.PodStatus cachedStatus, isCached := m.podStatuses[pod.UID] if isCached { @@ -270,7 +270,7 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool normalizeStatus(&status) // The intent here is to prevent concurrent updates to a pod's status from // clobbering each other so the phase of a pod progresses monotonically. - if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil { + if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate { glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status) return false // No new status. } @@ -289,6 +289,8 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool default: // Let the periodic syncBatch handle the update if the channel is full. // We can't block, since we hold the mutex lock. + glog.V(4).Infof("Skpping the status update for pod %q for now because the channel is full; status: %+v", + format.Pod(pod), status) return false } }