diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index e9b14b3a..a56fce6e 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -21,9 +21,12 @@ import ( "errors" "fmt" "math/rand" + "net/http" "reflect" goruntime "runtime" "strconv" + "sync" + "sync/atomic" "syscall" "testing" "time" @@ -1753,6 +1756,152 @@ func TestReflectorListExtract(t *testing.T) { } } +func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) { + mkPod := func(id string, rv string) *v1.Pod { + return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv}} + } + mkList := func(rv string, pods ...*v1.Pod) *v1.PodList { + list := &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: rv}} + for _, pod := range pods { + list.Items = append(list.Items, *pod) + } + return list + } + makeStatus := func() *metav1.Status { + return &metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusInternalServerError, + Reason: metav1.StatusReasonStoreReadError, + Message: "failed to prepare current and previous objects: corrupt object has been deleted", + } + } + + // these pods preexist and never get updated/deleted + preExisting := mkPod("foo-1", "1") + pods := []*v1.Pod{preExisting, mkPod("foo-2", "2"), mkPod("foo-3", "3")} + lastExpectedRV := "5" + lists := []*v1.PodList{ + mkList("3", pods...), // initial list + mkList(lastExpectedRV, pods...), // re-list due to watch error + } + corruptObj := mkPod("foo", "4") + events := []watch.Event{ + {Type: watch.Added, Object: corruptObj}, + // the object becomes corrupt, and it gets unsafe-deleted, and + // watch sends the following Error event, note the RV has + // advanced to "5" in the storage due to the delete operation + {Type: watch.Error, Object: makeStatus()}, + } + + s := NewFIFO(MetaNamespaceKeyFunc) + var replaceInvoked atomic.Int32 + store := &fakeStore{ + Store: s, + beforeReplace: func(list []interface{}, rv string) { + // interested in the Replace call that happens after the Error event + if rv == lastExpectedRV { + replaceInvoked.Add(1) + _, exists, err := s.Get(corruptObj) + if err != nil || !exists { + t.Errorf("expected the object to exist in the store, exists: %t, err: %v", exists, err) + } + _, exists, err = s.Get(preExisting) + if err != nil || !exists { + t.Errorf("expected the pre-existing object to be in the store, exists: %t, err: %v", exists, err) + } + } + }, + afterReplace: func(rv string, err error) { + if rv == lastExpectedRV { + replaceInvoked.Add(1) + if err != nil { + t.Errorf("expected Replace to have succeeded, but got error: %v", err) + } + _, exists, err := s.Get(corruptObj) + if err != nil || exists { + t.Errorf("expected the object to have been removed from the store, exists: %t, err: %v", exists, err) + } + // show that a pre-existing pod is still in the cache + _, exists, err = s.Get(preExisting) + if err != nil || !exists { + t.Errorf("expected the pre-existing object to be in the store, exists: %t, err: %v", exists, err) + } + } + }, + } + + var once sync.Once + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + fw := watch.NewFake() + go func() { + once.Do(func() { + for _, e := range events { + fw.Action(e.Type, e.Object) + } + }) + }() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + var list runtime.Object + if len(lists) > 0 { + list = lists[0] + lists = lists[1:] + } + return list, nil + }, + } + + r := NewReflector(lw, &v1.Pod{}, store, 0) + doneCh, stopCh := make(chan struct{}), make(chan struct{}) + go func() { + defer close(doneCh) + r.Run(stopCh) + }() + + // wait for the RV to sync to the version returned by the final list + err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) { + if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV { + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err) + } + + if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got { + t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got) + } + if want, got := 2, int(replaceInvoked.Load()); want != got { + t.Errorf("expected store Delete hooks to be invoked %d times, but got: %d", want, got) + } + if want, got := len(pods), len(s.List()); want != got { + t.Errorf("expected the store to have %d objects, but got: %d", want, got) + } + + close(stopCh) + select { + case <-doneCh: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out waiting for Run to return") + } +} + +type fakeStore struct { + Store + beforeReplace func(list []interface{}, s string) + afterReplace func(rv string, err error) +} + +func (f *fakeStore) Replace(list []interface{}, rv string) error { + f.beforeReplace(list, rv) + err := f.Store.Replace(list, rv) + f.afterReplace(rv, err) + return err +} + func BenchmarkExtractList(b *testing.B) { _, _, podList := getPodListItems(0, fakeItemsNum) _, _, configMapList := getConfigmapListItems(0, fakeItemsNum)