From fbc5a7d0346b5eecf9c4038aeac70b83e0dff9c1 Mon Sep 17 00:00:00 2001 From: "Tim St. Clair" Date: Fri, 13 Nov 2015 18:09:17 -0800 Subject: [PATCH] Sync status to new mirror pods --- pkg/kubelet/pod/manager.go | 76 +++++++++++++++++------------- pkg/kubelet/status/manager.go | 22 +++++---- pkg/kubelet/status/manager_test.go | 28 ++++++----- 3 files changed, 72 insertions(+), 54 deletions(-) diff --git a/pkg/kubelet/pod/manager.go b/pkg/kubelet/pod/manager.go index 292ec99afcb..d49fb9f194a 100644 --- a/pkg/kubelet/pod/manager.go +++ b/pkg/kubelet/pod/manager.go @@ -59,6 +59,7 @@ type Manager interface { DeleteOrphanedMirrorPods() TranslatePodUID(uid types.UID) types.UID + GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID) IsMirrorPodOf(mirrorPod, pod *api.Pod) bool MirrorClient } @@ -79,6 +80,9 @@ type basicManager struct { podByFullName map[string]*api.Pod mirrorPodByFullName map[string]*api.Pod + // Mirror pod UID to pod UID map. + translationByUID map[types.UID]types.UID + // A mirror pod client to create/delete mirror pods. MirrorClient } @@ -94,30 +98,14 @@ func NewBasicPodManager(client MirrorClient) Manager { func (pm *basicManager) SetPods(newPods []*api.Pod) { pm.lock.Lock() defer pm.lock.Unlock() - pm.setPods(newPods) -} -func (pm *basicManager) setPods(newPods []*api.Pod) { - podByUID := make(map[types.UID]*api.Pod) - mirrorPodByUID := make(map[types.UID]*api.Pod) - podByFullName := make(map[string]*api.Pod) - mirrorPodByFullName := make(map[string]*api.Pod) + pm.podByUID = make(map[types.UID]*api.Pod) + pm.podByFullName = make(map[string]*api.Pod) + pm.mirrorPodByUID = make(map[types.UID]*api.Pod) + pm.mirrorPodByFullName = make(map[string]*api.Pod) + pm.translationByUID = make(map[types.UID]types.UID) - for _, pod := range newPods { - podFullName := kubecontainer.GetPodFullName(pod) - if IsMirrorPod(pod) { - mirrorPodByUID[pod.UID] = pod - mirrorPodByFullName[podFullName] = pod - } else { - podByUID[pod.UID] = pod - podByFullName[podFullName] = pod - } - } - - pm.podByUID = podByUID - pm.podByFullName = podByFullName - pm.mirrorPodByUID = mirrorPodByUID - pm.mirrorPodByFullName = mirrorPodByFullName + pm.updatePodsInternal(newPods...) } func (pm *basicManager) AddPod(pod *api.Pod) { @@ -127,13 +115,22 @@ func (pm *basicManager) AddPod(pod *api.Pod) { func (pm *basicManager) UpdatePod(pod *api.Pod) { pm.lock.Lock() defer pm.lock.Unlock() - podFullName := kubecontainer.GetPodFullName(pod) - if IsMirrorPod(pod) { - pm.mirrorPodByUID[pod.UID] = pod - pm.mirrorPodByFullName[podFullName] = pod - } else { - pm.podByUID[pod.UID] = pod - pm.podByFullName[podFullName] = pod + pm.updatePodsInternal(pod) +} + +func (pm *basicManager) updatePodsInternal(pods ...*api.Pod) { + for _, pod := range pods { + podFullName := kubecontainer.GetPodFullName(pod) + if IsMirrorPod(pod) { + pm.mirrorPodByUID[pod.UID] = pod + pm.mirrorPodByFullName[podFullName] = pod + if p, ok := pm.podByFullName[podFullName]; ok { + pm.translationByUID[pod.UID] = p.UID + } + } else { + pm.podByUID[pod.UID] = pod + pm.podByFullName[podFullName] = pod + } } } @@ -144,6 +141,7 @@ func (pm *basicManager) DeletePod(pod *api.Pod) { if IsMirrorPod(pod) { delete(pm.mirrorPodByUID, pod.UID) delete(pm.mirrorPodByFullName, podFullName) + delete(pm.translationByUID, pod.UID) } else { delete(pm.podByUID, pod.UID) delete(pm.podByFullName, podFullName) @@ -207,15 +205,25 @@ func (pm *basicManager) TranslatePodUID(uid types.UID) types.UID { pm.lock.RLock() defer pm.lock.RUnlock() - if mirrorPod, ok := pm.mirrorPodByUID[uid]; ok { - podFullName := kubecontainer.GetPodFullName(mirrorPod) - if pod, ok := pm.podByFullName[podFullName]; ok { - return pod.UID - } + if translated, ok := pm.translationByUID[uid]; ok { + return translated } return uid } +func (pm *basicManager) GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID) { + pm.lock.RLock() + defer pm.lock.RUnlock() + + podToMirror = make(map[types.UID]types.UID, len(pm.translationByUID)) + mirrorToPod = make(map[types.UID]types.UID, len(pm.translationByUID)) + for k, v := range pm.translationByUID { + podToMirror[k] = v + mirrorToPod[v] = k + } + return podToMirror, mirrorToPod +} + func (pm *basicManager) getOrphanedMirrorPodNames() []string { pm.lock.RLock() defer pm.lock.RUnlock() diff --git a/pkg/kubelet/status/manager.go b/pkg/kubelet/status/manager.go index 182508d5b16..b45343837cd 100644 --- a/pkg/kubelet/status/manager.go +++ b/pkg/kubelet/status/manager.go @@ -288,19 +288,26 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) { // syncBatch syncs pods statuses with the apiserver. func (m *manager) syncBatch() { var updatedStatuses []podStatusSyncRequest + podToMirror, mirrorToPod := m.podManager.GetUIDTranslations() func() { // Critical section m.podStatusesLock.RLock() defer m.podStatusesLock.RUnlock() // Clean up orphaned versions. for uid := range m.apiStatusVersions { - if _, ok := m.podStatuses[uid]; !ok { + _, hasPod := m.podStatuses[uid] + _, hasMirror := podToMirror[uid] + if !hasPod && !hasMirror { delete(m.apiStatusVersions, uid) } } for uid, status := range m.podStatuses { - if m.needsUpdate(uid, status) { + syncedUID := uid + if translated, ok := mirrorToPod[uid]; ok { + syncedUID = translated + } + if m.needsUpdate(syncedUID, status) { updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status}) } } @@ -313,11 +320,6 @@ func (m *manager) syncBatch() { // syncPod syncs the given status with the API server. The caller must not hold the lock. func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { - if !m.needsUpdate(uid, status) { - glog.Warningf("Status is up-to-date; skipping: %q %+v", uid, status) - return - } - // TODO: make me easier to express from client code pod, err := m.kubeClient.Pods(status.podNamespace).Get(status.podName) if errors.IsNotFound(err) { @@ -332,12 +334,16 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { m.deletePodStatus(uid) return } + if !m.needsUpdate(pod.UID, status) { + glog.Warningf("Status is up-to-date; skipping: %q %+v", uid, status) + return + } pod.Status = status.status // TODO: handle conflict as a retry, make that easier too. pod, err = m.kubeClient.Pods(pod.Namespace).UpdateStatus(pod) if err == nil { glog.V(3).Infof("Status for pod %q updated successfully", kubeletutil.FormatPodName(pod)) - m.apiStatusVersions[uid] = status.version + m.apiStatusVersions[pod.UID] = status.version if pod.DeletionTimestamp == nil { return diff --git a/pkg/kubelet/status/manager_test.go b/pkg/kubelet/status/manager_test.go index 2c3021c4a4a..add09a14c08 100644 --- a/pkg/kubelet/status/manager_test.go +++ b/pkg/kubelet/status/manager_test.go @@ -63,7 +63,7 @@ func getRandomPodStatus() api.PodStatus { func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []testclient.Action) { actions := kubeClient.(*testclient.Fake).Actions() if len(actions) != len(expectedActions) { - t.Errorf("unexpected actions, got: %s expected: %s", actions, expectedActions) + t.Fatalf("unexpected actions, got: %s expected: %s", actions, expectedActions) return } for i := 0; i < len(actions); i++ { @@ -484,21 +484,25 @@ func TestStaticPodStatus(t *testing.T) { assert.True(t, isStatusEqual(&status, &updatedPod.Status), "Expected: %+v, Got: %+v", status, updatedPod.Status) client.ClearActions() - otherPod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - UID: "other-87654321", - Name: "other", - Namespace: "new", - }, - } - m.podManager.AddPod(otherPod) - m.SetPodStatus(otherPod, getRandomPodStatus()) + // No changes. + m.syncBatch() + verifyActions(t, m.kubeClient, []testclient.Action{}) + + // Mirror pod identity changes. + m.podManager.DeletePod(&mirrorPod) + mirrorPod.UID = "new-mirror-pod" + mirrorPod.Status = api.PodStatus{} + m.podManager.AddPod(&mirrorPod) + // Expect update to new mirrorPod. m.syncBatch() verifyActions(t, m.kubeClient, []testclient.Action{ testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}, + testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}}, }) - _, found := m.GetPodStatus(otherPod.UID) - assert.False(t, found, "otherPod status should have been deleted") + updateAction = client.Actions()[1].(testclient.UpdateActionImpl) + updatedPod = updateAction.Object.(*api.Pod) + assert.Equal(t, mirrorPod.UID, updatedPod.UID, "Expected mirrorPod (%q), but got %q", mirrorPod.UID, updatedPod.UID) + assert.True(t, isStatusEqual(&status, &updatedPod.Status), "Expected: %+v, Got: %+v", status, updatedPod.Status) } func TestSetContainerReadiness(t *testing.T) {