diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 8246794a652..9bc2c2143e2 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -68,8 +68,14 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { } // DeltaFIFO is like FIFO, but allows the PopProcessFunc to process -// deletes. The accumulator associated with a given object's key is a -// slice of Delta values for that object. +// deletes and adds Sync to the ways an object can be applied to an +// acumulator. The accumulator associated with a given object's key +// is a Deltas, which is a slice of Delta values for that object. +// Applying an object to a Deltas means to append a Delta except when +// the potentially appended Delta is a Delete and the Deltas already +// ends with a Delete. In that case the Deltas does not grow, +// although the terminal Delete will be replaced by the new Delete if +// the older Delete's object is a DeletedFinalStateUnknown. // // DeltaFIFO is a producer-consumer queue, where a Reflector is // intended to be the producer, and the consumer is whatever calls @@ -83,17 +89,14 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { // * You might want to periodically reprocess objects. // // DeltaFIFO's Pop(), Get(), and GetByKey() methods return -// interface{} to satisfy the Store/Queue interfaces, but it +// interface{} to satisfy the Store/Queue interfaces, but they // will always return an object of type Deltas. // -// A DeltaFIFO's knownObjects KeyListerGetter provides get/list access -// to a set of "known objects" that is used for two purposes. One is -// to conditionalize delete operations: it is only for a known object -// that a Delete Delta is recorded (this applies to both Delete and -// Replace). The deleted object will be included in the -// DeleteFinalStateUnknown markers, and those objects could be stale. -// The other purpose is in the Resync operation, which adds a Sync -// Delta for every known object. +// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities +// to list Store keys and to get objects by Store key. The objects in +// question are called "known objects" and this set of objects +// modifies the behavior of the Delete, Replace, and Resync methods +// (each in a different way). // // A note on threading: If you call Pop() in parallel from multiple // threads, you could end up with multiple threads processing slightly @@ -119,9 +122,8 @@ type DeltaFIFO struct { // insertion and retrieval, and should be deterministic. keyFunc KeyFunc - // knownObjects list keys that are "known", for the - // purpose of figuring out which items have been deleted - // when Replace() or Delete() is called. + // knownObjects list keys that are "known" --- affecting Delete(), + // Replace(), and Resync() knownObjects KeyListerGetter // Indication the queue is closed. @@ -190,9 +192,11 @@ func (f *DeltaFIFO) Update(obj interface{}) error { return f.queueActionLocked(Updated, obj) } -// Delete is just like Add, but makes an Deleted Delta. If the item does not -// already exist, it will be ignored. (It may have already been deleted by a -// Replace (re-list), for example.) +// Delete is just like Add, but makes a Deleted Delta. If the given +// object does not already exist, it will be ignored. (It may have +// already been deleted by a Replace (re-list), for example.) In this +// method `f.knownObjects`, if not nil, provides (via GetByKey) +// _additional_ objects that are considered to already exist. func (f *DeltaFIFO) Delete(obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { @@ -438,10 +442,15 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { } } -// Replace will delete the contents of 'f', using instead the given map. -// 'f' takes ownership of the map, you should not reference the map again -// after calling this function. f's queue is reset, too; upon return, it -// will contain the items in the map, in no particular order. +// Replace atomically adds the given objects using the Sync type of +// Delta and does some deletions. In particular: for every +// pre-existing key K that is not the key of an object in `list` there +// is the effect of `Delete(DeletedFinalStateUnknown{K, O})` where O +// is current object of K. If `f.knownObjects == nil` then the +// pre-existing keys are those in `f.items` and the current object of +// K is the `.Newest()` of the Deltas associated with K. Otherwise +// the pre-existing keys are those listed by `f.knownObjects` and the +// current object of K is what `f.knownObjects.GetByKey(K)` returns. func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { f.lock.Lock() defer f.lock.Unlock() @@ -515,7 +524,9 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { return nil } -// Resync will send a sync event for each item +// Resync adds, with a Sync type of Delta, every object listed by +// `f.knownObjects` whose key is not already queued for processing. +// If `f.knownObjects` is `nil` then Resync does nothing. func (f *DeltaFIFO) Resync() error { f.lock.Lock() defer f.lock.Unlock() diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go index fc8235b4fcf..3fa54882bf7 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go @@ -28,11 +28,15 @@ func testPop(f *DeltaFIFO) testFifoObject { return Pop(f).(Deltas).Newest().Object.(testFifoObject) } -// keyLookupFunc adapts a raw function to be a KeyLookup. -type keyLookupFunc func() []testFifoObject +// literalListerGetter is a KeyListerGetter that is based on a +// function that returns a slice of objects to list and get. +// The function must list the same objects every time. +type literalListerGetter func() []testFifoObject + +var _ KeyListerGetter = literalListerGetter(nil) // ListKeys just calls kl. -func (kl keyLookupFunc) ListKeys() []string { +func (kl literalListerGetter) ListKeys() []string { result := []string{} for _, fifoObj := range kl() { result = append(result, fifoObj.name) @@ -41,7 +45,7 @@ func (kl keyLookupFunc) ListKeys() []string { } // GetByKey returns the key if it exists in the list returned by kl. -func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) { +func (kl literalListerGetter) GetByKey(key string) (interface{}, bool, error) { for _, v := range kl() { if v.name == key { return v, true, nil @@ -95,7 +99,7 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { oldObj := mkFifoObj("foo", 1) newObj := mkFifoObj("foo", 2) - f := NewDeltaFIFO(testFifoObjectKeyFunc, keyLookupFunc(func() []testFifoObject { + f := NewDeltaFIFO(testFifoObjectKeyFunc, literalListerGetter(func() []testFifoObject { return []testFifoObject{oldObj} })) @@ -218,7 +222,7 @@ func TestDeltaFIFO_enqueueingNoLister(t *testing.T) { func TestDeltaFIFO_enqueueingWithLister(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, - keyLookupFunc(func() []testFifoObject { + literalListerGetter(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), ) @@ -268,7 +272,7 @@ func TestDeltaFIFO_addReplace(t *testing.T) { func TestDeltaFIFO_ResyncNonExisting(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, - keyLookupFunc(func() []testFifoObject { + literalListerGetter(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5)} }), ) @@ -287,7 +291,7 @@ func TestDeltaFIFO_ResyncNonExisting(t *testing.T) { func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, - keyLookupFunc(func() []testFifoObject { + literalListerGetter(func() []testFifoObject { return []testFifoObject{} }), ) @@ -304,9 +308,13 @@ func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) { } func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { + // We test with only one pre-existing object because there is no + // promise about how their deletes are ordered. + + // Try it with a pre-existing Delete f := NewDeltaFIFO( testFifoObjectKeyFunc, - keyLookupFunc(func() []testFifoObject { + literalListerGetter(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), ) @@ -327,12 +335,57 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { t.Errorf("Expected %#v, got %#v", e, a) } } + + // Now try starting with an Add instead of a Delete + f = NewDeltaFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + ) + f.Add(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, + {{Sync, mkFifoObj("foo", 5)}}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try starting without an explicit KeyListerGetter + f = NewDeltaFIFO( + testFifoObjectKeyFunc, + nil, + ) + f.Add(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, + {{Sync, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } } func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, - keyLookupFunc(func() []testFifoObject { + literalListerGetter(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5)} }), ) @@ -354,7 +407,7 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, - keyLookupFunc(func() []testFifoObject { + literalListerGetter(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), ) diff --git a/staging/src/k8s.io/client-go/tools/cache/fifo.go b/staging/src/k8s.io/client-go/tools/cache/fifo.go index 0172ae72ccf..67bb1cba858 100644 --- a/staging/src/k8s.io/client-go/tools/cache/fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/fifo.go @@ -44,7 +44,7 @@ func (e ErrRequeue) Error() string { return e.Err.Error() } -// Queue extends Store with a collection of keys to "process". +// Queue extends Store with a collection of Store keys to "process". // Every Add, Update, or Delete may put the object's key in that collection. // A Queue has a way to derive the corresponding key given an accumulator. // A Queue can be accessed concurrently from multiple goroutines.