mirror of
https://github.com/kubernetes/client-go.git
synced 2026-05-14 19:17:56 +00:00
Plumb keyFunc to informer/controller/delta handling
Kubernetes-commit: 4171c8cff8e79d3624f7e3bc83513fbec1671a24
This commit is contained in:
committed by
Kubernetes Publisher
parent
c38fc0fb1f
commit
4678a135e1
20
tools/cache/controller.go
vendored
20
tools/cache/controller.go
vendored
@@ -442,7 +442,7 @@ func NewInformerWithOptions(options InformerOptions) (Store, Controller) {
|
||||
} else {
|
||||
clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers)
|
||||
}
|
||||
return clientState, newInformer(clientState, options)
|
||||
return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc)
|
||||
}
|
||||
|
||||
// NewInformer returns a Store and a controller for populating the store
|
||||
@@ -476,7 +476,7 @@ func NewInformer(
|
||||
Handler: h,
|
||||
ResyncPeriod: resyncPeriod,
|
||||
}
|
||||
return clientState, newInformer(clientState, options)
|
||||
return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc)
|
||||
}
|
||||
|
||||
// NewIndexerInformer returns an Indexer and a Controller for populating the index
|
||||
@@ -513,7 +513,7 @@ func NewIndexerInformer(
|
||||
ResyncPeriod: resyncPeriod,
|
||||
Indexers: indexers,
|
||||
}
|
||||
return clientState, newInformer(clientState, options)
|
||||
return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc)
|
||||
}
|
||||
|
||||
// NewTransformingInformer returns a Store and a controller for populating
|
||||
@@ -542,7 +542,7 @@ func NewTransformingInformer(
|
||||
ResyncPeriod: resyncPeriod,
|
||||
Transform: transformer,
|
||||
}
|
||||
return clientState, newInformer(clientState, options)
|
||||
return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc)
|
||||
}
|
||||
|
||||
// NewTransformingIndexerInformer returns an Indexer and a controller for
|
||||
@@ -573,7 +573,7 @@ func NewTransformingIndexerInformer(
|
||||
Indexers: indexers,
|
||||
Transform: transformer,
|
||||
}
|
||||
return clientState, newInformer(clientState, options)
|
||||
return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc)
|
||||
}
|
||||
|
||||
// Multiplexes updates in the form of a list of Deltas into a Store, and informs
|
||||
@@ -584,6 +584,7 @@ func processDeltas(
|
||||
clientState Store,
|
||||
deltas Deltas,
|
||||
isInInitialList bool,
|
||||
keyFunc KeyFunc,
|
||||
) error {
|
||||
// from oldest to newest
|
||||
for _, d := range deltas {
|
||||
@@ -626,6 +627,7 @@ func processDeltasInBatch(
|
||||
clientState Store,
|
||||
deltas []Delta,
|
||||
isInInitialList bool,
|
||||
keyFunc KeyFunc,
|
||||
) error {
|
||||
// from oldest to newest
|
||||
txns := make([]Transaction, 0)
|
||||
@@ -634,7 +636,7 @@ func processDeltasInBatch(
|
||||
if !txnSupported {
|
||||
var errs []error
|
||||
for _, delta := range deltas {
|
||||
if err := processDeltas(handler, clientState, Deltas{delta}, isInInitialList); err != nil {
|
||||
if err := processDeltas(handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
@@ -703,7 +705,7 @@ func processDeltasInBatch(
|
||||
// Parameters
|
||||
// - clientState is the store you want to populate
|
||||
// - options contain the options to configure the controller
|
||||
func newInformer(clientState Store, options InformerOptions) Controller {
|
||||
func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Controller {
|
||||
// This will hold incoming changes. Note how we pass clientState in as a
|
||||
// KeyLister, that way resync operations will result in the correct set
|
||||
// of update/delete deltas.
|
||||
@@ -719,12 +721,12 @@ func newInformer(clientState Store, options InformerOptions) Controller {
|
||||
|
||||
Process: func(obj interface{}, isInInitialList bool) error {
|
||||
if deltas, ok := obj.(Deltas); ok {
|
||||
return processDeltas(options.Handler, clientState, deltas, isInInitialList)
|
||||
return processDeltas(options.Handler, clientState, deltas, isInInitialList, keyFunc)
|
||||
}
|
||||
return errors.New("object given as Process argument is not Deltas")
|
||||
},
|
||||
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
|
||||
return processDeltasInBatch(options.Handler, clientState, deltaList, isInInitialList)
|
||||
return processDeltasInBatch(options.Handler, clientState, deltaList, isInInitialList, keyFunc)
|
||||
},
|
||||
}
|
||||
return New(cfg)
|
||||
|
||||
7
tools/cache/controller_test.go
vendored
7
tools/cache/controller_test.go
vendored
@@ -853,7 +853,8 @@ func TestProcessDeltasInBatch(t *testing.T) {
|
||||
dummyListener,
|
||||
mockStore,
|
||||
tc.deltaList,
|
||||
true)
|
||||
true,
|
||||
DeletionHandlingMetaNamespaceKeyFunc)
|
||||
if tc.assertErr != nil {
|
||||
assert.True(t, tc.assertErr(err))
|
||||
}
|
||||
@@ -923,12 +924,12 @@ func TestReplaceEvents(t *testing.T) {
|
||||
|
||||
Process: func(obj interface{}, isInInitialList bool) error {
|
||||
if deltas, ok := obj.(Deltas); ok {
|
||||
return processDeltas(recorder, store, deltas, isInInitialList)
|
||||
return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
|
||||
}
|
||||
return errors.New("object given as Process argument is not Deltas")
|
||||
},
|
||||
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
|
||||
return processDeltasInBatch(recorder, store, deltaList, isInInitialList)
|
||||
return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
8
tools/cache/shared_informer.go
vendored
8
tools/cache/shared_informer.go
vendored
@@ -312,6 +312,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O
|
||||
defaultEventHandlerResyncPeriod: options.ResyncPeriod,
|
||||
clock: realClock,
|
||||
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
|
||||
keyFunc: DeletionHandlingMetaNamespaceKeyFunc,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -449,6 +450,9 @@ type sharedIndexInformer struct {
|
||||
watchErrorHandler WatchErrorHandlerWithContext
|
||||
|
||||
transform TransformFunc
|
||||
|
||||
// keyFunc is called when processing deltas by the underlying process function.
|
||||
keyFunc KeyFunc
|
||||
}
|
||||
|
||||
// dummyController hides the fact that a SharedInformer is different from a dedicated one
|
||||
@@ -718,7 +722,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool
|
||||
defer s.blockDeltas.Unlock()
|
||||
|
||||
if deltas, ok := obj.(Deltas); ok {
|
||||
return processDeltas(s, s.indexer, deltas, isInInitialList)
|
||||
return processDeltas(s, s.indexer, deltas, isInInitialList, s.keyFunc)
|
||||
}
|
||||
return errors.New("object given as Process argument is not Deltas")
|
||||
}
|
||||
@@ -726,7 +730,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool
|
||||
func (s *sharedIndexInformer) HandleBatchDeltas(deltas []Delta, isInInitialList bool) error {
|
||||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
return processDeltasInBatch(s, s.indexer, deltas, isInInitialList)
|
||||
return processDeltasInBatch(s, s.indexer, deltas, isInInitialList, s.keyFunc)
|
||||
}
|
||||
|
||||
// Conforms to ResourceEventHandler
|
||||
|
||||
Reference in New Issue
Block a user