mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-20 10:00:23 +00:00
Add HTTP 410 (Gone) status code checks to reflector and relist with RV=''
Kubernetes-commit: e2fe126d485af243d45f06bafd2ca83931881429
This commit is contained in:
parent
54033229aa
commit
a3f022a93c
80
tools/cache/reflector.go
vendored
80
tools/cache/reflector.go
vendored
@ -74,6 +74,9 @@ type Reflector struct {
|
|||||||
// observed when doing a sync with the underlying store
|
// observed when doing a sync with the underlying store
|
||||||
// it is thread safe, but not synchronized with the underlying store
|
// it is thread safe, but not synchronized with the underlying store
|
||||||
lastSyncResourceVersion string
|
lastSyncResourceVersion string
|
||||||
|
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
|
||||||
|
// failed with an HTTP 410 (Gone) status code.
|
||||||
|
isLastSyncResourceVersionGone bool
|
||||||
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
||||||
lastSyncResourceVersionMutex sync.RWMutex
|
lastSyncResourceVersionMutex sync.RWMutex
|
||||||
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
|
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
|
||||||
@ -208,19 +211,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
if r.WatchListPageSize != 0 {
|
if r.WatchListPageSize != 0 {
|
||||||
pager.PageSize = r.WatchListPageSize
|
pager.PageSize = r.WatchListPageSize
|
||||||
}
|
}
|
||||||
// Pager falls back to full list if paginated list calls fail due to an "Expired" error on the 2nd page or later,
|
|
||||||
// but may still return an "Expired" error if the 1st page fails with "Expired" or the full list fails with "Expired".
|
|
||||||
list, err = pager.List(context.Background(), options)
|
list, err = pager.List(context.Background(), options)
|
||||||
if apierrs.IsResourceExpired(err) {
|
if isExpiredError(err) {
|
||||||
// For Kubernetes 1.16 and earlier, if the watch cache is disabled for a resource, list requests
|
r.setIsLastSyncResourceVersionExpired(true)
|
||||||
// with LastSyncResourceVersion set to a non-zero ResourceVersion will fail if the exact ResourceVersion
|
// Retry immediately if the resource version used to list is expired.
|
||||||
// requested is expired (e.g. an etcd compaction has removed it).
|
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
|
||||||
// To prevent the reflector from getting stuck retrying a list for an expired resource version in this
|
// continuation pages, but the pager might not be enabled, or the full list might fail because the
|
||||||
// case, we set ResourceVersion="" and list again to re-establish reflector to the latest available
|
// resource version it is listing at is expired, so we need to fall back to resourceVersion="" in all cases
|
||||||
// ResourceVersion, using a consistent read from etcd. This is also safe to do if watch cache is enabled
|
// to recover and ensure the reflector makes forward progress.
|
||||||
// and the list request returned an "Expired" error.
|
list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
|
||||||
options = metav1.ListOptions{ResourceVersion: ""}
|
|
||||||
list, err = pager.List(context.Background(), options)
|
|
||||||
}
|
}
|
||||||
close(listCh)
|
close(listCh)
|
||||||
}()
|
}()
|
||||||
@ -234,6 +234,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
|
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
|
||||||
}
|
}
|
||||||
|
r.setIsLastSyncResourceVersionExpired(false) // list was successful
|
||||||
initTrace.Step("Objects listed")
|
initTrace.Step("Objects listed")
|
||||||
listMetaInterface, err := meta.ListAccessor(list)
|
listMetaInterface, err := meta.ListAccessor(list)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -307,10 +308,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
|
|
||||||
w, err := r.listerWatcher.Watch(options)
|
w, err := r.listerWatcher.Watch(options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err {
|
switch {
|
||||||
case io.EOF:
|
case isExpiredError(err):
|
||||||
|
r.setIsLastSyncResourceVersionExpired(true)
|
||||||
|
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||||
|
case err == io.EOF:
|
||||||
// watch closed normally
|
// watch closed normally
|
||||||
case io.ErrUnexpectedEOF:
|
case err == io.ErrUnexpectedEOF:
|
||||||
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
|
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
|
||||||
default:
|
default:
|
||||||
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
|
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
|
||||||
@ -329,7 +333,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
|
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
|
||||||
if err != errorStopRequested {
|
if err != errorStopRequested {
|
||||||
switch {
|
switch {
|
||||||
case apierrs.IsResourceExpired(err):
|
case isExpiredError(err):
|
||||||
|
r.setIsLastSyncResourceVersionExpired(true)
|
||||||
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
||||||
default:
|
default:
|
||||||
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
||||||
@ -442,16 +447,41 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
|
|||||||
r.lastSyncResourceVersion = v
|
r.lastSyncResourceVersion = v
|
||||||
}
|
}
|
||||||
|
|
||||||
// relistResourceVersion is the resource version the reflector should list or relist from.
|
// relistResourceVersion determines the resource version the reflector should list or relist from.
|
||||||
|
// Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
|
||||||
|
// version no older than what has already been observed in relist results or watch events, or, if the last relist resulted
|
||||||
|
// in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
|
||||||
|
// etcd via a quorum read.
|
||||||
func (r *Reflector) relistResourceVersion() string {
|
func (r *Reflector) relistResourceVersion() string {
|
||||||
lastSyncRV := r.LastSyncResourceVersion()
|
r.lastSyncResourceVersionMutex.RLock()
|
||||||
if lastSyncRV == "" {
|
defer r.lastSyncResourceVersionMutex.RUnlock()
|
||||||
// Explicitly set resource version to have it list from cache for
|
|
||||||
// performance reasons.
|
if r.isLastSyncResourceVersionGone {
|
||||||
// It's fine for the returned state to be stale (we will catch up via Watch()
|
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache,
|
||||||
// eventually), but we need to be at least as new as the last resource version we
|
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
|
||||||
// synced to avoid going back in time.
|
// to the latest available ResourceVersion, using a consistent read from etcd.
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
if r.lastSyncResourceVersion == "" {
|
||||||
|
// For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
|
||||||
|
// be served from the watch cache if it is enabled.
|
||||||
return "0"
|
return "0"
|
||||||
}
|
}
|
||||||
return lastSyncRV
|
return r.lastSyncResourceVersion
|
||||||
|
}
|
||||||
|
|
||||||
|
// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned an
|
||||||
|
// expired error: HTTP 410 (Gone) Status Code.
|
||||||
|
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
|
||||||
|
r.lastSyncResourceVersionMutex.Lock()
|
||||||
|
defer r.lastSyncResourceVersionMutex.Unlock()
|
||||||
|
r.isLastSyncResourceVersionGone = isExpired
|
||||||
|
}
|
||||||
|
|
||||||
|
func isExpiredError(err error) bool {
|
||||||
|
// In Kubernetes 1.17 and earlier, the api server returns both apierrs.StatusReasonExpired and
|
||||||
|
// apierrs.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
|
||||||
|
// and always returns apierrs.StatusReasonExpired. For backward compatibility we can only remove the apierrs.IsGone
|
||||||
|
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
|
||||||
|
return apierrs.IsResourceExpired(err) || apierrs.IsGone(err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user