mirror of
https://github.com/niusmallnan/steve.git
synced 2025-07-13 14:24:04 +00:00
Only return changed counts, and send initial count
Three main changes: - When a count changes, only send that count. This makes the count socket significantly more usable - The count buffer will now send a count right away. This allows a message to launch wihtout an inital buffered wait period - The count buffer now sends messages every 5 seconds instead of 1
This commit is contained in:
parent
647cba2be7
commit
2eb41a9db7
@ -6,35 +6,60 @@ import (
|
|||||||
"github.com/rancher/apiserver/pkg/types"
|
"github.com/rancher/apiserver/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func buffer(c chan types.APIEvent) chan types.APIEvent {
|
// debounceDuration determines how long events will be held before they are sent to the consumer
|
||||||
|
var debounceDuration = 5 * time.Second
|
||||||
|
|
||||||
|
// countsBuffer creates an APIEvent channel with a buffered response time (i.e. replies are only sent once every second)
|
||||||
|
func countsBuffer(c chan Count) chan types.APIEvent {
|
||||||
result := make(chan types.APIEvent)
|
result := make(chan types.APIEvent)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(result)
|
defer close(result)
|
||||||
debounce(result, c)
|
debounceCounts(result, c)
|
||||||
}()
|
}()
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func debounce(result, input chan types.APIEvent) {
|
// debounceCounts converts counts from an input channel into an APIEvent, and updates the result channel at a reduced pace
|
||||||
t := time.NewTicker(time.Second)
|
func debounceCounts(result chan types.APIEvent, input chan Count) {
|
||||||
|
// counts aren't a critical value. To avoid excess UI processing, only send updates after debounceDuration has elapsed
|
||||||
|
t := time.NewTicker(debounceDuration)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
|
||||||
var (
|
var currentCount *Count
|
||||||
lastEvent *types.APIEvent
|
|
||||||
)
|
firstCount, fOk := <-input
|
||||||
|
if fOk {
|
||||||
|
// send a count immediately or we will have to wait a second for the first update
|
||||||
|
result <- toAPIEvent(firstCount)
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event, ok := <-input:
|
case count, ok := <-input:
|
||||||
if ok {
|
if !ok {
|
||||||
lastEvent = &event
|
|
||||||
} else {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if currentCount == nil {
|
||||||
|
currentCount = &count
|
||||||
|
} else {
|
||||||
|
itemCounts := count.Counts
|
||||||
|
for id, itemCount := range itemCounts {
|
||||||
|
// our current count will be outdated in comparison with anything in the new events
|
||||||
|
currentCount.Counts[id] = itemCount
|
||||||
|
}
|
||||||
|
}
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
if lastEvent != nil {
|
if currentCount != nil {
|
||||||
result <- *lastEvent
|
result <- toAPIEvent(*currentCount)
|
||||||
lastEvent = nil
|
currentCount = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func toAPIEvent(count Count) types.APIEvent {
|
||||||
|
return types.APIEvent{
|
||||||
|
Name: "resource.change",
|
||||||
|
ResourceType: "counts",
|
||||||
|
Object: toAPIObject(count),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -24,6 +24,7 @@ var (
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Register registers a new count schema. This schema isn't a true resource but instead returns counts for other resources
|
||||||
func Register(schemas *types.APISchemas, ccache clustercache.ClusterCache) {
|
func Register(schemas *types.APISchemas, ccache clustercache.ClusterCache) {
|
||||||
schemas.MustImportAndCustomize(Count{}, func(schema *types.APISchema) {
|
schemas.MustImportAndCustomize(Count{}, func(schema *types.APISchema) {
|
||||||
schema.CollectionMethods = []string{http.MethodGet}
|
schema.CollectionMethods = []string{http.MethodGet}
|
||||||
@ -110,9 +111,10 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Watch creates a watch for the Counts schema. This returns only the counts which have changed since the watch was established
|
||||||
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
|
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
|
||||||
var (
|
var (
|
||||||
result = make(chan types.APIEvent, 100)
|
result = make(chan Count, 100)
|
||||||
counts map[string]ItemCount
|
counts map[string]ItemCount
|
||||||
gvkToSchema = map[schema2.GroupVersionKind]*types.APISchema{}
|
gvkToSchema = map[schema2.GroupVersionKind]*types.APISchema{}
|
||||||
countLock sync.Mutex
|
countLock sync.Mutex
|
||||||
@ -178,18 +180,13 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
|
|||||||
}
|
}
|
||||||
|
|
||||||
counts[schema.ID] = itemCount
|
counts[schema.ID] = itemCount
|
||||||
countsCopy := map[string]ItemCount{}
|
changedCount := map[string]ItemCount{
|
||||||
for k, v := range counts {
|
schema.ID: itemCount,
|
||||||
countsCopy[k] = *v.DeepCopy()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
result <- types.APIEvent{
|
result <- Count{
|
||||||
Name: "resource.change",
|
ID: "count",
|
||||||
ResourceType: "counts",
|
Counts: changedCount,
|
||||||
Object: toAPIObject(Count{
|
|
||||||
ID: "count",
|
|
||||||
Counts: countsCopy,
|
|
||||||
}),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -205,7 +202,8 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
|
|||||||
return onChange(false, gvk, key, obj, nil)
|
return onChange(false, gvk, key, obj, nil)
|
||||||
})
|
})
|
||||||
|
|
||||||
return buffer(result), nil
|
// buffer the counts so that we don't spam the consumer with constant updates
|
||||||
|
return countsBuffer(result), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) schemasToWatch(apiOp *types.APIRequest) (result []*types.APISchema) {
|
func (s *Store) schemasToWatch(apiOp *types.APIRequest) (result []*types.APISchema) {
|
||||||
|
Loading…
Reference in New Issue
Block a user