diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index cb4bbed343a..97374a606be 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -89,7 +89,6 @@ var ( // SyncHandler is an interface implemented by Kubelet, for testability type SyncHandler interface { - // Syncs current state to match the specified pods. SyncPodType specified what // type of sync is occurring per pod. StartTime specifies the time at which // syncing began (for use in monitoring). @@ -1422,42 +1421,56 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start)) }() - // Remove obsolete entries in podStatus where the pod is no longer considered bound to this node. + kl.removeOrphanedPodStatuses(allPods) + // Handles pod admission. + pods := kl.admitPods(allPods, podSyncTypes) + glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods)) + // Send updates to pod workers. + kl.dispatchWork(pods, podSyncTypes, mirrorPods, start) + // Clean up unwanted/orphaned resources. + if err := kl.cleanupPods(allPods, pods); err != nil { + return err + } + return nil +} + +// removeOrphanedPodStatuses removes obsolete entries in podStatus where +// the pod is no longer considered bound to this node. +// TODO(yujuhong): consider using pod UID as they key in the status manager +// to avoid returning the wrong status. +func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod) { podFullNames := make(map[string]bool) - for _, pod := range allPods { + for _, pod := range pods { podFullNames[kubecontainer.GetPodFullName(pod)] = true } kl.statusManager.RemoveOrphanedStatuses(podFullNames) +} - // Handles pod admission. - pods := kl.admitPods(allPods, podSyncTypes) - - glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods)) - var err error - desiredPods := make(map[types.UID]empty) - - runningPods, err := kl.runtimeCache.GetPods() - if err != nil { - glog.Errorf("Error listing containers: %#v", err) - return err - } - +// dispatchWork dispatches pod updates to workers. +func (kl *Kubelet) dispatchWork(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, + mirrorPods map[string]*api.Pod, start time.Time) { // Check for any containers that need starting for _, pod := range pods { podFullName := kubecontainer.GetPodFullName(pod) - uid := pod.UID - desiredPods[uid] = empty{} - // Run the sync in an async manifest worker. kl.podWorkers.UpdatePod(pod, mirrorPods[podFullName], func() { metrics.PodWorkerLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start)) }) - // Note the number of containers for new pods. if val, ok := podSyncTypes[pod.UID]; ok && (val == SyncPodCreate) { metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) } } +} + +// cleanupPods performs a series of cleanup work, including terminating pod +// workers, killing unwanted pods, and removing orphaned volumes/pod +// directories. +func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) error { + desiredPods := make(map[types.UID]empty) + for _, pod := range admittedPods { + desiredPods[pod.UID] = empty{} + } // Stop the workers for no-longer existing pods. kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) @@ -1468,6 +1481,12 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP return nil } + runningPods, err := kl.runtimeCache.GetPods() + if err != nil { + glog.Errorf("Error listing containers: %#v", err) + return err + } + // Kill containers associated with unwanted pods. err = kl.killUnwantedPods(desiredPods, runningPods) if err != nil {