let rate_limiter_helper use sync.Once

This commit is contained in:
Chao Xu 2016-08-19 17:00:26 -07:00
parent c790773018
commit 992afd9c45
2 changed files with 22 additions and 26 deletions

View File

@ -478,8 +478,7 @@ func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource
if err != nil { if err != nil {
return monitor, err return monitor, err
} }
// TODO: remove the comment when we find an efficient way to do locking gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
// gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
setObjectTypeMeta := func(obj interface{}) { setObjectTypeMeta := func(obj interface{}) {
runtimeObject, ok := obj.(runtime.Object) runtimeObject, ok := obj.(runtime.Object)
if !ok { if !ok {
@ -542,8 +541,8 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
clock: clock.RealClock{}, clock: clock.RealClock{},
dirtyQueue: workqueue.NewTimedWorkQueue(), dirtyQueue: workqueue.NewTimedWorkQueue(),
orphanQueue: workqueue.NewTimedWorkQueue(), orphanQueue: workqueue.NewTimedWorkQueue(),
registeredRateLimiter: NewRegisteredRateLimiter(), registeredRateLimiter: NewRegisteredRateLimiter(resources),
registeredRateLimiterForMonitors: NewRegisteredRateLimiter(), registeredRateLimiterForMonitors: NewRegisteredRateLimiter(resources),
} }
gc.propagator = &Propagator{ gc.propagator = &Propagator{
eventQueue: workqueue.NewTimedWorkQueue(), eventQueue: workqueue.NewTimedWorkQueue(),
@ -604,8 +603,7 @@ func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool
func (gc *GarbageCollector) deleteObject(item objectReference) error { func (gc *GarbageCollector) deleteObject(item objectReference) error {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
// TODO: remove the comment when we find an efficient way to do locking gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
// gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil { if err != nil {
return err return err
@ -619,8 +617,7 @@ func (gc *GarbageCollector) deleteObject(item objectReference) error {
func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructured, error) { func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructured, error) {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
// TODO: remove the comment when we find an efficient way to do locking gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
// gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil { if err != nil {
return nil, err return nil, err
@ -631,8 +628,7 @@ func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructur
func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unstructured) (*runtime.Unstructured, error) { func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unstructured) (*runtime.Unstructured, error) {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
// TODO: remove the comment when we find an efficient way to do locking gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
// gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil { if err != nil {
return nil, err return nil, err
@ -643,8 +639,7 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unst
func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*runtime.Unstructured, error) { func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*runtime.Unstructured, error) {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
// TODO: remove the comment when we find an efficient way to do locking gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
// gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -29,31 +29,32 @@ import (
// RegisteredRateLimiter records the registered RateLimters to avoid // RegisteredRateLimiter records the registered RateLimters to avoid
// duplication. // duplication.
type RegisteredRateLimiter struct { type RegisteredRateLimiter struct {
rateLimiters map[unversioned.GroupVersion]struct{} rateLimiters map[unversioned.GroupVersion]*sync.Once
lock sync.RWMutex
} }
// NewRegisteredRateLimiter returns a new RegisteredRateLimiater. // NewRegisteredRateLimiter returns a new RegisteredRateLimiater.
func NewRegisteredRateLimiter() *RegisteredRateLimiter { // TODO: NewRegisteredRateLimiter is not dynamic. We need to find a better way
return &RegisteredRateLimiter{ // when GC dynamically change the resources it monitors.
rateLimiters: make(map[unversioned.GroupVersion]struct{}), func NewRegisteredRateLimiter(resources []unversioned.GroupVersionResource) *RegisteredRateLimiter {
rateLimiters := make(map[unversioned.GroupVersion]*sync.Once)
for _, resource := range resources {
gv := resource.GroupVersion()
if _, found := rateLimiters[gv]; !found {
rateLimiters[gv] = &sync.Once{}
}
} }
return &RegisteredRateLimiter{rateLimiters: rateLimiters}
} }
func (r *RegisteredRateLimiter) registerIfNotPresent(gv unversioned.GroupVersion, client *dynamic.Client, prefix string) { func (r *RegisteredRateLimiter) registerIfNotPresent(gv unversioned.GroupVersion, client *dynamic.Client, prefix string) {
r.lock.RLock() once, found := r.rateLimiters[gv]
_, ok := r.rateLimiters[gv] if !found {
r.lock.RUnlock()
if ok {
return return
} }
r.lock.Lock() once.Do(func() {
defer r.lock.Unlock()
if _, ok := r.rateLimiters[gv]; !ok {
if rateLimiter := client.GetRateLimiter(); rateLimiter != nil { if rateLimiter := client.GetRateLimiter(); rateLimiter != nil {
group := strings.Replace(gv.Group, ".", ":", -1) group := strings.Replace(gv.Group, ".", ":", -1)
metrics.RegisterMetricAndTrackRateLimiterUsage(fmt.Sprintf("%s_%s_%s", prefix, group, gv.Version), rateLimiter) metrics.RegisterMetricAndTrackRateLimiterUsage(fmt.Sprintf("%s_%s_%s", prefix, group, gv.Version), rateLimiter)
} }
r.rateLimiters[gv] = struct{}{} })
}
} }