Merge pull request #92537 from wojtek-t/fix_reflector_not_making_progress

Fix bug in reflector not recovering from "Too large resource version"…

Kubernetes-commit: ed06981eab5e037b7d5b090b257054abb4d42ea0
This commit is contained in:
Kubernetes Publisher 2020-07-01 00:07:47 -07:00
commit 5f4e5d88ae
5 changed files with 83 additions and 22 deletions

2
Godeps/Godeps.json generated
View File

@ -444,7 +444,7 @@
}, },
{ {
"ImportPath": "k8s.io/apimachinery", "ImportPath": "k8s.io/apimachinery",
"Rev": "61490fe38e78" "Rev": "3c7bc0acc576"
}, },
{ {
"ImportPath": "k8s.io/gengo", "ImportPath": "k8s.io/gengo",

4
go.mod
View File

@ -27,7 +27,7 @@ require (
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6 golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 golang.org/x/time v0.0.0-20191024005414-555d28b269f0
k8s.io/api v0.0.0-20200630090439-aaebd44608df k8s.io/api v0.0.0-20200630090439-aaebd44608df
k8s.io/apimachinery v0.0.0-20200630050232-61490fe38e78 k8s.io/apimachinery v0.0.0-20200701090254-3c7bc0acc576
k8s.io/klog/v2 v2.1.0 k8s.io/klog/v2 v2.1.0
k8s.io/utils v0.0.0-20200619165400-6e3d28b6ed19 k8s.io/utils v0.0.0-20200619165400-6e3d28b6ed19
sigs.k8s.io/yaml v1.2.0 sigs.k8s.io/yaml v1.2.0
@ -35,5 +35,5 @@ require (
replace ( replace (
k8s.io/api => k8s.io/api v0.0.0-20200630090439-aaebd44608df k8s.io/api => k8s.io/api v0.0.0-20200630090439-aaebd44608df
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200630050232-61490fe38e78 k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200701090254-3c7bc0acc576
) )

2
go.sum
View File

@ -331,7 +331,7 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
k8s.io/api v0.0.0-20200630090439-aaebd44608df/go.mod h1:cQk/DBCsCL81chJRAZjSBZ4suDeNugUZE3LqnYmbQ8Q= k8s.io/api v0.0.0-20200630090439-aaebd44608df/go.mod h1:cQk/DBCsCL81chJRAZjSBZ4suDeNugUZE3LqnYmbQ8Q=
k8s.io/apimachinery v0.0.0-20200630050232-61490fe38e78/go.mod h1:m5QoVMHU94aieNFPfLqf7WmU8HLVLm3JasG64COkSXQ= k8s.io/apimachinery v0.0.0-20200701090254-3c7bc0acc576/go.mod h1:m5QoVMHU94aieNFPfLqf7WmU8HLVLm3JasG64COkSXQ=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog/v2 v2.0.0 h1:Foj74zO6RbjjP4hBEKjnYtjjAhGg4jNynUdYF6fJrok= k8s.io/klog/v2 v2.0.0 h1:Foj74zO6RbjjP4hBEKjnYtjjAhGg4jNynUdYF6fJrok=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=

View File

@ -82,9 +82,9 @@ type Reflector struct {
// observed when doing a sync with the underlying store // observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store // it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string lastSyncResourceVersion string
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion // isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
// failed with an HTTP 410 (Gone) status code. // lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
isLastSyncResourceVersionGone bool isLastSyncResourceVersionUnavailable bool
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists. // WatchListPageSize is the requested chunk size of initial and resync watch lists.
@ -115,7 +115,7 @@ type WatchErrorHandler func(r *Reflector, err error)
func DefaultWatchErrorHandler(r *Reflector, err error) { func DefaultWatchErrorHandler(r *Reflector, err error) {
switch { switch {
case isExpiredError(err): case isExpiredError(err):
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV. // has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object. // So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
@ -288,13 +288,14 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
} }
list, paginatedResult, err = pager.List(context.Background(), options) list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) { if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionExpired(true) r.setIsLastSyncResourceVersionUnavailable(true)
// Retry immediately if the resource version used to list is expired. // Retry immediately if the resource version used to list is unavailable.
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, or the full list might fail because the // continuation pages, but the pager might not be enabled, the full list might fail because the
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all // resource version it is listing at is expired or the cache may not yet be synced to the provided
// to recover and ensure the reflector makes forward progress. // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
} }
close(listCh) close(listCh)
@ -324,7 +325,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
r.paginatedResult = true r.paginatedResult = true
} }
r.setIsLastSyncResourceVersionExpired(false) // list was successful r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
initTrace.Step("Objects listed") initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list) listMetaInterface, err := meta.ListAccessor(list)
if err != nil { if err != nil {
@ -415,7 +416,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != errorStopRequested { if err != errorStopRequested {
switch { switch {
case isExpiredError(err): case isExpiredError(err):
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV. // has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object. // So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
@ -538,9 +539,9 @@ func (r *Reflector) relistResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock() r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock() defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionGone { if r.isLastSyncResourceVersionUnavailable {
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
// to the latest available ResourceVersion, using a consistent read from etcd. // to the latest available ResourceVersion, using a consistent read from etcd.
return "" return ""
} }
@ -552,12 +553,12 @@ func (r *Reflector) relistResourceVersion() string {
return r.lastSyncResourceVersion return r.lastSyncResourceVersion
} }
// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a // setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
// expired error: HTTP 410 (Gone) Status Code. // "expired" or "too large resource version" error.
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) { func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
r.lastSyncResourceVersionMutex.Lock() r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock() defer r.lastSyncResourceVersionMutex.Unlock()
r.isLastSyncResourceVersionGone = isExpired r.isLastSyncResourceVersionUnavailable = isUnavailable
} }
func isExpiredError(err error) bool { func isExpiredError(err error) bool {
@ -567,3 +568,7 @@ func isExpiredError(err error) bool {
// check when we fully drop support for Kubernetes 1.17 servers from reflectors. // check when we fully drop support for Kubernetes 1.17 servers from reflectors.
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err) return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
} }
func isTooLargeResourceVersionError(err error) bool {
return apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
}

View File

@ -714,6 +714,62 @@ func TestReflectorFullListIfExpired(t *testing.T) {
} }
} }
func TestReflectorFullListIfTooLarge(t *testing.T) {
stopCh := make(chan struct{})
s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{}
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
close(stopCh)
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCallRVs = append(listCallRVs, options.ResourceVersion)
switch options.ResourceVersion {
// initial list
case "0":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "20"}}, nil
// relist after the initial list
case "20":
err := apierrors.NewTimeoutError("too large resource version", 1)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}}
return nil, err
// relist from etcd after "too large" error
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
default:
return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion)
}
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should use RV=0
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
// Relist from the future version.
// This may happen, as watchcache is initialized from "current global etcd resource version"
// when kube-apiserver is starting and if no objects are changing after that each kube-apiserver
// may be synced to a different version and they will never converge.
// TODO: We should use etcd progress-notify feature to avoid this behavior but until this is
// done we simply try to relist from now to avoid continuous errors on relists.
stopCh = make(chan struct{})
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
expectedRVs := []string{"0", "20", ""}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs)
}
}
func TestReflectorSetExpectedType(t *testing.T) { func TestReflectorSetExpectedType(t *testing.T) {
obj := &unstructured.Unstructured{} obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{ gvk := schema.GroupVersionKind{