Merge pull request #86515 from feiskyer/separate-limit

Change Azure global rate limit to per client
This commit is contained in:
Kubernetes Prow Robot 2019-12-23 00:29:33 -08:00 committed by GitHub
commit 92e0de3269
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 484 additions and 209 deletions

View File

@ -25,6 +25,7 @@ go_library(
"azure_loadbalancer.go",
"azure_managedDiskController.go",
"azure_metrics.go",
"azure_ratelimit.go",
"azure_routes.go",
"azure_standard.go",
"azure_storage.go",
@ -93,6 +94,7 @@ go_test(
"azure_instances_test.go",
"azure_loadbalancer_test.go",
"azure_metrics_test.go",
"azure_ratelimit_test.go",
"azure_routes_test.go",
"azure_standard_test.go",
"azure_storage_test.go",

View File

@ -42,7 +42,6 @@ import (
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/azure/auth"
@ -101,6 +100,7 @@ var (
// for more details.
type Config struct {
auth.AzureAuthConfig
CloudProviderRateLimitConfig
// The name of the resource group that the cluster is deployed in
ResourceGroup string `json:"resourceGroup,omitempty" yaml:"resourceGroup,omitempty"`
@ -149,17 +149,6 @@ type Config struct {
// CloudProviderBackoffJitter are omitted.
// "default" will be used if not specified.
CloudProviderBackoffMode string `json:"cloudProviderBackoffMode,omitempty" yaml:"cloudProviderBackoffMode,omitempty"`
// Enable rate limiting
CloudProviderRateLimit bool `json:"cloudProviderRateLimit,omitempty" yaml:"cloudProviderRateLimit,omitempty"`
// Rate limit QPS (Read)
CloudProviderRateLimitQPS float32 `json:"cloudProviderRateLimitQPS,omitempty" yaml:"cloudProviderRateLimitQPS,omitempty"`
// Rate limit Bucket Size
CloudProviderRateLimitBucket int `json:"cloudProviderRateLimitBucket,omitempty" yaml:"cloudProviderRateLimitBucket,omitempty"`
// Rate limit QPS (Write)
CloudProviderRateLimitQPSWrite float32 `json:"cloudProviderRateLimitQPSWrite,omitempty" yaml:"cloudProviderRateLimitQPSWrite,omitempty"`
// Rate limit Bucket Size
CloudProviderRateLimitBucketWrite int `json:"cloudProviderRateLimitBucketWrite,omitempty" yaml:"cloudProviderRateLimitBucketWrite,omitempty"`
// Use instance metadata service where possible
UseInstanceMetadata bool `json:"useInstanceMetadata,omitempty" yaml:"useInstanceMetadata,omitempty"`
@ -220,22 +209,27 @@ var _ cloudprovider.PVLabeler = (*Cloud)(nil)
// Cloud holds the config and clients
type Cloud struct {
Config
Environment azure.Environment
RoutesClient RoutesClient
SubnetsClient SubnetsClient
InterfacesClient InterfacesClient
RouteTablesClient RouteTablesClient
LoadBalancerClient LoadBalancersClient
PublicIPAddressesClient PublicIPAddressesClient
SecurityGroupsClient SecurityGroupsClient
VirtualMachinesClient VirtualMachinesClient
StorageAccountClient StorageAccountClient
DisksClient DisksClient
SnapshotsClient *compute.SnapshotsClient
FileClient FileClient
ResourceRequestBackoff wait.Backoff
metadata *InstanceMetadataService
vmSet VMSet
Environment azure.Environment
RoutesClient RoutesClient
SubnetsClient SubnetsClient
InterfacesClient InterfacesClient
RouteTablesClient RouteTablesClient
LoadBalancerClient LoadBalancersClient
PublicIPAddressesClient PublicIPAddressesClient
SecurityGroupsClient SecurityGroupsClient
VirtualMachinesClient VirtualMachinesClient
StorageAccountClient StorageAccountClient
DisksClient DisksClient
SnapshotsClient *compute.SnapshotsClient
FileClient FileClient
VirtualMachineScaleSetsClient VirtualMachineScaleSetsClient
VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient
VirtualMachineSizesClient VirtualMachineSizesClient
ResourceRequestBackoff wait.Backoff
metadata *InstanceMetadataService
vmSet VMSet
// ipv6DualStack allows overriding for unit testing. It's normally initialized from featuregates
ipv6DualStackEnabled bool
@ -256,13 +250,6 @@ type Cloud struct {
// routeCIDRs holds cache for route CIDRs.
routeCIDRs map[string]string
// Clients for vmss.
VirtualMachineScaleSetsClient VirtualMachineScaleSetsClient
VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient
// client for vm sizes list
VirtualMachineSizesClient VirtualMachineSizesClient
kubeClient clientset.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
@ -385,43 +372,8 @@ func (az *Cloud) InitializeCloudFromConfig(config *Config, fromSecret bool) erro
return err
}
// operationPollRateLimiter.Accept() is a no-op if rate limits are configured off.
operationPollRateLimiter := flowcontrol.NewFakeAlwaysRateLimiter()
operationPollRateLimiterWrite := flowcontrol.NewFakeAlwaysRateLimiter()
// If reader is provided (and no writer) we will
// use the same value for both.
if config.CloudProviderRateLimit {
// Assign rate limit defaults if no configuration was passed in
if config.CloudProviderRateLimitQPS == 0 {
config.CloudProviderRateLimitQPS = rateLimitQPSDefault
}
if config.CloudProviderRateLimitBucket == 0 {
config.CloudProviderRateLimitBucket = rateLimitBucketDefault
}
if config.CloudProviderRateLimitQPSWrite == 0 {
config.CloudProviderRateLimitQPSWrite = rateLimitQPSDefault
}
if config.CloudProviderRateLimitBucketWrite == 0 {
config.CloudProviderRateLimitBucketWrite = rateLimitBucketDefault
}
operationPollRateLimiter = flowcontrol.NewTokenBucketRateLimiter(
config.CloudProviderRateLimitQPS,
config.CloudProviderRateLimitBucket)
operationPollRateLimiterWrite = flowcontrol.NewTokenBucketRateLimiter(
config.CloudProviderRateLimitQPSWrite,
config.CloudProviderRateLimitBucketWrite)
klog.V(2).Infof("Azure cloudprovider (read ops) using rate limit config: QPS=%g, bucket=%d",
config.CloudProviderRateLimitQPS,
config.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure cloudprovider (write ops) using rate limit config: QPS=%g, bucket=%d",
config.CloudProviderRateLimitQPSWrite,
config.CloudProviderRateLimitBucketWrite)
}
// Initialize rate limiting config options.
InitializeCloudProviderRateLimitConfig(&config.CloudProviderRateLimitConfig)
// Conditionally configure resource request backoff
resourceRequestBackoff := wait.Backoff{
@ -500,26 +452,25 @@ func (az *Cloud) InitializeCloudFromConfig(config *Config, fromSecret bool) erro
subscriptionID: config.SubscriptionID,
resourceManagerEndpoint: env.ResourceManagerEndpoint,
servicePrincipalToken: servicePrincipalToken,
rateLimiterReader: operationPollRateLimiter,
rateLimiterWriter: operationPollRateLimiterWrite,
CloudProviderBackoffRetries: config.CloudProviderBackoffRetries,
CloudProviderBackoffDuration: config.CloudProviderBackoffDuration,
ShouldOmitCloudProviderBackoff: config.shouldOmitCloudProviderBackoff(),
}
az.DisksClient = newAzDisksClient(azClientConfig)
az.SnapshotsClient = newSnapshotsClient(azClientConfig)
az.RoutesClient = newAzRoutesClient(azClientConfig)
az.SubnetsClient = newAzSubnetsClient(azClientConfig)
az.InterfacesClient = newAzInterfacesClient(azClientConfig)
az.RouteTablesClient = newAzRouteTablesClient(azClientConfig)
az.LoadBalancerClient = newAzLoadBalancersClient(azClientConfig)
az.SecurityGroupsClient = newAzSecurityGroupsClient(azClientConfig)
az.StorageAccountClient = newAzStorageAccountClient(azClientConfig)
az.VirtualMachinesClient = newAzVirtualMachinesClient(azClientConfig)
az.PublicIPAddressesClient = newAzPublicIPAddressesClient(azClientConfig)
az.VirtualMachineSizesClient = newAzVirtualMachineSizesClient(azClientConfig)
az.VirtualMachineScaleSetsClient = newAzVirtualMachineScaleSetsClient(azClientConfig)
az.VirtualMachineScaleSetVMsClient = newAzVirtualMachineScaleSetVMsClient(azClientConfig)
az.DisksClient = newAzDisksClient(azClientConfig.WithRateLimiter(config.DiskRateLimit))
az.SnapshotsClient = newSnapshotsClient(azClientConfig.WithRateLimiter(config.SnapshotRateLimit))
az.RoutesClient = newAzRoutesClient(azClientConfig.WithRateLimiter(config.RouteRateLimit))
az.SubnetsClient = newAzSubnetsClient(azClientConfig.WithRateLimiter(config.SubnetsRateLimit))
az.InterfacesClient = newAzInterfacesClient(azClientConfig.WithRateLimiter(config.InterfaceRateLimit))
az.RouteTablesClient = newAzRouteTablesClient(azClientConfig.WithRateLimiter(config.RouteTableRateLimit))
az.LoadBalancerClient = newAzLoadBalancersClient(azClientConfig.WithRateLimiter(config.LoadBalancerRateLimit))
az.SecurityGroupsClient = newAzSecurityGroupsClient(azClientConfig.WithRateLimiter(config.SecurityGroupRateLimit))
az.StorageAccountClient = newAzStorageAccountClient(azClientConfig.WithRateLimiter(config.StorageAccountRateLimit))
az.VirtualMachinesClient = newAzVirtualMachinesClient(azClientConfig.WithRateLimiter(config.VirtualMachineRateLimit))
az.PublicIPAddressesClient = newAzPublicIPAddressesClient(azClientConfig.WithRateLimiter(config.PublicIPAddressRateLimit))
az.VirtualMachineSizesClient = newAzVirtualMachineSizesClient(azClientConfig.WithRateLimiter(config.VirtualMachineSizeRateLimit))
az.VirtualMachineScaleSetsClient = newAzVirtualMachineScaleSetsClient(azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit))
az.VirtualMachineScaleSetVMsClient = newAzVirtualMachineScaleSetVMsClient(azClientConfig.WithRateLimiter(config.VirtualMachineScaleSetRateLimit))
// TODO(feiskyer): refactor azureFileClient to Interface.
az.FileClient = &azureFileClient{env: *env}
if az.MaximumLoadBalancerRuleCount == 0 {

View File

@ -149,16 +149,19 @@ type azClientConfig struct {
subscriptionID string
resourceManagerEndpoint string
servicePrincipalToken *adal.ServicePrincipalToken
// ARM Rate limiting for GET vs PUT/POST
//Details: https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-manager-request-limits
rateLimiterReader flowcontrol.RateLimiter
rateLimiterWriter flowcontrol.RateLimiter
rateLimitConfig *RateLimitConfig
CloudProviderBackoffRetries int
CloudProviderBackoffDuration int
ShouldOmitCloudProviderBackoff bool
}
// WithRateLimiter returns azClientConfig with rateLimitConfig set.
func (cfg *azClientConfig) WithRateLimiter(rl *RateLimitConfig) *azClientConfig {
cfg.rateLimitConfig = rl
return cfg
}
// azVirtualMachinesClient implements VirtualMachinesClient.
type azVirtualMachinesClient struct {
client compute.VirtualMachinesClient
@ -181,9 +184,16 @@ func newAzVirtualMachinesClient(config *azClientConfig) *azVirtualMachinesClient
}
configureUserAgent(&virtualMachinesClient.Client)
klog.V(2).Infof("Azure VirtualMachinesClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure VirtualMachinesClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azVirtualMachinesClient{
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
client: virtualMachinesClient,
}
}
@ -303,9 +313,16 @@ func newAzInterfacesClient(config *azClientConfig) *azInterfacesClient {
}
configureUserAgent(&interfacesClient.Client)
klog.V(2).Infof("Azure InterfacesClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure InterfacesClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azInterfacesClient{
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
client: interfacesClient,
}
}
@ -387,9 +404,16 @@ func newAzLoadBalancersClient(config *azClientConfig) *azLoadBalancersClient {
}
configureUserAgent(&loadBalancerClient.Client)
klog.V(2).Infof("Azure LoadBalancersClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure LoadBalancersClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azLoadBalancersClient{
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
client: loadBalancerClient,
}
}
@ -539,9 +563,16 @@ func newAzPublicIPAddressesClient(config *azClientConfig) *azPublicIPAddressesCl
}
configureUserAgent(&publicIPAddressClient.Client)
klog.V(2).Infof("Azure PublicIPAddressesClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure PublicIPAddressesClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azPublicIPAddressesClient{
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
client: publicIPAddressClient,
}
}
@ -676,10 +707,17 @@ func newAzSubnetsClient(config *azClientConfig) *azSubnetsClient {
}
configureUserAgent(&subnetsClient.Client)
klog.V(2).Infof("Azure SubnetsClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure SubnetsClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azSubnetsClient{
client: subnetsClient,
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
}
}
@ -795,10 +833,17 @@ func newAzSecurityGroupsClient(config *azClientConfig) *azSecurityGroupsClient {
}
configureUserAgent(&securityGroupsClient.Client)
klog.V(2).Infof("Azure SecurityGroupsClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure SecurityGroupsClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azSecurityGroupsClient{
client: securityGroupsClient,
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
}
}
@ -946,10 +991,17 @@ func newAzVirtualMachineScaleSetsClient(config *azClientConfig) *azVirtualMachin
}
configureUserAgent(&virtualMachineScaleSetsClient.Client)
klog.V(2).Infof("Azure VirtualMachineScaleSetsClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure VirtualMachineScaleSetsClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azVirtualMachineScaleSetsClient{
client: virtualMachineScaleSetsClient,
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
}
}
@ -1043,10 +1095,17 @@ func newAzVirtualMachineScaleSetVMsClient(config *azClientConfig) *azVirtualMach
}
configureUserAgent(&virtualMachineScaleSetVMsClient.Client)
klog.V(2).Infof("Azure VirtualMachineScaleSetVMsClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure VirtualMachineScaleSetVMsClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azVirtualMachineScaleSetVMsClient{
client: virtualMachineScaleSetVMsClient,
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
}
}
@ -1139,10 +1198,17 @@ func newAzRoutesClient(config *azClientConfig) *azRoutesClient {
}
configureUserAgent(&routesClient.Client)
klog.V(2).Infof("Azure RoutesClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure RoutesClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azRoutesClient{
client: routesClient,
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
}
}
@ -1245,10 +1311,17 @@ func newAzRouteTablesClient(config *azClientConfig) *azRouteTablesClient {
}
configureUserAgent(&routeTablesClient.Client)
klog.V(2).Infof("Azure RouteTablesClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure RouteTablesClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azRouteTablesClient{
client: routeTablesClient,
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
}
}
@ -1342,10 +1415,17 @@ func newAzStorageAccountClient(config *azClientConfig) *azStorageAccountClient {
}
configureUserAgent(&storageAccountClient.Client)
klog.V(2).Infof("Azure StorageAccountClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure StorageAccountClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azStorageAccountClient{
client: storageAccountClient,
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
}
}
@ -1462,10 +1542,17 @@ func newAzDisksClient(config *azClientConfig) *azDisksClient {
}
configureUserAgent(&disksClient.Client)
klog.V(2).Infof("Azure DisksClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure DisksClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azDisksClient{
client: disksClient,
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
}
}
@ -1532,6 +1619,7 @@ func (az *azDisksClient) Get(ctx context.Context, resourceGroupName string, disk
return
}
// TODO(feiskyer): refactor compute.SnapshotsClient to Interface.
func newSnapshotsClient(config *azClientConfig) *compute.SnapshotsClient {
snapshotsClient := compute.NewSnapshotsClientWithBaseURI(config.resourceManagerEndpoint, config.subscriptionID)
snapshotsClient.Authorizer = autorest.NewBearerAuthorizer(config.servicePrincipalToken)
@ -1562,9 +1650,16 @@ func newAzVirtualMachineSizesClient(config *azClientConfig) *azVirtualMachineSiz
}
configureUserAgent(&VirtualMachineSizesClient.Client)
klog.V(2).Infof("Azure VirtualMachineSizesClient (read ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPS,
config.rateLimitConfig.CloudProviderRateLimitBucket)
klog.V(2).Infof("Azure VirtualMachineSizesClient (write ops) using rate limit config: QPS=%g, bucket=%d",
config.rateLimitConfig.CloudProviderRateLimitQPSWrite,
config.rateLimitConfig.CloudProviderRateLimitBucketWrite)
rateLimiterReader, rateLimiterWriter := NewRateLimiter(config.rateLimitConfig)
return &azVirtualMachineSizesClient{
rateLimiterReader: config.rateLimiterReader,
rateLimiterWriter: config.rateLimiterWriter,
rateLimiterReader: rateLimiterReader,
rateLimiterWriter: rateLimiterWriter,
client: VirtualMachineSizesClient,
}
}

