mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
GCE provider: Rate limit all API calls
Instead of just rate limits to operation polling, send all API calls through a rate limited RoundTripper. This isn't a perfect solution, since the QPS is obviously getting split between different controllers, etc., but it's also spread across different APIs, which, in practice, rate limit differently. Fixes #26119 (hopefully)
This commit is contained in:
parent
bf0a5e9fac
commit
9b5cdfb705
@ -76,16 +76,15 @@ const (
|
||||
|
||||
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
|
||||
type GCECloud struct {
|
||||
service *compute.Service
|
||||
containerService *container.Service
|
||||
projectID string
|
||||
region string
|
||||
localZone string // The zone in which we are running
|
||||
managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master)
|
||||
networkURL string
|
||||
nodeTags []string // List of tags to use on firewall rules for load balancers
|
||||
useMetadataServer bool
|
||||
operationPollRateLimiter flowcontrol.RateLimiter
|
||||
service *compute.Service
|
||||
containerService *container.Service
|
||||
projectID string
|
||||
region string
|
||||
localZone string // The zone in which we are running
|
||||
managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master)
|
||||
networkURL string
|
||||
nodeTags []string // List of tags to use on firewall rules for load balancers
|
||||
useMetadataServer bool
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
@ -113,6 +112,16 @@ func (g *GCECloud) GetComputeService() *compute.Service {
|
||||
return g.service
|
||||
}
|
||||
|
||||
type rateLimitedRoundTripper struct {
|
||||
rt http.RoundTripper
|
||||
limiter flowcontrol.RateLimiter
|
||||
}
|
||||
|
||||
func (rl *rateLimitedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
rl.limiter.Accept()
|
||||
return rl.rt.RoundTrip(req)
|
||||
}
|
||||
|
||||
func getProjectAndZone() (string, string, error) {
|
||||
result, err := metadata.Get("instance/zone")
|
||||
if err != nil {
|
||||
@ -283,6 +292,11 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
|
||||
}
|
||||
|
||||
client := oauth2.NewClient(oauth2.NoContext, tokenSource)
|
||||
// Override the transport to make it rate-limited.
|
||||
client.Transport = &rateLimitedRoundTripper{
|
||||
rt: client.Transport,
|
||||
limiter: flowcontrol.NewTokenBucketRateLimiter(10, 100), // 10 qps, 100 bucket size.
|
||||
}
|
||||
svc, err := compute.New(client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -311,19 +325,16 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
|
||||
glog.Infof("managing multiple zones: %v", managedZones)
|
||||
}
|
||||
|
||||
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.
|
||||
|
||||
return &GCECloud{
|
||||
service: svc,
|
||||
containerService: containerSvc,
|
||||
projectID: projectID,
|
||||
region: region,
|
||||
localZone: zone,
|
||||
managedZones: managedZones,
|
||||
networkURL: networkURL,
|
||||
nodeTags: nodeTags,
|
||||
useMetadataServer: useMetadataServer,
|
||||
operationPollRateLimiter: operationPollRateLimiter,
|
||||
service: svc,
|
||||
containerService: containerSvc,
|
||||
projectID: projectID,
|
||||
region: region,
|
||||
localZone: zone,
|
||||
managedZones: managedZones,
|
||||
networkURL: networkURL,
|
||||
nodeTags: nodeTags,
|
||||
useMetadataServer: useMetadataServer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -404,7 +415,6 @@ func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operatio
|
||||
opName := op.Name
|
||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||
start := time.Now()
|
||||
gce.operationPollRateLimiter.Accept()
|
||||
duration := time.Now().Sub(start)
|
||||
if duration > 5*time.Second {
|
||||
glog.Infof("pollOperation: waited %v for %v", duration, opName)
|
||||
|
@ -17,11 +17,15 @@ limitations under the License.
|
||||
package gce
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||
"k8s.io/kubernetes/pkg/util/rand"
|
||||
utiltesting "k8s.io/kubernetes/pkg/util/testing"
|
||||
)
|
||||
|
||||
func TestGetRegion(t *testing.T) {
|
||||
@ -260,3 +264,31 @@ func TestComputeUpdate(t *testing.T) {
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
func TestRateLimitedRoundTripper(t *testing.T) {
|
||||
handler := utiltesting.FakeHandler{StatusCode: 200}
|
||||
server := httptest.NewServer(&handler)
|
||||
defer server.Close()
|
||||
|
||||
method := "GET"
|
||||
path := "/foo/bar"
|
||||
|
||||
req, err := http.NewRequest(method, server.URL+path, nil)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// TODO(zmerlynn): Validate the rate limiter is actually getting called.
|
||||
client := http.Client{
|
||||
Transport: &rateLimitedRoundTripper{
|
||||
rt: http.DefaultTransport,
|
||||
limiter: flowcontrol.NewFakeAlwaysRateLimiter(),
|
||||
},
|
||||
}
|
||||
_, err = client.Do(req)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
handler.ValidateRequest(t, path, method, nil)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user