From b2b285a766540278a768dc9e6bd07a3a676ae543 Mon Sep 17 00:00:00 2001
From: wojtekt
Date: Wed, 29 Aug 2018 11:40:25 +0200
Subject: [PATCH 1/4] Avoid going back in time in reflector framework

---
 .../k8s.io/client-go/tools/cache/reflector.go | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go
index 62749ed7d31..7ed3a4e9830 100644
--- a/staging/src/k8s.io/client-go/tools/cache/reflector.go
+++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go
@@ -114,6 +114,9 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
 		period:        time.Second,
 		resyncPeriod:  resyncPeriod,
 		clock:         &clock.RealClock{},
+		// We set lastSyncResourceVersion to "0", because it's the value we
+		// set as ResourceVersion in the first List() request.
+		lastSyncResourceVersion: "0",
 	}
 	r.setExpectedType(expectedType)
 	return r
@@ -185,10 +188,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
 	var resourceVersion string
 
-	// Explicitly set "0" as resource version - it's fine for the List()
-	// to be served from cache and potentially be delayed relative to
-	// etcd contents. Reflector framework will catch up via Watch() eventually.
-	options := metav1.ListOptions{ResourceVersion: "0"}
+	// Explicitly set the resource version so that the list can be served
+	// from the watch cache for performance reasons.
+	// It's fine for the returned state to be stale (we will catch up via
+	// Watch() eventually), but we can't set "0": that could take us back
+	// in time if we hit an apiserver that is significantly delayed compared
+	// to the state we have already observed.
+	// TODO: There is still a potential to go back in time after a component
+	// restart, when we set ResourceVersion: "0". For more details see:
+	// https://github.com/kubernetes/kubernetes/issues/59848
+	options := metav1.ListOptions{ResourceVersion: r.LastSyncResourceVersion()}
 
 	if err := func() error {
 		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
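To make the going-back-in-time hazard concrete, here is a minimal sketch (not part of the patch) of a relist pinned to the last-synced resource version; the clientset parameter and the context-free List signature of client-go from this era are assumptions for illustration:

package sketch

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// relistNoOlderThan relists pods without going back in time. With
// ResourceVersion "0" the apiserver may serve any cached state, including
// state older than what this client has already synced; with the last-synced
// version, the watch cache must serve state at least that fresh.
func relistNoOlderThan(cs kubernetes.Interface, lastSyncRV string) (string, error) {
	list, err := cs.CoreV1().Pods("").List(metav1.ListOptions{ResourceVersion: lastSyncRV})
	if err != nil {
		return "", err
	}
	// The ResourceVersion of the returned list becomes the new lastSyncResourceVersion.
	return list.ResourceVersion, nil
}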
From 1f3dc14eea231c90819cbba06f25cb4221236b3f Mon Sep 17 00:00:00 2001
From: Joe Betz
Date: Fri, 4 Oct 2019 17:08:22 -0700
Subject: [PATCH 2/4] Handle expired errors with RV>0 in pager, don't full
 list if 1st page is expired

---
 .../src/k8s.io/client-go/tools/cache/BUILD    |   1 +
 .../k8s.io/client-go/tools/cache/reflector.go |  42 ++--
 .../client-go/tools/cache/reflector_test.go   | 189 ++++++++++++++++++
 .../src/k8s.io/client-go/tools/pager/pager.go |   6 +-
 4 files changed, 223 insertions(+), 15 deletions(-)

diff --git a/staging/src/k8s.io/client-go/tools/cache/BUILD b/staging/src/k8s.io/client-go/tools/cache/BUILD
index 1176312e372..289e19bd05f 100644
--- a/staging/src/k8s.io/client-go/tools/cache/BUILD
+++ b/staging/src/k8s.io/client-go/tools/cache/BUILD
@@ -27,6 +27,7 @@ go_test(
     race = "off",
     deps = [
         "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go
index 7ed3a4e9830..f61177d85cc 100644
--- a/staging/src/k8s.io/client-go/tools/cache/reflector.go
+++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go
@@ -114,9 +114,6 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
 		period:        time.Second,
 		resyncPeriod:  resyncPeriod,
 		clock:         &clock.RealClock{},
-		// We set lastSyncResourceVersion to "0", because it's the value we
-		// set as ResourceVersion in the first List() request.
-		lastSyncResourceVersion: "0",
 	}
 	r.setExpectedType(expectedType)
 	return r
@@ -188,16 +185,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
 	var resourceVersion string
 
-	// Explicitly set the resource version so that the list can be served
-	// from the watch cache for performance reasons.
-	// It's fine for the returned state to be stale (we will catch up via
-	// Watch() eventually), but we can't set "0": that could take us back
-	// in time if we hit an apiserver that is significantly delayed compared
-	// to the state we have already observed.
-	// TODO: There is still a potential to go back in time after a component
-	// restart, when we set ResourceVersion: "0". For more details see:
-	// https://github.com/kubernetes/kubernetes/issues/59848
-	options := metav1.ListOptions{ResourceVersion: r.LastSyncResourceVersion()}
+	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
 
 	if err := func() error {
 		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
@@ -220,8 +208,20 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 			if r.WatchListPageSize != 0 {
 				pager.PageSize = r.WatchListPageSize
 			}
-			// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
+			// Pager falls back to full list if paginated list calls fail due to an "Expired" error on the 2nd page or later,
+			// but may still return an "Expired" error if the 1st page fails with "Expired" or the full list fails with "Expired".
 			list, err = pager.List(context.Background(), options)
+			if apierrs.IsResourceExpired(err) {
+				// For Kubernetes 1.16 and earlier, if the watch cache is disabled for a resource, list requests
+				// with LastSyncResourceVersion set to a non-zero ResourceVersion will fail if the exact ResourceVersion
+				// requested has expired (e.g. an etcd compaction has removed it).
+				// To prevent the reflector from getting stuck retrying a list for an expired resource version in this
+				// case, we set ResourceVersion="" and list again to re-establish the reflector at the latest available
+				// ResourceVersion, using a consistent read from etcd. This is also safe to do if the watch cache is
+				// enabled and the list request returned an "Expired" error.
+				options = metav1.ListOptions{ResourceVersion: ""}
+				list, err = pager.List(context.Background(), options)
+			}
 			close(listCh)
 		}()
 		select {
@@ -441,3 +441,17 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
 	defer r.lastSyncResourceVersionMutex.Unlock()
 	r.lastSyncResourceVersion = v
 }
+
+// relistResourceVersion is the resource version the reflector should list or relist from.
+func (r *Reflector) relistResourceVersion() string {
+	lastSyncRV := r.LastSyncResourceVersion()
+	if lastSyncRV == "" {
+		// Explicitly set resource version to have it list from cache for
+		// performance reasons.
+		// It's fine for the returned state to be stale (we will catch up via Watch()
+		// eventually), but we need to be at least as new as the last resource version we
+		// synced to avoid going back in time.
+		return "0"
+	}
+	return lastSyncRV
+}
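Stripped of the reflector plumbing, the recovery flow added above boils down to the following pattern. This is an illustrative sketch only; listFn is a hypothetical stand-in for pager.List:

package sketch

import (
	"context"

	apierrs "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
)

type listFunc func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error)

// listWithExpiryFallback lists at the reflector's preferred resource version
// and, if the server reports that version as expired (HTTP 410), retries once
// with ResourceVersion "" to force a consistent read of the latest state.
func listWithExpiryFallback(ctx context.Context, listFn listFunc, relistRV string) (runtime.Object, error) {
	list, err := listFn(ctx, metav1.ListOptions{ResourceVersion: relistRV})
	if apierrs.IsResourceExpired(err) {
		list, err = listFn(ctx, metav1.ListOptions{ResourceVersion: ""})
	}
	return list, err
}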
diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go
index 1f28dfce49c..65f5124f0ef 100644
--- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go
+++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go
@@ -26,6 +26,7 @@ import (
 	"time"
 
 	"k8s.io/api/core/v1"
+	apierrs "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -434,6 +435,194 @@ func TestReflectorWatchListPageSize(t *testing.T) {
 	}
 }
 
+// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
+// it in relist requests to prevent the reflector from traveling back in time if the relist is to an api-server or
+// etcd that is partitioned and serving older data than the reflector has already processed.
+func TestReflectorResyncWithResourceVersion(t *testing.T) {
+	stopCh := make(chan struct{})
+	s := NewStore(MetaNamespaceKeyFunc)
+	listCallRVs := []string{}
+
+	lw := &testLW{
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			// Stop once the reflector begins watching since we're only interested in the list.
+			close(stopCh)
+			fw := watch.NewFake()
+			return fw, nil
+		},
+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+			listCallRVs = append(listCallRVs, options.ResourceVersion)
+			pods := make([]v1.Pod, 8)
+			for i := 0; i < 8; i++ {
+				pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
+			}
+			switch options.ResourceVersion {
+			case "0":
+				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
+			case "10":
+				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
+			default:
+				t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
+			}
+			return nil, nil
+		},
+	}
+	r := NewReflector(lw, &v1.Pod{}, s, 0)
+
+	// Initial list should use RV=0
+	r.ListAndWatch(stopCh)
+
+	results := s.List()
+	if len(results) != 4 {
+		t.Errorf("Expected 4 results, got %d", len(results))
+	}
+
+	// relist should use lastSyncResourceVersion (RV=10)
+	stopCh = make(chan struct{})
+	r.ListAndWatch(stopCh)
+
+	results = s.List()
+	if len(results) != 8 {
+		t.Errorf("Expected 8 results, got %d", len(results))
+	}
+
+	expectedRVs := []string{"0", "10"}
+	if !reflect.DeepEqual(listCallRVs, expectedRVs) {
+		t.Errorf("Expected series of list calls with resource versions of %v but got: %v", expectedRVs, listCallRVs)
+	}
+}
+
+// TestReflectorExpiredExactResourceVersion tests that a reflector handles the behavior of Kubernetes 1.16 and earlier
+// where, if the exact non-zero ResourceVersion requested by a List request is not available,
+// an "Expired" error is returned because the ResourceVersion has expired (etcd has compacted it).
+// (In Kubernetes 1.17, or when the watch cache is enabled, the List will instead return the list that is no older than
+// the requested ResourceVersion).
+func TestReflectorExpiredExactResourceVersion(t *testing.T) {
+	stopCh := make(chan struct{})
+	s := NewStore(MetaNamespaceKeyFunc)
+	listCallRVs := []string{}
+
+	lw := &testLW{
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			// Stop once the reflector begins watching since we're only interested in the list.
+			close(stopCh)
+			fw := watch.NewFake()
+			return fw, nil
+		},
+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+			listCallRVs = append(listCallRVs, options.ResourceVersion)
+			pods := make([]v1.Pod, 8)
+			for i := 0; i < 8; i++ {
+				pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
+			}
+			switch options.ResourceVersion {
+			case "0":
+				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
+			case "10":
+				// When the watch cache is disabled, if the exact ResourceVersion requested is not available, an "Expired" error is returned.
+				return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
+			case "":
+				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
+			default:
+				t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
+			}
+			return nil, nil
+		},
+	}
+	r := NewReflector(lw, &v1.Pod{}, s, 0)
+
+	// Initial list should use RV=0
+	r.ListAndWatch(stopCh)
+
+	results := s.List()
+	if len(results) != 4 {
+		t.Errorf("Expected 4 results, got %d", len(results))
+	}
+
+	// relist should use lastSyncResourceVersion (RV=10) and, since RV=10 is expired, retry with RV="".
+	stopCh = make(chan struct{})
+	r.ListAndWatch(stopCh)
+
+	results = s.List()
+	if len(results) != 8 {
+		t.Errorf("Expected 8 results, got %d", len(results))
+	}
+
+	expectedRVs := []string{"0", "10", ""}
+	if !reflect.DeepEqual(listCallRVs, expectedRVs) {
+		t.Errorf("Expected series of list calls with resource versions of %v but got: %v", expectedRVs, listCallRVs)
+	}
+}
+
+func TestReflectorFullListIfExpired(t *testing.T) {
+	stopCh := make(chan struct{})
+	s := NewStore(MetaNamespaceKeyFunc)
+	listCallRVs := []string{}
+
+	lw := &testLW{
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			// Stop once the reflector begins watching since we're only interested in the list.
+			close(stopCh)
+			fw := watch.NewFake()
+			return fw, nil
+		},
+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+			listCallRVs = append(listCallRVs, options.ResourceVersion)
+			pods := make([]v1.Pod, 8)
+			for i := 0; i < 8; i++ {
+				pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
+			}
+			switch options.ResourceVersion {
+			case "0":
+				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
+			case "10":
+				switch options.Limit {
+				case 4:
+					switch options.Continue {
+					case "":
+						return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
+					case "C1":
+						return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
+					default:
+						t.Fatalf("Unrecognized Continue: %s", options.Continue)
+					}
+				case 0:
+					return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
+				default:
+					t.Fatalf("Unrecognized Limit: %d", options.Limit)
+				}
+			default:
+				t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
+			}
+			return nil, nil
+		},
+	}
+	r := NewReflector(lw, &v1.Pod{}, s, 0)
+	r.WatchListPageSize = 4
+
+	// Initial list should use RV=0
+	r.ListAndWatch(stopCh)
+
+	results := s.List()
+	if len(results) != 4 {
+		t.Errorf("Expected 4 results, got %d", len(results))
+	}
+
+	// relist should use lastSyncResourceVersion (RV=10) and, since the second page of RV=10 is expired, fall back to a full list at RV=10
+	stopCh = make(chan struct{})
+	r.ListAndWatch(stopCh)
+
+	results = s.List()
+	if len(results) != 8 {
+		t.Errorf("Expected 8 results, got %d", len(results))
+	}
+
+	expectedRVs := []string{"0", "10", "10", "10"}
+	if !reflect.DeepEqual(listCallRVs, expectedRVs) {
+		t.Errorf("Expected series of list calls with resource versions of %v but got: %v", expectedRVs, listCallRVs)
+	}
+}
+
 func TestReflectorSetExpectedType(t *testing.T) {
 	obj := &unstructured.Unstructured{}
 	gvk := schema.GroupVersionKind{
diff --git a/staging/src/k8s.io/client-go/tools/pager/pager.go b/staging/src/k8s.io/client-go/tools/pager/pager.go
index d265db78683..25d190efa68 100644
--- a/staging/src/k8s.io/client-go/tools/pager/pager.go
+++ b/staging/src/k8s.io/client-go/tools/pager/pager.go
@@ -87,7 +87,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
 		obj, err := p.PageFn(ctx, options)
 		if err != nil {
-			if !errors.IsResourceExpired(err) || !p.FullListIfExpired {
+			// Only fall back to a full list if an "Expired" error is returned, FullListIfExpired is true, and
+			// the "Expired" error occurred on the 2nd page or later (the full-list fallback is intended to prevent
+			// pager.List from failing when the resource version established by the first page request falls out of
+			// the compaction window during subsequent list requests).
+			if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
 				return nil, err
 			}
 			// the list expired while we were processing, fall back to a full list
 			options.Limit = 0
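For reference, a consumer-side sketch of the pager semantics after this change; the pod-listing page function and the context-free List signature are assumptions, not part of the patch:

package sketch

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/pager"
)

func pagedList(client kubernetes.Interface, rv string) (runtime.Object, error) {
	p := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
		return client.CoreV1().Pods("").List(opts)
	}))
	p.PageSize = 500
	// With FullListIfExpired=true, an "Expired" error on a continuation page
	// triggers one full, unpaginated list at the same resource version; an
	// "Expired" error on the first page is now returned to the caller instead.
	p.FullListIfExpired = true
	return p.List(context.Background(), metav1.ListOptions{ResourceVersion: rv})
}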
From e2fe126d485af243d45f06bafd2ca83931881429 Mon Sep 17 00:00:00 2001
From: Joe Betz
Date: Wed, 9 Oct 2019 16:45:32 -0700
Subject: [PATCH 3/4] Add HTTP 410 (Gone) status code checks to reflector and
 relist with RV=''

---
 .../apimachinery/pkg/api/errors/errors.go     |  1 +
 .../pkg/storage/cacher/watch_cache.go         |  2 +-
 .../k8s.io/client-go/tools/cache/reflector.go | 80 +++++++++++++------
 3 files changed, 57 insertions(+), 26 deletions(-)

diff --git a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go
index 95d5c7a3553..a7ebbecbec5 100644
--- a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go
+++ b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go
@@ -201,6 +201,7 @@ func NewApplyConflict(causes []metav1.StatusCause, message string) *StatusError
 }
 
 // NewGone returns an error indicating the item no longer available at the server and no forwarding address is known.
+// DEPRECATED: Please use NewResourceExpired instead.
 func NewGone(message string) *StatusError {
 	return &StatusError{metav1.Status{
 		Status: metav1.StatusFailure,
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
index 0c33b57fe42..ebd3314650f 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
@@ -468,7 +468,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
 		return result, nil
 	}
 	if resourceVersion < oldest-1 {
-		return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
+		return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
 	}
 
 	// Binary search the smallest index at which resourceVersion is greater than the given one.
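Both constructors produce an HTTP 410 response; they differ only in the machine-readable reason, which is what the client-side checks key off. A minimal sketch of that difference:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/errors"
)

func main() {
	expired := errors.NewResourceExpired("too old resource version: 100 (150)")
	gone := errors.NewGone("resource gone")
	// Both carry status code 410; the reasons are "Expired" and "Gone".
	fmt.Println(expired.ErrStatus.Code, expired.ErrStatus.Reason) // 410 Expired
	fmt.Println(gone.ErrStatus.Code, gone.ErrStatus.Reason)       // 410 Gone
}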
diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go
index f61177d85cc..1165c523eb6 100644
--- a/staging/src/k8s.io/client-go/tools/cache/reflector.go
+++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go
@@ -74,6 +74,9 @@ type Reflector struct {
 	// observed when doing a sync with the underlying store
 	// it is thread safe, but not synchronized with the underlying store
 	lastSyncResourceVersion string
+	// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
+	// failed with an HTTP 410 (Gone) status code.
+	isLastSyncResourceVersionGone bool
 	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
 	lastSyncResourceVersionMutex sync.RWMutex
 	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
@@ -208,19 +211,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 			if r.WatchListPageSize != 0 {
 				pager.PageSize = r.WatchListPageSize
 			}
-			// Pager falls back to full list if paginated list calls fail due to an "Expired" error on the 2nd page or later,
-			// but may still return an "Expired" error if the 1st page fails with "Expired" or the full list fails with "Expired".
+
 			list, err = pager.List(context.Background(), options)
-			if apierrs.IsResourceExpired(err) {
-				// For Kubernetes 1.16 and earlier, if the watch cache is disabled for a resource, list requests
-				// with LastSyncResourceVersion set to a non-zero ResourceVersion will fail if the exact ResourceVersion
-				// requested has expired (e.g. an etcd compaction has removed it).
-				// To prevent the reflector from getting stuck retrying a list for an expired resource version in this
-				// case, we set ResourceVersion="" and list again to re-establish the reflector at the latest available
-				// ResourceVersion, using a consistent read from etcd. This is also safe to do if the watch cache is
-				// enabled and the list request returned an "Expired" error.
-				options = metav1.ListOptions{ResourceVersion: ""}
-				list, err = pager.List(context.Background(), options)
+			if isExpiredError(err) {
+				r.setIsLastSyncResourceVersionExpired(true)
+				// Retry immediately if the resource version used to list is expired.
+				// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
+				// continuation pages, but the pager might not be enabled, or the full list might fail because the
+				// resource version it is listing at is expired, so we need to fall back to resourceVersion="" in all
+				// such cases to recover and ensure the reflector makes forward progress.
+				list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
 			}
 			close(listCh)
 		}()
@@ -234,6 +234,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 		if err != nil {
 			return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
 		}
+		r.setIsLastSyncResourceVersionExpired(false) // list was successful
 		initTrace.Step("Objects listed")
 		listMetaInterface, err := meta.ListAccessor(list)
 		if err != nil {
@@ -307,10 +308,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 		w, err := r.listerWatcher.Watch(options)
 		if err != nil {
-			switch err {
-			case io.EOF:
+			switch {
+			case isExpiredError(err):
+				r.setIsLastSyncResourceVersionExpired(true)
+				klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
+			case err == io.EOF:
 				// watch closed normally
-			case io.ErrUnexpectedEOF:
+			case err == io.ErrUnexpectedEOF:
 				klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
 			default:
 				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
 			}
@@ -329,7 +333,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
 			if err != errorStopRequested {
 				switch {
-				case apierrs.IsResourceExpired(err):
+				case isExpiredError(err):
+					r.setIsLastSyncResourceVersionExpired(true)
 					klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
 				default:
 					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
@@ -442,16 +447,41 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
 	r.lastSyncResourceVersion = v
 }
 
-// relistResourceVersion is the resource version the reflector should list or relist from.
+// relistResourceVersion determines the resource version the reflector should list or relist from.
+// It returns either the lastSyncResourceVersion, so that this reflector will relist with a resource
+// version no older than what has already been observed in relist results or watch events, or, if the last
+// relist resulted in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest
+// resource version available in etcd via a quorum read.
 func (r *Reflector) relistResourceVersion() string {
-	lastSyncRV := r.LastSyncResourceVersion()
-	if lastSyncRV == "" {
-		// Explicitly set resource version to have it list from cache for
-		// performance reasons.
-		// It's fine for the returned state to be stale (we will catch up via Watch()
-		// eventually), but we need to be at least as new as the last resource version we
-		// synced to avoid going back in time.
+	r.lastSyncResourceVersionMutex.RLock()
+	defer r.lastSyncResourceVersionMutex.RUnlock()
+
+	if r.isLastSyncResourceVersionGone {
+		// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
+		// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish the
+		// reflector at the latest available ResourceVersion, using a consistent read from etcd.
+		return ""
+	}
+	if r.lastSyncResourceVersion == "" {
+		// For performance reasons, the initial list performed by the reflector uses "0" as resource version
+		// to allow it to be served from the watch cache if it is enabled.
 		return "0"
 	}
-	return lastSyncRV
+	return r.lastSyncResourceVersion
+}
+
+// setIsLastSyncResourceVersionExpired sets whether the last list or watch request with lastSyncResourceVersion
+// returned an expired error (HTTP 410, Gone, status code).
+func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
+	r.lastSyncResourceVersionMutex.Lock()
+	defer r.lastSyncResourceVersionMutex.Unlock()
+	r.isLastSyncResourceVersionGone = isExpired
+}
+
+func isExpiredError(err error) bool {
+	// In Kubernetes 1.17 and earlier, the kube-apiserver returns both apierrs.StatusReasonExpired and
+	// apierrs.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube-apiserver is more
+	// consistent and always returns apierrs.StatusReasonExpired. For backward compatibility we can only remove
+	// the apierrs.IsGone check once we fully drop support for Kubernetes 1.17 servers from reflectors.
+	return apierrs.IsResourceExpired(err) || apierrs.IsGone(err)
+}
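The relist policy that results from this patch can be summarized as a pure function over the reflector's state; this hypothetical standalone rendering is for illustration only:

package sketch

// relistRV mirrors relistResourceVersion: prefer the last-synced version,
// fall back to "0" (any cached state) on the first list, and to "" (latest
// state via quorum read) after the last-synced version was reported expired.
func relistRV(lastSyncRV string, lastSyncRVExpired bool) string {
	switch {
	case lastSyncRVExpired:
		return ""
	case lastSyncRV == "":
		return "0"
	default:
		return lastSyncRV
	}
}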
From 57b451cfb6738fca45fc05cd50c1ff6d7240e3a7 Mon Sep 17 00:00:00 2001
From: Joe Betz
Date: Tue, 5 Nov 2019 22:06:10 -0800
Subject: [PATCH 4/4] Fix watch test to expect Expired instead of Gone

---
 .../src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go
index acf7a521812..4b66b960240 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go
@@ -444,8 +444,8 @@ func TestWatch(t *testing.T) {
 	}
 	defer tooOldWatcher.Stop()
-	// Ensure we get a "Gone" error
-	expectedGoneError := errors.NewGone("").ErrStatus
-	verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError)
+	// Ensure we get an "Expired" error
+	expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
+	verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
 
 	initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
 	if err != nil {
@@ -668,8 +668,8 @@ func TestEmptyWatchEventCache(t *testing.T) {
 		t.Fatalf("Unexpected error: %v", err)
 	}
 	defer watcher.Stop()
-	expectedGoneError := errors.NewGone("").ErrStatus
-	verifyWatchEvent(t, watcher, watch.Error, &expectedGoneError)
+	expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
+	verifyWatchEvent(t, watcher, watch.Error, &expectedResourceExpiredError)
 }
 
 {