Add UID precondition to kubelet pod status patch updates

This commit is contained in:
Jordan Liggitt 2019-12-16 14:27:32 -05:00
parent 3dc521ac04
commit a65d8aeb76
4 changed files with 23 additions and 16 deletions

View File

@ -24,7 +24,7 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -540,7 +540,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
} }
oldStatus := pod.Status.DeepCopy() oldStatus := pod.Status.DeepCopy()
newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status)) newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes) klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
if err != nil { if err != nil {
klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err) klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)

View File

@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
@ -20,6 +21,7 @@ go_test(
deps = [ deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
], ],
) )

View File

@ -20,15 +20,16 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
) )
// PatchPodStatus patches pod status. // PatchPodStatus patches pod status.
func PatchPodStatus(c clientset.Interface, namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) { func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) {
patchBytes, err := preparePatchBytesForPodStatus(namespace, name, oldPodStatus, newPodStatus) patchBytes, err := preparePatchBytesForPodStatus(namespace, name, uid, oldPodStatus, newPodStatus)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -40,7 +41,7 @@ func PatchPodStatus(c clientset.Interface, namespace, name string, oldPodStatus,
return updatedPod, patchBytes, nil return updatedPod, patchBytes, nil
} }
func preparePatchBytesForPodStatus(namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) { func preparePatchBytesForPodStatus(namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) {
oldData, err := json.Marshal(v1.Pod{ oldData, err := json.Marshal(v1.Pod{
Status: oldPodStatus, Status: oldPodStatus,
}) })
@ -49,7 +50,8 @@ func preparePatchBytesForPodStatus(namespace, name string, oldPodStatus, newPodS
} }
newData, err := json.Marshal(v1.Pod{ newData, err := json.Marshal(v1.Pod{
Status: newPodStatus, ObjectMeta: metav1.ObjectMeta{UID: uid}, // only put the uid in the new object to ensure it appears in the patch as a precondition
Status: newPodStatus,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to Marshal newData for pod %q/%q: %v", namespace, name, err) return nil, fmt.Errorf("failed to Marshal newData for pod %q/%q: %v", namespace, name, err)

View File

@ -17,18 +17,21 @@ limitations under the License.
package pod package pod
import ( import (
"fmt"
"testing" "testing"
"fmt"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"reflect" "reflect"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
) )
func TestPatchPodStatus(t *testing.T) { func TestPatchPodStatus(t *testing.T) {
ns := "ns" ns := "ns"
name := "name" name := "name"
uid := types.UID("myuid")
client := &fake.Clientset{} client := &fake.Clientset{}
client.CoreV1().Pods(ns).Create(&v1.Pod{ client.CoreV1().Pods(ns).Create(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -45,7 +48,7 @@ func TestPatchPodStatus(t *testing.T) {
{ {
"no change", "no change",
func(input v1.PodStatus) v1.PodStatus { return input }, func(input v1.PodStatus) v1.PodStatus { return input },
[]byte(fmt.Sprintf(`{}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"}}`)),
}, },
{ {
"message change", "message change",
@ -53,7 +56,7 @@ func TestPatchPodStatus(t *testing.T) {
input.Message = "random message" input.Message = "random message"
return input return input
}, },
[]byte(fmt.Sprintf(`{"status":{"message":"random message"}}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"message":"random message"}}`)),
}, },
{ {
"pod condition change", "pod condition change",
@ -61,7 +64,7 @@ func TestPatchPodStatus(t *testing.T) {
input.Conditions[0].Status = v1.ConditionFalse input.Conditions[0].Status = v1.ConditionFalse
return input return input
}, },
[]byte(fmt.Sprintf(`{"status":{"$setElementOrder/conditions":[{"type":"Ready"},{"type":"PodScheduled"}],"conditions":[{"status":"False","type":"Ready"}]}}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"$setElementOrder/conditions":[{"type":"Ready"},{"type":"PodScheduled"}],"conditions":[{"status":"False","type":"Ready"}]}}`)),
}, },
{ {
"additional init container condition", "additional init container condition",
@ -74,11 +77,11 @@ func TestPatchPodStatus(t *testing.T) {
} }
return input return input
}, },
[]byte(fmt.Sprintf(`{"status":{"initContainerStatuses":[{"image":"","imageID":"","lastState":{},"name":"init-container","ready":true,"restartCount":0,"state":{}}]}}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"initContainerStatuses":[{"image":"","imageID":"","lastState":{},"name":"init-container","ready":true,"restartCount":0,"state":{}}]}}`)),
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
_, patchBytes, err := PatchPodStatus(client, ns, name, getPodStatus(), tc.mutate(getPodStatus())) _, patchBytes, err := PatchPodStatus(client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus()))
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }