Switch to UpdateVMs() for updating VMSS backend address pool

This commit is contained in:
Pengfei Ni 2020-03-01 04:46:51 +00:00
parent 2af26dca97
commit 0f4cfe58d8
4 changed files with 185 additions and 118 deletions

View File

@ -722,23 +722,23 @@ func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName stri
// EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is // EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is
// participating in the specified LoadBalancer Backend Pool. // participating in the specified LoadBalancer Backend Pool.
func (as *availabilitySet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error { func (as *availabilitySet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) (string, string, string, *compute.VirtualMachineScaleSetVM, error) {
vmName := mapNodeNameToVMName(nodeName) vmName := mapNodeNameToVMName(nodeName)
serviceName := getServiceName(service) serviceName := getServiceName(service)
nic, err := as.getPrimaryInterfaceWithVMSet(vmName, vmSetName) nic, err := as.getPrimaryInterfaceWithVMSet(vmName, vmSetName)
if err != nil { if err != nil {
if err == errNotInVMSet { if err == errNotInVMSet {
klog.V(3).Infof("EnsureHostInPool skips node %s because it is not in the vmSet %s", nodeName, vmSetName) klog.V(3).Infof("EnsureHostInPool skips node %s because it is not in the vmSet %s", nodeName, vmSetName)
return nil return "", "", "", nil, nil
} }
klog.Errorf("error: az.EnsureHostInPool(%s), az.vmSet.GetPrimaryInterface.Get(%s, %s), err=%v", nodeName, vmName, vmSetName, err) klog.Errorf("error: az.EnsureHostInPool(%s), az.vmSet.GetPrimaryInterface.Get(%s, %s), err=%v", nodeName, vmName, vmSetName, err)
return err return "", "", "", nil, err
} }
if nic.ProvisioningState != nil && *nic.ProvisioningState == nicFailedState { if nic.ProvisioningState != nil && *nic.ProvisioningState == nicFailedState {
klog.Warningf("EnsureHostInPool skips node %s because its primary nic %s is in Failed state", nodeName, *nic.Name) klog.Warningf("EnsureHostInPool skips node %s because its primary nic %s is in Failed state", nodeName, *nic.Name)
return nil return "", "", "", nil, nil
} }
var primaryIPConfig *network.InterfaceIPConfiguration var primaryIPConfig *network.InterfaceIPConfiguration
@ -746,12 +746,12 @@ func (as *availabilitySet) EnsureHostInPool(service *v1.Service, nodeName types.
if !as.Cloud.ipv6DualStackEnabled && !ipv6 { if !as.Cloud.ipv6DualStackEnabled && !ipv6 {
primaryIPConfig, err = getPrimaryIPConfig(nic) primaryIPConfig, err = getPrimaryIPConfig(nic)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
} else { } else {
primaryIPConfig, err = getIPConfigByIPFamily(nic, ipv6) primaryIPConfig, err = getIPConfigByIPFamily(nic, ipv6)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
} }
@ -780,11 +780,11 @@ func (as *availabilitySet) EnsureHostInPool(service *v1.Service, nodeName types.
} }
isSameLB, oldLBName, err := isBackendPoolOnSameLB(backendPoolID, newBackendPoolsIDs) isSameLB, oldLBName, err := isBackendPoolOnSameLB(backendPoolID, newBackendPoolsIDs)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
if !isSameLB { if !isSameLB {
klog.V(4).Infof("Node %q has already been added to LB %q, omit adding it to a new one", nodeName, oldLBName) klog.V(4).Infof("Node %q has already been added to LB %q, omit adding it to a new one", nodeName, oldLBName)
return nil return "", "", "", nil, nil
} }
} }
@ -799,10 +799,10 @@ func (as *availabilitySet) EnsureHostInPool(service *v1.Service, nodeName types.
klog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName) klog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName)
err := as.CreateOrUpdateInterface(service, nic) err := as.CreateOrUpdateInterface(service, nic)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
} }
return nil return "", "", "", nil, nil
} }
// EnsureHostsInPool ensures the given Node's primary IP configurations are // EnsureHostsInPool ensures the given Node's primary IP configurations are
@ -822,7 +822,7 @@ func (as *availabilitySet) EnsureHostsInPool(service *v1.Service, nodes []*v1.No
} }
f := func() error { f := func() error {
err := as.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal) _, _, _, _, err := as.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal)
if err != nil { if err != nil {
return fmt.Errorf("ensure(%s): backendPoolID(%s) - failed to ensure host in pool: %q", getServiceName(service), backendPoolID, err) return fmt.Errorf("ensure(%s): backendPoolID(%s) - failed to ensure host in pool: %q", getServiceName(service), backendPoolID, err)
} }

