diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index e7b98befadf..028c75e8e19 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -288,7 +288,7 @@ func NewInformer( // This will hold incoming changes. Note how we pass clientState in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState) + fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState) cfg := &Config{ Queue: fifo, @@ -355,7 +355,7 @@ func NewIndexerInformer( // This will hold incoming changes. Note how we pass clientState in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState) + fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState) cfg := &Config{ Queue: fifo, diff --git a/staging/src/k8s.io/client-go/tools/cache/controller_test.go b/staging/src/k8s.io/client-go/tools/cache/controller_test.go index a7a70533ff6..64aca8d7d0b 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller_test.go @@ -44,7 +44,7 @@ func Example() { // This will hold incoming changes. Note how we pass downstream in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, downstream) + fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, downstream) // Let's do threadsafe output to get predictable test results. deletionCounter := make(chan string, 1000) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index f06d1c5b1cf..d883c2f967c 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -31,11 +31,6 @@ import ( // keyFunc is used to figure out what key an object should have. (It's // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.) // -// 'compressor' may compress as many or as few items as it wants -// (including returning an empty slice), but it should do what it -// does quickly since it is called while the queue is locked. -// 'compressor' may be nil if you don't want any delta compression. -// // 'keyLister' is expected to return a list of keys that the consumer of // this queue "knows about". It is used to decide which items are missing // when Replace() is called; 'Deleted' deltas are produced for these items. @@ -48,13 +43,12 @@ import ( // fix. // // Also see the comment on DeltaFIFO. -func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO { +func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { f := &DeltaFIFO{ - items: map[string]Deltas{}, - queue: []string{}, - keyFunc: keyFunc, - deltaCompressor: compressor, - knownObjects: knownObjects, + items: map[string]Deltas{}, + queue: []string{}, + keyFunc: keyFunc, + knownObjects: knownObjects, } f.cond.L = &f.lock return f @@ -86,9 +80,6 @@ func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyL // items have been deleted when Replace() or Delete() are called. The deleted // object will be included in the DeleteFinalStateUnknown markers. These objects // could be stale. -// -// You may provide a function to compress deltas (e.g., represent a -// series of Updates as a single Update). type DeltaFIFO struct { // lock/cond protects access to 'items' and 'queue'. lock sync.RWMutex @@ -110,10 +101,6 @@ type DeltaFIFO struct { // insertion and retrieval, and should be deterministic. keyFunc KeyFunc - // deltaCompressor tells us how to combine two or more - // deltas. It may be nil. - deltaCompressor DeltaCompressor - // knownObjects list keys that are "known", for the // purpose of figuring out which items have been deleted // when Replace() or Delete() is called. @@ -133,7 +120,6 @@ var ( var ( // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas // object with zero length is encountered (should be impossible, - // even if such an object is accidentally produced by a DeltaCompressor-- // but included for completeness). ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key") ) @@ -305,8 +291,8 @@ func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool { return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted } -// queueActionLocked appends to the delta list for the object, calling -// f.deltaCompressor if needed. Caller must lock first. +// queueActionLocked appends to the delta list for the object. +// Caller must lock first. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { @@ -322,9 +308,6 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err newDeltas := append(f.items[id], Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) - if f.deltaCompressor != nil { - newDeltas = f.deltaCompressor.Compress(newDeltas) - } _, exists := f.items[id] if len(newDeltas) > 0 { @@ -334,8 +317,7 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err f.items[id] = newDeltas f.cond.Broadcast() } else if exists { - // The compression step removed all deltas, so - // we need to remove this from our map (extra items + // We need to remove this from our map (extra items // in the queue are ignored if they are not in the // map). delete(f.items, id) @@ -355,8 +337,8 @@ func (f *DeltaFIFO) List() []interface{} { func (f *DeltaFIFO) listLocked() []interface{} { list := make([]interface{}, 0, len(f.items)) for _, item := range f.items { - // Copy item's slice so operations on this slice (delta - // compression) won't interfere with the object we return. + // Copy item's slice so operations on this slice + // won't interfere with the object we return. item = copyDeltas(item) list = append(list, item.Newest().Object) } @@ -394,8 +376,8 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err defer f.lock.RUnlock() d, exists := f.items[key] if exists { - // Copy item's slice so operations on this slice (delta - // compression) won't interfere with the object we return. + // Copy item's slice so operations on this slice + // won't interfere with the object we return. d = copyDeltas(d) } return d, exists, nil @@ -603,23 +585,6 @@ type KeyGetter interface { GetByKey(key string) (interface{}, bool, error) } -// DeltaCompressor is an algorithm that removes redundant changes. -type DeltaCompressor interface { - Compress(Deltas) Deltas -} - -// DeltaCompressorFunc should remove redundant changes; but changes that -// are redundant depend on one's desired semantics, so this is an -// injectable function. -// -// DeltaCompressorFunc adapts a raw function to be a DeltaCompressor. -type DeltaCompressorFunc func(Deltas) Deltas - -// Compress just calls dc. -func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas { - return dc(d) -} - // DeltaType is the type of a change (addition, deletion, etc) type DeltaType string @@ -668,7 +633,7 @@ func (d Deltas) Newest() *Delta { // copyDeltas returns a shallow copy of d; that is, it copies the slice but not // the objects in the slice. This allows Get/List to return an object that we -// know won't be clobbered by a subsequent call to a delta compressor. +// know won't be clobbered by a subsequent modifications. func copyDeltas(d Deltas) Deltas { d2 := make(Deltas, len(d)) copy(d2, d) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go index 87bbdaeab89..9f8e575804c 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go @@ -51,7 +51,7 @@ func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) { } func TestDeltaFIFO_basic(t *testing.T) { - f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil) const amount = 500 go func() { for i := 0; i < amount; i++ { @@ -86,7 +86,7 @@ func TestDeltaFIFO_basic(t *testing.T) { } func TestDeltaFIFO_requeueOnPop(t *testing.T) { - f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil) f.Add(mkFifoObj("foo", 10)) _, err := f.Pop(func(obj interface{}) error { @@ -129,43 +129,8 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) { } } -func TestDeltaFIFO_compressorWorks(t *testing.T) { - oldestTypes := []DeltaType{} - f := NewDeltaFIFO( - testFifoObjectKeyFunc, - // This function just keeps the most recent delta - // and puts deleted ones in the list. - DeltaCompressorFunc(func(d Deltas) Deltas { - if n := len(d); n > 1 { - oldestTypes = append(oldestTypes, d[0].Type) - d = d[1:] - } - return d - }), - nil, - ) - if f.HasSynced() { - t.Errorf("Expected HasSynced to be false before completion of initial population") - } - f.Add(mkFifoObj("foo", 10)) - f.Update(mkFifoObj("foo", 12)) - f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0") - f.Delete(mkFifoObj("foo", 22)) - f.Add(mkFifoObj("foo", 25)) // flush the last one out - expect := []DeltaType{Added, Updated, Sync, Deleted} - if e, a := expect, oldestTypes; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %#v, got %#v", e, a) - } - if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), Pop(f).(Deltas); !reflect.DeepEqual(e, a) { - t.Fatalf("Expected %#v, got %#v", e, a) - } - if !f.HasSynced() { - t.Errorf("Expected HasSynced to be true after completion of initial population") - } -} - func TestDeltaFIFO_addUpdate(t *testing.T) { - f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil) f.Add(mkFifoObj("foo", 10)) f.Update(mkFifoObj("foo", 12)) f.Delete(mkFifoObj("foo", 15)) @@ -203,7 +168,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) { } func TestDeltaFIFO_enqueueingNoLister(t *testing.T) { - f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil) f.Add(mkFifoObj("foo", 10)) f.Update(mkFifoObj("bar", 15)) f.Add(mkFifoObj("qux", 17)) @@ -226,7 +191,6 @@ func TestDeltaFIFO_enqueueingNoLister(t *testing.T) { func TestDeltaFIFO_enqueueingWithLister(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, - nil, keyLookupFunc(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), @@ -249,7 +213,7 @@ func TestDeltaFIFO_enqueueingWithLister(t *testing.T) { } func TestDeltaFIFO_addReplace(t *testing.T) { - f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil) f.Add(mkFifoObj("foo", 10)) f.Replace([]interface{}{mkFifoObj("foo", 15)}, "0") got := make(chan testFifoObject, 2) @@ -277,7 +241,6 @@ func TestDeltaFIFO_addReplace(t *testing.T) { func TestDeltaFIFO_ResyncNonExisting(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, - nil, keyLookupFunc(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5)} }), @@ -297,7 +260,6 @@ func TestDeltaFIFO_ResyncNonExisting(t *testing.T) { func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, - nil, keyLookupFunc(func() []testFifoObject { return []testFifoObject{} }), @@ -317,7 +279,6 @@ func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) { func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, - nil, keyLookupFunc(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), @@ -344,7 +305,6 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, - nil, keyLookupFunc(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5)} }), @@ -367,7 +327,6 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, - nil, keyLookupFunc(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), @@ -396,7 +355,7 @@ func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { } func TestDeltaFIFO_detectLineJumpers(t *testing.T) { - f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil) f.Add(mkFifoObj("foo", 10)) f.Add(mkFifoObj("bar", 1)) @@ -424,7 +383,7 @@ func TestDeltaFIFO_detectLineJumpers(t *testing.T) { } func TestDeltaFIFO_addIfNotPresent(t *testing.T) { - f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil) f.Add(mkFifoObj("b", 3)) b3 := Pop(f) @@ -521,7 +480,7 @@ func TestDeltaFIFO_HasSynced(t *testing.T) { } for i, test := range tests { - f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil) for _, action := range test.actions { action(f) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index f6ce07f7af3..e711f161bc9 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -189,7 +189,7 @@ type deleteNotification struct { func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer) + fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) cfg := &Config{ Queue: fifo,