modify SharedIndexInformer to use newInformer constructor which supports transformers

avoids code duplication, allows transformer to be used with SharedIndexInformer

Kubernetes-commit: 754bf3b3d02c66d1dd030460b03dddd3d6c7196d
This commit is contained in:
Alexander Zielenski
2022-01-19 11:40:42 -08:00
committed by Kubernetes Publisher
parent 3255cdcd9c
commit 54928eef9f
3 changed files with 148 additions and 68 deletions

View File

@@ -17,6 +17,7 @@ limitations under the License.
package cache
import (
"errors"
"fmt"
"sync"
"time"
@@ -180,6 +181,20 @@ type SharedInformer interface {
// The handler should return quickly - any expensive processing should be
// offloaded.
SetWatchErrorHandler(handler WatchErrorHandler) error
// The TransformFunc is called for each object which is about to be stored.
//
// This function is intended for you to take the opportunity to
// remove, transform, or normalize fields. One use case is to strip unused
// metadata fields out of objects to save on RAM cost.
//
// Must be set before starting the informer.
//
// Note: Since the object given to the handler may be already shared with
// other goroutines, it is advisable to copy the object being
// transform before mutating it at all and returning the copy to prevent
// data races.
SetTransform(handler TransformFunc) error
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
@@ -318,6 +333,8 @@ type sharedIndexInformer struct {
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
transform TransformFunc
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
@@ -365,6 +382,18 @@ func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) er
return nil
}
func (s *sharedIndexInformer) SetTransform(handler TransformFunc) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.started {
return fmt.Errorf("informer has already started")
}
s.transform = handler
return nil
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
@@ -538,45 +567,47 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
}
return errors.New("object given as Process argument is not Deltas")
}
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.cacheMutationDetector.AddObject(obj)
s.processor.distribute(addNotification{newObj: obj}, false)
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
isSync := false
// If is a Sync event, isSync should be true
// If is a Replaced event, isSync is true if resource version is unchanged.
// If RV is unchanged: this is a Sync/Replaced event, so isSync is true
if accessor, err := meta.Accessor(new); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
return nil
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.cacheMutationDetector.AddObject(new)
s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnDelete(old interface{}) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.processor.distribute(deleteNotification{oldObj: old}, false)
}
// sharedProcessor has a collection of processorListener and can