kubelet: refactor SyncPods for better readability

Eventually we would like to replace the all-encompassing SyncPods function with
more well-defined, smaller functions. This would not only help with the
readability and profiling of the code, it'd also set in motion for the plans to
trigger pod worker individually based on the content of the pod updates.

This commit serves as the first step of that, while avoiding breaking all unit
tests by preserving the SyncPods function for the time being.
This commit is contained in:
Yu-Ju Hong
2015-08-11 16:25:17 -07:00
parent 9f009df1cb
commit 56f4605f47

View File

@@ -89,7 +89,6 @@ var (
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occurring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
@@ -1422,42 +1421,56 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
// Remove obsolete entries in podStatus where the pod is no longer considered bound to this node.
kl.removeOrphanedPodStatuses(allPods)
// Handles pod admission.
pods := kl.admitPods(allPods, podSyncTypes)
glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
// Send updates to pod workers.
kl.dispatchWork(pods, podSyncTypes, mirrorPods, start)
// Clean up unwanted/orphaned resources.
if err := kl.cleanupPods(allPods, pods); err != nil {
return err
}
return nil
}
// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
// TODO(yujuhong): consider using pod UID as they key in the status manager
// to avoid returning the wrong status.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod) {
podFullNames := make(map[string]bool)
for _, pod := range allPods {
for _, pod := range pods {
podFullNames[kubecontainer.GetPodFullName(pod)] = true
}
kl.statusManager.RemoveOrphanedStatuses(podFullNames)
}
// Handles pod admission.
pods := kl.admitPods(allPods, podSyncTypes)
glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
var err error
desiredPods := make(map[types.UID]empty)
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}
// dispatchWork dispatches pod updates to workers.
func (kl *Kubelet) dispatchWork(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType,
mirrorPods map[string]*api.Pod, start time.Time) {
// Check for any containers that need starting
for _, pod := range pods {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
desiredPods[uid] = empty{}
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, mirrorPods[podFullName], func() {
metrics.PodWorkerLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
// Note the number of containers for new pods.
if val, ok := podSyncTypes[pod.UID]; ok && (val == SyncPodCreate) {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
}
// cleanupPods performs a series of cleanup work, including terminating pod
// workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) error {
desiredPods := make(map[types.UID]empty)
for _, pod := range admittedPods {
desiredPods[pod.UID] = empty{}
}
// Stop the workers for no-longer existing pods.
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
@@ -1468,6 +1481,12 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
return nil
}
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}
// Kill containers associated with unwanted pods.
err = kl.killUnwantedPods(desiredPods, runningPods)
if err != nil {