kubelet: Avoid sending no-op patches

In an e2e run, out of 1857 pod status updates executed by the
Kubelet, 453 (25%) were no-ops - they only contained the UID of
the pod and no status changes. If the patch is a no-op we can
avoid invoking the server and continue.
This commit is contained in:
Clayton Coleman 2020-02-26 17:05:33 -05:00
parent 5ceddce539
commit b252865479
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
4 changed files with 51 additions and 29 deletions

View File

@@ -541,15 +541,19 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
} }
oldStatus := pod.Status.DeepCopy() oldStatus := pod.Status.DeepCopy()
newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status)) newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes) klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
if err != nil { if err != nil {
klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err) klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
return return
} }
pod = newPod if unchanged {
klog.V(3).Infof("Status for pod %q is up-to-date: (%d)", format.Pod(pod), status.version)
} else {
klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
pod = newPod
}
klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
// We don't handle graceful deletion of mirror pods. // We don't handle graceful deletion of mirror pods.

View File

@@ -96,6 +96,7 @@ func getRandomPodStatus() v1.PodStatus {
} }
func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action) { func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action) {
t.Helper()
manager.consumeUpdates() manager.consumeUpdates()
actions := manager.kubeClient.(*fake.Clientset).Actions() actions := manager.kubeClient.(*fake.Clientset).Actions()
defer manager.kubeClient.(*fake.Clientset).ClearActions() defer manager.kubeClient.(*fake.Clientset).ClearActions()
@@ -401,17 +402,17 @@ func TestStaleUpdates(t *testing.T) {
t.Logf("Nothing left in the channel to sync") t.Logf("Nothing left in the channel to sync")
verifyActions(t, m, []core.Action{}) verifyActions(t, m, []core.Action{})
t.Log("Unchanged status should not send an update.") t.Log("Unchanged status should not send an update")
m.SetPodStatus(pod, status) m.SetPodStatus(pod, status)
verifyUpdates(t, m, 0) verifyUpdates(t, m, 0)
t.Log("... unless it's stale.") t.Log("... even if it's stale as long as nothing changes")
mirrorPodUID := kubetypes.MirrorPodUID(pod.UID) mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1 m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1
m.SetPodStatus(pod, status) m.SetPodStatus(pod, status)
m.syncBatch() m.syncBatch()
verifyActions(t, m, []core.Action{getAction(), patchAction()}) verifyActions(t, m, []core.Action{getAction()})
t.Logf("Nothing stuck in the pipe.") t.Logf("Nothing stuck in the pipe.")
verifyUpdates(t, m, 0) verifyUpdates(t, m, 0)
@@ -821,8 +822,9 @@ func TestReconcilePodStatus(t *testing.T) {
t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing") t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing")
syncer.podManager.UpdatePod(testPod) syncer.podManager.UpdatePod(testPod)
if syncer.needsReconcile(testPod.UID, podStatus) { if syncer.needsReconcile(testPod.UID, podStatus) {
t.Errorf("Pod status is the same, a reconciliation is not needed") t.Fatalf("Pod status is the same, a reconciliation is not needed")
} }
syncer.SetPodStatus(testPod, podStatus)
syncer.syncBatch() syncer.syncBatch()
verifyActions(t, syncer, []core.Action{}) verifyActions(t, syncer, []core.Action{})
@@ -835,17 +837,19 @@ func TestReconcilePodStatus(t *testing.T) {
testPod.Status.StartTime = &normalizedStartTime testPod.Status.StartTime = &normalizedStartTime
syncer.podManager.UpdatePod(testPod) syncer.podManager.UpdatePod(testPod)
if syncer.needsReconcile(testPod.UID, podStatus) { if syncer.needsReconcile(testPod.UID, podStatus) {
t.Errorf("Pod status only differs for timestamp format, a reconciliation is not needed") t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed")
} }
syncer.SetPodStatus(testPod, podStatus)
syncer.syncBatch() syncer.syncBatch()
verifyActions(t, syncer, []core.Action{}) verifyActions(t, syncer, []core.Action{})
t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update") t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update")
testPod.Status = getRandomPodStatus() changedPodStatus := getRandomPodStatus()
syncer.podManager.UpdatePod(testPod) syncer.podManager.UpdatePod(testPod)
if !syncer.needsReconcile(testPod.UID, podStatus) { if !syncer.needsReconcile(testPod.UID, changedPodStatus) {
t.Errorf("Pod status is different, a reconciliation is needed") t.Fatalf("Pod status is different, a reconciliation is needed")
} }
syncer.SetPodStatus(testPod, changedPodStatus)
syncer.syncBatch() syncer.syncBatch()
verifyActions(t, syncer, []core.Action{getAction(), patchAction()}) verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
} }

View File

