From 4a50fc4b8cbb08db2d87f2cd3438a75292baa51e Mon Sep 17 00:00:00 2001
From: Jingyuan Liang
Date: Mon, 31 Oct 2022 09:48:48 +0000
Subject: [PATCH] kubelet: Refactor tryUpdateNodeStatus() into smaller
 functions

---
 pkg/kubelet/kubelet_node_status.go | 90 +++++++++++++++++++-----------
 1 file changed, 58 insertions(+), 32 deletions(-)

diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index cf0550a7b8d..8d17c2f8946 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -479,21 +479,40 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
 	if tryNumber == 0 {
 		util.FromApiserverCache(&opts)
 	}
-	node, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
+	originalNode, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
 	if err != nil {
 		return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
 	}
-
-	originalNode := node.DeepCopy()
 	if originalNode == nil {
 		return fmt.Errorf("nil %q node object", kl.nodeName)
 	}
 
+	node, changed := kl.updateNode(ctx, originalNode)
+	shouldPatchNodeStatus := changed || kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency
+
+	if !shouldPatchNodeStatus {
+		kl.markVolumesFromNode(node)
+		return nil
+	}
+
+	updatedNode, err := kl.patchNodeStatus(originalNode, node)
+	if err == nil {
+		kl.markVolumesFromNode(updatedNode)
+	}
+	return err
+}
+
+// updateNode creates a copy of originalNode and runs the update logic on it.
+// It returns the updated node object and a bool indicating whether anything changed.
+func (kl *Kubelet) updateNode(ctx context.Context, originalNode *v1.Node) (*v1.Node, bool) {
+	node := originalNode.DeepCopy()
+
 	podCIDRChanged := false
 	if len(node.Spec.PodCIDRs) != 0 {
 		// Pod CIDR could have been updated before, so we cannot rely on
 		// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
 		// actually changed.
+		var err error
 		podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
 		if podCIDRChanged, err = kl.updatePodCIDR(ctx, podCIDRs); err != nil {
 			klog.ErrorS(err, "Error updating pod CIDR")
@@ -521,41 +540,48 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
 
 	kl.setNodeStatus(ctx, node)
 
-	now := kl.clock.Now()
-	if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
-		if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) && !areRequiredLabelsNotPresent {
-			// We must mark the volumes as ReportedInUse in volume manager's dsw even
-			// if no changes were made to the node status (no volumes were added or removed
-			// from the VolumesInUse list).
-			//
-			// The reason is that on a kubelet restart, the volume manager's dsw is
-			// repopulated and the volume ReportedInUse is initialized to false, while the
-			// VolumesInUse list from the Node object still contains the state from the
-			// previous kubelet instantiation.
-			//
-			// Once the volumes are added to the dsw, the ReportedInUse field needs to be
-			// synced from the VolumesInUse list in the Node.Status.
-			//
-			// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
-			// because it does not have access to the Node object.
-			// This also cannot be populated on node status manager init because the volume
-			// may not have been added to dsw at that time.
-			kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
-			return nil
-		}
-	}
+	changed := podCIDRChanged || nodeStatusHasChanged(&originalNode.Status, &node.Status) || areRequiredLabelsNotPresent
+	return node, changed
+}
 
+// patchNodeStatus patches the node on the API server based on originalNode.
+// It returns the updated node on success, or any error; on success it also refreshes kubelet state.
+func (kl *Kubelet) patchNodeStatus(originalNode, node *v1.Node) (*v1.Node, error) {
 	// Patch the current status on the API server
 	updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	kl.lastStatusReportTime = now
+	kl.lastStatusReportTime = kl.clock.Now()
 	kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
-	// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
-	// those volumes are already updated in the node's status
-	kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
-	return nil
+	return updatedNode, nil
+}
+
+// markVolumesFromNode updates the volume manager with the VolumesInUse status from node.
+//
+// When a node status update is unnecessary, call this with the fetched node.
+// We must mark the volumes as ReportedInUse in volume manager's dsw even
+// if no changes were made to the node status (no volumes were added or removed
+// from the VolumesInUse list).
+//
+// The reason is that on a kubelet restart, the volume manager's dsw is
+// repopulated and the volume ReportedInUse is initialized to false, while the
+// VolumesInUse list from the Node object still contains the state from the
+// previous kubelet instantiation.
+//
+// Once the volumes are added to the dsw, the ReportedInUse field needs to be
+// synced from the VolumesInUse list in the Node.Status.
+//
+// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
+// because it does not have access to the Node object.
+// This also cannot be populated on node status manager init because the volume
+// may not have been added to dsw at that time.
+//
+// After a successful node status update, call this with the updatedNode returned
+// from the patch call, to mark those volumes as ReportedInUse and indicate
+// they are already updated in the node's status.
+func (kl *Kubelet) markVolumesFromNode(node *v1.Node) {
+	kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
+}
 
 // recordNodeStatusEvent records an event of the given type with the given
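
For illustration only (not part of the patch; the standalone helper below is hypothetical), here is a minimal self-contained sketch of the heartbeat decision the refactor centralizes: the status is patched when anything changed, or when nodeStatusReportFrequency has elapsed since the last report; otherwise the kubelet skips the patch and only re-marks volumes from the fetched node.

```go
package main

import (
	"fmt"
	"time"
)

// shouldPatchNodeStatus is a stand-in for the condition computed in
// tryUpdateNodeStatus: patch when something changed, or when the report
// frequency has elapsed since the last successful status report.
func shouldPatchNodeStatus(changed bool, lastReport time.Time, frequency time.Duration, now time.Time) bool {
	return changed || now.Sub(lastReport) >= frequency
}

func main() {
	now := time.Now()
	frequency := 5 * time.Minute // illustrative value, not necessarily the kubelet default

	// Nothing changed and the last report was recent: no patch is sent,
	// but the caller still calls markVolumesFromNode with the fetched node.
	fmt.Println(shouldPatchNodeStatus(false, now.Add(-time.Minute), frequency, now)) // false

	// Nothing changed, but the report frequency has elapsed: patch anyway
	// so the node heartbeat stays fresh.
	fmt.Println(shouldPatchNodeStatus(false, now.Add(-10*time.Minute), frequency, now)) // true

	// Pod CIDR, required labels, or node status changed: always patch.
	fmt.Println(shouldPatchNodeStatus(true, now.Add(-time.Minute), frequency, now)) // true
}
```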
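A similarly hedged sketch of why markVolumesFromNode runs on both paths: after a kubelet restart, the volume manager's desired state of world (dsw) is repopulated with ReportedInUse initialized to false, while Node.Status.VolumesInUse still carries state from the previous kubelet instantiation, so syncing from the Node object restores the flag. The volumeManager type and volume name here are simplified stand-ins, not the real manager or its UniqueVolumeName type.

```go
package main

import "fmt"

// volumeManager is a simplified stand-in for the kubelet's volume manager;
// only the ReportedInUse bookkeeping relevant to markVolumesFromNode is shown.
type volumeManager struct {
	reportedInUse map[string]bool
}

// MarkVolumesAsReportedInUse mirrors the call the kubelet makes on both the
// "no patch needed" path (with the fetched node) and the "patch succeeded"
// path (with the updated node returned by the patch).
func (vm *volumeManager) MarkVolumesAsReportedInUse(volumesInUse []string) {
	for _, v := range volumesInUse {
		vm.reportedInUse[v] = true
	}
}

func main() {
	// After a restart, the dsw is repopulated and ReportedInUse starts false.
	vm := &volumeManager{reportedInUse: map[string]bool{"example-volume": false}}

	// Node.Status.VolumesInUse still reflects the previous instantiation.
	volumesInUse := []string{"example-volume"}

	vm.MarkVolumesAsReportedInUse(volumesInUse)
	fmt.Println(vm.reportedInUse["example-volume"]) // true
}
```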