mirror of
https://github.com/rancher/steve.git
synced 2025-07-31 22:36:52 +00:00
support unwatchables in vai (#458)
* Create and use a synthetic watcher for non-watchable resources. * Write unit tests for the synthetic watcher. * Make the refresh interval for synthetic watchers configurable. The default is to call `client.List(...)` every 5 seconds for each unwatchable GVK. There are currently only 3 such GVKs right now so this will be informative enough but not really noticeable. * Pass the context into the synthetic watch func. * Restore changes lost in rebasing. --------- Co-authored-by: Tom Lebreux <tom.lebreux@suse.com>
This commit is contained in:
parent
6559fa9ad7
commit
c906c36bc3
@ -44,7 +44,7 @@ type guardedInformer struct {
|
|||||||
mutex *sync.Mutex
|
mutex *sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool) (*informer.Informer, error)
|
type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool) (*informer.Informer, error)
|
||||||
|
|
||||||
type Cache struct {
|
type Cache struct {
|
||||||
informer.ByOptionsLister
|
informer.ByOptionsLister
|
||||||
@ -121,7 +121,7 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, transfor
|
|||||||
|
|
||||||
_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
|
_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
|
||||||
shouldEncrypt := f.encryptAll || encryptResourceAlways
|
shouldEncrypt := f.encryptAll || encryptResourceAlways
|
||||||
i, err := f.newInformer(ctx, client, fields, transform, gvk, f.dbClient, shouldEncrypt, namespaced)
|
i, err := f.newInformer(ctx, client, fields, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Cache{}, err
|
return Cache{}, err
|
||||||
}
|
}
|
||||||
|
@ -74,7 +74,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
expectedC := Cache{
|
expectedC := Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
assert.Equal(t, fields, fields)
|
assert.Equal(t, fields, fields)
|
||||||
assert.Equal(t, expectedGVK, gvk)
|
assert.Equal(t, expectedGVK, gvk)
|
||||||
@ -119,7 +119,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
// need to set this so Run function is not nil
|
// need to set this so Run function is not nil
|
||||||
SharedIndexInformer: sii,
|
SharedIndexInformer: sii,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
assert.Equal(t, fields, fields)
|
assert.Equal(t, fields, fields)
|
||||||
assert.Equal(t, expectedGVK, gvk)
|
assert.Equal(t, expectedGVK, gvk)
|
||||||
@ -161,7 +161,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
expectedC := Cache{
|
expectedC := Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
assert.Equal(t, fields, fields)
|
assert.Equal(t, fields, fields)
|
||||||
assert.Equal(t, expectedGVK, gvk)
|
assert.Equal(t, expectedGVK, gvk)
|
||||||
@ -200,7 +200,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
expectedC := Cache{
|
expectedC := Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
assert.Equal(t, fields, fields)
|
assert.Equal(t, fields, fields)
|
||||||
assert.Equal(t, expectedGVK, gvk)
|
assert.Equal(t, expectedGVK, gvk)
|
||||||
@ -248,7 +248,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
expectedC := Cache{
|
expectedC := Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
assert.Equal(t, fields, fields)
|
assert.Equal(t, fields, fields)
|
||||||
assert.Equal(t, expectedGVK, gvk)
|
assert.Equal(t, expectedGVK, gvk)
|
||||||
@ -295,7 +295,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
expectedC := Cache{
|
expectedC := Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
assert.Equal(t, fields, fields)
|
assert.Equal(t, fields, fields)
|
||||||
assert.Equal(t, expectedGVK, gvk)
|
assert.Equal(t, expectedGVK, gvk)
|
||||||
@ -342,7 +342,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
expectedC := Cache{
|
expectedC := Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*informer.Informer, error) {
|
||||||
// we can't test func == func, so instead we check if the output was as expected
|
// we can't test func == func, so instead we check if the output was as expected
|
||||||
input := "someinput"
|
input := "someinput"
|
||||||
ouput, err := transform(input)
|
ouput, err := transform(input)
|
||||||
|
@ -20,6 +20,8 @@ import (
|
|||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var defaultRefreshTime = 5 * time.Second
|
||||||
|
|
||||||
// Informer is a SQLite-backed cache.SharedIndexInformer that can execute queries on listprocessor structs
|
// Informer is a SQLite-backed cache.SharedIndexInformer that can execute queries on listprocessor structs
|
||||||
type Informer struct {
|
type Informer struct {
|
||||||
cache.SharedIndexInformer
|
cache.SharedIndexInformer
|
||||||
@ -35,15 +37,22 @@ var newInformer = cache.NewSharedIndexInformer
|
|||||||
|
|
||||||
// NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form
|
// NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form
|
||||||
// using the specified client
|
// using the specified client
|
||||||
func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool) (*Informer, error) {
|
func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*Informer, error) {
|
||||||
|
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
return client.Watch(ctx, options)
|
||||||
|
}
|
||||||
|
if !watchable {
|
||||||
|
watchFunc = func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
return newSyntheticWatcher(ctx, cancel).watch(client, options, defaultRefreshTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
listWatcher := &cache.ListWatch{
|
listWatcher := &cache.ListWatch{
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
a, err := client.List(ctx, options)
|
a, err := client.List(ctx, options)
|
||||||
return a, err
|
return a, err
|
||||||
},
|
},
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
WatchFunc: watchFunc,
|
||||||
return client.Watch(ctx, options)
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
example := &unstructured.Unstructured{}
|
example := &unstructured.Unstructured{}
|
||||||
@ -97,6 +106,11 @@ func (i *Informer) ListByOptions(ctx context.Context, lo ListOptions, partitions
|
|||||||
return i.ByOptionsLister.ListByOptions(ctx, lo, partitions, namespace)
|
return i.ByOptionsLister.ListByOptions(ctx, lo, partitions, namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetSyntheticWatchableInterval - call this function to override the default interval time of 5 seconds
|
||||||
|
func SetSyntheticWatchableInterval(interval time.Duration) {
|
||||||
|
defaultRefreshTime = interval
|
||||||
|
}
|
||||||
|
|
||||||
func informerNameFromGVK(gvk schema.GroupVersionKind) string {
|
func informerNameFromGVK(gvk schema.GroupVersionKind) string {
|
||||||
return gvk.Group + "_" + gvk.Version + "_" + gvk.Kind
|
return gvk.Group + "_" + gvk.Version + "_" + gvk.Kind
|
||||||
}
|
}
|
||||||
|
@ -79,7 +79,7 @@ func TestNewInformer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true)
|
informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.NotNil(t, informer.ByOptionsLister)
|
assert.NotNil(t, informer.ByOptionsLister)
|
||||||
assert.NotNil(t, informer.SharedIndexInformer)
|
assert.NotNil(t, informer.SharedIndexInformer)
|
||||||
@ -103,7 +103,7 @@ func TestNewInformer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true)
|
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
}})
|
}})
|
||||||
tests = append(tests, testCase{description: "NewInformer() with errors returned from NewIndexer(), should return an error", test: func(t *testing.T) {
|
tests = append(tests, testCase{description: "NewInformer() with errors returned from NewIndexer(), should return an error", test: func(t *testing.T) {
|
||||||
@ -138,7 +138,7 @@ func TestNewInformer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true)
|
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
}})
|
}})
|
||||||
tests = append(tests, testCase{description: "NewInformer() with errors returned from NewListOptionIndexer(), should return an error", test: func(t *testing.T) {
|
tests = append(tests, testCase{description: "NewInformer() with errors returned from NewListOptionIndexer(), should return an error", test: func(t *testing.T) {
|
||||||
@ -190,7 +190,7 @@ func TestNewInformer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true)
|
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
}})
|
}})
|
||||||
tests = append(tests, testCase{description: "NewInformer() with transform func", test: func(t *testing.T) {
|
tests = append(tests, testCase{description: "NewInformer() with transform func", test: func(t *testing.T) {
|
||||||
@ -253,7 +253,7 @@ func TestNewInformer(t *testing.T) {
|
|||||||
transformFunc := func(input interface{}) (interface{}, error) {
|
transformFunc := func(input interface{}) (interface{}, error) {
|
||||||
return "someoutput", nil
|
return "someoutput", nil
|
||||||
}
|
}
|
||||||
informer, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true)
|
informer, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.NotNil(t, informer.ByOptionsLister)
|
assert.NotNil(t, informer.ByOptionsLister)
|
||||||
assert.NotNil(t, informer.SharedIndexInformer)
|
assert.NotNil(t, informer.SharedIndexInformer)
|
||||||
@ -289,7 +289,7 @@ func TestNewInformer(t *testing.T) {
|
|||||||
transformFunc := func(input interface{}) (interface{}, error) {
|
transformFunc := func(input interface{}) (interface{}, error) {
|
||||||
return "someoutput", nil
|
return "someoutput", nil
|
||||||
}
|
}
|
||||||
_, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true)
|
_, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
newInformer = cache.NewSharedIndexInformer
|
newInformer = cache.NewSharedIndexInformer
|
||||||
}})
|
}})
|
||||||
|
141
pkg/sqlcache/informer/synthetic_watcher.go
Normal file
141
pkg/sqlcache/informer/synthetic_watcher.go
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
package informer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
"k8s.io/client-go/dynamic"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SyntheticWatcher struct {
|
||||||
|
resultChan chan watch.Event
|
||||||
|
stopChan chan struct{}
|
||||||
|
doneChan chan struct{}
|
||||||
|
stopChanLock sync.Mutex
|
||||||
|
context context.Context
|
||||||
|
cancelFunc context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSyntheticWatcher(context context.Context, cancel context.CancelFunc) *SyntheticWatcher {
|
||||||
|
return &SyntheticWatcher{
|
||||||
|
stopChan: make(chan struct{}),
|
||||||
|
doneChan: make(chan struct{}),
|
||||||
|
resultChan: make(chan watch.Event, 0),
|
||||||
|
context: context,
|
||||||
|
cancelFunc: cancel,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rw *SyntheticWatcher) watch(client dynamic.ResourceInterface, options metav1.ListOptions, interval time.Duration) (*SyntheticWatcher, error) {
|
||||||
|
go rw.receive(client, options, interval)
|
||||||
|
return rw, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type objectHolder struct {
|
||||||
|
version string
|
||||||
|
unstructuredObject *unstructured.Unstructured
|
||||||
|
}
|
||||||
|
|
||||||
|
// receive periodically calls client.List(), and converts the returned items into Watch Events
|
||||||
|
func (rw *SyntheticWatcher) receive(client dynamic.ResourceInterface, options metav1.ListOptions, interval time.Duration) {
|
||||||
|
go func() {
|
||||||
|
defer close(rw.doneChan)
|
||||||
|
defer close(rw.resultChan)
|
||||||
|
defer rw.cancelFunc()
|
||||||
|
previousState := make(map[string]objectHolder)
|
||||||
|
ticker := time.NewTicker(interval)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
list, err := client.List(rw.context, options)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("synthetic watcher: client.List => error: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
newObjects := make(map[string]objectHolder)
|
||||||
|
for _, uItem := range list.Items {
|
||||||
|
namespace := uItem.GetNamespace()
|
||||||
|
name := uItem.GetName()
|
||||||
|
key := name
|
||||||
|
if namespace != "" {
|
||||||
|
key = fmt.Sprintf("%s/%s", namespace, name)
|
||||||
|
}
|
||||||
|
version := uItem.GetResourceVersion()
|
||||||
|
newObjects[key] = objectHolder{version: version, unstructuredObject: &uItem}
|
||||||
|
}
|
||||||
|
// Now determine whether items were added, deleted, or modified
|
||||||
|
currentState := make(map[string]objectHolder)
|
||||||
|
for key, newObject := range newObjects {
|
||||||
|
currentState[key] = newObject
|
||||||
|
if oldItem, ok := previousState[key]; !ok {
|
||||||
|
w, err := createWatchEvent(watch.Added, newObject.unstructuredObject)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("can't convert unstructured obj into runtime: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rw.resultChan <- w
|
||||||
|
} else {
|
||||||
|
delete(previousState, key)
|
||||||
|
if oldItem.version != newObject.version {
|
||||||
|
w, err := createWatchEvent(watch.Modified, oldItem.unstructuredObject)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("can't convert unstructured obj into runtime: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rw.resultChan <- w
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// And anything left in the previousState didn't show up in currentState and can be deleted.
|
||||||
|
for _, item := range previousState {
|
||||||
|
w, err := createWatchEvent(watch.Deleted, item.unstructuredObject)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rw.resultChan <- w
|
||||||
|
}
|
||||||
|
previousState = currentState
|
||||||
|
|
||||||
|
case <-rw.stopChan:
|
||||||
|
rw.cancelFunc()
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-rw.context.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func createWatchEvent(event watch.EventType, u *unstructured.Unstructured) (watch.Event, error) {
|
||||||
|
return watch.Event{Type: event, Object: u}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResultChan implements [k8s.io/apimachinery/pkg/watch].Interface.
|
||||||
|
func (rw *SyntheticWatcher) ResultChan() <-chan watch.Event {
|
||||||
|
return rw.resultChan
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop implements [k8s.io/apimachinery/pkg/watch].Interface.
|
||||||
|
func (rw *SyntheticWatcher) Stop() {
|
||||||
|
rw.stopChanLock.Lock()
|
||||||
|
defer rw.stopChanLock.Unlock()
|
||||||
|
|
||||||
|
// Prevent closing an already closed channel to prevent a panic
|
||||||
|
select {
|
||||||
|
case <-rw.stopChan:
|
||||||
|
default:
|
||||||
|
close(rw.stopChan)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rw *SyntheticWatcher) Done() <-chan struct{} {
|
||||||
|
return rw.doneChan
|
||||||
|
}
|
190
pkg/sqlcache/informer/synthetic_watcher_test.go
Normal file
190
pkg/sqlcache/informer/synthetic_watcher_test.go
Normal file
@ -0,0 +1,190 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2024 SUSE LLC
|
||||||
|
*/
|
||||||
|
|
||||||
|
package informer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"go.uber.org/mock/gomock"
|
||||||
|
|
||||||
|
v1 "k8s.io/api/core/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSyntheticWatcher(t *testing.T) {
|
||||||
|
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
|
||||||
|
var err error
|
||||||
|
cs1 := v1.ComponentStatus{
|
||||||
|
TypeMeta: metav1.TypeMeta{
|
||||||
|
APIVersion: "v1",
|
||||||
|
Kind: "ComponentStatus",
|
||||||
|
},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "cs1",
|
||||||
|
UID: "1",
|
||||||
|
ResourceVersion: "rv1.1",
|
||||||
|
},
|
||||||
|
Conditions: []v1.ComponentCondition{v1.ComponentCondition{Type: "Healthy", Status: v1.ConditionTrue, Message: "hi from cs1"}},
|
||||||
|
}
|
||||||
|
cs2 := v1.ComponentStatus{
|
||||||
|
TypeMeta: metav1.TypeMeta{
|
||||||
|
APIVersion: "v1",
|
||||||
|
Kind: "ComponentStatus",
|
||||||
|
},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "cs2",
|
||||||
|
UID: "2",
|
||||||
|
ResourceVersion: "rv2.1",
|
||||||
|
},
|
||||||
|
Conditions: []v1.ComponentCondition{v1.ComponentCondition{Type: "Healthy", Status: v1.ConditionTrue, Message: "hi from cs2"}},
|
||||||
|
}
|
||||||
|
cs3 := v1.ComponentStatus{
|
||||||
|
TypeMeta: metav1.TypeMeta{
|
||||||
|
APIVersion: "v1",
|
||||||
|
Kind: "ComponentStatus",
|
||||||
|
},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "cs3",
|
||||||
|
UID: "3",
|
||||||
|
ResourceVersion: "rv3.1",
|
||||||
|
},
|
||||||
|
Conditions: []v1.ComponentCondition{v1.ComponentCondition{Type: "Healthy", Status: v1.ConditionTrue, Message: "hi from cs3"}},
|
||||||
|
}
|
||||||
|
cs4 := v1.ComponentStatus{
|
||||||
|
TypeMeta: metav1.TypeMeta{
|
||||||
|
APIVersion: "v1",
|
||||||
|
Kind: "ComponentStatus",
|
||||||
|
},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "cs4",
|
||||||
|
UID: "4",
|
||||||
|
ResourceVersion: "rv4.1",
|
||||||
|
},
|
||||||
|
Conditions: []v1.ComponentCondition{v1.ComponentCondition{Type: "Healthy", Status: v1.ConditionTrue, Message: "hi from cs4"}},
|
||||||
|
}
|
||||||
|
list, err := makeCSList(cs1, cs2, cs3, cs4)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
dynamicClient.EXPECT().List(gomock.Any(), gomock.Any()).Return(list, nil)
|
||||||
|
// Make copies to avoid modifying objects before the watcher has processed them.
|
||||||
|
cs1b := cs1.DeepCopy()
|
||||||
|
cs1b.ObjectMeta.ResourceVersion = "rv1.2"
|
||||||
|
cs2b := cs2.DeepCopy()
|
||||||
|
cs2b.ObjectMeta.ResourceVersion = "rv2.2"
|
||||||
|
list2, err := makeCSList(*cs1b, *cs2b, cs4)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
dynamicClient.EXPECT().List(gomock.Any(), gomock.Any()).AnyTimes().Return(list2, nil)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
sw := newSyntheticWatcher(ctx, cancel)
|
||||||
|
pollingInterval := 10 * time.Millisecond
|
||||||
|
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
return sw.watch(dynamicClient, options, pollingInterval)
|
||||||
|
}
|
||||||
|
options := metav1.ListOptions{}
|
||||||
|
w, err := watchFunc(options)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
errChan := make(chan error)
|
||||||
|
results := make([]processedObjectInfo, 0)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(2)
|
||||||
|
go func() {
|
||||||
|
results, err = handleAnyWatch(w, errChan, sw.stopChan)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
time.Sleep(40 * time.Millisecond)
|
||||||
|
sw.stopChan <- struct{}{}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
wg.Wait()
|
||||||
|
// Verify we get what we expected to see
|
||||||
|
assert.Len(t, results, 8)
|
||||||
|
for i, _ := range list.Items {
|
||||||
|
assert.Equal(t, "added-result", results[i].eventName)
|
||||||
|
}
|
||||||
|
assert.Equal(t, "modified-result", results[len(list.Items)].eventName)
|
||||||
|
assert.Equal(t, "modified-result", results[len(list.Items)+1].eventName)
|
||||||
|
assert.Equal(t, "deleted-result", results[len(list.Items)+2].eventName)
|
||||||
|
assert.Equal(t, "stop", results[7].eventName)
|
||||||
|
// We can't really assert that the events get the correct timestamps on them
|
||||||
|
// because they can be held up in the channel for unexpected durations. I did have
|
||||||
|
// assert.Greater(t, float64(timeDelta), 0.9*float64(pollingInterval))
|
||||||
|
// but saw a failure -- the interval was actually 0.75 * pollingInterval.
|
||||||
|
// So there's no point testing that.
|
||||||
|
assert.Greater(t, results[4].createdAt, results[0].createdAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeCSList(objs ...v1.ComponentStatus) (*unstructured.UnstructuredList, error) {
|
||||||
|
unList := make([]unstructured.Unstructured, len(objs))
|
||||||
|
for i, cs := range objs {
|
||||||
|
unst, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&cs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
unList[i] = unstructured.Unstructured{Object: unst}
|
||||||
|
}
|
||||||
|
list := &unstructured.UnstructuredList{
|
||||||
|
Items: unList,
|
||||||
|
}
|
||||||
|
return list, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type processedObjectInfo struct {
|
||||||
|
createdAt time.Time
|
||||||
|
eventName string
|
||||||
|
payload interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeProcessedObject(eventName string, payload interface{}) processedObjectInfo {
|
||||||
|
return processedObjectInfo{
|
||||||
|
createdAt: time.Now(),
|
||||||
|
eventName: eventName,
|
||||||
|
payload: payload,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleAnyWatch(w watch.Interface,
|
||||||
|
errCh chan error,
|
||||||
|
stopCh <-chan struct{},
|
||||||
|
) ([]processedObjectInfo, error) {
|
||||||
|
results := make([]processedObjectInfo, 0)
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
results = append(results, makeProcessedObject("stop", nil))
|
||||||
|
return results, nil
|
||||||
|
case err := <-errCh:
|
||||||
|
results = append(results, makeProcessedObject("error", err))
|
||||||
|
return results, err
|
||||||
|
case event, ok := <-w.ResultChan():
|
||||||
|
if !ok {
|
||||||
|
results = append(results, makeProcessedObject("bad-result", nil))
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Added:
|
||||||
|
results = append(results, makeProcessedObject("added-result", &event))
|
||||||
|
case watch.Modified:
|
||||||
|
results = append(results, makeProcessedObject("modified-result", &event))
|
||||||
|
case watch.Deleted:
|
||||||
|
results = append(results, makeProcessedObject("deleted-result", &event))
|
||||||
|
case watch.Bookmark:
|
||||||
|
results = append(results, makeProcessedObject("bookmark-result", &event))
|
||||||
|
default:
|
||||||
|
results = append(results, makeProcessedObject("unexpected-result", &event))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return results, nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user