Generated clients can return their RESTClients, RESTClient can return its RateLimiter

This commit is contained in:
gmarek 2016-04-13 20:38:32 +02:00
parent 9db45590cf
commit 3171aac57c
35 changed files with 330 additions and 9 deletions

View File

@ -51,16 +51,19 @@ func (g *genFakeForGroup) Namers(c *generator.Context) namer.NameSystems {
} }
func (g *genFakeForGroup) Imports(c *generator.Context) (imports []string) { func (g *genFakeForGroup) Imports(c *generator.Context) (imports []string) {
return append(g.imports.ImportLines(), fmt.Sprintf("%s \"%s\"", filepath.Base(g.realClientPath), g.realClientPath)) imports = append(g.imports.ImportLines(), fmt.Sprintf("%s \"%s\"", filepath.Base(g.realClientPath), g.realClientPath))
return imports
} }
func (g *genFakeForGroup) GenerateType(c *generator.Context, t *types.Type, w io.Writer) error { func (g *genFakeForGroup) GenerateType(c *generator.Context, t *types.Type, w io.Writer) error {
sw := generator.NewSnippetWriter(w, c, "$", "$") sw := generator.NewSnippetWriter(w, c, "$", "$")
const pkgTestingCore = "k8s.io/kubernetes/pkg/client/testing/core" const pkgTestingCore = "k8s.io/kubernetes/pkg/client/testing/core"
const pkgRESTClient = "k8s.io/kubernetes/pkg/client/restclient"
m := map[string]interface{}{ m := map[string]interface{}{
"group": g.group, "group": g.group,
"Group": namer.IC(g.group), "Group": namer.IC(g.group),
"Fake": c.Universe.Type(types.Name{Package: pkgTestingCore, Name: "Fake"}), "Fake": c.Universe.Type(types.Name{Package: pkgTestingCore, Name: "Fake"}),
"RESTClient": c.Universe.Type(types.Name{Package: pkgRESTClient, Name: "RESTClient"}),
} }
sw.Do(groupClientTemplate, m) sw.Do(groupClientTemplate, m)
for _, t := range g.types { for _, t := range g.types {
@ -77,6 +80,7 @@ func (g *genFakeForGroup) GenerateType(c *generator.Context, t *types.Type, w io
} }
} }
sw.Do(getRESTClient, m)
return sw.Error() return sw.Error()
} }
@ -97,3 +101,11 @@ func (c *Fake$.Group$) $.type|publicPlural$() $.realClientPackage$.$.type|public
return &Fake$.type|publicPlural${c} return &Fake$.type|publicPlural${c}
} }
` `
var getRESTClient = `
// GetRESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *Fake$.Group$) GetRESTClient() *$.RESTClient|raw$ {
return nil
}
`

View File

