Use new VMSS and VMSSVM client in Azure cloud provider

This commit is contained in:
Pengfei Ni 2019-12-31 12:57:21 +08:00
parent b8e7767b67
commit c813e25892
13 changed files with 104 additions and 335 deletions

View File

@ -69,6 +69,8 @@ go_library(
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/clients:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssclient:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/retry:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network:go_default_library",

View File

@ -46,6 +46,8 @@ import (
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/azure/auth"
azclients "k8s.io/legacy-cloud-providers/azure/clients"
"k8s.io/legacy-cloud-providers/azure/clients/vmssclient"
"k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient"
"k8s.io/legacy-cloud-providers/azure/retry"
"sigs.k8s.io/yaml"
)
@ -480,8 +482,12 @@ func (az *Cloud) InitializeCloudFromConfig(config *Config, fromSecret bool) erro
az.VirtualMachinesClient = newAzVirtualMachinesClient(azClientConfig.WithRateLimiter(config.VirtualMachineRateLimit))
az.PublicIPAddressesClient = newAzPublicIPAddressesClient(azClientConfig.WithRateLimiter(config.PublicIPAddressRateLimit))
az.VirtualMachineSizesClient = newAzVirtualMachineSizesClient(azClientConfig.WithRateLimiter(config.VirtualMachineSizeRateLimit))
az.VirtualMachineScaleSetsClient = newAzVirtualMachineScaleSetsClient(azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit))
az.VirtualMachineScaleSetVMsClient = newAzVirtualMachineScaleSetVMsClient(azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit))
az.VirtualMachineScaleSetsClient = vmssclient.New(azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit))
vmssVMClientConfig := azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit)
vmssVMClientConfig.Backoff = vmssVMClientConfig.Backoff.WithNonRetriableErrors([]string{vmssVMNotActiveErrorMessage})
az.VirtualMachineScaleSetVMsClient = vmssvmclient.New(vmssVMClientConfig)
// TODO(feiskyer): refactor azureFileClient to Interface.
az.FileClient = &azureFileClient{env: *env}

View File

