Merge pull request #17270 from timstclair/mirrorpods

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-11-21 22:55:26 -08:00
commit 7b281c946b
3 changed files with 72 additions and 54 deletions

View File

@ -59,6 +59,7 @@ type Manager interface {
DeleteOrphanedMirrorPods()
TranslatePodUID(uid types.UID) types.UID
GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID)
IsMirrorPodOf(mirrorPod, pod *api.Pod) bool
MirrorClient
}
@ -79,6 +80,9 @@ type basicManager struct {
podByFullName map[string]*api.Pod
mirrorPodByFullName map[string]*api.Pod
// Mirror pod UID to pod UID map.
translationByUID map[types.UID]types.UID
// A mirror pod client to create/delete mirror pods.
MirrorClient
}
@ -94,30 +98,14 @@ func NewBasicPodManager(client MirrorClient) Manager {
func (pm *basicManager) SetPods(newPods []*api.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
pm.setPods(newPods)
}
func (pm *basicManager) setPods(newPods []*api.Pod) {
podByUID := make(map[types.UID]*api.Pod)
mirrorPodByUID := make(map[types.UID]*api.Pod)
podByFullName := make(map[string]*api.Pod)
mirrorPodByFullName := make(map[string]*api.Pod)
pm.podByUID = make(map[types.UID]*api.Pod)
pm.podByFullName = make(map[string]*api.Pod)
pm.mirrorPodByUID = make(map[types.UID]*api.Pod)
pm.mirrorPodByFullName = make(map[string]*api.Pod)
pm.translationByUID = make(map[types.UID]types.UID)
for _, pod := range newPods {
podFullName := kubecontainer.GetPodFullName(pod)
if IsMirrorPod(pod) {
mirrorPodByUID[pod.UID] = pod
mirrorPodByFullName[podFullName] = pod
} else {
podByUID[pod.UID] = pod
podByFullName[podFullName] = pod
}
}
pm.podByUID = podByUID
pm.podByFullName = podByFullName
pm.mirrorPodByUID = mirrorPodByUID
pm.mirrorPodByFullName = mirrorPodByFullName
pm.updatePodsInternal(newPods...)
}
func (pm *basicManager) AddPod(pod *api.Pod) {
@ -127,13 +115,22 @@ func (pm *basicManager) AddPod(pod *api.Pod) {
func (pm *basicManager) UpdatePod(pod *api.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
podFullName := kubecontainer.GetPodFullName(pod)
if IsMirrorPod(pod) {
pm.mirrorPodByUID[pod.UID] = pod
pm.mirrorPodByFullName[podFullName] = pod
} else {
pm.podByUID[pod.UID] = pod
pm.podByFullName[podFullName] = pod
pm.updatePodsInternal(pod)
}
func (pm *basicManager) updatePodsInternal(pods ...*api.Pod) {
for _, pod := range pods {
podFullName := kubecontainer.GetPodFullName(pod)
if IsMirrorPod(pod) {
pm.mirrorPodByUID[pod.UID] = pod
pm.mirrorPodByFullName[podFullName] = pod
if p, ok := pm.podByFullName[podFullName]; ok {
pm.translationByUID[pod.UID] = p.UID
}
} else {
pm.podByUID[pod.UID] = pod
pm.podByFullName[podFullName] = pod
}
}
}
@ -144,6 +141,7 @@ func (pm *basicManager) DeletePod(pod *api.Pod) {
if IsMirrorPod(pod) {
delete(pm.mirrorPodByUID, pod.UID)
delete(pm.mirrorPodByFullName, podFullName)
delete(pm.translationByUID, pod.UID)
} else {
delete(pm.podByUID, pod.UID)
delete(pm.podByFullName, podFullName)
@ -207,15 +205,25 @@ func (pm *basicManager) TranslatePodUID(uid types.UID) types.UID {
pm.lock.RLock()
defer pm.lock.RUnlock()
if mirrorPod, ok := pm.mirrorPodByUID[uid]; ok {
podFullName := kubecontainer.GetPodFullName(mirrorPod)
if pod, ok := pm.podByFullName[podFullName]; ok {
return pod.UID
}
if translated, ok := pm.translationByUID[uid]; ok {
return translated
}
return uid
}
func (pm *basicManager) GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID) {
pm.lock.RLock()
defer pm.lock.RUnlock()
podToMirror = make(map[types.UID]types.UID, len(pm.translationByUID))
mirrorToPod = make(map[types.UID]types.UID, len(pm.translationByUID))
for k, v := range pm.translationByUID {
podToMirror[k] = v
mirrorToPod[v] = k
}
return podToMirror, mirrorToPod
}
func (pm *basicManager) getOrphanedMirrorPodNames() []string {
pm.lock.RLock()
defer pm.lock.RUnlock()

View File

@ -288,19 +288,26 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
// syncBatch syncs pods statuses with the apiserver.
func (m *manager) syncBatch() {
var updatedStatuses []podStatusSyncRequest
podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
func() { // Critical section
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
// Clean up orphaned versions.
for uid := range m.apiStatusVersions {
if _, ok := m.podStatuses[uid]; !ok {
_, hasPod := m.podStatuses[uid]
_, hasMirror := podToMirror[uid]
if !hasPod && !hasMirror {
delete(m.apiStatusVersions, uid)
}
}
for uid, status := range m.podStatuses {
if m.needsUpdate(uid, status) {
syncedUID := uid
if translated, ok := mirrorToPod[uid]; ok {
syncedUID = translated
}
if m.needsUpdate(syncedUID, status) {
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
}
}
@ -313,11 +320,6 @@ func (m *manager) syncBatch() {
// syncPod syncs the given status with the API server. The caller must not hold the lock.
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if !m.needsUpdate(uid, status) {
glog.Warningf("Status is up-to-date; skipping: %q %+v", uid, status)
return
}
// TODO: make me easier to express from client code
pod, err := m.kubeClient.Pods(status.podNamespace).Get(status.podName)
if errors.IsNotFound(err) {
@ -332,12 +334,16 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
m.deletePodStatus(uid)
return
}
if !m.needsUpdate(pod.UID, status) {
glog.Warningf("Status is up-to-date; skipping: %q %+v", uid, status)
return
}
pod.Status = status.status
// TODO: handle conflict as a retry, make that easier too.
pod, err = m.kubeClient.Pods(pod.Namespace).UpdateStatus(pod)
if err == nil {
glog.V(3).Infof("Status for pod %q updated successfully", kubeletutil.FormatPodName(pod))
m.apiStatusVersions[uid] = status.version
m.apiStatusVersions[pod.UID] = status.version
if pod.DeletionTimestamp == nil {
return

View File

@ -63,7 +63,7 @@ func getRandomPodStatus() api.PodStatus {
func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []testclient.Action) {
actions := kubeClient.(*testclient.Fake).Actions()
if len(actions) != len(expectedActions) {
t.Errorf("unexpected actions, got: %s expected: %s", actions, expectedActions)
t.Fatalf("unexpected actions, got: %s expected: %s", actions, expectedActions)
return
}
for i := 0; i < len(actions); i++ {
@ -484,21 +484,25 @@ func TestStaticPodStatus(t *testing.T) {
assert.True(t, isStatusEqual(&status, &updatedPod.Status), "Expected: %+v, Got: %+v", status, updatedPod.Status)
client.ClearActions()
otherPod := &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "other-87654321",
Name: "other",
Namespace: "new",
},
}
m.podManager.AddPod(otherPod)
m.SetPodStatus(otherPod, getRandomPodStatus())
// No changes.
m.syncBatch()
verifyActions(t, m.kubeClient, []testclient.Action{})
// Mirror pod identity changes.
m.podManager.DeletePod(&mirrorPod)
mirrorPod.UID = "new-mirror-pod"
mirrorPod.Status = api.PodStatus{}
m.podManager.AddPod(&mirrorPod)
// Expect update to new mirrorPod.
m.syncBatch()
verifyActions(t, m.kubeClient, []testclient.Action{
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
})
_, found := m.GetPodStatus(otherPod.UID)
assert.False(t, found, "otherPod status should have been deleted")
updateAction = client.Actions()[1].(testclient.UpdateActionImpl)
updatedPod = updateAction.Object.(*api.Pod)
assert.Equal(t, mirrorPod.UID, updatedPod.UID, "Expected mirrorPod (%q), but got %q", mirrorPod.UID, updatedPod.UID)
assert.True(t, isStatusEqual(&status, &updatedPod.Status), "Expected: %+v, Got: %+v", status, updatedPod.Status)
}
func TestSetContainerReadiness(t *testing.T) {