1
0
mirror of https://github.com/rancher/steve.git synced 2025-04-28 19:24:42 +00:00

Merge pull request #58 from MbolotSuse/count-diff

Changing count watch to only return changed counts
This commit is contained in:
Michael Bolot 2022-12-14 10:15:18 -06:00 committed by GitHub
commit 8fdf67a444
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 464 additions and 26 deletions

View File

@ -6,35 +6,60 @@ import (
"github.com/rancher/apiserver/pkg/types" "github.com/rancher/apiserver/pkg/types"
) )
func buffer(c chan types.APIEvent) chan types.APIEvent { // debounceDuration determines how long events will be held before they are sent to the consumer
var debounceDuration = 5 * time.Second
// countsBuffer creates an APIEvent channel with a buffered response time (i.e. replies are only sent once every second)
func countsBuffer(c chan Count) chan types.APIEvent {
result := make(chan types.APIEvent) result := make(chan types.APIEvent)
go func() { go func() {
defer close(result) defer close(result)
debounce(result, c) debounceCounts(result, c)
}() }()
return result return result
} }
func debounce(result, input chan types.APIEvent) { // debounceCounts converts counts from an input channel into an APIEvent, and updates the result channel at a reduced pace
t := time.NewTicker(time.Second) func debounceCounts(result chan types.APIEvent, input chan Count) {
// counts aren't a critical value. To avoid excess UI processing, only send updates after debounceDuration has elapsed
t := time.NewTicker(debounceDuration)
defer t.Stop() defer t.Stop()
var ( var currentCount *Count
lastEvent *types.APIEvent
) firstCount, fOk := <-input
if fOk {
// send a count immediately or we will have to wait a second for the first update
result <- toAPIEvent(firstCount)
}
for { for {
select { select {
case event, ok := <-input: case count, ok := <-input:
if ok { if !ok {
lastEvent = &event
} else {
return return
} }
if currentCount == nil {
currentCount = &count
} else {
itemCounts := count.Counts
for id, itemCount := range itemCounts {
// our current count will be outdated in comparison with anything in the new events
currentCount.Counts[id] = itemCount
}
}
case <-t.C: case <-t.C:
if lastEvent != nil { if currentCount != nil {
result <- *lastEvent result <- toAPIEvent(*currentCount)
lastEvent = nil currentCount = nil
} }
} }
} }
} }
func toAPIEvent(count Count) types.APIEvent {
return types.APIEvent{
Name: "resource.change",
ResourceType: "counts",
Object: toAPIObject(count),
}
}

View File

@ -0,0 +1,111 @@
package counts
import (
"fmt"
"strconv"
"testing"
"time"
"github.com/rancher/apiserver/pkg/types"
"github.com/stretchr/testify/assert"
)
func Test_countsBuffer(t *testing.T) {
tests := []struct {
name string
numInputEvents int
overrideInput map[int]int // events whose count we should override. Don't include an event >= numInputEvents
}{
{
name: "test basic input",
numInputEvents: 1,
},
{
name: "test basic multiple input",
numInputEvents: 3,
},
{
name: "test basic input which is overriden by later events",
numInputEvents: 3,
overrideInput: map[int]int{
1: 17,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
debounceDuration = 10 * time.Millisecond
countsChannel := make(chan Count, 100)
outputChannel := countsBuffer(countsChannel)
countsChannel <- Count{
ID: "count",
Counts: map[string]ItemCount{"test": createItemCount(1)},
}
// first event is not buffered, so we expect to receive it quicker than the debounce
_, err := receiveWithTimeout(outputChannel, time.Millisecond*1)
assert.NoError(t, err, "Expected first event to be received quickly")
// stream our standard count events
for i := 0; i < test.numInputEvents; i++ {
countsChannel <- Count{
ID: "count",
Counts: map[string]ItemCount{strconv.Itoa(i): createItemCount(1)},
}
}
// stream any overrides, if applicable
for key, value := range test.overrideInput {
countsChannel <- Count{
ID: "count",
Counts: map[string]ItemCount{strconv.Itoa(key): createItemCount(value)},
}
}
// due to complexities of cycle calculation, give a slight delay for the event to actually stream
output, err := receiveWithTimeout(outputChannel, debounceDuration+time.Millisecond*1)
assert.NoError(t, err, "did not expect an error when receiving value from channel")
outputCount := output.Object.Object.(Count)
assert.Len(t, outputCount.Counts, test.numInputEvents)
for outputID, outputItem := range outputCount.Counts {
outputIdx, err := strconv.Atoi(outputID)
assert.NoError(t, err, "couldn't convert output idx")
nsTotal := 0
for _, nsSummary := range outputItem.Namespaces {
nsTotal += nsSummary.Count
}
if outputOverride, ok := test.overrideInput[outputIdx]; ok {
assert.Equal(t, outputOverride, outputItem.Summary.Count, "expected overridden output count to be most recent value")
assert.Equal(t, outputOverride, nsTotal, "expected overridden output namespace count to be most recent value")
} else {
assert.Equal(t, 1, outputItem.Summary.Count, "expected non-overridden output count to be 1")
assert.Equal(t, 1, nsTotal, "expected non-overridden output namespace count to be 1")
}
}
})
}
}
// receiveWithTimeout tries to get a value from input within duration. Returns an error if no input was received during that period
func receiveWithTimeout(input chan types.APIEvent, duration time.Duration) (*types.APIEvent, error) {
select {
case value := <-input:
return &value, nil
case <-time.After(duration):
return nil, fmt.Errorf("timeout error, no value recieved after %f seconds", duration.Seconds())
}
}
func createItemCount(countTotal int) ItemCount {
return ItemCount{
Summary: Summary{
Count: countTotal,
},
Namespaces: map[string]Summary{
"test": {
Count: countTotal,
},
},
}
}

