diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 1165c523e..62749ed7d 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -74,9 +74,6 @@ type Reflector struct { // observed when doing a sync with the underlying store // it is thread safe, but not synchronized with the underlying store lastSyncResourceVersion string - // isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion - // failed with an HTTP 410 (Gone) status code. - isLastSyncResourceVersionGone bool // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex // WatchListPageSize is the requested chunk size of initial and resync watch lists. @@ -188,7 +185,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) var resourceVersion string - options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} + // Explicitly set "0" as resource version - it's fine for the List() + // to be served from cache and potentially be delayed relative to + // etcd contents. Reflector framework will catch up via Watch() eventually. + options := metav1.ListOptions{ResourceVersion: "0"} if err := func() error { initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name}) @@ -211,17 +211,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if r.WatchListPageSize != 0 { pager.PageSize = r.WatchListPageSize } - + // Pager falls back to full list if paginated list calls fail due to an "Expired" error. list, err = pager.List(context.Background(), options) - if isExpiredError(err) { - r.setIsLastSyncResourceVersionExpired(true) - // Retry immediately if the resource version used to list is expired. - // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on - // continuation pages, but the pager might not be enabled, or the full list might fail because the - // resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all - // to recover and ensure the reflector makes forward progress. - list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) - } close(listCh) }() select { @@ -234,7 +225,6 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err) } - r.setIsLastSyncResourceVersionExpired(false) // list was successful initTrace.Step("Objects listed") listMetaInterface, err := meta.ListAccessor(list) if err != nil { @@ -308,13 +298,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { w, err := r.listerWatcher.Watch(options) if err != nil { - switch { - case isExpiredError(err): - r.setIsLastSyncResourceVersionExpired(true) - klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) - case err == io.EOF: + switch err { + case io.EOF: // watch closed normally - case err == io.ErrUnexpectedEOF: + case io.ErrUnexpectedEOF: klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err) default: utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err)) @@ -333,8 +320,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { switch { - case isExpiredError(err): - r.setIsLastSyncResourceVersionExpired(true) + case apierrs.IsResourceExpired(err): klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) default: klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) @@ -446,42 +432,3 @@ func (r *Reflector) setLastSyncResourceVersion(v string) { defer r.lastSyncResourceVersionMutex.Unlock() r.lastSyncResourceVersion = v } - -// relistResourceVersion determines the resource version the reflector should list or relist from. -// Returns either the lastSyncResourceVersion so that this reflector will relist with a resource -// versions no older than has already been observed in relist results or watch events, or, if the last relist resulted -// in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in -// etcd via a quorum read. -func (r *Reflector) relistResourceVersion() string { - r.lastSyncResourceVersionMutex.RLock() - defer r.lastSyncResourceVersionMutex.RUnlock() - - if r.isLastSyncResourceVersionGone { - // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache - // if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector - // to the latest available ResourceVersion, using a consistent read from etcd. - return "" - } - if r.lastSyncResourceVersion == "" { - // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to - // be served from the watch cache if it is enabled. - return "0" - } - return r.lastSyncResourceVersion -} - -// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a -// expired error: HTTP 410 (Gone) Status Code. -func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) { - r.lastSyncResourceVersionMutex.Lock() - defer r.lastSyncResourceVersionMutex.Unlock() - r.isLastSyncResourceVersionGone = isExpired -} - -func isExpiredError(err error) bool { - // In Kubernetes 1.17 and earlier, the api server returns both apierrs.StatusReasonExpired and - // apierrs.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent - // and always returns apierrs.StatusReasonExpired. For backward compatibility we can only remove the apierrs.IsGone - // check when we fully drop support for Kubernetes 1.17 servers from reflectors. - return apierrs.IsResourceExpired(err) || apierrs.IsGone(err) -} diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 2b0796c6a..1f28dfce4 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -26,7 +26,6 @@ import ( "time" "k8s.io/api/core/v1" - apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -435,197 +434,6 @@ func TestReflectorWatchListPageSize(t *testing.T) { } } -// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends -// it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or -// etcd that is partitioned and serving older data than the reflector has already processed. -func TestReflectorResyncWithResourceVersion(t *testing.T) { - stopCh := make(chan struct{}) - s := NewStore(MetaNamespaceKeyFunc) - listCallRVs := []string{} - - lw := &testLW{ - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - // Stop once the reflector begins watching since we're only interested in the list. - close(stopCh) - fw := watch.NewFake() - return fw, nil - }, - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - listCallRVs = append(listCallRVs, options.ResourceVersion) - pods := make([]v1.Pod, 8) - for i := 0; i < 8; i++ { - pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} - } - switch options.ResourceVersion { - case "0": - return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil - case "10": - return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil - default: - t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion) - } - return nil, nil - }, - } - r := NewReflector(lw, &v1.Pod{}, s, 0) - - // Initial list should use RV=0 - r.ListAndWatch(stopCh) - - results := s.List() - if len(results) != 4 { - t.Errorf("Expected 4 results, got %d", len(results)) - } - - // relist should use lastSyncResourceVersions (RV=10) - stopCh = make(chan struct{}) - r.ListAndWatch(stopCh) - - results = s.List() - if len(results) != 8 { - t.Errorf("Expected 8 results, got %d", len(results)) - } - - expectedRVs := []string{"0", "10"} - if !reflect.DeepEqual(listCallRVs, expectedRVs) { - t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs) - } -} - -// TestReflectorExpiredExactResourceVersion tests that a reflector handles the behavior of kubernetes 1.16 an earlier -// where if the exact ResourceVersion requested is not available for a List request for a non-zero ResourceVersion, -// an "Expired" error is returned if the ResourceVersion has expired (etcd has compacted it). -// (In kubernetes 1.17, or when the watch cache is enabled, the List will instead return the list that is no older than -// the requested ResourceVersion). -func TestReflectorExpiredExactResourceVersion(t *testing.T) { - stopCh := make(chan struct{}) - s := NewStore(MetaNamespaceKeyFunc) - listCallRVs := []string{} - - lw := &testLW{ - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - // Stop once the reflector begins watching since we're only interested in the list. - close(stopCh) - fw := watch.NewFake() - return fw, nil - }, - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - listCallRVs = append(listCallRVs, options.ResourceVersion) - pods := make([]v1.Pod, 8) - for i := 0; i < 8; i++ { - pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} - } - switch options.ResourceVersion { - case "0": - return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil - case "10": - // When watch cache is disabled, if the exact ResourceVersion requested is not available, a "Expired" error is returned. - return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.") - case "": - return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil - default: - t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion) - } - return nil, nil - }, - } - r := NewReflector(lw, &v1.Pod{}, s, 0) - - // Initial list should use RV=0 - r.ListAndWatch(stopCh) - - results := s.List() - if len(results) != 4 { - t.Errorf("Expected 4 results, got %d", len(results)) - } - - // relist should use lastSyncResourceVersions (RV=10) and since RV=10 is expired, it should retry with RV="". - stopCh = make(chan struct{}) - r.ListAndWatch(stopCh) - - results = s.List() - if len(results) != 8 { - t.Errorf("Expected 8 results, got %d", len(results)) - } - - expectedRVs := []string{"0", "10", ""} - if !reflect.DeepEqual(listCallRVs, expectedRVs) { - t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs) - } -} - -func TestReflectorFullListIfExpired(t *testing.T) { - stopCh := make(chan struct{}) - s := NewStore(MetaNamespaceKeyFunc) - listCallRVs := []string{} - - lw := &testLW{ - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - // Stop once the reflector begins watching since we're only interested in the list. - close(stopCh) - fw := watch.NewFake() - return fw, nil - }, - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - listCallRVs = append(listCallRVs, options.ResourceVersion) - pods := make([]v1.Pod, 8) - for i := 0; i < 8; i++ { - pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} - } - rvContinueLimit := func(rv, c string, l int64) metav1.ListOptions { - return metav1.ListOptions{ResourceVersion: rv, Continue: c, Limit: l} - } - switch rvContinueLimit(options.ResourceVersion, options.Continue, options.Limit) { - // initial limited list - case rvContinueLimit("0", "", 4): - return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil - // first page of the rv=10 list - case rvContinueLimit("10", "", 4): - return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil - // second page of the above list - case rvContinueLimit("", "C1", 4): - return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.") - // rv=10 unlimited list - case rvContinueLimit("10", "", 0): - return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil - default: - err := fmt.Errorf("unexpected list options: %#v", options) - t.Error(err) - return nil, err - } - return nil, nil - }, - } - r := NewReflector(lw, &v1.Pod{}, s, 0) - r.WatchListPageSize = 4 - - // Initial list should use RV=0 - if err := r.ListAndWatch(stopCh); err != nil { - t.Fatal(err) - } - - results := s.List() - if len(results) != 4 { - t.Errorf("Expected 4 results, got %d", len(results)) - } - - // relist should use lastSyncResourceVersions (RV=10) and since second page of that expired, it should full list with RV=10 - stopCh = make(chan struct{}) - if err := r.ListAndWatch(stopCh); err != nil { - t.Fatal(err) - } - - results = s.List() - if len(results) != 8 { - t.Errorf("Expected 8 results, got %d", len(results)) - } - - expectedRVs := []string{"0", "10", "", "10"} - if !reflect.DeepEqual(listCallRVs, expectedRVs) { - t.Errorf("Expected series of list calls with resource versiosn of %#v but got: %#v", expectedRVs, listCallRVs) - } -} - func TestReflectorSetExpectedType(t *testing.T) { obj := &unstructured.Unstructured{} gvk := schema.GroupVersionKind{