Merge pull request #46660 from jackfrancis/azure-cloudprovider-backoff

Automatic merge from submit-queue (batch tested with PRs 43005, 46660, 46385, 46991, 47103)

Azure cloudprovider retry using flowcontrol

An initial attempt at engaging exponential backoff for API error responses.

Addresses #47048

Uses k8s.io/client-go/util/flowcontrol; implementation inspired by GCE
cloudprovider backoff.



**What this PR does / why we need it**:

The existing azure cloudprovider implementation has no guard rails in place to adapt to unexpected underlying operational conditions (i.e., clogs in resource plumbing between k8s runtime and the cloud API). The purpose of these changes is to support exponential backoff wrapping around API calls; and to support targeted rate limiting. Both of these options are configurable via `--cloud-config`.

Implementation inspired by the GCE's use of `k8s.io/client-go/util/flowcontrol` and `k8s.io/apimachinery/pkg/util/wait`, this PR likewise uses `flowcontrol` for rate limiting; and `wait` to thinly wrap backoff retry attempts to the API.

**Special notes for your reviewer**:


Pay especial note to the declaration of retry-able conditions from an unsuccessful HTTP request:
- all `4xx` and `5xx` HTTP responses
- non-nil error responses

And the declaration of retry success conditions:
- `2xx` HTTP responses

Tests updated to include additions to `Config`.

Those may be incomplete, or in other ways non-representative.

**Release note**:

```release-note
Added exponential backoff to Azure cloudprovider
```
This commit is contained in:
Kubernetes Submit Queue 2017-06-07 13:30:58 -07:00 committed by GitHub
commit 3adb9b428b
11 changed files with 522 additions and 38 deletions

View File

