Merge pull request #62306 from freehan/pod-status-patch2

Automatic merge from submit-queue (batch tested with PRs 58920, 58327, 60577, 49388, 62306). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Use Patch instead of Put to sync pod status

ref: https://github.com/kubernetes/community/blob/master/keps/sig-network/0007-pod-ready%2B%2B.md
```release-note
Use Patch instead of Put to sync pod status
```
This commit is contained in:
Kubernetes Submit Queue 2018-05-30 16:09:36 -07:00 committed by GitHub
commit ea92879fab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 642 additions and 32 deletions

View File

@ -496,7 +496,6 @@ func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) {
func TestGetPodsForStatefulSetAdopt(t *testing.T) {
set := newStatefulSet(5)
ssc, spc := newFakeStatefulSetController(set)
pod1 := newStatefulSetPod(set, 1)
// pod2 is an orphan with matching labels and name.
pod2 := newStatefulSetPod(set, 2)
@ -510,6 +509,8 @@ func TestGetPodsForStatefulSetAdopt(t *testing.T) {
pod4.OwnerReferences = nil
pod4.Name = "x" + pod4.Name
ssc, spc := newFakeStatefulSetController(set, pod1, pod2, pod3, pod4)
spc.podsIndexer.Add(pod1)
spc.podsIndexer.Add(pod2)
spc.podsIndexer.Add(pod3)

View File

@ -879,7 +879,7 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
require.True(t, actions[1].Matches("patch", "nodes"))
require.Equal(t, actions[1].GetSubresource(), "status")
updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
updatedNode, err := kubeClient.CoreV1().Nodes().Get(testKubeletHostname, metav1.GetOptions{})
require.NoError(t, err, "can't apply node status patch")
for i, cond := range updatedNode.Status.Conditions {
@ -891,7 +891,6 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
lastIndex := len(updatedNode.Status.Conditions) - 1
assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[lastIndex].Type, "NodeReady should be the last condition")
assert.NotEmpty(t, updatedNode.Status.Conditions[lastIndex].Message)

View File

@ -19,6 +19,7 @@ go_library(
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/util/pod:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",

View File

@ -37,6 +37,7 @@ import (
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
statusutil "k8s.io/kubernetes/pkg/util/pod"
)
// A wrapper around v1.PodStatus that includes a version to enforce that stale pod statuses are
@ -121,11 +122,22 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD
}
}
// isStatusEqual returns true if the given pod statuses are equal, false otherwise.
// isPodStatusByKubeletEqual returns true if the given pod statuses are equal when non-kubelet-owned
// pod conditions are excluded.
// This method normalizes the status before comparing so as to make sure that meaningless
// changes will be ignored.
func isStatusEqual(oldStatus, status *v1.PodStatus) bool {
return apiequality.Semantic.DeepEqual(status, oldStatus)
func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool {
oldCopy := oldStatus.DeepCopy()
for _, c := range status.Conditions {
if kubetypes.PodConditionByKubelet(c.Type) {
_, oc := podutil.GetPodCondition(oldCopy, c.Type)
if oc == nil || oc.Status != c.Status {
return false
}
}
}
oldCopy.Conditions = status.Conditions
return apiequality.Semantic.DeepEqual(oldCopy, status)
}
func (m *manager) Start() {
@ -162,6 +174,13 @@ func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
for _, c := range pod.Status.Conditions {
if !kubetypes.PodConditionByKubelet(c.Type) {
glog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+
"But it is not owned by kubelet.", string(c.Type), format.Pod(pod))
}
}
// Make sure we're caching a deep copy.
status = *status.DeepCopy()
@ -336,7 +355,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
normalizeStatus(pod, &status)
// The intent here is to prevent concurrent updates to a pod's status from
// clobbering each other so the phase of a pod progresses monotonically.
if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
return false // No new status.
}
@ -469,9 +488,10 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
m.deletePodStatus(uid)
return
}
pod.Status = status.status
// TODO: handle conflict as a retry, make that easier too.
newPod, err := m.kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
oldStatus := pod.Status.DeepCopy()
newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status))
glog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
if err != nil {
glog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
return
@ -546,7 +566,7 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
podStatus := pod.Status.DeepCopy()
normalizeStatus(pod, podStatus)
if isStatusEqual(podStatus, &status) {
if isPodStatusByKubeletEqual(podStatus, &status) {
// If the status from the source is the same with the cached status,
// reconcile is not needed. Just return.
return false
@ -559,7 +579,7 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
// We add this function, because apiserver only supports *RFC3339* now, which means that the timestamp returned by
// apiserver has no nanosecond information. However, the timestamp returned by metav1.Now() contains nanosecond,
// so when we do comparison between status from apiserver and cached status, isStatusEqual() will always return false.
// so when we do comparison between status from apiserver and cached status, isPodStatusByKubeletEqual() will always return false.
// There is related issue #15262 and PR #15263 about this.
// In fact, the best way to solve this is to do it on api side. However, for now, we normalize the status locally in
// kubelet temporarily.
@ -613,3 +633,22 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus {
kubetypes.SortInitContainerStatuses(pod, status.InitContainerStatuses)
return status
}
// mergePodStatus merges oldPodStatus and newPodStatus where pod conditions
// not owned by kubelet is preserved from oldPodStatus
func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus {
podConditions := []v1.PodCondition{}
for _, c := range oldPodStatus.Conditions {
if !kubetypes.PodConditionByKubelet(c.Type) {
podConditions = append(podConditions, c)
}
}
for _, c := range newPodStatus.Conditions {
if kubetypes.PodConditionByKubelet(c.Type) {
podConditions = append(podConditions, c)
}
}
newPodStatus.Conditions = podConditions
return newPodStatus
}

View File

@ -19,6 +19,7 @@ package status
import (
"fmt"
"math/rand"
"reflect"
"strconv"
"strings"
"testing"
@ -48,6 +49,10 @@ import (
// Generate new instance of test pod with the same initial value.
func getTestPod() *v1.Pod {
return &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
UID: "12345678",
Name: "foo",
@ -303,7 +308,7 @@ func TestSyncPod(t *testing.T) {
testPod := getTestPod()
syncer.kubeClient = fake.NewSimpleClientset(testPod)
syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyActions(t, syncer, []core.Action{getAction(), updateAction()})
verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
}
func TestSyncPodChecksMismatchedUID(t *testing.T) {
@ -357,18 +362,18 @@ func TestSyncPodNoDeadlock(t *testing.T) {
t.Logf("Pod not deleted (success case).")
ret = getTestPod()
m.SetPodStatus(pod, getRandomPodStatus())
verifyActions(t, m, []core.Action{getAction(), updateAction()})
verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("Pod is terminated, but still running.")
pod.DeletionTimestamp = new(metav1.Time)
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
m.SetPodStatus(pod, getRandomPodStatus())
verifyActions(t, m, []core.Action{getAction(), updateAction()})
verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("Pod is terminated successfully.")
pod.Status.ContainerStatuses[0].State.Running = nil
pod.Status.ContainerStatuses[0].State.Terminated = &v1.ContainerStateTerminated{}
m.SetPodStatus(pod, getRandomPodStatus())
verifyActions(t, m, []core.Action{getAction(), updateAction()})
verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("Error case.")
ret = nil
@ -392,7 +397,7 @@ func TestStaleUpdates(t *testing.T) {
t.Logf("sync batch before syncPods pushes latest status, so we should see three statuses in the channel, but only one update")
m.syncBatch()
verifyUpdates(t, m, 3)
verifyActions(t, m, []core.Action{getAction(), updateAction()})
verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("Nothing left in the channel to sync")
verifyActions(t, m, []core.Action{})
@ -406,7 +411,7 @@ func TestStaleUpdates(t *testing.T) {
m.SetPodStatus(pod, status)
m.syncBatch()
verifyActions(t, m, []core.Action{getAction(), updateAction()})
verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("Nothing stuck in the pipe.")
verifyUpdates(t, m, 0)
@ -443,10 +448,27 @@ func TestStatusEquality(t *testing.T) {
}
normalizeStatus(&pod, &oldPodStatus)
normalizeStatus(&pod, &podStatus)
if !isStatusEqual(&oldPodStatus, &podStatus) {
if !isPodStatusByKubeletEqual(&oldPodStatus, &podStatus) {
t.Fatalf("Order of container statuses should not affect normalized equality.")
}
}
oldPodStatus := podStatus
podStatus.Conditions = append(podStatus.Conditions, v1.PodCondition{
Type: v1.PodConditionType("www.example.com/feature"),
Status: v1.ConditionTrue,
})
oldPodStatus.Conditions = append(podStatus.Conditions, v1.PodCondition{
Type: v1.PodConditionType("www.example.com/feature"),
Status: v1.ConditionFalse,
})
normalizeStatus(&pod, &oldPodStatus)
normalizeStatus(&pod, &podStatus)
if !isPodStatusByKubeletEqual(&oldPodStatus, &podStatus) {
t.Fatalf("Differences in pod condition not owned by kubelet should not affect normalized equality.")
}
}
func TestStatusNormalizationEnforcesMaxBytes(t *testing.T) {
@ -507,7 +529,7 @@ func TestStaticPod(t *testing.T) {
t.Logf("Should be able to get the static pod status from status manager")
retrievedStatus := expectPodStatus(t, m, staticPod)
normalizeStatus(staticPod, &status)
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.")
m.syncBatch()
@ -520,10 +542,10 @@ func TestStaticPod(t *testing.T) {
t.Logf("Should be able to get the mirror pod status from status manager")
retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
t.Logf("Should sync pod because the corresponding mirror pod is created")
verifyActions(t, m, []core.Action{getAction(), updateAction()})
verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("syncBatch should not sync any pods because nothing is changed.")
m.testSyncBatch()
@ -741,7 +763,7 @@ func TestReconcilePodStatus(t *testing.T) {
t.Errorf("Pod status is different, a reconciliation is needed")
}
syncer.syncBatch()
verifyActions(t, syncer, []core.Action{getAction(), updateAction()})
verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
}
func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus {
@ -755,18 +777,16 @@ func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus {
func TestDeletePods(t *testing.T) {
pod := getTestPod()
t.Logf("Set the deletion timestamp.")
pod.DeletionTimestamp = new(metav1.Time)
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(pod)
m := newTestManager(client)
m.podManager.AddPod(pod)
status := getRandomPodStatus()
now := metav1.Now()
status.StartTime = &now
m.SetPodStatus(pod, status)
t.Logf("Expect to see a delete action.")
verifyActions(t, m, []core.Action{getAction(), updateAction(), deleteAction()})
verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()})
}
func TestDoNotDeleteMirrorPods(t *testing.T) {
@ -779,7 +799,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) {
kubetypes.ConfigMirrorAnnotationKey: "mirror",
}
t.Logf("Set the deletion timestamp.")
mirrorPod.DeletionTimestamp = new(metav1.Time)
mirrorPod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(mirrorPod)
m := newTestManager(client)
m.podManager.AddPod(staticPod)
@ -795,7 +815,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) {
m.SetPodStatus(staticPod, status)
t.Logf("Expect not to see a delete action.")
verifyActions(t, m, []core.Action{getAction(), updateAction()})
verifyActions(t, m, []core.Action{getAction(), patchAction()})
}
func TestUpdateLastTransitionTime(t *testing.T) {
@ -867,6 +887,197 @@ func updateAction() core.UpdateAction {
return core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}
}
func patchAction() core.PatchAction {
return core.PatchActionImpl{ActionImpl: core.ActionImpl{Verb: "patch", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}
}
func deleteAction() core.DeleteAction {
return core.DeleteActionImpl{ActionImpl: core.ActionImpl{Verb: "delete", Resource: schema.GroupVersionResource{Resource: "pods"}}}
}
func TestMergePodStatus(t *testing.T) {
useCases := []struct {
desc string
oldPodStatus func(input v1.PodStatus) v1.PodStatus
newPodStatus func(input v1.PodStatus) v1.PodStatus
expectPodStatus v1.PodStatus
}{
{
"no change",
func(input v1.PodStatus) v1.PodStatus { return input },
func(input v1.PodStatus) v1.PodStatus { return input },
getPodStatus(),
},
{
"readiness changes",
func(input v1.PodStatus) v1.PodStatus { return input },
func(input v1.PodStatus) v1.PodStatus {
input.Conditions[0].Status = v1.ConditionFalse
return input
},
v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionFalse,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
},
Message: "Message",
},
},
{
"additional pod condition",
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
Status: v1.ConditionTrue,
})
return input
},
func(input v1.PodStatus) v1.PodStatus { return input },
v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
{
Type: v1.PodConditionType("example.com/feature"),
Status: v1.ConditionTrue,
},
},
Message: "Message",
},
},
{
"additional pod condition and readiness changes",
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
Status: v1.ConditionTrue,
})
return input
},
func(input v1.PodStatus) v1.PodStatus {
input.Conditions[0].Status = v1.ConditionFalse
return input
},
v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionFalse,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
{
Type: v1.PodConditionType("example.com/feature"),
Status: v1.ConditionTrue,
},
},
Message: "Message",
},
},
{
"additional pod condition changes",
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
Status: v1.ConditionTrue,
})
return input
},
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
Status: v1.ConditionFalse,
})
return input
},
v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
{
Type: v1.PodConditionType("example.com/feature"),
Status: v1.ConditionTrue,
},
},
Message: "Message",
},
},
}
for _, tc := range useCases {
output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus()))
if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) {
t.Errorf("test case %q failed, expect: %+v, got %+v", tc.desc, tc.expectPodStatus, output)
}
}
}
func statusEqual(left, right v1.PodStatus) bool {
left.Conditions = nil
right.Conditions = nil
return reflect.DeepEqual(left, right)
}
func conditionsEqual(left, right []v1.PodCondition) bool {
if len(left) != len(right) {
return false
}
for _, l := range left {
found := false
for _, r := range right {
if l.Type == r.Type {
found = true
if l.Status != r.Status {
return false
}
}
}
if !found {
return false
}
}
return true
}
func getPodStatus() v1.PodStatus {
return v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
},
Message: "Message",
}
}

