diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD b/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD
index 4ac822a5a4b..90e6bd6b838 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD
@@ -69,6 +69,8 @@ go_library(
         "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients:go_default_library",
+        "//staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssclient:go_default_library",
+        "//staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient:go_default_library",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/retry:go_default_library",
         "//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute:go_default_library",
         "//vendor/github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network:go_default_library",
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
index cb66e9fc64c..a04bb231727 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
@@ -46,6 +46,8 @@ import (
     "k8s.io/klog"
     "k8s.io/legacy-cloud-providers/azure/auth"
     azclients "k8s.io/legacy-cloud-providers/azure/clients"
+    "k8s.io/legacy-cloud-providers/azure/clients/vmssclient"
+    "k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient"
     "k8s.io/legacy-cloud-providers/azure/retry"
     "sigs.k8s.io/yaml"
 )
@@ -480,8 +482,12 @@ func (az *Cloud) InitializeCloudFromConfig(config *Config, fromSecret bool) erro
     az.VirtualMachinesClient = newAzVirtualMachinesClient(azClientConfig.WithRateLimiter(config.VirtualMachineRateLimit))
     az.PublicIPAddressesClient = newAzPublicIPAddressesClient(azClientConfig.WithRateLimiter(config.PublicIPAddressRateLimit))
     az.VirtualMachineSizesClient = newAzVirtualMachineSizesClient(azClientConfig.WithRateLimiter(config.VirtualMachineSizeRateLimit))
-    az.VirtualMachineScaleSetsClient = newAzVirtualMachineScaleSetsClient(azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit))
-    az.VirtualMachineScaleSetVMsClient = newAzVirtualMachineScaleSetVMsClient(azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit))
+
+    az.VirtualMachineScaleSetsClient = vmssclient.New(azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit))
+    vmssVMClientConfig := azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit)
+    vmssVMClientConfig.Backoff = vmssVMClientConfig.Backoff.WithNonRetriableErrors([]string{vmssVMNotActiveErrorMessage})
+    az.VirtualMachineScaleSetVMsClient = vmssvmclient.New(vmssVMClientConfig)
+
     // TODO(feiskyer): refactor azureFileClient to Interface.
     az.FileClient = &azureFileClient{env: *env}
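A minimal standalone sketch of the wiring pattern the azure.go hunk above relies on: the With* helpers copy their receiver, so attaching the non-retriable VMSS VM message to the derived config cannot leak into the shared base config. `Backoff`, `ClientConfig`, and `WithQPS` below are simplified stand-ins for `retry.Backoff`, `azclients.ClientConfig`, and `WithRateLimiter`, not the real API surface:

```go
package main

import "fmt"

// Backoff is a stand-in for retry.Backoff; only the field this sketch needs.
type Backoff struct{ NonRetriableErrors []string }

// WithNonRetriableErrors copies the receiver by value, so the original
// Backoff keeps its empty error list.
func (b *Backoff) WithNonRetriableErrors(errs []string) *Backoff {
	newBackoff := *b
	newBackoff.NonRetriableErrors = errs
	return &newBackoff
}

// ClientConfig is a stand-in for azclients.ClientConfig.
type ClientConfig struct {
	QPS     float64
	Backoff *Backoff
}

// WithQPS mimics WithRateLimiter's new copy semantics: derive, don't mutate.
func (c *ClientConfig) WithQPS(qps float64) *ClientConfig {
	derived := *c
	derived.QPS = qps
	return &derived
}

func main() {
	base := &ClientConfig{Backoff: &Backoff{}}

	vmssVMConfig := base.WithQPS(10)
	vmssVMConfig.Backoff = vmssVMConfig.Backoff.WithNonRetriableErrors(
		[]string{"not an active Virtual Machine Scale Set VM instanceId"})

	// The base config used by every other client is untouched.
	fmt.Println(len(base.Backoff.NonRetriableErrors))         // 0
	fmt.Println(len(vmssVMConfig.Backoff.NonRetriableErrors)) // 1
}
```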
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go
index 480a71517b4..569c36a1b71 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go
@@ -693,77 +693,32 @@ func (az *Cloud) deleteRouteWithRetry(routeName string) error {
     })
 }
 
-// UpdateVmssVMWithRetry invokes az.VirtualMachineScaleSetVMsClient.Update with exponential backoff retry
-func (az *Cloud) UpdateVmssVMWithRetry(resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) error {
-    return wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
-        ctx, cancel := getContextWithCancel()
-        defer cancel()
+// CreateOrUpdateVMSS invokes az.VirtualMachineScaleSetsClient.CreateOrUpdate().
+func (az *Cloud) CreateOrUpdateVMSS(resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) *retry.Error {
+    ctx, cancel := getContextWithCancel()
+    defer cancel()
 
-        rerr := az.VirtualMachineScaleSetVMsClient.Update(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source)
-        klog.V(10).Infof("UpdateVmssVMWithRetry: VirtualMachineScaleSetVMsClient.Update(%s,%s): end", VMScaleSetName, instanceID)
-        if rerr == nil {
-            return true, nil
-        }
+    // When a vmss is being deleted, the CreateOrUpdate API reports a "the vmss is being deleted" error.
+    // Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it.
+    klog.V(3).Infof("CreateOrUpdateVMSS: verify the status of the vmss being created or updated")
+    vmss, rerr := az.VirtualMachineScaleSetsClient.Get(ctx, resourceGroupName, VMScaleSetName)
+    if rerr != nil {
+        klog.Errorf("CreateOrUpdateVMSS: error getting vmss(%s): %v", VMScaleSetName, rerr)
+        return rerr
+    }
+    if vmss.ProvisioningState != nil && strings.EqualFold(*vmss.ProvisioningState, virtualMachineScaleSetsDeallocating) {
+        klog.V(3).Infof("CreateOrUpdateVMSS: found vmss %s being deleted, skipping", VMScaleSetName)
+        return nil
+    }
 
-        if strings.Contains(rerr.Error().Error(), vmssVMNotActiveErrorMessage) {
-            // When instances are under deleting, updating API would report "not an active Virtual Machine Scale Set VM instanceId" error.
-            // Since they're under deleting, we shouldn't send more update requests for it.
-            klog.V(3).Infof("UpdateVmssVMWithRetry: VirtualMachineScaleSetVMsClient.Update(%s,%s) gets error message %q, abort backoff because it's probably under deleting", VMScaleSetName, instanceID, vmssVMNotActiveErrorMessage)
-            return true, nil
-        }
+    rerr = az.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroupName, VMScaleSetName, parameters)
+    klog.V(10).Infof("CreateOrUpdateVMSS: VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", VMScaleSetName)
+    if rerr != nil {
+        klog.Errorf("CreateOrUpdateVMSS: error CreateOrUpdate vmss(%s): %v", VMScaleSetName, rerr)
+        return rerr
+    }
 
-        return !rerr.Retriable, rerr.Error()
-    })
-}
-
-// CreateOrUpdateVmssWithRetry invokes az.VirtualMachineScaleSetsClient.Update with exponential backoff retry
-func (az *Cloud) CreateOrUpdateVmssWithRetry(resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) error {
-    return wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
-        ctx, cancel := getContextWithCancel()
-        defer cancel()
-
-        // When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error.
-        // Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it.
-        klog.V(3).Infof("CreateOrUpdateVmssWithRetry: verify the status of the vmss being created or updated")
-        vmss, rerr := az.VirtualMachineScaleSetsClient.Get(ctx, resourceGroupName, VMScaleSetName)
-        if rerr != nil {
-            klog.Warningf("CreateOrUpdateVmssWithRetry: error getting vmss: %v", rerr)
-        }
-        if vmss.ProvisioningState != nil && strings.EqualFold(*vmss.ProvisioningState, virtualMachineScaleSetsDeallocating) {
-            klog.V(3).Infof("CreateOrUpdateVmssWithRetry: found vmss %s being deleted, skipping", VMScaleSetName)
-            return true, nil
-        }
-
-        rerr = az.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroupName, VMScaleSetName, parameters)
-        klog.V(10).Infof("UpdateVmssVMWithRetry: VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", VMScaleSetName)
-        if rerr == nil {
-            return true, nil
-        }
-
-        return !rerr.Retriable, rerr.Error()
-    })
-}
-
-// GetScaleSetWithRetry gets scale set with exponential backoff retry
-func (az *Cloud) GetScaleSetWithRetry(service *v1.Service, resourceGroupName, vmssName string) (compute.VirtualMachineScaleSet, error) {
-    var result compute.VirtualMachineScaleSet
-    var retryErr *retry.Error
-
-    err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
-        ctx, cancel := getContextWithCancel()
-        defer cancel()
-
-        result, retryErr = az.VirtualMachineScaleSetsClient.Get(ctx, resourceGroupName, vmssName)
-        if retryErr != nil {
-            az.Event(service, v1.EventTypeWarning, "GetVirtualMachineScaleSet", retryErr.Error().Error())
-            klog.Errorf("backoff: failure for scale set %q, will retry,err=%v", vmssName, retryErr)
-            return false, nil
-        }
-        klog.V(4).Infof("backoff: success for scale set %q", vmssName)
-        return true, nil
-    })
-
-    return result, err
+    return nil
 }
 
 func (cfg *Config) shouldOmitCloudProviderBackoff() bool {
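A self-contained sketch of the control flow the new helper settles on. The `vmss` struct and the "Deleting" literal stand in for `compute.VirtualMachineScaleSet` and the `virtualMachineScaleSetsDeallocating` constant; note the behavior change relative to the deleted `CreateOrUpdateVmssWithRetry`, where a Get failure only logged a warning and the update proceeded with a zero-value scale set:

```go
package main

import (
	"fmt"
	"strings"
)

// vmss is a stand-in for compute.VirtualMachineScaleSet; only the field the
// guard inspects is modeled here.
type vmss struct{ ProvisioningState *string }

// createOrUpdateVMSS mirrors the shape of the new CreateOrUpdateVMSS helper:
// fetch the scale set first, skip the write if it is already being deleted,
// and otherwise issue a single CreateOrUpdate (the client retries internally).
func createOrUpdateVMSS(get func() (vmss, error), update func() error) error {
	set, err := get()
	if err != nil {
		// A Get failure now aborts the update instead of proceeding.
		return err
	}
	if set.ProvisioningState != nil && strings.EqualFold(*set.ProvisioningState, "Deleting") {
		fmt.Println("vmss is being deleted, skipping CreateOrUpdate")
		return nil
	}
	return update()
}

func main() {
	state := "Deleting"
	_ = createOrUpdateVMSS(
		func() (vmss, error) { return vmss{ProvisioningState: &state}, nil },
		func() error { fmt.Println("CreateOrUpdate issued"); return nil },
	)
}
```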
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go
index 9caf2247869..ce086cb7ebf 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go
@@ -108,7 +108,7 @@ type VirtualMachineScaleSetsClient interface {
 
 // VirtualMachineScaleSetVMsClient defines needed functions for azure compute.VirtualMachineScaleSetVMsClient
 type VirtualMachineScaleSetVMsClient interface {
     Get(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, expand compute.InstanceViewTypes) (result compute.VirtualMachineScaleSetVM, rerr *retry.Error)
-    List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []compute.VirtualMachineScaleSetVM, rerr *retry.Error)
+    List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, expand string) (result []compute.VirtualMachineScaleSetVM, rerr *retry.Error)
     Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) *retry.Error
 }
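The trimmed List signature drops the filter and selectParameter arguments, which every caller touched by this change passed as "". A minimal sketch with local stand-in types (`scaleSetVM` for `compute.VirtualMachineScaleSetVM`, plain `error` for `*retry.Error`):

```go
package main

import (
	"context"
	"fmt"
)

// scaleSetVM stands in for compute.VirtualMachineScaleSetVM.
type scaleSetVM struct{ InstanceID string }

// vmssVMLister models the trimmed interface: only expand survives.
type vmssVMLister interface {
	List(ctx context.Context, resourceGroupName, scaleSetName, expand string) ([]scaleSetVM, error)
}

type fakeLister struct{}

func (fakeLister) List(ctx context.Context, rg, ss, expand string) ([]scaleSetVM, error) {
	return []scaleSetVM{{InstanceID: "0"}, {InstanceID: "1"}}, nil
}

func main() {
	var l vmssVMLister = fakeLister{}
	// Callers now pass the expand value directly, e.g. string(compute.InstanceView).
	vms, _ := l.List(context.Background(), "rg", "vmss", "instanceView")
	fmt.Println(len(vms)) // 2
}
```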
@@ -953,213 +953,6 @@ func (az *azSecurityGroupsClient) List(ctx context.Context, resourceGroupName st
     return result, nil
 }
 
-// azVirtualMachineScaleSetsClient implements VirtualMachineScaleSetsClient.
-type azVirtualMachineScaleSetsClient struct {
-    client            compute.VirtualMachineScaleSetsClient
-    rateLimiterReader flowcontrol.RateLimiter
-    rateLimiterWriter flowcontrol.RateLimiter
-}
-
-func newAzVirtualMachineScaleSetsClient(config *azclients.ClientConfig) *azVirtualMachineScaleSetsClient {
-    virtualMachineScaleSetsClient := compute.NewVirtualMachineScaleSetsClient(config.SubscriptionID)
-    virtualMachineScaleSetsClient.BaseURI = config.ResourceManagerEndpoint
-    virtualMachineScaleSetsClient.Authorizer = autorest.NewBearerAuthorizer(config.ServicePrincipalToken)
-    virtualMachineScaleSetsClient.PollingDelay = 5 * time.Second
-    if config.ShouldOmitCloudProviderBackoff {
-        virtualMachineScaleSetsClient.RetryAttempts = config.CloudProviderBackoffRetries
-        virtualMachineScaleSetsClient.RetryDuration = time.Duration(config.CloudProviderBackoffDuration) * time.Second
-    }
-    configureUserAgent(&virtualMachineScaleSetsClient.Client)
-
-    klog.V(2).Infof("Azure VirtualMachineScaleSetsClient (read ops) using rate limit config: QPS=%g, bucket=%d",
-        config.RateLimitConfig.CloudProviderRateLimitQPS,
-        config.RateLimitConfig.CloudProviderRateLimitBucket)
-    klog.V(2).Infof("Azure VirtualMachineScaleSetsClient (write ops) using rate limit config: QPS=%g, bucket=%d",
-        config.RateLimitConfig.CloudProviderRateLimitQPSWrite,
-        config.RateLimitConfig.CloudProviderRateLimitBucketWrite)
-    rateLimiterReader, rateLimiterWriter := azclients.NewRateLimiter(config.RateLimitConfig)
-    return &azVirtualMachineScaleSetsClient{
-        client:            virtualMachineScaleSetsClient,
-        rateLimiterReader: rateLimiterReader,
-        rateLimiterWriter: rateLimiterWriter,
-    }
-}
-
-func (az *azVirtualMachineScaleSetsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, rerr *retry.Error) {
-    mc := newMetricContext("vmss", "get", resourceGroupName, az.client.SubscriptionID, "")
-    if !az.rateLimiterReader.TryAccept() {
-        mc.RateLimitedCount()
-        rerr = createRateLimitErr(false, "VMSSGet")
-        return
-    }
-
-    klog.V(10).Infof("azVirtualMachineScaleSetsClient.Get(%q,%q): start", resourceGroupName, VMScaleSetName)
-    defer func() {
-        klog.V(10).Infof("azVirtualMachineScaleSetsClient.Get(%q,%q): end", resourceGroupName, VMScaleSetName)
-    }()
-
-    var err error
-    result, err = az.client.Get(ctx, resourceGroupName, VMScaleSetName)
-    mc.Observe(err)
-    return result, retry.GetError(result.Response.Response, err)
-}
-
-func (az *azVirtualMachineScaleSetsClient) List(ctx context.Context, resourceGroupName string) (result []compute.VirtualMachineScaleSet, rerr *retry.Error) {
-    mc := newMetricContext("vmss", "list", resourceGroupName, az.client.SubscriptionID, "")
-    if !az.rateLimiterReader.TryAccept() {
-        mc.RateLimitedCount()
-        rerr = createRateLimitErr(false, "VMSSList")
-        return
-    }
-
-    klog.V(10).Infof("azVirtualMachineScaleSetsClient.List(%q): start", resourceGroupName)
-    defer func() {
-        klog.V(10).Infof("azVirtualMachineScaleSetsClient.List(%q): end", resourceGroupName)
-    }()
-
-    iterator, err := az.client.ListComplete(ctx, resourceGroupName)
-    mc.Observe(err)
-    if err != nil {
-        return nil, retry.GetRetriableError(err)
-    }
-
-    result = make([]compute.VirtualMachineScaleSet, 0)
-    for ; iterator.NotDone(); err = iterator.Next() {
-        if err != nil {
-            return nil, retry.GetRetriableError(err)
-        }
-
-        result = append(result, iterator.Value())
-    }
-
-    return result, nil
-}
-
-func (az *azVirtualMachineScaleSetsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, vmScaleSetName string, parameters compute.VirtualMachineScaleSet) *retry.Error {
-    mc := newMetricContext("vmss", "create_or_update", resourceGroupName, az.client.SubscriptionID, "")
-    /* Write rate limiting */
-    if !az.rateLimiterWriter.TryAccept() {
-        mc.RateLimitedCount()
-        return createRateLimitErr(true, "NiCreateOrUpdate")
-    }
-
-    klog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): start", resourceGroupName, vmScaleSetName)
-    defer func() {
-        klog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): end", resourceGroupName, vmScaleSetName)
-    }()
-
-    future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, vmScaleSetName, parameters)
-    if err != nil {
-        return retry.GetError(future.Response(), mc.Observe(err))
-    }
-
-    err = future.WaitForCompletionRef(ctx, az.client.Client)
-    return retry.GetError(future.Response(), mc.Observe(err))
-}
-
-// azVirtualMachineScaleSetVMsClient implements VirtualMachineScaleSetVMsClient.
-type azVirtualMachineScaleSetVMsClient struct {
-    client            compute.VirtualMachineScaleSetVMsClient
-    rateLimiterReader flowcontrol.RateLimiter
-    rateLimiterWriter flowcontrol.RateLimiter
-}
-
-func newAzVirtualMachineScaleSetVMsClient(config *azclients.ClientConfig) *azVirtualMachineScaleSetVMsClient {
-    virtualMachineScaleSetVMsClient := compute.NewVirtualMachineScaleSetVMsClient(config.SubscriptionID)
-    virtualMachineScaleSetVMsClient.BaseURI = config.ResourceManagerEndpoint
-    virtualMachineScaleSetVMsClient.Authorizer = autorest.NewBearerAuthorizer(config.ServicePrincipalToken)
-    virtualMachineScaleSetVMsClient.PollingDelay = 5 * time.Second
-    if config.ShouldOmitCloudProviderBackoff {
-        virtualMachineScaleSetVMsClient.RetryAttempts = config.CloudProviderBackoffRetries
-        virtualMachineScaleSetVMsClient.RetryDuration = time.Duration(config.CloudProviderBackoffDuration) * time.Second
-    }
-    configureUserAgent(&virtualMachineScaleSetVMsClient.Client)
-
-    klog.V(2).Infof("Azure VirtualMachineScaleSetVMsClient (read ops) using rate limit config: QPS=%g, bucket=%d",
-        config.RateLimitConfig.CloudProviderRateLimitQPS,
-        config.RateLimitConfig.CloudProviderRateLimitBucket)
-    klog.V(2).Infof("Azure VirtualMachineScaleSetVMsClient (write ops) using rate limit config: QPS=%g, bucket=%d",
-        config.RateLimitConfig.CloudProviderRateLimitQPSWrite,
-        config.RateLimitConfig.CloudProviderRateLimitBucketWrite)
-    rateLimiterReader, rateLimiterWriter := azclients.NewRateLimiter(config.RateLimitConfig)
-    return &azVirtualMachineScaleSetVMsClient{
-        client:            virtualMachineScaleSetVMsClient,
-        rateLimiterReader: rateLimiterReader,
-        rateLimiterWriter: rateLimiterWriter,
-    }
-}
-
-func (az *azVirtualMachineScaleSetVMsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, expand compute.InstanceViewTypes) (result compute.VirtualMachineScaleSetVM, rerr *retry.Error) {
-    mc := newMetricContext("vmssvm", "get", resourceGroupName, az.client.SubscriptionID, "")
-    if !az.rateLimiterReader.TryAccept() {
-        mc.RateLimitedCount()
-        rerr = createRateLimitErr(false, "VMSSGet")
-        return
-    }
-
-    klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Get(%q,%q,%q): start", resourceGroupName, VMScaleSetName, instanceID)
-    defer func() {
-        klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Get(%q,%q,%q): end", resourceGroupName, VMScaleSetName, instanceID)
-    }()
-
-    var err error
-    result, err = az.client.Get(ctx, resourceGroupName, VMScaleSetName, instanceID, expand)
-    mc.Observe(err)
-    return result, retry.GetError(result.Response.Response, err)
-}
-
-func (az *azVirtualMachineScaleSetVMsClient) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []compute.VirtualMachineScaleSetVM, rerr *retry.Error) {
-    mc := newMetricContext("vmssvm", "list", resourceGroupName, az.client.SubscriptionID, "")
-    if !az.rateLimiterReader.TryAccept() {
-        mc.RateLimitedCount()
-        rerr = createRateLimitErr(false, "VMSSList")
-        return
-    }
-
-    klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.List(%q,%q,%q): start", resourceGroupName, virtualMachineScaleSetName, filter)
-    defer func() {
-        klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.List(%q,%q,%q): end", resourceGroupName, virtualMachineScaleSetName, filter)
-    }()
-
-    iterator, err := az.client.ListComplete(ctx, resourceGroupName, virtualMachineScaleSetName, filter, selectParameter, expand)
-    mc.Observe(err)
-    if err != nil {
-        return nil, retry.GetRetriableError(err)
-    }
-
-    result = make([]compute.VirtualMachineScaleSetVM, 0)
-    for ; iterator.NotDone(); err = iterator.Next() {
-        if err != nil {
-            return nil, retry.GetRetriableError(err)
-        }
-
-        result = append(result, iterator.Value())
-    }
-
-    return result, nil
-}
-
-func (az *azVirtualMachineScaleSetVMsClient) Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) *retry.Error {
-    mc := newMetricContext("vmssvm", "create_or_update", resourceGroupName, az.client.SubscriptionID, source)
-    if !az.rateLimiterWriter.TryAccept() {
-        mc.RateLimitedCount()
-        return createRateLimitErr(true, "VMSSVMUpdate")
-    }
-
-    klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Update(%q,%q,%q): start", resourceGroupName, VMScaleSetName, instanceID)
-    defer func() {
-        klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Update(%q,%q,%q): end", resourceGroupName, VMScaleSetName, instanceID)
-    }()
-
-    future, err := az.client.Update(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters)
-    if err != nil {
-        return retry.GetError(future.Response(), mc.Observe(err))
-    }
-
-    err = future.WaitForCompletionRef(ctx, az.client.Client)
-    return retry.GetError(future.Response(), mc.Observe(err))
-}
-
 // azRoutesClient implements RoutesClient.
 type azRoutesClient struct {
     client network.RoutesClient
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go
index 71f82cebc49..bd1d7839279 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go
@@ -548,7 +548,7 @@ func (fVMC *fakeVirtualMachineScaleSetVMsClient) setFakeStore(store map[string]m
     fVMC.FakeStore = store
 }
 
-func (fVMC *fakeVirtualMachineScaleSetVMsClient) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []compute.VirtualMachineScaleSetVM, err *retry.Error) {
+func (fVMC *fakeVirtualMachineScaleSetVMsClient) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, expand string) (result []compute.VirtualMachineScaleSetVM, err *retry.Error) {
     fVMC.mutex.Lock()
     defer fVMC.mutex.Unlock()
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
index 154d683e73b..b7950a40d84 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
@@ -551,7 +551,7 @@ func (ss *scaleSet) listScaleSetVMs(scaleSetName, resourceGroup string) ([]compu
     ctx, cancel := getContextWithCancel()
     defer cancel()
 
-    allVMs, rerr := ss.VirtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSetName, "", "", string(compute.InstanceView))
+    allVMs, rerr := ss.VirtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSetName, string(compute.InstanceView))
     if rerr != nil {
         klog.Errorf("VirtualMachineScaleSetVMsClient.List failed: %v", rerr)
         return nil, rerr.Error()
@@ -897,16 +897,12 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
     defer cancel()
     klog.V(2).Infof("EnsureHostInPool begins to update vmssVM(%s) with new backendPoolID %s", vmName, backendPoolID)
     rerr := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "network_update")
-    if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
-        klog.V(2).Infof("EnsureHostInPool update backing off vmssVM(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err)
-        retryErr := ss.UpdateVmssVMWithRetry(nodeResourceGroup, ssName, instanceID, newVM, "network_update")
-        if retryErr != nil {
-            klog.Errorf("EnsureHostInPool update abort backoff vmssVM(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err)
-        }
-        return retryErr
+    if rerr != nil {
+        klog.Errorf("EnsureHostInPool VirtualMachineScaleSetVMsClient.Update(%s) with new backendPoolID %s failed, err: %v", vmName, backendPoolID, rerr)
+        return rerr.Error()
     }
 
-    return rerr.Error()
+    return nil
 }
 
 func getVmssAndResourceGroupNameByVMProviderID(providerID string) (string, string, error) {
@@ -1022,21 +1018,10 @@ func (ss *scaleSet) ensureVMSSInPool(service *v1.Service, nodes []*v1.Node, back
             },
         }
 
-        // Update vmssVM with backoff.
-        ctx, cancel := getContextWithCancel()
-        defer cancel()
-
         klog.V(2).Infof("ensureVMSSInPool begins to update vmss(%s) with new backendPoolID %s", vmssName, backendPoolID)
-        rerr := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmssName, newVMSS)
-        if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
-            klog.V(2).Infof("ensureVMSSInPool update backing off vmss(%s) with new backendPoolID %s, err: %v", vmssName, backendPoolID, err)
-            retryErr := ss.CreateOrUpdateVmssWithRetry(ss.ResourceGroup, vmssName, newVMSS)
-            if retryErr != nil {
-                klog.Errorf("ensureVMSSInPool update abort backoff vmssVM(%s) with new backendPoolID %s, err: %v", vmssName, backendPoolID, err)
-                return retryErr
-            }
-        }
+        rerr := ss.CreateOrUpdateVMSS(ss.ResourceGroup, vmssName, newVMSS)
         if rerr != nil {
+            klog.Errorf("ensureVMSSInPool CreateOrUpdateVMSS(%s) with new backendPoolID %s failed, err: %v", vmssName, backendPoolID, rerr)
             return rerr.Error()
         }
     }
@@ -1173,14 +1158,6 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeNa
     defer cancel()
     klog.V(2).Infof("ensureBackendPoolDeletedFromNode begins to update vmssVM(%s) with backendPoolID %s", nodeName, backendPoolID)
     rerr := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "network_update")
-    if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
-        klog.V(2).Infof("ensureBackendPoolDeletedFromNode update backing off vmssVM(%s) with backendPoolID %s, err: %v", nodeName, backendPoolID, err)
-        retryErr := ss.UpdateVmssVMWithRetry(nodeResourceGroup, ssName, instanceID, newVM, "network_update")
-        if retryErr != nil {
-            err = retryErr
-            klog.Errorf("ensureBackendPoolDeletedFromNode update abort backoff vmssVM(%s) with backendPoolID %s, err: %v", nodeName, backendPoolID, err)
-        }
-    }
     if rerr != nil {
         klog.Errorf("ensureBackendPoolDeletedFromNode failed to update vmssVM(%s) with backendPoolID %s: %v", nodeName, backendPoolID, err)
     } else {
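All of these callsites collapse to the same shape: one client call, one error conversion, and no wait.ExponentialBackoff re-dispatch, because retries (including the non-retriable instanceId message) now live inside the client. A standalone sketch of that shape, with `retryError` standing in for `retry.Error`:

```go
package main

import (
	"errors"
	"fmt"
)

// retryError stands in for retry.Error: the rich error the new clients
// return only after their internal backoff is exhausted.
type retryError struct {
	Retriable bool
	RawError  error
}

// Error converts to a plain error, like (*retry.Error).Error().
func (r *retryError) Error() error { return r.RawError }

// ensureHostInPool shows the collapsed callsite: the old
// "rerr.Retriable && ss.CloudProviderBackoff" re-dispatch branch is gone.
func ensureHostInPool(update func() *retryError) error {
	if rerr := update(); rerr != nil {
		return rerr.Error()
	}
	return nil
}

func main() {
	err := ensureHostInPool(func() *retryError {
		return &retryError{Retriable: true, RawError: errors.New("update failed")}
	})
	fmt.Println(err) // update failed
}
```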
@@ -1304,21 +1281,10 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromVMSS(service *v1.Service, backen
             },
         }
 
-        // Update vmssVM with backoff.
-        ctx, cancel := getContextWithCancel()
-        defer cancel()
-
         klog.V(2).Infof("ensureBackendPoolDeletedFromVMSS begins to update vmss(%s) with backendPoolID %s", vmssName, backendPoolID)
-        rerr := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmssName, newVMSS)
-        if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
-            klog.V(2).Infof("ensureBackendPoolDeletedFromVMSS update backing off vmss(%s) with backendPoolID %s, err: %v", vmssName, backendPoolID, err)
-            retryErr := ss.CreateOrUpdateVmssWithRetry(ss.ResourceGroup, vmssName, newVMSS)
-            if retryErr != nil {
-                klog.Errorf("ensureBackendPoolDeletedFromVMSS update abort backoff vmssVM(%s) with backendPoolID %s, err: %v", vmssName, backendPoolID, retryErr)
-                return retryErr
-            }
-        }
+        rerr := ss.CreateOrUpdateVMSS(ss.ResourceGroup, vmssName, newVMSS)
         if rerr != nil {
+            klog.Errorf("ensureBackendPoolDeletedFromVMSS CreateOrUpdateVMSS(%s) with backendPoolID %s failed, err: %v", vmssName, backendPoolID, rerr)
             return rerr.Error()
         }
     }
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
index 35ee33c8cf9..84ff972a1ad 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
@@ -79,7 +79,7 @@ func TestVMSSVMCache(t *testing.T) {
     // validate getting VMSS VM via cache.
     virtualMachines, rerr := ss.VirtualMachineScaleSetVMsClient.List(
-        context.Background(), "rg", "vmss", "", "", "")
+        context.Background(), "rg", "vmss", "")
     assert.Nil(t, rerr)
     assert.Equal(t, 3, len(virtualMachines))
     for i := range virtualMachines {
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/BUILD b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/BUILD
index 7d742947cb1..18aa96c0fac 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/BUILD
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/BUILD
@@ -25,7 +25,12 @@ filegroup(
 
 filegroup(
     name = "all-srcs",
-    srcs = [":package-srcs"],
+    srcs = [
+        ":package-srcs",
+        "//staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient:all-srcs",
+        "//staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssclient:all-srcs",
+        "//staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient:all-srcs",
+    ],
     tags = ["automanaged"],
     visibility = ["//visibility:public"],
 )
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/azure_client_config.go b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/azure_client_config.go
index 88aef868e00..a0be77309c3 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/azure_client_config.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/azure_client_config.go
@@ -40,10 +40,11 @@ type ClientConfig struct {
     ShouldOmitCloudProviderBackoff bool
 }
 
-// WithRateLimiter returns ClientConfig with rateLimitConfig set.
+// WithRateLimiter returns a new ClientConfig with rateLimitConfig set.
 func (cfg *ClientConfig) WithRateLimiter(rl *RateLimitConfig) *ClientConfig {
-    cfg.RateLimitConfig = rl
-    return cfg
+    newClientConfig := *cfg
+    newClientConfig.RateLimitConfig = rl
+    return &newClientConfig
 }
 
 // RateLimitConfig indicates the rate limit config options.
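The copy semantics above are what the updated test below exercises: the derived config carries the rate limit while the shared base config stays untouched. A runnable standalone version of the same behavior, with local `RateLimitConfig`/`ClientConfig` stand-ins:

```go
package main

import "fmt"

type RateLimitConfig struct{ CloudProviderRateLimit bool }

type ClientConfig struct{ RateLimitConfig *RateLimitConfig }

// WithRateLimiter now shallow-copies the receiver. Previously it assigned
// cfg.RateLimitConfig in place, so every client built from the shared base
// config silently inherited whichever rate limit was applied last.
func (cfg *ClientConfig) WithRateLimiter(rl *RateLimitConfig) *ClientConfig {
	newClientConfig := *cfg
	newClientConfig.RateLimitConfig = rl
	return &newClientConfig
}

func main() {
	base := &ClientConfig{}
	derived := base.WithRateLimiter(&RateLimitConfig{CloudProviderRateLimit: true})

	fmt.Println(base.RateLimitConfig == nil)                    // true: base is untouched
	fmt.Println(derived.RateLimitConfig.CloudProviderRateLimit) // true
}
```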
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/azure_client_config_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/azure_client_config_test.go
index f06db05d988..b1155d0038b 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/azure_client_config_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/azure_client_config_test.go
@@ -28,8 +28,8 @@ import (
 func TestWithRateLimiter(t *testing.T) {
     config := &ClientConfig{}
     assert.Nil(t, config.RateLimitConfig)
-    config.WithRateLimiter(&RateLimitConfig{CloudProviderRateLimit: true})
-    assert.Equal(t, &RateLimitConfig{CloudProviderRateLimit: true}, config.RateLimitConfig)
+    c := config.WithRateLimiter(&RateLimitConfig{CloudProviderRateLimit: true})
+    assert.Equal(t, &RateLimitConfig{CloudProviderRateLimit: true}, c.RateLimitConfig)
     config.WithRateLimiter(nil)
     assert.Nil(t, config.RateLimitConfig)
 }
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error.go b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error.go
index 9e604e973de..bf0ec8df851 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_error.go
@@ -60,8 +60,15 @@ func (err *Error) Error() error {
         return nil
     }
 
-    return fmt.Errorf("Retriable: %v, RetryAfter: %s, HTTPStatusCode: %d, RawError: %v",
-        err.Retriable, err.RetryAfter.String(), err.HTTPStatusCode, err.RawError)
+    // Convert the RetryAfter deadline to remaining seconds for better logging.
+    retryAfterSeconds := 0
+    curTime := now()
+    if err.RetryAfter.After(curTime) {
+        retryAfterSeconds = int(err.RetryAfter.Sub(curTime) / time.Second)
+    }
+
+    return fmt.Errorf("Retriable: %v, RetryAfter: %ds, HTTPStatusCode: %d, RawError: %v",
+        err.Retriable, retryAfterSeconds, err.HTTPStatusCode, err.RawError)
 }
 
 // IsThrottled returns true if the request is being throttled.
@@ -99,8 +106,13 @@ func GetRateLimitError(isWrite bool, opName string) *Error {
 }
 
 // GetThrottlingError creates a new error for throttling.
-func GetThrottlingError(operation, reason string) *Error {
-    return GetRetriableError(fmt.Errorf("azure cloud provider throttled for operation %s with reason %q", operation, reason))
+func GetThrottlingError(operation, reason string, retryAfter time.Time) *Error {
+    rawError := fmt.Errorf("azure cloud provider throttled for operation %s with reason %q", operation, reason)
+    return &Error{
+        Retriable:  true,
+        RawError:   rawError,
+        RetryAfter: retryAfter,
+    }
 }
 
 // GetError gets a new Error based on resp and error.
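RetryAfter is stored as an absolute deadline, so the new formatting reports the whole seconds remaining until it, clamped at zero once the deadline has passed. A standalone reproduction of that computation, with a package-level `now` variable mirroring the clock seam the retry package reads:

```go
package main

import (
	"fmt"
	"time"
)

// now mirrors the retry package's clock seam, which keeps the math testable.
var now = time.Now

// retryAfterSeconds reproduces the new log formatting: whole seconds
// remaining until the RetryAfter deadline, or 0 if it already passed.
func retryAfterSeconds(retryAfter time.Time) int {
	curTime := now()
	if retryAfter.After(curTime) {
		return int(retryAfter.Sub(curTime) / time.Second)
	}
	return 0
}

func main() {
	deadline := now().Add(90 * time.Second)
	fmt.Printf("Retriable: true, RetryAfter: %ds\n", retryAfterSeconds(deadline)) // 89 or 90
	fmt.Printf("RetryAfter: %ds\n", retryAfterSeconds(now().Add(-time.Minute)))   // 0
}
```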
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_retry.go b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_retry.go
index bc8a4778541..120b7dffaa1 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_retry.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/retry/azure_retry.go
@@ -21,6 +21,7 @@ package retry
 import (
     "math/rand"
     "net/http"
+    "strings"
     "time"
 
     "github.com/Azure/go-autorest/autorest"
@@ -55,6 +56,8 @@ type Backoff struct {
     // exceed the cap then the duration is set to the cap and the
     // steps parameter is set to zero.
     Cap time.Duration
+    // NonRetriableErrors lists the error messages that should stop any further retrying.
+    NonRetriableErrors []string
 }
 
 // NewBackoff creates a new Backoff.
@@ -68,6 +71,28 @@ func NewBackoff(duration time.Duration, factor float64, jitter float64, steps in
     }
 }
 
+// WithNonRetriableErrors returns a new *Backoff with NonRetriableErrors assigned.
+func (b *Backoff) WithNonRetriableErrors(errs []string) *Backoff {
+    newBackoff := *b
+    newBackoff.NonRetriableErrors = errs
+    return &newBackoff
+}
+
+// isNonRetriableError returns true if the Error matches one of NonRetriableErrors.
+func (b *Backoff) isNonRetriableError(rerr *Error) bool {
+    if rerr == nil {
+        return false
+    }
+
+    for _, err := range b.NonRetriableErrors {
+        if strings.Contains(rerr.RawError.Error(), err) {
+            return true
+        }
+    }
+
+    return false
+}
+
 // Step (1) returns an amount of time to sleep determined by the
 // original Duration and Jitter and (2) mutates the provided Backoff
 // to update its Steps and Duration.
@@ -134,8 +159,9 @@ func doBackoffRetry(s autorest.Sender, r *http.Request, backoff *Backoff) (resp
     // 1) request succeeds
     // 2) request is not retriable
     // 3) request has been throttled
-    // 4) request has completed all the retry steps
+    // 4) request matches one of the non-retriable errors
+    // 5) request has completed all the retry steps
     if rerr == nil || !rerr.Retriable || rerr.IsThrottled() || backoff.isNonRetriableError(rerr) || backoff.Steps == 1 {
         return resp, rerr.Error()
     }
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 20610a2a98f..6ae21fec0b4 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1839,6 +1839,9 @@ k8s.io/legacy-cloud-providers/aws
 k8s.io/legacy-cloud-providers/azure
 k8s.io/legacy-cloud-providers/azure/auth
 k8s.io/legacy-cloud-providers/azure/clients
+k8s.io/legacy-cloud-providers/azure/clients/armclient
+k8s.io/legacy-cloud-providers/azure/clients/vmssclient
+k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient
 k8s.io/legacy-cloud-providers/azure/retry
 k8s.io/legacy-cloud-providers/gce
 k8s.io/legacy-cloud-providers/openstack
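Putting the retry changes together: WithNonRetriableErrors copies the Backoff, and isNonRetriableError matches by substring against the raw error text, which is what lets doBackoffRetry stop even when rerr.Retriable is true. A trimmed, runnable reproduction, with a local `Error` type standing in for `retry.Error`:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Error stands in for retry.Error; only the fields the matcher needs.
type Error struct {
	Retriable bool
	RawError  error
}

type Backoff struct {
	Steps              int
	NonRetriableErrors []string
}

// WithNonRetriableErrors copies the receiver, matching the patch above.
func (b *Backoff) WithNonRetriableErrors(errs []string) *Backoff {
	newBackoff := *b
	newBackoff.NonRetriableErrors = errs
	return &newBackoff
}

// isNonRetriableError matches by substring, so the configured message only
// needs to appear somewhere inside the raw Azure error text.
func (b *Backoff) isNonRetriableError(rerr *Error) bool {
	if rerr == nil {
		return false
	}
	for _, msg := range b.NonRetriableErrors {
		if strings.Contains(rerr.RawError.Error(), msg) {
			return true
		}
	}
	return false
}

func main() {
	b := (&Backoff{Steps: 3}).WithNonRetriableErrors(
		[]string{"not an active Virtual Machine Scale Set VM instanceId"})

	rerr := &Error{
		Retriable: true,
		RawError:  errors.New("instance 5 is not an active Virtual Machine Scale Set VM instanceId"),
	}

	// doBackoffRetry would now stop here even though rerr.Retriable is true.
	fmt.Println(b.isNonRetriableError(rerr)) // true
}
```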