View File

@ -0,0 +1,146 @@
// +build !providerless
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"k8s.io/client-go/util/flowcontrol"
)
// RateLimitConfig indicates the rate limit config options.
type RateLimitConfig struct {
// Enable rate limiting
CloudProviderRateLimit bool `json:"cloudProviderRateLimit,omitempty" yaml:"cloudProviderRateLimit,omitempty"`
// Rate limit QPS (Read)
CloudProviderRateLimitQPS float32 `json:"cloudProviderRateLimitQPS,omitempty" yaml:"cloudProviderRateLimitQPS,omitempty"`
// Rate limit Bucket Size
CloudProviderRateLimitBucket int `json:"cloudProviderRateLimitBucket,omitempty" yaml:"cloudProviderRateLimitBucket,omitempty"`
// Rate limit QPS (Write)
CloudProviderRateLimitQPSWrite float32 `json:"cloudProviderRateLimitQPSWrite,omitempty" yaml:"cloudProviderRateLimitQPSWrite,omitempty"`
// Rate limit Bucket Size
CloudProviderRateLimitBucketWrite int `json:"cloudProviderRateLimitBucketWrite,omitempty" yaml:"cloudProviderRateLimitBucketWrite,omitempty"`
}
// CloudProviderRateLimitConfig indicates the rate limit config for each clients.
type CloudProviderRateLimitConfig struct {
// The default rate limit config options.
RateLimitConfig
// Rate limit config for each clients. Values would override default settings above.
RouteRateLimit *RateLimitConfig `json:"routeRateLimit,omitempty" yaml:"routeRateLimit,omitempty"`
SubnetsRateLimit *RateLimitConfig `json:"subnetsRateLimit,omitempty" yaml:"subnetsRateLimit,omitempty"`
InterfaceRateLimit *RateLimitConfig `json:"interfaceRateLimit,omitempty" yaml:"interfaceRateLimit,omitempty"`
RouteTableRateLimit *RateLimitConfig `json:"routeTableRateLimit,omitempty" yaml:"routeTableRateLimit,omitempty"`
LoadBalancerRateLimit *RateLimitConfig `json:"loadBalancerRateLimit,omitempty" yaml:"loadBalancerRateLimit,omitempty"`
PublicIPAddressRateLimit *RateLimitConfig `json:"publicIPAddressRateLimit,omitempty" yaml:"publicIPAddressRateLimit,omitempty"`
SecurityGroupRateLimit *RateLimitConfig `json:"securityGroupRateLimit,omitempty" yaml:"securityGroupRateLimit,omitempty"`
VirtualMachineRateLimit *RateLimitConfig `json:"virtualMachineRateLimit,omitempty" yaml:"virtualMachineRateLimit,omitempty"`
StorageAccountRateLimit *RateLimitConfig `json:"storageAccountRateLimit,omitempty" yaml:"storageAccountRateLimit,omitempty"`
DiskRateLimit *RateLimitConfig `json:"diskRateLimit,omitempty" yaml:"diskRateLimit,omitempty"`
SnapshotRateLimit *RateLimitConfig `json:"snapshotRateLimit,omitempty" yaml:"snapshotRateLimit,omitempty"`
VirtualMachineScaleSetRateLimit *RateLimitConfig `json:"virtualMachineScaleSetRateLimit,omitempty" yaml:"virtualMachineScaleSetRateLimit,omitempty"`
VirtualMachineSizeRateLimit *RateLimitConfig `json:"virtualMachineSizesRateLimit,omitempty" yaml:"virtualMachineSizesRateLimit,omitempty"`
}
// InitializeCloudProviderRateLimitConfig initializes rate limit configs.
func InitializeCloudProviderRateLimitConfig(config *CloudProviderRateLimitConfig) {
if config == nil {
return
}
// Assign read rate limit defaults if no configuration was passed in.
if config.CloudProviderRateLimitQPS == 0 {
config.CloudProviderRateLimitQPS = rateLimitQPSDefault
}
if config.CloudProviderRateLimitBucket == 0 {
config.CloudProviderRateLimitBucket = rateLimitBucketDefault
}
// Assing write rate limit defaults if no configuration was passed in.
if config.CloudProviderRateLimitQPSWrite == 0 {
config.CloudProviderRateLimitQPSWrite = config.CloudProviderRateLimitQPS
}
if config.CloudProviderRateLimitBucketWrite == 0 {
config.CloudProviderRateLimitBucketWrite = config.CloudProviderRateLimitBucket
}
config.RouteRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.RouteRateLimit)
config.SubnetsRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.SubnetsRateLimit)
config.InterfaceRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.InterfaceRateLimit)
config.RouteTableRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.RouteTableRateLimit)
config.LoadBalancerRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.LoadBalancerRateLimit)
config.PublicIPAddressRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.PublicIPAddressRateLimit)
config.SecurityGroupRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.SecurityGroupRateLimit)
config.VirtualMachineRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.VirtualMachineRateLimit)
config.StorageAccountRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.StorageAccountRateLimit)
config.DiskRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.DiskRateLimit)
config.SnapshotRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.SnapshotRateLimit)
config.VirtualMachineScaleSetRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.VirtualMachineScaleSetRateLimit)
config.VirtualMachineSizeRateLimit = overrideDefaultRateLimitConfig(&config.RateLimitConfig, config.VirtualMachineSizeRateLimit)
}
// overrideDefaultRateLimitConfig overrides the default CloudProviderRateLimitConfig.
func overrideDefaultRateLimitConfig(defaults, config *RateLimitConfig) *RateLimitConfig {
// If config not set, apply defaults.
if config == nil {
return defaults
}
// Remain disabled if it's set explicitly.
if !config.CloudProviderRateLimit {
return &RateLimitConfig{CloudProviderRateLimit: false}
}
// Apply default values.
if config.CloudProviderRateLimitQPS == 0 {
config.CloudProviderRateLimitQPS = defaults.CloudProviderRateLimitQPS
}
if config.CloudProviderRateLimitBucket == 0 {
config.CloudProviderRateLimitBucket = defaults.CloudProviderRateLimitBucket
}
if config.CloudProviderRateLimitQPSWrite == 0 {
config.CloudProviderRateLimitQPSWrite = defaults.CloudProviderRateLimitQPSWrite
}
if config.CloudProviderRateLimitBucketWrite == 0 {
config.CloudProviderRateLimitBucketWrite = defaults.CloudProviderRateLimitBucketWrite
}
return config
}
// RateLimitEnabled returns true if CloudProviderRateLimit is set to true.
func RateLimitEnabled(config *RateLimitConfig) bool {
return config != nil && config.CloudProviderRateLimit
}
// NewRateLimiter creates new read and write flowcontrol.RateLimiter from RateLimitConfig.
func NewRateLimiter(config *RateLimitConfig) (flowcontrol.RateLimiter, flowcontrol.RateLimiter) {
readLimiter := flowcontrol.NewFakeAlwaysRateLimiter()
writeLimiter := flowcontrol.NewFakeAlwaysRateLimiter()
if config != nil && config.CloudProviderRateLimit {
readLimiter = flowcontrol.NewTokenBucketRateLimiter(
config.CloudProviderRateLimitQPS,
config.CloudProviderRateLimitBucket)
writeLimiter = flowcontrol.NewTokenBucketRateLimiter(
config.CloudProviderRateLimitQPSWrite,
config.CloudProviderRateLimitBucketWrite)
}
return readLimiter, writeLimiter
}

