diff --git a/pkg/cloudprovider/providers/azure/azure_client.go b/pkg/cloudprovider/providers/azure/azure_client.go index 73d1cd4e256..22f80f0ccfb 100644 --- a/pkg/cloudprovider/providers/azure/azure_client.go +++ b/pkg/cloudprovider/providers/azure/azure_client.go @@ -89,7 +89,6 @@ type SecurityGroupsClient interface { // VirtualMachineScaleSetsClient defines needed functions for azure compute.VirtualMachineScaleSetsClient type VirtualMachineScaleSetsClient interface { - CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) (resp *http.Response, err error) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, err error) List(ctx context.Context, resourceGroupName string) (result []compute.VirtualMachineScaleSet, err error) UpdateInstances(ctx context.Context, resourceGroupName string, VMScaleSetName string, VMInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) (resp *http.Response, err error) @@ -835,30 +834,6 @@ func newAzVirtualMachineScaleSetsClient(config *azClientConfig) *azVirtualMachin } } -func (az *azVirtualMachineScaleSetsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) (resp *http.Response, err error) { - /* Write rate limiting */ - if !az.rateLimiterWriter.TryAccept() { - err = createRateLimitErr(true, "VMSSCreateOrUpdate") - return - } - - klog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): start", resourceGroupName, VMScaleSetName) - defer func() { - klog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): end", resourceGroupName, VMScaleSetName) - }() - - mc := newMetricContext("vmss", "create_or_update", resourceGroupName, az.client.SubscriptionID) - future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, VMScaleSetName, parameters) - mc.Observe(err) - if err != nil { - 
return future.Response(), err - } - - err = future.WaitForCompletionRef(ctx, az.client.Client) - mc.Observe(err) - return future.Response(), err -} - func (az *azVirtualMachineScaleSetsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, err error) { if !az.rateLimiterReader.TryAccept() { err = createRateLimitErr(false, "VMSSGet") diff --git a/pkg/cloudprovider/providers/azure/azure_fakes.go b/pkg/cloudprovider/providers/azure/azure_fakes.go index 8c007aaa7c5..17bbb8f99ac 100644 --- a/pkg/cloudprovider/providers/azure/azure_fakes.go +++ b/pkg/cloudprovider/providers/azure/azure_fakes.go @@ -902,7 +902,11 @@ func (f *fakeVMSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac return fmt.Errorf("unimplemented") } -func (f *fakeVMSet) EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error { +func (f *fakeVMSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error { + return fmt.Errorf("unimplemented") +} + +func (f *fakeVMSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error { return fmt.Errorf("unimplemented") } diff --git a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go index 62c827960b4..efe649f984a 100644 --- a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go +++ b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go @@ -829,13 +829,13 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, // Remove backend pools from vmSets. This is required for virtual machine scale sets before removing the LB. 
vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName) - klog.V(10).Infof("EnsureBackendPoolDeleted(%s, %s): start", lbBackendPoolID, vmSetName) + klog.V(10).Infof("EnsureBackendPoolDeleted(%s,%s) for service %s: start", lbBackendPoolID, vmSetName, serviceName) err := az.vmSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, lb.BackendAddressPools) if err != nil { - klog.Errorf("EnsureBackendPoolDeleted(%s, %s) failed: %v", lbBackendPoolID, vmSetName, err) + klog.Errorf("EnsureBackendPoolDeleted(%s) for service %s failed: %v", lbBackendPoolID, serviceName, err) return nil, err } - klog.V(10).Infof("EnsureBackendPoolDeleted(%s, %s): end", lbBackendPoolID, vmSetName) + klog.V(10).Infof("EnsureBackendPoolDeleted(%s) for service %s: end", lbBackendPoolID, serviceName) // Remove the LB. klog.V(10).Infof("reconcileLoadBalancer: az.DeleteLB(%q): start", lbName) diff --git a/pkg/cloudprovider/providers/azure/azure_standard.go b/pkg/cloudprovider/providers/azure/azure_standard.go index 0d7a0d40881..de5bb6c22ba 100644 --- a/pkg/cloudprovider/providers/azure/azure_standard.go +++ b/pkg/cloudprovider/providers/azure/azure_standard.go @@ -622,24 +622,24 @@ func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName stri return nic, nil } -// ensureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is +// EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is // participating in the specified LoadBalancer Backend Pool. 
-func (as *availabilitySet) ensureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error { +func (as *availabilitySet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error { vmName := mapNodeNameToVMName(nodeName) serviceName := getServiceName(service) nic, err := as.getPrimaryInterfaceWithVMSet(vmName, vmSetName) if err != nil { if err == errNotInVMSet { - klog.V(3).Infof("ensureHostInPool skips node %s because it is not in the vmSet %s", nodeName, vmSetName) + klog.V(3).Infof("EnsureHostInPool skips node %s because it is not in the vmSet %s", nodeName, vmSetName) return nil } - klog.Errorf("error: az.ensureHostInPool(%s), az.vmSet.GetPrimaryInterface.Get(%s, %s), err=%v", nodeName, vmName, vmSetName, err) + klog.Errorf("error: az.EnsureHostInPool(%s), az.vmSet.GetPrimaryInterface.Get(%s, %s), err=%v", nodeName, vmName, vmSetName, err) return err } if nic.ProvisioningState != nil && *nic.ProvisioningState == nicFailedState { - klog.V(3).Infof("ensureHostInPool skips node %s because its primary nic %s is in Failed state", nodeName, *nic.Name) + klog.Warningf("EnsureHostInPool skips node %s because its primary nic %s is in Failed state", nodeName, *nic.Name) return nil } @@ -716,7 +716,7 @@ func (as *availabilitySet) EnsureHostsInPool(service *v1.Service, nodes []*v1.No } f := func() error { - err := as.ensureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal) + err := as.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal) if err != nil { return fmt.Errorf("ensure(%s): backendPoolID(%s) - failed to ensure host in pool: %q", getServiceName(service), backendPoolID, err) } @@ -733,8 +733,8 @@ func (as *availabilitySet) EnsureHostsInPool(service *v1.Service, nodes []*v1.No return nil } -// EnsureBackendPoolDeleted ensures the loadBalancer 
backendAddressPools deleted from the specified vmSet. -func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error { +// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes. +func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error { // Do nothing for availability set. return nil } diff --git a/pkg/cloudprovider/providers/azure/azure_vmsets.go b/pkg/cloudprovider/providers/azure/azure_vmsets.go index c6ad05d0ca7..f8c0ee6170a 100644 --- a/pkg/cloudprovider/providers/azure/azure_vmsets.go +++ b/pkg/cloudprovider/providers/azure/azure_vmsets.go @@ -57,8 +57,11 @@ type VMSet interface { // EnsureHostsInPool ensures the given Node's primary IP configurations are // participating in the specified LoadBalancer Backend Pool. EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error - // EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet. - EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error + // EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is + // participating in the specified LoadBalancer Backend Pool. + EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error + // EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes. + EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error // AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun. 
AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error diff --git a/pkg/cloudprovider/providers/azure/azure_vmss.go b/pkg/cloudprovider/providers/azure/azure_vmss.go index ba10690bfb3..8d101f7bdb1 100644 --- a/pkg/cloudprovider/providers/azure/azure_vmss.go +++ b/pkg/cloudprovider/providers/azure/azure_vmss.go @@ -30,7 +30,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" cloudprovider "k8s.io/cloud-provider" "k8s.io/klog" @@ -44,6 +44,7 @@ var ( resourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines(?:.*)`) vmssNicResourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines/(?:.*)/networkInterfaces/(?:.*)`) vmssMachineIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s/virtualMachines/%s" + vmssIPConfigurationRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines/(.+)/networkInterfaces(?:.*)`) ) // scaleSet implements VMSet interface for Azure scale set. @@ -351,6 +352,16 @@ func (ss *scaleSet) getPrimaryInterfaceID(machine compute.VirtualMachineScaleSet return "", fmt.Errorf("failed to find a primary nic for the vm. vmname=%q", *machine.Name) } +// getVmssMachineID returns the full identifier of a vmss virtual machine. +func (az *Cloud) getVmssMachineID(resourceGroup, scaleSetName, instanceID string) string { + return fmt.Sprintf( + vmssMachineIDTemplate, + az.SubscriptionID, + strings.ToLower(resourceGroup), + scaleSetName, + instanceID) +} + // machineName is composed of computerNamePrefix and 36-based instanceID. 
// And instanceID part if in fixed length of 6 characters. // Refer https://msftstack.wordpress.com/2017/05/10/figuring-out-azure-vm-scale-set-machine-names/. @@ -618,9 +629,8 @@ func (ss *scaleSet) getScaleSetWithRetry(service *v1.Service, name string) (comp return result, exists, err } -// getPrimaryNetworkConfiguration gets primary network interface configuration for scale sets. -func (ss *scaleSet) getPrimaryNetworkConfiguration(networkConfigurationList *[]compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetNetworkConfiguration, error) { - networkConfigurations := *networkConfigurationList +// getPrimarynetworkInterfaceConfiguration gets primary network interface configuration for scale set virtual machine. +func (ss *scaleSet) getPrimarynetworkInterfaceConfiguration(networkConfigurations []compute.VirtualMachineScaleSetNetworkConfiguration, nodeName string) (*compute.VirtualMachineScaleSetNetworkConfiguration, error) { if len(networkConfigurations) == 1 { return &networkConfigurations[0], nil } @@ -632,10 +642,10 @@ func (ss *scaleSet) getPrimaryNetworkConfiguration(networkConfigurationList *[]c } } - return nil, fmt.Errorf("failed to find a primary network configuration for the scale set %q", scaleSetName) + return nil, fmt.Errorf("failed to find a primary network configuration for the scale set VM %q", nodeName) } -func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetIPConfiguration, error) { +func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachineScaleSetNetworkConfiguration, nodeName string) (*compute.VirtualMachineScaleSetIPConfiguration, error) { ipConfigurations := *config.IPConfigurations if len(ipConfigurations) == 1 { return &ipConfigurations[0], nil @@ -648,31 +658,7 @@ func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachine } } - return 
nil, fmt.Errorf("failed to find a primary IP configuration for the scale set %q", scaleSetName) -} - -// createOrUpdateVMSS invokes ss.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry. -func (ss *scaleSet) createOrUpdateVMSS(service *v1.Service, virtualMachineScaleSet compute.VirtualMachineScaleSet) error { - if ss.Config.shouldOmitCloudProviderBackoff() { - ctx, cancel := getContextWithCancel() - defer cancel() - resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet) - klog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", *virtualMachineScaleSet.Name) - return ss.processHTTPResponse(service, "CreateOrUpdateVMSS", resp, err) - } - - return ss.createOrUpdateVMSSWithRetry(service, virtualMachineScaleSet) -} - -// createOrUpdateVMSSWithRetry invokes ss.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry. -func (ss *scaleSet) createOrUpdateVMSSWithRetry(service *v1.Service, virtualMachineScaleSet compute.VirtualMachineScaleSet) error { - return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) { - ctx, cancel := getContextWithCancel() - defer cancel() - resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet) - klog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", *virtualMachineScaleSet.Name) - return ss.processHTTPRetryResponse(service, "CreateOrUpdateVMSS", resp, err) - }) + return nil, fmt.Errorf("failed to find a primary IP configuration for the scale set VM %q", nodeName) } // updateVMSSInstances invokes ss.VirtualMachineScaleSetsClient.UpdateInstances with exponential backoff retry. @@ -699,70 +685,35 @@ func (ss *scaleSet) updateVMSSInstancesWithRetry(service *v1.Service, scaleSetNa }) } -// getNodesScaleSets returns scalesets with instanceIDs and standard node names for given nodes. 
-func (ss *scaleSet) getNodesScaleSets(nodes []*v1.Node) (map[string]sets.String, []*v1.Node, error) { - scalesets := make(map[string]sets.String) - standardNodes := []*v1.Node{} - - for _, curNode := range nodes { - if ss.useStandardLoadBalancer() && ss.excludeMasterNodesFromStandardLB() && isMasterNode(curNode) { - klog.V(4).Infof("Excluding master node %q from load balancer backendpool", curNode.Name) - continue - } - - if ss.ShouldNodeExcludedFromLoadBalancer(curNode) { - klog.V(4).Infof("Excluding unmanaged/external-resource-group node %q", curNode.Name) - continue - } - - curScaleSetName, err := extractScaleSetNameByProviderID(curNode.Spec.ProviderID) - if err != nil { - klog.V(4).Infof("Node %q is not belonging to any scale sets, assuming it is belong to availability sets", curNode.Name) - standardNodes = append(standardNodes, curNode) - continue - } - - if _, ok := scalesets[curScaleSetName]; !ok { - scalesets[curScaleSetName] = sets.NewString() - } - - instanceID, err := getLastSegment(curNode.Spec.ProviderID) - if err != nil { - klog.Errorf("Failed to get instance ID for node %q: %v", curNode.Spec.ProviderID, err) - return nil, nil, err - } - - scalesets[curScaleSetName].Insert(instanceID) - } - - return scalesets, standardNodes, nil -} - -// ensureHostsInVMSetPool ensures the given Node's primary IP configurations are -// participating in the vmSet's LoadBalancer Backend Pool. -func (ss *scaleSet) ensureHostsInVMSetPool(service *v1.Service, backendPoolID string, vmSetName string, instanceIDs []string, isInternal bool) error { - klog.V(3).Infof("ensuring hosts %q of scaleset %q in LB backendpool %q", instanceIDs, vmSetName, backendPoolID) - serviceName := getServiceName(service) - virtualMachineScaleSet, exists, err := ss.getScaleSet(service, vmSetName) +// EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is +// participating in the specified LoadBalancer Backend Pool. 
+func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error { + klog.V(3).Infof("ensuring node %q of scaleset %q in LB backendpool %q", nodeName, vmSetName, backendPoolID) + vmName := mapNodeNameToVMName(nodeName) + ssName, instanceID, vm, err := ss.getVmssVM(vmName) if err != nil { - klog.Errorf("ss.getScaleSet(%s) for service %q failed: %v", vmSetName, serviceName, err) return err } - if !exists { - errorMessage := fmt.Errorf("Scale set %q not found", vmSetName) - klog.Errorf("%v", errorMessage) - return errorMessage + + // Check scale set name: + // - For basic SKU load balancer, the node is skipped (nil is returned) if the node's + // scale set is mismatched with vmSetName. + // - For standard SKU load balancer, backend could belong to multiple VMSS, so we + // don't check vmSet for it. + if vmSetName != "" && !ss.useStandardLoadBalancer() && !strings.EqualFold(vmSetName, ssName) { + klog.V(3).Infof("EnsureHostInPool skips node %s because it is not in the scaleSet %s", vmName, vmSetName) + return nil } // Find primary network interface configuration. - networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations - primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, vmSetName) + networkInterfaceConfigurations := *vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations + primaryNetworkInterfaceConfiguration, err := ss.getPrimarynetworkInterfaceConfiguration(networkInterfaceConfigurations, vmName) if err != nil { return err } // Find primary IP configuration. 
- primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, vmSetName) + primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkInterfaceConfiguration, vmName) if err != nil { return err } @@ -779,217 +730,288 @@ func (ss *scaleSet) ensureHostsInVMSetPool(service *v1.Service, backendPoolID st break } } - if !foundPool { - if ss.useStandardLoadBalancer() && len(newBackendPools) > 0 { - // Although standard load balancer supports backends from multiple vmss, - // the same network interface couldn't be added to more than one load balancer of - // the same type. Omit those nodes (e.g. masters) so Azure ARM won't complain - // about this. - newBackendPoolsIDs := make([]string, 0, len(newBackendPools)) - for _, pool := range newBackendPools { - if pool.ID != nil { - newBackendPoolsIDs = append(newBackendPoolsIDs, *pool.ID) - } - } - isSameLB, oldLBName, err := isBackendPoolOnSameLB(backendPoolID, newBackendPoolsIDs) - if err != nil { - return err - } - if !isSameLB { - klog.V(4).Infof("VMSS %q has already been added to LB %q, omit adding it to a new one", vmSetName, oldLBName) - return nil + + // The backendPoolID has already been found from existing LoadBalancerBackendAddressPools. + if foundPool { + return nil + } + + if ss.useStandardLoadBalancer() && len(newBackendPools) > 0 { + // Although standard load balancer supports backends from multiple scale + // sets, the same network interface couldn't be added to more than one load balancer of + // the same type. Omit those nodes (e.g. masters) so Azure ARM won't complain + // about this. 
+ newBackendPoolsIDs := make([]string, 0, len(newBackendPools)) + for _, pool := range newBackendPools { + if pool.ID != nil { + newBackendPoolsIDs = append(newBackendPoolsIDs, *pool.ID) } } - - newBackendPools = append(newBackendPools, - compute.SubResource{ - ID: to.StringPtr(backendPoolID), - }) - primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools - - err := ss.createOrUpdateVMSS(service, virtualMachineScaleSet) + isSameLB, oldLBName, err := isBackendPoolOnSameLB(backendPoolID, newBackendPoolsIDs) if err != nil { return err } + if !isSameLB { + klog.V(4).Infof("Node %q has already been added to LB %q, omit adding it to a new one", nodeName, oldLBName) + return nil + } } - // Update instances to latest VMSS model. - vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{ - InstanceIds: &instanceIDs, + // Compose a new vmssVM with added backendPoolID. + newBackendPools = append(newBackendPools, + compute.SubResource{ + ID: to.StringPtr(backendPoolID), + }) + primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools + newVM := compute.VirtualMachineScaleSetVM{ + Sku: vm.Sku, + Location: vm.Location, + VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{ + HardwareProfile: vm.HardwareProfile, + NetworkProfileConfiguration: &compute.VirtualMachineScaleSetVMNetworkProfileConfiguration{ + NetworkInterfaceConfigurations: &networkInterfaceConfigurations, + }, + }, } - err = ss.updateVMSSInstances(service, vmSetName, vmInstanceIDs) + + // Get the node resource group. + nodeResourceGroup, err := ss.GetNodeResourceGroup(vmName) if err != nil { return err } - return nil + // Invalidate the cache since we would update it. + key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID)) + defer ss.vmssVMCache.Delete(key) + + // Update vmssVM with backoff. 
+ ctx, cancel := getContextWithCancel() + defer cancel() + klog.V(2).Infof("EnsureHostInPool begins to update vmssVM(%s) with new backendPoolID %s", vmName, backendPoolID) + resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM) + if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) { + klog.V(2).Infof("EnsureHostInPool update backing off vmssVM(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err) + retryErr := ss.UpdateVmssVMWithRetry(nodeResourceGroup, ssName, instanceID, newVM) + if retryErr != nil { + err = retryErr + klog.Errorf("EnsureHostInPool update abort backoff vmssVM(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err) + } + } + + return err } // EnsureHostsInPool ensures the given Node's primary IP configurations are // participating in the specified LoadBalancer Backend Pool. func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error { - serviceName := getServiceName(service) - scalesets, standardNodes, err := ss.getNodesScaleSets(nodes) - if err != nil { - klog.Errorf("getNodesScaleSets() for service %q failed: %v", serviceName, err) - return err - } + hostUpdates := make([]func() error, 0, len(nodes)) + for _, node := range nodes { + localNodeName := node.Name - for ssName, instanceIDs := range scalesets { - // Only add nodes belonging to specified vmSet for basic SKU LB. - if !ss.useStandardLoadBalancer() && !strings.EqualFold(ssName, vmSetName) { + if ss.useStandardLoadBalancer() && ss.excludeMasterNodesFromStandardLB() && isMasterNode(node) { + klog.V(4).Infof("Excluding master node %q from load balancer backendpool %q", localNodeName, backendPoolID) continue } - if instanceIDs.Len() == 0 { - // This may happen when scaling a vmss capacity to 0. 
- klog.V(3).Infof("scale set %q has 0 nodes, adding it to load balancer anyway", ssName) - // InstanceIDs is required to update vmss, use * instead here since there are no nodes actually. - instanceIDs.Insert("*") + if ss.ShouldNodeExcludedFromLoadBalancer(node) { + klog.V(4).Infof("Excluding unmanaged/external-resource-group node %q", localNodeName) + continue } - err := ss.ensureHostsInVMSetPool(service, backendPoolID, ssName, instanceIDs.List(), isInternal) - if err != nil { - klog.Errorf("ensureHostsInVMSetPool() with scaleSet %q for service %q failed: %v", ssName, serviceName, err) - return err + f := func() error { + // VMAS nodes should also be added to the SLB backends. + if ss.useStandardLoadBalancer() { + // Check whether the node is VMAS virtual machine. + managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName) + if err != nil { + klog.Errorf("Failed to check isNodeManagedByAvailabilitySet(%s): %v", localNodeName, err) + return err + } + if managedByAS { + return ss.availabilitySet.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal) + } + } + + err := ss.EnsureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal) + if err != nil { + return fmt.Errorf("EnsureHostInPool(%s): backendPoolID(%s) - failed to ensure host in pool: %q", getServiceName(service), backendPoolID, err) + } + return nil } + hostUpdates = append(hostUpdates, f) } - if ss.useStandardLoadBalancer() && len(standardNodes) > 0 { - err := ss.availabilitySet.EnsureHostsInPool(service, standardNodes, backendPoolID, "", isInternal) - if err != nil { - klog.Errorf("availabilitySet.EnsureHostsInPool() for service %q failed: %v", serviceName, err) - return err - } + errs := utilerrors.AggregateGoroutines(hostUpdates...) + if errs != nil { + return utilerrors.Flatten(errs) } return nil } -// ensureScaleSetBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified scaleset. 
-func (ss *scaleSet) ensureScaleSetBackendPoolDeleted(service *v1.Service, poolID, ssName string) error { - klog.V(3).Infof("ensuring backend pool %q deleted from scaleset %q", poolID, ssName) - virtualMachineScaleSet, exists, err := ss.getScaleSet(service, ssName) +// ensureBackendPoolDeletedFromNode ensures the loadBalancer backendAddressPools deleted from the specified node. +func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeName, backendPoolID string) error { + ssName, instanceID, vm, err := ss.getVmssVM(nodeName) if err != nil { - klog.Errorf("ss.ensureScaleSetBackendPoolDeleted(%s, %s) getScaleSet(%s) failed: %v", poolID, ssName, ssName, err) return err } - if !exists { - klog.V(2).Infof("ss.ensureScaleSetBackendPoolDeleted(%s, %s), scale set %s has already been non-exist", poolID, ssName, ssName) - return nil - } // Find primary network interface configuration. - networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations - primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, ssName) + networkInterfaceConfigurations := *vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations + primaryNetworkInterfaceConfiguration, err := ss.getPrimarynetworkInterfaceConfiguration(networkInterfaceConfigurations, nodeName) if err != nil { return err } - // Find primary IP configuration. - primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, ssName) + // Find primary IP configuration. + primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkInterfaceConfiguration, nodeName) if err != nil { return err } - - // Construct new loadBalancerBackendAddressPools and remove backendAddressPools from primary IP configuration. 
if primaryIPConfiguration.LoadBalancerBackendAddressPools == nil || len(*primaryIPConfiguration.LoadBalancerBackendAddressPools) == 0 { return nil } + + // Construct new loadBalancerBackendAddressPools and remove backendAddressPools from primary IP configuration. existingBackendPools := *primaryIPConfiguration.LoadBalancerBackendAddressPools newBackendPools := []compute.SubResource{} foundPool := false for i := len(existingBackendPools) - 1; i >= 0; i-- { curPool := existingBackendPools[i] - if strings.EqualFold(poolID, *curPool.ID) { - klog.V(10).Infof("ensureScaleSetBackendPoolDeleted gets unwanted backend pool %q for scale set %q", poolID, ssName) + if strings.EqualFold(backendPoolID, *curPool.ID) { + klog.V(10).Infof("ensureBackendPoolDeletedFromNode gets unwanted backend pool %q for node %s", backendPoolID, nodeName) foundPool = true newBackendPools = append(existingBackendPools[:i], existingBackendPools[i+1:]...) } } + + // Pool not found, assume it has been already removed. if !foundPool { - // Pool not found, assume it has been already removed. return nil } - // Update scale set with backoff. + // Compose a new vmssVM with the backendPoolID removed. primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools - klog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating", ssName) - err = ss.createOrUpdateVMSS(service, virtualMachineScaleSet) + newVM := compute.VirtualMachineScaleSetVM{ + Sku: vm.Sku, + Location: vm.Location, + VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{ + HardwareProfile: vm.HardwareProfile, + NetworkProfileConfiguration: &compute.VirtualMachineScaleSetVMNetworkProfileConfiguration{ + NetworkInterfaceConfigurations: &networkInterfaceConfigurations, + }, + }, + } + + // Get the node resource group. + nodeResourceGroup, err := ss.GetNodeResourceGroup(nodeName) if err != nil { return err } - // Update instances to latest VMSS model. 
- instanceIDs := []string{"*"} - vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{ - InstanceIds: &instanceIDs, - } - err = ss.updateVMSSInstances(service, ssName, vmInstanceIDs) - if err != nil { - return err - } + // Invalidate the cache since we would update it. + key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID)) + defer ss.vmssVMCache.Delete(key) - // Update virtualMachineScaleSet again. This is a workaround for removing VMSS reference from LB. - // TODO: remove this workaround when figuring out the root cause. - if len(newBackendPools) == 0 { - err = ss.createOrUpdateVMSS(service, virtualMachineScaleSet) - if err != nil { - klog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", ssName) + // Update vmssVM with backoff. + ctx, cancel := getContextWithCancel() + defer cancel() + klog.V(2).Infof("ensureBackendPoolDeletedFromNode begins to update vmssVM(%s) with backendPoolID %s", nodeName, backendPoolID) + resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM) + if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) { + klog.V(2).Infof("ensureBackendPoolDeletedFromNode update backing off vmssVM(%s) with backendPoolID %s, err: %v", nodeName, backendPoolID, err) + retryErr := ss.UpdateVmssVMWithRetry(nodeResourceGroup, ssName, instanceID, newVM) + if retryErr != nil { + err = retryErr + klog.Errorf("ensureBackendPoolDeletedFromNode update abort backoff vmssVM(%s) with backendPoolID %s, err: %v", nodeName, backendPoolID, err) } } - - return nil + if err != nil { + klog.Errorf("ensureBackendPoolDeletedFromNode failed to update vmssVM(%s) with backendPoolID %s: %v", nodeName, backendPoolID, err) + } else { + klog.V(2).Infof("ensureBackendPoolDeletedFromNode update vmssVM(%s) with backendPoolID %s succeeded", nodeName, backendPoolID) + } + return err } -// EnsureBackendPoolDeleted ensures the loadBalancer 
backendAddressPools deleted from the specified vmSet. -func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error { +// getNodeNameByIPConfigurationID gets the node name by IP configuration ID. +func (ss *scaleSet) getNodeNameByIPConfigurationID(ipConfigurationID string) (string, error) { + matches := vmssIPConfigurationRE.FindStringSubmatch(ipConfigurationID) + if len(matches) != 4 { + klog.V(4).Infof("Can not extract scale set name from ipConfigurationID (%s), assuming it is mananaged by availability set", ipConfigurationID) + return "", ErrorNotVmssInstance + } + + resourceGroup := matches[1] + scaleSetName := matches[2] + instanceID := matches[3] + vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID) + if err != nil { + return "", err + } + + if vm.OsProfile != nil && vm.OsProfile.ComputerName != nil { + return strings.ToLower(*vm.OsProfile.ComputerName), nil + } + + return "", nil +} + +// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes. +func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error { + // Returns nil if backend address pools already deleted. 
if backendAddressPools == nil { return nil } - scalesets := sets.NewString() + ipConfigurationIDs := []string{} for _, backendPool := range *backendAddressPools { - if strings.EqualFold(*backendPool.ID, poolID) && backendPool.BackendIPConfigurations != nil { - for _, ipConfigurations := range *backendPool.BackendIPConfigurations { - if ipConfigurations.ID == nil { + if strings.EqualFold(*backendPool.ID, backendPoolID) && backendPool.BackendIPConfigurations != nil { + for _, ipConf := range *backendPool.BackendIPConfigurations { + if ipConf.ID == nil { continue } - ssName, err := extractScaleSetNameByProviderID(*ipConfigurations.ID) - if err != nil { - klog.V(4).Infof("backend IP configuration %q is not belonging to any vmss, omit it", *ipConfigurations.ID) - continue - } - - scalesets.Insert(ssName) + ipConfigurationIDs = append(ipConfigurationIDs, *ipConf.ID) } - break } } - for ssName := range scalesets { - // Only remove nodes belonging to specified vmSet to basic LB backends. - if !ss.useStandardLoadBalancer() && !strings.EqualFold(ssName, vmSetName) { - continue - } + hostUpdates := make([]func() error, 0, len(ipConfigurationIDs)) + for i := range ipConfigurationIDs { + ipConfigurationID := ipConfigurationIDs[i] - err := ss.ensureScaleSetBackendPoolDeleted(service, poolID, ssName) - if err != nil { - klog.Errorf("ensureScaleSetBackendPoolDeleted() with scaleSet %q failed: %v", ssName, err) - return err + f := func() error { + if scaleSetName, err := extractScaleSetNameByProviderID(ipConfigurationID); err == nil { + // Only remove nodes belonging to specified vmSet to basic LB backends. + if !ss.useStandardLoadBalancer() && !strings.EqualFold(scaleSetName, vmSetName) { + return nil + } + } + + nodeName, err := ss.getNodeNameByIPConfigurationID(ipConfigurationID) + if err != nil { + if err == ErrorNotVmssInstance { // Do nothing for the VMAS nodes. 
+ return nil + } + klog.Errorf("Failed to getNodeNameByIPConfigurationID(%s): %v", ipConfigurationID, err) + return err + } + + err = ss.ensureBackendPoolDeletedFromNode(service, nodeName, backendPoolID) + if err != nil { + return fmt.Errorf("failed to ensure backend pool %s deleted from node %s: %v", backendPoolID, nodeName, err) + } + + return nil } + hostUpdates = append(hostUpdates, f) + } + + errs := utilerrors.AggregateGoroutines(hostUpdates...) + if errs != nil { + return utilerrors.Flatten(errs) } return nil } - -// getVmssMachineID returns the full identifier of a vmss virtual machine. -func (az *Cloud) getVmssMachineID(resourceGroup, scaleSetName, instanceID string) string { - return fmt.Sprintf( - vmssMachineIDTemplate, - az.SubscriptionID, - strings.ToLower(resourceGroup), - scaleSetName, - instanceID) -}