mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-05 11:16:23 +00:00
Fix bug in reflector not recovering from "Too large resource version" errors
Kubernetes-commit: 3704174f95c7311e025284ef30bb56945fa6e7cc
This commit is contained in:
parent
03667fd6b1
commit
ec46b97af4
41
tools/cache/reflector.go
vendored
41
tools/cache/reflector.go
vendored
@ -82,9 +82,9 @@ type Reflector struct {
|
|||||||
// observed when doing a sync with the underlying store
|
// observed when doing a sync with the underlying store
|
||||||
// it is thread safe, but not synchronized with the underlying store
|
// it is thread safe, but not synchronized with the underlying store
|
||||||
lastSyncResourceVersion string
|
lastSyncResourceVersion string
|
||||||
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
|
// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
|
||||||
// failed with an HTTP 410 (Gone) status code.
|
// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
|
||||||
isLastSyncResourceVersionGone bool
|
isLastSyncResourceVersionUnavailable bool
|
||||||
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
||||||
lastSyncResourceVersionMutex sync.RWMutex
|
lastSyncResourceVersionMutex sync.RWMutex
|
||||||
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
|
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
|
||||||
@ -115,7 +115,7 @@ type WatchErrorHandler func(r *Reflector, err error)
|
|||||||
func DefaultWatchErrorHandler(r *Reflector, err error) {
|
func DefaultWatchErrorHandler(r *Reflector, err error) {
|
||||||
switch {
|
switch {
|
||||||
case isExpiredError(err):
|
case isExpiredError(err):
|
||||||
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
|
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
|
||||||
// has a semantic that it returns data at least as fresh as provided RV.
|
// has a semantic that it returns data at least as fresh as provided RV.
|
||||||
// So first try to LIST with setting RV to resource version of last observed object.
|
// So first try to LIST with setting RV to resource version of last observed object.
|
||||||
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||||
@ -288,13 +288,14 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
list, paginatedResult, err = pager.List(context.Background(), options)
|
list, paginatedResult, err = pager.List(context.Background(), options)
|
||||||
if isExpiredError(err) {
|
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
|
||||||
r.setIsLastSyncResourceVersionExpired(true)
|
r.setIsLastSyncResourceVersionUnavailable(true)
|
||||||
// Retry immediately if the resource version used to list is expired.
|
// Retry immediately if the resource version used to list is unavailable.
|
||||||
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
|
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
|
||||||
// continuation pages, but the pager might not be enabled, or the full list might fail because the
|
// continuation pages, but the pager might not be enabled, the full list might fail because the
|
||||||
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
|
// resource version it is listing at is expired or the cache may not yet be synced to the provided
|
||||||
// to recover and ensure the reflector makes forward progress.
|
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
|
||||||
|
// the reflector makes forward progress.
|
||||||
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
|
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
|
||||||
}
|
}
|
||||||
close(listCh)
|
close(listCh)
|
||||||
@ -324,7 +325,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
r.paginatedResult = true
|
r.paginatedResult = true
|
||||||
}
|
}
|
||||||
|
|
||||||
r.setIsLastSyncResourceVersionExpired(false) // list was successful
|
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
|
||||||
initTrace.Step("Objects listed")
|
initTrace.Step("Objects listed")
|
||||||
listMetaInterface, err := meta.ListAccessor(list)
|
listMetaInterface, err := meta.ListAccessor(list)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -415,7 +416,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
if err != errorStopRequested {
|
if err != errorStopRequested {
|
||||||
switch {
|
switch {
|
||||||
case isExpiredError(err):
|
case isExpiredError(err):
|
||||||
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
|
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
|
||||||
// has a semantic that it returns data at least as fresh as provided RV.
|
// has a semantic that it returns data at least as fresh as provided RV.
|
||||||
// So first try to LIST with setting RV to resource version of last observed object.
|
// So first try to LIST with setting RV to resource version of last observed object.
|
||||||
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||||
@ -538,9 +539,9 @@ func (r *Reflector) relistResourceVersion() string {
|
|||||||
r.lastSyncResourceVersionMutex.RLock()
|
r.lastSyncResourceVersionMutex.RLock()
|
||||||
defer r.lastSyncResourceVersionMutex.RUnlock()
|
defer r.lastSyncResourceVersionMutex.RUnlock()
|
||||||
|
|
||||||
if r.isLastSyncResourceVersionGone {
|
if r.isLastSyncResourceVersionUnavailable {
|
||||||
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
|
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
|
||||||
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
|
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
|
||||||
// to the latest available ResourceVersion, using a consistent read from etcd.
|
// to the latest available ResourceVersion, using a consistent read from etcd.
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
@ -552,12 +553,12 @@ func (r *Reflector) relistResourceVersion() string {
|
|||||||
return r.lastSyncResourceVersion
|
return r.lastSyncResourceVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a
|
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
|
||||||
// expired error: HTTP 410 (Gone) Status Code.
|
// "expired" or "too large resource version" error.
|
||||||
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
|
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
|
||||||
r.lastSyncResourceVersionMutex.Lock()
|
r.lastSyncResourceVersionMutex.Lock()
|
||||||
defer r.lastSyncResourceVersionMutex.Unlock()
|
defer r.lastSyncResourceVersionMutex.Unlock()
|
||||||
r.isLastSyncResourceVersionGone = isExpired
|
r.isLastSyncResourceVersionUnavailable = isUnavailable
|
||||||
}
|
}
|
||||||
|
|
||||||
func isExpiredError(err error) bool {
|
func isExpiredError(err error) bool {
|
||||||
@ -567,3 +568,7 @@ func isExpiredError(err error) bool {
|
|||||||
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
|
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
|
||||||
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
|
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isTooLargeResourceVersionError(err error) bool {
|
||||||
|
return apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
|
||||||
|
}
|
||||||
|
56
tools/cache/reflector_test.go
vendored
56
tools/cache/reflector_test.go
vendored
@ -714,6 +714,62 @@ func TestReflectorFullListIfExpired(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflectorFullListIfTooLarge(t *testing.T) {
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
s := NewStore(MetaNamespaceKeyFunc)
|
||||||
|
listCallRVs := []string{}
|
||||||
|
|
||||||
|
lw := &testLW{
|
||||||
|
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
// Stop once the reflector begins watching since we're only interested in the list.
|
||||||
|
close(stopCh)
|
||||||
|
fw := watch.NewFake()
|
||||||
|
return fw, nil
|
||||||
|
},
|
||||||
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
listCallRVs = append(listCallRVs, options.ResourceVersion)
|
||||||
|
|
||||||
|
switch options.ResourceVersion {
|
||||||
|
// initial list
|
||||||
|
case "0":
|
||||||
|
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "20"}}, nil
|
||||||
|
// relist after the initial list
|
||||||
|
case "20":
|
||||||
|
err := apierrors.NewTimeoutError("too large resource version", 1)
|
||||||
|
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}}
|
||||||
|
return nil, err
|
||||||
|
// relist from etcd after "too large" error
|
||||||
|
case "":
|
||||||
|
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||||
|
|
||||||
|
// Initial list should use RV=0
|
||||||
|
if err := r.ListAndWatch(stopCh); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Relist from the future version.
|
||||||
|
// This may happen, as watchcache is initialized from "current global etcd resource version"
|
||||||
|
// when kube-apiserver is starting and if no objects are changing after that each kube-apiserver
|
||||||
|
// may be synced to a different version and they will never converge.
|
||||||
|
// TODO: We should use etcd progress-notify feature to avoid this behavior but until this is
|
||||||
|
// done we simply try to relist from now to avoid continuous errors on relists.
|
||||||
|
stopCh = make(chan struct{})
|
||||||
|
if err := r.ListAndWatch(stopCh); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedRVs := []string{"0", "20", ""}
|
||||||
|
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
|
||||||
|
t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReflectorSetExpectedType(t *testing.T) {
|
func TestReflectorSetExpectedType(t *testing.T) {
|
||||||
obj := &unstructured.Unstructured{}
|
obj := &unstructured.Unstructured{}
|
||||||
gvk := schema.GroupVersionKind{
|
gvk := schema.GroupVersionKind{
|
||||||
|
Loading…
Reference in New Issue
Block a user