From 35777c31ea5f7973d8f69b22a81377f5d7ea9c71 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Mon, 9 Apr 2018 17:37:14 -0700 Subject: [PATCH] change kubelet status manager to use patch instead of put to update pod status --- pkg/kubelet/status/BUILD | 1 + pkg/kubelet/status/status_manager.go | 57 ++++- pkg/kubelet/status/status_manager_test.go | 247 ++++++++++++++++++++-- 3 files changed, 278 insertions(+), 27 deletions(-) diff --git a/pkg/kubelet/status/BUILD b/pkg/kubelet/status/BUILD index ccc78651f0f..63e32a391d9 100644 --- a/pkg/kubelet/status/BUILD +++ b/pkg/kubelet/status/BUILD @@ -19,6 +19,7 @@ go_library( "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/format:go_default_library", + "//pkg/util/pod:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index eaf5b9a0512..93eef28918a 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -37,6 +37,7 @@ import ( kubepod "k8s.io/kubernetes/pkg/kubelet/pod" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" + statusutil "k8s.io/kubernetes/pkg/util/pod" ) // A wrapper around v1.PodStatus that includes a version to enforce that stale pod statuses are @@ -121,11 +122,22 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD } } -// isStatusEqual returns true if the given pod statuses are equal, false otherwise. +// isPodStatusByKubeletEqual returns true if the given pod statuses are equal when non-kubelet-owned +// pod conditions are excluded. // This method normalizes the status before comparing so as to make sure that meaningless // changes will be ignored. -func isStatusEqual(oldStatus, status *v1.PodStatus) bool { - return apiequality.Semantic.DeepEqual(status, oldStatus) +func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool { + oldCopy := oldStatus.DeepCopy() + for _, c := range status.Conditions { + if kubetypes.PodConditionByKubelet(c.Type) { + _, oc := podutil.GetPodCondition(oldCopy, c.Type) + if oc == nil || oc.Status != c.Status { + return false + } + } + } + oldCopy.Conditions = status.Conditions + return apiequality.Semantic.DeepEqual(oldCopy, status) } func (m *manager) Start() { @@ -162,6 +174,13 @@ func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) { func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() + + for _, c := range pod.Status.Conditions { + if !kubetypes.PodConditionByKubelet(c.Type) { + glog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+ + "But it is not owned by kubelet.", string(c.Type), format.Pod(pod)) + } + } // Make sure we're caching a deep copy. status = *status.DeepCopy() @@ -336,7 +355,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp normalizeStatus(pod, &status) // The intent here is to prevent concurrent updates to a pod's status from // clobbering each other so the phase of a pod progresses monotonically. 
- if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate { + if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate { glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status) return false // No new status. } @@ -469,9 +488,10 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { m.deletePodStatus(uid) return } - pod.Status = status.status - // TODO: handle conflict as a retry, make that easier too. - newPod, err := m.kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) + + oldStatus := pod.Status.DeepCopy() + newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status)) + glog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes) if err != nil { glog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err) return @@ -546,7 +566,7 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool { podStatus := pod.Status.DeepCopy() normalizeStatus(pod, podStatus) - if isStatusEqual(podStatus, &status) { + if isPodStatusByKubeletEqual(podStatus, &status) { // If the status from the source is the same with the cached status, // reconcile is not needed. Just return. return false @@ -559,7 +579,7 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool { // We add this function, because apiserver only supports *RFC3339* now, which means that the timestamp returned by // apiserver has no nanosecond information. However, the timestamp returned by metav1.Now() contains nanosecond, -// so when we do comparison between status from apiserver and cached status, isStatusEqual() will always return false. +// so when we do comparison between status from apiserver and cached status, isPodStatusByKubeletEqual() will always return false. // There is related issue #15262 and PR #15263 about this. // In fact, the best way to solve this is to do it on api side. However, for now, we normalize the status locally in // kubelet temporarily. @@ -613,3 +633,22 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus { kubetypes.SortInitContainerStatuses(pod, status.InitContainerStatuses) return status } + +// mergePodStatus merges oldPodStatus and newPodStatus where pod conditions +// not owned by kubelet is preserved from oldPodStatus +func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus { + podConditions := []v1.PodCondition{} + for _, c := range oldPodStatus.Conditions { + if !kubetypes.PodConditionByKubelet(c.Type) { + podConditions = append(podConditions, c) + } + } + + for _, c := range newPodStatus.Conditions { + if kubetypes.PodConditionByKubelet(c.Type) { + podConditions = append(podConditions, c) + } + } + newPodStatus.Conditions = podConditions + return newPodStatus +} diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index 84ddec36e3b..03f79b2a1bb 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -19,6 +19,7 @@ package status import ( "fmt" "math/rand" + "reflect" "strconv" "strings" "testing" @@ -48,6 +49,10 @@ import ( // Generate new instance of test pod with the same initial value. 
func getTestPod() *v1.Pod { return &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, ObjectMeta: metav1.ObjectMeta{ UID: "12345678", Name: "foo", @@ -303,7 +308,7 @@ func TestSyncPod(t *testing.T) { testPod := getTestPod() syncer.kubeClient = fake.NewSimpleClientset(testPod) syncer.SetPodStatus(testPod, getRandomPodStatus()) - verifyActions(t, syncer, []core.Action{getAction(), updateAction()}) + verifyActions(t, syncer, []core.Action{getAction(), patchAction()}) } func TestSyncPodChecksMismatchedUID(t *testing.T) { @@ -357,18 +362,18 @@ func TestSyncPodNoDeadlock(t *testing.T) { t.Logf("Pod not deleted (success case).") ret = getTestPod() m.SetPodStatus(pod, getRandomPodStatus()) - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Pod is terminated, but still running.") - pod.DeletionTimestamp = new(metav1.Time) + pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} m.SetPodStatus(pod, getRandomPodStatus()) - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Pod is terminated successfully.") pod.Status.ContainerStatuses[0].State.Running = nil pod.Status.ContainerStatuses[0].State.Terminated = &v1.ContainerStateTerminated{} m.SetPodStatus(pod, getRandomPodStatus()) - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Error case.") ret = nil @@ -392,7 +397,7 @@ func TestStaleUpdates(t *testing.T) { t.Logf("sync batch before syncPods pushes latest status, so we should see three statuses in the channel, but only one update") m.syncBatch() verifyUpdates(t, m, 3) - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Nothing left in the channel to sync") verifyActions(t, m, []core.Action{}) @@ -406,7 +411,7 @@ func TestStaleUpdates(t *testing.T) { m.SetPodStatus(pod, status) m.syncBatch() - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Nothing stuck in the pipe.") verifyUpdates(t, m, 0) @@ -443,10 +448,27 @@ func TestStatusEquality(t *testing.T) { } normalizeStatus(&pod, &oldPodStatus) normalizeStatus(&pod, &podStatus) - if !isStatusEqual(&oldPodStatus, &podStatus) { + if !isPodStatusByKubeletEqual(&oldPodStatus, &podStatus) { t.Fatalf("Order of container statuses should not affect normalized equality.") } } + + oldPodStatus := podStatus + podStatus.Conditions = append(podStatus.Conditions, v1.PodCondition{ + Type: v1.PodConditionType("www.example.com/feature"), + Status: v1.ConditionTrue, + }) + + oldPodStatus.Conditions = append(podStatus.Conditions, v1.PodCondition{ + Type: v1.PodConditionType("www.example.com/feature"), + Status: v1.ConditionFalse, + }) + + normalizeStatus(&pod, &oldPodStatus) + normalizeStatus(&pod, &podStatus) + if !isPodStatusByKubeletEqual(&oldPodStatus, &podStatus) { + t.Fatalf("Differences in pod condition not owned by kubelet should not affect normalized equality.") + } } func TestStatusNormalizationEnforcesMaxBytes(t *testing.T) { @@ -507,7 +529,7 @@ func TestStaticPod(t *testing.T) { t.Logf("Should be able to get the static pod status from status manager") retrievedStatus := expectPodStatus(t, m, staticPod) normalizeStatus(staticPod, &status) - assert.True(t, isStatusEqual(&status, &retrievedStatus), 
"Expected: %+v, Got: %+v", status, retrievedStatus) + assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.") m.syncBatch() @@ -520,10 +542,10 @@ func TestStaticPod(t *testing.T) { t.Logf("Should be able to get the mirror pod status from status manager") retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID) - assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) + assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) t.Logf("Should sync pod because the corresponding mirror pod is created") - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("syncBatch should not sync any pods because nothing is changed.") m.testSyncBatch() @@ -741,7 +763,7 @@ func TestReconcilePodStatus(t *testing.T) { t.Errorf("Pod status is different, a reconciliation is needed") } syncer.syncBatch() - verifyActions(t, syncer, []core.Action{getAction(), updateAction()}) + verifyActions(t, syncer, []core.Action{getAction(), patchAction()}) } func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus { @@ -755,18 +777,16 @@ func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus { func TestDeletePods(t *testing.T) { pod := getTestPod() t.Logf("Set the deletion timestamp.") - pod.DeletionTimestamp = new(metav1.Time) + pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(pod) m := newTestManager(client) m.podManager.AddPod(pod) - status := getRandomPodStatus() now := metav1.Now() status.StartTime = &now m.SetPodStatus(pod, status) - t.Logf("Expect to see a delete action.") - verifyActions(t, m, []core.Action{getAction(), updateAction(), deleteAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()}) } func TestDoNotDeleteMirrorPods(t *testing.T) { @@ -779,7 +799,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) { kubetypes.ConfigMirrorAnnotationKey: "mirror", } t.Logf("Set the deletion timestamp.") - mirrorPod.DeletionTimestamp = new(metav1.Time) + mirrorPod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(mirrorPod) m := newTestManager(client) m.podManager.AddPod(staticPod) @@ -795,7 +815,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) { m.SetPodStatus(staticPod, status) t.Logf("Expect not to see a delete action.") - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) } func TestUpdateLastTransitionTime(t *testing.T) { @@ -867,6 +887,197 @@ func updateAction() core.UpdateAction { return core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}} } +func patchAction() core.PatchAction { + return core.PatchActionImpl{ActionImpl: core.ActionImpl{Verb: "patch", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}} +} + func deleteAction() core.DeleteAction { return core.DeleteActionImpl{ActionImpl: core.ActionImpl{Verb: "delete", Resource: schema.GroupVersionResource{Resource: "pods"}}} } + +func TestMergePodStatus(t *testing.T) { + useCases := []struct { + desc string + oldPodStatus func(input v1.PodStatus) v1.PodStatus + 
newPodStatus func(input v1.PodStatus) v1.PodStatus + expectPodStatus v1.PodStatus + }{ + { + "no change", + func(input v1.PodStatus) v1.PodStatus { return input }, + func(input v1.PodStatus) v1.PodStatus { return input }, + getPodStatus(), + }, + { + "readiness changes", + func(input v1.PodStatus) v1.PodStatus { return input }, + func(input v1.PodStatus) v1.PodStatus { + input.Conditions[0].Status = v1.ConditionFalse + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionFalse, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + Message: "Message", + }, + }, + { + "additional pod condition", + func(input v1.PodStatus) v1.PodStatus { + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }) + return input + }, + func(input v1.PodStatus) v1.PodStatus { return input }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }, + }, + Message: "Message", + }, + }, + { + "additional pod condition and readiness changes", + func(input v1.PodStatus) v1.PodStatus { + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }) + return input + }, + func(input v1.PodStatus) v1.PodStatus { + input.Conditions[0].Status = v1.ConditionFalse + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionFalse, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }, + }, + Message: "Message", + }, + }, + { + "additional pod condition changes", + func(input v1.PodStatus) v1.PodStatus { + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }) + return input + }, + func(input v1.PodStatus) v1.PodStatus { + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionFalse, + }) + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }, + }, + Message: "Message", + }, + }, + } + + for _, tc := range useCases { + output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus())) + if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) { + t.Errorf("test case %q failed, expect: %+v, got %+v", tc.desc, tc.expectPodStatus, output) + } + } + +} + +func statusEqual(left, right v1.PodStatus) bool { + left.Conditions = nil + right.Conditions = nil + return reflect.DeepEqual(left, right) +} + +func conditionsEqual(left, right []v1.PodCondition) bool { + if len(left) != len(right) { + return false + } + + for _, l := range left { + found := false + for _, r := range right { + if l.Type == r.Type { + found = true + if l.Status != r.Status { + return false + } + } + } + 
if !found { + return false + } + } + return true +} + +func getPodStatus() v1.PodStatus { + return v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + Message: "Message", + } +}
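
A minimal, self-contained sketch (not part of the patch above) of the patch-based status update this change switches to: instead of a full UpdateStatus PUT, the cached status and the new status are diffed into a strategic merge patch that is sent to the pods/status subresource, which is broadly what statusutil.PatchPodStatus in pkg/util/pod does. The helper name buildStatusPatch and the surrounding main function are illustrative assumptions, not code from this PR.

// Standalone sketch: compute a pods/status strategic-merge patch with the
// apimachinery helpers. buildStatusPatch is an illustrative name only.
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

// buildStatusPatch diffs two pod statuses into strategic-merge-patch bytes.
// Wrapping the statuses in v1.Pod lets CreateTwoWayMergePatch pick up the
// patch strategy metadata (e.g. the merge key for the conditions list).
func buildStatusPatch(oldStatus, newStatus v1.PodStatus) ([]byte, error) {
	oldData, err := json.Marshal(v1.Pod{Status: oldStatus})
	if err != nil {
		return nil, err
	}
	newData, err := json.Marshal(v1.Pod{Status: newStatus})
	if err != nil {
		return nil, err
	}
	return strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
}

func main() {
	oldStatus := v1.PodStatus{Phase: v1.PodRunning}
	newStatus := v1.PodStatus{
		Phase: v1.PodRunning,
		Conditions: []v1.PodCondition{
			{Type: v1.PodReady, Status: v1.ConditionTrue},
		},
	}
	patch, err := buildStatusPatch(oldStatus, newStatus)
	if err != nil {
		panic(err)
	}
	// In the client-go of this era the bytes would then be sent with roughly:
	//   client.CoreV1().Pods(namespace).Patch(name, types.StrategicMergePatchType, patch, "status")
	fmt.Printf("status patch: %s\n", patch)
}

Because only the kubelet-owned conditions appear in the computed diff (mergePodStatus copies the non-kubelet conditions from the old status), the resulting patch leaves conditions set by other components untouched on the apiserver side.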