From 0c84b837cf6bf2b0d9afc116b6b4e51f8a847a5a Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Tue, 18 Aug 2015 13:26:56 -0700
Subject: [PATCH] kubelet: switch to using pod UID as the key in status manager

We chose to use podFullName (name_namespace) as the key in the status
manager because a mirror pod and its static pod share the same status.
This is no longer needed because we do not store statuses for static
pods anymore (we only store statuses for their mirror pods). Also,
previously, a few fixes were merged to ensure statuses are cleaned up
so that a new pod with the same name would not reuse an old status.
This change switches the key to the pod UID, which makes the code less
brittle.
---
 pkg/kubelet/kubelet.go             | 42 ++++++++++++++++--------------
 pkg/kubelet/kubelet_test.go        | 42 ++++++++++++++----------------
 pkg/kubelet/status_manager.go      | 28 +++++++++-----------
 pkg/kubelet/status_manager_test.go |  7 +++--
 4 files changed, 59 insertions(+), 60 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 3f9d82779c9..4a92dd3258b 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1167,7 +1167,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 	if mirrorPod != nil {
 		podToUpdate = mirrorPod
 	}
-	existingStatus, ok := kl.statusManager.GetPodStatus(podFullName)
+	existingStatus, ok := kl.statusManager.GetPodStatus(podToUpdate.UID)
 	if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
 		!firstSeenTime.IsZero() {
 		metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
@@ -1273,10 +1273,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 			if err := kl.podManager.CreateMirrorPod(pod); err != nil {
 				glog.Errorf("Failed creating a mirror pod %q: %v", podFullName, err)
 			}
-			// Pod status update is edge-triggered. If there is any update of the
-			// mirror pod, we need to delete the existing status associated with
-			// the static pod to trigger an update.
-			kl.statusManager.DeletePodStatus(podFullName)
 		}
 	}
 	return nil
@@ -1398,7 +1394,7 @@ func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecon
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
 	now := util.Now()
 	if pod.Spec.ActiveDeadlineSeconds != nil {
-		podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
+		podStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
 		if !ok {
 			podStatus = pod.Status
 		}
@@ -1428,7 +1424,7 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
 	for _, pod := range allPods {
 		var status api.PodStatus
 		// Check the cached pod status which was set after the last sync.
-		status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
+		status, ok := kl.statusManager.GetPodStatus(pod.UID)
 		if !ok {
 			// If there is no cached status, use the status from the
 			// apiserver. This is useful if kubelet has recently been
@@ -1450,7 +1446,7 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
 		metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
 	}()
 
-	kl.removeOrphanedPodStatuses(allPods)
+	kl.removeOrphanedPodStatuses(allPods, mirrorPods)
 	// Handles pod admission.
 	pods := kl.admitPods(allPods, podSyncTypes)
 	glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
@@ -1465,14 +1461,15 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
 
 // removeOrphanedPodStatuses removes obsolete entries in podStatus where
 // the pod is no longer considered bound to this node.
-// TODO(yujuhong): consider using pod UID as they key in the status manager
-// to avoid returning the wrong status.
-func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod) {
-	podFullNames := make(map[string]bool)
+func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[string]*api.Pod) {
+	podUIDs := make(map[types.UID]bool)
 	for _, pod := range pods {
-		podFullNames[kubecontainer.GetPodFullName(pod)] = true
+		podUIDs[pod.UID] = true
 	}
-	kl.statusManager.RemoveOrphanedStatuses(podFullNames)
+	for _, pod := range mirrorPods {
+		podUIDs[pod.UID] = true
+	}
+	kl.statusManager.RemoveOrphanedStatuses(podUIDs)
 }
 
 // dispatchWork dispatches pod updates to workers.
@@ -1900,10 +1897,21 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri
 	// Pod workers periodically write status to statusManager. If status is not
 	// cached there, something is wrong (or kubelet just restarted and hasn't
 	// caught up yet). Just assume the pod is not ready yet.
-	podStatus, found := kl.statusManager.GetPodStatus(podFullName)
+	name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
+	if err != nil {
+		return fmt.Errorf("unable to parse pod full name %q: %v", podFullName, err)
+	}
+
+	pod, ok := kl.GetPodByName(namespace, name)
+	if !ok {
+		return fmt.Errorf("unable to get logs for container %q in pod %q: unable to find pod", containerName, podFullName)
+	}
+
+	podStatus, found := kl.statusManager.GetPodStatus(pod.UID)
 	if !found {
 		return fmt.Errorf("failed to get status for pod %q", podFullName)
 	}
+
 	if err := kl.validatePodPhase(&podStatus); err != nil {
 		// No log is available if pod is not in a "known" phase (e.g. Unknown).
 		return err
@@ -1914,10 +1922,6 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri
 		// waiting state.
 		return err
 	}
-	pod, ok := kl.GetPodByFullName(podFullName)
-	if !ok {
-		return fmt.Errorf("unable to get logs for container %q in pod %q: unable to find pod", containerName, podFullName)
-	}
 	return kl.containerRuntime.GetContainerLogs(pod, containerID, tail, follow, stdout, stderr)
 }
 
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 4ca5043304d..f18f6e6b21d 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -2047,13 +2047,13 @@ func TestHandlePortConflicts(t *testing.T) {
 	pods[1].CreationTimestamp = util.NewTime(time.Now())
 	pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
 	// The newer pod should be rejected.
-	conflictedPodName := kubecontainer.GetPodFullName(pods[0])
+	conflictedPod := pods[0]
 
 	kl.handleNotFittingPods(pods)
 	// Check pod status stored in the status map.
-	status, found := kl.statusManager.GetPodStatus(conflictedPodName)
+	status, found := kl.statusManager.GetPodStatus(conflictedPod.UID)
 	if !found {
-		t.Fatalf("status of pod %q is not found in the status map", conflictedPodName)
+		t.Fatalf("status of pod %q is not found in the status map", conflictedPod.UID)
 	}
 	if status.Phase != api.PodFailed {
 		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
Got %q.", api.PodFailed, status.Phase) @@ -2089,13 +2089,13 @@ func TestHandleNodeSelector(t *testing.T) { }, } // The first pod should be rejected. - notfittingPodName := kubecontainer.GetPodFullName(pods[0]) + notfittingPod := pods[0] kl.handleNotFittingPods(pods) // Check pod status stored in the status map. - status, found := kl.statusManager.GetPodStatus(notfittingPodName) + status, found := kl.statusManager.GetPodStatus(notfittingPod.UID) if !found { - t.Fatalf("status of pod %q is not found in the status map", notfittingPodName) + t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -2137,13 +2137,13 @@ func TestHandleMemExceeded(t *testing.T) { pods[1].CreationTimestamp = util.NewTime(time.Now()) pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second)) // The newer pod should be rejected. - notfittingPodName := kubecontainer.GetPodFullName(pods[0]) + notfittingPod := pods[0] kl.handleNotFittingPods(pods) // Check pod status stored in the status map. - status, found := kl.statusManager.GetPodStatus(notfittingPodName) + status, found := kl.statusManager.GetPodStatus(notfittingPod.UID) if !found { - t.Fatalf("status of pod %q is not found in the status map", notfittingPodName) + t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -2159,17 +2159,18 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) { kl := testKubelet.kubelet pods := []*api.Pod{ - {ObjectMeta: api.ObjectMeta{Name: "pod1"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}}, - {ObjectMeta: api.ObjectMeta{Name: "pod2"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}}, + {ObjectMeta: api.ObjectMeta{Name: "pod1", UID: "1234"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}}, + {ObjectMeta: api.ObjectMeta{Name: "pod2", UID: "4567"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}}, } + podToTest := pods[1] // Run once to populate the status map. kl.handleNotFittingPods(pods) - if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); !found { + if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found { t.Fatalf("expected to have status cached for pod2") } // Sync with empty pods so that the entry in status map will be removed. 
 	kl.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
-	if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); found {
+	if _, found := kl.statusManager.GetPodStatus(podToTest.UID); found {
 		t.Fatalf("expected to not have status cached for pod2")
 	}
 }
 
@@ -2816,10 +2817,9 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) {
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	podFullName := kubecontainer.GetPodFullName(pods[0])
-	status, ok := kubelet.statusManager.GetPodStatus(podFullName)
+	status, ok := kubelet.statusManager.GetPodStatus(pods[0].UID)
 	if ok {
-		t.Errorf("unexpected status %#v found for static pod %q", status, podFullName)
+		t.Errorf("unexpected status %#v found for static pod %q", status, pods[0].UID)
 	}
 }
 
@@ -3148,10 +3148,9 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	podFullName := kubecontainer.GetPodFullName(pods[0])
-	status, found := kubelet.statusManager.GetPodStatus(podFullName)
+	status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
 	if !found {
-		t.Errorf("expected to found status for pod %q", podFullName)
+		t.Errorf("expected to find status for pod %q", pods[0].UID)
 	}
 	if status.Phase != api.PodFailed {
 		t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase)
@@ -3203,10 +3202,9 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	podFullName := kubecontainer.GetPodFullName(pods[0])
-	status, found := kubelet.statusManager.GetPodStatus(podFullName)
+	status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
 	if !found {
-		t.Errorf("expected to found status for pod %q", podFullName)
+		t.Errorf("expected to find status for pod %q", pods[0].UID)
 	}
 	if status.Phase == api.PodFailed {
 		t.Fatalf("expected pod status to not be %q", status.Phase)
diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go
index 5a6e53e7723..2dedad2056a 100644
--- a/pkg/kubelet/status_manager.go
+++ b/pkg/kubelet/status_manager.go
@@ -26,9 +26,9 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
-	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
 	kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
+	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
 )
 
@@ -43,14 +43,14 @@ type statusManager struct {
 	kubeClient client.Interface
 	// Map from pod full name to sync status of the corresponding pod.
 	podStatusesLock  sync.RWMutex
-	podStatuses      map[string]api.PodStatus
+	podStatuses      map[types.UID]api.PodStatus
 	podStatusChannel chan podStatusSyncRequest
 }
 
 func newStatusManager(kubeClient client.Interface) *statusManager {
 	return &statusManager{
 		kubeClient:       kubeClient,
-		podStatuses:      make(map[string]api.PodStatus),
+		podStatuses:      make(map[types.UID]api.PodStatus),
 		podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
 	}
 }
@@ -83,18 +83,17 @@ func (s *statusManager) Start() {
 	}, 0)
 }
 
-func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) {
+func (s *statusManager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
 	s.podStatusesLock.RLock()
 	defer s.podStatusesLock.RUnlock()
-	status, ok := s.podStatuses[podFullName]
+	status, ok := s.podStatuses[uid]
 	return status, ok
 }
 
 func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
-	podFullName := kubecontainer.GetPodFullName(pod)
 	s.podStatusesLock.Lock()
 	defer s.podStatusesLock.Unlock()
-	oldStatus, found := s.podStatuses[podFullName]
+	oldStatus, found := s.podStatuses[pod.UID]
 
 	// ensure that the start time does not change across updates.
 	if found && oldStatus.StartTime != nil {
@@ -124,7 +123,7 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	// workers and/or the kubelet but dropping the lock before sending the
 	// status down the channel feels like an easy way to get a bullet in foot.
 	if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
-		s.podStatuses[podFullName] = status
+		s.podStatuses[pod.UID] = status
 		s.podStatusChannel <- podStatusSyncRequest{pod, status}
 	} else {
 		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletUtil.FormatPodName(pod), status)
@@ -154,18 +153,18 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
 	return sent
 }
 
-func (s *statusManager) DeletePodStatus(podFullName string) {
+func (s *statusManager) DeletePodStatus(uid types.UID) {
 	s.podStatusesLock.Lock()
 	defer s.podStatusesLock.Unlock()
-	delete(s.podStatuses, podFullName)
+	delete(s.podStatuses, uid)
 }
 
 // TODO(filipg): It'd be cleaner if we can do this without signal from user.
-func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
+func (s *statusManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
 	s.podStatusesLock.Lock()
 	defer s.podStatusesLock.Unlock()
 	for key := range s.podStatuses {
-		if _, ok := podFullNames[key]; !ok {
+		if _, ok := podUIDs[key]; !ok {
 			glog.V(5).Infof("Removing %q from status map.", key)
 			delete(s.podStatuses, key)
 		}
@@ -176,7 +175,6 @@ func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
 func (s *statusManager) syncBatch() error {
 	syncRequest := <-s.podStatusChannel
 	pod := syncRequest.pod
-	podFullName := kubecontainer.GetPodFullName(pod)
 	status := syncRequest.status
 
 	var err error
@@ -209,7 +207,7 @@ func (s *statusManager) syncBatch() error {
 		}
 		if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
 			glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
-			s.DeletePodStatus(podFullName)
+			s.DeletePodStatus(pod.UID)
 			return nil
 		}
 	}
@@ -222,7 +220,7 @@ func (s *statusManager) syncBatch() error {
 	// is full, and the pod worker holding the lock is waiting on this method
 	// to clear the channel. Even if this delete never runs subsequent container
 	// changes on the node should trigger updates.
-	go s.DeletePodStatus(podFullName)
+	go s.DeletePodStatus(pod.UID)
 	return fmt.Errorf("error updating status for pod %q: %v", kubeletUtil.FormatPodName(pod), err)
 }
 
diff --git a/pkg/kubelet/status_manager_test.go b/pkg/kubelet/status_manager_test.go
index 84da4f3d376..e04f24b80f0 100644
--- a/pkg/kubelet/status_manager_test.go
+++ b/pkg/kubelet/status_manager_test.go
@@ -26,7 +26,6 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
-	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/util"
 )
 
@@ -94,7 +93,7 @@ func TestNewStatus(t *testing.T) {
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	verifyUpdates(t, syncer, 1)
 
-	status, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(testPod))
+	status, _ := syncer.GetPodStatus(testPod.UID)
 	if status.StartTime.IsZero() {
 		t.Errorf("SetPodStatus did not set a proper start time value")
 	}
@@ -115,7 +114,7 @@ func TestNewStatusPreservesPodStartTime(t *testing.T) {
 	pod.Status.StartTime = &startTime
 	syncer.SetPodStatus(pod, getRandomPodStatus())
 
-	status, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(pod))
+	status, _ := syncer.GetPodStatus(pod.UID)
 	if !status.StartTime.Time.Equal(startTime.Time) {
 		t.Errorf("Unexpected start time, expected %v, actual %v", startTime, status.StartTime)
 	}
@@ -136,7 +135,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
 	syncer.SetPodStatus(testPod, firstStatus)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	verifyUpdates(t, syncer, 2)
-	finalStatus, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(testPod))
+	finalStatus, _ := syncer.GetPodStatus(testPod.UID)
 	if finalStatus.StartTime.IsZero() {
 		t.Errorf("StartTime should not be zero")
 	}
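
For illustration only (not part of the patch): the standalone Go sketch below shows the idea behind keying statuses by UID. The statusStore type and the sample pods are hypothetical stand-ins, not the kubelet's statusManager or api.Pod. It demonstrates that a static pod and its mirror pod share the same name_namespace full name but have distinct UIDs, so a UID-keyed map can hold or omit their statuses independently, which is what lets the patch drop the podFullName key.

package main

import "fmt"

// UID and PodStatus are simplified stand-ins for the real
// k8s.io/kubernetes/pkg/types.UID and api.PodStatus types.
type UID string

type PodStatus struct {
	Phase string
}

type Pod struct {
	Name      string
	Namespace string
	UID       UID
}

// fullName mimics the name_namespace convention of the old key.
func fullName(p Pod) string {
	return p.Name + "_" + p.Namespace
}

// statusStore is a toy status manager keyed by pod UID, mirroring the
// direction of the patch (podStatuses map[types.UID]api.PodStatus).
type statusStore struct {
	podStatuses map[UID]PodStatus
}

func newStatusStore() *statusStore {
	return &statusStore{podStatuses: make(map[UID]PodStatus)}
}

func (s *statusStore) SetPodStatus(p Pod, status PodStatus) {
	s.podStatuses[p.UID] = status
}

func (s *statusStore) GetPodStatus(uid UID) (PodStatus, bool) {
	status, ok := s.podStatuses[uid]
	return status, ok
}

func main() {
	// A static pod and its mirror pod share name and namespace but not UID.
	static := Pod{Name: "web", Namespace: "default", UID: "static-1234"}
	mirror := Pod{Name: "web", Namespace: "default", UID: "mirror-5678"}

	// true: the full name cannot tell the two apart.
	fmt.Println(fullName(static) == fullName(mirror))

	s := newStatusStore()
	s.SetPodStatus(mirror, PodStatus{Phase: "Running"})

	// Keyed by UID, only the mirror pod has an entry; the static pod has none.
	if _, ok := s.GetPodStatus(static.UID); !ok {
		fmt.Println("no status cached for the static pod")
	}
	if st, ok := s.GetPodStatus(mirror.UID); ok {
		fmt.Println("mirror pod status:", st.Phase)
	}
}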