diff --git a/pkg/sqlcache/informer/factory/informer_factory.go b/pkg/sqlcache/informer/factory/informer_factory.go index d2a431d9..0f1fb6ad 100644 --- a/pkg/sqlcache/informer/factory/informer_factory.go +++ b/pkg/sqlcache/informer/factory/informer_factory.go @@ -44,7 +44,7 @@ type guardedInformer struct { mutex *sync.Mutex } -type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool) (*informer.Informer, error) +type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool) (*informer.Informer, error) type Cache struct { informer.ByOptionsLister @@ -121,7 +121,7 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, transfor _, encryptResourceAlways := defaultEncryptedResourceTypes[gvk] shouldEncrypt := f.encryptAll || encryptResourceAlways - i, err := f.newInformer(ctx, client, fields, transform, gvk, f.dbClient, shouldEncrypt, namespaced) + i, err := f.newInformer(ctx, client, fields, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable) if err != nil { return Cache{}, err } diff --git a/pkg/sqlcache/informer/factory/informer_factory_test.go b/pkg/sqlcache/informer/factory/informer_factory_test.go index 658f5a60..8dee2da4 100644 --- a/pkg/sqlcache/informer/factory/informer_factory_test.go +++ b/pkg/sqlcache/informer/factory/informer_factory_test.go @@ -74,7 +74,7 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) @@ -119,7 +119,7 @@ func TestCacheFor(t *testing.T) { // need to set this so Run function is not nil SharedIndexInformer: sii, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) @@ -161,7 +161,7 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) @@ -200,7 +200,7 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) @@ -248,7 +248,7 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) @@ -295,7 +295,7 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) @@ -342,7 +342,7 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*informer.Informer, error) { // we can't test func == func, so instead we check if the output was as expected input := "someinput" ouput, err := transform(input) diff --git a/pkg/sqlcache/informer/informer.go b/pkg/sqlcache/informer/informer.go index 5d614fc4..8ba193f7 100644 --- a/pkg/sqlcache/informer/informer.go +++ b/pkg/sqlcache/informer/informer.go @@ -20,6 +20,8 @@ import ( "k8s.io/client-go/tools/cache" ) +var defaultRefreshTime = 5 * time.Second + // Informer is a SQLite-backed cache.SharedIndexInformer that can execute queries on listprocessor structs type Informer struct { cache.SharedIndexInformer @@ -35,15 +37,22 @@ var newInformer = cache.NewSharedIndexInformer // NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form // using the specified client -func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool) (*Informer, error) { +func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*Informer, error) { + watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { + return client.Watch(ctx, options) + } + if !watchable { + watchFunc = func(options metav1.ListOptions) (watch.Interface, error) { + ctx, cancel := context.WithCancel(ctx) + return newSyntheticWatcher(ctx, cancel).watch(client, options, defaultRefreshTime) + } + } listWatcher := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { a, err := client.List(ctx, options) return a, err }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.Watch(ctx, options) - }, + WatchFunc: watchFunc, } example := &unstructured.Unstructured{} @@ -97,6 +106,11 @@ func (i *Informer) ListByOptions(ctx context.Context, lo ListOptions, partitions return i.ByOptionsLister.ListByOptions(ctx, lo, partitions, namespace) } +// SetSyntheticWatchableInterval - call this function to override the default interval time of 5 seconds +func SetSyntheticWatchableInterval(interval time.Duration) { + defaultRefreshTime = interval +} + func informerNameFromGVK(gvk schema.GroupVersionKind) string { return gvk.Group + "_" + gvk.Version + "_" + gvk.Kind } diff --git a/pkg/sqlcache/informer/informer_test.go b/pkg/sqlcache/informer/informer_test.go index 408bb8b4..4fe84c6a 100644 --- a/pkg/sqlcache/informer/informer_test.go +++ b/pkg/sqlcache/informer/informer_test.go @@ -79,7 +79,7 @@ func TestNewInformer(t *testing.T) { } }) - informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true) + informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true) assert.Nil(t, err) assert.NotNil(t, informer.ByOptionsLister) assert.NotNil(t, informer.SharedIndexInformer) @@ -103,7 +103,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with errors returned from NewIndexer(), should return an error", test: func(t *testing.T) { @@ -138,7 +138,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with errors returned from NewListOptionIndexer(), should return an error", test: func(t *testing.T) { @@ -190,7 +190,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with transform func", test: func(t *testing.T) { @@ -253,7 +253,7 @@ func TestNewInformer(t *testing.T) { transformFunc := func(input interface{}) (interface{}, error) { return "someoutput", nil } - informer, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true) + informer, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true) assert.Nil(t, err) assert.NotNil(t, informer.ByOptionsLister) assert.NotNil(t, informer.SharedIndexInformer) @@ -289,7 +289,7 @@ func TestNewInformer(t *testing.T) { transformFunc := func(input interface{}) (interface{}, error) { return "someoutput", nil } - _, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true) + _, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true) assert.Error(t, err) newInformer = cache.NewSharedIndexInformer }}) diff --git a/pkg/sqlcache/informer/synthetic_watcher.go b/pkg/sqlcache/informer/synthetic_watcher.go new file mode 100644 index 00000000..7d973212 --- /dev/null +++ b/pkg/sqlcache/informer/synthetic_watcher.go @@ -0,0 +1,141 @@ +package informer + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" +) + +type SyntheticWatcher struct { + resultChan chan watch.Event + stopChan chan struct{} + doneChan chan struct{} + stopChanLock sync.Mutex + context context.Context + cancelFunc context.CancelFunc +} + +func newSyntheticWatcher(context context.Context, cancel context.CancelFunc) *SyntheticWatcher { + return &SyntheticWatcher{ + stopChan: make(chan struct{}), + doneChan: make(chan struct{}), + resultChan: make(chan watch.Event, 0), + context: context, + cancelFunc: cancel, + } +} + +func (rw *SyntheticWatcher) watch(client dynamic.ResourceInterface, options metav1.ListOptions, interval time.Duration) (*SyntheticWatcher, error) { + go rw.receive(client, options, interval) + return rw, nil +} + +type objectHolder struct { + version string + unstructuredObject *unstructured.Unstructured +} + +// receive periodically calls client.List(), and converts the returned items into Watch Events +func (rw *SyntheticWatcher) receive(client dynamic.ResourceInterface, options metav1.ListOptions, interval time.Duration) { + go func() { + defer close(rw.doneChan) + defer close(rw.resultChan) + defer rw.cancelFunc() + previousState := make(map[string]objectHolder) + ticker := time.NewTicker(interval) + + for { + select { + case <-ticker.C: + list, err := client.List(rw.context, options) + if err != nil { + logrus.Errorf("synthetic watcher: client.List => error: %s", err) + continue + } + newObjects := make(map[string]objectHolder) + for _, uItem := range list.Items { + namespace := uItem.GetNamespace() + name := uItem.GetName() + key := name + if namespace != "" { + key = fmt.Sprintf("%s/%s", namespace, name) + } + version := uItem.GetResourceVersion() + newObjects[key] = objectHolder{version: version, unstructuredObject: &uItem} + } + // Now determine whether items were added, deleted, or modified + currentState := make(map[string]objectHolder) + for key, newObject := range newObjects { + currentState[key] = newObject + if oldItem, ok := previousState[key]; !ok { + w, err := createWatchEvent(watch.Added, newObject.unstructuredObject) + if err != nil { + logrus.Errorf("can't convert unstructured obj into runtime: %s", err) + continue + } + rw.resultChan <- w + } else { + delete(previousState, key) + if oldItem.version != newObject.version { + w, err := createWatchEvent(watch.Modified, oldItem.unstructuredObject) + if err != nil { + logrus.Errorf("can't convert unstructured obj into runtime: %s", err) + continue + } + rw.resultChan <- w + } + } + } + // And anything left in the previousState didn't show up in currentState and can be deleted. + for _, item := range previousState { + w, err := createWatchEvent(watch.Deleted, item.unstructuredObject) + if err != nil { + continue + } + rw.resultChan <- w + } + previousState = currentState + + case <-rw.stopChan: + rw.cancelFunc() + return + + case <-rw.context.Done(): + return + } + } + }() +} + +func createWatchEvent(event watch.EventType, u *unstructured.Unstructured) (watch.Event, error) { + return watch.Event{Type: event, Object: u}, nil +} + +// ResultChan implements [k8s.io/apimachinery/pkg/watch].Interface. +func (rw *SyntheticWatcher) ResultChan() <-chan watch.Event { + return rw.resultChan +} + +// Stop implements [k8s.io/apimachinery/pkg/watch].Interface. +func (rw *SyntheticWatcher) Stop() { + rw.stopChanLock.Lock() + defer rw.stopChanLock.Unlock() + + // Prevent closing an already closed channel to prevent a panic + select { + case <-rw.stopChan: + default: + close(rw.stopChan) + } +} + +func (rw *SyntheticWatcher) Done() <-chan struct{} { + return rw.doneChan +} diff --git a/pkg/sqlcache/informer/synthetic_watcher_test.go b/pkg/sqlcache/informer/synthetic_watcher_test.go new file mode 100644 index 00000000..e6304064 --- /dev/null +++ b/pkg/sqlcache/informer/synthetic_watcher_test.go @@ -0,0 +1,190 @@ +/* +Copyright 2024 SUSE LLC +*/ + +package informer + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" +) + +func TestSyntheticWatcher(t *testing.T) { + dynamicClient := NewMockResourceInterface(gomock.NewController(t)) + var err error + cs1 := v1.ComponentStatus{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ComponentStatus", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "cs1", + UID: "1", + ResourceVersion: "rv1.1", + }, + Conditions: []v1.ComponentCondition{v1.ComponentCondition{Type: "Healthy", Status: v1.ConditionTrue, Message: "hi from cs1"}}, + } + cs2 := v1.ComponentStatus{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ComponentStatus", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "cs2", + UID: "2", + ResourceVersion: "rv2.1", + }, + Conditions: []v1.ComponentCondition{v1.ComponentCondition{Type: "Healthy", Status: v1.ConditionTrue, Message: "hi from cs2"}}, + } + cs3 := v1.ComponentStatus{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ComponentStatus", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "cs3", + UID: "3", + ResourceVersion: "rv3.1", + }, + Conditions: []v1.ComponentCondition{v1.ComponentCondition{Type: "Healthy", Status: v1.ConditionTrue, Message: "hi from cs3"}}, + } + cs4 := v1.ComponentStatus{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ComponentStatus", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "cs4", + UID: "4", + ResourceVersion: "rv4.1", + }, + Conditions: []v1.ComponentCondition{v1.ComponentCondition{Type: "Healthy", Status: v1.ConditionTrue, Message: "hi from cs4"}}, + } + list, err := makeCSList(cs1, cs2, cs3, cs4) + assert.Nil(t, err) + dynamicClient.EXPECT().List(gomock.Any(), gomock.Any()).Return(list, nil) + // Make copies to avoid modifying objects before the watcher has processed them. + cs1b := cs1.DeepCopy() + cs1b.ObjectMeta.ResourceVersion = "rv1.2" + cs2b := cs2.DeepCopy() + cs2b.ObjectMeta.ResourceVersion = "rv2.2" + list2, err := makeCSList(*cs1b, *cs2b, cs4) + assert.Nil(t, err) + dynamicClient.EXPECT().List(gomock.Any(), gomock.Any()).AnyTimes().Return(list2, nil) + + ctx, cancel := context.WithCancel(context.Background()) + sw := newSyntheticWatcher(ctx, cancel) + pollingInterval := 10 * time.Millisecond + watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { + return sw.watch(dynamicClient, options, pollingInterval) + } + options := metav1.ListOptions{} + w, err := watchFunc(options) + assert.Nil(t, err) + errChan := make(chan error) + results := make([]processedObjectInfo, 0) + var wg sync.WaitGroup + wg.Add(2) + go func() { + results, err = handleAnyWatch(w, errChan, sw.stopChan) + wg.Done() + }() + + go func() { + time.Sleep(40 * time.Millisecond) + sw.stopChan <- struct{}{} + wg.Done() + }() + wg.Wait() + // Verify we get what we expected to see + assert.Len(t, results, 8) + for i, _ := range list.Items { + assert.Equal(t, "added-result", results[i].eventName) + } + assert.Equal(t, "modified-result", results[len(list.Items)].eventName) + assert.Equal(t, "modified-result", results[len(list.Items)+1].eventName) + assert.Equal(t, "deleted-result", results[len(list.Items)+2].eventName) + assert.Equal(t, "stop", results[7].eventName) + // We can't really assert that the events get the correct timestamps on them + // because they can be held up in the channel for unexpected durations. I did have + // assert.Greater(t, float64(timeDelta), 0.9*float64(pollingInterval)) + // but saw a failure -- the interval was actually 0.75 * pollingInterval. + // So there's no point testing that. + assert.Greater(t, results[4].createdAt, results[0].createdAt) +} + +func makeCSList(objs ...v1.ComponentStatus) (*unstructured.UnstructuredList, error) { + unList := make([]unstructured.Unstructured, len(objs)) + for i, cs := range objs { + unst, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&cs) + if err != nil { + return nil, err + } + unList[i] = unstructured.Unstructured{Object: unst} + } + list := &unstructured.UnstructuredList{ + Items: unList, + } + return list, nil +} + +type processedObjectInfo struct { + createdAt time.Time + eventName string + payload interface{} +} + +func makeProcessedObject(eventName string, payload interface{}) processedObjectInfo { + return processedObjectInfo{ + createdAt: time.Now(), + eventName: eventName, + payload: payload, + } +} + +func handleAnyWatch(w watch.Interface, + errCh chan error, + stopCh <-chan struct{}, +) ([]processedObjectInfo, error) { + results := make([]processedObjectInfo, 0) +loop: + for { + select { + case <-stopCh: + results = append(results, makeProcessedObject("stop", nil)) + return results, nil + case err := <-errCh: + results = append(results, makeProcessedObject("error", err)) + return results, err + case event, ok := <-w.ResultChan(): + if !ok { + results = append(results, makeProcessedObject("bad-result", nil)) + break loop + } + switch event.Type { + case watch.Added: + results = append(results, makeProcessedObject("added-result", &event)) + case watch.Modified: + results = append(results, makeProcessedObject("modified-result", &event)) + case watch.Deleted: + results = append(results, makeProcessedObject("deleted-result", &event)) + case watch.Bookmark: + results = append(results, makeProcessedObject("bookmark-result", &event)) + default: + results = append(results, makeProcessedObject("unexpected-result", &event)) + } + } + } + return results, nil +}