View File

@ -12,6 +12,7 @@ go_library(
"constants.go",
"doc.go",
"labels.go",
"pod_status.go",
"pod_update.go",
"types.go",
],
@ -29,6 +30,7 @@ go_test(
name = "go_default_test",
srcs = [
"labels_test.go",
"pod_status_test.go",
"pod_update_test.go",
"types_test.go",
],

View File

@ -0,0 +1,39 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"k8s.io/api/core/v1"
)
// PodConditionsByKubelet is the list of pod conditions owned by kubelet
var PodConditionsByKubelet = []v1.PodConditionType{
v1.PodScheduled,
v1.PodReady,
v1.PodInitialized,
v1.PodReasonUnschedulable,
}
// PodConditionByKubelet returns if the pod condition type is owned by kubelet
func PodConditionByKubelet(conditionType v1.PodConditionType) bool {
for _, c := range PodConditionsByKubelet {
if c == conditionType {
return true
}
}
return false
}

View File

@ -0,0 +1,47 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"k8s.io/api/core/v1"
"testing"
)
func TestPodConditionByKubelet(t *testing.T) {
trueCases := []v1.PodConditionType{
v1.PodScheduled,
v1.PodReady,
v1.PodInitialized,
v1.PodReasonUnschedulable,
}
for _, tc := range trueCases {
if !PodConditionByKubelet(tc) {
t.Errorf("Expect %q to be condition owned by kubelet.", tc)
}
}
falseCases := []v1.PodConditionType{
v1.PodConditionType("abcd"),
}
for _, tc := range falseCases {
if PodConditionByKubelet(tc) {
t.Errorf("Expect %q NOT to be condition owned by kubelet.", tc)
}
}
}

