kubelet: Keep trying fast status update at startup until node is ready

Jingyuan Liang 2022-10-31 09:48:48 +00:00
parent 4a50fc4b8c
commit 9f5c5b82a9
3 changed files with 272 additions and 27 deletions

View File

@@ -28,7 +28,6 @@ import (
"path/filepath"
sysruntime "runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@@ -128,6 +127,10 @@ const (
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5
// nodeReadyGracePeriod is the period allowed before the fast status update loop is
// terminated and "container runtime not ready" errors are logged without a verbosity guard.
nodeReadyGracePeriod = 120 * time.Second
// DefaultContainerLogsDir is the location of container logs.
DefaultContainerLogsDir = "/var/log/containers"
@@ -1063,6 +1066,12 @@ type Kubelet struct {
// used for generating ContainerStatus.
reasonCache *ReasonCache
// containerRuntimeReadyExpected indicates whether the container runtime is expected to be ready,
// in which case "not ready" errors are logged without a verbosity guard. It is kept false during
// the node initialization period of nodeReadyGracePeriod to avoid excessive error logs at startup,
// and is set to true by fastStatusUpdateOnce when it exits.
containerRuntimeReadyExpected bool
// nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
// feature is not enabled, it is also the frequency that kubelet posts node status to master.
// In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
@@ -1085,15 +1094,15 @@ type Kubelet struct {
lastStatusReportTime time.Time
// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
// This lock is used by Kubelet.syncNodeStatus and Kubelet.fastNodeStatusUpdate functions and shouldn't be used anywhere else.
syncNodeStatusMux sync.Mutex
// updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe.
// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
// This lock is used by Kubelet.updatePodCIDR function and shouldn't be used anywhere else.
updatePodCIDRMux sync.Mutex
// updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe.
// This lock is used by Kubelet.updateRuntimeUp function and shouldn't be used anywhere else.
// This lock is used by Kubelet.updateRuntimeUp and Kubelet.fastNodeStatusUpdate functions and shouldn't be used anywhere else.
updateRuntimeMux sync.Mutex
// nodeLeaseController claims and renews the node lease for this Kubelet
@@ -1502,6 +1511,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Start two go-routines to update the status.
//
// The first will report to the apiserver every nodeStatusUpdateFrequency and aims to provide regular status intervals,
// while the second is used to provide a more timely status update during initialization and runs a one-shot update to the apiserver
// once the node becomes ready, then exits afterwards.
//
// Introduce some small jittering to ensure that over time the requests won't start
// accumulating at approximately the same time from the set of nodes due to priority and
// fairness effect.
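The calls that launch these two goroutines are elided context in this hunk; a plausible sketch of what follows the comment, with the exact arguments (notably the jitter factor) assumed rather than taken from the diff:

// Sketch only (assumed arguments): a jittered periodic sync plus the one-shot fast path.
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
go kl.fastStatusUpdateOnce()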
@@ -2435,9 +2450,13 @@ func (kl *Kubelet) updateRuntimeUp() {
}
// Periodically log the whole runtime status for debugging.
klog.V(4).InfoS("Container runtime status", "status", s)
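// Until the container runtime is expected to be ready, demote the "not ready" errors below to
// verbosity 4 so node startup does not flood the log; once containerRuntimeReadyExpected is true
// they are logged as regular errors again.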
klogErrorS := klog.ErrorS
if !kl.containerRuntimeReadyExpected {
klogErrorS = klog.V(4).ErrorS
}
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
if networkReady == nil || !networkReady.Status {
klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
klogErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
} else {
// Set nil if the container runtime network is ready.
@@ -2447,7 +2466,7 @@
runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
// If RuntimeReady is not set or is false, report an error.
if runtimeReady == nil || !runtimeReady.Status {
klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
klogErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
return
}
@@ -2502,31 +2521,25 @@ func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID str
}
}
// fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR
// is applied and tries to update pod CIDR immediately. After pod CIDR is updated it fires off
// a runtime update and a node status update. Function returns after one successful node status update.
// fastStatusUpdateOnce starts a loop that checks whether the current state of kubelet + container runtime
// would be able to turn the node ready, and syncs the ready state to the apiserver as soon as possible.
// The function returns after the node status is updated following such an event, or when the node is already ready.
// The function is executed only during Kubelet startup, which improves the latency to a ready node by updating
// pod CIDR, runtime status and node statuses ASAP.
// kubelet state, runtime status and node statuses ASAP.
func (kl *Kubelet) fastStatusUpdateOnce() {
ctx := context.Background()
for {
time.Sleep(100 * time.Millisecond)
node, err := kl.GetNode()
if err != nil {
klog.ErrorS(err, "Error getting node")
continue
}
if len(node.Spec.PodCIDRs) != 0 {
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
if _, err := kl.updatePodCIDR(ctx, podCIDRs); err != nil {
klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
continue
}
kl.updateRuntimeUp()
kl.syncNodeStatus()
return
}
start := kl.clock.Now()
stopCh := make(chan struct{})
// Keep trying to make a fast node status update until either the timeout is reached or an update succeeds.
wait.Until(func() {
// fastNodeStatusUpdate returns true when it succeeds or when the grace period has expired
// (the status was not updated within nodeReadyGracePeriod and the second argument below becomes true),
// in which case we close the channel and abort the loop.
if kl.fastNodeStatusUpdate(ctx, kl.clock.Since(start) >= nodeReadyGracePeriod) {
close(stopCh)
}
}, 100*time.Millisecond, stopCh)
}
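For readers unfamiliar with the exit-via-close pattern used above, here is a minimal standalone sketch (illustrative names, not kubelet code) of a wait.Until callback that stops its own loop:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0
	stopCh := make(chan struct{})
	wait.Until(func() {
		attempts++
		fmt.Println("attempt", attempts)
		if attempts == 3 { // pretend the third attempt succeeded
			close(stopCh) // wait.Until sees the closed channel and stops ticking
		}
	}, 100*time.Millisecond, stopCh)
	fmt.Println("loop exited after", attempts, "attempts")
}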
// CheckpointContainer tries to checkpoint a container. The parameters are used to

View File

@@ -429,6 +429,85 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
return node, nil
}
// fastNodeStatusUpdate is a "lightweight" version of syncNodeStatus which doesn't hit the
// apiserver except for the final run, to be called by fastStatusUpdateOnce in each loop.
// It holds the same lock as syncNodeStatus and is thread-safe when called concurrently with
// syncNodeStatus. Its return value indicates whether the loop running it should exit
// (final run), and it also sets kl.containerRuntimeReadyExpected.
func (kl *Kubelet) fastNodeStatusUpdate(ctx context.Context, timeout bool) (completed bool) {
kl.syncNodeStatusMux.Lock()
defer func() {
kl.syncNodeStatusMux.Unlock()
if completed {
// containerRuntimeReadyExpected is read by updateRuntimeUp().
// Not going for a more granular mutex as this path runs only once.
kl.updateRuntimeMux.Lock()
defer kl.updateRuntimeMux.Unlock()
kl.containerRuntimeReadyExpected = true
}
}()
if timeout {
klog.ErrorS(nil, "Node not becoming ready in time after startup")
return true
}
originalNode, err := kl.GetNode()
if err != nil {
klog.ErrorS(err, "Error getting the current node from lister")
return false
}
readyIdx, originalNodeReady := nodeutil.GetNodeCondition(&originalNode.Status, v1.NodeReady)
if readyIdx == -1 {
klog.ErrorS(nil, "Node does not have NodeReady condition", "originalNode", originalNode)
return false
}
if originalNodeReady.Status == v1.ConditionTrue {
return true
}
// This is in addition to the regular syncNodeStatus logic so we can get the container runtime status earlier.
// This function itself has a mutex and it doesn't recursively call fastNodeStatusUpdate or syncNodeStatus.
kl.updateRuntimeUp()
node, changed := kl.updateNode(ctx, originalNode)
if !changed {
// We don't do markVolumesFromNode(node) here and leave it to the regular syncNodeStatus().
return false
}
readyIdx, nodeReady := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
if readyIdx == -1 {
klog.ErrorS(nil, "Node does not have NodeReady condition", "node", node)
return false
}
if nodeReady.Status == v1.ConditionFalse {
return false
}
klog.InfoS("Fast updating node status as it just became ready")
if _, err := kl.patchNodeStatus(originalNode, node); err != nil {
// The originalNode is probably stale, but we know that the current state of kubelet would turn
// the node to be ready. Retry using syncNodeStatus() which fetches from the apiserver.
klog.ErrorS(err, "Error updating node status, will retry with syncNodeStatus")
// The reversed kl.syncNodeStatusMux.Unlock()/Lock() pair below allows kl.syncNodeStatus(),
// which takes the same mutex, to run without deadlocking.
kl.syncNodeStatusMux.Unlock()
kl.syncNodeStatus()
// Re-acquiring the lock here would be unnecessary if we added a flag checked in the defer before unlocking,
// but having it here keeps the logic a bit easier to read.
kl.syncNodeStatusMux.Lock()
}
// We don't do markVolumesFromNode(node) here and leave it to the regular syncNodeStatus().
return true
}
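The retry branch above depends on the reversed Unlock/Lock around kl.syncNodeStatus(), which acquires the same mutex. A minimal standalone sketch (illustrative names, not the kubelet code) of that pattern:

package main

import "sync"

var mu sync.Mutex

// slowPath acquires the mutex itself, like kl.syncNodeStatus().
func slowPath() {
	mu.Lock()
	defer mu.Unlock()
	// ... refetch state and retry ...
}

// fastPath holds the mutex for its whole body, like fastNodeStatusUpdate.
func fastPath() {
	mu.Lock()
	defer mu.Unlock()
	// ... fast attempt failed, fall back to the slow path ...
	mu.Unlock() // release so slowPath can take the same mutex without deadlocking
	slowPath()
	mu.Lock() // re-acquire so the deferred Unlock stays balanced
}

func main() {
	fastPath()
}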
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master if there is any change or enough time
// passed from the last sync, registering the kubelet first if necessary.

View File

@@ -1134,6 +1134,159 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
}
}
func TestFastStatusUpdateOnce(t *testing.T) {
tests := []struct {
name string
beforeMarkReady int // loop iterations before the node status funcs start marking the node ready
beforeNextReady int // loop iterations before the node in the lister is already reported as ready
beforeTimeout int // loop iterations before the fake clock is stepped past nodeReadyGracePeriod
wantCalls int // expected number of node status function invocations
patchFailures int // number of initial patch attempts that are made to fail
wantPatches int // expected number of patch requests sent to the apiserver
}{
{
name: "timeout after third loop",
beforeMarkReady: 9,
beforeNextReady: 9,
beforeTimeout: 2,
wantCalls: 3,
},
{
name: "already ready on third loop",
beforeMarkReady: 9,
beforeNextReady: 1,
beforeTimeout: 9,
wantCalls: 2,
},
{
name: "turns ready on third loop",
beforeMarkReady: 2,
beforeNextReady: 9,
beforeTimeout: 9,
wantCalls: 3,
wantPatches: 1,
},
{
name: "turns ready on second loop then first patch fails",
beforeMarkReady: 1,
beforeNextReady: 9,
beforeTimeout: 9,
wantCalls: 3,
patchFailures: 1,
wantPatches: 2,
},
{
name: "turns ready on second loop then all patches fail",
beforeMarkReady: 1,
beforeNextReady: 9,
beforeTimeout: 9,
wantCalls: nodeStatusUpdateRetry + 2,
patchFailures: nodeStatusUpdateRetry + 2,
wantPatches: nodeStatusUpdateRetry + 1,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
// Ensure we capture actions on the heartbeat client only.
// We don't set kubeClient to nil, because then GetNode() would not read from the nodeLister.
kubelet.kubeClient = &fake.Clientset{}
kubeClient := testKubelet.fakeKubeClient
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(kubelet.nodeName),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
Reason: "NotReady",
Message: "Node not ready",
},
},
},
}
nodeLister := testNodeLister{[]*v1.Node{node.DeepCopy()}}
kubelet.nodeLister = nodeLister
callCount := 0
// The original node status functions turn the node ready.
nodeStatusFuncs := kubelet.setNodeStatusFuncs
kubelet.setNodeStatusFuncs = []func(context.Context, *v1.Node) error{func(ctx context.Context, node *v1.Node) error {
assert.False(t, kubelet.containerRuntimeReadyExpected)
callCount++
var lastErr error
if callCount > tc.beforeMarkReady {
for _, f := range nodeStatusFuncs {
if err := f(ctx, node); err != nil {
lastErr = err
}
}
}
if callCount > tc.beforeNextReady {
nodeLister.nodes[0].Status.Conditions[0].Status = v1.ConditionTrue
}
if callCount > tc.beforeTimeout {
testKubelet.fakeClock.Step(nodeReadyGracePeriod)
}
return lastErr
}}
patchCount := 0
kubeClient.AddReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) {
assert.False(t, kubelet.containerRuntimeReadyExpected)
patchCount++
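// client-go fake reactor semantics: returning handled=true with an error makes this patch call
// fail, while handled=false lets the action fall through (no further reactor here, so the fake
// returns its default object and the patch counts as successful). The first tc.patchFailures
// attempts therefore fail and the remaining ones succeed.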
if patchCount > tc.patchFailures {
return false, nil, nil
}
return true, nil, fmt.Errorf("try again")
})
kubelet.fastStatusUpdateOnce()
assert.True(t, kubelet.containerRuntimeReadyExpected)
assert.Equal(t, tc.wantCalls, callCount)
assert.Equal(t, tc.wantPatches, patchCount)
actions := kubeClient.Actions()
if tc.wantPatches == 0 {
require.Len(t, actions, 0)
return
}
// patch, get, patch, get, patch, ... up to initial patch + nodeStatusUpdateRetry patches
require.Len(t, actions, 2*tc.wantPatches-1)
for i, action := range actions {
if i%2 == 1 {
require.IsType(t, core.GetActionImpl{}, action)
continue
}
require.IsType(t, core.PatchActionImpl{}, action)
patchAction := action.(core.PatchActionImpl)
updatedNode, err := applyNodeStatusPatch(node, patchAction.GetPatch())
require.NoError(t, err)
seenNodeReady := false
for _, c := range updatedNode.Status.Conditions {
if c.Type == v1.NodeReady {
assert.Equal(t, v1.ConditionTrue, c.Status)
seenNodeReady = true
}
}
assert.True(t, seenNodeReady)
}
})
}
}
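The assertions above decode each recorded patch with applyNodeStatusPatch, a helper defined elsewhere in this test file. A rough sketch of what such a helper might look like, shown only to make those assertions easier to follow (assumed implementation, not the actual helper; assumes "encoding/json" and "k8s.io/apimachinery/pkg/util/strategicpatch" are imported):

// applyStatusPatchSketch applies a strategic merge patch to a marshalled copy of the
// node and unmarshals the result so the patched conditions can be inspected.
func applyStatusPatchSketch(originalNode *v1.Node, patch []byte) (*v1.Node, error) {
	original, err := json.Marshal(originalNode)
	if err != nil {
		return nil, err
	}
	patched, err := strategicpatch.StrategicMergePatch(original, patch, v1.Node{})
	if err != nil {
		return nil, err
	}
	updated := &v1.Node{}
	if err := json.Unmarshal(patched, updated); err != nil {
		return nil, err
	}
	return updated, nil
}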
func TestRegisterWithApiServer(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()