cloud node controller: implement with workqueues and node lister

Nicole Han 2020-09-11 14:45:42 -07:00
parent 3d52b8b5d6
commit 4ba09a8222
3 changed files with 181 additions and 93 deletions
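The change replaces the controller's direct informer callbacks (AddCloudNode/UpdateCloudNode) with the standard client-go pattern: event handlers only enqueue object keys onto a rate-limited workqueue, a worker goroutine pops keys and calls a sync function, and failed keys are requeued with backoff. A minimal, self-contained sketch of that pattern is shown below for orientation; the queue name and the reconcile stub are illustrative placeholders, not code from this commit.

package main

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

type controller struct {
	queue workqueue.RateLimitingInterface
}

// enqueue converts an informer event object into a cache key
// ("namespace/name", or just "name" for cluster-scoped objects such as
// Nodes) and adds it to the queue.
func (c *controller) enqueue(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.queue.Add(key)
}

// worker drains the queue one key at a time; keys whose sync fails are
// re-added with rate limiting, successful keys are forgotten so their
// backoff resets.
func (c *controller) worker(reconcile func(key string) error) {
	for {
		item, shutdown := c.queue.Get()
		if shutdown {
			return
		}
		func() {
			defer c.queue.Done(item)
			if err := reconcile(item.(string)); err != nil {
				c.queue.AddRateLimited(item)
				utilruntime.HandleError(fmt.Errorf("error syncing %q: %v, requeuing", item, err))
				return
			}
			c.queue.Forget(item)
		}()
	}
}

func main() {
	c := &controller{
		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nodes"),
	}
	// In the real controller the informer event handlers call c.enqueue;
	// here a key is added directly so the worker has something to process.
	c.queue.Add("node-1")
	c.queue.ShutDown() // lets the worker drain the queue and then exit
	c.worker(func(key string) error {
		fmt.Println("synced", key)
		return nil
	})
}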

View File

@@ -17,9 +17,11 @@ go_library(
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
+        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
         "//staging/src/k8s.io/client-go/util/retry:go_default_library",
+        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//staging/src/k8s.io/cloud-provider:go_default_library",
         "//staging/src/k8s.io/cloud-provider/api:go_default_library",
         "//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library",

View File

@@ -22,7 +22,7 @@ import (
 	"fmt"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -32,9 +32,11 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	clientretry "k8s.io/client-go/util/retry"
+	"k8s.io/client-go/util/workqueue"
 	cloudprovider "k8s.io/cloud-provider"
 	cloudproviderapi "k8s.io/cloud-provider/api"
 	cloudnodeutil "k8s.io/cloud-provider/node/helpers"
@@ -84,6 +86,7 @@ var UpdateNodeSpecBackoff = wait.Backoff{
 	Jitter:   1.0,
 }
 
+// CloudNodeController is the controller implementation for Node resources
 type CloudNodeController struct {
 	nodeInformer coreinformers.NodeInformer
 	kubeClient   clientset.Interface
@@ -92,6 +95,10 @@ type CloudNodeController struct {
 	cloud cloudprovider.Interface
 
 	nodeStatusUpdateFrequency time.Duration
+
+	nodesLister corelisters.NodeLister
+	nodesSynced cache.InformerSynced
+	workqueue   workqueue.RateLimitingInterface
 }
 
 // NewCloudNodeController creates a CloudNodeController object
@@ -120,38 +127,112 @@ func NewCloudNodeController(
 		recorder:                  recorder,
 		cloud:                     cloud,
 		nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
+		nodesLister:               nodeInformer.Lister(),
+		nodesSynced:               nodeInformer.Informer().HasSynced,
+		workqueue:                 workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nodes"),
 	}
 
 	// Use shared informer to listen to add/update of nodes. Note that any nodes
 	// that exist before node controller starts will show up in the update method
 	cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    func(obj interface{}) { cnc.AddCloudNode(context.TODO(), obj) },
-		UpdateFunc: func(oldObj, newObj interface{}) { cnc.UpdateCloudNode(context.TODO(), oldObj, newObj) },
+		AddFunc:    cnc.enqueueNode,
+		UpdateFunc: func(oldObj, newObj interface{}) { cnc.enqueueNode(newObj) },
 	})
 
 	return cnc, nil
 }
 
+// Run will sync informer caches and starting workers.
 // This controller updates newly registered nodes with information
 // from the cloud provider. This call is blocking so should be called
 // via a goroutine
 func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer cnc.workqueue.ShutDown()
 
-	// The following loops run communicate with the APIServer with a worst case complexity
+	// Wait for the caches to be synced before starting workers
+	klog.Info("Waiting for informer caches to sync")
+	if ok := cache.WaitForCacheSync(stopCh, cnc.nodesSynced); !ok {
+		klog.Errorf("failed to wait for caches to sync")
+		return
+	}
+
+	// The periodic loop for updateNodeStatus communicates with the APIServer with a worst case complexity
 	// of O(num_nodes) per cycle. These functions are justified here because these events fire
 	// very infrequently. DO NOT MODIFY this to perform frequent operations.
+	go wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh)
 
-	// Start a loop to periodically update the node addresses obtained from the cloud
-	wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh)
+	go wait.Until(cnc.runWorker, time.Second, stopCh)
+
+	<-stopCh
 }
 
+// runWorker is a long-running function that will continually call the
+// processNextWorkItem function in order to read and process a message on the
+// workqueue.
+func (cnc *CloudNodeController) runWorker() {
+	for cnc.processNextWorkItem() {
+	}
+}
+
+// processNextWorkItem will read a single work item off the workqueue and
+// attempt to process it, by calling the syncHandler.
+func (cnc *CloudNodeController) processNextWorkItem() bool {
+	obj, shutdown := cnc.workqueue.Get()
+	if shutdown {
+		return false
+	}
+
+	// We wrap this block in a func so we can defer cnc.workqueue.Done.
+	err := func(obj interface{}) error {
+		defer cnc.workqueue.Done(obj)
+
+		var key string
+		var ok bool
+		if key, ok = obj.(string); !ok {
+			cnc.workqueue.Forget(obj)
+			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
+			return nil
+		}
+
+		// Run the syncHandler, passing it the key of the
+		// Node resource to be synced.
+		if err := cnc.syncHandler(key); err != nil {
+			// Put the item back on the workqueue to handle any transient errors.
+			cnc.workqueue.AddRateLimited(key)
+			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
+		}
+
+		// Finally, if no error occurs we Forget this item so it does not
+		// get queued again until another change happens.
+		cnc.workqueue.Forget(obj)
+		return nil
+	}(obj)
+
+	if err != nil {
+		utilruntime.HandleError(err)
+		return true
+	}
+
+	return true
+}
+
+// syncHandler implements the logic of the controller.
+func (cnc *CloudNodeController) syncHandler(key string) error {
+	_, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
+		return nil
+	}
+
+	return cnc.syncNode(context.TODO(), name)
+}
+
 // UpdateNodeStatus updates the node status, such as node addresses
-func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
+func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error {
 	nodes, err := cnc.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"})
 	if err != nil {
 		klog.Errorf("Error monitoring node status: %v", err)
-		return
+		return err
 	}
 
 	for i := range nodes.Items {
@@ -169,6 +250,20 @@ func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
 			klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
 		}
 	}
+
+	return nil
+}
+
+// enqueueNode takes a Node resource and converts it into a key
+// string which is then put onto the work queue.
+func (cnc *CloudNodeController) enqueueNode(obj interface{}) {
+	var key string
+	var err error
+	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
+		utilruntime.HandleError(err)
+		return
+	}
+	cnc.workqueue.Add(key)
 }
 
 // reconcileNodeLabels reconciles node labels transitioning from beta to GA
@@ -273,45 +368,54 @@ func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.
 // in a retry-if-conflict loop.
 type nodeModifier func(*v1.Node)
 
-func (cnc *CloudNodeController) UpdateCloudNode(ctx context.Context, _, newObj interface{}) {
-	node, ok := newObj.(*v1.Node)
-	if !ok {
-		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
-		return
+// syncNode handles updating existing nodes registered with the cloud taint
+// and processes nodes that were added into the cluster, and cloud initialize them if appropriate.
+func (cnc *CloudNodeController) syncNode(ctx context.Context, nodeName string) error {
+	curNode, err := cnc.nodeInformer.Lister().Get(nodeName)
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil
+		}
+		return err
 	}
 
-	cloudTaint := getCloudTaint(node.Spec.Taints)
+	cloudTaint := getCloudTaint(curNode.Spec.Taints)
 	if cloudTaint == nil {
-		// The node has already been initialized so nothing to do.
-		return
+		// Node object received from event had the cloud taint but was outdated,
+		// the node has actually already been initialized, so this sync event can be ignored.
+		return nil
 	}
 
-	cnc.initializeNode(ctx, node)
-}
-
-// AddCloudNode handles initializing new nodes registered with the cloud taint.
-func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{}) {
-	node := obj.(*v1.Node)
-
-	cloudTaint := getCloudTaint(node.Spec.Taints)
-	if cloudTaint == nil {
-		klog.V(2).Infof("This node %s is registered without the cloud taint. Will not process.", node.Name)
-		return
-	}
-
-	cnc.initializeNode(ctx, node)
-}
-
-// This processes nodes that were added into the cluster, and cloud initialize them if appropriate
-func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) {
-	klog.Infof("Initializing node %s with cloud provider", node.Name)
-
-	err := clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
-		// TODO(wlan0): Move this logic to the route controller using the node taint instead of condition
-		// Since there are node taints, do we still need this?
-		// This condition marks the node as unusable until routes are initialized in the cloud provider
+	klog.Infof("Initializing node %s with cloud provider", nodeName)
+
+	copyNode := curNode.DeepCopy()
+
+	providerID, err := cnc.getProviderID(ctx, copyNode)
+	if err != nil {
+		return fmt.Errorf("failed to get provider ID for node %s at cloudprovider: %v", nodeName, err)
+	}
+
+	instanceMetadata, err := cnc.getInstanceMetadata(ctx, providerID, copyNode)
+	if err != nil {
+		return fmt.Errorf("failed to get instance metadata for node %s: %v", nodeName, err)
+	}
+
+	nodeModifiers, err := cnc.getNodeModifiersFromCloudProvider(ctx, providerID, copyNode, instanceMetadata)
+	if err != nil {
+		return fmt.Errorf("failed to get node modifiers from cloud provider: %v", err)
+	}
+
+	nodeModifiers = append(nodeModifiers, func(n *v1.Node) {
+		n.Spec.Taints = excludeCloudTaint(n.Spec.Taints)
+	})
+
+	err = clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
+		var curNode *v1.Node
 		if cnc.cloud.ProviderName() == "gce" {
-			if err := cloudnodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(node.Name), v1.NodeCondition{
+			// TODO(wlan0): Move this logic to the route controller using the node taint instead of condition
+			// Since there are node taints, do we still need this?
+			// This condition marks the node as unusable until routes are initialized in the cloud provider
+			if err := cloudnodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(nodeName), v1.NodeCondition{
 				Type:               v1.NodeNetworkUnavailable,
 				Status:             v1.ConditionTrue,
 				Reason:             "NoRouteCreated",
@@ -320,75 +424,42 @@ func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Nod
 			}); err != nil {
 				return err
 			}
-		}
-		return nil
-	})
-	if err != nil {
-		utilruntime.HandleError(err)
-		return
-	}
-
-	curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
-	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("failed to get node %s: %v", node.Name, err))
-		return
-	}
-
-	cloudTaint := getCloudTaint(curNode.Spec.Taints)
-	if cloudTaint == nil {
-		// Node object received from event had the cloud taint but was outdated,
-		// the node has actually already been initialized.
-		return
-	}
-
-	providerID, err := cnc.getProviderID(ctx, curNode)
-	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("failed to get provider ID for node %s at cloudprovider: %v", node.Name, err))
-		return
-	}
-
-	instanceMetadata, err := cnc.getInstanceMetadata(ctx, providerID, curNode)
-	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("failed to get instance metadata for node %s: %v", node.Name, err))
-		return
-	}
-
-	nodeModifiers, err := cnc.getNodeModifiersFromCloudProvider(ctx, providerID, curNode, instanceMetadata)
-	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("failed to initialize node %s at cloudprovider: %v", node.Name, err))
-		return
-	}
-
-	nodeModifiers = append(nodeModifiers, func(n *v1.Node) {
-		n.Spec.Taints = excludeCloudTaint(n.Spec.Taints)
-	})
-
-	err = clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
-		curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
-		if err != nil {
-			return err
+
+			// fetch latest node from API server since GCE-specific condition was set and informer cache may be stale
+			curNode, err = cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+			if err != nil {
+				return err
+			}
+		} else {
+			curNode, err = cnc.nodeInformer.Lister().Get(nodeName)
+			if err != nil {
+				return err
+			}
 		}
 
+		newNode := curNode.DeepCopy()
 		for _, modify := range nodeModifiers {
-			modify(curNode)
+			modify(newNode)
 		}
 
-		_, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), curNode, metav1.UpdateOptions{})
+		_, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
 		if err != nil {
 			return err
 		}
 
 		// After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses
 		// So that users do not see any significant delay in IP addresses being filled into the node
-		cnc.updateNodeAddress(ctx, curNode, instanceMetadata)
+		cnc.updateNodeAddress(ctx, newNode, instanceMetadata)
 
-		klog.Infof("Successfully initialized node %s with cloud provider", node.Name)
+		klog.Infof("Successfully initialized node %s with cloud provider", nodeName)
 		return nil
 	})
 	if err != nil {
-		utilruntime.HandleError(err)
-		return
+		return err
 	}
+
+	cnc.recorder.Event(copyNode, v1.EventTypeNormal, "Synced", "Node synced successfully")
+	return nil
 }
 
 // getNodeModifiersFromCloudProvider returns a slice of nodeModifiers that update

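A note on the keys flowing through the queue above: Nodes are cluster-scoped, so cache.MetaNamespaceKeyFunc produces a bare object name and cache.SplitMetaNamespaceKey returns an empty namespace, which is why syncHandler discards the namespace component and passes only the name to syncNode. A small illustrative snippet, not part of the commit:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}

	// Cluster-scoped objects have no namespace, so the key is just the name.
	key, _ := cache.MetaNamespaceKeyFunc(node)
	fmt.Println(key) // node-1

	// SplitMetaNamespaceKey returns an empty namespace for such keys.
	ns, name, _ := cache.SplitMetaNamespaceKey(key)
	fmt.Printf("namespace=%q name=%q\n", ns, name) // namespace="" name="node-1"
}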
View File

@@ -162,7 +162,7 @@ func TestEnsureNodeExistsByProviderID(t *testing.T) {
 	}
 }
 
-func Test_AddCloudNode(t *testing.T) {
+func Test_syncNode(t *testing.T) {
 	tests := []struct {
 		name      string
 		fakeCloud *fakecloud.Cloud
@@ -1290,14 +1290,19 @@ func Test_AddCloudNode(t *testing.T) {
 			cloudNodeController := &CloudNodeController{
 				kubeClient:                clientset,
 				nodeInformer:              factory.Core().V1().Nodes(),
+				nodesLister:               factory.Core().V1().Nodes().Lister(),
 				cloud:                     test.fakeCloud,
 				recorder:                  eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
 				nodeStatusUpdateFrequency: 1 * time.Second,
 			}
+
+			factory.Start(nil)
+			factory.WaitForCacheSync(nil)
+
 			w := eventBroadcaster.StartLogging(klog.Infof)
 			defer w.Stop()
 
-			cloudNodeController.AddCloudNode(context.TODO(), test.existingNode)
+			cloudNodeController.syncNode(context.TODO(), test.existingNode.Name)
 
 			updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), test.existingNode.Name, metav1.GetOptions{})
 			if err != nil {
@@ -1311,7 +1316,7 @@ func Test_AddCloudNode(t *testing.T) {
 	}
 }
 
-// test AddCloudNode with instanceV2, same test case with TestGCECondition.
+// test syncNode with instanceV2, same test case with TestGCECondition.
 func TestGCEConditionV2(t *testing.T) {
 	existingNode := &v1.Node{
 		ObjectMeta: metav1.ObjectMeta{
@@ -1369,14 +1374,19 @@ func TestGCEConditionV2(t *testing.T) {
 	cloudNodeController := &CloudNodeController{
 		kubeClient:                clientset,
 		nodeInformer:              factory.Core().V1().Nodes(),
+		nodesLister:               factory.Core().V1().Nodes().Lister(),
 		cloud:                     fakeCloud,
 		recorder:                  eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
 		nodeStatusUpdateFrequency: 1 * time.Second,
 	}
+
+	factory.Start(nil)
+	factory.WaitForCacheSync(nil)
+
 	w := eventBroadcaster.StartLogging(klog.Infof)
 	defer w.Stop()
 
-	cloudNodeController.AddCloudNode(context.TODO(), existingNode)
+	cloudNodeController.syncNode(context.TODO(), existingNode.Name)
 
 	updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{})
 	if err != nil {
@@ -1452,14 +1462,19 @@ func TestGCECondition(t *testing.T) {
 	cloudNodeController := &CloudNodeController{
 		kubeClient:                clientset,
 		nodeInformer:              factory.Core().V1().Nodes(),
+		nodesLister:               factory.Core().V1().Nodes().Lister(),
 		cloud:                     fakeCloud,
 		recorder:                  eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
 		nodeStatusUpdateFrequency: 1 * time.Second,
 	}
+
+	factory.Start(nil)
+	factory.WaitForCacheSync(nil)
+
 	w := eventBroadcaster.StartLogging(klog.Infof)
 	defer w.Stop()
 
-	cloudNodeController.AddCloudNode(context.TODO(), existingNode)
+	cloudNodeController.syncNode(context.TODO(), existingNode.Name)
 
 	updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{})
 	if err != nil {