diff --git a/pkg/cloudprovider/providers/azure/BUILD b/pkg/cloudprovider/providers/azure/BUILD index 3c7707158c1..37e23ace381 100644 --- a/pkg/cloudprovider/providers/azure/BUILD +++ b/pkg/cloudprovider/providers/azure/BUILD @@ -12,6 +12,7 @@ go_library( name = "go_default_library", srcs = [ "azure.go", + "azure_backoff.go", "azure_blob.go", "azure_file.go", "azure_instances.go", @@ -44,6 +45,8 @@ go_library( "//vendor/github.com/rubiojr/go-vhd/vhd:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", ], ) diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index 7694693e261..ca2e48b7e6c 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "time" + "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/version" @@ -32,10 +33,20 @@ import ( "github.com/Azure/go-autorest/autorest" "github.com/Azure/go-autorest/autorest/azure" "github.com/ghodss/yaml" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/wait" ) -// CloudProviderName is the value used for the --cloud-provider flag -const CloudProviderName = "azure" +const ( + // CloudProviderName is the value used for the --cloud-provider flag + CloudProviderName = "azure" + rateLimitQPSDefault = 1.0 + rateLimitBucketDefault = 5 + backoffRetriesDefault = 6 + backoffExponentDefault = 1.5 + backoffDurationDefault = 5 // in seconds + backoffJitterDefault = 1.0 +) // Config holds the configuration parsed from the --cloud-config flag // All fields are required unless otherwise specified @@ -69,21 +80,39 @@ type Config struct { AADClientID string `json:"aadClientId" yaml:"aadClientId"` // The ClientSecret for an AAD application with RBAC access to talk to Azure RM APIs AADClientSecret string `json:"aadClientSecret" yaml:"aadClientSecret"` + // Enable exponential backoff to manage resource request retries + CloudProviderBackoff bool `json:"cloudProviderBackoff" yaml:"cloudProviderBackoff"` + // Backoff retry limit + CloudProviderBackoffRetries int `json:"cloudProviderBackoffRetries" yaml:"cloudProviderBackoffRetries"` + // Backoff exponent + CloudProviderBackoffExponent float64 `json:"cloudProviderBackoffExponent" yaml:"cloudProviderBackoffExponent"` + // Backoff duration + CloudProviderBackoffDuration int `json:"cloudProviderBackoffDuration" yaml:"cloudProviderBackoffDuration"` + // Backoff jitter + CloudProviderBackoffJitter float64 `json:"cloudProviderBackoffJitter" yaml:"cloudProviderBackoffJitter"` + // Enable rate limiting + CloudProviderRateLimit bool `json:"cloudProviderRateLimit" yaml:"cloudProviderRateLimit"` + // Rate limit QPS + CloudProviderRateLimitQPS float32 `json:"cloudProviderRateLimitQPS" yaml:"cloudProviderRateLimitQPS"` + // Rate limit Bucket Size + CloudProviderRateLimitBucket int `json:"cloudProviderRateLimitBucket" yaml:"cloudProviderRateLimitBucket"` } // Cloud holds the config and clients type Cloud struct { Config - Environment azure.Environment - RoutesClient network.RoutesClient - SubnetsClient network.SubnetsClient - InterfacesClient network.InterfacesClient - RouteTablesClient network.RouteTablesClient - LoadBalancerClient network.LoadBalancersClient - PublicIPAddressesClient network.PublicIPAddressesClient - SecurityGroupsClient network.SecurityGroupsClient - VirtualMachinesClient compute.VirtualMachinesClient - StorageAccountClient storage.AccountsClient + Environment azure.Environment + RoutesClient network.RoutesClient + SubnetsClient network.SubnetsClient + InterfacesClient network.InterfacesClient + RouteTablesClient network.RouteTablesClient + LoadBalancerClient network.LoadBalancersClient + PublicIPAddressesClient network.PublicIPAddressesClient + SecurityGroupsClient network.SecurityGroupsClient + VirtualMachinesClient compute.VirtualMachinesClient + StorageAccountClient storage.AccountsClient + operationPollRateLimiter flowcontrol.RateLimiter + resourceRequestBackoff wait.Backoff } func init() { @@ -177,6 +206,54 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { az.StorageAccountClient = storage.NewAccountsClientWithBaseURI(az.Environment.ResourceManagerEndpoint, az.SubscriptionID) az.StorageAccountClient.Authorizer = servicePrincipalToken + // Conditionally configure rate limits + if az.CloudProviderRateLimit { + // Assign rate limit defaults if no configuration was passed in + if az.CloudProviderRateLimitQPS == 0 { + az.CloudProviderRateLimitQPS = rateLimitQPSDefault + } + if az.CloudProviderRateLimitBucket == 0 { + az.CloudProviderRateLimitBucket = rateLimitBucketDefault + } + az.operationPollRateLimiter = flowcontrol.NewTokenBucketRateLimiter( + az.CloudProviderRateLimitQPS, + az.CloudProviderRateLimitBucket) + glog.V(2).Infof("Azure cloudprovider using rate limit config: QPS=%d, bucket=%d", + az.CloudProviderRateLimitQPS, + az.CloudProviderRateLimitBucket) + } else { + // if rate limits are configured off, az.operationPollRateLimiter.Accept() is a no-op + az.operationPollRateLimiter = flowcontrol.NewFakeAlwaysRateLimiter() + } + + // Conditionally configure resource request backoff + if az.CloudProviderBackoff { + // Assign backoff defaults if no configuration was passed in + if az.CloudProviderBackoffRetries == 0 { + az.CloudProviderBackoffRetries = backoffRetriesDefault + } + if az.CloudProviderBackoffExponent == 0 { + az.CloudProviderBackoffExponent = backoffExponentDefault + } + if az.CloudProviderBackoffDuration == 0 { + az.CloudProviderBackoffDuration = backoffDurationDefault + } + if az.CloudProviderBackoffJitter == 0 { + az.CloudProviderBackoffJitter = backoffJitterDefault + } + az.resourceRequestBackoff = wait.Backoff{ + Steps: az.CloudProviderBackoffRetries, + Factor: az.CloudProviderBackoffExponent, + Duration: time.Duration(az.CloudProviderBackoffDuration) * time.Second, + Jitter: az.CloudProviderBackoffJitter, + } + glog.V(2).Infof("Azure cloudprovider using retry backoff: retries=%d, exponent=%f, duration=%d, jitter=%f", + az.CloudProviderBackoffRetries, + az.CloudProviderBackoffExponent, + az.CloudProviderBackoffDuration, + az.CloudProviderBackoffJitter) + } + return &az, nil } diff --git a/pkg/cloudprovider/providers/azure/azure_backoff.go b/pkg/cloudprovider/providers/azure/azure_backoff.go new file mode 100644 index 00000000000..3fca4c49334 --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_backoff.go @@ -0,0 +1,170 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/Azure/azure-sdk-for-go/arm/compute" + "github.com/Azure/azure-sdk-for-go/arm/network" + "github.com/Azure/go-autorest/autorest" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/types" +) + +// GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry +func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.VirtualMachine, bool, error) { + var machine compute.VirtualMachine + var exists bool + err := wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + var retryErr error + machine, exists, retryErr = az.getVirtualMachine(name) + if retryErr != nil { + glog.Errorf("backoff: failure, will retry,err=%v", retryErr) + return false, nil + } + glog.V(2).Infof("backoff: success") + return true, nil + }) + return machine, exists, err +} + +// CreateOrUpdateSGWithRetry invokes az.SecurityGroupsClient.CreateOrUpdate with exponential backoff retry +func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error { + return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + az.operationPollRateLimiter.Accept() + resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil) + return processRetryResponse(resp, err) + }) +} + +// CreateOrUpdateLBWithRetry invokes az.LoadBalancerClient.CreateOrUpdate with exponential backoff retry +func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error { + return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + az.operationPollRateLimiter.Accept() + resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil) + return processRetryResponse(resp, err) + }) +} + +// CreateOrUpdatePIPWithRetry invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry +func (az *Cloud) CreateOrUpdatePIPWithRetry(pip network.PublicIPAddress) error { + return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + az.operationPollRateLimiter.Accept() + resp, err := az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil) + return processRetryResponse(resp, err) + }) +} + +// CreateOrUpdateInterfaceWithRetry invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry +func (az *Cloud) CreateOrUpdateInterfaceWithRetry(nic network.Interface) error { + return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + az.operationPollRateLimiter.Accept() + resp, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil) + return processRetryResponse(resp, err) + }) +} + +// DeletePublicIPWithRetry invokes az.PublicIPAddressesClient.Delete with exponential backoff retry +func (az *Cloud) DeletePublicIPWithRetry(pipName string) error { + return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + az.operationPollRateLimiter.Accept() + resp, err := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil) + return processRetryResponse(resp, err) + }) +} + +// DeleteLBWithRetry invokes az.LoadBalancerClient.Delete with exponential backoff retry +func (az *Cloud) DeleteLBWithRetry(lbName string) error { + return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + az.operationPollRateLimiter.Accept() + resp, err := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil) + return processRetryResponse(resp, err) + }) +} + +// CreateOrUpdateRouteTableWithRetry invokes az.RouteTablesClient.CreateOrUpdate with exponential backoff retry +func (az *Cloud) CreateOrUpdateRouteTableWithRetry(routeTable network.RouteTable) error { + return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + az.operationPollRateLimiter.Accept() + resp, err := az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil) + return processRetryResponse(resp, err) + }) +} + +// CreateOrUpdateRouteWithRetry invokes az.RoutesClient.CreateOrUpdate with exponential backoff retry +func (az *Cloud) CreateOrUpdateRouteWithRetry(route network.Route) error { + return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + az.operationPollRateLimiter.Accept() + resp, err := az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil) + return processRetryResponse(resp, err) + }) +} + +// DeleteRouteWithRetry invokes az.RoutesClient.Delete with exponential backoff retry +func (az *Cloud) DeleteRouteWithRetry(routeName string) error { + return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + az.operationPollRateLimiter.Accept() + resp, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil) + return processRetryResponse(resp, err) + }) +} + +// CreateOrUpdateVMWithRetry invokes az.VirtualMachinesClient.CreateOrUpdate with exponential backoff retry +func (az *Cloud) CreateOrUpdateVMWithRetry(vmName string, newVM compute.VirtualMachine) error { + return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + az.operationPollRateLimiter.Accept() + resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil) + return processRetryResponse(resp, err) + }) +} + +// A wait.ConditionFunc function to deal with common HTTP backoff response conditions +func processRetryResponse(resp autorest.Response, err error) (bool, error) { + if isSuccessHTTPResponse(resp) { + glog.V(2).Infof("backoff: success, HTTP response=%d", resp.StatusCode) + return true, nil + } + if shouldRetryAPIRequest(resp, err) { + glog.Errorf("backoff: failure, will retry, HTTP response=%d, err=%v", resp.StatusCode, err) + // suppress the error object so that backoff process continues + return false, nil + } + // Fall-through: stop periodic backoff, return error object from most recent request + return true, err +} + +// shouldRetryAPIRequest determines if the response from an HTTP request suggests periodic retry behavior +func shouldRetryAPIRequest(resp autorest.Response, err error) bool { + if err != nil { + return true + } + // HTTP 4xx or 5xx suggests we should retry + if 399 < resp.StatusCode && resp.StatusCode < 600 { + return true + } + return false +} + +// isSuccessHTTPResponse determines if the response from an HTTP request suggests success +func isSuccessHTTPResponse(resp autorest.Response) bool { + // HTTP 2xx suggests a successful response + if 199 < resp.StatusCode && resp.StatusCode < 300 { + return true + } + return false +} diff --git a/pkg/cloudprovider/providers/azure/azure_instances.go b/pkg/cloudprovider/providers/azure/azure_instances.go index 45b109e1478..fdd0bbdd9ca 100644 --- a/pkg/cloudprovider/providers/azure/azure_instances.go +++ b/pkg/cloudprovider/providers/azure/azure_instances.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "github.com/Azure/azure-sdk-for-go/arm/compute" + "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" ) @@ -31,6 +32,7 @@ import ( func (az *Cloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) { ip, err := az.getIPForMachine(name) if err != nil { + glog.Errorf("error: az.NodeAddresses, az.getIPForMachine(%s), err=%v", name, err) return nil, err } @@ -55,9 +57,22 @@ func (az *Cloud) ExternalID(name types.NodeName) (string, error) { // InstanceID returns the cloud provider ID of the specified instance. // Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound) func (az *Cloud) InstanceID(name types.NodeName) (string, error) { - machine, exists, err := az.getVirtualMachine(name) + var machine compute.VirtualMachine + var exists bool + var err error + az.operationPollRateLimiter.Accept() + machine, exists, err = az.getVirtualMachine(name) if err != nil { - return "", err + if az.CloudProviderBackoff { + glog.V(2).Infof("InstanceID(%s) backing off", name) + machine, exists, err = az.GetVirtualMachineWithRetry(name) + if err != nil { + glog.V(2).Infof("InstanceID(%s) abort backoff", name) + return "", err + } + } else { + return "", err + } } else if !exists { return "", cloudprovider.InstanceNotFound } @@ -78,6 +93,7 @@ func (az *Cloud) InstanceTypeByProviderID(providerID string) (string, error) { func (az *Cloud) InstanceType(name types.NodeName) (string, error) { machine, exists, err := az.getVirtualMachine(name) if err != nil { + glog.Errorf("error: az.InstanceType(%s), az.getVirtualMachine(%s) err=%v", name, name, err) return "", err } else if !exists { return "", cloudprovider.InstanceNotFound @@ -100,8 +116,10 @@ func (az *Cloud) CurrentNodeName(hostname string) (types.NodeName, error) { func (az *Cloud) listAllNodesInResourceGroup() ([]compute.VirtualMachine, error) { allNodes := []compute.VirtualMachine{} + az.operationPollRateLimiter.Accept() result, err := az.VirtualMachinesClient.List(az.ResourceGroup) if err != nil { + glog.Errorf("error: az.listAllNodesInResourceGroup(), az.VirtualMachinesClient.List(%s), err=%v", az.ResourceGroup, err) return nil, err } @@ -110,8 +128,10 @@ func (az *Cloud) listAllNodesInResourceGroup() ([]compute.VirtualMachine, error) for morePages { allNodes = append(allNodes, *result.Value...) + az.operationPollRateLimiter.Accept() result, err = az.VirtualMachinesClient.ListAllNextResults(result) if err != nil { + glog.Errorf("error: az.listAllNodesInResourceGroup(), az.VirtualMachinesClient.ListAllNextResults(%v), err=%v", result, err) return nil, err } diff --git a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go index 940723d2a46..6121d26b12b 100644 --- a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go +++ b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go @@ -92,6 +92,7 @@ func (az *Cloud) getPublicIPName(clusterName string, service *v1.Service) (strin return fmt.Sprintf("%s-%s", clusterName, cloudprovider.GetLoadBalancerName(service)), nil } + az.operationPollRateLimiter.Accept() list, err := az.PublicIPAddressesClient.List(az.ResourceGroup) if err != nil { return "", err @@ -135,6 +136,7 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod serviceName := getServiceName(service) glog.V(5).Infof("ensure(%s): START clusterName=%q lbName=%q", serviceName, clusterName, lbName) + az.operationPollRateLimiter.Accept() sg, err := az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "") if err != nil { return nil, err @@ -149,7 +151,16 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod // to nil. This is a workaround until https://github.com/Azure/go-autorest/issues/112 is fixed sg.SecurityGroupPropertiesFormat.NetworkInterfaces = nil sg.SecurityGroupPropertiesFormat.Subnets = nil - _, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil) + az.operationPollRateLimiter.Accept() + resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("ensure(%s) backing off: sg(%s) - updating", serviceName, *sg.Name) + retryErr := az.CreateOrUpdateSGWithRetry(sg) + if retryErr != nil { + glog.V(2).Infof("ensure(%s) abort backoff: sg(%s) - updating", serviceName, *sg.Name) + return nil, retryErr + } + } if err != nil { return nil, err } @@ -219,7 +230,16 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod } if !existsLb || lbNeedsUpdate { glog.V(3).Infof("ensure(%s): lb(%s) - updating", serviceName, lbName) - _, err = az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil) + az.operationPollRateLimiter.Accept() + resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("ensure(%s) backing off: lb(%s) - updating", serviceName, lbName) + retryErr := az.CreateOrUpdateLBWithRetry(lb) + if retryErr != nil { + glog.V(2).Infof("ensure(%s) abort backoff: lb(%s) - updating", serviceName, lbName) + return nil, retryErr + } + } if err != nil { return nil, err } @@ -310,7 +330,16 @@ func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servi // to nil. This is a workaround until https://github.com/Azure/go-autorest/issues/112 is fixed sg.SecurityGroupPropertiesFormat.NetworkInterfaces = nil sg.SecurityGroupPropertiesFormat.Subnets = nil - _, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil) + az.operationPollRateLimiter.Accept() + resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("delete(%s) backing off: sg(%s) - updating", serviceName, az.SecurityGroupName) + retryErr := az.CreateOrUpdateSGWithRetry(reconciledSg) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("delete(%s) abort backoff: sg(%s) - updating", serviceName, az.SecurityGroupName) + } + } if err != nil { return err } @@ -339,14 +368,32 @@ func (az *Cloud) cleanupLoadBalancer(clusterName string, service *v1.Service, is if lbNeedsUpdate { if len(*lb.FrontendIPConfigurations) > 0 { glog.V(3).Infof("delete(%s): lb(%s) - updating", serviceName, lbName) - _, err = az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil) + az.operationPollRateLimiter.Accept() + resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("delete(%s) backing off: sg(%s) - updating", serviceName, az.SecurityGroupName) + retryErr := az.CreateOrUpdateLBWithRetry(lb) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("delete(%s) abort backoff: sg(%s) - updating", serviceName, az.SecurityGroupName) + } + } if err != nil { return err } } else { glog.V(3).Infof("delete(%s): lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName) - _, err = az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil) + az.operationPollRateLimiter.Accept() + resp, err := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("delete(%s) backing off: lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName) + retryErr := az.DeleteLBWithRetry(lbName) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("delete(%s) abort backoff: lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName) + } + } if err != nil { return err } @@ -392,11 +439,21 @@ func (az *Cloud) ensurePublicIPExists(serviceName, pipName string) (*network.Pub pip.Tags = &map[string]*string{"service": &serviceName} glog.V(3).Infof("ensure(%s): pip(%s) - creating", serviceName, *pip.Name) - _, err = az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil) + az.operationPollRateLimiter.Accept() + resp, err := az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("ensure(%s) backing off: pip(%s) - creating", serviceName, *pip.Name) + retryErr := az.CreateOrUpdatePIPWithRetry(pip) + if retryErr != nil { + glog.V(2).Infof("ensure(%s) abort backoff: pip(%s) - creating", serviceName, *pip.Name) + err = retryErr + } + } if err != nil { return nil, err } + az.operationPollRateLimiter.Accept() pip, err = az.PublicIPAddressesClient.Get(az.ResourceGroup, *pip.Name, "") if err != nil { return nil, err @@ -407,7 +464,17 @@ func (az *Cloud) ensurePublicIPExists(serviceName, pipName string) (*network.Pub } func (az *Cloud) ensurePublicIPDeleted(serviceName, pipName string) error { - _, deleteErr := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil) + glog.V(2).Infof("ensure(%s): pip(%s) - deleting", serviceName, pipName) + az.operationPollRateLimiter.Accept() + resp, deleteErr := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, deleteErr) { + glog.V(2).Infof("ensure(%s) backing off: pip(%s) - deleting", serviceName, pipName) + retryErr := az.DeletePublicIPWithRetry(pipName) + if retryErr != nil { + glog.V(2).Infof("ensure(%s) abort backoff: pip(%s) - deleting", serviceName, pipName) + return retryErr + } + } _, realErr := checkResourceExistsFromError(deleteErr) if realErr != nil { return nil @@ -792,6 +859,7 @@ func findSecurityRule(rules []network.SecurityRule, rule network.SecurityRule) b // participating in the specified LoadBalancer Backend Pool. func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string) error { vmName := mapNodeNameToVMName(nodeName) + az.operationPollRateLimiter.Accept() machine, err := az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, "") if err != nil { return err @@ -817,6 +885,7 @@ func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, b } } + az.operationPollRateLimiter.Accept() nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "") if err != nil { return err @@ -848,7 +917,16 @@ func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, b primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName) - _, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil) + az.operationPollRateLimiter.Accept() + resp, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("nicupdate(%s) backing off: nic(%s) - updating, err=%v", serviceName, nicName, err) + retryErr := az.CreateOrUpdateInterfaceWithRetry(nic) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("nicupdate(%s) abort backoff: nic(%s) - updating", serviceName, nicName) + } + } if err != nil { return err } diff --git a/pkg/cloudprovider/providers/azure/azure_routes.go b/pkg/cloudprovider/providers/azure/azure_routes.go index aab962f7088..0d7a23ebfd8 100644 --- a/pkg/cloudprovider/providers/azure/azure_routes.go +++ b/pkg/cloudprovider/providers/azure/azure_routes.go @@ -66,6 +66,7 @@ func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *clo routeTable, existsRouteTable, err := az.getRouteTable() if err != nil { + glog.V(2).Infof("create error: couldn't get routetable. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) return err } if !existsRouteTable { @@ -76,7 +77,16 @@ func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *clo } glog.V(3).Infof("create: creating routetable. routeTableName=%q", az.RouteTableName) - _, err = az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil) + az.operationPollRateLimiter.Accept() + resp, err := az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("create backing off: creating routetable. routeTableName=%q", az.RouteTableName) + retryErr := az.CreateOrUpdateRouteTableWithRetry(routeTable) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("create abort backoff: creating routetable. routeTableName=%q", az.RouteTableName) + } + } if err != nil { return err } @@ -103,7 +113,16 @@ func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *clo } glog.V(3).Infof("create: creating route: instance=%q cidr=%q", kubeRoute.TargetNode, kubeRoute.DestinationCIDR) - _, err = az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil) + az.operationPollRateLimiter.Accept() + resp, err := az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("create backing off: creating route: instance=%q cidr=%q", kubeRoute.TargetNode, kubeRoute.DestinationCIDR) + retryErr := az.CreateOrUpdateRouteWithRetry(route) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("create abort backoff: creating route: instance=%q cidr=%q", kubeRoute.TargetNode, kubeRoute.DestinationCIDR) + } + } if err != nil { return err } @@ -118,7 +137,16 @@ func (az *Cloud) DeleteRoute(clusterName string, kubeRoute *cloudprovider.Route) glog.V(2).Infof("delete: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) routeName := mapNodeNameToRouteName(kubeRoute.TargetNode) - _, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil) + az.operationPollRateLimiter.Accept() + resp, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("delete backing off: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) + retryErr := az.DeleteRouteWithRetry(routeName) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("delete abort backoff: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) + } + } if err != nil { return err } diff --git a/pkg/cloudprovider/providers/azure/azure_storage.go b/pkg/cloudprovider/providers/azure/azure_storage.go index abaae7c4019..b810480ab47 100644 --- a/pkg/cloudprovider/providers/azure/azure_storage.go +++ b/pkg/cloudprovider/providers/azure/azure_storage.go @@ -64,7 +64,17 @@ func (az *Cloud) AttachDisk(diskName, diskURI string, nodeName types.NodeName, l }, } vmName := mapNodeNameToVMName(nodeName) - _, err = az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil) + glog.V(2).Infof("create(%s): vm(%s)", az.ResourceGroup, vmName) + az.operationPollRateLimiter.Accept() + resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("create(%s) backing off: vm(%s)", az.ResourceGroup, vmName) + retryErr := az.CreateOrUpdateVMWithRetry(vmName, newVM) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("create(%s) abort backoff: vm(%s)", az.ResourceGroup, vmName) + } + } if err != nil { glog.Errorf("azure attach failed, err: %v", err) detail := err.Error() @@ -135,7 +145,17 @@ func (az *Cloud) DetachDiskByName(diskName, diskURI string, nodeName types.NodeN }, } vmName := mapNodeNameToVMName(nodeName) - _, err = az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil) + glog.V(2).Infof("create(%s): vm(%s)", az.ResourceGroup, vmName) + az.operationPollRateLimiter.Accept() + resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil) + if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) { + glog.V(2).Infof("create(%s) backing off: vm(%s)", az.ResourceGroup, vmName) + retryErr := az.CreateOrUpdateVMWithRetry(vmName, newVM) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("create(%s) abort backoff: vm(%s)", az.ResourceGroup, vmName) + } + } if err != nil { glog.Errorf("azure disk detach failed, err: %v", err) } else { diff --git a/pkg/cloudprovider/providers/azure/azure_storageaccount.go b/pkg/cloudprovider/providers/azure/azure_storageaccount.go index 4d33fb21e66..fa436eedf05 100644 --- a/pkg/cloudprovider/providers/azure/azure_storageaccount.go +++ b/pkg/cloudprovider/providers/azure/azure_storageaccount.go @@ -27,6 +27,7 @@ type accountWithLocation struct { // getStorageAccounts gets the storage accounts' name, type, location in a resource group func (az *Cloud) getStorageAccounts() ([]accountWithLocation, error) { + az.operationPollRateLimiter.Accept() result, err := az.StorageAccountClient.ListByResourceGroup(az.ResourceGroup) if err != nil { return nil, err @@ -56,6 +57,7 @@ func (az *Cloud) getStorageAccounts() ([]accountWithLocation, error) { // getStorageAccesskey gets the storage account access key func (az *Cloud) getStorageAccesskey(account string) (string, error) { + az.operationPollRateLimiter.Accept() result, err := az.StorageAccountClient.ListKeys(az.ResourceGroup, account) if err != nil { return "", err diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index 98e9eead48c..15f86751abe 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -591,11 +591,33 @@ func TestNewCloudFromJSON(t *testing.T) { "securityGroupName": "--security-group-name--", "vnetName": "--vnet-name--", "routeTableName": "--route-table-name--", - "primaryAvailabilitySetName": "--primary-availability-set-name--" + "primaryAvailabilitySetName": "--primary-availability-set-name--", + "cloudProviderBackoff": true, + "cloudProviderBackoffRetries": 6, + "cloudProviderBackoffExponent": 1.5, + "cloudProviderBackoffDuration": 5, + "cloudProviderBackoffJitter": 1.0, + "cloudProviderRatelimit": true, + "cloudProviderRateLimitQPS": 0.5, + "cloudProviderRateLimitBucket": 5 }` validateConfig(t, config) } +// Test Backoff and Rate Limit defaults (json) +func TestCloudDefaultConfigFromJSON(t *testing.T) { + config := `{}` + + validateEmptyConfig(t, config) +} + +// Test Backoff and Rate Limit defaults (yaml) +func TestCloudDefaultConfigFromYAML(t *testing.T) { + config := `` + + validateEmptyConfig(t, config) +} + // Test Configuration deserialization (yaml) func TestNewCloudFromYAML(t *testing.T) { config := ` @@ -610,21 +632,20 @@ securityGroupName: --security-group-name-- vnetName: --vnet-name-- routeTableName: --route-table-name-- primaryAvailabilitySetName: --primary-availability-set-name-- +cloudProviderBackoff: true +cloudProviderBackoffRetries: 6 +cloudProviderBackoffExponent: 1.5 +cloudProviderBackoffDuration: 5 +cloudProviderBackoffJitter: 1.0 +cloudProviderRatelimit: true +cloudProviderRateLimitQPS: 0.5 +cloudProviderRateLimitBucket: 5 ` validateConfig(t, config) } func validateConfig(t *testing.T, config string) { - configReader := strings.NewReader(config) - cloud, err := NewCloud(configReader) - if err != nil { - t.Error(err) - } - - azureCloud, ok := cloud.(*Cloud) - if !ok { - t.Error("NewCloud returned incorrect type") - } + azureCloud := getCloudFromConfig(t, config) if azureCloud.TenantID != "--tenant-id--" { t.Errorf("got incorrect value for TenantID") @@ -659,6 +680,58 @@ func validateConfig(t *testing.T, config string) { if azureCloud.PrimaryAvailabilitySetName != "--primary-availability-set-name--" { t.Errorf("got incorrect value for PrimaryAvailabilitySetName") } + if azureCloud.CloudProviderBackoff != true { + t.Errorf("got incorrect value for CloudProviderBackoff") + } + if azureCloud.CloudProviderBackoffRetries != 6 { + t.Errorf("got incorrect value for CloudProviderBackoffRetries") + } + if azureCloud.CloudProviderBackoffExponent != 1.5 { + t.Errorf("got incorrect value for CloudProviderBackoffExponent") + } + if azureCloud.CloudProviderBackoffDuration != 5 { + t.Errorf("got incorrect value for CloudProviderBackoffDuration") + } + if azureCloud.CloudProviderBackoffJitter != 1.0 { + t.Errorf("got incorrect value for CloudProviderBackoffJitter") + } + if azureCloud.CloudProviderRateLimit != true { + t.Errorf("got incorrect value for CloudProviderRateLimit") + } + if azureCloud.CloudProviderRateLimitQPS != 0.5 { + t.Errorf("got incorrect value for CloudProviderRateLimitQPS") + } + if azureCloud.CloudProviderRateLimitBucket != 5 { + t.Errorf("got incorrect value for CloudProviderRateLimitBucket") + } +} + +func getCloudFromConfig(t *testing.T, config string) *Cloud { + configReader := strings.NewReader(config) + cloud, err := NewCloud(configReader) + if err != nil { + t.Error(err) + } + azureCloud, ok := cloud.(*Cloud) + if !ok { + t.Error("NewCloud returned incorrect type") + } + return azureCloud +} + +// TODO include checks for other appropriate default config parameters +func validateEmptyConfig(t *testing.T, config string) { + azureCloud := getCloudFromConfig(t, config) + + // backoff should be disabled by default if not explicitly enabled in config + if azureCloud.CloudProviderBackoff != false { + t.Errorf("got incorrect value for CloudProviderBackoff") + } + + // rate limits should be disabled by default if not explicitly enabled in config + if azureCloud.CloudProviderRateLimit != false { + t.Errorf("got incorrect value for CloudProviderRateLimit") + } } func TestDecodeInstanceInfo(t *testing.T) { diff --git a/pkg/cloudprovider/providers/azure/azure_util.go b/pkg/cloudprovider/providers/azure/azure_util.go index 094f3aaf903..853ea29e98b 100644 --- a/pkg/cloudprovider/providers/azure/azure_util.go +++ b/pkg/cloudprovider/providers/azure/azure_util.go @@ -25,6 +25,7 @@ import ( "github.com/Azure/azure-sdk-for-go/arm/compute" "github.com/Azure/azure-sdk-for-go/arm/network" + "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" ) @@ -242,26 +243,32 @@ func (az *Cloud) getIPForMachine(nodeName types.NodeName) (string, error) { return "", cloudprovider.InstanceNotFound } if err != nil { + glog.Errorf("error: az.getIPForMachine(%s), az.getVirtualMachine(%s), err=%v", nodeName, nodeName, err) return "", err } nicID, err := getPrimaryInterfaceID(machine) if err != nil { + glog.Errorf("error: az.getIPForMachine(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err) return "", err } nicName, err := getLastSegment(nicID) if err != nil { + glog.Errorf("error: az.getIPForMachine(%s), getLastSegment(%s), err=%v", nodeName, nicID, err) return "", err } + az.operationPollRateLimiter.Accept() nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "") if err != nil { + glog.Errorf("error: az.getIPForMachine(%s), az.InterfacesClient.Get(%s, %s, %s), err=%v", nodeName, az.ResourceGroup, nicName, "", err) return "", err } ipConfig, err := getPrimaryIPConfig(nic) if err != nil { + glog.Errorf("error: az.getIPForMachine(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err) return "", err } diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go index 185b7347211..613e59a439d 100644 --- a/pkg/cloudprovider/providers/azure/azure_wrap.go +++ b/pkg/cloudprovider/providers/azure/azure_wrap.go @@ -43,6 +43,7 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualM var realErr error vmName := string(nodeName) + az.operationPollRateLimiter.Accept() vm, err = az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, "") exists, realErr = checkResourceExistsFromError(err) @@ -60,6 +61,7 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualM func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) { var realErr error + az.operationPollRateLimiter.Accept() routeTable, err = az.RouteTablesClient.Get(az.ResourceGroup, az.RouteTableName, "") exists, realErr = checkResourceExistsFromError(err) @@ -77,6 +79,7 @@ func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, er func (az *Cloud) getSecurityGroup() (sg network.SecurityGroup, exists bool, err error) { var realErr error + az.operationPollRateLimiter.Accept() sg, err = az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "") exists, realErr = checkResourceExistsFromError(err) @@ -94,6 +97,7 @@ func (az *Cloud) getSecurityGroup() (sg network.SecurityGroup, exists bool, err func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exists bool, err error) { var realErr error + az.operationPollRateLimiter.Accept() lb, err = az.LoadBalancerClient.Get(az.ResourceGroup, name, "") exists, realErr = checkResourceExistsFromError(err) @@ -111,6 +115,7 @@ func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exi func (az *Cloud) getPublicIPAddress(name string) (pip network.PublicIPAddress, exists bool, err error) { var realErr error + az.operationPollRateLimiter.Accept() pip, err = az.PublicIPAddressesClient.Get(az.ResourceGroup, name, "") exists, realErr = checkResourceExistsFromError(err) @@ -128,6 +133,7 @@ func (az *Cloud) getPublicIPAddress(name string) (pip network.PublicIPAddress, e func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (subnet network.Subnet, exists bool, err error) { var realErr error + az.operationPollRateLimiter.Accept() subnet, err = az.SubnetsClient.Get(az.ResourceGroup, virtualNetworkName, subnetName, "") exists, realErr = checkResourceExistsFromError(err)