Replace vmss update API with instance-level update API

This commit is contained in:
Pengfei Ni 2019-04-15 17:09:27 +08:00
parent c27ff9a11e
commit f7db5f7ab7
6 changed files with 254 additions and 250 deletions

View File

@ -89,7 +89,6 @@ type SecurityGroupsClient interface {
// VirtualMachineScaleSetsClient defines needed functions for azure compute.VirtualMachineScaleSetsClient
type VirtualMachineScaleSetsClient interface {
CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) (resp *http.Response, err error)
Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, err error)
List(ctx context.Context, resourceGroupName string) (result []compute.VirtualMachineScaleSet, err error)
UpdateInstances(ctx context.Context, resourceGroupName string, VMScaleSetName string, VMInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) (resp *http.Response, err error)
@ -835,30 +834,6 @@ func newAzVirtualMachineScaleSetsClient(config *azClientConfig) *azVirtualMachin
}
}
func (az *azVirtualMachineScaleSetsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) (resp *http.Response, err error) {
/* Write rate limiting */
if !az.rateLimiterWriter.TryAccept() {
err = createRateLimitErr(true, "VMSSCreateOrUpdate")
return
}
klog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): start", resourceGroupName, VMScaleSetName)
defer func() {
klog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): end", resourceGroupName, VMScaleSetName)
}()
mc := newMetricContext("vmss", "create_or_update", resourceGroupName, az.client.SubscriptionID)
future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, VMScaleSetName, parameters)
mc.Observe(err)
if err != nil {
return future.Response(), err
}
err = future.WaitForCompletionRef(ctx, az.client.Client)
mc.Observe(err)
return future.Response(), err
}
func (az *azVirtualMachineScaleSetsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, err error) {
if !az.rateLimiterReader.TryAccept() {
err = createRateLimitErr(false, "VMSSGet")

View File

@ -902,7 +902,11 @@ func (f *fakeVMSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
return fmt.Errorf("unimplemented")
}
func (f *fakeVMSet) EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
func (f *fakeVMSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error {
return fmt.Errorf("unimplemented")
}
func (f *fakeVMSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
return fmt.Errorf("unimplemented")
}

View File

@ -829,13 +829,13 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
// Remove backend pools from vmSets. This is required for virtual machine scale sets before removing the LB.
vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName)
klog.V(10).Infof("EnsureBackendPoolDeleted(%s, %s): start", lbBackendPoolID, vmSetName)
klog.V(10).Infof("EnsureBackendPoolDeleted(%s,%s) for service %s: start", lbBackendPoolID, vmSetName, serviceName)
err := az.vmSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, lb.BackendAddressPools)
if err != nil {
klog.Errorf("EnsureBackendPoolDeleted(%s, %s) failed: %v", lbBackendPoolID, vmSetName, err)
klog.Errorf("EnsureBackendPoolDeleted(%s) for service %s failed: %v", lbBackendPoolID, serviceName, err)
return nil, err
}
klog.V(10).Infof("EnsureBackendPoolDeleted(%s, %s): end", lbBackendPoolID, vmSetName)
klog.V(10).Infof("EnsureBackendPoolDeleted(%s) for service %s: end", lbBackendPoolID, serviceName)
// Remove the LB.
klog.V(10).Infof("reconcileLoadBalancer: az.DeleteLB(%q): start", lbName)

View File

@ -622,24 +622,24 @@ func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName stri
return nic, nil
}
// ensureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is
// EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is
// participating in the specified LoadBalancer Backend Pool.
func (as *availabilitySet) ensureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error {
func (as *availabilitySet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error {
vmName := mapNodeNameToVMName(nodeName)
serviceName := getServiceName(service)
nic, err := as.getPrimaryInterfaceWithVMSet(vmName, vmSetName)
if err != nil {
if err == errNotInVMSet {
klog.V(3).Infof("ensureHostInPool skips node %s because it is not in the vmSet %s", nodeName, vmSetName)
klog.V(3).Infof("EnsureHostInPool skips node %s because it is not in the vmSet %s", nodeName, vmSetName)
return nil
}
klog.Errorf("error: az.ensureHostInPool(%s), az.vmSet.GetPrimaryInterface.Get(%s, %s), err=%v", nodeName, vmName, vmSetName, err)
klog.Errorf("error: az.EnsureHostInPool(%s), az.vmSet.GetPrimaryInterface.Get(%s, %s), err=%v", nodeName, vmName, vmSetName, err)
return err
}
if nic.ProvisioningState != nil && *nic.ProvisioningState == nicFailedState {
klog.V(3).Infof("ensureHostInPool skips node %s because its primary nic %s is in Failed state", nodeName, *nic.Name)
klog.Warningf("EnsureHostInPool skips node %s because its primary nic %s is in Failed state", nodeName, *nic.Name)
return nil
}
@ -716,7 +716,7 @@ func (as *availabilitySet) EnsureHostsInPool(service *v1.Service, nodes []*v1.No
}
f := func() error {
err := as.ensureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal)
err := as.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal)
if err != nil {
return fmt.Errorf("ensure(%s): backendPoolID(%s) - failed to ensure host in pool: %q", getServiceName(service), backendPoolID, err)
}
@ -733,8 +733,8 @@ func (as *availabilitySet) EnsureHostsInPool(service *v1.Service, nodes []*v1.No
return nil
}
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes.
func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
// Do nothing for availability set.
return nil
}

View File

@ -57,8 +57,11 @@ type VMSet interface {
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error
// EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is
// participating in the specified LoadBalancer Backend Pool.
EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes.
EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error
// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun.
AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error

View File

@ -30,7 +30,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
@ -44,6 +44,7 @@ var (
resourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines(?:.*)`)
vmssNicResourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines/(?:.*)/networkInterfaces/(?:.*)`)
vmssMachineIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s/virtualMachines/%s"
vmssIPConfigurationRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines/(.+)/networkInterfaces(?:.*)`)
)
// scaleSet implements VMSet interface for Azure scale set.
@ -351,6 +352,16 @@ func (ss *scaleSet) getPrimaryInterfaceID(machine compute.VirtualMachineScaleSet
return "", fmt.Errorf("failed to find a primary nic for the vm. vmname=%q", *machine.Name)
}
// getVmssMachineID returns the full identifier of a vmss virtual machine.
func (az *Cloud) getVmssMachineID(resourceGroup, scaleSetName, instanceID string) string {
return fmt.Sprintf(
vmssMachineIDTemplate,
az.SubscriptionID,
strings.ToLower(resourceGroup),
scaleSetName,
instanceID)
}
// machineName is composed of computerNamePrefix and 36-based instanceID.
// And instanceID part if in fixed length of 6 characters.
// Refer https://msftstack.wordpress.com/2017/05/10/figuring-out-azure-vm-scale-set-machine-names/.
@ -618,9 +629,8 @@ func (ss *scaleSet) getScaleSetWithRetry(service *v1.Service, name string) (comp
return result, exists, err
}
// getPrimaryNetworkConfiguration gets primary network interface configuration for scale sets.
func (ss *scaleSet) getPrimaryNetworkConfiguration(networkConfigurationList *[]compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetNetworkConfiguration, error) {
networkConfigurations := *networkConfigurationList
// getPrimarynetworkInterfaceConfiguration gets primary network interface configuration for scale set virtual machine.
func (ss *scaleSet) getPrimarynetworkInterfaceConfiguration(networkConfigurations []compute.VirtualMachineScaleSetNetworkConfiguration, nodeName string) (*compute.VirtualMachineScaleSetNetworkConfiguration, error) {
if len(networkConfigurations) == 1 {
return &networkConfigurations[0], nil
}
@ -632,10 +642,10 @@ func (ss *scaleSet) getPrimaryNetworkConfiguration(networkConfigurationList *[]c
}
}
return nil, fmt.Errorf("failed to find a primary network configuration for the scale set %q", scaleSetName)
return nil, fmt.Errorf("failed to find a primary network configuration for the scale set VM %q", nodeName)
}
func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetIPConfiguration, error) {
func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachineScaleSetNetworkConfiguration, nodeName string) (*compute.VirtualMachineScaleSetIPConfiguration, error) {
ipConfigurations := *config.IPConfigurations
if len(ipConfigurations) == 1 {
return &ipConfigurations[0], nil
@ -648,31 +658,7 @@ func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachine
}
}
return nil, fmt.Errorf("failed to find a primary IP configuration for the scale set %q", scaleSetName)
}
// createOrUpdateVMSS invokes ss.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry.
func (ss *scaleSet) createOrUpdateVMSS(service *v1.Service, virtualMachineScaleSet compute.VirtualMachineScaleSet) error {
if ss.Config.shouldOmitCloudProviderBackoff() {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet)
klog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", *virtualMachineScaleSet.Name)
return ss.processHTTPResponse(service, "CreateOrUpdateVMSS", resp, err)
}
return ss.createOrUpdateVMSSWithRetry(service, virtualMachineScaleSet)
}
// createOrUpdateVMSSWithRetry invokes ss.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry.
func (ss *scaleSet) createOrUpdateVMSSWithRetry(service *v1.Service, virtualMachineScaleSet compute.VirtualMachineScaleSet) error {
return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet)
klog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", *virtualMachineScaleSet.Name)
return ss.processHTTPRetryResponse(service, "CreateOrUpdateVMSS", resp, err)
})
return nil, fmt.Errorf("failed to find a primary IP configuration for the scale set VM %q", nodeName)
}
// updateVMSSInstances invokes ss.VirtualMachineScaleSetsClient.UpdateInstances with exponential backoff retry.
@ -699,70 +685,35 @@ func (ss *scaleSet) updateVMSSInstancesWithRetry(service *v1.Service, scaleSetNa
})
}
// getNodesScaleSets returns scalesets with instanceIDs and standard node names for given nodes.
func (ss *scaleSet) getNodesScaleSets(nodes []*v1.Node) (map[string]sets.String, []*v1.Node, error) {
scalesets := make(map[string]sets.String)
standardNodes := []*v1.Node{}
for _, curNode := range nodes {
if ss.useStandardLoadBalancer() && ss.excludeMasterNodesFromStandardLB() && isMasterNode(curNode) {
klog.V(4).Infof("Excluding master node %q from load balancer backendpool", curNode.Name)
continue
}
if ss.ShouldNodeExcludedFromLoadBalancer(curNode) {
klog.V(4).Infof("Excluding unmanaged/external-resource-group node %q", curNode.Name)
continue
}
curScaleSetName, err := extractScaleSetNameByProviderID(curNode.Spec.ProviderID)
if err != nil {
klog.V(4).Infof("Node %q is not belonging to any scale sets, assuming it is belong to availability sets", curNode.Name)
standardNodes = append(standardNodes, curNode)
continue
}
if _, ok := scalesets[curScaleSetName]; !ok {
scalesets[curScaleSetName] = sets.NewString()
}
instanceID, err := getLastSegment(curNode.Spec.ProviderID)
if err != nil {
klog.Errorf("Failed to get instance ID for node %q: %v", curNode.Spec.ProviderID, err)
return nil, nil, err
}
scalesets[curScaleSetName].Insert(instanceID)
}
return scalesets, standardNodes, nil
}
// ensureHostsInVMSetPool ensures the given Node's primary IP configurations are
// participating in the vmSet's LoadBalancer Backend Pool.
func (ss *scaleSet) ensureHostsInVMSetPool(service *v1.Service, backendPoolID string, vmSetName string, instanceIDs []string, isInternal bool) error {
klog.V(3).Infof("ensuring hosts %q of scaleset %q in LB backendpool %q", instanceIDs, vmSetName, backendPoolID)
serviceName := getServiceName(service)
virtualMachineScaleSet, exists, err := ss.getScaleSet(service, vmSetName)
// EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is
// participating in the specified LoadBalancer Backend Pool.
func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error {
klog.V(3).Infof("ensuring node %q of scaleset %q in LB backendpool %q", nodeName, vmSetName, backendPoolID)
vmName := mapNodeNameToVMName(nodeName)
ssName, instanceID, vm, err := ss.getVmssVM(vmName)
if err != nil {
klog.Errorf("ss.getScaleSet(%s) for service %q failed: %v", vmSetName, serviceName, err)
return err
}
if !exists {
errorMessage := fmt.Errorf("Scale set %q not found", vmSetName)
klog.Errorf("%v", errorMessage)
return errorMessage
// Check scale set name:
// - For basic SKU load balancer, errNotInVMSet should be returned if the node's
// scale set is mismatched with vmSetName.
// - For standard SKU load balancer, backend could belong to multiple VMSS, so we
// don't check vmSet for it.
if vmSetName != "" && !ss.useStandardLoadBalancer() && !strings.EqualFold(vmSetName, ssName) {
klog.V(3).Infof("EnsureHostInPool skips node %s because it is not in the scaleSet %s", vmName, vmSetName)
return nil
}
// Find primary network interface configuration.
networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, vmSetName)
networkInterfaceConfigurations := *vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations
primaryNetworkInterfaceConfiguration, err := ss.getPrimarynetworkInterfaceConfiguration(networkInterfaceConfigurations, vmName)
if err != nil {
return err
}
// Find primary IP configuration.
primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, vmSetName)
primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkInterfaceConfiguration, vmName)
if err != nil {
return err
}
@ -779,217 +730,288 @@ func (ss *scaleSet) ensureHostsInVMSetPool(service *v1.Service, backendPoolID st
break
}
}
if !foundPool {
if ss.useStandardLoadBalancer() && len(newBackendPools) > 0 {
// Although standard load balancer supports backends from multiple vmss,
// the same network interface couldn't be added to more than one load balancer of
// the same type. Omit those nodes (e.g. masters) so Azure ARM won't complain
// about this.
newBackendPoolsIDs := make([]string, 0, len(newBackendPools))
for _, pool := range newBackendPools {
if pool.ID != nil {
newBackendPoolsIDs = append(newBackendPoolsIDs, *pool.ID)
}
}
isSameLB, oldLBName, err := isBackendPoolOnSameLB(backendPoolID, newBackendPoolsIDs)
if err != nil {
return err
}
if !isSameLB {
klog.V(4).Infof("VMSS %q has already been added to LB %q, omit adding it to a new one", vmSetName, oldLBName)
return nil
// The backendPoolID has already been found from existing LoadBalancerBackendAddressPools.
if foundPool {
return nil
}
if ss.useStandardLoadBalancer() && len(newBackendPools) > 0 {
// Although standard load balancer supports backends from multiple scale
// sets, the same network interface couldn't be added to more than one load balancer of
// the same type. Omit those nodes (e.g. masters) so Azure ARM won't complain
// about this.
newBackendPoolsIDs := make([]string, 0, len(newBackendPools))
for _, pool := range newBackendPools {
if pool.ID != nil {
newBackendPoolsIDs = append(newBackendPoolsIDs, *pool.ID)
}
}
newBackendPools = append(newBackendPools,
compute.SubResource{
ID: to.StringPtr(backendPoolID),
})
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
err := ss.createOrUpdateVMSS(service, virtualMachineScaleSet)
isSameLB, oldLBName, err := isBackendPoolOnSameLB(backendPoolID, newBackendPoolsIDs)
if err != nil {
return err
}
if !isSameLB {
klog.V(4).Infof("Node %q has already been added to LB %q, omit adding it to a new one", nodeName, oldLBName)
return nil
}
}
// Update instances to latest VMSS model.
vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
// Compose a new vmssVM with added backendPoolID.
newBackendPools = append(newBackendPools,
compute.SubResource{
ID: to.StringPtr(backendPoolID),
})
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
newVM := compute.VirtualMachineScaleSetVM{
Sku: vm.Sku,
Location: vm.Location,
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
HardwareProfile: vm.HardwareProfile,
NetworkProfileConfiguration: &compute.VirtualMachineScaleSetVMNetworkProfileConfiguration{
NetworkInterfaceConfigurations: &networkInterfaceConfigurations,
},
},
}
err = ss.updateVMSSInstances(service, vmSetName, vmInstanceIDs)
// Get the node resource group.
nodeResourceGroup, err := ss.GetNodeResourceGroup(vmName)
if err != nil {
return err
}
return nil
// Invalidate the cache since we would update it.
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
defer ss.vmssVMCache.Delete(key)
// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("EnsureHostInPool begins to update vmssVM(%s) with new backendPoolID %s", vmName, backendPoolID)
resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("EnsureHostInPool update backing off vmssVM(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err)
retryErr := ss.UpdateVmssVMWithRetry(nodeResourceGroup, ssName, instanceID, newVM)
if retryErr != nil {
err = retryErr
klog.Errorf("EnsureHostInPool update abort backoff vmssVM(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err)
}
}
return err
}
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
serviceName := getServiceName(service)
scalesets, standardNodes, err := ss.getNodesScaleSets(nodes)
if err != nil {
klog.Errorf("getNodesScaleSets() for service %q failed: %v", serviceName, err)
return err
}
hostUpdates := make([]func() error, 0, len(nodes))
for _, node := range nodes {
localNodeName := node.Name
for ssName, instanceIDs := range scalesets {
// Only add nodes belonging to specified vmSet for basic SKU LB.
if !ss.useStandardLoadBalancer() && !strings.EqualFold(ssName, vmSetName) {
if ss.useStandardLoadBalancer() && ss.excludeMasterNodesFromStandardLB() && isMasterNode(node) {
klog.V(4).Infof("Excluding master node %q from load balancer backendpool %q", localNodeName, backendPoolID)
continue
}
if instanceIDs.Len() == 0 {
// This may happen when scaling a vmss capacity to 0.
klog.V(3).Infof("scale set %q has 0 nodes, adding it to load balancer anyway", ssName)
// InstanceIDs is required to update vmss, use * instead here since there are no nodes actually.
instanceIDs.Insert("*")
if ss.ShouldNodeExcludedFromLoadBalancer(node) {
klog.V(4).Infof("Excluding unmanaged/external-resource-group node %q", localNodeName)
continue
}
err := ss.ensureHostsInVMSetPool(service, backendPoolID, ssName, instanceIDs.List(), isInternal)
if err != nil {
klog.Errorf("ensureHostsInVMSetPool() with scaleSet %q for service %q failed: %v", ssName, serviceName, err)
return err
f := func() error {
// VMAS nodes should also be added to the SLB backends.
if ss.useStandardLoadBalancer() {
// Check whether the node is VMAS virtual machine.
managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName)
if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet(%s): %v", localNodeName, err)
return err
}
if managedByAS {
return ss.availabilitySet.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal)
}
}
err := ss.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal)
if err != nil {
return fmt.Errorf("EnsureHostInPool(%s): backendPoolID(%s) - failed to ensure host in pool: %q", getServiceName(service), backendPoolID, err)
}
return nil
}
hostUpdates = append(hostUpdates, f)
}
if ss.useStandardLoadBalancer() && len(standardNodes) > 0 {
err := ss.availabilitySet.EnsureHostsInPool(service, standardNodes, backendPoolID, "", isInternal)
if err != nil {
klog.Errorf("availabilitySet.EnsureHostsInPool() for service %q failed: %v", serviceName, err)
return err
}
errs := utilerrors.AggregateGoroutines(hostUpdates...)
if errs != nil {
return utilerrors.Flatten(errs)
}
return nil
}
// ensureScaleSetBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified scaleset.
func (ss *scaleSet) ensureScaleSetBackendPoolDeleted(service *v1.Service, poolID, ssName string) error {
klog.V(3).Infof("ensuring backend pool %q deleted from scaleset %q", poolID, ssName)
virtualMachineScaleSet, exists, err := ss.getScaleSet(service, ssName)
// ensureBackendPoolDeletedFromNode ensures the loadBalancer backendAddressPools deleted from the specified node.
func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeName, backendPoolID string) error {
ssName, instanceID, vm, err := ss.getVmssVM(nodeName)
if err != nil {
klog.Errorf("ss.ensureScaleSetBackendPoolDeleted(%s, %s) getScaleSet(%s) failed: %v", poolID, ssName, ssName, err)
return err
}
if !exists {
klog.V(2).Infof("ss.ensureScaleSetBackendPoolDeleted(%s, %s), scale set %s has already been non-exist", poolID, ssName, ssName)
return nil
}
// Find primary network interface configuration.
networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, ssName)
networkInterfaceConfigurations := *vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations
primaryNetworkInterfaceConfiguration, err := ss.getPrimarynetworkInterfaceConfiguration(networkInterfaceConfigurations, nodeName)
if err != nil {
return err
}
// Find primary IP configuration.
primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, ssName)
// Find primary IP configuration.4
primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkInterfaceConfiguration, nodeName)
if err != nil {
return err
}
// Construct new loadBalancerBackendAddressPools and remove backendAddressPools from primary IP configuration.
if primaryIPConfiguration.LoadBalancerBackendAddressPools == nil || len(*primaryIPConfiguration.LoadBalancerBackendAddressPools) == 0 {
return nil
}
// Construct new loadBalancerBackendAddressPools and remove backendAddressPools from primary IP configuration.
existingBackendPools := *primaryIPConfiguration.LoadBalancerBackendAddressPools
newBackendPools := []compute.SubResource{}
foundPool := false
for i := len(existingBackendPools) - 1; i >= 0; i-- {
curPool := existingBackendPools[i]
if strings.EqualFold(poolID, *curPool.ID) {
klog.V(10).Infof("ensureScaleSetBackendPoolDeleted gets unwanted backend pool %q for scale set %q", poolID, ssName)
if strings.EqualFold(backendPoolID, *curPool.ID) {
klog.V(10).Infof("ensureBackendPoolDeletedFromNode gets unwanted backend pool %q for node %s", backendPoolID, nodeName)
foundPool = true
newBackendPools = append(existingBackendPools[:i], existingBackendPools[i+1:]...)
}
}
// Pool not found, assume it has been already removed.
if !foundPool {
// Pool not found, assume it has been already removed.
return nil
}
// Update scale set with backoff.
// Compose a new vmssVM with added backendPoolID.
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
klog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating", ssName)
err = ss.createOrUpdateVMSS(service, virtualMachineScaleSet)
newVM := compute.VirtualMachineScaleSetVM{
Sku: vm.Sku,
Location: vm.Location,
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
HardwareProfile: vm.HardwareProfile,
NetworkProfileConfiguration: &compute.VirtualMachineScaleSetVMNetworkProfileConfiguration{
NetworkInterfaceConfigurations: &networkInterfaceConfigurations,
},
},
}
// Get the node resource group.
nodeResourceGroup, err := ss.GetNodeResourceGroup(nodeName)
if err != nil {
return err
}
// Update instances to latest VMSS model.
instanceIDs := []string{"*"}
vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
}
err = ss.updateVMSSInstances(service, ssName, vmInstanceIDs)
if err != nil {
return err
}
// Invalidate the cache since we would update it.
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
defer ss.vmssVMCache.Delete(key)
// Update virtualMachineScaleSet again. This is a workaround for removing VMSS reference from LB.
// TODO: remove this workaround when figuring out the root cause.
if len(newBackendPools) == 0 {
err = ss.createOrUpdateVMSS(service, virtualMachineScaleSet)
if err != nil {
klog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", ssName)
// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("ensureBackendPoolDeletedFromNode begins to update vmssVM(%s) with backendPoolID %s", nodeName, backendPoolID)
resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("ensureBackendPoolDeletedFromNode update backing off vmssVM(%s) with backendPoolID %s, err: %v", nodeName, backendPoolID, err)
retryErr := ss.UpdateVmssVMWithRetry(nodeResourceGroup, ssName, instanceID, newVM)
if retryErr != nil {
err = retryErr
klog.Errorf("ensureBackendPoolDeletedFromNode update abort backoff vmssVM(%s) with backendPoolID %s, err: %v", nodeName, backendPoolID, err)
}
}
return nil
if err != nil {
klog.Errorf("ensureBackendPoolDeletedFromNode failed to update vmssVM(%s) with backendPoolID %s: %v", nodeName, backendPoolID, err)
} else {
klog.V(2).Infof("ensureBackendPoolDeletedFromNode update vmssVM(%s) with backendPoolID %s succeeded", nodeName, backendPoolID)
}
return err
}
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
// getNodeNameByIPConfigurationID gets the node name by IP configuration ID.
func (ss *scaleSet) getNodeNameByIPConfigurationID(ipConfigurationID string) (string, error) {
matches := vmssIPConfigurationRE.FindStringSubmatch(ipConfigurationID)
if len(matches) != 4 {
klog.V(4).Infof("Can not extract scale set name from ipConfigurationID (%s), assuming it is mananaged by availability set", ipConfigurationID)
return "", ErrorNotVmssInstance
}
resourceGroup := matches[1]
scaleSetName := matches[2]
instanceID := matches[3]
vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID)
if err != nil {
return "", err
}
if vm.OsProfile != nil && vm.OsProfile.ComputerName != nil {
return strings.ToLower(*vm.OsProfile.ComputerName), nil
}
return "", nil
}
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes.
func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
// Returns nil if backend address pools already deleted.
if backendAddressPools == nil {
return nil
}
scalesets := sets.NewString()
ipConfigurationIDs := []string{}
for _, backendPool := range *backendAddressPools {
if strings.EqualFold(*backendPool.ID, poolID) && backendPool.BackendIPConfigurations != nil {
for _, ipConfigurations := range *backendPool.BackendIPConfigurations {
if ipConfigurations.ID == nil {
if strings.EqualFold(*backendPool.ID, backendPoolID) && backendPool.BackendIPConfigurations != nil {
for _, ipConf := range *backendPool.BackendIPConfigurations {
if ipConf.ID == nil {
continue
}
ssName, err := extractScaleSetNameByProviderID(*ipConfigurations.ID)
if err != nil {
klog.V(4).Infof("backend IP configuration %q is not belonging to any vmss, omit it", *ipConfigurations.ID)
continue
}
scalesets.Insert(ssName)
ipConfigurationIDs = append(ipConfigurationIDs, *ipConf.ID)
}
break
}
}
for ssName := range scalesets {
// Only remove nodes belonging to specified vmSet to basic LB backends.
if !ss.useStandardLoadBalancer() && !strings.EqualFold(ssName, vmSetName) {
continue
}
hostUpdates := make([]func() error, 0, len(ipConfigurationIDs))
for i := range ipConfigurationIDs {
ipConfigurationID := ipConfigurationIDs[i]
err := ss.ensureScaleSetBackendPoolDeleted(service, poolID, ssName)
if err != nil {
klog.Errorf("ensureScaleSetBackendPoolDeleted() with scaleSet %q failed: %v", ssName, err)
return err
f := func() error {
if scaleSetName, err := extractScaleSetNameByProviderID(ipConfigurationID); err == nil {
// Only remove nodes belonging to specified vmSet to basic LB backends.
if !ss.useStandardLoadBalancer() && !strings.EqualFold(scaleSetName, vmSetName) {
return nil
}
}
nodeName, err := ss.getNodeNameByIPConfigurationID(ipConfigurationID)
if err != nil {
if err == ErrorNotVmssInstance { // Do nothing for the VMAS nodes.
return nil
}
klog.Errorf("Failed to getNodeNameByIPConfigurationID(%s): %v", ipConfigurationID, err)
return err
}
err = ss.ensureBackendPoolDeletedFromNode(service, nodeName, backendPoolID)
if err != nil {
return fmt.Errorf("failed to ensure backend pool %s deleted from node %s: %v", backendPoolID, nodeName, err)
}
return nil
}
hostUpdates = append(hostUpdates, f)
}
errs := utilerrors.AggregateGoroutines(hostUpdates...)
if errs != nil {
return utilerrors.Flatten(errs)
}
return nil
}
// getVmssMachineID returns the full identifier of a vmss virtual machine.
func (az *Cloud) getVmssMachineID(resourceGroup, scaleSetName, instanceID string) string {
return fmt.Sprintf(
vmssMachineIDTemplate,
az.SubscriptionID,
strings.ToLower(resourceGroup),
scaleSetName,
instanceID)
}