From 5bf6b0fd7089b6454a1bf372de8df2eef3bcec8b Mon Sep 17 00:00:00 2001 From: "Khaled Henidak(Kal)" Date: Tue, 13 Feb 2018 20:05:20 +0000 Subject: [PATCH] WIP - create read/writer rate limiter --- pkg/cloudprovider/providers/azure/azure.go | 3 +- .../providers/azure/azure_client.go | 503 ++++++++++++++---- 2 files changed, 410 insertions(+), 96 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index 20ec3eb0204..91378ecae5a 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -188,7 +188,8 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { subscriptionID: config.SubscriptionID, resourceManagerEndpoint: env.ResourceManagerEndpoint, servicePrincipalToken: servicePrincipalToken, - rateLimiter: operationPollRateLimiter, + rateLimiterReader: operationPollRateLimiter, + rateLimiterWriter: operationPollRateLimiter, } az := Cloud{ Config: *config, diff --git a/pkg/cloudprovider/providers/azure/azure_client.go b/pkg/cloudprovider/providers/azure/azure_client.go index a8bf5a2eac6..2df6e774f34 100644 --- a/pkg/cloudprovider/providers/azure/azure_client.go +++ b/pkg/cloudprovider/providers/azure/azure_client.go @@ -17,6 +17,7 @@ limitations under the License. package azure import ( + "fmt" "time" "github.com/Azure/azure-sdk-for-go/arm/compute" @@ -30,6 +31,15 @@ import ( "k8s.io/client-go/util/flowcontrol" ) +// Creates an error for rate limiting errors +func createArmRateLimitErr(isWrite bool, opName string) error { + opType := "read" + if isWrite { + opType = "write" + } + return fmt.Errorf("azure - ARM rate limited(%s) for operation:%s", opType, opName) +} + // VirtualMachinesClient defines needed functions for azure compute.VirtualMachinesClient type VirtualMachinesClient interface { CreateOrUpdate(resourceGroupName string, VMName string, parameters compute.VirtualMachine, cancel <-chan struct{}) (<-chan compute.VirtualMachine, <-chan error) @@ -129,13 +139,17 @@ type azClientConfig struct { subscriptionID string resourceManagerEndpoint string servicePrincipalToken *adal.ServicePrincipalToken - rateLimiter flowcontrol.RateLimiter + // ARM Rate limiting for GET vs PUT/POST + //Details: https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-manager-request-limits + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } // azVirtualMachinesClient implements VirtualMachinesClient. type azVirtualMachinesClient struct { - client compute.VirtualMachinesClient - rateLimiter flowcontrol.RateLimiter + client compute.VirtualMachinesClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzVirtualMachinesClient(config *azClientConfig) *azVirtualMachinesClient { @@ -146,13 +160,23 @@ func newAzVirtualMachinesClient(config *azClientConfig) *azVirtualMachinesClient configureUserAgent(&virtualMachinesClient.Client) return &azVirtualMachinesClient{ - rateLimiter: config.rateLimiter, - client: virtualMachinesClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, + client: virtualMachinesClient, } } func (az *azVirtualMachinesClient) CreateOrUpdate(resourceGroupName string, VMName string, parameters compute.VirtualMachine, cancel <-chan struct{}) (<-chan compute.VirtualMachine, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "NSGCreateOrUpdate") + errChan := make(chan error, 1) + resultChan := make(chan compute.VirtualMachine, 1) + errChan <- err + resultChan <- compute.VirtualMachine{} + return resultChan, errChan + } + glog.V(10).Infof("azVirtualMachinesClient.CreateOrUpdate(%q, %q): start", resourceGroupName, VMName) defer func() { glog.V(10).Infof("azVirtualMachinesClient.CreateOrUpdate(%q, %q): end", resourceGroupName, VMName) @@ -168,7 +192,11 @@ func (az *azVirtualMachinesClient) CreateOrUpdate(resourceGroupName string, VMNa } func (az *azVirtualMachinesClient) Get(resourceGroupName string, VMName string, expand compute.InstanceViewTypes) (result compute.VirtualMachine, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "VMGet") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azVirtualMachinesClient.Get(%q, %q): start", resourceGroupName, VMName) defer func() { glog.V(10).Infof("azVirtualMachinesClient.Get(%q, %q): end", resourceGroupName, VMName) @@ -181,7 +209,11 @@ func (az *azVirtualMachinesClient) Get(resourceGroupName string, VMName string, } func (az *azVirtualMachinesClient) List(resourceGroupName string) (result compute.VirtualMachineListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "VMList") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azVirtualMachinesClient.List(%q): start", resourceGroupName) defer func() { glog.V(10).Infof("azVirtualMachinesClient.List(%q): end", resourceGroupName) @@ -194,7 +226,11 @@ func (az *azVirtualMachinesClient) List(resourceGroupName string) (result comput } func (az *azVirtualMachinesClient) ListNextResults(resourceGroupName string, lastResults compute.VirtualMachineListResult) (result compute.VirtualMachineListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "VMListNextResults") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azVirtualMachinesClient.ListNextResults(%q): start", lastResults) defer func() { glog.V(10).Infof("azVirtualMachinesClient.ListNextResults(%q): end", lastResults) @@ -208,8 +244,9 @@ func (az *azVirtualMachinesClient) ListNextResults(resourceGroupName string, las // azInterfacesClient implements InterfacesClient. type azInterfacesClient struct { - client network.InterfacesClient - rateLimiter flowcontrol.RateLimiter + client network.InterfacesClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzInterfacesClient(config *azClientConfig) *azInterfacesClient { @@ -220,13 +257,24 @@ func newAzInterfacesClient(config *azClientConfig) *azInterfacesClient { configureUserAgent(&interfacesClient.Client) return &azInterfacesClient{ - rateLimiter: config.rateLimiter, - client: interfacesClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, + client: interfacesClient, } } func (az *azInterfacesClient) CreateOrUpdate(resourceGroupName string, networkInterfaceName string, parameters network.Interface, cancel <-chan struct{}) (<-chan network.Interface, <-chan error) { - az.rateLimiter.Accept() + + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "NiCreateOrUpdate") + errChan := make(chan error, 1) + resultChan := make(chan network.Interface, 1) + errChan <- err + resultChan <- network.Interface{} + return resultChan, errChan + } + glog.V(10).Infof("azInterfacesClient.CreateOrUpdate(%q,%q): start", resourceGroupName, networkInterfaceName) defer func() { glog.V(10).Infof("azInterfacesClient.CreateOrUpdate(%q,%q): end", resourceGroupName, networkInterfaceName) @@ -242,7 +290,11 @@ func (az *azInterfacesClient) CreateOrUpdate(resourceGroupName string, networkIn } func (az *azInterfacesClient) Get(resourceGroupName string, networkInterfaceName string, expand string) (result network.Interface, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "NicGet") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azInterfacesClient.Get(%q,%q): start", resourceGroupName, networkInterfaceName) defer func() { glog.V(10).Infof("azInterfacesClient.Get(%q,%q): end", resourceGroupName, networkInterfaceName) @@ -255,7 +307,11 @@ func (az *azInterfacesClient) Get(resourceGroupName string, networkInterfaceName } func (az *azInterfacesClient) GetVirtualMachineScaleSetNetworkInterface(resourceGroupName string, virtualMachineScaleSetName string, virtualmachineIndex string, networkInterfaceName string, expand string) (result network.Interface, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "NicGetVirtualMachineScaleSetNetworkInterface") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azInterfacesClient.GetVirtualMachineScaleSetNetworkInterface(%q,%q,%q,%q): start", resourceGroupName, virtualMachineScaleSetName, virtualmachineIndex, networkInterfaceName) defer func() { glog.V(10).Infof("azInterfacesClient.GetVirtualMachineScaleSetNetworkInterface(%q,%q,%q,%q): end", resourceGroupName, virtualMachineScaleSetName, virtualmachineIndex, networkInterfaceName) @@ -269,8 +325,9 @@ func (az *azInterfacesClient) GetVirtualMachineScaleSetNetworkInterface(resource // azLoadBalancersClient implements LoadBalancersClient. type azLoadBalancersClient struct { - client network.LoadBalancersClient - rateLimiter flowcontrol.RateLimiter + client network.LoadBalancersClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzLoadBalancersClient(config *azClientConfig) *azLoadBalancersClient { @@ -281,13 +338,23 @@ func newAzLoadBalancersClient(config *azClientConfig) *azLoadBalancersClient { configureUserAgent(&loadBalancerClient.Client) return &azLoadBalancersClient{ - rateLimiter: config.rateLimiter, - client: loadBalancerClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, + client: loadBalancerClient, } } func (az *azLoadBalancersClient) CreateOrUpdate(resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, cancel <-chan struct{}) (<-chan network.LoadBalancer, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "LBCreateOrUpdate") + errChan := make(chan error, 1) + resultChan := make(chan network.LoadBalancer, 1) + errChan <- err + resultChan <- network.LoadBalancer{} + return resultChan, errChan + } + glog.V(10).Infof("azLoadBalancersClient.CreateOrUpdate(%q,%q): start", resourceGroupName, loadBalancerName) defer func() { glog.V(10).Infof("azLoadBalancersClient.CreateOrUpdate(%q,%q): end", resourceGroupName, loadBalancerName) @@ -303,7 +370,16 @@ func (az *azLoadBalancersClient) CreateOrUpdate(resourceGroupName string, loadBa } func (az *azLoadBalancersClient) Delete(resourceGroupName string, loadBalancerName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "LBDelete") + errChan := make(chan error, 1) + resultChan := make(chan autorest.Response, 1) + errChan <- err + resultChan <- autorest.Response{} + return resultChan, errChan + } + glog.V(10).Infof("azLoadBalancersClient.Delete(%q,%q): start", resourceGroupName, loadBalancerName) defer func() { glog.V(10).Infof("azLoadBalancersClient.Delete(%q,%q): end", resourceGroupName, loadBalancerName) @@ -319,7 +395,11 @@ func (az *azLoadBalancersClient) Delete(resourceGroupName string, loadBalancerNa } func (az *azLoadBalancersClient) Get(resourceGroupName string, loadBalancerName string, expand string) (result network.LoadBalancer, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "LBGet") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azLoadBalancersClient.Get(%q,%q): start", resourceGroupName, loadBalancerName) defer func() { glog.V(10).Infof("azLoadBalancersClient.Get(%q,%q): end", resourceGroupName, loadBalancerName) @@ -332,7 +412,11 @@ func (az *azLoadBalancersClient) Get(resourceGroupName string, loadBalancerName } func (az *azLoadBalancersClient) List(resourceGroupName string) (result network.LoadBalancerListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "LBList") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azLoadBalancersClient.List(%q): start", resourceGroupName) defer func() { glog.V(10).Infof("azLoadBalancersClient.List(%q): end", resourceGroupName) @@ -345,7 +429,11 @@ func (az *azLoadBalancersClient) List(resourceGroupName string) (result network. } func (az *azLoadBalancersClient) ListNextResults(resourceGroupName string, lastResult network.LoadBalancerListResult) (result network.LoadBalancerListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "LBListNextResults") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azLoadBalancersClient.ListNextResults(%q): start", lastResult) defer func() { glog.V(10).Infof("azLoadBalancersClient.ListNextResults(%q): end", lastResult) @@ -359,8 +447,9 @@ func (az *azLoadBalancersClient) ListNextResults(resourceGroupName string, lastR // azPublicIPAddressesClient implements PublicIPAddressesClient. type azPublicIPAddressesClient struct { - client network.PublicIPAddressesClient - rateLimiter flowcontrol.RateLimiter + client network.PublicIPAddressesClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzPublicIPAddressesClient(config *azClientConfig) *azPublicIPAddressesClient { @@ -371,13 +460,23 @@ func newAzPublicIPAddressesClient(config *azClientConfig) *azPublicIPAddressesCl configureUserAgent(&publicIPAddressClient.Client) return &azPublicIPAddressesClient{ - rateLimiter: config.rateLimiter, - client: publicIPAddressClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, + client: publicIPAddressClient, } } func (az *azPublicIPAddressesClient) CreateOrUpdate(resourceGroupName string, publicIPAddressName string, parameters network.PublicIPAddress, cancel <-chan struct{}) (<-chan network.PublicIPAddress, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "PublicIPCreateOrUpdate") + errChan := make(chan error, 1) + resultChan := make(chan network.PublicIPAddress, 1) + errChan <- err + resultChan <- network.PublicIPAddress{} + return resultChan, errChan + } + glog.V(10).Infof("azPublicIPAddressesClient.CreateOrUpdate(%q,%q): start", resourceGroupName, publicIPAddressName) defer func() { glog.V(10).Infof("azPublicIPAddressesClient.CreateOrUpdate(%q,%q): end", resourceGroupName, publicIPAddressName) @@ -393,7 +492,16 @@ func (az *azPublicIPAddressesClient) CreateOrUpdate(resourceGroupName string, pu } func (az *azPublicIPAddressesClient) Delete(resourceGroupName string, publicIPAddressName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "PublicIPDelete") + errChan := make(chan error, 1) + resultChan := make(chan autorest.Response, 1) + errChan <- err + resultChan <- autorest.Response{} + return resultChan, errChan + } + glog.V(10).Infof("azPublicIPAddressesClient.Delete(%q,%q): start", resourceGroupName, publicIPAddressName) defer func() { glog.V(10).Infof("azPublicIPAddressesClient.Delete(%q,%q): end", resourceGroupName, publicIPAddressName) @@ -409,7 +517,11 @@ func (az *azPublicIPAddressesClient) Delete(resourceGroupName string, publicIPAd } func (az *azPublicIPAddressesClient) Get(resourceGroupName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "PublicIPGet") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azPublicIPAddressesClient.Get(%q,%q): start", resourceGroupName, publicIPAddressName) defer func() { glog.V(10).Infof("azPublicIPAddressesClient.Get(%q,%q): end", resourceGroupName, publicIPAddressName) @@ -422,7 +534,11 @@ func (az *azPublicIPAddressesClient) Get(resourceGroupName string, publicIPAddre } func (az *azPublicIPAddressesClient) List(resourceGroupName string) (result network.PublicIPAddressListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "PublicIPList") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azPublicIPAddressesClient.List(%q): start", resourceGroupName) defer func() { glog.V(10).Infof("azPublicIPAddressesClient.List(%q): end", resourceGroupName) @@ -435,7 +551,11 @@ func (az *azPublicIPAddressesClient) List(resourceGroupName string) (result netw } func (az *azPublicIPAddressesClient) ListNextResults(resourceGroupName string, lastResults network.PublicIPAddressListResult) (result network.PublicIPAddressListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "PublicIPListNextResults") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azPublicIPAddressesClient.ListNextResults(%q): start", lastResults) defer func() { glog.V(10).Infof("azPublicIPAddressesClient.ListNextResults(%q): end", lastResults) @@ -449,8 +569,9 @@ func (az *azPublicIPAddressesClient) ListNextResults(resourceGroupName string, l // azSubnetsClient implements SubnetsClient. type azSubnetsClient struct { - client network.SubnetsClient - rateLimiter flowcontrol.RateLimiter + client network.SubnetsClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzSubnetsClient(config *azClientConfig) *azSubnetsClient { @@ -461,13 +582,23 @@ func newAzSubnetsClient(config *azClientConfig) *azSubnetsClient { configureUserAgent(&subnetsClient.Client) return &azSubnetsClient{ - client: subnetsClient, - rateLimiter: config.rateLimiter, + client: subnetsClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, } } func (az *azSubnetsClient) CreateOrUpdate(resourceGroupName string, virtualNetworkName string, subnetName string, subnetParameters network.Subnet, cancel <-chan struct{}) (<-chan network.Subnet, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "SubnetCreateOrUpdate") + errChan := make(chan error, 1) + resultChan := make(chan network.Subnet, 1) + errChan <- err + resultChan <- network.Subnet{} + return resultChan, errChan + } + glog.V(10).Infof("azSubnetsClient.CreateOrUpdate(%q,%q,%q): start", resourceGroupName, virtualNetworkName, subnetName) defer func() { glog.V(10).Infof("azSubnetsClient.CreateOrUpdate(%q,%q,%q): end", resourceGroupName, virtualNetworkName, subnetName) @@ -483,7 +614,16 @@ func (az *azSubnetsClient) CreateOrUpdate(resourceGroupName string, virtualNetwo } func (az *azSubnetsClient) Delete(resourceGroupName string, virtualNetworkName string, subnetName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "SubnetDelete") + errChan := make(chan error, 1) + resultChan := make(chan autorest.Response, 1) + errChan <- err + resultChan <- autorest.Response{} + return resultChan, errChan + } + glog.V(10).Infof("azSubnetsClient.Delete(%q,%q,%q): start", resourceGroupName, virtualNetworkName, subnetName) defer func() { glog.V(10).Infof("azSubnetsClient.Delete(%q,%q,%q): end", resourceGroupName, virtualNetworkName, subnetName) @@ -499,7 +639,11 @@ func (az *azSubnetsClient) Delete(resourceGroupName string, virtualNetworkName s } func (az *azSubnetsClient) Get(resourceGroupName string, virtualNetworkName string, subnetName string, expand string) (result network.Subnet, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "SubnetGet") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azSubnetsClient.Get(%q,%q,%q): start", resourceGroupName, virtualNetworkName, subnetName) defer func() { glog.V(10).Infof("azSubnetsClient.Get(%q,%q,%q): end", resourceGroupName, virtualNetworkName, subnetName) @@ -512,7 +656,11 @@ func (az *azSubnetsClient) Get(resourceGroupName string, virtualNetworkName stri } func (az *azSubnetsClient) List(resourceGroupName string, virtualNetworkName string) (result network.SubnetListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "SubnetList") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azSubnetsClient.List(%q,%q): start", resourceGroupName, virtualNetworkName) defer func() { glog.V(10).Infof("azSubnetsClient.List(%q,%q): end", resourceGroupName, virtualNetworkName) @@ -526,8 +674,9 @@ func (az *azSubnetsClient) List(resourceGroupName string, virtualNetworkName str // azSecurityGroupsClient implements SecurityGroupsClient. type azSecurityGroupsClient struct { - client network.SecurityGroupsClient - rateLimiter flowcontrol.RateLimiter + client network.SecurityGroupsClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzSecurityGroupsClient(config *azClientConfig) *azSecurityGroupsClient { @@ -538,13 +687,23 @@ func newAzSecurityGroupsClient(config *azClientConfig) *azSecurityGroupsClient { configureUserAgent(&securityGroupsClient.Client) return &azSecurityGroupsClient{ - rateLimiter: config.rateLimiter, - client: securityGroupsClient, + client: securityGroupsClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, } } func (az *azSecurityGroupsClient) CreateOrUpdate(resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, cancel <-chan struct{}) (<-chan network.SecurityGroup, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "NSGCreateOrUpdate") + errChan := make(chan error, 1) + resultChan := make(chan network.SecurityGroup, 1) + errChan <- err + resultChan <- network.SecurityGroup{} + return resultChan, errChan + } + glog.V(10).Infof("azSecurityGroupsClient.CreateOrUpdate(%q,%q): start", resourceGroupName, networkSecurityGroupName) defer func() { glog.V(10).Infof("azSecurityGroupsClient.CreateOrUpdate(%q,%q): end", resourceGroupName, networkSecurityGroupName) @@ -560,7 +719,16 @@ func (az *azSecurityGroupsClient) CreateOrUpdate(resourceGroupName string, netwo } func (az *azSecurityGroupsClient) Delete(resourceGroupName string, networkSecurityGroupName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "NSGDelete") + errChan := make(chan error, 1) + resultChan := make(chan autorest.Response, 1) + errChan <- err + resultChan <- autorest.Response{} + return resultChan, errChan + } + glog.V(10).Infof("azSecurityGroupsClient.Delete(%q,%q): start", resourceGroupName, networkSecurityGroupName) defer func() { glog.V(10).Infof("azSecurityGroupsClient.Delete(%q,%q): end", resourceGroupName, networkSecurityGroupName) @@ -576,7 +744,11 @@ func (az *azSecurityGroupsClient) Delete(resourceGroupName string, networkSecuri } func (az *azSecurityGroupsClient) Get(resourceGroupName string, networkSecurityGroupName string, expand string) (result network.SecurityGroup, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "NSGGet") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azSecurityGroupsClient.Get(%q,%q): start", resourceGroupName, networkSecurityGroupName) defer func() { glog.V(10).Infof("azSecurityGroupsClient.Get(%q,%q): end", resourceGroupName, networkSecurityGroupName) @@ -589,7 +761,11 @@ func (az *azSecurityGroupsClient) Get(resourceGroupName string, networkSecurityG } func (az *azSecurityGroupsClient) List(resourceGroupName string) (result network.SecurityGroupListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "NSGList") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azSecurityGroupsClient.List(%q): start", resourceGroupName) defer func() { glog.V(10).Infof("azSecurityGroupsClient.List(%q): end", resourceGroupName) @@ -603,8 +779,9 @@ func (az *azSecurityGroupsClient) List(resourceGroupName string) (result network // azVirtualMachineScaleSetsClient implements VirtualMachineScaleSetsClient. type azVirtualMachineScaleSetsClient struct { - client compute.VirtualMachineScaleSetsClient - rateLimiter flowcontrol.RateLimiter + client compute.VirtualMachineScaleSetsClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzVirtualMachineScaleSetsClient(config *azClientConfig) *azVirtualMachineScaleSetsClient { @@ -615,13 +792,23 @@ func newAzVirtualMachineScaleSetsClient(config *azClientConfig) *azVirtualMachin configureUserAgent(&virtualMachineScaleSetsClient.Client) return &azVirtualMachineScaleSetsClient{ - client: virtualMachineScaleSetsClient, - rateLimiter: config.rateLimiter, + client: virtualMachineScaleSetsClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, } } func (az *azVirtualMachineScaleSetsClient) CreateOrUpdate(resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet, cancel <-chan struct{}) (<-chan compute.VirtualMachineScaleSet, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "VMSSCreateOrUpdate") + errChan := make(chan error, 1) + resultChan := make(chan compute.VirtualMachineScaleSet, 1) + errChan <- err + resultChan <- compute.VirtualMachineScaleSet{} + return resultChan, errChan + } + glog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): start", resourceGroupName, VMScaleSetName) defer func() { glog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): end", resourceGroupName, VMScaleSetName) @@ -637,7 +824,11 @@ func (az *azVirtualMachineScaleSetsClient) CreateOrUpdate(resourceGroupName stri } func (az *azVirtualMachineScaleSetsClient) Get(resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "VMSSGet") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azVirtualMachineScaleSetsClient.Get(%q,%q): start", resourceGroupName, VMScaleSetName) defer func() { glog.V(10).Infof("azVirtualMachineScaleSetsClient.Get(%q,%q): end", resourceGroupName, VMScaleSetName) @@ -650,7 +841,11 @@ func (az *azVirtualMachineScaleSetsClient) Get(resourceGroupName string, VMScale } func (az *azVirtualMachineScaleSetsClient) List(resourceGroupName string) (result compute.VirtualMachineScaleSetListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "VMSSList") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azVirtualMachineScaleSetsClient.List(%q,%q): start", resourceGroupName) defer func() { glog.V(10).Infof("azVirtualMachineScaleSetsClient.List(%q,%q): end", resourceGroupName) @@ -663,7 +858,11 @@ func (az *azVirtualMachineScaleSetsClient) List(resourceGroupName string) (resul } func (az *azVirtualMachineScaleSetsClient) ListNextResults(resourceGroupName string, lastResults compute.VirtualMachineScaleSetListResult) (result compute.VirtualMachineScaleSetListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "VMSSListNextResults") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azVirtualMachineScaleSetsClient.ListNextResults(%q): start", lastResults) defer func() { glog.V(10).Infof("azVirtualMachineScaleSetsClient.ListNextResults(%q): end", lastResults) @@ -676,7 +875,16 @@ func (az *azVirtualMachineScaleSetsClient) ListNextResults(resourceGroupName str } func (az *azVirtualMachineScaleSetsClient) UpdateInstances(resourceGroupName string, VMScaleSetName string, VMInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs, cancel <-chan struct{}) (<-chan compute.OperationStatusResponse, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "VMSSUpdateInstances") + errChan := make(chan error, 1) + resultChan := make(chan compute.OperationStatusResponse, 1) + errChan <- err + resultChan <- compute.OperationStatusResponse{} + return resultChan, errChan + } + glog.V(10).Infof("azVirtualMachineScaleSetsClient.UpdateInstances(%q,%q,%q): start", resourceGroupName, VMScaleSetName, VMInstanceIDs) defer func() { glog.V(10).Infof("azVirtualMachineScaleSetsClient.UpdateInstances(%q,%q,%q): end", resourceGroupName, VMScaleSetName, VMInstanceIDs) @@ -693,8 +901,9 @@ func (az *azVirtualMachineScaleSetsClient) UpdateInstances(resourceGroupName str // azVirtualMachineScaleSetVMsClient implements VirtualMachineScaleSetVMsClient. type azVirtualMachineScaleSetVMsClient struct { - client compute.VirtualMachineScaleSetVMsClient - rateLimiter flowcontrol.RateLimiter + client compute.VirtualMachineScaleSetVMsClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzVirtualMachineScaleSetVMsClient(config *azClientConfig) *azVirtualMachineScaleSetVMsClient { @@ -705,13 +914,18 @@ func newAzVirtualMachineScaleSetVMsClient(config *azClientConfig) *azVirtualMach configureUserAgent(&virtualMachineScaleSetVMsClient.Client) return &azVirtualMachineScaleSetVMsClient{ - client: virtualMachineScaleSetVMsClient, - rateLimiter: config.rateLimiter, + client: virtualMachineScaleSetVMsClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, } } func (az *azVirtualMachineScaleSetVMsClient) Get(resourceGroupName string, VMScaleSetName string, instanceID string) (result compute.VirtualMachineScaleSetVM, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "VMSSGet") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Get(%q,%q,%q): start", resourceGroupName, VMScaleSetName, instanceID) defer func() { glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Get(%q,%q,%q): end", resourceGroupName, VMScaleSetName, instanceID) @@ -724,7 +938,11 @@ func (az *azVirtualMachineScaleSetVMsClient) Get(resourceGroupName string, VMSca } func (az *azVirtualMachineScaleSetVMsClient) GetInstanceView(resourceGroupName string, VMScaleSetName string, instanceID string) (result compute.VirtualMachineScaleSetVMInstanceView, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "VMSSGetInstanceView") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.GetInstanceView(%q,%q,%q): start", resourceGroupName, VMScaleSetName, instanceID) defer func() { glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.GetInstanceView(%q,%q,%q): end", resourceGroupName, VMScaleSetName, instanceID) @@ -737,7 +955,11 @@ func (az *azVirtualMachineScaleSetVMsClient) GetInstanceView(resourceGroupName s } func (az *azVirtualMachineScaleSetVMsClient) List(resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result compute.VirtualMachineScaleSetVMListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "VMSSList") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.List(%q,%q,%q): start", resourceGroupName, virtualMachineScaleSetName, filter) defer func() { glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.List(%q,%q,%q): end", resourceGroupName, virtualMachineScaleSetName, filter) @@ -750,7 +972,11 @@ func (az *azVirtualMachineScaleSetVMsClient) List(resourceGroupName string, virt } func (az *azVirtualMachineScaleSetVMsClient) ListNextResults(resourceGroupName string, lastResults compute.VirtualMachineScaleSetVMListResult) (result compute.VirtualMachineScaleSetVMListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "VMSSListNextResults") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.ListNextResults(%q,%q,%q): start", lastResults) defer func() { glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.ListNextResults(%q,%q,%q): end", lastResults) @@ -764,8 +990,9 @@ func (az *azVirtualMachineScaleSetVMsClient) ListNextResults(resourceGroupName s // azRoutesClient implements RoutesClient. type azRoutesClient struct { - client network.RoutesClient - rateLimiter flowcontrol.RateLimiter + client network.RoutesClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzRoutesClient(config *azClientConfig) *azRoutesClient { @@ -776,13 +1003,23 @@ func newAzRoutesClient(config *azClientConfig) *azRoutesClient { configureUserAgent(&routesClient.Client) return &azRoutesClient{ - client: routesClient, - rateLimiter: config.rateLimiter, + client: routesClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, } } func (az *azRoutesClient) CreateOrUpdate(resourceGroupName string, routeTableName string, routeName string, routeParameters network.Route, cancel <-chan struct{}) (<-chan network.Route, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "RouteCreateOrUpdate") + errChan := make(chan error, 1) + resultChan := make(chan network.Route, 1) + errChan <- err + resultChan <- network.Route{} + return resultChan, errChan + } + glog.V(10).Infof("azRoutesClient.CreateOrUpdate(%q,%q,%q): start", resourceGroupName, routeTableName, routeName) defer func() { glog.V(10).Infof("azRoutesClient.CreateOrUpdate(%q,%q,%q): end", resourceGroupName, routeTableName, routeName) @@ -798,7 +1035,16 @@ func (az *azRoutesClient) CreateOrUpdate(resourceGroupName string, routeTableNam } func (az *azRoutesClient) Delete(resourceGroupName string, routeTableName string, routeName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "RouteDelete") + errChan := make(chan error, 1) + resultChan := make(chan autorest.Response, 1) + errChan <- err + resultChan <- autorest.Response{} + return resultChan, errChan + } + glog.V(10).Infof("azRoutesClient.Delete(%q,%q,%q): start", resourceGroupName, routeTableName, routeName) defer func() { glog.V(10).Infof("azRoutesClient.Delete(%q,%q,%q): end", resourceGroupName, routeTableName, routeName) @@ -815,8 +1061,9 @@ func (az *azRoutesClient) Delete(resourceGroupName string, routeTableName string // azRouteTablesClient implements RouteTablesClient. type azRouteTablesClient struct { - client network.RouteTablesClient - rateLimiter flowcontrol.RateLimiter + client network.RouteTablesClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzRouteTablesClient(config *azClientConfig) *azRouteTablesClient { @@ -827,13 +1074,23 @@ func newAzRouteTablesClient(config *azClientConfig) *azRouteTablesClient { configureUserAgent(&routeTablesClient.Client) return &azRouteTablesClient{ - client: routeTablesClient, - rateLimiter: config.rateLimiter, + client: routeTablesClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, } } func (az *azRouteTablesClient) CreateOrUpdate(resourceGroupName string, routeTableName string, parameters network.RouteTable, cancel <-chan struct{}) (<-chan network.RouteTable, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "RouteTableCreateOrUpdate") + errChan := make(chan error, 1) + resultChan := make(chan network.RouteTable, 1) + errChan <- err + resultChan <- network.RouteTable{} + return resultChan, errChan + } + glog.V(10).Infof("azRouteTablesClient.CreateOrUpdate(%q,%q): start", resourceGroupName, routeTableName) defer func() { glog.V(10).Infof("azRouteTablesClient.CreateOrUpdate(%q,%q): end", resourceGroupName, routeTableName) @@ -849,7 +1106,11 @@ func (az *azRouteTablesClient) CreateOrUpdate(resourceGroupName string, routeTab } func (az *azRouteTablesClient) Get(resourceGroupName string, routeTableName string, expand string) (result network.RouteTable, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "GetRouteTable") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azRouteTablesClient.Get(%q,%q): start", resourceGroupName, routeTableName) defer func() { glog.V(10).Infof("azRouteTablesClient.Get(%q,%q): end", resourceGroupName, routeTableName) @@ -863,8 +1124,9 @@ func (az *azRouteTablesClient) Get(resourceGroupName string, routeTableName stri // azStorageAccountClient implements StorageAccountClient. type azStorageAccountClient struct { - client storage.AccountsClient - rateLimiter flowcontrol.RateLimiter + client storage.AccountsClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzStorageAccountClient(config *azClientConfig) *azStorageAccountClient { @@ -874,13 +1136,23 @@ func newAzStorageAccountClient(config *azClientConfig) *azStorageAccountClient { configureUserAgent(&storageAccountClient.Client) return &azStorageAccountClient{ - client: storageAccountClient, - rateLimiter: config.rateLimiter, + client: storageAccountClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, } } func (az *azStorageAccountClient) Create(resourceGroupName string, accountName string, parameters storage.AccountCreateParameters, cancel <-chan struct{}) (<-chan storage.Account, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "StorageAccountCreate") + errChan := make(chan error, 1) + resultChan := make(chan storage.Account, 1) + errChan <- err + resultChan <- storage.Account{} + return resultChan, errChan + } + glog.V(10).Infof("azStorageAccountClient.Create(%q,%q): start", resourceGroupName, accountName) defer func() { glog.V(10).Infof("azStorageAccountClient.Create(%q,%q): end", resourceGroupName, accountName) @@ -896,7 +1168,11 @@ func (az *azStorageAccountClient) Create(resourceGroupName string, accountName s } func (az *azStorageAccountClient) Delete(resourceGroupName string, accountName string) (result autorest.Response, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "DeleteStorageAccount") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azStorageAccountClient.Delete(%q,%q): start", resourceGroupName, accountName) defer func() { glog.V(10).Infof("azStorageAccountClient.Delete(%q,%q): end", resourceGroupName, accountName) @@ -909,7 +1185,11 @@ func (az *azStorageAccountClient) Delete(resourceGroupName string, accountName s } func (az *azStorageAccountClient) ListKeys(resourceGroupName string, accountName string) (result storage.AccountListKeysResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "ListStorageAccountKeys") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azStorageAccountClient.ListKeys(%q,%q): start", resourceGroupName, accountName) defer func() { glog.V(10).Infof("azStorageAccountClient.ListKeys(%q,%q): end", resourceGroupName, accountName) @@ -922,7 +1202,11 @@ func (az *azStorageAccountClient) ListKeys(resourceGroupName string, accountName } func (az *azStorageAccountClient) ListByResourceGroup(resourceGroupName string) (result storage.AccountListResult, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "ListStorageAccountsByResourceGroup") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azStorageAccountClient.ListByResourceGroup(%q): start", resourceGroupName) defer func() { glog.V(10).Infof("azStorageAccountClient.ListByResourceGroup(%q): end", resourceGroupName) @@ -935,7 +1219,11 @@ func (az *azStorageAccountClient) ListByResourceGroup(resourceGroupName string) } func (az *azStorageAccountClient) GetProperties(resourceGroupName string, accountName string) (result storage.Account, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "GetStorageAccount/Properties") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azStorageAccountClient.GetProperties(%q,%q): start", resourceGroupName, accountName) defer func() { glog.V(10).Infof("azStorageAccountClient.GetProperties(%q,%q): end", resourceGroupName, accountName) @@ -949,8 +1237,9 @@ func (az *azStorageAccountClient) GetProperties(resourceGroupName string, accoun // azDisksClient implements DisksClient. type azDisksClient struct { - client disk.DisksClient - rateLimiter flowcontrol.RateLimiter + client disk.DisksClient + rateLimiterReader flowcontrol.RateLimiter + rateLimiterWriter flowcontrol.RateLimiter } func newAzDisksClient(config *azClientConfig) *azDisksClient { @@ -960,13 +1249,24 @@ func newAzDisksClient(config *azClientConfig) *azDisksClient { configureUserAgent(&disksClient.Client) return &azDisksClient{ - client: disksClient, - rateLimiter: config.rateLimiter, + client: disksClient, + rateLimiterReader: config.rateLimiterReader, + rateLimiterWriter: config.rateLimiterWriter, } } func (az *azDisksClient) CreateOrUpdate(resourceGroupName string, diskName string, diskParameter disk.Model, cancel <-chan struct{}) (<-chan disk.Model, <-chan error) { - az.rateLimiter.Accept() + + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "DiskCreateOrUpdate") + errChan := make(chan error, 1) + resultChan := make(chan disk.Model, 1) + errChan <- err + resultChan <- disk.Model{} + return resultChan, errChan + } + glog.V(10).Infof("azDisksClient.CreateOrUpdate(%q,%q): start", resourceGroupName, diskName) defer func() { glog.V(10).Infof("azDisksClient.CreateOrUpdate(%q,%q): end", resourceGroupName, diskName) @@ -982,7 +1282,16 @@ func (az *azDisksClient) CreateOrUpdate(resourceGroupName string, diskName strin } func (az *azDisksClient) Delete(resourceGroupName string, diskName string, cancel <-chan struct{}) (<-chan disk.OperationStatusResponse, <-chan error) { - az.rateLimiter.Accept() + /* Write rate limiting */ + if !az.rateLimiterWriter.TryAccept() { + err := createArmRateLimitErr(true, "DiskDelete") + errChan := make(chan error, 1) + resultChan := make(chan disk.OperationStatusResponse, 1) + errChan <- err + resultChan <- disk.OperationStatusResponse{} + return resultChan, errChan + } + glog.V(10).Infof("azDisksClient.Delete(%q,%q): start", resourceGroupName, diskName) defer func() { glog.V(10).Infof("azDisksClient.Delete(%q,%q): end", resourceGroupName, diskName) @@ -998,7 +1307,11 @@ func (az *azDisksClient) Delete(resourceGroupName string, diskName string, cance } func (az *azDisksClient) Get(resourceGroupName string, diskName string) (result disk.Model, err error) { - az.rateLimiter.Accept() + err = createArmRateLimitErr(false, "GetDisk") + if !az.rateLimiterReader.TryAccept() { + return + } + glog.V(10).Infof("azDisksClient.Get(%q,%q): start", resourceGroupName, diskName) defer func() { glog.V(10).Infof("azDisksClient.Get(%q,%q): end", resourceGroupName, diskName)