@ -131,6 +131,9 @@ type Clientset struct {
var clientsetInterfaceImplTemplate = ` var clientsetInterfaceImplTemplate = `
// $.Group$ retrieves the $.Group$Client // $.Group$ retrieves the $.Group$Client
func (c *Clientset) $.Group$() $.PackageName$.$.Group$Interface { func (c *Clientset) $.Group$() $.PackageName$.$.Group$Interface {
if c == nil {
return nil
}
return c.$.Group$Client return c.$.Group$Client
} }
` `

View File

@ -111,12 +111,14 @@ func (g *genGroup) GenerateType(c *generator.Context, t *types.Type, w io.Writer
} else { } else {
sw.Do(setClientDefaultsTemplate, m) sw.Do(setClientDefaultsTemplate, m)
} }
sw.Do(getRESTClient, m)
return sw.Error() return sw.Error()
} }
var groupInterfaceTemplate = ` var groupInterfaceTemplate = `
type $.Group$Interface interface { type $.Group$Interface interface {
GetRESTClient() *$.RESTClient|raw$
$range .types$ $.|publicPlural$Getter $range .types$ $.|publicPlural$Getter
$end$ $end$
} }
@ -168,6 +170,17 @@ func NewForConfigOrDie(c *$.Config|raw$) *$.Group$Client {
} }
` `
var getRESTClient = `
// GetRESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *$.Group$Client) GetRESTClient() *$.RESTClient|raw$ {
if c == nil {
return nil
}
return c.RESTClient
}
`
var newClientForRESTClientTemplate = ` var newClientForRESTClientTemplate = `
// New creates a new $.Group$Client for the given RESTClient. // New creates a new $.Group$Client for the given RESTClient.
func New(c *$.RESTClient|raw$) *$.Group$Client { func New(c *$.RESTClient|raw$) *$.Group$Client {

View File

@ -38,6 +38,9 @@ type Clientset struct {
// Testgroup retrieves the TestgroupClient // Testgroup retrieves the TestgroupClient
func (c *Clientset) Testgroup() unversionedtestgroup.TestgroupInterface { func (c *Clientset) Testgroup() unversionedtestgroup.TestgroupInterface {
if c == nil {
return nil
}
return c.TestgroupClient return c.TestgroupClient
} }

View File

@ -0,0 +1,43 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package test_internalclientset
import (
"testing"
restclient "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)
func ClientSetRateLimiterTest(t *testing.T) {
rateLimiter := flowcontrol.NewTokenBucketRateLimiter(1.0, 10)
config := restclient.Config{
RateLimiter: rateLimiter,
}
if err := restclient.SetKubernetesDefaults(&config); err != nil {
t.Errorf("setting defaults failed for %#v: %v", config, err)
}
clientSet, err := NewForConfig(&config)
if err != nil {
t.Errorf("creating clientset for config %v failed: %v", config, err)
}
testGroupThrottler := clientSet.Testgroup().GetRESTClient().GetRateLimiter()
if rateLimiter != testGroupThrottler {
t.Errorf("Clients in client set should use rateLimiter passed in config:\noriginal: %v\ntestGroup: %v", rateLimiter, testGroupThrottler)
}
}

View File

@ -18,6 +18,7 @@ package fake
import ( import (
unversioned "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned" unversioned "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned"
restclient "k8s.io/kubernetes/pkg/client/restclient"
core "k8s.io/kubernetes/pkg/client/testing/core" core "k8s.io/kubernetes/pkg/client/testing/core"
) )
@ -28,3 +29,9 @@ type FakeTestgroup struct {
func (c *FakeTestgroup) TestTypes(namespace string) unversioned.TestTypeInterface { func (c *FakeTestgroup) TestTypes(namespace string) unversioned.TestTypeInterface {
return &FakeTestTypes{c, namespace} return &FakeTestTypes{c, namespace}
} }
// GetRESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *FakeTestgroup) GetRESTClient() *restclient.RESTClient {
return nil
}

View File

@ -23,6 +23,7 @@ import (
) )
type TestgroupInterface interface { type TestgroupInterface interface {
GetRESTClient() *restclient.RESTClient
TestTypesGetter TestTypesGetter
} }
@ -88,3 +89,12 @@ func setConfigDefaults(config *restclient.Config) error {
} }
return nil return nil
} }
// GetRESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *TestgroupClient) GetRESTClient() *restclient.RESTClient {
if c == nil {
return nil
}
return c.RESTClient
}

View File

@ -150,8 +150,10 @@ func New(config Config) *Executor {
nodeInfos: config.NodeInfos, nodeInfos: config.NodeInfos,
initCompleted: make(chan struct{}), initCompleted: make(chan struct{}),
registry: config.Registry, registry: config.Registry,
kubeAPI: &clientAPIWrapper{config.APIClient}, }
nodeAPI: &clientAPIWrapper{config.APIClient}, if config.APIClient != nil {
k.kubeAPI = &clientAPIWrapper{config.APIClient.Core()}
k.nodeAPI = &clientAPIWrapper{config.APIClient.Core()}
} }
// apply functional options // apply functional options

View File

