From 2eb41a9db7ba56658bcdc6242851aa1df4bf60b1 Mon Sep 17 00:00:00 2001 From: Michael Bolot Date: Mon, 3 Oct 2022 16:12:11 -0500 Subject: [PATCH] Only return changed counts, and send initial count Three main changes: - When a count changes, only send that count. This makes the count socket significantly more usable - The count buffer will now send a count right away. This allows a message to launch wihtout an inital buffered wait period - The count buffer now sends messages every 5 seconds instead of 1 --- pkg/resources/counts/buffer.go | 53 +++++++++++++++++++++++++--------- pkg/resources/counts/counts.go | 22 +++++++------- 2 files changed, 49 insertions(+), 26 deletions(-) diff --git a/pkg/resources/counts/buffer.go b/pkg/resources/counts/buffer.go index 2552c5d..0f97d14 100644 --- a/pkg/resources/counts/buffer.go +++ b/pkg/resources/counts/buffer.go @@ -6,35 +6,60 @@ import ( "github.com/rancher/apiserver/pkg/types" ) -func buffer(c chan types.APIEvent) chan types.APIEvent { +// debounceDuration determines how long events will be held before they are sent to the consumer +var debounceDuration = 5 * time.Second + +// countsBuffer creates an APIEvent channel with a buffered response time (i.e. replies are only sent once every second) +func countsBuffer(c chan Count) chan types.APIEvent { result := make(chan types.APIEvent) go func() { defer close(result) - debounce(result, c) + debounceCounts(result, c) }() return result } -func debounce(result, input chan types.APIEvent) { - t := time.NewTicker(time.Second) +// debounceCounts converts counts from an input channel into an APIEvent, and updates the result channel at a reduced pace +func debounceCounts(result chan types.APIEvent, input chan Count) { + // counts aren't a critical value. To avoid excess UI processing, only send updates after debounceDuration has elapsed + t := time.NewTicker(debounceDuration) defer t.Stop() - var ( - lastEvent *types.APIEvent - ) + var currentCount *Count + + firstCount, fOk := <-input + if fOk { + // send a count immediately or we will have to wait a second for the first update + result <- toAPIEvent(firstCount) + } for { select { - case event, ok := <-input: - if ok { - lastEvent = &event - } else { + case count, ok := <-input: + if !ok { return } + if currentCount == nil { + currentCount = &count + } else { + itemCounts := count.Counts + for id, itemCount := range itemCounts { + // our current count will be outdated in comparison with anything in the new events + currentCount.Counts[id] = itemCount + } + } case <-t.C: - if lastEvent != nil { - result <- *lastEvent - lastEvent = nil + if currentCount != nil { + result <- toAPIEvent(*currentCount) + currentCount = nil } } } } + +func toAPIEvent(count Count) types.APIEvent { + return types.APIEvent{ + Name: "resource.change", + ResourceType: "counts", + Object: toAPIObject(count), + } +} diff --git a/pkg/resources/counts/counts.go b/pkg/resources/counts/counts.go index a44d44b..47ce8ec 100644 --- a/pkg/resources/counts/counts.go +++ b/pkg/resources/counts/counts.go @@ -24,6 +24,7 @@ var ( } ) +// Register registers a new count schema. This schema isn't a true resource but instead returns counts for other resources func Register(schemas *types.APISchemas, ccache clustercache.ClusterCache) { schemas.MustImportAndCustomize(Count{}, func(schema *types.APISchema) { schema.CollectionMethods = []string{http.MethodGet} @@ -110,9 +111,10 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP }, nil } +// Watch creates a watch for the Counts schema. This returns only the counts which have changed since the watch was established func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { var ( - result = make(chan types.APIEvent, 100) + result = make(chan Count, 100) counts map[string]ItemCount gvkToSchema = map[schema2.GroupVersionKind]*types.APISchema{} countLock sync.Mutex @@ -178,18 +180,13 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. } counts[schema.ID] = itemCount - countsCopy := map[string]ItemCount{} - for k, v := range counts { - countsCopy[k] = *v.DeepCopy() + changedCount := map[string]ItemCount{ + schema.ID: itemCount, } - result <- types.APIEvent{ - Name: "resource.change", - ResourceType: "counts", - Object: toAPIObject(Count{ - ID: "count", - Counts: countsCopy, - }), + result <- Count{ + ID: "count", + Counts: changedCount, } return nil @@ -205,7 +202,8 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. return onChange(false, gvk, key, obj, nil) }) - return buffer(result), nil + // buffer the counts so that we don't spam the consumer with constant updates + return countsBuffer(result), nil } func (s *Store) schemasToWatch(apiOp *types.APIRequest) (result []*types.APISchema) {