Merge pull request #12560 from yujuhong/sync_pods

kubelet: refactor SyncPods for better readability
This commit is contained in:
CJ Cullen 2015-08-11 18:09:36 -07:00
commit 49f483400c

View File

@ -89,7 +89,6 @@ var (
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occurring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
@ -1422,42 +1421,56 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
// Remove obsolete entries in podStatus where the pod is no longer considered bound to this node.
kl.removeOrphanedPodStatuses(allPods)
// Handles pod admission.
pods := kl.admitPods(allPods, podSyncTypes)
glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
// Send updates to pod workers.
kl.dispatchWork(pods, podSyncTypes, mirrorPods, start)
// Clean up unwanted/orphaned resources.
if err := kl.cleanupPods(allPods, pods); err != nil {
return err
}
return nil
}
// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
// TODO(yujuhong): consider using pod UID as they key in the status manager
// to avoid returning the wrong status.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod) {
podFullNames := make(map[string]bool)
for _, pod := range allPods {
for _, pod := range pods {
podFullNames[kubecontainer.GetPodFullName(pod)] = true
}
kl.statusManager.RemoveOrphanedStatuses(podFullNames)
}
// Handles pod admission.
pods := kl.admitPods(allPods, podSyncTypes)
glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
var err error
desiredPods := make(map[types.UID]empty)
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}
// dispatchWork dispatches pod updates to workers.
func (kl *Kubelet) dispatchWork(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType,
mirrorPods map[string]*api.Pod, start time.Time) {
// Check for any containers that need starting
for _, pod := range pods {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
desiredPods[uid] = empty{}
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, mirrorPods[podFullName], func() {
metrics.PodWorkerLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
// Note the number of containers for new pods.
if val, ok := podSyncTypes[pod.UID]; ok && (val == SyncPodCreate) {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
}
// cleanupPods performs a series of cleanup work, including terminating pod
// workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) error {
desiredPods := make(map[types.UID]empty)
for _, pod := range admittedPods {
desiredPods[pod.UID] = empty{}
}
// Stop the workers for no-longer existing pods.
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
@ -1468,6 +1481,12 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
return nil
}
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}
// Kill containers associated with unwanted pods.
err = kl.killUnwantedPods(desiredPods, runningPods)
if err != nil {