shrink the cache.Queue interface to what is actually used

Kubernetes-commit: 0ddab1694579d523e3865c75ca44d6cdf1b0ef93
This commit is contained in:
David Eads 2025-01-08 15:31:47 -05:00 committed by Kubernetes Publisher
parent f2030849e1
commit f29637f7f2
5 changed files with 134 additions and 103 deletions

View File

@ -508,61 +508,6 @@ func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType Del
return nil
}
// List returns a list of all the items; it returns the object
// from the most recent Delta.
// You should treat the items returned inside the deltas as immutable.
func (f *DeltaFIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
}
func (f *DeltaFIFO) listLocked() []interface{} {
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
list = append(list, item.Newest().Object)
}
return list
}
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
func (f *DeltaFIFO) ListKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.queue))
for _, key := range f.queue {
list = append(list, key)
}
return list
}
// Get returns the complete list of deltas for the requested item,
// or sets exists=false.
// You should treat the items returned inside the deltas as immutable.
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.KeyOf(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
}
// GetByKey returns the complete list of deltas for the requested item,
// setting exists=false if that list is empty.
// You should treat the items returned inside the deltas as immutable.
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
d, exists := f.items[key]
if exists {
// Copy item's slice so operations on this slice
// won't interfere with the object we return.
d = copyDeltas(d)
}
return d, exists, nil
}
// IsClosed checks if the queue is closed
func (f *DeltaFIFO) IsClosed() bool {
f.lock.Lock()

View File

@ -25,6 +25,66 @@ import (
"time"
)
// List returns a list of all the items; it returns the object
// from the most recent Delta.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
}
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) listLocked() []interface{} {
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
list = append(list, item.Newest().Object)
}
return list
}
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) ListKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.queue))
for _, key := range f.queue {
list = append(list, key)
}
return list
}
// Get returns the complete list of deltas for the requested item,
// or sets exists=false.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.KeyOf(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
}
// GetByKey returns the complete list of deltas for the requested item,
// setting exists=false if that list is empty.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
d, exists := f.items[key]
if exists {
// Copy item's slice so operations on this slice
// won't interfere with the object we return.
d = copyDeltas(d)
}
return d, exists, nil
}
// helper function to reduce stuttering
func testPop(f *DeltaFIFO) testFifoObject {
return Pop(f).(Deltas).Newest().Object.(testFifoObject)

44
tools/cache/fifo.go vendored
View File

@ -44,13 +44,13 @@ func (e ErrRequeue) Error() string {
return e.Err.Error()
}
// Queue extends Store with a collection of Store keys to "process".
// Queue extends ReflectorStore with a collection of Store keys to "process".
// Every Add, Update, or Delete may put the object's key in that collection.
// A Queue has a way to derive the corresponding key given an accumulator.
// A Queue can be accessed concurrently from multiple goroutines.
// A Queue can be "closed", after which Pop operations return an error.
type Queue interface {
Store
ReflectorStore
// Pop blocks until there is at least one key to process or the
// Queue is closed. In the latter case Pop returns with an error.
@ -227,46 +227,6 @@ func (f *FIFO) Delete(obj interface{}) error {
return err
}
// List returns a list of all the items.
func (f *FIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
list = append(list, item)
}
return list
}
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
func (f *FIFO) ListKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.items))
for key := range f.items {
list = append(list, key)
}
return list
}
// Get returns the requested item, or sets exists=false.
func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.keyFunc(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
}
// GetByKey returns the requested item, or sets exists=false.
func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
item, exists = f.items[key]
return item, exists, nil
}
// IsClosed checks if the queue is closed
func (f *FIFO) IsClosed() bool {
f.lock.Lock()

View File

@ -24,6 +24,50 @@ import (
"time"
)
// List returns a list of all the items.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *FIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
list = append(list, item)
}
return list
}
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *FIFO) ListKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.items))
for key := range f.items {
list = append(list, key)
}
return list
}
// Get returns the requested item, or sets exists=false.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.keyFunc(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
}
// GetByKey returns the requested item, or sets exists=false.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
item, exists = f.items[key]
return item, exists, nil
}
func testFifoObjectKeyFunc(obj interface{}) (string, error) {
return obj.(testFifoObject).name, nil
}

View File

@ -55,6 +55,28 @@ var (
defaultMinWatchTimeout = 5 * time.Minute
)
// ReflectorStore is the subset of cache.Store that the reflector uses
type ReflectorStore interface {
// Add adds the given object to the accumulator associated with the given object's key
Add(obj interface{}) error
// Update updates the given object in the accumulator associated with the given object's key
Update(obj interface{}) error
// Delete deletes the given object from the accumulator associated with the given object's key
Delete(obj interface{}) error
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
// Resync is meaningless in the terms appearing here but has
// meaning in some implementations that have non-trivial
// additional behavior (e.g., DeltaFIFO).
Resync() error
}
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default, it will be a file:line if possible.
@ -72,7 +94,7 @@ type Reflector struct {
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
store Store
store ReflectorStore
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// backoff manages backoff of ListWatch
@ -189,13 +211,13 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
// NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack
// that is outside this package. See NewReflectorWithOptions for further information.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
func NewReflector(lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
}
// NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further
// information.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
}
@ -234,7 +256,7 @@ type ReflectorOptions struct {
// "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store ReflectorStore, options ReflectorOptions) *Reflector {
reflectorClock := options.Clock
if reflectorClock == nil {
reflectorClock = clock.RealClock{}
@ -798,7 +820,7 @@ func handleWatch(
ctx context.Context,
start time.Time,
w watch.Interface,
store Store,
store ReflectorStore,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
@ -826,7 +848,7 @@ func handleAnyWatch(
ctx context.Context,
start time.Time,
w watch.Interface,
store Store,
store ReflectorStore,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,