mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
DeltaFIFO should not report HasSynced until deletes are reported
Deletions identified by knownObjects on the initial sync replace (immediately after the List from a reflector) should be considered part of the initial sync since we have the information available at the time we do the deletion. An error during Replace() can result in Populated not being set, but it was incorrect before (population would be wrong) and queueActionLocked does not error except on "my cache is broken because I gave an incorrect keyFunc".
This commit is contained in:
parent
354f003c34
commit
bbd8096065
19
pkg/client/cache/delta_fifo.go
vendored
19
pkg/client/cache/delta_fifo.go
vendored
@ -437,11 +437,6 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
||||
defer f.lock.Unlock()
|
||||
keys := make(sets.String, len(list))
|
||||
|
||||
if !f.populated {
|
||||
f.populated = true
|
||||
f.initialPopulationCount = len(list)
|
||||
}
|
||||
|
||||
for _, item := range list {
|
||||
key, err := f.KeyOf(item)
|
||||
if err != nil {
|
||||
@ -467,6 +462,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !f.populated {
|
||||
f.populated = true
|
||||
f.initialPopulationCount = len(list)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -474,6 +475,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
||||
// TODO(lavalamp): This may be racy-- we aren't properly locked
|
||||
// with knownObjects. Unproven.
|
||||
knownKeys := f.knownObjects.ListKeys()
|
||||
queuedDeletions := 0
|
||||
for _, k := range knownKeys {
|
||||
if keys.Has(k) {
|
||||
continue
|
||||
@ -487,10 +489,17 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
||||
deletedObj = nil
|
||||
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
|
||||
}
|
||||
queuedDeletions++
|
||||
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !f.populated {
|
||||
f.populated = true
|
||||
f.initialPopulationCount = len(list) + queuedDeletions
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
38
pkg/client/cache/delta_fifo_test.go
vendored
38
pkg/client/cache/delta_fifo_test.go
vendored
@ -144,6 +144,9 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) {
|
||||
}),
|
||||
nil,
|
||||
)
|
||||
if f.HasSynced() {
|
||||
t.Errorf("Expected HasSynced to be false before completion of initial population")
|
||||
}
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
f.Update(mkFifoObj("foo", 12))
|
||||
f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0")
|
||||
@ -156,7 +159,9 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) {
|
||||
if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), Pop(f).(Deltas); !reflect.DeepEqual(e, a) {
|
||||
t.Fatalf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
|
||||
if !f.HasSynced() {
|
||||
t.Errorf("Expected HasSynced to be true after completion of initial population")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_addUpdate(t *testing.T) {
|
||||
@ -359,6 +364,37 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||
}),
|
||||
)
|
||||
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
|
||||
|
||||
expectedList := []Deltas{
|
||||
{{Sync, mkFifoObj("foo", 5)}},
|
||||
// Since "bar" didn't have a delete event and wasn't in the Replace list
|
||||
// it should get a tombstone key with the right Obj.
|
||||
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},
|
||||
}
|
||||
|
||||
for _, expected := range expectedList {
|
||||
if f.HasSynced() {
|
||||
t.Errorf("Expected HasSynced to be false")
|
||||
}
|
||||
cur := Pop(f).(Deltas)
|
||||
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
}
|
||||
if f.HasSynced() {
|
||||
t.Errorf("Expected HasSynced to be true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_detectLineJumpers(t *testing.T) {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user