Merge pull request #112618 from jingyuanliang/fastStatusUpdateOnce

kubelet: Keep trying fast status update at startup until node is ready
This commit is contained in:
Kubernetes Prow Robot 2022-11-09 13:30:53 -08:00 committed by GitHub
commit 2c1b7f5759
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 330 additions and 59 deletions

View File

@ -28,7 +28,6 @@ import (
"path/filepath"
sysruntime "runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@ -128,6 +127,10 @@ const (
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5
// nodeReadyGracePeriod is the period to allow for before fast status update is
// terminated and container runtime not being ready is logged without verbosity guard.
nodeReadyGracePeriod = 120 * time.Second
// DefaultContainerLogsDir is the location of container logs.
DefaultContainerLogsDir = "/var/log/containers"
@ -1063,6 +1066,12 @@ type Kubelet struct {
// used for generating ContainerStatus.
reasonCache *ReasonCache
// containerRuntimeReadyExpected indicates whether container runtime being ready is expected
// so errors are logged without verbosity guard, to avoid excessive error logs at node startup.
// It's false during the node initialization period of nodeReadyGracePeriod, and after that
// it's set to true by fastStatusUpdateOnce when it exits.
containerRuntimeReadyExpected bool
// nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
// feature is not enabled, it is also the frequency that kubelet posts node status to master.
// In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
@ -1085,15 +1094,15 @@ type Kubelet struct {
lastStatusReportTime time.Time
// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
// This lock is used by Kubelet.syncNodeStatus and Kubelet.fastNodeStatusUpdate functions and shouldn't be used anywhere else.
syncNodeStatusMux sync.Mutex
// updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe.
// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
// This lock is used by Kubelet.updatePodCIDR function and shouldn't be used anywhere else.
updatePodCIDRMux sync.Mutex
// updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe.
// This lock is used by Kubelet.updateRuntimeUp function and shouldn't be used anywhere else.
// This lock is used by Kubelet.updateRuntimeUp and Kubelet.fastNodeStatusUpdate functions and shouldn't be used anywhere else.
updateRuntimeMux sync.Mutex
// nodeLeaseController claims and renews the node lease for this Kubelet
@ -1502,6 +1511,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Start two go-routines to update the status.
//
// The first will report to the apiserver every nodeStatusUpdateFrequency and is aimed to provide regular status intervals,
// while the second is used to provide a more timely status update during initialization and runs a one-shot update to the apiserver
// once the node becomes ready, then exits afterwards.
//
// Introduce some small jittering to ensure that over time the requests won't start
// accumulating at approximately the same time from the set of nodes due to priority and
// fairness effect.
@ -2435,9 +2450,13 @@ func (kl *Kubelet) updateRuntimeUp() {
}
// Periodically log the whole runtime status for debugging.
klog.V(4).InfoS("Container runtime status", "status", s)
klogErrorS := klog.ErrorS
if !kl.containerRuntimeReadyExpected {
klogErrorS = klog.V(4).ErrorS
}
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
if networkReady == nil || !networkReady.Status {
klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
klogErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
} else {
// Set nil if the container runtime network is ready.
@ -2447,7 +2466,7 @@ func (kl *Kubelet) updateRuntimeUp() {
runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
// If RuntimeReady is not set or is false, report an error.
if runtimeReady == nil || !runtimeReady.Status {
klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
klogErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
return
}
@ -2502,31 +2521,25 @@ func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID str
}
}
// fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR
// is applied and tries to update pod CIDR immediately. After pod CIDR is updated it fires off
// a runtime update and a node status update. Function returns after one successful node status update.
// fastStatusUpdateOnce starts a loop that checks if the current state of kubelet + container runtime
// would be able to turn the node ready, and sync the ready state to the apiserver as soon as possible.
// Function returns after the node status update following such an event, or when the node is already ready.
// Function is executed only during Kubelet start which improves latency to ready node by updating
// pod CIDR, runtime status and node statuses ASAP.
// kubelet state, runtime status and node statuses ASAP.
func (kl *Kubelet) fastStatusUpdateOnce() {
ctx := context.Background()
for {
time.Sleep(100 * time.Millisecond)
node, err := kl.GetNode()
if err != nil {
klog.ErrorS(err, "Error getting node")
continue
}
if len(node.Spec.PodCIDRs) != 0 {
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
if _, err := kl.updatePodCIDR(ctx, podCIDRs); err != nil {
klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
continue
}
kl.updateRuntimeUp()
kl.syncNodeStatus()
return
}
start := kl.clock.Now()
stopCh := make(chan struct{})
// Keep trying to make fast node status update until either timeout is reached or an update is successful.
wait.Until(func() {
// fastNodeStatusUpdate returns true when it succeeds or when the grace period has expired
// (status was not updated within nodeReadyGracePeriod and the second argument below gets true),
// then we close the channel and abort the loop.
if kl.fastNodeStatusUpdate(ctx, kl.clock.Since(start) >= nodeReadyGracePeriod) {
close(stopCh)
}
}, 100*time.Millisecond, stopCh)
}
// CheckpointContainer tries to checkpoint a container. The parameters are used to

View File

@ -429,6 +429,85 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
return node, nil
}
// fastNodeStatusUpdate is a "lightweight" version of syncNodeStatus which doesn't hit the
// apiserver except for the final run, to be called by fastStatusUpdateOnce in each loop.
// It holds the same lock as syncNodeStatus and is thread-safe when called concurrently with
// syncNodeStatus. Its return value indicates whether the loop running it should exit
// (final run), and it also sets kl.containerRuntimeReadyExpected.
func (kl *Kubelet) fastNodeStatusUpdate(ctx context.Context, timeout bool) (completed bool) {
kl.syncNodeStatusMux.Lock()
defer func() {
kl.syncNodeStatusMux.Unlock()
if completed {
// containerRuntimeReadyExpected is read by updateRuntimeUp().
// Not going for a more granular mutex as this path runs only once.
kl.updateRuntimeMux.Lock()
defer kl.updateRuntimeMux.Unlock()
kl.containerRuntimeReadyExpected = true
}
}()
// Grace period expired: give up on the fast path and hand over to the regular
// syncNodeStatus goroutine. Returning true also flips containerRuntimeReadyExpected
// via the defer above, so later runtime errors are logged without a verbosity guard.
if timeout {
klog.ErrorS(nil, "Node not becoming ready in time after startup")
return true
}
// Reads from the local node lister (informer cache); no apiserver round trip here.
originalNode, err := kl.GetNode()
if err != nil {
klog.ErrorS(err, "Error getting the current node from lister")
return false
}
readyIdx, originalNodeReady := nodeutil.GetNodeCondition(&originalNode.Status, v1.NodeReady)
if readyIdx == -1 {
klog.ErrorS(nil, "Node does not have NodeReady condition", "originalNode", originalNode)
return false
}
// Node is already ready (e.g. kubelet restarted on a healthy node): nothing to fast-update.
if originalNodeReady.Status == v1.ConditionTrue {
return true
}
// This is in addition to the regular syncNodeStatus logic so we can get the container runtime status earlier.
// This function itself has a mutex and it doesn't recursively call fastNodeStatusUpdate or syncNodeStatus.
kl.updateRuntimeUp()
// Compute what the node status would look like now, still without contacting the apiserver.
node, changed := kl.updateNode(ctx, originalNode)
if !changed {
// We don't do markVolumesFromNode(node) here and leave it to the regular syncNodeStatus().
return false
}
readyIdx, nodeReady := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
if readyIdx == -1 {
klog.ErrorS(nil, "Node does not have NodeReady condition", "node", node)
return false
}
// Status changed but the node still wouldn't be ready; keep looping.
if nodeReady.Status == v1.ConditionFalse {
return false
}
klog.InfoS("Fast updating node status as it just became ready")
if _, err := kl.patchNodeStatus(originalNode, node); err != nil {
// The originalNode is probably stale, but we know that the current state of kubelet would turn
// the node to be ready. Retry using syncNodeStatus() which fetches from the apiserver.
klog.ErrorS(err, "Error updating node status, will retry with syncNodeStatus")
// The reversed kl.syncNodeStatusMux.Unlock/Lock() below to allow kl.syncNodeStatus() execution.
kl.syncNodeStatusMux.Unlock()
kl.syncNodeStatus()
// This lock action is unnecessary if we add a flag to check in the defer before unlocking it,
// but having it here makes the logic a bit easier to read.
kl.syncNodeStatusMux.Lock()
}
// We don't do markVolumesFromNode(node) here and leave it to the regular syncNodeStatus().
return true
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master if there is any change or enough time
// passed from the last sync, registering the kubelet first if necessary.
@ -479,21 +558,40 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
if tryNumber == 0 {
util.FromApiserverCache(&opts)
}
node, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
originalNode, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}
originalNode := node.DeepCopy()
if originalNode == nil {
return fmt.Errorf("nil %q node object", kl.nodeName)
}
node, changed := kl.updateNode(ctx, originalNode)
shouldPatchNodeStatus := changed || kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency
if !shouldPatchNodeStatus {
kl.markVolumesFromNode(node)
return nil
}
updatedNode, err := kl.patchNodeStatus(originalNode, node)
if err == nil {
kl.markVolumesFromNode(updatedNode)
}
return err
}
// updateNode creates a copy of originalNode and runs update logic on it.
// It returns the updated node object and a bool indicating if anything has been changed.
func (kl *Kubelet) updateNode(ctx context.Context, originalNode *v1.Node) (*v1.Node, bool) {
node := originalNode.DeepCopy()
podCIDRChanged := false
if len(node.Spec.PodCIDRs) != 0 {
// Pod CIDR could have been updated before, so we cannot rely on
// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
// actually changed.
var err error
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
if podCIDRChanged, err = kl.updatePodCIDR(ctx, podCIDRs); err != nil {
klog.ErrorS(err, "Error updating pod CIDR")
@ -521,41 +619,48 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
kl.setNodeStatus(ctx, node)
now := kl.clock.Now()
if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) && !areRequiredLabelsNotPresent {
// We must mark the volumes as ReportedInUse in volume manager's dsw even
// if no changes were made to the node status (no volumes were added or removed
// from the VolumesInUse list).
//
// The reason is that on a kubelet restart, the volume manager's dsw is
// repopulated and the volume ReportedInUse is initialized to false, while the
// VolumesInUse list from the Node object still contains the state from the
// previous kubelet instantiation.
//
// Once the volumes are added to the dsw, the ReportedInUse field needs to be
// synced from the VolumesInUse list in the Node.Status.
//
// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
// because it does not have access to the Node object.
// This also cannot be populated on node status manager init because the volume
// may not have been added to dsw at that time.
kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
return nil
}
}
changed := podCIDRChanged || nodeStatusHasChanged(&originalNode.Status, &node.Status) || areRequiredLabelsNotPresent
return node, changed
}
// patchNodeStatus patches node on the API server based on originalNode.
// It returns any potential error, or an updatedNode and refreshes the state of kubelet when successful.
func (kl *Kubelet) patchNodeStatus(originalNode, node *v1.Node) (*v1.Node, error) {
// Patch the current status on the API server
updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
if err != nil {
return err
return nil, err
}
kl.lastStatusReportTime = now
kl.lastStatusReportTime = kl.clock.Now()
kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
// those volumes are already updated in the node's status
kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
return nil
return updatedNode, nil
}
// markVolumesFromNode pushes the VolumesInUse list from the given node's status
// into the volume manager, marking those volumes as ReportedInUse.
//
// When a node status update turns out to be unnecessary, call this with the
// node fetched from the apiserver: after a kubelet restart the volume manager's
// dsw is repopulated with ReportedInUse initialized to false, while the
// VolumesInUse list in Node.Status still carries the state from the previous
// kubelet instantiation, so the two must be re-synced once the volumes are in
// the dsw. The MarkVolumesAsReportedInUse() call cannot be performed inside the
// dsw itself (it has no access to the Node object) and cannot happen at node
// status manager init (the volumes may not have been added to the dsw yet).
//
// Alternatively, after a successful node status update, call this with the
// updatedNode returned from the patch call, to record that those volumes are
// already reflected in the node's status.
func (kl *Kubelet) markVolumesFromNode(node *v1.Node) {
	inUse := node.Status.VolumesInUse
	kl.volumeManager.MarkVolumesAsReportedInUse(inUse)
}
// recordNodeStatusEvent records an event of the given type with the given

View File

@ -1134,6 +1134,159 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
}
}
// TestFastStatusUpdateOnce exercises the startup fast-path loop end to end with a
// stubbed setNodeStatusFuncs and a fake heartbeat client. Each table knob is a
// call-count threshold on the stub:
//   - beforeMarkReady: calls before the original status funcs run (turning the node ready),
//   - beforeNextReady: calls before the lister's node is flipped to Ready (loop sees an
//     already-ready node on its next iteration),
//   - beforeTimeout: calls before the fake clock is stepped past nodeReadyGracePeriod,
//   - patchFailures: how many initial patch requests the reactor rejects,
//   - wantCalls/wantPatches: expected totals after fastStatusUpdateOnce returns.
// A threshold of 9 means "never reached" within the test's loop counts.
func TestFastStatusUpdateOnce(t *testing.T) {
tests := []struct {
name string
beforeMarkReady int
beforeNextReady int
beforeTimeout int
wantCalls int
patchFailures int
wantPatches int
}{
{
name: "timeout after third loop",
beforeMarkReady: 9,
beforeNextReady: 9,
beforeTimeout: 2,
wantCalls: 3,
},
{
name: "already ready on third loop",
beforeMarkReady: 9,
beforeNextReady: 1,
beforeTimeout: 9,
wantCalls: 2,
},
{
name: "turns ready on third loop",
beforeMarkReady: 2,
beforeNextReady: 9,
beforeTimeout: 9,
wantCalls: 3,
wantPatches: 1,
},
{
name: "turns ready on second loop then first patch fails",
beforeMarkReady: 1,
beforeNextReady: 9,
beforeTimeout: 9,
wantCalls: 3,
patchFailures: 1,
wantPatches: 2,
},
{
name: "turns ready on second loop then all patches fail",
beforeMarkReady: 1,
beforeNextReady: 9,
beforeTimeout: 9,
// Exhausts the fast-path patch plus the full syncNodeStatus retry budget.
wantCalls: nodeStatusUpdateRetry + 2,
patchFailures: nodeStatusUpdateRetry + 2,
wantPatches: nodeStatusUpdateRetry + 1,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
// Ensure we capture actions on the heartbeat client only.
// We don't set it to nil or GetNode() doesn't read from nodeLister.
kubelet.kubeClient = &fake.Clientset{}
kubeClient := testKubelet.fakeKubeClient
// The node starts NotReady so the fast-update loop has work to do.
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(kubelet.nodeName),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
Reason: "NotReady",
Message: "Node not ready",
},
},
},
},
}
nodeLister := testNodeLister{[]*v1.Node{node.DeepCopy()}}
kubelet.nodeLister = nodeLister
callCount := 0
// The original node status functions turn the node ready.
nodeStatusFuncs := kubelet.setNodeStatusFuncs
// Replace the status funcs with a counting stub that triggers the table's
// events (turn ready, flip the lister's node, expire the grace period) once
// their call-count thresholds are crossed.
kubelet.setNodeStatusFuncs = []func(context.Context, *v1.Node) error{func(ctx context.Context, node *v1.Node) error {
// Must stay false until fastStatusUpdateOnce exits.
assert.False(t, kubelet.containerRuntimeReadyExpected)
callCount++
var lastErr error
if callCount > tc.beforeMarkReady {
for _, f := range nodeStatusFuncs {
if err := f(ctx, node); err != nil {
lastErr = err
}
}
}
if callCount > tc.beforeNextReady {
nodeLister.nodes[0].Status.Conditions[0].Status = v1.ConditionTrue
}
if callCount > tc.beforeTimeout {
testKubelet.fakeClock.Step(nodeReadyGracePeriod)
}
return lastErr
}}
patchCount := 0
// Reject the first tc.patchFailures patch requests to exercise the
// syncNodeStatus retry path taken after a failed fast patch.
kubeClient.AddReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) {
assert.False(t, kubelet.containerRuntimeReadyExpected)
patchCount++
if patchCount > tc.patchFailures {
return false, nil, nil
}
return true, nil, fmt.Errorf("try again")
})
kubelet.fastStatusUpdateOnce()
// fastStatusUpdateOnce must always flip this flag on exit.
assert.True(t, kubelet.containerRuntimeReadyExpected)
assert.Equal(t, tc.wantCalls, callCount)
assert.Equal(t, tc.wantPatches, patchCount)
actions := kubeClient.Actions()
if tc.wantPatches == 0 {
require.Len(t, actions, 0)
return
}
// patch, get, patch, get, patch, ... up to initial patch + nodeStatusUpdateRetry patches
require.Len(t, actions, 2*tc.wantPatches-1)
for i, action := range actions {
if i%2 == 1 {
require.IsType(t, core.GetActionImpl{}, action)
continue
}
require.IsType(t, core.PatchActionImpl{}, action)
patchAction := action.(core.PatchActionImpl)
updatedNode, err := applyNodeStatusPatch(node, patchAction.GetPatch())
require.NoError(t, err)
// Every patch sent must carry NodeReady=True.
seenNodeReady := false
for _, c := range updatedNode.Status.Conditions {
if c.Type == v1.NodeReady {
assert.Equal(t, v1.ConditionTrue, c.Status)
seenNodeReady = true
}
}
assert.True(t, seenNodeReady)
}
})
}
}
func TestRegisterWithApiServer(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()