Kubelet: refactor pod manager

This change cleans up the pod manager extensively so that
 * Mirror pods are actually stored in the pod manager.
 * Both (non-mirror) pods and mirror pods are indexed by UID and full name for
   easy lookup and mapping. This is required for the next change to send
   full pod along with the pod status update.

This change also renames mirrorManager as mirrorClient since it is merely a
client to contact the API server and create/delete mirror pods.
This commit is contained in:
Yu-Ju Hong
2015-03-23 12:17:12 -07:00
parent ed68c8e82b
commit 08e4a883b6
9 changed files with 374 additions and 375 deletions

View File

@@ -95,7 +95,7 @@ type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occuring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods,
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods map[string]*api.Pod,
startTime time.Time) error
}
@@ -1480,7 +1480,8 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods, start time.Time) error {
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType,
mirrorPods map[string]*api.Pod, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
@@ -1528,7 +1529,8 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, mirrorPods.HasMirrorPod(uid), func() {
_, hasMirrorPod := mirrorPods[podFullName]
kl.podWorkers.UpdatePod(pod, hasMirrorPod, func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
@@ -1597,7 +1599,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}
// Remove any orphaned mirror pods.
kl.podManager.DeleteOrphanedMirrorPods(&mirrorPods)
kl.podManager.DeleteOrphanedMirrorPods()
return err
}
@@ -1723,7 +1725,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
}
}
pods, mirrorPods := kl.GetPods()
pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
}
@@ -1788,8 +1790,8 @@ func (kl *Kubelet) GetHostname() string {
}
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pod map.
func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) {
// pods.
func (kl *Kubelet) GetPods() []api.Pod {
return kl.podManager.GetPods()
}