From 56f4605f470cce564820b177ef649549b84e1e07 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Tue, 11 Aug 2015 16:25:17 -0700 Subject: [PATCH] kubelet: refactor SyncPods for better readability Eventually we would like to replace the all-encompassing SyncPods function with more well-defined, smaller functions. This would not only help with the readability and profiling of the code, it'd also set in motion for the plans to trigger pod worker individually based on the content of the pod updates. This commit serves as the first step of that, while avoiding breaking all unit tests by preserving the SyncPods function for the time being. --- pkg/kubelet/kubelet.go | 59 ++++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index cb4bbed343a..97374a606be 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -89,7 +89,6 @@ var ( // SyncHandler is an interface implemented by Kubelet, for testability type SyncHandler interface { - // Syncs current state to match the specified pods. SyncPodType specified what // type of sync is occurring per pod. StartTime specifies the time at which // syncing began (for use in monitoring). @@ -1422,42 +1421,56 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start)) }() - // Remove obsolete entries in podStatus where the pod is no longer considered bound to this node. + kl.removeOrphanedPodStatuses(allPods) + // Handles pod admission. + pods := kl.admitPods(allPods, podSyncTypes) + glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods)) + // Send updates to pod workers. + kl.dispatchWork(pods, podSyncTypes, mirrorPods, start) + // Clean up unwanted/orphaned resources. + if err := kl.cleanupPods(allPods, pods); err != nil { + return err + } + return nil +} + +// removeOrphanedPodStatuses removes obsolete entries in podStatus where +// the pod is no longer considered bound to this node. +// TODO(yujuhong): consider using pod UID as they key in the status manager +// to avoid returning the wrong status. +func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod) { podFullNames := make(map[string]bool) - for _, pod := range allPods { + for _, pod := range pods { podFullNames[kubecontainer.GetPodFullName(pod)] = true } kl.statusManager.RemoveOrphanedStatuses(podFullNames) +} - // Handles pod admission. - pods := kl.admitPods(allPods, podSyncTypes) - - glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods)) - var err error - desiredPods := make(map[types.UID]empty) - - runningPods, err := kl.runtimeCache.GetPods() - if err != nil { - glog.Errorf("Error listing containers: %#v", err) - return err - } - +// dispatchWork dispatches pod updates to workers. +func (kl *Kubelet) dispatchWork(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, + mirrorPods map[string]*api.Pod, start time.Time) { // Check for any containers that need starting for _, pod := range pods { podFullName := kubecontainer.GetPodFullName(pod) - uid := pod.UID - desiredPods[uid] = empty{} - // Run the sync in an async manifest worker. kl.podWorkers.UpdatePod(pod, mirrorPods[podFullName], func() { metrics.PodWorkerLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start)) }) - // Note the number of containers for new pods. if val, ok := podSyncTypes[pod.UID]; ok && (val == SyncPodCreate) { metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) } } +} + +// cleanupPods performs a series of cleanup work, including terminating pod +// workers, killing unwanted pods, and removing orphaned volumes/pod +// directories. +func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) error { + desiredPods := make(map[types.UID]empty) + for _, pod := range admittedPods { + desiredPods[pod.UID] = empty{} + } // Stop the workers for no-longer existing pods. kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) @@ -1468,6 +1481,12 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP return nil } + runningPods, err := kl.runtimeCache.GetPods() + if err != nil { + glog.Errorf("Error listing containers: %#v", err) + return err + } + // Kill containers associated with unwanted pods. err = kl.killUnwantedPods(desiredPods, runningPods) if err != nil {