diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index c8c278157e7..5651d63b2b1 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1171,7 +1171,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 	if mirrorPod != nil {
 		podToUpdate = mirrorPod
 	}
-	existingStatus, ok := kl.statusManager.GetPodStatus(podFullName)
+	existingStatus, ok := kl.statusManager.GetPodStatus(podToUpdate.UID)
 	if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
 		!firstSeenTime.IsZero() {
 		metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
@@ -1277,10 +1277,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 			if err := kl.podManager.CreateMirrorPod(pod); err != nil {
 				glog.Errorf("Failed creating a mirror pod %q: %v", podFullName, err)
 			}
-			// Pod status update is edge-triggered. If there is any update of the
-			// mirror pod, we need to delete the existing status associated with
-			// the static pod to trigger an update.
-			kl.statusManager.DeletePodStatus(podFullName)
 		}
 	}
 	return nil
@@ -1402,7 +1398,7 @@ func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecon
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
 	now := util.Now()
 	if pod.Spec.ActiveDeadlineSeconds != nil {
-		podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
+		podStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
 		if !ok {
 			podStatus = pod.Status
 		}
@@ -1432,7 +1428,7 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
 	for _, pod := range allPods {
 		var status api.PodStatus
 		// Check the cached pod status which was set after the last sync.
-		status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
+		status, ok := kl.statusManager.GetPodStatus(pod.UID)
 		if !ok {
 			// If there is no cached status, use the status from the
 			// apiserver. This is useful if kubelet has recently been
@@ -1454,7 +1450,7 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
 		metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
 	}()
 
-	kl.removeOrphanedPodStatuses(allPods)
+	kl.removeOrphanedPodStatuses(allPods, mirrorPods)
 	// Handles pod admission.
 	pods := kl.admitPods(allPods, podSyncTypes)
 	glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
@@ -1469,14 +1465,15 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
 
 // removeOrphanedPodStatuses removes obsolete entries in podStatus where
 // the pod is no longer considered bound to this node.
-// TODO(yujuhong): consider using pod UID as they key in the status manager
-// to avoid returning the wrong status.
-func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod) {
-	podFullNames := make(map[string]bool)
+func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[string]*api.Pod) {
+	podUIDs := make(map[types.UID]bool)
 	for _, pod := range pods {
-		podFullNames[kubecontainer.GetPodFullName(pod)] = true
+		podUIDs[pod.UID] = true
 	}
-	kl.statusManager.RemoveOrphanedStatuses(podFullNames)
+	for _, pod := range mirrorPods {
+		podUIDs[pod.UID] = true
+	}
+	kl.statusManager.RemoveOrphanedStatuses(podUIDs)
 }
 
 // dispatchWork dispatches pod updates to workers.
@@ -1904,10 +1901,21 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri
 	// Pod workers periodically write status to statusManager. If status is not
 	// cached there, something is wrong (or kubelet just restarted and hasn't
 	// caught up yet). Just assume the pod is not ready yet.
-	podStatus, found := kl.statusManager.GetPodStatus(podFullName)
+	name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
+	if err != nil {
+		return fmt.Errorf("unable to parse pod full name %q: %v", podFullName, err)
+	}
+
+	pod, ok := kl.GetPodByName(namespace, name)
+	if !ok {
+		return fmt.Errorf("unable to get logs for container %q in pod %q: unable to find pod", containerName, podFullName)
+	}
+
+	podStatus, found := kl.statusManager.GetPodStatus(pod.UID)
 	if !found {
 		return fmt.Errorf("failed to get status for pod %q", podFullName)
 	}
+
 	if err := kl.validatePodPhase(&podStatus); err != nil {
 		// No log is available if pod is not in a "known" phase (e.g. Unknown).
 		return err
@@ -1918,10 +1926,6 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri
 		// waiting state.
 		return err
 	}
-	pod, ok := kl.GetPodByFullName(podFullName)
-	if !ok {
-		return fmt.Errorf("unable to get logs for container %q in pod %q: unable to find pod", containerName, podFullName)
-	}
 	return kl.containerRuntime.GetContainerLogs(pod, containerID, tail, follow, stdout, stderr)
 }
 
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 4ca5043304d..f18f6e6b21d 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -2047,13 +2047,13 @@ func TestHandlePortConflicts(t *testing.T) {
 	pods[1].CreationTimestamp = util.NewTime(time.Now())
 	pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
 	// The newer pod should be rejected.
-	conflictedPodName := kubecontainer.GetPodFullName(pods[0])
+	conflictedPod := pods[0]
 
 	kl.handleNotFittingPods(pods)
 	// Check pod status stored in the status map.
-	status, found := kl.statusManager.GetPodStatus(conflictedPodName)
+	status, found := kl.statusManager.GetPodStatus(conflictedPod.UID)
 	if !found {
-		t.Fatalf("status of pod %q is not found in the status map", conflictedPodName)
+		t.Fatalf("status of pod %q is not found in the status map", conflictedPod.UID)
 	}
 	if status.Phase != api.PodFailed {
 		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@@ -2089,13 +2089,13 @@ func TestHandleNodeSelector(t *testing.T) {
 		},
 	}
 	// The first pod should be rejected.
-	notfittingPodName := kubecontainer.GetPodFullName(pods[0])
+	notfittingPod := pods[0]
 
 	kl.handleNotFittingPods(pods)
 	// Check pod status stored in the status map.
-	status, found := kl.statusManager.GetPodStatus(notfittingPodName)
+	status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
 	if !found {
-		t.Fatalf("status of pod %q is not found in the status map", notfittingPodName)
+		t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID)
 	}
 	if status.Phase != api.PodFailed {
 		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@@ -2137,13 +2137,13 @@ func TestHandleMemExceeded(t *testing.T) {
 	pods[1].CreationTimestamp = util.NewTime(time.Now())
 	pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
 	// The newer pod should be rejected.
-	notfittingPodName := kubecontainer.GetPodFullName(pods[0])
+	notfittingPod := pods[0]
 
 	kl.handleNotFittingPods(pods)
 	// Check pod status stored in the status map.
-	status, found := kl.statusManager.GetPodStatus(notfittingPodName)
+	status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
 	if !found {
-		t.Fatalf("status of pod %q is not found in the status map", notfittingPodName)
+		t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID)
 	}
 	if status.Phase != api.PodFailed {
 		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@@ -2159,17 +2159,18 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
 	kl := testKubelet.kubelet
 	pods := []*api.Pod{
-		{ObjectMeta: api.ObjectMeta{Name: "pod1"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
-		{ObjectMeta: api.ObjectMeta{Name: "pod2"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
+		{ObjectMeta: api.ObjectMeta{Name: "pod1", UID: "1234"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
+		{ObjectMeta: api.ObjectMeta{Name: "pod2", UID: "4567"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
 	}
+	podToTest := pods[1]
 	// Run once to populate the status map.
 	kl.handleNotFittingPods(pods)
-	if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); !found {
+	if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found {
 		t.Fatalf("expected to have status cached for pod2")
 	}
 	// Sync with empty pods so that the entry in status map will be removed.
 	kl.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
-	if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); found {
+	if _, found := kl.statusManager.GetPodStatus(podToTest.UID); found {
 		t.Fatalf("expected to not have status cached for pod2")
 	}
 }
@@ -2816,10 +2817,9 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) {
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	podFullName := kubecontainer.GetPodFullName(pods[0])
-	status, ok := kubelet.statusManager.GetPodStatus(podFullName)
+	status, ok := kubelet.statusManager.GetPodStatus(pods[0].UID)
 	if ok {
-		t.Errorf("unexpected status %#v found for static pod %q", status, podFullName)
+		t.Errorf("unexpected status %#v found for static pod %q", status, pods[0].UID)
 	}
 }
 
@@ -3148,10 +3148,9 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	podFullName := kubecontainer.GetPodFullName(pods[0])
-	status, found := kubelet.statusManager.GetPodStatus(podFullName)
+	status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
 	if !found {
-		t.Errorf("expected to found status for pod %q", podFullName)
+		t.Errorf("expected to found status for pod %q", pods[0].UID)
 	}
 	if status.Phase != api.PodFailed {
 		t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase)
@@ -3203,10 +3202,9 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	podFullName := kubecontainer.GetPodFullName(pods[0])
-	status, found := kubelet.statusManager.GetPodStatus(podFullName)
+	status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
 	if !found {
-		t.Errorf("expected to found status for pod %q", podFullName)
+		t.Errorf("expected to found status for pod %q", pods[0].UID)
 	}
 	if status.Phase == api.PodFailed {
 		t.Fatalf("expected pod status to not be %q", status.Phase)
diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go
index 5a6e53e7723..2dedad2056a 100644
--- a/pkg/kubelet/status_manager.go
+++ b/pkg/kubelet/status_manager.go
@@ -26,9 +26,9 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
-	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
 	kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
+	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
 )
 
@@ -43,14 +43,14 @@ type statusManager struct {
 	kubeClient client.Interface
 	// Map from pod full name to sync status of the corresponding pod.
 	podStatusesLock  sync.RWMutex
-	podStatuses      map[string]api.PodStatus
+	podStatuses      map[types.UID]api.PodStatus
 	podStatusChannel chan podStatusSyncRequest
 }
 
 func newStatusManager(kubeClient client.Interface) *statusManager {
 	return &statusManager{
 		kubeClient:       kubeClient,
-		podStatuses:      make(map[string]api.PodStatus),
+		podStatuses:      make(map[types.UID]api.PodStatus),
 		podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
 	}
 }
@@ -83,18 +83,17 @@ func (s *statusManager) Start() {
 	}, 0)
 }
 
-func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) {
+func (s *statusManager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
 	s.podStatusesLock.RLock()
 	defer s.podStatusesLock.RUnlock()
-	status, ok := s.podStatuses[podFullName]
+	status, ok := s.podStatuses[uid]
 	return status, ok
 }
 
 func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
-	podFullName := kubecontainer.GetPodFullName(pod)
 	s.podStatusesLock.Lock()
 	defer s.podStatusesLock.Unlock()
-	oldStatus, found := s.podStatuses[podFullName]
+	oldStatus, found := s.podStatuses[pod.UID]
 
 	// ensure that the start time does not change across updates.
 	if found && oldStatus.StartTime != nil {
@@ -124,7 +123,7 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	// workers and/or the kubelet but dropping the lock before sending the
 	// status down the channel feels like an easy way to get a bullet in foot.
 	if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
-		s.podStatuses[podFullName] = status
+		s.podStatuses[pod.UID] = status
 		s.podStatusChannel <- podStatusSyncRequest{pod, status}
 	} else {
 		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletUtil.FormatPodName(pod), status)
@@ -154,18 +153,18 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
 	return sent
 }
 
-func (s *statusManager) DeletePodStatus(podFullName string) {
+func (s *statusManager) DeletePodStatus(uid types.UID) {
 	s.podStatusesLock.Lock()
 	defer s.podStatusesLock.Unlock()
-	delete(s.podStatuses, podFullName)
+	delete(s.podStatuses, uid)
 }
 
 // TODO(filipg): It'd be cleaner if we can do this without signal from user.
-func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
+func (s *statusManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
 	s.podStatusesLock.Lock()
 	defer s.podStatusesLock.Unlock()
 	for key := range s.podStatuses {
-		if _, ok := podFullNames[key]; !ok {
+		if _, ok := podUIDs[key]; !ok {
 			glog.V(5).Infof("Removing %q from status map.", key)
 			delete(s.podStatuses, key)
 		}
@@ -176,7 +175,6 @@ func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
 func (s *statusManager) syncBatch() error {
 	syncRequest := <-s.podStatusChannel
 	pod := syncRequest.pod
-	podFullName := kubecontainer.GetPodFullName(pod)
 	status := syncRequest.status
 
 	var err error
@@ -209,7 +207,7 @@ func (s *statusManager) syncBatch() error {
 		}
 		if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
 			glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
-			s.DeletePodStatus(podFullName)
+			s.DeletePodStatus(pod.UID)
 			return nil
 		}
 	}
@@ -222,7 +220,7 @@ func (s *statusManager) syncBatch() error {
 	// is full, and the pod worker holding the lock is waiting on this method
 	// to clear the channel. Even if this delete never runs subsequent container
 	// changes on the node should trigger updates.
-	go s.DeletePodStatus(podFullName)
+	go s.DeletePodStatus(pod.UID)
 	return fmt.Errorf("error updating status for pod %q: %v", kubeletUtil.FormatPodName(pod), err)
 }
 
diff --git a/pkg/kubelet/status_manager_test.go b/pkg/kubelet/status_manager_test.go
index 84da4f3d376..e04f24b80f0 100644
--- a/pkg/kubelet/status_manager_test.go
+++ b/pkg/kubelet/status_manager_test.go
@@ -26,7 +26,6 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
-	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/util"
 )
 
@@ -94,7 +93,7 @@ func TestNewStatus(t *testing.T) {
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	verifyUpdates(t, syncer, 1)
 
-	status, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(testPod))
+	status, _ := syncer.GetPodStatus(testPod.UID)
 	if status.StartTime.IsZero() {
 		t.Errorf("SetPodStatus did not set a proper start time value")
 	}
@@ -115,7 +114,7 @@ func TestNewStatusPreservesPodStartTime(t *testing.T) {
 	pod.Status.StartTime = &startTime
 	syncer.SetPodStatus(pod, getRandomPodStatus())
 
-	status, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(pod))
+	status, _ := syncer.GetPodStatus(pod.UID)
 	if !status.StartTime.Time.Equal(startTime.Time) {
 		t.Errorf("Unexpected start time, expected %v, actual %v", startTime, status.StartTime)
 	}
@@ -136,7 +135,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
 	syncer.SetPodStatus(testPod, firstStatus)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	verifyUpdates(t, syncer, 2)
-	finalStatus, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(testPod))
+	finalStatus, _ := syncer.GetPodStatus(testPod.UID)
 	if finalStatus.StartTime.IsZero() {
 		t.Errorf("StartTime should not be zero")
 	}
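Note (outside the patch): the change above keys the kubelet's pod status cache by pod UID rather than pod full name, which avoids returning a stale status for a recreated pod with the same name (the removed TODO) and lets orphaned entries be pruned against the set of active pod and mirror-pod UIDs. Below is a minimal, self-contained Go sketch of that pattern for illustration only; statusCache, PodStatus, the UID type, and the method names here are hypothetical stand-ins for the real statusManager, api.PodStatus, and types.UID, not code from the Kubernetes tree.

// Minimal sketch of a status cache keyed by pod UID, mirroring the
// map[types.UID]api.PodStatus + sync.RWMutex layout used by statusManager.
package main

import (
	"fmt"
	"sync"
)

// UID and PodStatus are illustrative placeholders, not the k8s types.
type UID string

type PodStatus struct {
	Phase string
}

type statusCache struct {
	mu       sync.RWMutex
	statuses map[UID]PodStatus
}

func newStatusCache() *statusCache {
	return &statusCache{statuses: make(map[UID]PodStatus)}
}

// Get returns the cached status for a UID, taking only the read lock.
func (c *statusCache) Get(uid UID) (PodStatus, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	s, ok := c.statuses[uid]
	return s, ok
}

// Set stores a status under the pod's UID.
func (c *statusCache) Set(uid UID, status PodStatus) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.statuses[uid] = status
}

// RemoveOrphaned drops every cached status whose UID is not in the active
// set, analogous to RemoveOrphanedStatuses(podUIDs map[types.UID]bool) above.
func (c *statusCache) RemoveOrphaned(active map[UID]bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for uid := range c.statuses {
		if !active[uid] {
			delete(c.statuses, uid)
		}
	}
}

func main() {
	cache := newStatusCache()
	cache.Set("1234", PodStatus{Phase: "Running"})
	cache.Set("4567", PodStatus{Phase: "Pending"})

	// Only UID "1234" is still bound to the node; "4567" is orphaned.
	cache.RemoveOrphaned(map[UID]bool{"1234": true})

	_, ok := cache.Get("4567")
	fmt.Println("orphaned status still cached:", ok) // prints: false
}

As in GetPodStatus above, reads take only the read lock so concurrent status lookups do not serialize, while writes and orphan pruning take the write lock.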