mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #26306 from zmerlynn/revert_rate_limits
Automatic merge from submit-queue GCE provider: Revert rate limits []() This reverts #26140 and #26170. After testing with #26263, #26140 is unnecessary, and we need to be able to prioritize normal GET / POST requests over operation polling requests, which is what the pre-#26140 requests do. c.f. #26119
This commit is contained in:
commit
d08b14efcf
@ -85,6 +85,7 @@ type GCECloud struct {
|
|||||||
networkURL string
|
networkURL string
|
||||||
nodeTags []string // List of tags to use on firewall rules for load balancers
|
nodeTags []string // List of tags to use on firewall rules for load balancers
|
||||||
useMetadataServer bool
|
useMetadataServer bool
|
||||||
|
operationPollRateLimiter flowcontrol.RateLimiter
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
@ -112,19 +113,6 @@ func (g *GCECloud) GetComputeService() *compute.Service {
|
|||||||
return g.service
|
return g.service
|
||||||
}
|
}
|
||||||
|
|
||||||
type rateLimitedRoundTripper struct {
|
|
||||||
rt http.RoundTripper
|
|
||||||
limiter flowcontrol.RateLimiter
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rl *rateLimitedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
||||||
startTime := time.Now()
|
|
||||||
rl.limiter.Accept()
|
|
||||||
// TODO: Reduce verbosity once #26119 is fixed.
|
|
||||||
glog.V(0).Infof("GCE api call: %s %s (throttled for %v)", req.Method, req.URL.String(), time.Now().Sub(startTime))
|
|
||||||
return rl.rt.RoundTrip(req)
|
|
||||||
}
|
|
||||||
|
|
||||||
func getProjectAndZone() (string, string, error) {
|
func getProjectAndZone() (string, string, error) {
|
||||||
result, err := metadata.Get("instance/zone")
|
result, err := metadata.Get("instance/zone")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -295,11 +283,6 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
|
|||||||
}
|
}
|
||||||
|
|
||||||
client := oauth2.NewClient(oauth2.NoContext, tokenSource)
|
client := oauth2.NewClient(oauth2.NoContext, tokenSource)
|
||||||
// Override the transport to make it rate-limited.
|
|
||||||
client.Transport = &rateLimitedRoundTripper{
|
|
||||||
rt: client.Transport,
|
|
||||||
limiter: flowcontrol.NewTokenBucketRateLimiter(10, 100), // 10 qps, 100 bucket size.
|
|
||||||
}
|
|
||||||
svc, err := compute.New(client)
|
svc, err := compute.New(client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -328,6 +311,8 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
|
|||||||
glog.Infof("managing multiple zones: %v", managedZones)
|
glog.Infof("managing multiple zones: %v", managedZones)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.
|
||||||
|
|
||||||
return &GCECloud{
|
return &GCECloud{
|
||||||
service: svc,
|
service: svc,
|
||||||
containerService: containerSvc,
|
containerService: containerSvc,
|
||||||
@ -338,6 +323,7 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
|
|||||||
networkURL: networkURL,
|
networkURL: networkURL,
|
||||||
nodeTags: nodeTags,
|
nodeTags: nodeTags,
|
||||||
useMetadataServer: useMetadataServer,
|
useMetadataServer: useMetadataServer,
|
||||||
|
operationPollRateLimiter: operationPollRateLimiter,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -418,6 +404,7 @@ func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operatio
|
|||||||
opName := op.Name
|
opName := op.Name
|
||||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
gce.operationPollRateLimiter.Accept()
|
||||||
duration := time.Now().Sub(start)
|
duration := time.Now().Sub(start)
|
||||||
if duration > 5*time.Second {
|
if duration > 5*time.Second {
|
||||||
glog.Infof("pollOperation: waited %v for %v", duration, opName)
|
glog.Infof("pollOperation: waited %v for %v", duration, opName)
|
||||||
|
@ -17,15 +17,11 @@ limitations under the License.
|
|||||||
package gce
|
package gce
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
compute "google.golang.org/api/compute/v1"
|
compute "google.golang.org/api/compute/v1"
|
||||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
|
||||||
"k8s.io/kubernetes/pkg/util/rand"
|
"k8s.io/kubernetes/pkg/util/rand"
|
||||||
utiltesting "k8s.io/kubernetes/pkg/util/testing"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGetRegion(t *testing.T) {
|
func TestGetRegion(t *testing.T) {
|
||||||
@ -264,31 +260,3 @@ func TestComputeUpdate(t *testing.T) {
|
|||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRateLimitedRoundTripper(t *testing.T) {
|
|
||||||
handler := utiltesting.FakeHandler{StatusCode: 200}
|
|
||||||
server := httptest.NewServer(&handler)
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
method := "GET"
|
|
||||||
path := "/foo/bar"
|
|
||||||
|
|
||||||
req, err := http.NewRequest(method, server.URL+path, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(zmerlynn): Validate the rate limiter is actually getting called.
|
|
||||||
client := http.Client{
|
|
||||||
Transport: &rateLimitedRoundTripper{
|
|
||||||
rt: http.DefaultTransport,
|
|
||||||
limiter: flowcontrol.NewFakeAlwaysRateLimiter(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
_, err = client.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
handler.ValidateRequest(t, path, method, nil)
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user