View File

@ -0,0 +1,172 @@
// +build !providerless
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/legacy-cloud-providers/azure/auth"
)
var (
testAzureConfig = `{
"aadClientCertPassword": "aadClientCertPassword",
"aadClientCertPath": "aadClientCertPath",
"aadClientId": "aadClientId",
"aadClientSecret": "aadClientSecret",
"cloud":"AzurePublicCloud",
"cloudProviderBackoff": true,
"cloudProviderBackoffDuration": 1,
"cloudProviderBackoffExponent": 1,
"cloudProviderBackoffJitter": 1,
"cloudProviderBackoffRetries": 1,
"cloudProviderRatelimit": true,
"cloudProviderRateLimitBucket": 1,
"cloudProviderRateLimitBucketWrite": 1,
"cloudProviderRateLimitQPS": 1,
"cloudProviderRateLimitQPSWrite": 1,
"virtualMachineScaleSetRateLimit": {
"cloudProviderRatelimit": true,
"cloudProviderRateLimitBucket": 2,
"CloudProviderRateLimitBucketWrite": 2,
"cloudProviderRateLimitQPS": 0,
"CloudProviderRateLimitQPSWrite": 0
},
"loadBalancerRateLimit": {
"cloudProviderRatelimit": false,
},
"availabilitySetNodesCacheTTLInSeconds": 100,
"vmssCacheTTLInSeconds": 100,
"vmssVirtualMachinesCacheTTLInSeconds": 100,
"vmCacheTTLInSeconds": 100,
"loadBalancerCacheTTLInSeconds": 100,
"nsgCacheTTLInSeconds": 100,
"routeTableCacheTTLInSeconds": 100,
"location": "location",
"maximumLoadBalancerRuleCount": 1,
"primaryAvailabilitySetName": "primaryAvailabilitySetName",
"primaryScaleSetName": "primaryScaleSetName",
"resourceGroup": "resourceGroup",
"routeTableName": "routeTableName",
"routeTableResourceGroup": "routeTableResourceGroup",
"securityGroupName": "securityGroupName",
"subnetName": "subnetName",
"subscriptionId": "subscriptionId",
"tenantId": "tenantId",
"useInstanceMetadata": true,
"useManagedIdentityExtension": true,
"vnetName": "vnetName",
"vnetResourceGroup": "vnetResourceGroup",
vmType: "standard"
}`
testDefaultRateLimitConfig = RateLimitConfig{
CloudProviderRateLimit: true,
CloudProviderRateLimitBucket: 1,
CloudProviderRateLimitBucketWrite: 1,
CloudProviderRateLimitQPS: 1,
CloudProviderRateLimitQPSWrite: 1,
}
)
func TestParseConfig(t *testing.T) {
expected := &Config{
AzureAuthConfig: auth.AzureAuthConfig{
AADClientCertPassword: "aadClientCertPassword",
AADClientCertPath: "aadClientCertPath",
AADClientID: "aadClientId",
AADClientSecret: "aadClientSecret",
Cloud: "AzurePublicCloud",
SubscriptionID: "subscriptionId",
TenantID: "tenantId",
UseManagedIdentityExtension: true,
},
CloudProviderBackoff: true,
CloudProviderBackoffDuration: 1,
CloudProviderBackoffExponent: 1,
CloudProviderBackoffJitter: 1,
CloudProviderBackoffRetries: 1,
CloudProviderRateLimitConfig: CloudProviderRateLimitConfig{
RateLimitConfig: testDefaultRateLimitConfig,
LoadBalancerRateLimit: &RateLimitConfig{
CloudProviderRateLimit: false,
},
VirtualMachineScaleSetRateLimit: &RateLimitConfig{
CloudProviderRateLimit: true,
CloudProviderRateLimitBucket: 2,
CloudProviderRateLimitBucketWrite: 2,
},
},
AvailabilitySetNodesCacheTTLInSeconds: 100,
VmssCacheTTLInSeconds: 100,
VmssVirtualMachinesCacheTTLInSeconds: 100,
VMCacheTTLInSeconds: 100,
LoadBalancerCacheTTLInSeconds: 100,
NsgCacheTTLInSeconds: 100,
RouteTableCacheTTLInSeconds: 100,
Location: "location",
MaximumLoadBalancerRuleCount: 1,
PrimaryAvailabilitySetName: "primaryAvailabilitySetName",
PrimaryScaleSetName: "primaryScaleSetName",
ResourceGroup: "resourcegroup",
RouteTableName: "routeTableName",
RouteTableResourceGroup: "routeTableResourceGroup",
SecurityGroupName: "securityGroupName",
SubnetName: "subnetName",
UseInstanceMetadata: true,
VMType: "standard",
VnetName: "vnetName",
VnetResourceGroup: "vnetResourceGroup",
}
buffer := bytes.NewBufferString(testAzureConfig)
config, err := parseConfig(buffer)
assert.NoError(t, err)
assert.Equal(t, expected, config)
}
func TestInitializeCloudProviderRateLimitConfig(t *testing.T) {
buffer := bytes.NewBufferString(testAzureConfig)
config, err := parseConfig(buffer)
assert.NoError(t, err)
InitializeCloudProviderRateLimitConfig(&config.CloudProviderRateLimitConfig)
assert.Equal(t, config.LoadBalancerRateLimit, &RateLimitConfig{
CloudProviderRateLimit: false,
})
assert.Equal(t, config.VirtualMachineScaleSetRateLimit, &RateLimitConfig{
CloudProviderRateLimit: true,
CloudProviderRateLimitBucket: 2,
CloudProviderRateLimitBucketWrite: 2,
CloudProviderRateLimitQPS: 1,
CloudProviderRateLimitQPSWrite: 1,
})
assert.Equal(t, config.VirtualMachineSizeRateLimit, &testDefaultRateLimitConfig)
assert.Equal(t, config.VirtualMachineRateLimit, &testDefaultRateLimitConfig)
assert.Equal(t, config.RouteRateLimit, &testDefaultRateLimitConfig)
assert.Equal(t, config.SubnetsRateLimit, &testDefaultRateLimitConfig)
assert.Equal(t, config.InterfaceRateLimit, &testDefaultRateLimitConfig)
assert.Equal(t, config.RouteTableRateLimit, &testDefaultRateLimitConfig)
assert.Equal(t, config.SecurityGroupRateLimit, &testDefaultRateLimitConfig)
assert.Equal(t, config.StorageAccountRateLimit, &testDefaultRateLimitConfig)
assert.Equal(t, config.DiskRateLimit, &testDefaultRateLimitConfig)
assert.Equal(t, config.SnapshotRateLimit, &testDefaultRateLimitConfig)
}

