Merge pull request #83872 from cheftako/context

Push context up to cloud node controller.
This commit is contained in:
Kubernetes Prow Robot 2019-10-21 12:38:36 -07:00 committed by GitHub
commit b717be8269
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 46 additions and 45 deletions

View File

@ -87,8 +87,8 @@ func NewCloudNodeController(
// Use shared informer to listen to add/update of nodes. Note that any nodes // Use shared informer to listen to add/update of nodes. Note that any nodes
// that exist before node controller starts will show up in the update method // that exist before node controller starts will show up in the update method
cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cnc.AddCloudNode, AddFunc: func(obj interface{}) { cnc.AddCloudNode(context.TODO(), obj) },
UpdateFunc: cnc.UpdateCloudNode, UpdateFunc: func(oldObj, newObj interface{}) { cnc.UpdateCloudNode(context.TODO(), oldObj, newObj) },
}) })
return cnc, nil return cnc, nil
@ -105,11 +105,11 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
// very infrequently. DO NOT MODIFY this to perform frequent operations. // very infrequently. DO NOT MODIFY this to perform frequent operations.
// Start a loop to periodically update the node addresses obtained from the cloud // Start a loop to periodically update the node addresses obtained from the cloud
wait.Until(cnc.UpdateNodeStatus, cnc.nodeStatusUpdateFrequency, stopCh) wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh)
} }
// UpdateNodeStatus updates the node status, such as node addresses // UpdateNodeStatus updates the node status, such as node addresses
func (cnc *CloudNodeController) UpdateNodeStatus() { func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
instances, ok := cnc.cloud.Instances() instances, ok := cnc.cloud.Instances()
if !ok { if !ok {
utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider")) utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
@ -123,12 +123,12 @@ func (cnc *CloudNodeController) UpdateNodeStatus() {
} }
for i := range nodes.Items { for i := range nodes.Items {
cnc.updateNodeAddress(&nodes.Items[i], instances) cnc.updateNodeAddress(ctx, &nodes.Items[i], instances)
} }
} }
// UpdateNodeAddress updates the nodeAddress of a single node // UpdateNodeAddress updates the nodeAddress of a single node
func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloudprovider.Instances) { func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.Node, instances cloudprovider.Instances) {
// Do not process nodes that are still tainted // Do not process nodes that are still tainted
cloudTaint := getCloudTaint(node.Spec.Taints) cloudTaint := getCloudTaint(node.Spec.Taints)
if cloudTaint != nil { if cloudTaint != nil {
@ -136,7 +136,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud
return return
} }
// Node that isn't present according to the cloud provider shouldn't have its address updated // Node that isn't present according to the cloud provider shouldn't have its address updated
exists, err := ensureNodeExistsByProviderID(instances, node) exists, err := ensureNodeExistsByProviderID(ctx, instances, node)
if err != nil { if err != nil {
// Continue to update node address when not sure the node is not exists // Continue to update node address when not sure the node is not exists
klog.Errorf("%v", err) klog.Errorf("%v", err)
@ -145,7 +145,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud
return return
} }
nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, node) nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, node)
if err != nil { if err != nil {
klog.Errorf("%v", err) klog.Errorf("%v", err)
return return
@ -192,7 +192,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud
} }
} }
func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) { func (cnc *CloudNodeController) UpdateCloudNode(ctx context.Context, _, newObj interface{}) {
node, ok := newObj.(*v1.Node) node, ok := newObj.(*v1.Node)
if !ok { if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
@ -205,11 +205,11 @@ func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) {
return return
} }
cnc.initializeNode(node) cnc.initializeNode(ctx, node)
} }
// AddCloudNode handles initializing new nodes registered with the cloud taint. // AddCloudNode handles initializing new nodes registered with the cloud taint.
func (cnc *CloudNodeController) AddCloudNode(obj interface{}) { func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{}) {
node := obj.(*v1.Node) node := obj.(*v1.Node)
cloudTaint := getCloudTaint(node.Spec.Taints) cloudTaint := getCloudTaint(node.Spec.Taints)
@ -218,11 +218,11 @@ func (cnc *CloudNodeController) AddCloudNode(obj interface{}) {
return return
} }
cnc.initializeNode(node) cnc.initializeNode(ctx, node)
} }
// This processes nodes that were added into the cluster, and cloud initialize them if appropriate // This processes nodes that were added into the cluster, and cloud initialize them if appropriate
func (cnc *CloudNodeController) initializeNode(node *v1.Node) { func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) {
instances, ok := cnc.cloud.Instances() instances, ok := cnc.cloud.Instances()
if !ok { if !ok {
@ -259,7 +259,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) {
} }
if curNode.Spec.ProviderID == "" { if curNode.Spec.ProviderID == "" {
providerID, err := cloudprovider.GetInstanceProviderID(context.TODO(), cnc.cloud, types.NodeName(curNode.Name)) providerID, err := cloudprovider.GetInstanceProviderID(ctx, cnc.cloud, types.NodeName(curNode.Name))
if err == nil { if err == nil {
curNode.Spec.ProviderID = providerID curNode.Spec.ProviderID = providerID
} else { } else {
@ -270,7 +270,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) {
} }
} }
nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, curNode) nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, curNode)
if err != nil { if err != nil {
return err return err
} }
@ -283,7 +283,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) {
} }
} }
if instanceType, err := getInstanceTypeByProviderIDOrName(instances, curNode); err != nil { if instanceType, err := getInstanceTypeByProviderIDOrName(ctx, instances, curNode); err != nil {
return err return err
} else if instanceType != "" { } else if instanceType != "" {
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType) klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType)
@ -291,7 +291,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) {
} }
if zones, ok := cnc.cloud.Zones(); ok { if zones, ok := cnc.cloud.Zones(); ok {
zone, err := getZoneByProviderIDOrName(zones, curNode) zone, err := getZoneByProviderIDOrName(ctx, zones, curNode)
if err != nil { if err != nil {
return fmt.Errorf("failed to get zone from cloud provider: %v", err) return fmt.Errorf("failed to get zone from cloud provider: %v", err)
} }
@ -313,7 +313,7 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) {
} }
// After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses // After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses
// So that users do not see any significant delay in IP addresses being filled into the node // So that users do not see any significant delay in IP addresses being filled into the node
cnc.updateNodeAddress(curNode, instances) cnc.updateNodeAddress(ctx, curNode, instances)
klog.Infof("Successfully initialized node %s with cloud provider", node.Name) klog.Infof("Successfully initialized node %s with cloud provider", node.Name)
return nil return nil
@ -346,11 +346,11 @@ func excludeCloudTaint(taints []v1.Taint) []v1.Taint {
// ensureNodeExistsByProviderID checks if the instance exists by the provider id, // ensureNodeExistsByProviderID checks if the instance exists by the provider id,
// If provider id in spec is empty it calls instanceId with node name to get provider id // If provider id in spec is empty it calls instanceId with node name to get provider id
func ensureNodeExistsByProviderID(instances cloudprovider.Instances, node *v1.Node) (bool, error) { func ensureNodeExistsByProviderID(ctx context.Context, instances cloudprovider.Instances, node *v1.Node) (bool, error) {
providerID := node.Spec.ProviderID providerID := node.Spec.ProviderID
if providerID == "" { if providerID == "" {
var err error var err error
providerID, err = instances.InstanceID(context.TODO(), types.NodeName(node.Name)) providerID, err = instances.InstanceID(ctx, types.NodeName(node.Name))
if err != nil { if err != nil {
if err == cloudprovider.InstanceNotFound { if err == cloudprovider.InstanceNotFound {
return false, nil return false, nil
@ -364,14 +364,14 @@ func ensureNodeExistsByProviderID(instances cloudprovider.Instances, node *v1.No
} }
} }
return instances.InstanceExistsByProviderID(context.TODO(), providerID) return instances.InstanceExistsByProviderID(ctx, providerID)
} }
func getNodeAddressesByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) { func getNodeAddressesByProviderIDOrName(ctx context.Context, instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) {
nodeAddresses, err := instances.NodeAddressesByProviderID(context.TODO(), node.Spec.ProviderID) nodeAddresses, err := instances.NodeAddressesByProviderID(ctx, node.Spec.ProviderID)
if err != nil { if err != nil {
providerIDErr := err providerIDErr := err
nodeAddresses, err = instances.NodeAddresses(context.TODO(), types.NodeName(node.Name)) nodeAddresses, err = instances.NodeAddresses(ctx, types.NodeName(node.Name))
if err != nil { if err != nil {
return nil, fmt.Errorf("NodeAddress: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) return nil, fmt.Errorf("NodeAddress: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
} }
@ -412,11 +412,11 @@ func ensureNodeProvidedIPExists(node *v1.Node, nodeAddresses []v1.NodeAddress) (
return nodeIP, nodeIPExists return nodeIP, nodeIPExists
} }
func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) (string, error) { func getInstanceTypeByProviderIDOrName(ctx context.Context, instances cloudprovider.Instances, node *v1.Node) (string, error) {
instanceType, err := instances.InstanceTypeByProviderID(context.TODO(), node.Spec.ProviderID) instanceType, err := instances.InstanceTypeByProviderID(ctx, node.Spec.ProviderID)
if err != nil { if err != nil {
providerIDErr := err providerIDErr := err
instanceType, err = instances.InstanceType(context.TODO(), types.NodeName(node.Name)) instanceType, err = instances.InstanceType(ctx, types.NodeName(node.Name))
if err != nil { if err != nil {
return "", fmt.Errorf("InstanceType: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) return "", fmt.Errorf("InstanceType: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
} }
@ -426,11 +426,11 @@ func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *
// getZoneByProviderIDorName will attempt to get the zone of node using its providerID // getZoneByProviderIDorName will attempt to get the zone of node using its providerID
// then it's name. If both attempts fail, an error is returned // then it's name. If both attempts fail, an error is returned
func getZoneByProviderIDOrName(zones cloudprovider.Zones, node *v1.Node) (cloudprovider.Zone, error) { func getZoneByProviderIDOrName(ctx context.Context, zones cloudprovider.Zones, node *v1.Node) (cloudprovider.Zone, error) {
zone, err := zones.GetZoneByProviderID(context.TODO(), node.Spec.ProviderID) zone, err := zones.GetZoneByProviderID(ctx, node.Spec.ProviderID)
if err != nil { if err != nil {
providerIDErr := err providerIDErr := err
zone, err = zones.GetZoneByNodeName(context.TODO(), types.NodeName(node.Name)) zone, err = zones.GetZoneByNodeName(ctx, types.NodeName(node.Name))
if err != nil { if err != nil {
return cloudprovider.Zone{}, fmt.Errorf("Zone: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) return cloudprovider.Zone{}, fmt.Errorf("Zone: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
} }

View File

@ -17,11 +17,12 @@ limitations under the License.
package cloud package cloud
import ( import (
"context"
"errors" "errors"
"testing" "testing"
"time" "time"
v1 "k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
@ -29,7 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider" "k8s.io/cloud-provider"
fakecloud "k8s.io/cloud-provider/fake" fakecloud "k8s.io/cloud-provider/fake"
"k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/controller/testutil"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@ -147,7 +148,7 @@ func TestEnsureNodeExistsByProviderID(t *testing.T) {
} }
instances, _ := fc.Instances() instances, _ := fc.Instances()
exists, err := ensureNodeExistsByProviderID(instances, tc.node) exists, err := ensureNodeExistsByProviderID(context.TODO(), instances, tc.node)
assert.Equal(t, err, tc.providerIDErr) assert.Equal(t, err, tc.providerIDErr)
assert.EqualValues(t, tc.expectedCalls, fc.Calls, assert.EqualValues(t, tc.expectedCalls, fc.Calls,
@ -229,7 +230,7 @@ func TestNodeInitialized(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
@ -293,7 +294,7 @@ func TestNodeIgnored(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 0, len(fnh.UpdatedNodes), "Node was wrongly updated") assert.Equal(t, 0, len(fnh.UpdatedNodes), "Node was wrongly updated")
} }
@ -366,7 +367,7 @@ func TestGCECondition(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
@ -455,7 +456,7 @@ func TestZoneInitialized(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
@ -545,7 +546,7 @@ func TestNodeAddresses(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
@ -562,7 +563,7 @@ func TestNodeAddresses(t *testing.T) {
}, },
} }
cloudNodeController.UpdateNodeStatus() cloudNodeController.UpdateNodeStatus(context.TODO())
updatedNodes := fnh.GetUpdatedNodesCopy() updatedNodes := fnh.GetUpdatedNodesCopy()
@ -657,13 +658,13 @@ func TestNodeProvidedIPAddresses(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
assert.Equal(t, 3, len(fnh.UpdatedNodes[0].Status.Addresses), "Node status unexpectedly updated") assert.Equal(t, 3, len(fnh.UpdatedNodes[0].Status.Addresses), "Node status unexpectedly updated")
cloudNodeController.UpdateNodeStatus() cloudNodeController.UpdateNodeStatus(context.TODO())
updatedNodes := fnh.GetUpdatedNodesCopy() updatedNodes := fnh.GetUpdatedNodesCopy()
@ -864,7 +865,7 @@ func TestNodeAddressesNotUpdate(t *testing.T) {
cloud: fakeCloud, cloud: fakeCloud,
} }
cloudNodeController.updateNodeAddress(fnh.Existing[0], fakeCloud) cloudNodeController.updateNodeAddress(context.TODO(), fnh.Existing[0], fakeCloud)
if len(fnh.UpdatedNodes) != 0 { if len(fnh.UpdatedNodes) != 0 {
t.Errorf("Node was not correctly updated, the updated len(nodes) got: %v, wanted=0", len(fnh.UpdatedNodes)) t.Errorf("Node was not correctly updated, the updated len(nodes) got: %v, wanted=0", len(fnh.UpdatedNodes))
@ -946,7 +947,7 @@ func TestNodeProviderID(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
@ -1029,7 +1030,7 @@ func TestNodeProviderIDAlreadySet(t *testing.T) {
} }
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0]) cloudNodeController.AddCloudNode(context.TODO(), fnh.Existing[0])
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")

View File

@ -166,7 +166,7 @@ func (c *CloudNodeLifecycleController) MonitorNodes() {
// At this point the node has NotReady status, we need to check if the node has been removed // At this point the node has NotReady status, we need to check if the node has been removed
// from the cloud provider. If node cannot be found in cloudprovider, then delete the node // from the cloud provider. If node cannot be found in cloudprovider, then delete the node
exists, err := ensureNodeExistsByProviderID(instances, node) exists, err := ensureNodeExistsByProviderID(context.TODO(), instances, node)
if err != nil { if err != nil {
klog.Errorf("error checking if node %s exists: %v", node.Name, err) klog.Errorf("error checking if node %s exists: %v", node.Name, err)
continue continue