mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-25 14:41:53 +00:00
Merge pull request #116623 from lavalamp/xfrmr
change where transformers are called Kubernetes-commit: 76d351065e504e23b7962158ef0e9067b6bd96ed
This commit is contained in:
commit
1517ffb8d3
22
tools/cache/controller.go
vendored
22
tools/cache/controller.go
vendored
@ -394,17 +394,6 @@ func NewIndexerInformer(
|
||||
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
|
||||
}
|
||||
|
||||
// TransformFunc allows for transforming an object before it will be processed
|
||||
// and put into the controller cache and before the corresponding handlers will
|
||||
// be called on it.
|
||||
// TransformFunc (similarly to ResourceEventHandler functions) should be able
|
||||
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown
|
||||
//
|
||||
// The most common usage pattern is to clean-up some parts of the object to
|
||||
// reduce component memory usage if a given component doesn't care about them.
|
||||
// given controller doesn't care for them
|
||||
type TransformFunc func(interface{}) (interface{}, error)
|
||||
|
||||
// NewTransformingInformer returns a Store and a controller for populating
|
||||
// the store while also providing event notifications. You should only used
|
||||
// the returned Store for Get/List operations; Add/Modify/Deletes will cause
|
||||
@ -452,20 +441,12 @@ func processDeltas(
|
||||
// Object which receives event notifications from the given deltas
|
||||
handler ResourceEventHandler,
|
||||
clientState Store,
|
||||
transformer TransformFunc,
|
||||
deltas Deltas,
|
||||
isInInitialList bool,
|
||||
) error {
|
||||
// from oldest to newest
|
||||
for _, d := range deltas {
|
||||
obj := d.Object
|
||||
if transformer != nil {
|
||||
var err error
|
||||
obj, err = transformer(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
switch d.Type {
|
||||
case Sync, Replaced, Added, Updated:
|
||||
@ -517,6 +498,7 @@ func newInformer(
|
||||
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: clientState,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: transformer,
|
||||
})
|
||||
|
||||
cfg := &Config{
|
||||
@ -528,7 +510,7 @@ func newInformer(
|
||||
|
||||
Process: func(obj interface{}, isInInitialList bool) error {
|
||||
if deltas, ok := obj.(Deltas); ok {
|
||||
return processDeltas(h, clientState, transformer, deltas, isInInitialList)
|
||||
return processDeltas(h, clientState, deltas, isInInitialList)
|
||||
}
|
||||
return errors.New("object given as Process argument is not Deltas")
|
||||
},
|
||||
|
4
tools/cache/controller_test.go
vendored
4
tools/cache/controller_test.go
vendored
@ -23,7 +23,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@ -32,7 +32,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
fcache "k8s.io/client-go/tools/cache/testing"
|
||||
|
||||
"github.com/google/gofuzz"
|
||||
fuzz "github.com/google/gofuzz"
|
||||
)
|
||||
|
||||
func Example() {
|
||||
|
44
tools/cache/delta_fifo.go
vendored
44
tools/cache/delta_fifo.go
vendored
@ -51,6 +51,10 @@ type DeltaFIFOOptions struct {
|
||||
// When true, `Replaced` events will be sent for items passed to a Replace() call.
|
||||
// When false, `Sync` events will be sent instead.
|
||||
EmitDeltaTypeReplaced bool
|
||||
|
||||
// If set, will be called for objects before enqueueing them. Please
|
||||
// see the comment on TransformFunc for details.
|
||||
Transformer TransformFunc
|
||||
}
|
||||
|
||||
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
|
||||
@ -129,8 +133,32 @@ type DeltaFIFO struct {
|
||||
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
|
||||
// DeltaType when Replace() is called (to preserve backwards compat).
|
||||
emitDeltaTypeReplaced bool
|
||||
|
||||
// Called with every object if non-nil.
|
||||
transformer TransformFunc
|
||||
}
|
||||
|
||||
// TransformFunc allows for transforming an object before it will be processed.
|
||||
// TransformFunc (similarly to ResourceEventHandler functions) should be able
|
||||
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
|
||||
//
|
||||
// New in v1.27: In such cases, the contained object will already have gone
|
||||
// through the transform object separately (when it was added / updated prior
|
||||
// to the delete), so the TransformFunc can likely safely ignore such objects
|
||||
// (i.e., just return the input object).
|
||||
//
|
||||
// The most common usage pattern is to clean-up some parts of the object to
|
||||
// reduce component memory usage if a given component doesn't care about them.
|
||||
//
|
||||
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
|
||||
// sees the object before any other actor, and it is now safe to mutate the
|
||||
// object in place instead of making a copy.
|
||||
//
|
||||
// Note that TransformFunc is called while inserting objects into the
|
||||
// notification queue and is therefore extremely performance sensitive; please
|
||||
// do not do anything that will take a long time.
|
||||
type TransformFunc func(interface{}) (interface{}, error)
|
||||
|
||||
// DeltaType is the type of a change (addition, deletion, etc)
|
||||
type DeltaType string
|
||||
|
||||
@ -227,6 +255,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
|
||||
knownObjects: opts.KnownObjects,
|
||||
|
||||
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
|
||||
transformer: opts.Transformer,
|
||||
}
|
||||
f.cond.L = &f.lock
|
||||
return f
|
||||
@ -415,6 +444,21 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
|
||||
// Every object comes through this code path once, so this is a good
|
||||
// place to call the transform func. If obj is a
|
||||
// DeletedFinalStateUnknown tombstone, then the containted inner object
|
||||
// will already have gone through the transformer, but we document that
|
||||
// this can happen. In cases involving Replace(), such an object can
|
||||
// come through multiple times.
|
||||
if f.transformer != nil {
|
||||
var err error
|
||||
obj, err = f.transformer(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
oldDeltas := f.items[id]
|
||||
newDeltas := append(oldDeltas, Delta{actionType, obj})
|
||||
newDeltas = dedupDeltas(newDeltas)
|
||||
|
82
tools/cache/delta_fifo_test.go
vendored
82
tools/cache/delta_fifo_test.go
vendored
@ -327,6 +327,88 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type rvAndXfrm struct {
|
||||
rv int
|
||||
xfrm int
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_transformer(t *testing.T) {
|
||||
mk := func(name string, rv int) testFifoObject {
|
||||
return mkFifoObj(name, &rvAndXfrm{rv, 0})
|
||||
}
|
||||
xfrm := TransformFunc(func(obj interface{}) (interface{}, error) {
|
||||
switch v := obj.(type) {
|
||||
case testFifoObject:
|
||||
v.val.(*rvAndXfrm).xfrm++
|
||||
case DeletedFinalStateUnknown:
|
||||
if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 {
|
||||
return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj)
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected object: %#v", obj)
|
||||
}
|
||||
return obj, nil
|
||||
})
|
||||
|
||||
must := func(err error) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KeyFunction: testFifoObjectKeyFunc,
|
||||
Transformer: xfrm,
|
||||
})
|
||||
must(f.Add(mk("foo", 10)))
|
||||
must(f.Add(mk("bar", 11)))
|
||||
must(f.Update(mk("foo", 12)))
|
||||
must(f.Delete(mk("foo", 15)))
|
||||
must(f.Replace([]interface{}{}, ""))
|
||||
must(f.Add(mk("bar", 16)))
|
||||
must(f.Replace([]interface{}{}, ""))
|
||||
|
||||
// Should be empty
|
||||
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %+v, got %+v", e, a)
|
||||
}
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
obj, err := f.Pop(func(o interface{}, isInInitialList bool) error { return nil })
|
||||
if err != nil {
|
||||
t.Fatalf("got nothing on try %v?", i)
|
||||
}
|
||||
obj = obj.(Deltas).Newest().Object
|
||||
switch v := obj.(type) {
|
||||
case testFifoObject:
|
||||
if v.name != "foo" {
|
||||
t.Errorf("expected regular deletion of foo, got %q", v.name)
|
||||
}
|
||||
rx := v.val.(*rvAndXfrm)
|
||||
if rx.rv != 15 {
|
||||
t.Errorf("expected last message, got %#v", obj)
|
||||
}
|
||||
if rx.xfrm != 1 {
|
||||
t.Errorf("obj %v transformed wrong number of times.", obj)
|
||||
}
|
||||
case DeletedFinalStateUnknown:
|
||||
tf := v.Obj.(testFifoObject)
|
||||
rx := tf.val.(*rvAndXfrm)
|
||||
if tf.name != "bar" {
|
||||
t.Errorf("expected tombstone deletion of bar, got %q", tf.name)
|
||||
}
|
||||
if rx.rv != 16 {
|
||||
t.Errorf("expected last message, got %#v", obj)
|
||||
}
|
||||
if rx.xfrm != 1 {
|
||||
t.Errorf("tombstoned obj %v transformed wrong number of times.", obj)
|
||||
}
|
||||
default:
|
||||
t.Errorf("unknown item %#v", obj)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
|
||||
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
|
8
tools/cache/shared_informer.go
vendored
8
tools/cache/shared_informer.go
vendored
@ -205,10 +205,7 @@ type SharedInformer interface {
|
||||
//
|
||||
// Must be set before starting the informer.
|
||||
//
|
||||
// Note: Since the object given to the handler may be already shared with
|
||||
// other goroutines, it is advisable to copy the object being
|
||||
// transform before mutating it at all and returning the copy to prevent
|
||||
// data races.
|
||||
// Please see the comment on TransformFunc for more details.
|
||||
SetTransform(handler TransformFunc) error
|
||||
|
||||
// IsStopped reports whether the informer has already been stopped.
|
||||
@ -465,6 +462,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
||||
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: s.indexer,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: s.transform,
|
||||
})
|
||||
|
||||
cfg := &Config{
|
||||
@ -637,7 +635,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool
|
||||
defer s.blockDeltas.Unlock()
|
||||
|
||||
if deltas, ok := obj.(Deltas); ok {
|
||||
return processDeltas(s, s.indexer, s.transform, deltas, isInInitialList)
|
||||
return processDeltas(s, s.indexer, deltas, isInInitialList)
|
||||
}
|
||||
return errors.New("object given as Process argument is not Deltas")
|
||||
}
|
||||
|
5
tools/cache/shared_informer_test.go
vendored
5
tools/cache/shared_informer_test.go
vendored
@ -406,9 +406,8 @@ func TestSharedInformerTransformer(t *testing.T) {
|
||||
name := pod.GetName()
|
||||
|
||||
if upper := strings.ToUpper(name); upper != name {
|
||||
copied := pod.DeepCopyObject().(*v1.Pod)
|
||||
copied.SetName(upper)
|
||||
return copied, nil
|
||||
pod.SetName(upper)
|
||||
return pod, nil
|
||||
}
|
||||
}
|
||||
return obj, nil
|
||||
|
Loading…
Reference in New Issue
Block a user