diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD b/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD index 93284148831..5670a855675 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD @@ -25,6 +25,7 @@ go_library( "azure_loadbalancer.go", "azure_managedDiskController.go", "azure_metrics.go", + "azure_ratelimit.go", "azure_routes.go", "azure_standard.go", "azure_storage.go", @@ -93,6 +94,7 @@ go_test( "azure_instances_test.go", "azure_loadbalancer_test.go", "azure_metrics_test.go", + "azure_ratelimit_test.go", "azure_routes_test.go", "azure_standard_test.go", "azure_storage_test.go", diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go index 988f9af9df9..dbcd998710f 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go @@ -42,7 +42,6 @@ import ( "k8s.io/client-go/pkg/version" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" cloudprovider "k8s.io/cloud-provider" "k8s.io/klog" "k8s.io/legacy-cloud-providers/azure/auth" @@ -101,6 +100,7 @@ var ( // for more details. type Config struct { auth.AzureAuthConfig + CloudProviderRateLimitConfig // The name of the resource group that the cluster is deployed in ResourceGroup string `json:"resourceGroup,omitempty" yaml:"resourceGroup,omitempty"` @@ -149,17 +149,6 @@ type Config struct { // CloudProviderBackoffJitter are omitted. // "default" will be used if not specified. CloudProviderBackoffMode string `json:"cloudProviderBackoffMode,omitempty" yaml:"cloudProviderBackoffMode,omitempty"` - // Enable rate limiting - CloudProviderRateLimit bool `json:"cloudProviderRateLimit,omitempty" yaml:"cloudProviderRateLimit,omitempty"` - // Rate limit QPS (Read) - CloudProviderRateLimitQPS float32 `json:"cloudProviderRateLimitQPS,omitempty" yaml:"cloudProviderRateLimitQPS,omitempty"` - // Rate limit Bucket Size - CloudProviderRateLimitBucket int `json:"cloudProviderRateLimitBucket,omitempty" yaml:"cloudProviderRateLimitBucket,omitempty"` - // Rate limit QPS (Write) - CloudProviderRateLimitQPSWrite float32 `json:"cloudProviderRateLimitQPSWrite,omitempty" yaml:"cloudProviderRateLimitQPSWrite,omitempty"` - // Rate limit Bucket Size - CloudProviderRateLimitBucketWrite int `json:"cloudProviderRateLimitBucketWrite,omitempty" yaml:"cloudProviderRateLimitBucketWrite,omitempty"` - // Use instance metadata service where possible UseInstanceMetadata bool `json:"useInstanceMetadata,omitempty" yaml:"useInstanceMetadata,omitempty"` @@ -220,22 +209,27 @@ var _ cloudprovider.PVLabeler = (*Cloud)(nil) // Cloud holds the config and clients type Cloud struct { Config - Environment azure.Environment - RoutesClient RoutesClient - SubnetsClient SubnetsClient - InterfacesClient InterfacesClient - RouteTablesClient RouteTablesClient - LoadBalancerClient LoadBalancersClient - PublicIPAddressesClient PublicIPAddressesClient - SecurityGroupsClient SecurityGroupsClient - VirtualMachinesClient VirtualMachinesClient - StorageAccountClient StorageAccountClient - DisksClient DisksClient - SnapshotsClient *compute.SnapshotsClient - FileClient FileClient - ResourceRequestBackoff wait.Backoff - metadata *InstanceMetadataService - vmSet VMSet + Environment azure.Environment + + RoutesClient RoutesClient + SubnetsClient SubnetsClient + InterfacesClient InterfacesClient + RouteTablesClient RouteTablesClient + LoadBalancerClient LoadBalancersClient + PublicIPAddressesClient PublicIPAddressesClient + SecurityGroupsClient SecurityGroupsClient + VirtualMachinesClient VirtualMachinesClient + StorageAccountClient StorageAccountClient + DisksClient DisksClient + SnapshotsClient *compute.SnapshotsClient + FileClient FileClient + VirtualMachineScaleSetsClient VirtualMachineScaleSetsClient + VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient + VirtualMachineSizesClient VirtualMachineSizesClient + + ResourceRequestBackoff wait.Backoff + metadata *InstanceMetadataService + vmSet VMSet // ipv6DualStack allows overriding for unit testing. It's normally initialized from featuregates ipv6DualStackEnabled bool @@ -256,13 +250,6 @@ type Cloud struct { // routeCIDRs holds cache for route CIDRs. routeCIDRs map[string]string - // Clients for vmss. - VirtualMachineScaleSetsClient VirtualMachineScaleSetsClient - VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient - - // client for vm sizes list - VirtualMachineSizesClient VirtualMachineSizesClient - kubeClient clientset.Interface eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder @@ -385,43 +372,8 @@ func (az *Cloud) InitializeCloudFromConfig(config *Config, fromSecret bool) erro return err } - // operationPollRateLimiter.Accept() is a no-op if rate limits are configured off. - operationPollRateLimiter := flowcontrol.NewFakeAlwaysRateLimiter() - operationPollRateLimiterWrite := flowcontrol.NewFakeAlwaysRateLimiter() - - // If reader is provided (and no writer) we will - // use the same value for both. - if config.CloudProviderRateLimit { - // Assign rate limit defaults if no configuration was passed in - if config.CloudProviderRateLimitQPS == 0 { - config.CloudProviderRateLimitQPS = rateLimitQPSDefault - } - if config.CloudProviderRateLimitBucket == 0 { - config.CloudProviderRateLimitBucket = rateLimitBucketDefault - } - if config.CloudProviderRateLimitQPSWrite == 0 { - config.CloudProviderRateLimitQPSWrite = rateLimitQPSDefault - } - if config.CloudProviderRateLimitBucketWrite == 0 { - config.CloudProviderRateLimitBucketWrite = rateLimitBucketDefault - } - - operationPollRateLimiter = flowcontrol.NewTokenBucketRateLimiter( - config.CloudProviderRateLimitQPS, - config.CloudProviderRateLimitBucket) - - operationPollRateLimiterWrite = flowcontrol.NewTokenBucketRateLimiter( - config.CloudProviderRateLimitQPSWrite, - config.CloudProviderRateLimitBucketWrite) - - klog.V(2).Infof("Azure cloudprovider (read ops) using rate limit config: QPS=%g, bucket=%d", - config.CloudProviderRateLimitQPS, - config.CloudProviderRateLimitBucket) - - klog.V(2).Infof("Azure cloudprovider (write ops) using rate limit config: QPS=%g, bucket=%d", - config.CloudProviderRateLimitQPSWrite, - config.CloudProviderRateLimitBucketWrite) - } + // Initialize rate limiting config options. + InitializeCloudProviderRateLimitConfig(&config.CloudProviderRateLimitConfig) // Conditionally configure resource request backoff resourceRequestBackoff := wait.Backoff{ @@ -500,26 +452,25 @@ func (az *Cloud) InitializeCloudFromConfig(config *Config, fromSecret bool) erro subscriptionID: config.SubscriptionID, resourceManagerEndpoint: env.ResourceManagerEndpoint, servicePrincipalToken: servicePrincipalToken, - rateLimiterReader: operationPollRateLimiter, - rateLimiterWriter: operationPollRateLimiterWrite, CloudProviderBackoffRetries: config.CloudProviderBackoffRetries, CloudProviderBackoffDuration: config.CloudProviderBackoffDuration, ShouldOmitCloudProviderBackoff: config.shouldOmitCloudProviderBackoff(), } - az.DisksClient = newAzDisksClient(azClientConfig) - az.SnapshotsClient = newSnapshotsClient(azClientConfig) - az.RoutesClient = newAzRoutesClient(azClientConfig) - az.SubnetsClient = newAzSubnetsClient(azClientConfig) - az.InterfacesClient = newAzInterfacesClient(azClientConfig) - az.RouteTablesClient = newAzRouteTablesClient(azClientConfig) - az.LoadBalancerClient = newAzLoadBalancersClient(azClientConfig) - az.SecurityGroupsClient = newAzSecurityGroupsClient(azClientConfig) - az.StorageAccountClient = newAzStorageAccountClient(azClientConfig) - az.VirtualMachinesClient = newAzVirtualMachinesClient(azClientConfig) - az.PublicIPAddressesClient = newAzPublicIPAddressesClient(azClientConfig) - az.VirtualMachineSizesClient = newAzVirtualMachineSizesClient(azClientConfig) - az.VirtualMachineScaleSetsClient = newAzVirtualMachineScaleSetsClient(azClientConfig) - az.VirtualMachineScaleSetVMsClient = newAzVirtualMachineScaleSetVMsClient(azClientConfig) + az.DisksClient = newAzDisksClient(azClientConfig.WithRateLimiter(config.DiskRateLimit)) + az.SnapshotsClient = newSnapshotsClient(azClientConfig.WithRateLimiter(config.SnapshotRateLimit)) + az.RoutesClient = newAzRoutesClient(azClientConfig.WithRateLimiter(config.RouteRateLimit)) + az.SubnetsClient = newAzSubnetsClient(azClientConfig.WithRateLimiter(config.SubnetsRateLimit)) + az.InterfacesClient = newAzInterfacesClient(azClientConfig.WithRateLimiter(config.InterfaceRateLimit)) + az.RouteTablesClient = newAzRouteTablesClient(azClientConfig.WithRateLimiter(config.RouteTableRateLimit)) + az.LoadBalancerClient = newAzLoadBalancersClient(azClientConfig.WithRateLimiter(config.LoadBalancerRateLimit)) + az.SecurityGroupsClient = newAzSecurityGroupsClient(azClientConfig.WithRateLimiter(config.SecurityGroupRateLimit)) + az.StorageAccountClient = newAzStorageAccountClient(azClientConfig.WithRateLimiter(config.StorageAccountRateLimit)) + az.VirtualMachinesClient = newAzVirtualMachinesClient(azClientConfig.WithRateLimiter(config.VirtualMachineRateLimit)) + az.PublicIPAddressesClient = newAzPublicIPAddressesClient(azClientConfig.WithRateLimiter(config.PublicIPAddressRateLimit)) + az.VirtualMachineSizesClient = newAzVirtualMachineSizesClient(azClientConfig.WithRateLimiter(config.VirtualMachineSizeRateLimit)) + az.VirtualMachineScaleSetsClient = newAzVirtualMachineScaleSetsClient(azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit)) + az.VirtualMachineScaleSetVMsClient = newAzVirtualMachineScaleSetVMsClient(azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit)) + // TODO(feiskyer): refactor azureFileClient to Interface. az.FileClient = &azureFileClient{env: *env} if az.MaximumLoadBalancerRuleCount == 0 { diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go index b165f8a0c2b..77e250b1b4e 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go @@ -149,16 +149,19 @@ type azClientConfig struct { subscriptionID string resourceManagerEndpoint string servicePrincipalToken *adal.ServicePrincipalToken - // ARM Rate limiting for GET vs PUT/POST - //Details: https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-manager-request-limits - rateLimiterReader flowcontrol.RateLimiter - rateLimiterWriter flowcontrol.RateLimiter + rateLimitConfig *RateLimitConfig CloudProviderBackoffRetries int CloudProviderBackoffDuration int ShouldOmitCloudProviderBackoff bool } +// WithRateLimiter returns azClientConfig with rateLimitConfig set. +func (cfg *azClientConfig) WithRateLimiter(rl *RateLimitConfig) *azClientConfig { + cfg.rateLimitConfig = rl + return cfg +} + // azVirtualMachinesClient implements VirtualMachinesClient. type azVirtualMachinesClient struct { client compute.VirtualMachinesClient @@ -181,9 +184,16 @@ func newAzVirtualMachinesClient(config *azClientConfig) *azVirtualMachinesClient } configureUserAgent(&virtualMachinesClient.Client) + klog.V(2).Infof("Azure VirtualMachinesClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure VirtualMachinesClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azVirtualMachinesClient{ - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, client: virtualMachinesClient, } } @@ -303,9 +313,16 @@ func newAzInterfacesClient(config *azClientConfig) *azInterfacesClient { } configureUserAgent(&interfacesClient.Client) + klog.V(2).Infof("Azure InterfacesClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure InterfacesClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azInterfacesClient{ - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, client: interfacesClient, } } @@ -387,9 +404,16 @@ func newAzLoadBalancersClient(config *azClientConfig) *azLoadBalancersClient { } configureUserAgent(&loadBalancerClient.Client) + klog.V(2).Infof("Azure LoadBalancersClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure LoadBalancersClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azLoadBalancersClient{ - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, client: loadBalancerClient, } } @@ -539,9 +563,16 @@ func newAzPublicIPAddressesClient(config *azClientConfig) *azPublicIPAddressesCl } configureUserAgent(&publicIPAddressClient.Client) + klog.V(2).Infof("Azure PublicIPAddressesClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure PublicIPAddressesClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azPublicIPAddressesClient{ - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, client: publicIPAddressClient, } } @@ -676,10 +707,17 @@ func newAzSubnetsClient(config *azClientConfig) *azSubnetsClient { } configureUserAgent(&subnetsClient.Client) + klog.V(2).Infof("Azure SubnetsClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure SubnetsClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azSubnetsClient{ client: subnetsClient, - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, } } @@ -795,10 +833,17 @@ func newAzSecurityGroupsClient(config *azClientConfig) *azSecurityGroupsClient { } configureUserAgent(&securityGroupsClient.Client) + klog.V(2).Infof("Azure SecurityGroupsClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure SecurityGroupsClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azSecurityGroupsClient{ client: securityGroupsClient, - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, } } @@ -946,10 +991,17 @@ func newAzVirtualMachineScaleSetsClient(config *azClientConfig) *azVirtualMachin } configureUserAgent(&virtualMachineScaleSetsClient.Client) + klog.V(2).Infof("Azure VirtualMachineScaleSetsClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure VirtualMachineScaleSetsClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azVirtualMachineScaleSetsClient{ client: virtualMachineScaleSetsClient, - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, } } @@ -1043,10 +1095,17 @@ func newAzVirtualMachineScaleSetVMsClient(config *azClientConfig) *azVirtualMach } configureUserAgent(&virtualMachineScaleSetVMsClient.Client) + klog.V(2).Infof("Azure VirtualMachineScaleSetVMsClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure VirtualMachineScaleSetVMsClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azVirtualMachineScaleSetVMsClient{ client: virtualMachineScaleSetVMsClient, - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, } } @@ -1139,10 +1198,17 @@ func newAzRoutesClient(config *azClientConfig) *azRoutesClient { } configureUserAgent(&routesClient.Client) + klog.V(2).Infof("Azure RoutesClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure RoutesClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azRoutesClient{ client: routesClient, - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, } } @@ -1245,10 +1311,17 @@ func newAzRouteTablesClient(config *azClientConfig) *azRouteTablesClient { } configureUserAgent(&routeTablesClient.Client) + klog.V(2).Infof("Azure RouteTablesClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure RouteTablesClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azRouteTablesClient{ client: routeTablesClient, - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, } } @@ -1342,10 +1415,17 @@ func newAzStorageAccountClient(config *azClientConfig) *azStorageAccountClient { } configureUserAgent(&storageAccountClient.Client) + klog.V(2).Infof("Azure StorageAccountClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure StorageAccountClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azStorageAccountClient{ client: storageAccountClient, - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, } } @@ -1462,10 +1542,17 @@ func newAzDisksClient(config *azClientConfig) *azDisksClient { } configureUserAgent(&disksClient.Client) + klog.V(2).Infof("Azure DisksClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure DisksClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azDisksClient{ client: disksClient, - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, } } @@ -1532,6 +1619,7 @@ func (az *azDisksClient) Get(ctx context.Context, resourceGroupName string, disk return } +// TODO(feiskyer): refactor compute.SnapshotsClient to Interface. func newSnapshotsClient(config *azClientConfig) *compute.SnapshotsClient { snapshotsClient := compute.NewSnapshotsClientWithBaseURI(config.resourceManagerEndpoint, config.subscriptionID) snapshotsClient.Authorizer = autorest.NewBearerAuthorizer(config.servicePrincipalToken) @@ -1562,9 +1650,16 @@ func newAzVirtualMachineSizesClient(config *azClientConfig) *azVirtualMachineSiz } configureUserAgent(&VirtualMachineSizesClient.Client) + klog.V(2).Infof("Azure VirtualMachineSizesClient (read ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPS, + config.rateLimitConfig.CloudProviderRateLimitBucket) + klog.V(2).Infof("Azure VirtualMachineSizesClient (write ops) using rate limit config: QPS=%g, bucket=%d", + config.rateLimitConfig.CloudProviderRateLimitQPSWrite, + config.rateLimitConfig.CloudProviderRateLimitBucketWrite) + rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig) return &azVirtualMachineSizesClient{ - rateLimiterReader: config.rateLimiterReader, - rateLimiterWriter: config.rateLimiterWriter, + rateLimiterReader: rateLimiterReader, + rateLimiterWriter: rateLimiterWriter, client: VirtualMachineSizesClient, } } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_ratelimit.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_ratelimit.go new file mode 100644 index 00000000000..bae3dee59bc --- /dev/null +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_ratelimit.go @@ -0,0 +1,146 @@ +// +build !providerless + +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "k8s.io/client-go/util/flowcontrol" +) + +// RateLimitConfig indicates the rate limit config options. +type RateLimitConfig struct { + // Enable rate limiting + CloudProviderRateLimit bool `json:"cloudProviderRateLimit,omitempty" yaml:"cloudProviderRateLimit,omitempty"` + // Rate limit QPS (Read) + CloudProviderRateLimitQPS float32 `json:"cloudProviderRateLimitQPS,omitempty" yaml:"cloudProviderRateLimitQPS,omitempty"` + // Rate limit Bucket Size + CloudProviderRateLimitBucket int `json:"cloudProviderRateLimitBucket,omitempty" yaml:"cloudProviderRateLimitBucket,omitempty"` + // Rate limit QPS (Write) + CloudProviderRateLimitQPSWrite float32 `json:"cloudProviderRateLimitQPSWrite,omitempty" yaml:"cloudProviderRateLimitQPSWrite,omitempty"` + // Rate limit Bucket Size + CloudProviderRateLimitBucketWrite int `json:"cloudProviderRateLimitBucketWrite,omitempty" yaml:"cloudProviderRateLimitBucketWrite,omitempty"` +} + +// CloudProviderRateLimitConfig indicates the rate limit config for each clients. +type CloudProviderRateLimitConfig struct { + // The default rate limit config options. + RateLimitConfig + + // Rate limit config for each clients. Values would override default settings above. + RouteRateLimit *RateLimitConfig `json:"routeRateLimit,omitempty" yaml:"routeRateLimit,omitempty"` + SubnetsRateLimit *RateLimitConfig `json:"subnetsRateLimit,omitempty" yaml:"subnetsRateLimit,omitempty"` + InterfaceRateLimit *RateLimitConfig `json:"interfaceRateLimit,omitempty" yaml:"interfaceRateLimit,omitempty"` + RouteTableRateLimit *RateLimitConfig `json:"routeTableRateLimit,omitempty" yaml:"routeTableRateLimit,omitempty"` + LoadBalancerRateLimit *RateLimitConfig `json:"loadBalancerRateLimit,omitempty" yaml:"loadBalancerRateLimit,omitempty"` + PublicIPAddressRateLimit *RateLimitConfig `json:"publicIPAddressRateLimit,omitempty" yaml:"publicIPAddressRateLimit,omitempty"` + SecurityGroupRateLimit *RateLimitConfig `json:"securityGroupRateLimit,omitempty" yaml:"securityGroupRateLimit,omitempty"` + VirtualMachineRateLimit *RateLimitConfig `json:"virtualMachineRateLimit,omitempty" yaml:"virtualMachineRateLimit,omitempty"` + StorageAccountRateLimit *RateLimitConfig `json:"storageAccountRateLimit,omitempty" yaml:"storageAccountRateLimit,omitempty"` + DiskRateLimit *RateLimitConfig `json:"diskRateLimit,omitempty" yaml:"diskRateLimit,omitempty"` + SnapshotRateLimit *RateLimitConfig `json:"snapshotRateLimit,omitempty" yaml:"snapshotRateLimit,omitempty"` + VirtualMachineScaleSetRateLimit *RateLimitConfig `json:"virtualMachineScaleSetRateLimit,omitempty" yaml:"virtualMachineScaleSetRateLimit,omitempty"` + VirtualMachineSizeRateLimit *RateLimitConfig `json:"virtualMachineSizesRateLimit,omitempty" yaml:"virtualMachineSizesRateLimit,omitempty"` +} + +// InitializeCloudProviderRateLimitConfig initializes rate limit configs. +func InitializeCloudProviderRateLimitConfig(config *CloudProviderRateLimitConfig) { + if config == nil { + return + } + + // Assign read rate limit defaults if no configuration was passed in. + if config.CloudProviderRateLimitQPS == 0 { + config.CloudProviderRateLimitQPS = rateLimitQPSDefault + } + if config.CloudProviderRateLimitBucket == 0 { + config.CloudProviderRateLimitBucket = rateLimitBucketDefault + } + // Assing write rate limit defaults if no configuration was passed in. + if config.CloudProviderRateLimitQPSWrite == 0 { + config.CloudProviderRateLimitQPSWrite = config.CloudProviderRateLimitQPS + } + if config.CloudProviderRateLimitBucketWrite == 0 { + config.CloudProviderRateLimitBucketWrite = config.CloudProviderRateLimitBucket + } + + config.RouteRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.RouteRateLimit) + config.SubnetsRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.SubnetsRateLimit) + config.InterfaceRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.InterfaceRateLimit) + config.RouteTableRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.RouteTableRateLimit) + config.LoadBalancerRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.LoadBalancerRateLimit) + config.PublicIPAddressRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.PublicIPAddressRateLimit) + config.SecurityGroupRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.SecurityGroupRateLimit) + config.VirtualMachineRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.VirtualMachineRateLimit) + config.StorageAccountRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.StorageAccountRateLimit) + config.DiskRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.DiskRateLimit) + config.SnapshotRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.SnapshotRateLimit) + config.VirtualMachineScaleSetRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.VirtualMachineScaleSetRateLimit) + config.VirtualMachineSizeRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.VirtualMachineSizeRateLimit) +} + +// overrideDefaultRateLimitConfig overrides the default CloudProviderRateLimitConfig. +func overrideDefaultRateLimitConfig(defaults, config *RateLimitConfig) *RateLimitConfig { + // If config not set, apply defaults. + if config == nil { + return defaults + } + + // Remain disabled if it's set explicitly. + if !config.CloudProviderRateLimit { + return &RateLimitConfig{CloudProviderRateLimit: false} + } + + // Apply default values. + if config.CloudProviderRateLimitQPS == 0 { + config.CloudProviderRateLimitQPS = defaults.CloudProviderRateLimitQPS + } + if config.CloudProviderRateLimitBucket == 0 { + config.CloudProviderRateLimitBucket = defaults.CloudProviderRateLimitBucket + } + if config.CloudProviderRateLimitQPSWrite == 0 { + config.CloudProviderRateLimitQPSWrite = defaults.CloudProviderRateLimitQPSWrite + } + if config.CloudProviderRateLimitBucketWrite == 0 { + config.CloudProviderRateLimitBucketWrite = defaults.CloudProviderRateLimitBucketWrite + } + + return config +} + +// RateLimitEnabled returns true if CloudProviderRateLimit is set to true. +func RateLimitEnabled(config *RateLimitConfig) bool { + return config != nil && config.CloudProviderRateLimit +} + +// NewRateLimiter creates new read and write flowcontrol.RateLimiter from RateLimitConfig. +func NewRateLimiter(config *RateLimitConfig) (flowcontrol.RateLimiter, flowcontrol.RateLimiter) { + readLimiter := flowcontrol.NewFakeAlwaysRateLimiter() + writeLimiter := flowcontrol.NewFakeAlwaysRateLimiter() + + if config != nil && config.CloudProviderRateLimit { + readLimiter = flowcontrol.NewTokenBucketRateLimiter( + config.CloudProviderRateLimitQPS, + config.CloudProviderRateLimitBucket) + + writeLimiter = flowcontrol.NewTokenBucketRateLimiter( + config.CloudProviderRateLimitQPSWrite, + config.CloudProviderRateLimitBucketWrite) + } + + return readLimiter, writeLimiter +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_ratelimit_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_ratelimit_test.go new file mode 100644 index 00000000000..9601d45f4e6 --- /dev/null +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_ratelimit_test.go @@ -0,0 +1,172 @@ +// +build !providerless + +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/legacy-cloud-providers/azure/auth" +) + +var ( + testAzureConfig = `{ + "aadClientCertPassword": "aadClientCertPassword", + "aadClientCertPath": "aadClientCertPath", + "aadClientId": "aadClientId", + "aadClientSecret": "aadClientSecret", + "cloud":"AzurePublicCloud", + "cloudProviderBackoff": true, + "cloudProviderBackoffDuration": 1, + "cloudProviderBackoffExponent": 1, + "cloudProviderBackoffJitter": 1, + "cloudProviderBackoffRetries": 1, + "cloudProviderRatelimit": true, + "cloudProviderRateLimitBucket": 1, + "cloudProviderRateLimitBucketWrite": 1, + "cloudProviderRateLimitQPS": 1, + "cloudProviderRateLimitQPSWrite": 1, + "virtualMachineScaleSetRateLimit": { + "cloudProviderRatelimit": true, + "cloudProviderRateLimitBucket": 2, + "CloudProviderRateLimitBucketWrite": 2, + "cloudProviderRateLimitQPS": 0, + "CloudProviderRateLimitQPSWrite": 0 + }, + "loadBalancerRateLimit": { + "cloudProviderRatelimit": false, + }, + "availabilitySetNodesCacheTTLInSeconds": 100, + "vmssCacheTTLInSeconds": 100, + "vmssVirtualMachinesCacheTTLInSeconds": 100, + "vmCacheTTLInSeconds": 100, + "loadBalancerCacheTTLInSeconds": 100, + "nsgCacheTTLInSeconds": 100, + "routeTableCacheTTLInSeconds": 100, + "location": "location", + "maximumLoadBalancerRuleCount": 1, + "primaryAvailabilitySetName": "primaryAvailabilitySetName", + "primaryScaleSetName": "primaryScaleSetName", + "resourceGroup": "resourceGroup", + "routeTableName": "routeTableName", + "routeTableResourceGroup": "routeTableResourceGroup", + "securityGroupName": "securityGroupName", + "subnetName": "subnetName", + "subscriptionId": "subscriptionId", + "tenantId": "tenantId", + "useInstanceMetadata": true, + "useManagedIdentityExtension": true, + "vnetName": "vnetName", + "vnetResourceGroup": "vnetResourceGroup", + vmType: "standard" + }` + + testDefaultRateLimitConfig = RateLimitConfig{ + CloudProviderRateLimit: true, + CloudProviderRateLimitBucket: 1, + CloudProviderRateLimitBucketWrite: 1, + CloudProviderRateLimitQPS: 1, + CloudProviderRateLimitQPSWrite: 1, + } +) + +func TestParseConfig(t *testing.T) { + expected := &Config{ + AzureAuthConfig: auth.AzureAuthConfig{ + AADClientCertPassword: "aadClientCertPassword", + AADClientCertPath: "aadClientCertPath", + AADClientID: "aadClientId", + AADClientSecret: "aadClientSecret", + Cloud: "AzurePublicCloud", + SubscriptionID: "subscriptionId", + TenantID: "tenantId", + UseManagedIdentityExtension: true, + }, + CloudProviderBackoff: true, + CloudProviderBackoffDuration: 1, + CloudProviderBackoffExponent: 1, + CloudProviderBackoffJitter: 1, + CloudProviderBackoffRetries: 1, + CloudProviderRateLimitConfig: CloudProviderRateLimitConfig{ + RateLimitConfig: testDefaultRateLimitConfig, + LoadBalancerRateLimit: &RateLimitConfig{ + CloudProviderRateLimit: false, + }, + VirtualMachineScaleSetRateLimit: &RateLimitConfig{ + CloudProviderRateLimit: true, + CloudProviderRateLimitBucket: 2, + CloudProviderRateLimitBucketWrite: 2, + }, + }, + AvailabilitySetNodesCacheTTLInSeconds: 100, + VmssCacheTTLInSeconds: 100, + VmssVirtualMachinesCacheTTLInSeconds: 100, + VMCacheTTLInSeconds: 100, + LoadBalancerCacheTTLInSeconds: 100, + NsgCacheTTLInSeconds: 100, + RouteTableCacheTTLInSeconds: 100, + Location: "location", + MaximumLoadBalancerRuleCount: 1, + PrimaryAvailabilitySetName: "primaryAvailabilitySetName", + PrimaryScaleSetName: "primaryScaleSetName", + ResourceGroup: "resourcegroup", + RouteTableName: "routeTableName", + RouteTableResourceGroup: "routeTableResourceGroup", + SecurityGroupName: "securityGroupName", + SubnetName: "subnetName", + UseInstanceMetadata: true, + VMType: "standard", + VnetName: "vnetName", + VnetResourceGroup: "vnetResourceGroup", + } + + buffer := bytes.NewBufferString(testAzureConfig) + config, err := parseConfig(buffer) + assert.NoError(t, err) + assert.Equal(t, expected, config) +} + +func TestInitializeCloudProviderRateLimitConfig(t *testing.T) { + buffer := bytes.NewBufferString(testAzureConfig) + config, err := parseConfig(buffer) + assert.NoError(t, err) + + InitializeCloudProviderRateLimitConfig(&config.CloudProviderRateLimitConfig) + assert.Equal(t, config.LoadBalancerRateLimit, &RateLimitConfig{ + CloudProviderRateLimit: false, + }) + assert.Equal(t, config.VirtualMachineScaleSetRateLimit, &RateLimitConfig{ + CloudProviderRateLimit: true, + CloudProviderRateLimitBucket: 2, + CloudProviderRateLimitBucketWrite: 2, + CloudProviderRateLimitQPS: 1, + CloudProviderRateLimitQPSWrite: 1, + }) + assert.Equal(t, config.VirtualMachineSizeRateLimit, &testDefaultRateLimitConfig) + assert.Equal(t, config.VirtualMachineRateLimit, &testDefaultRateLimitConfig) + assert.Equal(t, config.RouteRateLimit, &testDefaultRateLimitConfig) + assert.Equal(t, config.SubnetsRateLimit, &testDefaultRateLimitConfig) + assert.Equal(t, config.InterfaceRateLimit, &testDefaultRateLimitConfig) + assert.Equal(t, config.RouteTableRateLimit, &testDefaultRateLimitConfig) + assert.Equal(t, config.SecurityGroupRateLimit, &testDefaultRateLimitConfig) + assert.Equal(t, config.StorageAccountRateLimit, &testDefaultRateLimitConfig) + assert.Equal(t, config.DiskRateLimit, &testDefaultRateLimitConfig) + assert.Equal(t, config.SnapshotRateLimit, &testDefaultRateLimitConfig) +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go index a91e9915e94..88408000a3d 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go @@ -19,7 +19,6 @@ limitations under the License. package azure import ( - "bytes" "context" "fmt" "math" @@ -42,96 +41,6 @@ import ( var testClusterName = "testCluster" -func TestParseConfig(t *testing.T) { - azureConfig := `{ - "aadClientCertPassword": "aadClientCertPassword", - "aadClientCertPath": "aadClientCertPath", - "aadClientId": "aadClientId", - "aadClientSecret": "aadClientSecret", - "cloud":"AzurePublicCloud", - "cloudProviderBackoff": true, - "cloudProviderBackoffDuration": 1, - "cloudProviderBackoffExponent": 1, - "cloudProviderBackoffJitter": 1, - "cloudProviderBackoffRetries": 1, - "cloudProviderRatelimit": true, - "cloudProviderRateLimitBucket": 1, - "CloudProviderRateLimitBucketWrite": 1, - "cloudProviderRateLimitQPS": 1, - "CloudProviderRateLimitQPSWrite": 1, - "availabilitySetNodesCacheTTLInSeconds": 100, - "vmssCacheTTLInSeconds": 100, - "vmssVirtualMachinesCacheTTLInSeconds": 100, - "vmCacheTTLInSeconds": 100, - "loadBalancerCacheTTLInSeconds": 100, - "nsgCacheTTLInSeconds": 100, - "routeTableCacheTTLInSeconds": 100, - "location": "location", - "maximumLoadBalancerRuleCount": 1, - "primaryAvailabilitySetName": "primaryAvailabilitySetName", - "primaryScaleSetName": "primaryScaleSetName", - "resourceGroup": "resourceGroup", - "routeTableName": "routeTableName", - "routeTableResourceGroup": "routeTableResourceGroup", - "securityGroupName": "securityGroupName", - "subnetName": "subnetName", - "subscriptionId": "subscriptionId", - "tenantId": "tenantId", - "useInstanceMetadata": true, - "useManagedIdentityExtension": true, - "vnetName": "vnetName", - "vnetResourceGroup": "vnetResourceGroup", - vmType: "standard" - }` - expected := &Config{ - AzureAuthConfig: auth.AzureAuthConfig{ - AADClientCertPassword: "aadClientCertPassword", - AADClientCertPath: "aadClientCertPath", - AADClientID: "aadClientId", - AADClientSecret: "aadClientSecret", - Cloud: "AzurePublicCloud", - SubscriptionID: "subscriptionId", - TenantID: "tenantId", - UseManagedIdentityExtension: true, - }, - CloudProviderBackoff: true, - CloudProviderBackoffDuration: 1, - CloudProviderBackoffExponent: 1, - CloudProviderBackoffJitter: 1, - CloudProviderBackoffRetries: 1, - CloudProviderRateLimit: true, - CloudProviderRateLimitBucket: 1, - CloudProviderRateLimitBucketWrite: 1, - CloudProviderRateLimitQPS: 1, - CloudProviderRateLimitQPSWrite: 1, - AvailabilitySetNodesCacheTTLInSeconds: 100, - VmssCacheTTLInSeconds: 100, - VmssVirtualMachinesCacheTTLInSeconds: 100, - VMCacheTTLInSeconds: 100, - LoadBalancerCacheTTLInSeconds: 100, - NsgCacheTTLInSeconds: 100, - RouteTableCacheTTLInSeconds: 100, - Location: "location", - MaximumLoadBalancerRuleCount: 1, - PrimaryAvailabilitySetName: "primaryAvailabilitySetName", - PrimaryScaleSetName: "primaryScaleSetName", - ResourceGroup: "resourcegroup", - RouteTableName: "routeTableName", - RouteTableResourceGroup: "routeTableResourceGroup", - SecurityGroupName: "securityGroupName", - SubnetName: "subnetName", - UseInstanceMetadata: true, - VMType: "standard", - VnetName: "vnetName", - VnetResourceGroup: "vnetResourceGroup", - } - - buffer := bytes.NewBufferString(azureConfig) - config, err := parseConfig(buffer) - assert.NoError(t, err) - assert.Equal(t, expected, config) -} - // Test flipServiceInternalAnnotation func TestFlipServiceInternalAnnotation(t *testing.T) { svc := getTestService("servicea", v1.ProtocolTCP, nil, 80)