@ -12,6 +12,7 @@ go_library(
name = "go_default_library",
srcs = [
"azure.go",
"azure_backoff.go",
"azure_blob.go",
"azure_file.go",
"azure_instances.go",
@ -44,6 +45,8 @@ go_library(
"//vendor/github.com/rubiojr/go-vhd/vhd:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)

View File

@ -22,6 +22,7 @@ import (
"io/ioutil"
"time"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/version"
@ -32,10 +33,20 @@ import (
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/ghodss/yaml"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
)
// CloudProviderName is the value used for the --cloud-provider flag
const CloudProviderName = "azure"
const (
// CloudProviderName is the value used for the --cloud-provider flag
CloudProviderName = "azure"
rateLimitQPSDefault = 1.0
rateLimitBucketDefault = 5
backoffRetriesDefault = 6
backoffExponentDefault = 1.5
backoffDurationDefault = 5 // in seconds
backoffJitterDefault = 1.0
)
// Config holds the configuration parsed from the --cloud-config flag
// All fields are required unless otherwise specified
@ -69,6 +80,22 @@ type Config struct {
AADClientID string `json:"aadClientId" yaml:"aadClientId"`
// The ClientSecret for an AAD application with RBAC access to talk to Azure RM APIs
AADClientSecret string `json:"aadClientSecret" yaml:"aadClientSecret"`
// Enable exponential backoff to manage resource request retries
CloudProviderBackoff bool `json:"cloudProviderBackoff" yaml:"cloudProviderBackoff"`
// Backoff retry limit
CloudProviderBackoffRetries int `json:"cloudProviderBackoffRetries" yaml:"cloudProviderBackoffRetries"`
// Backoff exponent
CloudProviderBackoffExponent float64 `json:"cloudProviderBackoffExponent" yaml:"cloudProviderBackoffExponent"`
// Backoff duration
CloudProviderBackoffDuration int `json:"cloudProviderBackoffDuration" yaml:"cloudProviderBackoffDuration"`
// Backoff jitter
CloudProviderBackoffJitter float64 `json:"cloudProviderBackoffJitter" yaml:"cloudProviderBackoffJitter"`
// Enable rate limiting
CloudProviderRateLimit bool `json:"cloudProviderRateLimit" yaml:"cloudProviderRateLimit"`
// Rate limit QPS
CloudProviderRateLimitQPS float32 `json:"cloudProviderRateLimitQPS" yaml:"cloudProviderRateLimitQPS"`
// Rate limit Bucket Size
CloudProviderRateLimitBucket int `json:"cloudProviderRateLimitBucket" yaml:"cloudProviderRateLimitBucket"`
}
// Cloud holds the config and clients
@ -84,6 +111,8 @@ type Cloud struct {
SecurityGroupsClient network.SecurityGroupsClient
VirtualMachinesClient compute.VirtualMachinesClient
StorageAccountClient storage.AccountsClient
operationPollRateLimiter flowcontrol.RateLimiter
resourceRequestBackoff wait.Backoff
}
func init() {
@ -177,6 +206,54 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
az.StorageAccountClient = storage.NewAccountsClientWithBaseURI(az.Environment.ResourceManagerEndpoint, az.SubscriptionID)
az.StorageAccountClient.Authorizer = servicePrincipalToken
// Conditionally configure rate limits
if az.CloudProviderRateLimit {
// Assign rate limit defaults if no configuration was passed in
if az.CloudProviderRateLimitQPS == 0 {
az.CloudProviderRateLimitQPS = rateLimitQPSDefault
}
if az.CloudProviderRateLimitBucket == 0 {
az.CloudProviderRateLimitBucket = rateLimitBucketDefault
}
az.operationPollRateLimiter = flowcontrol.NewTokenBucketRateLimiter(
az.CloudProviderRateLimitQPS,
az.CloudProviderRateLimitBucket)
glog.V(2).Infof("Azure cloudprovider using rate limit config: QPS=%d, bucket=%d",
az.CloudProviderRateLimitQPS,
az.CloudProviderRateLimitBucket)
} else {
// if rate limits are configured off, az.operationPollRateLimiter.Accept() is a no-op
az.operationPollRateLimiter = flowcontrol.NewFakeAlwaysRateLimiter()
}
// Conditionally configure resource request backoff
if az.CloudProviderBackoff {
// Assign backoff defaults if no configuration was passed in
if az.CloudProviderBackoffRetries == 0 {
az.CloudProviderBackoffRetries = backoffRetriesDefault
}
if az.CloudProviderBackoffExponent == 0 {
az.CloudProviderBackoffExponent = backoffExponentDefault
}
if az.CloudProviderBackoffDuration == 0 {
az.CloudProviderBackoffDuration = backoffDurationDefault
}
if az.CloudProviderBackoffJitter == 0 {
az.CloudProviderBackoffJitter = backoffJitterDefault
}
az.resourceRequestBackoff = wait.Backoff{
Steps: az.CloudProviderBackoffRetries,
Factor: az.CloudProviderBackoffExponent,
Duration: time.Duration(az.CloudProviderBackoffDuration) * time.Second,
Jitter: az.CloudProviderBackoffJitter,
}
glog.V(2).Infof("Azure cloudprovider using retry backoff: retries=%d, exponent=%f, duration=%d, jitter=%f",
az.CloudProviderBackoffRetries,
az.CloudProviderBackoffExponent,
az.CloudProviderBackoffDuration,
az.CloudProviderBackoffJitter)
}
return &az, nil
}

View File

@ -0,0 +1,170 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"k8s.io/apimachinery/pkg/util/wait"
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/Azure/azure-sdk-for-go/arm/network"
"github.com/Azure/go-autorest/autorest"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
)
// GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry
func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.VirtualMachine, bool, error) {
var machine compute.VirtualMachine
var exists bool
err := wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
var retryErr error
machine, exists, retryErr = az.getVirtualMachine(name)
if retryErr != nil {
glog.Errorf("backoff: failure, will retry,err=%v", retryErr)
return false, nil
}
glog.V(2).Infof("backoff: success")
return true, nil
})
return machine, exists, err
}
// CreateOrUpdateSGWithRetry invokes az.SecurityGroupsClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error {
return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
az.operationPollRateLimiter.Accept()
resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil)
return processRetryResponse(resp, err)
})
}
// CreateOrUpdateLBWithRetry invokes az.LoadBalancerClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error {
return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
az.operationPollRateLimiter.Accept()
resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
return processRetryResponse(resp, err)
})
}
// CreateOrUpdatePIPWithRetry invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdatePIPWithRetry(pip network.PublicIPAddress) error {
return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
az.operationPollRateLimiter.Accept()
resp, err := az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil)
return processRetryResponse(resp, err)
})
}
// CreateOrUpdateInterfaceWithRetry invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateInterfaceWithRetry(nic network.Interface) error {
return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
az.operationPollRateLimiter.Accept()
resp, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil)
return processRetryResponse(resp, err)
})
}
// DeletePublicIPWithRetry invokes az.PublicIPAddressesClient.Delete with exponential backoff retry
func (az *Cloud) DeletePublicIPWithRetry(pipName string) error {
return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
az.operationPollRateLimiter.Accept()
resp, err := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil)
return processRetryResponse(resp, err)
})
}
// DeleteLBWithRetry invokes az.LoadBalancerClient.Delete with exponential backoff retry
func (az *Cloud) DeleteLBWithRetry(lbName string) error {
return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
az.operationPollRateLimiter.Accept()
resp, err := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil)
return processRetryResponse(resp, err)
})
}
// CreateOrUpdateRouteTableWithRetry invokes az.RouteTablesClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateRouteTableWithRetry(routeTable network.RouteTable) error {
return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
az.operationPollRateLimiter.Accept()
resp, err := az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil)
return processRetryResponse(resp, err)
})
}
// CreateOrUpdateRouteWithRetry invokes az.RoutesClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateRouteWithRetry(route network.Route) error {
return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
az.operationPollRateLimiter.Accept()
resp, err := az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil)
return processRetryResponse(resp, err)
})
}
// DeleteRouteWithRetry invokes az.RoutesClient.Delete with exponential backoff retry
func (az *Cloud) DeleteRouteWithRetry(routeName string) error {
return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
az.operationPollRateLimiter.Accept()
resp, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil)
return processRetryResponse(resp, err)
})
}
// CreateOrUpdateVMWithRetry invokes az.VirtualMachinesClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateVMWithRetry(vmName string, newVM compute.VirtualMachine) error {
return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
az.operationPollRateLimiter.Accept()
resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
return processRetryResponse(resp, err)
})
}
// A wait.ConditionFunc function to deal with common HTTP backoff response conditions
func processRetryResponse(resp autorest.Response, err error) (bool, error) {
if isSuccessHTTPResponse(resp) {
glog.V(2).Infof("backoff: success, HTTP response=%d", resp.StatusCode)
return true, nil
}
if shouldRetryAPIRequest(resp, err) {
glog.Errorf("backoff: failure, will retry, HTTP response=%d, err=%v", resp.StatusCode, err)
// suppress the error object so that backoff process continues
return false, nil
}
// Fall-through: stop periodic backoff, return error object from most recent request
return true, err
}
// shouldRetryAPIRequest determines if the response from an HTTP request suggests periodic retry behavior
func shouldRetryAPIRequest(resp autorest.Response, err error) bool {
if err != nil {
return true
}
// HTTP 4xx or 5xx suggests we should retry
if 399 < resp.StatusCode && resp.StatusCode < 600 {
return true
}
return false
}
// isSuccessHTTPResponse determines if the response from an HTTP request suggests success
func isSuccessHTTPResponse(resp autorest.Response) bool {
// HTTP 2xx suggests a successful response
if 199 < resp.StatusCode && resp.StatusCode < 300 {
return true
}
return false
}

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
)
@ -31,6 +32,7 @@ import (
func (az *Cloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) {
ip, err := az.getIPForMachine(name)
if err != nil {
glog.Errorf("error: az.NodeAddresses, az.getIPForMachine(%s), err=%v", name, err)
return nil, err
}
@ -55,9 +57,22 @@ func (az *Cloud) ExternalID(name types.NodeName) (string, error) {
// InstanceID returns the cloud provider ID of the specified instance.
// Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound)
func (az *Cloud) InstanceID(name types.NodeName) (string, error) {
machine, exists, err := az.getVirtualMachine(name)
var machine compute.VirtualMachine
var exists bool
var err error
az.operationPollRateLimiter.Accept()
machine, exists, err = az.getVirtualMachine(name)
if err != nil {
if az.CloudProviderBackoff {
glog.V(2).Infof("InstanceID(%s) backing off", name)
machine, exists, err = az.GetVirtualMachineWithRetry(name)
if err != nil {
glog.V(2).Infof("InstanceID(%s) abort backoff", name)
return "", err
}
} else {
return "", err
}
} else if !exists {
return "", cloudprovider.InstanceNotFound
}
@ -78,6 +93,7 @@ func (az *Cloud) InstanceTypeByProviderID(providerID string) (string, error) {
func (az *Cloud) InstanceType(name types.NodeName) (string, error) {
machine, exists, err := az.getVirtualMachine(name)
if err != nil {
glog.Errorf("error: az.InstanceType(%s), az.getVirtualMachine(%s) err=%v", name, name, err)
return "", err
} else if !exists {
return "", cloudprovider.InstanceNotFound
@ -100,8 +116,10 @@ func (az *Cloud) CurrentNodeName(hostname string) (types.NodeName, error) {
func (az *Cloud) listAllNodesInResourceGroup() ([]compute.VirtualMachine, error) {
allNodes := []compute.VirtualMachine{}
az.operationPollRateLimiter.Accept()
result, err := az.VirtualMachinesClient.List(az.ResourceGroup)
if err != nil {
glog.Errorf("error: az.listAllNodesInResourceGroup(), az.VirtualMachinesClient.List(%s), err=%v", az.ResourceGroup, err)
return nil, err
}
@ -110,8 +128,10 @@ func (az *Cloud) listAllNodesInResourceGroup() ([]compute.VirtualMachine, error)
for morePages {
allNodes = append(allNodes, *result.Value...)
az.operationPollRateLimiter.Accept()
result, err = az.VirtualMachinesClient.ListAllNextResults(result)
if err != nil {
glog.Errorf("error: az.listAllNodesInResourceGroup(), az.VirtualMachinesClient.ListAllNextResults(%v), err=%v", result, err)
return nil, err
}

View File

@ -92,6 +92,7 @@ func (az *Cloud) getPublicIPName(clusterName string, service *v1.Service) (strin
return fmt.Sprintf("%s-%s", clusterName, cloudprovider.GetLoadBalancerName(service)), nil
}
az.operationPollRateLimiter.Accept()
list, err := az.PublicIPAddressesClient.List(az.ResourceGroup)
if err != nil {
return "", err
@ -135,6 +136,7 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod
serviceName := getServiceName(service)
glog.V(5).Infof("ensure(%s): START clusterName=%q lbName=%q", serviceName, clusterName, lbName)
az.operationPollRateLimiter.Accept()
sg, err := az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "")
if err != nil {
return nil, err
@ -149,7 +151,16 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod
// to nil. This is a workaround until https://github.com/Azure/go-autorest/issues/112 is fixed
sg.SecurityGroupPropertiesFormat.NetworkInterfaces = nil
sg.SecurityGroupPropertiesFormat.Subnets = nil
_, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil)
az.operationPollRateLimiter.Accept()
resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("ensure(%s) backing off: sg(%s) - updating", serviceName, *sg.Name)
retryErr := az.CreateOrUpdateSGWithRetry(sg)
if retryErr != nil {
glog.V(2).Infof("ensure(%s) abort backoff: sg(%s) - updating", serviceName, *sg.Name)
return nil, retryErr
}
}
if err != nil {
return nil, err
}
@ -219,7 +230,16 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod
}
if !existsLb || lbNeedsUpdate {
glog.V(3).Infof("ensure(%s): lb(%s) - updating", serviceName, lbName)
_, err = az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
az.operationPollRateLimiter.Accept()
resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("ensure(%s) backing off: lb(%s) - updating", serviceName, lbName)
retryErr := az.CreateOrUpdateLBWithRetry(lb)
if retryErr != nil {
glog.V(2).Infof("ensure(%s) abort backoff: lb(%s) - updating", serviceName, lbName)
return nil, retryErr
}
}
if err != nil {
return nil, err
}
@ -310,7 +330,16 @@ func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servi
// to nil. This is a workaround until https://github.com/Azure/go-autorest/issues/112 is fixed
sg.SecurityGroupPropertiesFormat.NetworkInterfaces = nil
sg.SecurityGroupPropertiesFormat.Subnets = nil
_, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil)
az.operationPollRateLimiter.Accept()
resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("delete(%s) backing off: sg(%s) - updating", serviceName, az.SecurityGroupName)
retryErr := az.CreateOrUpdateSGWithRetry(reconciledSg)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("delete(%s) abort backoff: sg(%s) - updating", serviceName, az.SecurityGroupName)
}
}
if err != nil {
return err
}
@ -339,14 +368,32 @@ func (az *Cloud) cleanupLoadBalancer(clusterName string, service *v1.Service, is
if lbNeedsUpdate {
if len(*lb.FrontendIPConfigurations) > 0 {
glog.V(3).Infof("delete(%s): lb(%s) - updating", serviceName, lbName)
_, err = az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
az.operationPollRateLimiter.Accept()
resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("delete(%s) backing off: sg(%s) - updating", serviceName, az.SecurityGroupName)
retryErr := az.CreateOrUpdateLBWithRetry(lb)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("delete(%s) abort backoff: sg(%s) - updating", serviceName, az.SecurityGroupName)
}
}
if err != nil {
return err
}
} else {
glog.V(3).Infof("delete(%s): lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)
_, err = az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil)
az.operationPollRateLimiter.Accept()
resp, err := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("delete(%s) backing off: lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)
retryErr := az.DeleteLBWithRetry(lbName)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("delete(%s) abort backoff: lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)
}
}
if err != nil {
return err
}
@ -392,11 +439,21 @@ func (az *Cloud) ensurePublicIPExists(serviceName, pipName string) (*network.Pub
pip.Tags = &map[string]*string{"service": &serviceName}
glog.V(3).Infof("ensure(%s): pip(%s) - creating", serviceName, *pip.Name)
_, err = az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil)
az.operationPollRateLimiter.Accept()
resp, err := az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("ensure(%s) backing off: pip(%s) - creating", serviceName, *pip.Name)
retryErr := az.CreateOrUpdatePIPWithRetry(pip)
if retryErr != nil {
glog.V(2).Infof("ensure(%s) abort backoff: pip(%s) - creating", serviceName, *pip.Name)
err = retryErr
}
}
if err != nil {
return nil, err
}
az.operationPollRateLimiter.Accept()
pip, err = az.PublicIPAddressesClient.Get(az.ResourceGroup, *pip.Name, "")
if err != nil {
return nil, err
@ -407,7 +464,17 @@ func (az *Cloud) ensurePublicIPExists(serviceName, pipName string) (*network.Pub
}
func (az *Cloud) ensurePublicIPDeleted(serviceName, pipName string) error {
_, deleteErr := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil)
glog.V(2).Infof("ensure(%s): pip(%s) - deleting", serviceName, pipName)
az.operationPollRateLimiter.Accept()
resp, deleteErr := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, deleteErr) {
glog.V(2).Infof("ensure(%s) backing off: pip(%s) - deleting", serviceName, pipName)
retryErr := az.DeletePublicIPWithRetry(pipName)
if retryErr != nil {
glog.V(2).Infof("ensure(%s) abort backoff: pip(%s) - deleting", serviceName, pipName)
return retryErr
}
}
_, realErr := checkResourceExistsFromError(deleteErr)
if realErr != nil {
return nil
@ -792,6 +859,7 @@ func findSecurityRule(rules []network.SecurityRule, rule network.SecurityRule) b
// participating in the specified LoadBalancer Backend Pool.
func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string) error {
vmName := mapNodeNameToVMName(nodeName)
az.operationPollRateLimiter.Accept()
machine, err := az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, "")
if err != nil {
return err
@ -817,6 +885,7 @@ func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, b
}
}
az.operationPollRateLimiter.Accept()
nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "")
if err != nil {
return err
@ -848,7 +917,16 @@ func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, b
primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools
glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName)
_, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil)
az.operationPollRateLimiter.Accept()
resp, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("nicupdate(%s) backing off: nic(%s) - updating, err=%v", serviceName, nicName, err)
retryErr := az.CreateOrUpdateInterfaceWithRetry(nic)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("nicupdate(%s) abort backoff: nic(%s) - updating", serviceName, nicName)
}
}
if err != nil {
return err
}

