Updates Schemas watch logic.

Updated logic used to decide when and what schema events are sent
during a watch.
This commit is contained in:
Kevin Joiner 2022-10-20 00:16:01 -04:00
parent 44e5b8dc3d
commit 72ab913c4c
9 changed files with 738 additions and 36 deletions

1
go.mod
View File

@ -12,6 +12,7 @@ replace (
require (
github.com/adrg/xdg v0.3.1
github.com/golang/mock v1.5.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/pborman/uuid v1.2.0

1
go.sum
View File

@ -278,6 +278,7 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt
github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g=
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.0.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=

View File

@ -12,6 +12,8 @@ import (
"k8s.io/apiserver/pkg/authentication/user"
)
//go:generate mockgen --build_flags=--mod=mod -package fake -destination fake/AccessSetLookup.go "github.com/rancher/steve/pkg/accesscontrol" AccessSetLookup
type AccessSetLookup interface {
AccessFor(user user.Info) *AccessSet
PurgeUserData(id string)

View File

@ -0,0 +1,62 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/rancher/steve/pkg/accesscontrol (interfaces: AccessSetLookup)
// Package fake is a generated GoMock package.
package fake
import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
accesscontrol "github.com/rancher/steve/pkg/accesscontrol"
user "k8s.io/apiserver/pkg/authentication/user"
)
// MockAccessSetLookup is a mock of AccessSetLookup interface.
type MockAccessSetLookup struct {
ctrl *gomock.Controller
recorder *MockAccessSetLookupMockRecorder
}
// MockAccessSetLookupMockRecorder is the mock recorder for MockAccessSetLookup.
type MockAccessSetLookupMockRecorder struct {
mock *MockAccessSetLookup
}
// NewMockAccessSetLookup creates a new mock instance.
func NewMockAccessSetLookup(ctrl *gomock.Controller) *MockAccessSetLookup {
mock := &MockAccessSetLookup{ctrl: ctrl}
mock.recorder = &MockAccessSetLookupMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockAccessSetLookup) EXPECT() *MockAccessSetLookupMockRecorder {
return m.recorder
}
// AccessFor mocks base method.
func (m *MockAccessSetLookup) AccessFor(arg0 user.Info) *accesscontrol.AccessSet {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AccessFor", arg0)
ret0, _ := ret[0].(*accesscontrol.AccessSet)
return ret0
}
// AccessFor indicates an expected call of AccessFor.
func (mr *MockAccessSetLookupMockRecorder) AccessFor(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AccessFor", reflect.TypeOf((*MockAccessSetLookup)(nil).AccessFor), arg0)
}
// PurgeUserData mocks base method.
func (m *MockAccessSetLookup) PurgeUserData(arg0 string) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "PurgeUserData", arg0)
}
// PurgeUserData indicates an expected call of PurgeUserData.
func (mr *MockAccessSetLookupMockRecorder) PurgeUserData(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PurgeUserData", reflect.TypeOf((*MockAccessSetLookup)(nil).PurgeUserData), arg0)
}

View File