@ -693,77 +693,32 @@ func (az *Cloud) deleteRouteWithRetry(routeName string) error {
})
}
// UpdateVmssVMWithRetry invokes az.VirtualMachineScaleSetVMsClient.Update with exponential backoff retry
func (az *Cloud) UpdateVmssVMWithRetry(resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) error {
return wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
// CreateOrUpdateVMSS invokes az.VirtualMachineScaleSetsClient.CreateOrUpdate().
func (az *Cloud) CreateOrUpdateVMSS(resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) *retry.Error {
ctx, cancel := getContextWithCancel()
defer cancel()
rerr := az.VirtualMachineScaleSetVMsClient.Update(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source)
klog.V(10).Infof("UpdateVmssVMWithRetry: VirtualMachineScaleSetVMsClient.Update(%s,%s): end", VMScaleSetName, instanceID)
if rerr == nil {
return true, nil
}
// When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error.
// Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it.
klog.V(3).Infof("CreateOrUpdateVMSS: verify the status of the vmss being created or updated")
vmss, rerr := az.VirtualMachineScaleSetsClient.Get(ctx, resourceGroupName, VMScaleSetName)
if rerr != nil {
klog.Errorf("CreateOrUpdateVMSS: error getting vmss(%s): %v", VMScaleSetName, rerr)
return rerr
}
if vmss.ProvisioningState != nil && strings.EqualFold(*vmss.ProvisioningState, virtualMachineScaleSetsDeallocating) {
klog.V(3).Infof("CreateOrUpdateVMSS: found vmss %s being deleted, skipping", VMScaleSetName)
return nil
}
if strings.Contains(rerr.Error().Error(), vmssVMNotActiveErrorMessage) {
// When instances are under deleting, updating API would report "not an active Virtual Machine Scale Set VM instanceId" error.
// Since they're under deleting, we shouldn't send more update requests for it.
klog.V(3).Infof("UpdateVmssVMWithRetry: VirtualMachineScaleSetVMsClient.Update(%s,%s) gets error message %q, abort backoff because it's probably under deleting", VMScaleSetName, instanceID, vmssVMNotActiveErrorMessage)
return true, nil
}
rerr = az.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroupName, VMScaleSetName, parameters)
klog.V(10).Infof("UpdateVmssVMWithRetry: VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", VMScaleSetName)
if rerr != nil {
klog.Errorf("CreateOrUpdateVMSS: error CreateOrUpdate vmss(%s): %v", VMScaleSetName, rerr)
return rerr
}
return !rerr.Retriable, rerr.Error()
})
}
// CreateOrUpdateVmssWithRetry invokes az.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateVmssWithRetry(resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) error {
return wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
// When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error.
// Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it.
klog.V(3).Infof("CreateOrUpdateVmssWithRetry: verify the status of the vmss being created or updated")
vmss, rerr := az.VirtualMachineScaleSetsClient.Get(ctx, resourceGroupName, VMScaleSetName)
if rerr != nil {
klog.Warningf("CreateOrUpdateVmssWithRetry: error getting vmss: %v", rerr)
}
if vmss.ProvisioningState != nil && strings.EqualFold(*vmss.ProvisioningState, virtualMachineScaleSetsDeallocating) {
klog.V(3).Infof("CreateOrUpdateVmssWithRetry: found vmss %s being deleted, skipping", VMScaleSetName)
return true, nil
}
rerr = az.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroupName, VMScaleSetName, parameters)
klog.V(10).Infof("UpdateVmssVMWithRetry: VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", VMScaleSetName)
if rerr == nil {
return true, nil
}
return !rerr.Retriable, rerr.Error()
})
}
// GetScaleSetWithRetry gets scale set with exponential backoff retry
func (az *Cloud) GetScaleSetWithRetry(service *v1.Service, resourceGroupName, vmssName string) (compute.VirtualMachineScaleSet, error) {
var result compute.VirtualMachineScaleSet
var retryErr *retry.Error
err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
result, retryErr = az.VirtualMachineScaleSetsClient.Get(ctx, resourceGroupName, vmssName)
if retryErr != nil {
az.Event(service, v1.EventTypeWarning, "GetVirtualMachineScaleSet", retryErr.Error().Error())
klog.Errorf("backoff: failure for scale set %q, will retry,err=%v", vmssName, retryErr)
return false, nil
}
klog.V(4).Infof("backoff: success for scale set %q", vmssName)
return true, nil
})
return result, err
return nil
}
func (cfg *Config) shouldOmitCloudProviderBackoff() bool {

View File

@ -108,7 +108,7 @@ type VirtualMachineScaleSetsClient interface {
// VirtualMachineScaleSetVMsClient defines needed functions for azure compute.VirtualMachineScaleSetVMsClient
type VirtualMachineScaleSetVMsClient interface {
Get(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, expand compute.InstanceViewTypes) (result compute.VirtualMachineScaleSetVM, rerr *retry.Error)
List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []compute.VirtualMachineScaleSetVM, rerr *retry.Error)
List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, expand string) (result []compute.VirtualMachineScaleSetVM, rerr *retry.Error)
Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) *retry.Error
}
@ -953,213 +953,6 @@ func (az *azSecurityGroupsClient) List(ctx context.Context, resourceGroupName st
return result, nil
}
// azVirtualMachineScaleSetsClient implements VirtualMachineScaleSetsClient.
type azVirtualMachineScaleSetsClient struct {
// client is the underlying Azure SDK scale-set client all calls are delegated to.
client compute.VirtualMachineScaleSetsClient
// rateLimiterReader gates read operations (Get, List).
rateLimiterReader flowcontrol.RateLimiter
// rateLimiterWriter gates write operations (CreateOrUpdate).
rateLimiterWriter flowcontrol.RateLimiter
}
// newAzVirtualMachineScaleSetsClient constructs an azVirtualMachineScaleSetsClient from
// the given client config: it points the SDK client at the configured ARM endpoint,
// attaches a bearer-token authorizer, optionally overrides the SDK's own retry settings
// (when the cloud-provider-level backoff is omitted), and builds separate read/write
// rate limiters from the config's RateLimitConfig.
func newAzVirtualMachineScaleSetsClient(config *azclients.ClientConfig) *azVirtualMachineScaleSetsClient {
virtualMachineScaleSetsClient := compute.NewVirtualMachineScaleSetsClient(config.SubscriptionID)
virtualMachineScaleSetsClient.BaseURI = config.ResourceManagerEndpoint
virtualMachineScaleSetsClient.Authorizer = autorest.NewBearerAuthorizer(config.ServicePrincipalToken)
virtualMachineScaleSetsClient.PollingDelay = 5 * time.Second
// When cloud-provider backoff is disabled, fall back to the SDK's built-in retries,
// sized from the same backoff knobs.
if config.ShouldOmitCloudProviderBackoff {
virtualMachineScaleSetsClient.RetryAttempts = config.CloudProviderBackoffRetries
virtualMachineScaleSetsClient.RetryDuration = time.Duration(config.CloudProviderBackoffDuration) * time.Second
}
configureUserAgent(&virtualMachineScaleSetsClient.Client)
klog.V(2).Infof("Azure VirtualMachineScaleSetsClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.RateLimitConfig.CloudProviderRateLimitQPS,
config.RateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure VirtualMachineScaleSetsClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.RateLimitConfig.CloudProviderRateLimitQPSWrite,
config.RateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := azclients.NewRateLimiter(config.RateLimitConfig)
return &azVirtualMachineScaleSetsClient{
client: virtualMachineScaleSetsClient,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
}
}
// Get fetches the scale set VMScaleSetName in resourceGroupName. The call is
// subject to the read rate limiter: a rejected token is counted in metrics and
// returned as a rate-limit *retry.Error without hitting the API.
func (az *azVirtualMachineScaleSetsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, rerr *retry.Error) {
mc := newMetricContext("vmss", "get", resourceGroupName, az.client.SubscriptionID, "")
if !az.rateLimiterReader.TryAccept() {
mc.RateLimitedCount()
rerr = createRateLimitErr(false, "VMSSGet")
return
}
klog.V(10).Infof("azVirtualMachineScaleSetsClient.Get(%q,%q): start", resourceGroupName, VMScaleSetName)
defer func() {
klog.V(10).Infof("azVirtualMachineScaleSetsClient.Get(%q,%q): end", resourceGroupName, VMScaleSetName)
}()
var err error
result, err = az.client.Get(ctx, resourceGroupName, VMScaleSetName)
mc.Observe(err)
// Wrap the raw SDK error plus the HTTP response into a *retry.Error.
return result, retry.GetError(result.Response.Response, err)
}
// List returns all scale sets in resourceGroupName, draining the SDK's paged
// iterator into a slice. The call is subject to the read rate limiter; any
// iteration error is surfaced as a retriable *retry.Error.
func (az *azVirtualMachineScaleSetsClient) List(ctx context.Context, resourceGroupName string) (result []compute.VirtualMachineScaleSet, rerr *retry.Error) {
mc := newMetricContext("vmss", "list", resourceGroupName, az.client.SubscriptionID, "")
if !az.rateLimiterReader.TryAccept() {
mc.RateLimitedCount()
rerr = createRateLimitErr(false, "VMSSList")
return
}
klog.V(10).Infof("azVirtualMachineScaleSetsClient.List(%q): start", resourceGroupName)
defer func() {
klog.V(10).Infof("azVirtualMachineScaleSetsClient.List(%q): end", resourceGroupName)
}()
iterator, err := az.client.ListComplete(ctx, resourceGroupName)
mc.Observe(err)
if err != nil {
return nil, retry.GetRetriableError(err)
}
result = make([]compute.VirtualMachineScaleSet, 0)
// NOTE(review): Next() runs in the post statement, so its error is only checked
// at the top of the following iteration; an error from the final Next() that also
// makes NotDone() false would be dropped — confirm against the SDK iterator contract.
for ; iterator.NotDone(); err = iterator.Next() {
if err != nil {
return nil, retry.GetRetriableError(err)
}
result = append(result, iterator.Value())
}
return result, nil
}
// CreateOrUpdate creates or updates the scale set vmScaleSetName in
// resourceGroupName and blocks until the long-running ARM operation completes.
// The call is subject to the write rate limiter; a rejected token is counted in
// metrics and returned as a rate-limit *retry.Error without hitting the API.
func (az *azVirtualMachineScaleSetsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, vmScaleSetName string, parameters compute.VirtualMachineScaleSet) *retry.Error {
mc := newMetricContext("vmss", "create_or_update", resourceGroupName, az.client.SubscriptionID, "")
/* Write rate limiting */
if !az.rateLimiterWriter.TryAccept() {
mc.RateLimitedCount()
// Fix: previously reported "NiCreateOrUpdate", copy-pasted from the
// network-interface client; name the actual VMSS operation instead.
return createRateLimitErr(true, "VMSSCreateOrUpdate")
}
klog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): start", resourceGroupName, vmScaleSetName)
defer func() {
klog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): end", resourceGroupName, vmScaleSetName)
}()
future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, vmScaleSetName, parameters)
if err != nil {
return retry.GetError(future.Response(), mc.Observe(err))
}
// Wait for the async operation; the final response/error determine retriability.
err = future.WaitForCompletionRef(ctx, az.client.Client)
return retry.GetError(future.Response(), mc.Observe(err))
}
// azVirtualMachineScaleSetVMsClient implements VirtualMachineScaleSetVMsClient.
type azVirtualMachineScaleSetVMsClient struct {
// client is the underlying Azure SDK scale-set-VM client all calls are delegated to.
client compute.VirtualMachineScaleSetVMsClient
// rateLimiterReader gates read operations (Get, List).
rateLimiterReader flowcontrol.RateLimiter
// rateLimiterWriter gates write operations (Update).
rateLimiterWriter flowcontrol.RateLimiter
}
// newAzVirtualMachineScaleSetVMsClient constructs an azVirtualMachineScaleSetVMsClient
// from the given client config: it points the SDK client at the configured ARM
// endpoint, attaches a bearer-token authorizer, optionally overrides the SDK's own
// retry settings (when the cloud-provider-level backoff is omitted), and builds
// separate read/write rate limiters from the config's RateLimitConfig.
func newAzVirtualMachineScaleSetVMsClient(config *azclients.ClientConfig) *azVirtualMachineScaleSetVMsClient {
virtualMachineScaleSetVMsClient := compute.NewVirtualMachineScaleSetVMsClient(config.SubscriptionID)
virtualMachineScaleSetVMsClient.BaseURI = config.ResourceManagerEndpoint
virtualMachineScaleSetVMsClient.Authorizer = autorest.NewBearerAuthorizer(config.ServicePrincipalToken)
virtualMachineScaleSetVMsClient.PollingDelay = 5 * time.Second
// When cloud-provider backoff is disabled, fall back to the SDK's built-in retries,
// sized from the same backoff knobs.
if config.ShouldOmitCloudProviderBackoff {
virtualMachineScaleSetVMsClient.RetryAttempts = config.CloudProviderBackoffRetries
virtualMachineScaleSetVMsClient.RetryDuration = time.Duration(config.CloudProviderBackoffDuration) * time.Second
}
configureUserAgent(&virtualMachineScaleSetVMsClient.Client)
klog.V(2).Infof("Azure VirtualMachineScaleSetVMsClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.RateLimitConfig.CloudProviderRateLimitQPS,
config.RateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure VirtualMachineScaleSetVMsClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.RateLimitConfig.CloudProviderRateLimitQPSWrite,
config.RateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := azclients.NewRateLimiter(config.RateLimitConfig)
return &azVirtualMachineScaleSetVMsClient{
client: virtualMachineScaleSetVMsClient,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
}
}
// Get fetches the scale-set VM identified by instanceID within VMScaleSetName,
// optionally expanding the instance view. The call is subject to the read rate
// limiter: a rejected token is counted in metrics and returned as a rate-limit
// *retry.Error without hitting the API.
// NOTE(review): the rate-limit op name is "VMSSGet" (same as the scale-set client),
// not "VMSSVMGet" — presumably shared deliberately; confirm.
func (az *azVirtualMachineScaleSetVMsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, expand compute.InstanceViewTypes) (result compute.VirtualMachineScaleSetVM, rerr *retry.Error) {
mc := newMetricContext("vmssvm", "get", resourceGroupName, az.client.SubscriptionID, "")
if !az.rateLimiterReader.TryAccept() {
mc.RateLimitedCount()
rerr = createRateLimitErr(false, "VMSSGet")
return
}
klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Get(%q,%q,%q): start", resourceGroupName, VMScaleSetName, instanceID)
defer func() {
klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Get(%q,%q,%q): end", resourceGroupName, VMScaleSetName, instanceID)
}()
var err error
result, err = az.client.Get(ctx, resourceGroupName, VMScaleSetName, instanceID, expand)
mc.Observe(err)
// Wrap the raw SDK error plus the HTTP response into a *retry.Error.
return result, retry.GetError(result.Response.Response, err)
}
// List returns the VMs of virtualMachineScaleSetName, filtered/projected by the
// OData filter, selectParameter and expand arguments, draining the SDK's paged
// iterator into a slice. The call is subject to the read rate limiter; any
// iteration error is surfaced as a retriable *retry.Error.
func (az *azVirtualMachineScaleSetVMsClient) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []compute.VirtualMachineScaleSetVM, rerr *retry.Error) {
mc := newMetricContext("vmssvm", "list", resourceGroupName, az.client.SubscriptionID, "")
if !az.rateLimiterReader.TryAccept() {
mc.RateLimitedCount()
rerr = createRateLimitErr(false, "VMSSList")
return
}
klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.List(%q,%q,%q): start", resourceGroupName, virtualMachineScaleSetName, filter)
defer func() {
klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.List(%q,%q,%q): end", resourceGroupName, virtualMachineScaleSetName, filter)
}()
iterator, err := az.client.ListComplete(ctx, resourceGroupName, virtualMachineScaleSetName, filter, selectParameter, expand)
mc.Observe(err)
if err != nil {
return nil, retry.GetRetriableError(err)
}
result = make([]compute.VirtualMachineScaleSetVM, 0)
// NOTE(review): Next() runs in the post statement, so its error is only checked
// at the top of the following iteration — same caveat as the scale-set List.
for ; iterator.NotDone(); err = iterator.Next() {
if err != nil {
return nil, retry.GetRetriableError(err)
}
result = append(result, iterator.Value())
}
return result, nil
}
// Update updates the scale-set VM instanceID within VMScaleSetName and blocks
// until the long-running ARM operation completes. source labels the caller for
// metrics. The call is subject to the write rate limiter; a rejected token is
// counted in metrics and returned as a rate-limit *retry.Error.
// NOTE(review): the metric op name is "create_or_update" although this issues an
// Update — presumably to share a metric series with creates; confirm.
func (az *azVirtualMachineScaleSetVMsClient) Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) *retry.Error {
mc := newMetricContext("vmssvm", "create_or_update", resourceGroupName, az.client.SubscriptionID, source)
if !az.rateLimiterWriter.TryAccept() {
mc.RateLimitedCount()
return createRateLimitErr(true, "VMSSVMUpdate")
}
klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Update(%q,%q,%q): start", resourceGroupName, VMScaleSetName, instanceID)
defer func() {
klog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Update(%q,%q,%q): end", resourceGroupName, VMScaleSetName, instanceID)
}()
future, err := az.client.Update(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters)
if err != nil {
return retry.GetError(future.Response(), mc.Observe(err))
}
// Wait for the async operation; the final response/error determine retriability.
err = future.WaitForCompletionRef(ctx, az.client.Client)
return retry.GetError(future.Response(), mc.Observe(err))
}
// azRoutesClient implements RoutesClient.
type azRoutesClient struct {
client network.RoutesClient

View File

@ -548,7 +548,7 @@ func (fVMC *fakeVirtualMachineScaleSetVMsClient) setFakeStore(store map[string]m
fVMC.FakeStore = store
}
func (fVMC *fakeVirtualMachineScaleSetVMsClient) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []compute.VirtualMachineScaleSetVM, err *retry.Error) {
func (fVMC *fakeVirtualMachineScaleSetVMsClient) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, expand string) (result []compute.VirtualMachineScaleSetVM, err *retry.Error) {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()

View File

@ -551,7 +551,7 @@ func (ss *scaleSet) listScaleSetVMs(scaleSetName, resourceGroup string) ([]compu
ctx, cancel := getContextWithCancel()
defer cancel()
allVMs, rerr := ss.VirtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSetName, "", "", string(compute.InstanceView))
allVMs, rerr := ss.VirtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSetName, string(compute.InstanceView))
if rerr != nil {
klog.Errorf("VirtualMachineScaleSetVMsClient.List failed: %v", rerr)
return nil, rerr.Error()
@ -897,16 +897,12 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
defer cancel()
klog.V(2).Infof("EnsureHostInPool begins to update vmssVM(%s) with new backendPoolID %s", vmName, backendPoolID)
rerr := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
klog.V(2).Infof("EnsureHostInPool update backing off vmssVM(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err)
retryErr := ss.UpdateVmssVMWithRetry(nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if retryErr != nil {
klog.Errorf("EnsureHostInPool update abort backoff vmssVM(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err)
}
return retryErr
if rerr != nil {
klog.Errorf("EnsureHostInPool VirtualMachineScaleSetVMsClient.Update(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err)
return rerr.Error()
}
return rerr.Error()
return nil
}
func getVmssAndResourceGroupNameByVMProviderID(providerID string) (string, string, error) {
@ -1022,21 +1018,10 @@ func (ss *scaleSet) ensureVMSSInPool(service *v1.Service, nodes []*v1.Node, back
},
}
// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("ensureVMSSInPool begins to update vmss(%s) with new backendPoolID %s", vmssName, backendPoolID)
rerr := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmssName, newVMSS)
if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
klog.V(2).Infof("ensureVMSSInPool update backing off vmss(%s) with new backendPoolID %s, err: %v", vmssName, backendPoolID, err)
retryErr := ss.CreateOrUpdateVmssWithRetry(ss.ResourceGroup, vmssName, newVMSS)
if retryErr != nil {
klog.Errorf("ensureVMSSInPool update abort backoff vmssVM(%s) with new backendPoolID %s, err: %v", vmssName, backendPoolID, err)
return retryErr
}
}
rerr := ss.CreateOrUpdateVMSS(ss.ResourceGroup, vmssName, newVMSS)
if rerr != nil {
klog.Errorf("ensureVMSSInPool CreateOrUpdateVMSS(%s) with new backendPoolID %s, err: %v", vmssName, backendPoolID, err)
return rerr.Error()
}
}
@ -1173,14 +1158,6 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeNa
defer cancel()
klog.V(2).Infof("ensureBackendPoolDeletedFromNode begins to update vmssVM(%s) with backendPoolID %s", nodeName, backendPoolID)
rerr := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
klog.V(2).Infof("ensureBackendPoolDeletedFromNode update backing off vmssVM(%s) with backendPoolID %s, err: %v", nodeName, backendPoolID, err)
retryErr := ss.UpdateVmssVMWithRetry(nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if retryErr != nil {
err = retryErr
klog.Errorf("ensureBackendPoolDeletedFromNode update abort backoff vmssVM(%s) with backendPoolID %s, err: %v", nodeName, backendPoolID, err)
}
}
if rerr != nil {
klog.Errorf("ensureBackendPoolDeletedFromNode failed to update vmssVM(%s) with backendPoolID %s: %v", nodeName, backendPoolID, err)
} else {
@ -1304,21 +1281,10 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromVMSS(service *v1.Service, backen
},
}
// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("ensureBackendPoolDeletedFromVMSS begins to update vmss(%s) with backendPoolID %s", vmssName, backendPoolID)
rerr := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmssName, newVMSS)
if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
klog.V(2).Infof("ensureBackendPoolDeletedFromVMSS update backing off vmss(%s) with backendPoolID %s, err: %v", vmssName, backendPoolID, err)
retryErr := ss.CreateOrUpdateVmssWithRetry(ss.ResourceGroup, vmssName, newVMSS)
if retryErr != nil {
klog.Errorf("ensureBackendPoolDeletedFromVMSS update abort backoff vmssVM(%s) with backendPoolID %s, err: %v", vmssName, backendPoolID, retryErr)
return retryErr
}
}
rerr := ss.CreateOrUpdateVMSS(ss.ResourceGroup, vmssName, newVMSS)
if rerr != nil {
klog.Errorf("ensureBackendPoolDeletedFromVMSS CreateOrUpdateVMSS(%s) with new backendPoolID %s, err: %v", vmssName, backendPoolID, err)
return rerr.Error()
}
}