View File

@ -45,6 +45,7 @@ filegroup(
"//pkg/util/nsenter:all-srcs",
"//pkg/util/oom:all-srcs",
"//pkg/util/parsers:all-srcs",
"//pkg/util/pod:all-srcs",
"//pkg/util/pointer:all-srcs",
"//pkg/util/procfs:all-srcs",
"//pkg/util/reflector/prometheus:all-srcs",

39
pkg/util/pod/BUILD Normal file
View File

@ -0,0 +1,39 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["pod.go"],
importpath = "k8s.io/kubernetes/pkg/util/pod",
visibility = ["//visibility:public"],
deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["pod_test.go"],
embed = [":go_default_library"],
deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

63
pkg/util/pod/pod.go Normal file
View File

@ -0,0 +1,63 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pod
import (
"encoding/json"
"fmt"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
clientset "k8s.io/client-go/kubernetes"
)
// PatchPodStatus patches pod status.
func PatchPodStatus(c clientset.Interface, namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) {
patchBytes, err := preparePatchBytesforPodStatus(namespace, name, oldPodStatus, newPodStatus)
if err != nil {
return nil, nil, err
}
updatedPod, err := c.CoreV1().Pods(namespace).Patch(name, types.StrategicMergePatchType, patchBytes, "status")
if err != nil {
return nil, nil, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err)
}
return updatedPod, patchBytes, nil
}
func preparePatchBytesforPodStatus(namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) {
oldData, err := json.Marshal(v1.Pod{
Status: oldPodStatus,
})
if err != nil {
return nil, fmt.Errorf("failed to Marshal oldData for pod %q/%q: %v", namespace, name, err)
}
newData, err := json.Marshal(v1.Pod{
Status: newPodStatus,
})
if err != nil {
return nil, fmt.Errorf("failed to Marshal newData for pod %q/%q: %v", namespace, name, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
if err != nil {
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for pod %q/%q: %v", namespace, name, err)
}
return patchBytes, nil
}

116
pkg/util/pod/pod_test.go Normal file
View File

@ -0,0 +1,116 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pod
import (
"testing"
"fmt"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"reflect"
)
func TestPatchPodStatus(t *testing.T) {
ns := "ns"
name := "name"
client := &fake.Clientset{}
client.CoreV1().Pods(ns).Create(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
})
testCases := []struct {
description string
mutate func(input v1.PodStatus) v1.PodStatus
expectedPatchBytes []byte
}{
{
"no change",
func(input v1.PodStatus) v1.PodStatus { return input },
[]byte(fmt.Sprintf(`{}`)),
},
{
"message change",
func(input v1.PodStatus) v1.PodStatus {
input.Message = "random message"
return input
},
[]byte(fmt.Sprintf(`{"status":{"message":"random message"}}`)),
},
{
"pod condition change",
func(input v1.PodStatus) v1.PodStatus {
input.Conditions[0].Status = v1.ConditionFalse
return input
},
[]byte(fmt.Sprintf(`{"status":{"$setElementOrder/conditions":[{"type":"Ready"},{"type":"PodScheduled"}],"conditions":[{"status":"False","type":"Ready"}]}}`)),
},
{
"additional init container condition",
func(input v1.PodStatus) v1.PodStatus {
input.InitContainerStatuses = []v1.ContainerStatus{
{
Name: "init-container",
Ready: true,
},
}
return input
},
[]byte(fmt.Sprintf(`{"status":{"initContainerStatuses":[{"image":"","imageID":"","lastState":{},"name":"init-container","ready":true,"restartCount":0,"state":{}}]}}`)),
},
}
for _, tc := range testCases {
_, patchBytes, err := PatchPodStatus(client, ns, name, getPodStatus(), tc.mutate(getPodStatus()))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(patchBytes, tc.expectedPatchBytes) {
t.Errorf("for test case %q, expect patchBytes: %q, got: %q\n", tc.description, tc.expectedPatchBytes, patchBytes)
}
}
}
func getPodStatus() v1.PodStatus {
return v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
},
ContainerStatuses: []v1.ContainerStatus{
{
Name: "container1",
Ready: true,
},
{
Name: "container2",
Ready: true,
},
},
Message: "Message",
}
}

