From bbd8096065df78083db629ccb276adf01b9a18d8 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 30 Aug 2016 19:27:13 -0400 Subject: [PATCH] DeltaFIFO should not report HasSynced until deletes are reported Deletions identified by knownObjects on the initial sync replace (immediately after the List from a reflector) should be considered part of the initial sync since we have the information available at the time we do the deletion. An error during Replace() can result in Populated not being set, but it was incorrect before (population would be wrong) and queueActionLocked does not error except on "my cache is broken because I gave an incorrect keyFunc". --- pkg/client/cache/delta_fifo.go | 19 +++++++++++---- pkg/client/cache/delta_fifo_test.go | 38 ++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/pkg/client/cache/delta_fifo.go b/pkg/client/cache/delta_fifo.go index acf0fa89359..1d26dcde14f 100644 --- a/pkg/client/cache/delta_fifo.go +++ b/pkg/client/cache/delta_fifo.go @@ -437,11 +437,6 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { defer f.lock.Unlock() keys := make(sets.String, len(list)) - if !f.populated { - f.populated = true - f.initialPopulationCount = len(list) - } - for _, item := range list { key, err := f.KeyOf(item) if err != nil { @@ -467,6 +462,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { return err } } + + if !f.populated { + f.populated = true + f.initialPopulationCount = len(list) + } + return nil } @@ -474,6 +475,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { // TODO(lavalamp): This may be racy-- we aren't properly locked // with knownObjects. Unproven. knownKeys := f.knownObjects.ListKeys() + queuedDeletions := 0 for _, k := range knownKeys { if keys.Has(k) { continue @@ -487,10 +489,17 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { deletedObj = nil glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) } + queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } + + if !f.populated { + f.populated = true + f.initialPopulationCount = len(list) + queuedDeletions + } + return nil } diff --git a/pkg/client/cache/delta_fifo_test.go b/pkg/client/cache/delta_fifo_test.go index f1a95573709..87bbdaeab89 100644 --- a/pkg/client/cache/delta_fifo_test.go +++ b/pkg/client/cache/delta_fifo_test.go @@ -144,6 +144,9 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) { }), nil, ) + if f.HasSynced() { + t.Errorf("Expected HasSynced to be false before completion of initial population") + } f.Add(mkFifoObj("foo", 10)) f.Update(mkFifoObj("foo", 12)) f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0") @@ -156,7 +159,9 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) { if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), Pop(f).(Deltas); !reflect.DeepEqual(e, a) { t.Fatalf("Expected %#v, got %#v", e, a) } - + if !f.HasSynced() { + t.Errorf("Expected HasSynced to be true after completion of initial population") + } } func TestDeltaFIFO_addUpdate(t *testing.T) { @@ -359,6 +364,37 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { } } +func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { + f := NewDeltaFIFO( + testFifoObjectKeyFunc, + nil, + keyLookupFunc(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + ) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + {{Sync, mkFifoObj("foo", 5)}}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + } + + for _, expected := range expectedList { + if f.HasSynced() { + t.Errorf("Expected HasSynced to be false") + } + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + if f.HasSynced() { + t.Errorf("Expected HasSynced to be true") + } +} + func TestDeltaFIFO_detectLineJumpers(t *testing.T) { f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)