From f200f9a1e8b54c11420183b5bd403cc430324c1e Mon Sep 17 00:00:00 2001 From: Jack Francis Date: Fri, 26 May 2017 14:13:41 -0700 Subject: [PATCH] Azure cloudprovider retry using flowcontrol An initial attempt at engaging exponential backoff for API error responses. Uses k8s.io/client-go/util/flowcontrol; implementation inspired by GCE cloudprovider backoff. --- pkg/cloudprovider/providers/azure/azure.go | 25 +-- .../providers/azure/azure_backoff.go | 149 ++++++++++++++++++ .../providers/azure/azure_loadbalancer.go | 61 ++++++- .../providers/azure/azure_routes.go | 24 ++- .../providers/azure/azure_storage.go | 16 +- 5 files changed, 252 insertions(+), 23 deletions(-) create mode 100644 pkg/cloudprovider/providers/azure/azure_backoff.go diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index 7694693e261..5ba1fba24ef 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "time" + "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/version" @@ -74,16 +75,17 @@ type Config struct { // Cloud holds the config and clients type Cloud struct { Config - Environment azure.Environment - RoutesClient network.RoutesClient - SubnetsClient network.SubnetsClient - InterfacesClient network.InterfacesClient - RouteTablesClient network.RouteTablesClient - LoadBalancerClient network.LoadBalancersClient - PublicIPAddressesClient network.PublicIPAddressesClient - SecurityGroupsClient network.SecurityGroupsClient - VirtualMachinesClient compute.VirtualMachinesClient - StorageAccountClient storage.AccountsClient + Environment azure.Environment + RoutesClient network.RoutesClient + SubnetsClient network.SubnetsClient + InterfacesClient network.InterfacesClient + RouteTablesClient network.RouteTablesClient + LoadBalancerClient network.LoadBalancersClient + 
PublicIPAddressesClient network.PublicIPAddressesClient + SecurityGroupsClient network.SecurityGroupsClient + VirtualMachinesClient compute.VirtualMachinesClient + StorageAccountClient storage.AccountsClient + operationPollRateLimiter flowcontrol.RateLimiter } func init() { @@ -177,6 +179,9 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { az.StorageAccountClient = storage.NewAccountsClientWithBaseURI(az.Environment.ResourceManagerEndpoint, az.SubscriptionID) az.StorageAccountClient.Authorizer = servicePrincipalToken + // 1 qps, up to 5 burst when in flowcontrol; i.e., aggressive backoff enforcement + az.operationPollRateLimiter = flowcontrol.NewTokenBucketRateLimiter(1, 5) + return &az, nil } diff --git a/pkg/cloudprovider/providers/azure/azure_backoff.go b/pkg/cloudprovider/providers/azure/azure_backoff.go new file mode 100644 index 00000000000..7847e63c19a --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_backoff.go @@ -0,0 +1,149 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package azure
+
+import (
+	"time"
+
+	"k8s.io/apimachinery/pkg/util/wait"
+
+	"github.com/Azure/azure-sdk-for-go/arm/compute"
+	"github.com/Azure/azure-sdk-for-go/arm/network"
+	"github.com/Azure/go-autorest/autorest"
+)
+
+const (
+	operationPollInterval        = 3 * time.Second // wait between retry attempts
+	operationPollTimeoutDuration = time.Hour       // overall budget for one retried operation
+)
+
+// CreateOrUpdateSGWithRetry invokes az.SecurityGroupsClient.CreateOrUpdate at a fixed interval until the result is no longer retryable or the poll times out
+func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error {
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil)
+		return processRetryResponse(resp, err)
+	})
+}
+
+// CreateOrUpdateLBWithRetry invokes az.LoadBalancerClient.CreateOrUpdate at a fixed interval until the result is no longer retryable or the poll times out
+func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error {
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
+		return processRetryResponse(resp, err)
+	})
+}
+
+// CreateOrUpdatePIPWithRetry invokes az.PublicIPAddressesClient.CreateOrUpdate at a fixed interval until the result is no longer retryable or the poll times out
+func (az *Cloud) CreateOrUpdatePIPWithRetry(pip network.PublicIPAddress) error {
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		resp, err := az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil)
+		return processRetryResponse(resp, err)
+	})
+}
+
+// CreateOrUpdateInterfaceWithRetry invokes az.InterfacesClient.CreateOrUpdate at a fixed interval until the result is no longer retryable or the poll times out
+func (az *Cloud) CreateOrUpdateInterfaceWithRetry(nic network.Interface) error {
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		resp, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil)
+		return processRetryResponse(resp, err)
+	})
+}
+
+// DeletePublicIPWithRetry invokes az.PublicIPAddressesClient.Delete at a fixed interval until the result is no longer retryable or the poll times out
+func (az *Cloud) DeletePublicIPWithRetry(pipName string) error {
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		resp, err := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil)
+		return processRetryResponse(resp, err)
+	})
+}
+
+// DeleteLBWithRetry invokes az.LoadBalancerClient.Delete at a fixed interval until the result is no longer retryable or the poll times out
+func (az *Cloud) DeleteLBWithRetry(lbName string) error {
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		resp, err := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil)
+		return processRetryResponse(resp, err)
+	})
+}
+
+// CreateOrUpdateRouteTableWithRetry invokes az.RouteTablesClient.CreateOrUpdate at a fixed interval until the result is no longer retryable or the poll times out
+func (az *Cloud) CreateOrUpdateRouteTableWithRetry(routeTable network.RouteTable) error {
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		resp, err := az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil)
+		return processRetryResponse(resp, err)
+	})
+}
+
+// CreateOrUpdateRouteWithRetry invokes az.RoutesClient.CreateOrUpdate at a fixed interval until the result is no longer retryable or the poll times out
+func (az *Cloud) CreateOrUpdateRouteWithRetry(route network.Route) error {
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		resp, err := az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil)
+		return processRetryResponse(resp, err)
+	})
+}
+
+// DeleteRouteWithRetry invokes az.RoutesClient.Delete at a fixed interval until the result is no longer retryable or the poll times out
+func (az *Cloud) DeleteRouteWithRetry(routeName string) error {
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		resp, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil)
+		return processRetryResponse(resp, err)
+	})
+}
+
+// CreateOrUpdateVMWithRetry invokes az.VirtualMachinesClient.CreateOrUpdate at a fixed interval until the result is no longer retryable or the poll times out
+func (az *Cloud) CreateOrUpdateVMWithRetry(vmName string, newVM compute.VirtualMachine) error {
+	return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
+		resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
+		return processRetryResponse(resp, err)
+	})
+}
+
+// processRetryResponse converts one API call result into a wait.Poll condition result.
+// A retryable failure yields (false, nil) because wait.Poll aborts on any non-nil error.
+// Any other outcome stops polling: nil on success, else the latest request's error.
+func processRetryResponse(resp autorest.Response, err error) (bool, error) {
+	if isSuccessHTTPResponse(resp) {
+		return true, nil
+	}
+	if shouldRetryAPIRequest(resp, err) {
+		return false, nil
+	}
+	// TODO determine the complete set of short-circuit conditions
+	return true, err
+}
+
+// shouldRetryAPIRequest reports whether a request outcome suggests a retry: any error, or
+// HTTP 429/500. The *http.Response embedded in resp may be nil, so it is checked first.
+func shouldRetryAPIRequest(resp autorest.Response, err error) bool {
+	// TODO determine the complete set of retry conditions
+	if err != nil {
+		return true
+	}
+	if resp.Response == nil {
+		return false
+	}
+	return resp.StatusCode == 429 || resp.StatusCode == 500
+}
+
+// isSuccessHTTPResponse reports whether the response carries an HTTP 200, 201, or 202
+// status. A response whose embedded *http.Response is nil is never treated as a success.
+func isSuccessHTTPResponse(resp autorest.Response) bool {
+	// TODO determine the complete set of success conditions
+	if resp.Response == nil {
+		return false
+	}
+	return resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 202
+}
diff --git a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go
index 940723d2a46..a7cbeb742a5 100644
--- a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go
+++ 
b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go @@ -149,7 +149,13 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod // to nil. This is a workaround until https://github.com/Azure/go-autorest/issues/112 is fixed sg.SecurityGroupPropertiesFormat.NetworkInterfaces = nil sg.SecurityGroupPropertiesFormat.Subnets = nil - _, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil) + resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil) + if shouldRetryAPIRequest(resp, err) { + retryErr := az.CreateOrUpdateSGWithRetry(sg) + if retryErr != nil { + return nil, retryErr + } + } if err != nil { return nil, err } @@ -219,7 +225,13 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod } if !existsLb || lbNeedsUpdate { glog.V(3).Infof("ensure(%s): lb(%s) - updating", serviceName, lbName) - _, err = az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil) + resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil) + if shouldRetryAPIRequest(resp, err) { + retryErr := az.CreateOrUpdateLBWithRetry(lb) + if retryErr != nil { + return nil, retryErr + } + } if err != nil { return nil, err } @@ -310,7 +322,13 @@ func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servi // to nil. 
This is a workaround until https://github.com/Azure/go-autorest/issues/112 is fixed sg.SecurityGroupPropertiesFormat.NetworkInterfaces = nil sg.SecurityGroupPropertiesFormat.Subnets = nil - _, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil) + resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil) + if shouldRetryAPIRequest(resp, err) { + retryErr := az.CreateOrUpdateSGWithRetry(reconciledSg) + if retryErr != nil { + return retryErr + } + } if err != nil { return err } @@ -339,14 +357,26 @@ func (az *Cloud) cleanupLoadBalancer(clusterName string, service *v1.Service, is if lbNeedsUpdate { if len(*lb.FrontendIPConfigurations) > 0 { glog.V(3).Infof("delete(%s): lb(%s) - updating", serviceName, lbName) - _, err = az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil) + resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil) + if shouldRetryAPIRequest(resp, err) { + retryErr := az.CreateOrUpdateLBWithRetry(lb) + if retryErr != nil { + return retryErr + } + } if err != nil { return err } } else { glog.V(3).Infof("delete(%s): lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName) - _, err = az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil) + resp, err := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil) + if shouldRetryAPIRequest(resp, err) { + retryErr := az.DeleteLBWithRetry(lbName) + if retryErr != nil { + return retryErr + } + } if err != nil { return err } @@ -392,7 +422,13 @@ func (az *Cloud) ensurePublicIPExists(serviceName, pipName string) (*network.Pub pip.Tags = &map[string]*string{"service": &serviceName} glog.V(3).Infof("ensure(%s): pip(%s) - creating", serviceName, *pip.Name) - _, err = az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil) + resp, err := az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil) + if 
shouldRetryAPIRequest(resp, err) { + retryErr := az.CreateOrUpdatePIPWithRetry(pip) + if retryErr != nil { + return nil, retryErr + } + } if err != nil { return nil, err } @@ -407,7 +443,10 @@ func (az *Cloud) ensurePublicIPExists(serviceName, pipName string) (*network.Pub } func (az *Cloud) ensurePublicIPDeleted(serviceName, pipName string) error { - _, deleteErr := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil) + resp, deleteErr := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil) + if shouldRetryAPIRequest(resp, deleteErr) { + deleteErr = az.DeletePublicIPWithRetry(pipName) + } _, realErr := checkResourceExistsFromError(deleteErr) if realErr != nil { return nil @@ -848,7 +887,13 @@ func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, b primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName) - _, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil) + resp, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil) + if shouldRetryAPIRequest(resp, err) { + retryErr := az.CreateOrUpdateInterfaceWithRetry(nic) + if retryErr != nil { + return retryErr + } + } if err != nil { return err } diff --git a/pkg/cloudprovider/providers/azure/azure_routes.go b/pkg/cloudprovider/providers/azure/azure_routes.go index aab962f7088..ee8832d606c 100644 --- a/pkg/cloudprovider/providers/azure/azure_routes.go +++ b/pkg/cloudprovider/providers/azure/azure_routes.go @@ -76,7 +76,13 @@ func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *clo } glog.V(3).Infof("create: creating routetable. 
routeTableName=%q", az.RouteTableName) - _, err = az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil) + resp, err := az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil) + if shouldRetryAPIRequest(resp, err) { + retryErr := az.CreateOrUpdateRouteTableWithRetry(routeTable) + if retryErr != nil { + return retryErr + } + } if err != nil { return err } @@ -103,7 +109,13 @@ func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *clo } glog.V(3).Infof("create: creating route: instance=%q cidr=%q", kubeRoute.TargetNode, kubeRoute.DestinationCIDR) - _, err = az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil) + resp, err := az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil) + if shouldRetryAPIRequest(resp, err) { + retryErr := az.CreateOrUpdateRouteWithRetry(route) + if retryErr != nil { + return retryErr + } + } if err != nil { return err } @@ -118,7 +130,13 @@ func (az *Cloud) DeleteRoute(clusterName string, kubeRoute *cloudprovider.Route) glog.V(2).Infof("delete: deleting route. 
clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) routeName := mapNodeNameToRouteName(kubeRoute.TargetNode) - _, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil) + resp, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil) + if shouldRetryAPIRequest(resp, err) { + retryErr := az.DeleteRouteWithRetry(routeName) + if retryErr != nil { + return retryErr + } + } if err != nil { return err } diff --git a/pkg/cloudprovider/providers/azure/azure_storage.go b/pkg/cloudprovider/providers/azure/azure_storage.go index abaae7c4019..5a7a6cb2130 100644 --- a/pkg/cloudprovider/providers/azure/azure_storage.go +++ b/pkg/cloudprovider/providers/azure/azure_storage.go @@ -64,7 +64,13 @@ func (az *Cloud) AttachDisk(diskName, diskURI string, nodeName types.NodeName, l }, } vmName := mapNodeNameToVMName(nodeName) - _, err = az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil) + resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil) + if shouldRetryAPIRequest(resp, err) { + retryErr := az.CreateOrUpdateVMWithRetry(vmName, newVM) + if retryErr != nil { + return retryErr + } + } if err != nil { glog.Errorf("azure attach failed, err: %v", err) detail := err.Error() @@ -135,7 +141,13 @@ func (az *Cloud) DetachDiskByName(diskName, diskURI string, nodeName types.NodeN }, } vmName := mapNodeNameToVMName(nodeName) - _, err = az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil) + resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil) + if shouldRetryAPIRequest(resp, err) { + retryErr := az.CreateOrUpdateVMWithRetry(vmName, newVM) + if retryErr != nil { + return retryErr + } + } if err != nil { glog.Errorf("azure disk detach failed, err: %v", err) } else {