kubelet: switch to using pod UID as the key in status manager

We chose to use podFullName (name_namespace) as the key in the status
manager because a mirror pod and its static pod share the same status. This
is no longer needed, because we no longer store statuses for static pods
(we only store statuses for their mirror pods). In addition, a few fixes
were previously merged to ensure that statuses are cleaned up, so a new pod
with the same name would not reuse an old status.

This change cleans up the code by using the pod UID as the key, which makes
the code less brittle.
Yu-Ju Hong 2015-08-18 13:26:56 -07:00
parent 5000252e46
commit 0c84b837cf
4 changed files with 59 additions and 60 deletions
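
As a rough illustration of the change described in the commit message, the sketch below condenses the status manager into a standalone, UID-keyed map. The names here (statusSketch, UID, PodStatus) are stand-ins for this example only, not the kubelet's real types, and the real statusManager's syncing of statuses to the apiserver is omitted.

package main

import (
	"fmt"
	"sync"
)

// UID and PodStatus stand in for pkg/types.UID and api.PodStatus.
type UID string
type PodStatus struct{ Phase string }

// statusSketch keeps one cached status per pod UID, so a recreated pod with
// the same name_namespace but a new UID can never pick up a stale entry.
type statusSketch struct {
	mu       sync.RWMutex
	statuses map[UID]PodStatus
}

func newStatusSketch() *statusSketch {
	return &statusSketch{statuses: make(map[UID]PodStatus)}
}

func (s *statusSketch) GetPodStatus(uid UID) (PodStatus, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	status, ok := s.statuses[uid]
	return status, ok
}

func (s *statusSketch) SetPodStatus(uid UID, status PodStatus) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.statuses[uid] = status
}

// RemoveOrphanedStatuses drops every entry whose UID is not in the given set.
// Callers must include mirror pod UIDs, since statuses for static pods are
// stored under their mirror pods.
func (s *statusSketch) RemoveOrphanedStatuses(activeUIDs map[UID]bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for uid := range s.statuses {
		if !activeUIDs[uid] {
			delete(s.statuses, uid)
		}
	}
}

func main() {
	s := newStatusSketch()
	s.SetPodStatus("1234", PodStatus{Phase: "Running"})
	s.RemoveOrphanedStatuses(map[UID]bool{"4567": true})
	if _, ok := s.GetPodStatus("1234"); !ok {
		fmt.Println("stale status for UID 1234 was pruned")
	}
}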

View File

@@ -1167,7 +1167,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
if mirrorPod != nil {
podToUpdate = mirrorPod
}
existingStatus, ok := kl.statusManager.GetPodStatus(podFullName)
existingStatus, ok := kl.statusManager.GetPodStatus(podToUpdate.UID)
if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
@@ -1273,10 +1273,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %v", podFullName, err)
}
// Pod status update is edge-triggered. If there is any update of the
// mirror pod, we need to delete the existing status associated with
// the static pod to trigger an update.
kl.statusManager.DeletePodStatus(podFullName)
}
}
return nil
@@ -1398,7 +1394,7 @@ func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecon
func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
now := util.Now()
if pod.Spec.ActiveDeadlineSeconds != nil {
podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
podStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
podStatus = pod.Status
}
@@ -1428,7 +1424,7 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
for _, pod := range allPods {
var status api.PodStatus
// Check the cached pod status which was set after the last sync.
status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
status, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
// If there is no cached status, use the status from the
// apiserver. This is useful if kubelet has recently been
@@ -1450,7 +1446,7 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
kl.removeOrphanedPodStatuses(allPods)
kl.removeOrphanedPodStatuses(allPods, mirrorPods)
// Handles pod admission.
pods := kl.admitPods(allPods, podSyncTypes)
glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
@@ -1465,14 +1461,15 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
// TODO(yujuhong): consider using pod UID as they key in the status manager
// to avoid returning the wrong status.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod) {
podFullNames := make(map[string]bool)
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[string]*api.Pod) {
podUIDs := make(map[types.UID]bool)
for _, pod := range pods {
podFullNames[kubecontainer.GetPodFullName(pod)] = true
podUIDs[pod.UID] = true
}
kl.statusManager.RemoveOrphanedStatuses(podFullNames)
for _, pod := range mirrorPods {
podUIDs[pod.UID] = true
}
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
}
// dispatchWork dispatches pod updates to workers.
@@ -1900,10 +1897,21 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri
// Pod workers periodically write status to statusManager. If status is not
// cached there, something is wrong (or kubelet just restarted and hasn't
// caught up yet). Just assume the pod is not ready yet.
podStatus, found := kl.statusManager.GetPodStatus(podFullName)
name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
if err != nil {
return fmt.Errorf("unable to parse pod full name %q: %v", podFullName, err)
}
pod, ok := kl.GetPodByName(namespace, name)
if !ok {
return fmt.Errorf("unable to get logs for container %q in pod %q: unable to find pod", containerName, podFullName)
}
podStatus, found := kl.statusManager.GetPodStatus(pod.UID)
if !found {
return fmt.Errorf("failed to get status for pod %q", podFullName)
}
if err := kl.validatePodPhase(&podStatus); err != nil {
// No log is available if pod is not in a "known" phase (e.g. Unknown).
return err
@@ -1914,10 +1922,6 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri
// waiting state.
return err
}
pod, ok := kl.GetPodByFullName(podFullName)
if !ok {
return fmt.Errorf("unable to get logs for container %q in pod %q: unable to find pod", containerName, podFullName)
}
return kl.containerRuntime.GetContainerLogs(pod, containerID, tail, follow, stdout, stderr)
}

View File

@@ -2047,13 +2047,13 @@ func TestHandlePortConflicts(t *testing.T) {
pods[1].CreationTimestamp = util.NewTime(time.Now())
pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
// The newer pod should be rejected.
conflictedPodName := kubecontainer.GetPodFullName(pods[0])
conflictedPod := pods[0]
kl.handleNotFittingPods(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(conflictedPodName)
status, found := kl.statusManager.GetPodStatus(conflictedPod.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", conflictedPodName)
t.Fatalf("status of pod %q is not found in the status map", conflictedPod.UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@@ -2089,13 +2089,13 @@ func TestHandleNodeSelector(t *testing.T) {
},
}
// The first pod should be rejected.
notfittingPodName := kubecontainer.GetPodFullName(pods[0])
notfittingPod := pods[0]
kl.handleNotFittingPods(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(notfittingPodName)
status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", notfittingPodName)
t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@@ -2137,13 +2137,13 @@ func TestHandleMemExceeded(t *testing.T) {
pods[1].CreationTimestamp = util.NewTime(time.Now())
pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
// The newer pod should be rejected.
notfittingPodName := kubecontainer.GetPodFullName(pods[0])
notfittingPod := pods[0]
kl.handleNotFittingPods(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(notfittingPodName)
status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", notfittingPodName)
t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@@ -2159,17 +2159,18 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
kl := testKubelet.kubelet
pods := []*api.Pod{
{ObjectMeta: api.ObjectMeta{Name: "pod1"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{ObjectMeta: api.ObjectMeta{Name: "pod2"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{ObjectMeta: api.ObjectMeta{Name: "pod1", UID: "1234"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{ObjectMeta: api.ObjectMeta{Name: "pod2", UID: "4567"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
}
podToTest := pods[1]
// Run once to populate the status map.
kl.handleNotFittingPods(pods)
if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); !found {
if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found {
t.Fatalf("expected to have status cached for pod2")
}
// Sync with empty pods so that the entry in status map will be removed.
kl.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); found {
if _, found := kl.statusManager.GetPodStatus(podToTest.UID); found {
t.Fatalf("expected to not have status cached for pod2")
}
}
@@ -2816,10 +2817,9 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
podFullName := kubecontainer.GetPodFullName(pods[0])
status, ok := kubelet.statusManager.GetPodStatus(podFullName)
status, ok := kubelet.statusManager.GetPodStatus(pods[0].UID)
if ok {
t.Errorf("unexpected status %#v found for static pod %q", status, podFullName)
t.Errorf("unexpected status %#v found for static pod %q", status, pods[0].UID)
}
}
@@ -3148,10 +3148,9 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
podFullName := kubecontainer.GetPodFullName(pods[0])
status, found := kubelet.statusManager.GetPodStatus(podFullName)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
if !found {
t.Errorf("expected to found status for pod %q", podFullName)
t.Errorf("expected to found status for pod %q", pods[0].UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase)
@@ -3203,10 +3202,9 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
podFullName := kubecontainer.GetPodFullName(pods[0])
status, found := kubelet.statusManager.GetPodStatus(podFullName)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
if !found {
t.Errorf("expected to found status for pod %q", podFullName)
t.Errorf("expected to found status for pod %q", pods[0].UID)
}
if status.Phase == api.PodFailed {
t.Fatalf("expected pod status to not be %q", status.Phase)

View File

@@ -26,9 +26,9 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
)
@@ -43,14 +43,14 @@ type statusManager struct {
kubeClient client.Interface
// Map from pod full name to sync status of the corresponding pod.
podStatusesLock sync.RWMutex
podStatuses map[string]api.PodStatus
podStatuses map[types.UID]api.PodStatus
podStatusChannel chan podStatusSyncRequest
}
func newStatusManager(kubeClient client.Interface) *statusManager {
return &statusManager{
kubeClient: kubeClient,
podStatuses: make(map[string]api.PodStatus),
podStatuses: make(map[types.UID]api.PodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
}
}
@@ -83,18 +83,17 @@ func (s *statusManager) Start() {
}, 0)
}
func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) {
func (s *statusManager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
s.podStatusesLock.RLock()
defer s.podStatusesLock.RUnlock()
status, ok := s.podStatuses[podFullName]
status, ok := s.podStatuses[uid]
return status, ok
}
func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
podFullName := kubecontainer.GetPodFullName(pod)
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
oldStatus, found := s.podStatuses[podFullName]
oldStatus, found := s.podStatuses[pod.UID]
// ensure that the start time does not change across updates.
if found && oldStatus.StartTime != nil {
@@ -124,7 +123,7 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
// workers and/or the kubelet but dropping the lock before sending the
// status down the channel feels like an easy way to get a bullet in foot.
if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
s.podStatuses[podFullName] = status
s.podStatuses[pod.UID] = status
s.podStatusChannel <- podStatusSyncRequest{pod, status}
} else {
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletUtil.FormatPodName(pod), status)
@@ -154,18 +153,18 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
return sent
}
func (s *statusManager) DeletePodStatus(podFullName string) {
func (s *statusManager) DeletePodStatus(uid types.UID) {
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
delete(s.podStatuses, podFullName)
delete(s.podStatuses, uid)
}
// TODO(filipg): It'd be cleaner if we can do this without signal from user.
func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
func (s *statusManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
for key := range s.podStatuses {
if _, ok := podFullNames[key]; !ok {
if _, ok := podUIDs[key]; !ok {
glog.V(5).Infof("Removing %q from status map.", key)
delete(s.podStatuses, key)
}
@@ -176,7 +175,6 @@ func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
func (s *statusManager) syncBatch() error {
syncRequest := <-s.podStatusChannel
pod := syncRequest.pod
podFullName := kubecontainer.GetPodFullName(pod)
status := syncRequest.status
var err error
@@ -209,7 +207,7 @@ func (s *statusManager) syncBatch() error {
}
if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
s.DeletePodStatus(podFullName)
s.DeletePodStatus(pod.UID)
return nil
}
}
@@ -222,7 +220,7 @@ func (s *statusManager) syncBatch() error {
// is full, and the pod worker holding the lock is waiting on this method
// to clear the channel. Even if this delete never runs subsequent container
// changes on the node should trigger updates.
go s.DeletePodStatus(podFullName)
go s.DeletePodStatus(pod.UID)
return fmt.Errorf("error updating status for pod %q: %v", kubeletUtil.FormatPodName(pod), err)
}

View File

@@ -26,7 +26,6 @@ import (
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util"
)
@@ -94,7 +93,7 @@ func TestNewStatus(t *testing.T) {
syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 1)
status, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(testPod))
status, _ := syncer.GetPodStatus(testPod.UID)
if status.StartTime.IsZero() {
t.Errorf("SetPodStatus did not set a proper start time value")
}
@@ -115,7 +114,7 @@ func TestNewStatusPreservesPodStartTime(t *testing.T) {
pod.Status.StartTime = &startTime
syncer.SetPodStatus(pod, getRandomPodStatus())
status, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(pod))
status, _ := syncer.GetPodStatus(pod.UID)
if !status.StartTime.Time.Equal(startTime.Time) {
t.Errorf("Unexpected start time, expected %v, actual %v", startTime, status.StartTime)
}
@@ -136,7 +135,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
syncer.SetPodStatus(testPod, firstStatus)
syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 2)
finalStatus, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(testPod))
finalStatus, _ := syncer.GetPodStatus(testPod.UID)
if finalStatus.StartTime.IsZero() {
t.Errorf("StartTime should not be zero")
}