mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-14 14:23:37 +00:00
Merge pull request #6203 from brendandburns/qps
Add a QPS limiter to the kubernetes client.
This commit is contained in:
commit
88dbdc4a69
@ -208,7 +208,7 @@ func contactOthers(state *State) {
|
|||||||
Host: os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"),
|
Host: os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"),
|
||||||
Path: "/api/v1beta1",
|
Path: "/api/v1beta1",
|
||||||
}
|
}
|
||||||
client := &client.Client{client.NewRESTClient(&masterRO, "v1beta1", latest.Codec, true)}
|
client := &client.Client{client.NewRESTClient(&masterRO, "v1beta1", latest.Codec, true, 0)}
|
||||||
|
|
||||||
// Do this repeatedly, in case there's some propagation delay with getting
|
// Do this repeatedly, in case there's some propagation delay with getting
|
||||||
// newly started pods into the endpoints list.
|
// newly started pods into the endpoints list.
|
||||||
|
@ -508,7 +508,7 @@ __EOF__
|
|||||||
# Pre-condition: frontend replication controller is running
|
# Pre-condition: frontend replication controller is running
|
||||||
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" 'frontend-controller:'
|
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" 'frontend-controller:'
|
||||||
# Command
|
# Command
|
||||||
kubectl delete rc frontend-controller "${kube_flags[@]}"
|
kubectl stop rc frontend-controller "${kube_flags[@]}"
|
||||||
# Post-condition: no replication controller is running
|
# Post-condition: no replication controller is running
|
||||||
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" ''
|
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" ''
|
||||||
|
|
||||||
@ -525,7 +525,7 @@ __EOF__
|
|||||||
# Pre-condition: frontend and redis-slave
|
# Pre-condition: frontend and redis-slave
|
||||||
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" 'frontend-controller:redis-slave-controller:'
|
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" 'frontend-controller:redis-slave-controller:'
|
||||||
# Command
|
# Command
|
||||||
kubectl delete rc frontend-controller redis-slave-controller "${kube_flags[@]}" # delete multiple controllers at once
|
kubectl stop rc frontend-controller redis-slave-controller "${kube_flags[@]}" # delete multiple controllers at once
|
||||||
# Post-condition: no replication controller is running
|
# Post-condition: no replication controller is running
|
||||||
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" ''
|
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" ''
|
||||||
|
|
||||||
|
@ -77,6 +77,9 @@ type Config struct {
|
|||||||
// Transport may be used for custom HTTP behavior. This attribute may not
|
// Transport may be used for custom HTTP behavior. This attribute may not
|
||||||
// be specified with the TLS client certificate options.
|
// be specified with the TLS client certificate options.
|
||||||
Transport http.RoundTripper
|
Transport http.RoundTripper
|
||||||
|
|
||||||
|
// QPS indicates the maximum QPS to the master from this client. If zero, QPS is unlimited.
|
||||||
|
QPS float32
|
||||||
}
|
}
|
||||||
|
|
||||||
type KubeletConfig struct {
|
type KubeletConfig struct {
|
||||||
@ -175,6 +178,9 @@ func SetKubernetesDefaults(config *Config) error {
|
|||||||
config.Codec = versionInterfaces.Codec
|
config.Codec = versionInterfaces.Codec
|
||||||
}
|
}
|
||||||
config.LegacyBehavior = (version == "v1beta1" || version == "v1beta2")
|
config.LegacyBehavior = (version == "v1beta1" || version == "v1beta2")
|
||||||
|
if config.QPS == 0.0 {
|
||||||
|
config.QPS = 5.0
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -195,7 +201,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
client := NewRESTClient(baseURL, config.Version, config.Codec, config.LegacyBehavior)
|
client := NewRESTClient(baseURL, config.Version, config.Codec, config.LegacyBehavior, config.QPS)
|
||||||
|
|
||||||
transport, err := TransportFor(config)
|
transport, err := TransportFor(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -273,6 +273,7 @@ func TestSetKubernetesDefaults(t *testing.T) {
|
|||||||
Version: latest.Version,
|
Version: latest.Version,
|
||||||
Codec: latest.Codec,
|
Codec: latest.Codec,
|
||||||
LegacyBehavior: (latest.Version == "v1beta1" || latest.Version == "v1beta2"),
|
LegacyBehavior: (latest.Version == "v1beta1" || latest.Version == "v1beta2"),
|
||||||
|
QPS: 5,
|
||||||
},
|
},
|
||||||
false,
|
false,
|
||||||
},
|
},
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
|
// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
|
||||||
@ -50,13 +51,16 @@ type RESTClient struct {
|
|||||||
Client HTTPClient
|
Client HTTPClient
|
||||||
|
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
|
|
||||||
|
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
|
||||||
|
Throttle util.RateLimiter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
|
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
|
||||||
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
|
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
|
||||||
// decoding of responses from the server. If this client should use the older, legacy
|
// decoding of responses from the server. If this client should use the older, legacy
|
||||||
// API conventions from Kubernetes API v1beta1 and v1beta2, set legacyBehavior true.
|
// API conventions from Kubernetes API v1beta1 and v1beta2, set legacyBehavior true.
|
||||||
func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyBehavior bool) *RESTClient {
|
func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyBehavior bool, maxQPS float32) *RESTClient {
|
||||||
base := *baseURL
|
base := *baseURL
|
||||||
if !strings.HasSuffix(base.Path, "/") {
|
if !strings.HasSuffix(base.Path, "/") {
|
||||||
base.Path += "/"
|
base.Path += "/"
|
||||||
@ -64,6 +68,10 @@ func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyB
|
|||||||
base.RawQuery = ""
|
base.RawQuery = ""
|
||||||
base.Fragment = ""
|
base.Fragment = ""
|
||||||
|
|
||||||
|
var throttle util.RateLimiter
|
||||||
|
if maxQPS > 0 {
|
||||||
|
throttle = util.NewTokenBucketRateLimiter(maxQPS, 10)
|
||||||
|
}
|
||||||
return &RESTClient{
|
return &RESTClient{
|
||||||
baseURL: &base,
|
baseURL: &base,
|
||||||
apiVersion: apiVersion,
|
apiVersion: apiVersion,
|
||||||
@ -71,6 +79,8 @@ func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyB
|
|||||||
Codec: c,
|
Codec: c,
|
||||||
|
|
||||||
LegacyBehavior: legacyBehavior,
|
LegacyBehavior: legacyBehavior,
|
||||||
|
|
||||||
|
Throttle: throttle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -87,11 +97,9 @@ func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyB
|
|||||||
// list, ok := resp.(*api.PodList)
|
// list, ok := resp.(*api.PodList)
|
||||||
//
|
//
|
||||||
func (c *RESTClient) Verb(verb string) *Request {
|
func (c *RESTClient) Verb(verb string) *Request {
|
||||||
// TODO: uncomment when Go 1.2 support is dropped
|
if c.Throttle != nil {
|
||||||
//var timeout time.Duration = 0
|
c.Throttle.Accept()
|
||||||
// if c.Client != nil {
|
}
|
||||||
// timeout = c.Client.Timeout
|
|
||||||
// }
|
|
||||||
return NewRequest(c.Client, verb, c.baseURL, c.apiVersion, c.Codec, c.LegacyBehavior, c.LegacyBehavior).Timeout(c.Timeout)
|
return NewRequest(c.Client, verb, c.baseURL, c.apiVersion, c.Codec, c.LegacyBehavior, c.LegacyBehavior).Timeout(c.Timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,7 +48,7 @@ func getFakeClient(t *testing.T, validURLs []string) (ClientPosterFunc, *httptes
|
|||||||
return func(mapping *meta.RESTMapping) (RESTClientPoster, error) {
|
return func(mapping *meta.RESTMapping) (RESTClientPoster, error) {
|
||||||
fakeCodec := runtime.CodecFor(api.Scheme, "v1beta1")
|
fakeCodec := runtime.CodecFor(api.Scheme, "v1beta1")
|
||||||
fakeUri, _ := url.Parse(server.URL + "/api/v1beta1")
|
fakeUri, _ := url.Parse(server.URL + "/api/v1beta1")
|
||||||
return client.NewRESTClient(fakeUri, "v1beta1", fakeCodec, true), nil
|
return client.NewRESTClient(fakeUri, "v1beta1", fakeCodec, true, 0), nil
|
||||||
}, server
|
}, server
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user