Merge pull request #31755 from smarterclayton/has_synced

Automatic merge from submit-queue

DeltaFIFO should not report HasSynced until deletes are reported

Deletions identified by knownObjects on the initial sync replace
(immediately after the List from a reflector) should be considered part
of the initial sync since we have the information available at the time
we do the deletion.

An error during Replace() can result in Populated not being set, but it
was incorrect before (population would be wrong) and queueActionLocked
does not error except on "my cache is broken because I gave an incorrect
keyFunc".

@lavalamp @deads2k hit this while trying to use DeltaFIFO to implement an "external" controller (one that uses the knownObjects as provided by a call to a remote system of record).

Not 1.4
This commit is contained in:
Kubernetes Submit Queue 2016-10-03 15:41:20 -07:00 committed by GitHub
commit dab787d80d
2 changed files with 51 additions and 6 deletions

View File

@ -437,11 +437,6 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
defer f.lock.Unlock() defer f.lock.Unlock()
keys := make(sets.String, len(list)) keys := make(sets.String, len(list))
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list)
}
for _, item := range list { for _, item := range list {
key, err := f.KeyOf(item) key, err := f.KeyOf(item)
if err != nil { if err != nil {
@ -467,6 +462,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
return err return err
} }
} }
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list)
}
return nil return nil
} }
@ -474,6 +475,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
// TODO(lavalamp): This may be racy-- we aren't properly locked // TODO(lavalamp): This may be racy-- we aren't properly locked
// with knownObjects. Unproven. // with knownObjects. Unproven.
knownKeys := f.knownObjects.ListKeys() knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys { for _, k := range knownKeys {
if keys.Has(k) { if keys.Has(k) {
continue continue
@ -487,10 +489,17 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
deletedObj = nil deletedObj = nil
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
} }
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err return err
} }
} }
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil return nil
} }

View File

@ -144,6 +144,9 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) {
}), }),
nil, nil,
) )
if f.HasSynced() {
t.Errorf("Expected HasSynced to be false before completion of initial population")
}
f.Add(mkFifoObj("foo", 10)) f.Add(mkFifoObj("foo", 10))
f.Update(mkFifoObj("foo", 12)) f.Update(mkFifoObj("foo", 12))
f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0") f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0")
@ -156,7 +159,9 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) {
if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), Pop(f).(Deltas); !reflect.DeepEqual(e, a) { if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), Pop(f).(Deltas); !reflect.DeepEqual(e, a) {
t.Fatalf("Expected %#v, got %#v", e, a) t.Fatalf("Expected %#v, got %#v", e, a)
} }
if !f.HasSynced() {
t.Errorf("Expected HasSynced to be true after completion of initial population")
}
} }
func TestDeltaFIFO_addUpdate(t *testing.T) { func TestDeltaFIFO_addUpdate(t *testing.T) {
@ -359,6 +364,37 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
} }
} }
func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) {
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
nil,
keyLookupFunc(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
)
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
expectedList := []Deltas{
{{Sync, mkFifoObj("foo", 5)}},
// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},
}
for _, expected := range expectedList {
if f.HasSynced() {
t.Errorf("Expected HasSynced to be false")
}
cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
if f.HasSynced() {
t.Errorf("Expected HasSynced to be true")
}
}
func TestDeltaFIFO_detectLineJumpers(t *testing.T) { func TestDeltaFIFO_detectLineJumpers(t *testing.T) {
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)