change kubelet status manager to use patch instead of put to update pod status

Minhan Xia 2018-04-09 17:37:14 -07:00
parent 9fe2c53624
commit 35777c31ea
3 changed files with 278 additions and 27 deletions
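Switching the status write from PUT to PATCH means the kubelet sends only the delta between its cached status and the new one to the pods/status subresource, rather than the whole object, so a stale resourceVersion no longer fails the write and status fields written by other components are not clobbered. A minimal sketch of the two client-go calls being swapped (the function names here are illustrative, not part of this commit):

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// updateViaPut shows the old write path: a full-object PUT to pods/status,
// rejected with a conflict when pod.ResourceVersion is stale.
func updateViaPut(client kubernetes.Interface, pod *v1.Pod) (*v1.Pod, error) {
	return client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
}

// updateViaPatch shows the new write path: a strategic merge patch carrying
// only the status delta, with no resourceVersion precondition.
func updateViaPatch(client kubernetes.Interface, pod *v1.Pod, patchBytes []byte) (*v1.Pod, error) {
	return client.CoreV1().Pods(pod.Namespace).Patch(pod.Name, types.StrategicMergePatchType, patchBytes, "status")
}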

pkg/kubelet/status/BUILD

@@ -19,6 +19,7 @@ go_library(
         "//pkg/kubelet/pod:go_default_library",
         "//pkg/kubelet/types:go_default_library",
         "//pkg/kubelet/util/format:go_default_library",
+        "//pkg/util/pod:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",

pkg/kubelet/status/status_manager.go

@@ -37,6 +37,7 @@ import (
 	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/kubelet/util/format"
+	statusutil "k8s.io/kubernetes/pkg/util/pod"
 )
 
 // A wrapper around v1.PodStatus that includes a version to enforce that stale pod statuses are
@@ -121,11 +122,22 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD
 	}
 }
 
-// isStatusEqual returns true if the given pod statuses are equal, false otherwise.
+// isPodStatusByKubeletEqual returns true if the given pod statuses are equal when non-kubelet-owned
+// pod conditions are excluded.
 // This method normalizes the status before comparing so as to make sure that meaningless
 // changes will be ignored.
-func isStatusEqual(oldStatus, status *v1.PodStatus) bool {
-	return apiequality.Semantic.DeepEqual(status, oldStatus)
+func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool {
+	oldCopy := oldStatus.DeepCopy()
+	for _, c := range status.Conditions {
+		if kubetypes.PodConditionByKubelet(c.Type) {
+			_, oc := podutil.GetPodCondition(oldCopy, c.Type)
+			if oc == nil || oc.Status != c.Status {
+				return false
+			}
+		}
+	}
+	oldCopy.Conditions = status.Conditions
+	return apiequality.Semantic.DeepEqual(oldCopy, status)
 }
 
 func (m *manager) Start() {
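Both the rewritten equality check above and the new SetPodStatus guard below lean on kubetypes.PodConditionByKubelet, which this diff references but does not show. A plausible sketch of that ownership predicate, assuming the owned set is the classic kubelet-managed conditions (the authoritative list lives in pkg/kubelet/types and may differ):

package types

import v1 "k8s.io/api/core/v1"

// PodConditionsByKubelet approximates the condition types the kubelet owns;
// the exact list is defined in pkg/kubelet/types, not here.
var PodConditionsByKubelet = []v1.PodConditionType{
	v1.PodScheduled,
	v1.PodReady,
	v1.PodInitialized,
}

// PodConditionByKubelet reports whether the kubelet is the component
// responsible for writing the given pod condition type.
func PodConditionByKubelet(conditionType v1.PodConditionType) bool {
	for _, c := range PodConditionsByKubelet {
		if c == conditionType {
			return true
		}
	}
	return false
}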
@@ -162,6 +174,13 @@ func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
 func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
+
+	for _, c := range pod.Status.Conditions {
+		if !kubetypes.PodConditionByKubelet(c.Type) {
+			glog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+
+				"But it is not owned by kubelet.", string(c.Type), format.Pod(pod))
+		}
+	}
 	// Make sure we're caching a deep copy.
 	status = *status.DeepCopy()
@@ -336,7 +355,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
 	normalizeStatus(pod, &status)
 	// The intent here is to prevent concurrent updates to a pod's status from
 	// clobbering each other so the phase of a pod progresses monotonically.
-	if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
+	if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
 		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
 		return false // No new status.
 	}
@@ -469,9 +488,10 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
 		m.deletePodStatus(uid)
 		return
 	}
-	pod.Status = status.status
-	// TODO: handle conflict as a retry, make that easier too.
-	newPod, err := m.kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
+
+	oldStatus := pod.Status.DeepCopy()
+	newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status))
+	glog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
 	if err != nil {
 		glog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
 		return
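statusutil.PatchPodStatus comes from the new //pkg/util/pod dependency; its body is not part of this diff. Presumably it marshals the old and new statuses, computes a two-way strategic merge patch, and sends it to the status subresource, roughly along these lines (a sketch under that assumption, not the committed helper):

package pod

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	clientset "k8s.io/client-go/kubernetes"
)

// PatchPodStatus (sketch): diff oldPodStatus vs. newPodStatus into a strategic
// merge patch and PATCH it to pods/status, returning the server's pod and the
// patch bytes that syncPod logs above.
func PatchPodStatus(c clientset.Interface, namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) {
	oldData, err := json.Marshal(v1.Pod{Status: oldPodStatus})
	if err != nil {
		return nil, nil, err
	}
	newData, err := json.Marshal(v1.Pod{Status: newPodStatus})
	if err != nil {
		return nil, nil, err
	}
	// CreateTwoWayMergePatch keeps only the fields that differ, using v1.Pod
	// as the schema so strategic merge keys (e.g. condition type) apply.
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create merge patch for pod %q/%q: %v", namespace, name, err)
	}
	updatedPod, err := c.CoreV1().Pods(namespace).Patch(name, types.StrategicMergePatchType, patchBytes, "status")
	if err != nil {
		return nil, nil, err
	}
	return updatedPod, patchBytes, nil
}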
@@ -546,7 +566,7 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
 	podStatus := pod.Status.DeepCopy()
 	normalizeStatus(pod, podStatus)
 
-	if isStatusEqual(podStatus, &status) {
+	if isPodStatusByKubeletEqual(podStatus, &status) {
 		// If the status from the source is the same with the cached status,
 		// reconcile is not needed. Just return.
 		return false
@@ -559,7 +579,7 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
 // We add this function, because apiserver only supports *RFC3339* now, which means that the timestamp returned by
 // apiserver has no nanosecond information. However, the timestamp returned by metav1.Now() contains nanosecond,
-// so when we do comparison between status from apiserver and cached status, isStatusEqual() will always return false.
+// so when we do comparison between status from apiserver and cached status, isPodStatusByKubeletEqual() will always return false.
 // There is related issue #15262 and PR #15263 about this.
 // In fact, the best way to solve this is to do it on api side. However, for now, we normalize the status locally in
 // kubelet temporarily.
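The normalization these comments describe boils down to rounding every timestamp in the status to second precision before comparison, so the kubelet's cached value matches what the apiserver echoes back. A sketch of the core helper, using metav1.Time's own RFC3339 round-trip:

package status

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// normalizeTimeStamp drops sub-second precision by formatting the time as
// RFC3339 and reparsing it; normalizeStatus applies this to StartTime and to
// condition and container-state timestamps. (Sketch of the existing helper.)
func normalizeTimeStamp(t *metav1.Time) {
	*t = t.Rfc3339Copy()
}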
@@ -613,3 +633,22 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus {
 	kubetypes.SortInitContainerStatuses(pod, status.InitContainerStatuses)
 	return status
 }
+
+// mergePodStatus merges oldPodStatus and newPodStatus where pod conditions
+// not owned by kubelet are preserved from oldPodStatus
+func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus {
+	podConditions := []v1.PodCondition{}
+	for _, c := range oldPodStatus.Conditions {
+		if !kubetypes.PodConditionByKubelet(c.Type) {
+			podConditions = append(podConditions, c)
+		}
+	}
+
+	for _, c := range newPodStatus.Conditions {
+		if kubetypes.PodConditionByKubelet(c.Type) {
+			podConditions = append(podConditions, c)
+		}
+	}
+	newPodStatus.Conditions = podConditions
+	return newPodStatus
+}
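Together with isPodStatusByKubeletEqual, mergePodStatus establishes the invariant behind the patch path: conditions the kubelet does not own survive a status sync untouched, while kubelet-owned conditions are replaced wholesale. A small hypothetical walk-through (the example.com/gate condition type is invented for illustration):

package status

import v1 "k8s.io/api/core/v1"

// exampleMerge illustrates mergePodStatus semantics with made-up inputs.
func exampleMerge() v1.PodStatus {
	oldStatus := v1.PodStatus{Conditions: []v1.PodCondition{
		{Type: v1.PodReady, Status: v1.ConditionTrue},
		{Type: "example.com/gate", Status: v1.ConditionTrue}, // not kubelet-owned
	}}
	newStatus := v1.PodStatus{Conditions: []v1.PodCondition{
		{Type: v1.PodReady, Status: v1.ConditionFalse},
	}}
	// Result: example.com/gate stays True (carried over from oldStatus);
	// PodReady becomes False (taken from newStatus).
	return mergePodStatus(oldStatus, newStatus)
}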

pkg/kubelet/status/status_manager_test.go

@@ -19,6 +19,7 @@ package status
 import (
 	"fmt"
 	"math/rand"
+	"reflect"
 	"strconv"
 	"strings"
 	"testing"
@@ -48,6 +49,10 @@
 // Generate new instance of test pod with the same initial value.
 func getTestPod() *v1.Pod {
 	return &v1.Pod{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
 		ObjectMeta: metav1.ObjectMeta{
 			UID:       "12345678",
 			Name:      "foo",
@@ -303,7 +308,7 @@ func TestSyncPod(t *testing.T) {
 	testPod := getTestPod()
 	syncer.kubeClient = fake.NewSimpleClientset(testPod)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
-	verifyActions(t, syncer, []core.Action{getAction(), updateAction()})
+	verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
 }
 
 func TestSyncPodChecksMismatchedUID(t *testing.T) {
@@ -357,18 +362,18 @@ func TestSyncPodNoDeadlock(t *testing.T) {
 	t.Logf("Pod not deleted (success case).")
 	ret = getTestPod()
 	m.SetPodStatus(pod, getRandomPodStatus())
-	verifyActions(t, m, []core.Action{getAction(), updateAction()})
+	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 
 	t.Logf("Pod is terminated, but still running.")
-	pod.DeletionTimestamp = new(metav1.Time)
+	pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
 	m.SetPodStatus(pod, getRandomPodStatus())
-	verifyActions(t, m, []core.Action{getAction(), updateAction()})
+	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 
 	t.Logf("Pod is terminated successfully.")
 	pod.Status.ContainerStatuses[0].State.Running = nil
 	pod.Status.ContainerStatuses[0].State.Terminated = &v1.ContainerStateTerminated{}
 	m.SetPodStatus(pod, getRandomPodStatus())
-	verifyActions(t, m, []core.Action{getAction(), updateAction()})
+	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 
 	t.Logf("Error case.")
 	ret = nil
@@ -392,7 +397,7 @@ func TestStaleUpdates(t *testing.T) {
 	t.Logf("sync batch before syncPods pushes latest status, so we should see three statuses in the channel, but only one update")
 	m.syncBatch()
 	verifyUpdates(t, m, 3)
-	verifyActions(t, m, []core.Action{getAction(), updateAction()})
+	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 	t.Logf("Nothing left in the channel to sync")
 	verifyActions(t, m, []core.Action{})
@@ -406,7 +411,7 @@ func TestStaleUpdates(t *testing.T) {
 	m.SetPodStatus(pod, status)
 	m.syncBatch()
-	verifyActions(t, m, []core.Action{getAction(), updateAction()})
+	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 
 	t.Logf("Nothing stuck in the pipe.")
 	verifyUpdates(t, m, 0)
@@ -443,10 +448,27 @@ func TestStatusEquality(t *testing.T) {
 		}
 		normalizeStatus(&pod, &oldPodStatus)
 		normalizeStatus(&pod, &podStatus)
-		if !isStatusEqual(&oldPodStatus, &podStatus) {
+		if !isPodStatusByKubeletEqual(&oldPodStatus, &podStatus) {
 			t.Fatalf("Order of container statuses should not affect normalized equality.")
 		}
 	}
+
+	oldPodStatus := podStatus
+	podStatus.Conditions = append(podStatus.Conditions, v1.PodCondition{
+		Type:   v1.PodConditionType("www.example.com/feature"),
+		Status: v1.ConditionTrue,
+	})
+	oldPodStatus.Conditions = append(podStatus.Conditions, v1.PodCondition{
+		Type:   v1.PodConditionType("www.example.com/feature"),
+		Status: v1.ConditionFalse,
+	})
+
+	normalizeStatus(&pod, &oldPodStatus)
+	normalizeStatus(&pod, &podStatus)
+	if !isPodStatusByKubeletEqual(&oldPodStatus, &podStatus) {
+		t.Fatalf("Differences in pod condition not owned by kubelet should not affect normalized equality.")
+	}
 }
 
 func TestStatusNormalizationEnforcesMaxBytes(t *testing.T) {
@@ -507,7 +529,7 @@ func TestStaticPod(t *testing.T) {
 	t.Logf("Should be able to get the static pod status from status manager")
 	retrievedStatus := expectPodStatus(t, m, staticPod)
 	normalizeStatus(staticPod, &status)
-	assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
+	assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
 
 	t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.")
 	m.syncBatch()
@@ -520,10 +542,10 @@ func TestStaticPod(t *testing.T) {
 	t.Logf("Should be able to get the mirror pod status from status manager")
 	retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
-	assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
+	assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
 
 	t.Logf("Should sync pod because the corresponding mirror pod is created")
-	verifyActions(t, m, []core.Action{getAction(), updateAction()})
+	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 
 	t.Logf("syncBatch should not sync any pods because nothing is changed.")
 	m.testSyncBatch()
@@ -741,7 +763,7 @@ func TestReconcilePodStatus(t *testing.T) {
 		t.Errorf("Pod status is different, a reconciliation is needed")
 	}
 	syncer.syncBatch()
-	verifyActions(t, syncer, []core.Action{getAction(), updateAction()})
+	verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
 }
 
 func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus {
@@ -755,18 +777,16 @@ func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus {
 func TestDeletePods(t *testing.T) {
 	pod := getTestPod()
 	t.Logf("Set the deletion timestamp.")
-	pod.DeletionTimestamp = new(metav1.Time)
+	pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
 	client := fake.NewSimpleClientset(pod)
 	m := newTestManager(client)
 	m.podManager.AddPod(pod)
-
 	status := getRandomPodStatus()
 	now := metav1.Now()
 	status.StartTime = &now
 	m.SetPodStatus(pod, status)
-
 	t.Logf("Expect to see a delete action.")
-	verifyActions(t, m, []core.Action{getAction(), updateAction(), deleteAction()})
+	verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()})
 }
 
 func TestDoNotDeleteMirrorPods(t *testing.T) {
@@ -779,7 +799,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) {
 		kubetypes.ConfigMirrorAnnotationKey: "mirror",
 	}
 	t.Logf("Set the deletion timestamp.")
-	mirrorPod.DeletionTimestamp = new(metav1.Time)
+	mirrorPod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
 	client := fake.NewSimpleClientset(mirrorPod)
 	m := newTestManager(client)
 	m.podManager.AddPod(staticPod)
@@ -795,7 +815,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) {
 	m.SetPodStatus(staticPod, status)
 	t.Logf("Expect not to see a delete action.")
-	verifyActions(t, m, []core.Action{getAction(), updateAction()})
+	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 }
 
 func TestUpdateLastTransitionTime(t *testing.T) {
@@ -867,6 +887,197 @@ func updateAction() core.UpdateAction {
 	return core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}
 }
 
+func patchAction() core.PatchAction {
+	return core.PatchActionImpl{ActionImpl: core.ActionImpl{Verb: "patch", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}
+}
+
 func deleteAction() core.DeleteAction {
 	return core.DeleteActionImpl{ActionImpl: core.ActionImpl{Verb: "delete", Resource: schema.GroupVersionResource{Resource: "pods"}}}
 }
+
+func TestMergePodStatus(t *testing.T) {
+	useCases := []struct {
+		desc            string
+		oldPodStatus    func(input v1.PodStatus) v1.PodStatus
+		newPodStatus    func(input v1.PodStatus) v1.PodStatus
+		expectPodStatus v1.PodStatus
+	}{
+		{
+			"no change",
+			func(input v1.PodStatus) v1.PodStatus { return input },
+			func(input v1.PodStatus) v1.PodStatus { return input },
+			getPodStatus(),
+		},
+		{
+			"readiness changes",
+			func(input v1.PodStatus) v1.PodStatus { return input },
+			func(input v1.PodStatus) v1.PodStatus {
+				input.Conditions[0].Status = v1.ConditionFalse
+				return input
+			},
+			v1.PodStatus{
+				Phase: v1.PodRunning,
+				Conditions: []v1.PodCondition{
+					{
+						Type:   v1.PodReady,
+						Status: v1.ConditionFalse,
+					},
+					{
+						Type:   v1.PodScheduled,
+						Status: v1.ConditionTrue,
+					},
+				},
+				Message: "Message",
+			},
+		},
+		{
+			"additional pod condition",
+			func(input v1.PodStatus) v1.PodStatus {
+				input.Conditions = append(input.Conditions, v1.PodCondition{
+					Type:   v1.PodConditionType("example.com/feature"),
+					Status: v1.ConditionTrue,
+				})
+				return input
+			},
+			func(input v1.PodStatus) v1.PodStatus { return input },
+			v1.PodStatus{
+				Phase: v1.PodRunning,
+				Conditions: []v1.PodCondition{
+					{
+						Type:   v1.PodReady,
+						Status: v1.ConditionTrue,
+					},
+					{
+						Type:   v1.PodScheduled,
+						Status: v1.ConditionTrue,
+					},
+					{
+						Type:   v1.PodConditionType("example.com/feature"),
+						Status: v1.ConditionTrue,
+					},
+				},
+				Message: "Message",
+			},
+		},
+		{
+			"additional pod condition and readiness changes",
+			func(input v1.PodStatus) v1.PodStatus {
+				input.Conditions = append(input.Conditions, v1.PodCondition{
+					Type:   v1.PodConditionType("example.com/feature"),
+					Status: v1.ConditionTrue,
+				})
+				return input
+			},
+			func(input v1.PodStatus) v1.PodStatus {
+				input.Conditions[0].Status = v1.ConditionFalse
+				return input
+			},
+			v1.PodStatus{
+				Phase: v1.PodRunning,
+				Conditions: []v1.PodCondition{
+					{
+						Type:   v1.PodReady,
+						Status: v1.ConditionFalse,
+					},
+					{
+						Type:   v1.PodScheduled,
+						Status: v1.ConditionTrue,
+					},
+					{
+						Type:   v1.PodConditionType("example.com/feature"),
+						Status: v1.ConditionTrue,
+					},
+				},
+				Message: "Message",
+			},
+		},
+		{
+			"additional pod condition changes",
+			func(input v1.PodStatus) v1.PodStatus {
+				input.Conditions = append(input.Conditions, v1.PodCondition{
+					Type:   v1.PodConditionType("example.com/feature"),
+					Status: v1.ConditionTrue,
+				})
+				return input
+			},
+			func(input v1.PodStatus) v1.PodStatus {
+				input.Conditions = append(input.Conditions, v1.PodCondition{
+					Type:   v1.PodConditionType("example.com/feature"),
+					Status: v1.ConditionFalse,
+				})
+				return input
+			},
+			v1.PodStatus{
+				Phase: v1.PodRunning,
+				Conditions: []v1.PodCondition{
+					{
+						Type:   v1.PodReady,
+						Status: v1.ConditionTrue,
+					},
+					{
+						Type:   v1.PodScheduled,
+						Status: v1.ConditionTrue,
+					},
+					{
+						Type:   v1.PodConditionType("example.com/feature"),
+						Status: v1.ConditionTrue,
+					},
+				},
+				Message: "Message",
+			},
+		},
+	}
+
+	for _, tc := range useCases {
+		output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus()))
+		if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) {
+			t.Errorf("test case %q failed, expect: %+v, got %+v", tc.desc, tc.expectPodStatus, output)
+		}
+	}
+}
+
+func statusEqual(left, right v1.PodStatus) bool {
+	left.Conditions = nil
+	right.Conditions = nil
+	return reflect.DeepEqual(left, right)
+}
+
+func conditionsEqual(left, right []v1.PodCondition) bool {
+	if len(left) != len(right) {
+		return false
+	}
+
+	for _, l := range left {
+		found := false
+		for _, r := range right {
+			if l.Type == r.Type {
+				found = true
+				if l.Status != r.Status {
+					return false
+				}
+			}
+		}
+		if !found {
+			return false
+		}
+	}
+	return true
+}
+
+func getPodStatus() v1.PodStatus {
+	return v1.PodStatus{
+		Phase: v1.PodRunning,
+		Conditions: []v1.PodCondition{
+			{
+				Type:   v1.PodReady,
+				Status: v1.ConditionTrue,
+			},
+			{
+				Type:   v1.PodScheduled,
+				Status: v1.ConditionTrue,
+			},
+		},
+		Message: "Message",
+	}
+}