Kubelet: pass the acutal pod for status update

Pod status update should include the ObjectMeta of the pod. This change is
required for #5738 to merge.
This commit is contained in:
Yu-Ju Hong
2015-03-24 16:52:38 -07:00
parent 754cbea1f0
commit b4b0bc75c4
8 changed files with 120 additions and 70 deletions

View File

@@ -108,7 +108,7 @@ type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occuring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods map[string]*api.Pod,
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods map[string]api.Pod,
startTime time.Time) error
}
@@ -1219,7 +1219,7 @@ type podContainerChangesSpec struct {
containersToKeep map[dockertools.DockerID]int
}
func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, runningPod kubecontainer.Pod) (podContainerChangesSpec, error) {
func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod) (podContainerChangesSpec, error) {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
@@ -1329,21 +1329,30 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, r
}, nil
}
func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, runningPod kubecontainer.Pod) error {
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
// Before returning, regenerate status and store it in the cache.
defer func() {
if isStaticPod(pod) && mirrorPod == nil {
// No need to cache the status because the mirror pod does not
// exist yet.
return
}
status, err := kl.generatePodStatusByPod(pod)
if err != nil {
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
} else {
kl.statusManager.SetPodStatus(podFullName, status)
podToUpdate := pod
if mirrorPod != nil {
podToUpdate = mirrorPod
}
kl.statusManager.SetPodStatus(podToUpdate, status)
}
}()
containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, runningPod)
containerChanges, err := kl.computePodContainerChanges(pod, runningPod)
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
if err != nil {
return err
@@ -1421,7 +1430,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, runningPod kubeconta
kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID)
}
if !hasMirrorPod && isStaticPod(pod) {
if mirrorPod == nil && isStaticPod(pod) {
glog.V(4).Infof("Creating a mirror pod %q", podFullName)
if err := kl.podManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err)
@@ -1503,7 +1512,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType,
mirrorPods map[string]*api.Pod, start time.Time) error {
mirrorPods map[string]api.Pod, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
@@ -1544,8 +1553,11 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
desiredPods[uid] = empty{}
// Run the sync in an async manifest worker.
_, hasMirrorPod := mirrorPods[podFullName]
kl.podWorkers.UpdatePod(pod, hasMirrorPod, func() {
var mirrorPod *api.Pod = nil
if m, ok := mirrorPods[podFullName]; ok {
mirrorPod = &m
}
kl.podWorkers.UpdatePod(pod, mirrorPod, func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
@@ -1687,21 +1699,21 @@ func (kl *Kubelet) handleNotFittingPods(pods []api.Pod) {
fitting, notFitting := checkHostPortConflicts(pods)
for _, pod := range notFitting {
kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
kl.statusManager.SetPodStatus(kubecontainer.GetPodFullName(&pod), api.PodStatus{
kl.statusManager.SetPodStatus(&pod, api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to host port conflict"})
}
fitting, notFitting = kl.checkNodeSelectorMatching(fitting)
for _, pod := range notFitting {
kl.recorder.Eventf(&pod, "nodeSelectorMismatching", "Cannot start the pod due to node selector mismatch.")
kl.statusManager.SetPodStatus(kubecontainer.GetPodFullName(&pod), api.PodStatus{
kl.statusManager.SetPodStatus(&pod, api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to node selector mismatch"})
}
fitting, notFitting = kl.checkCapacityExceeded(fitting)
for _, pod := range notFitting {
kl.recorder.Eventf(&pod, "capacityExceeded", "Cannot start the pod due to exceeded capacity.")
kl.statusManager.SetPodStatus(kubecontainer.GetPodFullName(&pod), api.PodStatus{
kl.statusManager.SetPodStatus(&pod, api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to exceeded capacity"})
}