View File

@ -79,7 +79,7 @@ func TestVMSSVMCache(t *testing.T) {
// validate getting VMSS VM via cache.
virtualMachines, rerr := ss.VirtualMachineScaleSetVMsClient.List(
context.Background(), "rg", "vmss", "", "", "")
context.Background(), "rg", "vmss", "")
assert.Nil(t, rerr)
assert.Equal(t, 3, len(virtualMachines))
for i := range virtualMachines {

View File

@ -25,7 +25,12 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient:all-srcs",
"//staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssclient:all-srcs",
"//staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -40,10 +40,11 @@ type ClientConfig struct {
ShouldOmitCloudProviderBackoff bool
}
// WithRateLimiter returns ClientConfig with rateLimitConfig set.
// WithRateLimiter returns a new ClientConfig with rateLimitConfig set.
func (cfg *ClientConfig) WithRateLimiter(rl *RateLimitConfig) *ClientConfig {
cfg.RateLimitConfig = rl
return cfg
newClientConfig := *cfg
newClientConfig.RateLimitConfig = rl
return &newClientConfig
}
// RateLimitConfig indicates the rate limit config options.

View File

@ -28,8 +28,8 @@ import (
func TestWithRateLimiter(t *testing.T) {
config := &ClientConfig{}
assert.Nil(t, config.RateLimitConfig)
config.WithRateLimiter(&RateLimitConfig{CloudProviderRateLimit: true})
assert.Equal(t, &RateLimitConfig{CloudProviderRateLimit: true}, config.RateLimitConfig)
c := config.WithRateLimiter(&RateLimitConfig{CloudProviderRateLimit: true})
assert.Equal(t, &RateLimitConfig{CloudProviderRateLimit: true}, c.RateLimitConfig)
config.WithRateLimiter(nil)
assert.Nil(t, config.RateLimitConfig)
}

View File

@ -60,8 +60,15 @@ func (err *Error) Error() error {
return nil
}
return fmt.Errorf("Retriable: %v, RetryAfter: %s, HTTPStatusCode: %d, RawError: %v",
err.Retriable, err.RetryAfter.String(), err.HTTPStatusCode, err.RawError)
// Convert time to seconds for better logging.
retryAfterSeconds := 0
curTime := now()
if err.RetryAfter.After(curTime) {
retryAfterSeconds = int(err.RetryAfter.Sub(curTime) / time.Second)
}
return fmt.Errorf("Retriable: %v, RetryAfter: %ds, HTTPStatusCode: %d, RawError: %v",
err.Retriable, retryAfterSeconds, err.HTTPStatusCode, err.RawError)
}
// IsThrottled returns true if the request is being throttled.
@ -99,8 +106,13 @@ func GetRateLimitError(isWrite bool, opName string) *Error {
}
// GetThrottlingError creates a new error for throttling.
func GetThrottlingError(operation, reason string) *Error {
return GetRetriableError(fmt.Errorf("azure cloud provider throttled for operation %s with reason %q", operation, reason))
func GetThrottlingError(operation, reason string, retryAfter time.Time) *Error {
rawError := fmt.Errorf("azure cloud provider throttled for operation %s with reason %q", operation, reason)
return &Error{
Retriable: true,
RawError: rawError,
RetryAfter: retryAfter,
}
}
// GetError gets a new Error based on resp and error.

View File

@ -21,6 +21,7 @@ package retry
import (
"math/rand"
"net/http"
"strings"
"time"
"github.com/Azure/go-autorest/autorest"
@ -55,6 +56,8 @@ type Backoff struct {
// exceed the cap then the duration is set to the cap and the
// steps parameter is set to zero.
Cap time.Duration
// NonRetriableErrors lists error substrings that indicate the request should not be retried.
NonRetriableErrors []string
}
// NewBackoff creates a new Backoff.
@ -68,6 +71,28 @@ func NewBackoff(duration time.Duration, factor float64, jitter float64, steps in
}
}
// WithNonRetriableErrors returns a copy of b whose NonRetriableErrors field is
// set to errs; the receiver is left unmodified.
func (b *Backoff) WithNonRetriableErrors(errs []string) *Backoff {
copied := *b
copied.NonRetriableErrors = errs
return &copied
}
// isNonRetriableError reports whether rerr's raw error message contains any of
// the configured NonRetriableErrors substrings, meaning the request should not
// be retried further.
func (b *Backoff) isNonRetriableError(rerr *Error) bool {
// Fix: also guard RawError — an *Error with a nil RawError (e.g. one built
// without a wrapped error) would previously panic on RawError.Error().
if rerr == nil || rerr.RawError == nil {
return false
}
for _, err := range b.NonRetriableErrors {
if strings.Contains(rerr.RawError.Error(), err) {
return true
}
}
return false
}
// Step (1) returns an amount of time to sleep determined by the
// original Duration and Jitter and (2) mutates the provided Backoff
// to update its Steps and Duration.
@ -134,8 +159,9 @@ func doBackoffRetry(s autorest.Sender, r *http.Request, backoff *Backoff) (resp
// 1) request succeed
// 2) request is not retriable
// 3) request has been throttled
// 4) request has completed all the retry steps
if rerr == nil || !rerr.Retriable || rerr.IsThrottled() || backoff.Steps == 1 {
// 4) request contains non-retriable errors
// 5) request has completed all the retry steps
if rerr == nil || !rerr.Retriable || rerr.IsThrottled() || backoff.isNonRetriableError(rerr) || backoff.Steps == 1 {
return resp, rerr.Error()
}

3
vendor/modules.txt vendored
View File

@ -1839,6 +1839,9 @@ k8s.io/legacy-cloud-providers/aws
k8s.io/legacy-cloud-providers/azure
k8s.io/legacy-cloud-providers/azure/auth
k8s.io/legacy-cloud-providers/azure/clients
k8s.io/legacy-cloud-providers/azure/clients/armclient
k8s.io/legacy-cloud-providers/azure/clients/vmssclient
k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient
k8s.io/legacy-cloud-providers/azure/retry
k8s.io/legacy-cloud-providers/gce
k8s.io/legacy-cloud-providers/openstack