View File

@ -66,6 +66,7 @@ func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *clo
routeTable, existsRouteTable, err := az.getRouteTable()
if err != nil {
glog.V(2).Infof("create error: couldn't get routetable. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
return err
}
if !existsRouteTable {
@ -76,7 +77,16 @@ func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *clo
}
glog.V(3).Infof("create: creating routetable. routeTableName=%q", az.RouteTableName)
_, err = az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil)
az.operationPollRateLimiter.Accept()
resp, err := az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("create backing off: creating routetable. routeTableName=%q", az.RouteTableName)
retryErr := az.CreateOrUpdateRouteTableWithRetry(routeTable)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("create abort backoff: creating routetable. routeTableName=%q", az.RouteTableName)
}
}
if err != nil {
return err
}
@ -103,7 +113,16 @@ func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *clo
}
glog.V(3).Infof("create: creating route: instance=%q cidr=%q", kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
_, err = az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil)
az.operationPollRateLimiter.Accept()
resp, err := az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("create backing off: creating route: instance=%q cidr=%q", kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
retryErr := az.CreateOrUpdateRouteWithRetry(route)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("create abort backoff: creating route: instance=%q cidr=%q", kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
}
}
if err != nil {
return err
}
@ -118,7 +137,16 @@ func (az *Cloud) DeleteRoute(clusterName string, kubeRoute *cloudprovider.Route)
glog.V(2).Infof("delete: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
routeName := mapNodeNameToRouteName(kubeRoute.TargetNode)
_, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil)
az.operationPollRateLimiter.Accept()
resp, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("delete backing off: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
retryErr := az.DeleteRouteWithRetry(routeName)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("delete abort backoff: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
}
}
if err != nil {
return err
}

