diff --git a/staging/src/k8s.io/client-go/tools/cache/BUILD b/staging/src/k8s.io/client-go/tools/cache/BUILD index 1176312e372..289e19bd05f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/BUILD +++ b/staging/src/k8s.io/client-go/tools/cache/BUILD @@ -27,6 +27,7 @@ go_test( race = "off", deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 7ed3a4e9830..f61177d85cc 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -114,9 +114,6 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, period: time.Second, resyncPeriod: resyncPeriod, clock: &clock.RealClock{}, - // We set lastSyncResourceVersion to "0", because it's the value which - // we set as ResourceVersion to the first List() request. - lastSyncResourceVersion: "0", } r.setExpectedType(expectedType) return r @@ -188,16 +185,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) var resourceVersion string - // Explicitly set resource version to have it list from cache for - // performance reasons. - // It's fine for the returned state to be stale (we will catch up via - // Watch() eventually), but can't set "0" to avoid going back in time - // if we hit apiserver that is significantly delayed compared to the - // state we already had. - // TODO: There is still a potential to go back in time after component - // restart when we set ResourceVersion: "0". For more details see: - // https://github.com/kubernetes/kubernetes/issues/59848 - options := metav1.ListOptions{ResourceVersion: r.LastSyncResourceVersion()} + options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} if err := func() error { initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name}) @@ -220,8 +208,20 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if r.WatchListPageSize != 0 { pager.PageSize = r.WatchListPageSize } - // Pager falls back to full list if paginated list calls fail due to an "Expired" error. + // Pager falls back to full list if paginated list calls fail due to an "Expired" error on the 2nd page or later, + // but still my return an "Expired" error if the 1st page fails with "Expired" or the full list fails with "Expired". list, err = pager.List(context.Background(), options) + if apierrs.IsResourceExpired(err) { + // For Kubernetes 1.16 and earlier, if the watch cache is disabled for a resource, list requests + // with LastSyncResourceVersion set to a non-zero ResourceVersion will fail if the exact ResourceVersion + // requested is expired (e.g. an etcd compaction has remove it). + // To prevent the reflector from getting stuck retrying a list for an expired resource version in this + // case, we set ResourceVersion="" and list again to re-establish reflector to the latest available + // ResourceVersion, using a consistent read from etcd. This is also safe to do if watch cache is enabled + // and the list request returned a "Expired" error. + options = metav1.ListOptions{ResourceVersion: ""} + list, err = pager.List(context.Background(), options) + } close(listCh) }() select { @@ -441,3 +441,17 @@ func (r *Reflector) setLastSyncResourceVersion(v string) { defer r.lastSyncResourceVersionMutex.Unlock() r.lastSyncResourceVersion = v } + +// relistResourceVersion is the resource version the reflector should list or relist from. +func (r *Reflector) relistResourceVersion() string { + lastSyncRV := r.LastSyncResourceVersion() + if lastSyncRV == "" { + // Explicitly set resource version to have it list from cache for + // performance reasons. + // It's fine for the returned state to be stale (we will catch up via Watch() + // eventually), but we need to be at least as new as the last resource version we + // synced to avoid going back in time. + return "0" + } + return lastSyncRV +} diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index 1f28dfce49c..65f5124f0ef 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -26,6 +26,7 @@ import ( "time" "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -434,6 +435,194 @@ func TestReflectorWatchListPageSize(t *testing.T) { } } +// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends +// it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or +// etcd that is partitioned and serving older data than the reflector has already processed. +func TestReflectorResyncWithResourceVersion(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + listCallRVs := []string{} + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + listCallRVs = append(listCallRVs, options.ResourceVersion) + pods := make([]v1.Pod, 8) + for i := 0; i < 8; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + switch options.ResourceVersion { + case "0": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil + case "10": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil + default: + t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion) + } + return nil, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + + // Initial list should use RV=0 + r.ListAndWatch(stopCh) + + results := s.List() + if len(results) != 4 { + t.Errorf("Expected 4 results, got %d", len(results)) + } + + // relist should use lastSyncResourceVersions (RV=10) + stopCh = make(chan struct{}) + r.ListAndWatch(stopCh) + + results = s.List() + if len(results) != 8 { + t.Errorf("Expected 8 results, got %d", len(results)) + } + + expectedRVs := []string{"0", "10"} + if !reflect.DeepEqual(listCallRVs, expectedRVs) { + t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs) + } +} + +// TestReflectorExpiredExactResourceVersion tests that a reflector handles the behavior of kubernetes 1.16 an earlier +// where if the exact ResourceVersion requested is not available for a List request for a non-zero ResourceVersion, +// an "Expired" error is returned if the ResourceVersion has expired (etcd has compacted it). +// (In kubernetes 1.17, or when the watch cache is enabled, the List will instead return the list that is no older than +// the requested ResourceVersion). +func TestReflectorExpiredExactResourceVersion(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + listCallRVs := []string{} + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + listCallRVs = append(listCallRVs, options.ResourceVersion) + pods := make([]v1.Pod, 8) + for i := 0; i < 8; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + switch options.ResourceVersion { + case "0": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil + case "10": + // When watch cache is disabled, if the exact ResourceVersion requested is not available, a "Expired" error is returned. + return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.") + case "": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil + default: + t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion) + } + return nil, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + + // Initial list should use RV=0 + r.ListAndWatch(stopCh) + + results := s.List() + if len(results) != 4 { + t.Errorf("Expected 4 results, got %d", len(results)) + } + + // relist should use lastSyncResourceVersions (RV=10) and since RV=10 is expired, it should retry with RV="". + stopCh = make(chan struct{}) + r.ListAndWatch(stopCh) + + results = s.List() + if len(results) != 8 { + t.Errorf("Expected 8 results, got %d", len(results)) + } + + expectedRVs := []string{"0", "10", ""} + if !reflect.DeepEqual(listCallRVs, expectedRVs) { + t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs) + } +} + +func TestReflectorFullListIfExpired(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + listCallRVs := []string{} + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + listCallRVs = append(listCallRVs, options.ResourceVersion) + pods := make([]v1.Pod, 8) + for i := 0; i < 8; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + switch options.ResourceVersion { + case "0": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil + case "10": + switch options.Limit { + case 4: + switch options.Continue { + case "": + return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil + case "C1": + return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.") + default: + t.Fatalf("Unrecognized Continue: %s", options.Continue) + } + case 0: + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil + default: + t.Fatalf("Unrecognized Limit: %d", options.Limit) + } + default: + t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion) + } + return nil, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + r.WatchListPageSize = 4 + + // Initial list should use RV=0 + r.ListAndWatch(stopCh) + + results := s.List() + if len(results) != 4 { + t.Errorf("Expected 4 results, got %d", len(results)) + } + + // relist should use lastSyncResourceVersions (RV=10) and since second page of RV=10 is expired, it should full list with RV=10 + stopCh = make(chan struct{}) + r.ListAndWatch(stopCh) + + results = s.List() + if len(results) != 8 { + t.Errorf("Expected 8 results, got %d", len(results)) + } + + expectedRVs := []string{"0", "10", "10", "10"} + if !reflect.DeepEqual(listCallRVs, expectedRVs) { + t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs) + } +} + func TestReflectorSetExpectedType(t *testing.T) { obj := &unstructured.Unstructured{} gvk := schema.GroupVersionKind{ diff --git a/staging/src/k8s.io/client-go/tools/pager/pager.go b/staging/src/k8s.io/client-go/tools/pager/pager.go index d265db78683..25d190efa68 100644 --- a/staging/src/k8s.io/client-go/tools/pager/pager.go +++ b/staging/src/k8s.io/client-go/tools/pager/pager.go @@ -87,7 +87,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti obj, err := p.PageFn(ctx, options) if err != nil { - if !errors.IsResourceExpired(err) || !p.FullListIfExpired { + // Only fallback to full list if an "Expired" errors is returned, FullListIfExpired is true, and + // the "Expired" error occurred in page 2 or later (since full list is intended to prevent a pager.List from + // failing when the resource versions is established by the first page request falls out of the compaction + // during the subsequent list requests). + if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" { return nil, err } // the list expired while we were processing, fall back to a full list