From 6991069e316aeba05e52ee0d9358db67a3f2d15b Mon Sep 17 00:00:00 2001 From: walter Date: Sun, 13 Oct 2019 18:11:22 -0700 Subject: [PATCH] Push context up to cloud node controller. This adds context to the cloud node controller. It continues the propogation started in 59287. Fixes 815. Fixed test code calls. --- pkg/controller/cloud/node_controller.go | 60 +++++++++---------- pkg/controller/cloud/node_controller_test.go | 29 ++++----- .../cloud/node_lifecycle_controller.go | 2 +- 3 files changed, 46 insertions(+), 45 deletions(-) diff --git a/pkg/controller/cloud/node_controller.go b/pkg/controller/cloud/node_controller.go index 1f7c76dcfb5..ad4fc147c74 100644 --- a/pkg/controller/cloud/node_controller.go +++ b/pkg/controller/cloud/node_controller.go @@ -87,8 +87,8 @@ func NewCloudNodeController( // Use shared informer to listen to add/update of nodes. Note that any nodes // that exist before node controller starts will show up in the update method cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: cnc.AddCloudNode, - UpdateFunc: cnc.UpdateCloudNode, + AddFunc: func(obj interface{}) { cnc.AddCloudNode(context.TODO(), obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { cnc.UpdateCloudNode(context.TODO(), oldObj, newObj) }, }) return cnc, nil @@ -105,11 +105,11 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) { // very infrequently. DO NOT MODIFY this to perform frequent operations. // Start a loop to periodically update the node addresses obtained from the cloud - wait.Until(cnc.UpdateNodeStatus, cnc.nodeStatusUpdateFrequency, stopCh) + wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh) } // UpdateNodeStatus updates the node status, such as node addresses -func (cnc *CloudNodeController) UpdateNodeStatus() { +func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) { instances, ok := cnc.cloud.Instances() if !ok { utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider")) @@ -123,12 +123,12 @@ func (cnc *CloudNodeController) UpdateNodeStatus() { } for i := range nodes.Items { - cnc.updateNodeAddress(&nodes.Items[i], instances) + cnc.updateNodeAddress(ctx, &nodes.Items[i], instances) } } // UpdateNodeAddress updates the nodeAddress of a single node -func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloudprovider.Instances) { +func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.Node, instances cloudprovider.Instances) { // Do not process nodes that are still tainted cloudTaint := getCloudTaint(node.Spec.Taints) if cloudTaint != nil { @@ -136,7 +136,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud return } // Node that isn't present according to the cloud provider shouldn't have its address updated - exists, err := ensureNodeExistsByProviderID(instances, node) + exists, err := ensureNodeExistsByProviderID(ctx, instances, node) if err != nil { // Continue to update node address when not sure the node is not exists klog.Errorf("%v", err) @@ -145,7 +145,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud return } - nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, node) + nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, node) if err != nil { klog.Errorf("%v", err) return @@ -192,7 +192,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud } } -func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) { +func (cnc *CloudNodeController) UpdateCloudNode(ctx context.Context, _, newObj interface{}) { node, ok := newObj.(*v1.Node) if !ok { utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) @@ -205,11 +205,11 @@ func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) { return } - cnc.initializeNode(node) + cnc.initializeNode(ctx, node) } // AddCloudNode handles initializing new nodes registered with the cloud taint. -func (cnc *CloudNodeController) AddCloudNode(obj interface{}) { +func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{}) { node := obj.(*v1.Node) cloudTaint := getCloudTaint(node.Spec.Taints) @@ -218,11 +218,11 @@ func (cnc *CloudNodeController) AddCloudNode(obj interface{}) { return } - cnc.initializeNode(node) + cnc.initializeNode(ctx, node) } // This processes nodes that were added into the cluster, and cloud initialize them if appropriate -func (cnc *CloudNodeController) initializeNode(node *v1.Node) { +func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) { instances, ok := cnc.cloud.Instances() if !ok { @@ -259,7 +259,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) { } if curNode.Spec.ProviderID == "" { - providerID, err := cloudprovider.GetInstanceProviderID(context.TODO(), cnc.cloud, types.NodeName(curNode.Name)) + providerID, err := cloudprovider.GetInstanceProviderID(ctx, cnc.cloud, types.NodeName(curNode.Name)) if err == nil { curNode.Spec.ProviderID = providerID } else { @@ -270,7 +270,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) { } } - nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, curNode) + nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, curNode) if err != nil { return err } @@ -283,7 +283,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) { } } - if instanceType, err := getInstanceTypeByProviderIDOrName(instances, curNode); err != nil { + if instanceType, err := getInstanceTypeByProviderIDOrName(ctx, instances, curNode); err != nil { return err } else if instanceType != "" { klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType) @@ -291,7 +291,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) { } if zones, ok := cnc.cloud.Zones(); ok { - zone, err := getZoneByProviderIDOrName(zones, curNode) + zone, err := getZoneByProviderIDOrName(ctx, zones, curNode) if err != nil { return fmt.Errorf("failed to get zone from cloud provider: %v", err) } @@ -313,7 +313,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) { } // After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses // So that users do not see any significant delay in IP addresses being filled into the node - cnc.updateNodeAddress(curNode, instances) + cnc.updateNodeAddress(ctx, curNode, instances) klog.Infof("Successfully initialized node %s with cloud provider", node.Name) return nil @@ -346,11 +346,11 @@ func excludeCloudTaint(taints []v1.Taint) []v1.Taint { // ensureNodeExistsByProviderID checks if the instance exists by the provider id, // If provider id in spec is empty it calls instanceId with node name to get provider id -func ensureNodeExistsByProviderID(instances cloudprovider.Instances, node *v1.Node) (bool, error) { +func ensureNodeExistsByProviderID(ctx context.Context, instances cloudprovider.Instances, node *v1.Node) (bool, error) { providerID := node.Spec.ProviderID if providerID == "" { var err error - providerID, err = instances.InstanceID(context.TODO(), types.NodeName(node.Name)) + providerID, err = instances.InstanceID(ctx, types.NodeName(node.Name)) if err != nil { if err == cloudprovider.InstanceNotFound { return false, nil @@ -364,14 +364,14 @@ func ensureNodeExistsByProviderID(instances cloudprovider.Instances, node *v1.No } } - return instances.InstanceExistsByProviderID(context.TODO(), providerID) + return instances.InstanceExistsByProviderID(ctx, providerID) } -func getNodeAddressesByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) { - nodeAddresses, err := instances.NodeAddressesByProviderID(context.TODO(), node.Spec.ProviderID) +func getNodeAddressesByProviderIDOrName(ctx context.Context, instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) { + nodeAddresses, err := instances.NodeAddressesByProviderID(ctx, node.Spec.ProviderID) if err != nil { providerIDErr := err - nodeAddresses, err = instances.NodeAddresses(context.TODO(), types.NodeName(node.Name)) + nodeAddresses, err = instances.NodeAddresses(ctx, types.NodeName(node.Name)) if err != nil { return nil, fmt.Errorf("NodeAddress: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) } @@ -412,11 +412,11 @@ func ensureNodeProvidedIPExists(node *v1.Node, nodeAddresses []v1.NodeAddress) ( return nodeIP, nodeIPExists } -func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) (string, error) { - instanceType, err := instances.InstanceTypeByProviderID(context.TODO(), node.Spec.ProviderID) +func getInstanceTypeByProviderIDOrName(ctx context.Context, instances cloudprovider.Instances, node *v1.Node) (string, error) { + instanceType, err := instances.InstanceTypeByProviderID(ctx, node.Spec.ProviderID) if err != nil { providerIDErr := err - instanceType, err = instances.InstanceType(context.TODO(), types.NodeName(node.Name)) + instanceType, err = instances.InstanceType(ctx, types.NodeName(node.Name)) if err != nil { return "", fmt.Errorf("InstanceType: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) } @@ -426,11 +426,11 @@ func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node * // getZoneByProviderIDorName will attempt to get the zone of node using its providerID // then it's name. If both attempts fail, an error is returned -func getZoneByProviderIDOrName(zones cloudprovider.Zones, node *v1.Node) (cloudprovider.Zone, error) { - zone, err := zones.GetZoneByProviderID(context.TODO(), node.Spec.ProviderID) +func getZoneByProviderIDOrName(ctx context.Context, zones cloudprovider.Zones, node *v1.Node) (cloudprovider.Zone, error) { + zone, err := zones.GetZoneByProviderID(ctx, node.Spec.ProviderID) if err != nil { providerIDErr := err - zone, err = zones.GetZoneByNodeName(context.TODO(), types.NodeName(node.Name)) + zone, err = zones.GetZoneByNodeName(ctx, types.NodeName(node.Name)) if err != nil { return cloudprovider.Zone{}, fmt.Errorf("Zone: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) } diff --git a/pkg/controller/cloud/node_controller_test.go b/pkg/controller/cloud/node_controller_test.go index a6ba82bdd9a..6271e3ab492 100644 --- a/pkg/controller/cloud/node_controller_test.go +++ b/pkg/controller/cloud/node_controller_test.go @@ -17,11 +17,12 @@ limitations under the License. package cloud import ( + "context" "errors" "testing" "time" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" @@ -29,7 +30,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/tools/record" - cloudprovider "k8s.io/cloud-provider" + "k8s.io/cloud-provider" fakecloud "k8s.io/cloud-provider/fake" "k8s.io/kubernetes/pkg/controller/testutil" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" @@ -147,7 +148,7 @@ func TestEnsureNodeExistsByProviderID(t *testing.T) { } instances, _ := fc.Instances() - exists, err := ensureNodeExistsByProviderID(instances, tc.node) + exists, err := ensureNodeExistsByProviderID(context.TODO(), instances, tc.node) assert.Equal(t, err, tc.providerIDErr) assert.EqualValues(t, tc.expectedCalls, fc.Calls, @@ -229,7 +230,7 @@ func TestNodeInitialized(t *testing.T) { } eventBroadcaster.StartLogging(klog.Infof) - cloudNodeController.AddCloudNode(fnh.Existing[0]) + cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0]) assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") @@ -293,7 +294,7 @@ func TestNodeIgnored(t *testing.T) { } eventBroadcaster.StartLogging(klog.Infof) - cloudNodeController.AddCloudNode(fnh.Existing[0]) + cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0]) assert.Equal(t, 0, len(fnh.UpdatedNodes), "Node was wrongly updated") } @@ -366,7 +367,7 @@ func TestGCECondition(t *testing.T) { } eventBroadcaster.StartLogging(klog.Infof) - cloudNodeController.AddCloudNode(fnh.Existing[0]) + cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0]) assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") @@ -455,7 +456,7 @@ func TestZoneInitialized(t *testing.T) { } eventBroadcaster.StartLogging(klog.Infof) - cloudNodeController.AddCloudNode(fnh.Existing[0]) + cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0]) assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") @@ -545,7 +546,7 @@ func TestNodeAddresses(t *testing.T) { } eventBroadcaster.StartLogging(klog.Infof) - cloudNodeController.AddCloudNode(fnh.Existing[0]) + cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0]) assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") @@ -562,7 +563,7 @@ func TestNodeAddresses(t *testing.T) { }, } - cloudNodeController.UpdateNodeStatus() + cloudNodeController.UpdateNodeStatus(context.TODO()) updatedNodes := fnh.GetUpdatedNodesCopy() @@ -657,13 +658,13 @@ func TestNodeProvidedIPAddresses(t *testing.T) { } eventBroadcaster.StartLogging(klog.Infof) - cloudNodeController.AddCloudNode(fnh.Existing[0]) + cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0]) assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, 3, len(fnh.UpdatedNodes[0].Status.Addresses), "Node status unexpectedly updated") - cloudNodeController.UpdateNodeStatus() + cloudNodeController.UpdateNodeStatus(context.TODO()) updatedNodes := fnh.GetUpdatedNodesCopy() @@ -864,7 +865,7 @@ func TestNodeAddressesNotUpdate(t *testing.T) { cloud: fakeCloud, } - cloudNodeController.updateNodeAddress(fnh.Existing[0], fakeCloud) + cloudNodeController.updateNodeAddress(context.TODO(), fnh.Existing[0], fakeCloud) if len(fnh.UpdatedNodes) != 0 { t.Errorf("Node was not correctly updated, the updated len(nodes) got: %v, wanted=0", len(fnh.UpdatedNodes)) @@ -946,7 +947,7 @@ func TestNodeProviderID(t *testing.T) { } eventBroadcaster.StartLogging(klog.Infof) - cloudNodeController.AddCloudNode(fnh.Existing[0]) + cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0]) assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") @@ -1029,7 +1030,7 @@ func TestNodeProviderIDAlreadySet(t *testing.T) { } eventBroadcaster.StartLogging(klog.Infof) - cloudNodeController.AddCloudNode(fnh.Existing[0]) + cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0]) assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") diff --git a/pkg/controller/cloud/node_lifecycle_controller.go b/pkg/controller/cloud/node_lifecycle_controller.go index e6ee0e3c851..83d21eeab98 100644 --- a/pkg/controller/cloud/node_lifecycle_controller.go +++ b/pkg/controller/cloud/node_lifecycle_controller.go @@ -166,7 +166,7 @@ func (c *CloudNodeLifecycleController) MonitorNodes() { // At this point the node has NotReady status, we need to check if the node has been removed // from the cloud provider. If node cannot be found in cloudprovider, then delete the node - exists, err := ensureNodeExistsByProviderID(instances, node) + exists, err := ensureNodeExistsByProviderID(context.TODO(), instances, node) if err != nil { klog.Errorf("error checking if node %s exists: %v", node.Name, err) continue