Merge pull request #83520 from jpbetz/reflector-relist-rv

Avoid going back in time in Reflector relist (revived)
Kubernetes Prow Robot 2019-11-06 18:46:44 -08:00 committed by GitHub
commit 8ed2f4775a
7 changed files with 263 additions and 15 deletions

View File

@@ -223,6 +223,7 @@ func NewApplyConflict(causes []metav1.StatusCause, message string) *StatusError
}
// NewGone returns an error indicating the item is no longer available at the server and no forwarding address is known.
// DEPRECATED: Please use NewResourceExpired instead.
func NewGone(message string) *StatusError {
return &StatusError{metav1.Status{
Status: metav1.StatusFailure,
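For reference, a minimal standalone sketch (not part of this diff) contrasting the deprecated constructor with its replacement; both produce an HTTP 410 response and differ only in the status reason, which is what the reflector-side checks below match on:

```go
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

func main() {
	// Deprecated constructor: HTTP 410 with reason "Gone".
	gone := apierrors.NewGone("too old resource version: 5 (10)")
	// Preferred constructor: HTTP 410 with reason "Expired".
	expired := apierrors.NewResourceExpired("too old resource version: 5 (10)")

	fmt.Println(apierrors.IsGone(gone))               // true
	fmt.Println(apierrors.IsResourceExpired(expired)) // true
}
```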

View File

@@ -468,7 +468,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
return result, nil
}
if resourceVersion < oldest-1 {
- return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
+ return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
}
// Binary search the smallest index at which resourceVersion is greater than the given one.

View File

@@ -444,8 +444,8 @@ func TestWatch(t *testing.T) {
}
defer tooOldWatcher.Stop()
// Ensure we get a "Gone" error
- expectedGoneError := errors.NewGone("").ErrStatus
- verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError)
+ expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
+ verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
if err != nil {
@@ -668,8 +668,8 @@ func TestEmptyWatchEventCache(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
- expectedGoneError := errors.NewGone("").ErrStatus
- verifyWatchEvent(t, watcher, watch.Error, &expectedGoneError)
+ expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
+ verifyWatchEvent(t, watcher, watch.Error, &expectedResourceExpiredError)
}
{
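The expected-event assertions above hinge on the watch delivering a metav1.Status object in an Error event. A minimal sketch (the isExpiredWatchEvent helper is hypothetical, not part of client-go) of how a watch consumer would recognize that event, accepting both reasons for compatibility:

```go
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

// isExpiredWatchEvent reports whether an event is the HTTP 410 error event
// sent when a watch is started from a too-old resource version.
func isExpiredWatchEvent(ev watch.Event) bool {
	if ev.Type != watch.Error {
		return false
	}
	status, ok := ev.Object.(*metav1.Status)
	return ok && (status.Reason == metav1.StatusReasonExpired || status.Reason == metav1.StatusReasonGone)
}

func main() {
	status := apierrors.NewResourceExpired("too old resource version: 5 (10)").ErrStatus
	fmt.Println(isExpiredWatchEvent(watch.Event{Type: watch.Error, Object: &status})) // true
}
```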

View File

@@ -27,6 +27,7 @@ go_test(
race = "off",
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",

View File

@@ -74,6 +74,9 @@ type Reflector struct {
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
// failed with an HTTP 410 (Gone) status code.
isLastSyncResourceVersionGone bool
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
@@ -185,10 +188,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
- // Explicitly set "0" as resource version - it's fine for the List()
- // to be served from cache and potentially be delayed relative to
- // etcd contents. Reflector framework will catch up via Watch() eventually.
- options := metav1.ListOptions{ResourceVersion: "0"}
+ options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
@@ -211,8 +211,17 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if r.WatchListPageSize != 0 {
pager.PageSize = r.WatchListPageSize
}
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
list, err = pager.List(context.Background(), options)
if isExpiredError(err) {
r.setIsLastSyncResourceVersionExpired(true)
// Retry immediately if the resource version used to list is expired.
// The pager already falls back to a full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, or the full list might fail because the
// resource version it is listing at is expired, so we need to fall back to resourceVersion="" in all
// cases to recover and ensure the reflector makes forward progress.
list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()
select {
@@ -225,6 +234,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
}
r.setIsLastSyncResourceVersionExpired(false) // list was successful
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
@@ -298,10 +308,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
w, err := r.listerWatcher.Watch(options)
if err != nil {
- switch err {
- case io.EOF:
+ switch {
+ case isExpiredError(err):
+ r.setIsLastSyncResourceVersionExpired(true)
+ klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
+ case err == io.EOF:
// watch closed normally
- case io.ErrUnexpectedEOF:
+ case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
@@ -320,7 +333,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
- case apierrs.IsResourceExpired(err):
+ case isExpiredError(err):
+ r.setIsLastSyncResourceVersionExpired(true)
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
@@ -432,3 +446,42 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
defer r.lastSyncResourceVersionMutex.Unlock()
r.lastSyncResourceVersion = v
}
// relistResourceVersion determines the resource version the reflector should list or relist from.
// Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
// version no older than what has already been observed in relist results or watch events, or, if the last
// relist resulted in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest
// resource version available in etcd via a quorum read.
func (r *Reflector) relistResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionGone {
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish the
// reflector at the latest available ResourceVersion, using a consistent read from etcd.
return ""
}
if r.lastSyncResourceVersion == "" {
// For performance reasons, the initial list performed by the reflector uses "0" as resource version
// to allow it to be served from the watch cache if it is enabled.
return "0"
}
return r.lastSyncResourceVersion
}
// setIsLastSyncResourceVersionExpired sets whether the last list or watch request with lastSyncResourceVersion
// returned an expired error: HTTP 410 (Gone) status code.
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.isLastSyncResourceVersionGone = isExpired
}
func isExpiredError(err error) bool {
// In Kubernetes 1.17 and earlier, the API server returns both apierrs.StatusReasonExpired and
// apierrs.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the API server is more consistent
// and always returns apierrs.StatusReasonExpired. For backward compatibility we can only remove the apierrs.IsGone
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
return apierrs.IsResourceExpired(err) || apierrs.IsGone(err)
}
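Taken together, the new field and helpers form a small state machine. A standalone re-statement of the decision relistResourceVersion encodes (the relistRV function is hypothetical, for illustration only):

```go
package main

import "fmt"

// relistRV mirrors the logic of Reflector.relistResourceVersion above.
func relistRV(lastSyncRV string, lastSyncRVExpired bool) string {
	if lastSyncRVExpired {
		// Last RV returned HTTP 410: list at "" for a consistent read from etcd.
		return ""
	}
	if lastSyncRV == "" {
		// Initial list: "0" may be served from the watch cache.
		return "0"
	}
	// Otherwise never go back in time past what has already been observed.
	return lastSyncRV
}

func main() {
	fmt.Println(relistRV("", false))   // "0"
	fmt.Println(relistRV("10", false)) // "10"
	fmt.Println(relistRV("10", true))  // ""
}
```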

View File

@@ -26,6 +26,7 @@ import (
"time"
"k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
@@ -434,6 +435,194 @@ func TestReflectorWatchListPageSize(t *testing.T) {
}
}
// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
// it in relist requests to prevent the reflector from traveling back in time if the relist is to an API server
// or etcd instance that is partitioned and serving older data than the reflector has already processed.
func TestReflectorResyncWithResourceVersion(t *testing.T) {
stopCh := make(chan struct{})
s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{}
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
close(stopCh)
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCallRVs = append(listCallRVs, options.ResourceVersion)
pods := make([]v1.Pod, 8)
for i := 0; i < 8; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.ResourceVersion {
case "0":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
case "10":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
default:
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
}
return nil, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should use RV=0
r.ListAndWatch(stopCh)
results := s.List()
if len(results) != 4 {
t.Errorf("Expected 4 results, got %d", len(results))
}
// relist should use lastSyncResourceVersion (RV=10)
stopCh = make(chan struct{})
r.ListAndWatch(stopCh)
results = s.List()
if len(results) != 8 {
t.Errorf("Expected 8 results, got %d", len(results))
}
expectedRVs := []string{"0", "10"}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
}
}
// TestReflectorExpiredExactResourceVersion tests that a reflector handles the behavior of Kubernetes 1.16 and
// earlier, where a List request for an exact non-zero ResourceVersion returns an "Expired" error if that
// ResourceVersion has expired (etcd has compacted it).
// (In Kubernetes 1.17, or when the watch cache is enabled, the List will instead return a list that is no older
// than the requested ResourceVersion.)
func TestReflectorExpiredExactResourceVersion(t *testing.T) {
stopCh := make(chan struct{})
s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{}
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
close(stopCh)
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCallRVs = append(listCallRVs, options.ResourceVersion)
pods := make([]v1.Pod, 8)
for i := 0; i < 8; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.ResourceVersion {
case "0":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
case "10":
// When the watch cache is disabled, if the exact ResourceVersion requested is not available, an "Expired" error is returned.
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
default:
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
}
return nil, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should use RV=0
r.ListAndWatch(stopCh)
results := s.List()
if len(results) != 4 {
t.Errorf("Expected 4 results, got %d", len(results))
}
// relist should use lastSyncResourceVersion (RV=10) and since RV=10 is expired, it should retry with RV="".
stopCh = make(chan struct{})
r.ListAndWatch(stopCh)
results = s.List()
if len(results) != 8 {
t.Errorf("Expected 8 results, got %d", len(results))
}
expectedRVs := []string{"0", "10", ""}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
}
}
func TestReflectorFullListIfExpired(t *testing.T) {
stopCh := make(chan struct{})
s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{}
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
close(stopCh)
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCallRVs = append(listCallRVs, options.ResourceVersion)
pods := make([]v1.Pod, 8)
for i := 0; i < 8; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.ResourceVersion {
case "0":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
case "10":
switch options.Limit {
case 4:
switch options.Continue {
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
case "C1":
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
default:
t.Fatalf("Unrecognized Continue: %s", options.Continue)
}
case 0:
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
default:
t.Fatalf("Unrecognized Limit: %d", options.Limit)
}
default:
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
}
return nil, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
r.WatchListPageSize = 4
// Initial list should use RV=0
r.ListAndWatch(stopCh)
results := s.List()
if len(results) != 4 {
t.Errorf("Expected 4 results, got %d", len(results))
}
// relist should use lastSyncResourceVersion (RV=10) and since the second page of RV=10 is expired, it should fall back to a full list at RV=10
stopCh = make(chan struct{})
r.ListAndWatch(stopCh)
results = s.List()
if len(results) != 8 {
t.Errorf("Expected 8 results, got %d", len(results))
}
expectedRVs := []string{"0", "10", "10", "10"}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
}
}
func TestReflectorSetExpectedType(t *testing.T) {
obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{

View File

@@ -87,7 +87,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
obj, err := p.PageFn(ctx, options)
if err != nil {
- if !errors.IsResourceExpired(err) || !p.FullListIfExpired {
+ // Only fall back to a full list if an "Expired" error is returned, FullListIfExpired is true, and
+ // the "Expired" error occurred on page 2 or later (the full-list fallback is intended to prevent
+ // pager.List from failing when the resource version established by the first page request falls out
+ // of the compaction window during the subsequent list requests).
+ if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
return nil, err
}
// the list expired while we were processing, fall back to a full list
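The amended condition restricts the full-list fallback to failures on continuation pages. A hedged re-statement of the predicate as a standalone helper (shouldFullList is hypothetical, not part of the pager API):

```go
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// shouldFullList mirrors the inverted condition above: fall back to a full
// list only for an "Expired" error, with the fallback enabled, on page 2 or
// later (options.Continue is non-empty on continuation requests).
func shouldFullList(err error, fullListIfExpired bool, options metav1.ListOptions) bool {
	return apierrors.IsResourceExpired(err) && fullListIfExpired && options.Continue != ""
}

func main() {
	expired := apierrors.NewResourceExpired("the resourceVersion for the provided list is too old")
	fmt.Println(shouldFullList(expired, true, metav1.ListOptions{Continue: "C1"})) // true: continuation page
	fmt.Println(shouldFullList(expired, true, metav1.ListOptions{}))               // false: first page fails hard
}
```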