diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 9634d4d6..a92b36f2 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -82,9 +82,9 @@ type Reflector struct { // observed when doing a sync with the underlying store // it is thread safe, but not synchronized with the underlying store lastSyncResourceVersion string - // isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion - // failed with an HTTP 410 (Gone) status code. - isLastSyncResourceVersionGone bool + // isLastSyncResourceVersionUnavailable is true if the previous list or watch request with + // lastSyncResourceVersion failed with an "expired" or "too large resource version" error. + isLastSyncResourceVersionUnavailable bool // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex // WatchListPageSize is the requested chunk size of initial and resync watch lists. @@ -115,7 +115,7 @@ type WatchErrorHandler func(r *Reflector, err error) func DefaultWatchErrorHandler(r *Reflector, err error) { switch { case isExpiredError(err): - // Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already + // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) @@ -288,13 +288,14 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } list, paginatedResult, err = pager.List(context.Background(), options) - if isExpiredError(err) { - r.setIsLastSyncResourceVersionExpired(true) - // Retry immediately if the resource version used to list is expired. + if isExpiredError(err) || isTooLargeResourceVersionError(err) { + r.setIsLastSyncResourceVersionUnavailable(true) + // Retry immediately if the resource version used to list is unavailable. // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on - // continuation pages, but the pager might not be enabled, or the full list might fail because the - // resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all - // to recover and ensure the reflector makes forward progress. + // continuation pages, but the pager might not be enabled, the full list might fail because the + // resource version it is listing at is expired or the cache may not yet be synced to the provided + // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure + // the reflector makes forward progress. list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) } close(listCh) @@ -324,7 +325,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { r.paginatedResult = true } - r.setIsLastSyncResourceVersionExpired(false) // list was successful + r.setIsLastSyncResourceVersionUnavailable(false) // list was successful initTrace.Step("Objects listed") listMetaInterface, err := meta.ListAccessor(list) if err != nil { @@ -415,7 +416,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err != errorStopRequested { switch { case isExpiredError(err): - // Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already + // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) @@ -538,9 +539,9 @@ func (r *Reflector) relistResourceVersion() string { r.lastSyncResourceVersionMutex.RLock() defer r.lastSyncResourceVersionMutex.RUnlock() - if r.isLastSyncResourceVersionGone { + if r.isLastSyncResourceVersionUnavailable { // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache - // if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector + // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector // to the latest available ResourceVersion, using a consistent read from etcd. return "" } @@ -552,12 +553,12 @@ func (r *Reflector) relistResourceVersion() string { return r.lastSyncResourceVersion } -// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a -// expired error: HTTP 410 (Gone) Status Code. -func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) { +// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned +// "expired" or "too large resource version" error. +func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) { r.lastSyncResourceVersionMutex.Lock() defer r.lastSyncResourceVersionMutex.Unlock() - r.isLastSyncResourceVersionGone = isExpired + r.isLastSyncResourceVersionUnavailable = isUnavailable } func isExpiredError(err error) bool { @@ -567,3 +568,7 @@ func isExpiredError(err error) bool { // check when we fully drop support for Kubernetes 1.17 servers from reflectors. return apierrors.IsResourceExpired(err) || apierrors.IsGone(err) } + +func isTooLargeResourceVersionError(err error) bool { + return apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) +} diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 3e2fcc19..4a25fcce 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -714,6 +714,62 @@ func TestReflectorFullListIfExpired(t *testing.T) { } } +func TestReflectorFullListIfTooLarge(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + listCallRVs := []string{} + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + listCallRVs = append(listCallRVs, options.ResourceVersion) + + switch options.ResourceVersion { + // initial list + case "0": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "20"}}, nil + // relist after the initial list + case "20": + err := apierrors.NewTimeoutError("too large resource version", 1) + err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}} + return nil, err + // relist from etcd after "too large" error + case "": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil + default: + return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion) + } + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + + // Initial list should use RV=0 + if err := r.ListAndWatch(stopCh); err != nil { + t.Fatal(err) + } + + // Relist from the future version. + // This may happen, as watchcache is initialized from "current global etcd resource version" + // when kube-apiserver is starting and if no objects are changing after that each kube-apiserver + // may be synced to a different version and they will never converge. + // TODO: We should use etcd progress-notify feature to avoid this behavior but until this is + // done we simply try to relist from now to avoid continuous errors on relists. + stopCh = make(chan struct{}) + if err := r.ListAndWatch(stopCh); err != nil { + t.Fatal(err) + } + + expectedRVs := []string{"0", "20", ""} + if !reflect.DeepEqual(listCallRVs, expectedRVs) { + t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs) + } +} + func TestReflectorSetExpectedType(t *testing.T) { obj := &unstructured.Unstructured{} gvk := schema.GroupVersionKind{