Push context up to cloud node controller.

This adds context to the cloud node controller. It continues the propogation started in 59287.  Fixes 815.
Fixed test code calls.
This commit is contained in:
walter 2019-10-13 18:11:22 -07:00
parent 019b662ff5
commit 6991069e31
3 changed files with 46 additions and 45 deletions

View File

@ -87,8 +87,8 @@ func NewCloudNodeController(
// Use shared informer to listen to add/update of nodes. Note that any nodes // Use shared informer to listen to add/update of nodes. Note that any nodes
// that exist before node controller starts will show up in the update method // that exist before node controller starts will show up in the update method
cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cnc.AddCloudNode, AddFunc: func(obj interface{}) { cnc.AddCloudNode(context.TODO(), obj) },
UpdateFunc: cnc.UpdateCloudNode, UpdateFunc: func(oldObj, newObj interface{}) { cnc.UpdateCloudNode(context.TODO(), oldObj, newObj) },
}) })
return cnc, nil return cnc, nil
@ -105,11 +105,11 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
// very infrequently. DO NOT MODIFY this to perform frequent operations. // very infrequently. DO NOT MODIFY this to perform frequent operations.
// Start a loop to periodically update the node addresses obtained from the cloud // Start a loop to periodically update the node addresses obtained from the cloud
wait.Until(cnc.UpdateNodeStatus, cnc.nodeStatusUpdateFrequency, stopCh) wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh)
} }
// UpdateNodeStatus updates the node status, such as node addresses // UpdateNodeStatus updates the node status, such as node addresses
func (cnc *CloudNodeController) UpdateNodeStatus() { func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
instances, ok := cnc.cloud.Instances() instances, ok := cnc.cloud.Instances()
if !ok { if !ok {
utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider")) utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
@ -123,12 +123,12 @@ func (cnc *CloudNodeController) UpdateNodeStatus() {
} }
for i := range nodes.Items { for i := range nodes.Items {
cnc.updateNodeAddress(&nodes.Items[i], instances) cnc.updateNodeAddress(ctx, &nodes.Items[i], instances)
} }
} }
// UpdateNodeAddress updates the nodeAddress of a single node // UpdateNodeAddress updates the nodeAddress of a single node
func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloudprovider.Instances) { func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.Node, instances cloudprovider.Instances) {
// Do not process nodes that are still tainted // Do not process nodes that are still tainted
cloudTaint := getCloudTaint(node.Spec.Taints) cloudTaint := getCloudTaint(node.Spec.Taints)
if cloudTaint != nil { if cloudTaint != nil {
@ -136,7 +136,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud
return return
} }
// Node that isn't present according to the cloud provider shouldn't have its address updated // Node that isn't present according to the cloud provider shouldn't have its address updated
exists, err := ensureNodeExistsByProviderID(instances, node) exists, err := ensureNodeExistsByProviderID(ctx, instances, node)
if err != nil { if err != nil {
// Continue to update node address when not sure the node is not exists // Continue to update node address when not sure the node is not exists
klog.Errorf("%v", err) klog.Errorf("%v", err)
@ -145,7 +145,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud
return return
} }
nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, node) nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, node)
if err != nil { if err != nil {
klog.Errorf("%v", err) klog.Errorf("%v", err)
return return
@ -192,7 +192,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud
} }
} }
func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) { func (cnc *CloudNodeController) UpdateCloudNode(ctx context.Context, _, newObj interface{}) {
node, ok := newObj.(*v1.Node) node, ok := newObj.(*v1.Node)
if !ok { if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
@ -205,11 +205,11 @@ func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) {
return return
} }
cnc.initializeNode(node) cnc.initializeNode(ctx, node)
} }
// AddCloudNode handles initializing new nodes registered with the cloud taint. // AddCloudNode handles initializing new nodes registered with the cloud taint.
func (cnc *CloudNodeController) AddCloudNode(obj interface{}) { func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{}) {
node := obj.(*v1.Node) node := obj.(*v1.Node)
cloudTaint := getCloudTaint(node.Spec.Taints) cloudTaint := getCloudTaint(node.Spec.Taints)
@ -218,11 +218,11 @@ func (cnc *CloudNodeController) AddCloudNode(obj interface{}) {
return return
} }
cnc.initializeNode(node) cnc.initializeNode(ctx, node)
} }
// This processes nodes that were added into the cluster, and cloud initialize them if appropriate // This processes nodes that were added into the cluster, and cloud initialize them if appropriate
func (cnc *CloudNodeController) initializeNode(node *v1.Node) { func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) {
instances, ok := cnc.cloud.Instances() instances, ok := cnc.cloud.Instances()
if !ok { if !ok {
@ -259,7 +259,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) {
} }
if curNode.Spec.ProviderID == "" { if curNode.Spec.ProviderID == "" {
providerID, err := cloudprovider.GetInstanceProviderID(context.TODO(), cnc.cloud, types.NodeName(curNode.Name)) providerID, err := cloudprovider.GetInstanceProviderID(ctx, cnc.cloud, types.NodeName(curNode.Name))
if err == nil { if err == nil {
curNode.Spec.ProviderID = providerID curNode.Spec.ProviderID = providerID
} else { } else {
@ -270,7 +270,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) {
} }
} }
nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, curNode) nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, curNode)
if err != nil { if err != nil {
return err return err
} }
@ -283,7 +283,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) {
} }
} }
if instanceType, err := getInstanceTypeByProviderIDOrName(instances, curNode); err != nil { if instanceType, err := getInstanceTypeByProviderIDOrName(ctx, instances, curNode); err != nil {
return err return err
} else if instanceType != "" { } else if instanceType != "" {
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType) klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType)
@ -291,7 +291,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) {
} }
if zones, ok := cnc.cloud.Zones(); ok { if zones, ok := cnc.cloud.Zones(); ok {
zone, err := getZoneByProviderIDOrName(zones, curNode) zone, err := getZoneByProviderIDOrName(ctx, zones, curNode)
if err != nil { if err != nil {
return fmt.Errorf("failed to get zone from cloud provider: %v", err) return fmt.Errorf("failed to get zone from cloud provider: %v", err)
} }
@ -313,7 +313,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) {
} }
// After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses // After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses
// So that users do not see any significant delay in IP addresses being filled into the node // So that users do not see any significant delay in IP addresses being filled into the node
cnc.updateNodeAddress(curNode, instances) cnc.updateNodeAddress(ctx, curNode, instances)
klog.Infof("Successfully initialized node %s with cloud provider", node.Name) klog.Infof("Successfully initialized node %s with cloud provider", node.Name)
return nil return nil
@ -346,11 +346,11 @@ func excludeCloudTaint(taints []v1.Taint) []v1.Taint {
// ensureNodeExistsByProviderID checks if the instance exists by the provider id, // ensureNodeExistsByProviderID checks if the instance exists by the provider id,
// If provider id in spec is empty it calls instanceId with node name to get provider id // If provider id in spec is empty it calls instanceId with node name to get provider id
func ensureNodeExistsByProviderID(instances cloudprovider.Instances, node *v1.Node) (bool, error) { func ensureNodeExistsByProviderID(ctx context.Context, instances cloudprovider.Instances, node *v1.Node) (bool, error) {
providerID := node.Spec.ProviderID providerID := node.Spec.ProviderID
if providerID == "" { if providerID == "" {
var err error var err error
providerID, err = instances.InstanceID(context.TODO(), types.NodeName(node.Name)) providerID, err = instances.InstanceID(ctx, types.NodeName(node.Name))
if err != nil { if err != nil {
if err == cloudprovider.InstanceNotFound { if err == cloudprovider.InstanceNotFound {
return false, nil return false, nil
@ -364,14 +364,14 @@ func ensureNodeExistsByProviderID(instances cloudprovider.Instances, node *v1.No
} }
} }
return instances.InstanceExistsByProviderID(context.TODO(), providerID) return instances.InstanceExistsByProviderID(ctx, providerID)
} }
func getNodeAddressesByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) { func getNodeAddressesByProviderIDOrName(ctx context.Context, instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) {
nodeAddresses, err := instances.NodeAddressesByProviderID(context.TODO(), node.Spec.ProviderID) nodeAddresses, err := instances.NodeAddressesByProviderID(ctx, node.Spec.ProviderID)
if err != nil { if err != nil {
providerIDErr := err providerIDErr := err
nodeAddresses, err = instances.NodeAddresses(context.TODO(), types.NodeName(node.Name)) nodeAddresses, err = instances.NodeAddresses(ctx, types.NodeName(node.Name))
if err != nil { if err != nil {
return nil, fmt.Errorf("NodeAddress: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) return nil, fmt.Errorf("NodeAddress: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
} }
@ -412,11 +412,11 @@ func ensureNodeProvidedIPExists(node *v1.Node, nodeAddresses []v1.NodeAddress) (
return nodeIP, nodeIPExists return nodeIP, nodeIPExists
} }
func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) (string, error) { func getInstanceTypeByProviderIDOrName(ctx context.Context, instances cloudprovider.Instances, node *v1.Node) (string, error) {
instanceType, err := instances.InstanceTypeByProviderID(context.TODO(), node.Spec.ProviderID) instanceType, err := instances.InstanceTypeByProviderID(ctx, node.Spec.ProviderID)
if err != nil { if err != nil {
providerIDErr := err providerIDErr := err
instanceType, err = instances.InstanceType(context.TODO(), types.NodeName(node.Name)) instanceType, err = instances.InstanceType(ctx, types.NodeName(node.Name))
if err != nil { if err != nil {
return "", fmt.Errorf("InstanceType: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) return "", fmt.Errorf("InstanceType: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
} }
@ -426,11 +426,11 @@ func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *
// getZoneByProviderIDorName will attempt to get the zone of node using its providerID // getZoneByProviderIDorName will attempt to get the zone of node using its providerID
// then it's name. If both attempts fail, an error is returned // then it's name. If both attempts fail, an error is returned
func getZoneByProviderIDOrName(zones cloudprovider.Zones, node *v1.Node) (cloudprovider.Zone, error) { func getZoneByProviderIDOrName(ctx context.Context, zones cloudprovider.Zones, node *v1.Node) (cloudprovider.Zone, error) {
zone, err := zones.GetZoneByProviderID(context.TODO(), node.Spec.ProviderID) zone, err := zones.GetZoneByProviderID(ctx, node.Spec.ProviderID)
if err != nil { if err != nil {
providerIDErr := err providerIDErr := err
zone, err = zones.GetZoneByNodeName(context.TODO(), types.NodeName(node.Name)) zone, err = zones.GetZoneByNodeName(ctx, types.NodeName(node.Name))
if err != nil { if err != nil {
return cloudprovider.Zone{}, fmt.Errorf("Zone: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) return cloudprovider.Zone{}, fmt.Errorf("Zone: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
} }

View File

@ -17,11 +17,12 @@ limitations under the License.
package cloud package cloud
import ( import (
"context"
"errors" "errors"
"testing" "testing"
"time" "time"
v1 "k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
@ -29,7 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider" "k8s.io/cloud-provider"
fakecloud "k8s.io/cloud-provider/fake" fakecloud "k8s.io/cloud-provider/fake"
"k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/controller/testutil"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@ -147,7 +148,7 @@ func TestEnsureNodeExistsByProviderID(t *testing.T) {
} }
instances, _ := fc.Instances() instances, _ := fc.Instances()
exists, err := ensureNodeExistsByProviderID(instances, tc.node) exists, err := ensureNodeExistsByProviderID(context.TODO(), instances, tc.node)
assert.Equal(t, err, tc.providerIDErr) assert.Equal(t, err, tc.providerIDErr)
assert.EqualValues(t, tc.expectedCalls, fc.Calls, assert.EqualValues(t, tc.expectedCalls, fc.Calls,
@ -229,7 +230,7 @@ func TestNodeInitialized(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
@ -293,7 +294,7 @@ func TestNodeIgnored(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 0, len(fnh.UpdatedNodes), "Node was wrongly updated") assert.Equal(t, 0, len(fnh.UpdatedNodes), "Node was wrongly updated")
} }
@ -366,7 +367,7 @@ func TestGCECondition(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
@ -455,7 +456,7 @@ func TestZoneInitialized(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
@ -545,7 +546,7 @@ func TestNodeAddresses(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
@ -562,7 +563,7 @@ func TestNodeAddresses(t *testing.T) {
}, },
} }
cloudNodeController.UpdateNodeStatus() cloudNodeController.UpdateNodeStatus(context.TODO())
updatedNodes := fnh.GetUpdatedNodesCopy() updatedNodes := fnh.GetUpdatedNodesCopy()
@ -657,13 +658,13 @@ func TestNodeProvidedIPAddresses(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
assert.Equal(t, 3, len(fnh.UpdatedNodes[0].Status.Addresses), "Node status unexpectedly updated") assert.Equal(t, 3, len(fnh.UpdatedNodes[0].Status.Addresses), "Node status unexpectedly updated")
cloudNodeController.UpdateNodeStatus() cloudNodeController.UpdateNodeStatus(context.TODO())
updatedNodes := fnh.GetUpdatedNodesCopy() updatedNodes := fnh.GetUpdatedNodesCopy()
@ -864,7 +865,7 @@ func TestNodeAddressesNotUpdate(t *testing.T) {
cloud: fakeCloud, cloud: fakeCloud,
} }
cloudNodeController.updateNodeAddress(fnh.Existing[0], fakeCloud) cloudNodeController.updateNodeAddress(context.TODO(), fnh.Existing[0], fakeCloud)
if len(fnh.UpdatedNodes) != 0 { if len(fnh.UpdatedNodes) != 0 {
t.Errorf("Node was not correctly updated, the updated len(nodes) got: %v, wanted=0", len(fnh.UpdatedNodes)) t.Errorf("Node was not correctly updated, the updated len(nodes) got: %v, wanted=0", len(fnh.UpdatedNodes))
@ -946,7 +947,7 @@ func TestNodeProviderID(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
@ -1029,7 +1030,7 @@ func TestNodeProviderIDAlreadySet(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")

View File

@ -166,7 +166,7 @@ func (c *CloudNodeLifecycleController) MonitorNodes() {
// At this point the node has NotReady status, we need to check if the node has been removed // At this point the node has NotReady status, we need to check if the node has been removed
// from the cloud provider. If node cannot be found in cloudprovider, then delete the node // from the cloud provider. If node cannot be found in cloudprovider, then delete the node
exists, err := ensureNodeExistsByProviderID(instances, node) exists, err := ensureNodeExistsByProviderID(context.TODO(), instances, node)
if err != nil { if err != nil {
klog.Errorf("error checking if node %s exists: %v", node.Name, err) klog.Errorf("error checking if node %s exists: %v", node.Name, err)
continue continue