1
0
mirror of https://github.com/rancher/steve.git synced 2025-07-17 08:21:41 +00:00
steve/pkg/sqlcache/informer/synthetic_watcher.go
Eric Promislow c906c36bc3
support unwatchables in vai (#458)
* Create and use a synthetic watcher for non-watchable resources.

* Write unit tests for the synthetic watcher.

* Make the refresh interval for synthetic watchers configurable.

The default is to call `client.List(...)` every 5 seconds for each unwatchable GVK.

There are currently only 3 such GVKs, so this will be informative
enough but not really noticeable.

* Pass the context into the synthetic watch func.

* Restore changes lost in rebasing.

---------

Co-authored-by: Tom Lebreux <tom.lebreux@suse.com>
2025-02-20 12:45:58 -08:00

142 lines
3.9 KiB
Go

package informer
import (
"context"
"fmt"
"sync"
"time"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
)
// SyntheticWatcher emulates a watch on a resource by polling it with List
// calls (see receive) and publishing the differences as watch events.
// It provides the ResultChan and Stop methods of
// [k8s.io/apimachinery/pkg/watch].Interface.
type SyntheticWatcher struct {
	// context is handed to every client.List call; once it is done the
	// polling goroutine exits.
	context context.Context
	// cancelFunc is invoked when the polling goroutine exits.
	// NOTE(review): presumably the cancel function paired with context — confirm at the call site.
	cancelFunc context.CancelFunc

	// resultChan carries the synthesized events and is closed when the
	// polling goroutine exits.
	resultChan chan watch.Event
	// doneChan is closed when the polling goroutine has exited.
	doneChan chan struct{}

	// stopChanLock serializes Stop so that stopChan is closed at most once.
	stopChanLock sync.Mutex
	// stopChan is closed by Stop to ask the polling goroutine to exit.
	stopChan chan struct{}
}
// newSyntheticWatcher builds a SyntheticWatcher bound to the given context and
// cancel function. All three channels are unbuffered. No polling starts until
// watch (or receive) is called.
func newSyntheticWatcher(context context.Context, cancel context.CancelFunc) *SyntheticWatcher {
	sw := &SyntheticWatcher{
		context:    context,
		cancelFunc: cancel,
	}
	sw.resultChan = make(chan watch.Event)
	sw.stopChan = make(chan struct{})
	sw.doneChan = make(chan struct{})
	return sw
}
// watch kicks off polling of client with the given list options and interval,
// then hands back the receiver so it can be used as the watch handle.
// The returned error is always nil.
func (rw *SyntheticWatcher) watch(client dynamic.ResourceInterface, options metav1.ListOptions, interval time.Duration) (*SyntheticWatcher, error) {
	// receive is launched asynchronously; events are delivered on ResultChan().
	go rw.receive(client, options, interval)
	return rw, nil
}
// objectHolder is the per-object record kept between two polls: the object
// itself plus the resourceVersion used to detect modifications.
type objectHolder struct {
	// unstructuredObject is the object as returned by the List call.
	unstructuredObject *unstructured.Unstructured
	// version is the object's resourceVersion at the time it was listed.
	version string
}
// receive starts a goroutine that calls client.List every interval and
// converts the difference between two consecutive listings into watch events
// sent on resultChan:
//
//   - an object not present in the previous listing yields watch.Added;
//   - an object whose resourceVersion changed yields watch.Modified, carrying
//     the NEW state of the object;
//   - an object missing from the current listing yields watch.Deleted, carrying
//     its last known state.
//
// Objects are keyed by "namespace/name", or just "name" when the namespace is
// empty. A failed List is logged and retried on the next tick.
//
// The goroutine exits when Stop is called or the watcher's context is done,
// including while it is blocked delivering an event to a consumer that has
// stopped reading. On exit it stops the ticker, calls cancelFunc, and closes
// resultChan and then doneChan.
//
// receive itself returns immediately.
func (rw *SyntheticWatcher) receive(client dynamic.ResourceInterface, options metav1.ListOptions, interval time.Duration) {
	go func() {
		defer close(rw.doneChan)
		defer close(rw.resultChan)
		defer rw.cancelFunc()

		ticker := time.NewTicker(interval)
		// Release the ticker's resources once polling ends.
		defer ticker.Stop()

		previousState := make(map[string]objectHolder)
		for {
			select {
			case <-ticker.C:
				list, err := client.List(rw.context, options)
				if err != nil {
					logrus.Errorf("synthetic watcher: client.List => error: %s", err)
					continue
				}

				currentState := make(map[string]objectHolder, len(list.Items))
				for i := range list.Items {
					// Point at the slice element rather than the range
					// variable so every holder references a distinct object
					// regardless of the Go version's loop-variable semantics.
					item := &list.Items[i]
					key := item.GetName()
					if namespace := item.GetNamespace(); namespace != "" {
						key = fmt.Sprintf("%s/%s", namespace, key)
					}
					currentState[key] = objectHolder{
						version:            item.GetResourceVersion(),
						unstructuredObject: item,
					}
				}

				// Determine whether items were added or modified.
				for key, current := range currentState {
					previous, seen := previousState[key]
					if !seen {
						if !rw.emit(watch.Added, current.unstructuredObject) {
							return
						}
						continue
					}
					// Whatever remains in previousState after this loop was deleted.
					delete(previousState, key)
					if previous.version != current.version {
						// A Modified event must carry the new state of the object.
						if !rw.emit(watch.Modified, current.unstructuredObject) {
							return
						}
					}
				}

				// Anything left in previousState didn't show up in this listing.
				for _, removed := range previousState {
					if !rw.emit(watch.Deleted, removed.unstructuredObject) {
						return
					}
				}
				previousState = currentState
			case <-rw.stopChan:
				rw.cancelFunc()
				return
			case <-rw.context.Done():
				return
			}
		}
	}()
}

// emit builds a watch event for obj and delivers it on resultChan. It returns
// false when the watcher was stopped or its context was cancelled before the
// event could be delivered, in which case the caller must stop polling.
// A conversion failure is logged and reported as true so polling continues.
func (rw *SyntheticWatcher) emit(eventType watch.EventType, obj *unstructured.Unstructured) bool {
	event, err := createWatchEvent(eventType, obj)
	if err != nil {
		logrus.Errorf("can't convert unstructured obj into runtime: %s", err)
		return true
	}
	// resultChan is unbuffered: never block on it without also watching for
	// shutdown, otherwise the goroutine leaks once the consumer goes away.
	select {
	case rw.resultChan <- event:
		return true
	case <-rw.stopChan:
		return false
	case <-rw.context.Done():
		return false
	}
}
// createWatchEvent wraps u in a watch.Event of the given type.
// The returned error is always nil.
func createWatchEvent(event watch.EventType, u *unstructured.Unstructured) (watch.Event, error) {
	var wrapped watch.Event
	wrapped.Type = event
	wrapped.Object = u
	return wrapped, nil
}
// ResultChan implements [k8s.io/apimachinery/pkg/watch].Interface.
// It exposes the receive-only side of the channel on which synthesized
// events are delivered.
func (rw *SyntheticWatcher) ResultChan() <-chan watch.Event {
	return rw.resultChan
}
// Stop implements [k8s.io/apimachinery/pkg/watch].Interface.
// It closes stopChan to signal the polling goroutine to exit. Calling Stop
// more than once is safe.
func (rw *SyntheticWatcher) Stop() {
	rw.stopChanLock.Lock()
	defer rw.stopChanLock.Unlock()

	// A receive on stopChan only succeeds once it has been closed; in that
	// case there is nothing left to do, and closing again would panic.
	select {
	case <-rw.stopChan:
		return
	default:
	}
	close(rw.stopChan)
}
// Done returns a channel that is closed once the polling goroutine started by
// receive has exited.
func (rw *SyntheticWatcher) Done() <-chan struct{} {
	return rw.doneChan
}