View File

@ -64,7 +64,17 @@ func (az *Cloud) AttachDisk(diskName, diskURI string, nodeName types.NodeName, l
},
}
vmName := mapNodeNameToVMName(nodeName)
_, err = az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
glog.V(2).Infof("create(%s): vm(%s)", az.ResourceGroup, vmName)
az.operationPollRateLimiter.Accept()
resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("create(%s) backing off: vm(%s)", az.ResourceGroup, vmName)
retryErr := az.CreateOrUpdateVMWithRetry(vmName, newVM)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("create(%s) abort backoff: vm(%s)", az.ResourceGroup, vmName)
}
}
if err != nil {
glog.Errorf("azure attach failed, err: %v", err)
detail := err.Error()
@ -135,7 +145,17 @@ func (az *Cloud) DetachDiskByName(diskName, diskURI string, nodeName types.NodeN
},
}
vmName := mapNodeNameToVMName(nodeName)
_, err = az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
glog.V(2).Infof("create(%s): vm(%s)", az.ResourceGroup, vmName)
az.operationPollRateLimiter.Accept()
resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("create(%s) backing off: vm(%s)", az.ResourceGroup, vmName)
retryErr := az.CreateOrUpdateVMWithRetry(vmName, newVM)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("create(%s) abort backoff: vm(%s)", az.ResourceGroup, vmName)
}
}
if err != nil {
glog.Errorf("azure disk detach failed, err: %v", err)
} else {

View File

@ -27,6 +27,7 @@ type accountWithLocation struct {
// getStorageAccounts gets the storage accounts' name, type, location in a resource group
func (az *Cloud) getStorageAccounts() ([]accountWithLocation, error) {
az.operationPollRateLimiter.Accept()
result, err := az.StorageAccountClient.ListByResourceGroup(az.ResourceGroup)
if err != nil {
return nil, err
@ -56,6 +57,7 @@ func (az *Cloud) getStorageAccounts() ([]accountWithLocation, error) {
// getStorageAccesskey gets the storage account access key
func (az *Cloud) getStorageAccesskey(account string) (string, error) {
az.operationPollRateLimiter.Accept()
result, err := az.StorageAccountClient.ListKeys(az.ResourceGroup, account)
if err != nil {
return "", err

View File

@ -591,11 +591,33 @@ func TestNewCloudFromJSON(t *testing.T) {
"securityGroupName": "--security-group-name--",
"vnetName": "--vnet-name--",
"routeTableName": "--route-table-name--",
"primaryAvailabilitySetName": "--primary-availability-set-name--"
"primaryAvailabilitySetName": "--primary-availability-set-name--",
"cloudProviderBackoff": true,
"cloudProviderBackoffRetries": 6,
"cloudProviderBackoffExponent": 1.5,
"cloudProviderBackoffDuration": 5,
"cloudProviderBackoffJitter": 1.0,
"cloudProviderRatelimit": true,
"cloudProviderRateLimitQPS": 0.5,
"cloudProviderRateLimitBucket": 5
}`
validateConfig(t, config)
}
// Test Backoff and Rate Limit defaults (json)
func TestCloudDefaultConfigFromJSON(t *testing.T) {
config := `{}`
validateEmptyConfig(t, config)
}
// Test Backoff and Rate Limit defaults (yaml)
func TestCloudDefaultConfigFromYAML(t *testing.T) {
config := ``
validateEmptyConfig(t, config)
}
// Test Configuration deserialization (yaml)
func TestNewCloudFromYAML(t *testing.T) {
config := `
@ -610,21 +632,20 @@ securityGroupName: --security-group-name--
vnetName: --vnet-name--
routeTableName: --route-table-name--
primaryAvailabilitySetName: --primary-availability-set-name--
cloudProviderBackoff: true
cloudProviderBackoffRetries: 6
cloudProviderBackoffExponent: 1.5
cloudProviderBackoffDuration: 5
cloudProviderBackoffJitter: 1.0
cloudProviderRatelimit: true
cloudProviderRateLimitQPS: 0.5
cloudProviderRateLimitBucket: 5
`
validateConfig(t, config)
}
func validateConfig(t *testing.T, config string) {
configReader := strings.NewReader(config)
cloud, err := NewCloud(configReader)
if err != nil {
t.Error(err)
}
azureCloud, ok := cloud.(*Cloud)
if !ok {
t.Error("NewCloud returned incorrect type")
}
azureCloud := getCloudFromConfig(t, config)
if azureCloud.TenantID != "--tenant-id--" {
t.Errorf("got incorrect value for TenantID")
@ -659,6 +680,58 @@ func validateConfig(t *testing.T, config string) {
if azureCloud.PrimaryAvailabilitySetName != "--primary-availability-set-name--" {
t.Errorf("got incorrect value for PrimaryAvailabilitySetName")
}
if azureCloud.CloudProviderBackoff != true {
t.Errorf("got incorrect value for CloudProviderBackoff")
}
if azureCloud.CloudProviderBackoffRetries != 6 {
t.Errorf("got incorrect value for CloudProviderBackoffRetries")
}
if azureCloud.CloudProviderBackoffExponent != 1.5 {
t.Errorf("got incorrect value for CloudProviderBackoffExponent")
}
if azureCloud.CloudProviderBackoffDuration != 5 {
t.Errorf("got incorrect value for CloudProviderBackoffDuration")
}
if azureCloud.CloudProviderBackoffJitter != 1.0 {
t.Errorf("got incorrect value for CloudProviderBackoffJitter")
}
if azureCloud.CloudProviderRateLimit != true {
t.Errorf("got incorrect value for CloudProviderRateLimit")
}
if azureCloud.CloudProviderRateLimitQPS != 0.5 {
t.Errorf("got incorrect value for CloudProviderRateLimitQPS")
}
if azureCloud.CloudProviderRateLimitBucket != 5 {
t.Errorf("got incorrect value for CloudProviderRateLimitBucket")
}
}
func getCloudFromConfig(t *testing.T, config string) *Cloud {
configReader := strings.NewReader(config)
cloud, err := NewCloud(configReader)
if err != nil {
t.Error(err)
}
azureCloud, ok := cloud.(*Cloud)
if !ok {
t.Error("NewCloud returned incorrect type")
}
return azureCloud
}
// TODO include checks for other appropriate default config parameters
func validateEmptyConfig(t *testing.T, config string) {
azureCloud := getCloudFromConfig(t, config)
// backoff should be disabled by default if not explicitly enabled in config
if azureCloud.CloudProviderBackoff != false {
t.Errorf("got incorrect value for CloudProviderBackoff")
}
// rate limits should be disabled by default if not explicitly enabled in config
if azureCloud.CloudProviderRateLimit != false {
t.Errorf("got incorrect value for CloudProviderRateLimit")
}
}
func TestDecodeInstanceInfo(t *testing.T) {

View File

@ -25,6 +25,7 @@ import (
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/Azure/azure-sdk-for-go/arm/network"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
)
@ -242,26 +243,32 @@ func (az *Cloud) getIPForMachine(nodeName types.NodeName) (string, error) {
return "", cloudprovider.InstanceNotFound
}
if err != nil {
glog.Errorf("error: az.getIPForMachine(%s), az.getVirtualMachine(%s), err=%v", nodeName, nodeName, err)
return "", err
}
nicID, err := getPrimaryInterfaceID(machine)
if err != nil {
glog.Errorf("error: az.getIPForMachine(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err)
return "", err
}
nicName, err := getLastSegment(nicID)
if err != nil {
glog.Errorf("error: az.getIPForMachine(%s), getLastSegment(%s), err=%v", nodeName, nicID, err)
return "", err
}
az.operationPollRateLimiter.Accept()
nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "")
if err != nil {
glog.Errorf("error: az.getIPForMachine(%s), az.InterfacesClient.Get(%s, %s, %s), err=%v", nodeName, az.ResourceGroup, nicName, "", err)
return "", err
}
ipConfig, err := getPrimaryIPConfig(nic)
if err != nil {
glog.Errorf("error: az.getIPForMachine(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err)
return "", err
}

View File

@ -43,6 +43,7 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualM
var realErr error
vmName := string(nodeName)
az.operationPollRateLimiter.Accept()
vm, err = az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, "")
exists, realErr = checkResourceExistsFromError(err)
@ -60,6 +61,7 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualM
func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) {
var realErr error
az.operationPollRateLimiter.Accept()
routeTable, err = az.RouteTablesClient.Get(az.ResourceGroup, az.RouteTableName, "")
exists, realErr = checkResourceExistsFromError(err)
@ -77,6 +79,7 @@ func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, er
func (az *Cloud) getSecurityGroup() (sg network.SecurityGroup, exists bool, err error) {
var realErr error
az.operationPollRateLimiter.Accept()
sg, err = az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "")
exists, realErr = checkResourceExistsFromError(err)
@ -94,6 +97,7 @@ func (az *Cloud) getSecurityGroup() (sg network.SecurityGroup, exists bool, err
func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exists bool, err error) {
var realErr error
az.operationPollRateLimiter.Accept()
lb, err = az.LoadBalancerClient.Get(az.ResourceGroup, name, "")
exists, realErr = checkResourceExistsFromError(err)
@ -111,6 +115,7 @@ func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exi
func (az *Cloud) getPublicIPAddress(name string) (pip network.PublicIPAddress, exists bool, err error) {
var realErr error
az.operationPollRateLimiter.Accept()
pip, err = az.PublicIPAddressesClient.Get(az.ResourceGroup, name, "")
exists, realErr = checkResourceExistsFromError(err)
@ -128,6 +133,7 @@ func (az *Cloud) getPublicIPAddress(name string) (pip network.PublicIPAddress, e
func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (subnet network.Subnet, exists bool, err error) {
var realErr error
az.operationPollRateLimiter.Accept()
subnet, err = az.SubnetsClient.Get(az.ResourceGroup, virtualNetworkName, subnetName, "")
exists, realErr = checkResourceExistsFromError(err)