Merge pull request #107507 from alexzielenski/client-preprocess-handler

Add configuration point to SharedInformer to transform objects before storing

Kubernetes-commit: f4046f9e6d0bb2fc4fa2d84af64cc9c6db44270a
This commit is contained in:
Kubernetes Publisher 2022-01-24 14:31:27 -08:00
commit 7f04550960
3 changed files with 148 additions and 68 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"errors"
"sync" "sync"
"time" "time"
@ -406,6 +407,49 @@ func NewTransformingIndexerInformer(
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer) return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
} }
// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
handler.OnDelete(obj)
}
}
return nil
}
// newInformer returns a controller for populating the store while also // newInformer returns a controller for populating the store while also
// providing event notifications. // providing event notifications.
// //
@ -444,38 +488,10 @@ func newInformer(
RetryOnError: false, RetryOnError: false,
Process: func(obj interface{}) error { Process: func(obj interface{}) error {
// from oldest to newest if deltas, ok := obj.(Deltas); ok {
for _, d := range obj.(Deltas) { return processDeltas(h, clientState, transformer, deltas)
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
h.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
h.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
h.OnDelete(obj)
}
} }
return nil return errors.New("object given as Process argument is not Deltas")
}, },
} }
return New(cfg) return New(cfg)

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"errors"
"fmt" "fmt"
"sync" "sync"
"time" "time"
@ -180,6 +181,20 @@ type SharedInformer interface {
// The handler should return quickly - any expensive processing should be // The handler should return quickly - any expensive processing should be
// offloaded. // offloaded.
SetWatchErrorHandler(handler WatchErrorHandler) error SetWatchErrorHandler(handler WatchErrorHandler) error
// The TransformFunc is called for each object which is about to be stored.
//
// This function is intended for you to take the opportunity to
// remove, transform, or normalize fields. One use case is to strip unused
// metadata fields out of objects to save on RAM cost.
//
// Must be set before starting the informer.
//
// Note: Since the object given to the handler may be already shared with
// other goroutines, it is advisable to copy the object being
// transform before mutating it at all and returning the copy to prevent
// data races.
SetTransform(handler TransformFunc) error
} }
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer. // SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
@ -318,6 +333,8 @@ type sharedIndexInformer struct {
// Called whenever the ListAndWatch drops the connection with an error. // Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler watchErrorHandler WatchErrorHandler
transform TransformFunc
} }
// dummyController hides the fact that a SharedInformer is different from a dedicated one // dummyController hides the fact that a SharedInformer is different from a dedicated one
@ -365,6 +382,18 @@ func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) er
return nil return nil
} }
func (s *sharedIndexInformer) SetTransform(handler TransformFunc) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.started {
return fmt.Errorf("informer has already started")
}
s.transform = handler
return nil
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
@ -538,45 +567,47 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock() s.blockDeltas.Lock()
defer s.blockDeltas.Unlock() defer s.blockDeltas.Unlock()
// from oldest to newest if deltas, ok := obj.(Deltas); ok {
for _, d := range obj.(Deltas) { return processDeltas(s, s.indexer, s.transform, deltas)
switch d.Type { }
case Sync, Replaced, Added, Updated: return errors.New("object given as Process argument is not Deltas")
s.cacheMutationDetector.AddObject(d.Object) }
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false // Conforms to ResourceEventHandler
switch { func (s *sharedIndexInformer) OnAdd(obj interface{}) {
case d.Type == Sync: // Invocation of this function is locked under s.blockDeltas, so it is
// Sync events are only propagated to listeners that requested resync // save to distribute the notification
isSync = true s.cacheMutationDetector.AddObject(obj)
case d.Type == Replaced: s.processor.distribute(addNotification{newObj: obj}, false)
if accessor, err := meta.Accessor(d.Object); err == nil { }
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events // Conforms to ResourceEventHandler
// and only propagated to listeners that requested resync func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() isSync := false
}
} // If is a Sync event, isSync should be true
} // If is a Replaced event, isSync is true if resource version is unchanged.
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) // If RV is unchanged: this is a Sync/Replaced event, so isSync is true
} else {
if err := s.indexer.Add(d.Object); err != nil { if accessor, err := meta.Accessor(new); err == nil {
return err if oldAccessor, err := meta.Accessor(old); err == nil {
} // Events that didn't change resourceVersion are treated as resync events
s.processor.distribute(addNotification{newObj: d.Object}, false) // and only propagated to listeners that requested resync
} isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
} }
} }
return nil
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.cacheMutationDetector.AddObject(new)
s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnDelete(old interface{}) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.processor.distribute(deleteNotification{oldObj: old}, false)
} }
// sharedProcessor has a collection of processorListener and can // sharedProcessor has a collection of processorListener and can

View File

@ -23,7 +23,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
@ -357,3 +357,36 @@ func TestSharedInformerErrorHandling(t *testing.T) {
} }
close(stop) close(stop)
} }
func TestSharedInformerTransformer(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
informer.SetTransform(func(obj interface{}) (interface{}, error) {
if pod, ok := obj.(*v1.Pod); ok {
name := pod.GetName()
if upper := strings.ToUpper(name); upper != name {
copied := pod.DeepCopyObject().(*v1.Pod)
copied.SetName(upper)
return copied, nil
}
}
return obj, nil
})
listenerTransformer := newTestListener("listenerTransformer", 0, "POD1", "POD2")
informer.AddEventHandler(listenerTransformer)
stop := make(chan struct{})
go informer.Run(stop)
defer close(stop)
if !listenerTransformer.ok() {
t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames)
}
}