From 3171aac57c5ad99d7f9efdd631d087ad0cde7039 Mon Sep 17 00:00:00 2001 From: gmarek Date: Wed, 13 Apr 2016 20:38:32 +0200 Subject: [PATCH] Generated clients can return their RESTClients, RESTClient can return its RateLimiter --- .../fake/generator_fake_for_group.go | 20 ++++-- .../generators/generator_for_clientset.go | 3 + .../generators/generator_for_group.go | 13 ++++ .../test_internalclientset/clientset.go | 3 + .../test_internalclientset/clientset_test.go | 43 +++++++++++ .../unversioned/fake/fake_testgroup_client.go | 7 ++ .../unversioned/testgroup_client.go | 10 +++ contrib/mesos/pkg/executor/executor.go | 6 +- .../internalclientset/clientset.go | 9 +++ .../typed/batch/unversioned/batch_client.go | 10 +++ .../unversioned/fake/fake_batch_client.go | 7 ++ .../typed/core/unversioned/core_client.go | 10 +++ .../core/unversioned/fake/fake_core_client.go | 7 ++ .../unversioned/extensions_client.go | 10 +++ .../fake/fake_extensions_client.go | 7 ++ pkg/client/restclient/client.go | 8 +++ pkg/controller/daemon/controller.go | 4 ++ .../deployment/deployment_controller.go | 4 ++ .../endpoint/endpoints_controller.go | 4 ++ pkg/controller/gc/gc_controller.go | 4 ++ pkg/controller/job/controller.go | 5 ++ .../namespace/namespace_controller.go | 5 ++ pkg/controller/node/nodecontroller.go | 6 ++ ...ersistentvolume_claim_binder_controller.go | 4 ++ .../persistentvolume_recycler_controller.go | 4 ++ pkg/controller/replicaset/replica_set.go | 5 ++ .../replication/replication_controller.go | 5 ++ .../resourcequota/replenishment_controller.go | 5 ++ .../resource_quota_controller.go | 5 +- pkg/controller/route/routecontroller.go | 4 ++ pkg/controller/service/servicecontroller.go | 5 ++ .../serviceaccounts_controller.go | 5 +- .../serviceaccount/tokens_controller.go | 5 +- pkg/metrics/controller_manager_metrics.go | 16 +++++ pkg/util/metrics/util.go | 71 +++++++++++++++++++ 35 files changed, 330 insertions(+), 9 deletions(-) create mode 100644 cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset_test.go create mode 100644 pkg/util/metrics/util.go diff --git a/cmd/libs/go2idl/client-gen/generators/fake/generator_fake_for_group.go b/cmd/libs/go2idl/client-gen/generators/fake/generator_fake_for_group.go index 2497e6baddc..88f2d64ec39 100644 --- a/cmd/libs/go2idl/client-gen/generators/fake/generator_fake_for_group.go +++ b/cmd/libs/go2idl/client-gen/generators/fake/generator_fake_for_group.go @@ -51,16 +51,19 @@ func (g *genFakeForGroup) Namers(c *generator.Context) namer.NameSystems { } func (g *genFakeForGroup) Imports(c *generator.Context) (imports []string) { - return append(g.imports.ImportLines(), fmt.Sprintf("%s \"%s\"", filepath.Base(g.realClientPath), g.realClientPath)) + imports = append(g.imports.ImportLines(), fmt.Sprintf("%s \"%s\"", filepath.Base(g.realClientPath), g.realClientPath)) + return imports } func (g *genFakeForGroup) GenerateType(c *generator.Context, t *types.Type, w io.Writer) error { sw := generator.NewSnippetWriter(w, c, "$", "$") const pkgTestingCore = "k8s.io/kubernetes/pkg/client/testing/core" + const pkgRESTClient = "k8s.io/kubernetes/pkg/client/restclient" m := map[string]interface{}{ - "group": g.group, - "Group": namer.IC(g.group), - "Fake": c.Universe.Type(types.Name{Package: pkgTestingCore, Name: "Fake"}), + "group": g.group, + "Group": namer.IC(g.group), + "Fake": c.Universe.Type(types.Name{Package: pkgTestingCore, Name: "Fake"}), + "RESTClient": c.Universe.Type(types.Name{Package: pkgRESTClient, Name: "RESTClient"}), } sw.Do(groupClientTemplate, m) for _, t := range g.types { @@ -77,6 +80,7 @@ func (g *genFakeForGroup) GenerateType(c *generator.Context, t *types.Type, w io } } + sw.Do(getRESTClient, m) return sw.Error() } @@ -97,3 +101,11 @@ func (c *Fake$.Group$) $.type|publicPlural$() $.realClientPackage$.$.type|public return &Fake$.type|publicPlural${c} } ` + +var getRESTClient = ` +// GetRESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *Fake$.Group$) GetRESTClient() *$.RESTClient|raw$ { + return nil +} +` diff --git a/cmd/libs/go2idl/client-gen/generators/generator_for_clientset.go b/cmd/libs/go2idl/client-gen/generators/generator_for_clientset.go index 84ed6e997cf..c50be650d1f 100644 --- a/cmd/libs/go2idl/client-gen/generators/generator_for_clientset.go +++ b/cmd/libs/go2idl/client-gen/generators/generator_for_clientset.go @@ -131,6 +131,9 @@ type Clientset struct { var clientsetInterfaceImplTemplate = ` // $.Group$ retrieves the $.Group$Client func (c *Clientset) $.Group$() $.PackageName$.$.Group$Interface { + if c == nil { + return nil + } return c.$.Group$Client } ` diff --git a/cmd/libs/go2idl/client-gen/generators/generator_for_group.go b/cmd/libs/go2idl/client-gen/generators/generator_for_group.go index c36207d3187..afc4064efe4 100644 --- a/cmd/libs/go2idl/client-gen/generators/generator_for_group.go +++ b/cmd/libs/go2idl/client-gen/generators/generator_for_group.go @@ -111,12 +111,14 @@ func (g *genGroup) GenerateType(c *generator.Context, t *types.Type, w io.Writer } else { sw.Do(setClientDefaultsTemplate, m) } + sw.Do(getRESTClient, m) return sw.Error() } var groupInterfaceTemplate = ` type $.Group$Interface interface { + GetRESTClient() *$.RESTClient|raw$ $range .types$ $.|publicPlural$Getter $end$ } @@ -168,6 +170,17 @@ func NewForConfigOrDie(c *$.Config|raw$) *$.Group$Client { } ` +var getRESTClient = ` +// GetRESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *$.Group$Client) GetRESTClient() *$.RESTClient|raw$ { + if c == nil { + return nil + } + return c.RESTClient +} +` + var newClientForRESTClientTemplate = ` // New creates a new $.Group$Client for the given RESTClient. func New(c *$.RESTClient|raw$) *$.Group$Client { diff --git a/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset.go b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset.go index 003536335bc..2ff07a670b3 100644 --- a/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset.go +++ b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset.go @@ -38,6 +38,9 @@ type Clientset struct { // Testgroup retrieves the TestgroupClient func (c *Clientset) Testgroup() unversionedtestgroup.TestgroupInterface { + if c == nil { + return nil + } return c.TestgroupClient } diff --git a/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset_test.go b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset_test.go new file mode 100644 index 00000000000..42b4f0bac2f --- /dev/null +++ b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/clientset_test.go @@ -0,0 +1,43 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test_internalclientset + +import ( + "testing" + + restclient "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/util/flowcontrol" +) + +func ClientSetRateLimiterTest(t *testing.T) { + rateLimiter := flowcontrol.NewTokenBucketRateLimiter(1.0, 10) + config := restclient.Config{ + RateLimiter: rateLimiter, + } + if err := restclient.SetKubernetesDefaults(&config); err != nil { + t.Errorf("setting defaults failed for %#v: %v", config, err) + } + clientSet, err := NewForConfig(&config) + if err != nil { + t.Errorf("creating clientset for config %v failed: %v", config, err) + } + testGroupThrottler := clientSet.Testgroup().GetRESTClient().GetRateLimiter() + + if rateLimiter != testGroupThrottler { + t.Errorf("Clients in client set should use rateLimiter passed in config:\noriginal: %v\ntestGroup: %v", rateLimiter, testGroupThrottler) + } +} diff --git a/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/fake/fake_testgroup_client.go b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/fake/fake_testgroup_client.go index eba165e0bea..774f75d673c 100644 --- a/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/fake/fake_testgroup_client.go +++ b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/fake/fake_testgroup_client.go @@ -18,6 +18,7 @@ package fake import ( unversioned "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned" + restclient "k8s.io/kubernetes/pkg/client/restclient" core "k8s.io/kubernetes/pkg/client/testing/core" ) @@ -28,3 +29,9 @@ type FakeTestgroup struct { func (c *FakeTestgroup) TestTypes(namespace string) unversioned.TestTypeInterface { return &FakeTestTypes{c, namespace} } + +// GetRESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *FakeTestgroup) GetRESTClient() *restclient.RESTClient { + return nil +} diff --git a/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/testgroup_client.go b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/testgroup_client.go index eed0d008ab9..56d08722b3e 100644 --- a/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/testgroup_client.go +++ b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/testgroup_client.go @@ -23,6 +23,7 @@ import ( ) type TestgroupInterface interface { + GetRESTClient() *restclient.RESTClient TestTypesGetter } @@ -88,3 +89,12 @@ func setConfigDefaults(config *restclient.Config) error { } return nil } + +// GetRESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *TestgroupClient) GetRESTClient() *restclient.RESTClient { + if c == nil { + return nil + } + return c.RESTClient +} diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 3dfe68d14d2..e9a834cdf1a 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -150,8 +150,10 @@ func New(config Config) *Executor { nodeInfos: config.NodeInfos, initCompleted: make(chan struct{}), registry: config.Registry, - kubeAPI: &clientAPIWrapper{config.APIClient}, - nodeAPI: &clientAPIWrapper{config.APIClient}, + } + if config.APIClient != nil { + k.kubeAPI = &clientAPIWrapper{config.APIClient.Core()} + k.nodeAPI = &clientAPIWrapper{config.APIClient.Core()} } // apply functional options diff --git a/pkg/client/clientset_generated/internalclientset/clientset.go b/pkg/client/clientset_generated/internalclientset/clientset.go index 1810e83d98b..d2060d74111 100644 --- a/pkg/client/clientset_generated/internalclientset/clientset.go +++ b/pkg/client/clientset_generated/internalclientset/clientset.go @@ -44,16 +44,25 @@ type Clientset struct { // Core retrieves the CoreClient func (c *Clientset) Core() unversionedcore.CoreInterface { + if c == nil { + return nil + } return c.CoreClient } // Extensions retrieves the ExtensionsClient func (c *Clientset) Extensions() unversionedextensions.ExtensionsInterface { + if c == nil { + return nil + } return c.ExtensionsClient } // Batch retrieves the BatchClient func (c *Clientset) Batch() unversionedbatch.BatchInterface { + if c == nil { + return nil + } return c.BatchClient } diff --git a/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go b/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go index 666faa5a098..e42c2d84a6e 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go @@ -23,6 +23,7 @@ import ( ) type BatchInterface interface { + GetRESTClient() *restclient.RESTClient JobsGetter } @@ -88,3 +89,12 @@ func setConfigDefaults(config *restclient.Config) error { } return nil } + +// GetRESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *BatchClient) GetRESTClient() *restclient.RESTClient { + if c == nil { + return nil + } + return c.RESTClient +} diff --git a/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/fake/fake_batch_client.go b/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/fake/fake_batch_client.go index 094e81f48ae..8845934671c 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/fake/fake_batch_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/fake/fake_batch_client.go @@ -18,6 +18,7 @@ package fake import ( unversioned "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned" + restclient "k8s.io/kubernetes/pkg/client/restclient" core "k8s.io/kubernetes/pkg/client/testing/core" ) @@ -28,3 +29,9 @@ type FakeBatch struct { func (c *FakeBatch) Jobs(namespace string) unversioned.JobInterface { return &FakeJobs{c, namespace} } + +// GetRESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *FakeBatch) GetRESTClient() *restclient.RESTClient { + return nil +} diff --git a/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/core_client.go b/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/core_client.go index dc3561c0b49..9250a2261b5 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/core_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/core_client.go @@ -23,6 +23,7 @@ import ( ) type CoreInterface interface { + GetRESTClient() *restclient.RESTClient ComponentStatusesGetter ConfigMapsGetter EndpointsGetter @@ -163,3 +164,12 @@ func setConfigDefaults(config *restclient.Config) error { } return nil } + +// GetRESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *CoreClient) GetRESTClient() *restclient.RESTClient { + if c == nil { + return nil + } + return c.RESTClient +} diff --git a/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/fake/fake_core_client.go b/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/fake/fake_core_client.go index 632ec173e19..533735e7c1b 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/fake/fake_core_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/fake/fake_core_client.go @@ -18,6 +18,7 @@ package fake import ( unversioned "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + restclient "k8s.io/kubernetes/pkg/client/restclient" core "k8s.io/kubernetes/pkg/client/testing/core" ) @@ -88,3 +89,9 @@ func (c *FakeCore) Services(namespace string) unversioned.ServiceInterface { func (c *FakeCore) ServiceAccounts(namespace string) unversioned.ServiceAccountInterface { return &FakeServiceAccounts{c, namespace} } + +// GetRESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *FakeCore) GetRESTClient() *restclient.RESTClient { + return nil +} diff --git a/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/extensions_client.go b/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/extensions_client.go index 9a0fafc933c..d8b97a05ed9 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/extensions_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/extensions_client.go @@ -23,6 +23,7 @@ import ( ) type ExtensionsInterface interface { + GetRESTClient() *restclient.RESTClient DaemonSetsGetter DeploymentsGetter HorizontalPodAutoscalersGetter @@ -118,3 +119,12 @@ func setConfigDefaults(config *restclient.Config) error { } return nil } + +// GetRESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *ExtensionsClient) GetRESTClient() *restclient.RESTClient { + if c == nil { + return nil + } + return c.RESTClient +} diff --git a/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/fake/fake_extensions_client.go b/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/fake/fake_extensions_client.go index 4f509cc51c3..0148dab06d1 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/fake/fake_extensions_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/fake/fake_extensions_client.go @@ -18,6 +18,7 @@ package fake import ( unversioned "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" + restclient "k8s.io/kubernetes/pkg/client/restclient" core "k8s.io/kubernetes/pkg/client/testing/core" ) @@ -52,3 +53,9 @@ func (c *FakeExtensions) Scales(namespace string) unversioned.ScaleInterface { func (c *FakeExtensions) ThirdPartyResources(namespace string) unversioned.ThirdPartyResourceInterface { return &FakeThirdPartyResources{c, namespace} } + +// GetRESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *FakeExtensions) GetRESTClient() *restclient.RESTClient { + return nil +} diff --git a/pkg/client/restclient/client.go b/pkg/client/restclient/client.go index 0494c28a5af..b66884f91ab 100644 --- a/pkg/client/restclient/client.go +++ b/pkg/client/restclient/client.go @@ -93,6 +93,14 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf } } +// GetRateLimiter returns rate limier for a given client, or nil if it's called on a nil client +func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter { + if c == nil { + return nil + } + return c.Throttle +} + // readExpBackoffConfig handles the internal logic of determining what the // backoff policy is. By default if no information is available, NoBackoff. // TODO Generalize this see #17727 . diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index 84c4c2dafde..8e6d970f427 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/validation/field" @@ -113,6 +114,9 @@ func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient cl // TODO: remove the wrapper when every clients have moved to use the clientset. eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) + } dsc := &DaemonSetsController{ kubeClient: kubeClient, eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "daemonset-controller"}), diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 85442571556..5af0be405ad 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -40,6 +40,7 @@ import ( utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/integer" labelsutil "k8s.io/kubernetes/pkg/util/labels" + "k8s.io/kubernetes/pkg/util/metrics" podutil "k8s.io/kubernetes/pkg/util/pod" rsutil "k8s.io/kubernetes/pkg/util/replicaset" utilruntime "k8s.io/kubernetes/pkg/util/runtime" @@ -97,6 +98,9 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller // TODO: remove the wrapper when every clients have moved to use the clientset. eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: client.Core().Events("")}) + if client != nil && client.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.Core().GetRESTClient().GetRateLimiter()) + } dc := &DeploymentController{ client: client, eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}), diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 26fdad8669c..6dbc2364f01 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" @@ -60,6 +61,9 @@ var ( // NewEndpointController returns a new *EndpointController. func NewEndpointController(podInformer framework.SharedInformer, client *clientset.Clientset) *EndpointController { + if client != nil && client.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().GetRESTClient().GetRateLimiter()) + } e := &EndpointController{ client: client, queue: workqueue.New(), diff --git a/pkg/controller/gc/gc_controller.go b/pkg/controller/gc/gc_controller.go index bf09ae928b2..1129794c09d 100644 --- a/pkg/controller/gc/gc_controller.go +++ b/pkg/controller/gc/gc_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" @@ -49,6 +50,9 @@ type GCController struct { } func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, threshold int) *GCController { + if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) + } gcc := &GCController{ kubeClient: kubeClient, threshold: threshold, diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index 63e504a2ebf..ee495555b1f 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework/informers" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" @@ -82,6 +83,10 @@ func NewJobController(podInformer framework.SharedInformer, kubeClient clientset // TODO: remove the wrapper when every clients have moved to use the clientset. eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) + } + jm := &JobController{ kubeClient: kubeClient, podControl: controller.RealPodControl{ diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index a1c6d4aacac..23d7fefeb4d 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" @@ -72,6 +73,10 @@ func NewNamespaceController( finalizerToken: finalizerToken, } + if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("namespace_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) + } + // configure the backing store/controller store, controller := framework.NewInformer( &cache.ListWatch{ diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index c73e4a00f0d..3c092ab476d 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" @@ -145,6 +146,11 @@ func NewNodeController( } else { glog.Infof("No api server defined - no events will be sent to API server.") } + + if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) + } + if allocateNodeCIDRs && clusterCIDR == nil { glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.") } diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go index 12de9f0177c..a0e105a09c4 100644 --- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" @@ -45,6 +46,9 @@ type PersistentVolumeClaimBinder struct { // NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder func NewPersistentVolumeClaimBinder(kubeClient clientset.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder { + if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("pv_claim_binder_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) + } volumeIndex := NewPersistentVolumeOrderedIndex() binderClient := NewBinderClient(kubeClient) binder := &PersistentVolumeClaimBinder{ diff --git a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go index a0cb6aa8144..20dd1b83eb8 100644 --- a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" ioutil "k8s.io/kubernetes/pkg/util/io" + "k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/watch" @@ -66,6 +67,9 @@ type releasedVolumeStatus struct { // NewPersistentVolumeRecycler creates a new PersistentVolumeRecycler func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time.Duration, maximumRetry int, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) { recyclerClient := NewRecyclerClient(kubeClient) + if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("pv_recycler_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) + } recycler := &PersistentVolumeRecycler{ client: recyclerClient, kubeClient: kubeClient, diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 68cfc5ec9e2..6407dc02386 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" @@ -100,6 +101,10 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) + } + rsc := &ReplicaSetController{ kubeClient: kubeClient, podControl: controller.RealPodControl{ diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 42b72a9ef22..1d716672acc 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" @@ -106,6 +107,10 @@ func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clie eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) + } + rm := &ReplicationManager{ kubeClient: kubeClient, podControl: controller.RealPodControl{ diff --git a/pkg/controller/resourcequota/replenishment_controller.go b/pkg/controller/resourcequota/replenishment_controller.go index 19cd1e8895e..a344bec8709 100644 --- a/pkg/controller/resourcequota/replenishment_controller.go +++ b/pkg/controller/resourcequota/replenishment_controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/quota/evaluator/core" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/watch" ) @@ -113,6 +114,10 @@ func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error) { var result framework.ControllerInterface + if r.kubeClient != nil && r.kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("replenishment_controller", r.kubeClient.Core().GetRESTClient().GetRateLimiter()) + } + switch options.GroupKind { case api.Kind("Pod"): if r.podInformer != nil { diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 47b684d462e..cb965ef7ef8 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" @@ -81,7 +82,9 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour registry: options.Registry, replenishmentControllers: []framework.ControllerInterface{}, } - + if options.KubeClient != nil && options.KubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", options.KubeClient.Core().GetRESTClient().GetRateLimiter()) + } // set the synchronization handler rq.syncHandler = rq.syncResourceQuotaFromKey diff --git a/pkg/controller/route/routecontroller.go b/pkg/controller/route/routecontroller.go index c297347cc56..206a5c79fbc 100644 --- a/pkg/controller/route/routecontroller.go +++ b/pkg/controller/route/routecontroller.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/wait" ) @@ -37,6 +38,9 @@ type RouteController struct { } func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterName string, clusterCIDR *net.IPNet) *RouteController { + if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("route_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) + } return &RouteController{ routes: routes, kubeClient: kubeClient, diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index 1f50a40e393..0689552b8ae 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/runtime" ) @@ -92,6 +93,10 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"}) + if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) + } + return &ServiceController{ cloud: cloud, kubeClient: kubeClient, diff --git a/pkg/controller/serviceaccount/serviceaccounts_controller.go b/pkg/controller/serviceaccount/serviceaccounts_controller.go index dd385312791..084ddae500a 100644 --- a/pkg/controller/serviceaccount/serviceaccounts_controller.go +++ b/pkg/controller/serviceaccount/serviceaccounts_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/watch" ) @@ -71,7 +72,9 @@ func NewServiceAccountsController(cl clientset.Interface, options ServiceAccount client: cl, serviceAccountsToEnsure: options.ServiceAccounts, } - + if cl != nil && cl.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().GetRESTClient().GetRateLimiter()) + } accountSelector := fields.Everything() if len(options.ServiceAccounts) == 1 { // If we're maintaining a single account, we can scope the accounts we watch to just that name diff --git a/pkg/controller/serviceaccount/tokens_controller.go b/pkg/controller/serviceaccount/tokens_controller.go index 168941493f4..b660afc963d 100644 --- a/pkg/controller/serviceaccount/tokens_controller.go +++ b/pkg/controller/serviceaccount/tokens_controller.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/registry/secret" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/serviceaccount" + "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" @@ -68,7 +69,9 @@ func NewTokensController(cl clientset.Interface, options TokensControllerOptions token: options.TokenGenerator, rootCA: options.RootCA, } - + if cl != nil && cl.Core().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().GetRESTClient().GetRateLimiter()) + } e.serviceAccounts, e.serviceAccountController = framework.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { diff --git a/pkg/metrics/controller_manager_metrics.go b/pkg/metrics/controller_manager_metrics.go index defbfc0bb56..ed99caef308 100644 --- a/pkg/metrics/controller_manager_metrics.go +++ b/pkg/metrics/controller_manager_metrics.go @@ -38,6 +38,22 @@ var KnownControllerManagerMetrics = map[string][]string{ "rest_client_request_latency_microseconds_count": {"url", "verb"}, "rest_client_request_latency_microseconds_sum": {"url", "verb"}, "rest_client_request_status_codes": {"method", "code", "host"}, + "pv_recycler_controller_rate_limiter_use": {}, + "node_controller_rate_limiter_use": {}, + "serviceaccount_controller_rate_limiter_use": {}, + "route_controller_rate_limiter_use": {}, + "resource_quota_controller_rate_limiter_use": {}, + "replenishment_controller_rate_limiter_use": {}, + "job_controller_rate_limiter_use": {}, + "gc_controller_rate_limiter_use": {}, + "endpoint_controller_rate_limiter_use": {}, + "replication_controller_rate_limiter_use": {}, + "replicaset_controller_rate_limiter_use": {}, + "deployment_controller_rate_limiter_use": {}, + "service_controller_rate_limiter_use": {}, + "pv_claim_binder_controller_rate_limiter_use": {}, + "namespace_controller_rate_limiter_use": {}, + "daemon_controller_rate_limiter_use": {}, } type ControllerManagerMetrics Metrics diff --git a/pkg/util/metrics/util.go b/pkg/util/metrics/util.go new file mode 100644 index 00000000000..ab74e35c463 --- /dev/null +++ b/pkg/util/metrics/util.go @@ -0,0 +1,71 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "fmt" + "sync" + "time" + + "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/wait" + + "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + updatePeriod = 5 * time.Second +) + +var ( + metricsLock sync.Mutex + rateLimiterMetrics map[string]prometheus.Gauge = make(map[string]prometheus.Gauge) +) + +func registerRateLimiterMetric(ownerName string) error { + metricsLock.Lock() + defer metricsLock.Unlock() + + if _, ok := rateLimiterMetrics[ownerName]; ok { + glog.Errorf("Metric for %v already registered", ownerName) + return fmt.Errorf("Metric for %v already registered", ownerName) + } + metric := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "rate_limiter_use", + Subsystem: ownerName, + Help: fmt.Sprintf("A metric measuring the saturation of the rate limiter for %v", ownerName), + }) + rateLimiterMetrics[ownerName] = metric + prometheus.MustRegister(metric) + return nil +} + +// RegisterMetricAndTrackRateLimiterUsage registers a metric ownerName_rate_limiter_use in prometheus to track +// how much used rateLimiter is and starts a goroutine that updates this metric every updatePeriod +func RegisterMetricAndTrackRateLimiterUsage(ownerName string, rateLimiter flowcontrol.RateLimiter) error { + err := registerRateLimiterMetric(ownerName) + if err != nil { + return err + } + go wait.Forever(func() { + metricsLock.Lock() + defer metricsLock.Unlock() + rateLimiterMetrics[ownerName].Set(rateLimiter.Saturation()) + }, updatePeriod) + return nil +}