@ -44,16 +44,25 @@ type Clientset struct {
// Core retrieves the CoreClient // Core retrieves the CoreClient
func (c *Clientset) Core() unversionedcore.CoreInterface { func (c *Clientset) Core() unversionedcore.CoreInterface {
if c == nil {
return nil
}
return c.CoreClient return c.CoreClient
} }
// Extensions retrieves the ExtensionsClient // Extensions retrieves the ExtensionsClient
func (c *Clientset) Extensions() unversionedextensions.ExtensionsInterface { func (c *Clientset) Extensions() unversionedextensions.ExtensionsInterface {
if c == nil {
return nil
}
return c.ExtensionsClient return c.ExtensionsClient
} }
// Batch retrieves the BatchClient // Batch retrieves the BatchClient
func (c *Clientset) Batch() unversionedbatch.BatchInterface { func (c *Clientset) Batch() unversionedbatch.BatchInterface {
if c == nil {
return nil
}
return c.BatchClient return c.BatchClient
} }

View File

@ -23,6 +23,7 @@ import (
) )
type BatchInterface interface { type BatchInterface interface {
GetRESTClient() *restclient.RESTClient
JobsGetter JobsGetter
} }
@ -88,3 +89,12 @@ func setConfigDefaults(config *restclient.Config) error {
} }
return nil return nil
} }
// GetRESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *BatchClient) GetRESTClient() *restclient.RESTClient {
if c == nil {
return nil
}
return c.RESTClient
}

View File

@ -18,6 +18,7 @@ package fake
import ( import (
unversioned "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned" unversioned "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned"
restclient "k8s.io/kubernetes/pkg/client/restclient"
core "k8s.io/kubernetes/pkg/client/testing/core" core "k8s.io/kubernetes/pkg/client/testing/core"
) )
@ -28,3 +29,9 @@ type FakeBatch struct {
func (c *FakeBatch) Jobs(namespace string) unversioned.JobInterface { func (c *FakeBatch) Jobs(namespace string) unversioned.JobInterface {
return &FakeJobs{c, namespace} return &FakeJobs{c, namespace}
} }
// GetRESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *FakeBatch) GetRESTClient() *restclient.RESTClient {
return nil
}

View File

@ -23,6 +23,7 @@ import (
) )
type CoreInterface interface { type CoreInterface interface {
GetRESTClient() *restclient.RESTClient
ComponentStatusesGetter ComponentStatusesGetter
ConfigMapsGetter ConfigMapsGetter
EndpointsGetter EndpointsGetter
@ -163,3 +164,12 @@ func setConfigDefaults(config *restclient.Config) error {
} }
return nil return nil
} }
// GetRESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *CoreClient) GetRESTClient() *restclient.RESTClient {
if c == nil {
return nil
}
return c.RESTClient
}

View File

@ -18,6 +18,7 @@ package fake
import ( import (
unversioned "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" unversioned "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
restclient "k8s.io/kubernetes/pkg/client/restclient"
core "k8s.io/kubernetes/pkg/client/testing/core" core "k8s.io/kubernetes/pkg/client/testing/core"
) )
@ -88,3 +89,9 @@ func (c *FakeCore) Services(namespace string) unversioned.ServiceInterface {
func (c *FakeCore) ServiceAccounts(namespace string) unversioned.ServiceAccountInterface { func (c *FakeCore) ServiceAccounts(namespace string) unversioned.ServiceAccountInterface {
return &FakeServiceAccounts{c, namespace} return &FakeServiceAccounts{c, namespace}
} }
// GetRESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *FakeCore) GetRESTClient() *restclient.RESTClient {
return nil
}

View File

@ -23,6 +23,7 @@ import (
) )
type ExtensionsInterface interface { type ExtensionsInterface interface {
GetRESTClient() *restclient.RESTClient
DaemonSetsGetter DaemonSetsGetter
DeploymentsGetter DeploymentsGetter
HorizontalPodAutoscalersGetter HorizontalPodAutoscalersGetter
@ -118,3 +119,12 @@ func setConfigDefaults(config *restclient.Config) error {
} }
return nil return nil
} }
// GetRESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *ExtensionsClient) GetRESTClient() *restclient.RESTClient {
if c == nil {
return nil
}
return c.RESTClient
}

View File

