mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Cloud node controller: Only call once into cloud provider
This commit is contained in:
parent
623b697886
commit
18fa7bdb6e
@ -22,7 +22,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -287,6 +287,10 @@ func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.
|
||||
}
|
||||
}
|
||||
|
||||
// nodeModifier is used to carry changes to node objects across multiple attempts to update them
|
||||
// in a retry-if-conflict loop.
|
||||
type nodeModifier func(*v1.Node)
|
||||
|
||||
func (cnc *CloudNodeController) UpdateCloudNode(ctx context.Context, _, newObj interface{}) {
|
||||
node, ok := newObj.(*v1.Node)
|
||||
if !ok {
|
||||
@ -318,6 +322,7 @@ func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{
|
||||
|
||||
// This processes nodes that were added into the cluster, and cloud initialize them if appropriate
|
||||
func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) {
|
||||
klog.Infof("Initializing node %s with cloud provider", node.Name)
|
||||
|
||||
instances, ok := cnc.cloud.Instances()
|
||||
if !ok {
|
||||
@ -340,78 +345,51 @@ func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Nod
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
|
||||
curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to get node %s: %v", node.Name, err))
|
||||
return
|
||||
}
|
||||
|
||||
cloudTaint := getCloudTaint(curNode.Spec.Taints)
|
||||
if cloudTaint == nil {
|
||||
// Node object received from event had the cloud taint but was outdated,
|
||||
// the node has actually already been initialized.
|
||||
return
|
||||
}
|
||||
|
||||
nodeModifiers, err := cnc.getNodeModifiersFromCloudProvider(ctx, curNode, instances)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to initialize node %s at cloudprovider: %v", node.Name, err))
|
||||
return
|
||||
}
|
||||
|
||||
nodeModifiers = append(nodeModifiers, func(n *v1.Node) {
|
||||
n.Spec.Taints = excludeCloudTaint(n.Spec.Taints)
|
||||
})
|
||||
|
||||
err = clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
|
||||
curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cloudTaint := getCloudTaint(curNode.Spec.Taints)
|
||||
if cloudTaint == nil {
|
||||
// Node object received from event had the cloud taint but was outdated,
|
||||
// the node has actually already been initialized.
|
||||
return nil
|
||||
for _, modify := range nodeModifiers {
|
||||
modify(curNode)
|
||||
}
|
||||
|
||||
if curNode.Spec.ProviderID == "" {
|
||||
providerID, err := cloudprovider.GetInstanceProviderID(ctx, cnc.cloud, types.NodeName(curNode.Name))
|
||||
if err == nil {
|
||||
curNode.Spec.ProviderID = providerID
|
||||
} else {
|
||||
// we should attempt to set providerID on curNode, but
|
||||
// we can continue if we fail since we will attempt to set
|
||||
// node addresses given the node name in getNodeAddressesByProviderIDOrName
|
||||
klog.Errorf("failed to set node provider id: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, curNode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If user provided an IP address, ensure that IP address is found
|
||||
// in the cloud provider before removing the taint on the node
|
||||
if nodeIP, ok := ensureNodeProvidedIPExists(curNode, nodeAddresses); ok {
|
||||
if nodeIP == nil {
|
||||
return errors.New("failed to find kubelet node IP from cloud provider")
|
||||
}
|
||||
}
|
||||
|
||||
if instanceType, err := getInstanceTypeByProviderIDOrName(ctx, instances, curNode); err != nil {
|
||||
return err
|
||||
} else if instanceType != "" {
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType)
|
||||
curNode.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceTypeStable, instanceType)
|
||||
curNode.ObjectMeta.Labels[v1.LabelInstanceTypeStable] = instanceType
|
||||
}
|
||||
|
||||
if zones, ok := cnc.cloud.Zones(); ok {
|
||||
zone, err := getZoneByProviderIDOrName(ctx, zones, curNode)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get zone from cloud provider: %v", err)
|
||||
}
|
||||
if zone.FailureDomain != "" {
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain)
|
||||
curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomainStable, zone.FailureDomain)
|
||||
curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomainStable] = zone.FailureDomain
|
||||
}
|
||||
if zone.Region != "" {
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region)
|
||||
curNode.ObjectMeta.Labels[v1.LabelZoneRegion] = zone.Region
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegionStable, zone.Region)
|
||||
curNode.ObjectMeta.Labels[v1.LabelZoneRegionStable] = zone.Region
|
||||
}
|
||||
}
|
||||
|
||||
curNode.Spec.Taints = excludeCloudTaint(curNode.Spec.Taints)
|
||||
|
||||
_, err = cnc.kubeClient.CoreV1().Nodes().Update(curNode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses
|
||||
// So that users do not see any significant delay in IP addresses being filled into the node
|
||||
cnc.updateNodeAddress(ctx, curNode, instances)
|
||||
@ -425,6 +403,87 @@ func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Nod
|
||||
}
|
||||
}
|
||||
|
||||
// getNodeModifiersFromCloudProvider returns a slice of nodeModifiers that update
|
||||
// a node object with provider-specific information.
|
||||
// All of the returned functions are idempotent, because they are used in a retry-if-conflict
|
||||
// loop, meaning they could get called multiple times.
|
||||
func (cnc *CloudNodeController) getNodeModifiersFromCloudProvider(ctx context.Context, node *v1.Node, instances cloudprovider.Instances) ([]nodeModifier, error) {
|
||||
var nodeModifiers []nodeModifier
|
||||
|
||||
if node.Spec.ProviderID == "" {
|
||||
providerID, err := cloudprovider.GetInstanceProviderID(ctx, cnc.cloud, types.NodeName(node.Name))
|
||||
if err == nil {
|
||||
nodeModifiers = append(nodeModifiers, func(n *v1.Node) {
|
||||
if n.Spec.ProviderID == "" {
|
||||
n.Spec.ProviderID = providerID
|
||||
}
|
||||
})
|
||||
} else {
|
||||
// we should attempt to set providerID on node, but
|
||||
// we can continue if we fail since we will attempt to set
|
||||
// node addresses given the node name in getNodeAddressesByProviderIDOrName
|
||||
klog.Errorf("failed to set node provider id: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
nodeAddresses, err := getNodeAddressesByProviderIDOrName(ctx, instances, node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If user provided an IP address, ensure that IP address is found
|
||||
// in the cloud provider before removing the taint on the node
|
||||
if nodeIP, ok := ensureNodeProvidedIPExists(node, nodeAddresses); ok {
|
||||
if nodeIP == nil {
|
||||
return nil, errors.New("failed to find kubelet node IP from cloud provider")
|
||||
}
|
||||
}
|
||||
|
||||
if instanceType, err := getInstanceTypeByProviderIDOrName(ctx, instances, node); err != nil {
|
||||
return nil, err
|
||||
} else if instanceType != "" {
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType)
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceTypeStable, instanceType)
|
||||
nodeModifiers = append(nodeModifiers, func(n *v1.Node) {
|
||||
if n.Labels == nil {
|
||||
n.Labels = map[string]string{}
|
||||
}
|
||||
n.Labels[v1.LabelInstanceType] = instanceType
|
||||
n.Labels[v1.LabelInstanceTypeStable] = instanceType
|
||||
})
|
||||
}
|
||||
|
||||
if zones, ok := cnc.cloud.Zones(); ok {
|
||||
zone, err := getZoneByProviderIDOrName(ctx, zones, node)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
|
||||
}
|
||||
if zone.FailureDomain != "" {
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain)
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomainStable, zone.FailureDomain)
|
||||
nodeModifiers = append(nodeModifiers, func(n *v1.Node) {
|
||||
if n.Labels == nil {
|
||||
n.Labels = map[string]string{}
|
||||
}
|
||||
n.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain
|
||||
n.Labels[v1.LabelZoneFailureDomainStable] = zone.FailureDomain
|
||||
})
|
||||
}
|
||||
if zone.Region != "" {
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region)
|
||||
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegionStable, zone.Region)
|
||||
nodeModifiers = append(nodeModifiers, func(n *v1.Node) {
|
||||
if n.Labels == nil {
|
||||
n.Labels = map[string]string{}
|
||||
}
|
||||
n.Labels[v1.LabelZoneRegion] = zone.Region
|
||||
n.Labels[v1.LabelZoneRegionStable] = zone.Region
|
||||
})
|
||||
}
|
||||
}
|
||||
return nodeModifiers, nil
|
||||
}
|
||||
|
||||
func getCloudTaint(taints []v1.Taint) *v1.Taint {
|
||||
for _, taint := range taints {
|
||||
if taint.Key == schedulerapi.TaintExternalCloudProvider {
|
||||
|
Loading…
Reference in New Issue
Block a user