Mirror of https://github.com/k3s-io/kubernetes.git
Commit c9024e7ae6

Merge pull request #128640 from mengqiy/spreadkubeletlaod

Add a random interval to the nodeStatusReport interval after every actual node status change.
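When many nodes share the same nodeStatusReportFrequency, their periodic status PATCHes tend to hit the apiserver in lockstep. This change adds a one-time random offset, drawn uniformly from [-0.5*nodeStatusReportFrequency, +0.5*nodeStatusReportFrequency], to the report interval after each real node status change; periodic no-change reports keep the plain frequency. The following is a minimal standalone sketch of that jitter scheme, not kubelet code; the names (nextReportAfter, reportFrequency) are made up for illustration.

// jitter_sketch.go: standalone illustration of the report-interval jitter
// introduced by this PR (hypothetical names, not the kubelet API).
package main

import (
    "fmt"
    "math/rand"
    "time"
)

// nextReportAfter returns how long to wait before the next node status report.
// After a real status change a one-time uniform offset in
// [-0.5*reportFrequency, +0.5*reportFrequency) is added; otherwise the plain
// frequency is used, so the steady-state cadence is unchanged.
func nextReportAfter(reportFrequency time.Duration, statusChanged bool) time.Duration {
    if !statusChanged {
        return reportFrequency
    }
    jitter := time.Duration(float64(reportFrequency) * (-0.5 + rand.Float64()))
    return reportFrequency + jitter
}

func main() {
    const frequency = 5 * time.Minute
    for i := 0; i < 5; i++ {
        fmt.Println(nextReportAfter(frequency, true)) // somewhere in [2.5m, 7.5m)
    }
    fmt.Println(nextReportAfter(frequency, false)) // always 5m
}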
@@ -1256,6 +1256,12 @@ type Kubelet struct {
     // status to master. It is only used when node lease feature is enabled.
     nodeStatusReportFrequency time.Duration
 
+    // delayAfterNodeStatusChange is the one-time random duration that we add to the next node status report interval
+    // every time when there's an actual node status change. But all future node status update that is not caused by
+    // real status change will stick with nodeStatusReportFrequency. The random duration is a uniform distribution over
+    // [-0.5*nodeStatusReportFrequency, 0.5*nodeStatusReportFrequency]
+    delayAfterNodeStatusChange time.Duration
+
     // lastStatusReportTime is the time when node status was last reported.
     lastStatusReportTime time.Time
 
@@ -19,6 +19,7 @@ package kubelet
 import (
     "context"
     "fmt"
+    "math/rand"
     "net"
     goruntime "runtime"
     "sort"
@@ -579,13 +580,21 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
     }
 
     node, changed := kl.updateNode(ctx, originalNode)
-    shouldPatchNodeStatus := changed || kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency
-
-    if !shouldPatchNodeStatus {
+    // no need to update the status yet
+    if !changed && !kl.isUpdateStatusPeriodExperid() {
         kl.markVolumesFromNode(node)
         return nil
     }
 
+    // We need to update the node status, if this is caused by a node change we want to calculate a new
+    // random delay so we avoid all the nodes to reach the apiserver at the same time. If the update is not related
+    // to a node change, because we run over the period, we reset the random delay so the node keeps updating
+    // its status at the same cadence
+    if changed {
+        kl.delayAfterNodeStatusChange = kl.calculateDelay()
+    } else {
+        kl.delayAfterNodeStatusChange = 0
+    }
     updatedNode, err := kl.patchNodeStatus(originalNode, node)
     if err == nil {
         kl.markVolumesFromNode(updatedNode)
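In tryUpdateNodeStatus the single shouldPatchNodeStatus expression is replaced by two paths: a real status change always patches and rolls a fresh delayAfterNodeStatusChange, while a purely time-driven report only fires once nodeStatusReportFrequency plus the current delay has elapsed and then resets the delay to zero, so nodes fall back to the plain cadence until the next real change.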
@@ -593,6 +602,20 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
     return err
 }
 
+func (kl *Kubelet) isUpdateStatusPeriodExperid() bool {
+    if kl.lastStatusReportTime.IsZero() {
+        return false
+    }
+    if kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency+kl.delayAfterNodeStatusChange {
+        return true
+    }
+    return false
+}
+
+func (kl *Kubelet) calculateDelay() time.Duration {
+    return time.Duration(float64(kl.nodeStatusReportFrequency) * (-0.5 + rand.Float64()))
+}
+
 // updateNode creates a copy of originalNode and runs update logic on it.
 // It returns the updated node object and a bool indicating if anything has been changed.
 func (kl *Kubelet) updateNode(ctx context.Context, originalNode *v1.Node) (*v1.Node, bool) {
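The two helpers combine into a simple deadline check: a report is due once nodeStatusReportFrequency + delayAfterNodeStatusChange has elapsed since lastStatusReportTime, and a zero lastStatusReportTime (nothing reported yet) never counts as expired. Below is a minimal standalone sketch of that predicate with hypothetical names and sample values mirroring the added tests; it is an illustration, not the kubelet code itself.

package main

import (
    "fmt"
    "time"
)

// reportDue mirrors the expiry check above with plain values instead of the
// Kubelet struct: due once frequency+delay has elapsed since the last report.
func reportDue(lastReport time.Time, frequency, delay time.Duration, now time.Time) bool {
    if lastReport.IsZero() {
        return false
    }
    return now.Sub(lastReport) >= frequency+delay
}

func main() {
    now := time.Now()
    freq := 5 * time.Minute

    // Negative delay (report was pulled earlier): 4 minutes already exceeds 3.
    fmt.Println(reportDue(now.Add(-4*time.Minute), freq, -2*time.Minute, now)) // true

    // Positive delay (report was pushed later): 5 minutes is no longer enough.
    fmt.Println(reportDue(now.Add(-5*time.Minute), freq, time.Minute, now)) // false
}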
@@ -849,7 +849,10 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
     // Since this test retroactively overrides the stub container manager,
     // we have to regenerate default status setters.
     kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
-    kubelet.nodeStatusReportFrequency = time.Minute
+    // You will add up to 50% of nodeStatusReportFrequency of additional random latency for
+    // kubelet to determine if update node status is needed due to time passage. We need to
+    // take that into consideration to ensure this test pass all time.
+    kubelet.nodeStatusReportFrequency = 30 * time.Second
 
     kubeClient := testKubelet.fakeKubeClient
     existingNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
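Lowering the test's nodeStatusReportFrequency from one minute to 30 seconds accounts for the new jitter: with up to 50% of the frequency added after a status change, the worst-case interval here is 45 seconds, which keeps the report within the window this test waits for.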
@@ -3088,3 +3091,67 @@ func TestUpdateNodeAddresses(t *testing.T) {
         })
     }
 }
+
+func TestIsUpdateStatusPeriodExperid(t *testing.T) {
+    testcases := []struct {
+        name                       string
+        lastStatusReportTime       time.Time
+        delayAfterNodeStatusChange time.Duration
+        expectExpired              bool
+    }{
+        {
+            name:                       "no status update before and no delay",
+            lastStatusReportTime:       time.Time{},
+            delayAfterNodeStatusChange: 0,
+            expectExpired:              false,
+        },
+        {
+            name:                       "no status update before and existing delay",
+            lastStatusReportTime:       time.Time{},
+            delayAfterNodeStatusChange: 30 * time.Second,
+            expectExpired:              false,
+        },
+        {
+            name:                       "not expired and no delay",
+            lastStatusReportTime:       time.Now().Add(-4 * time.Minute),
+            delayAfterNodeStatusChange: 0,
+            expectExpired:              false,
+        },
+        {
+            name:                       "not expired",
+            lastStatusReportTime:       time.Now().Add(-5 * time.Minute),
+            delayAfterNodeStatusChange: time.Minute,
+            expectExpired:              false,
+        },
+        {
+            name:                       "expired",
+            lastStatusReportTime:       time.Now().Add(-4 * time.Minute),
+            delayAfterNodeStatusChange: -2 * time.Minute,
+            expectExpired:              true,
+        },
+    }
+
+    testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+    defer testKubelet.Cleanup()
+    kubelet := testKubelet.kubelet
+    kubelet.nodeStatusReportFrequency = 5 * time.Minute
+
+    for _, tc := range testcases {
+        kubelet.lastStatusReportTime = tc.lastStatusReportTime
+        kubelet.delayAfterNodeStatusChange = tc.delayAfterNodeStatusChange
+        expired := kubelet.isUpdateStatusPeriodExperid()
+        assert.Equal(t, tc.expectExpired, expired, tc.name)
+    }
+}
+
+func TestCalculateDelay(t *testing.T) {
+    testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+    defer testKubelet.Cleanup()
+    kubelet := testKubelet.kubelet
+    kubelet.nodeStatusReportFrequency = 5 * time.Minute
+
+    for i := 0; i < 100; i++ {
+        randomDelay := kubelet.calculateDelay()
+        assert.LessOrEqual(t, randomDelay.Abs(), kubelet.nodeStatusReportFrequency/2)
+    }
+}
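To exercise the new helpers on their own, the two added tests can be run directly; the path below assumes the usual in-tree location of the kubelet package (pkg/kubelet):

go test ./pkg/kubelet/ -run 'TestIsUpdateStatusPeriodExperid|TestCalculateDelay'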