diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 90a161790a7..1a3916a3faa 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -28,7 +28,6 @@ import (
 	"path/filepath"
 	sysruntime "runtime"
 	"sort"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -128,6 +127,10 @@ const (
 	// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
 	nodeStatusUpdateRetry = 5
 
+	// nodeReadyGracePeriod is the period allowed before the fast status update loop is
+	// terminated and the container runtime not being ready is logged without a verbosity guard.
+	nodeReadyGracePeriod = 120 * time.Second
+
 	// DefaultContainerLogsDir is the location of container logs.
 	DefaultContainerLogsDir = "/var/log/containers"
 
@@ -1063,6 +1066,12 @@ type Kubelet struct {
 	// used for generating ContainerStatus.
 	reasonCache *ReasonCache
 
+	// containerRuntimeReadyExpected indicates whether the container runtime is expected to be ready,
+	// so errors are logged without a verbosity guard, avoiding excessive error logs at node startup.
+	// It's false during the node initialization period of nodeReadyGracePeriod, and after that
+	// it's set to true by fastStatusUpdateOnce when it exits.
+	containerRuntimeReadyExpected bool
+
 	// nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
 	// feature is not enabled, it is also the frequency that kubelet posts node status to master.
 	// In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
@@ -1085,15 +1094,15 @@ type Kubelet struct {
 	lastStatusReportTime time.Time
 
 	// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
-	// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
+	// This lock is used by the Kubelet.syncNodeStatus and Kubelet.fastNodeStatusUpdate functions and shouldn't be used anywhere else.
 	syncNodeStatusMux sync.Mutex
 
 	// updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe.
-	// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
+	// This lock is used by the Kubelet.updatePodCIDR function and shouldn't be used anywhere else.
 	updatePodCIDRMux sync.Mutex
 
 	// updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe.
-	// This lock is used by Kubelet.updateRuntimeUp function and shouldn't be used anywhere else.
+	// This lock is used by the Kubelet.updateRuntimeUp and Kubelet.fastNodeStatusUpdate functions and shouldn't be used anywhere else.
 	updateRuntimeMux sync.Mutex
 
 	// nodeLeaseController claims and renews the node lease for this Kubelet
@@ -1502,6 +1511,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
 
 	if kl.kubeClient != nil {
+		// Start two goroutines to update the status.
+		//
+		// The first reports to the apiserver every nodeStatusUpdateFrequency and provides regular status intervals,
+		// while the second provides a more timely status update during initialization: it runs a one-shot update
+		// to the apiserver once the node becomes ready, then exits.
+		//
 		// Introduce some small jittering to ensure that over time the requests won't start
 		// accumulating at approximately the same time from the set of nodes due to priority and
 		// fairness effect.
@@ -2435,9 +2450,13 @@ func (kl *Kubelet) updateRuntimeUp() {
 	}
 	// Periodically log the whole runtime status for debugging.
 	klog.V(4).InfoS("Container runtime status", "status", s)
+	klogErrorS := klog.ErrorS
+	if !kl.containerRuntimeReadyExpected {
+		klogErrorS = klog.V(4).ErrorS
+	}
 	networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
 	if networkReady == nil || !networkReady.Status {
-		klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
+		klogErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
 		kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
 	} else {
 		// Set nil if the container runtime network is ready.
@@ -2447,7 +2466,7 @@ func (kl *Kubelet) updateRuntimeUp() {
 	runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
 	// If RuntimeReady is not set or is false, report an error.
 	if runtimeReady == nil || !runtimeReady.Status {
-		klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
+		klogErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
 		kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
 		return
 	}
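Review note: the klogErrorS swap above works because klog.ErrorS and the method value klog.V(4).ErrorS share the same signature, so the call sites stay unchanged while the verbosity decision is made once per updateRuntimeUp invocation. A minimal self-contained sketch of the pattern (the `expected` variable is illustrative, standing in for the kubelet field):

package main

import "k8s.io/klog/v2"

func main() {
	expected := false // stands in for kl.containerRuntimeReadyExpected
	logError := klog.ErrorS
	if !expected {
		// While startup noise is expected, demote the error to verbosity 4;
		// klog.V(4).ErrorS only emits when the process runs with -v=4 or higher.
		logError = klog.V(4).ErrorS
	}
	logError(nil, "Container runtime not ready")
}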
@@ -2502,31 +2521,25 @@ func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID str
 	}
 }
 
-// fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR
-// is applied and tries to update pod CIDR immediately. After pod CIDR is updated it fires off
-// a runtime update and a node status update. Function returns after one successful node status update.
+// fastStatusUpdateOnce starts a loop that checks whether the current state of kubelet + container runtime
+// would turn the node ready, and syncs the ready state to the apiserver as soon as possible.
+// Function returns after the node status update following such an event, or when the node is already ready.
 // Function is executed only during Kubelet start which improves latency to ready node by updating
-// pod CIDR, runtime status and node statuses ASAP.
+// kubelet state, runtime status and node statuses ASAP.
 func (kl *Kubelet) fastStatusUpdateOnce() {
 	ctx := context.Background()
-	for {
-		time.Sleep(100 * time.Millisecond)
-		node, err := kl.GetNode()
-		if err != nil {
-			klog.ErrorS(err, "Error getting node")
-			continue
+	start := kl.clock.Now()
+	stopCh := make(chan struct{})
+
+	// Keep trying to make a fast node status update until either the timeout is reached or an update succeeds.
+	wait.Until(func() {
+		// fastNodeStatusUpdate returns true when it succeeds, or when the grace period has expired
+		// (the status was not updated within nodeReadyGracePeriod and the second argument below is true);
+		// we then close the channel and abort the loop.
+		if kl.fastNodeStatusUpdate(ctx, kl.clock.Since(start) >= nodeReadyGracePeriod) {
+			close(stopCh)
 		}
-		if len(node.Spec.PodCIDRs) != 0 {
-			podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
-			if _, err := kl.updatePodCIDR(ctx, podCIDRs); err != nil {
-				klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
-				continue
-			}
-			kl.updateRuntimeUp()
-			kl.syncNodeStatus()
-			return
-		}
-	}
+	}, 100*time.Millisecond, stopCh)
 }
 
 // CheckpointContainer tries to checkpoint a container. The parameters are used to
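Review note: wait.Until runs the closure every 100ms until stopCh is closed, so closing stopCh from inside the closure is a tidy way to say "loop until this iteration reports it is done", replacing the old hand-rolled for/sleep loop. A self-contained sketch of the idiom (everything except the wait package is illustrative):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0
	stopCh := make(chan struct{})
	wait.Until(func() {
		attempts++
		if attempts == 3 { // stand-in for fastNodeStatusUpdate returning true
			close(stopCh) // wait.Until notices the closed channel and returns
		}
	}, 100*time.Millisecond, stopCh)
	fmt.Println("done after", attempts, "attempts")
}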
diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index cf0550a7b8d..d598c55b79a 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -429,6 +429,85 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
 	return node, nil
 }
 
+// fastNodeStatusUpdate is a "lightweight" version of syncNodeStatus which doesn't hit the
+// apiserver except for the final run, to be called by fastStatusUpdateOnce in each loop.
+// It holds the same lock as syncNodeStatus and is thread-safe when called concurrently with
+// syncNodeStatus. Its return value indicates whether the loop running it should exit
+// (final run), and it also sets kl.containerRuntimeReadyExpected.
+func (kl *Kubelet) fastNodeStatusUpdate(ctx context.Context, timeout bool) (completed bool) {
+	kl.syncNodeStatusMux.Lock()
+	defer func() {
+		kl.syncNodeStatusMux.Unlock()
+
+		if completed {
+			// containerRuntimeReadyExpected is read by updateRuntimeUp().
+			// Not going for a more granular mutex as this path runs only once.
+			kl.updateRuntimeMux.Lock()
+			defer kl.updateRuntimeMux.Unlock()
+			kl.containerRuntimeReadyExpected = true
+		}
+	}()
+
+	if timeout {
+		klog.ErrorS(nil, "Node not becoming ready in time after startup")
+		return true
+	}
+
+	originalNode, err := kl.GetNode()
+	if err != nil {
+		klog.ErrorS(err, "Error getting the current node from lister")
+		return false
+	}
+
+	readyIdx, originalNodeReady := nodeutil.GetNodeCondition(&originalNode.Status, v1.NodeReady)
+	if readyIdx == -1 {
+		klog.ErrorS(nil, "Node does not have NodeReady condition", "originalNode", originalNode)
+		return false
+	}
+
+	if originalNodeReady.Status == v1.ConditionTrue {
+		return true
+	}
+
+	// This is in addition to the regular syncNodeStatus logic so we can get the container runtime status earlier.
+	// This function itself has a mutex and it doesn't recursively call fastNodeStatusUpdate or syncNodeStatus.
+	kl.updateRuntimeUp()
+
+	node, changed := kl.updateNode(ctx, originalNode)
+
+	if !changed {
+		// We don't do markVolumesFromNode(node) here and leave it to the regular syncNodeStatus().
+		return false
+	}
+
+	readyIdx, nodeReady := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
+	if readyIdx == -1 {
+		klog.ErrorS(nil, "Node does not have NodeReady condition", "node", node)
+		return false
+	}
+
+	if nodeReady.Status == v1.ConditionFalse {
+		return false
+	}
+
+	klog.InfoS("Fast updating node status as it just became ready")
+	if _, err := kl.patchNodeStatus(originalNode, node); err != nil {
+		// The originalNode is probably stale, but we know that the current state of kubelet would turn
+		// the node ready. Retry using syncNodeStatus() which fetches from the apiserver.
+		klog.ErrorS(err, "Error updating node status, will retry with syncNodeStatus")
+
+		// The reversed kl.syncNodeStatusMux.Unlock/Lock() below allows kl.syncNodeStatus() to execute.
+		kl.syncNodeStatusMux.Unlock()
+		kl.syncNodeStatus()
+		// This lock action is unnecessary if we add a flag to check in the defer before unlocking it,
+		// but having it here makes the logic a bit easier to read.
+		kl.syncNodeStatusMux.Lock()
+	}
+
+	// We don't do markVolumesFromNode(node) here and leave it to the regular syncNodeStatus().
+	return true
+}
+
 // syncNodeStatus should be called periodically from a goroutine.
 // It synchronizes node status to master if there is any change or enough time
 // passed from the last sync, registering the kubelet first if necessary.
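Review note: fastNodeStatusUpdate relies on a named return value so the deferred closure can see whether this was the terminal run before flipping containerRuntimeReadyExpected. A self-contained sketch of that pattern (all names here are illustrative, not kubelet code):

package main

import (
	"fmt"
	"sync"
)

// tryOnce mirrors the shape of fastNodeStatusUpdate: the deferred closure
// reads the named return value `completed`, so whichever return statement
// runs, the once-only side effect happens after the mutex is released.
func tryOnce(mu *sync.Mutex, markDone func()) (completed bool) {
	mu.Lock()
	defer func() {
		mu.Unlock()
		if completed {
			markDone() // runs only on the terminal iteration
		}
	}()
	return true // the defer observes completed == true
}

func main() {
	var mu sync.Mutex
	tryOnce(&mu, func() { fmt.Println("marked done") })
}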
@@ -479,21 +558,40 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
 	if tryNumber == 0 {
 		util.FromApiserverCache(&opts)
 	}
-	node, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
+	originalNode, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
 	if err != nil {
 		return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
 	}
-
-	originalNode := node.DeepCopy()
 	if originalNode == nil {
 		return fmt.Errorf("nil %q node object", kl.nodeName)
 	}
 
+	node, changed := kl.updateNode(ctx, originalNode)
+	shouldPatchNodeStatus := changed || kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency
+
+	if !shouldPatchNodeStatus {
+		kl.markVolumesFromNode(node)
+		return nil
+	}
+
+	updatedNode, err := kl.patchNodeStatus(originalNode, node)
+	if err == nil {
+		kl.markVolumesFromNode(updatedNode)
+	}
+	return err
+}
+
+// updateNode creates a copy of originalNode and runs update logic on it.
+// It returns the updated node object and a bool indicating if anything has been changed.
+func (kl *Kubelet) updateNode(ctx context.Context, originalNode *v1.Node) (*v1.Node, bool) {
+	node := originalNode.DeepCopy()
+
 	podCIDRChanged := false
 	if len(node.Spec.PodCIDRs) != 0 {
 		// Pod CIDR could have been updated before, so we cannot rely on
 		// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
 		// actually changed.
+		var err error
 		podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
 		if podCIDRChanged, err = kl.updatePodCIDR(ctx, podCIDRs); err != nil {
 			klog.ErrorS(err, "Error updating pod CIDR")
@@ -521,41 +619,48 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
 
 	kl.setNodeStatus(ctx, node)
 
-	now := kl.clock.Now()
-	if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
-		if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) && !areRequiredLabelsNotPresent {
-			// We must mark the volumes as ReportedInUse in volume manager's dsw even
-			// if no changes were made to the node status (no volumes were added or removed
-			// from the VolumesInUse list).
-			//
-			// The reason is that on a kubelet restart, the volume manager's dsw is
-			// repopulated and the volume ReportedInUse is initialized to false, while the
-			// VolumesInUse list from the Node object still contains the state from the
-			// previous kubelet instantiation.
-			//
-			// Once the volumes are added to the dsw, the ReportedInUse field needs to be
-			// synced from the VolumesInUse list in the Node.Status.
-			//
-			// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
-			// because it does not have access to the Node object.
-			// This also cannot be populated on node status manager init because the volume
-			// may not have been added to dsw at that time.
-			kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
-			return nil
-		}
-	}
+	changed := podCIDRChanged || nodeStatusHasChanged(&originalNode.Status, &node.Status) || areRequiredLabelsNotPresent
+	return node, changed
+}
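Review note: updateNode deep-copies before mutating because originalNode may come straight from the informer cache (via kl.GetNode() in the fast path), and lister-owned objects must be treated as read-only. A sketch of that contract under simplified types (the function and mutation are illustrative):

package sketch

import v1 "k8s.io/api/core/v1"

// mutateSafely illustrates the contract: never write to an object obtained
// from a lister/informer; mutate a DeepCopy and diff it against the original
// to decide whether a patch is warranted.
func mutateSafely(originalNode *v1.Node) (*v1.Node, bool) {
	node := originalNode.DeepCopy()
	node.Status.Phase = v1.NodeRunning // example mutation, on the copy only
	changed := originalNode.Status.Phase != node.Status.Phase
	return node, changed
}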
+
+// patchNodeStatus patches the node on the API server based on originalNode.
+// It returns any potential error, or the updatedNode, and refreshes the kubelet's state when successful.
+func (kl *Kubelet) patchNodeStatus(originalNode, node *v1.Node) (*v1.Node, error) {
 	// Patch the current status on the API server
 	updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	kl.lastStatusReportTime = now
+	kl.lastStatusReportTime = kl.clock.Now()
 	kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
-	// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
-	// those volumes are already updated in the node's status
-	kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
-	return nil
+	return updatedNode, nil
+}
+
+// markVolumesFromNode updates volumeManager with the VolumesInUse status from node.
+//
+// When a node status update is unnecessary, call it with the fetched node.
+// We must mark the volumes as ReportedInUse in the volume manager's dsw even
+// if no changes were made to the node status (no volumes were added or removed
+// from the VolumesInUse list).
+//
+// The reason is that on a kubelet restart, the volume manager's dsw is
+// repopulated and the volume ReportedInUse is initialized to false, while the
+// VolumesInUse list from the Node object still contains the state from the
+// previous kubelet instantiation.
+//
+// Once the volumes are added to the dsw, the ReportedInUse field needs to be
+// synced from the VolumesInUse list in the Node.Status.
+//
+// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
+// because it does not have access to the Node object.
+// This also cannot be populated on node status manager init because the volume
+// may not have been added to dsw at that time.
+//
+// After a successful node status update, call it with the updatedNode returned from
+// the patch call, to mark the volumesInUse as reportedInUse and indicate that
+// those volumes are already updated in the node's status.
+func (kl *Kubelet) markVolumesFromNode(node *v1.Node) {
+	kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
 }
 
 // recordNodeStatusEvent records an event of the given type with the given
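Review note: the test below injects patch failures through a client-go reactor on the fake clientset. A minimal standalone sketch of that mechanism, outside the kubelet test harness (the failure message mirrors the test; everything else is illustrative):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"
)

func main() {
	client := fake.NewSimpleClientset()
	// Intercept every PATCH on nodes and fail it; returning handled=true
	// prevents the default object-tracker reaction from running.
	client.PrependReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) {
		return true, nil, fmt.Errorf("try again")
	})
}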
diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go
index 7c0e6e605e5..61cb729e683 100644
--- a/pkg/kubelet/kubelet_node_status_test.go
+++ b/pkg/kubelet/kubelet_node_status_test.go
@@ -1134,6 +1134,159 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
 	}
 }
 
+func TestFastStatusUpdateOnce(t *testing.T) {
+	tests := []struct {
+		name            string
+		beforeMarkReady int
+		beforeNextReady int
+		beforeTimeout   int
+		wantCalls       int
+		patchFailures   int
+		wantPatches     int
+	}{
+		{
+			name:            "timeout after third loop",
+			beforeMarkReady: 9,
+			beforeNextReady: 9,
+			beforeTimeout:   2,
+			wantCalls:       3,
+		},
+		{
+			name:            "already ready on third loop",
+			beforeMarkReady: 9,
+			beforeNextReady: 1,
+			beforeTimeout:   9,
+			wantCalls:       2,
+		},
+		{
+			name:            "turns ready on third loop",
+			beforeMarkReady: 2,
+			beforeNextReady: 9,
+			beforeTimeout:   9,
+			wantCalls:       3,
+			wantPatches:     1,
+		},
+		{
+			name:            "turns ready on second loop then first patch fails",
+			beforeMarkReady: 1,
+			beforeNextReady: 9,
+			beforeTimeout:   9,
+			wantCalls:       3,
+			patchFailures:   1,
+			wantPatches:     2,
+		},
+		{
+			name:            "turns ready on second loop then all patches fail",
+			beforeMarkReady: 1,
+			beforeNextReady: 9,
+			beforeTimeout:   9,
+			wantCalls:       nodeStatusUpdateRetry + 2,
+			patchFailures:   nodeStatusUpdateRetry + 2,
+			wantPatches:     nodeStatusUpdateRetry + 1,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+			defer testKubelet.Cleanup()
+			kubelet := testKubelet.kubelet
+			// Ensure we capture actions on the heartbeat client only.
+			// We don't set it to nil, otherwise GetNode() wouldn't read from nodeLister.
+			kubelet.kubeClient = &fake.Clientset{}
+			kubeClient := testKubelet.fakeKubeClient
+
+			node := &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: string(kubelet.nodeName),
+				},
+				Status: v1.NodeStatus{
+					Conditions: []v1.NodeCondition{
+						{
+							Type:    v1.NodeReady,
+							Status:  v1.ConditionFalse,
+							Reason:  "NotReady",
+							Message: "Node not ready",
+						},
+					},
+				},
+			}
+
+			nodeLister := testNodeLister{[]*v1.Node{node.DeepCopy()}}
+			kubelet.nodeLister = nodeLister
+
+			callCount := 0
+			// The original node status functions turn the node ready.
+			nodeStatusFuncs := kubelet.setNodeStatusFuncs
+			kubelet.setNodeStatusFuncs = []func(context.Context, *v1.Node) error{func(ctx context.Context, node *v1.Node) error {
+				assert.False(t, kubelet.containerRuntimeReadyExpected)
+				callCount++
+				var lastErr error
+				if callCount > tc.beforeMarkReady {
+					for _, f := range nodeStatusFuncs {
+						if err := f(ctx, node); err != nil {
+							lastErr = err
+						}
+					}
+				}
+				if callCount > tc.beforeNextReady {
+					nodeLister.nodes[0].Status.Conditions[0].Status = v1.ConditionTrue
+				}
+				if callCount > tc.beforeTimeout {
+					testKubelet.fakeClock.Step(nodeReadyGracePeriod)
+				}
+				return lastErr
+			}}
+
+			patchCount := 0
+			kubeClient.AddReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) {
+				assert.False(t, kubelet.containerRuntimeReadyExpected)
+				patchCount++
+				if patchCount > tc.patchFailures {
+					return false, nil, nil
+				}
+				return true, nil, fmt.Errorf("try again")
+			})
+
+			kubelet.fastStatusUpdateOnce()
+
+			assert.True(t, kubelet.containerRuntimeReadyExpected)
+			assert.Equal(t, tc.wantCalls, callCount)
+			assert.Equal(t, tc.wantPatches, patchCount)
+
+			actions := kubeClient.Actions()
+			if tc.wantPatches == 0 {
+				require.Len(t, actions, 0)
+				return
+			}
+
+			// patch, get, patch, get, patch, ... up to initial patch + nodeStatusUpdateRetry patches
+			require.Len(t, actions, 2*tc.wantPatches-1)
+
+			for i, action := range actions {
+				if i%2 == 1 {
+					require.IsType(t, core.GetActionImpl{}, action)
+					continue
+				}
+
+				require.IsType(t, core.PatchActionImpl{}, action)
+				patchAction := action.(core.PatchActionImpl)
+
+				updatedNode, err := applyNodeStatusPatch(node, patchAction.GetPatch())
+				require.NoError(t, err)
+				seenNodeReady := false
+				for _, c := range updatedNode.Status.Conditions {
+					if c.Type == v1.NodeReady {
+						assert.Equal(t, v1.ConditionTrue, c.Status)
+						seenNodeReady = true
+					}
+				}
+				assert.True(t, seenNodeReady)
+			}
+		})
+	}
+}
+
 func TestRegisterWithApiServer(t *testing.T) {
 	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
 	defer testKubelet.Cleanup()
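Review note: the timeout cases drive time with the harness's fake clock rather than real sleeps; fastStatusUpdateOnce computes kl.clock.Since(start), so a single Step past the grace period flips the timeout argument on the next iteration. A sketch of the mechanism (assuming k8s.io/utils/clock/testing, which testKubelet.fakeClock appears to be built on):

package main

import (
	"fmt"
	"time"

	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	clk := testingclock.NewFakeClock(time.Now())
	start := clk.Now()
	clk.Step(120 * time.Second) // same magnitude as nodeReadyGracePeriod
	// Prints true: with the fake clock stepped past the grace period,
	// the timeout branch of fastNodeStatusUpdate would fire.
	fmt.Println(clk.Since(start) >= 120*time.Second)
}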