From c7907730187428c279099ac19b0ae6a21b5624d3 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Fri, 19 Aug 2016 16:49:10 -0700 Subject: [PATCH 1/2] temporarlily stop register RateLimiter metrics in the garbage collector --- .../garbagecollector/garbagecollector.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index fe05899fb20..e79b7e43bb1 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -478,7 +478,8 @@ func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource if err != nil { return monitor, err } - gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring") + // TODO: remove the comment when we find an efficient way to do locking + // gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring") setObjectTypeMeta := func(obj interface{}) { runtimeObject, ok := obj.(runtime.Object) if !ok { @@ -603,7 +604,8 @@ func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool func (gc *GarbageCollector) deleteObject(item objectReference) error { fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) - gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") + // TODO: remove the comment when we find an efficient way to do locking + // gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) if err != nil { return err @@ -617,7 +619,8 @@ func (gc *GarbageCollector) deleteObject(item objectReference) error { func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructured, error) { fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) - gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") + // TODO: remove the comment when we find an efficient way to do locking + // gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) if err != nil { return nil, err @@ -628,7 +631,8 @@ func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructur func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unstructured) (*runtime.Unstructured, error) { fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) - gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") + // TODO: remove the comment when we find an efficient way to do locking + // gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) if err != nil { return nil, err @@ -639,7 +643,8 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unst func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*runtime.Unstructured, error) { fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) - gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") + // TODO: remove the comment when we find an efficient way to do locking + // gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) if err != nil { return nil, err From 992afd9c45f6f266cb753f96ddae4b54786dd789 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Fri, 19 Aug 2016 17:00:26 -0700 Subject: [PATCH 2/2] let rate_limiter_helper use sync.Once --- .../garbagecollector/garbagecollector.go | 19 +++++------- .../garbagecollector/rate_limiter_helper.go | 29 ++++++++++--------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index e79b7e43bb1..aae442103f0 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -478,8 +478,7 @@ func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource if err != nil { return monitor, err } - // TODO: remove the comment when we find an efficient way to do locking - // gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring") + gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring") setObjectTypeMeta := func(obj interface{}) { runtimeObject, ok := obj.(runtime.Object) if !ok { @@ -542,8 +541,8 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam clock: clock.RealClock{}, dirtyQueue: workqueue.NewTimedWorkQueue(), orphanQueue: workqueue.NewTimedWorkQueue(), - registeredRateLimiter: NewRegisteredRateLimiter(), - registeredRateLimiterForMonitors: NewRegisteredRateLimiter(), + registeredRateLimiter: NewRegisteredRateLimiter(resources), + registeredRateLimiterForMonitors: NewRegisteredRateLimiter(resources), } gc.propagator = &Propagator{ eventQueue: workqueue.NewTimedWorkQueue(), @@ -604,8 +603,7 @@ func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool func (gc *GarbageCollector) deleteObject(item objectReference) error { fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) - // TODO: remove the comment when we find an efficient way to do locking - // gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") + gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) if err != nil { return err @@ -619,8 +617,7 @@ func (gc *GarbageCollector) deleteObject(item objectReference) error { func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructured, error) { fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) - // TODO: remove the comment when we find an efficient way to do locking - // gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") + gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) if err != nil { return nil, err @@ -631,8 +628,7 @@ func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructur func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unstructured) (*runtime.Unstructured, error) { fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) - // TODO: remove the comment when we find an efficient way to do locking - // gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") + gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) if err != nil { return nil, err @@ -643,8 +639,7 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unst func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*runtime.Unstructured, error) { fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) - // TODO: remove the comment when we find an efficient way to do locking - // gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") + gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) if err != nil { return nil, err diff --git a/pkg/controller/garbagecollector/rate_limiter_helper.go b/pkg/controller/garbagecollector/rate_limiter_helper.go index 167d4251b08..f2c448618dd 100644 --- a/pkg/controller/garbagecollector/rate_limiter_helper.go +++ b/pkg/controller/garbagecollector/rate_limiter_helper.go @@ -29,31 +29,32 @@ import ( // RegisteredRateLimiter records the registered RateLimters to avoid // duplication. type RegisteredRateLimiter struct { - rateLimiters map[unversioned.GroupVersion]struct{} - lock sync.RWMutex + rateLimiters map[unversioned.GroupVersion]*sync.Once } // NewRegisteredRateLimiter returns a new RegisteredRateLimiater. -func NewRegisteredRateLimiter() *RegisteredRateLimiter { - return &RegisteredRateLimiter{ - rateLimiters: make(map[unversioned.GroupVersion]struct{}), +// TODO: NewRegisteredRateLimiter is not dynamic. We need to find a better way +// when GC dynamically change the resources it monitors. +func NewRegisteredRateLimiter(resources []unversioned.GroupVersionResource) *RegisteredRateLimiter { + rateLimiters := make(map[unversioned.GroupVersion]*sync.Once) + for _, resource := range resources { + gv := resource.GroupVersion() + if _, found := rateLimiters[gv]; !found { + rateLimiters[gv] = &sync.Once{} + } } + return &RegisteredRateLimiter{rateLimiters: rateLimiters} } func (r *RegisteredRateLimiter) registerIfNotPresent(gv unversioned.GroupVersion, client *dynamic.Client, prefix string) { - r.lock.RLock() - _, ok := r.rateLimiters[gv] - r.lock.RUnlock() - if ok { + once, found := r.rateLimiters[gv] + if !found { return } - r.lock.Lock() - defer r.lock.Unlock() - if _, ok := r.rateLimiters[gv]; !ok { + once.Do(func() { if rateLimiter := client.GetRateLimiter(); rateLimiter != nil { group := strings.Replace(gv.Group, ".", ":", -1) metrics.RegisterMetricAndTrackRateLimiterUsage(fmt.Sprintf("%s_%s_%s", prefix, group, gv.Version), rateLimiter) } - r.rateLimiters[gv] = struct{}{} - } + }) }