From 1003d368705bbff088b0fb279da8533b0f23399b Mon Sep 17 00:00:00 2001
From: "Mengqi (David) Yu"
Date: Wed, 6 Nov 2024 06:10:48 +0000
Subject: [PATCH] Add random interval to nodeStatusReport interval every time
 after an actual node status change

Update TestUpdateNodeStatusWithLease timing to avoid flakiness.
---
 pkg/kubelet/kubelet.go                  |  6 +++
 pkg/kubelet/kubelet_node_status.go      | 29 +++++++++++--
 pkg/kubelet/kubelet_node_status_test.go | 69 ++++++++++++++++++++++++-
 3 files changed, 100 insertions(+), 4 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 6c1019dca3e..c498027ec86 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1240,6 +1240,12 @@ type Kubelet struct {
 	// status to master. It is only used when node lease feature is enabled.
 	nodeStatusReportFrequency time.Duration
 
+	// delayAfterNodeStatusChange is the one-time random duration that we add to the next node status report interval
+	// every time there is an actual node status change. All future node status updates that are not caused by a real
+	// status change stick with nodeStatusReportFrequency. The random duration is drawn from a uniform distribution
+	// over [-0.5*nodeStatusReportFrequency, 0.5*nodeStatusReportFrequency].
+	delayAfterNodeStatusChange time.Duration
+
 	// lastStatusReportTime is the time when node status was last reported.
 	lastStatusReportTime time.Time
 
diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index ebe36617f05..0505821a75b 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -19,6 +19,7 @@ package kubelet
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"net"
 	goruntime "runtime"
 	"sort"
@@ -579,13 +580,21 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
 	}
 
 	node, changed := kl.updateNode(ctx, originalNode)
-	shouldPatchNodeStatus := changed || kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency
-
-	if !shouldPatchNodeStatus {
+	// No need to update the status yet.
+	if !changed && !kl.isUpdateStatusPeriodExpired() {
 		kl.markVolumesFromNode(node)
 		return nil
 	}
 
+	// We need to update the node status. If the update is caused by a node change, we calculate a new random delay
+	// so that not all nodes reach the apiserver at the same time. If the update is not related to a node change
+	// (we simply ran over the period), we reset the random delay so the node keeps updating its status at the same
+	// cadence.
+	if changed {
+		kl.delayAfterNodeStatusChange = kl.calculateDelay()
+	} else {
+		kl.delayAfterNodeStatusChange = 0
+	}
 	updatedNode, err := kl.patchNodeStatus(originalNode, node)
 	if err == nil {
 		kl.markVolumesFromNode(updatedNode)
@@ -593,6 +602,20 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
 	return err
 }
 
+func (kl *Kubelet) isUpdateStatusPeriodExpired() bool {
+	if kl.lastStatusReportTime.IsZero() {
+		return false
+	}
+	if kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency+kl.delayAfterNodeStatusChange {
+		return true
+	}
+	return false
+}
+
+func (kl *Kubelet) calculateDelay() time.Duration {
+	return time.Duration(float64(kl.nodeStatusReportFrequency) * (-0.5 + rand.Float64()))
+}
+
 // updateNode creates a copy of originalNode and runs update logic on it.
 // It returns the updated node object and a bool indicating if anything has been changed.
 func (kl *Kubelet) updateNode(ctx context.Context, originalNode *v1.Node) (*v1.Node, bool) {
diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go
index 64eca034457..12a6835fd5d 100644
--- a/pkg/kubelet/kubelet_node_status_test.go
+++ b/pkg/kubelet/kubelet_node_status_test.go
@@ -849,7 +849,10 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
 	// Since this test retroactively overrides the stub container manager,
 	// we have to regenerate default status setters.
 	kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
-	kubelet.nodeStatusReportFrequency = time.Minute
+	// Up to 50% of nodeStatusReportFrequency of additional random latency is added before the kubelet
+	// decides that a node status update is needed due to time passage. We need to take that into
+	// consideration to ensure this test passes every time.
+	kubelet.nodeStatusReportFrequency = 30 * time.Second
 
 	kubeClient := testKubelet.fakeKubeClient
 	existingNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
@@ -3088,3 +3091,67 @@ func TestUpdateNodeAddresses(t *testing.T) {
 		})
 	}
 }
+
+func TestIsUpdateStatusPeriodExpired(t *testing.T) {
+	testcases := []struct {
+		name                       string
+		lastStatusReportTime       time.Time
+		delayAfterNodeStatusChange time.Duration
+		expectExpired              bool
+	}{
+		{
+			name:                       "no status update before and no delay",
+			lastStatusReportTime:       time.Time{},
+			delayAfterNodeStatusChange: 0,
+			expectExpired:              false,
+		},
+		{
+			name:                       "no status update before and existing delay",
+			lastStatusReportTime:       time.Time{},
+			delayAfterNodeStatusChange: 30 * time.Second,
+			expectExpired:              false,
+		},
+		{
+			name:                       "not expired and no delay",
+			lastStatusReportTime:       time.Now().Add(-4 * time.Minute),
+			delayAfterNodeStatusChange: 0,
+			expectExpired:              false,
+		},
+		{
+			name:                       "not expired",
+			lastStatusReportTime:       time.Now().Add(-5 * time.Minute),
+			delayAfterNodeStatusChange: time.Minute,
+			expectExpired:              false,
+		},
+		{
+			name:                       "expired",
+			lastStatusReportTime:       time.Now().Add(-4 * time.Minute),
+			delayAfterNodeStatusChange: -2 * time.Minute,
+			expectExpired:              true,
+		},
+	}
+
+	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+	defer testKubelet.Cleanup()
+	kubelet := testKubelet.kubelet
+	kubelet.nodeStatusReportFrequency = 5 * time.Minute
+
+	for _, tc := range testcases {
+		kubelet.lastStatusReportTime = tc.lastStatusReportTime
+		kubelet.delayAfterNodeStatusChange = tc.delayAfterNodeStatusChange
+		expired := kubelet.isUpdateStatusPeriodExpired()
+		assert.Equal(t, tc.expectExpired, expired, tc.name)
+	}
+}
+
+func TestCalculateDelay(t *testing.T) {
+	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+	defer testKubelet.Cleanup()
+	kubelet := testKubelet.kubelet
+	kubelet.nodeStatusReportFrequency = 5 * time.Minute
+
+	for i := 0; i < 100; i++ {
+		randomDelay := kubelet.calculateDelay()
+		assert.LessOrEqual(t, randomDelay.Abs(), kubelet.nodeStatusReportFrequency/2)
+	}
+}
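For reference, the following is a minimal standalone sketch of the jitter behavior this patch introduces. It is not part of the patch and not kubelet code: reportFrequency stands in for nodeStatusReportFrequency, the jitter value stands in for delayAfterNodeStatusChange, and the program/function names are illustrative only.

// jitter_sketch.go: standalone illustration of the uniform jitter drawn after a real node status change.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// calculateDelay mirrors the patch: a uniform draw over
// [-0.5*reportFrequency, +0.5*reportFrequency).
func calculateDelay(reportFrequency time.Duration) time.Duration {
	return time.Duration(float64(reportFrequency) * (-0.5 + rand.Float64()))
}

func main() {
	reportFrequency := 5 * time.Minute

	// After a real status change, the next time-based report fires after
	// reportFrequency + jitter, i.e. somewhere in [2.5m, 7.5m).
	for i := 0; i < 5; i++ {
		jitter := calculateDelay(reportFrequency)
		fmt.Printf("next time-based report after: %v\n", reportFrequency+jitter)
	}

	// Once a purely time-based report happens, the delay is reset to zero,
	// so subsequent reports return to the plain reportFrequency cadence.
	fmt.Printf("steady-state interval: %v\n", reportFrequency)
}

The net effect, per the comments in the diff above, is that a burst of correlated status changes across many nodes is spread over a window of plus or minus half the report frequency, while the steady-state, time-based cadence stays at nodeStatusReportFrequency because the delay is reset to zero after a purely time-based update.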