1
0
mirror of https://github.com/rancher/steve.git synced 2025-09-11 20:29:52 +00:00

Add basic watch functionality for SQL cache (#653)

* Remove unused method

* Add basic watch functionality

* Remove TestWatchNamesErrReceive test
This commit is contained in:
Tom Lebreux
2025-06-03 16:07:18 -06:00
committed by GitHub
parent 2672969496
commit e3f207ddc2
7 changed files with 292 additions and 133 deletions

View File

@@ -9,8 +9,6 @@ import (
"io"
"io/ioutil"
"net/http"
"os"
"strconv"
"strings"
"sync"
@@ -251,6 +249,8 @@ type Store struct {
lock sync.Mutex
columnSetter SchemaColumnSetter
transformBuilder TransformBuilder
watchers *Watchers
}
type CacheFactoryInitializer func() (CacheFactory, error)
@@ -268,6 +268,7 @@ func NewProxyStore(ctx context.Context, c SchemaColumnSetter, clientGetter Clien
notifier: notifier,
columnSetter: c,
transformBuilder: virtual.NewTransformBuilder(scache),
watchers: newWatchers(),
}
if factory == nil {
@@ -469,80 +470,6 @@ func returnErr(err error, c chan watch.Event) {
}
}
func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan watch.Event) {
rev := w.Revision
if rev == "-1" || rev == "0" {
rev = ""
}
timeout := int64(60 * 30)
timeoutSetting := os.Getenv(watchTimeoutEnv)
if timeoutSetting != "" {
userSetTimeout, err := strconv.Atoi(timeoutSetting)
if err != nil {
logrus.Debugf("could not parse %s environment variable, error: %v", watchTimeoutEnv, err)
} else {
timeout = int64(userSetTimeout)
}
}
k8sClient, _ := metricsStore.Wrap(client, nil)
watcher, err := k8sClient.Watch(apiOp, metav1.ListOptions{
Watch: true,
TimeoutSeconds: &timeout,
ResourceVersion: rev,
LabelSelector: w.Selector,
})
if err != nil {
returnErr(errors.Wrapf(err, "stopping watch for %s: %v", schema.ID, err), result)
return
}
defer watcher.Stop()
logrus.Debugf("opening watcher for %s", schema.ID)
eg, ctx := errgroup.WithContext(apiOp.Context())
go func() {
<-ctx.Done()
watcher.Stop()
}()
if s.notifier != nil {
eg.Go(func() error {
for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) {
obj, _, err := s.byID(apiOp, schema, rel.Namespace, rel.Name)
if err == nil {
rowToObject(obj)
result <- watch.Event{Type: watch.Modified, Object: obj}
} else {
returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result)
}
}
return fmt.Errorf("closed")
})
}
eg.Go(func() error {
for event := range watcher.ResultChan() {
if event.Type == watch.Error {
if status, ok := event.Object.(*metav1.Status); ok {
returnErr(fmt.Errorf("event watch error: %s", status.Message), result)
} else {
logrus.Debugf("event watch error: could not decode event object %T", event.Object)
}
continue
}
if unstr, ok := event.Object.(*unstructured.Unstructured); ok {
rowToObject(unstr)
}
result <- event
}
return fmt.Errorf("closed")
})
_ = eg.Wait()
return
}
// WatchNames returns a channel of events filtered by an allowed set of names.
// In plain kubernetes, if a user has permission to 'list' or 'watch' a defined set of resource names,
// performing the list or watch will result in a Forbidden error, because the user does not have permission
@@ -551,7 +478,7 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
// be returned in watch.
func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.Set[string]) (chan watch.Event, error) {
buffer := &WarningBuffer{}
adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace, buffer)
adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, "", buffer)
if err != nil {
return nil, err
}
@@ -592,17 +519,44 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t
// Watch returns a channel of events for a list or resource.
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) {
buffer := &WarningBuffer{}
client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace, buffer)
client, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, "", buffer)
if err != nil {
return nil, err
}
return s.watch(apiOp, schema, w, client)
}
type Watchers struct {
watchers map[string]struct{}
}
func newWatchers() *Watchers {
return &Watchers{
watchers: make(map[string]struct{}),
}
}
func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, client dynamic.ResourceInterface) (chan watch.Event, error) {
result := make(chan watch.Event)
// warnings from inside the informer are discarded
gvk := attributes.GVK(schema)
fields := getFieldsFromSchema(schema)
fields = append(fields, getFieldForGVK(gvk)...)
transformFunc := s.transformBuilder.GetTransformFunc(gvk)
tableClient := &tablelistconvert.Client{ResourceInterface: client}
attrs := attributes.GVK(schema)
ns := attributes.Namespaced(schema)
inf, err := s.cacheFactory.CacheFor(s.ctx, fields, transformFunc, tableClient, attrs, ns, controllerschema.IsListWatchable(schema))
if err != nil {
return nil, err
}
result := make(chan watch.Event, 1000)
go func() {
s.listAndWatch(apiOp, client, schema, w, result)
ctx := apiOp.Context()
err := inf.ByOptionsLister.Watch(ctx, informer.WatchOptions{}, result)
if err != nil {
logrus.Error(err)
}
logrus.Debugf("closing watcher for %s", schema.ID)
close(result)
}()