@ -18,6 +18,7 @@ package fake
import ( import (
unversioned "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" unversioned "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
restclient "k8s.io/kubernetes/pkg/client/restclient"
core "k8s.io/kubernetes/pkg/client/testing/core" core "k8s.io/kubernetes/pkg/client/testing/core"
) )
@ -52,3 +53,9 @@ func (c *FakeExtensions) Scales(namespace string) unversioned.ScaleInterface {
func (c *FakeExtensions) ThirdPartyResources(namespace string) unversioned.ThirdPartyResourceInterface { func (c *FakeExtensions) ThirdPartyResources(namespace string) unversioned.ThirdPartyResourceInterface {
return &FakeThirdPartyResources{c, namespace} return &FakeThirdPartyResources{c, namespace}
} }
// GetRESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *FakeExtensions) GetRESTClient() *restclient.RESTClient {
return nil
}

View File

@ -93,6 +93,14 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
} }
} }
// GetRateLimiter returns rate limier for a given client, or nil if it's called on a nil client
func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter {
if c == nil {
return nil
}
return c.Throttle
}
// readExpBackoffConfig handles the internal logic of determining what the // readExpBackoffConfig handles the internal logic of determining what the
// backoff policy is. By default if no information is available, NoBackoff. // backoff policy is. By default if no information is available, NoBackoff.
// TODO Generalize this see #17727 . // TODO Generalize this see #17727 .

View File

@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/validation/field"
@ -113,6 +114,9 @@ func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient cl
// TODO: remove the wrapper when every clients have moved to use the clientset. // TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
dsc := &DaemonSetsController{ dsc := &DaemonSetsController{
kubeClient: kubeClient, kubeClient: kubeClient,
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "daemonset-controller"}), eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "daemonset-controller"}),

View File

