diff --git a/pkg/client/cache/delta_fifo.go b/pkg/client/cache/delta_fifo.go index acf0fa89359..1d26dcde14f 100644 --- a/pkg/client/cache/delta_fifo.go +++ b/pkg/client/cache/delta_fifo.go @@ -437,11 +437,6 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { defer f.lock.Unlock() keys := make(sets.String, len(list)) - if !f.populated { - f.populated = true - f.initialPopulationCount = len(list) - } - for _, item := range list { key, err := f.KeyOf(item) if err != nil { @@ -467,6 +462,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { return err } } + + if !f.populated { + f.populated = true + f.initialPopulationCount = len(list) + } + return nil } @@ -474,6 +475,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { // TODO(lavalamp): This may be racy-- we aren't properly locked // with knownObjects. Unproven. knownKeys := f.knownObjects.ListKeys() + queuedDeletions := 0 for _, k := range knownKeys { if keys.Has(k) { continue @@ -487,10 +489,17 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { deletedObj = nil glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) } + queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } + + if !f.populated { + f.populated = true + f.initialPopulationCount = len(list) + queuedDeletions + } + return nil } diff --git a/pkg/client/cache/delta_fifo_test.go b/pkg/client/cache/delta_fifo_test.go index f1a95573709..87bbdaeab89 100644 --- a/pkg/client/cache/delta_fifo_test.go +++ b/pkg/client/cache/delta_fifo_test.go @@ -144,6 +144,9 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) { }), nil, ) + if f.HasSynced() { + t.Errorf("Expected HasSynced to be false before completion of initial population") + } f.Add(mkFifoObj("foo", 10)) f.Update(mkFifoObj("foo", 12)) f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0") @@ -156,7 +159,9 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) { if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), Pop(f).(Deltas); !reflect.DeepEqual(e, a) { t.Fatalf("Expected %#v, got %#v", e, a) } - + if !f.HasSynced() { + t.Errorf("Expected HasSynced to be true after completion of initial population") + } } func TestDeltaFIFO_addUpdate(t *testing.T) { @@ -359,6 +364,37 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { } } +func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { + f := NewDeltaFIFO( + testFifoObjectKeyFunc, + nil, + keyLookupFunc(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + ) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + {{Sync, mkFifoObj("foo", 5)}}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + } + + for _, expected := range expectedList { + if f.HasSynced() { + t.Errorf("Expected HasSynced to be false") + } + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + if f.HasSynced() { + t.Errorf("Expected HasSynced to be true") + } +} + func TestDeltaFIFO_detectLineJumpers(t *testing.T) { f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)