Merge pull request #81411 from nilo19/t-qini-fix_80365

Add/delete load balancer backendPoodID in VMSS.
This commit is contained in:
Kubernetes Prow Robot 2019-08-19 01:25:30 -07:00 committed by GitHub
commit 11eaf430e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 363 additions and 8 deletions

View File

@ -656,6 +656,50 @@ func (az *Cloud) UpdateVmssVMWithRetry(resourceGroupName string, VMScaleSetName
})
}
// CreateOrUpdateVmssWithRetry invokes az.VirtualMachineScaleSetsClient.Update with exponential backoff retry
func (az *Cloud) CreateOrUpdateVmssWithRetry(resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) error {
return wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
// When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error.
// Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it.
klog.V(3).Infof("CreateOrUpdateVmssWithRetry: verify the status of the vmss being created or updated")
vmss, err := az.VirtualMachineScaleSetsClient.Get(ctx, resourceGroupName, VMScaleSetName)
if vmss.ProvisioningState != nil && strings.EqualFold(*vmss.ProvisioningState, virtualMachineScaleSetsDeallocating) {
klog.V(3).Infof("CreateOrUpdateVmssWithRetry: found vmss %s being deleted, skipping", VMScaleSetName)
return true, nil
}
resp, err := az.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroupName, VMScaleSetName, parameters)
klog.V(10).Infof("UpdateVmssVMWithRetry: VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", VMScaleSetName)
return az.processHTTPRetryResponse(nil, "", resp, err)
})
}
// GetScaleSetWithRetry gets scale set with exponential backoff retry
func (az *Cloud) GetScaleSetWithRetry(service *v1.Service, resourceGroupName, vmssName string) (compute.VirtualMachineScaleSet, error) {
var result compute.VirtualMachineScaleSet
var retryErr error
err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
result, retryErr = az.VirtualMachineScaleSetsClient.Get(ctx, resourceGroupName, vmssName)
if retryErr != nil {
az.Event(service, v1.EventTypeWarning, "GetVirtualMachineScaleSet", retryErr.Error())
klog.Errorf("backoff: failure for scale set %q, will retry,err=%v", vmssName, retryErr)
return false, nil
}
klog.V(4).Infof("backoff: success for scale set %q", vmssName)
return true, nil
})
return result, err
}
// isSuccessHTTPResponse determines if the response from an HTTP request suggests success
func isSuccessHTTPResponse(resp *http.Response) bool {
if resp == nil {

View File

@ -34,7 +34,8 @@ import (
const (
// The version number is taken from "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-08-01/network".
azureNetworkAPIVersion = "2018-08-01"
azureNetworkAPIVersion = "2018-08-01"
virtualMachineScaleSetsDeallocating = "Deallocating"
)
// Helpers for rate limiting error/error channel creation
@ -98,6 +99,7 @@ type SecurityGroupsClient interface {
type VirtualMachineScaleSetsClient interface {
Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, err error)
List(ctx context.Context, resourceGroupName string) (result []compute.VirtualMachineScaleSet, err error)
CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) (resp *http.Response, err error)
}
// VirtualMachineScaleSetVMsClient defines needed functions for azure compute.VirtualMachineScaleSetVMsClient
@ -973,6 +975,28 @@ func (az *azVirtualMachineScaleSetsClient) List(ctx context.Context, resourceGro
return result, nil
}
func (az *azVirtualMachineScaleSetsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, vmScaleSetName string, parameters compute.VirtualMachineScaleSet) (resp *http.Response, err error) {
/* Write rate limiting */
if !az.rateLimiterWriter.TryAccept() {
err = createRateLimitErr(true, "NiCreateOrUpdate")
return
}
klog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): start", resourceGroupName, vmScaleSetName)
defer func() {
klog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): end", resourceGroupName, vmScaleSetName)
}()
mc := newMetricContext("vmss", "create_or_update", resourceGroupName, az.client.SubscriptionID, "")
future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, vmScaleSetName, parameters)
if err != nil {
return future.Response(), mc.Observe(err)
}
err = future.WaitForCompletionRef(ctx, az.client.Client)
return future.Response(), mc.Observe(err)
}
// azVirtualMachineScaleSetVMsClient implements VirtualMachineScaleSetVMsClient.
type azVirtualMachineScaleSetVMsClient struct {
client compute.VirtualMachineScaleSetVMsClient
@ -1064,7 +1088,7 @@ func (az *azVirtualMachineScaleSetVMsClient) List(ctx context.Context, resourceG
func (az *azVirtualMachineScaleSetVMsClient) Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) (resp *http.Response, err error) {
if !az.rateLimiterWriter.TryAccept() {
err = createRateLimitErr(true, "VMSSUpdate")
err = createRateLimitErr(true, "VMSSVMUpdate")
return
}

View File

@ -44,6 +44,7 @@ var (
vmssMachineIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s/virtualMachines/%s"
vmssIPConfigurationRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines/(.+)/networkInterfaces(?:.*)`)
vmssPIPConfigurationRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines/(.+)/networkInterfaces/(.+)/ipConfigurations/(.+)/publicIPAddresses/(.+)`)
vmssVMProviderIDRE = regexp.MustCompile(`azure:///subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines/(?:\d+)`)
)
// scaleSet implements VMSet interface for Azure scale set.
@ -649,7 +650,23 @@ func (ss *scaleSet) getPrimaryNetworkInterfaceConfiguration(networkConfiguration
return nil, fmt.Errorf("failed to find a primary network configuration for the scale set VM %q", nodeName)
}
func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachineScaleSetNetworkConfiguration, nodeName string) (*compute.VirtualMachineScaleSetIPConfiguration, error) {
// getPrimaryNetworkInterfaceConfigurationForScaleSet gets primary network interface configuration for scale set.
func (ss *scaleSet) getPrimaryNetworkInterfaceConfigurationForScaleSet(networkConfigurations []compute.VirtualMachineScaleSetNetworkConfiguration, vmssName string) (*compute.VirtualMachineScaleSetNetworkConfiguration, error) {
if len(networkConfigurations) == 1 {
return &networkConfigurations[0], nil
}
for idx := range networkConfigurations {
networkConfig := &networkConfigurations[idx]
if networkConfig.Primary != nil && *networkConfig.Primary == true {
return networkConfig, nil
}
}
return nil, fmt.Errorf("failed to find a primary network configuration for the scale set %q", vmssName)
}
func getPrimaryIPConfigFromVMSSNetworkConfig(config *compute.VirtualMachineScaleSetNetworkConfiguration) (*compute.VirtualMachineScaleSetIPConfiguration, error) {
ipConfigurations := *config.IPConfigurations
if len(ipConfigurations) == 1 {
return &ipConfigurations[0], nil
@ -662,7 +679,7 @@ func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachine
}
}
return nil, fmt.Errorf("failed to find a primary IP configuration for the scale set VM %q", nodeName)
return nil, fmt.Errorf("failed to find a primary IP configuration")
}
// EnsureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is
@ -685,6 +702,10 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
}
// Find primary network interface configuration.
if vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations == nil {
klog.V(4).Infof("EnsureHostInPool: cannot obtain the primary network interface configuration, of vm %s, probably because the vm's being deleted", vmName)
return nil
}
networkInterfaceConfigurations := *vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations
primaryNetworkInterfaceConfiguration, err := ss.getPrimaryNetworkInterfaceConfiguration(networkInterfaceConfigurations, vmName)
if err != nil {
@ -692,7 +713,7 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
}
// Find primary IP configuration.
primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkInterfaceConfiguration, vmName)
primaryIPConfiguration, err := getPrimaryIPConfigFromVMSSNetworkConfig(primaryNetworkInterfaceConfiguration)
if err != nil {
return err
}
@ -780,6 +801,141 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
return err
}
func getVmssAndResourceGroupNameByVMProviderID(providerID string) (string, string, error) {
matches := vmssVMProviderIDRE.FindStringSubmatch(providerID)
if len(matches) != 3 {
return "", "", ErrorNotVmssInstance
}
return matches[1], matches[2], nil
}
func (ss *scaleSet) ensureVMSSInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string) error {
vmssNamesMap := make(map[string]bool)
// the standard load balancer supports multiple vmss in its backend while the basic sku doesn't
if ss.useStandardLoadBalancer() {
for _, node := range nodes {
if ss.excludeMasterNodesFromStandardLB() && isMasterNode(node) {
continue
}
// in this scenario the vmSetName is an empty string and the name of vmss should be obtained from the provider IDs of nodes
vmssName, resourceGroupName, err := getVmssAndResourceGroupNameByVMProviderID(node.Spec.ProviderID)
if err != nil {
klog.V(4).Infof("ensureVMSSInPool: found VMAS node %s, will skip checking and continue", node.Name)
continue
}
// only vmsses in the resource group same as it's in azure config are included
if strings.EqualFold(resourceGroupName, ss.ResourceGroup) {
vmssNamesMap[vmssName] = true
}
}
} else {
vmssNamesMap[vmSetName] = true
}
for vmssName := range vmssNamesMap {
vmss, err := ss.GetScaleSetWithRetry(service, ss.ResourceGroup, vmssName)
if err != nil {
return err
}
// When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error.
// Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it.
if vmss.ProvisioningState != nil && strings.EqualFold(*vmss.ProvisioningState, virtualMachineScaleSetsDeallocating) {
klog.V(3).Infof("ensureVMSSInPool: found vmss %s being deleted, skipping", vmssName)
continue
}
if vmss.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations == nil {
klog.V(4).Infof("EnsureHostInPool: cannot obtain the primary network interface configuration, of vmss %s", vmssName)
continue
}
vmssNIC := *vmss.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
primaryNIC, err := ss.getPrimaryNetworkInterfaceConfigurationForScaleSet(vmssNIC, vmssName)
if err != nil {
return err
}
primaryIPConfig, err := getPrimaryIPConfigFromVMSSNetworkConfig(primaryNIC)
if err != nil {
return err
}
loadBalancerBackendAddressPools := []compute.SubResource{}
if primaryIPConfig.LoadBalancerBackendAddressPools != nil {
loadBalancerBackendAddressPools = *primaryIPConfig.LoadBalancerBackendAddressPools
}
var found bool
for _, loadBalancerBackendAddressPool := range loadBalancerBackendAddressPools {
if strings.EqualFold(*loadBalancerBackendAddressPool.ID, backendPoolID) {
found = true
break
}
}
if found {
continue
}
if ss.useStandardLoadBalancer() && len(loadBalancerBackendAddressPools) > 0 {
// Although standard load balancer supports backends from multiple scale
// sets, the same network interface couldn't be added to more than one load balancer of
// the same type. Omit those nodes (e.g. masters) so Azure ARM won't complain
// about this.
newBackendPoolsIDs := make([]string, 0, len(loadBalancerBackendAddressPools))
for _, pool := range loadBalancerBackendAddressPools {
if pool.ID != nil {
newBackendPoolsIDs = append(newBackendPoolsIDs, *pool.ID)
}
}
isSameLB, oldLBName, err := isBackendPoolOnSameLB(backendPoolID, newBackendPoolsIDs)
if err != nil {
return err
}
if !isSameLB {
klog.V(4).Infof("VMSS %q has already been added to LB %q, omit adding it to a new one", vmssName, oldLBName)
return nil
}
}
// Compose a new vmss with added backendPoolID.
loadBalancerBackendAddressPools = append(loadBalancerBackendAddressPools,
compute.SubResource{
ID: to.StringPtr(backendPoolID),
})
primaryIPConfig.LoadBalancerBackendAddressPools = &loadBalancerBackendAddressPools
newVMSS := compute.VirtualMachineScaleSet{
Sku: vmss.Sku,
Location: vmss.Location,
VirtualMachineScaleSetProperties: &compute.VirtualMachineScaleSetProperties{
VirtualMachineProfile: &compute.VirtualMachineScaleSetVMProfile{
NetworkProfile: &compute.VirtualMachineScaleSetNetworkProfile{
NetworkInterfaceConfigurations: &vmssNIC,
},
},
},
}
// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("ensureVMSSInPool begins to update vmss(%s) with new backendPoolID %s", vmssName, backendPoolID)
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmssName, newVMSS)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("ensureVMSSInPool update backing off vmss(%s) with new backendPoolID %s, err: %v", vmssName, backendPoolID, err)
retryErr := ss.CreateOrUpdateVmssWithRetry(ss.ResourceGroup, vmssName, newVMSS)
if retryErr != nil {
err = retryErr
klog.Errorf("ensureVMSSInPool update abort backoff vmssVM(%s) with new backendPoolID %s, err: %v", vmssName, backendPoolID, err)
}
}
if err != nil {
return err
}
}
return nil
}
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
@ -829,6 +985,12 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
return utilerrors.Flatten(errs)
}
// we need to add the LB backend updates back to VMSS model, see issue kubernetes/kubernetes#80365 for detailed information
err := ss.ensureVMSSInPool(service, nodes, backendPoolID, vmSetName)
if err != nil {
return err
}
return nil
}
@ -840,14 +1002,18 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeNa
}
// Find primary network interface configuration.
if vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations == nil {
klog.V(4).Infof("EnsureHostInPool: cannot obtain the primary network interface configuration, of vm %s, probably because the vm's being deleted", nodeName)
return nil
}
networkInterfaceConfigurations := *vm.NetworkProfileConfiguration.NetworkInterfaceConfigurations
primaryNetworkInterfaceConfiguration, err := ss.getPrimaryNetworkInterfaceConfiguration(networkInterfaceConfigurations, nodeName)
if err != nil {
return err
}
// Find primary IP configuration.4
primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkInterfaceConfiguration, nodeName)
// Find primary IP configuration.
primaryIPConfiguration, err := getPrimaryIPConfigFromVMSSNetworkConfig(primaryNetworkInterfaceConfiguration)
if err != nil {
return err
}
@ -940,6 +1106,120 @@ func (ss *scaleSet) getNodeNameByIPConfigurationID(ipConfigurationID string) (st
return "", nil
}
func getScaleSetAndResourceGroupNameByIPConfigurationID(ipConfigurationID string) (string, string, error) {
matches := vmssIPConfigurationRE.FindStringSubmatch(ipConfigurationID)
if len(matches) != 4 {
klog.V(4).Infof("Can not extract scale set name from ipConfigurationID (%s), assuming it is mananaged by availability set", ipConfigurationID)
return "", "", ErrorNotVmssInstance
}
resourceGroup := matches[1]
scaleSetName := matches[2]
return scaleSetName, resourceGroup, nil
}
func (ss *scaleSet) ensureBackendPoolDeletedFromVMSS(service *v1.Service, backendPoolID, vmSetName string, ipConfigurationIDs []string) error {
vmssNamesMap := make(map[string]bool)
// the standard load balancer supports multiple vmss in its backend while the basic sku doesn't
if ss.useStandardLoadBalancer() {
for _, ipConfigurationID := range ipConfigurationIDs {
// in this scenario the vmSetName is an empty string and the name of vmss should be obtained from the provider IDs of nodes
vmssName, resourceGroupName, err := getScaleSetAndResourceGroupNameByIPConfigurationID(ipConfigurationID)
if err != nil {
klog.V(4).Infof("ensureBackendPoolDeletedFromVMSS: found VMAS ipcConfigurationID %s, will skip checking and continue", ipConfigurationID)
continue
}
// only vmsses in the resource group same as it's in azure config are included
if strings.EqualFold(resourceGroupName, ss.ResourceGroup) {
vmssNamesMap[vmssName] = true
}
}
} else {
vmssNamesMap[vmSetName] = true
}
for vmssName := range vmssNamesMap {
vmss, err := ss.GetScaleSetWithRetry(service, ss.ResourceGroup, vmssName)
// When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error.
// Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it.
if vmss.ProvisioningState != nil && strings.EqualFold(*vmss.ProvisioningState, virtualMachineScaleSetsDeallocating) {
klog.V(3).Infof("ensureVMSSInPool: found vmss %s being deleted, skipping", vmssName)
continue
}
if err != nil {
return err
}
if vmss.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations == nil {
klog.V(4).Infof("EnsureHostInPool: cannot obtain the primary network interface configuration, of vmss %s", vmssName)
continue
}
vmssNIC := *vmss.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
primaryNIC, err := ss.getPrimaryNetworkInterfaceConfigurationForScaleSet(vmssNIC, vmssName)
if err != nil {
return err
}
primaryIPConfig, err := getPrimaryIPConfigFromVMSSNetworkConfig(primaryNIC)
if err != nil {
return err
}
loadBalancerBackendAddressPools := []compute.SubResource{}
if primaryIPConfig.LoadBalancerBackendAddressPools != nil {
loadBalancerBackendAddressPools = *primaryIPConfig.LoadBalancerBackendAddressPools
}
var found bool
var newBackendPools []compute.SubResource
for i := len(loadBalancerBackendAddressPools) - 1; i >= 0; i-- {
curPool := loadBalancerBackendAddressPools[i]
if strings.EqualFold(backendPoolID, *curPool.ID) {
klog.V(10).Infof("ensureBackendPoolDeletedFromVMSS gets unwanted backend pool %q for VMSS %s", backendPoolID, vmssName)
found = true
newBackendPools = append(loadBalancerBackendAddressPools[:i], loadBalancerBackendAddressPools[i+1:]...)
}
}
if !found {
continue
}
// Compose a new vmss with added backendPoolID.
primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools
newVMSS := compute.VirtualMachineScaleSet{
Sku: vmss.Sku,
Location: vmss.Location,
VirtualMachineScaleSetProperties: &compute.VirtualMachineScaleSetProperties{
VirtualMachineProfile: &compute.VirtualMachineScaleSetVMProfile{
NetworkProfile: &compute.VirtualMachineScaleSetNetworkProfile{
NetworkInterfaceConfigurations: &vmssNIC,
},
},
},
}
// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("ensureBackendPoolDeletedFromVMSS begins to update vmss(%s) with backendPoolID %s", vmssName, backendPoolID)
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmssName, newVMSS)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("ensureBackendPoolDeletedFromVMSS update backing off vmss(%s) with backendPoolID %s, err: %v", vmssName, backendPoolID, err)
retryErr := ss.CreateOrUpdateVmssWithRetry(ss.ResourceGroup, vmssName, newVMSS)
if retryErr != nil {
err = retryErr
klog.Errorf("ensureBackendPoolDeletedFromVMSS update abort backoff vmssVM(%s) with backendPoolID %s, err: %v", vmssName, backendPoolID, err)
}
}
if err != nil {
return err
}
}
return nil
}
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes.
func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
// Returns nil if backend address pools already deleted.
@ -965,7 +1245,9 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID,
ipConfigurationID := ipConfigurationIDs[i]
f := func() error {
if scaleSetName, err := extractScaleSetNameByProviderID(ipConfigurationID); err == nil {
var scaleSetName string
var err error
if scaleSetName, err = extractScaleSetNameByProviderID(ipConfigurationID); err == nil {
// Only remove nodes belonging to specified vmSet to basic LB backends.
if !ss.useStandardLoadBalancer() && !strings.EqualFold(scaleSetName, vmSetName) {
return nil
@ -996,5 +1278,10 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID,
return utilerrors.Flatten(errs)
}
err := ss.ensureBackendPoolDeletedFromVMSS(service, backendPoolID, vmSetName, ipConfigurationIDs)
if err != nil {
return err
}
return nil
}