@ -1,13 +1,13 @@
// Package schemas handles streaming schema updates and changes.
package schemas
import (
"context"
"fmt"
"sync"
"time"
"github.com/rancher/apiserver/pkg/builtin"
"k8s.io/apimachinery/pkg/api/equality"
schemastore "github.com/rancher/apiserver/pkg/store/schema"
"github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/accesscontrol"
@ -15,10 +15,12 @@ import (
"github.com/rancher/wrangler/pkg/broadcast"
"github.com/rancher/wrangler/pkg/schemas/validation"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
)
// SetupWatcher create a new schema.Store for tracking schema changes
func SetupWatcher(ctx context.Context, schemas *types.APISchemas, asl accesscontrol.AccessSetLookup, factory schema.Factory) {
// one instance shared with all stores
notifier := schemaChangeNotifier(ctx, factory)
@ -34,6 +36,7 @@ func SetupWatcher(ctx context.Context, schemas *types.APISchemas, asl accesscont
schemas.AddSchema(schema)
}
// Store hold information for watching updates to schemas
type Store struct {
types.Store
@ -42,14 +45,16 @@ type Store struct {
schemaChangeNotify func(context.Context) (chan interface{}, error)
}
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
// Watch will return a APIevent channel that tracks changes to schemas for a user in a given APIRequest.
// Changes will be returned until Done is closed on the context in the given APIRequest.
func (s *Store) Watch(apiOp *types.APIRequest, _ *types.APISchema, _ types.WatchRequest) (chan types.APIEvent, error) {
user, ok := request.UserFrom(apiOp.Request.Context())
if !ok {
return nil, validation.Unauthorized
}
wg := sync.WaitGroup{}
wg.Add(2)
wg.Add(1)
result := make(chan types.APIEvent)
go func() {
@ -57,30 +62,38 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
close(result)
}()
go func() {
defer wg.Done()
c, err := s.schemaChangeNotify(apiOp.Context())
if err != nil {
return
}
schemas, err := s.sf.Schemas(user)
if err != nil {
logrus.Errorf("failed to generate schemas for user %v: %v", user, err)
return
}
for range c {
schemas = s.sendSchemas(result, apiOp, user, schemas)
}
}()
schemas, err := s.sf.Schemas(user)
if err != nil {
return nil, fmt.Errorf("failed to generate schemas for user '%v': %w", user, err)
}
// Create child contexts that allows us to cancel both change notifications routines.
notifyCtx, notifyCancel := context.WithCancel(apiOp.Context())
schemaChangeSignal, err := s.schemaChangeNotify(notifyCtx)
if err != nil {
notifyCancel()
return nil, fmt.Errorf("failed to start schema change notifications: %w", err)
}
userChangeSignal := s.userChangeNotify(notifyCtx, user)
go func() {
defer notifyCancel()
defer wg.Done()
schemas, err := s.sf.Schemas(user)
if err != nil {
logrus.Errorf("failed to generate schemas for notify user %v: %v", user, err)
return
}
for range s.userChangeNotify(apiOp.Context(), user) {
// For each change notification send schema updates onto the result channel.
for {
select {
case _, ok := <-schemaChangeSignal:
if !ok {
return
}
case _, ok := <-userChangeSignal:
if !ok {
return
}
}
schemas = s.sendSchemas(result, apiOp, user, schemas)
}
}()
@ -88,7 +101,9 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
return result, nil
}
// sendSchemas will send APIEvents onto the provided result channel based on detected changes in the schemas for the provided users.
func (s *Store) sendSchemas(result chan types.APIEvent, apiOp *types.APIRequest, user user.Info, oldSchemas *types.APISchemas) *types.APISchemas {
// get the current schemas for a user
schemas, err := s.sf.Schemas(user)
if err != nil {
logrus.Errorf("failed to get schemas for %v: %v", user, err)
@ -96,9 +111,15 @@ func (s *Store) sendSchemas(result chan types.APIEvent, apiOp *types.APIRequest,
}
inNewSchemas := map[string]bool{}
for _, apiObject := range schemastore.FilterSchemas(apiOp, schemas.Schemas).Objects {
// Convert the schemas for the given user to a flat list of APIObjects.
apiObjects := schemastore.FilterSchemas(apiOp, schemas.Schemas).Objects
for i := range apiObjects {
apiObject := apiObjects[i]
inNewSchemas[apiObject.ID] = true
eventName := types.ChangeAPIEvent
// Check to see if the schema represented by the current APIObject exist in the oldSchemas.
if oldSchema := oldSchemas.LookupSchema(apiObject.ID); oldSchema == nil {
eventName = types.CreateAPIEvent
} else {
@ -106,10 +127,15 @@ func (s *Store) sendSchemas(result chan types.APIEvent, apiOp *types.APIRequest,
oldSchemaCopy := oldSchema.Schema.DeepCopy()
newSchemaCopy.Mapper = nil
oldSchemaCopy.Mapper = nil
// APIObjects are intentionally stripped of access information. Thus we will remove the field when comparing changes.
delete(oldSchemaCopy.Attributes, "access")
if equality.Semantic.DeepEqual(newSchemaCopy, oldSchemaCopy) {
continue
}
}
// Send the new or modified schema as an APIObject on the APIEvent channel.
result <- types.APIEvent{
Name: eventName,
ResourceType: "schema",
@ -117,7 +143,10 @@ func (s *Store) sendSchemas(result chan types.APIEvent, apiOp *types.APIRequest,
}
}
for _, oldSchema := range schemastore.FilterSchemas(apiOp, oldSchemas.Schemas).Objects {
// Identify all of the oldSchema APIObjects that have been removed and send Remove APIEvents.
oldSchemaObjs := schemastore.FilterSchemas(apiOp, oldSchemas.Schemas).Objects
for i := range oldSchemaObjs {
oldSchema := oldSchemaObjs[i]
if inNewSchemas[oldSchema.ID] {
continue
}
@ -131,6 +160,9 @@ func (s *Store) sendSchemas(result chan types.APIEvent, apiOp *types.APIRequest,
return schemas
}
// userChangeNotify gets the provided users AccessSet every 2 seconds.
// If the AccessSet has changed the caller is notified via an empty struct sent on the returned channel.
// If the given context is finished then the returned channel will be closed.
func (s *Store) userChangeNotify(ctx context.Context, user user.Info) chan interface{} {
as := s.asl.AccessFor(user)
result := make(chan interface{})
@ -154,6 +186,7 @@ func (s *Store) userChangeNotify(ctx context.Context, user user.Info) chan inter
return result
}
// schemaChangeNotifier returns a channel that is used to signal OnChange was called for the provided factory.
func schemaChangeNotifier(ctx context.Context, factory schema.Factory) func(ctx context.Context) (chan interface{}, error) {
notify := make(chan interface{})
bcast := &broadcast.Broadcaster{}

View File

@ -0,0 +1,491 @@
// Package schemas handles streaming schema updates and changes.
package schemas_test
import (
"context"
"encoding/json"
"net/http/httptest"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/accesscontrol"
acfake "github.com/rancher/steve/pkg/accesscontrol/fake"
"github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/resources/schemas"
schemafake "github.com/rancher/steve/pkg/schema/fake"
v1schema "github.com/rancher/wrangler/pkg/schemas"
"github.com/stretchr/testify/assert"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
)
var setupTimeout = time.Millisecond * 50
const resourceType = "schemas"
func Test_WatchChangeDetection(t *testing.T) {
ctrl := gomock.NewController(t)
asl := acfake.NewMockAccessSetLookup(ctrl)
userInfo := &user.DefaultInfo{
Name: "test",
UID: "test",
Groups: nil,
Extra: nil,
}
accessSet := &accesscontrol.AccessSet{}
// always return the same empty accessSet for the test user
asl.EXPECT().AccessFor(userInfo).Return(accessSet).AnyTimes()
req := httptest.NewRequest("GET", "/", nil)
type testValues struct {
expectedChanges []types.APIEvent
mockFactory *schemafake.MockFactory
eventsReady chan struct{}
}
tests := []struct {
name string
setup func(*gomock.Controller) testValues
}{
{
name: "Schemas have no change",
setup: func(ctrl *gomock.Controller) testValues {
factory := schemafake.NewMockFactory(ctrl)
eventsReady := make(chan struct{})
baseSchemas := types.EmptyAPISchemas()
updateSchemas := types.EmptyAPISchemas()
testSchema := types.APISchema{
Schema: &v1schema.Schema{
ID: "pod",
PluralName: "pods",
CollectionMethods: []string{"GET"},
ResourceMethods: []string{"GET"},
},
}
// initial schemas
baseSchemas.AddSchema(testSchema)
factory.EXPECT().Schemas(userInfo).Return(baseSchemas, nil)
updateSchemas.AddSchema(testSchema)
// return updated schemas for the second request
factory.EXPECT().Schemas(userInfo).DoAndReturn(func(_ user.Info) (*types.APISchemas, error) {
// signal that initial Schemas were called
close(eventsReady)
return updateSchemas, nil
})
expectedEvents := []types.APIEvent{}
return testValues{expectedEvents, factory, eventsReady}
},
},
{
name: "New schema is added to schemas.",
setup: func(ctrl *gomock.Controller) testValues {
factory := schemafake.NewMockFactory(ctrl)
eventsReady := make(chan struct{})
baseSchemas := types.EmptyAPISchemas()
updateSchemas := types.EmptyAPISchemas()
testSchema := types.APISchema{
Schema: &v1schema.Schema{
ID: "pod",
PluralName: "pods",
CollectionMethods: []string{"GET"},
ResourceMethods: []string{"GET"},
},
}
testSchemaNew := types.APISchema{
Schema: &v1schema.Schema{
ID: "secret",
PluralName: "secrets",
CollectionMethods: []string{"GET"},
ResourceMethods: []string{"GET"},
},
}
baseSchemas.AddSchema(testSchema)
// initial schemas
factory.EXPECT().Schemas(userInfo).Return(baseSchemas, nil)
updateSchemas.AddSchema(testSchema)
updateSchemas.AddSchema((testSchemaNew))
// return updated schemas for the second request
factory.EXPECT().Schemas(userInfo).DoAndReturn(func(_ user.Info) (*types.APISchemas, error) {
// signal that initial Schemas were called
close(eventsReady)
return updateSchemas, nil
})
expectedEvents := []types.APIEvent{
{
Name: types.CreateAPIEvent,
ResourceType: "schema",
Object: types.APIObject{
Type: resourceType,
ID: testSchemaNew.ID,
Object: &testSchemaNew,
},
},
}
return testValues{expectedEvents, factory, eventsReady}
},
},
{
name: "Schema is deleted from schemas.",
setup: func(ctrl *gomock.Controller) testValues {
factory := schemafake.NewMockFactory(ctrl)
eventsReady := make(chan struct{})
baseSchemas := types.EmptyAPISchemas()
updateSchemas := types.EmptyAPISchemas()
testSchema := types.APISchema{
Schema: &v1schema.Schema{
ID: "pod",
PluralName: "pods",
CollectionMethods: []string{"GET"},
ResourceMethods: []string{"GET"},
},
}
testSchemaToDelete := types.APISchema{
Schema: &v1schema.Schema{
ID: "secret",
PluralName: "secrets",
CollectionMethods: []string{"GET"},
ResourceMethods: []string{"GET"},
},
}
baseSchemas.AddSchema(testSchema)
baseSchemas.AddSchema(testSchemaToDelete)
// initial schemas
factory.EXPECT().Schemas(userInfo).Return(baseSchemas, nil)
updateSchemas.AddSchema(testSchema)
// return updated schemas for the second request
factory.EXPECT().Schemas(userInfo).DoAndReturn(func(_ user.Info) (*types.APISchemas, error) {
// signal that initial Schemas were called
close(eventsReady)
return updateSchemas, nil
})
expectedEvents := []types.APIEvent{
{
Name: types.RemoveAPIEvent,
ResourceType: "schema",
Object: types.APIObject{
Type: resourceType,
ID: testSchemaToDelete.ID,
Object: &testSchemaToDelete,
},
},
}
return testValues{expectedEvents, factory, eventsReady}
},
},
{
name: "Empty Schemas",
setup: func(ctrl *gomock.Controller) testValues {
factory := schemafake.NewMockFactory(ctrl)
eventsReady := make(chan struct{})
// initial schemas
factory.EXPECT().Schemas(userInfo).Return(types.EmptyAPISchemas(), nil)
// return updated schemas for the second request
factory.EXPECT().Schemas(userInfo).DoAndReturn(func(_ user.Info) (*types.APISchemas, error) {
// signal that initial Schemas were called
close(eventsReady)
return types.EmptyAPISchemas(), nil
})
return testValues{nil, factory, eventsReady}
},
},
{
name: "Schema kind attribute is updated",
setup: func(ctrl *gomock.Controller) testValues {
factory := schemafake.NewMockFactory(ctrl)
eventsReady := make(chan struct{})
baseSchemas := types.EmptyAPISchemas()
updateSchemas := types.EmptyAPISchemas()
testSchema := types.APISchema{
Schema: &v1schema.Schema{
ID: "pod",
PluralName: "pods",
CollectionMethods: []string{"GET"},
ResourceMethods: []string{"GET"},
},
}
baseSchemas.AddSchema(testSchema)
// initial schemas
factory.EXPECT().Schemas(userInfo).Return(baseSchemas, nil)
// add kind attribute
attributes.SetKind(&testSchema, "newKind")
updateSchemas.AddSchema(testSchema)
// return updated schemas for the second request
factory.EXPECT().Schemas(userInfo).DoAndReturn(func(_ user.Info) (*types.APISchemas, error) {
// signal that initial Schemas were called
close(eventsReady)
return updateSchemas, nil
})
expectedEvents := []types.APIEvent{
{
Name: types.ChangeAPIEvent,
ResourceType: "schema",
Object: types.APIObject{
Type: resourceType,
ID: testSchema.ID,
Object: &testSchema,
},
},
}
return testValues{expectedEvents, factory, eventsReady}
},
},
{
name: "Schema access attribute is updated",
setup: func(ctrl *gomock.Controller) testValues {
factory := schemafake.NewMockFactory(ctrl)
eventsReady := make(chan struct{})
baseSchemas := types.EmptyAPISchemas()
updateSchemas := types.EmptyAPISchemas()
testSchema := types.APISchema{
Schema: &v1schema.Schema{
ID: "pod",
PluralName: "pods",
CollectionMethods: []string{"GET"},
ResourceMethods: []string{"GET"},
},
}
baseSchemas.AddSchema(testSchema)
// initial schemas
factory.EXPECT().Schemas(userInfo).Return(baseSchemas, nil)
// add access attribute
attributes.SetAccess(&testSchema, map[string]string{"List": "*"})
updateSchemas.AddSchema(testSchema)
// return updated schemas for the second request
factory.EXPECT().Schemas(userInfo).DoAndReturn(func(_ user.Info) (*types.APISchemas, error) {
// signal that schemas were requested
close(eventsReady)
return updateSchemas, nil
})
expectedEvents := []types.APIEvent{}
return testValues{expectedEvents, factory, eventsReady}
},
},
}
for i := range tests {
test := tests[i]
t.Run(test.name, func(t *testing.T) {
t.Parallel()
// create new context for the test user
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
apiOp := &types.APIRequest{
Request: req.WithContext(request.WithUser(testCtx, userInfo)),
}
// create test factory
ctrl := gomock.NewController(t)
values := test.setup(ctrl)
// store onChange cb use to trigger the notifier that will be set in schemas.SetupWatcher(..)
var onChangeCB func()
values.mockFactory.EXPECT().OnChange(gomock.AssignableToTypeOf(testCtx), gomock.AssignableToTypeOf(onChangeCB)).
Do(func(_ context.Context, cb func()) {
onChangeCB = cb
})
baseSchemas := types.EmptyAPISchemas()
// create a new store and add it to baseSchemas
schemas.SetupWatcher(testCtx, baseSchemas, asl, values.mockFactory)
schema := baseSchemas.LookupSchema(resourceType)
// Start watching
resultChan, err := schema.Store.Watch(apiOp, nil, types.WatchRequest{})
assert.NoError(t, err, "Unexpected error starting Watch")
// wait for the store's go routines to start watching for onChange events
time.Sleep(setupTimeout)
// trigger watch notification that fetches new schemas
onChangeCB()
select {
case <-values.eventsReady:
// New schema was requested now we sleep to give time for watcher to send events
time.Sleep(setupTimeout)
case <-time.After(setupTimeout):
// When we continue here then the test will fail due to missing mock calls not being called.
}
// verify correct results are sent
hasExpectedResults(t, values.expectedChanges, resultChan, setupTimeout)
})
}
}
func Test_AccessSetAndChangeSignal(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
asl := acfake.NewMockAccessSetLookup(ctrl)
userInfo := &user.DefaultInfo{
Name: "test",
UID: "test",
Groups: nil,
Extra: nil,
}
accessSet := &accesscontrol.AccessSet{}
changedSet := &accesscontrol.AccessSet{ID: "1"}
// return access set with ID "" the first time then "1" for subsequent request
gomock.InOrder(
asl.EXPECT().AccessFor(userInfo).Return(accessSet),
asl.EXPECT().AccessFor(userInfo).Return(changedSet).AnyTimes(),
)
req := httptest.NewRequest("GET", "/", nil)
factory := schemafake.NewMockFactory(ctrl)
baseSchemas := types.EmptyAPISchemas()
onChangeUpdateSchemas := types.EmptyAPISchemas()
userAccessUpdateSchemas := types.EmptyAPISchemas()
testSchema := types.APISchema{
Schema: &v1schema.Schema{
ID: "pod",
PluralName: "pods",
CollectionMethods: []string{"GET"},
ResourceMethods: []string{"GET"},
},
}
// initial schemas
baseSchemas.AddSchema(testSchema)
factory.EXPECT().Schemas(userInfo).Return(baseSchemas, nil)
testSchemaNew := types.APISchema{
Schema: &v1schema.Schema{
ID: "secret",
PluralName: "secrets",
CollectionMethods: []string{"GET"},
ResourceMethods: []string{"GET"},
},
}
onChangeUpdateSchemas.AddSchema(testSchema)
onChangeUpdateSchemas.AddSchema((testSchemaNew))
// return updated schemas with new schemas added
factory.EXPECT().Schemas(userInfo).Return(onChangeUpdateSchemas, nil)
userAccessUpdateSchemas.AddSchema(testSchema)
// return updated schemas with new schemas removed
factory.EXPECT().Schemas(userInfo).Return(userAccessUpdateSchemas, nil)
expectedEvents := []types.APIEvent{
{
Name: types.CreateAPIEvent,
ResourceType: "schema",
Object: types.APIObject{
Type: resourceType,
ID: testSchemaNew.ID,
Object: &testSchemaNew,
},
},
{
Name: types.RemoveAPIEvent,
ResourceType: "schema",
Object: types.APIObject{
Type: resourceType,
ID: testSchemaNew.ID,
Object: &testSchemaNew,
},
},
}
// create new context for the test user
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
apiOp := &types.APIRequest{
Request: req.WithContext(request.WithUser(testCtx, userInfo)),
}
// store onChange cb use to trigger the notifier that will be set in schemas.SetupWatcher(..)
var onChangeCB func()
factory.EXPECT().OnChange(gomock.AssignableToTypeOf(testCtx), gomock.AssignableToTypeOf(onChangeCB)).
Do(func(_ context.Context, cb func()) {
onChangeCB = cb
})
watcherSchema := types.EmptyAPISchemas()
// create a new store and add it to watcherSchema
schemas.SetupWatcher(testCtx, watcherSchema, asl, factory)
schema := watcherSchema.LookupSchema(resourceType)
// Start watching
resultChan, err := schema.Store.Watch(apiOp, nil, types.WatchRequest{})
assert.NoError(t, err, "Unexpected error starting Watch")
// wait for the store's go routines to start watching for onChange events
time.Sleep(setupTimeout)
// trigger watch notification that fetches new schemas
onChangeCB()
// wait for user access set to be checked (2 seconds)
time.Sleep(time.Millisecond * 2100)
// verify correct results are sent
hasExpectedResults(t, expectedEvents, resultChan, setupTimeout)
}
// hasExpectedResults verifies the list of expected apiEvents are all received from the provided channel.
func hasExpectedResults(t *testing.T, expectedEvents []types.APIEvent, resultChan chan types.APIEvent, timeout time.Duration) {
t.Helper()
numEventsSent := 0
for {
select {
case event, ok := <-resultChan:
if !ok {
if numEventsSent == len(expectedEvents) {
// we got everything we expect
return
}
assert.Fail(t, "result channel unexpectedly closed")
}
if numEventsSent >= len(expectedEvents) {
assert.Failf(t, "too many events", "received unexpected events on channel %+v", event)
return
}
eventJSON, err := json.Marshal(event)
assert.NoError(t, err, "failed to marshal new event")
expectedJSON, err := json.Marshal(event)
assert.NoError(t, err, "failed to marshal expected event")
assert.JSONEq(t, string(expectedJSON), string(eventJSON), "incorrect event received")
case <-time.After(timeout):
if numEventsSent != len(expectedEvents) {
assert.Fail(t, "timeout waiting for results")
}
return
}
numEventsSent++
}
}

View File

@ -14,18 +14,9 @@ import (
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
)
type Factory interface {
Schemas(user user.Info) (*types.APISchemas, error)
ByGVR(gvr schema.GroupVersionResource) string
ByGVK(gvr schema.GroupVersionKind) string
OnChange(ctx context.Context, cb func())
AddTemplate(template ...Template)
}
type Collection struct {
toSync int32
baseSchema *types.APISchemas

View File

@ -1,6 +1,8 @@
package schema
//go:generate mockgen --build_flags=--mod=mod -package fake -destination fake/factory.go "github.com/rancher/steve/pkg/schema" Factory
import (
"context"
"fmt"
"net/http"
"time"
@ -9,9 +11,18 @@ import (
"github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/accesscontrol"
"github.com/rancher/steve/pkg/attributes"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/authentication/user"
)
type Factory interface {
Schemas(user user.Info) (*types.APISchemas, error)
ByGVR(gvr schema.GroupVersionResource) string
ByGVK(gvr schema.GroupVersionKind) string
OnChange(ctx context.Context, cb func())
AddTemplate(template ...Template)
}
func newSchemas() (*types.APISchemas, error) {
apiSchemas := types.EmptyAPISchemas()
if err := apiSchemas.AddSchemas(builtin.Schemas); err != nil {

110
pkg/schema/fake/factory.go Normal file
View File

@ -0,0 +1,110 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/rancher/steve/pkg/schema (interfaces: Factory)
// Package fake is a generated GoMock package.
package fake
import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
types "github.com/rancher/apiserver/pkg/types"
schema "github.com/rancher/steve/pkg/schema"
schema0 "k8s.io/apimachinery/pkg/runtime/schema"
user "k8s.io/apiserver/pkg/authentication/user"
)
// MockFactory is a mock of Factory interface.
type MockFactory struct {
ctrl *gomock.Controller
recorder *MockFactoryMockRecorder
}
// MockFactoryMockRecorder is the mock recorder for MockFactory.
type MockFactoryMockRecorder struct {
mock *MockFactory
}
// NewMockFactory creates a new mock instance.
func NewMockFactory(ctrl *gomock.Controller) *MockFactory {
mock := &MockFactory{ctrl: ctrl}
mock.recorder = &MockFactoryMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockFactory) EXPECT() *MockFactoryMockRecorder {
return m.recorder
}
// AddTemplate mocks base method.
func (m *MockFactory) AddTemplate(arg0 ...schema.Template) {
m.ctrl.T.Helper()
varargs := []interface{}{}
for _, a := range arg0 {
varargs = append(varargs, a)
}
m.ctrl.Call(m, "AddTemplate", varargs...)
}
// AddTemplate indicates an expected call of AddTemplate.
func (mr *MockFactoryMockRecorder) AddTemplate(arg0 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTemplate", reflect.TypeOf((*MockFactory)(nil).AddTemplate), arg0...)
}
// ByGVK mocks base method.
func (m *MockFactory) ByGVK(arg0 schema0.GroupVersionKind) string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ByGVK", arg0)
ret0, _ := ret[0].(string)
return ret0
}
// ByGVK indicates an expected call of ByGVK.
func (mr *MockFactoryMockRecorder) ByGVK(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ByGVK", reflect.TypeOf((*MockFactory)(nil).ByGVK), arg0)
}
// ByGVR mocks base method.
func (m *MockFactory) ByGVR(arg0 schema0.GroupVersionResource) string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ByGVR", arg0)
ret0, _ := ret[0].(string)
return ret0
}
// ByGVR indicates an expected call of ByGVR.
func (mr *MockFactoryMockRecorder) ByGVR(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ByGVR", reflect.TypeOf((*MockFactory)(nil).ByGVR), arg0)
}
// OnChange mocks base method.
func (m *MockFactory) OnChange(arg0 context.Context, arg1 func()) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnChange", arg0, arg1)
}
// OnChange indicates an expected call of OnChange.
func (mr *MockFactoryMockRecorder) OnChange(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnChange", reflect.TypeOf((*MockFactory)(nil).OnChange), arg0, arg1)
}
// Schemas mocks base method.
func (m *MockFactory) Schemas(arg0 user.Info) (*types.APISchemas, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Schemas", arg0)
ret0, _ := ret[0].(*types.APISchemas)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Schemas indicates an expected call of Schemas.
func (mr *MockFactoryMockRecorder) Schemas(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Schemas", reflect.TypeOf((*MockFactory)(nil).Schemas), arg0)
}