@ -40,6 +40,7 @@ import (
utilerrors "k8s.io/kubernetes/pkg/util/errors" utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/integer" "k8s.io/kubernetes/pkg/util/integer"
labelsutil "k8s.io/kubernetes/pkg/util/labels" labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/kubernetes/pkg/util/metrics"
podutil "k8s.io/kubernetes/pkg/util/pod" podutil "k8s.io/kubernetes/pkg/util/pod"
rsutil "k8s.io/kubernetes/pkg/util/replicaset" rsutil "k8s.io/kubernetes/pkg/util/replicaset"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -97,6 +98,9 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
// TODO: remove the wrapper when every clients have moved to use the clientset. // TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: client.Core().Events("")}) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: client.Core().Events("")})
if client != nil && client.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.Core().GetRESTClient().GetRateLimiter())
}
dc := &DeploymentController{ dc := &DeploymentController{
client: client, client: client,
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}), eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
@ -60,6 +61,9 @@ var (
// NewEndpointController returns a new *EndpointController. // NewEndpointController returns a new *EndpointController.
func NewEndpointController(podInformer framework.SharedInformer, client *clientset.Clientset) *EndpointController { func NewEndpointController(podInformer framework.SharedInformer, client *clientset.Clientset) *EndpointController {
if client != nil && client.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().GetRESTClient().GetRateLimiter())
}
e := &EndpointController{ e := &EndpointController{
client: client, client: client,
queue: workqueue.New(), queue: workqueue.New(),

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
@ -49,6 +50,9 @@ type GCController struct {
} }
func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, threshold int) *GCController { func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, threshold int) *GCController {
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
gcc := &GCController{ gcc := &GCController{
kubeClient: kubeClient, kubeClient: kubeClient,
threshold: threshold, threshold: threshold,

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/framework/informers"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/util/workqueue"
@ -82,6 +83,10 @@ func NewJobController(podInformer framework.SharedInformer, kubeClient clientset
// TODO: remove the wrapper when every clients have moved to use the clientset. // TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
jm := &JobController{ jm := &JobController{
kubeClient: kubeClient, kubeClient: kubeClient,
podControl: controller.RealPodControl{ podControl: controller.RealPodControl{

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/util/workqueue"
@ -72,6 +73,10 @@ func NewNamespaceController(
finalizerToken: finalizerToken, finalizerToken: finalizerToken,
} }
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("namespace_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
// configure the backing store/controller // configure the backing store/controller
store, controller := framework.NewInformer( store, controller := framework.NewInformer(
&cache.ListWatch{ &cache.ListWatch{

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
@ -145,6 +146,11 @@ func NewNodeController(
} else { } else {
glog.Infof("No api server defined - no events will be sent to API server.") glog.Infof("No api server defined - no events will be sent to API server.")
} }
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
if allocateNodeCIDRs && clusterCIDR == nil { if allocateNodeCIDRs && clusterCIDR == nil {
glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.") glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
} }

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
@ -45,6 +46,9 @@ type PersistentVolumeClaimBinder struct {
// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder // NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder
func NewPersistentVolumeClaimBinder(kubeClient clientset.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder { func NewPersistentVolumeClaimBinder(kubeClient clientset.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder {
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("pv_claim_binder_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
volumeIndex := NewPersistentVolumeOrderedIndex() volumeIndex := NewPersistentVolumeOrderedIndex()
binderClient := NewBinderClient(kubeClient) binderClient := NewBinderClient(kubeClient)
binder := &PersistentVolumeClaimBinder{ binder := &PersistentVolumeClaimBinder{

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
ioutil "k8s.io/kubernetes/pkg/util/io" ioutil "k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
@ -66,6 +67,9 @@ type releasedVolumeStatus struct {
// NewPersistentVolumeRecycler creates a new PersistentVolumeRecycler // NewPersistentVolumeRecycler creates a new PersistentVolumeRecycler
func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time.Duration, maximumRetry int, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) { func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time.Duration, maximumRetry int, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) {
recyclerClient := NewRecyclerClient(kubeClient) recyclerClient := NewRecyclerClient(kubeClient)
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("pv_recycler_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
recycler := &PersistentVolumeRecycler{ recycler := &PersistentVolumeRecycler{
client: recyclerClient, client: recyclerClient,
kubeClient: kubeClient, kubeClient: kubeClient,

View File

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/util/workqueue"
@ -100,6 +101,10 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
rsc := &ReplicaSetController{ rsc := &ReplicaSetController{
kubeClient: kubeClient, kubeClient: kubeClient,
podControl: controller.RealPodControl{ podControl: controller.RealPodControl{

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/util/workqueue"
@ -106,6 +107,10 @@ func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clie
eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
rm := &ReplicationManager{ rm := &ReplicationManager{
kubeClient: kubeClient, kubeClient: kubeClient,
podControl: controller.RealPodControl{ podControl: controller.RealPodControl{

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/quota/evaluator/core" "k8s.io/kubernetes/pkg/quota/evaluator/core"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
@ -113,6 +114,10 @@ func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface)
func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error) { func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error) {
var result framework.ControllerInterface var result framework.ControllerInterface
if r.kubeClient != nil && r.kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replenishment_controller", r.kubeClient.Core().GetRESTClient().GetRateLimiter())
}
switch options.GroupKind { switch options.GroupKind {
case api.Kind("Pod"): case api.Kind("Pod"):
if r.podInformer != nil { if r.podInformer != nil {

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/util/workqueue"
@ -81,7 +82,9 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
registry: options.Registry, registry: options.Registry,
replenishmentControllers: []framework.ControllerInterface{}, replenishmentControllers: []framework.ControllerInterface{},
} }
if options.KubeClient != nil && options.KubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", options.KubeClient.Core().GetRESTClient().GetRateLimiter())
}
// set the synchronization handler // set the synchronization handler
rq.syncHandler = rq.syncResourceQuotaFromKey rq.syncHandler = rq.syncResourceQuotaFromKey

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
@ -37,6 +38,9 @@ type RouteController struct {
} }
func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterName string, clusterCIDR *net.IPNet) *RouteController { func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterName string, clusterCIDR *net.IPNet) *RouteController {
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("route_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
return &RouteController{ return &RouteController{
routes: routes, routes: routes,
kubeClient: kubeClient, kubeClient: kubeClient,

View File

@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/runtime"
) )
@ -92,6 +93,10 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"}) recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"})
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
return &ServiceController{ return &ServiceController{
cloud: cloud, cloud: cloud,
kubeClient: kubeClient, kubeClient: kubeClient,

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
@ -71,7 +72,9 @@ func NewServiceAccountsController(cl clientset.Interface, options ServiceAccount
client: cl, client: cl,
serviceAccountsToEnsure: options.ServiceAccounts, serviceAccountsToEnsure: options.ServiceAccounts,
} }
if cl != nil && cl.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().GetRESTClient().GetRateLimiter())
}
accountSelector := fields.Everything() accountSelector := fields.Everything()
if len(options.ServiceAccounts) == 1 { if len(options.ServiceAccounts) == 1 {
// If we're maintaining a single account, we can scope the accounts we watch to just that name // If we're maintaining a single account, we can scope the accounts we watch to just that name

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/registry/secret" "k8s.io/kubernetes/pkg/registry/secret"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
@ -68,7 +69,9 @@ func NewTokensController(cl clientset.Interface, options TokensControllerOptions
token: options.TokenGenerator, token: options.TokenGenerator,
rootCA: options.RootCA, rootCA: options.RootCA,
} }
if cl != nil && cl.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().GetRESTClient().GetRateLimiter())
}
e.serviceAccounts, e.serviceAccountController = framework.NewIndexerInformer( e.serviceAccounts, e.serviceAccountController = framework.NewIndexerInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) { ListFunc: func(options api.ListOptions) (runtime.Object, error) {

View File

@ -38,6 +38,22 @@ var KnownControllerManagerMetrics = map[string][]string{
"rest_client_request_latency_microseconds_count": {"url", "verb"}, "rest_client_request_latency_microseconds_count": {"url", "verb"},
"rest_client_request_latency_microseconds_sum": {"url", "verb"}, "rest_client_request_latency_microseconds_sum": {"url", "verb"},
"rest_client_request_status_codes": {"method", "code", "host"}, "rest_client_request_status_codes": {"method", "code", "host"},
"pv_recycler_controller_rate_limiter_use": {},
"node_controller_rate_limiter_use": {},
"serviceaccount_controller_rate_limiter_use": {},
"route_controller_rate_limiter_use": {},
"resource_quota_controller_rate_limiter_use": {},
"replenishment_controller_rate_limiter_use": {},
"job_controller_rate_limiter_use": {},
"gc_controller_rate_limiter_use": {},
"endpoint_controller_rate_limiter_use": {},
"replication_controller_rate_limiter_use": {},
"replicaset_controller_rate_limiter_use": {},
"deployment_controller_rate_limiter_use": {},
"service_controller_rate_limiter_use": {},
"pv_claim_binder_controller_rate_limiter_use": {},
"namespace_controller_rate_limiter_use": {},
"daemon_controller_rate_limiter_use": {},
} }
type ControllerManagerMetrics Metrics type ControllerManagerMetrics Metrics

71
pkg/util/metrics/util.go Normal file
View File

@ -0,0 +1,71 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
const (
updatePeriod = 5 * time.Second
)
var (
metricsLock sync.Mutex
rateLimiterMetrics map[string]prometheus.Gauge = make(map[string]prometheus.Gauge)
)
func registerRateLimiterMetric(ownerName string) error {
metricsLock.Lock()
defer metricsLock.Unlock()
if _, ok := rateLimiterMetrics[ownerName]; ok {
glog.Errorf("Metric for %v already registered", ownerName)
return fmt.Errorf("Metric for %v already registered", ownerName)
}
metric := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "rate_limiter_use",
Subsystem: ownerName,
Help: fmt.Sprintf("A metric measuring the saturation of the rate limiter for %v", ownerName),
})
rateLimiterMetrics[ownerName] = metric
prometheus.MustRegister(metric)
return nil
}
// RegisterMetricAndTrackRateLimiterUsage registers a metric ownerName_rate_limiter_use in prometheus to track
// how much used rateLimiter is and starts a goroutine that updates this metric every updatePeriod
func RegisterMetricAndTrackRateLimiterUsage(ownerName string, rateLimiter flowcontrol.RateLimiter) error {
err := registerRateLimiterMetric(ownerName)
if err != nil {
return err
}
go wait.Forever(func() {
metricsLock.Lock()
defer metricsLock.Unlock()
rateLimiterMetrics[ownerName].Set(rateLimiter.Saturation())
}, updatePeriod)
return nil
}