View File

@ -119,7 +119,7 @@ func NodeRules() []rbacv1.PolicyRule {
rbacv1helpers.NewRule("create", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
// Needed for the node to report status of pods it is running.
// Use the NodeRestriction admission plugin to limit a node to updating status of pods bound to itself.
rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
rbacv1helpers.NewRule("update", "patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
// Needed for the node to create pod evictions.
// Use the NodeRestriction admission plugin to limit a node to creating evictions for pods bound to itself.
rbacv1helpers.NewRule("create").Groups(legacyGroup).Resources("pods/eviction").RuleOrDie(),

View File

@ -1098,6 +1098,7 @@ items:
resources:
- pods/status
verbs:
- patch
- update
- apiGroups:
- ""

View File

@ -22,6 +22,8 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
],

View File

@ -25,6 +25,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
)
@ -72,7 +74,6 @@ func ObjectReaction(tracker ObjectTracker) ReactionFunc {
return func(action Action) (bool, runtime.Object, error) {
ns := action.GetNamespace()
gvr := action.GetResource()
// Here and below we need to switch on implementation types,
// not on interfaces, as some interfaces are identical
// (e.g. UpdateAction and CreateAction), so if we use them,
@ -125,6 +126,34 @@ func ObjectReaction(tracker ObjectTracker) ReactionFunc {
}
return true, nil, nil
case PatchActionImpl:
obj, err := tracker.Get(gvr, ns, action.GetName())
if err != nil {
// object is not registered
return false, nil, err
}
old, err := json.Marshal(obj)
if err != nil {
return true, nil, err
}
// Only supports strategic merge patch
// TODO: Add support for other Patch types
mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj)
if err != nil {
return true, nil, err
}
if err = json.Unmarshal(mergedByte, obj); err != nil {
return true, nil, err
}
if err = tracker.Update(gvr, obj, ns); err != nil {
return true, nil, err
}
return true, obj, nil
default:
return false, nil, fmt.Errorf("no reaction implemented for %s", action)
}

View File

@ -6,6 +6,10 @@
"./..."
],
"Deps": [
{
"ImportPath": "github.com/davecgh/go-spew/spew",
"Rev": "782f4967f2dc4564575ca782fe2d04090b5faca8"
},
{
"ImportPath": "github.com/ghodss/yaml",
"Rev": "73d445a93680fa1a78ae23a5839bad48f32ba1ee"
@ -362,6 +366,10 @@
"ImportPath": "k8s.io/apimachinery/pkg/util/json",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/mergepatch",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/net",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
@ -374,6 +382,10 @@
"ImportPath": "k8s.io/apimachinery/pkg/util/sets",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/strategicpatch",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/validation",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
@ -398,6 +410,10 @@
"ImportPath": "k8s.io/apimachinery/pkg/watch",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/third_party/forked/golang/json",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/third_party/forked/golang/reflect",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
@ -465,6 +481,10 @@
{
"ImportPath": "k8s.io/client-go/util/integer",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/kube-openapi/pkg/util/proto",
"Rev": "86e28c192d2743f0232b9bc5f0a531568ef9f2a5"
}
]
}