From e27f3847951e7a2e9e0615cdc0c122debf4fe2c7 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Mon, 20 Jul 2020 09:21:03 -0700 Subject: [PATCH] Only send counts every second if there is a change --- pkg/resources/counts/buffer.go | 40 ++++++++++++++++++++++++++++++++++ pkg/resources/counts/counts.go | 18 +++++++-------- 2 files changed, 49 insertions(+), 9 deletions(-) create mode 100644 pkg/resources/counts/buffer.go diff --git a/pkg/resources/counts/buffer.go b/pkg/resources/counts/buffer.go new file mode 100644 index 0000000..2552c5d --- /dev/null +++ b/pkg/resources/counts/buffer.go @@ -0,0 +1,40 @@ +package counts + +import ( + "time" + + "github.com/rancher/apiserver/pkg/types" +) + +func buffer(c chan types.APIEvent) chan types.APIEvent { + result := make(chan types.APIEvent) + go func() { + defer close(result) + debounce(result, c) + }() + return result +} + +func debounce(result, input chan types.APIEvent) { + t := time.NewTicker(time.Second) + defer t.Stop() + + var ( + lastEvent *types.APIEvent + ) + for { + select { + case event, ok := <-input: + if ok { + lastEvent = &event + } else { + return + } + case <-t.C: + if lastEvent != nil { + result <- *lastEvent + lastEvent = nil + } + } + } +} diff --git a/pkg/resources/counts/counts.go b/pkg/resources/counts/counts.go index c57eb48..ce4dd87 100644 --- a/pkg/resources/counts/counts.go +++ b/pkg/resources/counts/counts.go @@ -118,6 +118,14 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. countLock sync.Mutex ) + go func() { + <-apiOp.Context().Done() + countLock.Lock() + close(result) + result = nil + countLock.Unlock() + }() + counts = s.getCount(apiOp).Counts for id := range counts { schema := apiOp.Schemas.LookupSchema(id) @@ -128,14 +136,6 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. gvrToSchema[attributes.GVR(schema)] = schema } - go func() { - <-apiOp.Context().Done() - countLock.Lock() - close(result) - result = nil - countLock.Unlock() - }() - onChange := func(add bool, gvr schema2.GroupVersionResource, _ string, obj, oldObj runtime.Object) error { countLock.Lock() defer countLock.Unlock() @@ -205,7 +205,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. return onChange(false, gvr, key, obj, nil) }) - return result, nil + return buffer(result), nil } func (s *Store) schemasToWatch(apiOp *types.APIRequest) (result []*types.APISchema) {