Merge pull request #94736 from HaibaraAi96/mynodecontroller

cloud node controller: implement with workqueues and node lister
Kubernetes Prow Robot 2020-09-16 18:16:59 -07:00 committed by GitHub
commit aaccd5f77b
3 changed files with 181 additions and 93 deletions
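
For orientation, the sketch below shows the plain client-go informer-plus-workqueue pattern this change moves the cloud node controller to: event handlers enqueue node keys, workers drain a rate-limited queue, and the sync step reads nodes from the lister cache instead of hitting the API server. It is a minimal, illustrative sketch only; the package and identifier names (nodeexample, exampleController, newExampleController) are assumptions and do not reproduce the code in this commit.

// Minimal sketch of the client-go informer + workqueue pattern adopted here:
// event handlers enqueue node keys, workers drain a rate-limited queue, and
// the sync step reads from the lister cache. Illustrative only.
package nodeexample

import (
	"fmt"
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	coreinformers "k8s.io/client-go/informers/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

type exampleController struct {
	nodesLister corelisters.NodeLister
	nodesSynced cache.InformerSynced
	queue       workqueue.RateLimitingInterface
}

func newExampleController(nodeInformer coreinformers.NodeInformer) *exampleController {
	c := &exampleController{
		nodesLister: nodeInformer.Lister(),
		nodesSynced: nodeInformer.Informer().HasSynced,
		queue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nodes"),
	}
	// Adds and updates funnel through the same enqueue path; the sync step
	// decides whether any work is actually needed.
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.enqueueNode,
		UpdateFunc: func(oldObj, newObj interface{}) { c.enqueueNode(newObj) },
	})
	return c
}

// enqueueNode converts a Node object into a cache key and queues it.
func (c *exampleController) enqueueNode(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.queue.Add(key)
}

// Run waits for the informer cache to sync, then starts workers until stopCh closes.
func (c *exampleController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	if !cache.WaitForCacheSync(stopCh, c.nodesSynced) {
		utilruntime.HandleError(fmt.Errorf("failed to wait for caches to sync"))
		return
	}
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	<-stopCh
}

func (c *exampleController) runWorker() {
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem pops one key, syncs it, and requeues with backoff on error.
func (c *exampleController) processNextWorkItem() bool {
	key, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(key)

	if err := c.syncNode(key.(string)); err != nil {
		c.queue.AddRateLimited(key)
		utilruntime.HandleError(fmt.Errorf("error syncing %q: %v, requeuing", key, err))
		return true
	}
	// Success: clear the rate-limiter history for this key.
	c.queue.Forget(key)
	return true
}

// syncNode resolves the key and reads the Node from the lister cache, not the API server.
func (c *exampleController) syncNode(key string) error {
	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}
	node, err := c.nodesLister.Get(name)
	if err != nil {
		return err
	}
	_ = node // a real controller would reconcile the node here
	return nil
}
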


@@ -17,9 +17,11 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/api:go_default_library",
"//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library",


@@ -22,7 +22,7 @@ import (
"fmt"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -32,9 +32,11 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
clientretry "k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
cloudproviderapi "k8s.io/cloud-provider/api"
cloudnodeutil "k8s.io/cloud-provider/node/helpers"
@@ -84,6 +86,7 @@ var UpdateNodeSpecBackoff = wait.Backoff{
Jitter: 1.0,
}
// CloudNodeController is the controller implementation for Node resources
type CloudNodeController struct {
nodeInformer coreinformers.NodeInformer
kubeClient clientset.Interface
@@ -92,6 +95,10 @@ type CloudNodeController struct {
cloud cloudprovider.Interface
nodeStatusUpdateFrequency time.Duration
nodesLister corelisters.NodeLister
nodesSynced cache.InformerSynced
workqueue workqueue.RateLimitingInterface
}
// NewCloudNodeController creates a CloudNodeController object
@@ -120,38 +127,112 @@ func NewCloudNodeController(
recorder: recorder,
cloud: cloud,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
nodesLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nodes"),
}
// Use shared informer to listen to add/update of nodes. Note that any nodes
// that exist before node controller starts will show up in the update method
cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { cnc.AddCloudNode(context.TODO(), obj) },
UpdateFunc: func(oldObj, newObj interface{}) { cnc.UpdateCloudNode(context.TODO(), oldObj, newObj) },
AddFunc: cnc.enqueueNode,
UpdateFunc: func(oldObj, newObj interface{}) { cnc.enqueueNode(newObj) },
})
return cnc, nil
}
// Run will sync informer caches and start workers.
// This controller updates newly registered nodes with information
// from the cloud provider. This call is blocking, so it should be called
// via a goroutine.
func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer cnc.workqueue.ShutDown()
// The following loops run communicate with the APIServer with a worst case complexity
// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, cnc.nodesSynced); !ok {
klog.Errorf("failed to wait for caches to sync")
return
}
// The periodic loop for updateNodeStatus communicates with the APIServer with a worst case complexity
// of O(num_nodes) per cycle. These functions are justified here because these events fire
// very infrequently. DO NOT MODIFY this to perform frequent operations.
go wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh)
go wait.Until(cnc.runWorker, time.Second, stopCh)
// Start a loop to periodically update the node addresses obtained from the cloud
wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh)
<-stopCh
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (cnc *CloudNodeController) runWorker() {
for cnc.processNextWorkItem() {
}
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (cnc *CloudNodeController) processNextWorkItem() bool {
obj, shutdown := cnc.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer cnc.workqueue.Done.
err := func(obj interface{}) error {
defer cnc.workqueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
cnc.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the key of the
// Node resource to be synced.
if err := cnc.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
cnc.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
cnc.workqueue.Forget(obj)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
// syncHandler implements the logic of the controller.
func (cnc *CloudNodeController) syncHandler(key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
return cnc.syncNode(context.TODO(), name)
}
// UpdateNodeStatus updates the node status, such as node addresses
func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error {
nodes, err := cnc.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"})
if err != nil {
klog.Errorf("Error monitoring node status: %v", err)
return
return err
}
for i := range nodes.Items {
@@ -169,6 +250,20 @@ func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
}
}
return nil
}
// enqueueNode takes a Node resource and converts it into a key
// string which is then put onto the work queue.
func (cnc *CloudNodeController) enqueueNode(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
cnc.workqueue.Add(key)
}
// reconcileNodeLabels reconciles node labels transitioning from beta to GA
@@ -273,45 +368,54 @@ func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.
// in a retry-if-conflict loop.
type nodeModifier func(*v1.Node)
func (cnc *CloudNodeController) UpdateCloudNode(ctx context.Context, _, newObj interface{}) {
node, ok := newObj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
return
// syncNode handles newly added nodes and updates to existing nodes that still carry
// the cloud taint, and cloud-initializes them if appropriate.
func (cnc *CloudNodeController) syncNode(ctx context.Context, nodeName string) error {
curNode, err := cnc.nodeInformer.Lister().Get(nodeName)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
cloudTaint := getCloudTaint(node.Spec.Taints)
cloudTaint := getCloudTaint(curNode.Spec.Taints)
if cloudTaint == nil {
// The node has already been initialized so nothing to do.
return
// Node object received from the event had the cloud taint but was outdated;
// the node has actually already been initialized, so this sync event can be ignored.
return nil
}
cnc.initializeNode(ctx, node)
}
klog.Infof("Initializing node %s with cloud provider", nodeName)
// AddCloudNode handles initializing new nodes registered with the cloud taint.
func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{}) {
node := obj.(*v1.Node)
cloudTaint := getCloudTaint(node.Spec.Taints)
if cloudTaint == nil {
klog.V(2).Infof("This node %s is registered without the cloud taint. Will not process.", node.Name)
return
copyNode := curNode.DeepCopy()
providerID, err := cnc.getProviderID(ctx, copyNode)
if err != nil {
return fmt.Errorf("failed to get provider ID for node %s at cloudprovider: %v", nodeName, err)
}
cnc.initializeNode(ctx, node)
}
instanceMetadata, err := cnc.getInstanceMetadata(ctx, providerID, copyNode)
if err != nil {
return fmt.Errorf("failed to get instance metadata for node %s: %v", nodeName, err)
}
// This processes nodes that were added into the cluster, and cloud initialize them if appropriate
func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) {
klog.Infof("Initializing node %s with cloud provider", node.Name)
nodeModifiers, err := cnc.getNodeModifiersFromCloudProvider(ctx, providerID, copyNode, instanceMetadata)
if err != nil {
return fmt.Errorf("failed to get node modifiers from cloud provider: %v", err)
}
err := clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
// TODO(wlan0): Move this logic to the route controller using the node taint instead of condition
// Since there are node taints, do we still need this?
// This condition marks the node as unusable until routes are initialized in the cloud provider
nodeModifiers = append(nodeModifiers, func(n *v1.Node) {
n.Spec.Taints = excludeCloudTaint(n.Spec.Taints)
})
err = clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
var curNode *v1.Node
if cnc.cloud.ProviderName() == "gce" {
if err := cloudnodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(node.Name), v1.NodeCondition{
// TODO(wlan0): Move this logic to the route controller using the node taint instead of condition
// Since there are node taints, do we still need this?
// This condition marks the node as unusable until routes are initialized in the cloud provider
if err := cloudnodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(nodeName), v1.NodeCondition{
Type: v1.NodeNetworkUnavailable,
Status: v1.ConditionTrue,
Reason: "NoRouteCreated",
@@ -320,75 +424,42 @@ func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Nod
}); err != nil {
return err
}
}
return nil
})
if err != nil {
utilruntime.HandleError(err)
return
}
curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get node %s: %v", node.Name, err))
return
}
cloudTaint := getCloudTaint(curNode.Spec.Taints)
if cloudTaint == nil {
// Node object received from event had the cloud taint but was outdated,
// the node has actually already been initialized.
return
}
providerID, err := cnc.getProviderID(ctx, curNode)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get provider ID for node %s at cloudprovider: %v", node.Name, err))
return
}
instanceMetadata, err := cnc.getInstanceMetadata(ctx, providerID, curNode)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get instance metadata for node %s: %v", node.Name, err))
return
}
nodeModifiers, err := cnc.getNodeModifiersFromCloudProvider(ctx, providerID, curNode, instanceMetadata)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to initialize node %s at cloudprovider: %v", node.Name, err))
return
}
nodeModifiers = append(nodeModifiers, func(n *v1.Node) {
n.Spec.Taints = excludeCloudTaint(n.Spec.Taints)
})
err = clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
if err != nil {
return err
// fetch latest node from API server since GCE-specific condition was set and informer cache may be stale
curNode, err = cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
return err
}
} else {
curNode, err = cnc.nodeInformer.Lister().Get(nodeName)
if err != nil {
return err
}
}
newNode := curNode.DeepCopy()
for _, modify := range nodeModifiers {
modify(curNode)
modify(newNode)
}
_, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), curNode, metav1.UpdateOptions{})
_, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
if err != nil {
return err
}
// After adding, call updateNodeAddress to set the IP addresses reported by the cloud provider
// so that users do not see any significant delay in IP addresses being filled into the node.
cnc.updateNodeAddress(ctx, curNode, instanceMetadata)
cnc.updateNodeAddress(ctx, newNode, instanceMetadata)
klog.Infof("Successfully initialized node %s with cloud provider", node.Name)
klog.Infof("Successfully initialized node %s with cloud provider", nodeName)
return nil
})
if err != nil {
utilruntime.HandleError(err)
return
return err
}
cnc.recorder.Event(copyNode, v1.EventTypeNormal, "Synced", "Node synced successfully")
return nil
}
// getNodeModifiersFromCloudProvider returns a slice of nodeModifiers that update
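
The syncNode hunk above builds up a slice of nodeModifier functions and then applies them to a fresh copy of the Node inside a conflict-retry loop. Below is a minimal, hedged sketch of that update pattern in isolation; applyNodeModifiers, the nodeexample package, and the use of retry.DefaultBackoff (rather than the controller's UpdateNodeSpecBackoff) are illustrative assumptions, not helpers from this commit.

// Hedged sketch of the "accumulate modifiers, then update with conflict retry"
// pattern used by syncNode; names and backoff choice are illustrative only.
package nodeexample

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	clientretry "k8s.io/client-go/util/retry"
)

// nodeModifier mutates a Node in place before it is written back.
type nodeModifier func(*v1.Node)

// applyNodeModifiers re-reads the node and applies all modifiers inside a
// RetryOnConflict loop, so a stale ResourceVersion only triggers a retry.
func applyNodeModifiers(ctx context.Context, client clientset.Interface, nodeName string, modifiers []nodeModifier) error {
	return clientretry.RetryOnConflict(clientretry.DefaultBackoff, func() error {
		curNode, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
		if err != nil {
			return err
		}
		// Work on a copy so the informer cache (if that is where the node came
		// from) is never mutated directly.
		newNode := curNode.DeepCopy()
		for _, modify := range modifiers {
			modify(newNode)
		}
		_, err = client.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
		return err
	})
}
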


@@ -162,7 +162,7 @@ func TestEnsureNodeExistsByProviderID(t *testing.T) {
}
}
func Test_AddCloudNode(t *testing.T) {
func Test_syncNode(t *testing.T) {
tests := []struct {
name string
fakeCloud *fakecloud.Cloud
@@ -1290,14 +1290,19 @@ func Test_AddCloudNode(t *testing.T) {
cloudNodeController := &CloudNodeController{
kubeClient: clientset,
nodeInformer: factory.Core().V1().Nodes(),
nodesLister: factory.Core().V1().Nodes().Lister(),
cloud: test.fakeCloud,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
nodeStatusUpdateFrequency: 1 * time.Second,
}
factory.Start(nil)
factory.WaitForCacheSync(nil)
w := eventBroadcaster.StartLogging(klog.Infof)
defer w.Stop()
cloudNodeController.AddCloudNode(context.TODO(), test.existingNode)
cloudNodeController.syncNode(context.TODO(), test.existingNode.Name)
updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), test.existingNode.Name, metav1.GetOptions{})
if err != nil {
@@ -1311,7 +1316,7 @@ func Test_AddCloudNode(t *testing.T) {
}
}
// test AddCloudNode with instanceV2, same test case with TestGCECondition.
// test syncNode with instanceV2, same test case as TestGCECondition.
func TestGCEConditionV2(t *testing.T) {
existingNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
@@ -1369,14 +1374,19 @@ func TestGCEConditionV2(t *testing.T) {
cloudNodeController := &CloudNodeController{
kubeClient: clientset,
nodeInformer: factory.Core().V1().Nodes(),
nodesLister: factory.Core().V1().Nodes().Lister(),
cloud: fakeCloud,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
nodeStatusUpdateFrequency: 1 * time.Second,
}
factory.Start(nil)
factory.WaitForCacheSync(nil)
w := eventBroadcaster.StartLogging(klog.Infof)
defer w.Stop()
cloudNodeController.AddCloudNode(context.TODO(), existingNode)
cloudNodeController.syncNode(context.TODO(), existingNode.Name)
updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{})
if err != nil {
@@ -1452,14 +1462,19 @@ func TestGCECondition(t *testing.T) {
cloudNodeController := &CloudNodeController{
kubeClient: clientset,
nodeInformer: factory.Core().V1().Nodes(),
nodesLister: factory.Core().V1().Nodes().Lister(),
cloud: fakeCloud,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
nodeStatusUpdateFrequency: 1 * time.Second,
}
factory.Start(nil)
factory.WaitForCacheSync(nil)
w := eventBroadcaster.StartLogging(klog.Infof)
defer w.Stop()
cloudNodeController.AddCloudNode(context.TODO(), existingNode)
cloudNodeController.syncNode(context.TODO(), existingNode.Name)
updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{})
if err != nil {