diff --git a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go index e9012b5cc53..e53c3e61fd1 100644 --- a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go +++ b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go @@ -223,6 +223,7 @@ func NewApplyConflict(causes []metav1.StatusCause, message string) *StatusError } // NewGone returns an error indicating the item no longer available at the server and no forwarding address is known. +// DEPRECATED: Please use NewResourceExpired instead. func NewGone(message string) *StatusError { return &StatusError{metav1.Status{ Status: metav1.StatusFailure, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 0c33b57fe42..ebd3314650f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -468,7 +468,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w return result, nil } if resourceVersion < oldest-1 { - return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) + return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) } // Binary search the smallest index at which resourceVersion is greater than the given one. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index acf7a521812..4b66b960240 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -444,8 +444,8 @@ func TestWatch(t *testing.T) { } defer tooOldWatcher.Stop() // Ensure we get a "Gone" error - expectedGoneError := errors.NewGone("").ErrStatus - verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError) + expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus + verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError) initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything) if err != nil { @@ -668,8 +668,8 @@ func TestEmptyWatchEventCache(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer watcher.Stop() - expectedGoneError := errors.NewGone("").ErrStatus - verifyWatchEvent(t, watcher, watch.Error, &expectedGoneError) + expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus + verifyWatchEvent(t, watcher, watch.Error, &expectedResourceExpiredError) } { diff --git a/staging/src/k8s.io/client-go/tools/cache/BUILD b/staging/src/k8s.io/client-go/tools/cache/BUILD index 1176312e372..289e19bd05f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/BUILD +++ b/staging/src/k8s.io/client-go/tools/cache/BUILD @@ -27,6 +27,7 @@ go_test( race = "off", deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 62749ed7d31..1165c523eb6 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -74,6 +74,9 @@ type Reflector struct { // observed when doing a sync with the underlying store // it is thread safe, but not synchronized with the underlying store lastSyncResourceVersion string + // isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion + // failed with an HTTP 410 (Gone) status code. + isLastSyncResourceVersionGone bool // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex // WatchListPageSize is the requested chunk size of initial and resync watch lists. @@ -185,10 +188,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) var resourceVersion string - // Explicitly set "0" as resource version - it's fine for the List() - // to be served from cache and potentially be delayed relative to - // etcd contents. Reflector framework will catch up via Watch() eventually. - options := metav1.ListOptions{ResourceVersion: "0"} + options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} if err := func() error { initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name}) @@ -211,8 +211,17 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if r.WatchListPageSize != 0 { pager.PageSize = r.WatchListPageSize } - // Pager falls back to full list if paginated list calls fail due to an "Expired" error. + list, err = pager.List(context.Background(), options) + if isExpiredError(err) { + r.setIsLastSyncResourceVersionExpired(true) + // Retry immediately if the resource version used to list is expired. + // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on + // continuation pages, but the pager might not be enabled, or the full list might fail because the + // resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all + // to recover and ensure the reflector makes forward progress. + list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) + } close(listCh) }() select { @@ -225,6 +234,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err) } + r.setIsLastSyncResourceVersionExpired(false) // list was successful initTrace.Step("Objects listed") listMetaInterface, err := meta.ListAccessor(list) if err != nil { @@ -298,10 +308,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { w, err := r.listerWatcher.Watch(options) if err != nil { - switch err { - case io.EOF: + switch { + case isExpiredError(err): + r.setIsLastSyncResourceVersionExpired(true) + klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) + case err == io.EOF: // watch closed normally - case io.ErrUnexpectedEOF: + case err == io.ErrUnexpectedEOF: klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err) default: utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err)) @@ -320,7 +333,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { switch { - case apierrs.IsResourceExpired(err): + case isExpiredError(err): + r.setIsLastSyncResourceVersionExpired(true) klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) default: klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) @@ -432,3 +446,42 @@ func (r *Reflector) setLastSyncResourceVersion(v string) { defer r.lastSyncResourceVersionMutex.Unlock() r.lastSyncResourceVersion = v } + +// relistResourceVersion determines the resource version the reflector should list or relist from. +// Returns either the lastSyncResourceVersion so that this reflector will relist with a resource +// versions no older than has already been observed in relist results or watch events, or, if the last relist resulted +// in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in +// etcd via a quorum read. +func (r *Reflector) relistResourceVersion() string { + r.lastSyncResourceVersionMutex.RLock() + defer r.lastSyncResourceVersionMutex.RUnlock() + + if r.isLastSyncResourceVersionGone { + // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache + // if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector + // to the latest available ResourceVersion, using a consistent read from etcd. + return "" + } + if r.lastSyncResourceVersion == "" { + // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to + // be served from the watch cache if it is enabled. + return "0" + } + return r.lastSyncResourceVersion +} + +// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a +// expired error: HTTP 410 (Gone) Status Code. +func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) { + r.lastSyncResourceVersionMutex.Lock() + defer r.lastSyncResourceVersionMutex.Unlock() + r.isLastSyncResourceVersionGone = isExpired +} + +func isExpiredError(err error) bool { + // In Kubernetes 1.17 and earlier, the api server returns both apierrs.StatusReasonExpired and + // apierrs.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent + // and always returns apierrs.StatusReasonExpired. For backward compatibility we can only remove the apierrs.IsGone + // check when we fully drop support for Kubernetes 1.17 servers from reflectors. + return apierrs.IsResourceExpired(err) || apierrs.IsGone(err) +} diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index 1f28dfce49c..65f5124f0ef 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -26,6 +26,7 @@ import ( "time" "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -434,6 +435,194 @@ func TestReflectorWatchListPageSize(t *testing.T) { } } +// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends +// it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or +// etcd that is partitioned and serving older data than the reflector has already processed. +func TestReflectorResyncWithResourceVersion(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + listCallRVs := []string{} + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + listCallRVs = append(listCallRVs, options.ResourceVersion) + pods := make([]v1.Pod, 8) + for i := 0; i < 8; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + switch options.ResourceVersion { + case "0": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil + case "10": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil + default: + t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion) + } + return nil, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + + // Initial list should use RV=0 + r.ListAndWatch(stopCh) + + results := s.List() + if len(results) != 4 { + t.Errorf("Expected 4 results, got %d", len(results)) + } + + // relist should use lastSyncResourceVersions (RV=10) + stopCh = make(chan struct{}) + r.ListAndWatch(stopCh) + + results = s.List() + if len(results) != 8 { + t.Errorf("Expected 8 results, got %d", len(results)) + } + + expectedRVs := []string{"0", "10"} + if !reflect.DeepEqual(listCallRVs, expectedRVs) { + t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs) + } +} + +// TestReflectorExpiredExactResourceVersion tests that a reflector handles the behavior of kubernetes 1.16 an earlier +// where if the exact ResourceVersion requested is not available for a List request for a non-zero ResourceVersion, +// an "Expired" error is returned if the ResourceVersion has expired (etcd has compacted it). +// (In kubernetes 1.17, or when the watch cache is enabled, the List will instead return the list that is no older than +// the requested ResourceVersion). +func TestReflectorExpiredExactResourceVersion(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + listCallRVs := []string{} + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + listCallRVs = append(listCallRVs, options.ResourceVersion) + pods := make([]v1.Pod, 8) + for i := 0; i < 8; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + switch options.ResourceVersion { + case "0": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil + case "10": + // When watch cache is disabled, if the exact ResourceVersion requested is not available, a "Expired" error is returned. + return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.") + case "": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil + default: + t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion) + } + return nil, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + + // Initial list should use RV=0 + r.ListAndWatch(stopCh) + + results := s.List() + if len(results) != 4 { + t.Errorf("Expected 4 results, got %d", len(results)) + } + + // relist should use lastSyncResourceVersions (RV=10) and since RV=10 is expired, it should retry with RV="". + stopCh = make(chan struct{}) + r.ListAndWatch(stopCh) + + results = s.List() + if len(results) != 8 { + t.Errorf("Expected 8 results, got %d", len(results)) + } + + expectedRVs := []string{"0", "10", ""} + if !reflect.DeepEqual(listCallRVs, expectedRVs) { + t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs) + } +} + +func TestReflectorFullListIfExpired(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + listCallRVs := []string{} + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + listCallRVs = append(listCallRVs, options.ResourceVersion) + pods := make([]v1.Pod, 8) + for i := 0; i < 8; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + switch options.ResourceVersion { + case "0": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil + case "10": + switch options.Limit { + case 4: + switch options.Continue { + case "": + return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil + case "C1": + return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.") + default: + t.Fatalf("Unrecognized Continue: %s", options.Continue) + } + case 0: + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil + default: + t.Fatalf("Unrecognized Limit: %d", options.Limit) + } + default: + t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion) + } + return nil, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + r.WatchListPageSize = 4 + + // Initial list should use RV=0 + r.ListAndWatch(stopCh) + + results := s.List() + if len(results) != 4 { + t.Errorf("Expected 4 results, got %d", len(results)) + } + + // relist should use lastSyncResourceVersions (RV=10) and since second page of RV=10 is expired, it should full list with RV=10 + stopCh = make(chan struct{}) + r.ListAndWatch(stopCh) + + results = s.List() + if len(results) != 8 { + t.Errorf("Expected 8 results, got %d", len(results)) + } + + expectedRVs := []string{"0", "10", "10", "10"} + if !reflect.DeepEqual(listCallRVs, expectedRVs) { + t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs) + } +} + func TestReflectorSetExpectedType(t *testing.T) { obj := &unstructured.Unstructured{} gvk := schema.GroupVersionKind{ diff --git a/staging/src/k8s.io/client-go/tools/pager/pager.go b/staging/src/k8s.io/client-go/tools/pager/pager.go index d265db78683..25d190efa68 100644 --- a/staging/src/k8s.io/client-go/tools/pager/pager.go +++ b/staging/src/k8s.io/client-go/tools/pager/pager.go @@ -87,7 +87,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti obj, err := p.PageFn(ctx, options) if err != nil { - if !errors.IsResourceExpired(err) || !p.FullListIfExpired { + // Only fallback to full list if an "Expired" errors is returned, FullListIfExpired is true, and + // the "Expired" error occurred in page 2 or later (since full list is intended to prevent a pager.List from + // failing when the resource versions is established by the first page request falls out of the compaction + // during the subsequent list requests). + if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" { return nil, err } // the list expired while we were processing, fall back to a full list