View File

@ -62,7 +62,7 @@ type VMSet interface {
EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error
// EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is // EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is
// participating in the specified LoadBalancer Backend Pool. // participating in the specified LoadBalancer Backend Pool.
EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) (string, string, string, *compute.VirtualMachineScaleSetVM, error)
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes. // EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes.
EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error

View File

@ -63,6 +63,12 @@ const (
vmssVMInstanceUpdateDelay = 3 * time.Second vmssVMInstanceUpdateDelay = 3 * time.Second
) )
// vmssMetaInfo contains the metadata for a VMSS.
type vmssMetaInfo struct {
vmssName string
resourceGroup string
}
// scaleSet implements VMSet interface for Azure scale set. // scaleSet implements VMSet interface for Azure scale set.
type scaleSet struct { type scaleSet struct {
*Cloud *Cloud
@ -799,13 +805,13 @@ func (ss *scaleSet) getConfigForScaleSetByIPFamily(config *compute.VirtualMachin
} }
// EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is // EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is
// participating in the specified LoadBalancer Backend Pool. // participating in the specified LoadBalancer Backend Pool, which returns (resourceGroup, vmssName, instanceID, vmssVM, error).
func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error { func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) (string, string, string, *compute.VirtualMachineScaleSetVM, error) {
klog.V(3).Infof("ensuring node %q of scaleset %q in LB backendpool %q", nodeName, vmSetName, backendPoolID) klog.V(3).Infof("ensuring node %q of scaleset %q in LB backendpool %q", nodeName, vmSetName, backendPoolID)
vmName := mapNodeNameToVMName(nodeName) vmName := mapNodeNameToVMName(nodeName)
ssName, instanceID, vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault) ssName, instanceID, vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
// Check scale set name: // Check scale set name:
@ -814,19 +820,19 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
// don't check vmSet for it. // don't check vmSet for it.
if vmSetName != "" && !ss.useStandardLoadBalancer() && !strings.EqualFold(vmSetName, ssName) { if vmSetName != "" && !ss.useStandardLoadBalancer() && !strings.EqualFold(vmSetName, ssName) {
klog.V(3).Infof("EnsureHostInPool skips node %s because it is not in the scaleSet %s", vmName, vmSetName) klog.V(3).Infof("EnsureHostInPool skips node %s because it is not in the scaleSet %s", vmName, vmSetName)
return nil return "", "", "", nil, nil
} }
// Find primary network interface configuration. // Find primary network interface configuration.
if vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations == nil { if vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations == nil {
klog.V(4).Infof("EnsureHostInPool: cannot obtain the primary network interface configuration, of vm %s, probably because the vm's being deleted", vmName) klog.V(4).Infof("EnsureHostInPool: cannot obtain the primary network interface configuration, of vm %s, probably because the vm's being deleted", vmName)
return nil return "", "", "", nil, nil
} }
networkInterfaceConfigurations := *vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations networkInterfaceConfigurations := *vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations
primaryNetworkInterfaceConfiguration, err := ss.getPrimaryNetworkInterfaceConfiguration(networkInterfaceConfigurations, vmName) primaryNetworkInterfaceConfiguration, err := ss.getPrimaryNetworkInterfaceConfiguration(networkInterfaceConfigurations, vmName)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
var primaryIPConfiguration *compute.VirtualMachineScaleSetIPConfiguration var primaryIPConfiguration *compute.VirtualMachineScaleSetIPConfiguration
@ -836,12 +842,12 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
// Find primary IP configuration. // Find primary IP configuration.
primaryIPConfiguration, err = getPrimaryIPConfigFromVMSSNetworkConfig(primaryNetworkInterfaceConfiguration) primaryIPConfiguration, err = getPrimaryIPConfigFromVMSSNetworkConfig(primaryNetworkInterfaceConfiguration)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
} else { } else {
primaryIPConfiguration, err = ss.getConfigForScaleSetByIPFamily(primaryNetworkInterfaceConfiguration, vmName, ipv6) primaryIPConfiguration, err = ss.getConfigForScaleSetByIPFamily(primaryNetworkInterfaceConfiguration, vmName, ipv6)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
} }
@ -860,7 +866,7 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
// The backendPoolID has already been found from existing LoadBalancerBackendAddressPools. // The backendPoolID has already been found from existing LoadBalancerBackendAddressPools.
if foundPool { if foundPool {
return nil return "", "", "", nil, nil
} }
if ss.useStandardLoadBalancer() && len(newBackendPools) > 0 { if ss.useStandardLoadBalancer() && len(newBackendPools) > 0 {
@ -876,11 +882,11 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
} }
isSameLB, oldLBName, err := isBackendPoolOnSameLB(backendPoolID, newBackendPoolsIDs) isSameLB, oldLBName, err := isBackendPoolOnSameLB(backendPoolID, newBackendPoolsIDs)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
if !isSameLB { if !isSameLB {
klog.V(4).Infof("Node %q has already been added to LB %q, omit adding it to a new one", nodeName, oldLBName) klog.V(4).Infof("Node %q has already been added to LB %q, omit adding it to a new one", nodeName, oldLBName)
return nil return "", "", "", nil, nil
} }
} }
@ -890,7 +896,7 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
ID: to.StringPtr(backendPoolID), ID: to.StringPtr(backendPoolID),
}) })
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
newVM := compute.VirtualMachineScaleSetVM{ newVM := &compute.VirtualMachineScaleSetVM{
Sku: vm.Sku, Sku: vm.Sku,
Location: vm.Location, Location: vm.Location,
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{ VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
@ -904,23 +910,10 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
// Get the node resource group. // Get the node resource group.
nodeResourceGroup, err := ss.GetNodeResourceGroup(vmName) nodeResourceGroup, err := ss.GetNodeResourceGroup(vmName)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
// Invalidate the cache since right after update return nodeResourceGroup, ssName, instanceID, newVM, nil
defer ss.deleteCacheForNode(vmName)
// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("EnsureHostInPool begins to update vmssVM(%s) with new backendPoolID %s", vmName, backendPoolID)
rerr := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if rerr != nil {
klog.Errorf("EnsureHostInPool VirtualMachineScaleSetVMsClient.Update(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err)
return rerr.Error()
}
return nil
} }
func getVmssAndResourceGroupNameByVMProviderID(providerID string) (string, string, error) { func getVmssAndResourceGroupNameByVMProviderID(providerID string) (string, string, error) {
@ -1050,6 +1043,8 @@ func (ss *scaleSet) ensureVMSSInPool(service *v1.Service, nodes []*v1.Node, back
// participating in the specified LoadBalancer Backend Pool. // participating in the specified LoadBalancer Backend Pool.
func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error { func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
hostUpdates := make([]func() error, 0, len(nodes)) hostUpdates := make([]func() error, 0, len(nodes))
nodeUpdates := make(map[vmssMetaInfo]map[string]compute.VirtualMachineScaleSetVM)
errors := make([]error, 0)
for _, node := range nodes { for _, node := range nodes {
localNodeName := node.Name localNodeName := node.Name
@ -1063,39 +1058,80 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
continue continue
} }
f := func() error {
// Check whether the node is VMAS virtual machine. // Check whether the node is VMAS virtual machine.
managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName, azcache.CacheReadTypeDefault) managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName, azcache.CacheReadTypeDefault)
if err != nil { if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet(%s): %v", localNodeName, err) klog.Errorf("Failed to check isNodeManagedByAvailabilitySet(%s): %v", localNodeName, err)
return err errors = append(errors, err)
continue
} }
if managedByAS { if managedByAS {
// VMAS nodes should also be added to the SLB backends. // VMAS nodes should also be added to the SLB backends.
if ss.useStandardLoadBalancer() { if ss.useStandardLoadBalancer() {
return ss.availabilitySet.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal) hostUpdates = append(hostUpdates, func() error {
_, _, _, _, err := ss.availabilitySet.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal)
return err
})
continue
} }
klog.V(3).Infof("EnsureHostsInPool skips node %s because VMAS nodes couldn't be added to basic LB with VMSS backends", localNodeName) klog.V(3).Infof("EnsureHostsInPool skips node %s because VMAS nodes couldn't be added to basic LB with VMSS backends", localNodeName)
return nil continue
} }
err = ss.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal) nodeResourceGroup, nodeVMSS, nodeInstanceID, nodeVMSSVM, err := ss.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal)
if err != nil { if err != nil {
return fmt.Errorf("EnsureHostInPool(%s): backendPoolID(%s) - failed to ensure host in pool: %q", getServiceName(service), backendPoolID, err) klog.Errorf("EnsureHostInPool(%s): backendPoolID(%s) - failed to ensure host in pool: %q", getServiceName(service), backendPoolID, err)
} errors = append(errors, err)
return nil continue
}
hostUpdates = append(hostUpdates, f)
} }
errs := aggregateGoroutinesWithDelay(vmssVMInstanceUpdateDelay, hostUpdates...) // No need to update if nodeVMSSVM is nil.
if nodeVMSSVM == nil {
continue
}
nodeVMSSMetaInfo := vmssMetaInfo{vmssName: nodeVMSS, resourceGroup: nodeResourceGroup}
if v, ok := nodeUpdates[nodeVMSSMetaInfo]; ok {
v[nodeInstanceID] = *nodeVMSSVM
} else {
nodeUpdates[nodeVMSSMetaInfo] = map[string]compute.VirtualMachineScaleSetVM{
nodeInstanceID: *nodeVMSSVM,
}
}
// Invalidate the cache since the VMSS VM would be updated.
defer ss.deleteCacheForNode(localNodeName)
}
// Update VMs with best effort that have already been added to nodeUpdates.
for meta, update := range nodeUpdates {
hostUpdates = append(hostUpdates, func() error {
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("EnsureHostInPool begins to UpdateVMs for VMSS(%s, %s) with new backendPoolID %s", meta.resourceGroup, meta.vmssName, backendPoolID)
rerr := ss.VirtualMachineScaleSetVMsClient.UpdateVMs(ctx, meta.resourceGroup, meta.vmssName, update, "network_update")
if rerr != nil {
klog.Errorf("EnsureHostInPool UpdateVMs for VMSS(%s, %s) failed with error %v", meta.resourceGroup, meta.vmssName, rerr.Error())
return rerr.Error()
}
return nil
})
}
errs := utilerrors.AggregateGoroutines(hostUpdates...)
if errs != nil { if errs != nil {
return utilerrors.Flatten(errs) return utilerrors.Flatten(errs)
} }
// we need to add the LB backend updates back to VMSS model, see issue kubernetes/kubernetes#80365 for detailed information // Fail if there're other errors.
if len(errors) > 0 {
return utilerrors.Flatten(utilerrors.NewAggregate(errors))
}
// Ensure the backendPoolID is also added on VMSS itself.
// Refer to issue kubernetes/kubernetes#80365 for detailed information
err := ss.ensureVMSSInPool(service, nodes, backendPoolID, vmSetName) err := ss.ensureVMSSInPool(service, nodes, backendPoolID, vmSetName)
if err != nil { if err != nil {
return err return err
@ -1104,31 +1140,32 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
return nil return nil
} }
// ensureBackendPoolDeletedFromNode ensures the loadBalancer backendAddressPools deleted from the specified node. // ensureBackendPoolDeletedFromNode ensures the loadBalancer backendAddressPools deleted
func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeName, backendPoolID string) error { // from the specified node, which returns (resourceGroup, vmssName, instanceID, vmssVM, error).
func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeName, backendPoolID string) (string, string, string, *compute.VirtualMachineScaleSetVM, error) {
ssName, instanceID, vm, err := ss.getVmssVM(nodeName, azcache.CacheReadTypeDefault) ssName, instanceID, vm, err := ss.getVmssVM(nodeName, azcache.CacheReadTypeDefault)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
// Find primary network interface configuration. // Find primary network interface configuration.
if vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations == nil { if vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations == nil {
klog.V(4).Infof("EnsureHostInPool: cannot obtain the primary network interface configuration, of vm %s, probably because the vm's being deleted", nodeName) klog.V(4).Infof("EnsureHostInPool: cannot obtain the primary network interface configuration, of vm %s, probably because the vm's being deleted", nodeName)
return nil return "", "", "", nil, nil
} }
networkInterfaceConfigurations := *vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations networkInterfaceConfigurations := *vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations
primaryNetworkInterfaceConfiguration, err := ss.getPrimaryNetworkInterfaceConfiguration(networkInterfaceConfigurations, nodeName) primaryNetworkInterfaceConfiguration, err := ss.getPrimaryNetworkInterfaceConfiguration(networkInterfaceConfigurations, nodeName)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
// Find primary IP configuration. // Find primary IP configuration.
primaryIPConfiguration, err := getPrimaryIPConfigFromVMSSNetworkConfig(primaryNetworkInterfaceConfiguration) primaryIPConfiguration, err := getPrimaryIPConfigFromVMSSNetworkConfig(primaryNetworkInterfaceConfiguration)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
if primaryIPConfiguration.LoadBalancerBackendAddressPools == nil || len(*primaryIPConfiguration.LoadBalancerBackendAddressPools) == 0 { if primaryIPConfiguration.LoadBalancerBackendAddressPools == nil || len(*primaryIPConfiguration.LoadBalancerBackendAddressPools) == 0 {
return nil return "", "", "", nil, nil
} }
// Construct new loadBalancerBackendAddressPools and remove backendAddressPools from primary IP configuration. // Construct new loadBalancerBackendAddressPools and remove backendAddressPools from primary IP configuration.
@ -1146,12 +1183,12 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeNa
// Pool not found, assume it has been already removed. // Pool not found, assume it has been already removed.
if !foundPool { if !foundPool {
return nil return "", "", "", nil, nil
} }
// Compose a new vmssVM with added backendPoolID. // Compose a new vmssVM with added backendPoolID.
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
newVM := compute.VirtualMachineScaleSetVM{ newVM := &compute.VirtualMachineScaleSetVM{
Sku: vm.Sku, Sku: vm.Sku,
Location: vm.Location, Location: vm.Location,
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{ VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
@ -1165,23 +1202,10 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeNa
// Get the node resource group. // Get the node resource group.
nodeResourceGroup, err := ss.GetNodeResourceGroup(nodeName) nodeResourceGroup, err := ss.GetNodeResourceGroup(nodeName)
if err != nil { if err != nil {
return err return "", "", "", nil, err
} }
// Invalidate the cache since right after update return nodeResourceGroup, ssName, instanceID, newVM, nil
defer ss.deleteCacheForNode(nodeName)
// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("ensureBackendPoolDeletedFromNode begins to update vmssVM(%s) with backendPoolID %s", nodeName, backendPoolID)
rerr := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if rerr != nil {
klog.Errorf("ensureBackendPoolDeletedFromNode failed to update vmssVM(%s) with backendPoolID %s: %v", nodeName, backendPoolID, err)
} else {
klog.V(2).Infof("ensureBackendPoolDeletedFromNode update vmssVM(%s) with backendPoolID %s succeeded", nodeName, backendPoolID)
}
return rerr.Error()
} }
// getNodeNameByIPConfigurationID gets the node name by IP configuration ID. // getNodeNameByIPConfigurationID gets the node name by IP configuration ID.
@ -1331,43 +1355,82 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID,
} }
hostUpdates := make([]func() error, 0, len(ipConfigurationIDs)) hostUpdates := make([]func() error, 0, len(ipConfigurationIDs))
nodeUpdates := make(map[vmssMetaInfo]map[string]compute.VirtualMachineScaleSetVM)
errors := make([]error, 0)
for i := range ipConfigurationIDs { for i := range ipConfigurationIDs {
ipConfigurationID := ipConfigurationIDs[i] ipConfigurationID := ipConfigurationIDs[i]
f := func() error {
var scaleSetName string var scaleSetName string
var err error var err error
if scaleSetName, err = extractScaleSetNameByProviderID(ipConfigurationID); err == nil { if scaleSetName, err = extractScaleSetNameByProviderID(ipConfigurationID); err == nil {
// Only remove nodes belonging to specified vmSet to basic LB backends. // Only remove nodes belonging to specified vmSet to basic LB backends.
if !ss.useStandardLoadBalancer() && !strings.EqualFold(scaleSetName, vmSetName) { if !ss.useStandardLoadBalancer() && !strings.EqualFold(scaleSetName, vmSetName) {
return nil continue
} }
} }
nodeName, err := ss.getNodeNameByIPConfigurationID(ipConfigurationID) nodeName, err := ss.getNodeNameByIPConfigurationID(ipConfigurationID)
if err != nil { if err != nil {
if err == ErrorNotVmssInstance { // Do nothing for the VMAS nodes. if err == ErrorNotVmssInstance { // Do nothing for the VMAS nodes.
return nil continue
} }
klog.Errorf("Failed to getNodeNameByIPConfigurationID(%s): %v", ipConfigurationID, err) klog.Errorf("Failed to getNodeNameByIPConfigurationID(%s): %v", ipConfigurationID, err)
return err errors = append(errors, err)
continue
} }
err = ss.ensureBackendPoolDeletedFromNode(service, nodeName, backendPoolID) nodeResourceGroup, nodeVMSS, nodeInstanceID, nodeVMSSVM, err := ss.ensureBackendPoolDeletedFromNode(service, nodeName, backendPoolID)
if err != nil { if err != nil {
return fmt.Errorf("failed to ensure backend pool %s deleted from node %s: %v", backendPoolID, nodeName, err) klog.Errorf("EnsureBackendPoolDeleted(%s): backendPoolID(%s) - failed with error %v", getServiceName(service), backendPoolID, err)
errors = append(errors, err)
continue
}
// No need to update if nodeVMSSVM is nil.
if nodeVMSSVM == nil {
continue
}
nodeVMSSMetaInfo := vmssMetaInfo{vmssName: nodeVMSS, resourceGroup: nodeResourceGroup}
if v, ok := nodeUpdates[nodeVMSSMetaInfo]; ok {
v[nodeInstanceID] = *nodeVMSSVM
} else {
nodeUpdates[nodeVMSSMetaInfo] = map[string]compute.VirtualMachineScaleSetVM{
nodeInstanceID: *nodeVMSSVM,
}
}
// Invalidate the cache since the VMSS VM would be updated.
defer ss.deleteCacheForNode(nodeName)
}
// Update VMs with best effort that have already been added to nodeUpdates.
for meta, update := range nodeUpdates {
hostUpdates = append(hostUpdates, func() error {
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("EnsureBackendPoolDeleted begins to UpdateVMs for VMSS(%s, %s) with backendPoolID %s", meta.resourceGroup, meta.vmssName, backendPoolID)
rerr := ss.VirtualMachineScaleSetVMsClient.UpdateVMs(ctx, meta.resourceGroup, meta.vmssName, update, "network_update")
if rerr != nil {
klog.Errorf("EnsureBackendPoolDeleted UpdateVMs for VMSS(%s, %s) failed with error %v", meta.resourceGroup, meta.vmssName, rerr.Error())
return rerr.Error()
} }
return nil return nil
})
} }
hostUpdates = append(hostUpdates, f) errs := utilerrors.AggregateGoroutines(hostUpdates...)
}
errs := aggregateGoroutinesWithDelay(vmssVMInstanceUpdateDelay, hostUpdates...)
if errs != nil { if errs != nil {
return utilerrors.Flatten(errs) return utilerrors.Flatten(errs)
} }
// Fail if there're other errors.
if len(errors) > 0 {
return utilerrors.Flatten(utilerrors.NewAggregate(errors))
}
// Ensure the backendPoolID is also deleted on VMSS itself.
err := ss.ensureBackendPoolDeletedFromVMSS(service, backendPoolID, vmSetName, ipConfigurationIDs) err := ss.ensureBackendPoolDeletedFromVMSS(service, backendPoolID, vmSetName, ipConfigurationIDs)
if err != nil { if err != nil {
return err return err

View File

@ -26,7 +26,7 @@ import (
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
types "k8s.io/apimachinery/pkg/types" types "k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider" cloud_provider "k8s.io/cloud-provider"
cache "k8s.io/legacy-cloud-providers/azure/cache" cache "k8s.io/legacy-cloud-providers/azure/cache"
) )
@ -130,10 +130,10 @@ func (mr *MockVMSetMockRecorder) GetNodeNameByProviderID(providerID interface{})
} }
// GetZoneByNodeName mocks base method // GetZoneByNodeName mocks base method
func (m *MockVMSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { func (m *MockVMSet) GetZoneByNodeName(name string) (cloud_provider.Zone, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetZoneByNodeName", name) ret := m.ctrl.Call(m, "GetZoneByNodeName", name)
ret0, _ := ret[0].(cloudprovider.Zone) ret0, _ := ret[0].(cloud_provider.Zone)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
@ -188,11 +188,15 @@ func (mr *MockVMSetMockRecorder) EnsureHostsInPool(service, nodes, backendPoolID
} }
// EnsureHostInPool mocks base method // EnsureHostInPool mocks base method
func (m *MockVMSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID, vmSetName string, isInternal bool) error { func (m *MockVMSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID, vmSetName string, isInternal bool) (string, string, string, *compute.VirtualMachineScaleSetVM, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "EnsureHostInPool", service, nodeName, backendPoolID, vmSetName, isInternal) ret := m.ctrl.Call(m, "EnsureHostInPool", service, nodeName, backendPoolID, vmSetName, isInternal)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(string)
return ret0 ret1, _ := ret[1].(string)
ret2, _ := ret[2].(string)
ret3, _ := ret[3].(*compute.VirtualMachineScaleSetVM)
ret4, _ := ret[4].(error)
return ret0, ret1, ret2, ret3, ret4
} }
// EnsureHostInPool indicates an expected call of EnsureHostInPool // EnsureHostInPool indicates an expected call of EnsureHostInPool