From 212a16eccc7de3f90c0b2b86bee11b8b602d5c41 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Mon, 9 Apr 2018 17:36:08 -0700 Subject: [PATCH 1/7] add utils to patch pod status --- pkg/util/BUILD | 1 + pkg/util/pod/BUILD | 39 +++++++++++++ pkg/util/pod/pod.go | 63 +++++++++++++++++++++ pkg/util/pod/pod_test.go | 116 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 219 insertions(+) create mode 100644 pkg/util/pod/BUILD create mode 100644 pkg/util/pod/pod.go create mode 100644 pkg/util/pod/pod_test.go diff --git a/pkg/util/BUILD b/pkg/util/BUILD index 1622fe7150d..27bee9d6cf6 100644 --- a/pkg/util/BUILD +++ b/pkg/util/BUILD @@ -45,6 +45,7 @@ filegroup( "//pkg/util/nsenter:all-srcs", "//pkg/util/oom:all-srcs", "//pkg/util/parsers:all-srcs", + "//pkg/util/pod:all-srcs", "//pkg/util/pointer:all-srcs", "//pkg/util/procfs:all-srcs", "//pkg/util/reflector/prometheus:all-srcs", diff --git a/pkg/util/pod/BUILD b/pkg/util/pod/BUILD new file mode 100644 index 00000000000..57a081ab7c6 --- /dev/null +++ b/pkg/util/pod/BUILD @@ -0,0 +1,39 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["pod.go"], + importpath = "k8s.io/kubernetes/pkg/util/pod", + visibility = ["//visibility:public"], + deps = [ + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["pod_test.go"], + embed = [":go_default_library"], + deps = [ + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/util/pod/pod.go b/pkg/util/pod/pod.go new file mode 100644 index 00000000000..81d4304fa2b --- /dev/null +++ b/pkg/util/pod/pod.go @@ -0,0 +1,63 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "encoding/json" + "fmt" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" + clientset "k8s.io/client-go/kubernetes" +) + +// PatchPodStatus patches pod status. 
+func PatchPodStatus(c clientset.Interface, namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) { + patchBytes, err := preparePatchBytesforPodStatus(namespace, name, oldPodStatus, newPodStatus) + if err != nil { + return nil, nil, err + } + + updatedPod, err := c.CoreV1().Pods(namespace).Patch(name, types.StrategicMergePatchType, patchBytes, "status") + if err != nil { + return nil, nil, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err) + } + return updatedPod, patchBytes, nil +} + +func preparePatchBytesforPodStatus(namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) { + oldData, err := json.Marshal(v1.Pod{ + Status: oldPodStatus, + }) + if err != nil { + return nil, fmt.Errorf("failed to Marshal oldData for pod %q/%q: %v", namespace, name, err) + } + + newData, err := json.Marshal(v1.Pod{ + Status: newPodStatus, + }) + if err != nil { + return nil, fmt.Errorf("failed to Marshal newData for pod %q/%q: %v", namespace, name, err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{}) + if err != nil { + return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for pod %q/%q: %v", namespace, name, err) + } + return patchBytes, nil +} diff --git a/pkg/util/pod/pod_test.go b/pkg/util/pod/pod_test.go new file mode 100644 index 00000000000..af0278fa090 --- /dev/null +++ b/pkg/util/pod/pod_test.go @@ -0,0 +1,116 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package pod + +import ( + "testing" + + "fmt" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "reflect" +) + +func TestPatchPodStatus(t *testing.T) { + ns := "ns" + name := "name" + client := &fake.Clientset{} + client.CoreV1().Pods(ns).Create(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + }) + + testCases := []struct { + description string + mutate func(input v1.PodStatus) v1.PodStatus + expectedPatchBytes []byte + }{ + { + "no change", + func(input v1.PodStatus) v1.PodStatus { return input }, + []byte(fmt.Sprintf(`{}`)), + }, + { + "message change", + func(input v1.PodStatus) v1.PodStatus { + input.Message = "random message" + return input + }, + []byte(fmt.Sprintf(`{"status":{"message":"random message"}}`)), + }, + { + "pod condition change", + func(input v1.PodStatus) v1.PodStatus { + input.Conditions[0].Status = v1.ConditionFalse + return input + }, + []byte(fmt.Sprintf(`{"status":{"$setElementOrder/conditions":[{"type":"Ready"},{"type":"PodScheduled"}],"conditions":[{"status":"False","type":"Ready"}]}}`)), + }, + { + "additional init container condition", + func(input v1.PodStatus) v1.PodStatus { + input.InitContainerStatuses = []v1.ContainerStatus{ + { + Name: "init-container", + Ready: true, + }, + } + return input + }, + []byte(fmt.Sprintf(`{"status":{"initContainerStatuses":[{"image":"","imageID":"","lastState":{},"name":"init-container","ready":true,"restartCount":0,"state":{}}]}}`)), + }, + } + for _, tc := range testCases { + _, patchBytes, err := PatchPodStatus(client, ns, name, getPodStatus(), tc.mutate(getPodStatus())) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(patchBytes, tc.expectedPatchBytes) { + t.Errorf("for test case %q, expect patchBytes: %q, got: %q\n", tc.description, tc.expectedPatchBytes, patchBytes) + } + } +} + +func getPodStatus() v1.PodStatus { + return v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "container1", + Ready: true, + }, + { + Name: "container2", + Ready: true, + }, + }, + Message: "Message", + } +} From 9fe2c536249d7ec3010d71d6b610f35754e478b6 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Mon, 9 Apr 2018 17:36:25 -0700 Subject: [PATCH 2/7] include patch permission for kubelets --- plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go | 2 +- .../authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 912ab05a1b3..284f91a105e 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -119,7 +119,7 @@ func NodeRules() []rbacv1.PolicyRule { rbacv1helpers.NewRule("create", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), // Needed for the node to report status of pods it is running. // Use the NodeRestriction admission plugin to limit a node to updating status of pods bound to itself. - rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(), + rbacv1helpers.NewRule("update", "patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie(), // Needed for the node to create pod evictions. 
// Use the NodeRestriction admission plugin to limit a node to creating evictions for pods bound to itself. rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("pods/eviction").RuleOrDie(), diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index 300b66485ca..287440dbaa5 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -1098,6 +1098,7 @@ items: resources: - pods/status verbs: + - patch - update - apiGroups: - "" From 35777c31ea5f7973d8f69b22a81377f5d7ea9c71 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Mon, 9 Apr 2018 17:37:14 -0700 Subject: [PATCH 3/7] change kubelet status manager to use patch instead of put to update pod status --- pkg/kubelet/status/BUILD | 1 + pkg/kubelet/status/status_manager.go | 57 ++++- pkg/kubelet/status/status_manager_test.go | 247 ++++++++++++++++++++-- 3 files changed, 278 insertions(+), 27 deletions(-) diff --git a/pkg/kubelet/status/BUILD b/pkg/kubelet/status/BUILD index ccc78651f0f..63e32a391d9 100644 --- a/pkg/kubelet/status/BUILD +++ b/pkg/kubelet/status/BUILD @@ -19,6 +19,7 @@ go_library( "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/format:go_default_library", + "//pkg/util/pod:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index eaf5b9a0512..93eef28918a 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -37,6 +37,7 @@ import ( kubepod "k8s.io/kubernetes/pkg/kubelet/pod" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" + statusutil "k8s.io/kubernetes/pkg/util/pod" ) // A wrapper around v1.PodStatus that includes a version to enforce that stale pod statuses are @@ -121,11 +122,22 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD } } -// isStatusEqual returns true if the given pod statuses are equal, false otherwise. +// isPodStatusByKubeletEqual returns true if the given pod statuses are equal when non-kubelet-owned +// pod conditions are excluded. // This method normalizes the status before comparing so as to make sure that meaningless // changes will be ignored. -func isStatusEqual(oldStatus, status *v1.PodStatus) bool { - return apiequality.Semantic.DeepEqual(status, oldStatus) +func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool { + oldCopy := oldStatus.DeepCopy() + for _, c := range status.Conditions { + if kubetypes.PodConditionByKubelet(c.Type) { + _, oc := podutil.GetPodCondition(oldCopy, c.Type) + if oc == nil || oc.Status != c.Status { + return false + } + } + } + oldCopy.Conditions = status.Conditions + return apiequality.Semantic.DeepEqual(oldCopy, status) } func (m *manager) Start() { @@ -162,6 +174,13 @@ func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) { func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() + + for _, c := range pod.Status.Conditions { + if !kubetypes.PodConditionByKubelet(c.Type) { + glog.Errorf("Kubelet is trying to update pod condition %q for pod %q. 
"+ + "But it is not owned by kubelet.", string(c.Type), format.Pod(pod)) + } + } // Make sure we're caching a deep copy. status = *status.DeepCopy() @@ -336,7 +355,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp normalizeStatus(pod, &status) // The intent here is to prevent concurrent updates to a pod's status from // clobbering each other so the phase of a pod progresses monotonically. - if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate { + if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate { glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status) return false // No new status. } @@ -469,9 +488,10 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { m.deletePodStatus(uid) return } - pod.Status = status.status - // TODO: handle conflict as a retry, make that easier too. - newPod, err := m.kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) + + oldStatus := pod.Status.DeepCopy() + newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status)) + glog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes) if err != nil { glog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err) return @@ -546,7 +566,7 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool { podStatus := pod.Status.DeepCopy() normalizeStatus(pod, podStatus) - if isStatusEqual(podStatus, &status) { + if isPodStatusByKubeletEqual(podStatus, &status) { // If the status from the source is the same with the cached status, // reconcile is not needed. Just return. return false @@ -559,7 +579,7 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool { // We add this function, because apiserver only supports *RFC3339* now, which means that the timestamp returned by // apiserver has no nanosecond information. However, the timestamp returned by metav1.Now() contains nanosecond, -// so when we do comparison between status from apiserver and cached status, isStatusEqual() will always return false. +// so when we do comparison between status from apiserver and cached status, isPodStatusByKubeletEqual() will always return false. // There is related issue #15262 and PR #15263 about this. // In fact, the best way to solve this is to do it on api side. However, for now, we normalize the status locally in // kubelet temporarily. 
@@ -613,3 +633,22 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus { kubetypes.SortInitContainerStatuses(pod, status.InitContainerStatuses) return status } + +// mergePodStatus merges oldPodStatus and newPodStatus where pod conditions +// not owned by kubelet is preserved from oldPodStatus +func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus { + podConditions := []v1.PodCondition{} + for _, c := range oldPodStatus.Conditions { + if !kubetypes.PodConditionByKubelet(c.Type) { + podConditions = append(podConditions, c) + } + } + + for _, c := range newPodStatus.Conditions { + if kubetypes.PodConditionByKubelet(c.Type) { + podConditions = append(podConditions, c) + } + } + newPodStatus.Conditions = podConditions + return newPodStatus +} diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index 84ddec36e3b..03f79b2a1bb 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -19,6 +19,7 @@ package status import ( "fmt" "math/rand" + "reflect" "strconv" "strings" "testing" @@ -48,6 +49,10 @@ import ( // Generate new instance of test pod with the same initial value. func getTestPod() *v1.Pod { return &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, ObjectMeta: metav1.ObjectMeta{ UID: "12345678", Name: "foo", @@ -303,7 +308,7 @@ func TestSyncPod(t *testing.T) { testPod := getTestPod() syncer.kubeClient = fake.NewSimpleClientset(testPod) syncer.SetPodStatus(testPod, getRandomPodStatus()) - verifyActions(t, syncer, []core.Action{getAction(), updateAction()}) + verifyActions(t, syncer, []core.Action{getAction(), patchAction()}) } func TestSyncPodChecksMismatchedUID(t *testing.T) { @@ -357,18 +362,18 @@ func TestSyncPodNoDeadlock(t *testing.T) { t.Logf("Pod not deleted (success case).") ret = getTestPod() m.SetPodStatus(pod, getRandomPodStatus()) - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Pod is terminated, but still running.") - pod.DeletionTimestamp = new(metav1.Time) + pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} m.SetPodStatus(pod, getRandomPodStatus()) - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Pod is terminated successfully.") pod.Status.ContainerStatuses[0].State.Running = nil pod.Status.ContainerStatuses[0].State.Terminated = &v1.ContainerStateTerminated{} m.SetPodStatus(pod, getRandomPodStatus()) - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Error case.") ret = nil @@ -392,7 +397,7 @@ func TestStaleUpdates(t *testing.T) { t.Logf("sync batch before syncPods pushes latest status, so we should see three statuses in the channel, but only one update") m.syncBatch() verifyUpdates(t, m, 3) - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Nothing left in the channel to sync") verifyActions(t, m, []core.Action{}) @@ -406,7 +411,7 @@ func TestStaleUpdates(t *testing.T) { m.SetPodStatus(pod, status) m.syncBatch() - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Nothing stuck in the pipe.") verifyUpdates(t, m, 0) @@ -443,10 +448,27 @@ func TestStatusEquality(t 
*testing.T) { } normalizeStatus(&pod, &oldPodStatus) normalizeStatus(&pod, &podStatus) - if !isStatusEqual(&oldPodStatus, &podStatus) { + if !isPodStatusByKubeletEqual(&oldPodStatus, &podStatus) { t.Fatalf("Order of container statuses should not affect normalized equality.") } } + + oldPodStatus := podStatus + podStatus.Conditions = append(podStatus.Conditions, v1.PodCondition{ + Type: v1.PodConditionType("www.example.com/feature"), + Status: v1.ConditionTrue, + }) + + oldPodStatus.Conditions = append(podStatus.Conditions, v1.PodCondition{ + Type: v1.PodConditionType("www.example.com/feature"), + Status: v1.ConditionFalse, + }) + + normalizeStatus(&pod, &oldPodStatus) + normalizeStatus(&pod, &podStatus) + if !isPodStatusByKubeletEqual(&oldPodStatus, &podStatus) { + t.Fatalf("Differences in pod condition not owned by kubelet should not affect normalized equality.") + } } func TestStatusNormalizationEnforcesMaxBytes(t *testing.T) { @@ -507,7 +529,7 @@ func TestStaticPod(t *testing.T) { t.Logf("Should be able to get the static pod status from status manager") retrievedStatus := expectPodStatus(t, m, staticPod) normalizeStatus(staticPod, &status) - assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) + assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.") m.syncBatch() @@ -520,10 +542,10 @@ func TestStaticPod(t *testing.T) { t.Logf("Should be able to get the mirror pod status from status manager") retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID) - assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) + assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) t.Logf("Should sync pod because the corresponding mirror pod is created") - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("syncBatch should not sync any pods because nothing is changed.") m.testSyncBatch() @@ -741,7 +763,7 @@ func TestReconcilePodStatus(t *testing.T) { t.Errorf("Pod status is different, a reconciliation is needed") } syncer.syncBatch() - verifyActions(t, syncer, []core.Action{getAction(), updateAction()}) + verifyActions(t, syncer, []core.Action{getAction(), patchAction()}) } func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus { @@ -755,18 +777,16 @@ func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus { func TestDeletePods(t *testing.T) { pod := getTestPod() t.Logf("Set the deletion timestamp.") - pod.DeletionTimestamp = new(metav1.Time) + pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(pod) m := newTestManager(client) m.podManager.AddPod(pod) - status := getRandomPodStatus() now := metav1.Now() status.StartTime = &now m.SetPodStatus(pod, status) - t.Logf("Expect to see a delete action.") - verifyActions(t, m, []core.Action{getAction(), updateAction(), deleteAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()}) } func TestDoNotDeleteMirrorPods(t *testing.T) { @@ -779,7 +799,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) { kubetypes.ConfigMirrorAnnotationKey: "mirror", } t.Logf("Set the deletion timestamp.") - mirrorPod.DeletionTimestamp = new(metav1.Time) + 
mirrorPod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(mirrorPod) m := newTestManager(client) m.podManager.AddPod(staticPod) @@ -795,7 +815,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) { m.SetPodStatus(staticPod, status) t.Logf("Expect not to see a delete action.") - verifyActions(t, m, []core.Action{getAction(), updateAction()}) + verifyActions(t, m, []core.Action{getAction(), patchAction()}) } func TestUpdateLastTransitionTime(t *testing.T) { @@ -867,6 +887,197 @@ func updateAction() core.UpdateAction { return core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}} } +func patchAction() core.PatchAction { + return core.PatchActionImpl{ActionImpl: core.ActionImpl{Verb: "patch", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}} +} + func deleteAction() core.DeleteAction { return core.DeleteActionImpl{ActionImpl: core.ActionImpl{Verb: "delete", Resource: schema.GroupVersionResource{Resource: "pods"}}} } + +func TestMergePodStatus(t *testing.T) { + useCases := []struct { + desc string + oldPodStatus func(input v1.PodStatus) v1.PodStatus + newPodStatus func(input v1.PodStatus) v1.PodStatus + expectPodStatus v1.PodStatus + }{ + { + "no change", + func(input v1.PodStatus) v1.PodStatus { return input }, + func(input v1.PodStatus) v1.PodStatus { return input }, + getPodStatus(), + }, + { + "readiness changes", + func(input v1.PodStatus) v1.PodStatus { return input }, + func(input v1.PodStatus) v1.PodStatus { + input.Conditions[0].Status = v1.ConditionFalse + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionFalse, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + Message: "Message", + }, + }, + { + "additional pod condition", + func(input v1.PodStatus) v1.PodStatus { + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }) + return input + }, + func(input v1.PodStatus) v1.PodStatus { return input }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }, + }, + Message: "Message", + }, + }, + { + "additional pod condition and readiness changes", + func(input v1.PodStatus) v1.PodStatus { + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }) + return input + }, + func(input v1.PodStatus) v1.PodStatus { + input.Conditions[0].Status = v1.ConditionFalse + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionFalse, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }, + }, + Message: "Message", + }, + }, + { + "additional pod condition changes", + func(input v1.PodStatus) v1.PodStatus { + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }) + return input + }, + func(input v1.PodStatus) v1.PodStatus { + input.Conditions = append(input.Conditions, 
v1.PodCondition{ + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionFalse, + }) + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodConditionType("example.com/feature"), + Status: v1.ConditionTrue, + }, + }, + Message: "Message", + }, + }, + } + + for _, tc := range useCases { + output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus())) + if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) { + t.Errorf("test case %q failed, expect: %+v, got %+v", tc.desc, tc.expectPodStatus, output) + } + } + +} + +func statusEqual(left, right v1.PodStatus) bool { + left.Conditions = nil + right.Conditions = nil + return reflect.DeepEqual(left, right) +} + +func conditionsEqual(left, right []v1.PodCondition) bool { + if len(left) != len(right) { + return false + } + + for _, l := range left { + found := false + for _, r := range right { + if l.Type == r.Type { + found = true + if l.Status != r.Status { + return false + } + } + } + if !found { + return false + } + } + return true +} + +func getPodStatus() v1.PodStatus { + return v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + Message: "Message", + } +} From 8b3b4e4deabe4cf922eee752df2fad189b2c1471 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 11 Apr 2018 11:37:30 -0700 Subject: [PATCH 4/7] add Patch support in fake kubeClient --- staging/src/k8s.io/client-go/testing/BUILD | 2 ++ .../src/k8s.io/client-go/testing/fixture.go | 31 ++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/client-go/testing/BUILD b/staging/src/k8s.io/client-go/testing/BUILD index d6821abfb37..5b8684c2653 100644 --- a/staging/src/k8s.io/client-go/testing/BUILD +++ b/staging/src/k8s.io/client-go/testing/BUILD @@ -22,6 +22,8 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", ], diff --git a/staging/src/k8s.io/client-go/testing/fixture.go b/staging/src/k8s.io/client-go/testing/fixture.go index 13192f92d16..00c4c49fce4 100644 --- a/staging/src/k8s.io/client-go/testing/fixture.go +++ b/staging/src/k8s.io/client-go/testing/fixture.go @@ -25,6 +25,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" ) @@ -72,7 +74,6 @@ func ObjectReaction(tracker ObjectTracker) ReactionFunc { return func(action Action) (bool, runtime.Object, error) { ns := action.GetNamespace() gvr := action.GetResource() - // Here and below we need to switch on implementation types, // not on interfaces, as some interfaces are identical // (e.g. 
UpdateAction and CreateAction), so if we use them, @@ -125,6 +126,34 @@ func ObjectReaction(tracker ObjectTracker) ReactionFunc { } return true, nil, nil + case PatchActionImpl: + obj, err := tracker.Get(gvr, ns, action.GetName()) + if err != nil { + // object is not registered + return false, nil, err + } + + old, err := json.Marshal(obj) + if err != nil { + return true, nil, err + } + // Only supports strategic merge patch + // TODO: Add support for other Patch types + mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj) + if err != nil { + return true, nil, err + } + + if err = json.Unmarshal(mergedByte, obj); err != nil { + return true, nil, err + } + + if err = tracker.Update(gvr, obj, ns); err != nil { + return true, nil, err + } + + return true, obj, nil + default: return false, nil, fmt.Errorf("no reaction implemented for %s", action) } From cb9ac047773dddf8a55c277f682d9dbfea04b55d Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 11 Apr 2018 17:04:19 -0700 Subject: [PATCH 5/7] fix unit tests using Patch in fake client --- pkg/controller/statefulset/stateful_set_test.go | 3 ++- pkg/kubelet/kubelet_node_status_test.go | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 3893753d1df..7d9a7cc1629 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -496,7 +496,6 @@ func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) { func TestGetPodsForStatefulSetAdopt(t *testing.T) { set := newStatefulSet(5) - ssc, spc := newFakeStatefulSetController(set) pod1 := newStatefulSetPod(set, 1) // pod2 is an orphan with matching labels and name. pod2 := newStatefulSetPod(set, 2) @@ -510,6 +509,8 @@ func TestGetPodsForStatefulSetAdopt(t *testing.T) { pod4.OwnerReferences = nil pod4.Name = "x" + pod4.Name + ssc, spc := newFakeStatefulSetController(set, pod1, pod2, pod3, pod4) + spc.podsIndexer.Add(pod1) spc.podsIndexer.Add(pod2) spc.podsIndexer.Add(pod3) diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index a657d3f89ba..a9ffe58605e 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -879,7 +879,7 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { require.True(t, actions[1].Matches("patch", "nodes")) require.Equal(t, actions[1].GetSubresource(), "status") - updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch()) + updatedNode, err := kubeClient.CoreV1().Nodes().Get(testKubeletHostname, metav1.GetOptions{}) require.NoError(t, err, "can't apply node status patch") for i, cond := range updatedNode.Status.Conditions { @@ -891,7 +891,6 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { // Version skew workaround. 
See: https://github.com/kubernetes/kubernetes/issues/16961 lastIndex := len(updatedNode.Status.Conditions) - 1 - assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[lastIndex].Type, "NodeReady should be the last condition") assert.NotEmpty(t, updatedNode.Status.Conditions[lastIndex].Message) From 78b86333c10af0927c9b9c3a21638d9588fdf159 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 11 Apr 2018 17:44:19 -0700 Subject: [PATCH 6/7] make update --- pkg/kubelet/types/BUILD | 2 ++ staging/src/k8s.io/metrics/Godeps/Godeps.json | 20 +++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/pkg/kubelet/types/BUILD b/pkg/kubelet/types/BUILD index c362d096de2..b1d5ffd1ae5 100644 --- a/pkg/kubelet/types/BUILD +++ b/pkg/kubelet/types/BUILD @@ -12,6 +12,7 @@ go_library( "constants.go", "doc.go", "labels.go", + "pod_status.go", "pod_update.go", "types.go", ], @@ -29,6 +30,7 @@ go_test( name = "go_default_test", srcs = [ "labels_test.go", + "pod_status_test.go", "pod_update_test.go", "types_test.go", ], diff --git a/staging/src/k8s.io/metrics/Godeps/Godeps.json b/staging/src/k8s.io/metrics/Godeps/Godeps.json index 9415da5b540..250fe79ac28 100644 --- a/staging/src/k8s.io/metrics/Godeps/Godeps.json +++ b/staging/src/k8s.io/metrics/Godeps/Godeps.json @@ -6,6 +6,10 @@ "./..." ], "Deps": [ + { + "ImportPath": "github.com/davecgh/go-spew/spew", + "Rev": "782f4967f2dc4564575ca782fe2d04090b5faca8" + }, { "ImportPath": "github.com/ghodss/yaml", "Rev": "73d445a93680fa1a78ae23a5839bad48f32ba1ee" @@ -362,6 +366,10 @@ "ImportPath": "k8s.io/apimachinery/pkg/util/json", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apimachinery/pkg/util/mergepatch", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apimachinery/pkg/util/net", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" @@ -374,6 +382,10 @@ "ImportPath": "k8s.io/apimachinery/pkg/util/sets", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apimachinery/pkg/util/strategicpatch", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apimachinery/pkg/util/validation", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" @@ -398,6 +410,10 @@ "ImportPath": "k8s.io/apimachinery/pkg/watch", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apimachinery/third_party/forked/golang/json", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apimachinery/third_party/forked/golang/reflect", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" @@ -465,6 +481,10 @@ { "ImportPath": "k8s.io/client-go/util/integer", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, + { + "ImportPath": "k8s.io/kube-openapi/pkg/util/proto", + "Rev": "86e28c192d2743f0232b9bc5f0a531568ef9f2a5" } ] } From 85e0d05ac76b943acb208ea70c6b806649706ea5 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Mon, 21 May 2018 15:22:43 -0700 Subject: [PATCH 7/7] add utils for pod condition --- pkg/kubelet/types/pod_status.go | 39 +++++++++++++++++++++++ pkg/kubelet/types/pod_status_test.go | 47 ++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 pkg/kubelet/types/pod_status.go create mode 100644 pkg/kubelet/types/pod_status_test.go diff --git a/pkg/kubelet/types/pod_status.go b/pkg/kubelet/types/pod_status.go new file mode 100644 index 00000000000..a7756382b3c --- /dev/null +++ b/pkg/kubelet/types/pod_status.go @@ -0,0 +1,39 @@ +/* +Copyright 2018 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "k8s.io/api/core/v1" +) + +// PodConditionsByKubelet is the list of pod conditions owned by kubelet +var PodConditionsByKubelet = []v1.PodConditionType{ + v1.PodScheduled, + v1.PodReady, + v1.PodInitialized, + v1.PodReasonUnschedulable, +} + +// PodConditionByKubelet returns if the pod condition type is owned by kubelet +func PodConditionByKubelet(conditionType v1.PodConditionType) bool { + for _, c := range PodConditionsByKubelet { + if c == conditionType { + return true + } + } + return false +} diff --git a/pkg/kubelet/types/pod_status_test.go b/pkg/kubelet/types/pod_status_test.go new file mode 100644 index 00000000000..61c837748d3 --- /dev/null +++ b/pkg/kubelet/types/pod_status_test.go @@ -0,0 +1,47 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "k8s.io/api/core/v1" + "testing" +) + +func TestPodConditionByKubelet(t *testing.T) { + trueCases := []v1.PodConditionType{ + v1.PodScheduled, + v1.PodReady, + v1.PodInitialized, + v1.PodReasonUnschedulable, + } + + for _, tc := range trueCases { + if !PodConditionByKubelet(tc) { + t.Errorf("Expect %q to be condition owned by kubelet.", tc) + } + } + + falseCases := []v1.PodConditionType{ + v1.PodConditionType("abcd"), + } + + for _, tc := range falseCases { + if PodConditionByKubelet(tc) { + t.Errorf("Expect %q NOT to be condition owned by kubelet.", tc) + } + } +}
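
Usage sketch (illustrative only, not part of the applied diffs): the PatchPodStatus helper introduced in PATCH 1/7 can be exercised against the fake clientset once the strategic-merge Patch support from PATCH 4/7 is in place. The import alias podutil and the namespace/pod names below are assumptions for the example; the PatchPodStatus signature is the one added in pkg/util/pod/pod.go.

    package main

    import (
    	"fmt"

    	"k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/kubernetes/fake"
    	podutil "k8s.io/kubernetes/pkg/util/pod"
    )

    func main() {
    	// Seed a fake clientset with an existing pod so the patch has a target.
    	client := fake.NewSimpleClientset(&v1.Pod{
    		ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "pod-a"},
    	})

    	oldStatus := v1.PodStatus{
    		Conditions: []v1.PodCondition{
    			{Type: v1.PodReady, Status: v1.ConditionTrue},
    		},
    	}
    	newStatus := *oldStatus.DeepCopy()
    	newStatus.Conditions[0].Status = v1.ConditionFalse
    	newStatus.Message = "readiness probe failed"

    	// Only the delta between the two statuses is sent, as a strategic
    	// merge patch against the pod's status subresource.
    	updated, patchBytes, err := podutil.PatchPodStatus(client, "ns", "pod-a", oldStatus, newStatus)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Printf("patch sent: %s\n", patchBytes)
    	fmt.Printf("patched message: %q\n", updated.Status.Message)
    }

This mirrors what PATCH 3/7 does in the kubelet status manager: compute the patch from old versus merged status with mergePodStatus, send it with PatchPodStatus, and log the patch bytes, rather than issuing a full UpdateStatus PUT.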