View File

@ -24,6 +24,7 @@ var (
} }
) )
// Register registers a new count schema. This schema isn't a true resource but instead returns counts for other resources
func Register(schemas *types.APISchemas, ccache clustercache.ClusterCache) { func Register(schemas *types.APISchemas, ccache clustercache.ClusterCache) {
schemas.MustImportAndCustomize(Count{}, func(schema *types.APISchema) { schemas.MustImportAndCustomize(Count{}, func(schema *types.APISchema) {
schema.CollectionMethods = []string{http.MethodGet} schema.CollectionMethods = []string{http.MethodGet}
@ -110,9 +111,10 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
}, nil }, nil
} }
// Watch creates a watch for the Counts schema. This returns only the counts which have changed since the watch was established
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
var ( var (
result = make(chan types.APIEvent, 100) result = make(chan Count, 100)
counts map[string]ItemCount counts map[string]ItemCount
gvkToSchema = map[schema2.GroupVersionKind]*types.APISchema{} gvkToSchema = map[schema2.GroupVersionKind]*types.APISchema{}
countLock sync.Mutex countLock sync.Mutex
@ -178,18 +180,13 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
} }
counts[schema.ID] = itemCount counts[schema.ID] = itemCount
countsCopy := map[string]ItemCount{} changedCount := map[string]ItemCount{
for k, v := range counts { schema.ID: itemCount,
countsCopy[k] = *v.DeepCopy()
} }
result <- types.APIEvent{ result <- Count{
Name: "resource.change", ID: "count",
ResourceType: "counts", Counts: changedCount,
Object: toAPIObject(Count{
ID: "count",
Counts: countsCopy,
}),
} }
return nil return nil
@ -205,7 +202,8 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
return onChange(false, gvk, key, obj, nil) return onChange(false, gvk, key, obj, nil)
}) })
return buffer(result), nil // buffer the counts so that we don't spam the consumer with constant updates
return countsBuffer(result), nil
} }
func (s *Store) schemasToWatch(apiOp *types.APIRequest) (result []*types.APISchema) { func (s *Store) schemasToWatch(apiOp *types.APIRequest) (result []*types.APISchema) {

View File

@ -0,0 +1,304 @@
package counts_test
import (
"context"
"fmt"
"net/http"
"testing"
"time"
"github.com/rancher/apiserver/pkg/server"
"github.com/rancher/apiserver/pkg/store/empty"
"github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/accesscontrol"
"github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/clustercache"
"github.com/rancher/steve/pkg/resources/counts"
"github.com/rancher/steve/pkg/schema"
"github.com/rancher/wrangler/pkg/schemas"
"github.com/rancher/wrangler/pkg/summary"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
schema2 "k8s.io/apimachinery/pkg/runtime/schema"
)
const (
testGroup = "test.k8s.io"
testVersion = "v1"
testResource = "testCRD"
testNotUsedResource = "testNotUsedCRD"
testNewResource = "testNewCRD"
)
func TestWatch(t *testing.T) {
tests := []struct {
name string
event string // the event to send, can be "add", "remove", or "change"
newSchema bool
countsForSchema int
errDesired bool
}{
{
name: "add of known schema",
event: "add",
newSchema: false,
countsForSchema: 2,
errDesired: false,
},
{
name: "add of unknown schema",
event: "add",
newSchema: true,
countsForSchema: 0,
errDesired: true,
},
{
name: "change of known schema",
event: "change",
newSchema: false,
countsForSchema: 0,
errDesired: true,
},
{
name: "change of unknown schema",
event: "change",
newSchema: true,
countsForSchema: 0,
errDesired: true,
},
{
name: "remove of known schema",
event: "remove",
newSchema: false,
countsForSchema: 0,
errDesired: false,
},
{
name: "remove of unknown schema",
event: "remove",
newSchema: true,
countsForSchema: 0,
errDesired: true,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
testSchema := makeSchema(testResource)
testNotUsedSchema := makeSchema(testNotUsedResource)
testNewSchema := makeSchema(testNewResource)
addGenericPermissionsToSchema(testSchema, "list")
addGenericPermissionsToSchema(testNotUsedSchema, "list")
testSchemas := types.EmptyAPISchemas()
testSchemas.MustAddSchema(*testSchema)
testSchemas.MustAddSchema(*testNotUsedSchema)
testOp := &types.APIRequest{
Schemas: testSchemas,
AccessControl: &server.SchemaBasedAccess{},
Request: &http.Request{},
}
fakeCache := NewFakeClusterCache()
gvk := attributes.GVK(testSchema)
newGVK := attributes.GVK(testNewSchema)
fakeCache.AddSummaryObj(makeSummarizedObject(gvk, "testName1", "testNs", "1"))
counts.Register(testSchemas, fakeCache)
// next, get the channel our results will be delivered on
countSchema := testSchemas.LookupSchema("count")
// channel will stream our events after we call the handlers to simulate/add/remove/change events
resChannel, err := countSchema.Store.Watch(testOp, nil, types.WatchRequest{})
assert.NoError(t, err, "got an error when trying to watch counts, did not expect one")
// call the handlers, triggering the update to receive the event
if test.event == "add" {
var summarizedObject *summary.SummarizedObject
var testGVK schema2.GroupVersionKind
if test.newSchema {
summarizedObject = makeSummarizedObject(newGVK, "testNew", "testNs", "1")
testGVK = newGVK
} else {
summarizedObject = makeSummarizedObject(gvk, "testName2", "testNs", "2")
testGVK = gvk
}
err = fakeCache.addHandler(testGVK, "n/a", summarizedObject)
assert.NoError(t, err, "did not expect error when calling add method")
} else if test.event == "change" {
var summarizedObject *summary.SummarizedObject
var testGVK schema2.GroupVersionKind
var changedSummarizedObject *summary.SummarizedObject
if test.newSchema {
summarizedObject = makeSummarizedObject(newGVK, "testNew", "testNs", "1")
changedSummarizedObject = makeSummarizedObject(newGVK, "testNew", "testNs", "2")
testGVK = newGVK
} else {
summarizedObject = makeSummarizedObject(gvk, "testName1", "testNs", "2")
changedSummarizedObject = makeSummarizedObject(gvk, "testName1", "testNs", "3")
testGVK = gvk
}
err = fakeCache.changeHandler(testGVK, "n/a", changedSummarizedObject, summarizedObject)
assert.NoError(t, err, "did not expect error when calling change method")
} else if test.event == "remove" {
var summarizedObject *summary.SummarizedObject
var testGVK schema2.GroupVersionKind
if test.newSchema {
summarizedObject = makeSummarizedObject(newGVK, "testNew", "testNs", "2")
testGVK = newGVK
} else {
summarizedObject = makeSummarizedObject(gvk, "testName1", "testNs", "2")
testGVK = gvk
}
err = fakeCache.removeHandler(testGVK, "n/a", summarizedObject)
assert.NoError(t, err, "did not expect error when calling add method")
} else {
assert.Failf(t, "unexpected event", "%s is not one of the allowed values of add, change, remove", test.event)
}
// need to call the event handler to force the event to stream
outputCount, err := receiveWithTimeout(resChannel, 100*time.Millisecond)
if test.errDesired {
assert.Errorf(t, err, "expected no value from channel, but got one %+v", outputCount)
} else {
assert.NoError(t, err, "got an error when attempting to get a value from the result channel")
assert.NotNilf(t, outputCount, "expected a new count value, did not get one")
count := outputCount.Object.Object.(counts.Count)
assert.Len(t, count.Counts, 1, "only expected one count event")
itemCount, ok := count.Counts[testResource]
assert.True(t, ok, "expected an item count for %s", testResource)
assert.Equal(t, test.countsForSchema, itemCount.Summary.Count, "expected counts to be correct")
}
})
}
}
// receiveWithTimeout tries to get a value from input within duration. Returns an error if no input was received during that period
func receiveWithTimeout(input chan types.APIEvent, duration time.Duration) (*types.APIEvent, error) {
select {
case value := <-input:
return &value, nil
case <-time.After(duration):
return nil, fmt.Errorf("timeout error, no value recieved after %f seconds", duration.Seconds())
}
}
// addGenericPermissions grants the specified verb for all namespaces and all resourceNames
func addGenericPermissionsToSchema(schema *types.APISchema, verb string) {
if verb == "create" {
schema.CollectionMethods = append(schema.CollectionMethods, http.MethodPost)
} else if verb == "get" {
schema.ResourceMethods = append(schema.ResourceMethods, http.MethodGet)
} else if verb == "list" || verb == "watch" {
// list and watch use the same permission checks, so we handle in one case
schema.CollectionMethods = append(schema.CollectionMethods, http.MethodGet, http.MethodPost)
} else if verb == "update" {
schema.ResourceMethods = append(schema.ResourceMethods, http.MethodPut)
} else if verb == "delete" {
schema.ResourceMethods = append(schema.ResourceMethods, http.MethodDelete)
} else {
panic(fmt.Sprintf("Can't add generic permissions for verb %s", verb))
}
currentAccess := schema.Attributes["access"].(accesscontrol.AccessListByVerb)
currentAccess[verb] = []accesscontrol.Access{
{
Namespace: "*",
ResourceName: "*",
},
}
}
func makeSchema(resourceType string) *types.APISchema {
return &types.APISchema{
Schema: &schemas.Schema{
ID: resourceType,
CollectionMethods: []string{},
ResourceMethods: []string{},
ResourceFields: map[string]schemas.Field{
"name": {Type: "string"},
"value": {Type: "string"},
},
Attributes: map[string]interface{}{
"group": testGroup,
"version": testVersion,
"kind": resourceType,
"resource": resourceType,
"verbs": []string{"get", "list", "watch", "delete", "update", "create"},
"access": accesscontrol.AccessListByVerb{},
},
},
Store: &empty.Store{},
}
}
type fakeClusterCache struct {
summarizedObjects []*summary.SummarizedObject
addHandler clustercache.Handler
removeHandler clustercache.Handler
changeHandler clustercache.ChangeHandler
}
func NewFakeClusterCache() *fakeClusterCache {
return &fakeClusterCache{
summarizedObjects: []*summary.SummarizedObject{},
addHandler: nil,
removeHandler: nil,
changeHandler: nil,
}
}
func (f *fakeClusterCache) Get(gvk schema2.GroupVersionKind, namespace, name string) (interface{}, bool, error) {
return nil, false, nil
}
func (f *fakeClusterCache) List(gvk schema2.GroupVersionKind) []interface{} {
var retList []interface{}
for _, summaryObj := range f.summarizedObjects {
if summaryObj.GroupVersionKind() != gvk {
// only list the summary objects for the provided gvk
continue
}
retList = append(retList, summaryObj)
}
return retList
}
func (f *fakeClusterCache) OnAdd(ctx context.Context, handler clustercache.Handler) {
f.addHandler = handler
}
func (f *fakeClusterCache) OnRemove(ctx context.Context, handler clustercache.Handler) {
f.removeHandler = handler
}
func (f *fakeClusterCache) OnChange(ctx context.Context, handler clustercache.ChangeHandler) {
f.changeHandler = handler
}
func (f *fakeClusterCache) OnSchemas(schemas *schema.Collection) error {
return nil
}
func (f *fakeClusterCache) AddSummaryObj(summaryObj *summary.SummarizedObject) {
f.summarizedObjects = append(f.summarizedObjects, summaryObj)
}
func makeSummarizedObject(gvk schema2.GroupVersionKind, name string, namespace string, version string) *summary.SummarizedObject {
apiVersion, kind := gvk.ToAPIVersionAndKind()
return &summary.SummarizedObject{
Summary: summary.Summary{
State: "",
Error: false,
Transitioning: false,
},
PartialObjectMetadata: metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
APIVersion: apiVersion,
Kind: kind,
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
ResourceVersion: version, // any non-zero value should work here. 0 seems to have specific meaning for counts
},
},
}
}