From 18fa7bdb6ea2c26e13175f7a3572651c3318c99d Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Thu, 28 Nov 2019 17:28:33 +0100 Subject: [PATCH] Cloud node controller: Only call once into cloud provider --- pkg/controller/cloud/node_controller.go | 181 ++++++++++++++++-------- 1 file changed, 120 insertions(+), 61 deletions(-) diff --git a/pkg/controller/cloud/node_controller.go b/pkg/controller/cloud/node_controller.go index f69ce0b5a24..22a2cc5cf81 100644 --- a/pkg/controller/cloud/node_controller.go +++ b/pkg/controller/cloud/node_controller.go @@ -22,7 +22,7 @@ import ( "fmt" "time" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -287,6 +287,10 @@ func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1. } } +// nodeModifier is used to carry changes to node objects across multiple attempts to update them +// in a retry-if-conflict loop. +type nodeModifier func(*v1.Node) + func (cnc *CloudNodeController) UpdateCloudNode(ctx context.Context, _, newObj interface{}) { node, ok := newObj.(*v1.Node) if !ok { @@ -318,6 +322,7 @@ func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{ // This processes nodes that were added into the cluster, and cloud initialize them if appropriate func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) { + klog.Infof("Initializing node %s with cloud provider", node.Name) instances, ok := cnc.cloud.Instances() if !ok { @@ -340,78 +345,51 @@ func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Nod return err } } + return nil + }) + if err != nil { + utilruntime.HandleError(err) + return + } + curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to get node %s: %v", node.Name, err)) + return + } + + cloudTaint := getCloudTaint(curNode.Spec.Taints) + if cloudTaint == nil { + // Node object received from event had the cloud taint but was outdated, + // the node has actually already been initialized. + return + } + + nodeModifiers, err := cnc.getNodeModifiersFromCloudProvider(ctx, curNode, instances) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to initialize node %s at cloudprovider: %v", node.Name, err)) + return + } + + nodeModifiers = append(nodeModifiers, func(n *v1.Node) { + n.Spec.Taints = excludeCloudTaint(n.Spec.Taints) + }) + + err = clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error { curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) if err != nil { return err } - cloudTaint := getCloudTaint(curNode.Spec.Taints) - if cloudTaint == nil { - // Node object received from event had the cloud taint but was outdated, - // the node has actually already been initialized. - return nil + for _, modify := range nodeModifiers { + modify(curNode) } - if curNode.Spec.ProviderID == "" { - providerID, err := cloudprovider.GetInstanceProviderID(ctx, cnc.cloud, types.NodeName(curNode.Name)) - if err == nil { - curNode.Spec.ProviderID = providerID - } else { - // we should attempt to set providerID on curNode, but - // we can continue if we fail since we will attempt to set - // node addresses given the node name in getNodeAddressesByProviderIDOrName - klog.Errorf("failed to set node provider id: %v", err) - } - } - - nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, curNode) - if err != nil { - return err - } - - // If user provided an IP address, ensure that IP address is found - // in the cloud provider before removing the taint on the node - if nodeIP, ok := ensureNodeProvidedIPExists(curNode, nodeAddresses); ok { - if nodeIP == nil { - return errors.New("failed to find kubelet node IP from cloud provider") - } - } - - if instanceType, err := getInstanceTypeByProviderIDOrName(ctx, instances, curNode); err != nil { - return err - } else if instanceType != "" { - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType) - curNode.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceTypeStable, instanceType) - curNode.ObjectMeta.Labels[v1.LabelInstanceTypeStable] = instanceType - } - - if zones, ok := cnc.cloud.Zones(); ok { - zone, err := getZoneByProviderIDOrName(ctx, zones, curNode) - if err != nil { - return fmt.Errorf("failed to get zone from cloud provider: %v", err) - } - if zone.FailureDomain != "" { - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain) - curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomainStable, zone.FailureDomain) - curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomainStable] = zone.FailureDomain - } - if zone.Region != "" { - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region) - curNode.ObjectMeta.Labels[v1.LabelZoneRegion] = zone.Region - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegionStable, zone.Region) - curNode.ObjectMeta.Labels[v1.LabelZoneRegionStable] = zone.Region - } - } - - curNode.Spec.Taints = excludeCloudTaint(curNode.Spec.Taints) - _, err = cnc.kubeClient.CoreV1().Nodes().Update(curNode) if err != nil { return err } + // After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses // So that users do not see any significant delay in IP addresses being filled into the node cnc.updateNodeAddress(ctx, curNode, instances) @@ -425,6 +403,87 @@ func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Nod } } +// getNodeModifiersFromCloudProvider returns a slice of nodeModifiers that update +// a node object with provider-specific information. +// All of the returned functions are idempotent, because they are used in a retry-if-conflict +// loop, meaning they could get called multiple times. +func (cnc *CloudNodeController) getNodeModifiersFromCloudProvider(ctx context.Context, node *v1.Node, instances cloudprovider.Instances) ([]nodeModifier, error) { + var nodeModifiers []nodeModifier + + if node.Spec.ProviderID == "" { + providerID, err := cloudprovider.GetInstanceProviderID(ctx, cnc.cloud, types.NodeName(node.Name)) + if err == nil { + nodeModifiers = append(nodeModifiers, func(n *v1.Node) { + if n.Spec.ProviderID == "" { + n.Spec.ProviderID = providerID + } + }) + } else { + // we should attempt to set providerID on node, but + // we can continue if we fail since we will attempt to set + // node addresses given the node name in getNodeAddressesByProviderIDOrName + klog.Errorf("failed to set node provider id: %v", err) + } + } + + nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, node) + if err != nil { + return nil, err + } + + // If user provided an IP address, ensure that IP address is found + // in the cloud provider before removing the taint on the node + if nodeIP, ok := ensureNodeProvidedIPExists(node, nodeAddresses); ok { + if nodeIP == nil { + return nil, errors.New("failed to find kubelet node IP from cloud provider") + } + } + + if instanceType, err := getInstanceTypeByProviderIDOrName(ctx, instances, node); err != nil { + return nil, err + } else if instanceType != "" { + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType) + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceTypeStable, instanceType) + nodeModifiers = append(nodeModifiers, func(n *v1.Node) { + if n.Labels == nil { + n.Labels = map[string]string{} + } + n.Labels[v1.LabelInstanceType] = instanceType + n.Labels[v1.LabelInstanceTypeStable] = instanceType + }) + } + + if zones, ok := cnc.cloud.Zones(); ok { + zone, err := getZoneByProviderIDOrName(ctx, zones, node) + if err != nil { + return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err) + } + if zone.FailureDomain != "" { + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain) + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomainStable, zone.FailureDomain) + nodeModifiers = append(nodeModifiers, func(n *v1.Node) { + if n.Labels == nil { + n.Labels = map[string]string{} + } + n.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain + n.Labels[v1.LabelZoneFailureDomainStable] = zone.FailureDomain + }) + } + if zone.Region != "" { + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region) + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegionStable, zone.Region) + nodeModifiers = append(nodeModifiers, func(n *v1.Node) { + if n.Labels == nil { + n.Labels = map[string]string{} + } + n.Labels[v1.LabelZoneRegion] = zone.Region + n.Labels[v1.LabelZoneRegionStable] = zone.Region + }) + } + } + return nodeModifiers, nil +} + func getCloudTaint(taints []v1.Taint) *v1.Taint { for _, taint := range taints { if taint.Key == schedulerapi.TaintExternalCloudProvider {