View File

@ -19,7 +19,6 @@ limitations under the License.
package azure
import (
"bytes"
"context"
"fmt"
"math"
@ -42,96 +41,6 @@ import (
var testClusterName = "testCluster"
func TestParseConfig(t *testing.T) {
azureConfig := `{
"aadClientCertPassword": "aadClientCertPassword",
"aadClientCertPath": "aadClientCertPath",
"aadClientId": "aadClientId",
"aadClientSecret": "aadClientSecret",
"cloud":"AzurePublicCloud",
"cloudProviderBackoff": true,
"cloudProviderBackoffDuration": 1,
"cloudProviderBackoffExponent": 1,
"cloudProviderBackoffJitter": 1,
"cloudProviderBackoffRetries": 1,
"cloudProviderRatelimit": true,
"cloudProviderRateLimitBucket": 1,
"CloudProviderRateLimitBucketWrite": 1,
"cloudProviderRateLimitQPS": 1,
"CloudProviderRateLimitQPSWrite": 1,
"availabilitySetNodesCacheTTLInSeconds": 100,
"vmssCacheTTLInSeconds": 100,
"vmssVirtualMachinesCacheTTLInSeconds": 100,
"vmCacheTTLInSeconds": 100,
"loadBalancerCacheTTLInSeconds": 100,
"nsgCacheTTLInSeconds": 100,
"routeTableCacheTTLInSeconds": 100,
"location": "location",
"maximumLoadBalancerRuleCount": 1,
"primaryAvailabilitySetName": "primaryAvailabilitySetName",
"primaryScaleSetName": "primaryScaleSetName",
"resourceGroup": "resourceGroup",
"routeTableName": "routeTableName",
"routeTableResourceGroup": "routeTableResourceGroup",
"securityGroupName": "securityGroupName",
"subnetName": "subnetName",
"subscriptionId": "subscriptionId",
"tenantId": "tenantId",
"useInstanceMetadata": true,
"useManagedIdentityExtension": true,
"vnetName": "vnetName",
"vnetResourceGroup": "vnetResourceGroup",
vmType: "standard"
}`
expected := &Config{
AzureAuthConfig: auth.AzureAuthConfig{
AADClientCertPassword: "aadClientCertPassword",
AADClientCertPath: "aadClientCertPath",
AADClientID: "aadClientId",
AADClientSecret: "aadClientSecret",
Cloud: "AzurePublicCloud",
SubscriptionID: "subscriptionId",
TenantID: "tenantId",
UseManagedIdentityExtension: true,
},
CloudProviderBackoff: true,
CloudProviderBackoffDuration: 1,
CloudProviderBackoffExponent: 1,
CloudProviderBackoffJitter: 1,
CloudProviderBackoffRetries: 1,
CloudProviderRateLimit: true,
CloudProviderRateLimitBucket: 1,
CloudProviderRateLimitBucketWrite: 1,
CloudProviderRateLimitQPS: 1,
CloudProviderRateLimitQPSWrite: 1,
AvailabilitySetNodesCacheTTLInSeconds: 100,
VmssCacheTTLInSeconds: 100,
VmssVirtualMachinesCacheTTLInSeconds: 100,
VMCacheTTLInSeconds: 100,
LoadBalancerCacheTTLInSeconds: 100,
NsgCacheTTLInSeconds: 100,
RouteTableCacheTTLInSeconds: 100,
Location: "location",
MaximumLoadBalancerRuleCount: 1,
PrimaryAvailabilitySetName: "primaryAvailabilitySetName",
PrimaryScaleSetName: "primaryScaleSetName",
ResourceGroup: "resourcegroup",
RouteTableName: "routeTableName",
RouteTableResourceGroup: "routeTableResourceGroup",
SecurityGroupName: "securityGroupName",
SubnetName: "subnetName",
UseInstanceMetadata: true,
VMType: "standard",
VnetName: "vnetName",
VnetResourceGroup: "vnetResourceGroup",
}
buffer := bytes.NewBufferString(azureConfig)
config, err := parseConfig(buffer)
assert.NoError(t, err)
assert.Equal(t, expected, config)
}
// Test flipServiceInternalAnnotation
func TestFlipServiceInternalAnnotation(t *testing.T) {
svc := getTestService("servicea", v1.ProtocolTCP, nil, 80)