From 4ba09a82227fa683d987c28aa10c59c629f15583 Mon Sep 17 00:00:00 2001 From: Nicole Han Date: Fri, 11 Sep 2020 14:45:42 -0700 Subject: [PATCH] cloud node controller: implement with workqueues and node lister --- .../cloud-provider/controllers/node/BUILD | 2 + .../controllers/node/node_controller.go | 247 +++++++++++------- .../controllers/node/node_controller_test.go | 25 +- 3 files changed, 181 insertions(+), 93 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/BUILD b/staging/src/k8s.io/cloud-provider/controllers/node/BUILD index 62684aa1d96..d7ef5846951 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/BUILD +++ b/staging/src/k8s.io/cloud-provider/controllers/node/BUILD @@ -17,9 +17,11 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", "//staging/src/k8s.io/cloud-provider/api:go_default_library", "//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library", diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go index 191fba81a00..91780fa5871 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go @@ -22,7 +22,7 @@ import ( "fmt" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -32,9 +32,11 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" clientretry "k8s.io/client-go/util/retry" + "k8s.io/client-go/util/workqueue" cloudprovider "k8s.io/cloud-provider" cloudproviderapi "k8s.io/cloud-provider/api" cloudnodeutil "k8s.io/cloud-provider/node/helpers" @@ -84,6 +86,7 @@ var UpdateNodeSpecBackoff = wait.Backoff{ Jitter: 1.0, } +// CloudNodeController is the controller implementation for Node resources type CloudNodeController struct { nodeInformer coreinformers.NodeInformer kubeClient clientset.Interface @@ -92,6 +95,10 @@ type CloudNodeController struct { cloud cloudprovider.Interface nodeStatusUpdateFrequency time.Duration + + nodesLister corelisters.NodeLister + nodesSynced cache.InformerSynced + workqueue workqueue.RateLimitingInterface } // NewCloudNodeController creates a CloudNodeController object @@ -120,38 +127,112 @@ func NewCloudNodeController( recorder: recorder, cloud: cloud, nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, + nodesLister: nodeInformer.Lister(), + nodesSynced: nodeInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nodes"), } // Use shared informer to listen to add/update of nodes. 
Note that any nodes // that exist before node controller starts will show up in the update method cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { cnc.AddCloudNode(context.TODO(), obj) }, - UpdateFunc: func(oldObj, newObj interface{}) { cnc.UpdateCloudNode(context.TODO(), oldObj, newObj) }, + AddFunc: cnc.enqueueNode, + UpdateFunc: func(oldObj, newObj interface{}) { cnc.enqueueNode(newObj) }, }) return cnc, nil } +// Run will sync informer caches and starting workers. // This controller updates newly registered nodes with information // from the cloud provider. This call is blocking so should be called // via a goroutine func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + defer cnc.workqueue.ShutDown() - // The following loops run communicate with the APIServer with a worst case complexity + // Wait for the caches to be synced before starting workers + klog.Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, cnc.nodesSynced); !ok { + klog.Errorf("failed to wait for caches to sync") + return + } + + // The periodic loop for updateNodeStatus communicates with the APIServer with a worst case complexity // of O(num_nodes) per cycle. These functions are justified here because these events fire // very infrequently. DO NOT MODIFY this to perform frequent operations. + go wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh) + go wait.Until(cnc.runWorker, time.Second, stopCh) - // Start a loop to periodically update the node addresses obtained from the cloud - wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh) + <-stopCh +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (cnc *CloudNodeController) runWorker() { + for cnc.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (cnc *CloudNodeController) processNextWorkItem() bool { + obj, shutdown := cnc.workqueue.Get() + if shutdown { + return false + } + + // We wrap this block in a func so we can defer cnc.workqueue.Done. + err := func(obj interface{}) error { + defer cnc.workqueue.Done(obj) + + var key string + var ok bool + if key, ok = obj.(string); !ok { + cnc.workqueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + + // Run the syncHandler, passing it the key of the + // Node resource to be synced. + if err := cnc.syncHandler(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + cnc.workqueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + cnc.workqueue.Forget(obj) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + + return true +} + +// syncHandler implements the logic of the controller. 
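
A side note on the key handling here: MetaNamespaceKeyFunc and SplitMetaNamespaceKey come from client-go's cache package, and because Node is cluster-scoped the namespace half of the key is always empty, which is why syncHandler below can discard it. A minimal standalone sketch of the round trip (illustrative only, not part of this patch):

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/tools/cache"
    )

    func main() {
        // Nodes are cluster-scoped, so the workqueue key is just the object name.
        node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}}

        key, err := cache.MetaNamespaceKeyFunc(node)
        if err != nil {
            panic(err)
        }
        fmt.Println(key) // prints: node0

        // Splitting the key back yields an empty namespace plus the node name,
        // which is why the namespace component is ignored when syncing.
        ns, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            panic(err)
        }
        fmt.Printf("namespace=%q name=%q\n", ns, name) // namespace="" name="node0"
    }
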
+func (cnc *CloudNodeController) syncHandler(key string) error { + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + return cnc.syncNode(context.TODO(), name) } // UpdateNodeStatus updates the node status, such as node addresses -func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) { +func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error { nodes, err := cnc.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"}) if err != nil { klog.Errorf("Error monitoring node status: %v", err) - return + return err } for i := range nodes.Items { @@ -169,6 +250,20 @@ func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) { klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err) } } + + return nil +} + +// enqueueNode takes a Node resource and converts it into a key +// string which is then put onto the work queue. +func (cnc *CloudNodeController) enqueueNode(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + cnc.workqueue.Add(key) } // reconcileNodeLabels reconciles node labels transitioning from beta to GA @@ -273,45 +368,54 @@ func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1. // in a retry-if-conflict loop. type nodeModifier func(*v1.Node) -func (cnc *CloudNodeController) UpdateCloudNode(ctx context.Context, _, newObj interface{}) { - node, ok := newObj.(*v1.Node) - if !ok { - utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) - return +// syncNode handles updating existing nodes registered with the cloud taint +// and processes nodes that were added into the cluster, and cloud initialize them if appropriate. +func (cnc *CloudNodeController) syncNode(ctx context.Context, nodeName string) error { + curNode, err := cnc.nodeInformer.Lister().Get(nodeName) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + + return err } - cloudTaint := getCloudTaint(node.Spec.Taints) + cloudTaint := getCloudTaint(curNode.Spec.Taints) if cloudTaint == nil { - // The node has already been initialized so nothing to do. - return + // Node object received from event had the cloud taint but was outdated, + // the node has actually already been initialized, so this sync event can be ignored. + return nil } - cnc.initializeNode(ctx, node) -} + klog.Infof("Initializing node %s with cloud provider", nodeName) -// AddCloudNode handles initializing new nodes registered with the cloud taint. -func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{}) { - node := obj.(*v1.Node) - - cloudTaint := getCloudTaint(node.Spec.Taints) - if cloudTaint == nil { - klog.V(2).Infof("This node %s is registered without the cloud taint. 
Will not process.", node.Name) - return + copyNode := curNode.DeepCopy() + providerID, err := cnc.getProviderID(ctx, copyNode) + if err != nil { + return fmt.Errorf("failed to get provider ID for node %s at cloudprovider: %v", nodeName, err) } - cnc.initializeNode(ctx, node) -} + instanceMetadata, err := cnc.getInstanceMetadata(ctx, providerID, copyNode) + if err != nil { + return fmt.Errorf("failed to get instance metadata for node %s: %v", nodeName, err) + } -// This processes nodes that were added into the cluster, and cloud initialize them if appropriate -func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) { - klog.Infof("Initializing node %s with cloud provider", node.Name) + nodeModifiers, err := cnc.getNodeModifiersFromCloudProvider(ctx, providerID, copyNode, instanceMetadata) + if err != nil { + return fmt.Errorf("failed to get node modifiers from cloud provider: %v", err) + } - err := clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error { - // TODO(wlan0): Move this logic to the route controller using the node taint instead of condition - // Since there are node taints, do we still need this? - // This condition marks the node as unusable until routes are initialized in the cloud provider + nodeModifiers = append(nodeModifiers, func(n *v1.Node) { + n.Spec.Taints = excludeCloudTaint(n.Spec.Taints) + }) + + err = clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error { + var curNode *v1.Node if cnc.cloud.ProviderName() == "gce" { - if err := cloudnodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(node.Name), v1.NodeCondition{ + // TODO(wlan0): Move this logic to the route controller using the node taint instead of condition + // Since there are node taints, do we still need this? + // This condition marks the node as unusable until routes are initialized in the cloud provider + if err := cloudnodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(nodeName), v1.NodeCondition{ Type: v1.NodeNetworkUnavailable, Status: v1.ConditionTrue, Reason: "NoRouteCreated", @@ -320,75 +424,42 @@ func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Nod }); err != nil { return err } - } - return nil - }) - if err != nil { - utilruntime.HandleError(err) - return - } - curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{}) - if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to get node %s: %v", node.Name, err)) - return - } - - cloudTaint := getCloudTaint(curNode.Spec.Taints) - if cloudTaint == nil { - // Node object received from event had the cloud taint but was outdated, - // the node has actually already been initialized. 
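
For context on the nodeModifier / RetryOnConflict pattern used above: each modifier mutates a copy of the Node that is re-read inside the conflict-retry loop, so a concurrent update by the kubelet only costs a retry instead of a lost write. A rough self-contained sketch of that pattern against a fake clientset (all names below are illustrative and not taken from this patch; the real controller uses UpdateNodeSpecBackoff rather than DefaultRetry):

    package main

    import (
        "context"
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes/fake"
        clientretry "k8s.io/client-go/util/retry"
    )

    // nodeModifier mirrors the controller's pattern: a function that mutates a
    // Node copy before it is written back.
    type nodeModifier func(*v1.Node)

    func main() {
        client := fake.NewSimpleClientset(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}})

        modifiers := []nodeModifier{
            func(n *v1.Node) { n.Spec.ProviderID = "fake://node0" },
            func(n *v1.Node) {
                if n.Labels == nil {
                    n.Labels = map[string]string{}
                }
                n.Labels["topology.kubernetes.io/zone"] = "zone-a"
            },
        }

        // Re-read, copy, modify, and update inside a conflict-retry loop.
        err := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
            cur, err := client.CoreV1().Nodes().Get(context.TODO(), "node0", metav1.GetOptions{})
            if err != nil {
                return err
            }
            updated := cur.DeepCopy()
            for _, modify := range modifiers {
                modify(updated)
            }
            _, err = client.CoreV1().Nodes().Update(context.TODO(), updated, metav1.UpdateOptions{})
            return err
        })
        if err != nil {
            panic(err)
        }

        got, _ := client.CoreV1().Nodes().Get(context.TODO(), "node0", metav1.GetOptions{})
        fmt.Println(got.Spec.ProviderID, got.Labels)
    }

The controller applies the same idea to a DeepCopy of the object it reads back, so nothing cached by the shared informer is ever mutated in place.
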
- return - } - - providerID, err := cnc.getProviderID(ctx, curNode) - if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to get provider ID for node %s at cloudprovider: %v", node.Name, err)) - return - } - - instanceMetadata, err := cnc.getInstanceMetadata(ctx, providerID, curNode) - if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to get instance metadata for node %s: %v", node.Name, err)) - return - } - - nodeModifiers, err := cnc.getNodeModifiersFromCloudProvider(ctx, providerID, curNode, instanceMetadata) - if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to initialize node %s at cloudprovider: %v", node.Name, err)) - return - } - - nodeModifiers = append(nodeModifiers, func(n *v1.Node) { - n.Spec.Taints = excludeCloudTaint(n.Spec.Taints) - }) - - err = clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error { - curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{}) - if err != nil { - return err + // fetch latest node from API server since GCE-specific condition was set and informer cache may be stale + curNode, err = cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return err + } + } else { + curNode, err = cnc.nodeInformer.Lister().Get(nodeName) + if err != nil { + return err + } } + newNode := curNode.DeepCopy() for _, modify := range nodeModifiers { - modify(curNode) + modify(newNode) } - _, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), curNode, metav1.UpdateOptions{}) + _, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{}) if err != nil { return err } // After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses // So that users do not see any significant delay in IP addresses being filled into the node - cnc.updateNodeAddress(ctx, curNode, instanceMetadata) + cnc.updateNodeAddress(ctx, newNode, instanceMetadata) - klog.Infof("Successfully initialized node %s with cloud provider", node.Name) + klog.Infof("Successfully initialized node %s with cloud provider", nodeName) return nil }) if err != nil { - utilruntime.HandleError(err) - return + return err } + + cnc.recorder.Event(copyNode, v1.EventTypeNormal, "Synced", "Node synced successfully") + return nil } // getNodeModifiersFromCloudProvider returns a slice of nodeModifiers that update diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go index e6b522df2b9..827f33c8c72 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go @@ -162,7 +162,7 @@ func TestEnsureNodeExistsByProviderID(t *testing.T) { } } -func Test_AddCloudNode(t *testing.T) { +func Test_syncNode(t *testing.T) { tests := []struct { name string fakeCloud *fakecloud.Cloud @@ -1290,14 +1290,19 @@ func Test_AddCloudNode(t *testing.T) { cloudNodeController := &CloudNodeController{ kubeClient: clientset, nodeInformer: factory.Core().V1().Nodes(), + nodesLister: factory.Core().V1().Nodes().Lister(), cloud: test.fakeCloud, recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), nodeStatusUpdateFrequency: 1 * time.Second, } + + factory.Start(nil) + factory.WaitForCacheSync(nil) + w := eventBroadcaster.StartLogging(klog.Infof) defer w.Stop() - 
cloudNodeController.AddCloudNode(context.TODO(), test.existingNode) + cloudNodeController.syncNode(context.TODO(), test.existingNode.Name) updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), test.existingNode.Name, metav1.GetOptions{}) if err != nil { @@ -1311,7 +1316,7 @@ func Test_AddCloudNode(t *testing.T) { } } -// test AddCloudNode with instanceV2, same test case with TestGCECondition. +// test syncNode with instanceV2, same test case with TestGCECondition. func TestGCEConditionV2(t *testing.T) { existingNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -1369,14 +1374,19 @@ func TestGCEConditionV2(t *testing.T) { cloudNodeController := &CloudNodeController{ kubeClient: clientset, nodeInformer: factory.Core().V1().Nodes(), + nodesLister: factory.Core().V1().Nodes().Lister(), cloud: fakeCloud, recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), nodeStatusUpdateFrequency: 1 * time.Second, } + + factory.Start(nil) + factory.WaitForCacheSync(nil) + w := eventBroadcaster.StartLogging(klog.Infof) defer w.Stop() - cloudNodeController.AddCloudNode(context.TODO(), existingNode) + cloudNodeController.syncNode(context.TODO(), existingNode.Name) updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{}) if err != nil { @@ -1452,14 +1462,19 @@ func TestGCECondition(t *testing.T) { cloudNodeController := &CloudNodeController{ kubeClient: clientset, nodeInformer: factory.Core().V1().Nodes(), + nodesLister: factory.Core().V1().Nodes().Lister(), cloud: fakeCloud, recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), nodeStatusUpdateFrequency: 1 * time.Second, } + + factory.Start(nil) + factory.WaitForCacheSync(nil) + w := eventBroadcaster.StartLogging(klog.Infof) defer w.Stop() - cloudNodeController.AddCloudNode(context.TODO(), existingNode) + cloudNodeController.syncNode(context.TODO(), existingNode.Name) updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{}) if err != nil {
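
One practical consequence of the lister-based lookup, visible in the test changes above: the shared informer factory now has to be started and its caches synced before syncNode runs, otherwise the lister reports NotFound for a node the fake clientset already holds. A minimal illustration of that behaviour (names are illustrative, not taken from this patch):

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes/fake"
    )

    func main() {
        existingNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}}
        clientset := fake.NewSimpleClientset(existingNode)

        factory := informers.NewSharedInformerFactory(clientset, 0)
        nodesLister := factory.Core().V1().Nodes().Lister()

        // Before the factory is started the lister's cache is empty, so a
        // controller reading through it would treat node0 as not found.
        if _, err := nodesLister.Get("node0"); err != nil {
            fmt.Println("before sync:", err)
        }

        // Starting the factory and waiting for the caches mirrors the
        // factory.Start / factory.WaitForCacheSync calls added to the tests.
        stopCh := make(chan struct{})
        defer close(stopCh)
        factory.Start(stopCh)
        factory.WaitForCacheSync(stopCh)

        node, err := nodesLister.Get("node0")
        if err != nil {
            panic(err)
        }
        fmt.Println("after sync:", node.Name)
    }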