@@ -17,6 +17,7 @@ limitations under the License.
package pod package pod
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
@@ -28,26 +29,29 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
) )
// PatchPodStatus patches pod status. // PatchPodStatus patches pod status. It returns true and avoids an update if the patch contains no changes.
func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) { func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, bool, error) {
patchBytes, err := preparePatchBytesForPodStatus(namespace, name, uid, oldPodStatus, newPodStatus) patchBytes, unchanged, err := preparePatchBytesForPodStatus(namespace, name, uid, oldPodStatus, newPodStatus)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, false, err
}
if unchanged {
return nil, patchBytes, true, nil
} }
updatedPod, err := c.CoreV1().Pods(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") updatedPod, err := c.CoreV1().Pods(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err) return nil, nil, false, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err)
} }
return updatedPod, patchBytes, nil return updatedPod, patchBytes, false, nil
} }
func preparePatchBytesForPodStatus(namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) { func preparePatchBytesForPodStatus(namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, bool, error) {
oldData, err := json.Marshal(v1.Pod{ oldData, err := json.Marshal(v1.Pod{
Status: oldPodStatus, Status: oldPodStatus,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to Marshal oldData for pod %q/%q: %v", namespace, name, err) return nil, false, fmt.Errorf("failed to Marshal oldData for pod %q/%q: %v", namespace, name, err)
} }
newData, err := json.Marshal(v1.Pod{ newData, err := json.Marshal(v1.Pod{
@@ -55,12 +59,12 @@ func preparePatchBytesForPodStatus(namespace, name string, uid types.UID, oldPod
Status: newPodStatus, Status: newPodStatus,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to Marshal newData for pod %q/%q: %v", namespace, name, err) return nil, false, fmt.Errorf("failed to Marshal newData for pod %q/%q: %v", namespace, name, err)
} }
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{}) patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for pod %q/%q: %v", namespace, name, err) return nil, false, fmt.Errorf("failed to CreateTwoWayMergePatch for pod %q/%q: %v", namespace, name, err)
} }
return patchBytes, nil return patchBytes, bytes.Equal(patchBytes, []byte(fmt.Sprintf(`{"metadata":{"uid":%q}}`, uid))), nil
} }

View File

@@ -44,11 +44,13 @@ func TestPatchPodStatus(t *testing.T) {
testCases := []struct { testCases := []struct {
description string description string
mutate func(input v1.PodStatus) v1.PodStatus mutate func(input v1.PodStatus) v1.PodStatus
expectUnchanged bool
expectedPatchBytes []byte expectedPatchBytes []byte
}{ }{
{ {
"no change", "no change",
func(input v1.PodStatus) v1.PodStatus { return input }, func(input v1.PodStatus) v1.PodStatus { return input },
true,
[]byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"}}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"}}`)),
}, },
{ {
@@ -57,6 +59,7 @@ func TestPatchPodStatus(t *testing.T) {
input.Message = "random message" input.Message = "random message"
return input return input
}, },
false,
[]byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"message":"random message"}}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"message":"random message"}}`)),
}, },
{ {
@@ -65,6 +68,7 @@ func TestPatchPodStatus(t *testing.T) {
input.Conditions[0].Status = v1.ConditionFalse input.Conditions[0].Status = v1.ConditionFalse
return input return input
}, },
false,
[]byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"$setElementOrder/conditions":[{"type":"Ready"},{"type":"PodScheduled"}],"conditions":[{"status":"False","type":"Ready"}]}}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"$setElementOrder/conditions":[{"type":"Ready"},{"type":"PodScheduled"}],"conditions":[{"status":"False","type":"Ready"}]}}`)),
}, },
{ {
@@ -78,17 +82,23 @@ func TestPatchPodStatus(t *testing.T) {
} }
return input return input
}, },
false,
[]byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"initContainerStatuses":[{"image":"","imageID":"","lastState":{},"name":"init-container","ready":true,"restartCount":0,"state":{}}]}}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"initContainerStatuses":[{"image":"","imageID":"","lastState":{},"name":"init-container","ready":true,"restartCount":0,"state":{}}]}}`)),
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
_, patchBytes, err := PatchPodStatus(client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus())) t.Run(tc.description, func(t *testing.T) {
if err != nil { _, patchBytes, unchanged, err := PatchPodStatus(client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus()))
t.Errorf("unexpected error: %v", err) if err != nil {
} t.Errorf("unexpected error: %v", err)
if !reflect.DeepEqual(patchBytes, tc.expectedPatchBytes) { }
t.Errorf("for test case %q, expect patchBytes: %q, got: %q\n", tc.description, tc.expectedPatchBytes, patchBytes) if unchanged != tc.expectUnchanged {
} t.Errorf("unexpected change: %t", unchanged)
}
if !reflect.DeepEqual(patchBytes, tc.expectedPatchBytes) {
t.Errorf("expect patchBytes: %q, got: %q\n", tc.expectedPatchBytes, patchBytes)
}
})
} }
} }