Merge pull request #12876 from yujuhong/pod_status

kubelet: switch to using pod UID as the key in status manager
Nikhil Jindal 2015-08-24 10:39:02 -07:00
commit a1c5f3f45d
4 changed files with 59 additions and 60 deletions
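
The gist of the change: the status manager's cache is now keyed by the pod's UID instead of its namespace/name "full name", so a pod deleted and recreated under the same name can no longer be served a stale status (this is the TODO the patch removes). Below is a minimal, self-contained sketch of the new keying; it uses simplified stand-in types (UID, PodStatus) rather than the real kubelet and api packages, and only mirrors the GetPodStatus/SetPodStatus/RemoveOrphanedStatuses shape seen in the diff:

// Sketch only: simplified stand-ins for the kubelet types, illustrating the
// switch from "namespace/name" keys to pod UIDs in the status manager.
package main

import (
	"fmt"
	"sync"
)

// UID stands in for k8s.io/kubernetes/pkg/types.UID.
type UID string

// PodStatus stands in for api.PodStatus.
type PodStatus struct {
	Phase string
}

// statusManager keeps the last known status per pod, keyed by UID so a
// deleted-and-recreated pod with the same name cannot be handed a stale entry.
type statusManager struct {
	mu          sync.RWMutex
	podStatuses map[UID]PodStatus
}

func newStatusManager() *statusManager {
	return &statusManager{podStatuses: make(map[UID]PodStatus)}
}

func (s *statusManager) GetPodStatus(uid UID) (PodStatus, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	status, ok := s.podStatuses[uid]
	return status, ok
}

func (s *statusManager) SetPodStatus(uid UID, status PodStatus) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.podStatuses[uid] = status
}

// RemoveOrphanedStatuses drops entries whose UID is no longer bound to this node.
func (s *statusManager) RemoveOrphanedStatuses(podUIDs map[UID]bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for uid := range s.podStatuses {
		if !podUIDs[uid] {
			delete(s.podStatuses, uid)
		}
	}
}

func main() {
	m := newStatusManager()
	m.SetPodStatus("1234", PodStatus{Phase: "Running"})
	if status, ok := m.GetPodStatus("1234"); ok {
		fmt.Println(status.Phase) // Running
	}
	m.RemoveOrphanedStatuses(map[UID]bool{}) // nothing bound to the node -> entry removed
	_, ok := m.GetPodStatus("1234")
	fmt.Println(ok) // false
}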


@ -1171,7 +1171,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
if mirrorPod != nil {
podToUpdate = mirrorPod
}
existingStatus, ok := kl.statusManager.GetPodStatus(podFullName)
existingStatus, ok := kl.statusManager.GetPodStatus(podToUpdate.UID)
if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
@ -1277,10 +1277,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %v", podFullName, err)
}
// Pod status update is edge-triggered. If there is any update of the
// mirror pod, we need to delete the existing status associated with
// the static pod to trigger an update.
kl.statusManager.DeletePodStatus(podFullName)
}
}
return nil
@ -1402,7 +1398,7 @@ func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecon
func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
now := util.Now()
if pod.Spec.ActiveDeadlineSeconds != nil {
podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
podStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
podStatus = pod.Status
}
@ -1432,7 +1428,7 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
for _, pod := range allPods {
var status api.PodStatus
// Check the cached pod status which was set after the last sync.
status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
status, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
// If there is no cached status, use the status from the
// apiserver. This is useful if kubelet has recently been
@ -1454,7 +1450,7 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
kl.removeOrphanedPodStatuses(allPods)
kl.removeOrphanedPodStatuses(allPods, mirrorPods)
// Handles pod admission.
pods := kl.admitPods(allPods, podSyncTypes)
glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
@ -1469,14 +1465,15 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncP
// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
// TODO(yujuhong): consider using pod UID as the key in the status manager
// to avoid returning the wrong status.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod) {
podFullNames := make(map[string]bool)
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[string]*api.Pod) {
podUIDs := make(map[types.UID]bool)
for _, pod := range pods {
podFullNames[kubecontainer.GetPodFullName(pod)] = true
podUIDs[pod.UID] = true
}
kl.statusManager.RemoveOrphanedStatuses(podFullNames)
for _, pod := range mirrorPods {
podUIDs[pod.UID] = true
}
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
}
// dispatchWork dispatches pod updates to workers.
@ -1904,10 +1901,21 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri
// Pod workers periodically write status to statusManager. If status is not
// cached there, something is wrong (or kubelet just restarted and hasn't
// caught up yet). Just assume the pod is not ready yet.
podStatus, found := kl.statusManager.GetPodStatus(podFullName)
name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
if err != nil {
return fmt.Errorf("unable to parse pod full name %q: %v", podFullName, err)
}
pod, ok := kl.GetPodByName(namespace, name)
if !ok {
return fmt.Errorf("unable to get logs for container %q in pod %q: unable to find pod", containerName, podFullName)
}
podStatus, found := kl.statusManager.GetPodStatus(pod.UID)
if !found {
return fmt.Errorf("failed to get status for pod %q", podFullName)
}
if err := kl.validatePodPhase(&podStatus); err != nil {
// No log is available if pod is not in a "known" phase (e.g. Unknown).
return err
@ -1918,10 +1926,6 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri
// waiting state.
return err
}
pod, ok := kl.GetPodByFullName(podFullName)
if !ok {
return fmt.Errorf("unable to get logs for container %q in pod %q: unable to find pod", containerName, podFullName)
}
return kl.containerRuntime.GetContainerLogs(pod, containerID, tail, follow, stdout, stderr)
}


@ -2047,13 +2047,13 @@ func TestHandlePortConflicts(t *testing.T) {
pods[1].CreationTimestamp = util.NewTime(time.Now())
pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
// The newer pod should be rejected.
conflictedPodName := kubecontainer.GetPodFullName(pods[0])
conflictedPod := pods[0]
kl.handleNotFittingPods(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(conflictedPodName)
status, found := kl.statusManager.GetPodStatus(conflictedPod.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", conflictedPodName)
t.Fatalf("status of pod %q is not found in the status map", conflictedPod.UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@ -2089,13 +2089,13 @@ func TestHandleNodeSelector(t *testing.T) {
},
}
// The first pod should be rejected.
notfittingPodName := kubecontainer.GetPodFullName(pods[0])
notfittingPod := pods[0]
kl.handleNotFittingPods(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(notfittingPodName)
status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", notfittingPodName)
t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@ -2137,13 +2137,13 @@ func TestHandleMemExceeded(t *testing.T) {
pods[1].CreationTimestamp = util.NewTime(time.Now())
pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
// The newer pod should be rejected.
notfittingPodName := kubecontainer.GetPodFullName(pods[0])
notfittingPod := pods[0]
kl.handleNotFittingPods(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(notfittingPodName)
status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", notfittingPodName)
t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@ -2159,17 +2159,18 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
kl := testKubelet.kubelet
pods := []*api.Pod{
{ObjectMeta: api.ObjectMeta{Name: "pod1"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{ObjectMeta: api.ObjectMeta{Name: "pod2"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{ObjectMeta: api.ObjectMeta{Name: "pod1", UID: "1234"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{ObjectMeta: api.ObjectMeta{Name: "pod2", UID: "4567"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
}
podToTest := pods[1]
// Run once to populate the status map.
kl.handleNotFittingPods(pods)
if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); !found {
if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found {
t.Fatalf("expected to have status cached for pod2")
}
// Sync with empty pods so that the entry in status map will be removed.
kl.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); found {
if _, found := kl.statusManager.GetPodStatus(podToTest.UID); found {
t.Fatalf("expected to not have status cached for pod2")
}
}
@ -2816,10 +2817,9 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
podFullName := kubecontainer.GetPodFullName(pods[0])
status, ok := kubelet.statusManager.GetPodStatus(podFullName)
status, ok := kubelet.statusManager.GetPodStatus(pods[0].UID)
if ok {
t.Errorf("unexpected status %#v found for static pod %q", status, podFullName)
t.Errorf("unexpected status %#v found for static pod %q", status, pods[0].UID)
}
}
@ -3148,10 +3148,9 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
podFullName := kubecontainer.GetPodFullName(pods[0])
status, found := kubelet.statusManager.GetPodStatus(podFullName)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
if !found {
t.Errorf("expected to found status for pod %q", podFullName)
t.Errorf("expected to found status for pod %q", pods[0].UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase)
@ -3203,10 +3202,9 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
podFullName := kubecontainer.GetPodFullName(pods[0])
status, found := kubelet.statusManager.GetPodStatus(podFullName)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
if !found {
t.Errorf("expected to found status for pod %q", podFullName)
t.Errorf("expected to found status for pod %q", pods[0].UID)
}
if status.Phase == api.PodFailed {
t.Fatalf("expected pod status to not be %q", status.Phase)


@ -26,9 +26,9 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
)
@ -43,14 +43,14 @@ type statusManager struct {
kubeClient client.Interface
// Map from pod UID to sync status of the corresponding pod.
podStatusesLock sync.RWMutex
podStatuses map[string]api.PodStatus
podStatuses map[types.UID]api.PodStatus
podStatusChannel chan podStatusSyncRequest
}
func newStatusManager(kubeClient client.Interface) *statusManager {
return &statusManager{
kubeClient: kubeClient,
podStatuses: make(map[string]api.PodStatus),
podStatuses: make(map[types.UID]api.PodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
}
}
@ -83,18 +83,17 @@ func (s *statusManager) Start() {
}, 0)
}
func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) {
func (s *statusManager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
s.podStatusesLock.RLock()
defer s.podStatusesLock.RUnlock()
status, ok := s.podStatuses[podFullName]
status, ok := s.podStatuses[uid]
return status, ok
}
func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
podFullName := kubecontainer.GetPodFullName(pod)
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
oldStatus, found := s.podStatuses[podFullName]
oldStatus, found := s.podStatuses[pod.UID]
// ensure that the start time does not change across updates.
if found && oldStatus.StartTime != nil {
@ -124,7 +123,7 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
// workers and/or the kubelet but dropping the lock before sending the
// status down the channel feels like an easy way to get a bullet in foot.
if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
s.podStatuses[podFullName] = status
s.podStatuses[pod.UID] = status
s.podStatusChannel <- podStatusSyncRequest{pod, status}
} else {
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletUtil.FormatPodName(pod), status)
@ -154,18 +153,18 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
return sent
}
func (s *statusManager) DeletePodStatus(podFullName string) {
func (s *statusManager) DeletePodStatus(uid types.UID) {
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
delete(s.podStatuses, podFullName)
delete(s.podStatuses, uid)
}
// TODO(filipg): It'd be cleaner if we can do this without signal from user.
func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
func (s *statusManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
for key := range s.podStatuses {
if _, ok := podFullNames[key]; !ok {
if _, ok := podUIDs[key]; !ok {
glog.V(5).Infof("Removing %q from status map.", key)
delete(s.podStatuses, key)
}
@ -176,7 +175,6 @@ func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
func (s *statusManager) syncBatch() error {
syncRequest := <-s.podStatusChannel
pod := syncRequest.pod
podFullName := kubecontainer.GetPodFullName(pod)
status := syncRequest.status
var err error
@ -209,7 +207,7 @@ func (s *statusManager) syncBatch() error {
}
if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
s.DeletePodStatus(podFullName)
s.DeletePodStatus(pod.UID)
return nil
}
}
@ -222,7 +220,7 @@ func (s *statusManager) syncBatch() error {
// is full, and the pod worker holding the lock is waiting on this method
// to clear the channel. Even if this delete never runs subsequent container
// changes on the node should trigger updates.
go s.DeletePodStatus(podFullName)
go s.DeletePodStatus(pod.UID)
return fmt.Errorf("error updating status for pod %q: %v", kubeletUtil.FormatPodName(pod), err)
}


@ -26,7 +26,6 @@ import (
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util"
)
@ -94,7 +93,7 @@ func TestNewStatus(t *testing.T) {
syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 1)
status, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(testPod))
status, _ := syncer.GetPodStatus(testPod.UID)
if status.StartTime.IsZero() {
t.Errorf("SetPodStatus did not set a proper start time value")
}
@ -115,7 +114,7 @@ func TestNewStatusPreservesPodStartTime(t *testing.T) {
pod.Status.StartTime = &startTime
syncer.SetPodStatus(pod, getRandomPodStatus())
status, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(pod))
status, _ := syncer.GetPodStatus(pod.UID)
if !status.StartTime.Time.Equal(startTime.Time) {
t.Errorf("Unexpected start time, expected %v, actual %v", startTime, status.StartTime)
}
@ -136,7 +135,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
syncer.SetPodStatus(testPod, firstStatus)
syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 2)
finalStatus, _ := syncer.GetPodStatus(kubecontainer.GetPodFullName(testPod))
finalStatus, _ := syncer.GetPodStatus(testPod.UID)
if finalStatus.StartTime.IsZero() {
t.Errorf("StartTime should not be zero")
}