From 9b5cdfb705a9ed53f5bb376133a17e6b5c051311 Mon Sep 17 00:00:00 2001 From: Zach Loafman Date: Mon, 23 May 2016 23:23:53 -0700 Subject: [PATCH] GCE provider: Rate limit all API calls Instead of just rate limits to operation polling, send all API calls through a rate limited RoundTripper. This isn't a perfect solution, since the QPS is obviously getting split between different controllers, etc., but it's also spread across different APIs, which, in practice, rate limit differently. Fixes #26119 (hopefully) --- pkg/cloudprovider/providers/gce/gce.go | 56 ++++++++++++--------- pkg/cloudprovider/providers/gce/gce_test.go | 32 ++++++++++++ 2 files changed, 65 insertions(+), 23 deletions(-) diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 20505d7098e..87d69c11748 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -76,16 +76,15 @@ const ( // GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine. type GCECloud struct { - service *compute.Service - containerService *container.Service - projectID string - region string - localZone string // The zone in which we are running - managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master) - networkURL string - nodeTags []string // List of tags to use on firewall rules for load balancers - useMetadataServer bool - operationPollRateLimiter flowcontrol.RateLimiter + service *compute.Service + containerService *container.Service + projectID string + region string + localZone string // The zone in which we are running + managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master) + networkURL string + nodeTags []string // List of tags to use on firewall rules for load balancers + useMetadataServer bool } type Config struct { @@ -113,6 +112,16 @@ func (g *GCECloud) GetComputeService() *compute.Service { return g.service } +type rateLimitedRoundTripper struct { + rt http.RoundTripper + limiter flowcontrol.RateLimiter +} + +func (rl *rateLimitedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + rl.limiter.Accept() + return rl.rt.RoundTrip(req) +} + func getProjectAndZone() (string, string, error) { result, err := metadata.Get("instance/zone") if err != nil { @@ -283,6 +292,11 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo } client := oauth2.NewClient(oauth2.NoContext, tokenSource) + // Override the transport to make it rate-limited. + client.Transport = &rateLimitedRoundTripper{ + rt: client.Transport, + limiter: flowcontrol.NewTokenBucketRateLimiter(10, 100), // 10 qps, 100 bucket size. + } svc, err := compute.New(client) if err != nil { return nil, err @@ -311,19 +325,16 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo glog.Infof("managing multiple zones: %v", managedZones) } - operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size. - return &GCECloud{ - service: svc, - containerService: containerSvc, - projectID: projectID, - region: region, - localZone: zone, - managedZones: managedZones, - networkURL: networkURL, - nodeTags: nodeTags, - useMetadataServer: useMetadataServer, - operationPollRateLimiter: operationPollRateLimiter, + service: svc, + containerService: containerSvc, + projectID: projectID, + region: region, + localZone: zone, + managedZones: managedZones, + networkURL: networkURL, + nodeTags: nodeTags, + useMetadataServer: useMetadataServer, }, nil } @@ -404,7 +415,6 @@ func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operatio opName := op.Name return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) { start := time.Now() - gce.operationPollRateLimiter.Accept() duration := time.Now().Sub(start) if duration > 5*time.Second { glog.Infof("pollOperation: waited %v for %v", duration, opName) diff --git a/pkg/cloudprovider/providers/gce/gce_test.go b/pkg/cloudprovider/providers/gce/gce_test.go index f9b980c9a48..633025e681e 100644 --- a/pkg/cloudprovider/providers/gce/gce_test.go +++ b/pkg/cloudprovider/providers/gce/gce_test.go @@ -17,11 +17,15 @@ limitations under the License. package gce import ( + "net/http" + "net/http/httptest" "reflect" "testing" compute "google.golang.org/api/compute/v1" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/rand" + utiltesting "k8s.io/kubernetes/pkg/util/testing" ) func TestGetRegion(t *testing.T) { @@ -260,3 +264,31 @@ func TestComputeUpdate(t *testing.T) { // } } } + +func TestRateLimitedRoundTripper(t *testing.T) { + handler := utiltesting.FakeHandler{StatusCode: 200} + server := httptest.NewServer(&handler) + defer server.Close() + + method := "GET" + path := "/foo/bar" + + req, err := http.NewRequest(method, server.URL+path, nil) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + // TODO(zmerlynn): Validate the rate limiter is actually getting called. + client := http.Client{ + Transport: &rateLimitedRoundTripper{ + rt: http.DefaultTransport, + limiter: flowcontrol.NewFakeAlwaysRateLimiter(), + }, + } + _, err = client.Do(req) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + handler.ValidateRequest(t, path, method, nil) +}